"""Utility functions used in various parts of COMANDO."""
# This file is part of the COMANDO project which is released under the MIT
# license. See file LICENSE for full license details.
#
# AUTHORS: Marco Langiu
import contextlib
import math
import operator
import os
from collections import defaultdict
from functools import partial, reduce
import numpy
from numpy import maximum, minimum
import comando
import comando.core
[docs]
def canonical_file_name(base_name, suffix, file_name=None) -> tuple((str, str)):
"""Get a canonical file name and the corresponding base name.
Arguments
---------
base_name : str
the fallback if filename is None
suffix : str
desired suffix
file_name : str or None
a desired name with or without the desired suffix or None
Returns
-------
base_name : str
the canonical base_name
file_name : str
the canonical file_name
"""
if file_name is None:
file_name = f"{base_name}{suffix}"
elif file_name.endswith(suffix):
base_name = file_name[:-4]
else:
base_name = file_name
file_name = base_name + suffix
return base_name, file_name
[docs]
def check_reuse_or_overwrite(file_name, reuse) -> None:
"""Check whether a file exists for reuse or overwriting.
Arguments
---------
file_name : str
the name of the file to reuse or overwrite
reuse : bool or None
whether to reuse the given file. There are three cases:
1. reuse is False -> do nothing
2. reuse is True -> check if file_name is a valid file
3. reuse is None -> if file_name is a file, ask before overwriting
"""
if reuse is False:
return
if reuse is True:
if not os.path.isfile(file_name):
raise FileNotFoundError(file_name)
else: # reuse is None, explicitly ask
while os.path.isfile(file_name):
yn = input(f"A file '{file_name}' already exists, overwrite (y/n)?")
if yn.lower() == "y":
break
if yn.lower() == "n":
raise FileExistsError(f"File {file_name} exists! Aborting...")
[docs]
def syscall(executable, *args, log_name=os.devnull, silent=False):
"""Issue a system call.
Arguments
---------
executable : str
the name of the executable to be called. Needs to be an executable in
one of the directories listed in the PATH environment variable.
args : list of str
the arguments passed to the executable
log_name : str
the location of a file to which the output of the system call is
mirrored in parralel to execution (default: os.devnull)
silent : bool
whether the output should be hidden from stdout (default: False)
Returns
-------
ret : int
the return code of the system call
"""
import sys
from subprocess import PIPE, STDOUT, Popen
if silent:
with open(log_name, "w", encoding="utf-8") as f:
process = Popen([executable, *args], stderr=STDOUT, stdout=f)
else:
process = Popen([executable, *args], stderr=STDOUT, stdout=PIPE)
with open(log_name, "w", encoding="utf-8") as f:
while True:
output = process.stdout.readline()
if process.poll() is not None and not output:
break
line = output.decode("UTF-8")
sys.stdout.write(line)
f.write(line)
process.wait() # wait until executable has finished
ret = process.returncode
return ret
def _implicit_euler_constraints(states, timesteps, s=slice(None)):
for state, (initial_state, derivative, _) in states.items():
if isinstance(initial_state, comando.core.Parameter):
if initial_state.is_indexed: # One parameter for each scenario
init_val = initial_state[s].value
if init_val != init_val: # float("nan") => cyclic!
prev = state[s].iloc[-1]
else:
prev = initial_state[s]
else: # Single parameter for all scenarios
init_val = initial_state.value
if init_val != init_val: # float("nan") => cyclic!
prev = state[s].iloc[-1]
else:
prev = initial_state
elif isinstance(
initial_state, (comando.core.Variable, comando.core.VariableVector)
):
if initial_state.is_indexed: # One Variable for each scenario
prev = initial_state[s]
else: # Single Variable for all scenarios
prev = initial_state
else:
msg = "Expected Variable or Parameter as initial state!"
raise NotImplementedError(msg)
for t, dt in timesteps.items():
var = state[t]
c_id = f"d_{state.name}_dt_at_{t}"
c = comando.Eq(derivative[t], (var - prev) / dt)
yield (c_id, c)
prev = var
[docs]
def handle_state_equations(P, callback=None):
"""Generate and add discretized state equations from the problem P.
Parameters
----------
P : Problem
The problem to generate the state equations for.
callback : function, optional
A function that is called for each generated constraint.
The first argument is the name of the constraint and the second
argument is the constraint expression.
If no callback is given, the constraints are added to the problem P.
"""
if callback is None:
callback = lambda name, con: P.add_constraint(name, con)
if P.timesteps is None:
print(
'INFO: Problem "',
P.name,
'" has no timesteps. ',
"Skipping discretized state equations for states: ",
[*P.states],
)
return
zero_length_timestaps = P.timesteps[P.timesteps == 0]
if len(zero_length_timestaps) == len(P.timesteps):
print(
'INFO: All timesteps of problem "',
P.name,
'" have zero length. ',
"Skipping discretized state equations for states: ",
[*P.states],
)
if P.scenarios is None: # We have timesteps for a single scenario
timesteps = P.timesteps.drop(zero_length_timestaps.index)
for c_id, c in _implicit_euler_constraints(P.states, timesteps):
callback(c_id, c)
return
for s in P.scenarios:
try:
timesteps = P.timesteps.loc[[s], :].drop(
zero_length_timestaps.index, errors="ignore"
)
except KeyError:
print(
'INFO: All timesteps of problem "',
P.name,
f'" for scenario "{s}" have zero length... skipping!',
)
continue
for c_id, c in _implicit_euler_constraints(P.states, timesteps, s):
callback(c_id, c)
[docs]
@contextlib.contextmanager
def silence():
"""Execute code without output to stdout."""
import sys
with open(os.devnull, "w") as f:
save_stdout = sys.stdout
sys.stdout = f
yield
sys.stdout = save_stdout
[docs]
@contextlib.contextmanager
def cleanup():
"""Take a snapshot of all files on current path and remove new ones."""
import os
dir = os.getcwd()
dir_contents = {*os.listdir(".")}
yield
os.chdir(dir) # ensure we didn't change the directory!
for f in {*os.listdir(".")} - dir_contents:
os.remove(os.path.join(".", f))
[docs]
def get_latest(path_glob):
"""Get the path to the latest file matching the path_glob."""
import glob
import os
matches = glob.glob(path_glob)
if matches:
return __builtins__["max"](matches, key=os.path.getctime)
else:
raise FileNotFoundError(
f"No file in {os.getcwd()} matches the glob '{path_glob}'!"
)
[docs]
def depth(expr):
"""Compute the depth of the given expression."""
if isinstance(expr, (int, float, comando.Number, comando.core.Symbol)):
return 0
else:
return 1 + __builtins__["max"]([depth(arg) for arg in expr.args])
[docs]
def fuzzy_not(v):
"""Return None if `v` is None else `not v`."""
if v is None:
return v
else:
return not v
[docs]
def smooth_abs(x, delta=1e-4):
"""Smooth approximation of abs(x)."""
with comando.evaluate(False):
res = ((x + delta) ** 2) ** 0.5
return res
[docs]
def smooth_max(a, b, delta=1e-4):
"""Smooth approximation of max(a, b)."""
return 0.5 * (a + b + smooth_abs(a - b, delta))
[docs]
def smooth_min(a, b, delta=1e-4):
"""Smooth approximation of min(a, b)."""
return 0.5 * (a + b - smooth_abs(a - b, delta))
# def min(iterable):
# """Generalize min function to array-like structures."""
# return reduce(minimum, iterable)
#
#
# def max(iterable):
# """Generalize max function to array-like structures."""
# return reduce(maximum, iterable)
# TODO: 'algebraic expression' has a different meaning in general (e.g. it
# excludes transcendental expressions or numbers). Also see the arcticle
# https://en.wikipedia.org/wiki/Algebraic_expression.
# A better name might be _is_numeric
def _assert_algebraic(expr):
"""Assert that `expr` is an 'algebraic expression'.
An algebraic expression is considered to be an inctance of type
`comando.Expr` that is not of type `comando.Boolean`.
An exception to this rule are symbols, which are always considered to be
algebraic expressions.
"""
# NOTE: Due to the imports in sympy's __init__.py, the type
# hierarchy does not correspond to the attribute hierarchy!
assert (
isinstance(expr, (int, float))
or expr.is_Symbol
or isinstance(expr, comando.Expr)
and not isinstance(expr, comando.Boolean)
), (
"expression should be 'algebraic sympy expressions'"
" (i.e., instances of type `comando.Expr` that are"
" not of type `comando.Boolean`)!"
)
[docs]
def is_indexed(expr):
"""Test whether the given expression is indexed."""
return any(sym.is_indexed for sym in expr.free_symbols)
[docs]
def is_negated(expr):
"""Check if expr is a negated expression or a negative number."""
return (
(expr.is_Mul and (expr.args[0].is_Number and bool(expr.args[0] < 0)))
or (expr.is_Number and bool(expr < 0))
or expr.is_Add
and all(is_negated(arg) for arg in expr.args)
)
[docs]
def get_vars(expr):
"""Get a set of all variables in `expr`."""
return {
sym
for sym in expr.free_symbols
if isinstance(sym, (comando.core.Variable, comando.core.VariableVector))
}
[docs]
def get_pars(expr):
"""Get a set of all parameters in `expr`."""
return {sym for sym in expr.free_symbols if isinstance(sym, comando.core.Parameter)}
[docs]
def split(iterable, condition, c_type=None):
"""Sort `iterable`'s elements into two containers based on `condition`.
Arguments
---------
iterable : iterable
iterable to be sorted
condition : callable
function used for sorting, return value must convert to bool
c_type : type
type of the resulting containers; if unset will attempt to use
`type(iterable)` to create the containers or default to `list` if
that fails. If after this both `isinstance` and `c_type()` are
mappings, the new containers will be submappings of `iterable`
and `condition` is only applied to the values of `iterable`.
Returns
-------
cond_true, cond_false : tuple
tuple containing:
- the container of elements that do **not** satisfy `condition`
- the container of elements that do satisfy `condition`
"""
if c_type is None:
c_type = type(iterable)
try: # Try to deduce the container type from iterable
res = [c_type(), c_type()]
except TypeError:
c_type = list
res = [c_type(), c_type()]
if c_type is list:
for x in iterable:
res[condition(x)].append(x)
elif c_type is set:
for x in iterable:
res[condition(x)].add(x)
elif c_type is tuple:
for x in iterable:
res[condition(x)] += (x,)
elif c_type is dict:
if isinstance(iterable, dict):
for k, v in iterable.items():
res[condition(v)][k] = v
else:
for x in iterable:
res[condition(x)][x[0]] = x[1]
else:
raise NotImplementedError(f"No insertion known for {c_type}!")
return res[False], res[True]
# NOTE: Precedence is already handled by the tree datastructures in the object
# oriented setting, but it becomes important when writing to strings!
# Thats where we will need to be able to handle the '()' entry in the
# op_map. For the object oriented case, '()' simply maps to identity.
[docs]
def identity(expr):
"""Return expr."""
return expr
def _sum(*args):
"""Return the sum of the elements in args."""
return sum(args)
return numpy.sum(args)
[docs]
def prod(*args):
"""Return the product of the elements in args."""
return reduce(operator.mul, args)
return numpy.prod(args)
# def pow(*args):
# """Return the result of args[0] to power of args[1]."""
# return 1/args[0] if args[1] == -1 else args[0] ** args[1]
# comando_op_map = {
# '()': identity,
# 'Add': _sum,
# 'Neg': lambda arg: -arg,
# 'Mul': prod,
# 'Div': operator.truediv,
# 'Pow': operator.pow,
# 'Inv': partial(operator.truediv, 1),
# 'LessThan': operator.le,
# 'GreaterThan': operator.ge,
# 'Equality': operator.eq}
numpy_op_map = {
"()": identity,
"Add": _sum,
"Neg": lambda arg: -arg,
"Mul": prod,
"Div": operator.truediv,
"Pow": operator.pow,
"Inv": partial(operator.truediv, 1),
"LessThan": operator.le,
"GreaterThan": operator.ge,
"Equality": operator.eq,
"Min": lambda *args: reduce(minimum, args),
"Max": lambda *args: reduce(maximum, args),
"Abs": numpy.abs,
}
# exponential_functions = {'exp', 'log'}
# nonsmooth_function_names = {'sign', 'Abs', 'Min', 'Max', 'ceiling', 'floor'}
# trigonometric_function_names = {'sin', 'cos', 'tan', 'cot', 'sec', 'csc'}
# trigonometric_inverse_function_names = {'asin', 'acos', 'atan', 'acot', 'asec',
# 'acsc'}
# hyperbolic_function_names = {'sinh', 'cosh', 'tanh', 'coth', 'sech', 'csch'}
# hyperbolic_inverse_function_names = {'asinh', 'acosh', 'atanh', 'acoth',
# 'asech', 'acsch'}
# comando_functions = set().union(
# exponential_functions, nonsmooth_function_names,
# trigonometric_function_names, trigonometric_inverse_function_names,
# hyperbolic_function_names, hyperbolic_inverse_function_names
# )
# NOTE: All of these functions depend on the backend and therefore need to be
# looked up dynamically to allow for backend switches!
# def _inject_funcdefs():
# for f_name in comando_functions:
# comando_op_map[f_name] = \
# lambda arg: getattr(comando.get_backend(), f_name)(arg)
# _inject_funcdefs()
for name in "exp", "log", "sin", "cos", "tan", "sinh", "cosh", "tanh":
numpy_op_map[name] = getattr(numpy, name)
for name in {"asin", "acos", "atan", "asinh", "acosh", "atanh"}:
numpy_op_map[name] = getattr(numpy, f"arc{name[1:]}")
[docs]
def make_str_func(name):
"""Create a default string representation of functions.
Arguments
=========
name : str
name of the function to be represented
Returns
=======
func : callable
function that returns a string representation for the function with
given string arguments
"""
return lambda *args: f"{name.lower()}({', '.join(args)})"
[docs]
class DefaultStringMap(defaultdict):
"""A default dict implementation whose factory is called with the key."""
def __missing__(self, key):
"""Call the factory with the missing key."""
if self.default_factory is None:
raise KeyError((key,))
self[key] = value = self.default_factory(key)
return value
_str_map = DefaultStringMap(
make_str_func,
{
"()": lambda expr: f"({expr})",
"Add": lambda *args: " + ".join(args),
"Sub": lambda minu, subtr: f"{minu} - {subtr}",
"Neg": lambda arg: f"-{arg}",
"Mul": lambda *args: " * ".join(args), #
"Div": lambda numer, denom: f"{numer} / {denom}",
"Inv": lambda arg: f"1 / {arg}",
"LessThan": lambda lhs, rhs: f"{lhs} <= {rhs}",
"GreaterThan": lambda lhs, rhs: f"{lhs} >= {rhs}",
"Equality": lambda lhs, rhs: f"{lhs} == {rhs}",
"Pow": lambda base, exponent: f"{base} ** {exponent}",
#'exp': lambda arg: f'exp({arg})',
#'log': lambda arg: f'log({arg})',
#'tanh': lambda arg: f'(1 - 2/(exp(2 * ({arg})) + 1))'
},
)
# for name in 'exp', 'log', 'sqrt', 'tan', 'sinh', 'tanh':
# numpy_op_map[name] = getattr(numpy, name)
# for name in 'asin', 'acos', 'atan', 'asinh', 'acosh', 'atanh':
# numpy_op_map[name] = getattr(numpy, f'arc{name[1:]}')
[docs]
def get_index(expr):
"""Get the index of the current expression or None if it is not indexed."""
dummy = sum((sym.value[:] * 0 for sym in expr.free_symbols if sym.is_indexed), 0)
return None if isinstance(dummy, int) else dummy.index
[docs]
def parse(expr, sym_map=None, op_map=None, idx=None, const_fun=float):
"""Parse the sympy expression `expr`, replacing symbols and operators.
The `sym_map` provides the possibility to replace symbols (Parameters and
Variables) with new objects, if the symbol map is empty or there is no
mapping for a given symbol, it is replaced with its `value` attrubute; if
`value` is `None`, the symbol is not replaced.
The `op_map` provides the possibility to replace sympy's operator objects
with other operators. If no op map is provided, the default python
operators are used.
"""
if sym_map is None:
if op_map is None and idx is None and const_fun is float:
return expr # Quick exit, nothing changes
sym_map = {}
for sym in expr.free_symbols: # Complete the sym map with missing symbols
if sym not in sym_map:
sym_map[sym] = sym
# Based on default or user-provided op_map:
if op_map is None:
op_map = comando.op_map
else:
op_map = {**comando.base_op_map, **op_map} # override default with user ops
if idx is None:
return _parse(expr, sym_map, op_map, const_fun)
return _idx_parse(expr, sym_map, op_map, idx, const_fun)
def _parse(expr, sym_map, op_map, const_fun):
"""Parse expr, all symbols/operators must be in sym_map/op_map."""
if expr.args:
try:
f_name = get_type_name(expr)
op = op_map[f_name]
except KeyError:
raise NotImplementedError(
f"The function '{f_name}' is not defined"
" in the operational map of the "
"interface you are using. Either provide"
" your own definition or reformulate."
)
args = tuple(_parse(arg, sym_map, op_map, const_fun) for arg in expr.args)
return op(*args)
if expr.is_Number:
return const_fun(expr)
return sym_map[expr]
def _idx_parse(expr, sym_map, op_map, idx, const_fun):
"""Parse expr at idx, all symbols/operators must be in sym_map/op_map."""
if expr.args:
try:
f_name = get_type_name(expr)
op = op_map[f_name]
except KeyError:
raise NotImplementedError(
f"The function '{f_name}' is not defined"
" in the operational map of the "
"interface you are using. Either provide"
" your own definition or reformulate."
)
args = tuple(
_idx_parse(arg, sym_map, op_map, idx, const_fun) for arg in expr.args
)
return op(*args)
if expr.is_Number:
return const_fun(expr)
rep = sym_map[expr]
if is_indexed(expr):
return rep[idx]
else:
return rep
[docs]
def evaluate(expr, idx=None):
"""Evaluate the given expression at the symbols' current values.
Note
----
To allow for indexed espressions to be evaluated, evaluate uses numpy and
thus double arithmetic instead of the arbitrary precision arithmetic
usually used by sympy or symengine backends.
This can result in different results in comparison to those obtained by
other tools such as solvers or AMLs interfaced to COMANDO.
Arguments
---------
expr : comando.Expr
The expression to be evaluated
idx : double, str, list, tuple
The index at which to evaluate
returns
result : float or pandas.Series
The numerical result of the evaluation
"""
index = get_index(expr)
if index is None: # scalar expression
return float(
_parse(
expr, {sym: sym.value for sym in expr.free_symbols}, numpy_op_map, float
)
)
# indexed expression
if idx is None: # No particular index desired
import pandas
res = _parse(
expr,
{
sym: sym.value.values if sym.is_indexed else sym.value
for sym in expr.free_symbols
},
numpy_op_map,
float,
)
return pandas.Series(res, index, float)
# only values for index=idx desired
return float(
expr.subs(
{
sym: sym[idx].value if sym.is_indexed else sym.value
for sym in expr.free_symbols
}
)
)
# TODO: Since we assume unique index elements, this should only come into
# play when we introduce slicing, e.g. idx is slice(None)
# if len(res) > 1:
# index = index[index == idx]
# ...
# res = _idx_parse(expr, {sym: sym[idx].value if sym.is_indexed else sym.value
# for sym in expr.free_symbols}, numpy_op_map, idx,
# float)
# return Series(res, index, float)
[docs]
def precedence(expr):
"""Get the operator precedence of the expression.
If the argument of an expression has a higher precedence than the
expression itself, precedence needs to be represented explicitly in string
representations. An example is (a + b) * c, where dropping the parentheses
changes the result. We define precedences as:
±x, f(x): 0
x ** y: 1
x * y, x/y: 2
x ± y: 3
"""
return {comando.Add: 3, comando.Mul: 2, comando.Pow: 1}.get(type(expr), 0)
[docs]
def as_numer_denom(expr):
"""Normalize as_numer_denom for sympy and symengine.
Also see:
https://github.com/symengine/symengine/issues/681#issuecomment-724039779
"""
numer, denom = expr.as_numer_denom()
return numer.subs({1.0: 1}), denom.subs({1.0: 1})
[docs]
def get_type_name(expr):
"""Get the type name of an expression."""
f_name = type(expr).__name__
if f_name == "FunctionSymbol": # User-defined functions in symengine
f_name = expr.get_name()
return f_name
# TODO: We can save a lot of time for indexed expressions if we parse the non
# changing part (which for large expressions will be the majority of the
# expression) only once. This would result in a string template instead
# of a string. The template arguments can then be replaced with the
# string corresponding to the index. This principle could also be nested!
[docs]
class StrParser:
"""A class for creating interface-specific string representations.
Callbacks must have a signature consisting of three arguments: The parser
the expression to be parsed and an optional index
Arguments:
=========
sym_map
str_map
add_callback
mul_callback
pow_callback
"""
def __init__(
self,
sym_map,
str_map=None,
override_default_str_map=True,
add_callback=None,
mul_callback=None,
pow_callback=None,
):
self.sym_map = sym_map
# Based on default or user-provided str_map:
if str_map is None:
self.str_map = _str_map.copy()
elif override_default_str_map:
self.str_map = _str_map.copy()
self.str_map.update(str_map)
else:
self.str_map = str_map
if add_callback is None:
self.parse_add = self._parse_add
else:
def parse_add(expr, idx):
res = add_callback(self, expr, idx)
if res is None:
return self._parse_add(expr, idx)
return res
self.parse_add = parse_add
if mul_callback is None:
self.parse_mul = self._parse_mul
else:
def parse_mul(expr, idx):
res = mul_callback(self, expr, idx)
if res is None:
return self._parse_mul(expr, idx)
return res
self.parse_mul = parse_mul
if pow_callback is None:
self.parse_pow = self._parse_pow
else:
def parse_pow(expr, idx):
res = pow_callback(self, expr, idx)
if res is None:
return self._parse_pow(expr, idx)
return res
self.parse_pow = parse_pow
self.cache = {}
def _parse_add(self, expr, idx):
p_args, n_args = split(expr.args, is_negated)
if p_args:
if n_args: # (a -b + c - d) -> a + c - (b + d)
m_expr = expr.func(*p_args) # minuend
s_expr = expr.func(*(-a for a in n_args)) # subtrahend
# While the minuend can be handled normally, the subtrahend
# must be parenthesized
args = (
*self.parse_args((m_expr,), idx, 3),
*self.parse_args((s_expr,), idx, 2.5),
)
return self.str_map["Sub"](*args)
return self.str_map["Add"](*self.parse_args(expr.args, idx, 3))
# This handles -a -b -> -(a + b)
# NOTE: We cannot just use -expr as this can result in an infinite
# recursion with the 'Neg' cases in expr.is_Mul!
neg_expr = expr.func(*(-arg for arg in n_args))
return self.str_map["Neg"](*self.parse_args((neg_expr,), idx, 2))
def _parse_mul(self, expr, idx):
a0 = expr.args[0]
if a0.is_Number and a0 < 0:
return self.str_map["Neg"](*self.parse_args((-expr,), idx, 2))
numer, denom = as_numer_denom(expr)
if denom == 1 or denom == 1.0:
return self.str_map["Mul"](*self.parse_args(expr.args, idx, 2))
# In contrast to sympy, we don't want to distinguish between cases
# like -(a + b) / c, -( (a + b) / c) and (a + b)/-c
if is_negated(numer):
return self.str_map["Neg"](*self.parse_args((-numer / denom,), idx, 2))
if is_negated(denom):
return self.str_map["Neg"](*self.parse_args((numer / -denom,), idx, 2))
# While the numerator can be handled normally, the denominator
# must be parenthesized if it's equivalent to a multiplication
# to correctly represent forms like a / (b * c)
args = (
*self.parse_args((numer,), idx, 2),
*self.parse_args((denom,), idx, 1.5),
)
return self.str_map["Div"](*args)
def _parse_pow(self, expr, idx):
base, exponent = expr.args
if is_negated(exponent):
return self.str_map["Inv"](*self.parse_args((base**-exponent,), idx, 1.5))
return self.str_map["Pow"](*self.parse_args(expr.args, idx, 1))
def __call__(self, expr, idx=None):
"""Execute the StrParser."""
if expr.is_Add:
return self.parse_add(expr, idx)
if expr.is_Mul:
return self.parse_mul(expr, idx)
if expr.is_Pow:
return self.parse_pow(expr, idx)
# prec == 0
if expr.args: # A univariate function
return self.str_map[get_type_name(expr)](*self.parse_args(expr.args, idx))
if expr.is_Symbol: # Variables or Parameters
replacement = self.sym_map.get(expr, None)
if replacement is None:
replacement = expr.value
if idx is not None and expr.is_indexed:
replacement = replacement[idx]
return str(replacement) # replacement specified
# Number
if expr.is_rational and not expr.is_integer:
# we parenthesize rationals to avoid problems with, e.g., a * b
# where b is a rational
return self.str_map["()"](str(expr))
return str(expr)
[docs]
def parse_args(self, args, idx, parent_prec=0):
"""Parse arguments of an expression with parentheses as appropriate."""
if parent_prec == 0:
return tuple(self(arg, idx) for arg in args)
else:
return tuple(
(
self.str_map["()"](self(arg, idx))
if precedence(arg) > parent_prec
else self(arg, idx)
)
for arg in args
)
[docs]
def cached_parse(self, expr, idx=None):
"""Look up previously parsed expressions in self.cache."""
# try:
if (expr, idx) in self.cache:
return self.cache[expr, idx]
else:
res = self(expr, idx)
self.cache[expr, idx] = res
return res
# except RecursionError as e:
# if not hasattr(e, 'expr'):
# e.expr = expr
# raise
[docs]
def str_parse(expr, sym_map=None, str_map=None, idx=None):
"""Parse the sympy expression `expr`, replacing symbols and operators.
The `sym_map` provides the possibility to replace symbols (Parameters and
Variables) with their string representations, if the symbol map is empty
or there is no mapping for a given symbol, it is replaced with its `value`
attribute.
The `str_map` provides the possibility to replace sympy's operator objects
with string representations.
"""
if sym_map is None:
sym_map = {}
# Based on default or user-provided str_map:
if str_map is None:
str_map = _str_map.copy()
else: # override default with user map
__str_map = _str_map.copy()
__str_map.update(str_map)
str_map = __str_map
def rec_str_parse(expr):
"""Recursively call to str_parse."""
return str_parse(expr, sym_map, str_map, idx)
def parse_args(args, parent_prec=0):
"""Parse arguments of an expression with parentheses as appropriate."""
if parent_prec == 0:
return tuple(str_parse(arg, sym_map, str_map, idx) for arg in args)
else:
return tuple(
(
str_map["()"](str_parse(arg, sym_map, str_map, idx))
if precedence(arg) > parent_prec
else str_parse(arg, sym_map, str_map, idx)
)
for arg in args
)
if expr.is_Add:
p_args, n_args = split(expr.args, is_negated)
if p_args:
if n_args: # (a -b + c - d) -> a + c - (b + d)
m_expr = expr.func(*p_args) # minuend
s_expr = expr.func(*(-a for a in n_args)) # subtrahend
# While the minuend can be handled normally, the subtrahend
# must be parenthesized
args = (*parse_args((m_expr,), 3), *parse_args((s_expr,), 2.5))
return str_map["Sub"](*args)
return str_map["Add"](*parse_args(expr.args, 3))
# This handles -a -b -> -(a + b)
# NOTE: We cannot just use -expr as this can result in an infinite
# recursion with the 'Neg' cases in expr.is_Mul!
neg_expr = expr.func(*(-arg for arg in n_args))
return str_map["Neg"](*parse_args((neg_expr,), 2))
if expr.is_Mul:
a0 = expr.args[0]
if a0.is_Number and a0 < 0:
return str_map["Neg"](*parse_args((-expr,), 2))
numer, denom = as_numer_denom(expr)
if denom == 1 or denom == 1.0:
return str_map["Mul"](*parse_args(expr.args, 2))
# In contrast to sympy, we don't want to distinguish between cases
# like -(a + b) / c, -( (a + b) / c) and (a + b)/-c
if is_negated(numer):
return str_map["Neg"](*parse_args((-numer / denom,), 2))
if is_negated(denom):
return str_map["Neg"](*parse_args((numer / -denom,), 2))
# While the numerator can be handled normally, the denominator
# must be parenthesized if it's equivalent to a multiplication
# to correctly represent forms like a / (b * c)
args = (*parse_args((numer,), 2), *parse_args((denom,), 1.5))
return str_map["Div"](*args)
if expr.is_Pow:
base, exponent = expr.args
if is_negated(exponent):
return str_map["Inv"](*parse_args((base**-exponent,), 1.5))
return str_map["Pow"](*parse_args(expr.args, 1))
# prec == 0
if expr.args: # A univariate function
return str_map[get_type_name(expr)](*parse_args(expr.args))
if expr.is_Symbol: # Variables or Parameters
replacement = sym_map.get(expr, None)
if replacement is None:
replacement = expr.value
if idx is not None and expr.is_indexed:
replacement = replacement[idx]
return str(replacement) # replacement specified
# Number
if expr.is_rational and not expr.is_integer:
# we parenthesize rationals to avoid problems with, e.g., a * b where
# b is a rational
return str_map["()"](str(expr))
return str(expr)
# DEBUG
# except RecursionError as e:
# if not hasattr(e, 'expr'):
# e.expr = expr
# raise
# DEBUG
# Method for tighter bounds of univariate continuous functions
[docs]
class RootFindingError(Exception):
pass
[docs]
def cont_univ_bounds(expr, var):
"""Compute tight bounds for bounded continuous univariate expressions.
A bounded continuous expression of a single variable can be bounded exactly
by looking at its function values at 'critical points', i.e., the union of
its bounds and the points at which its derivative is zero that are in the
interval implied by the bounds.
"""
from sympy import Interval
from sympy.solvers import solveset
# lb = var.lb if var.lb is not None else float('-inf')
# ub = var.ub if var.ub is not None else float('inf')
lb, ub = bounds(var)
crit_points = {lb, ub}
try:
crit_points.update(solveset(expr.diff(var), var, Interval(lb, ub)))
except Exception:
raise RootFindingError
crit_values = [expr.subs({var: point}) for point in crit_points]
return min(crit_values), max(crit_values)
# Functions for interval arithmetic
# NOTE: The below definitions assume that for any bound interval (lb, ub) with
# -∞ < lb == ub < ∞. In any case we should probably warn or throw an
# error if there are any infinite bounds.
[docs]
def sum_bounds(*bounds):
"""Return the sum of a sequence of intervals."""
_lb, _ub = 0, 0
for lb, ub in bounds:
_lb += lb
_ub += ub
return (_lb, _ub)
[docs]
def mul_bounds(a, b):
"""Return the result of a * b by interval arithmetic."""
# NOTE: If a and b come from the same expression, such as in the case
# x * x, the bound obtained by this function is larger than
# necessary as the multiplication is equivalent to x ** 2 and in
# general [lb, ub] ** 2 ⊆ [lb, ub] * [lb, ub] holds! Luckily
# sympy does a fairly good job of simplifying expressions, so
# this problem should not occur too often.
# NOTE: Edge cases of 0 * ±inf are treated as 0 because infinite bounds
# should never be binding, otherwise the solution would be unbounded
inf = float("inf")
S = [
i * j if {i, j} not in [{0, -inf}, {0, inf}] else 0 for i in a for j in b
] # all combinations of products from a and b
return (min(S), max(S))
[docs]
def prod_bounds(*bounds):
"""Return the product of a sequence of intervals."""
return reduce(mul_bounds, bounds)
[docs]
def pow_bounds(a, b):
"""Return the result of a ** b if a[0] == a[1] or b[0] == b[1].
The power function is monotonous on any domain that excludes zero!
We therefore only need to consider special cases explicitly and can
return the sorted tuple of powers of the base boundaries otherwise.
"""
# NOTE: Infinities give rise to another special case, but currently we're
# not interested in this, so we save ourselves the headache...
if b[0] in [None, -float("inf"), float("inf")] or b[0] != b[1]:
if a[0] == a[1]:
a = a[0]
if a < -1: # increasing and only valid for integral b
# vals = integer_values in [b[0], b[1]]
# # will be the last two elements of vals
# return (min(vals), max(vals))
raise NotImplementedError
elif a < 0: # decreasing and only valid for integral b
# vals = integer_values in [b[0], b[1]]
# # will be the first two elements of vals
# return (min(vals), max(vals))
raise NotImplementedError
elif a > 1: # decreasing
return (a ** b[1], a ** b[0])
elif a > 0: # 0 < a <= 1
return (a ** b[0], a ** b[1])
else: # a == 0
return (0, 0)
else:
raise ValueError(
"Interval power requires real-valued singleton "
f"exponents (with -∞ < lb == ub < ∞), got {b}"
)
b = float(b[0])
if b == 0:
return (1, 1) # x ** 0 == 1 for any float x, including 0, ±∞ and nan!
lb, ub = a
if (lb <= 0) and (ub >= 0): # 0 is contained in base set a
if b < 0: # negative exponent with 0 base -> Division by zero!
raise ValueError("Base of 0 in power with negative exponent")
if not b % 2: # b is an even power -> a ** b is symmetric around 0!
return (0, max([lb**b, ub**b]))
if lb < 0 and (b % 1) != 0: # b is not integral -> a ** b is complex!
raise ValueError("Negative base with a non-integer exponent")
return tuple(sorted([lb**b, ub**b]))
[docs]
def bLe(x, y):
"""Return the truth value of x <= y, if it can be determined, else None."""
if x[1] <= y[0]:
return True
if x[0] > y[1]:
return False
[docs]
def bGe(x, y):
"""Return the truth value of x >= y, if it can be determined, else None."""
if x[0] >= y[1]:
return True
if x[1] < y[0]:
return False
[docs]
def bEq(x, y):
"""Return the truth value of x == y, if it can be determined, else None."""
if x[0] == y[0] == x[1] == y[1]:
return True
if x[1] <= y[0] or y[1] <= x[0]:
return False
[docs]
def bounds_cost_turton(x, c1, c2, c3):
"""Compute bounds for the the Guthrie cost function.
cost_turton(x, c1, c2, c3) \
= pow(10, c1 + c2 * log10(x) + c3 * log10(x) ** 2)
"""
from numpy import log10
# bounds for log10(x)
log_bounds = log10(x[0]), log10(x[1])
# bounds for c2 * log10(x)
c2_bounds = mul_bounds(c2, log_bounds)
# bounds for log10(x) ** 2
log_squared_bounds = tuple(sorted([log_bounds[0] ** 2, log_bounds[1] ** 2]))
# bounds for c3 * log10(x) ** 2
c3_bounds = mul_bounds(c3, log_squared_bounds)
# bounds for c1 + c2 * log10(x) + c3 * log10(x) ** 2
exponent_bounds = sum_bounds(c1, c2_bounds, c3_bounds)
return (10 ** exponent_bounds[1], 10 ** exponent_bounds[0])
[docs]
def lmtd_bounds(dT1, dT2):
"""Calculate the logarithmic mean temperature difference.
lmtd(dT1, dT2) = (dT1 - dT2) / log(dT1 / dT2)
"""
from numpy import log
# assuming dT2 > 0!
inv_dT2_bounds = (1 / dT2[1], 1 / dT2[0])
# bounds for dT1 / dT2
arg_bounds = mul_bounds(dT1, inv_dT2_bounds)
# bounds for log(dT1 / dT2) and its inverse
log_bounds = log(arg_bounds[0]), log(arg_bounds[1])
inv_log_bounds = (1 / log_bounds[1], 1 / log_bounds[0])
# bounds for dT1 - dT2
diff_bounds = sum_bounds(dT1, (-dT2[1], -dT2[0]))
return mul_bounds(inv_log_bounds, diff_bounds)
[docs]
def rlmtd_bounds(dT1, dT2):
"""Calculate bounds for the inverse of the LMTD.
rlmtd(dT1, dT2) = log(dT1 / dT2) / (dT1 - dT2)
"""
from numpy import log
# assuming dT2 > 0!
inv_dT2_bounds = (1 / dT2[1], 1 / dT2[0])
# bounds for dT1 / dT2
arg_bounds = mul_bounds(dT1, inv_dT2_bounds)
# bounds for log(dT1 / dT2)
log_bounds = log(arg_bounds[0]), log(arg_bounds[1])
# bounds for dT1 - dT2 and its inverse
diff_bounds = sum_bounds(dT1, (-dT2[1], -dT2[0]))
inv_diff_bounds = (1 / diff_bounds[1], 1 / diff_bounds[0])
return mul_bounds(log_bounds, inv_diff_bounds)
[docs]
def floor_substitute_bounds(x, LB, UB):
from numpy import floor
return floor(LB[0]), floor(UB[1])
bounds_op_map = {
"Add": sum_bounds,
"Mul": prod_bounds,
"Pow": pow_bounds,
"LessThan": bLe,
"GreaterThan": bGe,
"Equality": bEq,
"cost_turton": bounds_cost_turton,
"lmtd": lmtd_bounds,
"rlmtd": rlmtd_bounds,
"floor_substitute": floor_substitute_bounds,
}
# EXPERIMENTAL!!!
def _cont_univ_bounds(func, lb, ub):
from sympy import Interval
from sympy.solvers import solveset
import comando
DUMMY = comando.core.Symbol("DUMMY")
f = func(DUMMY)
crit_points = {lb, ub}
try:
crit_points.update(solveset(f.diff(DUMMY), DUMMY, Interval(lb, ub)))
except Exception:
raise RootFindingError
crit_values = [f.subs({DUMMY: point}) for point in crit_points]
return min(crit_values), max(crit_values)
def _mon_inc(name, b):
func = getattr(math, name)
return func(b[0]), func(b[1])
# monotonously increasing functions
for name in [
"exp",
"log",
"sqrt",
"tan",
"sinh",
"tanh",
"asin",
"acos",
"atan",
"asinh",
"acosh",
"atanh",
]:
bounds_op_map[name] = partial(_mon_inc, name)
# symmetric functions
# bounds_op_map['cosh'] = ...
# for name in ['sin', 'cos']:
# func = getattr(comando, name)
# bounds_op_map[name] = lambda bounds: _cont_univ_bounds(func, *bounds)
bounds_op_map["Min"] = lambda *b: (min(bi[0] for bi in b), min(bi[1] for bi in b))
bounds_op_map["Max"] = lambda *b: (max(bi[0] for bi in b), max(bi[1] for bi in b))
def _normalize_bounds(bounds):
lb = float("-inf") if math.isnan(bounds[0]) else bounds[0]
ub = float("inf") if math.isnan(bounds[1]) else bounds[1]
return (lb, ub)
[docs]
def bounds(expr):
"""Propagate variable bounds through the given expression."""
from pandas import Series
index = get_index(expr)
if index is None:
return _bounds(expr)
lb = Series(-comando.INF, index)
ub = Series(comando.INF, index)
for i in index:
lb[i], ub[i] = _bounds(parse(expr, idx=i))
return lb, ub
def _bounds(expr):
"""Propagate variable bounds through the given expression."""
if expr.args:
op = bounds_op_map[get_type_name(expr)]
args = tuple(_bounds(arg) for arg in expr.args)
try:
return op(*args)
except ValueError as e:
raise ValueError(f"{e}: {expr}")
# if expr.is_Number: false for symengine.E
try:
return float(expr), float(expr)
except (TypeError, RuntimeError):
pass
try: # treating expr as a Variable by getting its bounds
return expr.bounds
except AttributeError:
# expr is a Parameter it has a value attribute...
val = expr.value
return _normalize_bounds((val, val))
[docs]
def make_tac_objective(
system, ic_labels=None, fc_labels=None, vc_labels=None, n=10, i=0.08
):
"""Create the objective components corresponding to total annualized cost.
Arguments
---------
ic_labels : iterable of `str`
labels for the investment cost expressions
fc_labels : iterable of `str`
labels for the fixed cost expressions
vc_labels : iterable of `str`
labels for the variable cost expressions
n : `int`
number of repetitions of the time-period specified by `timesteps`.
i : `float`
interest_rate
Returns
-------
Expressions for design_objective and operational_objective forming part
of the total annualized costs.
"""
if ic_labels is None:
ic_labels = ["investment_costs"]
if fc_labels is None:
fc_labels = ["fixed_costs"]
if vc_labels is None:
vc_labels = ["variable_costs"]
ic = sum(system.aggregate_component_expressions(ic_label) for ic_label in ic_labels)
fc = sum(system.aggregate_component_expressions(fc_label) for fc_label in fc_labels)
vc = sum(system.aggregate_component_expressions(vc_label) for vc_label in vc_labels)
af = ((1 + i) ** n * i) / ((1 + i) ** n - 1) # annuity factor
return af * ic + fc, vc
[docs]
def make_mayer_objective(system):
"""Create a Mayer term objective.
An option for dynamic optimization
phi(t_f) is minimized and phi_dot = f(operation)
"""
phi_dot = system.aggregate_component_expressions("variable Mayer objective")
system.objective_type = "Mayer term objective"
# no design objective, just operational part
return phi_dot
[docs]
def lambdify(expr, vars=None, eval_params=True, **kwargs):
"""Create a function for evaluation of expr with numpy.
A variant of sympy's lambdify which creates a function for fast numerical
evaluation of an expression.
Arguments
---------
expr : Expression
the expression to be turned into a function
eval_params : bool
Whether to replace parameters by their values (if expr contains indexed
parameters, evaluating the function will thus return a Series)
default: True
Returns
-------
f : callable
a function returning the value of the expression for given variable
values
"""
if vars is None:
vars = get_vars(expr)
from collections import OrderedDict
v_reps = OrderedDict((v, comando.core.Symbol(f"VALUE_OF_{v.name}")) for v in vars)
p_reps = {}
if eval_params:
p_reps = {p: p.value for p in get_pars(expr)}
if p_reps and get_index(sum(p_reps)) is not None: # multiple expressions
from pandas import Series
exprs = parse(expr, {**p_reps, **v_reps})
funcs = Series(
(comando.lambdify(list(v_reps.values()), ei, **kwargs) for ei in exprs),
exprs.index,
"O",
)
def func(*args):
return funcs.apply(lambda f: f(*args))
return func
lambdified = comando.lambdify(
list(v_reps.values()), [parse(expr, {**p_reps, **v_reps})], **kwargs
)
return lambdified
[docs]
def define_function(name, implementation):
"""Define a new symbolic function with a name and an implementation.
The given implementation will be used when the function is called with
nonsymbolic arguments only.
Arguments
---------
name : str
the function's name
implementation : callable
a callable that serves as the numerical implementation and is used when
no symbols are within the arguments. Must have a fixed number of
positional arguments.
Returns
-------
new_function : sympy.FunctionClass
the newly defined function
"""
from inspect import signature
sig = signature(implementation)
if any(arg.kind.value > 2 for arg in sig.parameters.values()):
raise ValueError(
"Arguments of implementation must be either "
"POSITIONAL_ONLY, POSITIONAL_OR_KEYWORD or "
"VAR_POSITIONAL!"
)
###########################################################################
# Needed for sympy evaluation
@classmethod
def eval(cls, *args):
pass
new_func = comando.Function(name)
new_func.get_name = lambda self: self.name
new_func.eval = eval
new_func.n_args = len(sig.parameters)
# allow evaluation
comando.utility.numpy_op_map[name] = implementation
###########################################################################
# Needed for symengine evaluation
if not hasattr(comando.Function, "implementations"):
comando.Function.implementations = {}
comando.Function.implementations[name] = implementation
def _wrapped(*args):
# print('wrapped call')
new_func_instance = new_func(*args)
def __float__(self):
# print(f'wrapped float of {self.get_name()}')
implementation = comando.Function.implementations[self.get_name()]
# return float(implementation(*self.args))
return float(implementation(*map(float, self.args)))
type(new_func_instance).__float__ = __float__
return new_func_instance
comando.op_map[name] = _wrapped
return _wrapped
[docs]
def make_function(expr):
"""Create a callable function from the given expression.
The user can either specify values for all Variables in the order specified
by the returned function's '__doc__' (in alphabetical order), or pass
keyword arguments in the form of identifier-value pairs. The values are
taken as alternatives for the default values used in the evaluation for the
Variables or Parameters with matching identifiers.
"""
syms = expr.free_symbols
pars = sorted(
(sym for sym in syms if isinstance(sym, comando.core.Parameter)),
key=lambda x: x.name,
)
vars = sorted(
(sym for sym in syms if isinstance(sym, comando.core.Variable)),
key=lambda x: x.name,
)
def function(*user_var_values, **user_values):
nargs = len(vars)
if user_var_values:
if len(user_var_values) != nargs:
raise ValueError(f"0 or {nargs} arguments required!")
sym_map = {var: user_var_values[i] for i, var in enumerate(vars)}
else:
sym_map = {var: var.value for var in vars}
par_values = {par: par.value for par in pars}
sym_map.update(par_values)
for id, val in user_values.items():
known_symbols = iter(sym_map)
while True:
try:
sym = next(known_symbols)
except StopIteration:
names = [sym.name for sym in sym_map]
raise ValueError(f"Expression only has symbols {names}!")
if sym.name == id:
break
sym_map[sym] = val
par_values.update(user_values)
return parse(expr, sym_map)
# Manually passing docstring to specify argument order
doc = f"Call {expr} providing {vars} and optionally {pars} as arguments."
function.__doc__ = doc
return function
# NOTE: We can now do things like the following
# my_pars = [Parameter(f"p{i}", value=i**2) for i in range(3)]
# my_vars = [Variable(f"v{i}", value=10-i) for i in range(2)]
# expr = my_vars[0] ** my_vars[1] + sum(my_pars)
# f = make_function(expr)
# f.__doc__
# f(1, 2)
#
# from pandas import Series
# s = Series([1,3,5])
# f(s, 2)