This is post #20 in the Ruby for AI series. Last time we built AI agents with tool use. Today we're solving a UX problem: nobody wants to stare at a spinner for 15 seconds while GPT thinks. We're building real-time streaming from OpenAI through ActionCable and Turbo Streams — token by token, as they arrive.
The Problem
Standard HTTP request-response doesn't work for AI. The model generates tokens over 5-20 seconds. Without streaming, your user sees nothing until the entire response is ready. That's unacceptable.
The fix: stream tokens from OpenAI → your Rails server → the browser, in real time.
Architecture
Here's the flow:
OpenAI API (SSE) → Rails Background Job → ActionCable → Turbo Stream → Browser DOM
The user submits a prompt. A background job calls OpenAI with stream: true. Each chunk is broadcast over ActionCable, and a lightweight Stimulus controller appends it to the DOM. Only a few lines of JavaScript, and no polling.
Setting Up the Channel
Generate an ActionCable channel for chat streaming:
rails generate channel ChatStream
Configure the channel to stream from a conversation-specific key:
# app/channels/chat_stream_channel.rb
class ChatStreamChannel < ApplicationCable::Channel
  def subscribed
    stream_from "chat_stream_#{params[:conversation_id]}"
  end

  def unsubscribed
    # Cleanup if needed
  end
end
The Streaming Job
This is where the magic happens. We call OpenAI with streaming enabled and broadcast each chunk:
# app/jobs/stream_ai_response_job.rb
class StreamAiResponseJob < ApplicationJob
  queue_as :default

  def perform(conversation_id, prompt)
    client = OpenAI::Client.new(access_token: ENV["OPENAI_API_KEY"])

    message = Message.create!(
      conversation_id: conversation_id,
      role: "assistant",
      content: ""
    )

    client.chat(
      parameters: {
        model: "gpt-4o",
        messages: [{ role: "user", content: prompt }],
        stream: proc do |chunk, _bytesize|
          delta = chunk.dig("choices", 0, "delta", "content")
          next unless delta

          # Append to the database record
          message.update!(content: message.content + delta)

          # Broadcast the new token via ActionCable
          ActionCable.server.broadcast(
            "chat_stream_#{conversation_id}",
            {
              type: "token",
              content: delta,
              message_id: message.id
            }
          )
        end
      }
    )

    # Signal completion
    ActionCable.server.broadcast(
      "chat_stream_#{conversation_id}",
      { type: "done", message_id: message.id }
    )
  end
end
Each token arrives as a Server-Sent Event from OpenAI. We grab the delta content, save it, and push it to the browser immediately.
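For reference, here's roughly what one streamed chunk looks like once the gem has parsed the SSE payload into a Ruby hash. The exact fields vary by model and gem version; this is an illustrative shape, not the full schema:

```ruby
# An illustrative chunk, already parsed from SSE into a hash.
chunk = {
  "id" => "chatcmpl-abc123",
  "choices" => [
    { "index" => 0, "delta" => { "content" => "Hello" }, "finish_reason" => nil }
  ]
}

# dig walks the nested structure safely, returning nil at any dead end
delta = chunk.dig("choices", 0, "delta", "content")  # => "Hello"

# The terminal chunk carries an empty delta and a finish_reason instead
final_chunk = { "choices" => [{ "delta" => {}, "finish_reason" => "stop" }] }
final_delta = final_chunk.dig("choices", 0, "delta", "content")  # => nil
```

This is why the job guards with next unless delta: the terminal chunk (and any empty delta) would otherwise hand nil to the string concatenation and raise.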
The Controller
The controller kicks off the job and returns instantly:
# app/controllers/messages_controller.rb
class MessagesController < ApplicationController
  def create
    conversation = Conversation.find(params[:conversation_id])

    # Save the user's message
    Message.create!(
      conversation: conversation,
      role: "user",
      content: params[:content]
    )

    # Start streaming in the background
    StreamAiResponseJob.perform_later(conversation.id, params[:content])

    head :ok
  end
end
No waiting. The response streams in asynchronously.
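One assumption worth making explicit: params[:conversation_id] implies messages are nested under conversations in the router. A minimal sketch of the routes (the resource names are assumptions; match them to your app):

```ruby
# config/routes.rb -- hypothetical sketch of the nested message route
Rails.application.routes.draw do
  resources :conversations, only: [:show] do
    resources :messages, only: [:create]
  end
end
```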
The Front-End Wiring
Turbo Streams shine when you're broadcasting rendered partials, but for per-token appends it's simpler to subscribe with a small Stimulus controller and patch the DOM directly. First the markup:
# app/views/conversations/show.html.erb
<div id="messages" data-controller="chat-stream"
     data-chat-stream-conversation-id-value="<%= @conversation.id %>">
  <% @conversation.messages.each do |msg| %>
    <div class="message <%= msg.role %>">
      <%= msg.content %>
    </div>
  <% end %>

  <div id="streaming-response" class="message assistant" style="display: none;">
  </div>
</div>
// app/javascript/controllers/chat_stream_controller.js
import { Controller } from "@hotwired/stimulus"
import { createConsumer } from "@rails/actioncable"

export default class extends Controller {
  static values = { conversationId: Number }

  connect() {
    this.channel = createConsumer().subscriptions.create(
      { channel: "ChatStreamChannel", conversation_id: this.conversationIdValue },
      {
        received: (data) => this.handleMessage(data)
      }
    )
  }

  handleMessage(data) {
    const el = document.getElementById("streaming-response")

    if (data.type === "token") {
      el.style.display = "block"
      el.textContent += data.content
    }

    if (data.type === "done") {
      // Finalize: replace the streaming div with a permanent message
      el.style.display = "none"
      const final = document.createElement("div")
      final.className = "message assistant"
      final.textContent = el.textContent
      el.parentNode.insertBefore(final, el)
      el.textContent = ""
    }
  }

  disconnect() {
    this.channel?.unsubscribe()
  }
}
Handling Errors Mid-Stream
Streams can fail. Wrap your OpenAI call:
begin
  client.chat(parameters: { ... })
rescue Faraday::TimeoutError, OpenAI::Error => e
  ActionCable.server.broadcast(
    "chat_stream_#{conversation_id}",
    { type: "error", content: "Stream interrupted. Please retry." }
  )
  message.update!(content: message.content + "\n\n[Stream interrupted]")
end
Performance Tips
Use Solid Queue or Sidekiq — the job holds a connection open for the entire stream duration (5-20 seconds). Make sure your worker pool can handle concurrent streams.
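If you're on Sidekiq, that means sizing concurrency to the number of simultaneous chats you expect. A sketch (the values here are illustrative, not a recommendation):

```yaml
# config/sidekiq.yml -- each streaming job occupies one thread for its
# full 5-20 second lifetime, so concurrency caps simultaneous chats
:concurrency: 10
:queues:
  - default
```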
Don't save every token individually. Batch database writes — update every 10 tokens or every 500ms instead of on every single chunk:
buffer = ""
token_count = 0

stream: proc do |chunk, _bytesize|
  delta = chunk.dig("choices", 0, "delta", "content")
  next unless delta

  buffer += delta
  token_count += 1

  # Broadcast every token for smooth UX
  ActionCable.server.broadcast(channel, { type: "token", content: delta })

  # But only save to DB every 10 tokens
  if token_count % 10 == 0
    message.update!(content: message.content + buffer)
    buffer = ""
  end
end

# After the stream ends, flush whatever remains in the buffer
message.update!(content: message.content + buffer) if buffer.present?
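The flush policy is easy to factor out and exercise in isolation. Here's a framework-free sketch: TokenBuffer is a hypothetical helper (not part of any gem), and the on_flush callback stands in for message.update!. It flushes when either the token count or the age threshold is hit.

```ruby
# Hypothetical helper: buffers tokens and flushes when either max_tokens
# have accumulated or max_age seconds have passed since the last flush.
class TokenBuffer
  def initialize(max_tokens: 10, max_age: 0.5, &on_flush)
    @max_tokens = max_tokens
    @max_age = max_age
    @on_flush = on_flush
    @buffer = +""
    @count = 0
    @last_flush = clock
  end

  def <<(delta)
    @buffer << delta
    @count += 1
    flush if @count >= @max_tokens || clock - @last_flush >= @max_age
  end

  # Call once more when the stream ends to write the tail
  def flush
    return if @buffer.empty?
    @on_flush.call(@buffer.dup)
    @buffer.clear
    @count = 0
    @last_flush = clock
  end

  private

  def clock
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

writes = []
buf = TokenBuffer.new(max_tokens: 3, max_age: 60) { |chunk| writes << chunk }
%w[a b c d e].each { |t| buf << t }
buf.flush # flush the tail when the stream ends
# writes is now ["abc", "de"]: one write per 3 tokens, plus the tail
```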
Set ActionCable adapter to Redis in production. The default async adapter only works in a single process.
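A minimal config/cable.yml for that (channel_prefix and the fallback Redis URL are placeholders; adjust to your app):

```yaml
production:
  adapter: redis
  url: <%= ENV.fetch("REDIS_URL") { "redis://localhost:6379/1" } %>
  channel_prefix: myapp_production
```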
What You Built
A full streaming pipeline: OpenAI SSE → Background Job → ActionCable → Browser DOM. Token-by-token updates. No page refreshes. No polling. Real-time AI responses that feel instant.
Next up: we'll explore LangChain.rb — a Ruby take on the popular LangChain framework, with chains, agents, and memory built in.