HEX
Server: Apache/2.4.6 (CentOS) OpenSSL/1.0.2k-fips PHP/7.2.34
System: Linux atalantini.com 3.10.0-1127.13.1.el7.x86_64 #1 SMP Tue Jun 23 15:46:38 UTC 2020 x86_64
User: root (0)
PHP: 7.2.34
Disabled: NONE
Upload Files
File: //lib/python2.7/site-packages/kobo/worker/task.py
# -*- coding: utf-8 -*-


import copy
import signal

from kobo.plugins import Plugin
from kobo.shortcuts import force_list
from kobo.client.constants import TASK_STATES


# Public names exported by a star-import of this module.
__all__ = (
    "TaskBase",
    "FailTaskException",
)


class FailTaskException(Exception):
    """Signal that the running task must be terminated as failed.

    Raised by TaskBase.fail(); carries no data beyond what Exception provides.
    """


class TaskBase(Plugin):
    """Base class for all tasks.

    A task is constructed with a hub proxy, a configuration object, its task
    id and its arguments. Subclasses must override run(); they may override
    the cleanup() and notification() class methods, which do nothing here.

    A subclass is treated as a *foreground* task when it has a truthy
    ``foreground`` attribute. When the attribute is missing the task is
    treated as a background task everywhere in this class.
    """
    enabled = True

    def __init__(self, hub, conf, task_id, args):
        """Store the task context and fetch the task info from the hub.

        hub     - hub proxy; hub.worker.get_task() is called immediately
        conf    - configuration object, exposed read-only via self.conf
        task_id - id of this task
        args    - task arguments, exposed as a deep copy via self.args
        """
        self._hub = hub            # created by taskmanager
        self._conf = conf
        self._task_id = task_id
        self._task_info = self.hub.worker.get_task(self.task_id)
        self._task_manager = None  # created by taskmanager (only for foreground tasks)
        self._args = args
        self._subtask_list = []    # ids of subtasks spawned by this instance
        self.result = ""

    @property
    def hub(self):
        """Hub proxy passed to the constructor (read-only)."""
        return self._hub

    @property
    def conf(self):
        """Configuration passed to the constructor (read-only)."""
        return self._conf

    @property
    def task_id(self):
        """Id of this task (read-only)."""
        return self._task_id

    @property
    def task_info(self):
        """Task info fetched from the hub when the task was constructed."""
        return self._task_info

    def _get_task_manager(self):
        # Background tasks never expose a task manager.
        if not getattr(self.__class__, "foreground", False):
            return None
        return self._task_manager

    def _set_task_manager(self, value):
        # Only foreground tasks are allowed to hold a task manager.
        if not getattr(self.__class__, "foreground", False):
            raise ValueError("Cannot set task_manager for a background task.")
        self._task_manager = value

    task_manager = property(_get_task_manager, _set_task_manager)

    @property
    def args(self):
        """Deep copy of the task arguments."""
        # deepcopy to prevent modification
        return copy.deepcopy(self._args)

    @property
    def subtask_list(self):
        """Deep copy of the ids of subtasks spawned and not yet waited for."""
        # deepcopy to prevent modification
        return copy.deepcopy(self._subtask_list)

    def run(self):
        """Run the task. Must be overridden by subclasses."""
        raise NotImplementedError()

    def fail(self):
        """Fail the task by raising FailTaskException."""
        raise FailTaskException()

    @classmethod
    def cleanup(cls, hub, conf, task_info):
        """Cleanup hook; does nothing in the base class."""
        pass

    @classmethod
    def notification(cls, hub, conf, task_info):
        """Notification hook; does nothing in the base class."""
        pass

    def spawn_subtask(self, method, args, label=""):
        """Spawn a new subtask.

        Creates the subtask on the hub with this task as its parent, records
        its id in the internal subtask list and returns the id.

        Raises RuntimeError when called from a foreground task.
        """
        # Missing "foreground" means background, same as in task_manager.
        if getattr(self, "foreground", False):
            raise RuntimeError("Foreground tasks can't spawn subtasks.")

        subtask_id = self.hub.worker.create_subtask(label, method, args, self.task_id)
        self._subtask_list.append(subtask_id)
        return subtask_id

    def wait(self, subtasks=None):
        """Wait until subtasks finish.

        subtasks = None - wait for all subtasks
        subtasks = [task_id list] - wait for selected subtasks

        Returns the list of finished subtask ids reported by the hub.

        Raises RuntimeError when called from a foreground task and
        FailTaskException when any finished subtask is not in CLOSED state.
        """

        # Missing "foreground" means background, same as in task_manager.
        if getattr(self, "foreground", False):
            # wait would call signal.pause() in the *main* worker thread and lock program forever
            raise RuntimeError("Foreground tasks can't wait on subtasks.")

        if subtasks is not None:
            subtasks = force_list(subtasks)

        self.hub.worker.wait(self.task_id, subtasks)

        while True:
            (finished, unfinished) = self.hub.worker.check_wait(self.task_id)

            if not unfinished:
                # all done
                break

            # sleep
            signal.pause()
            # wake up on signal to check the status

        # remove finished subtasks from the list, check results
        failed = False
        for subtask_id in finished:
            subtask_info = self.hub.worker.get_task(subtask_id)
            if subtask_info['state'] != TASK_STATES['CLOSED']:
                failed = True
            # The hub may report a subtask this instance did not spawn;
            # an unguarded remove() would raise ValueError and skip the
            # failure check below.
            if subtask_id in self._subtask_list:
                self._subtask_list.remove(subtask_id)

        if failed:
            # Parenthesised single-argument form prints the same text on
            # Python 2 and Python 3.
            print("Failing because of at least one subtask hasn't closed properly.")
            self.fail()

        return finished