#!/usr/bin/env python
# coding: utf-8
"""
This module is the OAR server. It decides which actions must be performed. It is divided into 3 processes:
- One listens to a TCP/IP socket. It waits for information or commands from the OAR
user programs or from the other modules.
- Another one handles the commands with an automaton and launches the right
modules one after the other.
- The third one handles a pool of forked processes that are used to launch and
stop the jobs.
"""
import os
import re
import signal
import socket
import sys
import time
import zmq
import oar.lib.tools as tools
from oar.lib.globals import get_logger, init_config
# Everything is run by the oar user: OARDO_UID is set to the effective uid of
# this process.
os.environ["OARDO_UID"] = str(os.geteuid())

logger = get_logger("oar.modules.almighty", forward_stderr=True)
logger.info("Start Almighty")

# TODO
# send_log_by_email("Start OAR server","[Almighty] Start Almighty");

# Directory holding the OAR executables. When OARDIR is not defined, fall back
# to the default location and export it in the environment of this process.
if "OARDIR" in os.environ:
    binpath = os.environ["OARDIR"]
else:
    binpath = "/usr/local/lib/oar/"
    logger.warning(
        "OARDIR env variable must be defined, set it to default value:" + binpath
    )
    os.environ["OARDIR"] = binpath

# Commands launched by the automaton, one per state.
# (The legacy OAR2 executables were named Leon, sarko, finaud and
# NodeChangeState.)
leon_command = os.path.join(binpath, "oar-leon")
check_for_villains_command = os.path.join(binpath, "oar-sarko")
check_for_node_changes = os.path.join(binpath, "oar-finaud")
nodeChangeState_command = os.path.join(binpath, "oar-node-change-state")

# Long-running companion processes.
proxy_appendice_command = os.path.join(binpath, "oar-appendice-proxy")
bipbip_commander = os.path.join(binpath, "oar-bipbip-commander")
greta_command = os.path.join(binpath, "oar-greta")
# Duration of the blocking read of new commands, used to slow down the main
# automaton when the command queue is empty. A high value is likely to reduce
# the CPU usage of the Almighty. Setting it to 0 or to a low value is not
# likely to improve performance dramatically, because the read only blocks
# when nothing else is to be done. It is nevertheless closely related to the
# precision at which the internal counters are checked.
read_commands_timeout = 5000  # in ms

# Number of pending commands read from appendice before proceeding with the
# internal work. It should not be set too high, as this would make the
# Almighty weak against flooding.
max_successive_read = 1

# Maximum waiting time before a new scheduling attempt when no notification
# has been received.
schedulertimeout = 60

# Maximum waiting time between two checks for jobs whose allowed time has
# elapsed.
villainstimeout = 10

# PID of a child process to kill when the Almighty stops; 0 means none.
energy_pid = 0

# Stop flag: set by the signal handler, checked by the main loop.
finishTag = False
# The signal handler must take two arguments
# https://docs.python.org/3.8/library/signal.html#signal.signal
def signal_handler(sig, stack):
    """Ask the main loop to stop by raising the module-level ``finishTag``.

    The two parameters are imposed by :func:`signal.signal`; neither is used.

    :param sig: number of the received signal.
    :param stack: current stack frame at the time the signal was received.
    """
    global finishTag
    finishTag = True
#
# Route SIGUSR1, SIGINT and SIGTERM to signal_handler, which raises the
# finishTag flag instead of interrupting the process abruptly.
#
for _signum in (signal.SIGUSR1, signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)
def launch_command(command):
    """Launch the command line passed in parameter and wait for its end.

    The command is run through the shell with its standard output and
    standard error captured. When the exit value is not 0, the captured
    standard error is written to the debug log.

    :param command: command line to run, as a single string.
    :return: exit value of the command.
    """
    # TODO move to oar.lib.tools
    logger.debug("Launching command : [" + command + "]")
    p = tools.Popen(command, stdout=tools.PIPE, stderr=tools.PIPE, shell=True)
    # Only the error stream is reported; the standard output is discarded.
    _stdout, stderr = p.communicate()
    return_code = p.wait()
    logger.debug(command + " terminated")
    logger.debug("Exit value : " + str(return_code))
    if return_code != 0:
        # errors="replace": a failing command that emits bytes which are not
        # valid UTF-8 must not raise UnicodeDecodeError in the server.
        logger.debug(
            "Command failed with error: {}".format(
                stderr.decode("utf-8", errors="replace")
            )
        )
    return return_code
def start_greta() -> tools.Popen:
    """Start :mod:`oar.kao.greta` and return its process handle.

    After spawning the process, this waits up to 10 seconds on it. If it
    terminates within that delay, what ``communicate`` returned is logged.
    Otherwise the raised exception is logged and the handle is returned
    anyway.

    :return: the handle returned by ``tools.Popen`` for the Greta command.
    """
    command = greta_command
    logger.debug("Launching command : [" + command + "]")
    greta = tools.Popen(command)
    try:
        stdout, stderr = greta.communicate(timeout=10)
        logger.info(f"greta: {stdout}\n{stderr}")
    except Exception as e:
        # Best effort: whatever happens here must not prevent the caller from
        # getting the handle. NOTE(review): the expected case is presumably a
        # timeout because Greta keeps running -- confirm.
        logger.info(f"greta: {e}")
    return greta
def check_greta(greta, logger):
    """Check the presence of the Greta process.

    :param greta: process handle of Greta; its ``pid`` attribute and its
        ``communicate`` method are used.
    :param logger: logger used for the debug messages and handed to
        ``tools.check_process``.
    :return: the result of ``tools.check_process`` for the pid of Greta.
    """
    logger.debug(f"checking if Greta is still alive: pid:{greta.pid}")
    res = tools.check_process(greta.pid, logger)
    try:
        # Zero timeout: never block the automaton here.
        # NOTE(review): presumably meant to drain/reap the child -- confirm.
        greta.communicate(timeout=0)
    except Exception as e:
        # Best effort: the exception is only traced, the result computed above
        # is returned in any case.
        logger.debug(f"{vars(e)}")
    return res
#
# functions associated with each state of the automaton
#
def check_for_villains():
    """Start :mod:`oar.modules.sarko`.

    :return: exit value of ``check_for_villains_command``.
    """
    return launch_command(check_for_villains_command)
def check_nodes():
    """Start :mod:`oar.modules.finaud`.

    :return: exit value of the ``check_for_node_changes`` command.
    """
    return launch_command(check_for_node_changes)
def leon():
    """Start :mod:`oar.modules.leon`.

    :return: exit value of ``leon_command``.
    """
    return launch_command(leon_command)
def nodeChangeState():
    """Start :mod:`oar.modules.node_change_state`.

    :return: exit value of ``nodeChangeState_command``.
    """
    return launch_command(nodeChangeState_command)
class Almighty(object):
    """Main automaton of the OAR server.

    The constructor binds the appendice ZeroMQ ``PULL`` socket, optionally
    starts Greta (the energy saving module) and spawns the companion
    processes. :meth:`run` then moves from state to state ("Qget",
    "Scheduler", "Leon", ...) according to the commands read from the socket
    and to the periodic timeouts checked by :meth:`time_update`.
    """

    # Commands that all lead to the "Scheduler" state. A tuple (not a set) so
    # that the membership test relies on equality only.
    _SCHEDULER_COMMANDS = (
        "Qsub",
        "Qsub -I",
        "Term",
        "BipBip",
        "Scheduling",
        "Qresume",
        "Walltime",
    )

    def __init__(self):
        self.state = "Init"
        self._init(binpath)
        logger.debug("Current state [" + self.state + "]")

        # Activate appendice socket
        self.context = zmq.Context()
        self.appendice = self.context.socket(zmq.PULL)
        ip_addr_server = socket.gethostbyname(self.config["SERVER_HOSTNAME"])
        try:
            # f-string: works whether the configured port is a str or an int.
            self.appendice.bind(
                f"tcp://{ip_addr_server}:{self.config['APPENDICE_SERVER_PORT']}"
            )
        except Exception as e:
            logger.error(f"Failed to activate appendice endpoint: {e}")
            sys.exit(1)
        self.set_appendice_timeout(read_commands_timeout)

        # Starting of Greta, the Energy saving module
        self.greta = None
        if self.config["ENERGY_SAVING_INTERNAL"] == "yes":
            logger.info("Energy saving internal mode: Starting up Greta")
            self.greta = start_greta()
            logger.info(f"{self.greta}")

        # Times of the last run of each periodic task.
        self.lastscheduler = 0
        self.lastvillains = 0
        self.lastchecknodes = 0
        # Pending commands, without duplicates (see add_command).
        self.command_queue = []
        self.scheduler_wanted = 0  # 1 if the scheduler must be run next time update

        logger.debug("Init done")
        self.state = "Qget"
        self.start_companions()

    def _init(self, binpath):
        """Load the configuration and derive the settings of the automaton.

        :param binpath: directory prepended to ``META_SCHED_CMD`` when that
            setting does not start with ``/``.
        """
        config = init_config()
        # Set undefined config value to default one
        DEFAULT_CONFIG = {
            "META_SCHED_CMD": "kao",
            "SERVER_HOSTNAME": "localhost",
            "APPENDICE_SERVER_PORT": "6670",  # new endpoint which replaces appendice
            "SCHEDULER_MIN_TIME_BETWEEN_2_CALLS": "1",
            "FINAUD_FREQUENCY": "300",
            "LOG_FILE": "/var/log/oar.log",
            "ENERGY_SAVING_INTERNAL": "no",
        }
        config.setdefault_config(DEFAULT_CONFIG)

        self.meta_sched_command = config["META_SCHED_CMD"]
        if not self.meta_sched_command.startswith("/"):
            self.meta_sched_command = os.path.join(binpath, self.meta_sched_command)

        # Min waiting time before 2 scheduling attempts
        self.scheduler_min_time_between_2_calls = int(
            config["SCHEDULER_MIN_TIME_BETWEEN_2_CALLS"]
        )
        # Max waiting time before check node states
        self.checknodestimeout = int(config["FINAUD_FREQUENCY"])
        self.Log_file = config["LOG_FILE"]
        self.config = config

    def start_companions(self):
        """Start appendice :mod:`oar.modules.appendice_proxy` and :mod:`oar.modules.bipbip_commander` commander processes"""
        self.appendice_proxy = tools.Popen(proxy_appendice_command)
        self.bipbip_commander = tools.Popen(bipbip_commander)

    def meta_scheduler(self):
        """Start the meta-scheduler command configured by ``META_SCHED_CMD``.

        :return: exit value of the command.
        """
        return launch_command(self.meta_sched_command)

    def time_update(self):
        """Queue the periodic commands whose timeout has elapsed.

        "Scheduling" is queued when the scheduler timeout has elapsed, or
        when a scheduling was requested and the minimal delay between two
        calls has passed. "Villains" and "Finaud" are queued on their own
        timeouts; "Finaud" only when its frequency is greater than 0.
        """
        current = tools.get_time()  # ---> TODO my $current = time; -> ???
        logger.debug("Timeouts check : " + str(current))

        # check timeout for scheduler
        scheduler_timed_out = current >= (self.lastscheduler + schedulertimeout)
        scheduler_requested = (self.scheduler_wanted >= 1) and (
            current >= (self.lastscheduler + self.scheduler_min_time_between_2_calls)
        )
        if scheduler_timed_out or scheduler_requested:
            logger.debug("Scheduling timeout")
            self.add_command("Scheduling")

        if current >= (self.lastvillains + villainstimeout):
            logger.debug("Villains check timeout")
            self.add_command("Villains")

        if (self.checknodestimeout > 0) and (
            current >= (self.lastchecknodes + self.checknodestimeout)
        ):
            logger.debug("Node check timeout")
            self.add_command("Finaud")

    def set_appendice_timeout(self, timeout):
        """Set timeout appendice socket

        :param timeout: value assigned to ``RCVTIMEO`` of the socket.
        """
        self.appendice.RCVTIMEO = timeout

    def qget(self, timeout):
        """function used by the main automaton to get notifications from appendice

        :param timeout: receive timeout applied to the socket before reading.
        :return: the decoded JSON message, or ``{"cmd": "Time"}`` when the
            read times out or fails with a ZeroMQ error.
        """
        self.set_appendice_timeout(timeout)
        logger.debug("Timeout value:" + str(timeout))
        try:
            answer = self.appendice.recv_json()
        except zmq.error.Again as e:
            logger.debug("Timeout from appendice:" + str(e))
            return {"cmd": "Time"}
        except zmq.ZMQError as e:
            logger.error("Something is wrong with appendice" + str(e))
            return {"cmd": "Time"}
        return answer

    def add_command(self, command):
        """as commands are just notifications that will
        handle all the modifications in the base up to now, we should
        avoid duplication in the command file

        :param command: name of the command to queue if not already pending.
        """
        # Plain equality test: the command text can come from the network and
        # must not be interpreted as a regular expression.
        if command not in self.command_queue:
            self.command_queue.append(command)

    def read_commands(self, timeout=read_commands_timeout):  # TODO
        """read commands until reaching the maximal successive read value or
        having read all of the pending commands

        Only the first read may block for ``timeout``; the following ones use
        a timeout of 0. Reading stops after a "Time" command, or on a message
        that is ``None`` or does not carry a string ``"cmd"`` entry.

        :param timeout: receive timeout of the first read.
        """
        remaining = max_successive_read
        while remaining:
            if remaining != max_successive_read:
                # Not the first read any more: do not block.
                timeout = 0
            message = self.qget(timeout)
            if message is None:
                logger.debug(f"qget command: {message}")
                break
            command = message.get("cmd") if isinstance(message, dict) else None
            if not isinstance(command, str):
                # Messages come from a network socket: a malformed one must
                # not crash the server.
                logger.warning(f"Ignoring malformed notification: {message!r}")
                break
            self.add_command(command)
            remaining -= 1
            logger.debug(f"Got command {command}, {remaining} remaining")
            if command == "Time":
                # Timeout: nothing more is pending.
                break

    def run(self, loop=True):
        """Start :mod:`oar.modules.almighty` main loop.

        :param loop: when false, a single iteration of the automaton is run.
        :return: 10 when the loop is stopped through ``finishTag``, 0 when it
            ends because ``loop`` is false.
        """
        global finishTag
        while True:
            logger.debug("Current state [" + self.state + "]")

            # We stop Almighty and its child
            if finishTag:
                # NOTE(review): energy_pid is never assigned a non-zero value
                # in this module, and neither self.greta nor the companion
                # processes are stopped here -- confirm this is intended.
                if energy_pid:
                    logger.debug("kill child process " + str(energy_pid))
                    tools.kill(energy_pid, signal.SIGKILL)
                # TODO: $Redirect_STD_process = OAR::Modules::Judas::redirect_everything();
                Redirect_STD_process = False
                if Redirect_STD_process:
                    tools.kill(Redirect_STD_process, signal.SIGKILL)
                # TODO ipc_clean()
                logger.warning("Stop Almighty\n")
                # TODO: send_log_by_email("Stop OAR server", "[Almighty] Stop Almighty")
                return 10

            # We check Greta. When it has to be restarted, the current state
            # is handled at the next iteration only.
            if self.greta and not check_greta(self.greta, logger):
                logger.warning("Energy saving module (greta) died. Restarting it.")
                self.greta = start_greta()

            # QGET
            elif self.state == "Qget":
                self.read_commands(read_commands_timeout)
                logger.debug("Command queue : " + str(self.command_queue))
                # read_commands() may come back without having queued
                # anything; this is handled like a timeout.
                command = self.command_queue.pop(0) if self.command_queue else "Time"
                # Remove useless 'Time' command to enhance reactivity
                if command == "Time" and self.command_queue:
                    command = self.command_queue.pop(0)
                logger.debug(f"Qtype = [{command}]")

                if command in self._SCHEDULER_COMMANDS:
                    self.state = "Scheduler"
                elif command == "Qdel":
                    self.state = "Leon"
                elif command == "Villains":
                    self.state = "Check for villains"
                elif command == "Finaud":
                    self.state = "Check node states"
                elif command == "Time":
                    self.state = "Time update"
                elif command == "ChState":
                    self.state = "Change node state"
                else:
                    logger.error(f"Unknown command found in queue : {command}")

            # SCHEDULER
            elif self.state == "Scheduler":
                current_time = tools.get_time()
                if current_time >= (
                    self.lastscheduler + self.scheduler_min_time_between_2_calls
                ):
                    self.scheduler_wanted = 0
                    # First, check pending events
                    check_result = nodeChangeState()
                    if check_result == 2:
                        self.state = "Leon"
                        self.add_command("Term")
                    elif check_result == 1:
                        self.state = "Scheduler"
                    elif check_result == 0:
                        # Launch the scheduler
                        # We check Greta just before starting the scheduler
                        # because if the pipe is not read, it may freeze oar
                        if self.greta and not check_greta(self.greta, logger):
                            logger.warning(
                                "Energy saving module (greta) died. Restarting it."
                            )
                            time.sleep(5)
                            # Keep the new handle, otherwise the next checks
                            # would still look at the dead process.
                            self.greta = start_greta()
                        scheduler_result = self.meta_scheduler()
                        self.lastscheduler = tools.get_time()
                        if scheduler_result == 0:
                            self.state = "Time update"
                        elif scheduler_result == 1:
                            self.state = "Scheduler"
                        elif scheduler_result == 2:
                            self.state = "Leon"
                        else:
                            logger.error(
                                f"Scheduler returned an unknown value : {scheduler_result}"
                            )
                            finishTag = True
                    else:
                        logger.error(
                            "nodeChangeState_command returned an unknown value."
                        )
                        finishTag = True
                else:
                    self.scheduler_wanted = 1
                    self.state = "Time update"
                    logger.debug(
                        "Scheduler call too early, waiting... ("
                        + str(current_time)
                        + ">= ("
                        + str(self.lastscheduler)
                        + " + "
                        + str(self.scheduler_min_time_between_2_calls)
                        + ")"
                    )

            # TIME UPDATE
            elif self.state == "Time update":
                self.time_update()
                self.state = "Qget"

            # CHECK FOR VILLAINS
            elif self.state == "Check for villains":
                check_result = check_for_villains()
                self.lastvillains = tools.get_time()
                if check_result == 1:
                    self.state = "Leon"
                elif check_result == 0:
                    self.state = "Time update"
                else:
                    logger.error(
                        "check_for_villains_command returned an unknown value : "
                        f"{check_result}."
                    )
                    finishTag = True

            # CHECK NODE STATES
            elif self.state == "Check node states":
                check_result = check_nodes()
                self.lastchecknodes = tools.get_time()
                if check_result == 1:
                    self.state = "Change node state"
                elif check_result == 0:
                    self.state = "Time update"
                else:
                    logger.error("check_for_node_changes returned an unknown value.")
                    finishTag = True

            # LEON
            elif self.state == "Leon":
                check_result = leon()
                self.state = "Time update"
                if check_result == 1:
                    self.add_command("Term")

            # Change state for dynamic nodes
            elif self.state == "Change node state":
                check_result = nodeChangeState()
                if check_result == 2:
                    self.state = "Leon"
                    self.add_command("Term")
                elif check_result == 1:
                    self.state = "Scheduler"
                elif check_result == 0:
                    self.state = "Time update"
                else:
                    logger.error("nodeChangeState_command returned an unknown value.")
                    finishTag = True

            else:
                logger.warning("Critical bug !!!!\n")
                logger.error("Almighty just falled into an unknown state !!!.")
                finishTag = True

            if not loop:
                break
        return 0
def main():  # pragma: no cover
    """Create the :class:`Almighty` automaton and run its main loop.

    :return: the value returned by :meth:`Almighty.run`.
    """
    almighty = Almighty()
    return almighty.run()


if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())