PypeRaptor is a Python 3 library that provides a quick way of building pipelines for multithreaded processing with some concurrency control.
```shell
pip3 install pyperaptor
```
```python
my_steps = [ ... ]

p = Pipeline(my_steps, parallel=True, workers=10)
p.lock()

results = p.process(my_input_generator)
```
A pipeline in PypeRaptor is a container that will carry inputs through the defined steps and return outputs.
Each pipeline is mainly constructed by adding nodes. Each node is responsible for referencing an operation and is not aware of its surroundings. Nodes in PypeRaptor do not care what type of object they carry inside, but the object must be callable.
For example, you can use simple functions, lambdas or callable objects.
```python
from pyperaptor import Pipeline, Node

class SomeObject():
    def __call__(self, x):
        return x

def some_function(x):
    return x

some_object_instance = SomeObject()

my_pipeline = [
    Node(some_function),
    (lambda x: x),
    some_object_instance
]

p = Pipeline(my_pipeline)
p.lock()

p.push(1)
# 1
```
Nodes can be added to the pipeline through the `p.add` method, the `+=` operator, or the constructor.
```python
def some_method(x):
    return x

# Using the constructor
my_pipe = [
    some_method,
    (lambda x: x * 2)
]
p = Pipeline(my_pipe)

# Adding iteratively, outside the constructor
p = Pipeline()

# Using the default .add method
p.add(some_method)

# Using the += operator
p += Node(some_method)

# Using a list comprehension
function_list = [
    some_method,
    some_other_method
]
[ p.add(func) for func in function_list ]

# Using map
list(map(p.add, function_list))
```
`p.push()` executes the whole pipeline for one single item. Note that the pipeline must be locked with `p.lock()` before execution.
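As a mental model — an assumption about the semantics, not pyperaptor's actual implementation — a locked single-threaded pipeline behaves like plain function composition, each step feeding the next:

```python
from functools import reduce

def compose(steps):
    # Chain each step's output into the next step's input
    def run(item):
        return reduce(lambda acc, step: step(acc), steps, item)
    return run

run = compose([lambda x: x + 1, lambda x: x * 2])
print(run(3))  # (3 + 1) * 2 = 8
```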
For executing several items at once, the pipeline provides the `p.process` method. Data can be provided from outside or inside the pipeline.
```python
p.process([1, 2, 3])   # Using a list
p.process(range(10))   # Using a generator
```
It is important to notice that once PypeRaptor's multithreaded features come into play, it becomes more interesting to add the generator to the pipeline itself.
```python
def some_slow_generator():
    # doing something slow
    return result

p = Pipeline()
p.add(some_slow_generator)
p.add(some_function)
p.lock()

results = p.process()
```
Going multithreaded is so hard that it will take you about 5 seconds:
```python
# a foo pipeline
my_pipeline = [
    (lambda x: x),
    (lambda y: y + 1)
]

# Creating and executing a single-threaded pipeline
p = Pipeline(my_pipeline)
p.lock()
p.process(range(10))

# Creating a multithreaded pipeline
p = Pipeline(my_pipeline, parallel=True, workers=10)
p.lock()
p.process(range(10))
```
I guess it was easy. Right?
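For intuition, the same fan-out can be sketched with the standard library alone; this illustrates the pattern behind `parallel=True, workers=10`, not PypeRaptor's internals:

```python
from concurrent.futures import ThreadPoolExecutor

def add_one(x):
    return x + 1

# Fan the inputs out over 10 worker threads;
# pool.map still returns results in input order
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(add_one, range(10)))

print(results)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```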
Race conditions may arise if some resource is used by more than one thread or process. For example:
```python
from pyperaptor import Pipeline, Device, Node

def sum1(x):
    return x + 1

def printer(x):
    print(x, end=" ")
    return x

def a_generator():
    for x in range(10):
        yield x

p = Pipeline(parallel=True, workers=10)
p += Node(sum1) + \
     printer
p.lock()

p.process(a_generator())
# 1 2 3 465 8 7 9 10
```
This example fails because the output device, in this case `stdout`, is being used by many threads at the same time.
To solve this problem, let's guarantee that this does not happen:
```python
from pyperaptor import Pipeline, Device, Node

def sum1(x):
    return x + 1

def printer(x):
    print(x, end=" ")
    return x

def a_generator():
    for x in range(10):
        yield x

TERM = Device("term1")

p = Pipeline(parallel=True, workers=10)
p += Node(sum1) + \
     Node(printer, dev=TERM)
p.lock()

p.process(a_generator())
# 1 2 3 4 5 10 7 8 9 6
```
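A Device with a single resource behaves like a mutex held around the guarded node: only one thread at a time may run it. A stdlib sketch of the same guarantee (an illustration of the concept, not the library's code):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()   # stands in for a Device with one resource
results = []

def guarded(x):
    # Only one thread at a time may touch the shared sink
    with lock:
        results.append(x)
    return x

with ThreadPoolExecutor(max_workers=10) as pool:
    list(pool.map(guarded, range(10)))

# Order may vary, but every item arrives exactly once, uncorrupted
print(sorted(results))
```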
A Device may have multiple resources available: for example, a buffer with multiple writers, a pool of connections, or a hardware resource. If a node needs to acquire one resource, you can use a Device specifying how many are available. The default is 1.
```python
ANY_RESOURCE = Device("Some Name", 5)

p = Pipeline()
p += Node(a_function) + \
     Node(other_function, dev=ANY_RESOURCE)
```
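A Device holding several units then acts like a counting semaphore: up to N threads may hold a resource at once, while the rest wait. Assuming those semantics, a plain-Python sketch:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

slots = threading.Semaphore(5)  # five interchangeable resources

def use_resource(x):
    # At most five threads hold a slot at the same time
    with slots:
        return x * 2

with ThreadPoolExecutor(max_workers=10) as pool:
    out = list(pool.map(use_resource, range(8)))

print(out)  # [0, 2, 4, 6, 8, 10, 12, 14]
```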
It is possible to use PypeRaptor Pipelines inside PypeRaptor Pipelines. This may be useful for organizing and scaling processing where it matters.
Consider the following playful situation: it may take a while to generate enough information to send to some other job, but once you start, you want it all done as fast as possible.
```python
from pyperaptor import Device, Node, Pipeline
import time, copy, threading

def heavy_work(x):
    # doing something heavy
    time.sleep(1)
    print("doing heavy work at ", threading.current_thread().name)
    return True

def wrapper(x = None):
    if x:
        return heavy_work(x)

def slow_number_yielder():
    for a in range(5):
        time.sleep(0.2)
        yield a

class Buffer():
    def __init__(self, limit_break = 5):
        self.__lim__ = limit_break
        self.elems = []

    def __call__(self, x):
        print("calling buffer")
        self.elems.append(x)
        if len(self.elems) == self.__lim__:
            self.elems.append(x)
            c = copy.deepcopy(self.elems)
            self.elems.clear()
            return c

BUFFER = Device("buffer")

p = Pipeline([wrapper], parallel=True, workers=5)

fs = [
    slow_number_yielder,
    Node(Buffer(), dev=BUFFER),
    p
]

q = Pipeline(fs)

p.lock()
q.lock()

q.process()
```
This will yield the following output:
```
calling buffer
calling buffer
calling buffer
calling buffer
calling buffer
doing heavy work at  ThreadPoolExecutor-4_0
doing heavy work at  ThreadPoolExecutor-4_2
doing heavy work at  ThreadPoolExecutor-4_1
doing heavy work at  ThreadPoolExecutor-4_3
doing heavy work at  ThreadPoolExecutor-4_4
```
The heavy work is done in parallel, as indicated by the names of the ThreadPoolExecutor threads.
If you wish to scale threads in a tree fashion, you may join different pipelines with different thread counts.
In the following example, the first pipeline `q` will generate a hundred elements to be processed by the inner pipeline `p`.
This will cause 10 threads to be launched in your system.
```
(number of parent threads) ** (number of child threads) + (one management thread per pipeline)
```

In this case: `(2 ** 3) + 2`.
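Plugging in the example's numbers (2 outer workers, 3 inner workers, and one management thread for each of the 2 pipelines):

```python
parent_threads = 2   # workers of the outer pipeline q
child_threads = 3    # workers of the inner pipeline p
pipelines = 2        # one management thread each

total = parent_threads ** child_threads + pipelines
print(total)  # 10
```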
```python
def sum_set(x):
    return sum(x)

def minus1(x):
    return x - 1

def identity(x):
    return x

def get_hundred_of(x):
    return [x] * 100

p = Pipeline([minus1], parallel=True, workers=3)
p.lock()

q = Pipeline([get_hundred_of, p, sum_set], parallel=True, workers=2)
q.lock()

sum(q.process([100]))
```