ParallelFor
- class council.runners.ParallelFor(generator: Callable[[ChainContext], Iterable[Any]], skill: SkillRunnerBase, parallelism: int = 5)
Bases: LoopRunnerBase
Invoke a given skill for each value returned by a given generator function. Multiple iterations can run in parallel. For each invocation, the current iteration is provided through the skill context SkillContext.iteration().
- IterationContext.value() provides the value as returned by the generator function
- IterationContext.index() provides the index of the iteration
Notes
Skill iterations are scheduled in the order given by the generator function. However, because multiple iterations can execute in parallel, no assumptions should be made about the order of results.
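The scheduling behaviour described in the notes can be illustrated with a minimal standard-library sketch. This is not council's implementation: `Iteration`, `parallel_for`, and `describe` are hypothetical names used only for illustration, and a thread pool is assumed as the parallelism mechanism.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List


@dataclass(frozen=True)
class Iteration:
    """Hypothetical stand-in for an iteration context: an index and a value."""
    index: int
    value: Any


def parallel_for(values: Iterable[Any],
                 work: Callable[[Iteration], Any],
                 parallelism: int = 5) -> List[Any]:
    """Run `work` once per value, with at most `parallelism` calls in flight."""
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        # Tasks are submitted in the order the values are produced...
        futures = [pool.submit(work, Iteration(i, v)) for i, v in enumerate(values)]
        # ...but results are collected as they finish, so their order may differ.
        return [future.result() for future in as_completed(futures)]


def describe(it: Iteration) -> str:
    return f"index {it.index}, {it.value}"


results = parallel_for((f"hi {i}" for i in range(5)), describe, parallelism=2)
# Sort before printing, because completion order is not guaranteed.
print(sorted(results))
```

Each result carries its own index, so a caller that needs a stable order can sort afterwards instead of relying on completion order.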
- __init__(generator: Callable[[ChainContext], Iterable[Any]], skill: SkillRunnerBase, parallelism: int = 5) -> None
Initialize a new instance
- Parameters:
generator (RunnerGenerator) – a generator function that yields results
- render_as_dict(include_children: bool = True) -> Dict[str, Any]
Returns the graph of operations as a dictionary.
- render_as_json() -> str
Returns the graph of operations as a JSON string.
Example 1
The example below demonstrates how to use ParallelFor in a chain.
    from council.chains import Chain
    from council.contexts import ChainContext
    from council.runners import ParallelFor
    from council.mocks import MockSkill

    def generator(context: ChainContext):
        # Yield one value per iteration; the skill is invoked once for each.
        for i in range(0, 5):
            yield "hi"

    chain = Chain(name="name", description="parallel for", runners=[ParallelFor(generator, MockSkill())])
Example 2
This example builds on the previous one and shows how to consume the iteration in a skill.
    from council.chains import Chain
    from council.contexts import ChatMessage, ChainContext, SkillContext
    from council.runners import ParallelFor
    from council.skills import SkillBase

    def generator(context: ChainContext):
        for i in range(0, 5):
            yield f"hi {i}"

    class MySkill(SkillBase):
        def __init__(self):
            super().__init__("mySkill")

        def execute(self, context: SkillContext) -> ChatMessage:
            # Read the current iteration (index and value) from the skill context.
            it = context.iteration.unwrap()
            return self.build_success_message(message=f"index {it.index}, {it.value}")

    chain = Chain(name="name", description="parallel for", runners=[ParallelFor(generator, MySkill(), parallelism=5)])
    context = ChainContext.empty()
    chain.execute(context)

    # Print the message produced by each iteration.
    for message in context.messages:
        print(message.message)
The output would look like the following, although the order of the lines may vary because iterations can execute in parallel.
index 0, hi 0
index 1, hi 1
index 2, hi 2
index 3, hi 3
index 4, hi 4
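If a stable order is needed, the messages can be sorted by the iteration index they carry. The sketch below works on plain strings in the format printed above; `sort_by_index` is a hypothetical helper, not part of council.

```python
import re
from typing import List

# Matches the "index N," prefix used by the messages in Example 2.
_INDEX_PATTERN = re.compile(r"^index (\d+),")


def sort_by_index(lines: List[str]) -> List[str]:
    """Return the lines ordered by their numeric iteration index."""
    def key(line: str) -> int:
        match = _INDEX_PATTERN.match(line)
        if match is None:
            raise ValueError(f"line has no iteration index: {line!r}")
        return int(match.group(1))

    return sorted(lines, key=key)


shuffled = ["index 3, hi 3", "index 0, hi 0", "index 4, hi 4", "index 1, hi 1", "index 2, hi 2"]
for line in sort_by_index(shuffled):
    print(line)
```

Sorting on the parsed integer rather than on the raw string keeps the order correct once indexes reach two digits.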