PypeRaptor is a Python 3 library that provides a quick way of building pipelines for multithreaded processing with some concurrency control.
Installation
pip3 install pyperaptor
Code
from pyperaptor import Pipeline

my_steps = [ ... ]
p = Pipeline(my_steps, parallel=True, workers=10)
p.lock()
results = p.process(my_input_generator)
A pipeline in PypeRaptor is a container that will carry inputs through the defined steps and return outputs.
Each pipeline is mainly constructed by adding nodes. Each node references an operation and is not aware of its surroundings. Nodes in PypeRaptor do not care what type of object they carry inside, but that object must implement a __call__() method.
For example, you can use simple functions, lambdas or callable objects.
from pyperaptor import Pipeline, Node

class SomeObject():
    def __call__(self, x):
        return x

def some_function(x):
    return x

some_object_instance = SomeObject()

my_pipeline = [
    Node(some_function),
    (lambda x: x),
    some_object_instance
]

p = Pipeline(my_pipeline)
p.lock()
p.push(1)
# 1
Nodes can be added to the pipeline with the p.add method, through the += operator, or directly in the constructor.
def some_method(x):
    return x

def some_other_method(x):
    return x

# Using the constructor
my_pipe = [
    some_method,
    (lambda x: x * 2)
]
p = Pipeline(my_pipe)

# Adding iteratively, outside the constructor
p = Pipeline()

# Using the default .add method
p.add(some_method)

# Using the += operator
p += Node(some_method)

# Using a list comprehension
function_list = [
    some_method,
    some_other_method
]
[ p.add(func) for func in function_list ]

# Using map
list(map(p.add, function_list))
The method p.push() executes the whole pipe for one single item. Note that the pipeline must be locked with p.lock() before execution.
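As a minimal sketch, reusing the Pipeline and lambda style from the examples above (the steps themselves are made up, and the output comment follows the same convention as the first example):
from pyperaptor import Pipeline

p = Pipeline([
    (lambda x: x + 1),
    (lambda x: x * 2)
])
p.lock()      # lock before executing
p.push(3)     # runs the whole pipe for one single item
# 8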
For executing several items at once, the pipeline provides the p.process method. When using p.process, data can be provided from outside or from inside the pipeline.
p.process([1, 2, 3]) # Using a list
p.process(range(10)) # Using a generator
It is important to notice that, once PypeRaptor's multithreaded features come into play, it becomes more interesting to add the generator to the pipeline itself.
def some_slow_generator():
    # doing something slow
    yield result

p = Pipeline()
p.add(some_slow_generator)
p.add(some_function)
p.lock()
results = p.process()
Going multithreaded is so hard that it will take you about 5 seconds:
# a foo pipeline
my_pipeline = [
    (lambda x: x),
    (lambda y: y + 1)
]

# Creating and executing a single-threaded pipeline
p = Pipeline(my_pipeline)
p.lock()
p.process(range(10))

# Creating and executing a multithreaded pipeline
p = Pipeline(my_pipeline, parallel=True, workers=10)
p.lock()
p.process(range(10))
I guess it was easy. Right?
Race conditions may arise if some resource is used by more than one thread or process. For example:
from pyperaptor import Pipeline, Device, Node

def sum1(x):
    return x + 1

def printer(x):
    print(x, end=" ")
    return x

def a_generator():
    for x in range(10):
        yield x

p = Pipeline(parallel=True, workers=10)
p += Node(sum1) + \
     printer
p.lock()
p.process(a_generator())
# 1 2 3 465 8 7 9 10
This example fails because the output device, in this case stdout, is being used by many threads at the same time.
To solve this problem, let's guarantee that this will not happen:
from pyperaptor import Pipeline, Device, Node

def sum1(x):
    return x + 1

def printer(x):
    print(x, end=" ")
    return x

def a_generator():
    for x in range(10):
        yield x

TERM = Device("term1")

p = Pipeline(parallel=True, workers=10)
p += Node(sum1) + \
     Node(printer, dev=TERM)
p.lock()
p.process(a_generator())
# 1 2 3 4 5 10 7 8 9 6
A Device may have multiple resources available, for example a buffer with multiple writers, a pool of resources, or a hardware resource. If a node needs to acquire one of them, you can create a Device specifying how many are available. The default is 1.
ANY_RESOURCE = Device("Some Name", 5)

p = Pipeline()
p += Node(a_function) + \
     Node(other_function, dev=ANY_RESOURCE)
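As a hedged sketch of how this could be combined with a parallel pipeline (the functions and pool size below are made up for illustration), a Device with five resources lets at most five of the ten workers run the guarded node at once:
from pyperaptor import Pipeline, Device, Node

def prepare(x):        # hypothetical preprocessing step
    return x * 2

def write_out(x):      # hypothetical step that needs one of the pooled resources
    return x

POOL = Device("connection pool", 5)   # at most 5 concurrent acquisitions

p = Pipeline(parallel=True, workers=10)
p += Node(prepare) + \
     Node(write_out, dev=POOL)
p.lock()
results = p.process(range(20))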
It is possible to use PypeRaptor Pipelines inside PypeRaptor Pipelines. This may be useful for organizing and scaling processing where it matters.
Check the following ludic situation: you may take a while to generate enough information to send to some other job, but once you start, you want it all done as fast as possible.
from pyperaptor import Device, Node, Pipeline
import time, copy, threading

def heavy_work(x):
    # doing something heavy
    time.sleep(1)
    print("doing heavy work at ", threading.current_thread().name)
    return True

def wrapper(x=None):
    if x is not None:
        return heavy_work(x)

def slow_number_yielder():
    for a in range(5):
        time.sleep(0.2)
        yield a

class Buffer():
    def __init__(self, limit_break=5):
        self.__lim__ = limit_break
        self.elems = []

    def __call__(self, x):
        print("calling buffer")
        self.elems.append(x)
        if len(self.elems) == self.__lim__:
            c = copy.deepcopy(self.elems)
            self.elems.clear()
            return c

BUFFER = Device("buffer")

p = Pipeline([wrapper], parallel=True, workers=5)

fs = [
    slow_number_yielder,
    Node(Buffer(), dev=BUFFER),
    p
]

q = Pipeline(fs)

p.lock()
q.lock()
q.process()
This will yield the following output:
calling buffer
calling buffer
calling buffer
calling buffer
calling buffer
doing heavy work at ThreadPoolExecutor-4_0
doing heavy work at ThreadPoolExecutor-4_2
doing heavy work at ThreadPoolExecutor-4_1
doing heavy work at ThreadPoolExecutor-4_3
doing heavy work at ThreadPoolExecutor-4_4
The heavy work is done in parallel, as indicated by the names of the ThreadPoolExecutor threads.
If you wish to scale threads in a tree fashion, you may join different pipelines with different numbers of workers.
In the following example, the first pipeline q will generate a hundred elements to be processed by the second pipeline p.
This will cause 10 threads to be launched on your system: (number of parent threads) ** (number of child threads) + (one management thread per pipeline) = (2 ** 3) + 2 = 10.
def sum_set(x):
    return sum(x)

def minus1(x):
    return x - 1

def identity(x):
    return x

def get_hundred_of(x):
    return [x] * 100

p = Pipeline([minus1], parallel=True, workers=3)
p.lock()

q = Pipeline([get_hundred_of, p, sum_set], parallel=True, workers=2)
q.lock()

sum(q.process([1]*100))