fsan

PypeRaptor

Posted at — Jan 11, 2020

PypeRaptor is a Python 3 library that provides a quick way of building pipelines for multithreaded processing with some concurrency control.

TL;DR

  1. Install pyperaptor
  2. Create Pipeline object
  3. Create Node object(s) containing the function(s) to be executed.
  4. Lock the Pipeline
  5. Execute the pipeline

Installation

pip3 install pyperaptor

Code

my_steps = [ ... ]
p = Pipeline(my_steps, parallel=True, workers=10)
p.lock()
results = p.process(my_input_generator)

Example

Introduction

A pipeline in PypeRaptor is a container that will carry inputs through the defined steps and return outputs.

Each pipeline is built mainly by adding nodes. Each node references an operation and is not aware of its surroundings. A node in PypeRaptor does not care what type of object it carries, as long as that object implements a __call__() method.

For example, you can use simple functions, lambdas or callable objects.

from pyperaptor import Pipeline, Node

class SomeObject():
	def __call__(self, x):
		return x

def some_function(x):
	return x

some_object_instance = SomeObject()


my_pipeline = [
	Node(some_function),
	(lambda x: x),
	some_object_instance
]

p = Pipeline(my_pipeline)
p.lock()
p.push(1)
# 1
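To make the "anything callable can be a step" idea concrete, here is a minimal standard-library sketch. It is not PypeRaptor's implementation: run_steps is a hypothetical helper that feeds the output of each callable into the next, which is roughly what pushing one item through a locked pipeline amounts to.

```python
from functools import reduce


class AddOne:
    """A callable object: instances can be used like functions."""

    def __call__(self, x):
        return x + 1


def double(x):
    return x * 2


def run_steps(steps, item):
    """Hypothetical helper: pass `item` through each callable in order."""
    return reduce(lambda value, step: step(value), steps, item)


# A plain function, a lambda and a callable object, mixed freely
steps = [double, (lambda x: x - 3), AddOne()]
result = run_steps(steps, 5)  # double(5) = 10, 10 - 3 = 7, 7 + 1 = 8
print(result)
```

The point of the sketch is that the chain only relies on each step being callable, so functions, lambdas and objects defining __call__ are interchangeable.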

Nodes can be added to the pipeline with the p.add method, through the += operator, or in the constructor.

def some_method(x):
	return x

# Second step, used in the function list further down
def some_other_method(x):
	return x

# Using constructor
my_pipe = [
	some_method,
	(lambda x: x * 2)
]

p = Pipeline(my_pipe)

# Adding iteratively or outside constructor
p = Pipeline()

# Adding using default .add method
p.add(some_method)

# Using += operator
p += Node(some_method)

# Using a list comprehension
function_list = [
	some_method,
	some_other_method
]

[ p.add(func) for func in function_list ]

# Using map
list(map(p.add, function_list))

The p.push() method executes the whole pipeline for a single item. Note that the pipeline must be locked with p.lock() before execution. To execute several items at once, the pipeline provides the p.process method.

When using p.process, data can be provided from outside or inside the pipeline.

p.process([1, 2, 3]) # Using a list
p.process(range(10)) # Using a lazy iterable

Once PypeRaptor's multithreaded features come into play, it becomes more interesting to add the generator to the pipeline itself.

def some_slow_generator():
	# range(3) is a placeholder data source
	for item in range(3):
		# doing something slow
		yield item

p = Pipeline()
p.add(some_slow_generator)
p.add(some_function)
p.lock()
results = p.process()

Going multithreaded

Going multithreaded is so hard that it will take you about 5 seconds.

# a foo pipeline
my_pipeline = [
	(lambda x: x),
	(lambda y: y + 1)
]

# Creating and executing a single thread pipeline
p = Pipeline(my_pipeline)
p.lock()
p.process(range(10))

# Creating and executing a multithreaded pipeline
p = Pipeline(my_pipeline, parallel=True, workers=10)
p.lock()
p.process(range(10))
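For readers curious what a parallel run looks like underneath, the following is a rough sketch using only concurrent.futures from the standard library. It assumes nothing about PypeRaptor's internals; run_steps and process_parallel are hypothetical names, and the real library may schedule work differently.

```python
from concurrent.futures import ThreadPoolExecutor


def run_steps(steps, item):
    """Pass one item through every step, in order."""
    value = item
    for step in steps:
        value = step(value)
    return value


def process_parallel(steps, items, workers=10):
    """Hypothetical helper: run the whole chain once per item on a thread pool."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map returns results in input order,
        # even when the tasks finish out of order.
        return list(pool.map(lambda item: run_steps(steps, item), items))


steps = [(lambda x: x), (lambda y: y + 1)]
parallel_results = process_parallel(steps, range(10))
print(parallel_results)
```

Note that this sketch returns results in input order because Executor.map does so; whether PypeRaptor gives the same guarantee is not stated in this post.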

I guess it was easy. Right?

Dealing with race conditions

Race conditions may arise if some resource is used by more than one thread or process. For example:

from pyperaptor import Pipeline, Device, Node

def sum1(x):
    return x + 1

def printer(x):
    print(x, end=" ")
    return x

def a_generator():
    for x in range(10):
        yield x

p = Pipeline(parallel=True, workers=10)
p += Node(sum1) + \
     printer

p.lock()
p.process(a_generator())
# 1 2 3 465 8  7 9 10

This example fails because the output device, in this case stdout, is being used by many threads at the same time. To solve this problem, let’s guarantee that this will not happen:

from pyperaptor import Pipeline, Device, Node

def sum1(x):
    return x + 1

def printer(x):
    print(x, end=" ")
    return x

def a_generator():
    for x in range(10):
        yield x
        
TERM = Device("term1")
        
p = Pipeline(parallel=True, workers=10)
p += Node(sum1) + \
     Node(printer, dev=TERM)

p.lock()

p.process(a_generator())
# 1 2 3 4 5 10 7 8 9 6

A Device may have multiple resources available, for example a buffer with multiple writers, a pool of resources, or a hardware resource. If a node needs to acquire one resource, you can use a Device specifying how many are available. The default is 1.

ANY_RESOURCE = Device("Some Name", 5)

p = Pipeline()
p += Node(a_function) + \
     Node(other_function, dev=ANY_RESOURCE)
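Conceptually, a Device with N resources behaves like a counting semaphore: at most N threads may be inside the guarded step at once. The sketch below illustrates that idea with threading.BoundedSemaphore. It is an analogy under that assumption, not a description of how Device is implemented; SLOTS, gate, stats and guarded_step are illustrative names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

SLOTS = 2  # assumed number of available resources
gate = threading.BoundedSemaphore(SLOTS)
stats_lock = threading.Lock()
stats = {"active": 0, "peak": 0}  # tracks how many threads hold a slot


def guarded_step(x):
    """At most SLOTS threads can be inside the `with gate` block at once."""
    with gate:
        with stats_lock:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
        time.sleep(0.01)  # pretend to use the shared resource
        with stats_lock:
            stats["active"] -= 1
    return x + 1


with ThreadPoolExecutor(max_workers=8) as pool:
    guarded_results = list(pool.map(guarded_step, range(20)))

print(stats["peak"])
```

Even with 8 worker threads, the recorded peak never exceeds SLOTS, which is the behaviour you would want from a resource shared between nodes.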

Integrating multiple Pipelines

It is possible to use PypeRaptor Pipelines inside PypeRaptor Pipelines. This may be useful for organizing and scaling processing where it matters.

Consider the following playful situation: it may take a while to generate enough information to send to some other job, but once you start, you want it all done as fast as possible.

from pyperaptor import Device, Node, Pipeline
import time, copy, threading

def heavy_work(x):
    # doing something heavy
    time.sleep(1)
    print("doing heavy work at ", threading.current_thread().name)
    return True

def wrapper(x = None):
    if x:
        return heavy_work(x)

def slow_number_yielder():
    for a in range(5):
        time.sleep(0.2)
        yield a
        
class Buffer():
    def __init__(self, limit_break = 5):
        self.__lim__ = limit_break
        self.elems = []
        
    def __call__(self, x):
        print("calling buffer")
        self.elems.append(x)
        if len(self.elems) == self.__lim__ :
            self.elems.append(x)
            c = copy.deepcopy(self.elems)
            self.elems.clear()
            return c

BUFFER = Device("buffer")
        
p = Pipeline([wrapper], parallel=True, workers=5)

fs = [
    slow_number_yielder,
    Node(Buffer(), dev=BUFFER),
    p
]

q = Pipeline(fs)

p.lock()
q.lock()

q.process()

This will yield the following output:

calling buffer
calling buffer
calling buffer
calling buffer
calling buffer
doing heavy work at  ThreadPoolExecutor-4_0
doing heavy work at  ThreadPoolExecutor-4_2
doing heavy work at  ThreadPoolExecutor-4_1
doing heavy work at  ThreadPoolExecutor-4_3
doing heavy work at  ThreadPoolExecutor-4_4

The heavy work is done in parallel, as indicated by the ThreadPoolExecutor thread names.
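The same "collect slowly, then fan out" pattern can be sketched with the standard library alone. This is an illustration of the idea rather than a translation of the example: batches, heavy and process_batches are hypothetical names, and unlike the Buffer class above, this version also flushes a final partial batch.

```python
from concurrent.futures import ThreadPoolExecutor


def batches(items, size):
    """Yield lists of at most `size` items; the last batch may be shorter."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def heavy(x):
    """Stand-in for expensive work."""
    return x * x


def process_batches(items, size, workers):
    """Collect items sequentially, then process each full batch in parallel."""
    results = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for batch in batches(items, size):
            results.extend(pool.map(heavy, batch))
    return results


batched_results = process_batches(range(7), size=5, workers=5)
print(batched_results)
```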

Joining multithreaded pipelines

If you wish to scale threads in a tree-like fashion, you may join pipelines that use different numbers of threads. In the following example, the first pipeline q will generate a hundred elements to be processed by the second pipeline p. This will cause 10 threads to be launched in your system: (number of parent threads) ** (number of child threads) + (one management thread per pipeline) = (2**3) + 2.

def sum_set(x):
    return sum(x)

def minus1(x):
    return x - 1

def identity(x):
    return x

def get_hundred_of(x):
    return [x] * 100

p = Pipeline([minus1], parallel=True, workers=3)
p.lock()

q = Pipeline([get_hundred_of, p, sum_set], parallel=True, workers=2)
q.lock()
sum(q.process([1]*100))
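To reason about what this example computes, here is a sequential, single-threaded sketch of the same data flow in plain Python. It assumes that the nested pipeline p applies minus1 to every element of the list it receives, in the same way the nested pipeline in the previous section handled the buffered list; inner and outer are hypothetical stand-ins for p and q, and the thread count simply restates the formula given above.

```python
def minus1(x):
    return x - 1


def get_hundred_of(x):
    return [x] * 100


def sum_set(x):
    return sum(x)


def inner(values):
    # Stand-in for pipeline p, ASSUMING it maps minus1 over each element
    return [minus1(v) for v in values]


def outer(x):
    # Stand-in for pipeline q: get_hundred_of -> p -> sum_set
    return sum_set(inner(get_hundred_of(x)))


total = sum(outer(x) for x in [1] * 100)
print(total)

# Thread count according to the formula quoted in the text
parent_workers, child_workers, pipelines = 2, 3, 2
threads = parent_workers ** child_workers + pipelines
print(threads)
```

Under that assumption every 1 becomes a list of a hundred zeros, so each partial sum and the grand total are 0.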

Change Log

PypeRaptor 0.1.3

PypeRaptor 0.1.2

PypeRaptor 0.1.1

PypeRaptor 0.1.0
