ParallelFor#

class council.runners.ParallelFor(generator: Callable[[ChainContext], Iterable[Any]], skill: SkillRunnerBase, parallelism: int = 5)[source]#

Bases: LoopRunnerBase

Invoke a given skill for each value returned by a given generator function. Can run multiple iteration in parallel. For each invocation, the current iteration current is provided through the skill context SkillContext.iteration().

IterationContext.value() provides the value as returned by the generator function

IterationContext.index() provides the index of the iteration

Notes

Skill iteration are scheduled in the order given by the generator function. However, because multiple iterations can execute in parallel, no assumptions should be made on the order of results.

__init__(generator: Callable[[ChainContext], Iterable[Any]], skill: SkillRunnerBase, parallelism: int = 5) None[source]#

Initialize a new instance

Parameters:

generator (RunnerGenerator) – a generator function that yields results

render_as_dict(include_children: bool = True) Dict[str, Any]#

returns the graph of operation as a dictionary

render_as_json() str#

returns the graph of operation as a JSON string

Example 1#

The example below demonstrate how to use the parallel for in a chain.

from council.chains import Chain
from council.contexts import ChainContext
from council.runners import ParallelFor
from council.mocks import MockSkill

def generator(context: ChainContext):
    for i in range(0, 5):
        yield "hi"

chain = Chain(name="name", description="parallel for", runners=[ParallelFor(generator, MockSkill())])

Example 2#

This example builds on the previous one and shows how to consume the iteration into a skill.

from council.chains import Chain
from council.contexts import ChatMessage, ChainContext, SkillContext
from council.runners import ParallelFor
from council.skills import SkillBase

def generator(context: ChainContext):
    for i in range(0, 5):
        yield f"hi {i}"

class MySkill(SkillBase):
    def __init__(self):
        super().__init__("mySkill")

    def execute(self, context: SkillContext) -> ChatMessage:
        it = context.iteration.unwrap()
        return self.build_success_message(message=f"index {it.index}, {it.value}")


chain = Chain(name="name", description="parallel for", runners=[ParallelFor(generator, MySkill(), parallelism=5)])
context = ChainContext.empty()
chain.execute(context)
for message in context.messages:
    print(message.message)

The output would looks like.

index 0, hi 0
index 1, hi 1
index 2, hi 2
index 3, hi 3
index 4, hi 4