"""Input file generation for BARON."""
# This file is part of the COMANDO project which is released under the MIT
# license. See file LICENSE for full license details.
#
# AUTHORS: Marco Langiu
import sys
from functools import partial
from math import isinf
import comando
import comando.core
from comando import Eq, Le
from comando.utility import StrParser, get_index, get_vars, is_indexed, split
# In some newer macOS versions 'System Integrity Protection' appears to prevent
# interactive Python sessions to access certain environment variables like
# DYLD_LIBRARY_PATH which is where shared libraries such as for CPLEX are
# stored by default.
# As Baron relies on DYLD_LIBRARY_PATH to be set correclty, we do this here
if sys.platform == "darwin":
# NOTE: Make sure the shared library for CPLEX 12.10 can be found!
import os
import subprocess
def search_cplex_library(major="12", minor="10", patch="0", *ignored):
"""Locate the shared library file for the given CPLEX version."""
cplex_library = f"libcplex{major}{minor}{patch}.dylib"
print(f"searching for {cplex_library}...")
if ignored:
print("ignoring:", ignored)
res = subprocess.check_output(f"locate {cplex_library}", shell=True)
try:
loc = res.decode().split()[0]
print(f"...found at {loc}!")
print("Temporarily adding to DYLD_LIBRARY_PATH!")
os.environ["DYLD_LIBRARY_PATH"] = os.path.dirname(loc)
return True
except IndexError:
print("... nothing found!")
return False
if not search_cplex_library():
prompt = (
"Please enter the CPLEX version you have installed\n"
"in the form major.minor.patch, e.g., 12.10.0 or hit\n"
"enter if you do not have CPLEX or don't want to use it\n"
)
while True:
version = input(prompt).split(".")
if version[0] == "" or search_cplex_library(*version):
break
baron_str_map = {
"Neg": lambda arg: f"(-{arg})", # unary negation requires ()!
"LessThan": lambda lts_minus_gts, ZERO: f"{lts_minus_gts} <= 0",
"GreaterThan": lambda lts_minus_gts, ZERO: f"{lts_minus_gts} <= 0",
# NOTE: symengine stores both a == 0 and 0 == a as the latter!
"Equality": lambda ZERO,
lhs_minus_rhs: f"{lhs_minus_rhs} == 0", # BARON needs a == 0
"Pow": lambda base, exponent: f"{base} ^ {exponent}",
"tanh": lambda arg: f"(1 - 2/(exp(2 * ({arg})) + 1))",
}
[docs]
def baron_pow_callback(parser, expr, idx):
"""Handle special pow calls in BARON."""
base, exponent = expr.args
if base == comando.E:
return parser.str_map["exp"](*parser.parse_args((exponent,), idx))
# NOTE: BARON does not allow x^y, where x and y are both variables.
# It is permissible to have either x or y as a variable in this
# case but not both. The following reformulation can be used
# around this: x^y = exp(y * log(x))
if all(get_vars(arg) for arg in expr.args):
arg = base * comando.log(exponent)
return parser.str_map["exp"](*parser.parse_args((arg,), idx))
return None # No special case, handle normally
[docs]
class BaronParser(StrParser): # pylint: disable=too-few-public-methods
"""A class for parsing comando expressions to baron Syntax."""
def __init__(self, sym_map):
super().__init__(sym_map, baron_str_map, pow_callback=baron_pow_callback)
[docs]
def options_section(options):
"""Write the OPTIONS section."""
opts = "\n".join(f"{opt}: {val};" for opt, val in options.items())
return f"OPTIONS {{\n{opts}\n}}\n\n" if options else ""
def _name(prefix, n=1, i=None):
"""Generate n unused names of the form {prefix}{i} starting from i."""
if i is None:
try:
i = _name.i[prefix]
except (AttributeError, KeyError):
i = 0
for _ in range(n):
_name.i[prefix] = i + 1
yield f"{prefix}{i}"
i += 1
_name.i = {}
var_name = partial(_name, "x")
con_name = partial(_name, "c")
[docs]
def variables_section(var_map, prios=None):
"""Write the (BINARY/INTEGER/POSITIVE) VARIABLES sections."""
all_vars = set(var_map)
# continuous vs. integer
c_vars, i_vars = split(all_vars, lambda v: v.is_integer)
# general integer vs. binary
i_vars, b_vars = split(i_vars, lambda v: v.is_binary)
# general vs. 'positive' (lb = 0 for BARON)
_, p_vars = split(
all_vars - b_vars, lambda v: all(v.lb == 0) if v.is_indexed else v.lb == 0
)
gc_vars = c_vars - p_vars # 'general continuous' variables
variable_groups = {
"BINARY_VARIABLES": b_vars,
"INTEGER_VARIABLES": i_vars,
"POSITIVE_VARIABLES": p_vars,
"VARIABLES": gc_vars,
}
res = ""
for group, vars in variable_groups.items(): # sections for each group
if vars:
var_reps = (
", ".join(var_map[v].values()) if v.is_indexed else var_map[v]
for v in vars
)
res += f"""{group} {", ".join(var_reps)};\n"""
for var, repr in var_map.items():
if var.is_indexed:
for i, var_i_repr in repr.items():
res += f"// {var_i_repr}: {var}[{i}]\n"
else:
res += f"// {repr}: {var}\n"
lbs = i_vars.union(c_vars) - p_vars
# lbnds_gen = ('\n'.join(f'{r}: {v[i].lb};' for i, r in var_map[v].items()
# if not isinf(v[i].lb)) if v.is_indexed
# else ('' if isinf(v.lb) else f'{var_map[v]}: {v.lb}; ')
# for v in lbs)
# lbnds = '\n'.join(lbnds_gen)
def lbnds_gen():
for v in lbs:
if v.is_indexed:
yield from (
f"{r}: {v[i].lb};"
for i, r in var_map[v].items()
if not isinf(v[i].lb)
)
elif not isinf(v.lb):
yield f"{var_map[v]}: {v.lb}; "
lbnds = "\n".join(lbnds_gen())
ubs = lbs.union(p_vars)
# ubnds_gen = ('\n'.join(f'{r}: {v[i].ub};' for i, r in var_map[v].items()
# if not isinf(v[i].ub)) if v.is_indexed
# else ('' if isinf(v.ub) else f'{var_map[v]}: {v.ub}; ')
# for v in ubs)
# ubnds = '\n'.join(ubnds_gen)
def ubnds_gen():
for v in ubs:
if v.is_indexed:
yield from (
f"{r}: {v[i].ub};"
for i, r in var_map[v].items()
if not isinf(v[i].ub)
)
elif not isinf(v.ub):
yield f"{var_map[v]}: {v.ub}; "
ubnds = "\n".join(ubnds_gen())
for b_type, bnds in ("LOWER", lbnds), ("UPPER", ubnds):
if bnds:
res += f"\n{b_type}_BOUNDS {{\n{bnds}\n}}\n"
if prios:
prio_gen = (
(
"\n".join(f"{r}: {p};" for r in var_map[v].values())
if v.is_indexed
else f"{var_map[v]}: {p};"
)
for v, p in prios.items()
)
prio_section = "\n".join(prio_gen)
if prio_section:
res += f"\nBRANCHING_PRIORITIES {{\n{prio_section}\n}}\n"
return res
[docs]
def constraints_section(con_map, rel_only_cons, convex_cons, parse):
"""Write the (RELAXATION_ONLY/CONVEX) EQUATIONS sections."""
# Assuming constraints is a dict con->name and rel_only_cons and
# convex_cons are sets of cons
ro_cons = {con: con_map[con] for con in rel_only_cons}
conv_cons = {con: con_map[con] for con in convex_cons}
constraint_groups = {
"EQUATIONS": con_map,
"RELAXATION_ONLY_EQUATIONS": ro_cons,
"CONVEX_EQUATIONS": conv_cons,
}
res = ""
for group, cons in constraint_groups.items():
if cons:
res += f"""{group} {
", ".join(
", ".join(name.values()) if is_indexed(con) else name
for con, name in cons.items()
)
};\n"""
con_defs = (
(
"\n".join(f"{n[i]}: {parse(c, i)};" for i in get_index(c))
if is_indexed(c)
else f"{n}: {parse(c)};"
)
for c, n in con_map.items()
)
res += "\n\n" + "\n".join(con_defs)
return res
[docs]
def objective_section(P, parse):
do = parse(P.design_objective)
ooe = P.operational_objective
if ooe == 0:
return f"\nOBJ: minimize {do};"
ts = P.timesteps
if ts is None:
oo = " + ".join(
f"{p} * ({parse(ooe, s)})" for s, p in P.scenario_weights.items()
)
elif P.scenarios is None:
oo = " + ".join(f"{dt} * ({parse(ooe, t)})" for t, dt in ts.items())
else:
oo = " + ".join(
f"""{p} * ({
" + ".join(f"{dt} * ({parse(ooe, (s, t))})" for t, dt in ts[s].items())
})"""
for s, p in P.scenario_weights.items()
)
return f"\nOBJ: minimize {do} + {oo};"
[docs]
def start_section(var_map):
s_gen = (
(
"\n".join(f"{r}: {val};" for r, val in zip(var_map[v].values(), v.value))
if v.is_indexed
else f"{var_map[v]}: {v.value};"
)
for v in var_map
)
_start_section = "\n".join(s_gen)
return f"\nSTARTING_POINT {{\n{_start_section}\n}}\n"
def _spread_bounds(lb, ub, frac=0.1):
if lb < 0:
lb *= 1 + frac
else:
lb *= 1 - frac
if ub < 0:
ub *= 1 - frac
else:
ub *= 1 + frac
return lb, ub
# DEBUG:
# For very long expressions BARON seems to have issues, terminating without
# having reached the specified termination criteria and complaining that:
# User did not provide appropriate variable bounds.
# Some model expressions are unbounded.
# or incorrectly reporting problems as infeasible when intermediate expressions
# are introduced as variables with appropriate bounds.
# One issue might be expressions whose bounds are identical, however, even
# spreading the bounds of such expressions results in the reported
# infeasibility.
# Using extreme bounds that are certain to be valid again results in the
# complaint about unbounded expressions...
# b = -1e10, 1e10
[docs]
def apply_cse(P, var_map):
"""Replace reoccurring expressions with variables."""
do = P.design_objective
oo = P.operational_objective
cons = P.constraints
reps, exprs = comando.cse((do, oo, *cons.values()))
defs = {}
cons = {}
n = len(P.index)
for sym, rep in reps:
e = rep.subs(defs)
index = get_index(e)
if index is None:
b = comando.utility.bounds(e)
if b[1] - b[0] < 1e-15:
b = _spread_bounds(b[0], b[1])
x = comando.core.Variable(f"intermediate{sym.name[1:]}", bounds=b)
var_map[x] = next(var_name())
else:
b = comando.utility.bounds(e)
b = (min(b[0]), max(b[1]))
if b[1] - b[0] < 1e-15:
b = _spread_bounds(b[0], b[1])
x = comando.core.VariableVector(f"intermediate{sym.name[1:]}", bounds=b)
x.instantiate(get_index(e))
var_map[x] = dict(zip(index, var_name(len(index))))
# print(sym, b)
cons[f"{x.name}_def"] = Eq(x, e)
defs[sym] = x
P2 = comando.Problem(
exprs[0].subs(defs),
exprs[1].subs(defs),
{
**{
con_id: expr.subs(defs)
for con_id, expr in zip(P.constraints, exprs[2:])
},
**cons,
},
None,
P.timesteps,
P.scenarios,
name=f"CSE_reformulation_of_{P.name}",
)
return P2
[docs]
def handle_tanh(expr, tanh_definitions, var_map):
"""Search the expression for occurrences of tanh and substitute them."""
tanh_occurrences = {}
for atom in expr.atoms(comando.Function):
if (comando.utility.get_type_name(atom)) == "tanh":
arg = atom.args[0]
index = get_index(arg)
if index is None:
tanh_var = comando.core.Variable(f"tanh{handle_tanh.n}", bounds=(-1, 1))
var_map[tanh_var] = next(var_name())
else:
tanh_var = comando.core.VariableVector(
f"tanh{handle_tanh.n}", bounds=(-1, 1)
)
tanh_var.instantiate(index)
var_map[tanh_var] = dict(zip(index, var_name(len(index))))
handle_tanh.n += 1
tanh_occurrences[atom] = tanh_var
tanh_definitions[tanh_var] = 1 - 2 / (comando.exp(2 * arg) + 1)
return expr.xreplace(tanh_occurrences)
handle_tanh.n = 0
[docs]
def normalize(con):
"""Bring constraints to a normal form baron can handle."""
try:
return Le(con.lts - con.gts, 0)
except AttributeError:
return Eq(0, con.lhs - con.rhs)
[docs]
def write_bar_file(P, file_name, options=None, cse=False, reuse=False):
"""Write a baron input file for problem P."""
if options is None:
options = {}
prios = options.pop("branching_priorities", None)
n = len(P.index)
_name.i = {}
var_map = {}
for v in P.design_variables:
var_map[v] = next(var_name())
for v in P.operational_variables:
index = v.expansion.index
var_dict = dict(zip(index, var_name(len(index))))
for v_i, name_i in zip(v, var_dict.values()):
var_map[v_i] = name_i
var_map[v] = var_dict
if cse:
P = apply_cse(P, var_map)
# tanh handling
# TODO: Do not overwrite but use copies of constraints and objective!
tanh_definitions = {}
P.design_objective = handle_tanh(P.design_objective, tanh_definitions, var_map)
P.operational_objective = handle_tanh(
P.operational_objective, tanh_definitions, var_map
)
for c_id, con in P.constraints.items():
P.constraints[c_id] = handle_tanh(con, tanh_definitions, var_map)
for tanh_var, tanh_def in tanh_definitions.items():
P.constraints[f"{tanh_var.name}_def"] = Eq(tanh_var, tanh_def)
cons = P.constraints
if P.states:
for iv, *_ in P.states.values():
if isinstance(iv, (comando.core.Variable, comando.core.VariableVector)):
if iv.is_indexed:
for iv_j in iv:
var_map[iv_j] = next(var_name())
else:
var_map[iv] = next(var_name())
print(
"WARNING: There are differential constraints that will be "
"discretized using implicit Euler discretization, if you want "
"an advanced time discretization you must reformulate your "
"problem prior to passing it to the BARON interface!"
)
from comando.utility import handle_state_equations
handle_state_equations(P, cons.__setitem__)
parser = BaronParser(var_map)
parse = parser.cached_parse
def check(c_id, con):
"""Check whether a constraint has variables, if not evaluate."""
if get_vars(con):
return True
# If constraint doesn't have any variables...
res = comando.utility.parse(
con, # ...test whether it is satisfied
{p: p.value for p in con.free_symbols},
)
if not all(res) if is_indexed(con) else not res: # if there's a violation
from comando import ImpossibleConstraintException
raise ImpossibleConstraintException(
f'Constraint "{c_id}" contains only parameters and evaluates to False!'
)
else: # else continue (returns None which is equivalent to False!)
print(
f'INFO: Constraint "{c_id}" contains only parameters and '
"evaluates to True! Skipping..."
)
con_map = {
normalize(c): (
dict(zip(P.index, con_name(n))) if is_indexed(c) else next(con_name())
)
for c_id, c in cons.items()
if check(c_id, c)
}
if not reuse:
with open(file_name, "w") as f:
f.write(options_section(options))
f.write(variables_section(var_map, prios))
f.write(constraints_section(con_map, {}, {}, parse))
f.write(objective_section(P, parse))
f.write(start_section(var_map))
return var_map, con_map
# TESTS f = open(input_file_name, 'w')
# str_parse(-b, sym_map)
# str_parse(a + -b, sym_map)
# str_parse(1 + a + -b, sym_map)
# str_parse(a - (b + 1), sym_map)
# str_parse(a + 1 - (b + 1), sym_map)
# str_parse(2 ** a + -b, sym_map)
# str_parse(a + -(b + 1), sym_map)
# str_parse(a + -b - c, sym_map, idx=1)
# str_parse(a + -log(b), sym_map)
# str_parse(a + -log(b + 1), sym_map)
# str_parse(a + -exp(b + 1), sym_map)
# str_parse(a ** (log(b) + 1), sym_map)
# str_parse(a ** -(log(b) + 1), sym_map)
# str_parse(-(b + 1), sym_map)
# str_parse(a ** -1.2, sym_map)
#
# str_parse(a - b, sym_map)
# str_parse(a + -(1 + b), sym_map)
# str_parse(a + c**-(1 + b), sym_map, idx=1)
# str_parse(a + c**log(1 + b), sym_map, idx=1)
# str_parse(log(b + 1), sym_map, idx=1)
[docs]
def get_results(results_file_name="res.lst"):
"""Code for parsing baron results files."""
import re
p = re.compile(r" (\S+)\s+\S+\s+(\S+)")
val_map = {}
with open(results_file_name, "r") as f:
# Advance until the results section
# while not f.readline().startswith(' variable'):
# continue
ready = False
for line in f.readlines():
if not ready:
ready = line.startswith(" variable")
continue
try:
var_name, val = p.search(line).groups()
except AttributeError:
pass
val_map[var_name] = float(val)
return val_map
[docs]
def solve(P, file_name=None, silent=False, cse=False, reuse=None, **options):
"""Solve the problem specified in the input_file with baron."""
from comando.utility import canonical_file_name, check_reuse_or_overwrite, syscall
base_name, file_name = canonical_file_name(P.name, ".bar", file_name)
check_reuse_or_overwrite(file_name, reuse)
for out_name in "ResName", "SumName", "TimName":
if out_name not in options:
options[out_name] = f'"{base_name}.{out_name[:3].lower()}.lst"'
for bool_option in ["results", "summary", "times"]:
if bool_option in options:
options[bool_option] = int(options[bool_option]) # bool to int!
var_map, con_map = write_bar_file(P, file_name, options, cse, reuse)
result_file_name = options["ResName"][1:-1]
log_name = f"{base_name}.baron.log"
ret = syscall("baron", file_name, log_name=log_name, silent=silent)
if ret and not silent:
print(f"BARON terminated with return code {ret}!")
return ret
val_map = get_results(result_file_name)
if not val_map:
return -1 # no results (e.g. problem infeasible, no license)
for comando_var, baron_var in var_map.items():
if comando_var.is_indexed:
comando_var.value = {i: val_map[vi] for i, vi in baron_var.items()}
else:
comando_var.value = val_map[baron_var]
return ret
[docs]
def get_times_and_bounds(baron_log_file):
"""Code for parsing baron logs for time, and bounds."""
import re
p = re.compile(r"[ \*] +\S+ +\S+ +(\S+) +(\S+) +(\S+)")
times = []
lb_data = []
ub_data = []
lines = iter(baron_log_file.split("\n"))
baron_log_file.split("\n")[0].startswith(" Iteration")
for line in lines:
if not line.startswith(" Iteration"):
continue
break
else:
raise RuntimeError
lines = [*lines]
for line in lines:
try:
t, lb, ub = p.search(line).groups()
except AttributeError:
break
times.append(float(t))
lb_data.append(float(lb))
ub_data.append(float(ub))
return times, lb_data, ub_data