Source code for viswaternet.network.processing

# -*- coding: utf-8 -*-

"""
The viswaternet.network.processing module contains the code that extracts
data from the EPANET simulation. It also handles the binning of parameters
for discrete plotting.
"""
import numpy as np


[docs]def get_parameter( self, parameter_type, parameter, value=None, element_list=None, include_tanks=False, include_reservoirs=False, include_pumps=True, include_valves=True): """ Retrieves parameters from the network and simulation results Arguments --------- parameter_type : string The network object the parameter is associated with. Takes either 'node' or 'link'. parameter : string The parameter to be plotted. The following is a list of parameters available to use: **Static Parameters** - base_demand - elevation - emitter_coefficient - initial_quality **Time-Dependent Parameters** - head - demand - leak_demand - leak_area - leak_discharg_coeff - quality value : integer, string For time-varying parameters only. Specifies which timestep or data summary will be plotted. .. rubric:: Possible Inputs ======================= ========================================= int Plots element data for specified timestep 'min' Plots minimum data point for each element 'max' Plots maximum data point for each element 'mean' Plots mean for each element 'stddev' Plots standard deviation for each element 'range' Plots range for each element ======================= ========================================= element_list : array-like List of network elements that data will be retrieved for. include_reservoirs : boolean Determines if data for draw_reservoirs are retrieved. include_pumps : boolean Determines if data for draw_pumps are retrieved. include_valves : boolean Determines if data for draw_valves are retrieved. """ model = self.model results = model["results"] if parameter_type == "node": # If no element list is provided, set element list to all nodes of # the model if element_list is None: element_list = list.copy(model["node_names"]) # Get indices of nodes in model["node_names"] indices = [model["node_names"].index(i) for i in element_list] # WNTR differentiates between node attributes and simulation results. # To account for this all simulation result logic is put into a try # block and if a KeyError occurs, it goes into the except block to # get the node attribute instead. try: # If no value type is given (timestep, max, etc) then return # parameter at all timesteps if value is None: parameter_results = results.node[parameter].iloc[:, indices] else: if value == "max": parameter_results = np.max( results.node[parameter].iloc[:, indices], axis=0) elif value == "min": parameter_results = np.min( results.node[parameter].iloc[:, indices], axis=0) elif value == "mean": parameter_results = np.mean( results.node[parameter].iloc[:, indices], axis=0) elif value == "stddev": parameter_results = np.std( results.node[parameter].iloc[:, indices], axis=0) elif value == "range": parameter_results = np.ptp( results.node[parameter].iloc[:, indices], axis=0) # If an int is given, assume it is a timestep and get parameter # at given timestep elif isinstance(value, int): parameter_results = ( results.node[parameter].iloc[value, indices]) # Node attribute fetching logic except KeyError: parameter_results = model["wn"].query_node_attribute(parameter) elements_in_results = list(parameter_results.index) element_list_temp = list.copy(element_list) # Some elements do not have certain attributes. For instance, # reservoirs do not have an elevation. This code removes those # elements from the results for element in element_list_temp: try: if elements_in_results.index(element): pass except ValueError: element_list.remove(element) indices = [elements_in_results.index(i) for i in element_list] parameter_results = parameter_results.iloc[indices] if include_tanks: pass else: for tank in model["tank_names"]: # Try block to catch KeyErrors in instance where tank does # not exist in parameter_results try: parameter_results.drop(tank, inplace=True) element_list.remove(tank) except KeyError: pass if include_reservoirs: pass else: for reservoir in model["reservoir_names"]: # Try block to catch KeyErrors in instance where reservoir # does not exist in parameter_results try: parameter_results.drop(reservoir, inplace=True) element_list.remove(reservoir) except KeyError: pass elif parameter_type == "link": # If no element list is provided, set element list to all links of # the model if element_list is None: element_list = list.copy(model["G_pipe_name_list"]) indices = [model["G_pipe_name_list"].index(i) for i in element_list] # WNTR differentiates between link attributes and simulation results. # To account for this all simulation result logic is put into a try # block and if a KeyError occurs, it goes into the except block to # get the link attribute instead. try: if value is None: parameter_results = results.link[parameter].iloc[:, indices] else: if value == "max": parameter_results = np.max( results.link[parameter].iloc[:, indices], axis=0) elif value == "min": parameter_results = np.min( results.link[parameter].iloc[:, indices], axis=0) elif value == "mean": parameter_results = np.mean( results.link[parameter].iloc[:, indices], axis=0) elif value == "stddev": parameter_results = np.std( results.link[parameter].iloc[:, indices], axis=0) elif value == "range": parameter_results = np.ptp( results.link[parameter].iloc[:, indices], axis=0) elif type(value) is int: parameter_results = ( results.link[parameter].iloc[value, indices]) # Link attribute fetching logic except KeyError: parameter_results = model["wn"].query_link_attribute(parameter) elements_in_results = list(parameter_results.index) element_list_temp = list.copy(element_list) # Some elements do not have certain attributes. For instance, # reservoirs do not have an elevation. This code removes those # elements from the results for element in element_list_temp: try: if elements_in_results.index(element): pass except ValueError: element_list.remove(element) indices = [elements_in_results.index(i) for i in element_list] parameter_results = parameter_results.iloc[indices] if include_pumps: pass else: for pump in model["pump_names"]: # Try block to catch KeyErrors in instance where pump does # not exist in parameter_results try: parameter_results.drop(pump, inplace=True) element_list.remove(pump) except KeyError: pass if include_valves: pass else: for valve in model["valve_names"]: # Try block to catch KeyErrors in instance where valve # does not exist in parameter_results try: parameter_results.drop(valve, inplace=True) element_list.remove(valve) except KeyError: pass return parameter_results, element_list
[docs]def get_demand_patterns(self): """ Retrieves the demand pattern of each node, and bins them accordingly. """ model = self.model demand_patterns = [] # Get list of patterns patterns = model["wn"].pattern_name_list patterns.append("None") patterns = sorted(patterns) # Loop through junctions only (no reservoirs, tanks) for junction in model["junc_names"]: # Attempt to extract pattern name for each junction. If an attribute # error occurs, then the junction has no demand pattern associated with # it. try: demand_pattern = ( model["wn"].get_node( junction).demand_timeseries_list[0].pattern.name) demand_patterns = np.append(demand_patterns, demand_pattern) except AttributeError: demand_patterns = np.append(demand_patterns, "None") # Initialize pattern dictionary demand_pattern_nodes = {} for pattern in patterns: demand_pattern_nodes[pattern] = {} # Create pattern dictionary in the same form as what get_parameter outputs for i, junc_name in enumerate(model["junc_names"]): for pattern in patterns: if demand_patterns[i] == pattern: demand_pattern_nodes[pattern][junc_name] = \ model["junc_names"].index(junc_name) # Remove None key if no junctions are in it if len(demand_pattern_nodes['None']) == 0: patterns.remove("None") del demand_pattern_nodes['None'] return demand_pattern_nodes, patterns
[docs]def bin_parameter( self, parameter_results, element_list, num_intervals, intervals="automatic", disable_interval_deleting=False, style=None): """ Takes parameter results from get_parameter and puts them into bins. Arguments --------- parameter_results : array-like The data associated with each node. element_list : array-like List of network elements that data will be retrieved for. num_intervals : integer The number of intervals. Results in intervals-1 bins. intervals : array-like, string If set to 'automatic' then intervals are created automatically on an equal interval basis. Otherwise, it is the edges of the intervals to be created. intervals array length should be num_intervals + 1. disable_interval_deleting : boolean If True, empty intervals will be automatically deleted. style : VisWaterNet Style Object The style object to be used. """ model = self.model if style is None: style = self.default_style args = style.args legend_decimal_places = args['legend_decimal_places'] # Code that generates bins automatically based on number of intervals # specified. The code does this by creating num_intervals + 1 linearlly # spaced bins. There are currently no options to automatically create # bins other than linearlly spaced ones, use custom bin intervals to do # this if intervals == "automatic": bins = num_intervals + 1 intervals = np.linspace( np.min(parameter_results), np.max(parameter_results), bins) intervals = intervals.tolist() interval_results = {} interval_names = [] elements_with_parameter = element_list element_type = None for element_with_parameter in elements_with_parameter: if (element_with_parameter in model["node_names"]) is True: continue else: element_list = model["G_pipe_name_list"] element_type = "link" break if element_type != "link": element_list = model["node_names"] for i in range(len(intervals)): if i == 0: if np.min(parameter_results) < intervals[i]: interval_names.append("< {0:1.{j}f}".format( intervals[i], j=legend_decimal_places)) interval_names.append("{0:1.{j}f} - {1:1.{j}f}".format( intervals[i], intervals[i + 1], j=legend_decimal_places)) elif i < len(intervals) - 1: interval_names.append("{0:1.{j}f} - {1:1.{j}f}".format( intervals[i], intervals[i + 1], j=legend_decimal_places)) elif i == len(intervals) - 1: if np.max(parameter_results) > intervals[i]: interval_names.append("> {0:1.{j}f}".format( intervals[i], j=legend_decimal_places)) for bin_name in interval_names: interval_results[bin_name] = {} for i in range(len(intervals)): if i == 0: for j, parameter in enumerate(parameter_results): if parameter >= intervals[i] and parameter < intervals[i + 1]: interval_results["{0:1.{j}f} - {1:1.{j}f}".format( intervals[i], intervals[i + 1], j=legend_decimal_places)][elements_with_parameter[j]] \ = element_list.index(elements_with_parameter[j]) if parameter < intervals[i]: interval_results["< {0:1.{j}f}".format( intervals[i], j=legend_decimal_places)][elements_with_parameter[j]] \ = element_list.index(elements_with_parameter[j]) elif i == len(intervals) - 2: for j, parameter in enumerate(parameter_results): if parameter >= intervals[i] and parameter <= intervals[i + 1]: interval_results["{0:1.{j}f} - {1:1.{j}f}".format( intervals[i], intervals[i + 1], j=legend_decimal_places)][elements_with_parameter[j]] \ = element_list.index(elements_with_parameter[j]) elif i < len(intervals) - 2: for j, parameter in enumerate(parameter_results): if parameter >= intervals[i] and parameter < intervals[i + 1]: interval_results["{0:1.{j}f} - {1:1.{j}f}".format( intervals[i], intervals[i + 1], j=legend_decimal_places)][elements_with_parameter[j]] \ = element_list.index(elements_with_parameter[j]) elif i == len(intervals) - 1: for j, parameter in enumerate(parameter_results): if parameter > intervals[i]: interval_results["> {0:1.{j}f}".format( intervals[i], j=legend_decimal_places)][elements_with_parameter[j]] \ = element_list.index(elements_with_parameter[j]) if disable_interval_deleting is True: pass else: for bin_name in interval_names: if not interval_results[bin_name]: del interval_names[interval_names.index(bin_name)] del interval_results[bin_name] return interval_results, interval_names