Coroflow: Easy and fast Pipelines

Getting Started

Coroflow makes it easy to run pipelines with coroutines and also support mixing in blocking functions and generators:

from coroflow import Node, Pipeline
import asyncio
import time


class GenNode(Node):
    async def execute():
        """
        The execute method of the first/root Node has to be a generator,
        either async or synchronous.
        """
        for url in ['img_url_1', 'img_url_2', 'img_url_3']:
            print(f"Yielding {url}")
            await asyncio.sleep(1)
            yield url
        print("Generator is exhausted")
        return


class DoSomething(Node):
    async def execute(inpt, param=None):
        """
        The execute method of all non-root Nodes should be a async
        or synchronous method.
        """
        # do your async pipelined work
        await asyncio.sleep(1)  # simulated IO delay
        outp = inpt
        print(f"func1: T1 sending {inpt}")
        return outp


p = Pipeline()
t0 = GenNode('gen', p)
t1 = DoSomething('func1', p, kwargs={'param': 'param_t1'})
t2 = DoSomething('func2', p, kwargs={'param': 'param_t2'})
t0.set_downstream(t1)
t1.set_downstream(t2)


start_time = time.time()
p.run()
print(f"Asynchronous duration: {time.time() - start_time}s.")

More Contents

Indices and tables