diff --git a/lib/vines/agent/connection.rb b/lib/vines/agent/connection.rb
index 769788d..84a01eb 100644
--- a/lib/vines/agent/connection.rb
+++ b/lib/vines/agent/connection.rb
@@ -17,8 +17,10 @@ def initialize(options)
*options.values_at(:domain, :password, :host, :port, :download, :conf)
certs = File.expand_path('certs', conf)
- @permissions, @services, @sessions, @component = {}, {}, {}, nil
+ @permissions, @services, @sessions, @components, @component = {}, {}, {}, [], nil
@ready = false
+ @task_responses = {}
+ @mtx = Mutex.new
jid = Blather::JID.new(fqdn, domain, 'vines')
@stream = Blather::Client.setup(jid, password, host, port, certs)
@@ -37,13 +39,16 @@ def initialize(options)
end
@stream.register_handler(:ready) do
- # prevent handler called twice
- unless @ready
- log.info("Connected #{@stream.jid} agent to #{host}:#{port}")
- log.warn("Agent must run as root user to allow user switching") unless root?
- @ready = true
- startup
- end
+ # [AM] making sure we are to init once
+ # unless @ready is not enough for an obvious reason
+ @mtx.synchronize {
+ unless @ready
+ log.info("Connected #{@stream.jid} agent to #{host}:#{port}")
+ log.warn("Agent must run as root user to allow user switching") unless root?
+ @ready = true
+ startup
+ end
+ }
end
@stream.register_handler(:subscription, :request?) do |node|
@@ -77,23 +82,14 @@ def start
@stream.run
end
+# —————————————————————————————————————————————————————————————————————————
private
+# —————————————————————————————————————————————————————————————————————————
# After the bot connects to the chat server, discover the component, send
# our ohai system description data, and initialize permissions.
def startup
- cb = proc do |component|
- if component
- log.info("Found vines component at #{component}")
- @component = component
- send_system_info
- request_permissions
- else
- log.info("Vines component not found, rediscovering . . .")
- EM::Timer.new(10) { discover_component(&cb) }
- end
- end
- discover_component(&cb)
+ discover_component
end
def version(node)
@@ -173,33 +169,48 @@ def fqdn
# The component broadcasting the http://getvines.com/protocol feature is our
# Vines service.
def discover_component
+ @components = []
disco = Blather::Stanza::DiscoItems.new
disco.to = @stream.jid.domain
@stream.write_with_handler(disco) do |result|
- items = result.error? ? [] : result.items
- Fiber.new do
- # use fiber instead of EM::Iterator until EM 1.0.0 release
- found = items.find {|item| component?(item.jid) }
- yield found ? found.jid : nil
- end.resume
- end
- end
-
- # Return true if this JID is the Vines component with which we need to
- # communicate. This method suspends the Fiber that calls it in order to
- # turn the disco#info requests synchronous.
- def component?(jid)
- fiber = Fiber.current
- info = Blather::Stanza::DiscoInfo.new
- info.to = jid
- @stream.write_with_handler(info) do |reply|
- features = reply.error? ? [] : reply.features
- found = !!features.find {|f| f.var == NS }
- fiber.resume(found)
+ unless result.error?
+ info = Blather::Stanza::DiscoInfo.new
+ # iterate through disco result and collect ’em all
+ EM::Iterator.new(result.items).map proc{ |comp, it_disco|
+ info.to = comp.jid.domain
+ @stream.write_with_handler(info) do |reply|
+ unless reply.error?
+ # iterate through info results and collect ’em all
+ EM::Iterator.new(reply.features).map proc{ |f, it_info|
+ it_info.return f.var == NS ? comp : nil
+ }, proc{ |comps|
+ # we have collected all the info replies for the
+ # disco given, let’s proceed with the next
+ it_disco.return comps - [nil]
+ }
+ end
+ end
+ }, proc{ |compss|
+ # Well, we yielded all the discos, let's request perms etc
+ @components = compss.flatten.uniq
+
+ if !@components || @components.length < 1
+ log.info("Vines component not found, rediscovering…")
+ EM::Timer.new(30) { discover_component }
+ else
+ @component = @components[0].jid
+ log.info("Vines component found #{@component}")
+ if @components.length > 1
+ log.warn("Using one #{@component} out of #{@components.length} found")
+ end
+ send_system_info
+ request_permissions
+ end
+ }
+ end
end
- Fiber.yield
end
-
+
# Download the list of unix user accounts and the JID's that are allowed
# to use them. This is used to determine if a change user command like
# +v user root+ is allowed.
@@ -238,8 +249,34 @@ def process_message(message)
return unless valid_user?(bare)
session = @sessions[full] ||= Shell.new(bare, @permissions)
- session.run(message.body.strip) do |output|
- @stream.write(reply(message, output, forward_to))
+
+ # [AM] Create a TickLoop to collect shell output and send
+ # back to recipient by portions. This is needed to
+ # implement non-blocking, but not annoying on the other hand
+ # service. E. g. “ls” im most cases will return immediately,
+ # while “ping google.com” will send back stanzas every second
+
+ session.on_output = lambda {|output| @task_responses[message.id] += output }
+ session.on_error = lambda {|error| @task_responses[message.id] += "⇒ #{error}" }
+
+ @task_responses[message.id] = ""
+ task_response_tickloop = EM.tick_loop do
+ unless @task_responses[message.id].empty?
+ @stream.write(reply(message, @task_responses[message.id], forward_to))
+ @task_responses[message.id] = ""
+ end
+ sleep 1
+ end
+ session.run(message.body.strip) do |output, exitstatus|
+ task_response_tickloop.stop
+ task_response_tickloop = nil
+ unless @task_responses[message.id].empty?
+ @stream.write(reply(message, @task_responses[message.id], forward_to))
+ end
+ @task_responses.delete message.id
+ if exitstatus && exitstatus != 0
+ @stream.write(reply(message, "#{exitstatus} ↵ #{message.body}", forward_to))
+ end
end
end
@@ -250,7 +287,7 @@ def reply(message, body, forward_to)
Blather::Stanza::Message.new(message.from, body).tap do |node|
node << node.document.create_element('jid', forward_to, xmlns: NS) if forward_to
node.thread = message.thread if message.thread
- node.xhtml = ''
+ node.xhtml = ''
span = node.xhtml_node.elements.first
body.each_line do |line|
span.add_child(Nokogiri::XML::Text.new(line.chomp, span.document))
diff --git a/lib/vines/agent/shell.rb b/lib/vines/agent/shell.rb
index 5fbec63..268771d 100644
--- a/lib/vines/agent/shell.rb
+++ b/lib/vines/agent/shell.rb
@@ -10,20 +10,25 @@ module Agent
class Shell
include Vines::Log
- attr_writer :permissions
-
+ attr_writer :permissions, :on_output, :on_error
+
# Create a new shell session to asynchronously execute commands for this
# JID. The JID is validated in the permissions Hash before executing
# commands.
def initialize(jid, permissions)
@jid, @permissions = jid, permissions
@user = allowed_users.first if allowed_users.size == 1
+ @on_output = nil
+ @on_error = nil
@commands = EM::Queue.new
process_command_queue
end
# Queue the shell command to run as soon as the currently executing tasks
# complete. Yields the shell output to the callback block.
+ # [AM] “v reset” command is supposed to be executed immediately
+ # to give an ability of interrupting
+ # (in general, interferring) the current shell task queue
def run(command, &callback)
if reset?(command)
callback.call(run_built_in(command))
@@ -46,8 +51,8 @@ def process_command_queue
run_in_slave(command[:command])
end
end
- cb = proc do |output|
- command[:callback].call(output)
+ cb = proc do |output, exitstatus|
+ command[:callback].call(output, exitstatus)
process_command_queue
end
EM.defer(op, cb)
@@ -58,13 +63,18 @@ def run_in_slave(command)
return "-> no user selected, run 'v user'" unless @user
log.info("Running #{command} as #{@user}")
- spawn(@user) unless @shell
+ unless @shell
+ spawn(@user)
+ end
+ @shell.outproc = @on_output if @on_output
+ @shell.errproc = @on_error if @on_error
+
out, err = @shell.execute(command)
output = [].tap do |arr|
arr << out if out && !out.empty?
arr << err if err && !err.empty?
end.join("\n")
- output.empty? ? '-> command completed' : output
+ return [output.empty? ? '-> command completed' : output, @shell.exitstatus]
rescue
close
'-> restarted shell'