JetStream

Persistence and Streaming

Session 2 - NATS Workshop

streams · consumers · persistence · at-least-once

Recap: Core NATS

Feature          Core NATS
Delivery         At-most-once
Persistence      None - fire and forget
Replay           Not possible
Acknowledgments  None

What if we need guarantees?

Enter JetStream

Built-in persistence layer for NATS

Publisher --> Stream (persisted) --> Consumer (tracks position)

Key Difference: Messages are stored in streams. Consumers read from streams at their own pace.

Core NATS vs JetStream

Feature      Core NATS     JetStream
Delivery     At-most-once  At-least-once / Exactly-once
Persistence  None          Memory or File
Replay       Not possible  From any point
Acks         None          Full ack protocol
Speed        ~14M msg/s    ~1M msg/s

Streams

A stream captures messages published to subjects

ORDERS Stream (subjects: orders.>)
[ 1 | 2 | 3 | 4 | 5 | ... ]
sequence: 1, 2, 3, ...

Messages get a sequence number and timestamp

Creating a Stream

import nats
from nats.js.api import StorageType

nc = await nats.connect()
js = nc.jetstream()

# Create stream
await js.add_stream(
    name="ORDERS",
    subjects=["orders.>"],
    storage=StorageType.FILE,  # or StorageType.MEMORY
    max_msgs=100_000,
    max_age=86400,             # 24 hours in seconds
)

Stream Configuration

Option     Purpose
subjects   Which subjects to capture
storage    file (durable) or memory (fast)
max_msgs   Maximum messages to keep
max_age    TTL for messages
max_bytes  Maximum storage size
replicas   Replication factor (clustering)
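To build intuition for how these limits interact, here is a small pure-Python sketch (not the server implementation; the `LimitedStream` name is illustrative) of limits-based retention: oldest messages are evicted once max_msgs or max_age is exceeded.

```python
import time
from collections import deque

class LimitedStream:
    """Toy model of limits retention: evict oldest messages when
    max_msgs or max_age is exceeded."""

    def __init__(self, max_msgs=3, max_age=60.0):
        self.max_msgs = max_msgs
        self.max_age = max_age
        self.seq = 0
        self.msgs = deque()  # (seq, timestamp, data)

    def append(self, data, now=None):
        now = time.time() if now is None else now
        self.seq += 1                      # server assigns sequence
        self.msgs.append((self.seq, now, data))
        # Enforce max_msgs: drop from the head (oldest first)
        while len(self.msgs) > self.max_msgs:
            self.msgs.popleft()
        # Enforce max_age (TTL)
        while self.msgs and now - self.msgs[0][1] > self.max_age:
            self.msgs.popleft()
        return self.seq

stream = LimitedStream(max_msgs=3)
for i in range(5):
    stream.append(f"order-{i}")
print([seq for seq, _, _ in stream.msgs])  # → [3, 4, 5]
```

Note that sequence numbers keep climbing even as old messages are evicted: eviction never renumbers the stream.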

Retention Policies

Limits (default)  Delete when a max is reached
Interest          Delete when all consumers ack
WorkQueue         Delete after first ack

from nats.js.api import RetentionPolicy

await js.add_stream(
    name="TASKS",
    subjects=["tasks.>"],
    retention=RetentionPolicy.WORK_QUEUE,
)
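The three policies differ only in when a message becomes removable. A minimal sketch of that rule (illustrative only, not the server's algorithm; `deletable` is a made-up name):

```python
def deletable(retention, acked_by, num_consumers):
    """Toy rule for when a message may be removed.
    retention: 'limits' | 'interest' | 'work_queue'
    acked_by:  number of consumers that have acked the message
    """
    if retention == "work_queue":
        return acked_by >= 1               # first ack removes it
    if retention == "interest":
        return acked_by >= num_consumers   # all consumers acked
    return False  # limits: only size/age/count limits remove messages

print(deletable("work_queue", 1, 3))  # → True
print(deletable("interest", 1, 3))    # → False
```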

Publishing to JetStream

# Simple publish - returns ack with sequence number
ack = await js.publish("orders.new", b'{"id": 123}')
print(f"Published: seq={ack.seq}")

# Publish with message ID for deduplication
ack = await js.publish(
    "orders.new",
    b'{"id": 123}',
    headers={"Nats-Msg-Id": "order-123"}
)

# If same Nats-Msg-Id sent again, server returns
# existing sequence (no duplicate stored)
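The server's dedup behavior can be modeled with a simple in-memory map (a sketch; `DedupWindow` is an illustrative name, and the real server also expires IDs after the stream's duplicate window):

```python
class DedupWindow:
    """Toy model of Nats-Msg-Id deduplication: a repeated ID
    returns the original sequence instead of storing a duplicate."""

    def __init__(self):
        self.seq = 0
        self.seen = {}  # msg_id -> sequence

    def publish(self, msg_id, data):
        if msg_id in self.seen:
            return self.seen[msg_id], True   # duplicate: original seq
        self.seq += 1
        self.seen[msg_id] = self.seq
        return self.seq, False

w = DedupWindow()
print(w.publish("order-123", b"..."))  # → (1, False)
print(w.publish("order-123", b"..."))  # → (1, True)  same seq, flagged duplicate
```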

Consumers

Consumers track position in a stream

ORDERS Stream: [ 1 | 2 | 3 | 4 | 5 ]

  analytics  position: 5
  billing    position: 3

Each consumer has independent position
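The independent-cursor idea can be sketched in a few lines of plain Python (illustrative names, no server involved):

```python
class Stream:
    """Toy stream: just an ordered list of messages."""
    def __init__(self, msgs):
        self.msgs = msgs

class Consumer:
    """Each consumer keeps its own cursor into the stream."""
    def __init__(self, stream):
        self.stream = stream
        self.pos = 0

    def next(self):
        if self.pos < len(self.stream.msgs):
            msg = self.stream.msgs[self.pos]
            self.pos += 1
            return msg
        return None  # caught up

s = Stream(["m1", "m2", "m3", "m4", "m5"])
analytics, billing = Consumer(s), Consumer(s)
for _ in range(5):
    analytics.next()
for _ in range(3):
    billing.next()
print(analytics.pos, billing.pos)  # → 5 3
```

Slow consumers never block fast ones: each cursor advances on its own.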

Consumer Types

Type       Durable  Use Case
Durable    Yes      Survives restarts, remembers position
Ephemeral  No       Temporary, deleted when inactive

# Durable consumer - survives restarts
await js.subscribe("orders.>", durable="my-service")

# Ephemeral consumer - temporary
await js.subscribe("orders.>")

Push vs Pull

Push Consumer

Server pushes messages to you

sub = await js.subscribe(
    "orders.>",
    durable="processor",
)

async for msg in sub.messages:
    process(msg)
    await msg.ack()

Pull Consumer

You fetch messages on demand

sub = await js.pull_subscribe(
    "orders.>",
    durable="processor",
)

msgs = await sub.fetch(10)
for msg in msgs:
    process(msg)
    await msg.ack()

Acknowledgments

Consumers must acknowledge message processing

Ack Type           Meaning
msg.ack()          Success - move on
msg.nak()          Failed - redeliver now
msg.term()         Failed permanently - do not retry
msg.in_progress()  Still working - extend timeout
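How each ack type affects the server's pending/redelivery state can be sketched as follows (a toy model; `PendingTracker` is an illustrative name, and the real protocol also tracks deliver counts and ack-wait timers):

```python
class PendingTracker:
    """Toy model of a consumer's pending messages and how each
    ack type affects redelivery."""

    def __init__(self):
        self.pending = {}   # seq -> deliver_count
        self.redeliver = []

    def deliver(self, seq):
        self.pending[seq] = self.pending.get(seq, 0) + 1

    def ack(self, seq):          # success: done with this message
        self.pending.pop(seq, None)

    def nak(self, seq):          # failure: queue for redelivery
        self.redeliver.append(seq)

    def term(self, seq):         # permanent failure: drop, no retry
        self.pending.pop(seq, None)

    def in_progress(self, seq):  # still working: stays pending,
        pass                     # the ack-wait timer would be reset

t = PendingTracker()
for seq in (1, 2, 3):
    t.deliver(seq)
t.ack(1); t.nak(2); t.term(3)
print(sorted(t.pending), t.redeliver)  # → [2] [2]
```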

Ack Example

async for msg in subscription.messages:
    try:
        # Process the message
        result = await process_order(msg.data)
        
        if result.success:
            await msg.ack()
        else:
            await msg.nak()  # Retry
            
    except PermanentError:
        await msg.term()  # Give up
        
    except TimeoutError:
        await msg.in_progress()  # Need more time

Replay Policies

Where should a new consumer start reading?

all   Replay everything from the beginning
last  Start from the last message
new   Only new messages

from nats.js.api import DeliverPolicy

await js.subscribe(
    "orders.>",
    durable="replay-all",
    deliver_policy=DeliverPolicy.ALL,
)

Start From Specific Point

from nats.js.api import DeliverPolicy
from datetime import datetime, timedelta, timezone

# Start from sequence number
await js.subscribe(
    "orders.>",
    deliver_policy=DeliverPolicy.BY_START_SEQUENCE,
    opt_start_seq=1000,
)

# Start from timestamp (RFC3339 string)
start = datetime.now(timezone.utc) - timedelta(hours=1)
await js.subscribe(
    "orders.>",
    deliver_policy=DeliverPolicy.BY_START_TIME,
    opt_start_time=start.isoformat(),
)
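How a deliver policy resolves to a starting sequence can be sketched as a pure function (illustrative; `start_sequence` is a made-up name mirroring DeliverPolicy semantics):

```python
def start_sequence(policy, first_seq, last_seq, opt_start_seq=None):
    """Toy resolution of where a new consumer begins reading."""
    if policy == "all":
        return first_seq                  # replay from the beginning
    if policy == "last":
        return last_seq                   # just the last message
    if policy == "new":
        return last_seq + 1               # only messages after now
    if policy == "by_start_sequence":
        # clamp to what the stream still holds
        return max(first_seq, opt_start_seq)
    raise ValueError(policy)

print(start_sequence("all", 1, 500))   # → 1
print(start_sequence("new", 1, 500))   # → 501
print(start_sequence("by_start_sequence", 1, 500, opt_start_seq=100))  # → 100
```

The clamp in `by_start_sequence` matters: if retention already evicted messages below the requested sequence, delivery begins at the first message the stream still has.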

Delivery Guarantees

At-Most-Once   Core NATS          Fire and forget
At-Least-Once  JetStream + Acks   May see duplicates
Exactly-Once   JetStream + Dedup  No duplicates

Exactly-Once with Deduplication

# Publisher: add unique message ID
await js.publish(
    "orders.new",
    order_data,
    headers={"Nats-Msg-Id": f"order-{order_id}"}
)

# If network fails and you retry with same ID,
# server recognizes duplicate and returns
# existing sequence number

# Consumer: double-ack waits for the server to
# confirm the ack before you move on
from nats.js.api import AckPolicy

sub = await js.subscribe(
    "orders.>",
    ack_policy=AckPolicy.EXPLICIT,  # ack each message individually
)
msg = await sub.next_msg()
await msg.ack_sync()  # ack and wait for server confirmation

Consumer Batching

# Pull consumers can batch
psub = await js.pull_subscribe("orders.>", "batch-proc")

# Fetch up to 100 messages, wait max 5 seconds
msgs = await psub.fetch(batch=100, timeout=5)

for msg in msgs:
    process(msg)
    await msg.ack()

# Great for batch processing workloads
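The drain-in-batches pattern boils down to fixed-size slicing, sketched here without a server (`fetch_batches` is an illustrative name standing in for repeated `fetch()` calls):

```python
def fetch_batches(messages, batch=100):
    """Toy pull-consumer loop: drain a backlog in fixed-size
    batches, like repeated psub.fetch(batch=...) calls."""
    for i in range(0, len(messages), batch):
        yield messages[i:i + batch]

backlog = list(range(250))
sizes = [len(b) for b in fetch_batches(backlog, batch=100)]
print(sizes)  # → [100, 100, 50]
```

The last batch is simply smaller; in the real client, `fetch` likewise returns fewer messages (or times out) when the backlog runs dry.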

Queue Groups in JetStream

# Multiple consumers with same queue group
# share the work (like Core NATS queue groups)

await js.subscribe(
    "orders.>",
    queue="processors",
    durable="order-processor",
)

# Each message delivered to exactly ONE
# member of the queue group

Best of Both: Persistence + Load Balancing = Reliable distributed processing
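The one-message-one-member rule can be sketched as a round-robin dispatcher (illustrative only; the real server balances by member availability, not strict rotation):

```python
from itertools import cycle

def distribute(messages, members):
    """Toy queue-group dispatch: each message goes to exactly
    one member of the group."""
    assignments = {m: [] for m in members}
    workers = cycle(members)
    for msg in messages:
        assignments[next(workers)].append(msg)
    return assignments

out = distribute(range(6), ["w1", "w2", "w3"])
print(out)  # → {'w1': [0, 3], 'w2': [1, 4], 'w3': [2, 5]}
```

Every message appears in exactly one member's list, which is the invariant queue groups guarantee.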

Stream Operations

# Get stream info
info = await js.stream_info("ORDERS")
print(f"Messages: {info.state.messages}")
print(f"Bytes: {info.state.bytes}")

# Purge all messages
await js.purge_stream("ORDERS")

# Delete specific message
await js.delete_msg("ORDERS", seq=42)

# Get message by sequence
msg = await js.get_msg("ORDERS", seq=100)

Full Example

import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    # Create stream
    await js.add_stream(name="ORDERS", subjects=["orders.>"])

    # Subscribe with durable consumer
    sub = await js.subscribe("orders.>", durable="processor")

    # Publish
    await js.publish("orders.new", b'{"id": 1}')

    # Consume
    msg = await sub.next_msg()
    print(f"Got: {msg.data}")
    await msg.ack()

    await nc.close()

asyncio.run(main())

Real-World: Order Processing

Stream: ORDERS
  subjects: orders.>
  retention: limits
  max_age: 7 days

Consumers:
  - "fulfillment" (durable, queue)
  - "analytics" (durable, replay all)
  - "notifications" (ephemeral)

Flow:
  orders.new --> Stream --> Consumers
              (persisted)  (independent)

Interactive Demo

See streams, consumers, and message persistence in action:

Launch Demo

Connected to live NATS JetStream at wss://i33ym.cc/ws/nats

Summary

Concept       What It Does
Stream        Captures and persists messages
Consumer      Tracks reading position
Durable       Survives restarts
Ack/Nak/Term  Confirm processing
Replay        Start from any point
Dedup         Exactly-once with Nats-Msg-Id

Key Takeaway

Stream + Consumer = Reliable Messaging
1. Create stream to capture subjects
2. Create durable consumer for reliability
3. Ack messages after successful processing

Resources

Next Session

Key-Value and Object Store

  • KV Store: distributed key-value with history
  • Object Store: large file storage
  • Work queues and stream sourcing
  • Mirroring for geo-distribution

Questions?

JetStream: Persistence and Streaming - Session 2