diff --git a/lib/syskit.rb b/lib/syskit.rb index 1b0e0fc65..f0524cc0e 100644 --- a/lib/syskit.rb +++ b/lib/syskit.rb @@ -77,6 +77,7 @@ module ProcessManagers require "syskit/models/task_context" require "syskit/models/ruby_task_context" require "syskit/models/deployment" +require "syskit/models/deployed_task_instanciation" require "syskit/models/configured_deployment" require "syskit/models/deployment_group" diff --git a/lib/syskit/component.rb b/lib/syskit/component.rb index e5887ca90..f3a9ff401 100644 --- a/lib/syskit/component.rb +++ b/lib/syskit/component.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true module Syskit - Roby::EventStructure.relation "SyskitConfigurationPrecedence" + Roby::EventStructure.relation "SyskitConfigurationPrecedence", strong: true # Base class for models that represent components (TaskContext, # Composition) diff --git a/lib/syskit/deployment.rb b/lib/syskit/deployment.rb index ea8dd2926..a207e510e 100644 --- a/lib/syskit/deployment.rb +++ b/lib/syskit/deployment.rb @@ -97,8 +97,8 @@ def has_orocos_name?(orocos_name) end def instanciate_all_tasks - model.each_orogen_deployed_task_context_model.map do |act| - task(name_mappings[act.name]) + each_orogen_deployed_task_context_model.map do |act| + task(act.name) end end @@ -106,49 +106,20 @@ def instanciate_all_tasks # # It takes into account deployment prefix def each_orogen_deployed_task_context_model(&block) - model.each_orogen_deployed_task_context_model(&block) + orogen_model.task_activities.each(&block) end - # Either find the existing task that matches the given deployment specification, - # or creates and adds it. - # - # @param (see #task) - def find_or_create_task(name, syskit_task_model = nil, auto_conf: false) - orogen_task_deployment_model = deployed_orogen_model_by_name(name) - if orogen_master = orogen_task_deployment_model.master - mapped_master = name_mappings[orogen_master.name] - scheduler_task = find_or_create_task( - mapped_master, auto_conf: true - ) - candidates = scheduler_task.each_parent_task - else - candidates = each_executed_task - end + # The deployment's orogen model with the name mappings applied + def orogen_model + return @orogen_model if @orogen_model - # I don't know why name_mappings[orogen.name] would not be - # equal to 'name' and I couldn't find a reason for this in the - # git history when I refactored this. - # - # I keep it here for now, just in case, but that would need to - # be investigated - # - # TODO - mapped_name = name_mappings[orogen_task_deployment_model.name] - candidates.each do |task| - return task if task.orocos_name == mapped_name - end - - create_deployed_task( - orogen_task_deployment_model, - syskit_task_model, - scheduler_task, auto_conf: auto_conf - ) + @orogen_model = model.map_orogen_model(name_mappings) end def deployed_orogen_model_by_name(name) orogen_task_deployment = each_orogen_deployed_task_context_model - .find { |act| name == name_mappings[act.name] } + .find { |act| name == act.name } unless orogen_task_deployment available = each_orogen_deployed_task_context_model .map { |act| name_mappings[act.name] } @@ -168,8 +139,7 @@ def deployed_orogen_model_by_name(name) # Create and add a task model supported by this deployment # # @param [OroGen::Spec::TaskDeployment] orogen_task_deployment_model - # the orogen model that describes this - # deployment + # the orogen model that describes this deployment, already mapped # @param [Models::TaskContext,nil] syskit_task_model the expected # syskit task model, or nil if it is meant to use the basic model. # This is useful in specialized models (e.g. dynamic services) @@ -180,17 +150,11 @@ def deployed_orogen_model_by_name(name) # a configuration that matches the task's orocos name (if it exists). This # is mostly used for scheduling tasks, which are automatically instanciated # by Syskit. - # - # @see find_or_create_task task - def create_deployed_task( - orogen_task_deployment_model, - syskit_task_model, scheduler_task, auto_conf: false - ) - mapped_name = name_mappings[orogen_task_deployment_model.name] + def create_deployed_task(orogen_task_deployment_model, syskit_task_model) + mapped_name = orogen_task_deployment_model.name if ready? && !(remote_handles = remote_task_handles[mapped_name]) raise InternalError, - "no remote handle describing #{mapped_name} in #{self}" \ - "(got #{remote_task_handles.keys.sort.join(', ')})" + "cannot find remote task handles for #{mapped_name}" end if task_context_in_fatal?(mapped_name) @@ -199,29 +163,15 @@ def create_deployed_task( "from #{self}" end - base_syskit_task_model = model.resolve_syskit_model_for_deployed_task( - orogen_task_deployment_model + task = instanciate_deployed_task( + mapped_name, + orogen_model: orogen_task_deployment_model, + syskit_model: syskit_task_model, + plan: plan ) - syskit_task_model ||= base_syskit_task_model - unless syskit_task_model <= base_syskit_task_model - raise ArgumentError, - "incompatible explicit selection of task model " \ - "#{syskit_task_model} for the model of #{mapped_name} in #{self}, " \ - "expected #{base_syskit_task_model} or one of its subclasses" - end - task = syskit_task_model - .new(orocos_name: mapped_name, read_only: read_only?(mapped_name)) - plan.add(task) task.executed_by self - if scheduler_task - task.depends_on scheduler_task, role: "scheduler" - task.should_configure_after scheduler_task.start_event - end - - task.orogen_model = orogen_task_deployment_model task.initialize_remote_handles(remote_handles) if remote_handles - auto_select_conf(task) if auto_conf task end @@ -237,7 +187,7 @@ def read_only?(mapped_name) # model that should be used to create the task, if it is not the # same as the base model. This is used for specialized models (e.g. # dynamic services) - def task(name, syskit_task_model = nil) + def task(name, syskit_task_model = nil, setup_scheduler: true) if finishing? || finished? raise InvalidState, "#{self} is either finishing or already " \ @@ -245,32 +195,34 @@ def task(name, syskit_task_model = nil) end orogen_task_deployment_model = deployed_orogen_model_by_name(name) + task = create_deployed_task(orogen_task_deployment_model, syskit_task_model) + if setup_scheduler + task_setup_scheduler(task, existing_tasks: executed_tasks_by_name) + end + task + end - if (orogen_master = orogen_task_deployment_model.master) - scheduler_task = find_or_create_task( - orogen_master.name, auto_conf: true + # (see DeployedTaskInstanciation#task_setup_scheduler) + def task_setup_scheduler(task, existing_tasks: {}) + return unless (scheduler_task = super) + + scheduler_task.executed_by self + + if ready? && !scheduler_task.has_remote_information? + scheduler_task.initialize_remote_handles( + remote_task_handles.fetch(task.orocos_name) ) end - create_deployed_task( - orogen_task_deployment_model, - syskit_task_model, scheduler_task - ) + scheduler_task end - # Selects the configuration of a master task + include Models::DeployedTaskInstanciation + + # The tasks executed by this deployment, as a name to task hash # - # Master tasks are auto-injected in the network, and as such the - # user cannot select their configuration. This picks either - # ['default', task.orocos_name] if the master task's has a configuration - # section matching the task's name, or ['default'] otherwise. - private def auto_select_conf(task) - manager = task.model.configuration_manager - task.conf = - if manager.has_section?(task.orocos_name) - ["default", task.orocos_name] - else - ["default"] - end + # @return [Hash] + def executed_tasks_by_name + each_executed_task.to_h { |t| [t.orocos_name, t] } end ## diff --git a/lib/syskit/models/configured_deployment.rb b/lib/syskit/models/configured_deployment.rb index 7823ca84c..cba5a9232 100644 --- a/lib/syskit/models/configured_deployment.rb +++ b/lib/syskit/models/configured_deployment.rb @@ -49,6 +49,12 @@ def non_logger_task_names @name_mappings.values.grep_v(/_Logger$/) end + def read_only?(orocos_name) + read_only.include?(orocos_name) + end + + include DeployedTaskInstanciation + # @api private # # Resolves the read_only argument into an Array of task names to be set as @@ -102,13 +108,7 @@ def command_line(loader: Roby.app.default_pkgconfig_loader, **options) def orogen_model return @orogen_model if @orogen_model - @orogen_model = model.orogen_model.dup - orogen_model.task_activities.map! do |activity| - activity = activity.dup - activity.name = name_mappings[activity.name] || activity.name - activity - end - @orogen_model + @orogen_model = model.map_orogen_model(name_mappings) end # Enumerate the tasks that are deployed by this configured @@ -131,14 +131,8 @@ def each_deployed_task_model # Enumerate the oroGen specification for the deployed tasks # # @yieldparam [OroGen::Spec::TaskDeployment] - def each_orogen_deployed_task_context_model - return enum_for(__method__) unless block_given? - - model.each_orogen_deployed_task_context_model do |deployed_task| - task = deployed_task.dup - task.name = name_mappings[task.name] || task.name - yield(task) - end + def each_orogen_deployed_task_context_model(&block) + orogen_model.task_activities.each(&block) end # Create a new deployment task that can represent self in a plan diff --git a/lib/syskit/models/deployed_task_instanciation.rb b/lib/syskit/models/deployed_task_instanciation.rb new file mode 100644 index 000000000..f3ebb05b6 --- /dev/null +++ b/lib/syskit/models/deployed_task_instanciation.rb @@ -0,0 +1,116 @@ +# frozen_string_literal: true + +module Syskit + module Models + # Features that implement the instanciation of deployed tasks, compatible with + # both {ConfiguredDeployment} and {Syskit::Deployment} + module DeployedTaskInstanciation + # Create a task from this deployment's + # + # @param [String] orocos_name the mapped name of the deployed task + # @param [OroGen::Spec::DeployedTask,nil] orogen_model the orogen model of the + # deployed task, if known. Pass nil if it has to be resolved. + # @param [TaskContext,nil] syskit_model the syskit model that should be used + # to create the task instance. It may be a submodel of the deployment's task + # model. Pass nil to resolve from the deployment model + # @param [Roby::Plan,nil] plan the plan in which the task should be created + # @return [(OroGen::Spec::DeployedTask,TaskContext)] the resolved orogen + # models and + def instanciate_deployed_task( + orocos_name, orogen_model: nil, syskit_model: nil, plan: nil + ) + orogen_model, syskit_model = + instanciate_deployed_task_resolve_task_model( + orocos_name, orogen_model, syskit_model + ) + + args = { + orogen_model: orogen_model, + orocos_name: orocos_name, + read_only: read_only?(orocos_name) + } + args[:plan] = plan if plan + syskit_model.new(**args) + end + + # Create the 'scheduler' task of a task (if needed) + # + # OroGen components can be explicitly triggered by another component. This + # shows up as having a 'master' in the deployed task model. This method makes + # sure that the scheduler component is instanciated, and sets up proper + # relationship between the scheduled task and the scheduler task + def task_setup_scheduler(task, existing_tasks: {}) + return unless (orogen_master_m = task.orogen_model.master) + + mapped_master_name = orogen_master_m.name + unless (scheduler_task = existing_tasks[mapped_master_name]) + scheduler_task = + instanciate_deployed_task(mapped_master_name, plan: task.plan) + scheduler_task.select_conf_from_name + + existing_tasks = + existing_tasks.merge({ mapped_master_name => scheduler_task }) + end + + task_setup_scheduler(scheduler_task, existing_tasks: existing_tasks) + + task.depends_on scheduler_task, role: "scheduler" + task.should_configure_after scheduler_task.start_event + scheduler_task + end + + # Helper for {#instanciate_deployed_task} to resolve the syskit task model + # that should be used to represent the deployed task + # + # @param [String] orocos_name the mapped name of the deployed task + # @param [OroGen::Spec::DeployedTask,nil] orogen_model the orogen model of the + # deployed task, if known. Pass nil if it has to be resolved. + # @param [TaskContext,nil] syskit_model the syskit model that should be used + # to create the task instance. It may be a submodel of the deployment's task + # model. Pass nil to resolve from the deployment model + # @return [(OroGen::Spec::DeployedTask,TaskContext)] the resolved orogen + # models and + def instanciate_deployed_task_resolve_task_model( + orocos_name, orogen_model = nil, syskit_model = nil + ) + orogen_model ||= + each_orogen_deployed_task_context_model + .find { |m| m.name == orocos_name } + + unless orogen_model + raise ArgumentError, "no deployed task found for #{orocos_name}" + end + + base_syskit_model = resolve_syskit_model_for_deployed_task(orogen_model) + if syskit_model && !(syskit_model <= base_syskit_model) # rubocop:disable Style/InverseMethods + raise ArgumentError, + "incompatible explicit selection of task model " \ + "#{syskit_model} for the model of #{orogen_model} in " \ + "#{self}, expected #{base_syskit_model} " \ + "or one of its subclasses" + end + + [orogen_model, syskit_model || base_syskit_model] + end + + # Resolve the syskit task context model that should be used to represent a + # deployed task + # + # @param [OroGen::Spec::DeployedTask] orogen_deployed_task the model, which + # name has been mapped + def resolve_syskit_model_for_deployed_task(orogen_deployed_task) + unmapped_name = name_mappings.rassoc(orogen_deployed_task.name)&.first + unless unmapped_name + raise ArgumentError, + "no mapping points to name #{orogen_deployed_task.name}, " \ + "known names: #{name_mappings}. This method expects the " \ + "mapped name as argument" + end + + model.resolve_syskit_model_for_deployed_task( + orogen_deployed_task, name: unmapped_name + ) + end + end + end +end diff --git a/lib/syskit/models/deployment.rb b/lib/syskit/models/deployment.rb index 50707de32..966647ef3 100644 --- a/lib/syskit/models/deployment.rb +++ b/lib/syskit/models/deployment.rb @@ -76,6 +76,13 @@ def method_missing(name, *args, **kw) end end + # Return the task model for the given deployed task + # + # @return [Class] + def task(name) + @task_name_to_syskit_model.fetch(name) + end + # Creates a new deployment model # # @option options [OroGen::Spec::Deployment] orogen_model the oroGen @@ -174,9 +181,10 @@ def find_syskit_model_for_deployed_task(deployed_task) # @api private # # Resolve the Syskit task model for one of this deployment's tasks - def resolve_syskit_model_for_deployed_task(deployed_task) - task_name = deployed_task.name - if (registered = @task_name_to_syskit_model[task_name]) + def resolve_syskit_model_for_deployed_task( + deployed_task, name: deployed_task.name + ) + if (registered = @task_name_to_syskit_model[name]) return registered end @@ -188,7 +196,7 @@ def resolve_syskit_model_for_deployed_task(deployed_task) "Deployment.define_deployed_task" ) - @task_name_to_syskit_model[task_name] = + @task_name_to_syskit_model[name] = ::Syskit::TaskContext.model_for(deployed_task.task_model) end @@ -212,6 +220,29 @@ def each_deployed_task_model def each_orogen_deployed_task_context_model(&block) orogen_model.task_activities.each(&block) end + + # Return the deployment's orogen model with a given name mapping applied + def map_orogen_model(name_mappings) + mapped_model = orogen_model.dup + activity_mapping = {} + mapped_model.task_activities.map! do |activity| + mapped_activity = activity.dup + activity_mapping[activity] = mapped_activity + mapped_activity.name = name_mappings[activity.name] || activity.name + mapped_activity + end + + mapped_model.task_activities.each do |t| + if (mapped_master = activity_mapping[t.master]) + t.master = mapped_master + end + + t.slaves.replace( + t.slaves.map { |t| activity_mapping[t] } + ) + end + mapped_model + end end end end diff --git a/lib/syskit/models/deployment_group.rb b/lib/syskit/models/deployment_group.rb index 9c55d46f2..7ce1186fa 100644 --- a/lib/syskit/models/deployment_group.rb +++ b/lib/syskit/models/deployment_group.rb @@ -21,11 +21,16 @@ class DeploymentGroup attr_reader :deployments DeployedTask = Struct.new :configured_deployment, :mapped_task_name do - # Create an instance of this deployed task on the given plan + # Create an instance of this deployed task as well as its deployment + # (execution agent) # - # @param [ConfiguredDeployment=>Syskit::Deployment] already - # instanciated deployment tasks, to be reused if self + # @param [Roby::Plan] plan the plan the new tasks should be added to + # @param [Boolean] permanent set to true if the deployment task should + # be set as permanent + # @param [ConfiguredDeployment=>Syskit::Deployment] deployment_tasks + # already instanciated deployment tasks, to be reused if self # is part of the same ConfiguredDeployment + # @return [(TaskContext,Deployment)] a deployed task def instanciate(plan, permanent: true, deployment_tasks: {}) deployment_task = ( deployment_tasks[[configured_deployment]] ||= @@ -39,6 +44,22 @@ def instanciate(plan, permanent: true, deployment_tasks: {}) end [deployment_task.task(mapped_task_name), deployment_task] end + + def orocos_name + mapped_task_name + end + + def read_only? + configured_deployment.read_only?(mapped_task_name) + end + + def orogen_model + return @orogen_model if @orogen_model + + @orogen_model = + configured_deployment + .orogen_model.find_task_by_name(mapped_task_name) + end end def initialize diff --git a/lib/syskit/network_generation.rb b/lib/syskit/network_generation.rb index 84c1c7b56..6d333ca6a 100644 --- a/lib/syskit/network_generation.rb +++ b/lib/syskit/network_generation.rb @@ -17,5 +17,6 @@ module NetworkGeneration require "syskit/network_generation/logger" require "syskit/network_generation/merge_solver" require "syskit/network_generation/resolution_error_handler" +require "syskit/network_generation/runtime_network_adaptation" require "syskit/network_generation/system_network_deployer" require "syskit/network_generation/system_network_generator" diff --git a/lib/syskit/network_generation/dataflow_computation.rb b/lib/syskit/network_generation/dataflow_computation.rb index 2b0cf0c30..16eca19c5 100644 --- a/lib/syskit/network_generation/dataflow_computation.rb +++ b/lib/syskit/network_generation/dataflow_computation.rb @@ -192,27 +192,32 @@ def propagate(tasks) debug "" debug "== Gathering Initial Information" tasks.each do |task| - debug { "computing initial information for #{task}" } - - log_nest(4) do + debug "computing initial information for #{task}" + log_nest(2) do initial_information(task) - if connections = triggering_port_connections(task) - triggering_connections[task] = connections - triggering_dependencies[task] = connections.map do |port_name, triggers| - triggers.ports.map(&:first) - end + end + end - debug do - debug "#{connections.size} triggering connections for #{task}" - connections.each do |port_name, info| - debug " for #{port_name}" - log_nest(8) do - log_pp :debug, info - end - end - break + # This MUST be done after the loop above, don't merge them + tasks.each do |task| # rubocop:disable Style/CombinableLoops + connections = log_nest(2) { triggering_port_connections(task) } + next unless connections + + triggering_connections[task] = connections + triggering_dependencies[task] = + connections.map do |port_name, triggers| + triggers.ports.map(&:first) + end + + debug do + debug " #{connections.size} triggering connections for #{task}" + connections.each do |port_name, info| + debug " for #{port_name}" + log_nest(6) do + log_pp :debug, info end end + break end end diff --git a/lib/syskit/network_generation/dataflow_dynamics.rb b/lib/syskit/network_generation/dataflow_dynamics.rb index 6b69b8bc3..6e604aaff 100644 --- a/lib/syskit/network_generation/dataflow_dynamics.rb +++ b/lib/syskit/network_generation/dataflow_dynamics.rb @@ -369,6 +369,11 @@ def initial_slaves_information(task) # # Computes a task's initial information def initial_task_information(task) + # We need to resolve the slaves, as the master's task done_task_info + # will call done_task_info on the slaves as well + # + # Note that #initial_information is explicitly skipping tasks + # that have a master activity initial_slaves_information(task) set_port_info(task, nil, PortDynamics.new("#{task.orocos_name}.main")) @@ -517,6 +522,11 @@ def propagate_task(task) done end + # Tests whether the given task is deployed + def deployed_task?(task) + task.orocos_name + end + # Computes desired connection policies, based on the port dynamics # and the oroGen's input port specifications. See the user's guide # for more details @@ -526,7 +536,7 @@ def compute_connection_policies # We only act on deployed tasks, as we need to know how the # tasks are triggered (what activity / priority / ...) deployed_tasks = plan.find_local_tasks(TaskContext) - .find_all(&:execution_agent) + .find_all { deployed_task?(_1) } propagate(deployed_tasks) diff --git a/lib/syskit/network_generation/engine.rb b/lib/syskit/network_generation/engine.rb index e6d0ed4ab..fec4415cd 100644 --- a/lib/syskit/network_generation/engine.rb +++ b/lib/syskit/network_generation/engine.rb @@ -89,6 +89,7 @@ def initialize( event_logger: event_logger, resolution_control: resolution_control ) + @used_deployments = {} @required_instances = {} end @@ -114,6 +115,7 @@ def compute_deployed_network( required_instances: [], default_deployment_group: Syskit.conf.deployment_group, compute_policies: true, + lazy_deploy: Syskit.conf.lazy_deploy?, validate_deployed_network: true ) resolution_errors = [] @@ -126,15 +128,17 @@ def compute_deployed_network( resolution_control: @resolution_control ) - deployer.deploy( - error_handler: error_handler, validate: validate_deployed_network + used_deployments, = deployer.deploy( + error_handler: error_handler, validate: validate_deployed_network, + lazy: lazy_deploy ) + @used_deployments = @used_deployments.merge(used_deployments) resolution_errors = error_handler.process_failures( required_instances, cleanup_failed_tasks: true ) # Sanity check that the plan was properly cleaned up SystemNetworkDeployer.verify_all_tasks_deployed( - work_plan, default_deployment_group + work_plan, default_deployment_group, lazy: lazy_deploy ) SystemNetworkGenerator.verify_all_deployments_are_unique( work_plan, toplevel_tasks_to_requirements.dup @@ -154,7 +158,7 @@ def compute_deployed_network( @dataflow_dynamics.result.each do |task, dynamics| task.trigger_information = dynamics end - interruption_point "compute_connection_policies" + interruption_point "syskit-netgen:compute_connection_policies" end resolution_errors @@ -169,11 +173,15 @@ def apply_deployed_network_to_plan # running tasks @deployment_tasks, @deployed_tasks = log_timepoint_group "finalize_deployed_tasks" do - finalize_deployed_tasks + adaptation = RuntimeNetworkAdaptation.new( + work_plan, @used_deployments, + merge_solver: @merge_solver, + event_logger: @event_logger, + resolution_control: @resolution_control + ) + adaptation.apply end - sever_old_plan_from_new_plan - if @dataflow_dynamics @dataflow_dynamics.apply_merges(merge_solver) log_timepoint "apply_merged_to_dataflow_dynamics" @@ -184,42 +192,6 @@ def apply_deployed_network_to_plan end end - # "Cut" relations between the "old" plan and the new one - # - # At this stage, old components (task contexts and compositions) - # that are not part of the new plan may still be child of bits of - # the new plan. This happens if they are added as children of other - # task contexts. The transformer does this to register dynamic - # transformation producers - # - # This pass looks for all proxies of compositions and task contexts - # that are not the target of a merge operation. When this happens, - # we know that the component is not being reused, and we remove all - # dependency relations where it is child and where the parent is - # "useful" - # - # Note that we do this only for relations between Syskit - # components. Relations with "plan" Roby tasks are updated because - # we replace toplevel tasks. - def sever_old_plan_from_new_plan - old_tasks = - work_plan - .find_local_tasks(Syskit::Component) - .find_all(&:transaction_proxy?) - - merge_leaves = merge_solver.each_merge_leaf.to_set - old_tasks.each do |old_task| - next if merge_leaves.include?(old_task) - - parents = - old_task - .each_parent_task - .find_all { |t| merge_leaves.include?(t) } - - parents.each { |t| t.remove_child(old_task) } - end - end - class << self # Set of blocks registered with # register_instanciation_postprocessing @@ -375,366 +347,6 @@ def fix_toplevel_tasks(required_instances) end end - # Given the network with deployed tasks, this method looks at how we - # could adapt the running network to the new one - def finalize_deployed_tasks - used_deployments = work_plan.find_local_tasks(Deployment).to_set - used_tasks = work_plan.find_local_tasks(Component).to_set - - all_tasks = import_existing_tasks - interruption_point "syskit-netgen:apply:imported-existing-tasks" - imported_tasks_remove_direct_connections(all_tasks - used_tasks) - interruption_point( - "syskit-netgen:apply:imported-tasks-removed-connections" - ) - - finishing_deployments, existing_deployments = - import_existing_deployments(used_deployments) - interruption_point "syskit-netgen:apply:import-existing-deployments" - - debug do - debug " Mapping deployments in the network to the existing ones" - debug " Network deployments:" - used_deployments.each { |dep| debug " #{dep}" } - debug " Existing deployments:" - existing_deployments - .values.flatten.each { |dep| debug " #{dep}" } - break - end - - newly_deployed_tasks = Set.new - reused_deployed_tasks = Set.new - selected_deployment_tasks = Set.new - used_deployments.each do |deployment_task| - # Check for the corresponding task in the plan - process_name = deployment_task.process_name - existing_deployment_tasks = existing_deployments[process_name] || [] - - if existing_deployment_tasks.size > 1 - raise InternalError, - "more than one task for #{process_name} " \ - "present in the plan: #{existing_deployment_tasks}" - end - - selected, new, reused = handle_required_deployment( - deployment_task, - existing_deployment_tasks.first, - finishing_deployments[process_name] - ) - newly_deployed_tasks.merge(new) - reused_deployed_tasks.merge(reused) - selected_deployment_tasks << selected - interruption_point( - "syskit-netgen:apply:select-deployment", - log_on_interruption_only: true - ) - end - log_timepoint "syskit-netgen:selected-deployments" - - reused_deployed_tasks = - reconfigure_tasks_on_static_port_modification(reused_deployed_tasks) - log_timepoint( - "syskit-netgen:reconfigure_tasks_on_static_port_modification" - ) - - debug do - debug "#{reused_deployed_tasks.size} tasks reused during deployment" - reused_deployed_tasks.each do |t| - debug " #{t}" - end - break - end - - # This is required to merge the already existing compositions - # with the ones in the plan - merge_solver.merge_compositions - log_timepoint "syskit-netgen:merge" - - [selected_deployment_tasks, reused_deployed_tasks | newly_deployed_tasks] - end - - # Process a single deployment in {#finalize_deployed_tasks} - # - # @param [Syskit::Deployment] required the deployment task, part of - # the new network - # @param [Syskit::Deployment,nil] usable usable deployment candidate - # found in the running plan - # @param [Syskit::Deployment,nil] not_reusable deployment instance found - # in the running plan, matching required, but not reusable. Both usable - # and not_reusable may be non-nil if usable is pending. It is not possible - # otherwise (can't have the same deployment running twice) - def handle_required_deployment(required, usable, not_reusable) - debug do - debug " looking to reuse a deployment for " \ - "#{required.process_name} (#{required})" - debug " candidate: #{usable}" - debug " not reusable deployment: #{not_reusable}" - break - end - - if usable - usable, not_reusable = validate_usable_deployment( - required, usable, not_reusable - ) - end - - if usable - newly_deployed_tasks = [] - reused_deployed_tasks = adapt_existing_deployment(required, usable) - selected = usable - else - # Nothing to do, we leave the plan as it is - newly_deployed_tasks = required.each_executed_task - reused_deployed_tasks = [] - selected = required - end - - if not_reusable - work_plan.unmark_permanent_task(not_reusable) - not_reusable.scheduled_for_kill! - selected.should_start_after(not_reusable.stop_event) - end - [selected, newly_deployed_tasks, reused_deployed_tasks] - end - - # Validate that the usable deployment we found is actually usable - # - # @see existing_deployment_needs_restart? - def validate_usable_deployment(required, usable, non_reusable) - # Check if the existing deployment would need to be restarted - # because of quarantine/fatal error tasks - needs_restart = existing_deployment_needs_restart?(required, usable) - return [usable, non_reusable] unless needs_restart - - # non_reusable_deployment should be nil here. There should not - # be one if the usable deployment is running, and it is running - # since existing_deployment_needs_restart? can't return true - # for a pending deployment - return [nil, usable] unless non_reusable - - raise InternalError, - "non-nil non_reusable_deployment found in #{__method__} while " \ - "existing_deployment_needs_restart? returned true" - end - - # Do deeper 'usability' checks for an existing deployment found for - # a required one - # - # In some cases (quarantined tasks, FATAL_ERROR), an existing deployment - # that seem reusable actually cannot. This check is dependent on which - # task contexts are needed, which cannot be done within Deployment#reusable? - # - # @param [Syskit::Deployment] required the deployment part of the network - # being deployed - # @param [Syskit::Deployment] existing the deployment part of the running - # plan that is being considered - def existing_deployment_needs_restart?(required, existing) - restart_enabled = - Syskit.conf.auto_restart_deployments_with_quarantines? - return unless restart_enabled - return unless existing.has_fatal_errors? || existing.has_quarantines? - - required.each_executed_task do |t| - return true if existing.task_context_in_fatal?(t.orocos_name) - return true if existing.task_context_quarantined?(t.orocos_name) - end - false - end - - # Import the component objects that are already in the main plan - # - # The graphs are modified to handle the deployment of the network - # being generated - # - # @param [Array] used_tasks the tasks that are part of the - # new network - def import_existing_tasks - all_tasks = work_plan.find_tasks(Component).to_set - interruption_point "syskit-engine:imported-tasks" - - all_tasks.delete_if do |t| - if !t.reusable? - debug { " clearing the relations of the finished task #{t}" } - t.remove_relations(Syskit::Flows::DataFlow) - t.remove_relations(Roby::TaskStructure::Dependency) - true - elsif t.transaction_proxy? && t.abstract? - work_plan.remove_task(t) - true - end - end - interruption_point "syskit-engine:imported-tasks:cleanup" - - all_tasks - end - - # Remove connections that are not forwarding connections (e.g. - # composition exports) - def imported_tasks_remove_direct_connections(tasks) - dataflow_graph = - work_plan.task_relation_graph_for(Syskit::Flows::DataFlow) - tasks.each do |t| - dataflow_graph.in_neighbours(t).dup.each do |source_t| - connections = dataflow_graph.edge_info(source_t, t).dup - connections.delete_if do |(source_port, sink_port), _policy| - both_output = source_t.find_output_port(source_port) && - t.find_output_port(sink_port) - both_input = source_t.find_input_port(source_port) && - t.find_input_port(sink_port) - !both_output && !both_input - end - if connections.empty? - dataflow_graph.remove_edge(source_t, t) - else - dataflow_graph.set_edge_info(source_t, t, connections) - end - end - interruption_point( - "syskit-engine:imported-tasks:dataflow-cleanup", - log_on_interruption_only: true - ) - end - end - - # Import all non-finished deployments from the actual plan into the - # work plan, and sort them into those we can use and those we can't - def import_existing_deployments(used_deployments) - deployments = work_plan.find_tasks(Syskit::Deployment).not_finished - - finishing_deployments = {} - existing_deployments = {} - deployments.each do |task| - if !task.reusable? - finishing_deployments[task.process_name] = task - elsif !used_deployments.include?(task) - (existing_deployments[task.process_name] ||= []) << task - end - end - - [finishing_deployments, existing_deployments] - end - - # After the deployment phase, we check whether some static ports are - # modified and cause their task to be reconfigured. - # - # Note that tasks that are already reconfigured because of - # {#adapt_existing_deployment} will be fine as the task is not - # configured yet - def reconfigure_tasks_on_static_port_modification(deployed_tasks) - final_deployed_tasks = deployed_tasks.dup - - # We filter against 'deployed_tasks' to always select the tasks - # that have been selected in this deployment. It does mean that - # the task is always the 'current' one, that is we would pick - # the new deployment task and ignore the one that is being - # replaced - already_setup_tasks = - work_plan - .find_tasks(Syskit::TaskContext).not_finished.not_finishing - .find_all { |t| !t.read_only? } - .find_all do |t| - deployed_tasks.include?(t) && (t.setting_up? || t.setup?) - end - - already_setup_tasks.each do |t| - next unless t.transaction_modifies_static_ports? - - debug do - "#{t} was selected as deployment, but it would require " \ - "modifications on static ports, spawning a new task" - end - - new_task = t.execution_agent.task(t.orocos_name, t.concrete_model) - merge_solver.apply_merge_group(t => new_task) - new_task.should_configure_after t.stop_event - final_deployed_tasks.delete(t) - final_deployed_tasks << new_task - end - final_deployed_tasks - end - - # Find the "last" deployed task in a set of related deployed tasks - # in the plan - # - # Ordering is encoded in the should_configure_after relation - def find_current_deployed_task(deployed_tasks) - configuration_precedence_graph = work_plan.event_relation_graph_for( - Roby::EventStructure::SyskitConfigurationPrecedence - ) - - tasks = deployed_tasks.find_all do |t| - t.reusable? && configuration_precedence_graph.leaf?(t.stop_event) - end - - if tasks.size > 1 - raise InternalError, - "could not find the current task in " \ - "#{deployed_tasks.map(&:to_s).sort.join(', ')}" - end - - tasks.first - end - - # Given a required deployment task in {#work_plan} and a proxy - # representing an existing deployment task in {#real_plan}, modify - # the plan to reuse the existing deployment - # - # @return [Array] the set of TaskContext - # instances that have been used to replace the task contexts - # generated during network generation. They are all deployed by - # existing_deployment_task, and some of them might be transaction - # proxies. - def adapt_existing_deployment(deployment_task, existing_deployment_task) - orocos_name_to_existing = {} - existing_deployment_task.each_executed_task do |t| - next if t.finished? - - (orocos_name_to_existing[t.orocos_name] ||= []) << t - end - - applied_merges = Set.new - deployed_tasks = deployment_task.each_executed_task.to_a - deployed_tasks.each do |task| - existing_tasks = - orocos_name_to_existing[task.orocos_name] || [] - existing_task = find_current_deployed_task(existing_tasks) - - if !existing_task || !task.can_be_deployed_by?(existing_task) - debug do - if existing_task - " task #{task.orocos_name} has been deployed, but " \ - "I can't merge with the existing deployment " \ - "(#{existing_task})" - else - " task #{task.orocos_name} has not yet been deployed" - end - end - - new_task = existing_deployment_task - .task(task.orocos_name, task.concrete_model) - debug do - " creating #{new_task} for #{task} (#{task.orocos_name})" - end - - existing_tasks.each do |previous_task| - debug do - " #{new_task} needs to wait for #{existing_task} " \ - "to finish before reconfiguring" - end - - new_task.should_configure_after(previous_task.stop_event) - end - existing_task = new_task - end - - merge_solver.apply_merge_group(task => existing_task) - applied_merges << existing_task - debug { " using #{existing_task} for #{task} (#{task.orocos_name})" } - end - work_plan.remove_task(deployment_task) - applied_merges - end - # Computes the set of requirement tasks that should be used for # deployment within the given plan def self.discover_requirement_tasks_from_plan(plan) @@ -759,7 +371,8 @@ def compute_system_network( validate_generated_network: true, default_deployment_group: Syskit.conf.deployment_group, early_deploy: Syskit.conf.early_deploy?, - validate_deployed_network: early_deploy, + lazy_deploy: Syskit.conf.lazy_deploy?, + validate_deployed_network: early_deploy && !lazy_deploy, cleanup_resolution_errors: true ) requirement_tasks = requirement_tasks.to_a @@ -772,6 +385,7 @@ def compute_system_network( event_logger: event_logger, merge_solver: merge_solver, default_deployment_group: default_deployment_group, + lazy_deploy: lazy_deploy, early_deploy: early_deploy, resolution_control: @resolution_control ) @@ -780,7 +394,7 @@ def compute_system_network( instance_requirements ) - system_network_generator.resolve_system_network( + _, @used_deployments = system_network_generator.resolve_system_network( garbage_collect: garbage_collect, validate_abstract_network: validate_abstract_network, validate_generated_network: validate_generated_network, @@ -840,6 +454,7 @@ def resolve_system_network( default_deployment_group: Syskit.conf.deployment_group, compute_policies: true, early_deploy: Syskit.conf.early_deploy?, + lazy_deploy: Syskit.conf.lazy_deploy?, capture_errors_during_network_resolution: Syskit.conf.capture_errors_during_network_resolution?, cleanup_resolution_errors: true @@ -862,6 +477,7 @@ def resolve_system_network( (default_deployment_group if early_deploy), validate_deployed_network: validate_deployed_network, early_deploy: early_deploy && compute_deployments, + lazy_deploy: lazy_deploy && compute_deployments, cleanup_resolution_errors: cleanup_resolution_errors ) @@ -874,6 +490,7 @@ def resolve_system_network( required_instances: required_instances, default_deployment_group: default_deployment_group, compute_policies: compute_policies, + lazy_deploy: lazy_deploy, validate_deployed_network: validate_deployed_network ) resolution_errors.concat(deployment_resolution_errors) @@ -922,20 +539,21 @@ def resolve( cleanup_resolution_errors: on_error != :commit ) merge_solver.merge_task_contexts_with_same_agent = early_deploy - required_instances, resolution_errors = resolve_system_network( - requirement_tasks, - garbage_collect: garbage_collect, - validate_abstract_network: validate_abstract_network, - validate_generated_network: validate_generated_network, - compute_deployments: compute_deployments, - default_deployment_group: default_deployment_group, - compute_policies: compute_policies, - validate_deployed_network: validate_deployed_network, - early_deploy: early_deploy, - capture_errors_during_network_resolution: - capture_errors_during_network_resolution, - cleanup_resolution_errors: cleanup_resolution_errors - ) + required_instances, resolution_errors = + resolve_system_network( + requirement_tasks, + garbage_collect: garbage_collect, + validate_abstract_network: validate_abstract_network, + validate_generated_network: validate_generated_network, + compute_deployments: compute_deployments, + default_deployment_group: default_deployment_group, + compute_policies: compute_policies, + validate_deployed_network: validate_deployed_network, + early_deploy: early_deploy, + capture_errors_during_network_resolution: + capture_errors_during_network_resolution, + cleanup_resolution_errors: cleanup_resolution_errors + ) # Can only be reached if the capture_error_during_network_resolution flag # is true diff --git a/lib/syskit/network_generation/merge_solver.rb b/lib/syskit/network_generation/merge_solver.rb index 3d475aef9..69cd71dea 100644 --- a/lib/syskit/network_generation/merge_solver.rb +++ b/lib/syskit/network_generation/merge_solver.rb @@ -235,8 +235,9 @@ def may_merge_components?(merged_task, task) # Tests whether task.merge(target_task) is a valid operation # - # @param [Syskit::TaskContext] task - # @param [Syskit::TaskContext] target_task + # @param [Syskit::TaskContext] merged_task the task that we will merge into + # the other one. This task disappears after the merge + # @param [Syskit::TaskContext] task the task we will merge into # # @return [false,true] if false, the merge is not possible. If # true, it is possible. @@ -253,12 +254,18 @@ def may_merge_task_contexts?(merged_task, task) true end + # Tests if two tasks can be merged w.r.t. their selected deployment + # + # @param [Syskit::TaskContext] merged_task the task that we will merge into + # the other one. This task disappears after the merge + # @param [Syskit::TaskContext] task the task we will merge into def mergeable_agents?(merged_task, task) unless merge_task_contexts_with_same_agent? return !(task.execution_agent && merged_task.execution_agent) end - return false unless task.execution_agent && merged_task.execution_agent + return false unless task.arguments.set?(:orocos_name) && + merged_task.arguments.set?(:orocos_name) task.orocos_name == merged_task.orocos_name end diff --git a/lib/syskit/network_generation/runtime_network_adaptation.rb b/lib/syskit/network_generation/runtime_network_adaptation.rb new file mode 100644 index 000000000..3371873d9 --- /dev/null +++ b/lib/syskit/network_generation/runtime_network_adaptation.rb @@ -0,0 +1,600 @@ +# frozen_string_literal: true + +module Syskit + module NetworkGeneration + # Encapsulation of the part of the deployment algorithm that deals with + # adapting the runtime network to match the system network + class RuntimeNetworkAdaptation + extend Logger::Hierarchy + include Logger::Hierarchy + include Roby::DRoby::EventLogging + + attr_reader :event_logger + + # @param [{Component=>DeploymentGroup::DeployedTask}] used_deployments the + # mapping from task instances to the deployment used for it + def initialize( + work_plan, used_deployments, + merge_solver:, + event_logger: work_plan.event_logger, + resolution_control: Async::Control.new + ) + @work_plan = work_plan + @event_logger = event_logger + @resolution_control = resolution_control + @merge_solver = merge_solver + + tasks_per_configured_deployments = + used_deployments.each_with_object({}) do |(task, deployed_task), h| + (h[deployed_task.configured_deployment] ||= []) << task + end + @used_deployments = + tasks_per_configured_deployments.map do |configured_deployment, tasks| + UsedDeployment.new( + configured_deployment: configured_deployment, tasks: tasks + ) + end + end + + def apply + result = finalize_deployed_tasks + sever_old_plan_from_new_plan + result + end + + def interruption_point(name, log_on_interruption_only: false) + continue = @resolution_control.interruption_point( + self, name, log_on_interruption_only: log_on_interruption_only + ) + throw :syskit_netgen_cancelled unless continue + end + + # Given the network with deployed tasks, this method looks at how we + # could adapt the running network to the new one + def finalize_deployed_tasks + finishing_deployments, existing_deployments = + import_from_runtime_plan + interruption_point "syskit-netgen:apply:import-from-runtime-plan" + + used_deployments_with_existing = + used_deployments_find_existing( + @used_deployments, existing_deployments, finishing_deployments + ) + + newly_deployed_tasks, reused_deployed_tasks, selected_deployment_tasks = + finalize_used_deployments(used_deployments_with_existing) + + debug do + debug "#{reused_deployed_tasks.size} tasks reused during deployment" + reused_deployed_tasks.each do |t| + debug " #{t}" + end + break + end + + # This is required to merge the already existing compositions + # with the ones in the plan + @merge_solver.merge_compositions + log_timepoint "syskit-netgen:merge" + + [selected_deployment_tasks, reused_deployed_tasks | newly_deployed_tasks] + end + + def import_from_runtime_plan + used_tasks = @work_plan.find_local_tasks(Component).to_set + + all_tasks = import_existing_tasks + interruption_point "syskit-netgen:apply:imported-existing-tasks" + imported_tasks_remove_direct_connections(all_tasks - used_tasks) + + finishing_deployments, existing_deployments = + import_existing_deployments(@used_deployments) + + [finishing_deployments, existing_deployments] + end + + # Find existing deployment tasks and finishing deployment tasks for the + # system's used deployments + # + # @return an array of 3-tuples (deployment_task, existing_deployment_task, + # finishing_deployment) + def used_deployments_find_existing( + used_deployments, existing_deployments, finishing_deployments + ) + used_deployments.map do |deployment_task| + # Check for the corresponding task in the plan + process_name = deployment_task.process_name + existing_deployment_tasks = existing_deployments[process_name] || [] + + if existing_deployment_tasks.size > 1 + raise InternalError, + "more than one task for #{process_name} " \ + "present in the plan: #{existing_deployment_tasks}" + end + + [deployment_task, existing_deployment_tasks.first, + finishing_deployments[process_name]] + end + end + + # Make sure that the tasks that need to be deployed in the work plan are + # actually deployed + def finalize_used_deployments(used_deployments_with_existing) + newly_deployed_tasks = Set.new + reused_deployed_tasks = Set.new + selected_deployment_tasks = Set.new + used_deployments_with_existing.each do |deployment, existing, finishing| + selected, new, reused = + handle_required_deployment(deployment, existing, finishing) + + newly_deployed_tasks.merge(new) + reused_deployed_tasks.merge(reused) + selected_deployment_tasks << selected + interruption_point( + "syskit-netgen:apply:select-deployment", + log_on_interruption_only: true + ) + end + log_timepoint "syskit-netgen:selected-deployments" + + reused_deployed_tasks = + reconfigure_tasks_on_static_port_modification(reused_deployed_tasks) + log_timepoint( + "syskit-netgen:reconfigure_tasks_on_static_port_modification" + ) + + [newly_deployed_tasks, reused_deployed_tasks, selected_deployment_tasks] + end + + # Process a single deployment in {#finalize_deployed_tasks} + # + # @param [Syskit::Deployment] required the deployment task, part of + # the new network + # @param [Syskit::Deployment,nil] usable usable deployment candidate + # found in the running plan + # @param [Syskit::Deployment,nil] not_reusable deployment instance found + # in the running plan, matching required, but not reusable. Both usable + # and not_reusable may be non-nil if usable is pending. It is not possible + # otherwise (can't have the same deployment running twice) + def handle_required_deployment(required, usable, not_reusable) + debug do + debug " looking to reuse a deployment for " \ + "#{required.process_name} (#{required})" + debug " candidate: #{usable}" + debug " not reusable deployment: #{not_reusable}" + break + end + + if usable + usable, not_reusable = validate_usable_deployment( + required, usable, not_reusable + ) + end + + if usable + newly_deployed_tasks = [] + reused_deployed_tasks = adapt_existing_deployment(required, usable) + selected = usable + else + reused_deployed_tasks = [] + selected, newly_deployed_tasks = handle_new_deployment(required) + end + + if not_reusable + @work_plan.unmark_permanent_task(not_reusable) + not_reusable.scheduled_for_kill! + selected.should_start_after(not_reusable.stop_event) + end + [selected, newly_deployed_tasks, reused_deployed_tasks] + end + + # Handle new deployments for {#handle_required_deployment} + # + # In the old deployment method (the "eager" deployment), this is essentially + # a no-op. In the new method (the "lazy" method), this is where new + # deployments get created and replace the tasks + def handle_new_deployment(required) + executed_tasks = required.each_executed_task.to_a + lazily_deployed_tasks = executed_tasks.find_all { !_1.execution_agent } + + if lazily_deployed_tasks.empty? + deployment_task = executed_tasks.first.execution_agent + return [deployment_task, executed_tasks] + elsif lazily_deployed_tasks.size != executed_tasks.size + raise InternalError, + "in #handle_new_deployment: some tasks are deployed and some " \ + "are not" + end + + used_deployment_instanciate(required) + end + + # Instanciate a UsedDeployment + def used_deployment_instanciate(required) + deployment_task = required.configured_deployment.new + if Syskit.conf.permanent_deployments? + @work_plan.add_permanent_task(deployment_task) + else + @work_plan.add_task(deployment_task) + end + + deployed_tasks = required.tasks.map do |initial_deployed_task| + deployed_task = deployment_task.task( + initial_deployed_task.orocos_name, + setup_scheduler: false + ) + + # !!! Cf. comment in SystemNetworkDeployer#apply_selected_deployments + @merge_solver.apply_merge_group( + initial_deployed_task => deployed_task + ) + deployed_task + end + + [deployment_task, deployed_tasks] + end + + # Validate that the usable deployment we found is actually usable + # + # @see existing_deployment_needs_restart? + def validate_usable_deployment(required, usable, non_reusable) + # Check if the existing deployment would need to be restarted + # because of quarantine/fatal error tasks + needs_restart = existing_deployment_needs_restart?(required, usable) + return [usable, non_reusable] unless needs_restart + + # non_reusable_deployment should be nil here. There should not + # be one if the usable deployment is running, and it is running + # since existing_deployment_needs_restart? can't return true + # for a pending deployment + return [nil, usable] unless non_reusable + + raise InternalError, + "non-nil non_reusable_deployment found in #{__method__} while " \ + "existing_deployment_needs_restart? returned true" + end + + # Do deeper 'usability' checks for an existing deployment found for + # a required one + # + # In some cases (quarantined tasks, FATAL_ERROR), an existing deployment + # that seem reusable actually cannot. This check is dependent on which + # task contexts are needed, which cannot be done within Deployment#reusable? + # + # @param [Syskit::Deployment] required the deployment part of the network + # being deployed + # @param [Syskit::Deployment] existing the deployment part of the running + # plan that is being considered + def existing_deployment_needs_restart?(required, existing) + restart_enabled = + Syskit.conf.auto_restart_deployments_with_quarantines? + return false unless restart_enabled + unless existing.has_fatal_errors? || existing.has_quarantines? + return false + end + + required.each_executed_task do |t| + return true if existing.task_context_in_fatal?(t.orocos_name) + return true if existing.task_context_quarantined?(t.orocos_name) + end + false + end + + # Import the component objects that are already in the main plan + # + # The graphs are modified to handle the deployment of the network + # being generated + # + # @param [Array] used_tasks the tasks that are part of the + # new network + def import_existing_tasks + all_tasks = @work_plan.find_tasks(Component).to_set + interruption_point "syskit-engine:imported-tasks" + + all_tasks.delete_if do |t| + if t.finished? + debug { " clearing the relations of the finished task #{t}" } + t.remove_relations(Syskit::Flows::DataFlow) + t.remove_relations(Roby::TaskStructure::Dependency) + true + elsif t.transaction_proxy? && t.abstract? + @work_plan.remove_task(t) + true + end + end + interruption_point "syskit-engine:imported-tasks:cleanup" + + all_tasks + end + + # Remove connections that are not forwarding connections (e.g. + # composition exports) + def imported_tasks_remove_direct_connections(tasks) + dataflow_graph = + @work_plan.task_relation_graph_for(Syskit::Flows::DataFlow) + tasks.each do |t| + dataflow_graph.in_neighbours(t).dup.each do |source_t| + connections = dataflow_graph.edge_info(source_t, t).dup + connections_remove_non_exports(t, source_t, connections) + if connections.empty? + dataflow_graph.remove_edge(source_t, t) + else + dataflow_graph.set_edge_info(source_t, t, connections) + end + end + interruption_point( + "syskit-engine:imported-tasks:dataflow-cleanup", + log_on_interruption_only: true + ) + end + end + + # Remove the connections that do not represent a composition export + # from the connection set + # + # @param [Hash] connections the connection hash as stored in the dataflow + # graph + def connections_remove_non_exports(task, source_task, connections) + connections.delete_if do |(source_port, sink_port), _policy| + both_output = source_task.find_output_port(source_port) && + task.find_output_port(sink_port) + both_input = source_task.find_input_port(source_port) && + task.find_input_port(sink_port) + !both_output && !both_input + end + end + + # Import all non-finished deployments from the actual plan into the + # work plan, and sort them into those we can use and those we can't + def import_existing_deployments(used_deployments) + deployments = @work_plan.find_tasks(Syskit::Deployment).not_finished + used_deployments.to_set(&:process_name) + + finishing_deployments = {} + existing_deployments = {} + deployments.each do |task| + process_name = task.process_name + if !task.reusable? + finishing_deployments[process_name] = task + elsif task.transaction_proxy? + (existing_deployments[process_name] ||= []) << task + end + end + + [finishing_deployments, existing_deployments] + end + + # After the deployment phase, we check whether some static ports are + # modified and cause their task to be reconfigured. + # + # Note that tasks that are already reconfigured because of + # {#adapt_existing_deployment} will be fine as the task is not + # configured yet + def reconfigure_tasks_on_static_port_modification(deployed_tasks) + final_deployed_tasks = deployed_tasks.dup + + # We filter against 'deployed_tasks' to always select the tasks + # that have been selected in this deployment. It does mean that + # the task is always the 'current' one, that is we would pick + # the new deployment task and ignore the one that is being + # replaced + already_setup_tasks = + find_all_setup_tasks.find_all { deployed_tasks.include?(_1) } + + already_setup_tasks.each do |t| + next unless t.transaction_modifies_static_ports? + + debug do + "#{t} was selected as deployment, but it would require " \ + "modifications on static ports, spawning a new task" + end + + new_task = reconfigure_task(t) + final_deployed_tasks.delete(t) + final_deployed_tasks << new_task + end + final_deployed_tasks + end + + def find_all_setup_tasks + @work_plan + .find_tasks(Syskit::TaskContext).not_finished.not_finishing + .find_all { |t| !t.read_only? } + .find_all { |t| t.setting_up? || t.setup? } + end + + def reconfigure_task(task) + new_task = task.execution_agent.task( + task.orocos_name, task.concrete_model + ) + @merge_solver.apply_merge_group(task => new_task) + new_task.should_configure_after task.stop_event + new_task + end + + # Find the "last" deployed task in a set of related deployed tasks + # in the plan + # + # Ordering is encoded in the should_configure_after relation + def find_current_deployed_task(deployed_tasks) + configuration_precedence_graph = @work_plan.event_relation_graph_for( + Roby::EventStructure::SyskitConfigurationPrecedence + ) + + tasks = deployed_tasks.find_all do |t| + configuration_precedence_graph.leaf?(t.stop_event) + end + + if tasks.size > 1 + raise InternalError, + "could not find the current task in " \ + "#{deployed_tasks.map(&:to_s).sort.join(', ')}" + end + + tasks.first + end + + # Given a required deployment task in {#work_plan} and a proxy + # representing an existing deployment task in {#real_plan}, modify + # the plan to reuse the existing deployment + # + # @param [UsedDeployment] used_deployment + # @return [Array] the set of TaskContext + # instances that have been used to replace the task contexts + # generated during network generation. They are all deployed by + # existing_deployment_task, and some of them might be transaction + # proxies. + def adapt_existing_deployment(used_deployment, existing_deployment_task) + orocos_name_to_existing = + adapt_existing_create_orocos_name_mapping(existing_deployment_task) + + applied_merges = Set.new + deployed_tasks = used_deployment.each_executed_task.to_a + initial_deployment_task = deployed_tasks.first.execution_agent + deployed_tasks.each do |task| + existing_tasks = + orocos_name_to_existing[task.orocos_name] || [] + existing_task = adapt_existing_deployed_task( + task, existing_tasks, existing_deployment_task + ) + applied_merges << existing_task + debug { " using #{existing_task} for #{task} (#{task.orocos_name})" } + end + @work_plan.remove_task(initial_deployment_task) if initial_deployment_task + applied_merges + end + + # Make sure a task from the work plan is deployed + # + # It can either re-use an existing deployed task from `existing_tasks`, or + # create a new one if needed + # + # @return [TaskContext] the used task instance + def adapt_existing_deployed_task( + task, existing_tasks, existing_deployment_task + ) + existing_task = find_current_deployed_task(existing_tasks) + return if task == existing_task + + if !existing_task || !task.can_be_deployed_by?(existing_task) + new_task = adapt_existing_create_new( + task, existing_task, existing_deployment_task + ) + adapt_existing_synchronize_new(new_task, existing_tasks) + existing_task = new_task + end + + @merge_solver.apply_merge_group(task => existing_task) + existing_task + end + + # Create a string-to-tasks mapping for the existing executed tasks of a + # deployment + # + # @return [{String=>Array}] the mapping, where a name can be + # mapped to more than one task because of possible reconfigurations + def adapt_existing_create_orocos_name_mapping(deployment_task) + deployment_task.each_executed_task.with_object({}) do |t, h| + next if t.finished? + + (h[t.orocos_name] ||= []) << t + end + end + + # Create a new deployed task within {#adapt_existing_deployment} + def adapt_existing_create_new(task, existing_task, existing_deployment_task) + debug do + if existing_task + " task #{task.orocos_name} has been deployed, but " \ + "I can't merge with the existing deployment " \ + "(#{existing_task})" + else + " task #{task.orocos_name} has not yet been deployed" + end + end + + new_task = + existing_deployment_task + .task(task.orocos_name, task.concrete_model, setup_scheduler: false) + + debug do + " created #{new_task} for #{task} (#{task.orocos_name})" + end + new_task + end + + # Make sure a new task instance is configured only when previous one + # in {#adapt_existing_deployment} + def adapt_existing_synchronize_new(new_task, existing_tasks) + existing_tasks.each do |previous_task| + debug do + " #{new_task} will wait for #{previous_task} " \ + "to finish before reconfiguring" + end + + new_task.should_configure_after(previous_task.stop_event) + end + end + + # "Cut" relations between the "old" plan and the new one + # + # At this stage, old components (task contexts and compositions) + # that are not part of the new plan may still be child of bits of + # the new plan. This happens if they are added as children of other + # task contexts. The transformer does this to register dynamic + # transformation producers + # + # This pass looks for all proxies of compositions and task contexts + # that are not the target of a merge operation. When this happens, + # we know that the component is not being reused, and we remove all + # dependency relations where it is child and where the parent is + # "useful" + # + # Note that we do this only for relations between Syskit + # components. Relations with "plan" Roby tasks are updated because + # we replace toplevel tasks. + def sever_old_plan_from_new_plan + old_tasks = + @work_plan + .find_local_tasks(Syskit::Component) + .find_all(&:transaction_proxy?) + + merge_leaves = @merge_solver.each_merge_leaf.to_set + old_tasks.each do |old_task| + next if merge_leaves.include?(old_task) + + parents = + old_task + .each_parent_task + .find_all { |t| merge_leaves.include?(t) } + + parents.each { |t| t.remove_child(old_task) } + end + end + + UsedDeployment = Struct.new( + :configured_deployment, :tasks, keyword_init: true + ) do + def process_name + configured_deployment.process_name + end + + def each_executed_task(&block) + tasks.each(&block) + end + end + + def debug_output_used_deployments_with_existing( + used_deployments_with_existing + ) + return unless Roby.log_level_enabled?(self, :debug) + + debug " Mapping deployments in the network to the existing ones" + used_deployments_with_existing.each do |used, existing, _| + debug " network: #{used}" + debug " existing: #{existing}" + end + end + end + end +end diff --git a/lib/syskit/network_generation/system_network_deployer.rb b/lib/syskit/network_generation/system_network_deployer.rb index ba80d5061..bef3b005c 100644 --- a/lib/syskit/network_generation/system_network_deployer.rb +++ b/lib/syskit/network_generation/system_network_deployer.rb @@ -72,10 +72,12 @@ def interruption_point(name, log_on_interruption_only: false) # # @param [Boolean] validate if true, {#validate_deployed_networks} # will run on the generated network - # @return [Set] the set of tasks for which the deployer could + # @return [({Component=>DeploymentGroup::DeployedTask},Set)] the + # used deployments, as a map of task instance to the deployment's + # description, and the set of tasks for which the deployer could # not find a deployment def deploy(error_handler: RaiseErrorHandler.new, validate: true, - reuse_deployments: false, deployment_tasks: {}) + reuse_deployments: false, deployment_tasks: {}, lazy: false) debug "Deploying the system network" all_tasks = plan.find_local_tasks(TaskContext).to_a @@ -83,15 +85,21 @@ def deploy(error_handler: RaiseErrorHandler.new, validate: true, select_deployments(all_tasks, reuse: reuse_deployments) interruption_point "syskit-netgen:select_deployments" - apply_selected_deployments(selected_deployments, deployment_tasks) + if lazy + used_deployments = + lazy_apply_selected_deployments(selected_deployments) + else + used_deployments = + apply_selected_deployments(selected_deployments, deployment_tasks) + end interruption_point "syskit-netgen:apply_selected_deployments" if validate - validate_deployed_network(error_handler: error_handler) + validate_deployed_network(error_handler: error_handler, lazy: lazy) log_timepoint "syskit-netgen:validate-deployed-network" end - missing_deployments + [used_deployments, missing_deployments] end # @return [Set] @@ -183,9 +191,10 @@ def select_deployments(tasks, reuse: false) missing_deployments << task elsif !reuse && used_deployments.include?(selected) debug do - machine, configured_deployment, task_name = *selected + configured_deployment = selected.configured_deployment + task_name = selected.mapped_task_name "#{task} resolves to #{configured_deployment}.#{task_name} " \ - "on #{machine} for its deployment, but it is already used" + "for its deployment, but it is already used" end missing_deployments << task else @@ -198,30 +207,96 @@ def select_deployments(tasks, reuse: false) # Modify the plan to apply a deployment selection # - # @param [Component=>Deployment] selected_deployments the + # @param [{Component=>DeploymentGroup::DeployedTask}] selected_deployments the # component-to-deployment association - # @return [void] + # @param deployment_tasks a memoization object that allows the system to + # instanciate a deployment only once + # @return [{Component=>DeploymentGroup::DeployedTask}] the mapping between + # the deployed task (the instance remaining in the plan) and the + # deployedtask that was used to create it def apply_selected_deployments(selected_deployments, deployment_tasks = {}) - selected_deployments.each do |task, deployed_task| - deployed_task, = deployed_task.instanciate( - plan, - permanent: Syskit.conf.permanent_deployments?, - deployment_tasks: deployment_tasks - ) - debug do - agent = deployed_task.execution_agent - "deploying #{task} with #{agent.process_name} (#{agent})" + selected_deployments + .each_with_object({}) do |(task, deployed_task_m), used_deployments| + deployed_task, = deployed_task_m.instanciate( + plan, + permanent: Syskit.conf.permanent_deployments?, + deployment_tasks: deployment_tasks + ) + debug do + agent = deployed_task.execution_agent + "deploying #{task} with #{agent.process_name} (#{agent})" + end + # We MUST merge one-by-one here. Calling apply_merge_group + # on all the merges at once would NOT copy the connections + # that exist between the tasks of the "from" group to the + # "to" group, which is really not what we want + # + # Calling with all the mappings would be useful if what + # we wanted is replace a subnet of the plan by another + # subnet. This is not the goal here. + merge_solver.apply_merge_group(task => deployed_task) + used_deployments[deployed_task] = deployed_task_m + + used_deployments.merge!( + apply_selected_deployments_discover_schedulers( + deployed_task, deployed_task_m.configured_deployment + ) + ) + end + end + + # Return entries compatible with used_deployments for a task's scheduler + # task(s) - resolved recursively + def apply_selected_deployments_discover_schedulers( + task, configured_deployment + ) + return {} unless task.orogen_model.master + + scheduler_task = task.scheduler_child + scheduler_name = scheduler_task.orocos_name + scheduler_deployed_task = Models::DeploymentGroup::DeployedTask.new( + configured_deployment, scheduler_name + ) + + recursive = apply_selected_deployments_discover_schedulers( + scheduler_task, configured_deployment + ) + { scheduler_task => scheduler_deployed_task }.merge(recursive) + end + + # Apply deployments selected during {#deploy} by setting the task's + # orocos_name argument accordingly + # + # @param [Component=>DeploymentGroup::DeployedTask] selected_deployments the + # component-to-deployment association + def lazy_apply_selected_deployments(selected_deployments) + with_master = selected_deployments.find_all do |task, sel| + unless sel.orocos_name + raise "found selected deployment without a task name" end - # We MUST merge one-by-one here. Calling apply_merge_group - # on all the merges at once would NOT copy the connections - # that exist between the tasks of the "from" group to the - # "to" group, which is really not what we want - # - # Calling with all the mappings would be useful if what - # we wanted is replace a subnet of the plan by another - # subnet. This is not the goal here. - merge_solver.apply_merge_group(task => deployed_task) + + task.orocos_name ||= sel.orocos_name + task.orogen_model = sel.orogen_model + task.orogen_model.master end + return selected_deployments if with_master.empty? + + used_deployments = selected_deployments.dup + by_name = selected_deployments + .to_h { |task, _| [task.orocos_name, task] } + with_master.each do |task, sel| + deployment = sel.configured_deployment + scheduler_task = + deployment.task_setup_scheduler(task, existing_tasks: by_name) + scheduler_name = scheduler_task.orocos_name + by_name[scheduler_name] = scheduler_task + + scheduler_deployed_task = Models::DeploymentGroup::DeployedTask.new( + deployment, scheduler_name + ) + used_deployments[scheduler_task] = scheduler_deployed_task + end + used_deployments end # Sanity checks to verify that the result of #deploy_system_network @@ -229,8 +304,10 @@ def apply_selected_deployments(selected_deployments, deployment_tasks = {}) # # @return [Array] all the resolution errors of the deployed # network. - def validate_deployed_network(error_handler: RaiseErrorHandler.new) - verify_all_tasks_deployed(error_handler: error_handler) + def validate_deployed_network( + error_handler: RaiseErrorHandler.new, lazy: false + ) + verify_all_tasks_deployed(error_handler: error_handler, lazy: lazy) verify_all_configurations_exist(error_handler: error_handler) verify_all_process_managers_enabled(error_handler: error_handler) end @@ -238,23 +315,44 @@ def validate_deployed_network(error_handler: RaiseErrorHandler.new) # Verifies that all tasks in the plan are deployed # # @param [ResolutionErrorHandler | RaiseErrorHandler] error_handler - def verify_all_tasks_deployed(error_handler: RaiseErrorHandler.new) + def verify_all_tasks_deployed( + error_handler: RaiseErrorHandler.new, lazy: false + ) self.class.verify_all_tasks_deployed( - plan, default_deployment_group, error_handler: error_handler + plan, default_deployment_group, + error_handler: error_handler, lazy: lazy ) end + # Tests whether the given task is deployed when the network generation + # runs in eager deployment mode + def self.deployed_task?(task) + task.execution_agent + end + + # Tests whether the given task is deployed when the network generation + # runs in lazy deployment mode + def self.lazily_deployed_task?(task) + task.orocos_name + end + # @see #verify_all_tasks_deployed # # @param [Component=>DeploymentGroup] deployment_groups which # deployment groups has been used for which task. This is used # to generate the error messages when needed. def self.verify_all_tasks_deployed( - plan, default_deployment_group, error_handler: RaiseErrorHandler.new + plan, default_deployment_group, + error_handler: RaiseErrorHandler.new, lazy: false ) - not_deployed = plan.find_local_tasks(TaskContext) - .not_finished.not_abstract - .find_all { |t| !t.execution_agent } + query = plan.find_local_tasks(TaskContext) + .not_finished.not_abstract + not_deployed = + if lazy + query.find_all { !lazily_deployed_task?(_1) } + else + query.find_all { !deployed_task?(_1) } + end return if not_deployed.empty? diff --git a/lib/syskit/network_generation/system_network_generator.rb b/lib/syskit/network_generation/system_network_generator.rb index 579ab744c..206ebef72 100644 --- a/lib/syskit/network_generation/system_network_generator.rb +++ b/lib/syskit/network_generation/system_network_generator.rb @@ -24,15 +24,21 @@ def early_deploy? @early_deploy end - def initialize( # rubocop:disable Metrics/ParameterLists - plan, + # Indicates if deployment is done lazily or eagerly + # + # Lazy deployment is only setting orocos_name until the very last moment + def lazy_deploy? + @lazy_deploy + end + + def initialize(plan, # rubocop:disable Metrics/ParameterLists event_logger: plan.event_logger, default_deployment_group: nil, early_deploy: false, + lazy_deploy: false, error_handler: RaiseErrorHandler.new, resolution_control: Async::Control.new, - merge_solver: nil - ) + merge_solver: nil) @plan = plan @event_logger = event_logger @resolution_control = resolution_control @@ -47,6 +53,7 @@ def initialize( # rubocop:disable Metrics/ParameterLists @merge_solver = merge_solver @default_deployment_group = default_deployment_group @early_deploy = early_deploy + @lazy_deploy = lazy_deploy @error_handler = error_handler end @@ -232,9 +239,10 @@ def deploy(deployment_tasks) default_deployment_group: default_deployment_group ) - network_deployer.deploy(validate: false, - reuse_deployments: true, - deployment_tasks: deployment_tasks) + network_deployer.deploy( + validate: false, reuse_deployments: true, lazy: lazy_deploy?, + deployment_tasks: deployment_tasks + ) end def instanciate_system_network(instance_requirements) @@ -260,6 +268,7 @@ def resolve_system_network( validate_deployed_network: true ) deployment_tasks = {} + @used_deployments = {} early_deploy(deployment_tasks) merge_solver.merge_identical_tasks @@ -314,13 +323,16 @@ def resolve_system_network( early_deploy? && validate_deployed_network ) interruption_point("syskit-netgen:validation") - @toplevel_tasks + + @used_deployments.transform_keys! { @merge_solver.replacement_for(_1) } + [@toplevel_tasks, @used_deployments] end def early_deploy(deployment_tasks) return unless early_deploy? - deploy(deployment_tasks) + used_deployments, = deploy(deployment_tasks) + @used_deployments.merge!(used_deployments) interruption_point "syskit-netgen:early-deploy" end @@ -542,8 +554,9 @@ def validate_generated_network(error_handler: @error_handler) end def validate_deployed_network(error_handler: @error_handler) - self.class.verify_all_tasks_deployed( - plan, default_deployment_group, error_handler: error_handler + SystemNetworkDeployer.verify_all_tasks_deployed( + plan, default_deployment_group, + error_handler: error_handler, lazy: lazy_deploy? ) self.class.verify_all_deployments_are_unique( plan, toplevel_tasks_to_requirements.dup, error_handler: error_handler @@ -551,16 +564,6 @@ def validate_deployed_network(error_handler: @error_handler) super if defined? super end - def self.verify_all_tasks_deployed( - plan, default_deployment_group, error_handler: RaiseErrorHandler.new - ) - SystemNetworkDeployer.verify_all_tasks_deployed( - plan, - default_deployment_group, - error_handler: error_handler - ) - end - # @api private # # Helper for {#initialize} to create the default merge solver diff --git a/lib/syskit/roby_app/configuration.rb b/lib/syskit/roby_app/configuration.rb index 7183cb118..56d35547f 100644 --- a/lib/syskit/roby_app/configuration.rb +++ b/lib/syskit/roby_app/configuration.rb @@ -183,6 +183,21 @@ def capture_errors_during_network_resolution? # resolution attr_writer :capture_errors_during_network_resolution + # Switch from a full deployment during network generation to a "lazy" one + # + # If true, the system selects a cheaper method to indicate deployments during + # network generation. This is experimental. The default is false. + # + # @see lazy_deployment_during_network_generation= + def lazy_deploy? + @lazy_deploy + end + + # Controls usage of the lazy deployment during network generation + # + # @see lazy_deploy? + attr_writer :lazy_deploy + # Only look for configuration files from the current app root path, ignoring # any other registered app attr_writer :local_only_configuration_files diff --git a/lib/syskit/task_context.rb b/lib/syskit/task_context.rb index d76aec098..fbdcd51c8 100644 --- a/lib/syskit/task_context.rb +++ b/lib/syskit/task_context.rb @@ -226,6 +226,32 @@ def distance_to(other) execution_agent.distance_to(other.execution_agent) end + # Whether the remote task information has been set by our deployment + # + # The task's deployment must set remote information gathered during deployment + # execution by calling {#initialize_remote_handles}. This method verifies that + # this step has been done. + # + # It will not be done until the deployment is ready + def has_remote_information? + orocos_task + end + + # Automatically select the task configuration based on its orocos_name + # + # Some tasks (mainly master tasks) are auto-injected in the network, and as + # such the user cannot select their configuration. This picks either + # ['default', task.orocos_name] if the master task's has a configuration + # section matching the task's name, or ['default'] otherwise. + def select_conf_from_name + self.conf = + if model.configuration_manager.has_section?(orocos_name) + ["default", orocos_name] + else + ["default"] + end + end + # Verifies if a task could be replaced by this one # # @return [Boolean] true if #merge(other_task) can be called and diff --git a/lib/syskit/test/network_manipulation.rb b/lib/syskit/test/network_manipulation.rb index 8589915c4..2aeb92614 100644 --- a/lib/syskit/test/network_manipulation.rb +++ b/lib/syskit/test/network_manipulation.rb @@ -380,7 +380,7 @@ def syskit_stub_configured_deployment( # rubocop:disable Metrics/ParameterLists # available # @return [Models::ConfiguredDeployment] the configured deployment def syskit_stub_deployment_model( - task_model = nil, name = nil, register: true, &block + task_model = nil, name = @__stubs.default_stub_name, register: true, &block ) @__stubs.stub_deployment_model( task_model, name, register: register, &block diff --git a/lib/syskit/test/self.rb b/lib/syskit/test/self.rb index 5f79292a0..0340934be 100644 --- a/lib/syskit/test/self.rb +++ b/lib/syskit/test/self.rb @@ -163,7 +163,7 @@ def setup_default_logger log_level = if (log_level_name = ENV.fetch("TEST_LOG_LEVEL", nil)) Test::Self.issue_once_warning_about_log_level( - "running tests with logger in #{log_level} mode " \ + "running tests with logger in #{log_level_name} mode " \ "(from TEST_LOG_LEVEL)" ) Logger.const_get(log_level_name) diff --git a/lib/syskit/test/stubs.rb b/lib/syskit/test/stubs.rb index 5808924a4..b020f9c60 100644 --- a/lib/syskit/test/stubs.rb +++ b/lib/syskit/test/stubs.rb @@ -60,6 +60,10 @@ def stub_deployment_model( task_model = nil, name = default_stub_name, logger_name: nil, register: true, &block ) + unless name + raise ArgumentError, "cannot provide a nil deployment model stub name" + end + task_model = task_model&.to_component_model process_server = Syskit.conf.process_server_for("stubs") diff --git a/test/features/all_features.rb b/test/features/all_features.rb index d5ac74d26..58c5d8a6b 100644 --- a/test/features/all_features.rb +++ b/test/features/all_features.rb @@ -5,3 +5,4 @@ Syskit.conf.compositions_use_schedule_as = true Syskit.conf.use_rock_time_field_for_logging = true Syskit::Runtime.syskit_async_method = Syskit::NetworkGeneration::AsyncFiber +Syskit.conf.lazy_deploy = true diff --git a/test/network_generation/test_dataflow_dynamics.rb b/test/network_generation/test_dataflow_dynamics.rb index b638b9856..7d0941169 100644 --- a/test/network_generation/test_dataflow_dynamics.rb +++ b/test/network_generation/test_dataflow_dynamics.rb @@ -134,7 +134,7 @@ module NetworkGeneration describe "master/slave deployments" do before do - task_m = Syskit::TaskContext.new_submodel do + task_m = @task_m = Syskit::TaskContext.new_submodel do output_port "out", "/double" end deployment_m = Syskit::Deployment.new_submodel do @@ -151,21 +151,41 @@ module NetworkGeneration end it "resolves if called for the slave after the master" do - flexmock(dynamics).should_receive(:set_port_info) - .with(@slave, nil, any).once - flexmock(dynamics).should_receive(:set_port_info) - dynamics.initial_information(@master) - dynamics.initial_information(@slave) + flexmock(dynamics) + .should_receive(:set_port_info) + .with(@slave, nil, any).once.pass_thru + dynamics.should_receive(:set_port_info).pass_thru + dynamics.propagate([@master, @slave]) assert dynamics.has_final_information_for_task?(@master) assert dynamics.has_final_information_for_task?(@slave) end it "resolves if called for the master after the slave" do - flexmock(dynamics).should_receive(:set_port_info) - .with(@slave, nil, any).once - flexmock(dynamics).should_receive(:set_port_info) - dynamics.initial_information(@slave) - dynamics.initial_information(@master) + flexmock(dynamics) + .should_receive(:set_port_info) + .with(@slave, nil, any).once.pass_thru + dynamics.should_receive(:set_port_info).pass_thru + dynamics.propagate([@slave, @master]) + assert dynamics.has_final_information_for_task?(@master) + assert dynamics.has_final_information_for_task?(@slave) + end + + it "resolves if called for the slave after the master on " \ + "a task whose outputs are not triggered by the task itself" do + @task_m.out_port.orogen_model.triggered_on_update = false + flexmock(dynamics) + .should_receive(:set_port_info) + .with(@slave, nil, any).once.pass_thru + dynamics.should_receive(:set_port_info).pass_thru + dynamics.propagate([@master, @slave]) + assert dynamics.has_final_information_for_task?(@master) + assert dynamics.has_final_information_for_task?(@slave) + end + + it "resolves if called for the master after the slave on " \ + "a task whose outputs are not triggered by the task itself" do + @task_m.out_port.orogen_model.triggered_on_update = false + dynamics.propagate([@slave, @master]) assert dynamics.has_final_information_for_task?(@master) assert dynamics.has_final_information_for_task?(@slave) end @@ -223,10 +243,10 @@ module NetworkGeneration end it "computes policies and saves them in the graph's policy_graph" do - plan.add(task0 = @task_m.new) - plan.add(task1 = @task_m.new) + task0 = add_task("task0") + task1 = add_task("task1") + tasks = [task0, task1] - add_agents(tasks = [task0, task1]) flexmock(@dynamics).should_receive(:propagate).with(tasks) task0.out_port.connect_to(task1.in_port) @@ -242,10 +262,10 @@ module NetworkGeneration it "adds init: true policy if available and saves it " \ "in the graph's policy_graph" do - plan.add(task0 = @task_m.new) - plan.add(task1 = @task_m.new) + task0 = add_task("task0") + task1 = add_task("task1") + tasks = [task0, task1] - add_agents(tasks = [task0, task1]) flexmock(@dynamics).should_receive(:propagate).with(tasks) task0.out_port.model.init_policy(true) @@ -262,10 +282,10 @@ module NetworkGeneration it "adds init: false policy if available and saves it " \ "in the graph's policy_graph" do - plan.add(task0 = @task_m.new) - plan.add(task1 = @task_m.new) + task0 = add_task("task0") + task1 = add_task("task1") + tasks = [task0, task1] - add_agents(tasks = [task0, task1]) flexmock(@dynamics).should_receive(:propagate).with(tasks) task0.out_port.model.init_policy(false) @@ -281,10 +301,11 @@ module NetworkGeneration end it "computes the policies on the concrete connections" do - plan.add(task = @task_m.new) + task = add_task("task") cmp = @cmp_m.instanciate(plan) + cmp.c_child.orocos_name = "child" - add_agents(tasks = [task, cmp.c_child]) + tasks = [task, cmp.c_child] flexmock(@dynamics).should_receive(:propagate).with(tasks) cmp.c_child.out_port.connect_to(task.in_port) @@ -299,10 +320,10 @@ module NetworkGeneration end it "uses in-graph policies over the computed ones" do - plan.add(task0 = @task_m.new) - plan.add(task1 = @task_m.new) + task0 = add_task("task0") + task1 = add_task("task1") + tasks = [task0, task1] - add_agents(tasks = [task0, task1]) flexmock(@dynamics).should_receive(:propagate).with(tasks) task0.out_port.connect_to(task1.in_port, type: :buffer, size: 42) @@ -315,10 +336,10 @@ module NetworkGeneration end it "passes the fallback policy to #policy_for if there is one" do - plan.add(task0 = @task_m.new) - plan.add(task1 = @task_m.new) + task0 = add_task("task0") + task1 = add_task("task1") + tasks = [task0, task1] - add_agents(tasks = [task0, task1]) flexmock(@dynamics).should_receive(:propagate).with(tasks) task0.out_port.connect_to( @@ -336,10 +357,10 @@ module NetworkGeneration end it "ignores the fallback policy if there is a policy in-graph" do - plan.add(task0 = @task_m.new) - plan.add(task1 = @task_m.new) + task0 = add_task("task0") + task1 = add_task("task1") + tasks = [task0, task1] - add_agents(tasks = [task0, task1]) flexmock(@dynamics).should_receive(:propagate).with(tasks) task0.out_port.connect_to( @@ -355,9 +376,9 @@ module NetworkGeneration end it "ignores non-deployed tasks" do - tasks = (0...4).map { @task_m.new } + tasks = 5.times.map { |_i| @task_m.new } tasks.each { |t| plan.add(t) } - add_agents(tasks[0, 2]) + tasks[0, 2].each_with_index { |t, i| t.orocos_name = i } flexmock(@dynamics).should_receive(:propagate).with(tasks[0, 2]) end end @@ -607,6 +628,11 @@ module NetworkGeneration end end + def add_task(name) + plan.add(task = @task_m.new(orocos_name: name)) + task + end + def add_agents(tasks) unless @agent_m @agent_m = Roby::Task.new_submodel diff --git a/test/network_generation/test_engine.rb b/test/network_generation/test_engine.rb index 0270ca9f7..45cad030c 100644 --- a/test/network_generation/test_engine.rb +++ b/test/network_generation/test_engine.rb @@ -151,384 +151,6 @@ def work_plan end end - describe "#reconfigure_tasks_on_static_port_modification" do - it "reconfigures already-configured tasks whose static input ports have been modified" do - task = syskit_stub_deploy_and_configure("Task", as: "task") { input_port("in", "/double").static } - proxy = work_plan[task] - flexmock(proxy).should_receive(:transaction_modifies_static_ports?).once.and_return(true) - syskit_engine.reconfigure_tasks_on_static_port_modification([proxy]) - tasks = work_plan.find_local_tasks(Syskit::TaskContext) - .with_arguments(orocos_name: task.orocos_name).to_a - assert_equal 2, tasks.size - tasks.delete(proxy) - new_task = tasks.first - - assert_child_of proxy.stop_event, new_task.start_event, - Roby::EventStructure::SyskitConfigurationPrecedence - end - - it "does not reconfigure already-configured tasks whose static input ports have not been modified" do - task = syskit_stub_deploy_and_configure("Task", as: "task") { input_port("in", "/double").static } - proxy = work_plan[task] - flexmock(proxy).should_receive(:transaction_modifies_static_ports?).once.and_return(false) - syskit_engine.reconfigure_tasks_on_static_port_modification([proxy]) - tasks = work_plan.find_local_tasks(Syskit::TaskContext) - .with_arguments(orocos_name: task.orocos_name).to_a - assert_equal work_plan.wrap([task]), tasks - end - - it "does not reconfigure not-setup tasks" do - task = syskit_stub_and_deploy("Task") { input_port("in", "/double").static } - syskit_engine.reconfigure_tasks_on_static_port_modification([task]) - tasks = work_plan.find_local_tasks(Syskit::TaskContext) - .with_arguments(orocos_name: task.orocos_name).to_a - assert_equal work_plan.wrap([task]), tasks - end - end - - describe "#adapt_existing_deployment" do - attr_reader :task_m, :deployment_m - attr_reader :deployment_task, :existing_deployment_task - # All the merges that happened during a given test - attr_reader :applied_merge_mappings - - before do - task_m = @task_m = Syskit::Component.new_submodel do - argument :orocos_name - argument :conf - end - - @applied_merge_mappings = {} - plan.add(existing_deployment_task = EngineTestStubDeployment.new(task_m)) - @existing_deployment_task = work_plan[existing_deployment_task] - flexmock(syskit_engine.merge_solver) - .should_receive(:apply_merge_group) - .with( - lambda do |mappings| - applied_merge_mappings.merge!(mappings) - true - end - ) - .pass_thru - work_plan.add(@deployment_task = EngineTestStubDeployment.new(task_m)) - end - - it "creates a new deployed task if there is not one already" do - task = deployment_task.task "test" - syskit_engine.adapt_existing_deployment(deployment_task, existing_deployment_task) - created_task = existing_deployment_task.created_tasks[0].last - assert_equal [["test", task_m, created_task]], existing_deployment_task.created_tasks - assert_equal Hash[task => created_task], applied_merge_mappings - end - it "reuses an existing deployment" do - existing_task = existing_deployment_task.task("test", record: false) - task = deployment_task.task "test" - syskit_engine.adapt_existing_deployment(deployment_task, existing_deployment_task) - assert existing_deployment_task.created_tasks.empty? - assert_equal Hash[task => existing_task], applied_merge_mappings - end - - describe "there is a deployment and it cannot be reused" do - attr_reader :task, :existing_task - - before do - @existing_task = existing_deployment_task.task("test", record: false) - @task = deployment_task.task "test" - flexmock(task).should_receive(:can_be_deployed_by?) - .with(existing_task).and_return(false) - end - - it "creates a new deployed task" do - syskit_engine.adapt_existing_deployment(deployment_task, existing_deployment_task) - created_task = existing_deployment_task.created_tasks[0].last - assert_equal [["test", task_m, created_task]], existing_deployment_task.created_tasks - assert_equal Hash[task => created_task], applied_merge_mappings - end - it "synchronizes the newly created task with the end of the existing one" do - syskit_engine.adapt_existing_deployment(deployment_task, existing_deployment_task) - created_task = existing_deployment_task.created_tasks[0].last - assert_equal [created_task.start_event], - existing_task.stop_event.each_syskit_configuration_precedence(false).to_a - end - it "re-synchronizes with all the existing tasks if more than one is present at a given time" do - syskit_engine.adapt_existing_deployment(deployment_task, existing_deployment_task) - first_new_task = existing_deployment_task.created_tasks[0].last - - work_plan.add(deployment_task = EngineTestStubDeployment.new(task_m)) - task = deployment_task.task("test") - flexmock(task).should_receive(:can_be_deployed_by?) - .with(first_new_task).and_return(false) - syskit_engine.adapt_existing_deployment(deployment_task, existing_deployment_task) - second_new_task = existing_deployment_task.created_tasks[1].last - - assert_equal [first_new_task.start_event, second_new_task.start_event], - existing_task.stop_event.each_syskit_configuration_precedence(false).to_a - assert_equal [second_new_task.start_event], - first_new_task.stop_event.each_syskit_configuration_precedence(false).to_a - end - - it "synchronizes with the existing tasks even if there are no current ones" do - flexmock(syskit_engine).should_receive(:find_current_deployed_task) - .once.and_return(nil) - syskit_engine.adapt_existing_deployment(deployment_task, existing_deployment_task) - created_task = existing_deployment_task.created_tasks[0].last - assert_equal [created_task.start_event], - existing_task.stop_event.each_syskit_configuration_precedence(false).to_a - end - end - end - - describe "when scheduling tasks for reconfiguration" do - it "ensures that the old task is garbage collected " \ - "when child of a composition" do - task_m = Syskit::TaskContext.new_submodel - cmp_m = Syskit::Composition.new_submodel - cmp_m.add task_m, as: "test" - - syskit_stub_configured_deployment(task_m) - cmp = syskit_deploy(cmp_m) - original_task = cmp.test_child - flexmock(task_m) - .new_instances - .should_receive(:can_be_deployed_by?) - .with(->(proxy) { proxy.__getobj__ == cmp.test_child }) - .and_return(false) - new_cmp = syskit_deploy(cmp_m) - - # Should have instanciated a new composition since the children - # differ - refute_equal new_cmp, cmp - # Should have of course created a new task - refute_equal new_cmp.test_child, cmp.test_child - # And the old tasks should be ready to garbage-collect - expect_execution.garbage_collect(true).to do - finalize cmp, original_task - end - end - - it "ensures that the old task gets garbage collected when child " \ - "of another still useful task" do - child_m = Syskit::TaskContext.new_submodel - parent_m = Syskit::TaskContext.new_submodel - parent_m.singleton_class.class_eval do - define_method(:instanciate) do |*args, **kw| - task = super(*args, **kw) - task.depends_on(child_m.instanciate(*args, **kw), - role: "test") - task - end - end - - syskit_stub_configured_deployment(child_m) - parent_m = syskit_stub_requirements(parent_m) - parent = syskit_deploy(parent_m) - child = parent.test_child - - flexmock(child_m) - .new_instances.should_receive(:can_be_deployed_by?) - .with(->(proxy) { proxy.__getobj__ == child }).and_return(false) - new_parent = syskit_deploy(parent_m) - new_child = new_parent.test_child - - assert_equal new_parent, parent - refute_equal new_child, child - # And the old tasks should be ready to garbage-collect - expect_execution.garbage_collect(true).to do - finalize child - end - end - - it "ensures that the old task gets garbage collected when child " \ - "of a composition, itself child of a useful task" do - child_m = Syskit::TaskContext.new_submodel - cmp_m = Syskit::Composition.new_submodel - cmp_m.add child_m, as: "task" - parent_m = Syskit::TaskContext.new_submodel - parent_m.singleton_class.class_eval do - define_method(:instanciate) do |*args, **kw| - task = super(*args, **kw) - task.depends_on(cmp_m.instanciate(*args, **kw), - role: "test") - task - end - end - - syskit_stub_configured_deployment(child_m) - parent_m = syskit_stub_requirements(parent_m) - parent = syskit_deploy(parent_m) - child = parent.test_child - child_task = child.task_child - - flexmock(child_m) - .new_instances.should_receive(:can_be_deployed_by?) - .with(->(proxy) { proxy.__getobj__ == child_task }) - .and_return(false) - new_parent = syskit_deploy(parent_m) - new_child = new_parent.test_child - new_child_task = new_child.task_child - - assert_equal new_parent, parent - refute_equal new_child, child - refute_equal new_child_task, child_task - # And the old tasks should be ready to garbage-collect - expect_execution.garbage_collect(true).to do - finalize child, child_task - end - end - end - - describe "#find_current_deployed_task" do - it "ignores garbage tasks that have not been finalized yet" do - component_m = Syskit::Component.new_submodel - plan.add(task0 = component_m.new) - flexmock(task0).should_receive(can_finalize?: false) - plan.add(task1 = component_m.new) - task1.should_configure_after(task0.stop_event) - execute { plan.garbage_task(task0) } - task0 = syskit_engine.work_plan[task0] - task1 = syskit_engine.work_plan[task1] - assert_equal task1, syskit_engine.find_current_deployed_task([task0, task1]) - end - - it "ignores all non-reusable tasks" do - component_m = Syskit::Component.new_submodel - plan.add(task0 = component_m.new) - plan.add(task1 = component_m.new) - task1.should_configure_after(task0.stop_event) - task0.do_not_reuse - task1.do_not_reuse - task0 = syskit_engine.work_plan[task0] - task1 = syskit_engine.work_plan[task1] - assert_nil syskit_engine.find_current_deployed_task([task0, task1]) - end - end - - describe "#finalize_deployed_tasks" do - it "creates a transaction proxy of the already existing tasks and deployments" do - deployment_m = create_deployment_model(task_count: 1) - existing_deployment, = - add_deployment_and_tasks(plan, deployment_m, %w[task0]) - add_deployment_and_tasks(work_plan, deployment_m, %w[task0]) - - selected_deployments, = - syskit_engine.finalize_deployed_tasks - - assert_equal [work_plan[existing_deployment]], - selected_deployments.to_a - end - - it "ignores existing deployments that are not needed by the network" do - deployment_m = create_deployment_model(task_count: 1) - add_deployment_and_tasks(plan, deployment_m, %w[task0]) - - selected_deployments, = - syskit_engine.finalize_deployed_tasks - assert selected_deployments.empty? - end - - it "creates a new deployment if needed" do - deployment = create_deployment_model(task_count: 2) - required_deployment, = - add_deployment_and_tasks(work_plan, deployment, %w[task0 task1]) - - selected_deployments, selected_deployed_tasks = - syskit_engine.finalize_deployed_tasks - - assert_equal [required_deployment], selected_deployments.to_a - selected_deployed_tasks.each do |t| - assert_equal t.execution_agent, required_deployment - end - end - - it "updates an existing deployment, proxying the existing " \ - "tasks and creating new ones" do - deployment_m = create_deployment_model(task_count: 3) - existing_deployment, (task0, task1) = - add_deployment_and_tasks(plan, deployment_m, %w[task0 task1]) - required_deployment, (required0, task2) = - add_deployment_and_tasks(work_plan, deployment_m, %w[task0 task2]) - - selected_deployments, selected_deployed_tasks = - syskit_engine.finalize_deployed_tasks - - expected_deployment = work_plan[existing_deployment] - assert_equal [expected_deployment], selected_deployments.to_a - - task2 = work_plan.find_local_tasks - .with_arguments(orocos_name: "task2").first - assert task2 - refute task2.transaction_proxy? - - assert_equal [work_plan[task0], work_plan[task1], task2].to_set, - expected_deployment.each_executed_task.to_set - assert_equal [work_plan[task0], task2].to_set, - selected_deployed_tasks.to_set - end - - it "maintains the dependencies" do - deployment_m = create_deployment_model(task_count: 2) - existing_deployment, (existing0, existing1) = - add_deployment_and_tasks(plan, deployment_m, %w[task0 task1]) - - required_deployment, (required0, required1) = - add_deployment_and_tasks(work_plan, deployment_m, %w[task0 task1]) - - existing0.depends_on(existing1) - selected_deployments, selected_deployed_tasks = - syskit_engine.finalize_deployed_tasks - - assert work_plan[existing0].depends_on?(work_plan[existing1]) - end - - it "maintains the dependencies with two or more layers" do - deployment_m = create_deployment_model(task_count: 3) - _, (existing0,) = - add_deployment_and_tasks(plan, deployment_m, %w[task0]) - - _, (required0, required1, required2) = - add_deployment_and_tasks(work_plan, deployment_m, %w[task0 task1 task2]) - - required0.depends_on required2 - required1.depends_on required2 - - selected_deployments, selected_deployed_tasks = - syskit_engine.finalize_deployed_tasks - - required2 = work_plan[existing0].children.first - assert_equal "task2", required2.orocos_name - assert required2.each_parent_task - .find { |t| t.orocos_name == "task1" } - end - - it "raises if there is a repeated deployment" do - deployment_m = create_deployment_model(task_count: 1) - add_deployment_and_tasks(plan, deployment_m, %w[task0]) - add_deployment_and_tasks(plan, deployment_m, %w[task0]) - add_deployment_and_tasks(work_plan, deployment_m, %w[task0]) - - assert_raises Syskit::InternalError do - syskit_engine.finalize_deployed_tasks - end - end - - def create_deployment_model(task_count:) - task_m = (0...task_count).map { TaskContext.new_submodel } - (@created_task_models ||= []).concat(task_m) - - Deployment.new_submodel do - task_m.each_with_index do |m, i| - task "task#{i}", m - end - end - end - - def add_deployment_and_tasks(plan, deployment_m, task_names) - plan.add(deployment_task = deployment_m.new) - tasks = task_names.map { |name| deployment_task.task(name) } - [deployment_task, tasks] - end - end - describe "synthetic tests" do it "deploys a mission as mission" do task_model = Syskit::TaskContext.new_submodel @@ -859,6 +481,100 @@ def deploy_dev_and_bus end end + describe "master/slave setups" do + before do + @task_m = task_m = TaskContext.new_submodel do + argument :name + end + @deployment_m = Deployment.new_submodel(name: "test") do + scheduled1 = task "scheduled1", task_m + scheduled2 = task "scheduled2", task_m + scheduler = task "scheduler", task_m + scheduled1.slave_of(scheduler) + scheduled2.slave_of(scheduler) + end + + @configured_deployment = + use_deployment(@deployment_m => "prefix_").first + end + + it "deploys slave tasks from scratch" do + syskit_deploy( + [@task_m.with_arguments(name: "1").prefer_deployed_tasks(/1/), + @task_m.with_arguments(name: "2").prefer_deployed_tasks(/2/)], + default_deployment_group: default_deployment_group + ) + + assert_master_slave_pattern_correct + end + + it "deploys a slave task when another of the same deployment exists" do + syskit_deploy( + [@task_m.with_arguments(name: "1").prefer_deployed_tasks(/1/)], + default_deployment_group: default_deployment_group + ) + + initial_tasks = plan.find_tasks(@task_m).to_a + execution_agent = initial_tasks.first.execution_agent + + syskit_deploy( + [@task_m.with_arguments(name: "1").prefer_deployed_tasks(/1/), + @task_m.with_arguments(name: "2").prefer_deployed_tasks(/2/)], + default_deployment_group: default_deployment_group + ) + + initial_tasks.each do |t| + assert plan.has_task?(t) + end + assert_master_slave_pattern_correct + end + + it "deploys slave tasks when the deploymentc exists" do + deployment_task = @configured_deployment.new(plan: plan) + + syskit_deploy( + [@task_m.with_arguments(name: "1").prefer_deployed_tasks(/1/), + @task_m.with_arguments(name: "2").prefer_deployed_tasks(/2/)], + default_deployment_group: default_deployment_group + ) + + assert_master_slave_pattern_correct(deployment_task: deployment_task) + end + + it "deploys slave tasks when the scheduler task exists" do + deployment_task = @configured_deployment.new(plan: plan) + scheduler_task = deployment_task.task("prefix_scheduler") + + syskit_deploy( + [@task_m.with_arguments(name: "1").prefer_deployed_tasks(/1/), + @task_m.with_arguments(name: "2").prefer_deployed_tasks(/2/)], + default_deployment_group: default_deployment_group + ) + + tasks = assert_master_slave_pattern_correct( + deployment_task: deployment_task + ) + assert_same scheduler_task, tasks[-1] + end + + def assert_master_slave_pattern_correct(deployment_task: nil) + tasks = plan.find_tasks(@task_m).sort_by(&:orocos_name) + assert_equal( + %w[prefix_scheduled1 prefix_scheduled2 prefix_scheduler], + tasks.map(&:orocos_name) + ) + tasks[0, 2].each do |scheduled_task| + assert_same tasks[-1], scheduled_task.scheduler_child + end + + deployment_task ||= tasks.first.execution_agent + tasks.each do |t| + assert_same deployment_task, t.execution_agent + end + tasks + end + end + describe "the hooks" do before do task_m = Syskit::TaskContext.new_submodel @@ -978,27 +694,95 @@ def deploy_dev_and_bus end end end - end - class EngineTestStubDeployment < Roby::Task - attr_reader :tasks, :created_tasks + describe "when scheduling tasks for reconfiguration" do + it "ensures that the old task is garbage collected " \ + "when child of a composition" do + task_m = Syskit::TaskContext.new_submodel + cmp_m = Syskit::Composition.new_submodel + cmp_m.add task_m, as: "test" - def initialize(task_m, **arguments) - super(**arguments) - @task_m = task_m - @created_tasks = [] - @tasks = {} - end + syskit_stub_configured_deployment(task_m) + cmp = syskit_deploy(cmp_m) + cmp.test_child.do_not_reuse + original_task = cmp.test_child + new_cmp = syskit_deploy(cmp_m) + + # Should have of course created a new task + refute_equal new_cmp.test_child, original_task + # Should have instanciated a new composition since the children + # differ + refute_equal new_cmp, cmp + # And the old tasks should be ready to garbage-collect + expect_execution.garbage_collect(true).to do + finalize cmp, original_task + end + end + + it "ensures that the old task gets garbage collected when child " \ + "of another still useful task" do + child_m = Syskit::TaskContext.new_submodel + parent_m = Syskit::TaskContext.new_submodel + parent_m.singleton_class.class_eval do + define_method(:instanciate) do |*args, **kw| + task = super(*args, **kw) + task.depends_on(child_m.instanciate(*args, **kw), + role: "test") + task + end + end + + syskit_stub_configured_deployment(child_m) + parent_m = syskit_stub_requirements(parent_m) + parent = syskit_deploy(parent_m) + child = parent.test_child + + child.do_not_reuse + new_parent = syskit_deploy(parent_m) + new_child = new_parent.test_child + + assert_equal new_parent, parent + refute_equal new_child, child + # And the old tasks should be ready to garbage-collect + expect_execution.garbage_collect(true).to do + finalize child + end + end + + it "ensures that the old task gets garbage collected when child " \ + "of a composition, itself child of a useful task" do + child_m = Syskit::TaskContext.new_submodel + cmp_m = Syskit::Composition.new_submodel + cmp_m.add child_m, as: "task" + parent_m = Syskit::TaskContext.new_submodel + parent_m.singleton_class.class_eval do + define_method(:instanciate) do |*args, **kw| + task = super(*args, **kw) + task.depends_on(cmp_m.instanciate(*args, **kw), + role: "test") + task + end + end - event :ready + syskit_stub_configured_deployment(child_m) + parent_m = syskit_stub_requirements(parent_m) + parent = syskit_deploy(parent_m) + child = parent.test_child + child_task = child.task_child - define_method :task do |task_name, task_model = nil, record: true| - task = @task_m.new(orocos_name: task_name) - if record - @created_tasks << [task_name, task_model, task] + child_task.do_not_reuse + new_parent = syskit_deploy(parent_m) + new_child = new_parent.test_child + new_child_task = new_child.task_child + + assert_equal new_parent, parent + refute_equal new_child, child + refute_equal new_child_task, child_task + # And the old tasks should be ready to garbage-collect + expect_execution.garbage_collect(true).to do + finalize child, child_task + end end - task.executed_by self - task end end end diff --git a/test/network_generation/test_merge_solver.rb b/test/network_generation/test_merge_solver.rb index 3a0b43742..592262e45 100644 --- a/test/network_generation/test_merge_solver.rb +++ b/test/network_generation/test_merge_solver.rb @@ -48,32 +48,75 @@ target_task.should_receive(:can_merge?).with(task).and_return(false).once assert !solver.may_merge_task_contexts?(task, target_task) end - it "returns false if both tasks have execution agents and " \ - "merge_when_identical_agents is false" do - plan.add(task1 = simple_component_model.new) - plan.add(task2 = simple_composition_model.new) - [task1, task2].permutation.each do |t1, t2| - flexmock(t1).should_receive(:execution_agent).and_return(true) - t1.should_receive(:can_merge?).with(t2).and_return(true).once + + describe "merge_when_identical_agents is false" do + before do + plan.add(@merged_task = simple_component_model.new) + plan.add(@target_task = simple_component_model.new) + flexmock(@merged_task) + .should_receive(:can_merge?) + .with(@target_task).and_return(true) + + @solver = Syskit::NetworkGeneration::MergeSolver.new(plan) + @solver.merge_task_contexts_with_same_agent = false + end + + it "returns true if the merged task has an execution agent " \ + "and the target task does not" do + flexmock(@merged_task).should_receive(execution_agent: true) + assert @solver.may_merge_task_contexts?(@merged_task, @target_task) + end + + it "returns true if the merged task does not have an execution agent " \ + "and the target task does" do + flexmock(@target_task).should_receive(execution_agent: true) + assert @solver.may_merge_task_contexts?(@merged_task, @target_task) + end + + it "returns false if both tasks have execution agents" do + flexmock(@merged_task).should_receive(execution_agent: true) + flexmock(@target_task).should_receive(execution_agent: true) + refute @solver.may_merge_task_contexts?(@merged_task, @target_task) end - refute solver.may_merge_task_contexts?(task1, task2) - refute solver.may_merge_task_contexts?(task2, task1) end - it "returns false for tasks that do not have execution agents when " \ - "merge_when_identical_agents is true" do - plan.add(task1 = simple_component_model.new) - plan.add(task2 = simple_composition_model.new) - - [task1, task2].permutation.each do |t1, t2| - flexmock(t1).should_receive(:execution_agent).and_return(false) - t1.should_receive(:can_merge?).with(t2).and_return(true).once + + describe "merge_when_identical_agents is true" do + before do + plan.add(@merged_task = simple_component_model.new) + plan.add(@target_task = simple_component_model.new) + flexmock(@merged_task) + .should_receive(:can_merge?) + .with(@target_task).and_return(true) + + @solver = Syskit::NetworkGeneration::MergeSolver.new(plan) + @solver.merge_task_contexts_with_same_agent = true end - local_solver = Syskit::NetworkGeneration::MergeSolver.new(plan) - local_solver.merge_task_contexts_with_same_agent = true + it "returns false if orocos_name is not set on either tasks" do + refute @solver.may_merge_task_contexts?(@merged_task, @target_task) + end - refute local_solver.may_merge_task_contexts?(task1, task2) - refute local_solver.may_merge_task_contexts?(task2, task1) + it "returns false if orocos_name is set only on the merged task" do + @merged_task.orocos_name = "foo" + refute @solver.may_merge_task_contexts?(@merged_task, @target_task) + end + + it "returns false if orocos_name is set only on the target task" do + @target_task.orocos_name = "foo" + refute @solver.may_merge_task_contexts?(@merged_task, @target_task) + end + + it "returns true if orocos_name is set on both if the value is identical" do + @merged_task.orocos_name = "foo" + @target_task.orocos_name = "foo" + assert @solver.may_merge_task_contexts?(@merged_task, @target_task) + end + + it "returns false if orocos_name is set on both and the value differs" do + @merged_task.orocos_name = "foo" + @target_task.orocos_name = "bar" + refute @solver.may_merge_task_contexts?(@merged_task, @target_task) + end end end diff --git a/test/network_generation/test_runtime_network_adaptation.rb b/test/network_generation/test_runtime_network_adaptation.rb new file mode 100644 index 000000000..9f357f4db --- /dev/null +++ b/test/network_generation/test_runtime_network_adaptation.rb @@ -0,0 +1,776 @@ +# frozen_string_literal: true + +require "syskit/test/self" +require "./test/fixtures/simple_composition_model" + +module Syskit + module NetworkGeneration + describe RuntimeNetworkAdaptation do + include Syskit::Fixtures::SimpleCompositionModel + + attr_reader :work_plan, :merge_solver + + before do + @work_plan = Roby::Transaction.new(plan) + @merge_solver = MergeSolver.new(@work_plan) + @task_models = 5.times.map { TaskContext.new_submodel } + end + + describe "#finalize_deployed_tasks" do + describe "eagerly deployed networks" do + it "replaces existing tasks by their match in the plan" do + deployment_m = create_deployment_model(task_count: 1) + _, initial = + add_deployment_and_tasks(work_plan, deployment_m, %w[task0]) + adapter = create_adapter( + used_deployments_from_tasks(initial) + ) + + existing_deployment, = + add_deployment_and_tasks(plan, deployment_m, %w[task0]) + + selected_deployments, = adapter.finalize_deployed_tasks + assert_equal [work_plan[existing_deployment]], + selected_deployments.to_a + end + + it "ignores existing deployments " \ + "that are not needed by the network" do + deployment_m = create_deployment_model(task_count: 1) + add_deployment_and_tasks(plan, deployment_m, %w[task0]) + + adapter = create_adapter({}) + selected_deployments, = adapter.finalize_deployed_tasks + assert selected_deployments.empty? + end + + it "creates a new deployment if needed" do + deployment = create_deployment_model(task_count: 2) + required_deployment, tasks = + add_deployment_and_tasks(work_plan, deployment, + %w[task0 task1]) + adapter = create_adapter( + used_deployments_from_tasks(tasks) + ) + + selected_deployments, selected_deployed_tasks = + adapter.finalize_deployed_tasks + + assert_equal [required_deployment], selected_deployments.to_a + selected_deployed_tasks.each do |t| + assert_equal t.execution_agent, required_deployment + end + end + + it "successfully creates a new master/slave setup" do + task_m = TaskContext.new_submodel + deployment = Deployment.new_submodel do + scheduled = task "scheduled", task_m + scheduler = task "scheduler", task_m + scheduled.slave_of(scheduler) + end + + # NOTE: cannot call with 'scheduler', as creating 'scheduled' will + # automatically create the scheduler too + required_deployment, = + add_deployment_and_tasks(work_plan, deployment, %w[scheduled]) + adapter = create_adapter( + used_deployments_from_tasks(plan.find_tasks(task_m).to_a) + ) + adapter.finalize_deployed_tasks + + tasks = @work_plan.find_tasks(task_m).sort_by(&:orocos_name) + assert_equal %w[scheduled scheduler], tasks.map(&:orocos_name) + scheduled_task = tasks.first + scheduler_task = tasks.last + assert scheduled_task.depends_on?(scheduler_task) + assert_same required_deployment, + scheduler_task.execution_agent + assert_same required_deployment, + scheduler_task.execution_agent + end + + it "updates an existing deployment, proxying the existing " \ + "tasks and creating new ones" do + deployment_m = create_deployment_model(task_count: 3) + existing_deployment, (task0, task1) = + add_deployment_and_tasks(plan, deployment_m, %w[task0 task1]) + _, (required0, task2) = + add_deployment_and_tasks( + work_plan, deployment_m, %w[task0 task2] + ) + + adapter = create_adapter( + used_deployments_from_tasks([required0, task2]) + ) + + selected_deployments, selected_deployed_tasks = + adapter.finalize_deployed_tasks + + expected_deployment = work_plan[existing_deployment] + assert_equal [expected_deployment], selected_deployments.to_a + + task2 = work_plan.find_local_tasks + .with_arguments(orocos_name: "task2").first + assert task2 + refute task2.transaction_proxy? + + assert_equal [work_plan[task0], work_plan[task1], task2].to_set, + expected_deployment.each_executed_task.to_set + assert_equal [work_plan[task0], task2].to_set, + selected_deployed_tasks.to_set + end + + it "maintains the dependencies" do + deployment_m = create_deployment_model(task_count: 2) + _, (existing0, existing1) = + add_deployment_and_tasks(plan, deployment_m, %w[task0 task1]) + + _, (required0, required1) = + add_deployment_and_tasks( + work_plan, deployment_m, %w[task0 task1] + ) + + existing0.depends_on(existing1) + + adapter = create_adapter( + used_deployments_from_tasks([required0, required1]) + ) + adapter.finalize_deployed_tasks + + assert work_plan[existing0].depends_on?(work_plan[existing1]) + end + + it "maintains the dependencies with two or more layers" do + deployment_m = create_deployment_model(task_count: 3) + _, (existing0,) = + add_deployment_and_tasks(plan, deployment_m, %w[task0]) + + _, (required0, required1, required2) = + add_deployment_and_tasks(work_plan, deployment_m, + %w[task0 task1 task2]) + + required0.depends_on required2 + required1.depends_on required2 + + adapter = create_adapter( + used_deployments_from_tasks([required0, required1, required2]) + ) + adapter.finalize_deployed_tasks + + required2 = work_plan[existing0].children.first + assert_equal "task2", required2.orocos_name + assert(required2.each_parent_task + .find { |t| t.orocos_name == "task1" }) + end + + it "raises if there is a repeated deployment" do + deployment_m = create_deployment_model(task_count: 1) + add_deployment_and_tasks(plan, deployment_m, %w[task0]) + add_deployment_and_tasks(plan, deployment_m, %w[task0]) + _, tasks = + add_deployment_and_tasks(work_plan, deployment_m, %w[task0]) + + adapter = create_adapter(used_deployments_from_tasks(tasks)) + assert_raises Syskit::InternalError do + adapter.finalize_deployed_tasks + end + end + + it "reuses an existing scheduler task" do + task_m = TaskContext.new_submodel + deployment_m = Deployment.new_submodel do + scheduled1 = task "scheduled1", task_m + scheduled2 = task "scheduled2", task_m + scheduler = task "scheduler", task_m + scheduled1.slave_of(scheduler) + scheduled2.slave_of(scheduler) + end + + _, (existing_scheduler,) = + add_deployment_and_tasks(plan, deployment_m, %w[scheduler]) + + add_deployment_and_tasks( + work_plan, deployment_m, %w[scheduled1 scheduled2] + ) + adapter = create_adapter( + used_deployments_from_tasks(work_plan.find_tasks(task_m).to_a) + ) + adapter.finalize_deployed_tasks + + tasks = @work_plan.find_tasks(task_m).sort_by(&:orocos_name) + assert_equal %w[scheduled1 scheduled2 scheduler], + tasks.map(&:orocos_name) + scheduler_task = tasks[-1] + assert_equal work_plan[existing_scheduler], scheduler_task + refute_nil scheduler_task.execution_agent + tasks[0, 2].each do |scheduled_task| + assert scheduled_task.depends_on?(scheduler_task) + assert_same scheduler_task.execution_agent, + scheduler_task.execution_agent + end + end + + it "properly instanciates a master/slave setup " \ + "if the deployment task already exists" do + task_m = TaskContext.new_submodel + deployment_m = Deployment.new_submodel do + scheduled1 = task "scheduled1", task_m + scheduled2 = task "scheduled2", task_m + scheduler = task "scheduler", task_m + scheduled1.slave_of(scheduler) + scheduled2.slave_of(scheduler) + end + + deployment, = add_deployment_and_tasks(plan, deployment_m, %w[]) + + add_deployment_and_tasks( + work_plan, deployment_m, %w[scheduled1 scheduled2] + ) + adapter = create_adapter( + used_deployments_from_tasks(work_plan.find_tasks(task_m).to_a) + ) + adapter.finalize_deployed_tasks + + tasks = @work_plan.find_tasks(task_m).sort_by(&:orocos_name) + assert_equal %w[scheduled1 scheduled2 scheduler], + tasks.map(&:orocos_name) + tasks.each do |t| + assert_equal work_plan[deployment], t.execution_agent + end + scheduler_task = tasks[-1] + tasks[0, 2].each do |scheduled_task| + assert scheduled_task.depends_on?(scheduler_task) + assert_same scheduler_task.execution_agent, + scheduler_task.execution_agent + end + end + end + + describe "lazily deployed networks" do + it "replaces existing tasks by their match in the plan" do + deployment_m = create_deployment_model(task_count: 1) + existing_deployment, = + add_deployment_and_tasks(plan, deployment_m, %w[task0]) + initial = add_lazy_tasks(work_plan, deployment_m, %w[task0]) + adapter = create_adapter( + used_deployments_from_lazy(initial, deployment_m) + ) + + selected_deployments, = adapter.finalize_deployed_tasks + assert_equal [work_plan[existing_deployment]], + selected_deployments.to_a + end + + it "ignores existing deployments " \ + "that are not needed by the network" do + deployment_m = create_deployment_model(task_count: 1) + add_deployment_and_tasks(plan, deployment_m, %w[task0]) + + adapter = create_adapter({}) + selected_deployments, = adapter.finalize_deployed_tasks + assert selected_deployments.empty? + end + + it "creates a new deployment if needed" do + deployment_m = create_deployment_model(task_count: 2) + tasks = add_lazy_tasks(work_plan, deployment_m, %w[task0 task1]) + adapter = create_adapter( + used_deployments_from_lazy(tasks, deployment_m) + ) + + selected_deployments, = + adapter.finalize_deployed_tasks + + assert_equal 1, selected_deployments.size + task_names = + selected_deployments + .first.each_executed_task.map(&:orocos_name).sort + assert_equal %w[task0 task1], task_names + end + + it "updates an existing deployment, proxying the existing " \ + "tasks and creating new ones" do + deployment_m = create_deployment_model(task_count: 3) + existing_deployment, (task0, task1) = + add_deployment_and_tasks(plan, deployment_m, %w[task0 task1]) + (required0, task2) = add_lazy_tasks( + work_plan, deployment_m, %w[task0 task2] + ) + + adapter = create_adapter( + used_deployments_from_lazy([required0, task2], deployment_m) + ) + + selected_deployments, selected_deployed_tasks = + adapter.finalize_deployed_tasks + + expected_deployment = work_plan[existing_deployment] + assert_equal [expected_deployment], selected_deployments.to_a + + task2 = work_plan.find_local_tasks + .with_arguments(orocos_name: "task2").first + assert task2 + refute task2.transaction_proxy? + + assert_equal [work_plan[task0], work_plan[task1], task2].to_set, + expected_deployment.each_executed_task.to_set + assert_equal [work_plan[task0], task2].to_set, + selected_deployed_tasks.to_set + end + + it "maintains the dependencies" do + deployment_m = create_deployment_model(task_count: 2) + _, (existing0, existing1) = + add_deployment_and_tasks(plan, deployment_m, %w[task0 task1]) + + (required0, required1) = add_lazy_tasks( + work_plan, deployment_m, %w[task0 task1] + ) + + existing0.depends_on(existing1) + + adapter = create_adapter( + used_deployments_from_lazy( + [required0, required1], deployment_m + ) + ) + adapter.finalize_deployed_tasks + + assert work_plan[existing0].depends_on?(work_plan[existing1]) + end + + it "maintains the dependencies with two or more layers" do + deployment_m = create_deployment_model(task_count: 3) + _, (existing0,) = + add_deployment_and_tasks(plan, deployment_m, %w[task0]) + + (required0, required1, required2) = + add_lazy_tasks(work_plan, deployment_m, %w[task0 task1 task2]) + + required0.depends_on required2 + required1.depends_on required2 + + adapter = create_adapter( + used_deployments_from_lazy( + [required0, required1, required2], deployment_m + ) + ) + adapter.finalize_deployed_tasks + + required2 = work_plan[existing0].children.first + assert_equal "task2", required2.orocos_name + assert(required2.each_parent_task + .find { |t| t.orocos_name == "task1" }) + end + + it "raises if there is a repeated deployment" do + deployment_m = create_deployment_model(task_count: 1) + add_deployment_and_tasks(plan, deployment_m, %w[task0]) + add_deployment_and_tasks(plan, deployment_m, %w[task0]) + tasks = add_lazy_tasks(work_plan, deployment_m, %w[task0]) + + adapter = create_adapter( + used_deployments_from_lazy(tasks, deployment_m) + ) + assert_raises Syskit::InternalError do + adapter.finalize_deployed_tasks + end + end + + it "successfully creates a new master/slave setup" do + task_m = TaskContext.new_submodel + deployment_m = Deployment.new_submodel do + scheduled1 = task "scheduled1", task_m + scheduled2 = task "scheduled2", task_m + scheduler = task "scheduler", task_m + scheduled1.slave_of(scheduler) + scheduled2.slave_of(scheduler) + end + + tasks = add_lazy_tasks( + work_plan, deployment_m, %w[scheduled1 scheduled2 scheduler] + ) + tasks[0].depends_on(tasks.last) + tasks[1].depends_on(tasks.last) + adapter = create_adapter( + used_deployments_from_lazy(tasks, deployment_m) + ) + adapter.finalize_deployed_tasks + + tasks = @work_plan.find_tasks(task_m).sort_by(&:orocos_name) + assert_equal %w[scheduled1 scheduled2 scheduler], + tasks.map(&:orocos_name) + scheduler_task = tasks[-1] + refute_nil scheduler_task.execution_agent + tasks[0, 2].each do |scheduled_task| + assert scheduled_task.depends_on?(scheduler_task) + assert_same scheduler_task.execution_agent, + scheduler_task.execution_agent + end + end + + it "reuses an existing scheduler task" do + task_m = TaskContext.new_submodel + deployment_m = Deployment.new_submodel do + scheduled1 = task "scheduled1", task_m + scheduled2 = task "scheduled2", task_m + scheduler = task "scheduler", task_m + scheduled1.slave_of(scheduler) + scheduled2.slave_of(scheduler) + end + + _, (existing_scheduler,) = + add_deployment_and_tasks(plan, deployment_m, %w[scheduler]) + + tasks = add_lazy_tasks( + work_plan, deployment_m, %w[scheduled1 scheduled2 scheduler] + ) + tasks[0].depends_on(tasks.last) + tasks[1].depends_on(tasks.last) + adapter = create_adapter( + used_deployments_from_lazy(tasks, deployment_m) + ) + adapter.finalize_deployed_tasks + + tasks = @work_plan.find_tasks(task_m).sort_by(&:orocos_name) + assert_equal %w[scheduled1 scheduled2 scheduler], + tasks.map(&:orocos_name) + scheduler_task = tasks[-1] + assert_equal work_plan[existing_scheduler], scheduler_task + refute_nil scheduler_task.execution_agent + tasks[0, 2].each do |scheduled_task| + assert scheduled_task.depends_on?(scheduler_task) + assert_same scheduler_task.execution_agent, + scheduler_task.execution_agent + end + end + + it "properly instanciates a master/slave setup " \ + "if the deployment task already exists" do + task_m = TaskContext.new_submodel + deployment_m = Deployment.new_submodel do + scheduled1 = task "scheduled1", task_m + scheduled2 = task "scheduled2", task_m + scheduler = task "scheduler", task_m + scheduled1.slave_of(scheduler) + scheduled2.slave_of(scheduler) + end + + add_deployment_and_tasks(plan, deployment_m, %w[]) + + tasks = add_lazy_tasks( + work_plan, deployment_m, %w[scheduled1 scheduled2 scheduler] + ) + tasks[0].depends_on(tasks.last) + tasks[1].depends_on(tasks.last) + adapter = create_adapter( + used_deployments_from_lazy(tasks, deployment_m) + ) + adapter.finalize_deployed_tasks + + tasks = @work_plan.find_tasks(task_m).sort_by(&:orocos_name) + assert_equal %w[scheduled1 scheduled2 scheduler], + tasks.map(&:orocos_name) + scheduler_task = tasks[-1] + refute_nil scheduler_task.execution_agent + tasks[0, 2].each do |scheduled_task| + assert scheduled_task.depends_on?(scheduler_task) + assert_same scheduler_task.execution_agent, + scheduler_task.execution_agent + end + end + end + + def create_deployment_model(task_count:) + if task_count > @task_models.size + raise ArgumentError, + "only provisioned #{@task_models.size} models" + end + + task_models = @task_models + Deployment.new_submodel do + task_count.times { |i| task "task#{i}", task_models[i] } + end + end + + def add_deployment_and_tasks(plan, deployment_m, task_names) + plan.add(deployment_task = deployment_m.new) + tasks = task_names.map { |name| deployment_task.task(name) } + [deployment_task, tasks] + end + + def add_lazy_tasks(plan, deployment_m, task_names) + tasks = task_names.each_with_index.map do |name, _i| + deployment_m.task(name).new(orocos_name: name) + end + plan.add(tasks) + tasks + end + end + + describe "#reconfigure_tasks_on_static_port_modification" do + before do + @adapter = RuntimeNetworkAdaptation.new( + @work_plan, {}, merge_solver: @merge_solver + ) + end + + it "reconfigures already-configured tasks " \ + "whose static input ports have been modified" do + task = syskit_stub_deploy_and_configure("Task", as: "task") do + input_port("in", "/double").static + end + proxy = work_plan[task] + flexmock(proxy) + .should_receive(:transaction_modifies_static_ports?) + .once.and_return(true) + @adapter.reconfigure_tasks_on_static_port_modification([proxy]) + tasks = work_plan.find_local_tasks(Syskit::TaskContext) + .with_arguments(orocos_name: task.orocos_name).to_a + assert_equal 2, tasks.size + tasks.delete(proxy) + new_task = tasks.first + + assert_child_of proxy.stop_event, new_task.start_event, + Roby::EventStructure::SyskitConfigurationPrecedence + end + + it "does not reconfigure already-configured tasks " \ + "whose static input ports have not been modified" do + task = syskit_stub_deploy_and_configure("Task", as: "task") do + input_port("in", "/double").static + end + proxy = work_plan[task] + flexmock(proxy).should_receive(:transaction_modifies_static_ports?) + .once.and_return(false) + @adapter.reconfigure_tasks_on_static_port_modification([proxy]) + tasks = work_plan.find_local_tasks(Syskit::TaskContext) + .with_arguments(orocos_name: task.orocos_name).to_a + assert_equal work_plan.wrap([task]), tasks + end + + it "does not reconfigure not-setup tasks" do + task = syskit_stub_and_deploy("Task") do + input_port("in", "/double").static + end + @adapter.reconfigure_tasks_on_static_port_modification([task]) + tasks = work_plan.find_local_tasks(Syskit::TaskContext) + .with_arguments(orocos_name: task.orocos_name).to_a + assert_equal work_plan.wrap([task]), tasks + end + end + + describe "#adapt_existing_deployment" do + attr_reader :task_m, :deployment_m + attr_reader :deployment_task, :existing_deployment_task + # All the merges that happened during a given test + attr_reader :applied_merge_mappings + + before do + task_m = @task_m = Syskit::Component.new_submodel do + argument :orocos_name + argument :conf + end + + @applied_merge_mappings = {} + existing_deployment_task = EngineTestStubDeployment.new(task_m) + plan.add(existing_deployment_task) + @existing_deployment_task = work_plan[existing_deployment_task] + flexmock(@merge_solver) + .should_receive(:apply_merge_group) + .with( + lambda do |mappings| + applied_merge_mappings.merge!(mappings) + true + end + ) + .pass_thru + work_plan.add(@deployment_task = EngineTestStubDeployment.new(task_m)) + @adapter = RuntimeNetworkAdaptation.new( + @work_plan, {}, merge_solver: @merge_solver + ) + end + + it "creates a new deployed task if there is not one already" do + task = deployment_task.task "test" + @adapter.adapt_existing_deployment(deployment_task, + existing_deployment_task) + created_task = existing_deployment_task.created_tasks[0].last + assert_equal [["test", task_m, created_task]], + existing_deployment_task.created_tasks + assert_equal({ task => created_task }, applied_merge_mappings) + end + + it "reuses an existing deployment" do + existing_task = existing_deployment_task.task("test", record: false) + task = deployment_task.task "test" + @adapter.adapt_existing_deployment(deployment_task, + existing_deployment_task) + assert existing_deployment_task.created_tasks.empty? + assert_equal({ task => existing_task }, applied_merge_mappings) + end + + describe "there is a deployment and it cannot be reused" do + attr_reader :task, :existing_task + + before do + @existing_task = existing_deployment_task.task("test", + record: false) + @task = deployment_task.task "test" + flexmock(task).should_receive(:can_be_deployed_by?) + .with(existing_task).and_return(false) + end + + it "creates a new deployed task" do + @adapter.adapt_existing_deployment(deployment_task, + existing_deployment_task) + created_task = existing_deployment_task.created_tasks[0].last + assert_equal [["test", task_m, created_task]], + existing_deployment_task.created_tasks + assert_equal({ task => created_task }, applied_merge_mappings) + end + it "synchronizes the newly created task " \ + "with the end of the existing one" do + @adapter.adapt_existing_deployment(deployment_task, + existing_deployment_task) + created_task = existing_deployment_task.created_tasks[0].last + assert_has_precedence( + [created_task.start_event], existing_task.stop_event + ) + end + it "re-synchronizes with all the existing tasks " \ + "if more than one is present at a given time" do + @adapter.adapt_existing_deployment(deployment_task, + existing_deployment_task) + first_new_task = existing_deployment_task.created_tasks[0].last + + deployment_task = EngineTestStubDeployment.new(task_m) + work_plan.add(deployment_task) + task = deployment_task.task("test") + flexmock(task).should_receive(:can_be_deployed_by?) + .with(first_new_task).and_return(false) + @adapter.adapt_existing_deployment(deployment_task, + existing_deployment_task) + second_new_task = existing_deployment_task.created_tasks[1].last + + assert_has_precedence( + [first_new_task.start_event, second_new_task.start_event], + existing_task.stop_event + ) + assert_has_precedence( + [second_new_task.start_event], first_new_task.stop_event + ) + end + + it "synchronizes with the existing tasks " \ + "even if there are no current ones" do + flexmock(@adapter).should_receive(:find_current_deployed_task) + .once.and_return(nil) + @adapter.adapt_existing_deployment( + deployment_task, existing_deployment_task + ) + created_task = existing_deployment_task.created_tasks[0].last + + assert_has_precedence( + [created_task.start_event], existing_task.stop_event + ) + end + end + end + + describe "#find_current_deployed_task" do + it "returns the 'last' task" do + component_m = Syskit::Component.new_submodel + plan.add(task0 = component_m.new) + plan.add(task1 = component_m.new) + task1.should_configure_after(task0.stop_event) + task0 = work_plan[task0] + task1 = work_plan[task1] + adapter = create_adapter([]) + assert_equal task1, adapter.find_current_deployed_task([task0, task1]) + end + + it "ignores garbage tasks that have not been finalized yet" do + component_m = Syskit::Component.new_submodel + plan.add(task0 = component_m.new) + flexmock(task0).should_receive(can_finalize?: false) + plan.add(task1 = component_m.new) + task1.should_configure_after(task0.stop_event) + execute { plan.garbage_task(task0) } + task0 = work_plan[task0] + task1 = work_plan[task1] + adapter = create_adapter([]) + assert_equal task1, adapter.find_current_deployed_task([task0, task1]) + end + + it "does not ignore non-reusable tasks" do + component_m = Syskit::Component.new_submodel + plan.add(task0 = component_m.new) + plan.add(task1 = component_m.new) + task1.should_configure_after(task0.stop_event) + task0.do_not_reuse + task1.do_not_reuse + task0 = work_plan[task0] + task1 = work_plan[task1] + adapter = create_adapter([]) + assert_equal task1, adapter.find_current_deployed_task([task0, task1]) + end + end + + def create_adapter(used_deployments) + RuntimeNetworkAdaptation.new(work_plan, used_deployments, + merge_solver: @merge_solver) + end + + def used_deployments_from_tasks(tasks) + configured_deployments = tasks.each_with_object({}) do |task, per_name| + name = task.execution_agent.process_name + per_name[name] ||= flexmock(process_name: name) + end + + tasks.each_with_object({}) do |task, used_deployments| + name = task.execution_agent.process_name + used_deployments[task] = flexmock( + configured_deployment: configured_deployments.fetch(name) + ) + end + end + + def used_deployments_from_lazy(tasks, deployment_m) + configured_deployment = Models::ConfiguredDeployment.new( + "localhost", deployment_m + ) + tasks.each_with_object({}) do |task, used_deployments| + used_deployments[task] = + flexmock(configured_deployment: configured_deployment) + end + end + + def assert_has_precedence(preceding_events, event) + assert_equal preceding_events.to_set, + event.each_syskit_configuration_precedence(false).to_set + end + end + + class EngineTestStubDeployment < Roby::Task + attr_reader :tasks, :created_tasks + + def initialize(task_m, **arguments) + super(**arguments) + @task_m = task_m + @created_tasks = [] + @tasks = {} + end + + event :ready + + define_method :task do |task_name, task_model = nil, record: true, **| + task = @task_m.new(orocos_name: task_name) + @created_tasks << [task_name, task_model, task] if record + task.executed_by self + task + end + end + end +end diff --git a/test/network_generation/test_system_network_deployer.rb b/test/network_generation/test_system_network_deployer.rb index 28928ab85..17dc4c66d 100644 --- a/test/network_generation/test_system_network_deployer.rb +++ b/test/network_generation/test_system_network_deployer.rb @@ -373,7 +373,7 @@ def make_candidates(count) task0.requirements.use_deployment(deployment_models[0]) plan.add(task1 = task_models[1].new) - missing = execute { deployer.deploy(validate: false) } + _, missing = execute { deployer.deploy(validate: false) } assert_equal Set[task1], missing deployment_task = plan.find_local_tasks(deployment_models[0]).first assert deployment_task @@ -391,7 +391,7 @@ def make_candidates(count) root.depends_on(task = task_models[0].new, role: "t") task.requirements.use_deployment(deployment_models[0]) - missing = execute { deployer.deploy(validate: false) } + _, missing = execute { deployer.deploy(validate: false) } assert_equal Set.new, missing refute_equal task, root.t_child assert_equal "task", root.t_child.orocos_name @@ -402,7 +402,7 @@ def make_candidates(count) plan.add(task1 = task_models[1].new) task0.out_port.connect_to task1.in_port - missing = execute { deployer.deploy(validate: false) } + _, missing = execute { deployer.deploy(validate: false) } assert_equal Set.new, missing deployed_task0 = deployer.merge_solver.replacement_for(task0) deployed_task1 = deployer.merge_solver.replacement_for(task1) @@ -434,7 +434,7 @@ def make_candidates(count) .with(hsh(on: "machine")).once.pass_thru # And finally replace the task with the deployed task - missing = execute { deployer.deploy(validate: false) } + _, missing = execute { deployer.deploy(validate: false) } assert_equal Set.new, missing refute_equal root.t0_child, task0 refute_equal root.t1_child, task1 @@ -613,6 +613,93 @@ def make_candidates(count) end end + describe "scheduled tasks during lazy deployments" do + # Slave tasks are handled by Syskit::Deployment in eager deployment mode + + before do + @task_m = TaskContext.new_submodel + + @orogen_model = Models.create_orogen_deployment_model("deployment") + @orogen_scheduler = + @orogen_model.task("scheduler", @task_m.orogen_model) + @orogen_scheduled = + @orogen_model.task("scheduled", @task_m.orogen_model) + @orogen_scheduled.slave_of(@orogen_scheduler) + end + + it "adds its master task as dependency" do + scheduled_task = create_task("prefix_scheduled") + scheduler_tests_deploy + scheduler_task = scheduled_task.child_from_role("scheduler") + + assert_equal "prefix_scheduler", scheduler_task.orocos_name + end + + it "auto-selects the configuration of the master task" do + scheduled_task = create_task("prefix_scheduled") + syskit_stub_conf @task_m, "prefix_scheduler" + scheduler_tests_deploy + scheduler_task = scheduled_task.child_from_role("scheduler") + + assert_equal %w[default prefix_scheduler], scheduler_task.conf + end + + it "constrains the configuration of the slave task" do + scheduled_task = create_task("prefix_scheduled") + scheduler_tests_deploy + scheduler_task = scheduled_task.child_from_role("scheduler") + + assert scheduler_task.start_event.child_object?( + scheduled_task.start_event, + Roby::EventStructure::SyskitConfigurationPrecedence + ) + end + + it "reuses an existing scheduler task" do + scheduled_task = create_task("prefix_scheduled") + scheduler_task = create_task("prefix_scheduler") + scheduler_tests_deploy + + assert_same scheduler_task, + scheduled_task.child_from_role("scheduler") + end + + it "creates a scheduler task only once" do + orogen_scheduled2 = + @orogen_model.task("scheduled2", @task_m.orogen_model) + orogen_scheduled2.slave_of(@orogen_scheduler) + scheduled_task = create_task("prefix_scheduled") + scheduled2_task = create_task("prefix_scheduled2") + scheduler_tests_deploy + + scheduler = scheduled_task.child_from_role("scheduler") + assert_equal "prefix_scheduler", scheduler.orocos_name + assert_same scheduler, scheduled2_task.child_from_role("scheduler") + end + + it "creates scheduler tasks recursively" do + orogen_scheduler2 = + @orogen_model.task("scheduler2", @task_m.orogen_model) + @orogen_scheduler.slave_of(orogen_scheduler2) + scheduled_task = create_task("prefix_scheduled") + scheduler_tests_deploy + + scheduler = scheduled_task.child_from_role("scheduler") + scheduler2 = scheduler.child_from_role("scheduler") + assert_equal "prefix_scheduler2", scheduler2.orocos_name + end + + def create_task(orocos_name) + @task_m.new(plan: plan, orocos_name: orocos_name) + end + + def scheduler_tests_deploy + deployment_m = Deployment.new_submodel(orogen_model: @orogen_model) + default_deployment_group.use_deployment(deployment_m => "prefix_") + deployer.deploy(lazy: true) + end + end + def deployed_task_helper(model, name) Models::DeploymentGroup::DeployedTask.new(model, name) end diff --git a/test/network_generation/test_system_network_generator.rb b/test/network_generation/test_system_network_generator.rb index 77e24f820..6cddf157d 100644 --- a/test/network_generation/test_system_network_generator.rb +++ b/test/network_generation/test_system_network_generator.rb @@ -221,6 +221,31 @@ def arg=(value) end end + it "updates the task's orogen_model attribute " \ + "with the deployed task model" do + task_m = TaskContext.new_submodel(name: "Task") do + argument :arg + input_port "in", "/double" + end + deployment_m = Syskit::Deployment.new_submodel do + task("task", task_m).periodic(1) + end + + local_net_gen = SystemNetworkGenerator.new( + Roby::Plan.new, + default_deployment_group: Models::DeploymentGroup.new, + early_deploy: true, lazy_deploy: true + ) + toplevel_tasks, = local_net_gen.compute_system_network( + [task_m.to_instance_requirements + .use_deployment(deployment_m)], + validate_deployed_network: true + ) + assert_equal( + "Periodic", toplevel_tasks.first.orogen_model.activity_type.name + ) + end + describe "resolve_system_network with error capture" do attr_reader :error_handler, :generator diff --git a/test/roby_app/test_plugin.rb b/test/roby_app/test_plugin.rb index 76cf2eb2d..0533e9fd0 100644 --- a/test/roby_app/test_plugin.rb +++ b/test/roby_app/test_plugin.rb @@ -352,7 +352,7 @@ def rotate_log attr_reader :group def use_model_on_group(model, name, server) - deployment_m = syskit_stub_deployment_model(model) + deployment_m = syskit_stub_deployment_model(model, "") @group.use_deployment( Hash[deployment_m => name], on: server, diff --git a/test/test_deployment.rb b/test/test_deployment.rb index 71031303e..5410b56f0 100644 --- a/test/test_deployment.rb +++ b/test/test_deployment.rb @@ -185,7 +185,9 @@ def mock_raw_port(task, port_name) assert_equal "other_name", deployment_task.task("other_name").orocos_name end it "sets orogen_model on the new task" do - assert_equal orogen_deployed_task, deployment_task.task("mapped_task_name").orogen_model + model = deployment_task.task("mapped_task_name").orogen_model + assert_equal "mapped_task_name", model.name + assert_equal orogen_deployed_task.task_model, model.task_model end it "adds the deployment task as an execution agent for the new task" do flexmock(task_m).new_instances.should_receive(:executed_by).with(deployment_task).once @@ -199,7 +201,7 @@ def mock_raw_port(task, port_name) deployment_task.task("mapped_task_name") end it "does runtime initialization if it is already ready" do - task = flexmock(task_m.new) + task = flexmock(task_m.new(orocos_name: "mapped_task_name")) flexmock(task_m).should_receive(:new).and_return(task) remote_handle = flexmock(in_fatal: false) diff --git a/test/test_exceptions.rb b/test/test_exceptions.rb index ab6d3d066..26abf6218 100644 --- a/test/test_exceptions.rb +++ b/test/test_exceptions.rb @@ -215,8 +215,8 @@ module Syskit expected = <<~PP.chomp deployed task 'test_syskit_tests_empty' from deployment \ - 'syskit_tests_empty' defined in 'orogen_syskit_tests' on 'localhost' is \ - assigned to 2 tasks. Below is the list of \ + 'test_syskit_tests_empty' defined in 'orogen_syskit_tests' on \ + 'localhost' is assigned to 2 tasks. Below is the list of \ the dependent non-deployed actions. Right after the list is \ a detailed explanation of why the first two tasks are not merged: OroGen.orogen_syskit_tests.Empty(arg: 1, conf: ["default"], \