i33ym

NATS JetStream

Core NATS gives you speed: millions of messages per second, microsecond latency, fire-and-forget simplicity. But sometimes you need more. You need messages to persist. You need to replay history. You need acknowledgments and delivery guarantees. This is what JetStream provides.

JetStream is not a separate system. It is a persistence layer built into NATS, enabled with a single configuration flag. The same subjects, the same clients, the same simplicity—but with durability.

Streams

A stream is a message store. You configure it to capture messages published to certain subjects, and it persists them with sequence numbers and timestamps.

import nats
from nats.js.api import StreamConfig

nc = await nats.connect()
js = nc.jetstream()

await js.add_stream(
    name="ORDERS",
    subjects=["orders.>"],
    storage="file",
    max_msgs=100000,
    max_age=86400,
)

This creates a stream called ORDERS that captures all messages published to any subject starting with orders.. Messages are stored on disk, with a limit of 100,000 messages or 24 hours, whichever comes first.

The key difference from Core NATS: publishers no longer need active subscribers. Messages go into the stream and wait. Consumers can read them at any time, at their own pace, even days later.

Stream Configuration

Streams have several configuration options that control behavior:

Storage can be file for durability across restarts, or memory for speed when persistence is not required. File storage survives server restarts; memory storage is lost.

Retention determines when messages are deleted. The default limits policy deletes old messages when max_msgs, max_bytes, or max_age limits are reached. The interest policy keeps messages until all consumers have acknowledged them. The workqueue policy deletes messages immediately after the first acknowledgment—useful for task queues where each message should be processed exactly once.

Replicas controls how many copies of the stream exist in a cluster. With three replicas, the stream survives the loss of any single server.

Publishing to Streams

Publishing to JetStream is similar to Core NATS, but you get an acknowledgment back:

ack = await js.publish("orders.new", b'{"id": 123}')
print(f"Stored at sequence {ack.seq}")

The ack contains the sequence number assigned to the message. This is useful for tracking and deduplication.

For exactly-once publishing, add a message ID:

ack = await js.publish(
    "orders.new",
    b'{"id": 123}',
    headers={"Nats-Msg-Id": "order-123"}
)

If you publish the same message ID again—perhaps because you are retrying after a network timeout—the server recognizes the duplicate and returns the existing sequence number without storing a second copy. This is how you achieve exactly-once semantics.

Consumers

A consumer is a view into a stream. It tracks position (which messages have been read) and delivers messages to subscribers. Multiple consumers can read from the same stream independently, each maintaining their own position.

sub = await js.subscribe("orders.>", durable="my-processor")

async for msg in sub.messages:
    process(msg)
    await msg.ack()

The durable parameter gives the consumer a name. Durable consumers survive client disconnections and server restarts. When you reconnect with the same name, you resume from where you left off.

Ephemeral consumers (created without a name) are temporary. They are deleted after a period of inactivity. Use them for ad-hoc queries or monitoring.

Push vs Pull

Consumers come in two flavors. Push consumers deliver messages to you as they arrive:

sub = await js.subscribe("orders.>", durable="push-consumer")

async for msg in sub.messages:
    # Messages arrive automatically
    await msg.ack()

Pull consumers let you fetch messages on demand:

sub = await js.pull_subscribe("orders.>", durable="pull-consumer")

# Fetch up to 10 messages, wait up to 5 seconds
msgs = await sub.fetch(10, timeout=5)
for msg in msgs:
    process(msg)
    await msg.ack()

Pull consumers are useful for batch processing, rate limiting, or when you want explicit control over when messages are fetched.

Acknowledgments

Unlike Core NATS, JetStream requires acknowledgments. When you receive a message, you must tell the server what happened:

msg.ack() means success. The message is marked as delivered and the consumer moves on.

msg.nak() means failure with retry. The message will be redelivered immediately.

msg.term() means permanent failure. Do not retry this message. Use this for poison messages that will never succeed.

msg.in_progress() means you are still working on it. This extends the ack timeout, preventing the server from redelivering while you are still processing.

async for msg in subscription.messages:
    try:
        result = await process_order(msg.data)
        if result.success:
            await msg.ack()
        else:
            await msg.nak()
    except PermanentError:
        await msg.term()
    except SlowProcessingError:
        await msg.in_progress()
        # Continue processing...

Replay Policies

When you create a consumer, you choose where it starts reading. The deliver_policy option controls this:

DeliverPolicy.ALL replays everything from the beginning of the stream. Use this when you need complete history, like rebuilding state from events.

DeliverPolicy.LAST starts from the most recent message. Useful for getting the current state without processing history.

DeliverPolicy.NEW only delivers messages published after the consumer is created. Use this for live processing where history is not needed.

DeliverPolicy.BY_START_SEQUENCE and BY_START_TIME let you start from a specific point:

from datetime import datetime, timedelta

# Start from sequence 1000
await js.subscribe(
    "orders.>",
    deliver_policy=DeliverPolicy.BY_START_SEQUENCE,
    opt_start_seq=1000,
)

# Start from one hour ago
await js.subscribe(
    "orders.>",
    deliver_policy=DeliverPolicy.BY_START_TIME,
    opt_start_time=datetime.now() - timedelta(hours=1),
)

Delivery Guarantees

JetStream provides three levels of delivery guarantees:

At-most-once is what Core NATS provides. Messages might be lost if no subscriber is listening.

At-least-once is what JetStream provides by default. Messages are stored and redelivered until acknowledged. You might see duplicates if ack is lost.

Exactly-once requires two things: publisher deduplication with Nats-Msg-Id, and consumer idempotency. The server prevents duplicate storage; your consumer must prevent duplicate processing.

Queue Groups

JetStream supports queue groups just like Core NATS. Multiple consumers with the same queue name share the work:

await js.subscribe(
    "orders.>",
    queue="processors",
    durable="order-processor",
)

Each message is delivered to exactly one member of the queue group. Combined with persistence, this gives you reliable distributed work queues.

Stream Operations

You can inspect and manage streams programmatically:

# Get stream info
info = await js.stream_info("ORDERS")
print(f"Messages: {info.state.messages}")
print(f"Bytes: {info.state.bytes}")
print(f"First seq: {info.state.first_seq}")
print(f"Last seq: {info.state.last_seq}")

# Get specific message
msg = await js.get_msg("ORDERS", seq=100)

# Delete specific message
await js.delete_msg("ORDERS", seq=100)

# Purge all messages
await js.purge_stream("ORDERS")

When to Use JetStream

Use JetStream when you need any of these:

  • Message persistence across restarts
  • Replay capability for new consumers
  • Delivery guarantees beyond at-most-once
  • Consumer acknowledgments
  • Work queues with reliable delivery

Stick with Core NATS when:

  • Maximum speed is the priority
  • Messages are ephemeral (telemetry, heartbeats)
  • Losing occasional messages is acceptable
  • You do not need replay

Performance Considerations

JetStream is fast—around one million messages per second on a single server—but not as fast as Core NATS. The persistence layer adds overhead. For most applications this is irrelevant; one million messages per second is more than enough.

Memory storage is faster than file storage but does not survive restarts. Use memory for high-throughput scenarios where durability is less important than speed.

Pull consumers with batching are more efficient than push consumers for high-volume processing. Fetch 100 messages at a time instead of one by one.

Try It Yourself

The interactive demo for this session lets you experiment with streams, consumers, acknowledgments, replay policies, and deduplication: i33ym.cc/demo-nats-jetstream

What's Next

JetStream provides more than just message persistence. In the next session, we cover Key-Value Store and Object Store—distributed data structures built on top of JetStream. We will also explore stream mirroring for geo-distribution and advanced patterns like work queues and event sourcing.

↑ 0 ↓ 0

Comments (0)

No comments yet. Be the first to share your thoughts.

← back to essays