Source code for viswaternet.drawing.animate

# -*- coding: utf-8 -*-

"""
The viswaternet.drawing.animate module contains the code that handles creation
of animations. One can create animations from simulation data, or pass in
outside data to be animated on the network.
"""
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
from packaging.version import parse
if parse(str(matplotlib.__version__)) < parse('3.6.0'):
    import matplotlib.tight_bbox as tight_bbox
else:
    import matplotlib._tight_bbox as tight_bbox
from matplotlib.transforms import Bbox, TransformedBbox, Affine2D
import imageio
import io
from viswaternet.network import processing
from viswaternet.utils import unit_conversion, convert_excel


[docs]def animate_plot( self, function, ax=None, fps=3, first_timestep=0, last_timestep=None, save_name="animation", save_format="mp4", time_unit='s', **kwargs): """ Builds .gif file animating network data across timesteps. Arguments --------- ax : axes._subplots.AxesSubplot Matplotlib axes object. function : viswaternet One of the general viswaternet drawing functions. .. rubric:: Usable Functions ================================= == plot_basic_elements plot_discrete_nodes plot_discrete_links plot_continuous_nodes plot_continuous_links plot_unique_data ================================= == fps : integer Framerate that the .gif file will be generated with. first_timestep : integer The starting timestep of the .gif file. last_timestep : integer The last timestep of the .gif file. save_name : string The file name that will be used when saving the animation. save_format : string The file format that the animation will be saved to. A comprehensive list of formats can be found on the imageio docs page: https://imageio.readthedocs.io/en/stable/formats/index.html time_unit : string The time unit that will be reported for each frame of the .gif file. .. rubric:: Time Units ==================== ==================================== Default :math:`s` min :math:`min` hr :math:`hr` day :math:`day` ==================== ==================================== kwargs : Any Any arguments for the plotting function passed into the function argument can be passed into animate_plot. """ model = self.model if ax is None: if ax is None: fig, ax = plt.subplots(figsize=self.figsize) ax.set_frame_on(self.axis_frame) frames = [] if function == self.plot_unique_data: parameter_type = kwargs.get("parameter_type", None) data_type = kwargs.get("data_type", None) try: custom_data_values = kwargs.get("custom_data_values") data_values = pd.Series(custom_data_values[1], custom_data_values[0]) except TypeError: excel_columns = kwargs.get("excel_columns", None) element_list, results = convert_excel( self, kwargs.get("data_file", None), parameter_type, data_type, excel_columns[0], excel_columns[1]) # data_values.append(data['results']) data_values = results timesteps = data_values.shape[1] values = range(timesteps) if last_timestep is not None: values = values[first_timestep:last_timestep] if data_type == "continuous": if kwargs.get("vmin", None) is None \ or kwargs.get("vmax", None) is None: kwargs["vmin"], kwargs["vmax"] = make_vmin_vmax(data_values, kwargs) if data_type == 'discrete': kwargs["disable_interval_deleting"] = True if kwargs.get("intervals", None) is None: kwargs["intervals"] = make_intervals(data_values, kwargs) data_values = [data_values[i].tolist() for i in data_values.columns] else: timesteps = int( model["wn"].options.time.duration / model["wn"].options.time.report_timestep) values = range(timesteps) if last_timestep is not None: values = values[first_timestep:last_timestep] if function == self.plot_continuous_nodes \ or function == self.plot_discrete_nodes: parameter_type = 'node' elif function == self.plot_continuous_links \ or function == self.plot_discrete_links: parameter_type = 'link' if function == self.plot_continuous_nodes \ or function == self.plot_continuous_links: data_type = 'continuous' if kwargs.get("vmin", None) is None \ or kwargs.get("vmax", None) is None: parameter_results, element_list = processing.get_parameter( self, parameter_type, kwargs.get("parameter"), kwargs.get("value", None)) kwargs["vmin"], kwargs["vmax"] = make_vmin_vmax(parameter_results, kwargs) parameter_results = parameter_results.transpose() if function == self.plot_discrete_nodes \ or function == self.plot_discrete_links: kwargs["disable_interval_deleting"] = True data_type = 'discrete' if kwargs.get("intervals", None) is None: parameter_results, element_list = processing.get_parameter( self, parameter_type, kwargs.get( "parameter"), kwargs.get("value", None)) kwargs["intervals"] = make_intervals(parameter_results, kwargs) parameter_results = parameter_results.transpose() if plt.isinteractive(): plt_interactive = plt.isinteractive() plt.ioff() else: plt_interactive = False for value in values: fig = ax.get_figure() if function == self.plot_unique_data: try: kwargs["custom_data_values"] = [custom_data_values[0], custom_data_values[1][value]] except Exception: kwargs["custom_data_values"] = [element_list, data_values[value]] kwargs["parameter"] = 'custom_data' function(ax=ax, savefig=False, **kwargs) else: kwargs["value"] = [parameter_results.iloc[:, value], element_list] function(ax=ax, savefig=False, **kwargs) handles, labels = [], [] time = value*model["wn"].options.time.report_timestep time = unit_conversion(time, "time", time_unit) ax.legend( handles, labels, title="Timestep "+str(time)+" "+time_unit, loc="lower left", frameon=False) restore_bbox = bbox_inches_tight_resize(fig) fig.canvas.draw() with io.BytesIO() as buff: fig.savefig(buff, format='raw') buff.seek(0) data = np.frombuffer(buff.getvalue(), dtype=np.uint8) w, h = fig.canvas.get_width_height() mat = data.reshape((int(h), int(w), -1)) frames.append(mat) restore_bbox() if function == self.plot_continuous_nodes \ or function == self.plot_continuous_links \ or data_type == 'continuous': fig.axes[1].remove() ax.clear() if plt_interactive: plt.ion() if save_format == "gif" or save_format == "GIF": imageio.mimsave(save_name+"."+save_format, frames, format='GIF', fps=fps) else: imageio.mimsave(save_name+"."+save_format, frames, format='FFMPEG', fps=fps, quality=8, ffmpeg_log_level='quiet')
[docs]def make_vmin_vmax(parameter, kwargs): for value in np.min(parameter, axis=0): if value < -1e-5: if kwargs.get("vmin", None) is None: vmin = - \ np.max(np.max(parameter, axis=0)) if kwargs.get("vmax", None) is None: vmax = np.max( np.max(parameter, axis=0)) break else: if kwargs.get("vmin", None) is None: vmin = np.min( np.min(parameter, axis=0)) if kwargs.get("vmax", None) is None: vmax = np.max( np.max(parameter, axis=0)) return vmin, vmax
[docs]def make_intervals(parameter, kwargs): intervals = np.linspace( np.min(np.min(parameter, axis=0), axis=0), np.max(np.max(parameter, axis=0), axis=0), kwargs.get("num_intervals", 5)).tolist() return intervals
[docs]def bbox_inches_tight_resize(fig): # Taken from matplotlib source code. This is how the bbox for # saving figures with bbox_inches='tight' is done. Maybe there was a # better way to do this? I don't know but tight_layout() wasn't doing # the trick. bbox_inches = fig.get_tightbbox(fig.canvas.get_renderer()) bbox_artists = fig.get_default_bbox_extra_artists() bbox_filtered = [] for a in bbox_artists: bbox = a.get_window_extent(fig.canvas.get_renderer()) if a.get_clip_on(): clip_box = a.get_clip_box() if clip_box is not None: bbox = Bbox.intersection(bbox, clip_box) clip_path = a.get_clip_path() if clip_path is not None and bbox is not None: clip_path = clip_path.get_fully_transformed_path() bbox = Bbox.intersection(bbox, clip_path.get_extents()) if bbox is not None and (bbox.width != 0 or bbox.height != 0): if not np.isinf(bbox.width): bbox_filtered.append(bbox) if bbox_filtered: _bbox = Bbox.union(bbox_filtered) trans = Affine2D().scale(1.0 / fig.dpi) bbox_extra = TransformedBbox(_bbox, trans) bbox_inches = Bbox.union([bbox_inches, bbox_extra]) pad = 0.1 bbox_inches = bbox_inches.padded(pad) restore_bbox = tight_bbox.adjust_bbox(fig, bbox_inches, fig.canvas.fixed_dpi) return restore_bbox