Source code for comando.interfaces.modelica

"""Routines for translation to Modelica syntax."""

import comando.core
import comando.utility
from comando.utility import StrParser, _str_map, is_indexed, split


[docs] def raise_(ex): raise ex
modelica_str_map = { "LessThan": lambda lhs, rhs: raise_( NotImplemented("Modelica does not support inequality constraints!") ), "GreaterThan": lambda lhs, rhs: raise_( NotImplemented("Modelica does not support inequality constraints!") ), "Equality": lambda lhs, rhs: f"{lhs} = {rhs}", "exp": lambda arg: f"exp({arg})", "log": lambda arg: f"log({arg})", "Pow": lambda base, exponent: f"{base}^({exponent})", } for key in "()", "Add", "Neg", "Sub", "Mul", "Div", "Inv": modelica_str_map[key] = _str_map[key]
[docs] class ModelicaParser(StrParser): # pylint: disable=too-few-public-methods """A class for parsing comando expressions to Modelica Syntax.""" def __init__(self, sym_map): super().__init__(sym_map, modelica_str_map)
from enum import Enum Modifier = Enum("modifier", ["parameter", "input", "output"]) def _entry(name, value=None, modifier=None, kind="Real"): """Create a string representation of a modelica symbol. For <kind="Real" or "Integer">, states are represented as <kind> state(start=<initial_value>, fixed=true) controls as input <kind> control(start=<initial_value_guess_if_available>) all algebraic variables as output <kind> var(start=<initial_value_guess_if_available>) NOTE: The `output` modifier may cause issues when using fmus generated from the resulting Modelica model in certain Programs, e.g., DyOS, if var is constant through some combination of equations! """ if modifier: # in {'parameter', 'input', 'output'}: entry = f" {modifier.name} {kind} {name}" else: entry = f" {kind} {name}" if value is not None: if kind == "Integer": value = int(value) # Ensure appropriate value type # For states modifier == None, otherwise value is a guess entry += f"(start={value}{'' if modifier else ', fixed=true'})" return entry + ";\n" def _data_entry(parameter, timesteps): """Helper function to include time-variable data.""" import pandas as pd if isinstance(parameter.value.index, pd.MultiIndex): res = " " for s, data in parameter.value.groupby(level=0): t = timesteps[s].cumsum() v = data.values table_entries = ";\n ".join(f"{ti}, {vi}" for ti, vi in zip(t, v)) res += f"""CombiTimeTable {parameter.name}_{s}( table=[ // Time (s), Value; {table_entries} ], tableOnFile=false, extrapolation=Extrapolation.HoldLastPoint, smoothness=Smoothness.ConstantSegments ) "data for parameter {parameter.name} in scenario {s}";\n\n """ else: t = timesteps.cumsum() v = parameter.value table_entries = ";\n ".join(f"{ti}, {vi}" for ti, vi in zip(t, v)) res = f""" CombiTimeTable {parameter.name}( table=[ // Time (s), Value; {table_entries} ], tableOnFile=false, extrapolation=Extrapolation.HoldLastPoint, smoothness=Smoothness.ConstantSegments ) "data for parameter {parameter.name}";\n\n """ return res Modelica_HEADER = """// This file was automatically generated via the COMANDO-modelica interface. """ def _imports_section(*modules, **options): modules = set(modules) if options.pop("use_time_variable_data", False): modules.add("Modelica.Blocks.Sources.CombiTimeTable") modules.add("Modelica.Blocks.Types.Extrapolation") modules.add("Modelica.Blocks.Types.Smoothness") if modules: return ( " " + " ".join(f"import {module};\n" for module in sorted(modules)) + "\n\n" ) else: return "" def _parameters_section(P, use_time_variable_data): """Generate the parameters section.""" if P.scenarios: # one entry per scenario for each indexed parameter def _get_param_entries(): for p in sorted(P.parameters, key=lambda p: p.name): if p.is_indexed: if use_time_variable_data: yield _data_entry(p, P.timesteps) else: for s in P.scenarios: s_val = p.value[s] yield _entry( p.name + f"_{s}", ( s_val if isinstance(s_val, (int, float)) else s_val[P.timesteps[s].index[0]] ), Modifier.input, ) else: yield _entry(p.name, p.value, Modifier.input) else: def _get_param_entries(): for p in sorted(P.parameters, key=lambda p: p.name): if p.is_indexed: if use_time_variable_data: yield _data_entry(p, P.timesteps) else: yield _entry( p.name, p.value[P.timesteps.index[0]], Modifier.input ) # raise NotImplementedError(err_msg) else: yield _entry(p.name, p.value, Modifier.input) if P.parameters: return f" // parameters\n{''.join(_get_param_entries())}\n" return "" def _variables_section(P, controls): """Write the different 'VARIABLES' sections and variable bounds.""" _bd = "binary design variables" _id = "integer design variables" _cd = "continuous design variables" _bo = "binary operational variables" _io = "integer operational variables" _co = "continuous operational variables" from comando.utility import evaluate var_types = {_bd: [], _id: [], _cd: [], _bo: [], _io: [], _co: []} for v in P.design_variables: # Sort the variabes according to their domain if v.is_binary: var_types[_bd].append(v) elif v.is_integer: var_types[_id].append(v) else: var_types[_cd].append(v) for v in P.operational_variables: if v.is_binary: var_types[_bo].append(v) elif v.is_integer: var_types[_io].append(v) else: var_types[_co].append(v) if not controls: # Ensure there is at least one input dummy_input = comando.core.VariableVector("dummy_input") dummy_input.instantiate(P.index) controls = {dummy_input} var_types[_co].append(dummy_input) variables = "" # Design for v_type, kind in zip([_bd, _id, _cd], ["Integer", "Integer", "Real"]): vars = var_types[v_type] if vars: # Only print this section if there are variables of this type var_entries = [] for v in sorted(vars, key=lambda v: v.name): var_entries.append(_entry(v.name, v.value, Modifier.input, kind)) variables += f" // {v_type}\n{''.join(var_entries)}\n" # Operation for v_type, kind in zip([_bo, _io, _co], ["Integer", "Integer", "Real"]): vars = var_types[v_type] if vars: # Only print this section if there are variables of this type var_entries = [] for v in sorted(vars, key=lambda v: v.name): if v in P.states: mod = None init_state, *_ = P.states[v] init_value = init_state.value else: # Use first time value as initial value init_value = ( v.value.groupby(level=0).head(1).droplevel(1) if P.scenarios else v.value.iloc[0] ) # all controls are inputs, all algebraic variables are outputs mod = Modifier.input if v in controls else Modifier.output if P.scenarios: for s in P.scenarios: try: init_val = init_value[s] except TypeError: init_val = init_value var_entries.append( _entry(v.name + f"_{s}", init_val, mod, kind) ) else: var_entries.append(_entry(v.name, init_value, mod, kind)) variables += f" // {v_type}\n{''.join(var_entries)}\n" dobj_init = evaluate(P.design_objective) t0 = ( {s: P.timesteps[s].index[0] for s in P.scenarios} if P.scenarios else P.timesteps.index[0] ) # objective definitions variables += ( " // objective\n" f" Real d_obj(start={dobj_init});\n" + ( "".join( f" Real o_obj_{s}(start={evaluate(P.operational_objective, (s, t0[s]))});\n" for s in P.scenarios ) if P.scenarios else f" Real o_obj(start={evaluate(P.operational_objective, t0)});\n" ) + ( "".join( f" Real o_obj_int_{s}(start=0, fixed=true);\n" for s in P.scenarios ) if P.scenarios else " Real o_obj_int(start=0, fixed=true);\n" ) + f" output Real obj(start={dobj_init});\n" ) return variables def _equations_section(P, ineqs, parse): """Generate a string representation of all equations.""" from comando.utility import evaluate if not (P.states or P.constraints): return "\n\nequation\n\n " # for mandatory objective definitions t0 = ( {s: P.timesteps[s].index[0] for s in P.scenarios} if P.scenarios else P.timesteps.index[0] ) inequalities = ( ( "\n // inequalities\n " + " ".join( ( " ".join( f"output Real {name}_{s}(start={evaluate(ineq, (s, t0[s]))}); " f"// {c_id} for scenario {s}\n" for s in P.scenarios ) if P.scenarios and comando.utility.is_indexed(ineq) else f"output Real {name}(start={evaluate(ineq, t0)}); // {c_id}\n" ) for c_id, (ineq, name) in ineqs.items() ) ) if ineqs else "" ) def eq(id): """Normalize the equation id if it doesn't fit Modelica syntax.""" import re if re.match(r"^\w+$", id): eq_id = id else: eq_id = f"constraint_{eq.n}" eq.n += 1 eq.map[eq_id] = id return eq_id eq.n = 0 eq.map = {} equations = f"{inequalities}\nequation\n" if P.states: equations += "\n // differential equations\n" for state, (_, der_state_var, _) in P.states.items(): if P.scenarios: for s in P.scenarios: equations += f" der({state.name}_{s}) = {der_state_var.name}_{s};\n" else: equations += f" der({state.name}) = {der_state_var.name};\n" if P.constraints: equations += "\n // constraints\n" for c_id, c in P.constraints.items(): if not c.is_Equality: # Define auxiliary variables for inequalities ineq, name = ineqs[c_id] if P.scenarios and comando.utility.is_indexed(ineq): for s in P.scenarios: equations += f" {name}_{s} = {parse(ineq, s)}; // {c_id} for scenario {s}\n" else: equations += f" {name} = {parse(ineq)}; // {c_id}\n" continue if P.scenarios and comando.utility.is_indexed(c): for s in P.scenarios: equations += f" {parse(c, s)}; // {c_id} for scenario {s}\n" else: equations += f" {parse(c)}; // {c_id}\n" return equations def _objective_section(P, parse): return ( "\n // objective definitions\n" f" d_obj = {parse(P.design_objective)}; // design objective\n " + ( ( " ".join( f"o_obj_{s} = {parse(P.operational_objective, s)}; " f"// operational objective for scenario {s}\n" for s in P.scenarios ) + " obj = d_obj + " + " + ".join( f"{ws} * o_obj_int_{s}" for s, ws in P.scenario_weights.items() ) + "; // overall objective\n " + " ".join(f"der(o_obj_int_{s}) = o_obj_{s};\n" for s in P.scenarios) ) if P.scenarios else ( f"o_obj = {parse(P.operational_objective)}; // operational objective\n" " der(o_obj_int) = o_obj;\n" " obj = d_obj + o_obj_int; // overall objective\n" ) ) ) def _create_sym_map( dvars=(), ovars=(), pars=(), scenarios=None, use_time_variable_data=False, sym_map=None, ): """Create a Modelica symbol map or populate an existing one with new symbols. Arguments --------- dvars : iterable design variables ovars : iterable operational variables pars : iterable parameters Returns ------- sym_map : dict Symbol map with Modelica representation for each passed symbol. """ import re from itertools import chain p = re.compile(r"(\w+)\[\(?'?(\w+)'?(?:, '?(\w+)'?)*\)?\]") if sym_map is None: sym_map = {} dpars, opars = split(pars, is_indexed) for sym in chain(dpars, dvars): sym_map[sym] = sym.name for sym in chain(opars, ovars): if scenarios and sym.is_indexed: sym_map[sym] = {s: f"{sym.name}_{s}" for s in scenarios} else: sym_map[sym] = f"{sym.name}" if use_time_variable_data: for p in opars: if scenarios: sym_map[p] = {s: n + ".y[1]" for s, n in sym_map[p].items()} else: sym_map[p] = sym_map[p] + ".y[1]" return sym_map
[docs] def write_mo_file( P, mo_file_path, controls, *modules, use_time_variable_data=False, precheck=False ): """Write a Modelica file based on the COMANDO Problem. Arguments --------- P : comando.Problem the problem to translate mo_file_path : str the path to the Modelica file to be created, i.e., <optional_path>\<class_name>.mo controls : list[comando.VariableVector] the operational variables to be treated as controls (i.e., inputs in Modelica terms) modules : tuple[str] additional module to be imported options : dict[str -> Any] additional options """ import os if P.timesteps is None: raise RuntimeError( "Trying to generate a modelica file for a Problem without time-steps." ) eqs = {} ineqs = {} n_ineqs = 0 for c_id, c in P.constraints.items(): if c.is_Equality: eqs[c_id] = c else: ineqs[c_id] = (c.lts - c.gts, f"inequality_{n_ineqs}") n_ineqs += 1 _states = set(P.states) _controls = set() if isinstance(controls, (str, comando.core.Symbol)): # single control controls = [controls] for control in controls: if isinstance(control, str): try: control = P[control] except KeyError: raise KeyError(f'"{control}" is not a valid symbol name.') if control not in P.operational_variables: raise RuntimeError(f'"{control}" is not an operational variable.') _controls.add(control) alg_vars = P.operational_variables - _states - _controls nEqs = len(eqs) nVars = len(alg_vars) if precheck: if nEqs > nVars: raise RuntimeError( f"System is overconstrained: nEqs ({nEqs}) > nVars ({nVars})!" ) elif nEqs < nVars: raise RuntimeError( f"System is underconstrained: nEqs ({nEqs}) < nVars ({nVars})!" ) else: import pandas as pd incidence = pd.DataFrame( [[int(v in eq.free_symbols) for v in alg_vars] for eq in eqs.values()], eqs, (v.name for v in alg_vars), int, ) from numpy.linalg import matrix_rank rank = matrix_rank(incidence) if rank < nEqs: raise RuntimeError( "System is structurally singular, incidence matrix:\n\n" + str(incidence) ) # Parsing setup sym_map = _create_sym_map( P.design_variables, P.operational_variables, P.parameters, P.scenarios, use_time_variable_data, ) parse = ModelicaParser(sym_map) class_name = os.path.basename(mo_file_path).split(".")[0] with open(mo_file_path, "w") as f: f.write(Modelica_HEADER) f.write(f"\nmodel {class_name}\n\n") f.write( _imports_section(*modules, use_time_variable_data=use_time_variable_data) ) f.write(_parameters_section(P, use_time_variable_data)) f.write(_variables_section(P, _controls)) f.write(_equations_section(P, ineqs, parse)) f.write(_objective_section(P, parse)) f.write(f"\nend {class_name};\n")