Source code for comando.core

"""Package for generic modeling of energy system design and operation."""
# This file is part of the COMANDO project which is released under the MIT
# license. See file LICENSE for full license details.
#
# AUTHORS: Marco Langiu
import comando

from collections.abc import Iterable, Mapping

from pandas import DataFrame, Index, MultiIndex, Series, options

from comando.utility import _assert_algebraic, split, indexed, get_type_name

options.display.float_format = '{:,.4g}'.format

REAL, INTEGER, BINARY = comando.Domain


[docs] class Connector: """An interface used to connect one or more `Component` objects.""" def __init__(self, component, name, expr): self.component = component self.name = name self.expr = expr def __repr__(self): return f"Connector({self.component!r}, {self.name!r}, {self.expr!r})"
[docs] class ImpossibleConstraintException(Exception): pass
[docs] def is_trivial(constraint, con_repr=None, warn=True): """Handle constraints that are trivially true or false.""" # NOTE: We need to manually check for two alternative representations of # Inequalities, since they appear to be implemented in a # nonsymmetrical way. This is a bug in Sympy, also see: # https://github.com/sympy/sympy/issues/20372 if get_type_name(constraint) in {'LessThan', 'GreaterThan'}: alt_repr = comando.Le(*reversed(constraint.args)) alternatives = [constraint, alt_repr] else: alternatives = [constraint] if con_repr is None: con_repr = 'Constraint' for constraint in alternatives: if constraint is comando.true: if warn: print(f"\nWARNING: {con_repr} is always satisfied, " "constraint will be skipped!\n") return True elif constraint is comando.false: raise ImpossibleConstraintException(f"{con_repr} can never be " "satisfied!") elif comando.get_backend() is not comando.sympy: return # Cannot simplify try: if bool(constraint): if warn: print(f"\nWARNING: {con_repr} is always satisfied, " "constraint will be skipped!\n") return True else: raise ImpossibleConstraintException(f"{con_repr} can never be " "satisfied!") except TypeError: pass return False
# TODO: Generalize the concept of objective to design cost and operational cost # with appropriate annualization in the EnergySystem class. For example, # when minimizing CO2 production, a Component's design cost might be the # Carbon footprint for its production and its operational cost might be # the CO2 ouput caused by operation.
[docs] class Component: """A component representing a part of an energy system. The Component class is a model of a generic real-world component of an energy system, represented by a collection of algebraic and logical expressions which describe how the component can be controlled, its limitations and its interactions with other components. For every component, two kinds of decisions may be taken: - design decisions a.k.a. 'here-and-now' decisions - operational decisions a.k.a. 'wait-and-see' decisions The former constitute decisions that are taken once, prior to all operation while the operational decisions need to be taken for every time-point under consideration. The component may define named expressions that can be accessed at any time for informational purposes. These expressions may also be used to impose additional constraints at a later stage, or to aggregate information from multiple different components. In a similar way an objective for a system optimization can be generated by aggregating different types of costs defined by some subset of the system's components. Parameters that are contained in the various expressions may or may not be given a default value that can be changed at a later stage. Attributes ---------- label : `str` A unique sting that serves as an identifier parameters : `set` of `Symbol` Set of unspecified system parameters the user can assign values to. design_variables : `set` of `Variable` Set of variables of the design stage, i.e. time-independent variables such as the number or size of newly added components. operational_variables : `set` of `Variable` Set of unique names for variables of the operational stage, e.g. the output of a given component or an operational state. states : dict Dictionary mapping a subset of operational_variables to automatically created Variables, representing the corresponding time derivatives. Examples are the SOC of a battery, the fill-level of a tank, etc. constraints : `dict` of `sympy.core.relational.Relational` Dictionary of relational sympy expressions that represent constraints. expressions : `dict` mapping of shared identifiers to algebraic sympy expressions. The expressions different `Component` instances associate with a shared identifier may be aggregated to additional constraints or objectives. connectors : `dict` mapping of strings to algebraic expressions representing in or outputs to the component. When multiple connectors are connected to each other, the connection represents the requirement that the sum of their expressions need to be equal to zero. """ existing_components = dict() def __init__(self, label): if label in self.existing_components: raise RuntimeError(f"A component with the label {label} has " "already been defined!") self.existing_components[label] = self self._label = label # self._parameters = set() self._parameters_dict = dict() # self._design_variables = set() self._design_variables_dict = dict() # self._operational_variables = set() self._operational_variables_dict = dict() self._constraints_dict = dict() self._expressions_dict = dict() self._states_dict = dict() self.connectors = dict() def _prefix(self, name): """Prefix the given name to obtain a unique identifier.""" return f'{self.label}_{name}' def __repr__(self): return f"Component({self.label!r})" def __iter__(self): """Prevent iteration that is implicitly introduced with __getitem__.""" raise TypeError('Component object is not iterable') def __getitem__(self, identifier): """Get a Connector, Parameter or Variable matching the identifier.""" # NOTE: We may add this for users confused by system scope access... # if identifier.startswith(self.label + '_'): # strip prefix # identifier = identifier[len(self.label) + 1:] for d in [self._parameters_dict, self._design_variables_dict, self._operational_variables_dict, self._constraints_dict, self._expressions_dict]: if identifier in d: return d[identifier] # TODO: also search components? raise KeyError(f"No entry for '{identifier}' found in {self}!") @property def label(self): """Get the Component's unique label.""" return self._label # Define descriptors for all symbols, constraints and expressions for ty in ['parameters', 'design_variables', 'operational_variables', 'constraints', 'expressions']: exec(f""" @property def {ty}(self): \"\"\"Get a set of the Component's {ty}.\"\"\" return set(self._{ty}_dict.values()) """) exec(f""" @property def {ty}_dict(self): \"\"\"Get a dictionary of the Component's {ty}.\"\"\" return {{self._prefix(n): p for n, p in self._{ty}_dict.items()}} """) del ty @property def states(self): return set(self._states_dict) @property def states_dict(self): return self._states_dict.copy()
[docs] def make_parameter(self, name, value=None): """Create a parameter with a localized name & store it.""" par = comando.Parameter(self._prefix(name), value=value) # self._parameters.add(par) self._parameters_dict[name] = par return par
[docs] def make_design_variable(self, name, domain=REAL, bounds=(None, None), init_val=None): """Create a design variable with a localized name & store it.""" var = comando.Variable(self._prefix(name), domain, bounds, init_val) self._design_variables_dict[name] = var return var
[docs] def make_operational_variable(self, name, domain=REAL, bounds=(None, None), init_val=None): """Create an operational variable with a localized name & store it.""" var = comando.VariableVector(self._prefix(name), domain, bounds, init_val) self._operational_variables_dict[name] = var return var
# TODO: Make the creation of derivative variable optional (and allow for # the use of state_change instead)
[docs] def declare_state(self, var, rate_of_change=None, init_state=float('nan'), der_bounds=(None, None), der_init_val=None): """Declare var to be a state. A state is an operational variable whose time derivative is described by the rate_of_change expression, beginning at an initial_state. """ if var.name.startswith(self.label): name = var.name[len(self.label) + 1:] # strip prefix else: # This can happen if the variable was not created in self! name = var.name der_s = self.make_operational_variable(f'{name}dot', var.domain, der_bounds, der_init_val) if isinstance(init_state, (comando.Symbol)): pass elif isinstance(init_state, (int, float)): # create parameter for user init_state = self.make_parameter(f'{name}_init', init_state) else: raise TypeError(f"Cannot use '{init_state}' as an initial state!") # TODO: should we already do this or wait until problem generation? if rate_of_change is not None: self.add_eq_constraint(der_s, rate_of_change, name=f'state_{name}') # TODO: Which order should we store this? self._states_dict[var] = (init_state, der_s, rate_of_change) return der_s
# TODO: Should we actually consider domains other than REAL???
[docs] def make_state(self, name, rate_of_change=None, init_state=float('nan'), domain=REAL, bounds=(None, None), der_bounds=(None, None), init_val=None, der_init_val=None): """Create a state with a localized name, its derivative and store them. A state is an operational variable whose time derivative is described by the rate_of_change expression, beginning at an initial_state. """ s = self.make_operational_variable(name, domain, bounds, init_val) return s, self.declare_state(s, rate_of_change, init_state, der_bounds, der_init_val)
def _handle_constraint(self, rel_op, lhs_expr, rhs_expr, name): _assert_algebraic(lhs_expr) _assert_algebraic(rhs_expr) token = {'Le': '≤', 'Eq': '=', 'Ge': '≥'}[rel_op] con = getattr(comando, rel_op)(lhs_expr, rhs_expr) if name is None: name = f'{lhs_expr} {token} {rhs_expr}' if not is_trivial(con, f'Constraint {name} in {self._label}'): self._constraints_dict[name] = con
[docs] def add_le_constraint(self, lhs_expr, rhs_expr, name=None): """Add a constraint of the form lhs_expr ≤ rhs_expr.""" self._handle_constraint('Le', lhs_expr, rhs_expr, name)
[docs] def add_ge_constraint(self, lhs_expr, rhs_expr, name=None): """Add a constraint of the form lhs_expr ≥ rhs_expr.""" self._handle_constraint('Ge', lhs_expr, rhs_expr, name)
[docs] def add_eq_constraint(self, lhs_expr, rhs_expr, name=None): """Add a constraint of the form lhs_expr = rhs_expr.""" self._handle_constraint('Eq', lhs_expr, rhs_expr, name)
# TODO: Logical constraints (http://ben-israel.rutgers.edu/386/Logic.pdf) ########################################################################### # def declare_alternative(self, con1_id, con2_id, name=None): # """Specify that at least one of the two constraints must hold.""" # pass # # def enforce_k(self, *con_ids, k, name=None): # """Specify that at least k of the constraints must hold.""" # pass # # def declare_exclusive(self, con1_id, con2_id, name=None): # """Specify that only a single one of the two constraints hold.""" # pass # # def declare_implication(self, con1_id, con2_id, name=None): # """Specify that if con1 holds con2 must also hold.""" # pass # # def declare_equivalence(self, con1_id, con2_id, name=None): # """Specify that either both con1 and con2 hold or none.""" # pass ########################################################################### # TODO: Either introduce a modifier of this form or a flag in the # `make_xxx_variable` methods. # def declare_semicont(self, var, bound): # """Specify that the given variable is semicontinuous. # # A semicontinuous variable can be either zero or above/below a certain # threshold value. This behavior is enfoced on the given variable by # introducing a binary variable and two constraints. # The bound argument may be positive or negative resulting in a # lower or upper semi-continuous bound respectively. # """ # name = var.name # active = self.make_operational_variable(f'{name}_active', # domain=INTEGER, bounds=(0, 1)) # # the upper/lower bound for Qdot_out is given by nominal output, but # # since that's a variable we have to write this bound as a constraint: # self.add_le_constraint(var, active * var.ub, name=f'{name}_limit') # self.add_ge_constraint(var, active * lb, name=f'{name}_threshold')
[docs] def add_expression(self, identifier, expr): """Add a named algebraic expression.""" _assert_algebraic(expr) self._expressions_dict[identifier] = comando.S(expr) return expr
[docs] def get_expression(self, identifier, default=None): """Get the expression corresponding to the identifier.""" if default is None: return self._expressions_dict[identifier] return self._expressions_dict.get(identifier, default)
[docs] def add_connector(self, id, expr): """Insert the connector in connectors and add it as an attribute.""" connector = Connector(self, id, expr) self.connectors[id] = connector setattr(self, id, connector)
# TODO: Probably we should have a simple add_connector method instead!
[docs] def add_connectors(self, id=None, expr=None, **connectors): """Add one or more named connectors to this component. Examples -------- >>> self.add_connectors('A', sympy.S('A')) >>> self.add_connectors(B=sympy.S('B'), C=sympy.S('C')) """ if id and expr is not None: # user passed (identifier, expr) if getattr(self, id, None): raise RuntimeError(f"Cannot add connector: {self} already has " f"an attribute called {id}!") _assert_algebraic(expr) self.add_connector(id, expr) else: # user passed (**connectors) for id, expr in connectors.items(): # Check if all is well first! if getattr(self, id, None): raise RuntimeError(f"Cannot add connector: {self} already " f"has an attribute called {id}!") _assert_algebraic(expr) for id, expr in connectors.items(): # Then do the work self.add_connector(id, expr)
[docs] def add_input(self, identifier, expr): """Add a connector that corresponds to an input into the component. The expr is assumed to always be positive and is thus bounded by 0 from below. By convention the input into a Component is positive so the new Connector's expression corresponds to expr. """ try: # If it's a variable, bound it corresponding to the input... expr.lb = 0 except AttributeError: # ...or add a constraint ensuring positivity self.add_le_constraint(0, expr, identifier) self.add_connector(identifier, expr)
[docs] def add_output(self, identifier, expr): """Add a connector that corresponds to an output from the component. The expr is assumed to always be positive and is thus bounded by 0 from below. By convention the output from a Component is negative so the new Connector's expression corresponds to the negated expr. """ try: # Bound the variable corresponding to the input... expr.lb = 0 except AttributeError: # ...or add a constraint ensuring positivity self.add_le_constraint(0, expr, identifier) self.add_connector(identifier, -expr)
[docs] class ConnectorUnion(Connector): """A union of multiple Connectors.""" def __init__(self, component, name, *connectors): self.component = component self.name = name self.elements = set(connectors) @property def expr(self): """Return an expression for the flow through the ConnectorUnion.""" return sum(c.expr for c in self.elements)
[docs] class System(Component): """A class for a generic system, made up of individual components. Note that a system is itself a component and can therefore function as a subsystem for a larger system, allowing for nested structures! Attributes ---------- components : iterable of `Component` components that form part of the considered energy system connections : dict mapping of `str` to an iterable of `Connector`. The in- and outputs of all connectors within an iterable are assumed to balance each other. """ def __init__(self, label, components=None, connections=None): super().__init__(label) self._components = set() if components is not None: for component in components: self.add(component) # NOTE: A 'bus' is understood to be a set of connected `Connector` # objects, while a 'connection' is a mapping of an identifier to # a bus! self.connections = {} if connections is not None: for bus_id, bus in connections.items(): self.connect(bus_id, bus) def __repr__(self): return f"System({self.label!r}, components={self._components}, " \ f"connections={self.connections})" def __iter__(self): """Iterate over the components of the system.""" for component in self.components: yield component def __getitem__(self, identifier): """Get a Connector, Parameter or Variable matching the identifier.""" # NOTE: We may add this for users confused by system scope access... # if identifier.startswith(self.label + '_'): # strip prefix # identifier = identifier[len(self.label) + 1:] for ty in ['parameters', 'design_variables', 'operational_variables', 'constraints', 'expressions']: # For all types of attributes check own then those of components... for d in [getattr(self, f'_{ty}_dict'), *[getattr(c, f'{ty}_dict') for c in self.components]]: if identifier in d: return d[identifier] raise KeyError(f"No entry for '{identifier}' found in {self}!") @property def components(self): """Get the set of components that are part of the system.""" return self._components.union(*(c.components for c in self._components if isinstance(c, System))) # Define descriptors for all symbols for ty in ['parameters', 'design_variables', 'operational_variables', 'constraints', 'expressions']: exec(f""" @property def {ty}(self): \"\"\"Get a set of the System's {ty}.\"\"\" res = set(self._{ty}_dict.values()) return res.union(*(c.{ty} for c in self.components)) """) exec(f""" @property def {ty}_dict(self): \"\"\"Get a dictionary of the System's {ty}.\"\"\" res = {{self._prefix(n): p for n, p in self._{ty}_dict.items()}} for c in self.components: res.update(c.{ty}_dict) return res """) del ty @property def states(self): """Get a set of the System's states.""" res = set(self._states_dict) return res.union(*(c.states for c in self.components)) @property def states_dict(self): """Get a set of the System's states.""" res = self._states_dict.copy() for c in self.components: res.update(c.states_dict) return res def _update_connection_constraint(self, bus_id): """Update the constraint enforcing a given connection.""" bus = self.connections[bus_id] lhs = sum(connector.expr for connector in bus) self.add_eq_constraint(lhs, 0, name=bus_id)
[docs] def close_connector(self, connector, expr=0): """Specify the flow over the connector (default 0).""" if connector.name in self.connections: specification = self.connections[connector.name] - {connector} if len(specification) != 1: raise RuntimeError('It seems that you used the identifier ' f"'{connector.name}' both as a connector " 'name and as a bus_id... You should be ' 'able to fix this by changing the bus_id ' 'in the corresponding connect statement of ' f'{self}!') spec = specification.pop() spec.expr = expr else: spec = Connector(self, f'{connector.name}_specification', expr) self.connect(connector.name, {connector, spec}) self._update_connection_constraint(connector.name)
[docs] def expose_connector(self, connector, alias=None): """Expose an existing connector to the outside of the system.""" if connector.component not in self.components: raise RuntimeError(f"Tried to expose connector '{connector.name}' " f"of component '{connector.component}', which " f"is not part of the system '{self}'!") id = alias if alias else connector.name if id in self.connectors: raise RuntimeError(f"Tried to expose connector '{connector.name}' " f"of component '{connector.component}', under " f"the name '{id}', which is alredy used. You " 'should be able to fix this by using a ' 'different alias!') self.connectors[id] = connector setattr(self, id, connector)
[docs] def extend_connection(self, bus_id, alias=None): """Extend the connection to the outside of the system.""" if bus_id not in self.connections: raise RuntimeError('Tried to extend nonexisting connection ' f"'{bus_id}'!") id = alias if alias else bus_id if id in self.connectors: raise RuntimeError(f"Tried to extend connection '{bus_id}' " f'of component {self}, as a connector with ' f"the name '{id}', which is alredy used. You " 'should be able to fix this by using a ' 'different alias!') cu = ConnectorUnion(self, id, *self.connections.pop(bus_id)) self._constraints_dict.pop(bus_id) # remove the previous constraint self.connectors[id] = cu setattr(self, id, cu)
# # TODO: blacklist variable names ending in '_extension' # varname = f'{bus_id}_extension' # v = self.operational_variables_dict.get( # varname, self.make_operational_variable(f'{bus_id}_extension') # ) # # internal connection with a dummy connector # c = Connector(self, f'{bus_id}_dummy', -v) # self.connect(bus_id, c) # # external connector exposing the connection # self.add_connectors(bus_id, v)
[docs] def connect(self, bus_id, connectors): """Connect all elements of `connectors` to a bus with id `bus_id`.""" connectors = set(connectors) # ensure connectors is a set non_members = set(c for c in connectors if c.component not in {self}.union(self.components)) if non_members: # TODO: Possibly we can just add the component here: # self.add(component) msg = '\n\t- '.join(repr(c) for c in non_members) raise RuntimeError('Attempted to connect the following connectors ' f'which are not part of the system {self} or ' f'any of its components:\n\t- {msg}') if bus_id not in self.connections: self.connections[bus_id] = set() self.connections[bus_id].update(connectors) self._update_connection_constraint(bus_id)
[docs] def detach(self, bus_id, connectors=None): """Detach all `Connector`s in `connectors` from the specified bus.""" bus = self.connections[bus_id] if connectors is None: # detach the entire bus del self.connections[bus_id] del self._constraints_dict[bus_id] else: invalid_connectors = set(c for c in connectors if c not in bus) if invalid_connectors: raise KeyError(f'The connectors {invalid_connectors} are not ' f'part of bus {bus}!') for connector in connectors: # TODO: What if the bus only has 1 connector left? bus.remove(connector) if len(bus) == 0: del self.connections[bus_id] del self._constraints_dict[bus_id] else: # if there are some connectors left self._update_connection_constraint(bus_id)
[docs] def add(self, component): """Add a `Component` to the system.""" if not isinstance(component, Component): raise RuntimeError('Tried to add something that is not a Component' f" to the system {self}") if isinstance(component, System) and self in component.components: raise RuntimeError(f'Tried to add {component} to {self} which is ' 'already a component of the former!') self._components.add(component) return component
[docs] def remove(self, component): """Remove a `Component` from the system.""" # Remove the eliminated component's connectors from all buses cc = set(component.connectors.values()) for bus_id, bus in self.connections.items(): if bus.intersection(cc): bus -= cc self._update_connection_constraint(bus_id)
[docs] def get_open_connectors(self): """Return the set of all connectors that are not connected yet.""" connectors = set() for com in self.components: for con in com.connectors.values(): connectors.add(con) # if not con.bus: # open_connectors.append(con) connected_connectors = set() for bus in self.connections.values(): for con in bus: connected_connectors.add(con) return connectors - connected_connectors
[docs] def aggregate_component_expressions(self, id, aggregator=sum): """Aggregate expressions from the components matching the identifier. The passed identifier is used to look up matching expressions in the components of this system. The resulting expressions are then aggregated using the passed aggregator function and the resulting expression is returned. """ return aggregator(c.get_expression(id, comando.Zero) for c in self.components)
[docs] def create_problem(self, design_objective=0, operational_objective=0, timesteps=None, scenarios=None, data=None, name=None): """Create a problem with the specified time and scenario structure. Arguments --------- T : The end time of the operational period (default 8760) If T is not specified, timesteps needs to be a mapping (see below). timesteps : tuple, Mapping or Series. If timesteps is tuple it is assumed to consist of timestep labels and data for the time horizon T. This data may be a scalar numeric value, a Mapping or a Series. In the latter two cases T maps from different scenarios s to corresponding time horizons T[s]. If timesteps is a Mapping, it can either be mapping from timestep labels to timestep lengths or from scenarios to a corresponding specification of the time structure, consisting of either the tuple representation or the timstep mapping. scenarios : None or an iterable of timestep labels. If scenarios is a Mapping or pandas Series, the values are interpreted as the probabilities / relative likelyhoods of the individual scenarios. """ return Problem(design_objective, operational_objective, self.constraints_dict, self.states_dict, timesteps, scenarios, data, name)
[docs] class DataProxy(comando.SlotSerializationMixin): """A proxy object to access and set data.""" from typing import Callable, Optional __slots__ = ('getter', 'setter') def __init__(self, getter: 'Callable(Optional[object])', setter: 'Callable(str, [int, float])') -> None: self.getter = getter self.setter = setter def __getitem__(self, key) -> object: """Get items from the object returned from the getter.""" return self.getter(key) def __setitem__(self, key, value) -> None: """Get items from the object returned from the getter.""" self.setter(key, value) def __delitem__(self, key, value): """Raise AttributeError since deleting items is not possible.""" raise AttributeError("can't delete item") def __getattr__(self, attr): """Defer all attribute access to the object returned by the getter.""" return getattr(self.getter(), attr) def __eq__(self, other): """Explicitly defer eq comparison.""" if isinstance(other, DataProxy): other = other.getter() return self.getter().__eq__(other) def __ne__(self, other): """Explicitly defer ne comparison.""" return ~(self.__eq__(other))
[docs] class Problem: """A simple optimization problem.""" def __init__(self, design_objective=0, operational_objective=0, constraints=None, states=None, timesteps=None, scenarios=None, data=None, name='Unnamed Problem'): self.name = name self.design_objective = design_objective self.operational_objective = operational_objective # TODO: In contrast to Components we store variables, constraints and # parameters as dicts and not as sets here. # As this can be confusing to users we should probably change it! self.constraints = {} if constraints is None \ else self._handle_duplicates(constraints) self.states = {} if states is None else states self.parameters = set() self.design_variables = set() self.operational_variables = set() self._collect_symbols() self.Delta_t = comando.Parameter(f'Delta_t_{self.name}') self._set_scenarios(scenarios) self._set_timesteps(timesteps) self._update_operational_index() self._initial_states = DataProxy(self._get_initial_states, self._set_initial_states) self._data = DataProxy(self._get_data, self._set_data) # Update parameter values with user given ones if data is not None: self.data = data def _collect_symbols(self): # NOTE: We could require the user to specify symbols as well, but # not all of them will be always be relevant. We can just get all # relevant symbols from the expressions for the objective # contributions, constraints and states... # Note that when manipulating the problem later on, care must be # taken to add or remove symbols, e.g., deleting the only # constraint with variable 'v', v will not be automatically # removed as a variable. expressions = (self._do, self._oo, *self.constraints.values(), *(e for s, (_, ds, f) in self.states.items() for e in (s, ds, f))) self.add_symbols(set().union(*(e.free_symbols for e in expressions)))
[docs] def add_symbols(self, syms): """Sort symbols into parameters, design- and operational variables.""" from comando.utility import split, indexed bvs, bps = split(syms, lambda s: s.is_Parameter) params = {p if p.parent is None else p.parent for p in bps} dvs, ovs = split(bvs, indexed) fake_dvs, real_dvs = split(dvs, lambda dv: dv.parent is None) ovs.update({fdv.parent for fdv in fake_dvs}) self.parameters.update(params) self.design_variables.update(real_dvs) self.operational_variables.update(ovs)
def _set_scenarios(self, scenarios): # TODO: maybe we should rename scenarios to probabilities and # introduce a tuple of scenarios as another attribute! if scenarios is None: self._scenario_weights = None elif isinstance(scenarios, (Mapping, Series)): self._scenario_weights = Series(scenarios, Index(scenarios.keys(), name='s'), name='pi') elif isinstance(scenarios, Iterable): self._scenario_weights = Series(1/len(scenarios), Index(scenarios, name='s'), name='pi') else: raise TypeError('scenarios should be an iterable of ' 'scenario-labels, a mapping from ' 'scenario-labels to the associated ' 'probabilities or `None`!') def _norm_timesteps(self, timesteps): """Normalize the time structure representation.""" if isinstance(timesteps, (Mapping, Series)): index = Index(timesteps.keys(), name='t') return Series(timesteps, index, name='timesteps') elif isinstance(timesteps[0], Iterable): # same length for all timesteps labels, T = timesteps index = Index(labels, name='t') return Series(T/len(labels), index, name='timesteps') else: raise ValueError('timesteps should be an iterable of ' 'timestep-labels, a mapping from timestep-labels ' 'to the associated timestep-length, or `None`!') def _set_timesteps(self, timesteps): scenarios = self.scenarios if scenarios is None: if timesteps is None: # TODO: We may relaxe this in the future, to allow for pure # design problems! raise ValueError('Either timesteps or scenarios must be ' 'specified!') # e.g. timesteps = {'t1': 1, 't2': 4, ...} or (['t1', ...], 24) self._timesteps = self._norm_timesteps(timesteps) self._uniform_timesteps = True else: if timesteps is None: self._timesteps = timesteps self._uniform_timesteps = True elif isinstance(timesteps, (Mapping, Series)): try: # to see whether timesteps is already a Multiindex Series if len(timesteps.index.levels) == 2: # looks like the user knows what they're doing! timesteps.index.rename(['s', 't'], inplace=True) timesteps.rename('timesteps', inplace=True) self._timesteps = timesteps # now we need to see whether timesteps are uniform first = timesteps[next(iter(scenarios))] for s in scenarios: try: if all(timesteps[s] == first): continue break except ValueError: break else: self._uniform_timesteps = True return self._uniform_timesteps = False return else: raise ValueError('Incorrect format of timesteps!') except AttributeError: pass first_key = next(iter(timesteps.keys())) first_value = timesteps[first_key] if isinstance(first_value, (Mapping, Series, tuple)): # e.g. timesteps = {'a': {'t1': 1, ...}, # 'b': (['t1', ...], 5)} data = {(s, t): dti for s in scenarios for t, dti in self._norm_timesteps(timesteps[s]).items()} index = MultiIndex.from_tuples(data, names=['s', 't']) self._timesteps = Series(data, index, name='timesteps') self._uniform_timesteps = False else: # e.g. timesteps = {'t1': 1, 't2': 4, ...} timesteps = self._norm_timesteps(timesteps) index = MultiIndex.from_product([scenarios, timesteps.keys()], names=['s', 't']) self._timesteps = \ Series((dti for s in scenarios for ti, dti in timesteps.items()), index, name='timesteps') self._uniform_timesteps = True else: # timesteps = labels, T labels, T = timesteps index = MultiIndex.from_product([scenarios, labels], names=['s', 't']) if isinstance(T, (Mapping, Series)): # scenario specific T self._timesteps = \ Series((dti for s in scenarios for dti in self._norm_timesteps([labels, T[s]])), index, name='timesteps') self._uniform_timesteps = False # same T for all scenarios timesteps = self._norm_timesteps(timesteps) self._timesteps = Series((dts for s in scenarios for dts in timesteps), index, name='timesteps') self._uniform_timesteps = True def _update_operational_index(self): """Update the index of operational symbols.""" for p in self.parameters: if p.indexed: # TODO: Reusing old data may not be what users expect or want, # Maybe it is better to force them to reset values. # On the other hand, scalar default values may have been # specified during Component modelling, so the best # option is likely to store these defaults during problem # initialization try: # To re-use existing data, e.g., when extending timesteps p.value = Series(p.value, self.index, float) except ValueError: # Default to nan p.value = Series(float('nan'), self.index, float) for ov in self.operational_variables: # TODO: This somehow causes VariableVector elements to be modified, # even if the index stays the same! # Oddly enough explicitly creating a variable in this loop # stops this from happening, i.e. if ov contains variables # named f'{ov.name}[1]' and f'{ov.name}[2]', adding the # following line: # comando.Variable(f'{ov.name}[1]') # will result in f'{ov.name}[1]' staying the same and # f'{ov.name}[2]' being replaced by another object! ov.instantiate(self.index) self.Delta_t.value = self._timesteps def _handle_duplicates(self, constraints): rev_dict = {} for con_id, con in constraints.items(): if con in rev_dict: print(f'INFO: Identifiers {rev_dict[con]} and {con_id} map to ' 'the same constraint! Keeping the former...') else: rev_dict[con] = con_id return {con_id: constraints[con_id] for con_id in rev_dict.values()} def __iter__(self): """Prevent iteration that is implicitly introduced with __getitem__.""" raise TypeError('Component object is not iterable') def __getitem__(self, identifier): """Get a Symbol or Expression matching the identifier.""" for s in [self.parameters, self.design_variables, self.operational_variables]: # TODO: This seems unnecessarily costly! d = {e.name: e for e in s} if identifier in d: return d[identifier] raise KeyError(f"No entry for '{identifier}' found in {self}!") def __setitem__(self, identifier, value): """Set the value of a Parameter.""" # Simple solution (requiring scalar or exactly matching dimension): # self.parameter_values[identifier] = value # return # if self.index is None: # # TODO this souldn't happen # raise NotImplementedError('Setting values without having timesteps' # 'specified has not been implemented yet') # TODO: unnecessarily costly if we only want to set Parameter values # item = self.parameters[identifier] item = self[identifier] if isinstance(value, Iterable): if item in self.design_variables: raise RuntimeError('Attempted to set value of design variable ' f'{item} with vector valued data!') # TODO If the value's index changes here this will raise an error! index = self.index data = Series(item.value, index, float, identifier) len_i = len(self.index) scenarios = self.scenarios len_s = 0 if scenarios is None else len(scenarios) if isinstance(value, Mapping): for k in index: try: # Get just the necessary data from the Mapping data[k] = value[k] except KeyError: pass # Stick to the previous data elif len(value) == len_i: # complete, ordered specification for k, v in zip(data.index, value): data[k] = v # NOTE: From here on it is assumed that the index consist of # scenario-time pairs and we interpret `value` as # either time or scenario dependent elif self._uniform_timesteps: if scenarios is None: len_t = len(self._timesteps) else: len_t = len(self._timesteps[next(iter(scenarios))]) if len(value) == len_t: # time dependent # NOTE: If the numbers of timesteps and scenarios are # identical we assume time-dependence! for s in scenarios: data[s] = value elif len(value) == len_s: # scenario dependent for i, s in enumerate(scenarios): for t in self.timesteps[s].keys(): data[s, t] = value[i] else: if len_t == len_s: msg = (f'{len_t} for time-dependent data, or {len_i} ' 'for data depending on both scenario and time') else: msg = (f'{len_t} for time-dependent data, {len_s} for ' f'scenario-dependent data or {len_i} for ' 'data depending on both scenario and time') raise ValueError('Value must be a scalar, a Mapping or an ' 'Iterable with appropriate length ' f'({msg})!') item.value = data else: # Value is a scalar, store it as such! item.value = value return def __repr__(self): return f'Problem(name={self.name!r})' @property def index(self): """Get the index of the Problem.""" return (self._scenario_weights.index if self._timesteps is None else self._timesteps.index) @property def T(self): """Get the Problem's end time.""" if self._scenario_weights is None: return sum(self._timesteps) return self._timesteps.sum(level=0).rename('T') @T.setter def T(self, T): """Set the Problem's end time. T : number, Mapping or Series The length of the time horizon. """ if self._scenario_weights is None: try: T = float(T) except TypeError: raise ValueError('T must be a number!') old_T = self.T if old_T == 0: # uniform timesteps self._timesteps = \ self._norm_timesteps((self._timesteps.index, T)) else: # Rescale timesteps self._timesteps *= T/old_T else: # self._scenario_weights is not None if isinstance(T, (Mapping, Series)): for s, old_Ti in self.T.items(): Ti = T[s] try: Ti = float(Ti) except TypeError: raise ValueError('T must contain numbers!') if old_Ti == 0: # uniform timesteps self._timesteps[s][:] = \ self._norm_timesteps((self._timesteps.index, Ti)) else: # Rescale timesteps self._timesteps[s][:] *= Ti/old_Ti else: try: T = float(T) except TypeError: raise ValueError('T must be a number, Mapping or Series!') for s, old_Ti in self.T.items(): if old_Ti == 0: # uniform timesteps self._timesteps[s][:] = \ self._norm_timesteps((self._timesteps.index, T)) else: # Rescale timesteps self._timesteps[s][:] *= T/old_Ti @property def timesteps(self): """Get the length of the Problem's timesteps.""" return None if self._timesteps is None else self._timesteps.copy() @timesteps.setter def timesteps(self, timesteps): """Update the Problem's timesteps. Arguments --------- timesteps : None or an iterable of timestep labels. If timesteps is a Mapping or pandas Series, the values are interpreted as the lengths of the individual timesteps and T is taken to be their sum. T : The end time of the operational period (default 8760) If T is not specified, timesteps needs to be a mapping. """ self._set_timesteps(timesteps) self._update_operational_index() @property def scenarios(self): """Get the Problem's scenarios.""" return None if self._scenario_weights is None \ else tuple(self._scenario_weights.keys()) @scenarios.setter def scenarios(self, scenarios, timesteps=None): """Update the Problem's scenarios. scenarios : None or an iterable of timestep labels. If scenarios is a Mapping or pandas Series, the values are interpreted as the probabilities / relative likelyhoods of the individual scenarios. """ if timesteps is None: if self._uniform_timesteps: if self._scenario_weights is None: timesteps = self.timesteps else: s, timesteps = next(iter(self.timesteps.groupby('s'))) timesteps = timesteps[s] else: raise ValueError('Tried to update scenarios for a problem ' 'without uniform timesteps.\nYou need to ' 'either make timesteps uniform first or ' 'specify scenario-specific timesteps.') self._set_scenarios(scenarios) self._set_timesteps(timesteps) self._update_operational_index() @property def scenario_weights(self): """Get the Problem's scenario weights.""" return None if self._scenario_weights is None \ else self._scenario_weights.copy() @scenario_weights.setter def scenario_weights(self, weights): """Update the Problem's scenario weights.""" if {*self._scenario_weights.keys()} == {weights.keys()}: self._scenario_weights.update(weights) @property def design_objective(self): return self._do @design_objective.setter def design_objective(self, do): self._do = comando.sympify(do) @property def operational_objective(self): return self._oo @operational_objective.setter def operational_objective(self, oo): self._oo = comando.sympify(oo)
[docs] def weighted_sum(self, op_expr, symbolic=True): """Compute the scenario- and/or time-weighted sum of an expression. Arguments --------- op_expr : Expression an operational expression that is to be weighted symbolic : bool if True creates a new expression (default) if False evaluate numerically """ if symbolic: # from comando.utility import _idx_parse # sym_map = {sym: sym for sym in op_expr.free_symbols} # op_map = comando.op_map def func(idx): # return _idx_parse(op_expr, sym_map, op_map, idx, float) sym_map = {sym: sym[idx] for sym in op_expr.free_symbols if sym.indexed} return op_expr.subs(sym_map) else: from comando.utility import evaluate vals = evaluate(op_expr) def func(idx): return vals[idx] if self._timesteps is None: return sum(p * func(s) for s, p in self._scenario_weights.items()) if self._scenario_weights is None: return sum(dti * func(t) for t, dti in self._timesteps.items()) return sum(p * dti * func((s, t)) for s, p in self._scenario_weights.items() for t, dti in self._timesteps[s].items())
@property def objective(self): """Get the objective expression of the problem.""" return self._do + self.weighted_sum(self._oo) @property def num_vars(self): """Get the total number of variables.""" return len(self.design_variables) \ + len(self.index) * len(self.operational_variables) @property def num_cons(self): """Get the total number of variables.""" dcons, ocons = split(self.constraints, indexed) return len(dcons) + len(self.index) * (len(ocons) + len(self.states)) @property def initial_states(self): """Aggregate and return the parameter data.""" return self._initial_states def _get_initial_states(self, key=None): """Get an overview over initial state types and values.""" if key is None: if self.scenarios: initial_states = DataFrame(index=self.scenarios, dtype=object) else: initial_states = Series(dtype=object) def rep(iv): if iv.is_Parameter: return iv.value if iv.is_Variable: return f'{iv.lb}{iv.value}{iv.ub}' raise TypeError("Initial Values should be of type Parameter or Variable!") for state, (iv, *_) in self.states.items(): if iv.indexed: initial_states[state.name] = iv.expansion.apply(rep) else: if self.scenarios: initial_states[state.name] = [rep(iv)] * len(self.scenarios) else: initial_states[state.name] = rep(iv) return initial_states.fillna(u'\U0001F504') else: for state, (iv, *_) in self.states.items(): if state.name == key: return iv else: raise KeyError(f"No initial value with the name '{key}'!") def _set_initial_states(self, state_name, value): """Provide values for a particular initial state of the problem.""" for state in self.states: if state.name == state_name: break else: raise KeyError(f"No state named {state_name} in problem!") i_state = self.states[state][0] # numerical value or cyclic (= nan) initial condition if isinstance(value, (int, float)): i_state.value = value else: _value = i_state.value try: i_state.value = Series(value, self.scenarios, float, i_state.name) if self.scenarios \ else value except ValueError: i_state.value = _value raise ValueError('Value does not match problem index!') # TODO: Rename data to values and make it a property with getters/setters # for the values of parameters AND variables? @property def data(self): """Aggregate and return the parameter data.""" return self._data def _get_data(self, key=None): if key is None: data = DataFrame(index=self.index) for par in self.parameters: try: data[par.name] = par.value.values except AttributeError: data[par.name] = par.value return data else: return self[key].value def _set_data(self, p_name, value): """Provide values for a particular parameter of the problem.""" p = {p.name: p for p in self.parameters}[p_name] _value = p.value try: p.value = value Problem._get_data(self) except ValueError: p.value = _value raise ValueError('Value does not match problem index!') @data.setter def data(self, data): """Provide values for multiple parameters of the problem.""" for par in self.parameters: try: entry = data[par.name] except KeyError: if par.value is None: print(f"WARNING: No data for parameter '{par}'!") else: print(f"INFO: Using default data for parameter '{par}'!") continue try: # To determine whether entry is a mappig... # NOTE: We're not using values her since it is an attribute for # normal Mappings but a property for pandas Series and # DataFrames! values = [v for k, v in entry.items()] # ...with identical values (then use scalar) or different ones if len({*values}) == 1: par.value = values[0] else: index = self.index scenarios = self.scenario_weights.index try: timesteps = self._timesteps[next(iter(scenarios))] except TypeError: timesteps = None if len(values) == len(index): par.value = Series(values, index, float, par.name) elif len(values) == len(scenarios): par.value = Series(values, scenarios, float, par.name) elif self._uniform_timesteps \ and len(values) == len(timesteps): par.value = Series(values, timesteps, float, par.name) else: raise ValueError(f'Data for Parameter {par.name} does ' 'not match length of Problem index, ' 'timesteps or scenarios!') except AttributeError: # Nope, it's a scalar par.value = entry @property def design(self): """Collect the design variable values in a DataFrame.""" dv = self.design_variables index = Index((v.name for v in dv), name='name') dv_data = Series(index=index, dtype=float, name='value') for v in dv: dv_data[v.name] = v.value return DataFrame(dv_data).sort_index() @design.setter def design(self, data): """Set the values of the design variables with the given data.""" for var in self.design_variables: try: var.value = data[var.name] except KeyError: print(f"WARNING: No data for variable '{var}'!") @property def operation(self): """Collect the operational variable values in a DataFrame.""" ov_data = DataFrame() for v in self.operational_variables: ov_data[v.name] = v.value res = ov_data.T.sort_index() res.index.rename('name', inplace=True) return res @operation.setter def operation(self, data): """Set the values of the operational variables with the given data.""" for var in self.operational_variables: try: var.value = data.loc[var.name] except KeyError: print(f"WARNING: No data for variable '{var}'!")
[docs] def store_variable_values(self, filename): """Serialize current variable values and store in file.""" import os import pickle if os.path.isfile(filename): while True: overwrite = input('File already exists. Overwrite? [y/n]\n') if overwrite.lower() in ('y', 'yes'): break if overwrite.lower() in ('n', 'no'): return with open(filename, 'wb') as file: pickle.dump((self.design, self.operation), file, protocol=pickle.HIGHEST_PROTOCOL)
[docs] def load_variable_values(self, filename): """Collect variable values from file.""" import pickle with open(filename, 'rb') as file: dv, ov = pickle.load(file) self.design = dv self.operation = ov
[docs] def get_constraint_violations(self, larger_than=0): """Collect the current constraint violations in a DataFrame.""" from comando.utility import evaluate res = Series(dtype=float, name='violation') for con_id, con in self.constraints.items(): try: # works for inequalities viol = evaluate(con.lts - con.gts) except AttributeError: viol = abs(evaluate(con.lhs - con.rhs)) try: # works for indexed constraints for i in viol.index: res[f'{con_id}[{i}]'] = viol[i] except AttributeError: res[con_id] = viol return res.sort_index()[res >= larger_than]
[docs] def subs(self, sym=None, rep=None, **reps): """Substitute an individual or multiple symbols in the problem.""" if reps: # keys are symbol identifiers reps = {self[sym]: rep for sym, rep in reps.items()} elif sym is not None and rep is not None: # single substitution if not isinstance(sym, comando.Symbol): # sym may be an identifier sym = self[sym] reps = {sym: rep} elif isinstance(sym, Mapping): # keys may be identifiers or symbols reps = {self[sym] if not isinstance(sym, comando.Symbol) else sym: rep for sym, rep in reps.items()} else: raise RuntimeError('Either provide a Mapping, a single symbol and ' 'replacement via separate arguments or a list ' 'of replacements via keyword arguments!') do = self._do.subs(reps) oo = self._oo.subs(reps) cons = self._handle_duplicates({c_id: con.subs(reps) for c_id, con in self.constraints.items()}) states = {key.subs(reps): (init_state.subs(reps), der.subs(reps), rate_of_change.subs(reps)) for key, (init_state, der, rate_of_change) in self.states.items()} # If we got here all went well, so we can update the symbol sets self._do = do self._oo = oo self.constraints = cons self.states = states self._collect_symbols()