diff --git a/lib/vines/agent/connection.rb b/lib/vines/agent/connection.rb index 769788d..84a01eb 100644 --- a/lib/vines/agent/connection.rb +++ b/lib/vines/agent/connection.rb @@ -17,8 +17,10 @@ def initialize(options) *options.values_at(:domain, :password, :host, :port, :download, :conf) certs = File.expand_path('certs', conf) - @permissions, @services, @sessions, @component = {}, {}, {}, nil + @permissions, @services, @sessions, @components, @component = {}, {}, {}, [], nil @ready = false + @task_responses = {} + @mtx = Mutex.new jid = Blather::JID.new(fqdn, domain, 'vines') @stream = Blather::Client.setup(jid, password, host, port, certs) @@ -37,13 +39,16 @@ def initialize(options) end @stream.register_handler(:ready) do - # prevent handler called twice - unless @ready - log.info("Connected #{@stream.jid} agent to #{host}:#{port}") - log.warn("Agent must run as root user to allow user switching") unless root? - @ready = true - startup - end + # [AM] making sure we are to init once + # unless @ready is not enough for an obvious reason + @mtx.synchronize { + unless @ready + log.info("Connected #{@stream.jid} agent to #{host}:#{port}") + log.warn("Agent must run as root user to allow user switching") unless root? + @ready = true + startup + end + } end @stream.register_handler(:subscription, :request?) do |node| @@ -77,23 +82,14 @@ def start @stream.run end +# ————————————————————————————————————————————————————————————————————————— private +# ————————————————————————————————————————————————————————————————————————— # After the bot connects to the chat server, discover the component, send # our ohai system description data, and initialize permissions. def startup - cb = proc do |component| - if component - log.info("Found vines component at #{component}") - @component = component - send_system_info - request_permissions - else - log.info("Vines component not found, rediscovering . . .") - EM::Timer.new(10) { discover_component(&cb) } - end - end - discover_component(&cb) + discover_component end def version(node) @@ -173,33 +169,48 @@ def fqdn # The component broadcasting the http://getvines.com/protocol feature is our # Vines service. def discover_component + @components = [] disco = Blather::Stanza::DiscoItems.new disco.to = @stream.jid.domain @stream.write_with_handler(disco) do |result| - items = result.error? ? [] : result.items - Fiber.new do - # use fiber instead of EM::Iterator until EM 1.0.0 release - found = items.find {|item| component?(item.jid) } - yield found ? found.jid : nil - end.resume - end - end - - # Return true if this JID is the Vines component with which we need to - # communicate. This method suspends the Fiber that calls it in order to - # turn the disco#info requests synchronous. - def component?(jid) - fiber = Fiber.current - info = Blather::Stanza::DiscoInfo.new - info.to = jid - @stream.write_with_handler(info) do |reply| - features = reply.error? ? [] : reply.features - found = !!features.find {|f| f.var == NS } - fiber.resume(found) + unless result.error? + info = Blather::Stanza::DiscoInfo.new + # iterate through disco result and collect ’em all + EM::Iterator.new(result.items).map proc{ |comp, it_disco| + info.to = comp.jid.domain + @stream.write_with_handler(info) do |reply| + unless reply.error? + # iterate through info results and collect ’em all + EM::Iterator.new(reply.features).map proc{ |f, it_info| + it_info.return f.var == NS ? comp : nil + }, proc{ |comps| + # we have collected all the info replies for the + # disco given, let’s proceed with the next + it_disco.return comps - [nil] + } + end + end + }, proc{ |compss| + # Well, we yielded all the discos, let's request perms etc + @components = compss.flatten.uniq + + if !@components || @components.length < 1 + log.info("Vines component not found, rediscovering…") + EM::Timer.new(30) { discover_component } + else + @component = @components[0].jid + log.info("Vines component found #{@component}") + if @components.length > 1 + log.warn("Using one #{@component} out of #{@components.length} found") + end + send_system_info + request_permissions + end + } + end end - Fiber.yield end - + # Download the list of unix user accounts and the JID's that are allowed # to use them. This is used to determine if a change user command like # +v user root+ is allowed. @@ -238,8 +249,34 @@ def process_message(message) return unless valid_user?(bare) session = @sessions[full] ||= Shell.new(bare, @permissions) - session.run(message.body.strip) do |output| - @stream.write(reply(message, output, forward_to)) + + # [AM] Create a TickLoop to collect shell output and send + # back to recipient by portions. This is needed to + # implement non-blocking, but not annoying on the other hand + # service. E. g. “ls” im most cases will return immediately, + # while “ping google.com” will send back stanzas every second + + session.on_output = lambda {|output| @task_responses[message.id] += output } + session.on_error = lambda {|error| @task_responses[message.id] += "⇒ #{error}" } + + @task_responses[message.id] = "" + task_response_tickloop = EM.tick_loop do + unless @task_responses[message.id].empty? + @stream.write(reply(message, @task_responses[message.id], forward_to)) + @task_responses[message.id] = "" + end + sleep 1 + end + session.run(message.body.strip) do |output, exitstatus| + task_response_tickloop.stop + task_response_tickloop = nil + unless @task_responses[message.id].empty? + @stream.write(reply(message, @task_responses[message.id], forward_to)) + end + @task_responses.delete message.id + if exitstatus && exitstatus != 0 + @stream.write(reply(message, "#{exitstatus} ↵ #{message.body}", forward_to)) + end end end @@ -250,7 +287,7 @@ def reply(message, body, forward_to) Blather::Stanza::Message.new(message.from, body).tap do |node| node << node.document.create_element('jid', forward_to, xmlns: NS) if forward_to node.thread = message.thread if message.thread - node.xhtml = '' + node.xhtml = '' span = node.xhtml_node.elements.first body.each_line do |line| span.add_child(Nokogiri::XML::Text.new(line.chomp, span.document)) diff --git a/lib/vines/agent/shell.rb b/lib/vines/agent/shell.rb index 5fbec63..268771d 100644 --- a/lib/vines/agent/shell.rb +++ b/lib/vines/agent/shell.rb @@ -10,20 +10,25 @@ module Agent class Shell include Vines::Log - attr_writer :permissions - + attr_writer :permissions, :on_output, :on_error + # Create a new shell session to asynchronously execute commands for this # JID. The JID is validated in the permissions Hash before executing # commands. def initialize(jid, permissions) @jid, @permissions = jid, permissions @user = allowed_users.first if allowed_users.size == 1 + @on_output = nil + @on_error = nil @commands = EM::Queue.new process_command_queue end # Queue the shell command to run as soon as the currently executing tasks # complete. Yields the shell output to the callback block. + # [AM] “v reset” command is supposed to be executed immediately + # to give an ability of interrupting + # (in general, interferring) the current shell task queue def run(command, &callback) if reset?(command) callback.call(run_built_in(command)) @@ -46,8 +51,8 @@ def process_command_queue run_in_slave(command[:command]) end end - cb = proc do |output| - command[:callback].call(output) + cb = proc do |output, exitstatus| + command[:callback].call(output, exitstatus) process_command_queue end EM.defer(op, cb) @@ -58,13 +63,18 @@ def run_in_slave(command) return "-> no user selected, run 'v user'" unless @user log.info("Running #{command} as #{@user}") - spawn(@user) unless @shell + unless @shell + spawn(@user) + end + @shell.outproc = @on_output if @on_output + @shell.errproc = @on_error if @on_error + out, err = @shell.execute(command) output = [].tap do |arr| arr << out if out && !out.empty? arr << err if err && !err.empty? end.join("\n") - output.empty? ? '-> command completed' : output + return [output.empty? ? '-> command completed' : output, @shell.exitstatus] rescue close '-> restarted shell'