diff --git a/pyagentspec/src/pyagentspec/adapters/crewai/_agentspecconverter.py b/pyagentspec/src/pyagentspec/adapters/crewai/_agentspecconverter.py index 31b3d6f7..9a3f4265 100644 --- a/pyagentspec/src/pyagentspec/adapters/crewai/_agentspecconverter.py +++ b/pyagentspec/src/pyagentspec/adapters/crewai/_agentspecconverter.py @@ -5,6 +5,7 @@ # (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. +import inspect import uuid from typing import ( Any, @@ -12,6 +13,7 @@ Dict, List, Optional, + Tuple, Type, Union, cast, @@ -25,12 +27,20 @@ from pyagentspec.adapters.crewai._types import ( CrewAIAgent, CrewAIBaseTool, + CrewAIFlow, CrewAILlm, CrewAIStructuredTool, CrewAITool, + FlowState, ) from pyagentspec.agent import Agent as AgentSpecAgent from pyagentspec.component import Component as AgentSpecComponent +from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge +from pyagentspec.flows.flow import Flow as AgentSpecFlow +from pyagentspec.flows.node import Node as AgentSpecNode +from pyagentspec.flows.nodes import EndNode as AgentSpecEndNode +from pyagentspec.flows.nodes import StartNode as AgentSpecStartNode +from pyagentspec.flows.nodes import ToolNode as AgentSpecToolNode from pyagentspec.llms import LlmConfig as AgentSpecLlmConfig from pyagentspec.llms import LlmGenerationConfig as AgentSpecLlmGenerationConfig from pyagentspec.llms.ollamaconfig import OllamaConfig as AgentSpecOllamaModel @@ -123,6 +133,10 @@ def convert( agentspec_component = self._tool_convert_to_agentspec( crewai_component, referenced_objects ) + elif isinstance(crewai_component, CrewAIFlow): + agentspec_component = self._flow_convert_to_agentspec( + crewai_component, referenced_objects + ) else: raise NotImplementedError( f"The crewai type '{crewai_component.__class__.__name__}' is not yet supported " @@ -217,3 +231,267 @@ def _agent_convert_to_agentspec( for tool in (crewai_agent.tools or []) ], ) + + def _flow_convert_to_agentspec( + self, crewai_flow: CrewAIFlow[FlowState], referenced_objects: Dict[str, Any] + ) -> AgentSpecFlow: + + nodes: Dict[str, AgentSpecNode] = {} + + # Create a ToolNode for each method (i.e. node) in the flow + methods_by_name = getattr(crewai_flow, "_methods", {}) + start_method_names = set(getattr(crewai_flow, "_start_methods", [])) + for method_name in methods_by_name.keys(): + method_callable = methods_by_name[method_name] + + # Get method signature to infer properties since they're not explicitly defined + signature = inspect.signature(method_callable) + + # Create input properties for nodes: + # - for start nodes we use their input parameters + # - for other nodes we either leave it empty or have a single "object" property + parameters = [p for p in signature.parameters.values() if p.name != "self"] + node_inputs = [ + AgentSpecProperty(title=p.name, json_schema={"type": "object"}) + for p in parameters[: (len(parameters) if method_name in start_method_names else 1)] + ] + + # Create output properties for nodes: + # - if there is a return value annotation, add a single "object" property + return_annotation = signature.return_annotation + has_output = not ( + return_annotation is inspect.Signature.empty or return_annotation is None + ) + node_outputs = ( + [AgentSpecProperty(title=f"{method_name}_output", json_schema={"type": "object"})] + if has_output + else [] + ) + + tool = AgentSpecServerTool( + name=method_name, + description=f"Converted CrewAI flow method '{method_name}'", + inputs=node_inputs, + outputs=node_outputs, + ) + node = AgentSpecToolNode( + name=method_name, + tool=tool, + inputs=node_inputs, + outputs=node_outputs, + ) + nodes[method_name] = node + referenced_objects[method_name] = node + + # Start node with inferred properties + start_node_properties = [ + property + for start_method in getattr(crewai_flow, "_start_methods", []) + for property in (nodes[start_method].inputs or []) + ] + start_node = AgentSpecStartNode( + name="START", inputs=start_node_properties, outputs=start_node_properties + ) + nodes[start_node.name] = start_node + referenced_objects[start_node.name] = start_node + + control_flow_edges: list[ControlFlowEdge] = [] + data_flow_edges: list[DataFlowEdge] = [] + + # Connect START to all start methods + for start_method in getattr(crewai_flow, "_start_methods", []): + control_flow_edges, data_flow_edges = self._add_start_edges( + start_node, + nodes[start_method], + control_flow_edges, + data_flow_edges, + ) + + # Build edges based on listeners + listeners = getattr(crewai_flow, "_listeners", {}) + + for listener_name, condition in listeners.items(): + triggers, has_branching = self._extract_triggers_from_condition(condition) + if has_branching: + raise ValueError( + "Branching is not currently supported (AND, OR operators). " + f"Got node {listener_name} with triggers {', '.join(triggers)}" + ) + for trigger in triggers: + if trigger in getattr(crewai_flow, "_methods", {}): + control_flow_edges, data_flow_edges = self._add_listener_edges( + nodes[trigger], + nodes[listener_name], + control_flow_edges, + data_flow_edges, + ) + + # End node with inferred properties + has_outgoing_edges: set[str] = set( + edge.from_node.name for edge in control_flow_edges if edge.from_node.name != "END" + ) + end_methods = [ + method_name + for method_name in nodes + if method_name not in ("START", "END") and method_name not in has_outgoing_edges + ] + end_node_properties = [ + property for end_method in end_methods for property in (nodes[end_method].outputs or []) + ] + end_node = AgentSpecEndNode( + name="END", inputs=end_node_properties, outputs=end_node_properties + ) + nodes[end_node.name] = end_node + referenced_objects[end_node.name] = end_node + + # Connect END to all end methods + for end_method in end_methods: + control_flow_edges, data_flow_edges = self._add_end_edges( + nodes[end_method], + end_node, + control_flow_edges, + data_flow_edges, + ) + + return AgentSpecFlow( + name=(crewai_flow.name or crewai_flow.__class__.__name__), + start_node=start_node, + nodes=list(nodes.values()), + control_flow_connections=control_flow_edges, + data_flow_connections=data_flow_edges, + ) + + def _add_start_edges( + self, + start_node: AgentSpecNode, + destination_node: AgentSpecNode, + control_flow_edges: List[ControlFlowEdge], + data_flow_edges: List[DataFlowEdge], + ) -> Tuple[List[ControlFlowEdge], List[DataFlowEdge]]: + control_flow_edges.append( + ControlFlowEdge( + name=f"START_to_{destination_node.name}_control_edge", + from_node=start_node, + to_node=destination_node, + ) + ) + start_node_outputs = start_node.outputs or [] + start_node_output_property_names = [property.title for property in start_node_outputs] + destination_node_inputs = destination_node.inputs or [] + for property in destination_node_inputs: + if property.title in start_node_output_property_names: + data_flow_edges.append( + DataFlowEdge( + name=f"START_to_{destination_node.name}_data_edge", + source_node=start_node, + destination_node=destination_node, + source_output=property.title, + destination_input=property.title, + ) + ) + return control_flow_edges, data_flow_edges + + def _add_listener_edges( + self, + trigger_node: AgentSpecNode, + listener_node: AgentSpecNode, + control_flow_edges: List[ControlFlowEdge], + data_flow_edges: List[DataFlowEdge], + ) -> Tuple[List[ControlFlowEdge], List[DataFlowEdge]]: + control_flow_edges.append( + ControlFlowEdge( + name=f"{trigger_node.name}_to_{listener_node.name}_control_edge", + from_node=trigger_node, + to_node=listener_node, + ) + ) + trigger_node_outputs = trigger_node.outputs or [] + listener_node_inputs = listener_node.inputs or [] + if len(trigger_node_outputs) == 1 and len(listener_node_inputs) == 1: + data_flow_edges.append( + DataFlowEdge( + name=f"{trigger_node.name}_to_{listener_node.name}_data_edge", + source_node=trigger_node, + destination_node=listener_node, + source_output=trigger_node_outputs[0].title, + destination_input=listener_node_inputs[0].title, + ) + ) + return control_flow_edges, data_flow_edges + + def _add_end_edges( + self, + source_node: AgentSpecNode, + end_node: AgentSpecNode, + control_flow_edges: List[ControlFlowEdge], + data_flow_edges: List[DataFlowEdge], + ) -> Tuple[List[ControlFlowEdge], List[DataFlowEdge]]: + control_flow_edges.append( + ControlFlowEdge( + name=f"{source_node.name}_to_END_control_edge", + from_node=source_node, + to_node=end_node, + ) + ) + source_node_outputs = source_node.outputs or [] + end_node_inputs = end_node.inputs or [] + end_node_input_property_names = [property.title for property in end_node_inputs] + for property in source_node_outputs: + if property.title in end_node_input_property_names: + data_flow_edges.append( + DataFlowEdge( + name=f"{source_node.name}_to_END_data_edge", + source_node=source_node, + destination_node=end_node, + source_output=property.title, + destination_input=property.title, + ) + ) + return control_flow_edges, data_flow_edges + + def _extract_triggers_from_condition(self, condition: Any) -> Tuple[set[str], bool]: + """ + Extract flat trigger names from CrewAI listener condition. + Returns (triggers, has_branching) where has_branching indicates unsupported branching usage. + """ + has_branching = False + triggers: set[str] = set() + + def _is_branching_condition(condition_type: Any, methods: Any) -> bool: + return (str(condition_type).upper() in ("AND", "OR")) and len(methods) > 1 + + # Simple tuple form: (condition_type, methods) + if isinstance(condition, tuple) and len(condition) == 2: + condition_type, methods = condition + for method in methods or []: + triggers.add(str(method)) + return triggers, _is_branching_condition(condition_type, methods) + + # Dict form: {"type": "OR"/"AND", "methods": [...] } or {"type": ..., "conditions": [...]} + if isinstance(condition, dict): + condition_type = str(condition.get("type", "OR")).upper() + if "methods" in condition: + methods = condition.get("methods", []) + for method in methods: + triggers.add(str(method)) + return triggers, _is_branching_condition(condition_type, methods) + if "conditions" in condition: + subconditions = condition.get("conditions", []) + has_branching = _is_branching_condition(condition_type, subconditions) + for subcondition in subconditions: + if isinstance(subcondition, dict): + subtriggers, sub_has_branching = self._extract_triggers_from_condition( + subcondition + ) + triggers |= subtriggers + has_branching = has_branching or sub_has_branching + else: + triggers.add(str(subcondition)) + return triggers, has_branching + + # Direct string method/label + if isinstance(condition, str): + triggers.add(condition) + return triggers, has_branching + + return triggers, has_branching diff --git a/pyagentspec/src/pyagentspec/adapters/crewai/_crewaiconverter.py b/pyagentspec/src/pyagentspec/adapters/crewai/_crewaiconverter.py index 9408ec29..52b7e1a8 100644 --- a/pyagentspec/src/pyagentspec/adapters/crewai/_crewaiconverter.py +++ b/pyagentspec/src/pyagentspec/adapters/crewai/_crewaiconverter.py @@ -4,80 +4,47 @@ # (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License # (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, cast import httpx -from pydantic import BaseModel, Field, create_model +from pyagentspec import Component as AgentSpecComponent from pyagentspec.adapters._utils import render_template +from pyagentspec.adapters.crewai._node_execution import NodeExecutor from pyagentspec.adapters.crewai._types import ( + ControlFlow, CrewAIAgent, CrewAIBaseTool, + CrewAIFlow, + CrewAIListenNode, CrewAILlm, + CrewAIOrOperator, CrewAIServerToolType, + CrewAIStartNode, CrewAITool, + FlowState, ) +from pyagentspec.adapters.crewai._utils import create_pydantic_model_from_properties from pyagentspec.adapters.crewai.tracing import CrewAIAgentWithTracing from pyagentspec.agent import Agent as AgentSpecAgent -from pyagentspec.component import Component as AgentSpecComponent +from pyagentspec.flows.edges.controlflowedge import ControlFlowEdge +from pyagentspec.flows.flow import Flow as AgentSpecFlow +from pyagentspec.flows.node import Node as AgentSpecNode +from pyagentspec.flows.nodes import EndNode as AgentSpecEndNode +from pyagentspec.flows.nodes import InputMessageNode as AgentSpecInputMessageNode +from pyagentspec.flows.nodes import LlmNode as AgentSpecLlmNode +from pyagentspec.flows.nodes import OutputMessageNode as AgentSpecOutputMessageNode +from pyagentspec.flows.nodes import StartNode as AgentSpecStartNode +from pyagentspec.flows.nodes import ToolNode as AgentSpecToolNode from pyagentspec.llms import LlmConfig as AgentSpecLlmConfig -from pyagentspec.llms.ollamaconfig import OllamaConfig as AgentSpecOllamaModel -from pyagentspec.llms.openaicompatibleconfig import ( - OpenAiCompatibleConfig as AgentSpecOpenAiCompatibleConfig, -) -from pyagentspec.llms.openaiconfig import OpenAiConfig as AgentSpecOpenAiConfig -from pyagentspec.llms.vllmconfig import VllmConfig as AgentSpecVllmModel -from pyagentspec.property import Property as AgentSpecProperty -from pyagentspec.property import _empty_default as _agentspec_empty_default +from pyagentspec.llms import OllamaConfig as AgentSpecOllamaModel +from pyagentspec.llms import OpenAiCompatibleConfig as AgentSpecOpenAiCompatibleConfig +from pyagentspec.llms import OpenAiConfig as AgentSpecOpenAiConfig +from pyagentspec.llms import VllmConfig as AgentSpecVllmModel +from pyagentspec.tools import ClientTool as AgentSpecClientTool +from pyagentspec.tools import RemoteTool as AgentSpecRemoteTool +from pyagentspec.tools import ServerTool as AgentSpecServerTool from pyagentspec.tools import Tool as AgentSpecTool -from pyagentspec.tools.clienttool import ClientTool as AgentSpecClientTool -from pyagentspec.tools.remotetool import RemoteTool as AgentSpecRemoteTool -from pyagentspec.tools.servertool import ServerTool as AgentSpecServerTool - - -def _json_schema_type_to_python_annotation(json_schema: Dict[str, Any]) -> str: - if "anyOf" in json_schema: - possible_types = set( - _json_schema_type_to_python_annotation(inner_json_schema_type) - for inner_json_schema_type in json_schema["anyOf"] - ) - return f"Union[{','.join(possible_types)}]" - if isinstance(json_schema["type"], list): - possible_types = set( - _json_schema_type_to_python_annotation(inner_json_schema_type) - for inner_json_schema_type in json_schema["type"] - ) - return f"Union[{','.join(possible_types)}]" - mapping = { - "string": "str", - "number": "float", - "integer": "int", - "boolean": "bool", - "null": "None", - } - if json_schema["type"] == "object": - # We could do better in inferring the type of values, for now we just use Any - return "Dict[str, Any]" - if json_schema["type"] == "array": - return f"List[{_json_schema_type_to_python_annotation(json_schema['items'])}]" - return mapping.get(json_schema["type"], "Any") - - -def _create_pydantic_model_from_properties( - model_name: str, properties: List[AgentSpecProperty] -) -> type[BaseModel]: - """Create a Pydantic model CLASS whose attributes are the given properties.""" - fields: Dict[str, Any] = {} - for property_ in properties: - field_parameters: Dict[str, Any] = {} - param_name = property_.title - if property_.default is not _agentspec_empty_default: - field_parameters["default"] = property_.default - if property_.description: - field_parameters["description"] = property_.description - annotation = _json_schema_type_to_python_annotation(property_.json_schema) - fields[param_name] = (annotation, Field(**field_parameters)) - return create_model(model_name, **fields) class AgentSpecToCrewAIConverter: @@ -119,6 +86,14 @@ def convert( crewai_component = self._tool_convert_to_crewai( agentspec_component, tool_registry, converted_components ) + elif isinstance(agentspec_component, AgentSpecFlow): + crewai_component = self._flow_convert_to_crewai( + agentspec_component, tool_registry, converted_components + ) + elif isinstance(agentspec_component, AgentSpecNode): + crewai_component = self._node_convert_to_crewai( + agentspec_component, tool_registry, converted_components + ) elif isinstance(agentspec_component, AgentSpecComponent): raise NotImplementedError( f"The AgentSpec Component type '{agentspec_component.__class__.__name__}' is not yet supported " @@ -150,6 +125,155 @@ def convert( self._is_root_call = is_root_call return converted_crewai_component + def _flow_convert_to_crewai( + self, + flow: AgentSpecFlow, + tool_registry: Dict[str, CrewAIServerToolType], + converted_components: Dict[str, Any], + ) -> CrewAIFlow[FlowState]: + + node_executors: Dict[str, NodeExecutor] = { + node.id: self._node_convert_to_crewai( + node, + tool_registry=tool_registry, + converted_components=converted_components, + ) + for node in (flow.nodes or []) + } + + # Data flow edges (handled by node executors) + for data_flow_edge in flow.data_flow_connections or []: + node_executors[data_flow_edge.source_node.id].attach_edge(data_flow_edge) + node_executors[data_flow_edge.destination_node.id].attach_edge(data_flow_edge) + + # Initialize the class namespace for the CrewAI Flow + crewai_class_namespace: Dict[str, Any] = {} + crewai_class_namespace["name"] = flow.name or "ConvertedCrewAIFlow" + + # Prepare node executors to be decorated + for _node_id, node_executor in node_executors.items(): + node_wrapper = node_executor.get_node_wrapper() + node_wrapper.__name__ = node_executor.node.name + crewai_class_namespace[node_wrapper.__name__] = node_wrapper + + # Apply the @start decorator to the AgentSpec start node + start_node_name = flow.start_node.name + crewai_class_namespace[start_node_name] = CrewAIStartNode()( + crewai_class_namespace[start_node_name] + ) + + # Apply the @listen decorator to destination nodes + control_flow = self._create_control_flow(flow.control_flow_connections) + + listen_conditions: Dict[str, List[str]] = {} + for source_node_id, control_flow_mapping in control_flow.items(): + for _branch_label, destination_node_id in control_flow_mapping.items(): + source_node_name = node_executors[source_node_id].node.name + destination_node_name = node_executors[destination_node_id].node.name + listen_conditions.setdefault(destination_node_name, []).append(source_node_name) + + for listen_node_name, conditions in listen_conditions.items(): + conditions_union = CrewAIOrOperator(*conditions) + crewai_class_namespace[listen_node_name] = CrewAIListenNode(conditions_union)( + crewai_class_namespace[listen_node_name] + ) + + # Create the Flow subclass and return an instance of it + ConvertedCrewAIFlow = cast( + type[CrewAIFlow[FlowState]], + type( + "ConvertedCrewAIFlow", + (CrewAIFlow,), + crewai_class_namespace, + ), + ) + return ConvertedCrewAIFlow() + + def _create_control_flow(self, control_flow_connections: List[ControlFlowEdge]) -> ControlFlow: + control_flow: ControlFlow = {} + for control_flow_edge in control_flow_connections: + source_node_id = control_flow_edge.from_node.id + if source_node_id not in control_flow: + control_flow[source_node_id] = {} + + branch_name = control_flow_edge.from_branch or AgentSpecNode.DEFAULT_NEXT_BRANCH + control_flow[source_node_id][branch_name] = control_flow_edge.to_node.id + + return control_flow + + def _node_convert_to_crewai( + self, + node: AgentSpecNode, + tool_registry: Dict[str, CrewAIServerToolType], + converted_components: Optional[Dict[str, Any]] = None, + ) -> NodeExecutor: + if isinstance(node, AgentSpecStartNode): + return self._start_node_convert_to_crewai(node) + elif isinstance(node, AgentSpecEndNode): + return self._end_node_convert_to_crewai(node) + elif isinstance(node, AgentSpecToolNode): + return self._tool_node_convert_to_crewai(node, tool_registry, converted_components) + elif isinstance(node, AgentSpecLlmNode): + return self._llm_node_convert_to_crewai(node, tool_registry, converted_components) + elif isinstance(node, AgentSpecInputMessageNode): + return self._input_message_node_convert_to_crewai(node) + elif isinstance(node, AgentSpecOutputMessageNode): + return self._output_message_node_convert_to_crewai(node) + else: + raise NotImplementedError( + f"The AgentSpec component of type {type(node)} is not yet supported for conversion" + ) + + def _start_node_convert_to_crewai(self, node: AgentSpecStartNode) -> "NodeExecutor": + from pyagentspec.adapters.crewai._node_execution import StartNodeExecutor + + return StartNodeExecutor(node) + + def _end_node_convert_to_crewai(self, node: AgentSpecEndNode) -> "NodeExecutor": + from pyagentspec.adapters.crewai._node_execution import EndNodeExecutor + + return EndNodeExecutor(node) + + def _tool_node_convert_to_crewai( + self, + node: AgentSpecToolNode, + tool_registry: Dict[str, CrewAIServerToolType], + converted_components: Optional[Dict[str, Any]], + ) -> "NodeExecutor": + from pyagentspec.adapters.crewai._node_execution import ToolNodeExecutor + + tool = self.convert( + node.tool, tool_registry=tool_registry, converted_components=converted_components + ) + return ToolNodeExecutor(node, tool) + + def _llm_node_convert_to_crewai( + self, + node: AgentSpecLlmNode, + tool_registry: Dict[str, CrewAIServerToolType], + converted_components: Optional[Dict[str, Any]], + ) -> "NodeExecutor": + from pyagentspec.adapters.crewai._node_execution import LlmNodeExecutor + + llm = self.convert( + node.llm_config, tool_registry=tool_registry, converted_components=converted_components + ) + return LlmNodeExecutor(node, llm) + + def _input_message_node_convert_to_crewai( + self, node: AgentSpecInputMessageNode + ) -> "NodeExecutor": + from pyagentspec.adapters.crewai._node_execution import InputMessageNodeExecutor + + return InputMessageNodeExecutor(node) + + def _output_message_node_convert_to_crewai( + self, node: AgentSpecOutputMessageNode + ) -> "NodeExecutor": + from pyagentspec.adapters.crewai._node_execution import OutputMessageNodeExecutor + + return OutputMessageNodeExecutor(node) + def _llm_convert_to_crewai( self, agentspec_llm: AgentSpecLlmConfig, @@ -205,7 +329,7 @@ def _tool_convert_to_crewai( return CrewAITool( name=agentspec_tool.name, description=agentspec_tool.description or "", - args_schema=_create_pydantic_model_from_properties( + args_schema=create_pydantic_model_from_properties( agentspec_tool.name.title() + "InputSchema", agentspec_tool.inputs or [] ), func=tool, @@ -236,7 +360,7 @@ def client_tool(**kwargs: Any) -> Any: return CrewAITool( name=agentspec_tool.name, description=agentspec_tool.description or "", - args_schema=_create_pydantic_model_from_properties( + args_schema=create_pydantic_model_from_properties( agentspec_tool.name.title() + "InputSchema", agentspec_tool.inputs or [] ), func=client_tool, @@ -271,7 +395,7 @@ def _remote_tool(**kwargs: Any) -> Any: return CrewAITool( name=remote_tool.name, description=remote_tool.description or "", - args_schema=_create_pydantic_model_from_properties( + args_schema=create_pydantic_model_from_properties( remote_tool.name.title() + "InputSchema", remote_tool.inputs or [] ), func=_remote_tool, diff --git a/pyagentspec/src/pyagentspec/adapters/crewai/_node_execution.py b/pyagentspec/src/pyagentspec/adapters/crewai/_node_execution.py new file mode 100644 index 00000000..b6b46f06 --- /dev/null +++ b/pyagentspec/src/pyagentspec/adapters/crewai/_node_execution.py @@ -0,0 +1,266 @@ +# Copyright © 2025 Oracle and/or its affiliates. +# +# This software is under the Apache License 2.0 +# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License +# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. + +from __future__ import annotations + +import json +from abc import ABC, abstractmethod +from typing import Any, Callable, Dict, List, Optional + +from pydantic import BaseModel + +from pyagentspec.adapters.crewai._template_rendering import render_template +from pyagentspec.adapters.crewai._types import ( + CrewAIFlow, + CrewAILlm, + CrewAITool, + ExecuteOutput, + FlowState, + NodeInput, + NodeOutput, +) +from pyagentspec.adapters.crewai._utils import create_pydantic_model_from_properties +from pyagentspec.flows.edges import DataFlowEdge +from pyagentspec.flows.node import Node as AgentSpecNode +from pyagentspec.flows.nodes import EndNode as AgentSpecEndNode +from pyagentspec.flows.nodes import InputMessageNode as AgentSpecInputMessageNode +from pyagentspec.flows.nodes import LlmNode as AgentSpecLlmNode +from pyagentspec.flows.nodes import OutputMessageNode as AgentSpecOutputMessageNode +from pyagentspec.flows.nodes import StartNode as AgentSpecStartNode +from pyagentspec.flows.nodes import ToolNode as AgentSpecToolNode +from pyagentspec.property import Property as AgentSpecProperty +from pyagentspec.property import _empty_default as pyagentspec_empty_default + + +class NodeExecutor(ABC): + def __init__(self, node: AgentSpecNode) -> None: + self.node = node + self.edges: List[DataFlowEdge] = [] + + def attach_edge(self, edge: DataFlowEdge) -> None: + self.edges.append(edge) + + @abstractmethod + def _execute(self, inputs: NodeInput) -> ExecuteOutput: + """ + Returns the output of executing node with the given inputs. + """ + + def get_node_wrapper(self) -> Callable[..., NodeOutput]: + """ + Prepares a callable that will be decorated and part of the CrewAI Flow class. + This wraps around the node's _execute method and handles all inputs and outputs + using the state variable of the CrewAI Flow. + """ + + def node_wrapper(self_flow: CrewAIFlow[FlowState], *args: Any, **kwargs: Any) -> NodeOutput: + inputs = self._get_inputs(self_flow.state) + execute_outputs = self._execute(inputs) + node_outputs = self._update_status(self_flow.state, execute_outputs) + + return node_outputs + + return node_wrapper + + def _get_inputs(self, state: FlowState) -> NodeInput: + input_properties = self.node.inputs or [] + input_values = {} + + for edge in self.edges: + if edge.destination_node.id == self.node.id: + property_name = edge.destination_input + property_name_global = self._globalize_property_name(property_name) + input_values[property_name] = state[property_name_global] + + return self._cast_values_and_add_defaults(input_values, input_properties) + + def _update_status(self, state: FlowState, outputs: ExecuteOutput) -> NodeOutput: + output_properties = self.node.outputs or [] + output_values = self._cast_values_and_add_defaults(outputs, output_properties) + + for edge in self.edges: + if edge.source_node.id == self.node.id: + property_value = output_values[edge.source_output] + + output_property_name_global = self._globalize_property_name(edge.source_output) + state[output_property_name_global] = property_value + + next_input_property_name_global = self._globalize_property_name( + edge.destination_input, edge.destination_node.id + ) + state[next_input_property_name_global] = property_value + + return { + property.title: output_values[property.title] + for property in output_properties + if property.title in output_values + } + + def _globalize_property_name(self, property_name: str, node_id: Optional[str] = None) -> str: + return f"{node_id or self.node.id}::{property_name}" + + def _cast_values_and_add_defaults( + self, + values_dict: Dict[str, Any], + properties: List[AgentSpecProperty], + ) -> Dict[str, Any]: + results_dict: Dict[str, Any] = {} + for property_ in properties: + key = property_.title + if key in values_dict: + value = values_dict.get(key) + if property_.type == "string" and not isinstance(value, str): + value = json.dumps(value) + elif property_.type == "boolean" and isinstance(value, (int, float)): + value = bool(value) + elif property_.type == "integer" and isinstance(value, (float, bool)): + value = int(value) + elif property_.type == "number" and isinstance(value, (int, bool)): + value = float(value) + results_dict[key] = value + elif property_.default is not pyagentspec_empty_default: + results_dict[key] = property_.default + else: + raise ValueError( + f"Expected node `{self.node.name}` to have a value " + f"for property `{property_.title}`, but none was found." + ) + return results_dict + + +class StartNodeExecutor(NodeExecutor): + node: AgentSpecStartNode + + def _get_inputs(self, state: FlowState) -> NodeInput: + input_properties = self.node.inputs or [] + input_values = {} + + for property in input_properties: + if property.title in state: + input_values[property.title] = state[property.title] + + return self._cast_values_and_add_defaults(input_values, input_properties) + + def _execute(self, inputs: NodeInput) -> ExecuteOutput: + return inputs + + +class EndNodeExecutor(NodeExecutor): + node: AgentSpecEndNode + + def _update_status(self, state: FlowState, outputs: ExecuteOutput) -> NodeOutput: + output_properties = self.node.outputs or [] + output_values = {} + + for property in output_properties: + if property.title in outputs: + output_values[property.title] = outputs[property.title] + + output_values = self._cast_values_and_add_defaults(output_values, output_properties) + + for property_name, property_value in output_values.items(): + state[self._globalize_property_name(property_name)] = property_value + + return output_values + + def _execute(self, inputs: NodeInput) -> ExecuteOutput: + return inputs + + +class ToolNodeExecutor(NodeExecutor): + node: AgentSpecToolNode + + def __init__(self, node: AgentSpecToolNode, tool: CrewAITool) -> None: + super().__init__(node) + if not isinstance(self.node, AgentSpecToolNode): + raise TypeError("ToolNodeExecutor can only be initialized with ToolNode") + self.tool = tool + + def _execute(self, inputs: NodeInput) -> ExecuteOutput: + tool_output = self.tool.run(**inputs) + + if isinstance(tool_output, dict): + return tool_output + + output_name = self.node.outputs[0].title if self.node.outputs else "tool_output" + return {output_name: tool_output} + + +class LlmNodeExecutor(NodeExecutor): + node: AgentSpecLlmNode + + def __init__(self, node: AgentSpecLlmNode, llm: CrewAILlm) -> None: + super().__init__(node) + + self.llm: CrewAILlm = llm + + node_outputs = self.node.outputs or [] + requires_structured_generation = not ( + len(node_outputs) == 1 and node_outputs[0].type == "string" + ) + if requires_structured_generation: + self.llm.response_format = create_pydantic_model_from_properties( + self.node.name.title() + "ResponseFormat", node_outputs + ) + + def _execute(self, inputs: Dict[str, Any]) -> ExecuteOutput: + prompt_template = self.node.prompt_template + rendered_prompt = render_template(prompt_template, inputs) + + generated_message = self.llm.call(rendered_prompt) + + if self.llm.response_format is not None: + if not isinstance(generated_message, BaseModel): + raise TypeError( + f"Expected LLM response format to be BaseModel, got {type(generated_message)!r}" + ) + + return generated_message.dict() + else: + node_outputs = self.node.outputs or [] + output_name = node_outputs[0].title if node_outputs else "generated_text" + return {output_name: generated_message} + + +class InputMessageNodeExecutor(NodeExecutor): + node: AgentSpecInputMessageNode + + def _execute(self, inputs: Dict[str, Any]) -> ExecuteOutput: + from crewai.events.event_listener import event_listener + from crewai.utilities.printer import Printer + + printer = Printer() + event_listener.formatter.pause_live_updates() + + try: + printer.print(content=f"\nPlease provide your input message:", color="bold_yellow") + response = input().strip() + output_name = ( + self.node.outputs[0].title + if self.node.outputs + else AgentSpecInputMessageNode.DEFAULT_OUTPUT + ) + return {output_name: response} + finally: + event_listener.formatter.resume_live_updates() + + +class OutputMessageNodeExecutor(NodeExecutor): + node: AgentSpecOutputMessageNode + + def _execute(self, inputs: Dict[str, Any]) -> ExecuteOutput: + from crewai.events.event_listener import event_listener + from crewai.utilities.printer import Printer + + printer = Printer() + event_listener.formatter.pause_live_updates() + + try: + message = render_template(self.node.message, inputs) + printer.print(content=f"\n{message}\n", color="bold_yellow") + return {} + finally: + event_listener.formatter.resume_live_updates() diff --git a/pyagentspec/src/pyagentspec/adapters/crewai/_template_rendering.py b/pyagentspec/src/pyagentspec/adapters/crewai/_template_rendering.py new file mode 100644 index 00000000..5fb12850 --- /dev/null +++ b/pyagentspec/src/pyagentspec/adapters/crewai/_template_rendering.py @@ -0,0 +1,33 @@ +# Copyright © 2025 Oracle and/or its affiliates. +# +# This software is under the Apache License 2.0 +# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License +# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. + +import re +from typing import Any, Dict, List, Tuple + +from pyagentspec.templating import TEMPLATE_PLACEHOLDER_REGEXP + + +def render_template(template: str, inputs: Dict[str, Any]) -> str: + """Render a prompt template using inputs.""" + if not isinstance(template, str): + return str(template) + return _recursive_template_splitting_rendering( + template, [(input_title, input_value) for input_title, input_value in inputs.items()] + ) + + +def _recursive_template_splitting_rendering(template: str, inputs: List[Tuple[str, Any]]) -> str: + """Recursively split and join the templates using the list of inputs.""" + if len(inputs) == 0: + return template + input_title, input_value = inputs[-1] + splitting_regexp = TEMPLATE_PLACEHOLDER_REGEXP.replace(r"(\w+)", input_title) + split_templates = re.split(splitting_regexp, template) + rendered_split_templates = [ + _recursive_template_splitting_rendering(t, inputs[:-1]) for t in split_templates + ] + rendered_template = str(input_value).join(rendered_split_templates) + return rendered_template diff --git a/pyagentspec/src/pyagentspec/adapters/crewai/_types.py b/pyagentspec/src/pyagentspec/adapters/crewai/_types.py index 38202371..8bb11c7b 100644 --- a/pyagentspec/src/pyagentspec/adapters/crewai/_types.py +++ b/pyagentspec/src/pyagentspec/adapters/crewai/_types.py @@ -4,7 +4,9 @@ # (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License # (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. -from typing import TYPE_CHECKING, Any, Callable, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, Hashable, Union + +from typing_extensions import TypeAlias from pyagentspec._lazy_loader import LazyLoader @@ -40,6 +42,9 @@ from crewai.events.types.tool_usage_events import ( ToolUsageStartedEvent as CrewAIToolUsageStartedEvent, ) + from crewai.flow.flow import listen as CrewAIListenNode + from crewai.flow.flow import or_ as CrewAIOrOperator + from crewai.flow.flow import start as CrewAIStartNode from crewai.tools import BaseTool as CrewAIBaseTool from crewai.tools.base_tool import Tool as CrewAITool from crewai.tools.structured_tool import CrewStructuredTool as CrewAIStructuredTool @@ -78,10 +83,23 @@ CrewAIToolUsageStartedEvent = LazyLoader( "crewai.events.types.tool_usage_events" ).ToolUsageStartedEvent + CrewAIStartNode = LazyLoader("crewai.flow.flow").start + CrewAIListenNode = LazyLoader("crewai.flow.flow").listen + CrewAIOrOperator = LazyLoader("crewai.flow.flow").or_ CrewAIComponent = Union[CrewAIAgent, CrewAIFlow[Any]] CrewAIServerToolType = Union[CrewAITool, Callable[..., Any]] +NodeInput: TypeAlias = Union[Dict[str, Any]] +NodeOutput: TypeAlias = Union[Dict[str, Any]] +ExecuteOutput: TypeAlias = Union[Dict[str, Any]] +FlowState: TypeAlias = Dict[str, Any] + +SourceNodeId: TypeAlias = str +BranchName: TypeAlias = Hashable +TargetNodeId: TypeAlias = str +ControlFlow: TypeAlias = Dict[SourceNodeId, Dict[BranchName, TargetNodeId]] + __all__ = [ "crewai", "crewai_event_bus", @@ -91,6 +109,9 @@ "CrewAIBaseTool", "CrewAITool", "CrewAIStructuredTool", + "CrewAIStartNode", + "CrewAIListenNode", + "CrewAIOrOperator", "CrewAIComponent", "CrewAIServerToolType", "CrewAIBaseEvent", @@ -105,4 +126,11 @@ "CrewAIAgentExecutionCompletedEvent", "CrewAILiteAgentExecutionStartedEvent", "CrewAILiteAgentExecutionCompletedEvent", + "NodeInput", + "NodeOutput", + "FlowState", + "SourceNodeId", + "BranchName", + "TargetNodeId", + "ControlFlow", ] diff --git a/pyagentspec/src/pyagentspec/adapters/crewai/_utils.py b/pyagentspec/src/pyagentspec/adapters/crewai/_utils.py new file mode 100644 index 00000000..c248cecb --- /dev/null +++ b/pyagentspec/src/pyagentspec/adapters/crewai/_utils.py @@ -0,0 +1,125 @@ +# Copyright © 2025 Oracle and/or its affiliates. +# +# This software is under the Apache License 2.0 +# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License +# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. + +from typing import Any, Dict, List, Literal, Tuple, Union + +from pydantic import BaseModel, ConfigDict, Field, create_model + +from pyagentspec.property import Property as AgentSpecProperty +from pyagentspec.property import _empty_default as _agentspec_empty_default + + +class SchemaRegistry: + def __init__(self) -> None: + self.models: Dict[str, type[BaseModel]] = {} + + +def _build_type_from_schema( + name: str, + schema: Dict[str, Any], + registry: SchemaRegistry, +) -> Any: + # Enum -> Literal[…] + if "enum" in schema and isinstance(schema["enum"], list): + values = schema["enum"] + # Literal supports a tuple of literal values as a single subscription argument + return Literal[tuple(values)] + + # anyOf / oneOf -> Union[…] + for key in ("anyOf", "oneOf"): + if key in schema: + variants = [ + _build_type_from_schema(f"{name}Alt{i}", s, registry) + for i, s in enumerate(schema[key]) + ] + return Union[tuple(variants)] + + t = schema.get("type") + + # list of types -> Union[…] + if isinstance(t, list): + variants = [ + _build_type_from_schema(f"{name}Alt{i}", {"type": subtype}, registry) + for i, subtype in enumerate(t) + ] + return Union[tuple(variants)] + + # arrays + if t == "array": + items_schema = schema.get("items", {"type": "any"}) + item_type = _build_type_from_schema(f"{name}Item", items_schema, registry) + return List[item_type] # type: ignore + # objects + if t == "object" or ("properties" in schema or "required" in schema): + # Create or reuse a Pydantic model for this object schema + model_name = schema.get("title") or name + unique_name = model_name + suffix = 1 + while unique_name in registry.models: + suffix += 1 + unique_name = f"{model_name}_{suffix}" + + props = schema.get("properties", {}) or {} + required = set(schema.get("required", [])) + + fields: Dict[str, Tuple[Any, Any]] = {} + for prop_name, prop_schema in props.items(): + prop_type = _build_type_from_schema(f"{unique_name}_{prop_name}", prop_schema, registry) + desc = prop_schema.get("description") + default_field = ( + Field(..., description=desc) + if prop_name in required + else Field(None, description=desc) + ) + fields[prop_name] = (prop_type, default_field) + + # Enforce additionalProperties: False (extra=forbid) + extra_forbid = schema.get("additionalProperties") is False + model_kwargs: Dict[str, Any] = {} + if extra_forbid: + # Pydantic v2: pass a ConfigDict/dict into __config__ + model_kwargs["__config__"] = ConfigDict(extra="forbid") + + model_cls = create_model(unique_name, **fields, **model_kwargs) # type: ignore + registry.models[unique_name] = model_cls + return model_cls + + # primitives / fallback + mapping = { + "string": str, + "number": float, + "integer": int, + "boolean": bool, + "null": type(None), + "any": Any, + None: Any, + "": Any, + } + return mapping.get(t, Any) + + +def create_pydantic_model_from_properties( + model_name: str, properties: List[AgentSpecProperty] +) -> type[BaseModel]: + registry = SchemaRegistry() + fields: Dict[str, Tuple[Any, Any]] = {} + + for property_ in properties: + # Build the annotation from the json_schema (handles enum/array/object/etc.) + annotation = _build_type_from_schema(property_.title, property_.json_schema, registry) + + field_params: Dict[str, Any] = {} + if property_.description: + field_params["description"] = property_.description + + if property_.default is not _agentspec_empty_default: + default_field = Field(property_.default, **field_params) + else: + default_field = Field(..., **field_params) + + fields[property_.title] = (annotation, default_field) + + return create_model(model_name, **fields) # type: ignore diff --git a/pyagentspec/tests/adapters/crewai/conftest.py b/pyagentspec/tests/adapters/crewai/conftest.py index 75d086a1..d0622a23 100644 --- a/pyagentspec/tests/adapters/crewai/conftest.py +++ b/pyagentspec/tests/adapters/crewai/conftest.py @@ -54,3 +54,14 @@ def fake_response(self, method: str, endpoint: str, **kwargs) -> requests.Respon monkeypatch.setattr(PlusAPI, "_make_request", fake_response, raising=True) except ImportError: pass + + +@pytest.fixture +def mute_crewai_event_bus(monkeypatch): + try: + from crewai.events.event_bus import crewai_event_bus + except Exception: + return + + # Replace emit/aemit with no-ops to avoid background threads, rich output, and side effects + monkeypatch.setattr(crewai_event_bus, "emit", lambda *args, **kwargs: None, raising=True) diff --git a/pyagentspec/tests/adapters/crewai/flows/test_inputmessagenode_crewai.py b/pyagentspec/tests/adapters/crewai/flows/test_inputmessagenode_crewai.py new file mode 100644 index 00000000..6644c02a --- /dev/null +++ b/pyagentspec/tests/adapters/crewai/flows/test_inputmessagenode_crewai.py @@ -0,0 +1,60 @@ +# Copyright © 2025 Oracle and/or its affiliates. +# +# This software is under the Apache License 2.0 +# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License +# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. + +import pytest + +from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge +from pyagentspec.flows.flow import Flow +from pyagentspec.flows.nodes import EndNode, InputMessageNode, StartNode +from pyagentspec.property import StringProperty + + +@pytest.mark.usefixtures("mute_crewai_event_bus") +def test_inputmessagenode_can_be_imported_and_executed(monkeypatch) -> None: + from crewai import Flow as CrewAIFlow + + from pyagentspec.adapters.crewai import AgentSpecLoader + + # Mock interactive input() used by InputMessageNodeExecutor + monkeypatch.setattr("builtins.input", lambda: "3") + + custom_input_property = StringProperty(title="custom_input") + input_message_node = InputMessageNode( + name="input_message", + outputs=[custom_input_property], + ) + start_node = StartNode(name="start") + end_node = EndNode(name="end", outputs=[custom_input_property]) + + flow = Flow( + name="flow", + start_node=start_node, + nodes=[start_node, input_message_node, end_node], + control_flow_connections=[ + ControlFlowEdge(name="start_to_node", from_node=start_node, to_node=input_message_node), + ControlFlowEdge(name="node_to_end", from_node=input_message_node, to_node=end_node), + ], + data_flow_connections=[ + DataFlowEdge( + name="input_edge", + source_node=input_message_node, + source_output=custom_input_property.title, + destination_node=end_node, + destination_input=custom_input_property.title, + ), + ], + outputs=[custom_input_property], + ) + + flow_instance = AgentSpecLoader().load_component(flow) + + assert isinstance(flow_instance, CrewAIFlow) + + result = flow_instance.kickoff() + + assert isinstance(result, dict) + assert custom_input_property.title in result + assert result[custom_input_property.title] == "3" diff --git a/pyagentspec/tests/adapters/crewai/flows/test_llmnode_crewai.py b/pyagentspec/tests/adapters/crewai/flows/test_llmnode_crewai.py new file mode 100644 index 00000000..5aaae75d --- /dev/null +++ b/pyagentspec/tests/adapters/crewai/flows/test_llmnode_crewai.py @@ -0,0 +1,77 @@ +# Copyright © 2025 Oracle and/or its affiliates. +# +# This software is under the Apache License 2.0 +# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License +# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. + +import os + +import pytest + +from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge +from pyagentspec.flows.flow import Flow +from pyagentspec.flows.nodes import EndNode, LlmNode, StartNode +from pyagentspec.llms import OpenAiCompatibleConfig +from pyagentspec.property import StringProperty + + +@pytest.mark.usefixtures("mute_crewai_event_bus") +def test_llmnode_can_be_imported_and_executed() -> None: + from crewai import Flow as CrewAIFlow + + from pyagentspec.adapters.crewai import AgentSpecLoader + + nationality_property = StringProperty(title="nationality") + car_property = StringProperty(title="car") + llm_config = OpenAiCompatibleConfig( + name="llm_config", + model_id="/storage/models/Llama-3.1-70B-Instruct", + url=os.environ.get("LLAMA_API_URL"), + ) + llm_node = LlmNode( + name="llm_node", + llm_config=llm_config, + prompt_template="What is the fastest {{nationality}} car? Output in the format : ", + inputs=[nationality_property], + outputs=[car_property], + ) + start_node = StartNode(name="start", inputs=[nationality_property]) + end_node = EndNode(name="end", outputs=[car_property]) + + flow = Flow( + name="flow", + start_node=start_node, + nodes=[start_node, llm_node, end_node], + control_flow_connections=[ + ControlFlowEdge(name="start_to_node", from_node=start_node, to_node=llm_node), + ControlFlowEdge(name="node_to_end", from_node=llm_node, to_node=end_node), + ], + data_flow_connections=[ + DataFlowEdge( + name="input_edge", + source_node=start_node, + source_output=nationality_property.title, + destination_node=llm_node, + destination_input=nationality_property.title, + ), + DataFlowEdge( + name="car_edge", + source_node=llm_node, + source_output=car_property.title, + destination_node=end_node, + destination_input=car_property.title, + ), + ], + outputs=[car_property], + ) + + flow_instance = AgentSpecLoader().load_component(flow) + + assert isinstance(flow_instance, CrewAIFlow) + + result = flow_instance.kickoff({"nationality": "italian"}) + + assert isinstance(result, dict) + assert car_property.title in result + assert isinstance(result[car_property.title], str) + assert result[car_property.title] != "" diff --git a/pyagentspec/tests/adapters/crewai/flows/test_outputmessagenode_crewai.py b/pyagentspec/tests/adapters/crewai/flows/test_outputmessagenode_crewai.py new file mode 100644 index 00000000..76efc71d --- /dev/null +++ b/pyagentspec/tests/adapters/crewai/flows/test_outputmessagenode_crewai.py @@ -0,0 +1,69 @@ +# Copyright © 2025 Oracle and/or its affiliates. +# +# This software is under the Apache License 2.0 +# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License +# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. + +import pytest + +from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge +from pyagentspec.flows.flow import Flow +from pyagentspec.flows.nodes import EndNode, OutputMessageNode, StartNode +from pyagentspec.property import StringProperty + + +@pytest.mark.usefixtures("mute_crewai_event_bus") +def test_outputmessagenode_can_be_imported_and_executed(monkeypatch) -> None: + from crewai import Flow as CrewAIFlow + + from pyagentspec.adapters.crewai import AgentSpecLoader + + # Capture printed message from OutputMessageNodeExecutor via Printer.print + captured: dict[str, str] = {} + + def fake_print(self, content: str = "", color=None) -> None: + captured["content"] = content + + monkeypatch.setattr("crewai.utilities.printer.Printer.print", fake_print, raising=True) + + custom_input_property = StringProperty(title="custom_input") + output_message_node = OutputMessageNode( + name="output_message", + message="Hey {{custom_input}}", + inputs=[custom_input_property], + ) + start_node = StartNode(name="start", inputs=[custom_input_property]) + end_node = EndNode(name="end") + + flow = Flow( + name="flow", + start_node=start_node, + nodes=[start_node, output_message_node, end_node], + control_flow_connections=[ + ControlFlowEdge( + name="start_to_node", from_node=start_node, to_node=output_message_node + ), + ControlFlowEdge(name="node_to_end", from_node=output_message_node, to_node=end_node), + ], + data_flow_connections=[ + DataFlowEdge( + name="input_edge", + source_node=start_node, + source_output=custom_input_property.title, + destination_node=output_message_node, + destination_input=custom_input_property.title, + ), + ], + inputs=[custom_input_property], + ) + + flow_instance = AgentSpecLoader().load_component(flow) + + assert isinstance(flow_instance, CrewAIFlow) + + result = flow_instance.kickoff({"custom_input": "custom"}) + + assert isinstance(result, dict) + assert result == {} + assert "content" in captured + assert captured["content"].strip() == "Hey custom" diff --git a/pyagentspec/tests/adapters/crewai/flows/test_toolnode_crewai.py b/pyagentspec/tests/adapters/crewai/flows/test_toolnode_crewai.py new file mode 100644 index 00000000..d331cfdd --- /dev/null +++ b/pyagentspec/tests/adapters/crewai/flows/test_toolnode_crewai.py @@ -0,0 +1,74 @@ +# Copyright © 2025 Oracle and/or its affiliates. +# +# This software is under the Apache License 2.0 +# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License +# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. + +import pytest + +from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge +from pyagentspec.flows.flow import Flow +from pyagentspec.flows.nodes import EndNode, StartNode, ToolNode +from pyagentspec.property import Property +from pyagentspec.tools import ServerTool + + +@pytest.mark.usefixtures("mute_crewai_event_bus") +def test_toolnode_can_be_imported_and_executed() -> None: + from crewai import Flow as CrewAIFlow + + from pyagentspec.adapters.crewai import AgentSpecLoader + + x_property = Property(json_schema={"title": "input", "type": "number"}) + x_square_property = Property(json_schema={"title": "input_square", "type": "number"}) + + square_tool = ServerTool( + name="square_tool", + description="Computes the square of a number", + inputs=[x_property], + outputs=[x_square_property], + ) + + start_node = StartNode(name="subflow_start", inputs=[x_property]) + end_node = EndNode(name="subflow_end", outputs=[x_square_property]) + square_tool_node = ToolNode(name="square_tool_node", tool=square_tool) + + flow = Flow( + name="Square number flow", + start_node=start_node, + nodes=[start_node, square_tool_node, end_node], + control_flow_connections=[ + ControlFlowEdge(name="start_to_tool", from_node=start_node, to_node=square_tool_node), + ControlFlowEdge(name="tool_to_end", from_node=square_tool_node, to_node=end_node), + ], + data_flow_connections=[ + DataFlowEdge( + name="input_edge", + source_node=start_node, + source_output="input", + destination_node=square_tool_node, + destination_input="input", + ), + DataFlowEdge( + name="input_square_edge", + source_node=square_tool_node, + source_output="input_square", + destination_node=end_node, + destination_input="input_square", + ), + ], + ) + + def square_tool_callable(input: float) -> float: + return input * input + + tool_registry = {"square_tool": square_tool_callable} + flow_instance = AgentSpecLoader(tool_registry).load_component(flow) + + assert isinstance(flow_instance, CrewAIFlow) + + result = flow_instance.kickoff({"input": 4}) + + assert isinstance(result, dict) + assert "input_square" in result + assert result["input_square"] == 16.0 diff --git a/pyagentspec/tests/adapters/crewai/test_crewai_to_agentspec.py b/pyagentspec/tests/adapters/crewai/test_crewai_to_agentspec.py new file mode 100644 index 00000000..4a6b04c6 --- /dev/null +++ b/pyagentspec/tests/adapters/crewai/test_crewai_to_agentspec.py @@ -0,0 +1,104 @@ +# Copyright © 2025 Oracle and/or its affiliates. +# +# This software is under the Apache License 2.0 +# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License +# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. + + +import os +from secrets import choice +from typing import TYPE_CHECKING, Any, Dict + +import pytest + +from pyagentspec.component import Component as AgentSpecComponent +from pyagentspec.flows.flow import Flow as AgentSpecFlow + +if TYPE_CHECKING: + from crewai import Flow as CrewAIFlow + + from pyagentspec.adapters.crewai import AgentSpecExporter + + +@pytest.fixture +def agentspec_exporter() -> "AgentSpecExporter": + from pyagentspec.adapters.crewai import AgentSpecExporter + + return AgentSpecExporter() + + +@pytest.fixture +def crewai_flow() -> "CrewAIFlow": + from crewai import LLM as CrewAILlm + from crewai import Flow as CrewAIFlow + from crewai.flow.flow import listen, start + + class PoemFlow(CrewAIFlow): + + @start() + def generate_sentence_count(self) -> int: + return choice([1, 2, 3]) + + @listen(generate_sentence_count) + def generate_poem(self, sentence_count) -> str: + try: + llm = CrewAILlm( + model="openai//storage/models/Llama-3.1-70B-Instruct", + api_base=f"http://{os.environ.get('LLAMA_API_URL')}/v1", + api_key="fake-api-key", + ) + return llm.call(f"Generate a very short poem with {sentence_count} sentences") + except Exception: + return " ".join(["Very poetic sentence."] * sentence_count) + + @listen(generate_poem) + def add_title(self, poem_text) -> str: + return f"The Best Poem\n{poem_text}" + + return PoemFlow() + + +def _get_tool_registry(flow: "CrewAIFlow") -> Dict[str, Any]: + """ + CrewAI flows store the callables for their nodes in a private dict field called _methods. + We can reuse this as the tool registry for AgentSpecLoader. + """ + return flow._methods + + +def test_convert_flow_to_agentspec( + crewai_flow: "CrewAIFlow", + agentspec_exporter: "AgentSpecExporter", +) -> None: + agentspec_flow: AgentSpecComponent = agentspec_exporter.to_component(crewai_flow) + + assert isinstance(agentspec_flow, AgentSpecFlow) + assert len(agentspec_flow.nodes) == 5 + assert set([n.name for n in agentspec_flow.nodes]) == set( + ["START", "END", "generate_sentence_count", "generate_poem", "add_title"] + ) + assert len(agentspec_flow.control_flow_connections) == 4 + assert len(agentspec_flow.data_flow_connections) == 3 + assert len(agentspec_flow.outputs) == 1 + assert agentspec_flow.outputs[0].title == "add_title_output" + + +@pytest.mark.usefixtures("mute_crewai_event_bus") +def test_convert_flow_to_agentspec_and_back_with_kickoff( + crewai_flow: "CrewAIFlow", + agentspec_exporter: "AgentSpecExporter", +): + from pyagentspec.adapters.crewai import AgentSpecLoader + + agentspec_flow_json: AgentSpecComponent = agentspec_exporter.to_json(crewai_flow) + + tool_registry = _get_tool_registry(crewai_flow) + agentspec_loader = AgentSpecLoader(tool_registry=tool_registry) + crewai_flow_reproduced = agentspec_loader.load_json(agentspec_flow_json) + + result = crewai_flow_reproduced.kickoff() + + assert isinstance(result, dict) + assert len(result) == 1 + assert "add_title_output" in result + assert result["add_title_output"].startswith("The Best Poem") diff --git a/pyagentspec/tests/adapters/langgraph/flows/test_llmnode.py b/pyagentspec/tests/adapters/langgraph/flows/test_llmnode.py index edab9b61..617258e2 100644 --- a/pyagentspec/tests/adapters/langgraph/flows/test_llmnode.py +++ b/pyagentspec/tests/adapters/langgraph/flows/test_llmnode.py @@ -68,4 +68,5 @@ def test_llmnode_can_be_imported_and_executed() -> None: outputs = result["outputs"] assert car_property.title in outputs - assert "ital" in outputs[car_property.title].lower() + assert isinstance(outputs[car_property.title], str) + assert outputs[car_property.title] != ""