"""Routines for translation to GAMS syntax."""
# This file is part of the COMANDO project which is released under the MIT
# license. See file LICENSE for full license details.
#
# AUTHORS: Marco Langiu
import comando.utility
from comando.utility import indexed, split, _str_map, StrParser
gams_str_map = {'LessThan': lambda lhs, rhs: f'{lhs} =L= {rhs}',
'GreaterThan': lambda lhs, rhs: f'{lhs} =G= {rhs}',
'Equality': lambda lhs, rhs: f'{lhs} =E= {rhs}',
'power': lambda base, exponent: f'power({base}, {exponent})',
'exp': lambda arg: f'EXP({arg})',
'log': lambda arg: f'LOG({arg})'}
for key in '()', 'Add', 'Neg', 'Sub', 'Mul', 'Div', 'Inv', 'Pow':
gams_str_map[key] = _str_map[key]
[docs]
def gams_pow_callback(parser, expr, idx):
"""Handle special pow calls in GAMS."""
base, exponent = expr.args
if base == comando.E:
return parser.str_map['exp'](*parser.parse_args((exponent, ), idx))
# NOTE: GAMS provides a special function for integer powers.
# the notation x ** y, is handled as exp(y * log(x))
if exponent.is_Integer and exponent.is_positive:
return parser.str_map['power'](*parser.parse_args((base, exponent, ),
idx))
return None # No special case, handle normally
[docs]
class GamsParser(StrParser): # pylint: disable=too-few-public-methods
"""A class for parsing comando expressions to GAMS Syntax."""
def __init__(self, sym_map):
super().__init__(sym_map, gams_str_map,
pow_callback=gams_pow_callback)
def _entry(name, value, i_name=None, field='', doc=''):
"""Create a string representation of a gams section entry."""
entry = f'\t{name}'
if i_name:
entry += f"({i_name}) {doc}"
if value is not None:
delim = pad = '\n\t'
data = delim.join(f'{idx}{field}\t{v}'
for idx, v in zip(_entry.defs[i_name], value))
entry += f'{pad}/{pad}{data}{pad}/'
else:
entry += f' {doc}'
if value is not None:
delim = ', '
pad = ' '
try: # iterable of values
data = delim.join(str(v) for v in value)
except TypeError: # scalar
data = str(value)
entry += f'{pad}/{pad}{data}{pad}/'
if not hasattr(_entry, 'defs'):
_entry.defs = {}
_entry.defs[name] = value
return entry + '\n'
[docs]
def literal(i):
"""Get a gams representation for the literal index i in parentheses."""
if isinstance(i, tuple): # scenario-timestep pairs
idx = f"'{'_'.join(str(e) for e in i)}'"
else: # only timesteps
idx = f"'{i}'"
return f'({idx})'
GAMS_HEADER = '* GAMS cannot handle too many digits in numbers.\n' \
'* This tells it to ignore those extra digits.\n' '$OFFDIGIT\n\n'
[docs]
def write_sets_section(P):
"""Generate a string representation of a SETS section."""
ts = P.timesteps
if ts is None:
sets = f"SETS\n\n{_entry('s', P.scenarios, doc='scenarios')}"
elif P.scenarios is None:
sets = f"SETS\n\n{_entry('t', ts.keys(), doc='timesteps')}"
else:
sets = f"SETS\n\n{_entry('s', P.scenarios, doc='scenarios')}"
timesteps = ['_'.join(str(i) for i in st) for st in P.index]
sets += f"\n{_entry('t', timesteps, doc='timesteps')}"
return sets + ';'
[docs]
def write_parameters_section(P, iname):
"""Generate a string representation of a PARAMETERS section."""
entries = []
if P.timesteps is not None:
entries.append(_entry('Delta_t', P.timesteps, 't'))
if P.scenarios is not None:
entries.append(_entry('prob', P.scenario_weights, 's'))
param_entries = (*entries,
*(_entry(p.name, p.value, iname) if p.indexed
else _entry(p.name, p.value) for p in P.parameters))
return f"PARAMETERS\n\t\n{''.join(param_entries)};"
[docs]
def write_variables_section(P, iname):
"""Write the different 'VARIABLES' sections and variable bounds."""
var_types = {'BINARY ': [], 'INTEGER ': [], '': []} # last == continuous
all_vars = P.design_variables.union(P.operational_variables)
for v in all_vars: # Sort the variabes according to their domain
if v.is_binary:
var_types['BINARY '].append(v)
elif v.is_integer:
var_types['INTEGER '].append(v)
else:
var_types[''].append(v)
def _var_values(var): # A helper method for variable entries
"""Convert a possibly indexed variable into GAMS representation."""
if var.indexed:
return (v.value, iname)
else:
return (None, None) if var.value is None \
else ([f'L {var.value}'], None)
variables = ""
for prefix, vars in var_types.items():
if vars: # Only print this section if there are variables of this type
# var_entries = (_entry(v.name, *_var_values(v)) for v in vars)
# NOTE: For some odd reason neither of the following works:
# [_entry(v.name, *_var_values(v), '.L') for v in vars]
#
# def var_entries():
# for v in vars:
# yield _entry(v.name, *_var_values(v), '.L')
var_entries = []
for v in vars:
var_entries.append(_entry(v.name, *_var_values(v), '.L'))
variables += f"{prefix}VARIABLES\n\n{''.join(var_entries)};\n\n"
# Variable bounds
def _bounds(v): # A helper method for variable bound entries
"""Return the string to set the bounds of variable v."""
b_types = ['LO', 'UP']
if v.indexed:
# print the default bounds and the indexed ones that are different:
# e.g. for a positive vector foo -> foo(t, s).LO = 0;
res = ' '.join(f'{v.name}.{b_type}({iname}) = {val};'
for b_type, val in zip(b_types, v._bounds)
if val is not None) + '\n'
# if in scenario 's1' foo is bounded by 1 from above this gives:
# foo('s1_t1').UB = 1;
# foo('s1_t2').UB = 1
# ...
for i, iv in v.expansion.items():
data = zip(b_types, iv.bounds, v._bounds)
iv_entry = ' '.join(f'{v.name}.{b_type}{literal(i)} = {val};'
for b_type, val, default in data
if val not in [default, None])
if iv_entry:
res += f'{iv_entry}\n'
return res
return ' '.join(f'{v.name}.{b_type} = {val};' for b_type, val in
zip(b_types, v.bounds) if val is not None) + '\n'
bounds = ''.join(_bounds(v) for v in var_types['INTEGER '] + var_types[''])
if bounds:
variables += f'* BOUNDS\n{bounds}'
return variables
[docs]
def write_equations_section(P, iname, parse):
"""Generate a string representation of a EQUATIONS section."""
def eq(id):
"""Normalize the equation id if it doesn't fit GAMS syntax."""
import re
if re.match(r'^\w+$', id):
eq_id = id
else:
eq_id = f'constraint_{eq.n}'
eq.n += 1
eq.map[eq_id] = id
return eq_id
eq.n = 0
eq.map = {}
cons = {}
for c_id, c in P.constraints.items():
if comando.utility.indexed(c):
cons[f'{eq(c_id)}({iname})'] = c
else:
cons[eq(c_id)] = c
declarations = "\n".join(cons)
if declarations:
equations = f'EQUATIONS\n\n{declarations};\n\n* INSTANTIATIONS\n' + \
"\n".join(f'{c_id}.. {parse(c)};'
for c_id, c in cons.items())
return equations
[docs]
def write_objective(P, parse):
"""Create the string for the objective function of P."""
return parse(P.objective) # simple solution
# explicit flat parsing
# ts = [*P.timesteps.items()]
# if P.scenarios is None:
# oo = ' + '.join(f'{dt} * ({parse(ooe)})' for t, dt in ts)
# else:
# oo = " + ".join(f"""{p} * ({' + '.join(f'{dt} * ({parse(ooe, (s, t))})'
# for t, dt in ts)})"""
# for s, p in P.scenarios.items())
# return f"{do} + {oo}"
# DEBUG:
# S.existing_components.clear()
# S = comando.System('TEST')
# p = S.make_parameter('p')
# q = S.make_parameter('q')
# x = S.make_design_variable('x')
# y = S.make_operational_variable('y')
# S.add_eq_constraint(x ** 2, p, 'dc')
# S.add_eq_constraint(y ** 0.5, -x * q, 'oc')
# P = S.create_problem(x, y, timesteps=[1,2,3])
# P.set_values({'TEST_p': 0.5, 'TEST_q': {1: -42, 2: -15, 3: 0}})
# P.get_data()
# DEBUG
[docs]
def populate_sym_map(iname, dvars=(), ovars=(), pars=(), sym_map=None):
"""Create a GAMS symbol map or populate an existing one with new symbols.
Arguments
---------
dvars : iterable
design variables
ovars : iterable
operational variables
pars : iterable
parameters
Returns
-------
sym_map : dict
Symbol map with GAMS representation for each passed symbol.
"""
from itertools import chain
import re
p = re.compile(r"(\w+)\[\(?'?(\w+)'?(?:, '?(\w+)'?)*\)?\]")
if sym_map is None:
sym_map = {}
dpars, opars = split(pars, indexed)
for sym in chain(dpars, dvars):
sym_map[sym] = sym.name
for sym in chain(opars, ovars):
sym_map[sym] = f'{sym.name}({iname})'
for i, symi in sym.expansion.items():
name, *indices = p.match(symi.name).groups()
sym_map[symi] = f"{name}('{'_'.join(i for i in indices if i)}')" # f'{sym.name}{literal(i)}'
return sym_map
[docs]
def write_gms_file(P, file_name, model_type='MINLP'): # , flat=False):
"""Write a GAMS file based on the COMANDO Problem."""
name = P.name
iname = 't' if P.index.name is None else P.index.name
# Parsing setup
sym_map = populate_sym_map(iname, P.design_variables,
P.operational_variables,
P.parameters)
if P.states:
from comando.utility import implicitEuler
from comando import is_trivial
state_constraints, prev_states = implicitEuler(P)
# P.constraints.update(state_constraints)
# populate_sym_map(ovars=prev_states, pars=[P.Delta_t], sym_map=sym_map)
populate_sym_map(pars=[P.Delta_t], sym_map=sym_map)
for c_id, con in state_constraints.items():
for i, idx in enumerate(P.index):
# BUG: Gives False for some constraints!
# con_i = comando.utility.parse(con, idx=idx)
con_i_lhs = comando.utility.parse(con.lhs, idx=idx)
con_i_rhs = comando.utility.parse(con.rhs, idx=idx)
con_i = comando.Eq(con_i_lhs, con_i_rhs)
c_name = f'{c_id}_{i}'
if is_trivial(con_i):
print(f'WARNING: Constraint {c_name} is trivially '
'satisfied, skipping...')
else:
P.constraints[c_name] = con_i
parse = GamsParser(sym_map)
# OBJECTIVE
# model_type = 'MINLP' # TODO: Automatic selection
# + (f"OPTION {model_type} = {solver};\n" if solver else "")
objective = "VARIABLE OBJ;\nEQUATION OBJ_EXPRESSION;\n" \
f"OBJ_EXPRESSION.. OBJ =E= {write_objective(P, parse)};\n\n" \
f"MODEL {name} /all/;\n" \
f"SOLVE {name} USING {model_type} minimising OBJ;\n"
with open(file_name, 'w') as f:
f.write(GAMS_HEADER)
f.write('\n\n')
f.write(write_sets_section(P))
f.write('\n\n')
f.write(write_parameters_section(P, iname))
f.write('\n\n')
f.write(write_variables_section(P, iname))
f.write('\n\n')
f.write(write_equations_section(P, iname, parse))
f.write('\n\n')
f.write(objective)
# return sym_map
[docs]
def get_results(results_file_name):
"""Code for parsing gams results files."""
import re
p1 = re.compile(r"---- VAR (\S+)\s+\S+\s+(\S+)")
p2 = re.compile(r"---- VAR (\S+)\s+\n")
p3 = re.compile(r"(\S+)\s+\S+\s+(\S+)")
val_map = {}
with open(results_file_name, 'r') as f:
lines = iter(f.readlines())
def skip_header(line):
if line.startswith('\x0cGAMS '):
next(lines)
next(lines)
return next(lines)
return line
def advance(n=1):
for i in range(n):
line = skip_header(next(lines))
return line
for line in lines:
try: # Match scalar variable
var_name, val = p1.search(line).groups()
val_map[var_name] = 0 if val == '.' else float(val)
advance()
continue
except AttributeError:
pass
try: # Match indexed variable
var_name = p2.search(line).group(1)
line = advance(3)
values = {}
while True:
try:
line = skip_header(next(lines))
idx, value = p3.search(line).groups()
values[tuple(idx.split('.'))] = 0 if val == '.' \
else float(value)
except AttributeError:
val_map[var_name] = values
advance()
break
except AttributeError:
pass
if line.startswith('**** REPORT SUMMARY'):
break
return val_map
[docs]
def solve(P, input_file_name=None, silent=False, **options):
"""Solve the problem specified in the input_file using GAMS.
Apart from the usual GAMS options, the model type (e.g. LP/MINLP) may be
specified explixitly using the model_type option, the default is MINLP.
"""
from comando.utility import syscall
import os
if input_file_name is None:
base_name = P.name
input_file_name = f'{P.name}.gms'
elif input_file_name.endswith('.gms'):
base_name = input_file_name[:-4]
else:
base_name = input_file_name
input_file_name = base_name + '.gms'
while os.path.isfile(input_file_name):
yn = input(f"A file '{input_file_name}' already exists, overwrite "
"(y/n)?")
if yn.lower() == 'y':
break
if yn.lower() == 'n':
print('Aborting...')
return 1
model_type = options.pop('model_type', 'MINLP')
write_gms_file(P, input_file_name, model_type)
ret = syscall('gams', input_file_name,
*(f'{option}={value}' for option, value in options.items()),
log_name=f'{base_name}.gams.log', silent=silent)
if ret != 0:
print('Something went wrong during solving!')
else:
val_map = get_results(f'{base_name}.lst')
for v in P.design_variables.union(P.operational_variables):
v.value = val_map[v.name]
return ret
######
# def is_num(x):
# """Test whether x is numeric."""
# return isinstance(x, (int, float))
# def allnums(*args):
# """Test if all arguments passed are numerical."""
# return all(is_num(arg) for arg in args)
#
# def is_neg(x):
# """Test if x is a negated summand and store the result and negation."""
# if x in is_neg.cache:
# return is_neg.cache[x][0]
# try:
# if x[0] == '-':
# is_neg.cache[x] = [True, x[1:]]
# return True
# except TypeError:
# if x < 0:
# is_neg.cache[x] = [True, str(-x)]
# return True
# return False
#
#
# is_neg.cache = {}
#
#
# def gams_sum(*args):
# """Create a string representation of `sum(args)` in GAMS syntax."""
# summands, negs = comando.utility.split(args, is_neg)
# res = ' + '.join(str(s) for s in summands)
# if negs: # negs is nonempty!
# if len(negs) > 1:
# subtrahends = f"[{' + '.join(is_neg.cache[n][1] for n in negs)}]"
# else:
# subtrahends = is_neg.cache[negs[0]][1]
# return f'{res} - {subtrahends}'
# return res
#
#
# def is_inv(x):
# """Test if x is an inverted factor and store the result and inversion."""
# if x in is_inv.cache:
# return is_inv.cache[x][0]
# try:
# if x[:2] == '1/':
# is_inv.cache[x] = [True, x[2:]]
# return True
# except TypeError:
# pass
# return False
#
#
# is_inv.cache = {}
#
#
# def gams_prod(*args):
# """Create a string representation of `product(args)` in GAMS syntax."""
# negate = False
# if args[0] == -1:
# negate = True
# args = args[1:]
# factors, invs = comando.utility.split(args, is_inv)
# res = ' * '.join(f'({f})' if '+' in f else str(f) for f in factors)
# if invs: # invs is nonempty!
# if len(invs) > 1:
# dividends = f"[{' + '.join(is_inv.cache[i][1] for i in invs)}]"
# else:
# dividends = is_inv.cache[invs[0]][1]
# res = f'{res} / {dividends}'
# if negate:
# return f"-[{res}]" if ' ' in res else f'-{res}'
# return res
#
#
# def gams_pow(*args):
# """Create a string representation of `pow(*args)` in GAMS syntax."""
# if args[1] == -1:
# return f'1/[{args[0]}]'
# if args[1] == 2:
# return f'SQR({args[0]})'
# try: # test if the exponent is integer
# if int(args[1]) == args[1]:
# return f"POWER({args[0]}, {args[1]})"
# except ValueError:
# pass
# # real power (equivalent to EXP[n*LOG(x)])
# return f"RPOWER({args[0]}, {args[1]})"
#
#
# gams_map = {
# 'Add': gams_sum,
# 'Mul': gams_prod,
# 'Pow': gams_pow,
# 'LessThan': lambda x, y: f"{x} =L= {y}",
# 'GreaterThan': lambda x, y: f"{x} =G= {y}",
# 'Equality': lambda x, y: f"{x} =E= {y}"}
#
#
# def experimental():
# from gams import GamsWorkspace
# import os
# import sys
# cwd = os.getcwd()
# filename = 'test_if_else.gms'
# ws = GamsWorkspace() # system_directory='/Applications/GAMS25.0/sysdir/')
# filename = "Voll.gms"
# t1 = ws.add_job_from_file(os.path.join(cwd, filename))
# t1.run(output=sys.stdout)
#
# for o in t1.out_db:
# print(o.name)
# for record in o:
# print('\t', record)
# # try:
# #
# # # if record.keys:
# # # keys = ", ".join(str(key) for key in record.keys)
# # # print(f'\n\t({keys}): level = {record.level}, '
# # # f'marginal = {record.marginal}')
# # # else:
# # # print(f'\n\tlevel = {record.level}, '
# # # f'marginal = {record.marginal}')
# # except:
# # for record in o:
# # print('\t', record)