"""Code to generate an ALE input file for MAiNGO.
ALE is a library for Algebraic Logical Expressions that can be used to generate
MAiNGO problems from human-readable input files.
This module allows to generate such input files based on a COMANDO problem.
"""
# This file is part of the COMANDO project which is released under the MIT
# license. See file LICENSE for full license details.
#
# AUTHORS: Marco Langiu
import re
import comando
import comando.core
from comando.utility import StrParser, is_indexed, split
inf = float("inf")
INF = 1e10
maingo_str_map = {
"()": lambda expr: f"({expr})",
"Add": lambda *args: " + ".join(args),
"Mul": lambda *args: " * ".join(f"{arg}" for arg in args),
"Pow": lambda base, exponent: f"pow({base}, {exponent})",
"LessThan": lambda lhs, rhs: f"{lhs} <= {rhs}",
"GreaterThan": lambda lhs, rhs: f"{lhs} >= {rhs}",
"Equality": lambda lhs, rhs: f"{lhs} = {rhs}",
}
[docs]
def maingo_pow_callback(parser, expr, idx):
"""Handle special pow calls in MAiNGO."""
base, exponent = expr.args
if base == comando.E:
return parser.str_map["exp"](*parser.parse_args((exponent,), idx))
# if exponent.is_Number:
# if exponent < 0:
# return parser.str_map['Inv'](base ** -exponent, idx)
# from math import log2
# log2_exp = log2(float(exponent))
# if log2_exp % 1 == 0: # exponent is a multiple of 2 or 0.5
# # NOTE: the case log2_exp == 0 is simplified to 1 by the backend!
# if log2_exp > 0:
# func = parser.str_map['sqr']
# res = func(*parser.parse_args((base, ), idx))
# while True:
# log2_exp -= 1
# if log2_exp == 0:
# return res
# res = func(res)
# if log2_exp < 0:
# func = parser.str_map['sqrt']
# res = func(*parser.parse_args((base, ), idx))
# while True:
# log2_exp += 1
# if log2_exp == 0:
# return res
# res = func(res)
return None # No special case, handle normally
[docs]
class AleParser(StrParser): # pylint: disable=too-few-public-methods
"""A class for parsing comando expressions to baron Syntax."""
def __init__(self, sym_map):
super().__init__(sym_map, maingo_str_map, pow_callback=maingo_pow_callback)
# TODO: Actively check that no two labels coincide, also could be generalized!
def _normalize(label, replacement="__"):
return re.sub("[^0-9a-zA-Z]+", replacement, label)
def _ale_var_rep(var, name):
lb = -INF if var.lb == -inf else var.lb
ub = INF if var.ub == inf else var.ub
if var.is_integer:
if (lb, ub) == (0, 1):
return f"binary {name};\n"
return f"integer {name} in [{lb}, {ub}];\n"
return f"real {name} in [{lb}, {ub}];\n"
def _write_cons(file, cons, parse, suffixes, prefix=""):
dcons, ocons = split(cons, is_indexed)
file.write(f"\n{prefix}constraints:\n")
for c_id, con in dcons.items():
file.write(f'{parse(con)} "{c_id}";\n')
for c_id, con in ocons.items():
for sfx in suffixes:
file.write(f'{parse(con, sfx)} "{c_id}_{sfx}";\n')
def _write_ale_file(
P,
file_name,
relaxation_only_constraints,
squashing_constraints,
cse,
outputs,
add_intermediates_as_output,
sym_map,
suffixes,
dvs,
ovs,
parse,
):
# Shorthands
do = P.design_objective
oo = P.operational_objective
cons = P.constraints
ro_cons = relaxation_only_constraints
sq_cons = squashing_constraints
with open(file_name, "w") as file:
# Variables section
file.write("definitions:\n\n# Variables\n")
for dv in dvs:
file.write(_ale_var_rep(dv, sym_map[dv]))
for ov in ovs:
for idx in suffixes.keys():
file.write(_ale_var_rep(ov[idx], sym_map[ov][idx]))
if cse: # Using Common Subexpression Elimination to reduce file size
from comando.utility import evaluate
file.write("\n# Intermediate variables:\n")
orig_exprs = [do, oo, *cons.values()]
if ro_cons:
orig_exprs.extend(ro_cons.values())
if sq_cons:
orig_exprs.extend(sq_cons.values())
if outputs:
orig_exprs.extend(outputs.values())
reps, exprs = comando.cse(orig_exprs)
defs = {}
expr_defs = {}
for sym, rep in reps:
e = rep.subs(defs)
if is_indexed(e):
x = comando.core.VariableVector(sym.name)
x.instantiate(P.index)
x_map = sym_map[x] = {}
for idx, sfx in suffixes.items():
n = x_map[idx] = _normalize(x[idx].name)
file.write(f"real {n} := {parse(e, idx)};\n")
else:
x = comando.core.Variable(sym.name)
sym_map[x] = _normalize(x.name)
file.write(f"real {x} := {parse(e)};\n")
x.value = evaluate(e)
defs[sym] = x
expr_defs[x] = e
do = exprs[0].subs(defs)
oo = exprs[1].subs(defs)
con_end = 2 + len(cons)
cons = {
con_id: expr.subs(defs)
for con_id, expr in zip(P.constraints, exprs[2:con_end])
}
if ro_cons:
ro_con_end = con_end + len(ro_cons)
ro_cons = {
con_id: expr.subs(defs)
for con_id, expr in zip(ro_cons, exprs[con_end:ro_con_end])
}
else:
ro_con_end = con_end
if sq_cons:
sq_con_end = ro_con_end + len(sq_cons)
sq_cons = {
con_id: expr.subs(defs)
for con_id, expr in zip(sq_cons, exprs[ro_con_end:sq_con_end])
}
P.cse = [do, oo, cons]
P.expr_defs = expr_defs
if outputs:
outputs = {
k: v.subs(defs) for k, v in zip(outputs, exprs[-len(outputs) :])
}
else:
outputs = {}
if add_intermediates_as_output:
outputs.update({aux.name: aux for aux in defs.values()})
file.write("\n# Initial point\n")
for dv in dvs:
file.write(f"{sym_map[dv]}.init <- {dv.value};\n")
for ov in ovs:
for idx, val in zip(suffixes, ov.value):
file.write(f"{sym_map[ov][idx]}.init <- {float(val)};\n")
# Constraints sections
if cons:
_write_cons(file, cons, parse, suffixes)
if ro_cons:
_write_cons(file, ro_cons, parse, suffixes, "relaxation only ")
if sq_cons:
_write_cons(file, sq_cons, parse, suffixes, "squashing ")
# Adding objective
file.write("\nobjective:\n")
# file.write(parse(P.objective))
file.write(parse(do))
if oo != 0:
file.write(" + ")
ts = P.timesteps
if ts is None: # Assume scenarios is not None
file.write(
" + ".join(
f"{p} * ({parse(oo, s)})" for s, p in P.scenario_weights.items()
)
)
elif P.scenarios is None:
file.write(
" + ".join(f"{dt} * ({parse(oo, t)})" for t, dt in ts.items())
)
else:
file.write(
" + ".join(
f"""{p} * ({
" + ".join(
f"{dt} * ({parse(oo, (s, t))})"
for t, dt in ts[s].items()
)
})"""
for s, p in P.scenario_weights.items()
)
)
file.write(";\n")
if outputs:
from comando.utility import get_index
file.write("\noutputs:\n")
for k, v in outputs.items():
index = get_index(v)
if index is None:
file.write(f' {parse(v)} "{k}";\n')
else:
for i in index:
file.write(f' {parse(v, i)} "{k}[{i}]";\n')
[docs]
def write_ale_file(
P,
file_name,
relaxation_only_constraints=None,
squashing_constraints=None,
cse=True,
outputs=None,
add_intermediates_as_output=False,
reuse=False,
):
"""Write the problem in ALE syntax to a file or stdout."""
if P.states:
msg = (
"Solving problems with states currently only "
"works via the maingopy interface!"
)
raise NotImplementedError(msg)
suffixes = (
{i: str(i) for i in P.index}
if P.scenarios is None or P.timesteps is None
else {ii: "_".join(str(i) for i in ii) for ii in P.index}
)
dvs = sorted(P.design_variables, key=lambda x: x.name)
ovs = sorted(P.operational_variables, key=lambda x: x.name)
consts, params = split(P.parameters, is_indexed)
sym_map = {c: str(c.value) for c in consts}
for dv in dvs:
sym_map[dv] = _normalize(dv.name)
for ov in ovs:
sym_map[ov] = {idx: _normalize(ov[idx].name) for idx, sfx in suffixes.items()}
for p in params:
sym_map[p] = {idx: str(val) for idx, val in zip(suffixes, p.value)}
parser = AleParser(sym_map)
parse = parser.cached_parse
if not reuse:
_write_ale_file(
P,
file_name,
relaxation_only_constraints,
squashing_constraints,
cse,
outputs,
add_intermediates_as_output,
sym_map,
suffixes,
dvs,
ovs,
parse,
)
return sym_map, suffixes
# TODO: Can probably be generalized and moved to utility.py
[docs]
def write_settings_file(options, settings_name="MAiNGOSettings.txt"):
"""Generate a settings file with the given options."""
with open(settings_name, "w") as f:
f.writelines(f"{option} {value}\n" for option, value in options.items())
[docs]
def call_maingo(file_name, settings_name=None, silent=False):
"""Call the maingo executable with a problem and possibly settings file."""
from comando.utility import syscall
if settings_name:
return syscall("MAiNGO", file_name, settings_name, silent=silent)
# Will try to use MAiNGOSettings.txt
return syscall("MAiNGO", file_name, silent=silent)
[docs]
def get_results(results_file_name="MAiNGOresult.txt"):
"""Code for parsing MAiNGO results files."""
import itertools
import re
p = re.compile(r" +(\S+) +\S+ +(\S+)")
val_map = {}
with open(results_file_name, "r") as f:
# Advance two lines
for line in itertools.islice(f, 2, None):
try:
var_name, val = p.search(line).groups()
except AttributeError:
break
val_map[var_name] = val
return val_map
[docs]
def solve(
P,
file_name=None,
relaxation_only_constraints=None,
squashing_constraints=None,
silent=False,
cse=True,
outputs=None,
add_intermediates_as_output=False,
reuse=None,
**options,
):
"""Solve problem P using MAiNGO."""
from comando.utility import canonical_file_name, check_reuse_or_overwrite
base_name, file_name = canonical_file_name(P.name, ".ale", file_name)
check_reuse_or_overwrite(file_name, reuse)
sym_map, suffixes = write_ale_file(
P,
file_name,
relaxation_only_constraints,
squashing_constraints,
cse,
outputs,
add_intermediates_as_output,
reuse,
)
if options:
settings_name = f"{base_name}_Settings.txt"
write_settings_file(options, settings_name)
else:
settings_name = None
ret = call_maingo(file_name, settings_name, silent)
if ret != 0:
raise RuntimeError("ERROR: Solver returned nonzero exit status!")
with open("MAiNGO.log", "r") as f:
content = f.read()
INF_MSG = "Problem is infeasible!"
if INF_MSG in content:
print("Log says:", INF_MSG)
else:
vals = get_results()
for dv in P.design_variables:
dv.value = vals[sym_map[dv]]
for ov in P.operational_variables:
for idx, name in sym_map[ov].items():
ov[idx] = vals[name]
return ret