Source code for comando.interfaces.maingo_ale

"""Code to generate an ALE input file for MAiNGO.

ALE is a library for Algebraic Logical Expressions that can be used to generate
MAiNGO problems from human-readable input files.
This module generates such input files from a COMANDO problem.
"""
# This file is part of the COMANDO project which is released under the MIT
# license. See file LICENSE for full license details.
#
# AUTHORS: Marco Langiu
import re

import comando
from comando.utility import split, indexed, StrParser

# Floating-point infinity; variable bounds are compared against it to detect
# unbounded variables.
inf = float('inf')
# Finite value written to the ALE file in place of an infinite bound.
INF = 1e10


def _infix(symbol):
    """Return a renderer joining a left and a right operand with `symbol`."""
    return lambda lhs, rhs: f'{lhs} {symbol} {rhs}'


# String renderers for expression node types in ALE syntax; this mapping is
# handed to the string parser as its MAiNGO-specific string map.
maingo_str_map = {
    '()': lambda expr: f'({expr})',
    'Add': lambda *args: ' + '.join(args),
    # Factors are formatted first, so non-string factors are accepted here.
    'Mul': lambda *args: ' * '.join(map(format, args)),
    'Pow': lambda base, exponent: f'pow({base}, {exponent})',
    'LessThan': _infix('<='),
    'GreaterThan': _infix('>='),
    'Equality': _infix('='),
}


def maingo_pow_callback(parser, expr, idx):
    """Handle special pow calls in MAiNGO.

    Arguments
    ---------
    parser
        The parser invoking this callback; its ``str_map`` and ``parse_args``
        are used to render the special case.
    expr
        The power expression; ``expr.args`` is unpacked into base and
        exponent.
    idx
        The index forwarded unchanged to ``parser.parse_args``.

    Returns
    -------
    The string produced by the parser's ``'exp'`` renderer if the base equals
    ``comando.E``, otherwise ``None`` to signal that the power should be
    handled normally.
    """
    base, exponent = expr.args
    if base == comando.E:
        render_exp = parser.str_map['exp']
        parsed_exponent = parser.parse_args((exponent, ), idx)
        return render_exp(*parsed_exponent)
    # NOTE: A drafted special-casing of numeric exponents is currently
    #       disabled: negative exponents were to be delegated to the 'Inv'
    #       renderer and exponents that are powers of two (or of one half) to
    #       repeated applications of the 'sqr' (or 'sqrt') renderer.
    return None  # No special case, handle normally
class AleParser(StrParser):  # pylint: disable=too-few-public-methods
    """A parser rendering comando expressions as strings for MAiNGO's ALE input.

    The parser is a ``StrParser`` configured with ``maingo_str_map`` as string
    map and ``maingo_pow_callback`` as callback for power expressions.

    Arguments
    ---------
    sym_map
        The symbol mapping forwarded to ``StrParser``.
    """

    def __init__(self, sym_map):
        super().__init__(
            sym_map,
            maingo_str_map,
            pow_callback=maingo_pow_callback,
        )
# TODO: Actively check that no two labels coincide, also could be generalized!
def _normalize(label, replacement='__'):
    """Replace each run of non-alphanumeric characters in `label`.

    Arguments
    ---------
    label : str
        The name to be turned into an identifier.
    replacement : str
        The text substituted for every maximal run of characters other than
        ASCII letters and digits.

    Returns
    -------
    The normalized label. Distinct labels may map to the same result.
    """
    return re.sub('[^0-9a-zA-Z]+', replacement, label)


def _ale_var_rep(var, name):
    """Return the ALE declaration line for `var` using the identifier `name`.

    Infinite bounds are replaced by the finite stand-ins ``-INF`` and ``INF``.
    Integer variables with bounds 0 and 1 are declared as ``binary``, other
    integer variables as ``integer`` and all remaining variables as ``real``.
    """
    lb = -INF if var.lb == -inf else var.lb
    ub = INF if var.ub == inf else var.ub
    if var.is_integer:
        if (lb, ub) == (0, 1):
            return f'binary {name};\n'
        return f'integer {name} in [{lb}, {ub}];\n'
    return f'real {name} in [{lb}, {ub}];\n'


def _write_cons(file, cons, parse, suffixes, prefix=''):
    """Write one constraints section to `file`.

    Arguments
    ---------
    file
        A writable text file object.
    cons : dict
        Mapping from constraint ids to constraint expressions.
    parse : callable
        Renders an expression (optionally for a given index) as a string.
    suffixes : dict
        Mapping from indices to the strings used as label suffixes.
    prefix : str
        Text placed in front of the ``constraints:`` section header, e.g.,
        ``'relaxation only '``.
    """
    # presumably split returns (non-indexed, indexed) items — this matches how
    # the two results are consumed below.
    dcons, ocons = split(cons, indexed)
    file.write(f'\n{prefix}constraints:\n')
    for c_id, con in dcons.items():
        file.write(f'{parse(con)} "{c_id}";\n')
    for c_id, con in ocons.items():
        # The expression is parsed for the index itself, while the label uses
        # the matching suffix string (iterating the dict alone would only
        # yield the indices and put their raw representation in the label).
        for idx, sfx in suffixes.items():
            file.write(f'{parse(con, idx)} "{c_id}_{sfx}";\n')


def _write_ale_file(P, file_name, relaxation_only_constraints,
                    squashing_constraints, cse, outputs,
                    add_intermediates_as_output, sym_map, suffixes, dvs, ovs,
                    parse):
    """Write the ALE input file for problem `P` to `file_name`.

    The file consists of the variable definitions (optionally followed by
    intermediate variables resulting from common subexpression elimination),
    the initial point, the constraints sections, the objective and,
    if requested, the outputs.

    Arguments
    ---------
    P
        The problem; its objectives, constraints, index, timesteps, scenarios
        and scenario weights are read. If `cse` is true, the attributes
        ``P.cse`` and ``P.expr_defs`` are set.
    file_name : str
        Path of the file to be (over)written.
    relaxation_only_constraints, squashing_constraints : dict or None
        Additional constraints written to their own sections.
    cse : bool
        Whether to introduce intermediate variables for common subexpressions.
    outputs : dict or None
        Mapping from labels to expressions to be listed as outputs.
    add_intermediates_as_output : bool
        Whether the intermediate variables created for `cse` are added to the
        outputs; this only has an effect if `cse` is true.
    sym_map : dict
        Mapping from symbols to their string representations; entries for the
        intermediate variables are added to it.
    suffixes : dict
        Mapping from indices to suffix strings.
    dvs, ovs
        The design and operational variables in the order they are written.
    parse : callable
        Renders an expression (optionally for a given index) as a string.
    """
    # Shorthands
    do = P.design_objective
    oo = P.operational_objective
    cons = P.constraints
    ro_cons = relaxation_only_constraints
    sq_cons = squashing_constraints

    with open(file_name, 'w') as file:
        # Variables section
        file.write('definitions:\n\n# Variables\n')
        for dv in dvs:
            file.write(_ale_var_rep(dv, sym_map[dv]))
        for ov in ovs:
            for idx in suffixes:
                file.write(_ale_var_rep(ov[idx], sym_map[ov][idx]))

        if cse:  # Using Common Subexpression Elimination to reduce file size
            from comando.utility import evaluate

            file.write('\n# Intermediate variables:\n')
            # The order objectives, constraints, relaxation-only constraints,
            # squashing constraints, outputs is relied upon when the results
            # are sliced apart again below.
            orig_exprs = [do, oo, *cons.values()]
            if ro_cons:
                orig_exprs.extend(ro_cons.values())
            if sq_cons:
                orig_exprs.extend(sq_cons.values())
            if outputs:
                orig_exprs.extend(outputs.values())
            reps, exprs = comando.cse(orig_exprs)

            defs = {}  # maps cse symbols to the variables replacing them
            expr_defs = {}  # maps these variables to their defining expression
            for sym, rep in reps:
                e = rep.subs(defs)
                if indexed(e):
                    x = comando.VariableVector(sym.name)
                    x.instantiate(P.index)
                    x_map = sym_map[x] = {}
                    for idx in suffixes:
                        n = x_map[idx] = _normalize(x[idx].name)
                        file.write(f'real {n} := {parse(e, idx)};\n')
                else:
                    x = comando.Variable(sym.name)
                    n = sym_map[x] = _normalize(x.name)
                    # Declare the variable under the same normalized name the
                    # parser emits when it occurs in later expressions.
                    file.write(f'real {n} := {parse(e)};\n')
                x.value = evaluate(e)
                defs[sym] = x
                expr_defs[x] = e

            do = exprs[0].subs(defs)
            oo = exprs[1].subs(defs)
            con_end = 2 + len(cons)
            cons = {con_id: expr.subs(defs)
                    for con_id, expr in zip(P.constraints, exprs[2:con_end])}
            if ro_cons:
                ro_con_end = con_end + len(ro_cons)
                ro_cons = {con_id: expr.subs(defs)
                           for con_id, expr
                           in zip(ro_cons, exprs[con_end:ro_con_end])}
            else:
                ro_con_end = con_end
            if sq_cons:
                sq_cons = {con_id: expr.subs(defs)
                           for con_id, expr
                           in zip(sq_cons, exprs[ro_con_end:])}
            P.cse = [do, oo, cons]
            P.expr_defs = expr_defs
            if outputs:
                outputs = {k: v.subs(defs)
                           for k, v in zip(outputs, exprs[-len(outputs):])}
            else:
                outputs = {}
            # Intermediates only exist within this branch.
            if add_intermediates_as_output:
                outputs.update({aux.name: aux for aux in defs.values()})

        file.write('\n# Initial point\n')
        for dv in dvs:
            file.write(f'{sym_map[dv]}.init <- {dv.value};\n')
        for ov in ovs:
            for idx, val in zip(suffixes, ov.value):
                file.write(f'{sym_map[ov][idx]}.init <- {float(val)};\n')

        # Constraints sections
        if cons:
            _write_cons(file, cons, parse, suffixes)
        if ro_cons:
            _write_cons(file, ro_cons, parse, suffixes, 'relaxation only ')
        if sq_cons:
            _write_cons(file, sq_cons, parse, suffixes, 'squashing ')

        # Adding objective
        file.write('\nobjective:\n')
        file.write(parse(do))
        if oo != 0:
            file.write(' + ')
            ts = P.timesteps
            if ts is None:  # Assume scenarios is not None
                terms = [f'{p} * ({parse(oo, s)})'
                         for s, p in P.scenario_weights.items()]
            elif P.scenarios is None:
                terms = [f'{dt} * ({parse(oo, t)})' for t, dt in ts.items()]
            else:
                # Weighted sum over scenarios of the sums over the timesteps
                terms = []
                for s, p in P.scenario_weights.items():
                    inner = ' + '.join(f'{dt} * ({parse(oo, (s, t))})'
                                       for t, dt in ts[s].items())
                    terms.append(f'{p} * ({inner})')
            file.write(' + '.join(terms))
        file.write(';\n')

        if outputs:
            from comando.utility import get_index

            file.write('\noutputs:\n')
            for k, v in outputs.items():
                index = get_index(v)
                if index is None:
                    file.write(f' {parse(v)} "{k}";\n')
                else:
                    for i in index:
                        file.write(f' {parse(v, i)} "{k}[{i}]";\n')
def write_ale_file(P, file_name, relaxation_only_constraints=None,
                   squashing_constraints=None, cse=True, outputs=None,
                   add_intermediates_as_output=False, reuse=False):
    """Write the problem in ALE syntax to a file.

    The symbol mapping and the index suffixes are always created, the file
    itself is only written if `reuse` is false.

    Arguments
    ---------
    P
        The problem to be written.
    file_name : str
        Path of the ALE file.
    relaxation_only_constraints, squashing_constraints, cse, outputs,
    add_intermediates_as_output
        Forwarded unchanged to ``_write_ale_file``.
    reuse : bool
        If true, nothing is written.

    Returns
    -------
    sym_map : dict
        Mapping from parameters and variables to their string representations;
        for indexed quantities the entry is a dict keyed by index.
    suffixes : dict
        Mapping from the elements of ``P.index`` to suffix strings.
    """
    if P.scenarios is None or P.timesteps is None:
        suffixes = {i: str(i) for i in P.index}
    else:  # every index has several components that are joined by '_'
        suffixes = {ii: '_'.join(str(i) for i in ii) for ii in P.index}

    dvs = sorted(P.design_variables, key=lambda var: var.name)
    ovs = sorted(P.operational_variables, key=lambda var: var.name)
    consts, params = split(P.parameters, indexed)

    # Parameters are represented by their values, variables by their
    # normalized names.
    sym_map = {c: str(c.value) for c in consts}
    for dv in dvs:
        sym_map[dv] = _normalize(dv.name)
    for ov in ovs:
        sym_map[ov] = {idx: _normalize(ov[idx].name) for idx in suffixes}
    for p in params:
        sym_map[p] = {idx: str(val) for idx, val in zip(suffixes, p.value)}

    parse = AleParser(sym_map).cached_parse

    if not reuse:
        _write_ale_file(P, file_name, relaxation_only_constraints,
                        squashing_constraints, cse, outputs,
                        add_intermediates_as_output, sym_map, suffixes,
                        dvs, ovs, parse)
    return sym_map, suffixes
# TODO: Can probably be generalized and moved to utility.py
def write_settings_file(options, settings_name='MAiNGOSettings.txt'):
    """Generate a settings file with the given options.

    Arguments
    ---------
    options : dict
        Mapping from option names to values; each entry results in one line
        of the form ``<option> <value>``.
    settings_name : str
        Path of the settings file, which is created or overwritten.
    """
    with open(settings_name, 'w') as settings_file:
        text = ''.join(f'{option} {value}\n'
                       for option, value in options.items())
        settings_file.write(text)
def call_maingo(file_name, settings_name=None, silent=False):
    """Call the maingo executable with a problem and possibly settings file.

    Arguments
    ---------
    file_name : str
        Path of the problem file handed to the ``MAiNGO`` executable.
    settings_name : str or None
        Path of a settings file; if empty or None it is not handed to MAiNGO,
        which will then try to use MAiNGOSettings.txt.
    silent : bool
        Forwarded to ``comando.utility.syscall``.

    Returns
    -------
    Whatever ``syscall`` returns.
    """
    from comando.utility import syscall

    cli_args = [file_name, settings_name] if settings_name else [file_name]
    return syscall('MAiNGO', *cli_args, silent=silent)
def get_results(results_file_name='MAiNGOresult.txt'):
    """Code for parsing MAiNGO results files.

    The first two lines of the file are skipped. Every following line is
    expected to consist of at least three whitespace-free fields, each
    preceded by spaces; reading stops at the first line that does not.

    Arguments
    ---------
    results_file_name : str
        Path of the results file.

    Returns
    -------
    A dict mapping the first field of each line read (the variable name) to
    the third field (the value); the values are left as strings.
    """
    import itertools
    import re

    entry = re.compile(r" +(\S+) +\S+ +(\S+)")
    val_map = {}
    with open(results_file_name, 'r') as results_file:
        # Advance two lines
        for line in itertools.islice(results_file, 2, None):
            match = entry.search(line)
            if match is None:  # first line that is not a variable entry
                break
            var_name, val = match.groups()
            val_map[var_name] = val
    return val_map
def solve(P, file_name=None, relaxation_only_constraints=None,
          squashing_constraints=None, silent=False, cse=True, outputs=None,
          add_intermediates_as_output=False, reuse=None, **options):
    """Solve problem P using MAiNGO.

    An ALE file is written for `P` (unless it is reused), MAiNGO is called on
    it and, unless the log reports infeasibility, the values found in the
    results file are assigned to the variables of `P`.

    Arguments
    ---------
    P
        The problem to be solved.
    file_name : str or None
        Name for the ALE file, passed through ``canonical_file_name``
        together with ``P.name`` and the extension ``'.ale'``.
    relaxation_only_constraints, squashing_constraints, cse, outputs,
    add_intermediates_as_output
        Forwarded unchanged to ``write_ale_file``.
    silent : bool
        Forwarded to ``call_maingo``.
    reuse
        Passed to ``check_reuse_or_overwrite`` and ``write_ale_file``.
    **options
        MAiNGO options; if any are given they are written to the settings
        file ``<base_name>_Settings.txt`` which is then passed to MAiNGO.

    Returns
    -------
    The return value of ``call_maingo``, which is always 0 here.

    Raises
    ------
    RuntimeError
        If ``call_maingo`` returns a nonzero value.
    """
    from comando.utility import canonical_file_name, check_reuse_or_overwrite

    base_name, file_name = canonical_file_name(P.name, '.ale', file_name)
    check_reuse_or_overwrite(file_name, reuse)
    sym_map, _ = write_ale_file(P, file_name, relaxation_only_constraints,
                                squashing_constraints, cse, outputs,
                                add_intermediates_as_output, reuse)

    settings_name = None
    if options:
        settings_name = f'{base_name}_Settings.txt'
        write_settings_file(options, settings_name)

    ret = call_maingo(file_name, settings_name, silent)
    if ret != 0:
        raise RuntimeError('ERROR: Solver returned nonzero exit status!')

    with open('MAiNGO.log', 'r') as log_file:
        log_text = log_file.read()

    INF_MSG = "Problem is infeasible!"
    if INF_MSG in log_text:
        print('Log says:', INF_MSG)
        return ret

    # Results are read from the default results file and assigned as returned
    # by get_results.
    results = get_results()
    for dv in P.design_variables:
        dv.value = results[sym_map[dv]]
    for ov in P.operational_variables:
        for idx, name in sym_map[ov].items():
            ov[idx] = results[name]
    return ret