This guide builds a working publish/subscribe system as a Sinatra application in Ruby 4.0. An HTTP client posts an order to POST /orders; the Sinatra app publishes it to a RabbitMQ fanout exchange; three background threads each consume a copy independently one sends a confirmation email, one reserves inventory, one records the sale in analytics.
Everything runs in one process: Puma serves HTTP requests while three subscriber threads run alongside it, all started at boot from config.ru.
The key insight of pub/sub: the publisher does not know how many subscribers exist or what they do. Each subscriber is fully independent one crashing does not affect the others.
Application structure
orders-app/ # root of the Sinatra application
│
├── Gemfile # gem dependencies (bunny, sinatra, puma, json)
├── Gemfile.lock # locked versions (generated by bundler)
│
├── config.ru # Rack entry point
│ # 1. requires app.rb and all subscribers
│ # 2. starts each subscriber in a Thread
│ # 3. hands control to Puma via `run OrdersApp`
│
├── app.rb # Sinatra application class (OrdersApp)
│ # POST /orders → publish to RabbitMQ, return 201
│ # GET /health → return 200 OK
│
└── lib/ # shared library code
├── connection.rb # RabbitMQ::channel creates a Bunny channel
├── order.rb # Order = Data.define(...) value object + #to_json
└── subscribers/ # one file per subscriber, each owns its queue
├── email_subscriber.rb # EmailSubscriber.start orders.email queue
├── inventory_subscriber.rb # InventorySubscriber.start orders.inventory queue
└── analytics_subscriber.rb # AnalyticsSubscriber.start orders.analytics queue
Architecture
graph LR
Client["HTTP client\nPOST /orders"]
Sinatra["Sinatra app\napp.rb"]
Exchange["orders\nfanout exchange"]
Q1["orders.email"]
Q2["orders.inventory"]
Q3["orders.analytics"]
T1["Thread: EmailSubscriber"]
T2["Thread: InventorySubscriber"]
T3["Thread: AnalyticsSubscriber"]
Client -->|"JSON body"| Sinatra
Sinatra -->|"publish"| Exchange
Exchange --> Q1
Exchange --> Q2
Exchange --> Q3
Q1 --> T1
Q2 --> T2
Q3 --> T3
One Puma process hosts both the HTTP server and all three subscriber threads. Each subscriber has its own Bunny channel channels are not thread-safe and must never be shared.
Project setup
1
2
3
4
5
6
7
8
9
10
# frozen_string_literal: true
source "https://rubygems.org"
ruby "~> 4.0"
gem "bunny", "~> 2.22"
gem "json"
gem "sinatra", "~> 4.0"
gem "puma", "~> 6.0"
1
2
3
mkdir orders-app && cd orders-app
mkdir -p lib/subscribers
bundle install
lib/order.rb
Ruby 4.0’s Data.define creates an immutable, comparable value object. Use it to represent the order payload throughout the codebase frozen at construction, no unexpected mutation in long-running threads.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# frozen_string_literal: true
require "json"
Order = Data.define(:order_id, :customer_email, :items, :total) do
def to_json(*)
{
order_id: order_id,
customer_email: customer_email,
items: items,
total: total
}.to_json
end
end
lib/connection.rb
All connection parameters come from environment variables with sensible localhost defaults for development. In production set these via your secrets manager never hardcode credentials.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# frozen_string_literal: true
require "bunny"
module RabbitMQ
def self.channel
connection = Bunny.new(
host: ENV.fetch("RABBITMQ_HOST", "localhost"),
port: ENV.fetch("RABBITMQ_PORT", 5672).to_i,
vhost: ENV.fetch("RABBITMQ_VHOST", "/"),
user: ENV.fetch("RABBITMQ_USER", "guest"),
password: ENV.fetch("RABBITMQ_PASSWORD", "guest")
)
connection.start
connection.create_channel
end
end
lib/subscribers/email_subscriber.rb
Each subscriber is a plain Ruby class with a self.start class method. start opens its own channel, declares and binds its queue, sets prefetch, then enters a blocking subscribe loop. Using block: false keeps the Bunny event loop non-blocking so the thread remains alive under Ruby’s scheduler.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# frozen_string_literal: true
require "json"
require_relative "../connection"
class EmailSubscriber
EXCHANGE_NAME = "orders"
QUEUE_NAME = "orders.email"
def self.start
channel = RabbitMQ.channel
exchange = channel.fanout(EXCHANGE_NAME, durable: true)
queue = channel.queue(QUEUE_NAME, durable: true)
queue.bind(exchange)
channel.prefetch(1)
puts "[Email] Subscriber started, waiting for orders..."
queue.subscribe(manual_ack: true, block: false) do |delivery_info, _props, body|
payload = JSON.parse(body, symbolize_names: true)
payload in { order_id: String => order_id, customer_email: String => email }
puts "[Email] Sending confirmation for #{order_id} to #{email}"
sleep(0.1) # simulate delivery latency
puts "[Email] Sent"
channel.ack(delivery_info.delivery_tag)
rescue NoMatchingPatternError => e
puts "[Email] Rejected malformed message: #{e.message}"
channel.nack(delivery_info.delivery_tag, false, false)
end
loop { sleep 5 } # keep thread alive
end
end
lib/subscribers/inventory_subscriber.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# frozen_string_literal: true
require "json"
require_relative "../connection"
class InventorySubscriber
EXCHANGE_NAME = "orders"
QUEUE_NAME = "orders.inventory"
def self.start
channel = RabbitMQ.channel
exchange = channel.fanout(EXCHANGE_NAME, durable: true)
queue = channel.queue(QUEUE_NAME, durable: true)
queue.bind(exchange)
channel.prefetch(1)
puts "[Inventory] Subscriber started, waiting for orders..."
queue.subscribe(manual_ack: true, block: false) do |delivery_info, _props, body|
payload = JSON.parse(body, symbolize_names: true)
payload in { order_id: String => order_id, items: Array => items }
items.each do |item|
item in { sku: String => sku, qty: Integer => qty }
puts "[Inventory] Reserving #{qty}x #{sku} for order #{order_id}"
end
channel.ack(delivery_info.delivery_tag)
rescue NoMatchingPatternError => e
puts "[Inventory] Rejected malformed message: #{e.message}"
channel.nack(delivery_info.delivery_tag, false, false)
end
loop { sleep 5 }
end
end
lib/subscribers/analytics_subscriber.rb
The union pattern (Integer | Float) accepts both numeric types for total a practical demonstration of Ruby 4.0 pattern matching in message validation.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# frozen_string_literal: true
require "json"
require_relative "../connection"
class AnalyticsSubscriber
EXCHANGE_NAME = "orders"
QUEUE_NAME = "orders.analytics"
def self.start
channel = RabbitMQ.channel
exchange = channel.fanout(EXCHANGE_NAME, durable: true)
queue = channel.queue(QUEUE_NAME, durable: true)
queue.bind(exchange)
channel.prefetch(1)
puts "[Analytics] Subscriber started, waiting for orders..."
queue.subscribe(manual_ack: true, block: false) do |delivery_info, _props, body|
payload = JSON.parse(body, symbolize_names: true)
payload in { order_id: String => order_id, total: (Integer | Float) => total }
puts "[Analytics] Recording sale #{order_id}: £#{format("%.2f", total)}"
channel.ack(delivery_info.delivery_tag)
rescue NoMatchingPatternError => e
puts "[Analytics] Rejected malformed message: #{e.message}"
channel.nack(delivery_info.delivery_tag, false, false)
end
loop { sleep 5 }
end
end
app.rb
The Sinatra application exposes two routes. POST /orders parses the JSON body, builds an Order, publishes it to the fanout exchange, and returns 201 Created with the order ID. GET /health lets load balancers and orchestrators check liveness.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# frozen_string_literal: true
require "sinatra/base"
require "json"
require_relative "lib/connection"
require_relative "lib/order"
class OrdersApp < Sinatra::Base
EXCHANGE_NAME = "orders"
before do
content_type :json
end
post "/orders" do
body_data = JSON.parse(request.body.read, symbolize_names: true)
order = Order.new(
order_id: body_data.fetch(:order_id, "ORD-#{SecureRandom.hex(4).upcase}"),
customer_email: body_data.fetch(:customer_email),
items: body_data.fetch(:items),
total: body_data.fetch(:total)
)
channel = RabbitMQ.channel
exchange = channel.fanout(EXCHANGE_NAME, durable: true)
exchange.publish(order.to_json, persistent: true, content_type: "application/json")
channel.connection.close
status 201
{ order_id: order.order_id, status: "published" }.to_json
rescue KeyError => e
status 422
{ error: "Missing field: #{e.message}" }.to_json
rescue JSON::ParserError
status 400
{ error: "Invalid JSON body" }.to_json
end
get "/health" do
status 200
{ status: "ok" }.to_json
end
end
A new RabbitMQ.channel (and connection) is created per request and closed after publishing. This keeps the HTTP path stateless and avoids channel lifecycle issues across Puma threads.
config.ru
config.ru is the single boot file. It starts each subscriber in a daemon thread before handing the Rack app to Puma. If a subscriber thread dies unexpectedly, the rescue block logs the error in production you would restart the thread or alert.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# frozen_string_literal: true
require_relative "app"
require_relative "lib/subscribers/email_subscriber"
require_relative "lib/subscribers/inventory_subscriber"
require_relative "lib/subscribers/analytics_subscriber"
[EmailSubscriber, InventorySubscriber, AnalyticsSubscriber].each do |subscriber|
Thread.new do
subscriber.start
rescue => e
warn "[#{subscriber}] Fatal error: #{e.message}"
warn e.backtrace.first(5).join("\n")
end
end
run OrdersApp
Running the application
1
bundle exec puma config.ru
Expected startup output:
[Email] Subscriber started, waiting for orders...
[Inventory] Subscriber started, waiting for orders...
[Analytics] Subscriber started, waiting for orders...
Puma starting in single mode...
* Puma version: 6.x.x
* Min threads: 0, max threads: 5
* Listening on http://0.0.0.0:9292
Publish an order via curl
1
2
3
4
5
6
7
curl -s -X POST http://localhost:9292/orders \
-H "Content-Type: application/json" \
-d '{
"customer_email": "[email protected]",
"items": [{"sku": "WIDGET-42", "qty": 2, "price": 19.99}],
"total": 39.98
}' | python3 -m json.tool
Expected HTTP response:
1
2
3
4
{
"order_id": "ORD-A3F2C1B4",
"status": "published"
}
Expected log output from the subscriber threads:
[Email] Sending confirmation for ORD-A3F2C1B4 to [email protected]
[Email] Sent
[Inventory] Reserving 2x WIDGET-42 for order ORD-A3F2C1B4
[Analytics] Recording sale ORD-A3F2C1B4: £39.98
Health check
1
2
curl -s http://localhost:9292/health
# {"status":"ok"}
What to avoid
Do not use block: true in subscriber threads. With block: true, Bunny’s internal event loop blocks the Ruby thread indefinitely and prevents the rescue wrapper in config.ru from ever catching a fatal error. Use block: false and keep the thread alive with loop { sleep 5 } instead this gives the thread scheduler room to breathe and lets exceptions propagate cleanly.
Do not share a channel across threads. Bunny channels are not thread-safe. Each subscriber must open its own channel via RabbitMQ.channel. Sharing a single channel between the HTTP request handler and a subscriber thread will cause unpredictable delivery failures and connection resets.
Do not use auto_ack: true in production. With automatic acknowledgement, RabbitMQ marks the message delivered the moment it is dispatched. If a subscriber thread crashes mid-processing, the message is lost permanently. Always use manual_ack: true and call channel.ack only after successful processing.
Do not open a persistent channel in app.rb. The POST /orders handler creates a new connection per request and closes it immediately after publishing. Opening a long-lived channel shared across Puma threads would require explicit locking and dramatically complicate error handling. The per-request connection cost is negligible for typical order throughput.
Do not forget frozen_string_literal: true in long-running subscriber files. Without it, every string literal in the subscribe callback allocates a new object on each message delivery. Over millions of messages this creates measurable GC pressure. The magic comment eliminates this at zero cost.