Streaming AI Responses in Rails — ActionCable + Turbo + OpenAI Streaming


This is post #20 in the Ruby for AI series. Last time we built AI agents with tool use. Today we're solving a UX problem: nobody wants to stare at a spinner for 15 seconds while GPT thinks. We're building real-time streaming from OpenAI through ActionCable and Turbo Streams — token by token, as they arrive.

The Problem

Standard HTTP request-response doesn't work for AI. The model generates tokens over 5-20 seconds. Without streaming, your user sees nothing until the entire response is ready. That's unacceptable.

The fix: stream tokens from OpenAI → your Rails server → the browser, in real time.

Architecture

Here's the flow:

OpenAI API (SSE) → Rails Background Job → ActionCable → Turbo Stream → Browser DOM

The user submits a prompt. A background job calls OpenAI with stream: true. Each chunk gets broadcast via ActionCable, and a small Stimulus controller appends it to the DOM. No page reloads, no polling.

Setting Up the Channel

Generate an ActionCable channel for chat streaming:

rails generate channel ChatStream

Configure the channel to stream from a conversation-specific key:

# app/channels/chat_stream_channel.rb
class ChatStreamChannel < ApplicationCable::Channel
  def subscribed
    stream_from "chat_stream_#{params[:conversation_id]}"
  end

  def unsubscribed
    # Cleanup if needed
  end
end
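The same `"chat_stream_#{conversation_id}"` string also has to appear wherever you broadcast. A tiny helper module (the name `ChatStreamKey` is illustrative, not part of Rails) keeps the channel and the job from drifting apart:

```ruby
# app/models/concerns/chat_stream_key.rb (path and name are illustrative)
# Centralizes the ActionCable stream key so the channel's stream_from
# and the job's broadcast always agree on the same string.
module ChatStreamKey
  def self.for(conversation_id)
    "chat_stream_#{conversation_id}"
  end
end
```

Then the channel becomes `stream_from ChatStreamKey.for(params[:conversation_id])`, and broadcasts use the same call.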

The Streaming Job

This is where the magic happens. We call OpenAI with streaming enabled and broadcast each chunk:

# app/jobs/stream_ai_response_job.rb
class StreamAiResponseJob < ApplicationJob
  queue_as :default

  def perform(conversation_id, prompt)
    client = OpenAI::Client.new(access_token: ENV["OPENAI_API_KEY"])
    message = Message.create!(
      conversation_id: conversation_id,
      role: "assistant",
      content: ""
    )

    client.chat(
      parameters: {
        model: "gpt-4o",
        messages: [{ role: "user", content: prompt }],
        stream: proc do |chunk, _bytesize|
          delta = chunk.dig("choices", 0, "delta", "content")
          next unless delta

          # Append to the database record
          message.update!(content: message.content + delta)

          # Broadcast the new token via ActionCable
          ActionCable.server.broadcast(
            "chat_stream_#{conversation_id}",
            {
              type: "token",
              content: delta,
              message_id: message.id
            }
          )
        end
      }
    )

    # Signal completion
    ActionCable.server.broadcast(
      "chat_stream_#{conversation_id}",
      { type: "done", message_id: message.id }
    )
  end
end

Each token arrives as a Server-Sent Event from OpenAI. We grab the delta content, save it, and push it to the browser immediately.
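For reference, each streamed chunk is a parsed hash shaped roughly like the sketch below (abridged; exact fields vary by model and API version), which is why `dig` is the right tool: it returns nil safely when a key is missing, such as on the final chunk where the delta is empty.

```ruby
# A representative (abridged) streaming chunk from the Chat Completions API.
chunk = {
  "id" => "chatcmpl-abc123",
  "object" => "chat.completion.chunk",
  "choices" => [
    { "index" => 0, "delta" => { "content" => "Hel" }, "finish_reason" => nil }
  ]
}

# dig walks the nested structure and returns nil instead of raising
# when any intermediate key is absent.
delta = chunk.dig("choices", 0, "delta", "content")
```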

The Controller

The controller kicks off the job and returns instantly:

# app/controllers/messages_controller.rb
class MessagesController < ApplicationController
  def create
    conversation = Conversation.find(params[:conversation_id])

    # Save user message
    Message.create!(
      conversation: conversation,
      role: "user",
      content: params[:content]
    )

    # Start streaming in background
    StreamAiResponseJob.perform_later(conversation.id, params[:content])

    head :ok
  end
end

No waiting. The response streams in asynchronously.
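This assumes a nested route so that `params[:conversation_id]` is available in the controller, along the lines of:

```ruby
# config/routes.rb
Rails.application.routes.draw do
  resources :conversations, only: [:show] do
    resources :messages, only: [:create]
  end
end
```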

The Frontend: Stimulus + ActionCable

Turbo Streams work well for broadcasting whole rendered partials, but for appending individual tokens it's simpler to subscribe a small Stimulus controller directly to the channel:

# app/views/conversations/show.html.erb
<div id="messages" data-controller="chat-stream"
     data-chat-stream-conversation-id-value="<%= @conversation.id %>">
  <% @conversation.messages.each do |msg| %>
    <div class="message <%= msg.role %>">
      <%= msg.content %>
    </div>
  <% end %>

  <div id="streaming-response" class="message assistant" style="display: none;">
  </div>
</div>
The Stimulus controller subscribes to the channel and handles each message:
// app/javascript/controllers/chat_stream_controller.js
import { Controller } from "@hotwired/stimulus"
import { createConsumer } from "@rails/actioncable"

export default class extends Controller {
  static values = { conversationId: Number }

  connect() {
    this.channel = createConsumer().subscriptions.create(
      { channel: "ChatStreamChannel", conversation_id: this.conversationIdValue },
      {
        received: (data) => this.handleMessage(data)
      }
    )
  }

  handleMessage(data) {
    const el = document.getElementById("streaming-response")

    if (data.type === "token") {
      el.style.display = "block"
      el.textContent += data.content
    }

    if (data.type === "done") {
      // Finalize: replace streaming div with permanent message
      el.style.display = "none"
      const final = document.createElement("div")
      final.className = "message assistant"
      final.textContent = el.textContent
      el.parentNode.insertBefore(final, el)
      el.textContent = ""
    }
  }

  disconnect() {
    this.channel?.unsubscribe()
  }
}

Handling Errors Mid-Stream

Streams can fail. Wrap your OpenAI call:

begin
  client.chat(parameters: { ... })
rescue Faraday::TimeoutError, OpenAI::Error => e
  ActionCable.server.broadcast(
    "chat_stream_#{conversation_id}",
    { type: "error", content: "Stream interrupted. Please retry." }
  )
  message.update!(content: message.content + "\n\n[Stream interrupted]")
end

Performance Tips

Use Solid Queue or Sidekiq — the job holds a connection open for the entire stream duration (5-20 seconds). Make sure your worker pool can handle concurrent streams.
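As a rough sizing sketch with Sidekiq, that means setting concurrency to at least the number of simultaneous streams you expect (the values here are illustrative):

```yaml
# config/sidekiq.yml
:concurrency: 10   # each in-flight stream occupies one worker thread for its full duration
:queues:
  - default
```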

Don't save every token individually. Batch database writes — update every 10 tokens or every 500ms instead of on every single chunk:

buffer = ""
token_count = 0

stream: proc do |chunk, _bytesize|
  delta = chunk.dig("choices", 0, "delta", "content")
  next unless delta

  buffer += delta
  token_count += 1

  # Broadcast every token for smooth UX
  ActionCable.server.broadcast(channel, { type: "token", content: delta })

  # But only save to DB every 10 tokens
  if token_count % 10 == 0
    message.update!(content: message.content + buffer)
    buffer = ""
  end
end

# Flush remaining buffer
message.update!(content: message.content + buffer) if buffer.present?
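The time-based variant (flush every ~500ms) can be sketched as a small pure-Ruby buffer. `TokenBuffer` is a hypothetical name, not a library class; the clock is injectable so the behavior is testable:

```ruby
# Accumulates streamed deltas and releases them at most once per
# `interval` seconds, so database writes happen ~2x/second instead
# of on every chunk.
class TokenBuffer
  def initialize(interval: 0.5, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @interval = interval
    @clock = clock
    @buffer = +""
    @last_flush = clock.call
  end

  def <<(delta)
    @buffer << delta
    self
  end

  # Returns the buffered text (and resets) once the interval has
  # elapsed since the last flush; otherwise returns nil.
  def flush_if_due
    now = @clock.call
    return nil if now - @last_flush < @interval
    @last_flush = now
    drain
  end

  # Always returns whatever is buffered — use for the final flush
  # after the stream ends.
  def drain
    out = @buffer
    @buffer = +""
    out
  end
end
```

Inside the `stream:` proc you'd call `buffer << delta` after broadcasting, then `message.update!(content: message.content + text) if (text = buffer.flush_if_due)`, with one `drain` after `client.chat` returns.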

Set ActionCable adapter to Redis in production. The default async adapter only works in a single process.
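A minimal production config (the `channel_prefix` value is illustrative):

```yaml
# config/cable.yml
production:
  adapter: redis
  url: <%= ENV.fetch("REDIS_URL", "redis://localhost:6379/1") %>
  channel_prefix: myapp_production
```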

What You Built

A full streaming pipeline: OpenAI SSE → Background Job → ActionCable → Browser DOM. Token-by-token updates. No page refreshes. No polling. Real-time AI responses that feel instant.

Next up: we'll explore LangChain.rb — Ruby's port of the popular LangChain framework, with chains, agents, and memory built in.
