Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 278 additions & 0 deletions pyagentspec/src/pyagentspec/adapters/crewai/_agentspecconverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.


import inspect
import uuid
from typing import (
Any,
Callable,
Dict,
List,
Optional,
Tuple,
Type,
Union,
cast,
Expand All @@ -25,12 +27,20 @@
from pyagentspec.adapters.crewai._types import (
CrewAIAgent,
CrewAIBaseTool,
CrewAIFlow,
CrewAILlm,
CrewAIStructuredTool,
CrewAITool,
FlowState,
)
from pyagentspec.agent import Agent as AgentSpecAgent
from pyagentspec.component import Component as AgentSpecComponent
from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
from pyagentspec.flows.flow import Flow as AgentSpecFlow
from pyagentspec.flows.node import Node as AgentSpecNode
from pyagentspec.flows.nodes import EndNode as AgentSpecEndNode
from pyagentspec.flows.nodes import StartNode as AgentSpecStartNode
from pyagentspec.flows.nodes import ToolNode as AgentSpecToolNode
from pyagentspec.llms import LlmConfig as AgentSpecLlmConfig
from pyagentspec.llms import LlmGenerationConfig as AgentSpecLlmGenerationConfig
from pyagentspec.llms.ollamaconfig import OllamaConfig as AgentSpecOllamaModel
Expand Down Expand Up @@ -123,6 +133,10 @@ def convert(
agentspec_component = self._tool_convert_to_agentspec(
crewai_component, referenced_objects
)
elif isinstance(crewai_component, CrewAIFlow):
agentspec_component = self._flow_convert_to_agentspec(
crewai_component, referenced_objects
)
else:
raise NotImplementedError(
f"The crewai type '{crewai_component.__class__.__name__}' is not yet supported "
Expand Down Expand Up @@ -217,3 +231,267 @@ def _agent_convert_to_agentspec(
for tool in (crewai_agent.tools or [])
],
)

def _flow_convert_to_agentspec(
self, crewai_flow: CrewAIFlow[FlowState], referenced_objects: Dict[str, Any]
) -> AgentSpecFlow:

nodes: Dict[str, AgentSpecNode] = {}

# Create a ToolNode for each method (i.e. node) in the flow
methods_by_name = getattr(crewai_flow, "_methods", {})
start_method_names = set(getattr(crewai_flow, "_start_methods", []))
for method_name in methods_by_name.keys():
method_callable = methods_by_name[method_name]

# Get method signature to infer properties since they're not explicitly defined
signature = inspect.signature(method_callable)

# Create input properties for nodes:
# - for start nodes we use their input parameters
# - for other nodes we either leave it empty or have a single "object" property
parameters = [p for p in signature.parameters.values() if p.name != "self"]
node_inputs = [
AgentSpecProperty(title=p.name, json_schema={"type": "object"})
for p in parameters[: (len(parameters) if method_name in start_method_names else 1)]
]

# Create output properties for nodes:
# - if there is a return value annotation, add a single "object" property
return_annotation = signature.return_annotation
has_output = not (
return_annotation is inspect.Signature.empty or return_annotation is None
)
node_outputs = (
[AgentSpecProperty(title=f"{method_name}_output", json_schema={"type": "object"})]
if has_output
else []
)

tool = AgentSpecServerTool(
name=method_name,
description=f"Converted CrewAI flow method '{method_name}'",
inputs=node_inputs,
outputs=node_outputs,
)
node = AgentSpecToolNode(
name=method_name,
tool=tool,
inputs=node_inputs,
outputs=node_outputs,
)
nodes[method_name] = node
referenced_objects[method_name] = node

# Start node with inferred properties
start_node_properties = [
property
for start_method in getattr(crewai_flow, "_start_methods", [])
for property in (nodes[start_method].inputs or [])
]
start_node = AgentSpecStartNode(
name="START", inputs=start_node_properties, outputs=start_node_properties
)
nodes[start_node.name] = start_node
referenced_objects[start_node.name] = start_node

control_flow_edges: list[ControlFlowEdge] = []
data_flow_edges: list[DataFlowEdge] = []

# Connect START to all start methods
for start_method in getattr(crewai_flow, "_start_methods", []):
control_flow_edges, data_flow_edges = self._add_start_edges(
start_node,
nodes[start_method],
control_flow_edges,
data_flow_edges,
)

# Build edges based on listeners
listeners = getattr(crewai_flow, "_listeners", {})

for listener_name, condition in listeners.items():
triggers, has_branching = self._extract_triggers_from_condition(condition)
if has_branching:
raise ValueError(
"Branching is not currently supported (AND, OR operators). "
f"Got node {listener_name} with triggers {', '.join(triggers)}"
)
for trigger in triggers:
if trigger in getattr(crewai_flow, "_methods", {}):
control_flow_edges, data_flow_edges = self._add_listener_edges(
nodes[trigger],
nodes[listener_name],
control_flow_edges,
data_flow_edges,
)

# End node with inferred properties
has_outgoing_edges: set[str] = set(
edge.from_node.name for edge in control_flow_edges if edge.from_node.name != "END"
)
end_methods = [
method_name
for method_name in nodes
if method_name not in ("START", "END") and method_name not in has_outgoing_edges
]
end_node_properties = [
property for end_method in end_methods for property in (nodes[end_method].outputs or [])
]
end_node = AgentSpecEndNode(
name="END", inputs=end_node_properties, outputs=end_node_properties
)
nodes[end_node.name] = end_node
referenced_objects[end_node.name] = end_node

# Connect END to all end methods
for end_method in end_methods:
control_flow_edges, data_flow_edges = self._add_end_edges(
nodes[end_method],
end_node,
control_flow_edges,
data_flow_edges,
)

return AgentSpecFlow(
name=(crewai_flow.name or crewai_flow.__class__.__name__),
start_node=start_node,
nodes=list(nodes.values()),
control_flow_connections=control_flow_edges,
data_flow_connections=data_flow_edges,
)

def _add_start_edges(
self,
start_node: AgentSpecNode,
destination_node: AgentSpecNode,
control_flow_edges: List[ControlFlowEdge],
data_flow_edges: List[DataFlowEdge],
) -> Tuple[List[ControlFlowEdge], List[DataFlowEdge]]:
control_flow_edges.append(
ControlFlowEdge(
name=f"START_to_{destination_node.name}_control_edge",
from_node=start_node,
to_node=destination_node,
)
)
start_node_outputs = start_node.outputs or []
start_node_output_property_names = [property.title for property in start_node_outputs]
destination_node_inputs = destination_node.inputs or []
for property in destination_node_inputs:
if property.title in start_node_output_property_names:
data_flow_edges.append(
DataFlowEdge(
name=f"START_to_{destination_node.name}_data_edge",
source_node=start_node,
destination_node=destination_node,
source_output=property.title,
destination_input=property.title,
)
)
return control_flow_edges, data_flow_edges

def _add_listener_edges(
self,
trigger_node: AgentSpecNode,
listener_node: AgentSpecNode,
control_flow_edges: List[ControlFlowEdge],
data_flow_edges: List[DataFlowEdge],
) -> Tuple[List[ControlFlowEdge], List[DataFlowEdge]]:
control_flow_edges.append(
ControlFlowEdge(
name=f"{trigger_node.name}_to_{listener_node.name}_control_edge",
from_node=trigger_node,
to_node=listener_node,
)
)
trigger_node_outputs = trigger_node.outputs or []
listener_node_inputs = listener_node.inputs or []
if len(trigger_node_outputs) == 1 and len(listener_node_inputs) == 1:
data_flow_edges.append(
DataFlowEdge(
name=f"{trigger_node.name}_to_{listener_node.name}_data_edge",
source_node=trigger_node,
destination_node=listener_node,
source_output=trigger_node_outputs[0].title,
destination_input=listener_node_inputs[0].title,
)
)
return control_flow_edges, data_flow_edges

def _add_end_edges(
self,
source_node: AgentSpecNode,
end_node: AgentSpecNode,
control_flow_edges: List[ControlFlowEdge],
data_flow_edges: List[DataFlowEdge],
) -> Tuple[List[ControlFlowEdge], List[DataFlowEdge]]:
control_flow_edges.append(
ControlFlowEdge(
name=f"{source_node.name}_to_END_control_edge",
from_node=source_node,
to_node=end_node,
)
)
source_node_outputs = source_node.outputs or []
end_node_inputs = end_node.inputs or []
end_node_input_property_names = [property.title for property in end_node_inputs]
for property in source_node_outputs:
if property.title in end_node_input_property_names:
data_flow_edges.append(
DataFlowEdge(
name=f"{source_node.name}_to_END_data_edge",
source_node=source_node,
destination_node=end_node,
source_output=property.title,
destination_input=property.title,
)
)
return control_flow_edges, data_flow_edges

def _extract_triggers_from_condition(self, condition: Any) -> Tuple[set[str], bool]:
"""
Extract flat trigger names from CrewAI listener condition.
Returns (triggers, has_branching) where has_branching indicates unsupported branching usage.
"""
has_branching = False
triggers: set[str] = set()

def _is_branching_condition(condition_type: Any, methods: Any) -> bool:
return (str(condition_type).upper() in ("AND", "OR")) and len(methods) > 1

# Simple tuple form: (condition_type, methods)
if isinstance(condition, tuple) and len(condition) == 2:
condition_type, methods = condition
for method in methods or []:
triggers.add(str(method))
return triggers, _is_branching_condition(condition_type, methods)

# Dict form: {"type": "OR"/"AND", "methods": [...] } or {"type": ..., "conditions": [...]}
if isinstance(condition, dict):
condition_type = str(condition.get("type", "OR")).upper()
if "methods" in condition:
methods = condition.get("methods", [])
for method in methods:
triggers.add(str(method))
return triggers, _is_branching_condition(condition_type, methods)
if "conditions" in condition:
subconditions = condition.get("conditions", [])
has_branching = _is_branching_condition(condition_type, subconditions)
for subcondition in subconditions:
if isinstance(subcondition, dict):
subtriggers, sub_has_branching = self._extract_triggers_from_condition(
subcondition
)
triggers |= subtriggers
has_branching = has_branching or sub_has_branching
else:
triggers.add(str(subcondition))
return triggers, has_branching

# Direct string method/label
if isinstance(condition, str):
triggers.add(condition)
return triggers, has_branching

return triggers, has_branching
Loading