Source code for reana_job_controller.rest

# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2017, 2018 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Rest API endpoint for job management."""

import copy
import json
import logging

from flask import Blueprint, current_app, jsonify, request

from reana_job_controller.errors import ComputingBackendSubmissionError
from reana_job_controller.job_db import (JOB_DB, job_exists, job_is_cached,
                                         retrieve_all_jobs,
                                         retrieve_backend_job_id, retrieve_job,
                                         retrieve_job_logs)
from reana_job_controller.schemas import Job, JobRequest
from reana_job_controller.utils import update_workflow_logs

blueprint = Blueprint('jobs', __name__)

job_request_schema = JobRequest()
job_schema = Job()


@blueprint.route('/job_cache', methods=['GET'])
[docs]def check_if_cached(): r"""Check if job is cached. --- get: summary: Returns boolean depicting if job is in cache. description: >- This resource takes a job specification and the workflow json, and checks if the job to be created, already exists in the cache. operationId: check_if_cached parameters: - name: job_spec in: query description: Required. Specification of the job. required: true type: string - name: workflow_json in: query description: Required. Specification of the workflow. required: true type: string - name: workflow_workspace in: query description: Required. Path to workflow workspace. required: true type: string produces: - application/json responses: 200: description: >- Request succeeded. Returns boolean depicting if job is in cache. examples: application/json: { "cached": True, "result_path": "/reana/default/0000/xe2123d/archive/asd213" } 400: description: >- Request failed. The incoming data specification seems malformed. 500: description: >- Request failed. Internal controller error. """ job_spec = json.loads(request.args['job_spec']) workflow_json = json.loads(request.args['workflow_json']) workflow_workspace = request.args['workflow_workspace'] result = job_is_cached(job_spec, workflow_json, workflow_workspace) if result: return jsonify({"cached": True, "result_path": result['result_path'], "job_id": result['job_id']}), 200 else: return jsonify({"cached": False, "result_path": None}), 200
@blueprint.route('/jobs', methods=['GET'])
[docs]def get_jobs(): # noqa r"""Get all active jobs. --- get: summary: Returns list of all active jobs. description: >- This resource is not expecting parameters and it will return a list representing all active jobs in JSON format. operationId: get_jobs produces: - application/json responses: 200: description: >- Request succeeded. The response contains the list of all active jobs. schema: type: array items: $ref: '#/definitions/Job' examples: application/json: { "jobs": { "1612a779-f3fa-4344-8819-3d12fa9b9d90": { "cmd": "date", "cvmfs_mounts": ['atlas.cern.ch', 'atlas-condb.cern.ch'], "docker_img": "busybox", "experiment": "atlas", "job_id": "1612a779-f3fa-4344-8819-3d12fa9b9d90", "max_restart_count": 3, "restart_count": 0, "status": "succeeded" }, "2e4bbc1d-db5e-4ee0-9701-6e2b1ba55c20": { "cmd": "date", "cvmfs_mounts": ['atlas.cern.ch', 'atlas-condb.cern.ch'], "docker_img": "busybox", "experiment": "atlas", "job_id": "2e4bbc1d-db5e-4ee0-9701-6e2b1ba55c20", "max_restart_count": 3, "restart_count": 0, "status": "started" } } } """ return jsonify({"jobs": retrieve_all_jobs()}), 200
@blueprint.route('/jobs', methods=['POST'])
[docs]def create_job(): # noqa r"""Create a new job. --- post: summary: Creates a new job. description: >- This resource is expecting JSON data with all the necessary information of a new job. operationId: create_job consumes: - application/json produces: - application/json parameters: - name: job in: body description: Information needed to instantiate a Job required: true schema: $ref: '#/definitions/JobRequest' responses: 201: description: Request succeeded. The job has been launched. schema: type: object properties: job_id: type: string examples: application/json: { "job_id": "cdcf48b1-c2f3-4693-8230-b066e088c6ac" } 400: description: >- Request failed. The incoming data specification seems malformed. 500: description: >- Request failed. Internal controller error. The job could probably not have been allocated. """ json_data = request.get_json() if not json_data: return jsonify({'message': 'Empty request'}), 400 # Validate and deserialize input job_request, errors = job_request_schema.load(json_data) if errors: return jsonify(errors), 400 compute_backend = job_request.get( 'compute_backend', current_app.config['DEFAULT_COMPUTE_BACKEND']) if compute_backend not in current_app.config['SUPPORTED_COMPUTE_BACKENDS']: msg = 'Job submission failed. Backend {} is not supported.'.format( compute_backend) logging.error(msg, exc_info=True) update_workflow_logs(job_request['workflow_uuid'], msg) return jsonify({'job': msg}), 500 job_obj = current_app.config['COMPUTE_BACKENDS'][compute_backend]( docker_img=job_request['docker_img'], cmd=job_request['cmd'], env_vars=job_request['env_vars'], workflow_uuid=job_request['workflow_uuid'], workflow_workspace=str(job_request['workflow_workspace']), cvmfs_mounts=job_request['cvmfs_mounts'], shared_file_system=job_request['shared_file_system'], job_name=job_request.get('job_name', ''), kerberos=job_request.get('kerberos', ''), ) backend_jod_id = job_obj.execute() if job_obj: job = copy.deepcopy(job_request) job['status'] = 'started' job['restart_count'] = 0 job['max_restart_count'] = 3 job['deleted'] = False job['obj'] = job_obj job['job_id'] = job_obj.job_id job['backend_job_id'] = backend_jod_id job['compute_backend'] = compute_backend JOB_DB[str(job['job_id'])] = job current_app.config['JOB_MONITORS'][compute_backend]() return jsonify({'job_id': job['job_id']}), 201 else: return jsonify({'job': 'Could not be allocated'}), 500
@blueprint.route('/jobs/<job_id>', methods=['GET'])
[docs]def get_job(job_id): # noqa r"""Get a job. --- get: summary: Returns details about a given job. description: >- This resource is expecting the job's UUID as a path parameter. Its information will be served in JSON format. operationId: get_job produces: - application/json parameters: - name: job_id in: path description: Required. ID of the job. required: true type: string responses: 200: description: >- Request succeeded. The response contains details about the given job ID. schema: $ref: '#/definitions/Job' examples: application/json: "job": { "cmd": "date", "cvmfs_mounts": ['atlas.cern.ch', 'atlas-condb.cern.ch'], "docker_img": "busybox", "experiment": "atlas", "job_id": "cdcf48b1-c2f3-4693-8230-b066e088c6ac", "max_restart_count": 3, "restart_count": 0, "status": "started" } 404: description: Request failed. The given job ID does not seem to exist. examples: application/json: "message": >- The job cdcf48b1-c2f3-4693-8230-b066e088444c doesn't exist """ if job_exists(job_id): jobdict = retrieve_job(job_id) return jsonify(jobdict), 200 else: return jsonify({'message': 'The job {} doesn\'t exist' .format(job_id)}), 400
@blueprint.route('/jobs/<job_id>/logs', methods=['GET'])
[docs]def get_logs(job_id): # noqa r"""Job logs. --- get: summary: Returns the logs for a given job. description: >- This resource is expecting the job's UUID as a path parameter. Its information will be served in JSON format. operationId: get_logs produces: - application/json parameters: - name: job_id in: path description: Required. ID of the job. required: true type: string responses: 200: description: >- Request succeeded. The response contains the logs for the given job. examples: application/json: "log": "Tue May 16 13:52:00 CEST 2017\n" 404: description: Request failed. The given job ID does not seem to exist. examples: application/json: "message": >- The job cdcf48b1-c2f3-4693-8230-b066e088444c doesn't exist """ if job_exists(job_id): return retrieve_job_logs(job_id) else: return jsonify({'message': 'The job {} doesn\'t exist' .format(job_id)}), 404
@blueprint.route('/jobs/<job_id>/', methods=['DELETE'])
[docs]def delete_job(job_id): # noqa r"""Delete a given job. --- delete: summary: Deletes a given job. description: >- This resource expects the `job_id` of the job to be deleted. operationId: delete_job consumes: - application/json produces: - application/json parameters: - name: job_id in: path description: Required. ID of the job to be deleted. required: true type: string - name: compute_backend in: query description: Job compute backend. required: false type: string responses: 204: description: >- Request accepted. A request to delete the job has been sent to the compute backend. 404: description: Request failed. The given job ID does not seem to exist. examples: application/json: "message": >- The job cdcf48b1-c2f3-4693-8230-b066e088444c doesn't exist 502: description: >- Request failed. Something went wrong while calling the compute backend. examples: application/json: "message": >- Connection to compute backend failed: [reason] """ if job_exists(job_id): try: compute_backend = request.args.get( 'compute_backend', current_app.config['DEFAULT_COMPUTE_BACKEND']) backend_job_id = retrieve_backend_job_id(job_id) current_app.config['COMPUTE_BACKENDS'][compute_backend].stop( backend_job_id) return jsonify(), 204 except ComputingBackendSubmissionError as e: return jsonify( {'message': 'Connection to compute backend failed:\n{}' .format(e)}), 502 else: return jsonify({'message': 'The job {} doesn\'t exist' .format(job_id)}), 404
@blueprint.route('/apispec', methods=['GET']) def get_openapi_spec(): """Get OpenAPI Spec.""" return jsonify(current_app.config['OPENAPI_SPEC'])