Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 65 additions & 30 deletions Sources/StitchEngine/Actions/GraphCalculate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,23 @@ extension GraphCalculatable {
Set<NodePortType<Node>>, // portsToUpdate
Bool // shouldResortPreviewLayers
) {
let graphState = self

var visitedNodes = Set<Node.ID>()
var queue = nodeIds

// Reset state on scheduled cycle nodes
self.topologicalData.nodesForNextGraphStep = .init()

var portsToUpdate = Set<NodePortType<Node>>()
let redEdgeCycles = self.topologicalData.cycleNodesForNextGraphStep

while let nodeId = topologicalData.getNextNodeToCalculate(for: queue,
visitedNodes: visitedNodes) {
queue.remove(nodeId)

guard !visitedNodes.contains(nodeId) else {
assertInDebug(self.topologicalData.cycleContains(nodeId))

#if DEV_DEBUG
// print("GraphState.calculate: scheduling cycle node \(nodeId)")
#endif

self.topologicalData.nodesForNextGraphStep.insert(nodeId)
continue
}

visitedNodes.insert(nodeId)

return self.traverseGraph(from: nodeIds,
redEdgeCycles: redEdgeCycles) { queuedNodeResult, queue in
// Retrieve the node afresh everytime,
// since an upstream node's changes may have changed its inputs
guard let node = self.getNode(id: nodeId) else {
guard let node = self.getNode(id: queuedNodeResult.nodeId) else {
// Not necessarily bad -- can happen if we deleted nodes but those nodes were still scheduled to run.
// fatalErrorIfDebug()
continue
return
}

let existingOutputValues = node.outputsValuesList
let outputCoordinates = node.outputCoordinates

guard let evalResult = self.calculateNode(node) else {
continue
return
}

// Update queue with NodeIds for nodes which need re-evaluation
Expand All @@ -71,9 +48,67 @@ extension GraphCalculatable {

// Update queue set with changed downstream nodes
queue = changedNodeIds.union(Set(queue))
}
}

@MainActor
/// The main graph calculator. Shouldn't be called directly unless you know what you're doing.
public func traverseGraph(
from nodeIds: Set<Node.ID>,
redEdgeCycles: Set<CycleNodeUpdateData<Node>>,
callback: @escaping @MainActor (QueuedNodeResultType<Node>, inout Set<Node.ID>) -> ()
) -> (
Set<NodePortType<Node>>, // portsToUpdate
Bool // shouldResortPreviewLayers
) {
let graphState = self

var visitedNodes = Set<Node.ID>()
var queue = nodeIds

// Reset state on scheduled cycle nodes
self.topologicalData.nodesForNextGraphStep = .init()
self.topologicalData.cycleNodesForNextGraphStep = .init()

var portsToUpdate = Set<NodePortType<Node>>()

// First handle red edge cycle nodes from last graph step by update inputs of nodes
for cycleNodeData in redEdgeCycles {
let didChangeInputs = self.updateInputs(inputCoordinate: cycleNodeData.portId,
upstreamOutputValues: cycleNodeData.values,
mediaList: cycleNodeData.mediaList,
upstreamOutputChanged: true,
// empty cycle starting points ensures we update inputs
cycleStartingPoints: .init())

// Run red edge cycle node's eval if inputs changed from this update
if didChangeInputs {
queue.insert(cycleNodeData.nodeId)
}
}

while let queuedNodeResult = topologicalData.getNextNodeToCalculate(for: queue,
visitedNodes: visitedNodes) {
let nodeId = queuedNodeResult.nodeId
queue.remove(nodeId)

guard !visitedNodes.contains(nodeId) else {
assertInDebug(self.topologicalData.cycleContains(nodeId))

#if DEV_DEBUG
// print("GraphState.calculate: scheduling cycle node \(nodeId)")
#endif

self.topologicalData.nodesForNextGraphStep.insert(nodeId)
continue
}

visitedNodes.insert(nodeId)

callback(queuedNodeResult, &queue)

// Track changed outputs here, inputs in didInputsUpdate
portsToUpdate.insert(NodePortType<Node>.allOutputs(node.id))
portsToUpdate.insert(NodePortType<Node>.allOutputs(nodeId))
} // while let ...


Expand Down
115 changes: 76 additions & 39 deletions Sources/StitchEngine/Actions/HoseFlow.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,48 +68,85 @@ extension GraphCalculatable {
var changedIds = Set<Node.InputRow.ID>()

for inputCoordinate in inputs {
// Get kind of downstream node
if let nodeViewModel = self.getNode(id: inputCoordinate.nodeId),
let inputObserver = nodeViewModel.getInputRowObserver(for: inputCoordinate.portType) {
let inputOldValues = inputObserver.values

if inputObserver.isPulseNodeType && !upstreamOutputChanged {
// If this is a pulse type input and the upstream output did not change,
// do not set the flowing value into the input.
// (Truthy values are coerced to current graph time, i.e. a pulse; we can only pulse when values actually change.)
continue
// } else if upstreamOutputChanged {
} else {

guard let existingInputValue = inputOldValues.first else {
continue
}

// `updateInputs(incomingValues: PortValues, graphTime) -> Bool`
// if we true, then

// Note: if the input supports directly copying, then these values will not actually be coerced
let flowValuesCoercedToThisInputType = inputObserver.coerce(
theseValues: upstreamOutputValues,
toThisType: existingInputValue,
currentGraphTime: self.currentGraphTime)

if inputOldValues != flowValuesCoercedToThisInputType {

inputObserver.setValuesInInput(flowValuesCoercedToThisInputType)
changedIds.insert(inputObserver.id)

// Update downstream observers
if let mediaList = mediaList,
let mediaObservers = nodeViewModel.getMediaObservers(port: inputCoordinate) {
nodeViewModel.updateInputMedia(inputCoordinate: inputCoordinate,
mediaList: mediaList)
}
}
}
let didChangeInputs = self.updateInputs(inputCoordinate: inputCoordinate,
upstreamOutputValues: upstreamOutputValues,
mediaList: mediaList,
upstreamOutputChanged: upstreamOutputChanged,
cycleStartingPoints: self.topologicalData.cycleStartingPoints)

if didChangeInputs {
changedIds.insert(inputCoordinate)
}
} // for inputCoordinate in ...

return changedIds
}

@MainActor
/// Returns: true if inputs had changed.
func updateInputs(inputCoordinate: Self.Node.InputRow.RowID,
upstreamOutputValues: [Self.Node.PortData],
mediaList: [Self.Node.EvalResult.MediaType?]?,
upstreamOutputChanged: Bool,
cycleStartingPoints: Set<Node.ID>) -> Bool {
// Get kind of downstream node
guard let nodeViewModel = self.getNode(id: inputCoordinate.nodeId),
let inputObserver = nodeViewModel.getInputRowObserver(for: inputCoordinate.portType) else {
return .init()
}

let inputOldValues = inputObserver.values
var didChange = false

if inputObserver.isPulseNodeType && !upstreamOutputChanged {
// If this is a pulse type input and the upstream output did not change,
// do not set the flowing value into the input.
// (Truthy values are coerced to current graph time, i.e. a pulse; we can only pulse when values actually change.)
return .init()
// } else if upstreamOutputChanged {
} else {
let isCycleRedEdge = cycleStartingPoints.contains(inputCoordinate.nodeId)

guard let existingInputValue = inputOldValues.first else {
return .init()
}

// `updateInputs(incomingValues: PortValues, graphTime) -> Bool`
// if we true, then

// Note: if the input supports directly copying, then these values will not actually be coerced
let flowValuesCoercedToThisInputType = inputObserver.coerce(
theseValues: upstreamOutputValues,
toThisType: existingInputValue,
currentGraphTime: self.currentGraphTime,
isCycleRedEdge: isCycleRedEdge)

if inputOldValues != flowValuesCoercedToThisInputType {
// Catch cycle red edge case if not yet tracked
if isCycleRedEdge {
self.topologicalData.cycleNodesForNextGraphStep.insert(
.init(portId: inputCoordinate,
values: flowValuesCoercedToThisInputType,
mediaList: mediaList)
)

// Skip updating this node on this cycle
return false
}


inputObserver.setValuesInInput(flowValuesCoercedToThisInputType)
didChange = true

// Update downstream observers
if let mediaList = mediaList,
let mediaObservers = nodeViewModel.getMediaObservers(port: inputCoordinate) {
nodeViewModel.updateInputMedia(inputCoordinate: inputCoordinate,
mediaList: mediaList)
}
}
}

return didChange
}
}
30 changes: 29 additions & 1 deletion Sources/StitchEngine/Actions/TopologicalDataRefresh.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ extension GraphCalculatable {

// All nodes excludes group nodes
let allNodes = Array(self.nodes.values.filter { !$0.isGroupNode })
let allNodeIds: Set<Self.Node.ID> = Set(allNodes.map(\.id))
let connections = self.createConnections()

// Maps connections by node instead of by coordinate
Expand All @@ -35,8 +36,9 @@ extension GraphCalculatable {
self.nodesForNextGraphStep = prevIdsToCalculate

// Gets a set of all node cycles
self.topologicalData.nodeCycles = TopologicalData
let nodeCycles = TopologicalData
.findAllCycles(downstreamNodesMap: downstream)
self.topologicalData.nodeCycles = nodeCycles

self.topologicalData.shallowDownstreamNodes = downstream

Expand All @@ -47,5 +49,31 @@ extension GraphCalculatable {
self.topologicalData._allMustRunNodes = self.topologicalData.nodesToAlwaysRun
.union(self.topologicalData.nodesScheduledToRun)
.union(self.topologicalData.keyboardNodes)

// Traverses full graph to identify cycle nodes that need to establish a red edge
var cycleStartingPoints = Set<Self.Node.ID>()
let _ = self.traverseGraph(from: allNodeIds,
// can be empty as we're just trying to traverse
redEdgeCycles: .init()) { queuedNodeResult, queue in
switch queuedNodeResult {
case .cycle(let nextCycleNodeResult):
guard let allNodesInCycle = nodeCycles.first(where: {
$0.contains(nextCycleNodeResult.nodeId)
}) else {
fatalErrorIfDebug()
return
}

// Only add red edge if this cycle not yet tracked
if cycleStartingPoints.intersection(allNodesInCycle).isEmpty {
cycleStartingPoints.insert(nextCycleNodeResult.nodeId)
}

default:
return
}
}

self.topologicalData.cycleStartingPoints = cycleStartingPoints
}
}
8 changes: 5 additions & 3 deletions Sources/StitchEngine/Data/GraphCalculatable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ extension GraphCalculatable {
}

@MainActor public var nodesToRunOnGraphStep: Set<Node.ID> {
get {
self.topologicalData.nodesToRunOnGraphStep
}
self.topologicalData.nodesToRunOnGraphStep
}

@MainActor public var hasUnprocessedCycleNodes: Bool {
!self.topologicalData.cycleNodesForNextGraphStep.isEmpty
}

@MainActor public func setNodesForNextGraphStep(_ nodeIds: Set<Node.ID>) {
Expand Down
3 changes: 2 additions & 1 deletion Sources/StitchEngine/Data/Node/NodeRowCalculatable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public protocol InputNodeRowCalculatable: NodeRowCalculatable {

@MainActor func coerce(theseValues: [PortData],
toThisType: PortData,
currentGraphTime: TimeInterval) -> [PortData]
currentGraphTime: TimeInterval,
isCycleRedEdge: Bool) -> [PortData]

@MainActor func didInputsUpdate(newValues: [PortData],
oldValues: [PortData])
Expand Down
Loading