From 34800924a1f5060025376a50f6c1690a9d81f318 Mon Sep 17 00:00:00 2001 From: Lance-Drane Date: Thu, 13 Nov 2025 21:58:43 -0500 Subject: [PATCH 1/3] #230 - add fallback bridge component to log events without portal Signed-off-by: Lance-Drane --- ipsframework/bridges/__init__.py | 6 + ipsframework/bridges/basic_bridge.py | 119 ++++++ ipsframework/bridges/local_event_logger.py | 344 ++++++++++++++++ .../portal_bridge.py} | 366 ++---------------- ipsframework/configurationManager.py | 71 ++-- ipsframework/services.py | 26 +- 6 files changed, 552 insertions(+), 380 deletions(-) create mode 100644 ipsframework/bridges/__init__.py create mode 100644 ipsframework/bridges/basic_bridge.py create mode 100644 ipsframework/bridges/local_event_logger.py rename ipsframework/{portalBridge.py => bridges/portal_bridge.py} (51%) diff --git a/ipsframework/bridges/__init__.py b/ipsframework/bridges/__init__.py new file mode 100644 index 00000000..9df9d819 --- /dev/null +++ b/ipsframework/bridges/__init__.py @@ -0,0 +1,6 @@ +"""Bridges are components which handle supplementary tasks in the IPS Framework. Applications should not need to import these classes directly. + +There are two bridges available: +- `BasicBridge`, which provides simple system logging +- `PortalBridge`, which provides all functionality of `BasicBridge` and additionally allows interfacing with a remote IPS Portal +""" diff --git a/ipsframework/bridges/basic_bridge.py b/ipsframework/bridges/basic_bridge.py new file mode 100644 index 00000000..1b5f5756 --- /dev/null +++ b/ipsframework/bridges/basic_bridge.py @@ -0,0 +1,119 @@ +# ------------------------------------------------------------------------------- +# Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information. +# ------------------------------------------------------------------------------- +import hashlib +import time +from typing import Literal + +from ipsframework import Component +from ipsframework.bridges.local_event_logger import LocalEventLogger, SimulationData +from ipsframework.cca_es_spec import Event + + +class BasicBridge(Component): + """ + Framework component meant to handle simple event logging. + + This component should not exist in the event that the simulation is interacting with the web framework - see the PortalBridge component instead. + """ + + def __init__(self, services, config): + """ + Declaration of private variables and initialization of + :py:class:`component.Component` object. + """ + super().__init__(services, config) + self.sim_map: dict[str, SimulationData] = {} + self.done = False + self.local_event_logger = LocalEventLogger() + + def init(self, timestamp=0.0, **keywords): + """ + Subscribe to *_IPS_MONITOR* events and register callback :py:meth:`.process_event`. + """ + self.services.subscribe('_IPS_MONITOR', 'process_event') + self.local_event_logger.init(self.services) + + def step(self, timestamp=0.0, **keywords): + """ + Poll for events. + """ + while not self.done: + self.services.process_events() + time.sleep(0.5) + + def finalize(self, timestamp=0.0, **keywords): + self.local_event_logger.finalize(self.sim_map) + + def process_event(self, topicName: str, theEvent: Event): + """ + Process a single event *theEvent* on topic *topicName*. 
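+
+        Events carry a ``portal_data`` payload keyed by ``eventtype``:
+        ``IPS_START`` registers the simulation via :py:meth:`.init_simulation`,
+        ``PORTALBRIDGE_UPDATE_TIMESTAMP`` updates the stored physics timestamp,
+        and portal-only events (notebook registration, Jupyter data, and
+        ensemble uploads) are ignored. Remaining events are written to the
+        local event log; ``IPS_END`` removes the simulation, and the component
+        exits once no simulations remain.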
+ """ + event_body = theEvent.getBody() + sim_name = event_body['sim_name'] + portal_data = event_body['portal_data'] + try: + portal_data['sim_name'] = event_body['real_sim_name'] + except KeyError: + portal_data['sim_name'] = sim_name + + if portal_data['eventtype'] == 'IPS_START': + sim_root = event_body['sim_root'] + self.init_simulation(sim_name, sim_root) + + sim_data = self.sim_map[sim_name] + if portal_data['eventtype'] == 'PORTALBRIDGE_UPDATE_TIMESTAMP': + sim_data.phys_time_stamp = portal_data['phystimestamp'] + return + else: + portal_data['phystimestamp'] = sim_data.phys_time_stamp + + if portal_data['eventtype'] == 'PORTAL_REGISTER_NOTEBOOK': + return + + if portal_data['eventtype'] == 'PORTAL_ADD_JUPYTER_DATA': + return + + if portal_data['eventtype'] == 'PORTAL_UPLOAD_ENSEMBLE_PARAMS': + return + + portal_data['portal_runid'] = sim_data.portal_runid + + if portal_data['eventtype'] == 'IPS_SET_MONITOR_URL': + sim_data.monitor_url = portal_data['vizurl'] + elif sim_data.monitor_url: + portal_data['vizurl'] = sim_data.monitor_url + + if portal_data['eventtype'] == 'IPS_START' and 'parent_portal_runid' not in portal_data: + portal_data['parent_portal_runid'] = sim_data.parent_portal_runid + portal_data['seqnum'] = sim_data.counter + + if 'trace' in portal_data: + portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest() + + self.local_event_logger.send_event(self.services, sim_data, portal_data) + + if portal_data['eventtype'] == 'IPS_END': + del self.sim_map[sim_name] + + if len(self.sim_map) == 0: + self.done = True + self.services.debug('No more simulation to monitor - exiting') + time.sleep(1) + + def init_simulation(self, sim_name: str, sim_root: str): + """ + Create and send information about simulation *sim_name* living in + *sim_root* so the portal can set up corresponding structures to manage + data from the sim. + """ + self.services.debug('Initializing simulation using BasicBridge: %s -- %s ', sim_name, sim_root) + sim_data = self.local_event_logger.init_simulation(self.services, sim_name, sim_root, self.HOST, self.USER) + self.sim_map[sim_data.sim_name] = sim_data + + def terminate(self, status: Literal[0, 1]): + """ + Clean up services and call :py:obj:`sys_exit`. + """ + + Component.terminate(self, status) diff --git a/ipsframework/bridges/local_event_logger.py b/ipsframework/bridges/local_event_logger.py new file mode 100644 index 00000000..ff35de45 --- /dev/null +++ b/ipsframework/bridges/local_event_logger.py @@ -0,0 +1,344 @@ +import datetime +import glob +import hashlib +import itertools +import json +import os +import re +import time +from collections import defaultdict +from typing import TYPE_CHECKING, Any + +from ipsframework import ipsutil +from ipsframework.convert_log_function import convert_logdata_to_html +from ipsframework.services import ServicesProxy + +if TYPE_CHECKING: + from io import FileIO + + +def hash_file(file_name): # pragma: no cover + """ + Return the MD5 hash of a file + :rtype: str + :param file_name: Full path to file + :return: MD5 of file_name + """ + BLOCKSIZE = 65536 + hasher = hashlib.md5() + with open(file_name, 'rb') as afile: + buf = afile.read(BLOCKSIZE) + while len(buf) > 0: + hasher.update(buf) + buf = afile.read(BLOCKSIZE) + return hasher.hexdigest() + + +class SimulationData: + """ + Container for simulation data. 
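+
+    Each instance tracks one simulation's bookkeeping: its name and root
+    directory, the portal run id (and parent run id), an event counter, the
+    current physics timestamp, the monitor URL, and open handles to the
+    plain-text event log and JSONL monitor files.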
+ """ + + def __init__(self): + self.counter = 0 + self.monitor_file_name = '' + self.portal_runid = '' + self.parent_portal_runid = '' + self.sim_name = '' + self.sim_root = '' + self.monitor_file: FileIO = None # type: ignore + self.json_monitor_file: FileIO = None # type: ignore + self.phys_time_stamp = -1 + self.monitor_url = '' + self.mpo_steps = [None] + self.mpo_wid = None + self.bigbuf = '' + + +class LocalEventLogger: + """The LocalEventLogger class manages event logging for the supplemental bridge components.""" + + def __init__(self) -> None: + # self.curTime = time.localtime() + # self.startTime = self.curTime + self.mpo = None + self.mpo_name_counter: defaultdict[str, int] = defaultdict(lambda: 0) + self.counter = 0 + self.dump_freq = 10 + self.min_dump_interval = 300 # Minimum time interval in Sec for HTML dump operation + self.last_dump_time = time.time() + self.write_to_htmldir = True + self.html_dir = '' + self.first_portal_runid = None + + def init(self, services: ServicesProxy) -> None: + """Called from the bridge component's `init` function.""" + try: + freq = int(services.get_config_param('HTML_DUMP_FREQ', silent=True)) + except Exception: + pass + else: + self.dump_freq = freq + + try: + self.html_dir = services.get_config_param('USER_W3_DIR', silent=True) or '' + except Exception: + services.warning('Missing USER_W3_DIR configuration - disabling web-visible logging') + self.write_to_htmldir = False + else: + if self.html_dir.strip() == '': + services.warning('Empty USER_W3_DIR configuration - disabling web-visible logging') + self.write_to_htmldir = False + else: + try: + os.mkdir(self.html_dir) + except FileExistsError: + pass + except Exception: + services.warning('Unable to create HTML directory - disabling web-visible logging') + self.write_to_htmldir = False + + def finalize(self, sim_map: dict[str, SimulationData]) -> None: + for sim_data in sim_map.values(): + try: + sim_data.monitor_file.close() + sim_data.json_monitor_file.close() + except Exception: + pass + + def init_simulation(self, services: ServicesProxy, sim_name: str, sim_root: str, hostname: str, username: str) -> SimulationData: + """ + Create and send information about simulation *sim_name* living in + *sim_root* so the portal can set up corresponding structures to manage + data from the sim. 
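+        On success the new :py:class:`SimulationData` record is returned with
+        its portal run id assigned and its ``.eventlog`` and ``.jsonl``
+        monitor files opened under ``<sim_root>/simulation_log``.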
+ + :returns: + """ + sim_data = SimulationData() + sim_data.sim_name = sim_name + sim_data.sim_root = sim_root + + d = datetime.datetime.now() + date_str = '%s.%03d' % (d.strftime('%Y-%m-%dT%H:%M:%S'), int(d.microsecond / 1000)) + sim_data.portal_runid = f'{sim_name}_{hostname}_{username}_{date_str}' + try: + services.set_config_param('PORTAL_RUNID', sim_data.portal_runid, target_sim_name=sim_name) + except Exception: + services.error('Simulation %s is not accessible', sim_name) + return + + if self.first_portal_runid: + sim_data.parent_portal_runid = self.first_portal_runid + else: + self.first_portal_runid = sim_data.portal_runid + + if sim_data.sim_root.strip() == '.': + sim_data.sim_root = os.environ['IPS_INITIAL_CWD'] + sim_log_dir = os.path.join(sim_data.sim_root, 'simulation_log') + try: + os.makedirs(sim_log_dir, exist_ok=True) + except OSError as oserr: + services.exception('Error creating Simulation Log directory %s : %d %s' % (sim_log_dir, oserr.errno, oserr.strerror)) + raise + + sim_data.monitor_file_name = os.path.join(sim_log_dir, sim_data.portal_runid + '.eventlog') + try: + sim_data.monitor_file = open(sim_data.monitor_file_name, 'wb', 0) + except IOError as oserr: + services.error('Error opening file %s: error(%s): %s' % (sim_data.monitor_file_name, oserr.errno, oserr.strerror)) + services.error('Using /dev/null instead') + sim_data.monitor_file = open('/dev/null', 'w') + json_fname = sim_data.monitor_file_name.replace('eventlog', 'jsonl') + sim_data.json_monitor_file = open(json_fname, 'w') + + if self.mpo: # pragma: no cover + try: + sim_data.mpo_wid = self.mpo.init(name='SWIM Workflow ' + os.environ['USER'], desc=sim_data.sim_name, wtype='SWIM') + print('sim_data.mpo_wid = ', sim_data.mpo_wid) + except Exception as e: + print(e) + print('sim_data.mpo_wid = ', sim_data.mpo_wid) + sim_data.mpo_wid = None + else: + sim_data.mpo_steps = [sim_data.mpo_wid['uid']] + + return sim_data + + def send_event(self, services: ServicesProxy, sim_data: SimulationData, event_data: dict[str, Any]): + """ + Send contents of *event_data* and *sim_data* to portal. 
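+
+        In this local logger the event is appended to the simulation's
+        ``.eventlog`` and JSONL monitor files; the accumulated log is also
+        rendered to HTML periodically and on ``IPS_END``, and copied into
+        ``USER_W3_DIR`` when web-visible logging is enabled.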
+ """ + timestamp = ipsutil.getTimeString() + buf = '%8d %s ' % (sim_data.counter, timestamp) + for k, v in event_data.items(): + if len(str(v).strip()) == 0: + continue + if ' ' in str(v): + buf += "%s='%s' " % (k, str(v)) + else: + buf += '%s=%s ' % (k, str(v)) + buf += '\n' + sim_data.monitor_file.write(bytes(buf, encoding='UTF-8')) + sim_data.bigbuf += buf + + buf = json.dumps(event_data) + sim_data.json_monitor_file.write('%s\n' % buf) + + freq = self.dump_freq + if ((self.counter % freq == 0) and (time.time() - self.last_dump_time > self.min_dump_interval)) or (event_data['eventtype'] == 'IPS_END'): + self.last_dump_time = time.time() + html_filename = sim_data.monitor_file_name.replace('eventlog', 'html') + html_page = convert_logdata_to_html(sim_data.bigbuf) + open(html_filename, 'w').writelines(html_page) + if self.write_to_htmldir: + html_file = os.path.join(self.html_dir, os.path.basename(html_filename)) + try: + open(html_file, 'w').writelines(html_page) + except Exception: + services.exception('Error writing html file into USER_W3_DIR directory') + self.write_to_htmldir = False + + if sim_data.mpo_wid: + self.send_mpo_data(event_data, sim_data) + + def send_mpo_data(self, event_data, sim_data: SimulationData): # pragma: no cover + def md5(fname): + "Courtesy of stackoverflow 3431825" + hash_md5 = hashlib.md5() + with open(fname, 'rb') as f: + for chunk in iter(lambda: f.read(4096), b''): + hash_md5.update(chunk) + return hash_md5.hexdigest() + + def mpo_add_file(workflow, parent, file, shortname='Need a name', longdesc='did not add a description.'): + """Add a local file to the workflow attaching to parent. Calculate + checksum and if the file is already in the mpo database, use the + already the UID of the already existing file when adding the data + object - this creates a linkage to the original. The checksum and + local file path and name are added as metadata. + + This function relies on the user space metadata, ips_checksum + and ips_filename. The checksum is the md5 sum and the filename + is expected should have at least a relative qualifying path. + + workflow : workflow_id + parent : parent_id + """ + # if file exist, look for its checksum in the database + try: + checksum = md5(file) + except Exception: + print(('checksum could not find file:', file)) + raise + + is_checksum = self.mpo.search('metadata', params={'key': 'ips_checksum', 'value': checksum}) + # search always returns a list of dictionaries + # if checksum exists, use first dataobject that has it + # api search results are sorted by time + # Note, check this with eqdsk dataobject in test-api + print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%') + print(len(is_checksum), file) + print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%') + + if len(is_checksum) > 0: + # uid is chosen to be first occurrence + # parent_uid is uid of object metadata is attached to. 
+ file_uid = is_checksum[0]['parent_uid'] + + # Create dataobject reference by uid in the workflow + dataobject = self.mpo.add(workflow, parent, uid=file_uid, name=shortname, desc=longdesc) + self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum) + # add filename metadata the dataobject reference + self.mpo.meta(dataobject['uid'], 'ips_filename', file) + else: + print(('file', file)) + file_uri = file + # Create new dataobject by uri and insert reference in to workflow + dataobject = self.mpo.add(workflow, parent, uri=file_uri, name=shortname, desc=longdesc) + # add checksum metadata to original data object + # add function currently only returns uri field, so fetch full record + full_dataobject = self.mpo.search('dataobject/' + dataobject['uid'])[0] + # add checksum so dataobject and also + self.mpo.meta(full_dataobject['do_uid'], 'ips_checksum', checksum) + self.mpo.meta(dataobject['uid'], 'ips_filename', file) + self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum) + dataobject = full_dataobject + return dataobject + + recordable_events = ['IPS_CALL_BEGIN', 'IPS_STAGE_INPUTS', 'IPS_STAGE_OUTPUTS', 'IPS_CALL_END'] + recordable_mpo_activities = ['IPS_CALL_BEGIN'] + comment = event_data['comment'] + event_type = event_data['eventtype'] + + if event_type not in recordable_events: + return + inp_objs = [] + if event_type == 'IPS_CALL_END': + del sim_data.mpo_steps[-1] + return + try: + if event_type == 'IPS_STAGE_INPUTS': + r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)') + o = r.match(comment) + (_, path, files) = o.groups() + glist = [glob.glob(os.path.join(path, f)) for f in files.split()] + for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]: + mpo_data_obj = mpo_add_file( + sim_data.mpo_wid, sim_data.mpo_wid['uid'], os.path.join(path, file_name), shortname=file_name, longdesc='An input file' + ) + inp_objs.append(mpo_data_obj['uid']) + + if event_type == 'IPS_STAGE_INPUTS' and not inp_objs: + return + + count = self.mpo_name_counter[sim_data.sim_name + event_data['code']] + if event_type == 'IPS_CALL_BEGIN': + target = event_data['comment'].split()[-1] + step_name = '%s %d' % (target, count) + else: + step_name = '{0:s} {1:s} {2:d}'.format(event_data['code'].split('_')[-1], event_type, count) + + if event_type == 'IPS_STAGE_OUTPUTS': + r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)') + o = r.match(comment) + (_, path, files) = o.groups() + if not files: + return + activity = self.mpo.step( + workflow_ID=sim_data.mpo_wid, parentobj_ID=sim_data.mpo_steps[-1], input_objs=inp_objs, name=step_name, desc='%s' % event_data['comment'] + ) + self.mpo_name_counter[sim_data.sim_name + event_data['code']] += 1 + if event_type == 'IPS_STAGE_OUTPUTS': + glist = [glob.glob(os.path.join(path, f)) for f in files.split()] + for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]: + """ + (f_uid, f_hash) = get_file_uid(path, file_name) + if f_uid: + mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid, + parentobj_ID=activity['uid'], + name=file_name, + desc="An output file", + uri='file:' + os.path.join(path, file_name), + uid=f_uid, + source = f_uid) + else: + mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid, + parentobj_ID=activity['uid'], + name=file_name, + desc="An output file", + uri='file:' + os.path.join(path, file_name)) + """ + mpo_data_obj = mpo_add_file( + sim_data.mpo_wid, + activity['uid'], + # sim_data.mpo_wid['uid'], + os.path.join(path, file_name), + shortname=file_name, 
+ longdesc='An output file', + ) + + except Exception as e: + print('*************', e) + else: + if event_type in recordable_mpo_activities: + sim_data.mpo_steps.append(activity['uid']) diff --git a/ipsframework/portalBridge.py b/ipsframework/bridges/portal_bridge.py similarity index 51% rename from ipsframework/portalBridge.py rename to ipsframework/bridges/portal_bridge.py index 528924c5..425442c2 100644 --- a/ipsframework/portalBridge.py +++ b/ipsframework/bridges/portal_bridge.py @@ -1,16 +1,11 @@ # ------------------------------------------------------------------------------- # Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information. # ------------------------------------------------------------------------------- -import datetime -import glob import hashlib -import itertools import json import os -import re import tarfile import time -from collections import defaultdict from multiprocessing import Event, Pipe, Process from multiprocessing.connection import Connection from multiprocessing.synchronize import Event as EventType @@ -18,25 +13,8 @@ import urllib3 -from ipsframework import Component, ipsutil -from ipsframework.convert_log_function import convert_logdata_to_html - - -def hash_file(file_name): # pragma: no cover - """ - Return the MD5 hash of a file - :rtype: str - :param file_name: Full path to file - :return: MD5 of file_name - """ - BLOCKSIZE = 65536 - hasher = hashlib.md5() - with open(file_name, 'rb') as afile: - buf = afile.read(BLOCKSIZE) - while len(buf) > 0: - hasher.update(buf) - buf = afile.read(BLOCKSIZE) - return hasher.hexdigest() +from ipsframework import Component +from ipsframework.bridges.local_event_logger import LocalEventLogger, SimulationData def send_post(conn: Connection, stop: EventType, url: str): @@ -203,40 +181,19 @@ def __init__(self, target: Callable, *args): class PortalBridge(Component): """ - Framework component to communicate with the SWIM web portal. + Framework component to communicate with the IPS web portal. """ - class SimulationData: - """ - Container for simulation data. - """ - - def __init__(self): - self.counter = 0 - self.monitor_file_name = '' - self.portal_runid = None - self.parent_portal_runid = None - self.sim_name = '' - self.sim_root = '' - self.monitor_file = None - self.json_monitor_file = None - self.phys_time_stamp = -1 - self.monitor_url = None - self.mpo_steps = [None] - self.mpo_wid = None - self.bigbuf = '' - def __init__(self, services, config): """ Declaration of private variables and initialization of :py:class:`component.Component` object. 
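+        The bridge delegates local event logging to a
+        :py:class:`LocalEventLogger` and additionally forwards events to the
+        configured portal URL through a child sender process.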
""" super().__init__(services, config) - self.curTime = time.localtime() - self.startTime = self.curTime - self.sim_map = {} - self.portal_url = None + self.sim_map: dict[str, SimulationData] = {} self.done = False + self.local_event_logger = LocalEventLogger() + self.portal_url = '' self.first_event = True self.childProcess = None self.childProcessStop = None @@ -244,15 +201,6 @@ def __init__(self, services, config): self.url_manager_jupyter_notebook = None self.url_manager_jupyter_data = None self.url_manager_ensemble_uploads = None - self.mpo = None - self.mpo_name_counter = defaultdict(lambda: 0) - self.counter = 0 - self.dump_freq = 10 - self.min_dump_interval = 300 # Minimum time interval in Sec for HTML dump operation - self.last_dump_time = time.time() - self.write_to_htmldir = True - self.html_dir = '' - self.first_portal_runid = None def init(self, timestamp=0.0, **keywords): """ @@ -267,31 +215,9 @@ def init(self, timestamp=0.0, **keywords): self.portal_api_key = self._IPS_PORTAL_API_KEY except AttributeError: pass - self.services.subscribe('_IPS_MONITOR', 'process_event') - try: - freq = int(self.services.get_config_param('HTML_DUMP_FREQ', silent=True)) - except Exception: - pass - else: - self.dump_freq = freq - try: - self.html_dir = self.services.get_config_param('USER_W3_DIR', silent=True) or '' - except Exception: - self.services.warning('Missing USER_W3_DIR configuration - disabling web-visible logging') - self.write_to_htmldir = False - else: - if self.html_dir.strip() == '': - self.services.warning('Empty USER_W3_DIR configuration - disabling web-visible logging') - self.write_to_htmldir = False - else: - try: - os.mkdir(self.html_dir) - except FileExistsError: - pass - except Exception: - self.services.warning('Unable to create HTML directory - disabling web-visible logging') - self.write_to_htmldir = False + self.services.subscribe('_IPS_MONITOR', 'process_event') + self.local_event_logger.init(self.services) def step(self, timestamp=0.0, **keywords): """ @@ -302,12 +228,7 @@ def step(self, timestamp=0.0, **keywords): time.sleep(0.5) def finalize(self, timestamp=0.0, **keywords): - for sim_data in self.sim_map.values(): - try: - sim_data.monitor_file.close() - sim_data.json_monitor_file.close() - except Exception: - pass + self.local_event_logger.finalize(self.sim_map) def process_event(self, topicName, theEvent): """ @@ -370,55 +291,7 @@ def process_event(self, topicName, theEvent): if 'trace' in portal_data: portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest() - self.send_event(sim_data, portal_data) - sim_data.counter += 1 - self.counter += 1 - - if portal_data['eventtype'] == 'IPS_END': - del self.sim_map[sim_name] - - if len(self.sim_map) == 0: - if self.childProcess: - self.childProcessStop.set() - self.childProcess.join() - self.check_send_post_responses() - self.done = True - self.services.debug('No more simulation to monitor - exiting') - time.sleep(1) - - def send_event(self, sim_data, event_data): - """ - Send contents of *event_data* and *sim_data* to portal. 
- """ - timestamp = ipsutil.getTimeString() - buf = '%8d %s ' % (sim_data.counter, timestamp) - for k, v in event_data.items(): - if len(str(v).strip()) == 0: - continue - if ' ' in str(v): - buf += "%s='%s' " % (k, str(v)) - else: - buf += '%s=%s ' % (k, str(v)) - buf += '\n' - sim_data.monitor_file.write(bytes(buf, encoding='UTF-8')) - sim_data.bigbuf += buf - - buf = json.dumps(event_data) - sim_data.json_monitor_file.write('%s\n' % buf) - - freq = self.dump_freq - if ((self.counter % freq == 0) and (time.time() - self.last_dump_time > self.min_dump_interval)) or (event_data['eventtype'] == 'IPS_END'): - self.last_dump_time = time.time() - html_filename = sim_data.monitor_file_name.replace('eventlog', 'html') - html_page = convert_logdata_to_html(sim_data.bigbuf) - open(html_filename, 'w').writelines(html_page) - if self.write_to_htmldir: - html_file = os.path.join(self.html_dir, os.path.basename(html_filename)) - try: - open(html_file, 'w').writelines(html_page) - except Exception: - self.services.exception('Error writing html file into USER_W3_DIR directory') - self.write_to_htmldir = False + self.local_event_logger.send_event(self.services, sim_data, portal_data) if self.portal_url: if self.first_event: # First time, launch sendPost.py daemon @@ -429,14 +302,23 @@ def send_event(self, sim_data, event_data): self.first_event = False try: - self.parent_conn.send(event_data) + self.parent_conn.send(portal_data) except OSError: pass self.check_send_post_responses() - if sim_data.mpo_wid: - self.send_mpo_data(event_data, sim_data) + if portal_data['eventtype'] == 'IPS_END': + del self.sim_map[sim_name] + + if len(self.sim_map) == 0: + if self.childProcess: + self.childProcessStop.set() + self.childProcess.join() + self.check_send_post_responses() + self.done = True + self.services.debug('No more simulation to monitor - exiting') + time.sleep(1) def check_send_post_responses(self): while self.parent_conn.poll(): @@ -459,7 +341,7 @@ def check_send_post_responses(self): self.services.error('Portal Error: %d %s', code, msg) elif code == -1: # disable portal, stop trying to send more data - self.portal_url = None + self.portal_url = '' self.services.error('Disabling portal because: %s', msg) else: self.services.debug('Portal Response: %d %s', code, msg) @@ -478,14 +360,14 @@ def http_req_and_response(self, manager: UrlRequestProcessManager, event_data): if code == -1: # disable portal, stop trying to send more data - self.portal_url = None + self.portal_url = '' self.services.error('Disabling portal because: %s', msg) elif code >= 400: self.services.error('Portal Error: %d %s', code, msg) else: self.services.debug('Portal Response: %d %s', code, msg) - def send_jupyter_notebook(self, sim_data, event_data): + def send_jupyter_notebook(self, sim_data: SimulationData, event_data): if self.portal_url and self.portal_api_key: if not self.url_manager_jupyter_notebook: self.url_manager_jupyter_notebook = UrlRequestProcessManager( @@ -493,7 +375,7 @@ def send_jupyter_notebook(self, sim_data, event_data): ) self.http_req_and_response(self.url_manager_jupyter_notebook, event_data) - def send_notebook_data(self, sim_data, event_data): + def send_notebook_data(self, sim_data: SimulationData, event_data): if self.portal_url and self.portal_api_key: if not self.url_manager_jupyter_data: self.url_manager_jupyter_data = UrlRequestProcessManager( @@ -501,7 +383,7 @@ def send_notebook_data(self, sim_data, event_data): ) self.http_req_and_response(self.url_manager_jupyter_data, event_data) - def 
send_ensemble_variables(self, sim_data, event_data): + def send_ensemble_variables(self, sim_data: SimulationData, event_data): if self.portal_url and self.portal_api_key: if not self.url_manager_ensemble_uploads: self.url_manager_ensemble_uploads = UrlRequestProcessManager( @@ -509,205 +391,17 @@ def send_ensemble_variables(self, sim_data, event_data): ) self.http_req_and_response(self.url_manager_ensemble_uploads, event_data) - def send_mpo_data(self, event_data, sim_data): # pragma: no cover - def md5(fname): - "Courtesy of stackoverflow 3431825" - hash_md5 = hashlib.md5() - with open(fname, 'rb') as f: - for chunk in iter(lambda: f.read(4096), b''): - hash_md5.update(chunk) - return hash_md5.hexdigest() - - def mpo_add_file(workflow, parent, file, shortname='Need a name', longdesc='did not add a description.'): - """Add a local file to the workflow attaching to parent. Calculate - checksum and if the file is already in the mpo database, use the - already the UID of the already existing file when adding the data - object - this creates a linkage to the original. The checksum and - local file path and name are added as metadata. - - This function relies on the user space metadata, ips_checksum - and ips_filename. The checksum is the md5 sum and the filename - is expected should have at least a relative qualifying path. - - workflow : workflow_id - parent : parent_id - """ - # if file exist, look for its checksum in the database - try: - checksum = md5(file) - except Exception: - print(('checksum could not find file:', file)) - raise - - is_checksum = self.mpo.search('metadata', params={'key': 'ips_checksum', 'value': checksum}) - # search always returns a list of dictionaries - # if checksum exists, use first dataobject that has it - # api search results are sorted by time - # Note, check this with eqdsk dataobject in test-api - print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%') - print(len(is_checksum), file) - print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%') - - if len(is_checksum) > 0: - # uid is chosen to be first occurrence - # parent_uid is uid of object metadata is attached to. 
- file_uid = is_checksum[0]['parent_uid'] - - # Create dataobject reference by uid in the workflow - dataobject = self.mpo.add(workflow, parent, uid=file_uid, name=shortname, desc=longdesc) - self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum) - # add filename metadata the dataobject reference - self.mpo.meta(dataobject['uid'], 'ips_filename', file) - else: - print(('file', file)) - file_uri = file - # Create new dataobject by uri and insert reference in to workflow - dataobject = self.mpo.add(workflow, parent, uri=file_uri, name=shortname, desc=longdesc) - # add checksum metadata to original data object - # add function currently only returns uri field, so fetch full record - full_dataobject = self.mpo.search('dataobject/' + dataobject['uid'])[0] - # add checksum so dataobject and also - self.mpo.meta(full_dataobject['do_uid'], 'ips_checksum', checksum) - self.mpo.meta(dataobject['uid'], 'ips_filename', file) - self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum) - dataobject = full_dataobject - return dataobject - - recordable_events = ['IPS_CALL_BEGIN', 'IPS_STAGE_INPUTS', 'IPS_STAGE_OUTPUTS', 'IPS_CALL_END'] - recordable_mpo_activities = ['IPS_CALL_BEGIN'] - comment = event_data['comment'] - event_type = event_data['eventtype'] - - if event_type not in recordable_events: - return - inp_objs = [] - if event_type == 'IPS_CALL_END': - del sim_data.mpo_steps[-1] - return - try: - if event_type == 'IPS_STAGE_INPUTS': - r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)') - o = r.match(comment) - (_, path, files) = o.groups() - glist = [glob.glob(os.path.join(path, f)) for f in files.split()] - for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]: - mpo_data_obj = mpo_add_file( - sim_data.mpo_wid, sim_data.mpo_wid['uid'], os.path.join(path, file_name), shortname=file_name, longdesc='An input file' - ) - inp_objs.append(mpo_data_obj['uid']) - - if event_type == 'IPS_STAGE_INPUTS' and not inp_objs: - return - - count = self.mpo_name_counter[sim_data.sim_name + event_data['code']] - if event_type == 'IPS_CALL_BEGIN': - target = event_data['comment'].split()[-1] - step_name = '%s %d' % (target, count) - else: - step_name = '{0:s} {1:s} {2:d}'.format(event_data['code'].split('_')[-1], event_type, count) - - if event_type == 'IPS_STAGE_OUTPUTS': - r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)') - o = r.match(comment) - (_, path, files) = o.groups() - if not files: - return - activity = self.mpo.step( - workflow_ID=sim_data.mpo_wid, parentobj_ID=sim_data.mpo_steps[-1], input_objs=inp_objs, name=step_name, desc='%s' % event_data['comment'] - ) - self.mpo_name_counter[sim_data.sim_name + event_data['code']] += 1 - if event_type == 'IPS_STAGE_OUTPUTS': - glist = [glob.glob(os.path.join(path, f)) for f in files.split()] - for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]: - """ - (f_uid, f_hash) = get_file_uid(path, file_name) - if f_uid: - mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid, - parentobj_ID=activity['uid'], - name=file_name, - desc="An output file", - uri='file:' + os.path.join(path, file_name), - uid=f_uid, - source = f_uid) - else: - mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid, - parentobj_ID=activity['uid'], - name=file_name, - desc="An output file", - uri='file:' + os.path.join(path, file_name)) - """ - mpo_data_obj = mpo_add_file( - sim_data.mpo_wid, - activity['uid'], - # sim_data.mpo_wid['uid'], - os.path.join(path, file_name), - shortname=file_name, 
- longdesc='An output file', - ) - - except Exception as e: - print('*************', e) - else: - if event_type in recordable_mpo_activities: - sim_data.mpo_steps.append(activity['uid']) - - def init_simulation(self, sim_name, sim_root): + def init_simulation(self, sim_name: str, sim_root: str): """ Create and send information about simulation *sim_name* living in *sim_root* so the portal can set up corresponding structures to manage data from the sim. """ - self.services.debug('Initializing simulation : %s -- %s ', sim_name, sim_root) - sim_data = self.SimulationData() - sim_data.sim_name = sim_name - sim_data.sim_root = sim_root + self.services.debug('Initializing simulation using PortalBridge: %s -- %s ', sim_name, sim_root) if hasattr(self, '_IPS_PORTAL_API_KEY'): self.services.set_config_param('_IPS_PORTAL_API_KEY', self._IPS_PORTAL_API_KEY, target_sim_name=sim_name) - d = datetime.datetime.now() - date_str = '%s.%03d' % (d.strftime('%Y-%m-%dT%H:%M:%S'), int(d.microsecond / 1000)) - sim_data.portal_runid = f'{sim_name}_{self.HOST}_{self.USER}_{date_str}' - try: - self.services.set_config_param('PORTAL_RUNID', sim_data.portal_runid, target_sim_name=sim_name) - except Exception: - self.services.error('Simulation %s is not accessible', sim_name) - return - - if self.first_portal_runid: - sim_data.parent_portal_runid = self.first_portal_runid - else: - self.first_portal_runid = sim_data.portal_runid - - if sim_data.sim_root.strip() == '.': - sim_data.sim_root = os.environ['IPS_INITIAL_CWD'] - sim_log_dir = os.path.join(sim_data.sim_root, 'simulation_log') - try: - os.makedirs(sim_log_dir, exist_ok=True) - except OSError as oserr: - self.services.exception('Error creating Simulation Log directory %s : %d %s' % (sim_log_dir, oserr.errno, oserr.strerror)) - raise - - sim_data.monitor_file_name = os.path.join(sim_log_dir, sim_data.portal_runid + '.eventlog') - try: - sim_data.monitor_file = open(sim_data.monitor_file_name, 'wb', 0) - except IOError as oserr: - self.services.error('Error opening file %s: error(%s): %s' % (sim_data.monitor_file_name, oserr.errno, oserr.strerror)) - self.services.error('Using /dev/null instead') - sim_data.monitor_file = open('/dev/null', 'w') - json_fname = sim_data.monitor_file_name.replace('eventlog', 'json') - sim_data.json_monitor_file = open(json_fname, 'w') - - if self.mpo: # pragma: no cover - try: - sim_data.mpo_wid = self.mpo.init(name='SWIM Workflow ' + os.environ['USER'], desc=sim_data.sim_name, wtype='SWIM') - print('sim_data.mpo_wid = ', sim_data.mpo_wid) - except Exception as e: - print(e) - print('sim_data.mpo_wid = ', sim_data.mpo_wid) - sim_data.mpo_wid = None - else: - sim_data.mpo_steps = [sim_data.mpo_wid['uid']] - + sim_data = self.local_event_logger.init_simulation(self.services, sim_name, sim_root, self.HOST, self.USER) self.sim_map[sim_data.sim_name] = sim_data def terminate(self, status: Literal[0, 1]): diff --git a/ipsframework/configurationManager.py b/ipsframework/configurationManager.py index e30eae8b..826ee649 100644 --- a/ipsframework/configurationManager.py +++ b/ipsframework/configurationManager.py @@ -358,42 +358,53 @@ def _initialize_fwk_components(self): # SIMYAN: set up The Portal bridge, allowing for an absence of a portal use_portal = True - if 'USE_PORTAL' in self.sim_map[self.fwk_sim_name].sim_conf: + # If users explicitly disable the Portal via 'USE_PORTAL=false', or do not include a PORTAL_URL, assume the no-portal workflow. 
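+        # In that case the BasicBridge is used instead, which performs only local event logging.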
+ # Otherwise, always initialize the Portal workflow + if 'PORTAL_URL' not in self.sim_map[self.fwk_sim_name].sim_conf: + use_portal = False + elif 'USE_PORTAL' in self.sim_map[self.fwk_sim_name].sim_conf: use_portal = self.sim_map[self.fwk_sim_name].sim_conf['USE_PORTAL'] if use_portal.lower() == 'false': use_portal = False - if use_portal: - portal_conf = {} - portal_conf['CLASS'] = 'FWK' - portal_conf['SUB_CLASS'] = 'COMP' - portal_conf['NAME'] = 'PortalBridge' - if 'FWK_COMPS_PATH' in self.sim_map[self.fwk_sim_name].sim_conf: - portal_conf['BIN_PATH'] = self.sim_map[self.fwk_sim_name].sim_conf['FWK_COMPS_PATH'] - portal_conf['SCRIPT'] = os.path.join(portal_conf['BIN_PATH'], 'portalBridge.py') - else: - portal_conf['SCRIPT'] = '' - portal_conf['MODULE'] = 'ipsframework.portalBridge' - portal_conf['INPUT_DIR'] = '/dev/null' - portal_conf['INPUT_FILES'] = '' - portal_conf['DATA_FILES'] = '' - portal_conf['OUTPUT_FILES'] = '' - portal_conf['NPROC'] = 1 - portal_conf['LOG_LEVEL'] = 'INFO' - try: - portal_conf['USER'] = self.sim_map[self.fwk_sim_name].sim_conf['USER'] - except KeyError: - portal_conf['USER'] = self.platform_conf['USER'] - portal_conf['HOST'] = self.platform_conf['HOST'] - if self.fwk.log_level == logging.DEBUG: - portal_conf['LOG_LEVEL'] = 'DEBUG' - portal_conf['PORTAL_URL'] = self.get_platform_parameter('PORTAL_URL', silent=True) + if use_portal: + bridge_module = 'portal_bridge' + bridge_name = 'PortalBridge' + else: + bridge_module = 'basic_bridge' + bridge_name = 'BasicBridge' + + bridge_conf = {} + bridge_conf['CLASS'] = 'FWK' + bridge_conf['SUB_CLASS'] = 'COMP' + bridge_conf['NAME'] = bridge_name + if 'FWK_COMPS_PATH' in self.sim_map[self.fwk_sim_name].sim_conf: + bridge_conf['BIN_PATH'] = self.sim_map[self.fwk_sim_name].sim_conf['FWK_COMPS_PATH'] + bridge_conf['SCRIPT'] = os.path.join(bridge_conf['BIN_PATH'], 'bridges', f'{bridge_module}.py') + else: + bridge_conf['SCRIPT'] = '' + bridge_conf['MODULE'] = f'ipsframework.bridges.{bridge_module}' + bridge_conf['INPUT_DIR'] = '/dev/null' + bridge_conf['INPUT_FILES'] = '' + bridge_conf['DATA_FILES'] = '' + bridge_conf['OUTPUT_FILES'] = '' + bridge_conf['NPROC'] = 1 + bridge_conf['LOG_LEVEL'] = 'INFO' + try: + bridge_conf['USER'] = self.sim_map[self.fwk_sim_name].sim_conf['USER'] + except KeyError: + bridge_conf['USER'] = self.platform_conf['USER'] + bridge_conf['HOST'] = self.platform_conf['HOST'] + if self.fwk.log_level == logging.DEBUG: + bridge_conf['LOG_LEVEL'] = 'DEBUG' - if portal_conf['PORTAL_URL']: - portal_conf['_IPS_PORTAL_API_KEY'] = self.get_platform_parameter('_IPS_PORTAL_API_KEY', silent=True) + if use_portal: + bridge_conf['PORTAL_URL'] = self.get_platform_parameter('PORTAL_URL', silent=True) + if bridge_conf['PORTAL_URL']: + bridge_conf['_IPS_PORTAL_API_KEY'] = self.get_platform_parameter('_IPS_PORTAL_API_KEY', silent=True) - component_id = self._create_component(portal_conf, self.sim_map[self.fwk_sim_name]) - self.fwk_components.append(component_id) + component_id = self._create_component(bridge_conf, self.sim_map[self.fwk_sim_name]) + self.fwk_components.append(component_id) def _initialize_sim(self, sim_data): """ diff --git a/ipsframework/services.py b/ipsframework/services.py index c7005128..115a9bad 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -64,7 +64,7 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a * `timeout` - The timeout in seconds for the task to complete. * `cpus_per_proc` - The number of cpus per process to use for the task. 
This implies that the DVMPlugin has set up a DVM daemon for this node. * `oversubscribe` - If `True`, then the number of processes can exceed the number of cores on the node. Default is `False`. - + If the worker has the attribute `dvm_uri_file`, then we are running with a DVM (Distributed Virtual Machine) so the `binary` needs a `prun` prepended pointing to that. @@ -139,18 +139,18 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a worker.logger.info(f'Using DVM URI file: {dvm_uri_file}') print(f'Using DVM URI file: {dvm_uri_file}', flush=True) - if task_env is not None and task_env is not {}: - if not 'PMIX_SERVER_URI41' in task_env: + if task_env is not None and task_env != {}: + if 'PMIX_SERVER_URI41' not in task_env: worker.logger.error('DVM environment variable PMIX_SERVER_URI41 not set in task_env') print('DVM environment variable PMIX_SERVER_URI41 not set in task_env', flush=True) else: - worker.logger.info(f"DVM environment variable PMIX_SERVER_URI41 set in task_env to {{task_env['PMIX_SERVER_URI41']}}") + worker.logger.info("DVM environment variable PMIX_SERVER_URI41 set in task_env to {task_env['PMIX_SERVER_URI41']}") print(f'DVM environment variable PMIX_SERVER_URI41 set in task_env to {task_env["PMIX_SERVER_URI41"]}', flush=True) - if not 'PMIX_SERVER_URI41' in os.environ: + if 'PMIX_SERVER_URI41' not in os.environ: worker.logger.error('DVM environment variable PMIX_SERVER_URI41 not set in os.environ') print('DVM environment variable PMIX_SERVER_URI41 not set in os.environ', flush=True) else: - worker.logger.info(f"DVM environment variable PMIX_SERVER_URI41 set in os.environ to {{os.environ['PMIX_SERVER_URI41']}}") + worker.logger.info("DVM environment variable PMIX_SERVER_URI41 set in os.environ to {os.environ['PMIX_SERVER_URI41']}") print(f'DVM environment variable PMIX_SERVER_URI41 set in os.environ to {os.environ["PMIX_SERVER_URI41"]}', flush=True) timeout = float(keywords.get('timeout', 1.0e9)) @@ -1229,7 +1229,8 @@ def get_config_param(self, param: str, silent: bool = False, log: bool = True) - val = self._get_service_response(msg_id, block=True) except Exception: if not silent: - self.exception('Error retrieving value of config parameter %s', param) + if log: + self.exception('Error retrieving value of config parameter %s', param) raise return None return val @@ -2525,7 +2526,6 @@ def send_ensemble_instance_to_portal(ensemble_name: str, data_path: Path) -> Non self.info(f'Preparing to run ensembles in {run_dir}') - # Ensure that we create a unique task pool name for this using the # instance prefix `name` # check this first to ensure uniqueness of `name` parameter @@ -2651,7 +2651,7 @@ def setup(self, worker: Worker): # invoking client.forward_logging() elsewhere, so I shouldn't have to # do this. self.logger.setLevel(logging.DEBUG) - self.logger.info(f'Launching DVM') + self.logger.info('Launching DVM') # TODO could make this more OS agnostic by using tempfile.mkstemp self.worker.dvm_uri_file = f'/tmp/dvm.uri.{os.getpid()}' command = ['prte', '--report-uri', self.worker.dvm_uri_file] @@ -2659,11 +2659,11 @@ def setup(self, worker: Worker): mapping_policy = 'core' # by default bind to cores if self.hwthreads: # ... 
unless you want to bind to hardware threads - self.logger.info(f'Binding to hardware threads') + self.logger.info('Binding to hardware threads') mapping_policy = 'hwtcpus' if self.oversubscribe: - self.logger.info(f'Allowing oversubscription of nodes') + self.logger.info('Allowing oversubscription of nodes') mapping_policy += ':oversubscribe' # This environment variable is specific to OpenMPI's PRTE @@ -2682,7 +2682,6 @@ def setup(self, worker: Worker): os.environ['PMIX_SERVER_URI41'] = self.worker.dvm_uri - return def teardown(self, worker: Worker): self.logger.info(f'Shutting down DVM at {self.worker.dvm_uri}') @@ -2942,8 +2941,7 @@ def _make_worker_args(num_workers: int, num_threads: int, use_shifter: bool, shi self.services.debug(f'Dask scheduler pid: {self.dask_sched_pid}') if not Path(self.dask_scheduler_file).exists(): - self.services.critical(f'Dask scheduler file ' - f'{self.dask_scheduler_file} does not exist') + self.services.critical(f'Dask scheduler file {self.dask_scheduler_file} does not exist') dask_nodes = 1 if dask_nodes is None else dask_nodes if services.get_config_param('MPIRUN') == 'eval': From 650cd7d2bbcedd6e4cb912a97d753e6f05cf2f13 Mon Sep 17 00:00:00 2001 From: Lance-Drane Date: Fri, 14 Nov 2025 11:43:55 -0500 Subject: [PATCH 2/3] remove mpo logic Signed-off-by: Lance-Drane --- ipsframework/bridges/local_event_logger.py | 160 --------------------- 1 file changed, 160 deletions(-) diff --git a/ipsframework/bridges/local_event_logger.py b/ipsframework/bridges/local_event_logger.py index ff35de45..97b57242 100644 --- a/ipsframework/bridges/local_event_logger.py +++ b/ipsframework/bridges/local_event_logger.py @@ -50,8 +50,6 @@ def __init__(self): self.json_monitor_file: FileIO = None # type: ignore self.phys_time_stamp = -1 self.monitor_url = '' - self.mpo_steps = [None] - self.mpo_wid = None self.bigbuf = '' @@ -61,8 +59,6 @@ class LocalEventLogger: def __init__(self) -> None: # self.curTime = time.localtime() # self.startTime = self.curTime - self.mpo = None - self.mpo_name_counter: defaultdict[str, int] = defaultdict(lambda: 0) self.counter = 0 self.dump_freq = 10 self.min_dump_interval = 300 # Minimum time interval in Sec for HTML dump operation @@ -151,17 +147,6 @@ def init_simulation(self, services: ServicesProxy, sim_name: str, sim_root: str, json_fname = sim_data.monitor_file_name.replace('eventlog', 'jsonl') sim_data.json_monitor_file = open(json_fname, 'w') - if self.mpo: # pragma: no cover - try: - sim_data.mpo_wid = self.mpo.init(name='SWIM Workflow ' + os.environ['USER'], desc=sim_data.sim_name, wtype='SWIM') - print('sim_data.mpo_wid = ', sim_data.mpo_wid) - except Exception as e: - print(e) - print('sim_data.mpo_wid = ', sim_data.mpo_wid) - sim_data.mpo_wid = None - else: - sim_data.mpo_steps = [sim_data.mpo_wid['uid']] - return sim_data def send_event(self, services: ServicesProxy, sim_data: SimulationData, event_data: dict[str, Any]): @@ -197,148 +182,3 @@ def send_event(self, services: ServicesProxy, sim_data: SimulationData, event_da except Exception: services.exception('Error writing html file into USER_W3_DIR directory') self.write_to_htmldir = False - - if sim_data.mpo_wid: - self.send_mpo_data(event_data, sim_data) - - def send_mpo_data(self, event_data, sim_data: SimulationData): # pragma: no cover - def md5(fname): - "Courtesy of stackoverflow 3431825" - hash_md5 = hashlib.md5() - with open(fname, 'rb') as f: - for chunk in iter(lambda: f.read(4096), b''): - hash_md5.update(chunk) - return hash_md5.hexdigest() - - def mpo_add_file(workflow, 
parent, file, shortname='Need a name', longdesc='did not add a description.'): - """Add a local file to the workflow attaching to parent. Calculate - checksum and if the file is already in the mpo database, use the - already the UID of the already existing file when adding the data - object - this creates a linkage to the original. The checksum and - local file path and name are added as metadata. - - This function relies on the user space metadata, ips_checksum - and ips_filename. The checksum is the md5 sum and the filename - is expected should have at least a relative qualifying path. - - workflow : workflow_id - parent : parent_id - """ - # if file exist, look for its checksum in the database - try: - checksum = md5(file) - except Exception: - print(('checksum could not find file:', file)) - raise - - is_checksum = self.mpo.search('metadata', params={'key': 'ips_checksum', 'value': checksum}) - # search always returns a list of dictionaries - # if checksum exists, use first dataobject that has it - # api search results are sorted by time - # Note, check this with eqdsk dataobject in test-api - print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%') - print(len(is_checksum), file) - print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%') - - if len(is_checksum) > 0: - # uid is chosen to be first occurrence - # parent_uid is uid of object metadata is attached to. - file_uid = is_checksum[0]['parent_uid'] - - # Create dataobject reference by uid in the workflow - dataobject = self.mpo.add(workflow, parent, uid=file_uid, name=shortname, desc=longdesc) - self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum) - # add filename metadata the dataobject reference - self.mpo.meta(dataobject['uid'], 'ips_filename', file) - else: - print(('file', file)) - file_uri = file - # Create new dataobject by uri and insert reference in to workflow - dataobject = self.mpo.add(workflow, parent, uri=file_uri, name=shortname, desc=longdesc) - # add checksum metadata to original data object - # add function currently only returns uri field, so fetch full record - full_dataobject = self.mpo.search('dataobject/' + dataobject['uid'])[0] - # add checksum so dataobject and also - self.mpo.meta(full_dataobject['do_uid'], 'ips_checksum', checksum) - self.mpo.meta(dataobject['uid'], 'ips_filename', file) - self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum) - dataobject = full_dataobject - return dataobject - - recordable_events = ['IPS_CALL_BEGIN', 'IPS_STAGE_INPUTS', 'IPS_STAGE_OUTPUTS', 'IPS_CALL_END'] - recordable_mpo_activities = ['IPS_CALL_BEGIN'] - comment = event_data['comment'] - event_type = event_data['eventtype'] - - if event_type not in recordable_events: - return - inp_objs = [] - if event_type == 'IPS_CALL_END': - del sim_data.mpo_steps[-1] - return - try: - if event_type == 'IPS_STAGE_INPUTS': - r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)') - o = r.match(comment) - (_, path, files) = o.groups() - glist = [glob.glob(os.path.join(path, f)) for f in files.split()] - for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]: - mpo_data_obj = mpo_add_file( - sim_data.mpo_wid, sim_data.mpo_wid['uid'], os.path.join(path, file_name), shortname=file_name, longdesc='An input file' - ) - inp_objs.append(mpo_data_obj['uid']) - - if event_type == 'IPS_STAGE_INPUTS' and not inp_objs: - return - - count = self.mpo_name_counter[sim_data.sim_name + event_data['code']] - if event_type == 'IPS_CALL_BEGIN': - target = event_data['comment'].split()[-1] - step_name = 
'%s %d' % (target, count) - else: - step_name = '{0:s} {1:s} {2:d}'.format(event_data['code'].split('_')[-1], event_type, count) - - if event_type == 'IPS_STAGE_OUTPUTS': - r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)') - o = r.match(comment) - (_, path, files) = o.groups() - if not files: - return - activity = self.mpo.step( - workflow_ID=sim_data.mpo_wid, parentobj_ID=sim_data.mpo_steps[-1], input_objs=inp_objs, name=step_name, desc='%s' % event_data['comment'] - ) - self.mpo_name_counter[sim_data.sim_name + event_data['code']] += 1 - if event_type == 'IPS_STAGE_OUTPUTS': - glist = [glob.glob(os.path.join(path, f)) for f in files.split()] - for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]: - """ - (f_uid, f_hash) = get_file_uid(path, file_name) - if f_uid: - mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid, - parentobj_ID=activity['uid'], - name=file_name, - desc="An output file", - uri='file:' + os.path.join(path, file_name), - uid=f_uid, - source = f_uid) - else: - mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid, - parentobj_ID=activity['uid'], - name=file_name, - desc="An output file", - uri='file:' + os.path.join(path, file_name)) - """ - mpo_data_obj = mpo_add_file( - sim_data.mpo_wid, - activity['uid'], - # sim_data.mpo_wid['uid'], - os.path.join(path, file_name), - shortname=file_name, - longdesc='An output file', - ) - - except Exception as e: - print('*************', e) - else: - if event_type in recordable_mpo_activities: - sim_data.mpo_steps.append(activity['uid']) From 8dcde272a57f21654496a59bbe68d138b07a35cf Mon Sep 17 00:00:00 2001 From: Lance-Drane Date: Fri, 14 Nov 2025 12:02:07 -0500 Subject: [PATCH 3/3] a few linting fixes Signed-off-by: Lance-Drane --- ipsframework/services.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ipsframework/services.py b/ipsframework/services.py index 8b988081..4fb87b0c 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -147,7 +147,7 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a # It can be defined in `task_env` or in `os.environ`, so we look in # both locations to just echo its presence. The flushes are necessary # in some HPC environments to ensure the output appears in the logs. - if task_env is not None and task_env is not {}: + if task_env is not None and task_env != {}: if 'PMIX_SERVER_URI41' in task_env: worker.logger.debug(f"DVM environment variable PMIX_SERVER_URI41 " f"set in task_env to " @@ -2664,12 +2664,12 @@ def setup(self, worker: Worker): 'for Dask worker') # Necessary to ensure the DVM "sees" all the resources to manage - os.environ['PRTE_MCA_ras_slurm_use_entire_allocation'] = "1" + os.environ['PRTE_MCA_ras_slurm_use_entire_allocation'] = '1' self.worker = worker worker.logger = self.logger - self.logger.info(f'Launching DVM') + self.logger.info('Launching DVM') self.worker.dvm_uri_file = f'/tmp/dvm.uri.{os.getpid()}' command = [#'srun', '--mpi=pmix_v4', '-N', os.environ['SLURM_NNODES'], '--ntasks-per-node=1', 'prte', #'--no-daemonize',