"""Package for generic modeling of energy system design and operation."""
# This file is part of the COMANDO project which is released under the MIT
# license. See file LICENSE for full license details.
#
# AUTHORS: Marco Langiu
import comando
from collections.abc import Iterable, Mapping
from pandas import DataFrame, Index, MultiIndex, Series, options
from comando.utility import _assert_algebraic, split, indexed, get_type_name
options.display.float_format = '{:,.4g}'.format
REAL, INTEGER, BINARY = comando.Domain
[docs]
class Connector:
"""An interface used to connect one or more `Component` objects."""
def __init__(self, component, name, expr):
self.component = component
self.name = name
self.expr = expr
def __repr__(self):
return f"Connector({self.component!r}, {self.name!r}, {self.expr!r})"
[docs]
class ImpossibleConstraintException(Exception): pass
[docs]
def is_trivial(constraint, con_repr=None, warn=True):
"""Handle constraints that are trivially true or false."""
# NOTE: We need to manually check for two alternative representations of
# Inequalities, since they appear to be implemented in a
# nonsymmetrical way. This is a bug in Sympy, also see:
# https://github.com/sympy/sympy/issues/20372
if get_type_name(constraint) in {'LessThan', 'GreaterThan'}:
alt_repr = comando.Le(*reversed(constraint.args))
alternatives = [constraint, alt_repr]
else:
alternatives = [constraint]
if con_repr is None:
con_repr = 'Constraint'
for constraint in alternatives:
if constraint is comando.true:
if warn:
print(f"\nWARNING: {con_repr} is always satisfied, "
"constraint will be skipped!\n")
return True
elif constraint is comando.false:
raise ImpossibleConstraintException(f"{con_repr} can never be "
"satisfied!")
elif comando.get_backend() is not comando.sympy:
return # Cannot simplify
try:
if bool(constraint):
if warn:
print(f"\nWARNING: {con_repr} is always satisfied, "
"constraint will be skipped!\n")
return True
else:
raise ImpossibleConstraintException(f"{con_repr} can never be "
"satisfied!")
except TypeError:
pass
return False
# TODO: Generalize the concept of objective to design cost and operational cost
# with appropriate annualization in the EnergySystem class. For example,
# when minimizing CO2 production, a Component's design cost might be the
# Carbon footprint for its production and its operational cost might be
# the CO2 ouput caused by operation.
[docs]
class Component:
"""A component representing a part of an energy system.
The Component class is a model of a generic real-world component of an
energy system, represented by a collection of algebraic and logical
expressions which describe how the component can be controlled, its
limitations and its interactions with other components.
For every component, two kinds of decisions may be taken:
- design decisions a.k.a. 'here-and-now' decisions
- operational decisions a.k.a. 'wait-and-see' decisions
The former constitute decisions that are taken once, prior to all operation
while the operational decisions need to be taken for every time-point under
consideration.
The component may define named expressions that can be accessed at any time
for informational purposes. These expressions may also be used to impose
additional constraints at a later stage, or to aggregate information from
multiple different components. In a similar way an objective for a system
optimization can be generated by aggregating different types of costs
defined by some subset of the system's components.
Parameters that are contained in the various expressions may or may not be
given a default value that can be changed at a later stage.
Attributes
----------
label : `str`
A unique sting that serves as an identifier
parameters : `set` of `Symbol`
Set of unspecified system parameters the user can assign values to.
design_variables : `set` of `Variable`
Set of variables of the design stage, i.e. time-independent variables
such as the number or size of newly added components.
operational_variables : `set` of `Variable`
Set of unique names for variables of the operational stage, e.g.
the output of a given component or an operational state.
states : dict
Dictionary mapping a subset of operational_variables to automatically
created Variables, representing the corresponding time derivatives.
Examples are the SOC of a battery, the fill-level of a tank, etc.
constraints : `dict` of `sympy.core.relational.Relational`
Dictionary of relational sympy expressions that represent constraints.
expressions : `dict`
mapping of shared identifiers to algebraic sympy expressions.
The expressions different `Component` instances associate with a shared
identifier may be aggregated to additional constraints or objectives.
connectors : `dict`
mapping of strings to algebraic expressions representing in or outputs
to the component. When multiple connectors are connected to each other,
the connection represents the requirement that the sum of their
expressions need to be equal to zero.
"""
existing_components = dict()
def __init__(self, label):
if label in self.existing_components:
raise RuntimeError(f"A component with the label {label} has "
"already been defined!")
self.existing_components[label] = self
self._label = label
# self._parameters = set()
self._parameters_dict = dict()
# self._design_variables = set()
self._design_variables_dict = dict()
# self._operational_variables = set()
self._operational_variables_dict = dict()
self._constraints_dict = dict()
self._expressions_dict = dict()
self._states_dict = dict()
self.connectors = dict()
def _prefix(self, name):
"""Prefix the given name to obtain a unique identifier."""
return f'{self.label}_{name}'
def __repr__(self):
return f"Component({self.label!r})"
def __iter__(self):
"""Prevent iteration that is implicitly introduced with __getitem__."""
raise TypeError('Component object is not iterable')
def __getitem__(self, identifier):
"""Get a Connector, Parameter or Variable matching the identifier."""
# NOTE: We may add this for users confused by system scope access...
# if identifier.startswith(self.label + '_'): # strip prefix
# identifier = identifier[len(self.label) + 1:]
for d in [self._parameters_dict, self._design_variables_dict,
self._operational_variables_dict, self._constraints_dict,
self._expressions_dict]:
if identifier in d:
return d[identifier]
# TODO: also search components?
raise KeyError(f"No entry for '{identifier}' found in {self}!")
@property
def label(self):
"""Get the Component's unique label."""
return self._label
# Define descriptors for all symbols, constraints and expressions
for ty in ['parameters', 'design_variables', 'operational_variables',
'constraints', 'expressions']:
exec(f"""
@property
def {ty}(self):
\"\"\"Get a set of the Component's {ty}.\"\"\"
return set(self._{ty}_dict.values())
""")
exec(f"""
@property
def {ty}_dict(self):
\"\"\"Get a dictionary of the Component's {ty}.\"\"\"
return {{self._prefix(n): p for n, p in self._{ty}_dict.items()}}
""")
del ty
@property
def states(self):
return set(self._states_dict)
@property
def states_dict(self):
return self._states_dict.copy()
[docs]
def make_parameter(self, name, value=None):
"""Create a parameter with a localized name & store it."""
par = comando.Parameter(self._prefix(name), value=value)
# self._parameters.add(par)
self._parameters_dict[name] = par
return par
[docs]
def make_design_variable(self, name, domain=REAL, bounds=(None, None),
init_val=None):
"""Create a design variable with a localized name & store it."""
var = comando.Variable(self._prefix(name), domain, bounds, init_val)
self._design_variables_dict[name] = var
return var
[docs]
def make_operational_variable(self, name, domain=REAL,
bounds=(None, None), init_val=None):
"""Create an operational variable with a localized name & store it."""
var = comando.VariableVector(self._prefix(name), domain, bounds,
init_val)
self._operational_variables_dict[name] = var
return var
# TODO: Make the creation of derivative variable optional (and allow for
# the use of state_change instead)
[docs]
def declare_state(self, var, rate_of_change=None, init_state=float('nan'),
der_bounds=(None, None), der_init_val=None):
"""Declare var to be a state.
A state is an operational variable whose time derivative is described
by the rate_of_change expression, beginning at an initial_state.
"""
if var.name.startswith(self.label):
name = var.name[len(self.label) + 1:] # strip prefix
else: # This can happen if the variable was not created in self!
name = var.name
der_s = self.make_operational_variable(f'{name}dot', var.domain,
der_bounds, der_init_val)
if isinstance(init_state, (comando.Symbol)):
pass
elif isinstance(init_state, (int, float)): # create parameter for user
init_state = self.make_parameter(f'{name}_init', init_state)
else:
raise TypeError(f"Cannot use '{init_state}' as an initial state!")
# TODO: should we already do this or wait until problem generation?
if rate_of_change is not None:
self.add_eq_constraint(der_s, rate_of_change, name=f'state_{name}')
# TODO: Which order should we store this?
self._states_dict[var] = (init_state, der_s, rate_of_change)
return der_s
# TODO: Should we actually consider domains other than REAL???
[docs]
def make_state(self, name, rate_of_change=None, init_state=float('nan'),
domain=REAL, bounds=(None, None), der_bounds=(None, None),
init_val=None, der_init_val=None):
"""Create a state with a localized name, its derivative and store them.
A state is an operational variable whose time derivative is described
by the rate_of_change expression, beginning at an initial_state.
"""
s = self.make_operational_variable(name, domain, bounds, init_val)
return s, self.declare_state(s, rate_of_change, init_state,
der_bounds, der_init_val)
def _handle_constraint(self, rel_op, lhs_expr, rhs_expr, name):
_assert_algebraic(lhs_expr)
_assert_algebraic(rhs_expr)
token = {'Le': '≤', 'Eq': '=', 'Ge': '≥'}[rel_op]
con = getattr(comando, rel_op)(lhs_expr, rhs_expr)
if name is None:
name = f'{lhs_expr} {token} {rhs_expr}'
if not is_trivial(con, f'Constraint {name} in {self._label}'):
self._constraints_dict[name] = con
[docs]
def add_le_constraint(self, lhs_expr, rhs_expr, name=None):
"""Add a constraint of the form lhs_expr ≤ rhs_expr."""
self._handle_constraint('Le', lhs_expr, rhs_expr, name)
[docs]
def add_ge_constraint(self, lhs_expr, rhs_expr, name=None):
"""Add a constraint of the form lhs_expr ≥ rhs_expr."""
self._handle_constraint('Ge', lhs_expr, rhs_expr, name)
[docs]
def add_eq_constraint(self, lhs_expr, rhs_expr, name=None):
"""Add a constraint of the form lhs_expr = rhs_expr."""
self._handle_constraint('Eq', lhs_expr, rhs_expr, name)
# TODO: Logical constraints (http://ben-israel.rutgers.edu/386/Logic.pdf)
###########################################################################
# def declare_alternative(self, con1_id, con2_id, name=None):
# """Specify that at least one of the two constraints must hold."""
# pass
#
# def enforce_k(self, *con_ids, k, name=None):
# """Specify that at least k of the constraints must hold."""
# pass
#
# def declare_exclusive(self, con1_id, con2_id, name=None):
# """Specify that only a single one of the two constraints hold."""
# pass
#
# def declare_implication(self, con1_id, con2_id, name=None):
# """Specify that if con1 holds con2 must also hold."""
# pass
#
# def declare_equivalence(self, con1_id, con2_id, name=None):
# """Specify that either both con1 and con2 hold or none."""
# pass
###########################################################################
# TODO: Either introduce a modifier of this form or a flag in the
# `make_xxx_variable` methods.
# def declare_semicont(self, var, bound):
# """Specify that the given variable is semicontinuous.
#
# A semicontinuous variable can be either zero or above/below a certain
# threshold value. This behavior is enfoced on the given variable by
# introducing a binary variable and two constraints.
# The bound argument may be positive or negative resulting in a
# lower or upper semi-continuous bound respectively.
# """
# name = var.name
# active = self.make_operational_variable(f'{name}_active',
# domain=INTEGER, bounds=(0, 1))
# # the upper/lower bound for Qdot_out is given by nominal output, but
# # since that's a variable we have to write this bound as a constraint:
# self.add_le_constraint(var, active * var.ub, name=f'{name}_limit')
# self.add_ge_constraint(var, active * lb, name=f'{name}_threshold')
[docs]
def add_expression(self, identifier, expr):
"""Add a named algebraic expression."""
_assert_algebraic(expr)
self._expressions_dict[identifier] = comando.S(expr)
return expr
[docs]
def get_expression(self, identifier, default=None):
"""Get the expression corresponding to the identifier."""
if default is None:
return self._expressions_dict[identifier]
return self._expressions_dict.get(identifier, default)
[docs]
def add_connector(self, id, expr):
"""Insert the connector in connectors and add it as an attribute."""
connector = Connector(self, id, expr)
self.connectors[id] = connector
setattr(self, id, connector)
# TODO: Probably we should have a simple add_connector method instead!
[docs]
def add_connectors(self, id=None, expr=None, **connectors):
"""Add one or more named connectors to this component.
Examples
--------
>>> self.add_connectors('A', sympy.S('A'))
>>> self.add_connectors(B=sympy.S('B'), C=sympy.S('C'))
"""
if id and expr is not None: # user passed (identifier, expr)
if getattr(self, id, None):
raise RuntimeError(f"Cannot add connector: {self} already has "
f"an attribute called {id}!")
_assert_algebraic(expr)
self.add_connector(id, expr)
else: # user passed (**connectors)
for id, expr in connectors.items(): # Check if all is well first!
if getattr(self, id, None):
raise RuntimeError(f"Cannot add connector: {self} already "
f"has an attribute called {id}!")
_assert_algebraic(expr)
for id, expr in connectors.items(): # Then do the work
self.add_connector(id, expr)
[docs]
def add_output(self, identifier, expr):
"""Add a connector that corresponds to an output from the component.
The expr is assumed to always be positive and is thus bounded by 0 from
below. By convention the output from a Component is negative so the new
Connector's expression corresponds to the negated expr.
"""
try: # Bound the variable corresponding to the input...
expr.lb = 0
except AttributeError: # ...or add a constraint ensuring positivity
self.add_le_constraint(0, expr, identifier)
self.add_connector(identifier, -expr)
[docs]
class ConnectorUnion(Connector):
"""A union of multiple Connectors."""
def __init__(self, component, name, *connectors):
self.component = component
self.name = name
self.elements = set(connectors)
@property
def expr(self):
"""Return an expression for the flow through the ConnectorUnion."""
return sum(c.expr for c in self.elements)
[docs]
class System(Component):
"""A class for a generic system, made up of individual components.
Note that a system is itself a component and can therefore function as a
subsystem for a larger system, allowing for nested structures!
Attributes
----------
components : iterable of `Component`
components that form part of the considered energy system
connections : dict
mapping of `str` to an iterable of `Connector`. The in- and outputs of
all connectors within an iterable are assumed to balance each other.
"""
def __init__(self, label, components=None, connections=None):
super().__init__(label)
self._components = set()
if components is not None:
for component in components:
self.add(component)
# NOTE: A 'bus' is understood to be a set of connected `Connector`
# objects, while a 'connection' is a mapping of an identifier to
# a bus!
self.connections = {}
if connections is not None:
for bus_id, bus in connections.items():
self.connect(bus_id, bus)
def __repr__(self):
return f"System({self.label!r}, components={self._components}, " \
f"connections={self.connections})"
def __iter__(self):
"""Iterate over the components of the system."""
for component in self.components:
yield component
def __getitem__(self, identifier):
"""Get a Connector, Parameter or Variable matching the identifier."""
# NOTE: We may add this for users confused by system scope access...
# if identifier.startswith(self.label + '_'): # strip prefix
# identifier = identifier[len(self.label) + 1:]
for ty in ['parameters', 'design_variables', 'operational_variables',
'constraints', 'expressions']:
# For all types of attributes check own then those of components...
for d in [getattr(self, f'_{ty}_dict'),
*[getattr(c, f'{ty}_dict') for c in self.components]]:
if identifier in d:
return d[identifier]
raise KeyError(f"No entry for '{identifier}' found in {self}!")
@property
def components(self):
"""Get the set of components that are part of the system."""
return self._components.union(*(c.components for c in self._components
if isinstance(c, System)))
# Define descriptors for all symbols
for ty in ['parameters', 'design_variables', 'operational_variables',
'constraints', 'expressions']:
exec(f"""
@property
def {ty}(self):
\"\"\"Get a set of the System's {ty}.\"\"\"
res = set(self._{ty}_dict.values())
return res.union(*(c.{ty} for c in self.components))
""")
exec(f"""
@property
def {ty}_dict(self):
\"\"\"Get a dictionary of the System's {ty}.\"\"\"
res = {{self._prefix(n): p for n, p in self._{ty}_dict.items()}}
for c in self.components:
res.update(c.{ty}_dict)
return res
""")
del ty
@property
def states(self):
"""Get a set of the System's states."""
res = set(self._states_dict)
return res.union(*(c.states for c in self.components))
@property
def states_dict(self):
"""Get a set of the System's states."""
res = self._states_dict.copy()
for c in self.components:
res.update(c.states_dict)
return res
def _update_connection_constraint(self, bus_id):
"""Update the constraint enforcing a given connection."""
bus = self.connections[bus_id]
lhs = sum(connector.expr for connector in bus)
self.add_eq_constraint(lhs, 0, name=bus_id)
[docs]
def close_connector(self, connector, expr=0):
"""Specify the flow over the connector (default 0)."""
if connector.name in self.connections:
specification = self.connections[connector.name] - {connector}
if len(specification) != 1:
raise RuntimeError('It seems that you used the identifier '
f"'{connector.name}' both as a connector "
'name and as a bus_id... You should be '
'able to fix this by changing the bus_id '
'in the corresponding connect statement of '
f'{self}!')
spec = specification.pop()
spec.expr = expr
else:
spec = Connector(self, f'{connector.name}_specification', expr)
self.connect(connector.name, {connector, spec})
self._update_connection_constraint(connector.name)
[docs]
def expose_connector(self, connector, alias=None):
"""Expose an existing connector to the outside of the system."""
if connector.component not in self.components:
raise RuntimeError(f"Tried to expose connector '{connector.name}' "
f"of component '{connector.component}', which "
f"is not part of the system '{self}'!")
id = alias if alias else connector.name
if id in self.connectors:
raise RuntimeError(f"Tried to expose connector '{connector.name}' "
f"of component '{connector.component}', under "
f"the name '{id}', which is alredy used. You "
'should be able to fix this by using a '
'different alias!')
self.connectors[id] = connector
setattr(self, id, connector)
[docs]
def extend_connection(self, bus_id, alias=None):
"""Extend the connection to the outside of the system."""
if bus_id not in self.connections:
raise RuntimeError('Tried to extend nonexisting connection '
f"'{bus_id}'!")
id = alias if alias else bus_id
if id in self.connectors:
raise RuntimeError(f"Tried to extend connection '{bus_id}' "
f'of component {self}, as a connector with '
f"the name '{id}', which is alredy used. You "
'should be able to fix this by using a '
'different alias!')
cu = ConnectorUnion(self, id, *self.connections.pop(bus_id))
self._constraints_dict.pop(bus_id) # remove the previous constraint
self.connectors[id] = cu
setattr(self, id, cu)
# # TODO: blacklist variable names ending in '_extension'
# varname = f'{bus_id}_extension'
# v = self.operational_variables_dict.get(
# varname, self.make_operational_variable(f'{bus_id}_extension')
# )
# # internal connection with a dummy connector
# c = Connector(self, f'{bus_id}_dummy', -v)
# self.connect(bus_id, c)
# # external connector exposing the connection
# self.add_connectors(bus_id, v)
[docs]
def connect(self, bus_id, connectors):
"""Connect all elements of `connectors` to a bus with id `bus_id`."""
connectors = set(connectors) # ensure connectors is a set
non_members = set(c for c in connectors
if c.component not in {self}.union(self.components))
if non_members:
# TODO: Possibly we can just add the component here:
# self.add(component)
msg = '\n\t- '.join(repr(c) for c in non_members)
raise RuntimeError('Attempted to connect the following connectors '
f'which are not part of the system {self} or '
f'any of its components:\n\t- {msg}')
if bus_id not in self.connections:
self.connections[bus_id] = set()
self.connections[bus_id].update(connectors)
self._update_connection_constraint(bus_id)
[docs]
def detach(self, bus_id, connectors=None):
"""Detach all `Connector`s in `connectors` from the specified bus."""
bus = self.connections[bus_id]
if connectors is None: # detach the entire bus
del self.connections[bus_id]
del self._constraints_dict[bus_id]
else:
invalid_connectors = set(c for c in connectors if c not in bus)
if invalid_connectors:
raise KeyError(f'The connectors {invalid_connectors} are not '
f'part of bus {bus}!')
for connector in connectors:
# TODO: What if the bus only has 1 connector left?
bus.remove(connector)
if len(bus) == 0:
del self.connections[bus_id]
del self._constraints_dict[bus_id]
else: # if there are some connectors left
self._update_connection_constraint(bus_id)
[docs]
def add(self, component):
"""Add a `Component` to the system."""
if not isinstance(component, Component):
raise RuntimeError('Tried to add something that is not a Component'
f" to the system {self}")
if isinstance(component, System) and self in component.components:
raise RuntimeError(f'Tried to add {component} to {self} which is '
'already a component of the former!')
self._components.add(component)
return component
[docs]
def remove(self, component):
"""Remove a `Component` from the system."""
# Remove the eliminated component's connectors from all buses
cc = set(component.connectors.values())
for bus_id, bus in self.connections.items():
if bus.intersection(cc):
bus -= cc
self._update_connection_constraint(bus_id)
[docs]
def get_open_connectors(self):
"""Return the set of all connectors that are not connected yet."""
connectors = set()
for com in self.components:
for con in com.connectors.values():
connectors.add(con)
# if not con.bus:
# open_connectors.append(con)
connected_connectors = set()
for bus in self.connections.values():
for con in bus:
connected_connectors.add(con)
return connectors - connected_connectors
[docs]
def aggregate_component_expressions(self, id, aggregator=sum):
"""Aggregate expressions from the components matching the identifier.
The passed identifier is used to look up matching expressions in the
components of this system. The resulting expressions are then
aggregated using the passed aggregator function and the resulting
expression is returned.
"""
return aggregator(c.get_expression(id, comando.Zero)
for c in self.components)
[docs]
def create_problem(self, design_objective=0, operational_objective=0,
timesteps=None, scenarios=None, data=None, name=None):
"""Create a problem with the specified time and scenario structure.
Arguments
---------
T : The end time of the operational period (default 8760)
If T is not specified, timesteps needs to be a mapping (see below).
timesteps : tuple, Mapping or Series.
If timesteps is tuple it is assumed to consist of timestep labels
and data for the time horizon T. This data may be a scalar numeric
value, a Mapping or a Series. In the latter two cases T maps from
different scenarios s to corresponding time horizons T[s].
If timesteps is a Mapping, it can either be mapping from timestep
labels to timestep lengths or from scenarios to a corresponding
specification of the time structure, consisting of either the tuple
representation or the timstep mapping.
scenarios : None or an iterable of timestep labels.
If scenarios is a Mapping or pandas Series, the values are
interpreted as the probabilities / relative likelyhoods of the
individual scenarios.
"""
return Problem(design_objective, operational_objective,
self.constraints_dict, self.states_dict, timesteps,
scenarios, data, name)
[docs]
class DataProxy(comando.SlotSerializationMixin):
"""A proxy object to access and set data."""
from typing import Callable, Optional
__slots__ = ('getter', 'setter')
def __init__(self, getter: 'Callable(Optional[object])',
setter: 'Callable(str, [int, float])') -> None:
self.getter = getter
self.setter = setter
def __getitem__(self, key) -> object:
"""Get items from the object returned from the getter."""
return self.getter(key)
def __setitem__(self, key, value) -> None:
"""Get items from the object returned from the getter."""
self.setter(key, value)
def __delitem__(self, key, value):
"""Raise AttributeError since deleting items is not possible."""
raise AttributeError("can't delete item")
def __getattr__(self, attr):
"""Defer all attribute access to the object returned by the getter."""
return getattr(self.getter(), attr)
def __eq__(self, other):
"""Explicitly defer eq comparison."""
if isinstance(other, DataProxy):
other = other.getter()
return self.getter().__eq__(other)
def __ne__(self, other):
"""Explicitly defer ne comparison."""
return ~(self.__eq__(other))
[docs]
class Problem:
"""A simple optimization problem."""
def __init__(self, design_objective=0, operational_objective=0,
constraints=None, states=None, timesteps=None,
scenarios=None, data=None,
name='Unnamed Problem'):
self.name = name
self.design_objective = design_objective
self.operational_objective = operational_objective
# TODO: In contrast to Components we store variables, constraints and
# parameters as dicts and not as sets here.
# As this can be confusing to users we should probably change it!
self.constraints = {} if constraints is None \
else self._handle_duplicates(constraints)
self.states = {} if states is None else states
self.parameters = set()
self.design_variables = set()
self.operational_variables = set()
self._collect_symbols()
self.Delta_t = comando.Parameter(f'Delta_t_{self.name}')
self._set_scenarios(scenarios)
self._set_timesteps(timesteps)
self._update_operational_index()
self._initial_states = DataProxy(self._get_initial_states,
self._set_initial_states)
self._data = DataProxy(self._get_data, self._set_data)
# Update parameter values with user given ones
if data is not None:
self.data = data
def _collect_symbols(self):
# NOTE: We could require the user to specify symbols as well, but
# not all of them will be always be relevant. We can just get all
# relevant symbols from the expressions for the objective
# contributions, constraints and states...
# Note that when manipulating the problem later on, care must be
# taken to add or remove symbols, e.g., deleting the only
# constraint with variable 'v', v will not be automatically
# removed as a variable.
expressions = (self._do, self._oo, *self.constraints.values(),
*(e for s, (_, ds, f) in self.states.items()
for e in (s, ds, f)))
self.add_symbols(set().union(*(e.free_symbols for e in expressions)))
[docs]
def add_symbols(self, syms):
"""Sort symbols into parameters, design- and operational variables."""
from comando.utility import split, indexed
bvs, bps = split(syms, lambda s: s.is_Parameter)
params = {p if p.parent is None else p.parent for p in bps}
dvs, ovs = split(bvs, indexed)
fake_dvs, real_dvs = split(dvs, lambda dv: dv.parent is None)
ovs.update({fdv.parent for fdv in fake_dvs})
self.parameters.update(params)
self.design_variables.update(real_dvs)
self.operational_variables.update(ovs)
def _set_scenarios(self, scenarios):
# TODO: maybe we should rename scenarios to probabilities and
# introduce a tuple of scenarios as another attribute!
if scenarios is None:
self._scenario_weights = None
elif isinstance(scenarios, (Mapping, Series)):
self._scenario_weights = Series(scenarios,
Index(scenarios.keys(), name='s'),
name='pi')
elif isinstance(scenarios, Iterable):
self._scenario_weights = Series(1/len(scenarios),
Index(scenarios, name='s'),
name='pi')
else:
raise TypeError('scenarios should be an iterable of '
'scenario-labels, a mapping from '
'scenario-labels to the associated '
'probabilities or `None`!')
def _norm_timesteps(self, timesteps):
"""Normalize the time structure representation."""
if isinstance(timesteps, (Mapping, Series)):
index = Index(timesteps.keys(), name='t')
return Series(timesteps, index, name='timesteps')
elif isinstance(timesteps[0], Iterable): # same length for all timesteps
labels, T = timesteps
index = Index(labels, name='t')
return Series(T/len(labels), index, name='timesteps')
else:
raise ValueError('timesteps should be an iterable of '
'timestep-labels, a mapping from timestep-labels '
'to the associated timestep-length, or `None`!')
def _set_timesteps(self, timesteps):
scenarios = self.scenarios
if scenarios is None:
if timesteps is None:
# TODO: We may relaxe this in the future, to allow for pure
# design problems!
raise ValueError('Either timesteps or scenarios must be '
'specified!')
# e.g. timesteps = {'t1': 1, 't2': 4, ...} or (['t1', ...], 24)
self._timesteps = self._norm_timesteps(timesteps)
self._uniform_timesteps = True
else:
if timesteps is None:
self._timesteps = timesteps
self._uniform_timesteps = True
elif isinstance(timesteps, (Mapping, Series)):
try: # to see whether timesteps is already a Multiindex Series
if len(timesteps.index.levels) == 2:
# looks like the user knows what they're doing!
timesteps.index.rename(['s', 't'], inplace=True)
timesteps.rename('timesteps', inplace=True)
self._timesteps = timesteps
# now we need to see whether timesteps are uniform
first = timesteps[next(iter(scenarios))]
for s in scenarios:
try:
if all(timesteps[s] == first):
continue
break
except ValueError:
break
else:
self._uniform_timesteps = True
return
self._uniform_timesteps = False
return
else:
raise ValueError('Incorrect format of timesteps!')
except AttributeError:
pass
first_key = next(iter(timesteps.keys()))
first_value = timesteps[first_key]
if isinstance(first_value, (Mapping, Series, tuple)):
# e.g. timesteps = {'a': {'t1': 1, ...},
# 'b': (['t1', ...], 5)}
data = {(s, t): dti for s in scenarios for t, dti in
self._norm_timesteps(timesteps[s]).items()}
index = MultiIndex.from_tuples(data, names=['s', 't'])
self._timesteps = Series(data, index, name='timesteps')
self._uniform_timesteps = False
else: # e.g. timesteps = {'t1': 1, 't2': 4, ...}
timesteps = self._norm_timesteps(timesteps)
index = MultiIndex.from_product([scenarios,
timesteps.keys()],
names=['s', 't'])
self._timesteps = \
Series((dti for s in scenarios
for ti, dti in timesteps.items()),
index, name='timesteps')
self._uniform_timesteps = True
else: # timesteps = labels, T
labels, T = timesteps
index = MultiIndex.from_product([scenarios, labels],
names=['s', 't'])
if isinstance(T, (Mapping, Series)): # scenario specific T
self._timesteps = \
Series((dti for s in scenarios for dti in
self._norm_timesteps([labels, T[s]])),
index, name='timesteps')
self._uniform_timesteps = False
# same T for all scenarios
timesteps = self._norm_timesteps(timesteps)
self._timesteps = Series((dts for s in scenarios
for dts in timesteps),
index, name='timesteps')
self._uniform_timesteps = True
def _update_operational_index(self):
"""Update the index of operational symbols."""
for p in self.parameters:
if p.indexed:
# TODO: Reusing old data may not be what users expect or want,
# Maybe it is better to force them to reset values.
# On the other hand, scalar default values may have been
# specified during Component modelling, so the best
# option is likely to store these defaults during problem
# initialization
try: # To re-use existing data, e.g., when extending timesteps
p.value = Series(p.value, self.index, float)
except ValueError: # Default to nan
p.value = Series(float('nan'), self.index, float)
for ov in self.operational_variables:
# TODO: This somehow causes VariableVector elements to be modified,
# even if the index stays the same!
# Oddly enough explicitly creating a variable in this loop
# stops this from happening, i.e. if ov contains variables
# named f'{ov.name}[1]' and f'{ov.name}[2]', adding the
# following line:
# comando.Variable(f'{ov.name}[1]')
# will result in f'{ov.name}[1]' staying the same and
# f'{ov.name}[2]' being replaced by another object!
ov.instantiate(self.index)
self.Delta_t.value = self._timesteps
def _handle_duplicates(self, constraints):
rev_dict = {}
for con_id, con in constraints.items():
if con in rev_dict:
print(f'INFO: Identifiers {rev_dict[con]} and {con_id} map to '
'the same constraint! Keeping the former...')
else:
rev_dict[con] = con_id
return {con_id: constraints[con_id] for con_id in rev_dict.values()}
def __iter__(self):
"""Prevent iteration that is implicitly introduced with __getitem__."""
raise TypeError('Component object is not iterable')
def __getitem__(self, identifier):
"""Get a Symbol or Expression matching the identifier."""
for s in [self.parameters, self.design_variables,
self.operational_variables]:
# TODO: This seems unnecessarily costly!
d = {e.name: e for e in s}
if identifier in d:
return d[identifier]
raise KeyError(f"No entry for '{identifier}' found in {self}!")
def __setitem__(self, identifier, value):
"""Set the value of a Parameter."""
# Simple solution (requiring scalar or exactly matching dimension):
# self.parameter_values[identifier] = value
# return
# if self.index is None:
# # TODO this souldn't happen
# raise NotImplementedError('Setting values without having timesteps'
# 'specified has not been implemented yet')
# TODO: unnecessarily costly if we only want to set Parameter values
# item = self.parameters[identifier]
item = self[identifier]
if isinstance(value, Iterable):
if item in self.design_variables:
raise RuntimeError('Attempted to set value of design variable '
f'{item} with vector valued data!')
# TODO If the value's index changes here this will raise an error!
index = self.index
data = Series(item.value, index, float, identifier)
len_i = len(self.index)
scenarios = self.scenarios
len_s = 0 if scenarios is None else len(scenarios)
if isinstance(value, Mapping):
for k in index:
try: # Get just the necessary data from the Mapping
data[k] = value[k]
except KeyError:
pass # Stick to the previous data
elif len(value) == len_i: # complete, ordered specification
for k, v in zip(data.index, value):
data[k] = v
# NOTE: From here on it is assumed that the index consist of
# scenario-time pairs and we interpret `value` as
# either time or scenario dependent
elif self._uniform_timesteps:
if scenarios is None:
len_t = len(self._timesteps)
else:
len_t = len(self._timesteps[next(iter(scenarios))])
if len(value) == len_t: # time dependent
# NOTE: If the numbers of timesteps and scenarios are
# identical we assume time-dependence!
for s in scenarios:
data[s] = value
elif len(value) == len_s: # scenario dependent
for i, s in enumerate(scenarios):
for t in self.timesteps[s].keys():
data[s, t] = value[i]
else:
if len_t == len_s:
msg = (f'{len_t} for time-dependent data, or {len_i} '
'for data depending on both scenario and time')
else:
msg = (f'{len_t} for time-dependent data, {len_s} for '
f'scenario-dependent data or {len_i} for '
'data depending on both scenario and time')
raise ValueError('Value must be a scalar, a Mapping or an '
'Iterable with appropriate length '
f'({msg})!')
item.value = data
else: # Value is a scalar, store it as such!
item.value = value
return
def __repr__(self):
return f'Problem(name={self.name!r})'
@property
def index(self):
"""Get the index of the Problem."""
return (self._scenario_weights.index if self._timesteps is None
else self._timesteps.index)
@property
def T(self):
"""Get the Problem's end time."""
if self._scenario_weights is None:
return sum(self._timesteps)
return self._timesteps.sum(level=0).rename('T')
@T.setter
def T(self, T):
"""Set the Problem's end time.
T : number, Mapping or Series
The length of the time horizon.
"""
if self._scenario_weights is None:
try:
T = float(T)
except TypeError:
raise ValueError('T must be a number!')
old_T = self.T
if old_T == 0: # uniform timesteps
self._timesteps = \
self._norm_timesteps((self._timesteps.index, T))
else: # Rescale timesteps
self._timesteps *= T/old_T
else: # self._scenario_weights is not None
if isinstance(T, (Mapping, Series)):
for s, old_Ti in self.T.items():
Ti = T[s]
try:
Ti = float(Ti)
except TypeError:
raise ValueError('T must contain numbers!')
if old_Ti == 0: # uniform timesteps
self._timesteps[s][:] = \
self._norm_timesteps((self._timesteps.index, Ti))
else: # Rescale timesteps
self._timesteps[s][:] *= Ti/old_Ti
else:
try:
T = float(T)
except TypeError:
raise ValueError('T must be a number, Mapping or Series!')
for s, old_Ti in self.T.items():
if old_Ti == 0: # uniform timesteps
self._timesteps[s][:] = \
self._norm_timesteps((self._timesteps.index, T))
else: # Rescale timesteps
self._timesteps[s][:] *= T/old_Ti
@property
def timesteps(self):
"""Get the length of the Problem's timesteps."""
return None if self._timesteps is None else self._timesteps.copy()
@timesteps.setter
def timesteps(self, timesteps):
"""Update the Problem's timesteps.
Arguments
---------
timesteps : None or an iterable of timestep labels.
If timesteps is a Mapping or pandas Series, the values are
interpreted as the lengths of the individual timesteps and T is
taken to be their sum.
T : The end time of the operational period (default 8760)
If T is not specified, timesteps needs to be a mapping.
"""
self._set_timesteps(timesteps)
self._update_operational_index()
@property
def scenarios(self):
"""Get the Problem's scenarios."""
return None if self._scenario_weights is None \
else tuple(self._scenario_weights.keys())
@scenarios.setter
def scenarios(self, scenarios, timesteps=None):
"""Update the Problem's scenarios.
scenarios : None or an iterable of timestep labels.
If scenarios is a Mapping or pandas Series, the values are
interpreted as the probabilities / relative likelyhoods of the
individual scenarios.
"""
if timesteps is None:
if self._uniform_timesteps:
if self._scenario_weights is None:
timesteps = self.timesteps
else:
s, timesteps = next(iter(self.timesteps.groupby('s')))
timesteps = timesteps[s]
else:
raise ValueError('Tried to update scenarios for a problem '
'without uniform timesteps.\nYou need to '
'either make timesteps uniform first or '
'specify scenario-specific timesteps.')
self._set_scenarios(scenarios)
self._set_timesteps(timesteps)
self._update_operational_index()
@property
def scenario_weights(self):
"""Get the Problem's scenario weights."""
return None if self._scenario_weights is None \
else self._scenario_weights.copy()
@scenario_weights.setter
def scenario_weights(self, weights):
"""Update the Problem's scenario weights."""
if {*self._scenario_weights.keys()} == {weights.keys()}:
self._scenario_weights.update(weights)
@property
def design_objective(self):
return self._do
@design_objective.setter
def design_objective(self, do):
self._do = comando.sympify(do)
@property
def operational_objective(self):
return self._oo
@operational_objective.setter
def operational_objective(self, oo):
self._oo = comando.sympify(oo)
[docs]
def weighted_sum(self, op_expr, symbolic=True):
"""Compute the scenario- and/or time-weighted sum of an expression.
Arguments
---------
op_expr : Expression
an operational expression that is to be weighted
symbolic : bool
if True creates a new expression (default) if False evaluate
numerically
"""
if symbolic:
# from comando.utility import _idx_parse
# sym_map = {sym: sym for sym in op_expr.free_symbols}
# op_map = comando.op_map
def func(idx):
# return _idx_parse(op_expr, sym_map, op_map, idx, float)
sym_map = {sym: sym[idx]
for sym in op_expr.free_symbols if sym.indexed}
return op_expr.subs(sym_map)
else:
from comando.utility import evaluate
vals = evaluate(op_expr)
def func(idx):
return vals[idx]
if self._timesteps is None:
return sum(p * func(s) for s, p in self._scenario_weights.items())
if self._scenario_weights is None:
return sum(dti * func(t) for t, dti in self._timesteps.items())
return sum(p * dti * func((s, t))
for s, p in self._scenario_weights.items()
for t, dti in self._timesteps[s].items())
@property
def objective(self):
"""Get the objective expression of the problem."""
return self._do + self.weighted_sum(self._oo)
@property
def num_vars(self):
"""Get the total number of variables."""
return len(self.design_variables) \
+ len(self.index) * len(self.operational_variables)
@property
def num_cons(self):
"""Get the total number of variables."""
dcons, ocons = split(self.constraints, indexed)
return len(dcons) + len(self.index) * (len(ocons) + len(self.states))
@property
def initial_states(self):
"""Aggregate and return the parameter data."""
return self._initial_states
def _get_initial_states(self, key=None):
"""Get an overview over initial state types and values."""
if key is None:
if self.scenarios:
initial_states = DataFrame(index=self.scenarios, dtype=object)
else:
initial_states = Series(dtype=object)
def rep(iv):
if iv.is_Parameter:
return iv.value
if iv.is_Variable:
return f'{iv.lb} ≤ {iv.value} ≤ {iv.ub}'
raise TypeError("Initial Values should be of type Parameter or Variable!")
for state, (iv, *_) in self.states.items():
if iv.indexed:
initial_states[state.name] = iv.expansion.apply(rep)
else:
if self.scenarios:
initial_states[state.name] = [rep(iv)] * len(self.scenarios)
else:
initial_states[state.name] = rep(iv)
return initial_states.fillna(u'\U0001F504')
else:
for state, (iv, *_) in self.states.items():
if state.name == key:
return iv
else:
raise KeyError(f"No initial value with the name '{key}'!")
def _set_initial_states(self, state_name, value):
"""Provide values for a particular initial state of the problem."""
for state in self.states:
if state.name == state_name:
break
else:
raise KeyError(f"No state named {state_name} in problem!")
i_state = self.states[state][0]
# numerical value or cyclic (= nan) initial condition
if isinstance(value, (int, float)):
i_state.value = value
else:
_value = i_state.value
try:
i_state.value = Series(value, self.scenarios, float, i_state.name) if self.scenarios \
else value
except ValueError:
i_state.value = _value
raise ValueError('Value does not match problem index!')
# TODO: Rename data to values and make it a property with getters/setters
# for the values of parameters AND variables?
@property
def data(self):
"""Aggregate and return the parameter data."""
return self._data
def _get_data(self, key=None):
if key is None:
data = DataFrame(index=self.index)
for par in self.parameters:
try:
data[par.name] = par.value.values
except AttributeError:
data[par.name] = par.value
return data
else:
return self[key].value
def _set_data(self, p_name, value):
"""Provide values for a particular parameter of the problem."""
p = {p.name: p for p in self.parameters}[p_name]
_value = p.value
try:
p.value = value
Problem._get_data(self)
except ValueError:
p.value = _value
raise ValueError('Value does not match problem index!')
@data.setter
def data(self, data):
"""Provide values for multiple parameters of the problem."""
for par in self.parameters:
try:
entry = data[par.name]
except KeyError:
if par.value is None:
print(f"WARNING: No data for parameter '{par}'!")
else:
print(f"INFO: Using default data for parameter '{par}'!")
continue
try: # To determine whether entry is a mappig...
# NOTE: We're not using values her since it is an attribute for
# normal Mappings but a property for pandas Series and
# DataFrames!
values = [v for k, v in entry.items()]
# ...with identical values (then use scalar) or different ones
if len({*values}) == 1:
par.value = values[0]
else:
index = self.index
scenarios = self.scenario_weights.index
try:
timesteps = self._timesteps[next(iter(scenarios))]
except TypeError:
timesteps = None
if len(values) == len(index):
par.value = Series(values, index, float, par.name)
elif len(values) == len(scenarios):
par.value = Series(values, scenarios, float, par.name)
elif self._uniform_timesteps \
and len(values) == len(timesteps):
par.value = Series(values, timesteps, float, par.name)
else:
raise ValueError(f'Data for Parameter {par.name} does '
'not match length of Problem index, '
'timesteps or scenarios!')
except AttributeError: # Nope, it's a scalar
par.value = entry
@property
def design(self):
"""Collect the design variable values in a DataFrame."""
dv = self.design_variables
index = Index((v.name for v in dv), name='name')
dv_data = Series(index=index, dtype=float, name='value')
for v in dv:
dv_data[v.name] = v.value
return DataFrame(dv_data).sort_index()
@design.setter
def design(self, data):
"""Set the values of the design variables with the given data."""
for var in self.design_variables:
try:
var.value = data[var.name]
except KeyError:
print(f"WARNING: No data for variable '{var}'!")
@property
def operation(self):
"""Collect the operational variable values in a DataFrame."""
ov_data = DataFrame()
for v in self.operational_variables:
ov_data[v.name] = v.value
res = ov_data.T.sort_index()
res.index.rename('name', inplace=True)
return res
@operation.setter
def operation(self, data):
"""Set the values of the operational variables with the given data."""
for var in self.operational_variables:
try:
var.value = data.loc[var.name]
except KeyError:
print(f"WARNING: No data for variable '{var}'!")
[docs]
def store_variable_values(self, filename):
"""Serialize current variable values and store in file."""
import os
import pickle
if os.path.isfile(filename):
while True:
overwrite = input('File already exists. Overwrite? [y/n]\n')
if overwrite.lower() in ('y', 'yes'):
break
if overwrite.lower() in ('n', 'no'):
return
with open(filename, 'wb') as file:
pickle.dump((self.design,
self.operation),
file, protocol=pickle.HIGHEST_PROTOCOL)
[docs]
def load_variable_values(self, filename):
"""Collect variable values from file."""
import pickle
with open(filename, 'rb') as file:
dv, ov = pickle.load(file)
self.design = dv
self.operation = ov
[docs]
def get_constraint_violations(self, larger_than=0):
"""Collect the current constraint violations in a DataFrame."""
from comando.utility import evaluate
res = Series(dtype=float, name='violation')
for con_id, con in self.constraints.items():
try: # works for inequalities
viol = evaluate(con.lts - con.gts)
except AttributeError:
viol = abs(evaluate(con.lhs - con.rhs))
try: # works for indexed constraints
for i in viol.index:
res[f'{con_id}[{i}]'] = viol[i]
except AttributeError:
res[con_id] = viol
return res.sort_index()[res >= larger_than]
[docs]
def subs(self, sym=None, rep=None, **reps):
"""Substitute an individual or multiple symbols in the problem."""
if reps: # keys are symbol identifiers
reps = {self[sym]: rep for sym, rep in reps.items()}
elif sym is not None and rep is not None: # single substitution
if not isinstance(sym, comando.Symbol): # sym may be an identifier
sym = self[sym]
reps = {sym: rep}
elif isinstance(sym, Mapping): # keys may be identifiers or symbols
reps = {self[sym] if not isinstance(sym, comando.Symbol)
else sym: rep for sym, rep in reps.items()}
else:
raise RuntimeError('Either provide a Mapping, a single symbol and '
'replacement via separate arguments or a list '
'of replacements via keyword arguments!')
do = self._do.subs(reps)
oo = self._oo.subs(reps)
cons = self._handle_duplicates({c_id: con.subs(reps) for c_id, con in
self.constraints.items()})
states = {key.subs(reps): (init_state.subs(reps), der.subs(reps),
rate_of_change.subs(reps))
for key, (init_state, der, rate_of_change) in
self.states.items()}
# If we got here all went well, so we can update the symbol sets
self._do = do
self._oo = oo
self.constraints = cons
self.states = states
self._collect_symbols()