Skip to content

Commit 36c6948

Browse files
committed
added support for flows in crewai adapter
1 parent 078600a commit 36c6948

File tree

12 files changed

+1220
-63
lines changed

12 files changed

+1220
-63
lines changed

pyagentspec/src/pyagentspec/adapters/crewai/_agentspecconverter.py

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55
# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.
66

77

8+
import inspect
89
import uuid
910
from typing import (
1011
Any,
1112
Callable,
1213
Dict,
1314
List,
1415
Optional,
16+
Tuple,
1517
Type,
1618
Union,
1719
cast,
@@ -25,12 +27,20 @@
2527
from pyagentspec.adapters.crewai._types import (
2628
CrewAIAgent,
2729
CrewAIBaseTool,
30+
CrewAIFlow,
2831
CrewAILlm,
2932
CrewAIStructuredTool,
3033
CrewAITool,
34+
FlowState,
3135
)
3236
from pyagentspec.agent import Agent as AgentSpecAgent
3337
from pyagentspec.component import Component as AgentSpecComponent
38+
from pyagentspec.flows.edges import ControlFlowEdge, DataFlowEdge
39+
from pyagentspec.flows.flow import Flow as AgentSpecFlow
40+
from pyagentspec.flows.node import Node as AgentSpecNode
41+
from pyagentspec.flows.nodes import EndNode as AgentSpecEndNode
42+
from pyagentspec.flows.nodes import StartNode as AgentSpecStartNode
43+
from pyagentspec.flows.nodes import ToolNode as AgentSpecToolNode
3444
from pyagentspec.llms import LlmConfig as AgentSpecLlmConfig
3545
from pyagentspec.llms import LlmGenerationConfig as AgentSpecLlmGenerationConfig
3646
from pyagentspec.llms.ollamaconfig import OllamaConfig as AgentSpecOllamaModel
@@ -123,6 +133,10 @@ def convert(
123133
agentspec_component = self._tool_convert_to_agentspec(
124134
crewai_component, referenced_objects
125135
)
136+
elif isinstance(crewai_component, CrewAIFlow):
137+
agentspec_component = self._flow_convert_to_agentspec(
138+
crewai_component, referenced_objects
139+
)
126140
else:
127141
raise NotImplementedError(
128142
f"The crewai type '{crewai_component.__class__.__name__}' is not yet supported "
@@ -217,3 +231,268 @@ def _agent_convert_to_agentspec(
217231
for tool in (crewai_agent.tools or [])
218232
],
219233
)
234+
235+
def _flow_convert_to_agentspec(
236+
self, crewai_flow: CrewAIFlow[FlowState], referenced_objects: Dict[str, Any]
237+
) -> AgentSpecFlow:
238+
239+
nodes: Dict[str, AgentSpecNode] = {}
240+
241+
# Create a ToolNode for each method (i.e. node) in the flow
242+
methods_by_name = getattr(crewai_flow, "_methods", {})
243+
start_method_names = set(getattr(crewai_flow, "_start_methods", []))
244+
for method_name in methods_by_name.keys():
245+
method_callable = methods_by_name[method_name]
246+
247+
# Get method signature to infer properties since they're not explicitly defined
248+
signature = inspect.signature(method_callable)
249+
250+
# Create input properties for nodes:
251+
# - for start nodes we use their input parameters
252+
# - for other nodes we either leave it empty or have a single "object" property
253+
parameters = [p for p in signature.parameters.values() if p.name != "self"]
254+
node_inputs = [
255+
AgentSpecProperty(title=p.name, json_schema={"type": "object"})
256+
for p in parameters[: (len(parameters) if method_name in start_method_names else 1)]
257+
]
258+
259+
# Create output properties for nodes:
260+
# - if there is a return value annotation, add a single "object" property
261+
return_annotation = signature.return_annotation
262+
has_output = not (
263+
return_annotation is inspect.Signature.empty or return_annotation is None
264+
)
265+
node_outputs = (
266+
[AgentSpecProperty(title=f"{method_name}_output", json_schema={"type": "object"})]
267+
if has_output
268+
else []
269+
)
270+
271+
tool = AgentSpecServerTool(
272+
name=method_name,
273+
description=f"Converted CrewAI flow method '{method_name}'",
274+
inputs=node_inputs,
275+
outputs=node_outputs,
276+
)
277+
node = AgentSpecToolNode(
278+
name=str(method_name),
279+
tool=tool,
280+
inputs=node_inputs,
281+
outputs=node_outputs,
282+
)
283+
nodes[str(method_name)] = node
284+
referenced_objects[str(method_name)] = node
285+
286+
# Start node with inferred properties
287+
start_node_properties = [
288+
property
289+
for start_method in getattr(crewai_flow, "_start_methods", [])
290+
for property in (nodes[start_method].inputs or [])
291+
]
292+
start_node = AgentSpecStartNode(
293+
name="START", inputs=start_node_properties, outputs=start_node_properties
294+
)
295+
nodes[start_node.name] = start_node
296+
referenced_objects[start_node.name] = start_node
297+
298+
control_flow_edges: list[ControlFlowEdge] = []
299+
data_flow_edges: list[DataFlowEdge] = []
300+
301+
# Connect START to all start methods
302+
for start_method in getattr(crewai_flow, "_start_methods", []):
303+
control_flow_edges, data_flow_edges = self._add_start_edges(
304+
start_node,
305+
nodes[start_method],
306+
control_flow_edges,
307+
data_flow_edges,
308+
)
309+
310+
# Build edges based on listeners
311+
listeners = getattr(crewai_flow, "_listeners", {})
312+
313+
for listener_name, condition in listeners.items():
314+
triggers, has_branching = self._extract_triggers_from_condition(condition)
315+
if has_branching:
316+
raise ValueError(
317+
"Branching is not currently supported (AND, OR operators). "
318+
f"Got node {listener_name} with triggers {', '.join(triggers)}"
319+
)
320+
for trigger in triggers:
321+
if trigger in getattr(crewai_flow, "_methods", {}):
322+
control_flow_edges, data_flow_edges = self._add_listener_edges(
323+
nodes[trigger],
324+
nodes[listener_name],
325+
control_flow_edges,
326+
data_flow_edges,
327+
)
328+
329+
# End node with inferred properties
330+
has_outgoing_edges: set[str] = set(
331+
edge.from_node.name for edge in control_flow_edges if edge.from_node.name != "END"
332+
)
333+
end_methods = [
334+
method_name
335+
for method_name in nodes
336+
if method_name not in ("START", "END") and method_name not in has_outgoing_edges
337+
]
338+
end_node_properties = [
339+
property for end_method in end_methods for property in (nodes[end_method].outputs or [])
340+
]
341+
end_node = AgentSpecEndNode(
342+
name="END", inputs=end_node_properties, outputs=end_node_properties
343+
)
344+
nodes[end_node.name] = end_node
345+
referenced_objects[end_node.name] = end_node
346+
347+
# Connect END to all end methods
348+
for end_method in end_methods:
349+
control_flow_edges, data_flow_edges = self._add_end_edges(
350+
nodes[end_method],
351+
end_node,
352+
control_flow_edges,
353+
data_flow_edges,
354+
)
355+
356+
flow_name = getattr(crewai_flow, "name", None) or crewai_flow.__class__.__name__
357+
return AgentSpecFlow(
358+
name=flow_name,
359+
start_node=start_node,
360+
nodes=list(nodes.values()),
361+
control_flow_connections=control_flow_edges,
362+
data_flow_connections=data_flow_edges,
363+
)
364+
365+
def _add_start_edges(
366+
self,
367+
start_node: AgentSpecNode,
368+
destination_node: AgentSpecNode,
369+
control_flow_edges: List[ControlFlowEdge],
370+
data_flow_edges: List[DataFlowEdge],
371+
) -> Tuple[List[ControlFlowEdge], List[DataFlowEdge]]:
372+
control_flow_edges.append(
373+
ControlFlowEdge(
374+
name=f"START_to_{destination_node.name}_control_edge",
375+
from_node=start_node,
376+
to_node=destination_node,
377+
)
378+
)
379+
start_node_outputs = start_node.outputs or []
380+
start_node_output_property_names = [property.title for property in start_node_outputs]
381+
destination_node_inputs = destination_node.inputs or []
382+
for property in destination_node_inputs:
383+
if property.title in start_node_output_property_names:
384+
data_flow_edges.append(
385+
DataFlowEdge(
386+
name=f"START_to_{destination_node.name}_data_edge",
387+
source_node=start_node,
388+
destination_node=destination_node,
389+
source_output=property.title,
390+
destination_input=property.title,
391+
)
392+
)
393+
return control_flow_edges, data_flow_edges
394+
395+
def _add_listener_edges(
396+
self,
397+
trigger_node: AgentSpecNode,
398+
listener_node: AgentSpecNode,
399+
control_flow_edges: List[ControlFlowEdge],
400+
data_flow_edges: List[DataFlowEdge],
401+
) -> Tuple[List[ControlFlowEdge], List[DataFlowEdge]]:
402+
control_flow_edges.append(
403+
ControlFlowEdge(
404+
name=f"{trigger_node.name}_to_{listener_node.name}_control_edge",
405+
from_node=trigger_node,
406+
to_node=listener_node,
407+
)
408+
)
409+
trigger_node_outputs = trigger_node.outputs or []
410+
listener_node_inputs = listener_node.inputs or []
411+
if len(trigger_node_outputs) == 1 and len(listener_node_inputs) == 1:
412+
data_flow_edges.append(
413+
DataFlowEdge(
414+
name=f"{trigger_node.name}_to_{listener_node.name}_data_edge",
415+
source_node=trigger_node,
416+
destination_node=listener_node,
417+
source_output=trigger_node_outputs[0].title,
418+
destination_input=listener_node_inputs[0].title,
419+
)
420+
)
421+
return control_flow_edges, data_flow_edges
422+
423+
def _add_end_edges(
424+
self,
425+
source_node: AgentSpecNode,
426+
end_node: AgentSpecNode,
427+
control_flow_edges: List[ControlFlowEdge],
428+
data_flow_edges: List[DataFlowEdge],
429+
) -> Tuple[List[ControlFlowEdge], List[DataFlowEdge]]:
430+
control_flow_edges.append(
431+
ControlFlowEdge(
432+
name=f"{source_node.name}_to_END_control_edge",
433+
from_node=source_node,
434+
to_node=end_node,
435+
)
436+
)
437+
source_node_outputs = source_node.outputs or []
438+
end_node_inputs = end_node.inputs or []
439+
end_node_input_property_names = [property.title for property in end_node_inputs]
440+
for property in source_node_outputs:
441+
if property.title in end_node_input_property_names:
442+
data_flow_edges.append(
443+
DataFlowEdge(
444+
name=f"{source_node.name}_to_END_data_edge",
445+
source_node=source_node,
446+
destination_node=end_node,
447+
source_output=property.title,
448+
destination_input=property.title,
449+
)
450+
)
451+
return control_flow_edges, data_flow_edges
452+
453+
def _extract_triggers_from_condition(self, condition: Any) -> Tuple[set[str], bool]:
454+
"""
455+
Extract flat trigger names from CrewAI listener condition.
456+
Returns (triggers, has_branching) where has_branching indicates unsupported branching usage.
457+
"""
458+
has_branching = False
459+
triggers: set[str] = set()
460+
461+
def _is_branching_condition(condition_type: Any, methods: Any) -> bool:
462+
return (str(condition_type).upper() in ("AND", "OR")) and len(methods) > 1
463+
464+
# Simple tuple form: (condition_type, methods)
465+
if isinstance(condition, tuple) and len(condition) == 2:
466+
condition_type, methods = condition
467+
for method in methods or []:
468+
triggers.add(str(method))
469+
return triggers, _is_branching_condition(condition_type, methods)
470+
471+
# Dict form: {"type": "OR"/"AND", "methods": [...] } or {"type": ..., "conditions": [...]}
472+
if isinstance(condition, dict):
473+
condition_type = str(condition.get("type", "OR")).upper()
474+
if "methods" in condition:
475+
methods = condition.get("methods", [])
476+
for method in methods:
477+
triggers.add(str(method))
478+
return triggers, _is_branching_condition(condition_type, methods)
479+
if "conditions" in condition:
480+
subconditions = condition.get("conditions", [])
481+
has_branching = _is_branching_condition(condition_type, subconditions)
482+
for subcondition in subconditions:
483+
if isinstance(subcondition, dict):
484+
subtriggers, sub_has_branching = self._extract_triggers_from_condition(
485+
subcondition
486+
)
487+
triggers |= subtriggers
488+
has_branching = has_branching or sub_has_branching
489+
else:
490+
triggers.add(str(subcondition))
491+
return triggers, has_branching
492+
493+
# Direct string method/label
494+
if isinstance(condition, str):
495+
triggers.add(condition)
496+
return triggers, has_branching
497+
498+
return triggers, has_branching

0 commit comments

Comments
 (0)