Persistence and Streaming
Session 2 - NATS Workshop
streams, consumers, persistence, at-least-once
| Feature | Core NATS |
|---|---|
| Delivery | At-most-once |
| Persistence | None - fire and forget |
| Replay | Not possible |
| Acknowledgments | None |
What if we need guarantees?
Built-in persistence layer for NATS
| Feature | Core NATS | JetStream |
|---|---|---|
| Delivery | At-most-once | At-least-once / Exactly-once |
| Persistence | None | Memory or File |
| Replay | Not possible | From any point |
| Acks | None | Full ack protocol |
| Speed | ~14M msg/s | ~1M msg/s |
A stream captures messages published to subjects
Messages get a sequence number and timestamp
import nats
from nats.js.api import StreamConfig
nc = await nats.connect()
js = nc.jetstream()
# Create stream
await js.add_stream(
    name="ORDERS",
    subjects=["orders.>"],
    storage="file",  # or "memory"
    max_msgs=100000,
    max_age=86400,  # 24 hours in seconds
)
| Option | Purpose |
|---|---|
| subjects | Which subjects to capture |
| storage | file (durable) or memory (fast) |
| max_msgs | Maximum messages to keep |
| max_age | TTL for messages |
| max_bytes | Maximum storage size |
| replicas | Replication factor (clustering) |
from nats.js.api import RetentionPolicy
await js.add_stream(
    name="TASKS",
    subjects=["tasks.>"],
    retention=RetentionPolicy.WORK_QUEUE,
)
# Simple publish - returns ack with sequence number
ack = await js.publish("orders.new", b'{"id": 123}')
print(f"Published: seq={ack.seq}")
# Publish with message ID for deduplication
ack = await js.publish(
    "orders.new",
    b'{"id": 123}',
    headers={"Nats-Msg-Id": "order-123"}
)
# If same Nats-Msg-Id sent again, server returns
# existing sequence (no duplicate stored)
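A quick way to see deduplication in action (a sketch; assumes the js context above and a running server with the ORDERS stream created):

```python
# A retry with the same Nats-Msg-Id returns the original sequence
# and stores nothing new.
hdrs = {"Nats-Msg-Id": "order-123"}
ack1 = await js.publish("orders.new", b'{"id": 123}', headers=hdrs)
ack2 = await js.publish("orders.new", b'{"id": 123}', headers=hdrs)
assert ack1.seq == ack2.seq
# ack2.duplicate is also set to True (PubAck field in nats-py)
```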
Consumers track position in a stream
Each consumer has independent position
| Type | Durable | Use Case |
|---|---|---|
| Durable | Yes | Survives restarts, remembers position |
| Ephemeral | No | Temporary, deleted when inactive |
# Durable consumer - survives restarts
await js.subscribe("orders.>", durable="my-service")
# Ephemeral consumer - temporary
await js.subscribe("orders.>")
Server pushes messages to you
sub = await js.subscribe(
    "orders.>",
    durable="processor",
)

async for msg in sub.messages:
    process(msg)
    await msg.ack()
You fetch messages on demand
sub = await js.pull_subscribe(
    "orders.>",
    durable="processor",
)

msgs = await sub.fetch(10)
for msg in msgs:
    process(msg)
    await msg.ack()
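A long-running pull consumer typically fetches in a loop and treats a fetch timeout as "nothing pending right now". A sketch, assuming the sub and process from above, and catching nats-py's TimeoutError:

```python
from nats.errors import TimeoutError as NatsTimeoutError

while True:
    try:
        msgs = await sub.fetch(10, timeout=5)  # block up to 5 seconds
    except NatsTimeoutError:
        continue  # no messages pending; poll again
    for msg in msgs:
        process(msg)
        await msg.ack()
```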
Consumers must acknowledge message processing
| Ack Type | Meaning |
|---|---|
| msg.ack() | Success - move on |
| msg.nak() | Failed - redeliver now |
| msg.term() | Failed permanently - do not retry |
| msg.in_progress() | Still working - extend timeout |
async for msg in subscription.messages:
    try:
        # Process the message
        result = await process_order(msg.data)
        if result.success:
            await msg.ack()
        else:
            await msg.nak()  # Retry
    except PermanentError:
        await msg.term()  # Give up
    except TimeoutError:
        await msg.in_progress()  # Need more time
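Recent nats-py versions also accept a delay on nak, which asks the server to wait before redelivering - useful for backoff instead of an immediate retry storm. A sketch, assuming a msg from a loop like the one above:

```python
# Redeliver this message, but not before 30 seconds have passed
await msg.nak(delay=30)
```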
Where should a new consumer start reading?
from nats.js.api import DeliverPolicy
await js.subscribe(
    "orders.>",
    durable="replay-all",
    deliver_policy=DeliverPolicy.ALL,
)
from nats.js.api import ConsumerConfig, DeliverPolicy
from datetime import datetime, timedelta, timezone

# Start from a sequence number (opt_start_seq is a ConsumerConfig
# field in nats-py, so it is passed via config=)
await js.subscribe(
    "orders.>",
    config=ConsumerConfig(
        deliver_policy=DeliverPolicy.BY_START_SEQUENCE,
        opt_start_seq=1000,
    ),
)

# Start from a timestamp (RFC 3339 string)
start = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
await js.subscribe(
    "orders.>",
    config=ConsumerConfig(
        deliver_policy=DeliverPolicy.BY_START_TIME,
        opt_start_time=start,
    ),
)
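Two other common starting points, sketched with the deliver_policy shortcut (assumes the js context above):

```python
# Only messages published after the consumer is created
await js.subscribe("orders.>", deliver_policy=DeliverPolicy.NEW)

# The most recent stored message, then everything after it
await js.subscribe("orders.>", deliver_policy=DeliverPolicy.LAST)
```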
# Publisher: add unique message ID
await js.publish(
    "orders.new",
    order_data,
    headers={"Nats-Msg-Id": f"order-{order_id}"}
)
# If network fails and you retry with same ID,
# server recognizes duplicate and returns
# existing sequence number
# Consumer: ack each message explicitly, then use
# double-ack to confirm the server recorded it
from nats.js.api import AckPolicy, ConsumerConfig

await js.subscribe(
    "orders.>",
    config=ConsumerConfig(ack_policy=AckPolicy.EXPLICIT),
)
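The double-ack itself is msg.ack_sync() in nats-py: it waits for the server to confirm the ack was recorded, so a lost ack cannot silently trigger a redelivery. A sketch, assuming a push subscription sub as in the earlier examples:

```python
async for msg in sub.messages:
    process(msg)
    # Waits for server confirmation of the ack (the "double ack");
    # raises a timeout error if confirmation never arrives.
    await msg.ack_sync()
```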
# Pull consumers can batch
psub = await js.pull_subscribe("orders.>", "batch-proc")
# Fetch up to 100 messages, wait max 5 seconds
msgs = await psub.fetch(batch=100, timeout=5)
for msg in msgs:
    process(msg)
    await msg.ack()
# Great for batch processing workloads
# Multiple consumers with same queue group
# share the work (like Core NATS queue groups)
await js.subscribe(
    "orders.>",
    queue="processors",
    durable="order-processor",
)
# Each message delivered to exactly ONE
# member of the queue group
# Get stream info
info = await js.stream_info("ORDERS")
print(f"Messages: {info.state.messages}")
print(f"Bytes: {info.state.bytes}")
# Purge all messages
await js.purge_stream("ORDERS")
# Delete specific message
await js.delete_msg("ORDERS", seq=42)
# Get message by sequence
msg = await js.get_msg("ORDERS", seq=100)
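Alongside stream_info, nats-py also exposes per-consumer state, which is handy for monitoring backlog. A sketch, assuming the ORDERS stream and a "processor" durable exist on a running server:

```python
# Inspect a consumer's position and backlog
cinfo = await js.consumer_info("ORDERS", "processor")
print(f"Delivered up to stream seq: {cinfo.delivered.stream_seq}")
print(f"Not yet delivered: {cinfo.num_pending}")
print(f"Delivered but unacked: {cinfo.num_ack_pending}")
```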
import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    # Create stream
    await js.add_stream(name="ORDERS", subjects=["orders.>"])

    # Subscribe with durable consumer
    sub = await js.subscribe("orders.>", durable="processor")

    # Publish
    await js.publish("orders.new", b'{"id": 1}')

    # Consume
    msg = await sub.next_msg()
    print(f"Got: {msg.data}")
    await msg.ack()

    await nc.close()

asyncio.run(main())
Stream: ORDERS
  subjects: orders.>
  retention: limits
  max_age: 7 days

Consumers:
  - "fulfillment" (durable, queue)
  - "analytics" (durable, replay all)
  - "notifications" (ephemeral)

Flow:
  orders.new --> Stream --> Consumers
               (persisted)  (independent)
See streams, consumers, and message persistence in action:
Demo: connected to live NATS JetStream at wss://i33ym.cc/ws/nats
| Concept | What It Does |
|---|---|
| Stream | Captures and persists messages |
| Consumer | Tracks reading position |
| Durable | Survives restarts |
| Ack/Nak/Term | Confirm processing |
| Replay | Start from any point |
| Dedup | Exactly-once with Nats-Msg-Id |
JetStream: Persistence and Streaming - Session 2