From ccbfeec58eb66c7458be8a4016b94d1c3140c501 Mon Sep 17 00:00:00 2001 From: Kieran Klaassen Date: Sat, 22 Nov 2025 14:55:17 -0800 Subject: [PATCH 1/2] Fix callback persistence chaining using Fanout pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When using acts_as_chat, persistence callbacks were being overwritten when users added their own callbacks via on_new_message, on_end_message, etc. This caused silent data loss. The fix adds a simple CallbackFanout class that appends callbacks instead of replacing them, so both persistence and user callbacks run. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- lib/ruby_llm/active_record/acts_as_legacy.rb | 18 +-- lib/ruby_llm/active_record/chat_methods.rb | 18 +-- lib/ruby_llm/chat.rb | 27 +++- ...s_on_to_llm_without_losing_persistence.yml | 116 ++++++++++++++++++ spec/ruby_llm/active_record/acts_as_spec.rb | 14 +++ 5 files changed, 155 insertions(+), 38 deletions(-) create mode 100644 spec/fixtures/vcr_cassettes/activerecord_actsas_event_callbacks_allows_chaining_callbacks_on_to_llm_without_losing_persistence.yml diff --git a/lib/ruby_llm/active_record/acts_as_legacy.rb b/lib/ruby_llm/active_record/acts_as_legacy.rb index 97679c126..f2856460c 100644 --- a/lib/ruby_llm/active_record/acts_as_legacy.rb +++ b/lib/ruby_llm/active_record/acts_as_legacy.rb @@ -152,26 +152,12 @@ def with_schema(...) end def on_new_message(&block) - to_llm - - existing_callback = @chat.instance_variable_get(:@on)[:new_message] - - @chat.on_new_message do - existing_callback&.call - block&.call - end + to_llm.on_new_message(&block) self end def on_end_message(&block) - to_llm - - existing_callback = @chat.instance_variable_get(:@on)[:end_message] - - @chat.on_end_message do |msg| - existing_callback&.call(msg) - block&.call(msg) - end + to_llm.on_end_message(&block) self end diff --git a/lib/ruby_llm/active_record/chat_methods.rb b/lib/ruby_llm/active_record/chat_methods.rb index 41930548c..6bd3a9eb2 100644 --- a/lib/ruby_llm/active_record/chat_methods.rb +++ b/lib/ruby_llm/active_record/chat_methods.rb @@ -140,26 +140,12 @@ def with_schema(...) end def on_new_message(&block) - to_llm - - existing_callback = @chat.instance_variable_get(:@on)[:new_message] - - @chat.on_new_message do - existing_callback&.call - block&.call - end + to_llm.on_new_message(&block) self end def on_end_message(&block) - to_llm - - existing_callback = @chat.instance_variable_get(:@on)[:end_message] - - @chat.on_end_message do |msg| - existing_callback&.call(msg) - block&.call(msg) - end + to_llm.on_end_message(&block) self end diff --git a/lib/ruby_llm/chat.rb b/lib/ruby_llm/chat.rb index d03d872ca..051540920 100644 --- a/lib/ruby_llm/chat.rb +++ b/lib/ruby_llm/chat.rb @@ -7,6 +7,26 @@ class Chat attr_reader :model, :messages, :tools, :params, :headers, :schema + # Stores multiple callbacks per key and invokes all of them + class CallbackFanout + def initialize + @callbacks = {} + end + + def [](key) + callbacks = @callbacks[key] + return if callbacks.nil? || callbacks.empty? + + ->(*args) { callbacks.each { |cb| cb.call(*args) } } + end + + def []=(key, callable) + return unless callable + + (@callbacks[key] ||= []) << callable + end + end + def initialize(model: nil, provider: nil, assume_model_exists: false, context: nil) if assume_model_exists && !provider raise ArgumentError, 'Provider must be specified if assume_model_exists is true' @@ -22,12 +42,7 @@ def initialize(model: nil, provider: nil, assume_model_exists: false, context: n @params = {} @headers = {} @schema = nil - @on = { - new_message: nil, - end_message: nil, - tool_call: nil, - tool_result: nil - } + @on = CallbackFanout.new end def ask(message = nil, with: nil, &) diff --git a/spec/fixtures/vcr_cassettes/activerecord_actsas_event_callbacks_allows_chaining_callbacks_on_to_llm_without_losing_persistence.yml b/spec/fixtures/vcr_cassettes/activerecord_actsas_event_callbacks_allows_chaining_callbacks_on_to_llm_without_losing_persistence.yml new file mode 100644 index 000000000..bcc0bac29 --- /dev/null +++ b/spec/fixtures/vcr_cassettes/activerecord_actsas_event_callbacks_allows_chaining_callbacks_on_to_llm_without_losing_persistence.yml @@ -0,0 +1,116 @@ +--- +http_interactions: +- request: + method: post + uri: https://api.openai.com/v1/chat/completions + body: + encoding: UTF-8 + string: '{"model":"gpt-4.1-nano","messages":[{"role":"user","content":"Hello"}],"stream":false}' + headers: + User-Agent: + - Faraday v2.12.2 + Authorization: + - Bearer + Content-Type: + - application/json + Accept-Encoding: + - gzip;q=1.0,deflate;q=0.6,identity;q=0.3 + Accept: + - "*/*" + response: + status: + code: 200 + message: OK + headers: + Date: + - Thu, 20 Nov 2025 04:59:37 GMT + Content-Type: + - application/json + Transfer-Encoding: + - chunked + Connection: + - keep-alive + Access-Control-Expose-Headers: + - X-Request-ID + Openai-Organization: + - "" + Openai-Processing-Ms: + - '438' + Openai-Project: + - proj_JsyAYubtMiaADRh0HSQAkspY + Openai-Version: + - '2020-10-01' + X-Envoy-Upstream-Service-Time: + - '612' + X-Ratelimit-Limit-Requests: + - '30000' + X-Ratelimit-Limit-Tokens: + - '150000000' + X-Ratelimit-Remaining-Requests: + - '29999' + X-Ratelimit-Remaining-Tokens: + - '149999997' + X-Ratelimit-Reset-Requests: + - 2ms + X-Ratelimit-Reset-Tokens: + - 0s + X-Request-Id: + - "" + X-Openai-Proxy-Wasm: + - v0.1 + Cf-Cache-Status: + - DYNAMIC + Set-Cookie: + - "" + - "" + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + X-Content-Type-Options: + - nosniff + Server: + - cloudflare + Cf-Ray: + - "" + Alt-Svc: + - h3=":443"; ma=86400 + body: + encoding: ASCII-8BIT + string: | + { + "id": "chatcmpl-Cdr3YpCf1Q7ttTCYdItalr3u0saaF", + "object": "chat.completion", + "created": 1763614776, + "model": "gpt-4.1-nano-2025-04-14", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "Hello! How can I assist you today?", + "refusal": null, + "annotations": [] + }, + "logprobs": null, + "finish_reason": "stop" + } + ], + "usage": { + "prompt_tokens": 8, + "completion_tokens": 9, + "total_tokens": 17, + "prompt_tokens_details": { + "cached_tokens": 0, + "audio_tokens": 0 + }, + "completion_tokens_details": { + "reasoning_tokens": 0, + "audio_tokens": 0, + "accepted_prediction_tokens": 0, + "rejected_prediction_tokens": 0 + } + }, + "service_tier": "default", + "system_fingerprint": "fp_1a97b5aa6c" + } + recorded_at: Thu, 20 Nov 2025 04:59:36 GMT +recorded_with: VCR 6.3.1 diff --git a/spec/ruby_llm/active_record/acts_as_spec.rb b/spec/ruby_llm/active_record/acts_as_spec.rb index a36c00b49..3aff2f1de 100644 --- a/spec/ruby_llm/active_record/acts_as_spec.rb +++ b/spec/ruby_llm/active_record/acts_as_spec.rb @@ -622,6 +622,20 @@ def uploaded_file(path, type) expect(chat.messages.count).to eq(2) # Persistence still works end + it 'allows chaining callbacks on to_llm without losing persistence' do + chat = Chat.create!(model: model) + llm_chat = chat.to_llm + + user_callback_called = false + # Directly attach callback to the underlying Chat object + llm_chat.on_new_message { user_callback_called = true } + + chat.ask('Hello') + + expect(user_callback_called).to be true + expect(chat.messages.count).to eq(2) # Persistence still works + end + it 'calls on_tool_call and on_tool_result callbacks' do tool_call_received = nil tool_result_received = nil From 6303cdbda8e85e08712228aec2aed2febd19d446 Mon Sep 17 00:00:00 2001 From: Kieran Klaassen Date: Mon, 15 Dec 2025 08:17:37 -0800 Subject: [PATCH 2/2] Refactor callback fanout to method-based API # Conflicts: # lib/ruby_llm/chat.rb --- .ruby-version | 1 + lib/ruby_llm/chat.rb | 84 +++++++++++++++++++++----------------------- 2 files changed, 41 insertions(+), 44 deletions(-) create mode 100644 .ruby-version diff --git a/.ruby-version b/.ruby-version new file mode 100644 index 000000000..be94e6f53 --- /dev/null +++ b/.ruby-version @@ -0,0 +1 @@ +3.2.2 diff --git a/lib/ruby_llm/chat.rb b/lib/ruby_llm/chat.rb index 051540920..c1ec1df62 100644 --- a/lib/ruby_llm/chat.rb +++ b/lib/ruby_llm/chat.rb @@ -7,23 +7,26 @@ class Chat attr_reader :model, :messages, :tools, :params, :headers, :schema - # Stores multiple callbacks per key and invokes all of them + # Stores multiple callbacks per key and invokes all of them. + # + # Internally we keep a callable per event (via `callback_for`) so higher + # level code can safely chain callbacks without overwriting persistence. class CallbackFanout def initialize - @callbacks = {} + @callbacks = Hash.new { |h, k| h[k] = [] } end - def [](key) - callbacks = @callbacks[key] - return if callbacks.nil? || callbacks.empty? + def add(key, callable) + return unless callable - ->(*args) { callbacks.each { |cb| cb.call(*args) } } + @callbacks[key] << callable end - def []=(key, callable) - return unless callable + def callback_for(key) + callbacks = @callbacks[key] + return if callbacks.empty? - (@callbacks[key] ||= []) << callable + ->(*args) { callbacks.each { |cb| cb.call(*args) } } end end @@ -45,9 +48,9 @@ def initialize(model: nil, provider: nil, assume_model_exists: false, context: n @on = CallbackFanout.new end - def ask(message = nil, with: nil, &) - add_message role: :user, content: build_content(message, with) - complete(&) + def ask(message = nil, with: nil, &block) + add_message role: :user, content: Content.new(message, with) + complete(&block) end alias say ask @@ -72,7 +75,7 @@ def with_tools(*tools, replace: false) end def with_model(model_id, provider: nil, assume_exists: false) - @model, @provider = Models.resolve(model_id, provider:, assume_exists:, config: @config) + @model, @provider = Models.resolve(model_id, provider: provider, assume_exists: assume_exists, config: @config) @connection = @provider.connection self end @@ -113,30 +116,30 @@ def with_schema(schema) end def on_new_message(&block) - @on[:new_message] = block + @on.add(:new_message, block) self end def on_end_message(&block) - @on[:end_message] = block + @on.add(:end_message, block) self end def on_tool_call(&block) - @on[:tool_call] = block + @on.add(:tool_call, block) self end def on_tool_result(&block) - @on[:tool_result] = block + @on.add(:tool_result, block) self end - def each(&) - messages.each(&) + def each(&block) + messages.each(&block) end - def complete(&) # rubocop:disable Metrics/PerceivedComplexity + def complete(&block) # rubocop:disable Metrics/PerceivedComplexity response = @provider.complete( messages, tools: @tools, @@ -145,10 +148,10 @@ def complete(&) # rubocop:disable Metrics/PerceivedComplexity params: @params, headers: @headers, schema: @schema, - &wrap_streaming_block(&) + &wrap_streaming_block(&block) ) - @on[:new_message]&.call unless block_given? + callback_for(:new_message)&.call unless block if @schema && response.content.is_a?(String) begin @@ -159,10 +162,10 @@ def complete(&) # rubocop:disable Metrics/PerceivedComplexity end add_message response - @on[:end_message]&.call(response) + callback_for(:end_message)&.call(response) if response.tool_call? - handle_tool_calls(response, &) + handle_tool_calls(response, &block) else response end @@ -184,6 +187,10 @@ def instance_variables private + def callback_for(key) + @on.callback_for(key) + end + def wrap_streaming_block(&block) return nil unless block_given? @@ -193,30 +200,29 @@ def wrap_streaming_block(&block) # Create message on first content chunk unless first_chunk_received first_chunk_received = true - @on[:new_message]&.call + callback_for(:new_message)&.call end block.call chunk end end - def handle_tool_calls(response, &) # rubocop:disable Metrics/PerceivedComplexity + def handle_tool_calls(response, &block) # rubocop:disable Metrics/PerceivedComplexity halt_result = nil response.tool_calls.each_value do |tool_call| - @on[:new_message]&.call - @on[:tool_call]&.call(tool_call) + callback_for(:new_message)&.call + callback_for(:tool_call)&.call(tool_call) result = execute_tool tool_call - @on[:tool_result]&.call(result) - tool_payload = result.is_a?(Tool::Halt) ? result.content : result - content = content_like?(tool_payload) ? tool_payload : tool_payload.to_s - message = add_message role: :tool, content:, tool_call_id: tool_call.id - @on[:end_message]&.call(message) + callback_for(:tool_result)&.call(result) + content = result.is_a?(Content) ? result : result.to_s + message = add_message role: :tool, content: content, tool_call_id: tool_call.id + callback_for(:end_message)&.call(message) halt_result = result if result.is_a?(Tool::Halt) end - halt_result || complete(&) + halt_result || complete(&block) end def execute_tool(tool_call) @@ -224,15 +230,5 @@ def execute_tool(tool_call) args = tool_call.arguments tool.call(args) end - - def build_content(message, attachments) - return message if content_like?(message) - - Content.new(message, attachments) - end - - def content_like?(object) - object.is_a?(Content) || object.is_a?(Content::Raw) - end end end