Pub/Sub with Ruby

This guide builds a working publish/subscribe system as a Sinatra application in Ruby 4.0. An HTTP client posts an order to POST /orders; the Sinatra app publishes it to a RabbitMQ fanout exchange; three background threads each consume their own copy independently: one sends a confirmation email, one reserves inventory, and one records the sale in analytics.

Everything runs in one process: Puma serves HTTP requests while three subscriber threads run alongside it, all started at boot from config.ru.

The key insight of pub/sub: the publisher does not know how many subscribers exist or what they do. Each subscriber is fully independent, so one crashing does not affect the others.
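The fanout idea can be tried without a broker. The following is a minimal in-memory sketch, not RabbitMQ: FanoutExchange, bind, and close are hypothetical names invented for this illustration, and Ruby's built-in Queue stands in for a broker queue. The publisher only calls publish and never learns how many queues are bound.

```ruby
# In-memory stand-in for a fanout exchange (illustrative only, no RabbitMQ involved).
class FanoutExchange
  def initialize
    @queues = []
  end

  # Each subscriber binds its own queue, so it receives its own copy of every message.
  def bind
    queue = Queue.new
    @queues << queue
    queue
  end

  # The publisher does not know (or care) how many queues are bound.
  def publish(message)
    @queues.each { |queue| queue << message.dup }
  end

  # Closing the queues lets consumers drain what is left and then stop.
  def close
    @queues.each(&:close)
  end
end

exchange = FanoutExchange.new
received = Hash.new { |hash, key| hash[key] = [] }
lock     = Mutex.new

workers = %i[email inventory analytics].map do |name|
  queue = exchange.bind
  Thread.new do
    # Queue#pop returns nil once the queue is closed and empty.
    while (message = queue.pop)
      lock.synchronize { received[name] << message }
    end
  end
end

exchange.publish("ORD-1")
exchange.publish("ORD-2")
exchange.close
workers.each(&:join)

received.each { |name, messages| puts "#{name}: #{messages.inspect}" }
```

Every subscriber ends up with both messages, and adding a fourth subscriber would require no change to the publishing code.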

Application structure

orders-app/                        # root of the Sinatra application
│
├── Gemfile                        # gem dependencies (bunny, sinatra, puma, json)
├── Gemfile.lock                   # locked versions (generated by bundler)
│
├── config.ru                      # Rack entry point
│                                  #   1. requires app.rb and all subscribers
│                                  #   2. starts each subscriber in a Thread
│                                  #   3. hands control to Puma via `run OrdersApp`
│
├── app.rb                         # Sinatra application class (OrdersApp)
│                                  #   POST /orders  → publish to RabbitMQ, return 201
│                                  #   GET  /health  → return 200 OK
│
└── lib/                           # shared library code
    ├── connection.rb              # RabbitMQ.channel → creates a Bunny channel
    ├── order.rb                   # Order = Data.define(...) → value object + #to_json
    └── subscribers/               # one file per subscriber, each owns its queue
        ├── email_subscriber.rb    # EmailSubscriber.start → orders.email queue
        ├── inventory_subscriber.rb # InventorySubscriber.start → orders.inventory queue
        └── analytics_subscriber.rb # AnalyticsSubscriber.start → orders.analytics queue

Architecture

graph LR
  Client["HTTP client\nPOST /orders"]
  Sinatra["Sinatra app\napp.rb"]
  Exchange["orders\nfanout exchange"]
  Q1["orders.email"]
  Q2["orders.inventory"]
  Q3["orders.analytics"]
  T1["Thread: EmailSubscriber"]
  T2["Thread: InventorySubscriber"]
  T3["Thread: AnalyticsSubscriber"]

  Client -->|"JSON body"| Sinatra
  Sinatra -->|"publish"| Exchange
  Exchange --> Q1
  Exchange --> Q2
  Exchange --> Q3
  Q1 --> T1
  Q2 --> T2
  Q3 --> T3

One Puma process hosts both the HTTP server and all three subscriber threads. Each subscriber has its own Bunny channel, because channels are not thread-safe and must never be shared.

Project setup

# frozen_string_literal: true

source "https://rubygems.org"

ruby "~> 4.0"

gem "bunny",   "~> 2.22"
gem "json"
gem "sinatra", "~> 4.0"
gem "puma",    "~> 6.0"

Create the project directories, place the Gemfile above in orders-app/, and install the dependencies:

mkdir orders-app && cd orders-app
mkdir -p lib/subscribers
bundle install

lib/order.rb

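Data.define (available since Ruby 3.2) creates an immutable value object with value-based equality. Use it to represent the order payload throughout the codebase: the instance is frozen at construction, so long-running threads cannot mutate it unexpectedly. The freeze is shallow, so the items array itself stays mutable unless it is frozen separately.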

# frozen_string_literal: true

require "json"

Order = Data.define(:order_id, :customer_email, :items, :total) do
  def to_json(*)
    {
      order_id: order_id,
      customer_email: customer_email,
      items: items,
      total: total
    }.to_json
  end
end
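A short usage sketch may help show what the value object provides. The Order below is redefined only so the snippet runs on its own (its to_json delegates to to_h, which should produce the same JSON as the explicit hash); the sample values are made up for illustration.

```ruby
require "json"

# Equivalent to the Order in lib/order.rb, repeated so this snippet is self-contained.
Order = Data.define(:order_id, :customer_email, :items, :total) do
  def to_json(*args)
    to_h.to_json(*args)
  end
end

order = Order.new(
  order_id: "ORD-1",
  customer_email: "customer@example.com",
  items: [{ sku: "WIDGET-42", qty: 2 }],
  total: 39.98
)

# Instances are frozen and no setters are generated.
puts order.frozen?
puts order.respond_to?(:total=)

# Equality is by value, not identity.
puts order == Order.new(**order.to_h)

# #with returns a modified copy and leaves the original untouched.
discounted = order.with(total: 35.0)
puts [order.total, discounted.total].inspect

# A missing field fails at construction time with ArgumentError.
begin
  Order.new(order_id: "ORD-2")
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end

# Round trip through JSON, as a subscriber would see the payload.
parsed = JSON.parse(order.to_json, symbolize_names: true)
puts parsed[:items].first[:sku]
```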

lib/connection.rb

All connection parameters come from environment variables, with sensible localhost defaults for development. In production, set these via your secrets manager; never hardcode credentials.

# frozen_string_literal: true

require "bunny"

module RabbitMQ
  def self.channel
    connection = Bunny.new(
      host:     ENV.fetch("RABBITMQ_HOST",     "localhost"),
      port:     ENV.fetch("RABBITMQ_PORT",     5672).to_i,
      vhost:    ENV.fetch("RABBITMQ_VHOST",    "/"),
      user:     ENV.fetch("RABBITMQ_USER",     "guest"),
      password: ENV.fetch("RABBITMQ_PASSWORD", "guest")
    )
    connection.start
    connection.create_channel
  end
end
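The environment lookup can be checked in isolation. This sketch assumes a hypothetical helper, rabbitmq_options, that builds the same option hash from any Hash-like environment, so the defaults can be verified without touching the real ENV or opening a connection. It uses Integer() rather than to_i as one possible hardening; the hostname mq.internal is a placeholder.

```ruby
# Hypothetical helper (not part of the guide's code): builds the option hash
# from any Hash-like environment object.
def rabbitmq_options(env = ENV)
  {
    host:     env.fetch("RABBITMQ_HOST", "localhost"),
    # Integer() raises on "abc", whereas "abc".to_i would silently yield 0.
    port:     Integer(env.fetch("RABBITMQ_PORT", "5672")),
    vhost:    env.fetch("RABBITMQ_VHOST", "/"),
    user:     env.fetch("RABBITMQ_USER", "guest"),
    password: env.fetch("RABBITMQ_PASSWORD", "guest")
  }
end

# With an empty environment every value falls back to its development default.
defaults = rabbitmq_options({})
puts defaults.inspect

# Environment values arrive as strings; the port is converted to an Integer.
overridden = rabbitmq_options({ "RABBITMQ_HOST" => "mq.internal", "RABBITMQ_PORT" => "5671" })
puts overridden.values_at(:host, :port).inspect
```

The resulting hash could be passed to Bunny.new in place of the inline keyword arguments.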

lib/subscribers/email_subscriber.rb

Each subscriber is a plain Ruby class with a self.start class method. start opens its own channel, declares and binds its queue, sets prefetch, then registers a consumer with subscribe. With block: false, subscribe returns immediately and Bunny runs the delivery block on the channel's own worker thread; the loop { sleep 5 } at the end of start keeps the subscriber thread parked for the lifetime of the process.

# frozen_string_literal: true

require "json"
require_relative "../connection"

class EmailSubscriber
  EXCHANGE_NAME = "orders"
  QUEUE_NAME    = "orders.email"

  def self.start
    channel  = RabbitMQ.channel
    exchange = channel.fanout(EXCHANGE_NAME, durable: true)
    queue    = channel.queue(QUEUE_NAME, durable: true)
    queue.bind(exchange)
    channel.prefetch(1)

    puts "[Email] Subscriber started, waiting for orders..."

    queue.subscribe(manual_ack: true, block: false) do |delivery_info, _props, body|
      payload = JSON.parse(body, symbolize_names: true)

      # `=>` raises NoMatchingPatternError on a mismatch; one-line `in` would only return false.
      payload => { order_id: String => order_id, customer_email: String => email }

      puts "[Email] Sending confirmation for #{order_id} to #{email}"
      sleep(0.1) # simulate delivery latency
      puts "[Email] Sent"

      channel.ack(delivery_info.delivery_tag)
    rescue JSON::ParserError, NoMatchingPatternError => e
      puts "[Email] Rejected malformed message: #{e.message}"
      # nack(tag, multiple = false, requeue = false): discard instead of redelivering forever
      channel.nack(delivery_info.delivery_tag, false, false)
    end

    loop { sleep 5 } # keep thread alive
  end
end
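Validation by pattern matching depends on which one-line form is used, and the two behave differently on a mismatch. The sketch below illustrates this with a hypothetical helper, extract_email_fields, and made-up payloads; it does not talk to RabbitMQ.

```ruby
payload = { order_id: "ORD-1" } # customer_email is missing

# `in` is a predicate: it returns false on a mismatch and raises nothing.
matched = (payload in { order_id: String, customer_email: String })
puts matched

# Hypothetical helper: `=>` raises NoMatchingPatternError (or a subclass) on a
# mismatch, which is what a `rescue NoMatchingPatternError` clause relies on.
def extract_email_fields(payload)
  payload => { order_id: String => order_id, customer_email: String => email }
  [order_id, email]
end

begin
  extract_email_fields(payload)
rescue NoMatchingPatternError => e
  puts "rejected: #{e.class}"
end

# On a match, the captured values are bound to local variables.
valid = { order_id: "ORD-2", customer_email: "customer@example.com" }
puts extract_email_fields(valid).inspect
```

A message handler that must reject malformed payloads therefore needs the raising form, or an explicit check of the boolean returned by in.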

lib/subscribers/inventory_subscriber.rb

# frozen_string_literal: true

require "json"
require_relative "../connection"

class InventorySubscriber
  EXCHANGE_NAME = "orders"
  QUEUE_NAME    = "orders.inventory"

  def self.start
    channel  = RabbitMQ.channel
    exchange = channel.fanout(EXCHANGE_NAME, durable: true)
    queue    = channel.queue(QUEUE_NAME, durable: true)
    queue.bind(exchange)
    channel.prefetch(1)

    puts "[Inventory] Subscriber started, waiting for orders..."

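    queue.subscribe(manual_ack: true, block: false) do |delivery_info, _props, body|
      payload = JSON.parse(body, symbolize_names: true)

      # `=>` raises NoMatchingPatternError on a mismatch; one-line `in` would only return false.
      payload => { order_id: String => order_id, items: Array => items }

      items.each do |item|
        # Each line item is validated too; a bad item rejects the whole message.
        item => { sku: String => sku, qty: Integer => qty }
        puts "[Inventory] Reserving #{qty}x #{sku} for order #{order_id}"
      end

      channel.ack(delivery_info.delivery_tag)
    rescue JSON::ParserError, NoMatchingPatternError => e
      puts "[Inventory] Rejected malformed message: #{e.message}"
      # nack(tag, multiple = false, requeue = false): discard instead of redelivering forever
      channel.nack(delivery_info.delivery_tag, false, false)
    end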

    loop { sleep 5 }
  end
end

lib/subscribers/analytics_subscriber.rb

The alternative pattern (Integer | Float) accepts both numeric types for total, because JSON.parse returns an Integer for 40 and a Float for 39.98. It is a practical use of Ruby's pattern matching for message validation.

# frozen_string_literal: true

require "json"
require_relative "../connection"

class AnalyticsSubscriber
  EXCHANGE_NAME = "orders"
  QUEUE_NAME    = "orders.analytics"

  def self.start
    channel  = RabbitMQ.channel
    exchange = channel.fanout(EXCHANGE_NAME, durable: true)
    queue    = channel.queue(QUEUE_NAME, durable: true)
    queue.bind(exchange)
    channel.prefetch(1)

    puts "[Analytics] Subscriber started, waiting for orders..."

    queue.subscribe(manual_ack: true, block: false) do |delivery_info, _props, body|
      payload = JSON.parse(body, symbolize_names: true)

      # `=>` raises NoMatchingPatternError on a mismatch; one-line `in` would only return false.
      payload => { order_id: String => order_id, total: (Integer | Float) => total }

      puts "[Analytics] Recording sale #{order_id}: £#{format("%.2f", total)}"

      channel.ack(delivery_info.delivery_tag)
    rescue JSON::ParserError, NoMatchingPatternError => e
      puts "[Analytics] Rejected malformed message: #{e.message}"
      # nack(tag, multiple = false, requeue = false): discard instead of redelivering forever
      channel.nack(delivery_info.delivery_tag, false, false)
    end

    loop { sleep 5 }
  end
end
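To see what the (Integer | Float) alternative accepts and rejects, here is a small sketch using a hypothetical helper, formatted_total, that returns the formatted amount or nil. The JSON strings are invented sample payloads.

```ruby
require "json"

# Hypothetical helper mirroring the analytics match: returns the formatted
# total, or nil when the payload does not have the expected shape.
def formatted_total(payload)
  case payload
  in { order_id: String, total: (Integer | Float) => total }
    format("%.2f", total)
  else
    nil
  end
end

# JSON numbers without a fraction parse as Integer, with a fraction as Float.
whole      = JSON.parse('{"order_id":"ORD-1","total":40}', symbolize_names: true)
fractional = JSON.parse('{"order_id":"ORD-2","total":39.98}', symbolize_names: true)
quoted     = JSON.parse('{"order_id":"ORD-3","total":"39.98"}', symbolize_names: true)

puts formatted_total(whole).inspect
puts formatted_total(fractional).inspect
puts formatted_total(quoted).inspect # a numeric string is not accepted
```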

app.rb

The Sinatra application exposes two routes. POST /orders parses the JSON body, builds an Order, publishes it to the fanout exchange, and returns 201 Created with the order ID. GET /health lets load balancers and orchestrators check liveness.

# frozen_string_literal: true

require "sinatra/base"
require "json"
require "securerandom" # SecureRandom.hex is used to generate order IDs
require_relative "lib/connection"
require_relative "lib/order"

class OrdersApp < Sinatra::Base
  EXCHANGE_NAME = "orders"

  before do
    content_type :json
  end

  post "/orders" do
    body_data = JSON.parse(request.body.read, symbolize_names: true)

    order = Order.new(
      order_id:       body_data.fetch(:order_id, "ORD-#{SecureRandom.hex(4).upcase}"),
      customer_email: body_data.fetch(:customer_email),
      items:          body_data.fetch(:items),
      total:          body_data.fetch(:total)
    )

    channel = RabbitMQ.channel
    begin
      exchange = channel.fanout(EXCHANGE_NAME, durable: true)
      exchange.publish(order.to_json, persistent: true, content_type: "application/json")
    ensure
      channel.connection.close # always release the per-request connection, even if publish fails
    end

    status 201
    { order_id: order.order_id, status: "published" }.to_json
  rescue KeyError => e
    status 422
    { error: "Missing field: #{e.message}" }.to_json
  rescue JSON::ParserError
    status 400
    { error: "Invalid JSON body" }.to_json
  end

  get "/health" do
    status 200
    { status: "ok" }.to_json
  end
end

A new RabbitMQ.channel (and connection) is created per request and closed after publishing. This keeps the HTTP path stateless and avoids channel lifecycle issues across Puma threads.
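The parsing and error mapping of the POST /orders route can be exercised without Sinatra or RabbitMQ. The sketch below assumes a hypothetical helper, parse_order, that returns a status and a payload using the same mapping as the route (400 for invalid JSON, 422 for a missing field). As an addition that the route itself does not make, it also treats a non-object JSON body as a 400.

```ruby
require "json"
require "securerandom"

# Redefined here so the snippet runs on its own.
Order = Data.define(:order_id, :customer_email, :items, :total)

# Hypothetical helper: turns a raw request body into [status, payload].
def parse_order(raw_body)
  data = JSON.parse(raw_body, symbolize_names: true)
  # Assumption beyond the route: a JSON array or scalar is rejected as a bad request.
  raise JSON::ParserError, "expected a JSON object" unless data.is_a?(Hash)

  order = Order.new(
    # The block form only generates an ID when the client did not send one.
    order_id:       data.fetch(:order_id) { "ORD-#{SecureRandom.hex(4).upcase}" },
    customer_email: data.fetch(:customer_email),
    items:          data.fetch(:items),
    total:          data.fetch(:total)
  )
  [201, order]
rescue KeyError => e
  [422, { error: "Missing field: #{e.key}" }]
rescue JSON::ParserError
  [400, { error: "Invalid JSON body" }]
end

status, order = parse_order('{"customer_email":"customer@example.com","items":[],"total":1}')
puts "#{status} #{order.order_id}"

status, body = parse_order('{"items":[],"total":1}')
puts "#{status} #{body[:error]}"

status, body = parse_order("{oops")
puts "#{status} #{body[:error]}"
```

Keeping this logic in a plain method is one way to unit-test the validation separately from the publish step.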

config.ru

config.ru is the single boot file. It starts each subscriber in a background thread before handing the Rack app to Puma. If a subscriber's start method raises (for example, because RabbitMQ is unreachable), the rescue block logs the error; in production you would restart the thread or raise an alert.

# frozen_string_literal: true

require_relative "app"
require_relative "lib/subscribers/email_subscriber"
require_relative "lib/subscribers/inventory_subscriber"
require_relative "lib/subscribers/analytics_subscriber"

[EmailSubscriber, InventorySubscriber, AnalyticsSubscriber].each do |subscriber|
  Thread.new do
    subscriber.start
  rescue => e
    warn "[#{subscriber}] Fatal error: #{e.message}"
    warn e.backtrace.first(5).join("\n")
  end
end

run OrdersApp
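Restarting a failed subscriber could look roughly like the following. This is a minimal sketch under stated assumptions: supervise, max_restarts, and base_delay are hypothetical names, the retry policy (a linearly growing delay and a fixed restart limit) is an arbitrary choice, and the demo job simply fails twice before succeeding.

```ruby
# Hypothetical supervisor: reruns the job when it raises, up to max_restarts
# times, sleeping a little longer before each retry.
def supervise(name, max_restarts: 3, base_delay: 0.5, &job)
  Thread.new do
    attempts = 0
    begin
      attempts += 1
      job.call
    rescue StandardError => e
      warn "[#{name}] crashed (attempt #{attempts}): #{e.message}"
      if attempts <= max_restarts
        sleep(base_delay * attempts)
        retry # re-enters the begin block and calls the job again
      end
      warn "[#{name}] giving up after #{attempts} attempts"
      nil
    end
  end
end

# Demo job: fails twice, then succeeds on the third attempt.
calls = 0
thread = supervise("Demo", base_delay: 0.01) do
  calls += 1
  raise "connection lost" if calls < 3
  :ok
end
thread.join
puts "job ran #{calls} times"
```

In config.ru, the Thread.new block could then be replaced by a call such as supervise(subscriber.name) { subscriber.start }, with alerting added where the sketch gives up.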

Running the application

bundle exec puma config.ru

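Expected startup output (the subscribers start on separate threads, so their lines may appear in a different order or interleaved with Puma's banner):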

[Email] Subscriber started, waiting for orders...
[Inventory] Subscriber started, waiting for orders...
[Analytics] Subscriber started, waiting for orders...
Puma starting in single mode...
* Puma version: 6.x.x
* Min threads: 0, max threads: 5
* Listening on http://0.0.0.0:9292

Publish an order via curl

curl -s -X POST http://localhost:9292/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customer_email": "customer@example.com",
    "items": [{"sku": "WIDGET-42", "qty": 2, "price": 19.99}],
    "total": 39.98
  }' | python3 -m json.tool

Expected HTTP response (the order_id is generated randomly, so yours will differ):

{
  "order_id": "ORD-A3F2C1B4",
  "status": "published"
}

Expected log output from the subscriber threads (the order of lines across subscribers may vary):

[Email] Sending confirmation for ORD-A3F2C1B4 to customer@example.com
[Email] Sent
[Inventory] Reserving 2x WIDGET-42 for order ORD-A3F2C1B4
[Analytics] Recording sale ORD-A3F2C1B4: £39.98

Health check

curl -s http://localhost:9292/health
# {"status":"ok"}

What to avoid

Do not use block: true in subscriber threads. With block: true, subscribe does not return until the consumer is cancelled, so the calling thread stays inside Bunny and never reaches the code after subscribe. Use block: false and keep the thread alive with loop { sleep 5 } instead. In either mode, Bunny runs the delivery block on its own worker thread, so the rescue wrapper in config.ru only sees errors raised while start is setting up (for example, a refused connection); handle per-message errors inside the delivery block.

Do not share a channel across threads. Bunny channels are not thread-safe. Each subscriber must open its own channel via RabbitMQ.channel. Sharing a single channel between the HTTP request handler and a subscriber thread will cause unpredictable delivery failures and connection resets.

Do not use automatic acknowledgement in production (manual_ack: false, which is Bunny's default for subscribe). With automatic acknowledgement, RabbitMQ considers the message delivered the moment it is dispatched. If a subscriber crashes mid-processing, the message is lost permanently. Always pass manual_ack: true and call channel.ack only after successful processing.

Do not open a persistent channel in app.rb. The POST /orders handler creates a new connection per request and closes it immediately after publishing. Opening a long-lived channel shared across Puma threads would require explicit locking and dramatically complicate error handling. The per-request connection cost is negligible for typical order throughput.

Do not forget frozen_string_literal: true in long-running subscriber files. Without it, each evaluation of a plain string literal in the subscribe callback allocates a new String, which adds GC pressure over millions of messages. The magic comment removes those allocations for plain literals; interpolated strings, such as the log lines, are still built fresh on every delivery.