Source code for reana_job_controller.job_manager
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2019 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Job Manager."""
import json
import shlex
from flask import current_app
from reana_commons.utils import calculate_file_access_time
from reana_db.database import Session
from reana_db.models import Job as JobTable
from reana_db.models import JobCache, JobStatus, Workflow
[docs]class JobManager():
"""Job management interface."""
def __init__(self, docker_img='', cmd=[], env_vars={}, job_id=None,
workflow_uuid=None, workflow_workspace=None, job_name=None):
"""Instanciates basic job.
:param docker_img: Docker image.
:type docker_img: str
:param cmd: Command to execute.
:type cmd: list
:param env_vars: Environment variables.
:type env_vars: dict
:param job_id: Unique job id.
:type job_id: str
:param workflow_uuid: Unique workflow id.
:type workflow_uuid: str
:param workflow_workspace: Absolute path to workspace
:type workflow_workspace: str
:param job_name: Name of the job.
:type job_name: str
"""
self.docker_img = docker_img or ''
if isinstance(cmd, str):
self.cmd = shlex.split(cmd)
else:
self.cmd = cmd or []
self.job_id = job_id
self.workflow_uuid = workflow_uuid
self.workflow_workspace = workflow_workspace
self.job_name = job_name
self.env_vars = self._extend_env_vars(env_vars)
[docs] def execution_hook(fn):
"""Add before execution hooks and DB operations."""
def wrapper(inst, *args, **kwargs):
inst.before_execution()
backend_job_id = fn(inst, *args, **kwargs)
inst.create_job_in_db(backend_job_id)
inst.cache_job()
return backend_job_id
return wrapper
[docs] def before_execution(self):
"""Before job submission hook."""
pass
[docs] def after_execution(self):
"""After job submission hook."""
pass
@execution_hook
def execute(self):
"""Execute a job.
:returns: Job ID.
:rtype: str
"""
raise NotImplementedError
[docs] def get_status(self):
"""Get job status.
:returns: job status.
:rtype: str
"""
raise NotImplementedError
[docs] def get_logs(self):
"""Get job log.
:returns: stderr, stdout of a job.
:rtype: dict
"""
raise NotImplementedError
[docs] def stop(self):
"""Stop a job."""
raise NotImplementedError
[docs] def create_job_in_db(self, backend_job_id):
"""Create job in db."""
job_db_entry = JobTable(
backend_job_id=backend_job_id,
workflow_uuid=self.workflow_uuid,
status=JobStatus.created.name,
compute_backend=self.compute_backend,
cvmfs_mounts=self.cvmfs_mounts or '',
shared_file_system=self.shared_file_system or False,
docker_img=self.docker_img,
cmd=json.dumps(self.cmd),
env_vars=json.dumps(self.env_vars),
deleted=False,
job_name=self.job_id,
prettified_cmd=json.dumps(self.cmd))
Session.add(job_db_entry)
Session.commit()
self.job_id = str(job_db_entry.id_)
[docs] def cache_job(self):
"""Cache a job."""
workflow = Session.query(Workflow).filter_by(
id_=self.workflow_uuid).one_or_none()
access_times = calculate_file_access_time(workflow.get_workspace())
prepared_job_cache = JobCache()
prepared_job_cache.job_id = self.job_id
prepared_job_cache.access_times = access_times
Session.add(prepared_job_cache)
Session.commit()
[docs] def update_job_status(self):
"""Update job status in DB."""
pass
def _extend_env_vars(self, env_vars):
"""Extend environment variables with REANA specific ones."""
prefix = 'REANA'
env_vars[prefix + '_WORKSPACE'] = self.workflow_workspace
env_vars[prefix + '_WORKFLOW_UUID'] = str(self.workflow_uuid)
return env_vars