Source code for comando.interfaces.gams

"""Routines for translation to GAMS syntax."""

# This file is part of the COMANDO project which is released under the MIT
# license. See file LICENSE for full license details.
#
# AUTHORS: Marco Langiu
import comando.utility
from comando.utility import StrParser, _str_map, is_indexed, split

gams_str_map = {
    "LessThan": lambda lhs, rhs: f"{lhs} =L= {rhs}",
    "GreaterThan": lambda lhs, rhs: f"{lhs} =G= {rhs}",
    "Equality": lambda lhs, rhs: f"{lhs} =E= {rhs}",
    "power": lambda base, exponent: f"power({base}, {exponent})",
    "exp": lambda arg: f"EXP({arg})",
    "log": lambda arg: f"LOG({arg})",
}
for key in "()", "Add", "Neg", "Sub", "Mul", "Div", "Inv", "Pow":
    gams_str_map[key] = _str_map[key]


[docs] def gams_pow_callback(parser, expr, idx): """Handle special pow calls in GAMS.""" base, exponent = expr.args if base == comando.E: return parser.str_map["exp"](*parser.parse_args((exponent,), idx)) # NOTE: GAMS provides a special function for integer powers. # the notation x ** y, is handled as exp(y * log(x)) if exponent.is_Integer and exponent.is_positive: return parser.str_map["power"]( *parser.parse_args( ( base, exponent, ), idx, ) ) return None # No special case, handle normally
[docs] class GamsParser(StrParser): # pylint: disable=too-few-public-methods """A class for parsing comando expressions to GAMS Syntax.""" def __init__(self, sym_map): super().__init__(sym_map, gams_str_map, pow_callback=gams_pow_callback)
def _entry(name, value, i_name=None, field="", doc=""): """Create a string representation of a gams section entry.""" entry = f"\t{name}" if i_name: entry += f"({i_name}) {doc}" if value is not None: delim = pad = "\n\t" data = delim.join( f"{idx}{field}\t{v}" for idx, v in zip(_entry.defs[i_name], value) ) entry += f"{pad}/{pad}{data}{pad}/" else: entry += f" {doc}" if value is not None: delim = ", " pad = " " try: # iterable of values data = delim.join(str(v) for v in value) except TypeError: # scalar data = str(value) entry += f"{pad}/{pad}{data}{pad}/" if not hasattr(_entry, "defs"): _entry.defs = {} _entry.defs[name] = value return entry + "\n"
[docs] def literal(i): """Get a gams representation for the literal index i in parentheses.""" if isinstance(i, tuple): # scenario-timestep pairs idx = f"'{'_'.join(str(e) for e in i)}'" else: # only timesteps idx = f"'{i}'" return f"({idx})"
GAMS_HEADER = ( "* GAMS cannot handle too many digits in numbers.\n" "* This tells it to ignore those extra digits.\n" "$OFFDIGIT\n\n" )
[docs] def write_sets_section(P): """Generate a string representation of a SETS section.""" ts = P.timesteps if ts is None: sets = f"SETS\n\n{_entry('s', P.scenarios, doc='scenarios')}" elif P.scenarios is None: sets = f"SETS\n\n{_entry('t', ts.keys(), doc='timesteps')}" else: sets = f"SETS\n\n{_entry('s', P.scenarios, doc='scenarios')}" timesteps = ["_".join(str(i) for i in st) for st in P.index] sets += f"\n{_entry('t', timesteps, doc='timesteps')}" return sets + ";"
[docs] def write_parameters_section(P, iname): """Generate a string representation of a PARAMETERS section.""" entries = [] if P.timesteps is not None: entries.append(_entry("Delta_t", P.timesteps, "t")) if P.scenarios is not None: entries.append(_entry("prob", P.scenario_weights, "s")) param_entries = ( *entries, *( _entry(p.name, p.value, iname) if p.is_indexed else _entry(p.name, p.value) for p in P.parameters ), ) return f"PARAMETERS\n\t\n{''.join(param_entries)};"
[docs] def write_variables_section(P, iname): """Write the different 'VARIABLES' sections and variable bounds.""" var_types = {"BINARY ": [], "INTEGER ": [], "": []} # last == continuous all_vars = P.design_variables.union(P.operational_variables) for v in all_vars: # Sort the variabes according to their domain if v.is_binary: var_types["BINARY "].append(v) elif v.is_integer: var_types["INTEGER "].append(v) else: var_types[""].append(v) def _var_values(var): # A helper method for variable entries """Convert a possibly indexed variable into GAMS representation.""" if var.is_indexed: return (v.value, iname) else: return (None, None) if var.value is None else ([f"L {var.value}"], None) variables = "" for prefix, vars in var_types.items(): if vars: # Only print this section if there are variables of this type # var_entries = (_entry(v.name, *_var_values(v)) for v in vars) # NOTE: For some odd reason neither of the following works: # [_entry(v.name, *_var_values(v), '.L') for v in vars] # # def var_entries(): # for v in vars: # yield _entry(v.name, *_var_values(v), '.L') var_entries = [] for v in vars: var_entries.append(_entry(v.name, *_var_values(v), ".L")) variables += f"{prefix}VARIABLES\n\n{''.join(var_entries)};\n\n" # Variable bounds def _bounds(v): # A helper method for variable bound entries """Return the string to set the bounds of variable v.""" b_types = ["LO", "UP"] if v.is_indexed: # print the default bounds and the indexed ones that are different: # e.g. for a positive vector foo -> foo(t, s).LO = 0; res = ( " ".join( f"{v.name}.{b_type}({iname}) = {val};" for b_type, val in zip(b_types, v._bounds) if val is not None ) + "\n" ) # if in scenario 's1' foo is bounded by 1 from above this gives: # foo('s1_t1').UB = 1; # foo('s1_t2').UB = 1 # ... for i, iv in v.expansion.items(): data = zip(b_types, iv.bounds, v._bounds) iv_entry = " ".join( f"{v.name}.{b_type}{literal(i)} = {val};" for b_type, val, default in data if val not in [default, None] ) if iv_entry: res += f"{iv_entry}\n" return res return ( " ".join( f"{v.name}.{b_type} = {val};" for b_type, val in zip(b_types, v.bounds) if val is not None ) + "\n" ) bounds = "".join(_bounds(v) for v in var_types["INTEGER "] + var_types[""]) if bounds: variables += f"* BOUNDS\n{bounds}" return variables
[docs] def write_equations_section(P, iname, parse): """Generate a string representation of a EQUATIONS section.""" def eq(id): """Normalize the equation id if it doesn't fit GAMS syntax.""" import re if re.match(r"^\w+$", id): eq_id = id else: eq_id = f"constraint_{eq.n}" eq.n += 1 eq.map[eq_id] = id return eq_id eq.n = 0 eq.map = {} cons = {} for c_id, c in P.constraints.items(): if comando.utility.is_indexed(c): cons[f"{eq(c_id)}({iname})"] = c else: cons[eq(c_id)] = c declarations = "\n".join(cons) if declarations: equations = f"EQUATIONS\n\n{declarations};\n\n* INSTANTIATIONS\n" + "\n".join( f"{c_id}.. {parse(c)};" for c_id, c in cons.items() ) return equations
[docs] def write_objective(P, parse): """Create the string for the objective function of P.""" return parse(P.objective) # simple solution
# explicit flat parsing # ts = [*P.timesteps.items()] # if P.scenarios is None: # oo = ' + '.join(f'{dt} * ({parse(ooe)})' for t, dt in ts) # else: # oo = " + ".join(f"""{p} * ({' + '.join(f'{dt} * ({parse(ooe, (s, t))})' # for t, dt in ts)})""" # for s, p in P.scenarios.items()) # return f"{do} + {oo}" # DEBUG: # S.existing_components.clear() # S = comando.System('TEST') # p = S.make_parameter('p') # q = S.make_parameter('q') # x = S.make_design_variable('x') # y = S.make_operational_variable('y') # S.add_eq_constraint(x ** 2, p, 'dc') # S.add_eq_constraint(y ** 0.5, -x * q, 'oc') # P = S.create_problem(x, y, timesteps=[1,2,3]) # P.set_values({'TEST_p': 0.5, 'TEST_q': {1: -42, 2: -15, 3: 0}}) # P.get_data() # DEBUG
[docs] def populate_sym_map(iname, dvars=(), ovars=(), pars=(), sym_map=None): """Create a GAMS symbol map or populate an existing one with new symbols. Arguments --------- dvars : iterable design variables ovars : iterable operational variables pars : iterable parameters Returns ------- sym_map : dict Symbol map with GAMS representation for each passed symbol. """ import re from itertools import chain p = re.compile(r"(\w+)\[\(?'?(\w+)'?(?:, '?(\w+)'?)*\)?\]") if sym_map is None: sym_map = {} dpars, opars = split(pars, is_indexed) for sym in chain(dpars, dvars): sym_map[sym] = sym.name for sym in chain(opars, ovars): sym_map[sym] = f"{sym.name}({iname})" for i, symi in sym.expansion.items(): name, *indices = p.match(symi.name).groups() sym_map[symi] = ( f"{name}('{'_'.join(i for i in indices if i)}')" # f'{sym.name}{literal(i)}' ) return sym_map
[docs] def write_gms_file(P, file_name, model_type="MINLP"): # , flat=False): """Write a GAMS file based on the COMANDO Problem.""" name = P.name iname = "t" if P.index.name is None else P.index.name # Parsing setup sym_map = populate_sym_map( iname, P.design_variables, P.operational_variables, P.parameters ) if P.states: from warnings import warn msg = ( "The handling of dynamic constraints in the GAMS interface " "is outdated; some features of COMANDO may not work as expected." ) warn(msg) from comando import is_trivial from comando.utility import implicitEuler state_constraints, prev_states = implicitEuler(P) # P.constraints.update(state_constraints) # populate_sym_map(ovars=prev_states, pars=[P.Delta_t], sym_map=sym_map) populate_sym_map(pars=[P.Delta_t], sym_map=sym_map) for c_id, con in state_constraints.items(): for i, idx in enumerate(P.index): # BUG: Gives False for some constraints! # con_i = comando.utility.parse(con, idx=idx) con_i_lhs = comando.utility.parse(con.lhs, idx=idx) con_i_rhs = comando.utility.parse(con.rhs, idx=idx) con_i = comando.Eq(con_i_lhs, con_i_rhs) c_name = f"{c_id}_{i}" if is_trivial(con_i): print( f"WARNING: Constraint {c_name} is trivially " "satisfied, skipping..." ) else: P.constraints[c_name] = con_i parse = GamsParser(sym_map) # OBJECTIVE # model_type = 'MINLP' # TODO: Automatic selection # + (f"OPTION {model_type} = {solver};\n" if solver else "") objective = ( "VARIABLE OBJ;\nEQUATION OBJ_EXPRESSION;\n" f"OBJ_EXPRESSION.. OBJ =E= {write_objective(P, parse)};\n\n" f"MODEL {name} /all/;\n" f"SOLVE {name} USING {model_type} minimising OBJ;\n" ) with open(file_name, "w") as f: f.write(GAMS_HEADER) f.write("\n\n") f.write(write_sets_section(P)) f.write("\n\n") f.write(write_parameters_section(P, iname)) f.write("\n\n") f.write(write_variables_section(P, iname)) f.write("\n\n") f.write(write_equations_section(P, iname, parse)) f.write("\n\n") f.write(objective)
# return sym_map
[docs] def get_results(results_file_name): """Code for parsing gams results files.""" import re p1 = re.compile(r"---- VAR (\S+)\s+\S+\s+(\S+)") p2 = re.compile(r"---- VAR (\S+)\s+\n") p3 = re.compile(r"(\S+)\s+\S+\s+(\S+)") val_map = {} with open(results_file_name, "r") as f: lines = iter(f.readlines()) def skip_header(line): if line.startswith("\x0cGAMS "): next(lines) next(lines) return next(lines) return line def advance(n=1): for i in range(n): line = skip_header(next(lines)) return line for line in lines: try: # Match scalar variable var_name, val = p1.search(line).groups() val_map[var_name] = 0 if val == "." else float(val) advance() continue except AttributeError: pass try: # Match indexed variable var_name = p2.search(line).group(1) line = advance(3) values = {} while True: try: line = skip_header(next(lines)) idx, value = p3.search(line).groups() values[tuple(idx.split("."))] = ( 0 if val == "." else float(value) ) except AttributeError: val_map[var_name] = values advance() break except AttributeError: pass if line.startswith("**** REPORT SUMMARY"): break return val_map
[docs] def solve(P, input_file_name=None, silent=False, **options): """Solve the problem specified in the input_file using GAMS. Apart from the usual GAMS options, the model type (e.g. LP/MINLP) may be specified explixitly using the model_type option, the default is MINLP. """ import os from comando.utility import syscall if input_file_name is None: base_name = P.name input_file_name = f"{P.name}.gms" elif input_file_name.endswith(".gms"): base_name = input_file_name[:-4] else: base_name = input_file_name input_file_name = base_name + ".gms" while os.path.isfile(input_file_name): yn = input(f"A file '{input_file_name}' already exists, overwrite (y/n)?") if yn.lower() == "y": break if yn.lower() == "n": print("Aborting...") return 1 model_type = options.pop("model_type", "MINLP") write_gms_file(P, input_file_name, model_type) ret = syscall( "gams", input_file_name, *(f"{option}={value}" for option, value in options.items()), log_name=f"{base_name}.gams.log", silent=silent, ) if ret != 0: print("Something went wrong during solving!") else: val_map = get_results(f"{base_name}.lst") for v in P.design_variables.union(P.operational_variables): v.value = val_map[v.name] return ret
###### # def is_num(x): # """Test whether x is numeric.""" # return isinstance(x, (int, float)) # def allnums(*args): # """Test if all arguments passed are numerical.""" # return all(is_num(arg) for arg in args) # # def is_neg(x): # """Test if x is a negated summand and store the result and negation.""" # if x in is_neg.cache: # return is_neg.cache[x][0] # try: # if x[0] == '-': # is_neg.cache[x] = [True, x[1:]] # return True # except TypeError: # if x < 0: # is_neg.cache[x] = [True, str(-x)] # return True # return False # # # is_neg.cache = {} # # # def gams_sum(*args): # """Create a string representation of `sum(args)` in GAMS syntax.""" # summands, negs = comando.utility.split(args, is_neg) # res = ' + '.join(str(s) for s in summands) # if negs: # negs is nonempty! # if len(negs) > 1: # subtrahends = f"[{' + '.join(is_neg.cache[n][1] for n in negs)}]" # else: # subtrahends = is_neg.cache[negs[0]][1] # return f'{res} - {subtrahends}' # return res # # # def is_inv(x): # """Test if x is an inverted factor and store the result and inversion.""" # if x in is_inv.cache: # return is_inv.cache[x][0] # try: # if x[:2] == '1/': # is_inv.cache[x] = [True, x[2:]] # return True # except TypeError: # pass # return False # # # is_inv.cache = {} # # # def gams_prod(*args): # """Create a string representation of `product(args)` in GAMS syntax.""" # negate = False # if args[0] == -1: # negate = True # args = args[1:] # factors, invs = comando.utility.split(args, is_inv) # res = ' * '.join(f'({f})' if '+' in f else str(f) for f in factors) # if invs: # invs is nonempty! # if len(invs) > 1: # dividends = f"[{' + '.join(is_inv.cache[i][1] for i in invs)}]" # else: # dividends = is_inv.cache[invs[0]][1] # res = f'{res} / {dividends}' # if negate: # return f"-[{res}]" if ' ' in res else f'-{res}' # return res # # # def gams_pow(*args): # """Create a string representation of `pow(*args)` in GAMS syntax.""" # if args[1] == -1: # return f'1/[{args[0]}]' # if args[1] == 2: # return f'SQR({args[0]})' # try: # test if the exponent is integer # if int(args[1]) == args[1]: # return f"POWER({args[0]}, {args[1]})" # except ValueError: # pass # # real power (equivalent to EXP[n*LOG(x)]) # return f"RPOWER({args[0]}, {args[1]})" # # # gams_map = { # 'Add': gams_sum, # 'Mul': gams_prod, # 'Pow': gams_pow, # 'LessThan': lambda x, y: f"{x} =L= {y}", # 'GreaterThan': lambda x, y: f"{x} =G= {y}", # 'Equality': lambda x, y: f"{x} =E= {y}"} # # # def experimental(): # from gams import GamsWorkspace # import os # import sys # cwd = os.getcwd() # filename = 'test_if_else.gms' # ws = GamsWorkspace() # system_directory='/Applications/GAMS25.0/sysdir/') # filename = "Voll.gms" # t1 = ws.add_job_from_file(os.path.join(cwd, filename)) # t1.run(output=sys.stdout) # # for o in t1.out_db: # print(o.name) # for record in o: # print('\t', record) # # try: # # # # # if record.keys: # # # keys = ", ".join(str(key) for key in record.keys) # # # print(f'\n\t({keys}): level = {record.level}, ' # # # f'marginal = {record.marginal}') # # # else: # # # print(f'\n\tlevel = {record.level}, ' # # # f'marginal = {record.marginal}') # # except: # # for record in o: # # print('\t', record)