# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
from packaging.version import parse
if parse(str(matplotlib.__version__)) < parse('3.6.0'):
import matplotlib.tight_bbox as tight_bbox
else:
import matplotlib._tight_bbox as tight_bbox
from matplotlib.transforms import Bbox, TransformedBbox, Affine2D
import imageio
import io
import math
from viswaternet.network import processing
from viswaternet.utils import unit_conversion, convert_excel
[docs]def animate_plot(
self,
function,
ax=None,
fps=3,
first_timestep=0,
last_timestep=None,
save_name="animation",
save_format="mp4",
time_unit='s',
**kwargs):
"""
Builds .gif file animating network data across timesteps.
Arguments
---------
ax : axes._subplots.AxesSubplot
Matplotlib axes object.
function : viswaternet
One of the general viswaternet drawing functions.
.. rubric:: Usable Functions
================================= ==
plot_basic_elements
plot_discrete_nodes
plot_discrete_links
plot_continuous_nodes
plot_continuous_links
plot_unique_data
================================= ==
fps : integer
Framerate that the .gif file will be generated with.
first_timestep : integer
The starting timestep of the .gif file.
last_timestep : integer
The last timestep of the .gif file.
unit : string
The time unit that will be reported for each frame of the .gif file.
.. rubric:: Time Units
==================== ====================================
Default :math:`s`
min :math:`min`
hr :math:`hr`
day :math:`day`
==================== ====================================
kwargs : Any
Any arguments for the plotting function passed into the function argument
can be passed into animate_plot.
"""
model = self.model
if ax is None:
if ax is None:
fig, ax = plt.subplots(figsize=self.figsize)
ax.set_frame_on(self.axis_frame)
frames = []
if function == self.plot_unique_data:
parameter_type = kwargs.get("parameter_type", None)
data_type = kwargs.get("data_type", None)
try:
custom_data_values = kwargs.get("custom_data_values")
data_values = pd.Series(custom_data_values[1],
custom_data_values[0])
except TypeError:
excel_columns = kwargs.get("excel_columns", None)
element_list, results = convert_excel(
self, kwargs.get("data_file", None),
parameter_type,
data_type,
excel_columns[0],
excel_columns[1])
# data_values.append(data['results'])
data_values = results
timesteps = data_values.shape[1]
values = range(timesteps)
if last_timestep is not None:
values = values[first_timestep:last_timestep]
if data_type == "continuous":
if kwargs.get("vmin", None) is None \
or kwargs.get("vmax", None) is None:
kwargs["vmin"], kwargs["vmax"] = make_vmin_vmax(data_values,
kwargs)
if data_type == 'discrete':
kwargs["disable_interval_deleting"] = True
if kwargs.get("intervals", None) is None:
kwargs["intervals"] = make_intervals(data_values, kwargs)
data_values = [data_values[i].tolist()
for i in data_values.columns]
else:
timesteps = int(
model["wn"].options.time.duration
/ model["wn"].options.time.report_timestep)
values = range(timesteps)
if last_timestep is not None:
values = values[first_timestep:last_timestep]
if function == self.plot_continuous_nodes \
or function == self.plot_discrete_nodes:
parameter_type = 'node'
elif function == self.plot_continuous_links \
or function == self.plot_discrete_links:
parameter_type = 'link'
if function == self.plot_continuous_nodes \
or function == self.plot_continuous_links:
data_type = 'continuous'
if kwargs.get("vmin", None) is None \
or kwargs.get("vmax", None) is None:
parameter_results, element_list = processing.get_parameter(
self,
parameter_type,
kwargs.get("parameter"), kwargs.get("value", None))
kwargs["vmin"], kwargs["vmax"] = make_vmin_vmax(parameter_results,
kwargs)
parameter_results = parameter_results.transpose()
if function == self.plot_discrete_nodes \
or function == self.plot_discrete_links:
kwargs["disable_interval_deleting"] = True
data_type = 'discrete'
if kwargs.get("intervals", None) is None:
parameter_results, element_list = processing.get_parameter(
self, parameter_type, kwargs.get(
"parameter"), kwargs.get("value", None))
kwargs["intervals"] = make_intervals(parameter_results, kwargs)
parameter_results = parameter_results.transpose()
if plt.isinteractive():
plt_interactive = plt.isinteractive()
plt.ioff()
else:
plt_interactive = False
for value in values:
fig = ax.get_figure()
if function == self.plot_unique_data:
try:
kwargs["custom_data_values"] = [custom_data_values[0],
custom_data_values[1][value]]
except Exception:
kwargs["custom_data_values"] = [element_list,
data_values[value]]
kwargs["parameter"] = 'custom_data'
function(ax=ax, savefig=False, **kwargs)
else:
kwargs["value"] = [parameter_results.iloc[:, value],
element_list]
function(ax=ax, savefig=False, **kwargs)
handles, labels = [], []
time = value*model["wn"].options.time.report_timestep
time = unit_conversion(time, "time", time_unit)
ax.legend(
handles,
labels,
title="Timestep "+str(time)+" "+time_unit,
loc="lower left",
frameon=False)
restore_bbox = bbox_inches_tight_resize(fig)
fig.canvas.draw()
with io.BytesIO() as buff:
fig.savefig(buff, format='raw')
buff.seek(0)
data = np.frombuffer(buff.getvalue(), dtype=np.uint8)
w, h = fig.canvas.get_width_height()
mat = data.reshape((int(h), int(w), -1))
frames.append(mat)
restore_bbox()
if function == self.plot_continuous_nodes \
or function == self.plot_continuous_links \
or data_type == 'continuous':
fig.axes[1].remove()
ax.clear()
if plt_interactive:
plt.ion()
if save_format == "gif" or save_format == "GIF":
imageio.mimsave(save_name+"."+save_format,
frames,
format='GIF',
fps=fps)
else:
imageio.mimsave(save_name+"."+save_format,
frames,
format='FFMPEG',
fps=fps,
quality=8,
ffmpeg_log_level='quiet')
[docs]def make_vmin_vmax(parameter, kwargs):
for value in np.min(parameter, axis=0):
if value < -1e-5:
if kwargs.get("vmin", None) is None:
vmin = - \
np.max(np.max(parameter, axis=0))
if kwargs.get("vmax", None) is None:
vmax = np.max(
np.max(parameter, axis=0))
break
else:
if kwargs.get("vmin", None) is None:
vmin = np.min(
np.min(parameter, axis=0))
if kwargs.get("vmax", None) is None:
vmax = np.max(
np.max(parameter, axis=0))
return vmin, vmax
[docs]def make_intervals(parameter, kwargs):
intervals = np.linspace(
np.min(np.min(parameter, axis=0), axis=0),
np.max(np.max(parameter, axis=0), axis=0),
kwargs.get("num_intervals", 5)).tolist()
return intervals
[docs]def bbox_inches_tight_resize(fig):
# Taken from matplotlib source code. This is how the bbox for
# saving figures with bbox_inches='tight' is done. Maybe there was a
# better way to do this? I don't know but tight_layout() wasn't doing
# the trick.
bbox_inches = fig.get_tightbbox(fig.canvas.get_renderer())
bbox_artists = fig.get_default_bbox_extra_artists()
bbox_filtered = []
for a in bbox_artists:
bbox = a.get_window_extent(fig.canvas.get_renderer())
if a.get_clip_on():
clip_box = a.get_clip_box()
if clip_box is not None:
bbox = Bbox.intersection(bbox, clip_box)
clip_path = a.get_clip_path()
if clip_path is not None and bbox is not None:
clip_path = clip_path.get_fully_transformed_path()
bbox = Bbox.intersection(bbox,
clip_path.get_extents())
if bbox is not None and (bbox.width != 0 or
bbox.height != 0):
if not np.isinf(bbox.width):
bbox_filtered.append(bbox)
if bbox_filtered:
_bbox = Bbox.union(bbox_filtered)
trans = Affine2D().scale(1.0 / fig.dpi)
bbox_extra = TransformedBbox(_bbox, trans)
bbox_inches = Bbox.union([bbox_inches, bbox_extra])
pad = 0.1
bbox_inches = bbox_inches.padded(pad)
restore_bbox = tight_bbox.adjust_bbox(fig, bbox_inches,
fig.canvas.fixed_dpi)
return restore_bbox