Source code for council.agents.agent_chain

from typing import Any, Optional

from council.chains import ChainBase
from council.contexts import AgentContext, ChainContext, ChatMessage, Monitored
from council.runners import RunnerExecutor
from .agent import Agent

[docs] class AgentChain(ChainBase): """ A chain that wraps an Agent, so that it can be invoked from another agent. """ _agent: Monitored[Agent]
[docs] def __init__(self, name: str, description: str, agent: Agent): """ Initialize a new instance. Args: name (str): The name of the chain. description (str): The description of the chain. agent (Agent): The agent to be wrapped """ super().__init__(name, description) self._agent = self.new_monitor("agent", agent)
@property def agent(self) -> Agent: return self._agent.inner def _execute( self, context: ChainContext, _executor: Optional[RunnerExecutor] = None, ) -> Any: result = self.agent.execute(AgentContext.from_chat_history(context.chat_history)) maybe_message = result.try_best_message if maybe_message.is_some(): message = maybe_message.unwrap() context.append(ChatMessage.skill(message.message,, message.source, message.is_error))