Source code for OpenLA.data_visualization

import matplotlib.dates as mdates
from matplotlib import pyplot as plt
from matplotlib.colors import cnames
from matplotlib import ticker as tick
from pandas.plotting import register_matplotlib_converters
import numpy as np

from .data_extraction import *
from .check import _is_str, _is_str_list
from .data_classes.event_stream import EventStream
from .data_classes.operation_count import OperationCount
from .data_classes.time_range_aggregation import TimeRangeAggregation
from .data_classes.pagewise_aggregation import PageWiseAggregation, PageTransition


def _set_ax(ax, figsize):
    """
    Create new axes if it is None.

    :param ax: The axes to plot the figure on. If None, new axes is created
    :type ax: matplotlib.axes.Axes

    :param figsize: Figure size
    :type figsize: tuple(float, float)

    :return: The axes to plot the figure on
    """
    if ax is None:
        _, ax = plt.subplots(1, 1, figsize=figsize)

    return ax


def _stacked_bar(x, top, bottom, width, ax, label, color=None):
    if color is not None:
        ax.bar(x, top, bottom=bottom, width=width, label=label, color=color)
    else:
        ax.bar(x, top, bottom=bottom, width=width, label=label)

    next_bottom = bottom + top
    return next_bottom


[docs]def visualize_grade_distribution(course_info, plot_proportion=False, xlabel=None, ylabel=None, ax=None, figsize=None, fontsize=None, save_file=None): """ Drow a bar graph of grade distribution in the course. :param course_info: Instance of CourseInformation class :type course_info: CourseInformation :param plot_proportion: If False, the number of users in each grade is used for Y-axis. If True, the proportion of each grade is used for Y-axis. :type plot_proportion: bool :param xlabel: The label name of x-axis :type xlabel: str :param ylabel: The label name of y-axis :type ylabel: str :param ax: The axes to plot the figure on. If None, new axes is created :type ax: matplotlib.axes.Axes or None :param figsize: Figure size :type figsize: tuple(float, float) or None :param save_file: The file path for saving the graph :type save_file: str or None :return: Bar graph of grade distribution. :rtype: matplotlib.axes.Axes """ grade_distribution = course_info.grade_distribution() ax = _set_ax(ax, figsize) if plot_proportion: plot_column = "proportion" else: plot_column = "count" ax.bar(grade_distribution.index, grade_distribution[plot_column]) if fontsize is not None: plt.rcParams["font.size"] = fontsize if xlabel is not None: ax.set_xlabel(xlabel) if ylabel is not None: ax.set_ylabel(ylabel) if save_file is not None: plt.savefig(save_file) return ax
[docs]def visualize_time_series_graph(event_stream, column, graph_type='line', time_format='%Y/%m/%d %H:%M:%S', xlabel=None, ylabel=None, start_time=None, end_time=None, ax=None, figsize=None, fontsize=None, save_file=None): """ Draw a time series graph of indicated “column”. If the “save_file” is indicated, the graph is saved. :param event_stream: EventStream instance :type event_stream: EventStream :param column: Column to make Y-axis of time series graph :type colmn: str :param graph_type: The graph type selected from 'line', 'step', 'plot', or 'bar' :type graph_type: str :param time_format: The time format in x axis. For example, default format '%Y/%m/%d %H:%M:%S' converts "December 10, 2019 at 10:30 p.m." to "2019/12/10 22:30:00". The meaning of directive such as '%Y' is in https://docs.python.org/3/library/time.html. :type time_format: str :param xlabel: The label name of x-axis :type xlabel: str :param ylabel: The label name of y-axis :type ylabel: str :param start_time: The start time of time series :type start_time: pandas.Timestamp or datetime.datetime or None :param end_time: The end time of time series :type end_time: pandas.Timestamp or datetime.datetime or NOne :param ax: The axes to plot the figure on. If None, new axes is created :type ax: matplotlib.axes.Axes or None :param figsize: Figure size :type figsize: tuple(float, float) or None :param save_file: The file path for saving the graph :type save_file: str or None :return: The time series graph of selected type :rtype: matplotlib.axes.Axes """ df = event_stream.df df["eventtime"] = pd.to_datetime(df["eventtime"]) # time range if (start_time is None) and (end_time is None): df = df elif start_time is None: df = df[df["eventtime"] < end_time] elif end_time is None: df = df[start_time <= df["eventtime"]] else: df = df[(start_time <= df["eventtime"]) & (df["eventtime"] < end_time)] ax = _set_ax(ax, figsize) register_matplotlib_converters() # graph type if graph_type == "line": ax.plot(df["eventtime"], df[column]) elif graph_type == "step": ax.step(df["eventtime"], df[column]) elif graph_type == "plot": ax.plot(df["eventtime"], df[column], marker="s", linestyle='None') elif graph_type == "bar": ax.bar(df["eventtime"], df[column], linestyle='None') if fontsize is not None: plt.rcParams["font.size"] = fontsize if xlabel is not None: ax.set_xlabel(xlabel) if ylabel is not None: ax.set_ylabel(ylabel) ax.xaxis.set_major_formatter(mdates.DateFormatter(time_format)) plt.xticks(rotation=90) plt.tight_layout() plt.grid(True) if save_file is not None: plt.savefig(save_file) return ax
[docs]def visualize_operation_count_bar(operation_count, user_id=None, contents_id=None, operation_name=None, calculate_type="total", xlabel=None, ylabel=None, ax=None, figsize=None, fontsize=None, save_file=None): """ Draw a bar graph which represents each operation used by a specific learner. :param operation_count: OperationCount instance :type operation_count: OperationCount :param user_id: The user id to make graph. If it is None, the graph is made from all users data. :type user_id: str, List[str], or None :param contents_id: The contents id to make graph. If it is None, the graph is made from all contents data. :type contents_id: str, List[str], or None :param operation_name: The operation name to count. If it is None, the graph is made for all operations. :type operation_name: str, List[str], or None :param calculate_type: 'total' or 'average'. How a multiple values integrate. :type calculate_type: str :param xlabel: The label name of x-axis :type xlabel: str :param ylabel: The label name of y-axis :type ylabel: str :param ax: The axes to plot the figure on. If None, new axes is created :type ax: matplotlib.axes.Axes or None :param figsize: Figure size :type figsize: tuple(float, float) or None :param save_file: The file path for saving the graph :type save_file: str or None :return: The bar graph :rtype: matplotlib.axes.Axes """ if user_id is not None: operation_count = select_user(operation_count, user_id) if contents_id is not None: operation_count = select_contents(operation_count, contents_id) count_df = operation_count.df count_df = count_df.drop(['userid', 'contentsid'], axis=1) if operation_name is None: operation_name = operation_count.operation_name() elif _is_str(operation_name): operation_name = [operation_name] else: operation_name = operation_name if calculate_type == "total": count_df = count_df.sum(axis=0) elif calculate_type == "average": count_df = count_df.mean(axis=0) ax = _set_ax(ax, figsize) register_matplotlib_converters() ax.bar(operation_name, count_df.loc[operation_name]) plt.xticks(rotation=90) plt.tight_layout() if fontsize is not None: plt.rcParams["font.size"] = fontsize if xlabel is not None: ax.set_xlabel(xlabel) if ylabel is not None: ax.set_ylabel(ylabel) if save_file is not None: plt.savefig(save_file) return ax
[docs]def visualize_behavior_in_pages(pagewise_aggregation, contents_id, user_id=None, is_plot_operation=True, is_plot_reading_time=True, operation_name=None, reading_time_basis="minutes", calculate_type="total", operation_bar_colors=None, reading_time_color="brown", figsize=None, fontsize=None, save_file=None): """ Draw a bar graph which represents page-wise counting result of each operation and reading time. :param pagewise_aggregation: The instance of PageWiseAggregation :type pagewise_aggregation: PageWiseAggregation :param contents_id: The contents id to make graph. If it is None, the graph is made from all contents data. :type contents_id: str :param user_id: The user id to make graph. If it is None, the graph is made from all users data. :type user_id: str, List[str], or None :param is_plot_operation: Whether make a bar plot of operation count :type is_plot_operation: bool :param is_plot_reading_time: Whether make a bar plot of reading time :type is_plot_reading_time: bool :param operation_name: The operation name to make a bar graph. Default 'None' makes a bar graph for all operations. :type operation_name: str, List[str], or None :param reading_time_basis: 'seconds', 'minutes', or 'hours'. :type reading_time_basis: str :param calculate_type: 'total' or 'average'. How a multiple values integrate. :type calculate_type: str :param operation_bar_colors: The colors of operation bar plots. Required same number of elements with the number of operations. If default 'None', the colors are automatically decided. :type operation_bar_colors: List[str] or None :param reading_time_color: The color of reading-time bar plots. The default value is 'brouwn'. :type reading_time_color: str :param figsize: Figure size :type figsize: tuple(float, float) or None :param save_file: The file path for saving the graph :type save_file: str or None :return: The bar graph represents operation count and reading time in each page. :rtype: matplotlib.figure.Figure """ df = pagewise_aggregation.df if df.empty: return if _is_str(contents_id): pagewise_aggregation = select_contents(pagewise_aggregation, contents_id) elif contents_id is not None: raise ValueError("Please specify a contents id to the argument \"contents_id\"") if user_id is not None: pagewise_aggregation = select_user(pagewise_aggregation, user_id) num_pages = pagewise_aggregation.num_unique_pages() x = np.arange(num_pages) + 1 width = 0.4 if is_plot_operation: fig, ax_operation = plt.subplots(figsize=figsize) if operation_name is None: operation_name = pagewise_aggregation.operation_name() elif _is_str(operation_name): operation_name = [operation_name] else: operation_name = operation_name if calculate_type == "total": operation_count = df.groupby("pageno").sum() elif calculate_type == "average": operation_count = df.groupby("pageno").mean() next_bottom = 0 if operation_bar_colors is not None: for operation, color in zip(operation_name, operation_bar_colors): y = operation_count[operation].values next_bottom = _stacked_bar(x, y, next_bottom, width, ax_operation, operation, color) else: for operation in operation_name: y = operation_count[operation].values next_bottom = _stacked_bar(x, y, next_bottom, width, ax_operation, operation) # ax_operation.set_ylabel("Operation count", fontsize=fontsize) # ax_operation.set_xlabel("Page", fontsize=fontsize) ax_operation.set_ylabel("Operation count") ax_operation.set_xlabel("Page") ax_operation.legend(loc='upper left', bbox_to_anchor=(1.3, 0.95)) if is_plot_reading_time: if is_plot_operation: ax_time = ax_operation.twinx() else: fig, ax_time = plt.subplots(figsize=figsize) if calculate_type == "total": reading_time = df.groupby("pageno")["reading_seconds"].sum().values elif calculate_type == "average": reading_time = df.groupby("pageno")["reading_seconds"].mean().values if reading_time_basis == "seconds": pass elif reading_time_basis == "minutes": reading_time = reading_time / 60 elif reading_time_basis == "hours": reading_time = reading_time / 3600 else: raise ValueError("Invalid reading time basis") if is_plot_operation: ax_time.bar(x+width, reading_time, width=width, label="reading minutes", color=cnames[reading_time_color]) plt.xticks(x + width/2, x) else: ax_time.bar(x, reading_time, width=width, label="reading minutes", color=cnames[reading_time_color]) # ax_time.set_ylabel("Reading {} in the page".format(reading_time_basis), fontsize=fontsize) # ax_time.set_xlabel("Page", fontsize=fontsize) ax_time.set_ylabel("Reading {} in the page".format(reading_time_basis)) ax_time.set_xlabel("Page") ax_time.legend(loc='upper left', bbox_to_anchor=(1.3, 1.0)) plt.xticks(ticks=range(1, num_pages, 5), labels=range(1, num_pages, 5)) plt.tight_layout() if fontsize is not None: plt.rcParams["font.size"] = fontsize if save_file is not None: plt.savefig(save_file) if is_plot_operation and is_plot_reading_time: return ax_operation, ax_time elif is_plot_operation: return ax_operation elif is_plot_reading_time: return ax_time
[docs]def visualize_pages_in_time_range(time_range_aggregation, contents_id, user_id=None, xlabel=None, ylabel=None, ax=None, figsize=None, fontsize=None, save_file=None, show_legend=False): """ Draw a line graph which represents which page is read in time ranges. :param time_range_aggregation: TImeRangeAggregation instance :type time_range_aggregation: TimeRangeAggregation :param contents_id: The contents id to make graph :type contents_id: str :param user_id: The user id to make graph :type user_id: str, List[str], or None :param xlabel: The label name of x-axis :type xlabel: str :param ylabel: The label name of y-axis :type ylabel: str :param ax: The axes to plot the figure on. If None, new axes is created :type ax: matplotlib.axes.Axes or None :param figsize: Figure size :type figsize: tuple(float, float) or None :param save_file: The file path for saving the graph :type save_file: str or None :param show_legend: Whether to show legend of the graph. If the number of users in this graph is large, this argument is recommended to be set False. :type show_legend: bool :return: The line graph which shows the page tracking :rtype: matplotlib.axes.Axes """ if not _is_str(contents_id): raise ValueError("Please specify a contents id to the argument \"contents_id\"") ax = _set_ax(ax, figsize) if user_id is None: user_id = time_range_aggregation.user_id() elif _is_str(user_id): user_id = [user_id] time_range_aggregation = select_contents(time_range_aggregation, contents_id) time_range_df = time_range_aggregation.df for column in ['elapsed_seconds', 'elapsed_minutes', 'elapsed_hours']: if column in time_range_df.columns: time_range_basis = column break max_time = 0 max_page = 0 for user in user_id: user_df = time_range_df[time_range_df['userid'] == user] if user_df.empty: continue ax.step([0]+user_df[time_range_basis], [0]+user_df['pageno'], label=user) max_time = max(max_time, max(user_df[time_range_basis])) max_page = max(max_page, max(user_df['pageno'])) if fontsize is not None: plt.rcParams["font.size"] = fontsize if xlabel is None: xlabel = time_range_basis.replace("_", " ") if ylabel is None: ylabel = "page" ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) if show_legend: ax.legend(loc='upper left', bbox_to_anchor=(1.05, 1.0)) plt.grid(axis='both', which='both') if save_file is not None: plt.savefig(save_file) return ax
[docs]def visualize_operation_in_time_range(time_range_aggregation, contents_id, user_id=None, operation_name=None, calculate_type="total", operation_bar_colors=None, xlabel=None, ylabel=None, ax=None, figsize=None, fontsize=None, save_file=None): """ Draw a bar graph which represents how many operations are used in time ranges. :param time_range_aggregation: TImeRangeAggregation instance :type time_range_aggregation: TimeRangeAggregation :param contents_id: The contents id to make graph :type contents_id: str :param user_id: The user id to make graph :type user_id: str, List[str], or None :param operation_name: The operation name to make a bar graph. Default 'None' makes a bar graph for all operations. :type operation_name: str, List[str], or None :param calculate_type: 'total' or 'average'. How a multiple values integrate. :type calculate_type: str :param operation_bar_colors: The colors of operation bar plots. Required same number of elements with the number of operations. If default 'None', the colors are automatically decided. :type operation_bar_colors: List[str] or None :param xlabel: The label name of x-axis :type xlabel: str :param ylabel: The label name of y-axis :type ylabel: str :param ax: The axes to plot the figure on. If None, new axes is created :type ax: matplotlib.axes.Axes or None :param figsize: Figure size :type figsize: tuple(float, float) or None :param save_file: The file path for saving the graph :type save_file: str or None :return: The line graph which shows the page tracking :rtype: matplotlib.axes.Axes """ ax = _set_ax(ax, figsize) if user_id is None: user_id = time_range_aggregation.user_id() time_range_aggregation = select_user(time_range_aggregation, user_id) time_range_aggregation = select_contents(time_range_aggregation, contents_id) time_range_df = time_range_aggregation.df for column in ['elapsed_seconds', 'elapsed_minutes', 'elapsed_hours']: if column in time_range_df.columns: time_range_basis = column break if operation_name is None: operation_name = time_range_aggregation.operation_name() elif _is_str(operation_name): operation_name = [operation_name] else: operation_name = operation_name if calculate_type == "total": operation_count = time_range_df.groupby(time_range_basis).sum() elif calculate_type == "average": operation_count = time_range_df.groupby(time_range_basis).mean() width = 0.8 next_bottom = 0 x = time_range_df[time_range_basis].unique() if operation_bar_colors is not None: for operation, color in zip(operation_name, operation_bar_colors): y = operation_count[operation].values next_bottom = _stacked_bar(x, y, next_bottom, width, ax, operation, color) else: for operation in operation_name: y = operation_count[operation].values next_bottom = _stacked_bar(x, y, next_bottom, width, ax, operation) xlabel = xlabel if xlabel is not None else time_range_basis.replace("_", " ") ylabel = ylabel if ylabel is not None else "operation count" ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) ax.legend(loc='upper left', bbox_to_anchor=(1.05, 1.0)) plt.tight_layout() if fontsize is not None: plt.rcParams["font.size"] = fontsize if save_file is not None: plt.savefig(save_file) return ax