Session 1 - Messaging Patterns
pub/sub request/reply queue groups subjects
A connective technology for modern distributed systems. Simple, fast, and lightweight messaging.
messages per second (single server)
| Feature | NATS | Kafka | RabbitMQ |
|---|---|---|---|
| Latency | ~100us | ~5ms | ~1ms |
| Binary Size | 20MB | 100MB+ | 50MB |
| Memory | 10-50MB | 1GB+ | 128MB+ |
| Setup | Single binary | ZooKeeper | Erlang |
Messages are published to subjects, not queues or topics.
Use . as delimiter to create hierarchies:
company.department.action
orders.new
orders.fulfilled
orders.cancelled
payments.processed
payments.failed
Enables powerful wildcard subscriptions
* Single Tokenorders.*
orders.new OK
orders.fulfilled OK
orders.us.new NO
> Multiple Tokensorders.>
orders.new OK
orders.fulfilled OK
orders.us.east.new OK
> must be the last token
All subscribers receive every message (fan-out)
import asyncio
import nats
async def main():
nc = await nats.connect("nats://localhost:4222")
async def handler(msg):
print(f"Received: {msg.data.decode()}")
await nc.subscribe("orders.>", cb=handler)
await nc.publish("orders.new", b"Order #123")
await asyncio.sleep(1)
await nc.close()
asyncio.run(main())
Only ONE subscriber receives each message (load balancing)
async def worker(nc, worker_id):
async def handler(msg):
print(f"Worker {worker_id}: {msg.data.decode()}")
# Join queue group "workers"
await nc.subscribe("tasks", queue="workers", cb=handler)
# NATS automatically load-balances between workers
Synchronous RPC over async messaging
# Service
async def service(nc):
async def handler(msg):
response = f"Processed: {msg.data.decode()}"
await nc.publish(msg.reply, response.encode())
await nc.subscribe("api.process", cb=handler)
# Client
resp = await nc.request("api.process", b"data", timeout=1.0)
print(f"Got: {resp.data.decode()}")
Subject: orders.new (required)
Reply-To: _INBOX.xyz123 (optional)
Headers: {"X-Trace-Id": "abc"} (optional)
Payload: {"id": 123, ...} (bytes, max 1MB)
Need persistence? That is JetStream (Session 2)
nc = await nats.connect(
servers=["nats://localhost:4222"],
reconnect_time_wait=2,
max_reconnect_attempts=-1,
ping_interval=20,
)
nc.on_disconnect = lambda: print("Disconnected!")
nc.on_reconnect = lambda: print("Reconnected!")
await nc.drain() # Graceful shutdown
from nats.errors import TimeoutError, NoRespondersError
try:
resp = await nc.request("api.users", b"get", timeout=1.0)
except TimeoutError:
print("Service did not respond in time")
except NoRespondersError:
print("No service listening on this subject")
Mix and match: queue group of services handling requests
Subjects:
payments.initiated
payments.{provider}.result
payments.completed
payments.failed
Queue Groups:
payments.initiated queue="processors"
Request/Reply:
api.balance.check Balance Service
Try pub/sub, wildcards, queue groups, and request/reply live:
Launch DemoConnected to live NATS server at wss://i33ym.cc/ws/nats
| Concept | What It Does |
|---|---|
| Subject | Address for routing messages |
| Wildcards (* >) | Subscribe to multiple subjects |
| Pub/Sub | Fan-out to all subscribers |
| Queue Group | Load balance between workers |
| Request/Reply | Synchronous RPC pattern |
NATS Core Fundamentals - Session 1