Source code for oar.modules.bipbip_commander

#!/usr/bin/env python
# coding: utf-8

"""
Process that launches and manages bipbip and leon processes.

..
    OAREXEC_REGEXP   'OAREXEC_(\d+)_(\d+)_(\d+|N)_(\d+)'
    OARRUNJOB_REGEXP  'OARRUNJOB_(\d+)'
    LEONEXTERMINATE_REGEXP   'LEONEXTERMINATE_(\d+)'

Commands:
    - OAREXEC
    - OARRUNJOB
    - LEONEXTERMINATE


TODO: jsonify ?

Example:

.. code-block:: JSON

    {
        "job_id": 5,
        "cmd": "LEONEXTERMINATE",
        "args": [5]
    }

"""  # noqa: W605

import os
import socket
from typing import Any

import zmq

import oar.lib.tools as tools
from oar.lib.globals import get_logger, init_config


def launch_command(command, logger):
    """Launch the command line passed in parameter.

    The command is started through ``tools.Popen`` with ``shell=True`` and
    both output streams captured; the call blocks until the command ends.

    :param command: command line (a single string) to execute
    :param logger: logger used to trace the launch and its outcome
    :return: the exit status reported for the command
    """
    # TODO move to oar.lib.tools
    logger.debug("Launching command : [" + command + "]")

    p = tools.Popen(command, stdout=tools.PIPE, stderr=tools.PIPE, shell=True)
    # communicate() drains both pipes; only stderr is reported below.
    _, stderr = p.communicate()
    return_code = p.wait()

    logger.debug(command + " terminated")
    logger.debug("Exit value : " + str(return_code))

    if return_code != 0:
        # errors="replace": a failing command may print bytes that are not
        # valid UTF-8; reporting the failure must not raise in turn.
        logger.debug(
            "Command failed with error: {}".format(
                stderr.decode("utf-8", errors="replace")
            )
        )

    return return_code
def bipbip_leon_executor(
    command: dict[str, Any], leon_command: str, bipbip_command: str, logger
):
    """Run the leon or bipbip program for one received command.

    ``command`` must provide ``"job_id"`` and ``"cmd"``. When ``"cmd"`` is
    ``"LEONEXTERMINATE"`` the leon program is called with the job id only;
    for any other value the bipbip program is called with the job id
    followed by the entries of ``command["args"]``.

    :param command: decoded notification describing what to run
    :param leon_command: command used to invoke leon
    :param bipbip_command: command used to invoke bipbip
    :param logger: logger used for tracing
    """
    # Local import so that the block does not depend on a file-level import.
    import shlex

    job_id = command["job_id"]
    logger.info(f"executing job: {job_id}")

    if command["cmd"] == "LEONEXTERMINATE":
        program = leon_command
        arguments = [str(job_id)]
    else:
        program = bipbip_command
        # Arguments decoded from JSON may be numbers: convert them so that the
        # join below cannot fail with a TypeError.
        arguments = [str(job_id)] + [str(arg) for arg in command["args"]]

    cmd_arg = [program] + arguments
    logger.debug("Launching: " + str(cmd_arg))

    # The command line is interpreted by a shell in launch_command and the
    # values come from a notification, so each argument is quoted. The program
    # itself is left untouched so that it is still interpreted by the shell.
    command_line = " ".join([program] + [shlex.quote(arg) for arg in arguments])

    # TODO returncode,
    launch_command(command_line, logger)
class BipbipCommander(object):
    """Receive commands on a ZeroMQ socket and run them in executor processes.

    Commands are JSON objects read from a PULL socket. Each one is executed by
    ``bipbip_leon_executor`` in its own ``tools.Process``. At most one
    executor per job id runs at a time, and no new executor is started once
    ``Max_bipbip_processes`` of them are registered.
    """

    def __init__(self, config=None):
        """Read the configuration and bind the notification socket.

        :param config: configuration object; when falsy, ``init_config()`` is
            used to obtain one.
        """
        if not config:
            config = init_config()

        # Set undefined config value to default one
        DEFAULT_CONFIG = {
            "SERVER_HOSTNAME": "localhost",
            "APPENDICE_SERVER_PORT": "6670",
            "BIPBIP_COMMANDER_SERVER": "localhost",
            "BIPBIP_COMMANDER_PORT": "6671",
            "MAX_CONCURRENT_JOBS_STARTING_OR_TERMINATING": "25",
            "DETACH_JOB_FROM_SERVER": "1",
            "LOG_FILE": "/var/log/oar.log",
        }
        config.setdefault_config(DEFAULT_CONFIG)

        # Max number of concurrent bipbip processes
        self.Max_bipbip_processes = int(
            config["MAX_CONCURRENT_JOBS_STARTING_OR_TERMINATING"]
        )
        self.Detach_oarexec = config["DETACH_JOB_FROM_SERVER"]

        # Maximum duration of a bipbip process (after that time the process
        # is killed).
        # NOTE(review): nothing in this class enforces this duration.
        self.Max_bipbip_process_duration = 30 * 60

        self.logger = get_logger("oar.modules.bipbip_commander", forward_stderr=True)
        self.logger.info("Start Bipbip Commander")

        if "OARDIR" in os.environ:
            binpath = os.environ["OARDIR"]
        else:
            binpath = "/usr/local/lib/oar/"
            os.environ["OARDIR"] = binpath
            self.logger.warning(
                "OARDIR env variable must be defined, "
                + binpath
                + " is used by default"
            )

        self.leon_command = os.path.join(binpath, "oar-leon")
        self.bipbip_command = os.path.join(binpath, "oar-bipbip")

        # Initialize zeromq context
        self.context = zmq.Context()

        # TODO signal Almighty
        # self.appendice = self.context.socket(zmq.PUSH) # to signal Almighty
        # self.appendice.connect('tcp://' + config['SERVER_HOSTNAME'] + ':' + config['APPENDICE_SERVER_PORT'])

        # IP addr is required when bind function is used on zmq socket
        ip_addr_bipbip_commander = socket.gethostbyname(
            config["BIPBIP_COMMANDER_SERVER"]
        )

        # receive zmq formatted OAREXEC / OARRUNJOB / LEONEXTERMINATE
        self.notification = self.context.socket(zmq.PULL)
        self.notification.bind(
            "tcp://"
            + ip_addr_bipbip_commander
            + ":"
            # str(): the port may be configured as a number.
            + str(config["BIPBIP_COMMANDER_PORT"])
        )

        # Commands waiting to be executed, in reception order.
        self.bipbip_leon_commands_to_run = []
        # Commands set aside during a scheduling pass because their job
        # already has a running executor.
        self.bipbip_leon_commands_to_requeue = []
        # Executor process registered for each job id.
        self.bipbip_leon_executors = {}

    def set_notification_timeout(self, timeout):
        """Set timeout for zmq notification socket"""
        self.notification.RCVTIMEO = timeout

    def run(self, loop=True):
        """Receive notifications and start executors for the queued commands.

        Each iteration waits for one notification, starts as many queued
        commands as the concurrency limit allows, and forgets the executors
        that have finished. While commands remain queued the socket is polled
        with a 500 timeout; otherwise the reception blocks (timeout -1).

        :param loop: when false, a single iteration is performed.
        """
        # TODO: add a shutdown procedure
        while True:
            self._receive_notification()

            # Executors that ended while waiting for the notification must not
            # count against the concurrency limit.
            self._reap_finished_executors()
            self._start_pending_commands()
            self._reap_finished_executors()

            if not self.bipbip_leon_commands_to_run:
                self.set_notification_timeout(-1)
            else:
                self.set_notification_timeout(500)

            if not loop:
                break

    def _receive_notification(self):
        """Wait for one notification and queue it when it is well formed."""
        try:
            command = self.notification.recv_json()
        except ValueError as e:
            # The payload is not valid JSON: ignore it rather than stop.
            self.logger.error("Invalid notification ignored: " + str(e))
            return
        except zmq.error.Again as e:
            self.logger.debug("Timeout on notification:" + str(e))
            if not self.bipbip_leon_commands_to_run:
                self.logger.error(
                    "Not queued commands with timeout actived is abnormal"
                )
            return
        except zmq.ZMQError as e:
            self.logger.error(
                "Something is wrong with notification reception" + str(e)
            )
            raise SystemExit(1)

        self.logger.debug("bipbip commander received notification:" + str(command))

        # The scheduling step indexes the command by "job_id"; a notification
        # without it cannot be processed.
        if isinstance(command, dict) and "job_id" in command:
            self.bipbip_leon_commands_to_run.append(command)
        else:
            self.logger.error("Malformed notification ignored: " + str(command))

    def _start_pending_commands(self):
        """Start executors for queued commands while the limit allows it."""
        while (
            self.bipbip_leon_commands_to_run
            # Strict comparison: never exceed Max_bipbip_processes executors.
            and len(self.bipbip_leon_executors) < self.Max_bipbip_processes
        ):
            self.logger.debug("some job to run!")
            command = self.bipbip_leon_commands_to_run.pop(0)
            job_id = command["job_id"]

            previous = self.bipbip_leon_executors.get(job_id)
            if previous is not None:
                if previous.is_alive():
                    # requeue command
                    self.logger.debug(
                        "A process is already running for the job "
                        + str(job_id)
                        + ". \nWe requeue: "
                        + str(command)
                    )
                    self.bipbip_leon_commands_to_requeue.append(command)
                    continue
                del self.bipbip_leon_executors[job_id]

            # exec
            self.logger.info("starting a new bl executor")
            executor = tools.Process(
                target=bipbip_leon_executor,
                args=(
                    command,
                    self.leon_command,
                    self.bipbip_command,
                    self.logger,
                ),
                kwargs={},
            )
            executor.start()
            self.bipbip_leon_executors[job_id] = executor

        # append commands to requeue
        self.bipbip_leon_commands_to_run += self.bipbip_leon_commands_to_requeue
        self.bipbip_leon_commands_to_requeue = []

    def _reap_finished_executors(self):
        """Log the exit code of finished executors and forget them."""
        # Iterate over a copy of the keys: entries are deleted in the loop.
        for job_id in list(self.bipbip_leon_executors):
            executor = self.bipbip_leon_executors[job_id]
            if not executor.is_alive():
                self.logger.debug("Executor Exitcode: " + str(executor.exitcode))
                del self.bipbip_leon_executors[job_id]
def main():  # pragma: no cover
    """Build a commander without an explicit configuration and run its loop."""
    BipbipCommander().run()
# Start the commander only when this module is executed as a script.
if __name__ == "__main__":  # pragma: no cover
    main()