621 lines
19 KiB
Python
621 lines
19 KiB
Python
# Copyright 2019 Camptocamp
|
|
# Copyright 2019 Guewen Baconnier
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)
|
|
|
|
import itertools
|
|
import logging
|
|
import uuid
|
|
from collections import defaultdict, deque
|
|
|
|
from .job import Job
|
|
from .utils import must_run_without_delay
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
def group(*delayables):
|
|
"""Return a group of delayable to form a graph
|
|
|
|
A group means that jobs can be executed concurrently.
|
|
A job or a group of jobs depending on a group can be executed only after
|
|
all the jobs of the group are done.
|
|
|
|
Shortcut to :class:`~odoo.addons.queue_job.delay.DelayableGroup`.
|
|
|
|
Example::
|
|
|
|
g1 = group(delayable1, delayable2)
|
|
g2 = group(delayable3, delayable4)
|
|
g1.on_done(g2)
|
|
g1.delay()
|
|
"""
|
|
return DelayableGroup(*delayables)
|
|
|
|
|
|
def chain(*delayables):
|
|
"""Return a chain of delayable to form a graph
|
|
|
|
A chain means that jobs must be executed sequentially.
|
|
A job or a group of jobs depending on a group can be executed only after
|
|
the last job of the chain is done.
|
|
|
|
Shortcut to :class:`~odoo.addons.queue_job.delay.DelayableChain`.
|
|
|
|
Example::
|
|
|
|
chain1 = chain(delayable1, delayable2, delayable3)
|
|
chain2 = chain(delayable4, delayable5, delayable6)
|
|
chain1.on_done(chain2)
|
|
chain1.delay()
|
|
"""
|
|
return DelayableChain(*delayables)
|
|
|
|
|
|
class Graph:
|
|
"""Acyclic directed graph holding vertices of any hashable type
|
|
|
|
This graph is not specifically designed to hold :class:`~Delayable`
|
|
instances, although ultimately it is used for this purpose.
|
|
"""
|
|
|
|
__slots__ = "_graph"
|
|
|
|
def __init__(self, graph=None):
|
|
if graph:
|
|
self._graph = graph
|
|
else:
|
|
self._graph = {}
|
|
|
|
def add_vertex(self, vertex):
|
|
"""Add a vertex
|
|
|
|
Has no effect if called several times with the same vertex
|
|
"""
|
|
self._graph.setdefault(vertex, set())
|
|
|
|
def add_edge(self, parent, child):
|
|
"""Add an edge between a parent and a child vertex
|
|
|
|
Has no effect if called several times with the same pair of vertices
|
|
"""
|
|
self.add_vertex(child)
|
|
self._graph.setdefault(parent, set()).add(child)
|
|
|
|
def vertices(self):
|
|
"""Return the vertices (nodes) of the graph"""
|
|
return set(self._graph)
|
|
|
|
def edges(self):
|
|
"""Return the edges (links) of the graph"""
|
|
links = []
|
|
for vertex, neighbours in self._graph.items():
|
|
for neighbour in neighbours:
|
|
links.append((vertex, neighbour))
|
|
return links
|
|
|
|
# from
|
|
# https://codereview.stackexchange.com/questions/55767/finding-all-paths-from-a-given-graph
|
|
def paths(self, vertex):
|
|
"""Generate the maximal cycle-free paths in graph starting at vertex.
|
|
|
|
>>> g = {1: [2, 3], 2: [3, 4], 3: [1], 4: []}
|
|
>>> sorted(self.paths(1))
|
|
[[1, 2, 3], [1, 2, 4], [1, 3]]
|
|
>>> sorted(self.paths(3))
|
|
[[3, 1, 2, 4]]
|
|
"""
|
|
path = [vertex] # path traversed so far
|
|
seen = {vertex} # set of vertices in path
|
|
|
|
def search():
|
|
dead_end = True
|
|
for neighbour in self._graph[path[-1]]:
|
|
if neighbour not in seen:
|
|
dead_end = False
|
|
seen.add(neighbour)
|
|
path.append(neighbour)
|
|
yield from search()
|
|
path.pop()
|
|
seen.remove(neighbour)
|
|
if dead_end:
|
|
yield list(path)
|
|
|
|
yield from search()
|
|
|
|
def topological_sort(self):
|
|
"""Yields a proposed order of nodes to respect dependencies
|
|
|
|
The order is not unique, the result may vary, but it is guaranteed
|
|
that a node depending on another is not yielded before.
|
|
It assumes the graph has no cycle.
|
|
"""
|
|
depends_per_node = defaultdict(int)
|
|
for __, tail in self.edges():
|
|
depends_per_node[tail] += 1
|
|
|
|
# the queue contains only elements for which all dependencies
|
|
# are resolved
|
|
queue = deque(self.root_vertices())
|
|
while queue:
|
|
vertex = queue.popleft()
|
|
yield vertex
|
|
for node in self._graph[vertex]:
|
|
depends_per_node[node] -= 1
|
|
if not depends_per_node[node]:
|
|
queue.append(node)
|
|
|
|
def root_vertices(self):
|
|
"""Returns the root vertices
|
|
|
|
meaning they do not depend on any other job.
|
|
"""
|
|
dependency_vertices = set()
|
|
for dependencies in self._graph.values():
|
|
dependency_vertices.update(dependencies)
|
|
return set(self._graph.keys()) - dependency_vertices
|
|
|
|
def __repr__(self):
|
|
paths = [path for vertex in self.root_vertices() for path in self.paths(vertex)]
|
|
lines = []
|
|
for path in paths:
|
|
lines.append(" → ".join(repr(vertex) for vertex in path))
|
|
return "\n".join(lines)
|
|
|
|
|
|
class DelayableGraph(Graph):
|
|
"""Directed Graph for :class:`~Delayable` dependencies
|
|
|
|
It connects together the :class:`~Delayable`, :class:`~DelayableGroup` and
|
|
:class:`~DelayableChain` graphs, and creates then enqueued the jobs.
|
|
"""
|
|
|
|
def _merge_graph(self, graph):
|
|
"""Merge a graph in the current graph
|
|
|
|
It takes each vertex, which can be :class:`~Delayable`,
|
|
:class:`~DelayableChain` or :class:`~DelayableGroup`, and updates the
|
|
current graph with the edges between Delayable objects (connecting
|
|
heads and tails of the groups and chains), so that at the end, the
|
|
graph contains only Delayable objects and their links.
|
|
"""
|
|
for vertex, neighbours in graph._graph.items():
|
|
tails = vertex._tail()
|
|
for tail in tails:
|
|
# connect the tails with the heads of each node
|
|
heads = {head for n in neighbours for head in n._head()}
|
|
self._graph.setdefault(tail, set()).update(heads)
|
|
|
|
def _connect_graphs(self):
|
|
"""Visit the vertices' graphs and connect them, return the whole graph
|
|
|
|
Build a new graph, walk the vertices and their related vertices, merge
|
|
their graph in the new one, until we have visited all the vertices
|
|
"""
|
|
graph = DelayableGraph()
|
|
graph._merge_graph(self)
|
|
|
|
seen = set()
|
|
visit_stack = deque([self])
|
|
while visit_stack:
|
|
current = visit_stack.popleft()
|
|
if current in seen:
|
|
continue
|
|
|
|
vertices = current.vertices()
|
|
for vertex in vertices:
|
|
vertex_graph = vertex._graph
|
|
graph._merge_graph(vertex_graph)
|
|
visit_stack.append(vertex_graph)
|
|
|
|
seen.add(current)
|
|
|
|
return graph
|
|
|
|
def _has_to_execute_directly(self, vertices):
|
|
"""Used for tests to run tests directly instead of storing them
|
|
|
|
In tests, prefer to use
|
|
:func:`odoo.addons.queue_job.tests.common.trap_jobs`.
|
|
"""
|
|
envs = {vertex.recordset.env for vertex in vertices}
|
|
for env in envs:
|
|
if must_run_without_delay(env):
|
|
return True
|
|
return False
|
|
|
|
@staticmethod
|
|
def _ensure_same_graph_uuid(jobs):
|
|
"""Set the same graph uuid on all jobs of the same graph"""
|
|
jobs_count = len(jobs)
|
|
if jobs_count == 0:
|
|
raise ValueError("Expecting jobs")
|
|
elif jobs_count == 1:
|
|
if jobs[0].graph_uuid:
|
|
raise ValueError(
|
|
"Job %s is a single job, it should not"
|
|
" have a graph uuid" % (jobs[0],)
|
|
)
|
|
else:
|
|
graph_uuids = {job.graph_uuid for job in jobs if job.graph_uuid}
|
|
if len(graph_uuids) > 1:
|
|
raise ValueError("Jobs cannot have dependencies between several graphs")
|
|
elif len(graph_uuids) == 1:
|
|
graph_uuid = graph_uuids.pop()
|
|
else:
|
|
graph_uuid = str(uuid.uuid4())
|
|
for job in jobs:
|
|
job.graph_uuid = graph_uuid
|
|
|
|
def delay(self):
|
|
"""Build the whole graph, creates jobs and delay them"""
|
|
graph = self._connect_graphs()
|
|
|
|
vertices = graph.vertices()
|
|
|
|
for vertex in vertices:
|
|
vertex._build_job()
|
|
|
|
self._ensure_same_graph_uuid([vertex._generated_job for vertex in vertices])
|
|
|
|
if self._has_to_execute_directly(vertices):
|
|
self._execute_graph_direct(graph)
|
|
return
|
|
|
|
for vertex, neighbour in graph.edges():
|
|
neighbour._generated_job.add_depends({vertex._generated_job})
|
|
|
|
# If all the jobs of the graph have another job with the same identity,
|
|
# we do not create them. Maybe we should check that the found jobs are
|
|
# part of the same graph, but not sure it's really required...
|
|
# Also, maybe we want to check only the root jobs.
|
|
existing_mapping = {}
|
|
for vertex in vertices:
|
|
if not vertex.identity_key:
|
|
continue
|
|
generated_job = vertex._generated_job
|
|
existing = generated_job.job_record_with_same_identity_key()
|
|
if not existing:
|
|
# at least one does not exist yet, we'll delay the whole graph
|
|
existing_mapping.clear()
|
|
break
|
|
existing_mapping[vertex] = existing
|
|
|
|
# We'll replace the generated jobs by the existing ones, so callers
|
|
# can retrieve the existing job in "_generated_job".
|
|
# existing_mapping contains something only if *all* the job with an
|
|
# identity have an existing one.
|
|
for vertex, existing in existing_mapping.items():
|
|
vertex._generated_job = existing
|
|
return
|
|
|
|
for vertex in vertices:
|
|
vertex._generated_job.store()
|
|
|
|
def _execute_graph_direct(self, graph):
|
|
for delayable in graph.topological_sort():
|
|
delayable._execute_direct()
|
|
|
|
|
|
class DelayableChain:
|
|
"""Chain of delayables to form a graph
|
|
|
|
Delayables can be other :class:`~Delayable`, :class:`~DelayableChain` or
|
|
:class:`~DelayableGroup` objects.
|
|
|
|
A chain means that jobs must be executed sequentially.
|
|
A job or a group of jobs depending on a group can be executed only after
|
|
the last job of the chain is done.
|
|
|
|
Chains can be connected to other Delayable, DelayableChain or
|
|
DelayableGroup objects by using :meth:`~done`.
|
|
|
|
A Chain is enqueued by calling :meth:`~delay`, which delays the whole
|
|
graph.
|
|
Important: :meth:`~delay` must be called on the top-level
|
|
delayable/chain/group object of the graph.
|
|
"""
|
|
|
|
__slots__ = ("_graph", "__head", "__tail")
|
|
|
|
def __init__(self, *delayables):
|
|
self._graph = DelayableGraph()
|
|
iter_delayables = iter(delayables)
|
|
head = next(iter_delayables)
|
|
self.__head = head
|
|
self._graph.add_vertex(head)
|
|
for neighbour in iter_delayables:
|
|
self._graph.add_edge(head, neighbour)
|
|
head = neighbour
|
|
self.__tail = head
|
|
|
|
def _head(self):
|
|
return self.__head._tail()
|
|
|
|
def _tail(self):
|
|
return self.__tail._head()
|
|
|
|
def __repr__(self):
|
|
inner_graph = "\n\t".join(repr(self._graph).split("\n"))
|
|
return "DelayableChain(\n\t{}\n)".format(inner_graph)
|
|
|
|
def on_done(self, *delayables):
|
|
"""Connects the current chain to other delayables/chains/groups
|
|
|
|
The delayables/chains/groups passed in the parameters will be executed
|
|
when the current Chain is done.
|
|
"""
|
|
for delayable in delayables:
|
|
self._graph.add_edge(self.__tail, delayable)
|
|
return self
|
|
|
|
def delay(self):
|
|
"""Delay the whole graph"""
|
|
self._graph.delay()
|
|
|
|
|
|
class DelayableGroup:
|
|
"""Group of delayables to form a graph
|
|
|
|
Delayables can be other :class:`~Delayable`, :class:`~DelayableChain` or
|
|
:class:`~DelayableGroup` objects.
|
|
|
|
A group means that jobs must be executed sequentially.
|
|
A job or a group of jobs depending on a group can be executed only after
|
|
the all the jobs of the group are done.
|
|
|
|
Groups can be connected to other Delayable, DelayableChain or
|
|
DelayableGroup objects by using :meth:`~done`.
|
|
|
|
A group is enqueued by calling :meth:`~delay`, which delays the whole
|
|
graph.
|
|
Important: :meth:`~delay` must be called on the top-level
|
|
delayable/chain/group object of the graph.
|
|
"""
|
|
|
|
__slots__ = ("_graph", "_delayables")
|
|
|
|
def __init__(self, *delayables):
|
|
self._graph = DelayableGraph()
|
|
self._delayables = set(delayables)
|
|
for delayable in delayables:
|
|
self._graph.add_vertex(delayable)
|
|
|
|
def _head(self):
|
|
return itertools.chain.from_iterable(node._head() for node in self._delayables)
|
|
|
|
def _tail(self):
|
|
return itertools.chain.from_iterable(node._tail() for node in self._delayables)
|
|
|
|
def __repr__(self):
|
|
inner_graph = "\n\t".join(repr(self._graph).split("\n"))
|
|
return "DelayableGroup(\n\t{}\n)".format(inner_graph)
|
|
|
|
def on_done(self, *delayables):
|
|
"""Connects the current group to other delayables/chains/groups
|
|
|
|
The delayables/chains/groups passed in the parameters will be executed
|
|
when the current Group is done.
|
|
"""
|
|
for parent in self._delayables:
|
|
for child in delayables:
|
|
self._graph.add_edge(parent, child)
|
|
return self
|
|
|
|
def delay(self):
|
|
"""Delay the whole graph"""
|
|
self._graph.delay()
|
|
|
|
|
|
class Delayable:
|
|
"""Unit of a graph, one Delayable will lead to an enqueued job
|
|
|
|
Delayables can have dependencies on each others, as well as dependencies on
|
|
:class:`~DelayableGroup` or :class:`~DelayableChain` objects.
|
|
|
|
This class will generally not be used directly, it is used internally
|
|
by :meth:`~odoo.addons.queue_job.models.base.Base.delayable`. Look
|
|
in the base model for more details.
|
|
|
|
Delayables can be connected to other Delayable, DelayableChain or
|
|
DelayableGroup objects by using :meth:`~done`.
|
|
|
|
Properties of the future job can be set using the :meth:`~set` method,
|
|
which always return ``self``::
|
|
|
|
delayable.set(priority=15).set({"max_retries": 5, "eta": 15}).delay()
|
|
|
|
It can be used for example to set properties dynamically.
|
|
|
|
A Delayable is enqueued by calling :meth:`delay()`, which delays the whole
|
|
graph.
|
|
Important: :meth:`delay()` must be called on the top-level
|
|
delayable/chain/group object of the graph.
|
|
"""
|
|
|
|
_properties = (
|
|
"priority",
|
|
"eta",
|
|
"max_retries",
|
|
"description",
|
|
"channel",
|
|
"identity_key",
|
|
)
|
|
__slots__ = _properties + (
|
|
"recordset",
|
|
"_graph",
|
|
"_job_method",
|
|
"_job_args",
|
|
"_job_kwargs",
|
|
"_generated_job",
|
|
)
|
|
|
|
def __init__(
|
|
self,
|
|
recordset,
|
|
priority=None,
|
|
eta=None,
|
|
max_retries=None,
|
|
description=None,
|
|
channel=None,
|
|
identity_key=None,
|
|
):
|
|
self._graph = DelayableGraph()
|
|
self._graph.add_vertex(self)
|
|
|
|
self.recordset = recordset
|
|
|
|
self.priority = priority
|
|
self.eta = eta
|
|
self.max_retries = max_retries
|
|
self.description = description
|
|
self.channel = channel
|
|
self.identity_key = identity_key
|
|
|
|
self._job_method = None
|
|
self._job_args = ()
|
|
self._job_kwargs = {}
|
|
|
|
self._generated_job = None
|
|
|
|
def _head(self):
|
|
return [self]
|
|
|
|
def _tail(self):
|
|
return [self]
|
|
|
|
def __repr__(self):
|
|
return "Delayable({}.{}({}, {}))".format(
|
|
self.recordset,
|
|
self._job_method.__name__ if self._job_method else "",
|
|
self._job_args,
|
|
self._job_kwargs,
|
|
)
|
|
|
|
def __del__(self):
|
|
if not self._generated_job:
|
|
_logger.warning("Delayable %s was prepared but never delayed", self)
|
|
|
|
def _set_from_dict(self, properties):
|
|
for key, value in properties.items():
|
|
if key not in self._properties:
|
|
raise ValueError("No property %s" % (key,))
|
|
setattr(self, key, value)
|
|
|
|
def set(self, *args, **kwargs):
|
|
"""Set job properties and return self
|
|
|
|
The values can be either a dictionary and/or keywork args
|
|
"""
|
|
if args:
|
|
# args must be a dict
|
|
self._set_from_dict(*args)
|
|
self._set_from_dict(kwargs)
|
|
return self
|
|
|
|
def on_done(self, *delayables):
|
|
"""Connects the current Delayable to other delayables/chains/groups
|
|
|
|
The delayables/chains/groups passed in the parameters will be executed
|
|
when the current Delayable is done.
|
|
"""
|
|
for child in delayables:
|
|
self._graph.add_edge(self, child)
|
|
return self
|
|
|
|
def delay(self):
|
|
"""Delay the whole graph"""
|
|
self._graph.delay()
|
|
|
|
def _build_job(self):
|
|
if self._generated_job:
|
|
return self._generated_job
|
|
self._generated_job = Job(
|
|
self._job_method,
|
|
args=self._job_args,
|
|
kwargs=self._job_kwargs,
|
|
priority=self.priority,
|
|
max_retries=self.max_retries,
|
|
eta=self.eta,
|
|
description=self.description,
|
|
channel=self.channel,
|
|
identity_key=self.identity_key,
|
|
)
|
|
return self._generated_job
|
|
|
|
def _store_args(self, *args, **kwargs):
|
|
self._job_args = args
|
|
self._job_kwargs = kwargs
|
|
return self
|
|
|
|
def __getattr__(self, name):
|
|
if name in self.__slots__:
|
|
return super().__getattr__(name)
|
|
if name in self.recordset:
|
|
raise AttributeError(
|
|
"only methods can be delayed (%s called on %s)" % (name, self.recordset)
|
|
)
|
|
recordset_method = getattr(self.recordset, name)
|
|
self._job_method = recordset_method
|
|
return self._store_args
|
|
|
|
def _execute_direct(self):
|
|
assert self._generated_job
|
|
self._generated_job.perform()
|
|
|
|
|
|
class DelayableRecordset(object):
|
|
"""Allow to delay a method for a recordset (shortcut way)
|
|
|
|
Usage::
|
|
|
|
delayable = DelayableRecordset(recordset, priority=20)
|
|
delayable.method(args, kwargs)
|
|
|
|
The method call will be processed asynchronously in the job queue, with
|
|
the passed arguments.
|
|
|
|
This class will generally not be used directly, it is used internally
|
|
by :meth:`~odoo.addons.queue_job.models.base.Base.with_delay`
|
|
"""
|
|
|
|
__slots__ = ("delayable",)
|
|
|
|
def __init__(
|
|
self,
|
|
recordset,
|
|
priority=None,
|
|
eta=None,
|
|
max_retries=None,
|
|
description=None,
|
|
channel=None,
|
|
identity_key=None,
|
|
):
|
|
self.delayable = Delayable(
|
|
recordset,
|
|
priority=priority,
|
|
eta=eta,
|
|
max_retries=max_retries,
|
|
description=description,
|
|
channel=channel,
|
|
identity_key=identity_key,
|
|
)
|
|
|
|
@property
|
|
def recordset(self):
|
|
return self.delayable.recordset
|
|
|
|
def __getattr__(self, name):
|
|
def _delay_delayable(*args, **kwargs):
|
|
getattr(self.delayable, name)(*args, **kwargs).delay()
|
|
return self.delayable._generated_job
|
|
|
|
return _delay_delayable
|
|
|
|
def __str__(self):
|
|
return "DelayableRecordset(%s%s)" % (
|
|
self.delayable.recordset._name,
|
|
getattr(self.delayable.recordset, "_ids", ""),
|
|
)
|
|
|
|
__repr__ = __str__
|