fermentord/vendor/github.com/nats-io/nats.go/js.go

3880 lines
110 KiB
Go

// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nats
import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/nats-io/nats.go/internal/parser"
"github.com/nats-io/nuid"
)
// JetStream allows persistent messaging through JetStream.
//
// NOTE: JetStream is part of legacy API.
// Users are encouraged to switch to the new JetStream API for enhanced capabilities and
// simplified API. Please refer to the `jetstream` package.
// See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
type JetStream interface {
// Publish publishes a message to JetStream.
Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)
// PublishMsg publishes a Msg to JetStream.
PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)
// PublishAsync publishes a message to JetStream and returns a PubAckFuture.
// The data should not be changed until the PubAckFuture has been processed.
PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)
// PublishMsgAsync publishes a Msg to JetStream and returns a PubAckFuture.
// The message should not be changed until the PubAckFuture has been processed.
PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)
// PublishAsyncPending returns the number of async publishes outstanding for this context.
PublishAsyncPending() int
// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
PublishAsyncComplete() <-chan struct{}
// Subscribe creates an async Subscription for JetStream.
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
// you can provide the stream name with nats.BindStream().
// If no stream name is specified, the library will attempt to figure out which
// stream the subscription is for. See important notes below for more details.
//
// IMPORTANT NOTES:
// * If none of the options Bind() nor Durable() are specified, the library will
// send a request to the server to create an ephemeral JetStream consumer,
// which will be deleted after an Unsubscribe() or Drain(), or automatically
// by the server after a short period of time after the NATS subscription is
// gone.
// * If Durable() option is specified, the library will attempt to lookup a JetStream
// consumer with this name, and if found, will bind to it and not attempt to
// delete it. However, if not found, the library will send a request to
// create such durable JetStream consumer. Note that the library will delete
// the JetStream consumer after an Unsubscribe() or Drain() only if it
// created the durable consumer while subscribing. If the durable consumer
// already existed prior to subscribing it won't be deleted.
// * If Bind() option is provided, the library will attempt to lookup the
// consumer with the given name, and if successful, bind to it. If the lookup fails,
// then the Subscribe() call will return an error.
Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
// SubscribeSync creates a Subscription that can be used to process messages synchronously.
// See important note in Subscribe()
SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
// ChanSubscribe creates channel based Subscription.
// See important note in Subscribe()
ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
// ChanQueueSubscribe creates channel based Subscription with a queue group.
// See important note in QueueSubscribe()
ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
// QueueSubscribe creates a Subscription with a queue group.
// If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
// See important note in Subscribe()
QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
// See important note in QueueSubscribe()
QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)
// PullSubscribe creates a Subscription that can fetch messages.
// See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be
// set to an empty string.
PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
}
// JetStreamContext allows JetStream messaging and stream management.
//
// NOTE: JetStreamContext is part of legacy API.
// Users are encouraged to switch to the new JetStream API for enhanced capabilities and
// simplified API. Please refer to the `jetstream` package.
// See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
type JetStreamContext interface {
JetStream
JetStreamManager
KeyValueManager
ObjectStoreManager
}
// Request API subjects for JetStream.
const (
// defaultAPIPrefix is the default prefix for the JetStream API.
defaultAPIPrefix = "$JS.API."
// jsDomainT is used to create JetStream API prefix by specifying only Domain
jsDomainT = "$JS.%s.API."
// jsExtDomainT is used to create a StreamSource External APIPrefix
jsExtDomainT = "$JS.%s.API"
// apiAccountInfo is for obtaining general information about JetStream.
apiAccountInfo = "INFO"
// apiConsumerCreateT is used to create consumers.
// it accepts stream name and consumer name.
apiConsumerCreateT = "CONSUMER.CREATE.%s.%s"
// apiConsumerCreateT is used to create consumers.
// it accepts stream name, consumer name and filter subject
apiConsumerCreateWithFilterSubjectT = "CONSUMER.CREATE.%s.%s.%s"
// apiLegacyConsumerCreateT is used to create consumers.
// this is a legacy endpoint to support creating ephemerals before nats-server v2.9.0.
apiLegacyConsumerCreateT = "CONSUMER.CREATE.%s"
// apiDurableCreateT is used to create durable consumers.
// this is a legacy endpoint to support creating durable consumers before nats-server v2.9.0.
apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"
// apiConsumerInfoT is used to create consumers.
apiConsumerInfoT = "CONSUMER.INFO.%s.%s"
// apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"
// apiConsumerDeleteT is used to delete consumers.
apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"
// apiConsumerListT is used to return all detailed consumer information
apiConsumerListT = "CONSUMER.LIST.%s"
// apiConsumerNamesT is used to return a list with all consumer names for the stream.
apiConsumerNamesT = "CONSUMER.NAMES.%s"
// apiStreams can lookup a stream by subject.
apiStreams = "STREAM.NAMES"
// apiStreamCreateT is the endpoint to create new streams.
apiStreamCreateT = "STREAM.CREATE.%s"
// apiStreamInfoT is the endpoint to get information on a stream.
apiStreamInfoT = "STREAM.INFO.%s"
// apiStreamUpdateT is the endpoint to update existing streams.
apiStreamUpdateT = "STREAM.UPDATE.%s"
// apiStreamDeleteT is the endpoint to delete streams.
apiStreamDeleteT = "STREAM.DELETE.%s"
// apiStreamPurgeT is the endpoint to purge streams.
apiStreamPurgeT = "STREAM.PURGE.%s"
// apiStreamListT is the endpoint that will return all detailed stream information
apiStreamListT = "STREAM.LIST"
// apiMsgGetT is the endpoint to get a message.
apiMsgGetT = "STREAM.MSG.GET.%s"
// apiMsgGetT is the endpoint to perform a direct get of a message.
apiDirectMsgGetT = "DIRECT.GET.%s"
// apiDirectMsgGetLastBySubjectT is the endpoint to perform a direct get of a message by subject.
apiDirectMsgGetLastBySubjectT = "DIRECT.GET.%s.%s"
// apiMsgDeleteT is the endpoint to remove a message.
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"
// orderedHeartbeatsInterval is how fast we want HBs from the server during idle.
orderedHeartbeatsInterval = 5 * time.Second
// Scale for threshold of missed HBs or lack of activity.
hbcThresh = 2
// For ChanSubscription, we can't update sub.delivered as we do for other
// type of subscriptions, since the channel is user provided.
// With flow control in play, we will check for flow control on incoming
// messages (as opposed to when they are delivered), but also from a go
// routine. Without this, the subscription would possibly stall until
// a new message or heartbeat/fc are received.
chanSubFCCheckInterval = 250 * time.Millisecond
// Default time wait between retries on Publish iff err is NoResponders.
DefaultPubRetryWait = 250 * time.Millisecond
// Default number of retries
DefaultPubRetryAttempts = 2
// defaultAsyncPubAckInflight is the number of async pub acks inflight.
defaultAsyncPubAckInflight = 4000
)
// Types of control messages, so far heartbeat and flow control
const (
jsCtrlHB = 1
jsCtrlFC = 2
)
// js is an internal struct from a JetStreamContext.
type js struct {
nc *Conn
opts *jsOpts
// For async publish context.
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
connStatusCh chan (Status)
replyPrefix string
replyPrefixLen int
}
type jsOpts struct {
ctx context.Context
// For importing JetStream from other accounts.
pre string
// Amount of time to wait for API requests.
wait time.Duration
// For async publish error handling.
aecb MsgErrHandler
// Max async pub ack in flight
maxpa int
// the domain that produced the pre
domain string
// enables protocol tracing
ctrace ClientTrace
shouldTrace bool
// purgeOpts contains optional stream purge options
purgeOpts *StreamPurgeRequest
// streamInfoOpts contains optional stream info options
streamInfoOpts *StreamInfoRequest
// streamListSubject is used for subject filtering when listing streams / stream names
streamListSubject string
// For direct get message requests
directGet bool
// For direct get next message
directNextFor string
// featureFlags are used to enable/disable specific JetStream features
featureFlags featureFlags
}
const (
defaultRequestWait = 5 * time.Second
defaultAccountCheck = 20 * time.Second
)
// JetStream returns a JetStreamContext for messaging and stream management.
// Errors are only returned if inconsistent options are provided.
//
// NOTE: JetStreamContext is part of legacy API.
// Users are encouraged to switch to the new JetStream API for enhanced capabilities and
// simplified API. Please refer to the `jetstream` package.
// See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
js := &js{
nc: nc,
opts: &jsOpts{
pre: defaultAPIPrefix,
wait: defaultRequestWait,
maxpa: defaultAsyncPubAckInflight,
},
}
inboxPrefix := InboxPrefix
if js.nc.Opts.InboxPrefix != _EMPTY_ {
inboxPrefix = js.nc.Opts.InboxPrefix + "."
}
js.replyPrefix = inboxPrefix
js.replyPrefixLen = len(js.replyPrefix) + aReplyTokensize + 1
for _, opt := range opts {
if err := opt.configureJSContext(js.opts); err != nil {
return nil, err
}
}
return js, nil
}
// JSOpt configures a JetStreamContext.
type JSOpt interface {
configureJSContext(opts *jsOpts) error
}
// jsOptFn configures an option for the JetStreamContext.
type jsOptFn func(opts *jsOpts) error
func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
return opt(opts)
}
type featureFlags struct {
useDurableConsumerCreate bool
}
// UseLegacyDurableConsumers makes JetStream use the legacy (pre nats-server v2.9.0) subjects for consumer creation.
// If this option is used when creating JetStremContext, $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer> will be used
// to create a consumer with Durable provided, rather than $JS.API.CONSUMER.CREATE.<stream>.<consumer>.
func UseLegacyDurableConsumers() JSOpt {
return jsOptFn(func(opts *jsOpts) error {
opts.featureFlags.useDurableConsumerCreate = true
return nil
})
}
// ClientTrace can be used to trace API interactions for the JetStream Context.
type ClientTrace struct {
RequestSent func(subj string, payload []byte)
ResponseReceived func(subj string, payload []byte, hdr Header)
}
func (ct ClientTrace) configureJSContext(js *jsOpts) error {
js.ctrace = ct
js.shouldTrace = true
return nil
}
// Domain changes the domain part of JetStream API prefix.
func Domain(domain string) JSOpt {
if domain == _EMPTY_ {
return APIPrefix(_EMPTY_)
}
return jsOptFn(func(js *jsOpts) error {
js.domain = domain
js.pre = fmt.Sprintf(jsDomainT, domain)
return nil
})
}
func (s *StreamPurgeRequest) configureJSContext(js *jsOpts) error {
js.purgeOpts = s
return nil
}
func (s *StreamInfoRequest) configureJSContext(js *jsOpts) error {
js.streamInfoOpts = s
return nil
}
// APIPrefix changes the default prefix used for the JetStream API.
func APIPrefix(pre string) JSOpt {
return jsOptFn(func(js *jsOpts) error {
if pre == _EMPTY_ {
return nil
}
js.pre = pre
if !strings.HasSuffix(js.pre, ".") {
js.pre = js.pre + "."
}
return nil
})
}
// DirectGet is an option that can be used to make GetMsg() or GetLastMsg()
// retrieve message directly from a group of servers (leader and replicas)
// if the stream was created with the AllowDirect option.
func DirectGet() JSOpt {
return jsOptFn(func(js *jsOpts) error {
js.directGet = true
return nil
})
}
// DirectGetNext is an option that can be used to make GetMsg() retrieve message
// directly from a group of servers (leader and replicas) if the stream was
// created with the AllowDirect option.
// The server will find the next message matching the filter `subject` starting
// at the start sequence (argument in GetMsg()). The filter `subject` can be a
// wildcard.
func DirectGetNext(subject string) JSOpt {
return jsOptFn(func(js *jsOpts) error {
js.directGet = true
js.directNextFor = subject
return nil
})
}
// StreamListFilter is an option that can be used to configure `StreamsInfo()` and `StreamNames()` requests.
// It allows filtering the returned streams by subject associated with each stream.
// Wildcards can be used. For example, `StreamListFilter(FOO.*.A) will return
// all streams which have at least one subject matching the provided pattern (e.g. FOO.TEST.A).
func StreamListFilter(subject string) JSOpt {
return jsOptFn(func(opts *jsOpts) error {
opts.streamListSubject = subject
return nil
})
}
func (js *js) apiSubj(subj string) string {
if js.opts.pre == _EMPTY_ {
return subj
}
var b strings.Builder
b.WriteString(js.opts.pre)
b.WriteString(subj)
return b.String()
}
// PubOpt configures options for publishing JetStream messages.
type PubOpt interface {
configurePublish(opts *pubOpts) error
}
// pubOptFn is a function option used to configure JetStream Publish.
type pubOptFn func(opts *pubOpts) error
func (opt pubOptFn) configurePublish(opts *pubOpts) error {
return opt(opts)
}
type pubOpts struct {
ctx context.Context
ttl time.Duration
id string
lid string // Expected last msgId
str string // Expected stream name
seq *uint64 // Expected last sequence
lss *uint64 // Expected last sequence per subject
// Publish retries for NoResponders err.
rwait time.Duration // Retry wait between attempts
rnum int // Retry attempts
// stallWait is the max wait of a async pub ack.
stallWait time.Duration
}
// pubAckResponse is the ack response from the JetStream API when publishing a message.
type pubAckResponse struct {
apiResponse
*PubAck
}
// PubAck is an ack received after successfully publishing a message.
type PubAck struct {
Stream string `json:"stream"`
Sequence uint64 `json:"seq"`
Duplicate bool `json:"duplicate,omitempty"`
Domain string `json:"domain,omitempty"`
}
// Headers for published messages.
const (
MsgIdHdr = "Nats-Msg-Id"
ExpectedStreamHdr = "Nats-Expected-Stream"
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgRollup = "Nats-Rollup"
)
// Headers for republished messages and direct gets.
const (
JSStream = "Nats-Stream"
JSSequence = "Nats-Sequence"
JSTimeStamp = "Nats-Time-Stamp"
JSSubject = "Nats-Subject"
JSLastSequence = "Nats-Last-Sequence"
)
// MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested.
const MsgSize = "Nats-Msg-Size"
// Rollups, can be subject only or all messages.
const (
MsgRollupSubject = "sub"
MsgRollupAll = "all"
)
// PublishMsg publishes a Msg to a stream from JetStream.
func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
var o = pubOpts{rwait: DefaultPubRetryWait, rnum: DefaultPubRetryAttempts}
if len(opts) > 0 {
if m.Header == nil {
m.Header = Header{}
}
for _, opt := range opts {
if err := opt.configurePublish(&o); err != nil {
return nil, err
}
}
}
// Check for option collisions. Right now just timeout and context.
if o.ctx != nil && o.ttl != 0 {
return nil, ErrContextAndTimeout
}
if o.ttl == 0 && o.ctx == nil {
o.ttl = js.opts.wait
}
if o.stallWait > 0 {
return nil, fmt.Errorf("nats: stall wait cannot be set to sync publish")
}
if o.id != _EMPTY_ {
m.Header.Set(MsgIdHdr, o.id)
}
if o.lid != _EMPTY_ {
m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
}
if o.str != _EMPTY_ {
m.Header.Set(ExpectedStreamHdr, o.str)
}
if o.seq != nil {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10))
}
if o.lss != nil {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
}
var resp *Msg
var err error
if o.ttl > 0 {
resp, err = js.nc.RequestMsg(m, time.Duration(o.ttl))
} else {
resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
}
if err != nil {
for r, ttl := 0, o.ttl; errors.Is(err, ErrNoResponders) && (r < o.rnum || o.rnum < 0); r++ {
// To protect against small blips in leadership changes etc, if we get a no responders here retry.
if o.ctx != nil {
select {
case <-o.ctx.Done():
case <-time.After(o.rwait):
}
} else {
time.Sleep(o.rwait)
}
if o.ttl > 0 {
ttl -= o.rwait
if ttl <= 0 {
err = ErrTimeout
break
}
resp, err = js.nc.RequestMsg(m, time.Duration(ttl))
} else {
resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
}
}
if err != nil {
if errors.Is(err, ErrNoResponders) {
err = ErrNoStreamResponse
}
return nil, err
}
}
var pa pubAckResponse
if err := json.Unmarshal(resp.Data, &pa); err != nil {
return nil, ErrInvalidJSAck
}
if pa.Error != nil {
return nil, pa.Error
}
if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
return nil, ErrInvalidJSAck
}
return pa.PubAck, nil
}
// Publish publishes a message to a stream from JetStream.
func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) {
return js.PublishMsg(&Msg{Subject: subj, Data: data}, opts...)
}
// PubAckFuture is a future for a PubAck.
type PubAckFuture interface {
// Ok returns a receive only channel that can be used to get a PubAck.
Ok() <-chan *PubAck
// Err returns a receive only channel that can be used to get the error from an async publish.
Err() <-chan error
// Msg returns the message that was sent to the server.
Msg() *Msg
}
type pubAckFuture struct {
js *js
msg *Msg
pa *PubAck
st time.Time
err error
errCh chan error
doneCh chan *PubAck
}
func (paf *pubAckFuture) Ok() <-chan *PubAck {
paf.js.mu.Lock()
defer paf.js.mu.Unlock()
if paf.doneCh == nil {
paf.doneCh = make(chan *PubAck, 1)
if paf.pa != nil {
paf.doneCh <- paf.pa
}
}
return paf.doneCh
}
func (paf *pubAckFuture) Err() <-chan error {
paf.js.mu.Lock()
defer paf.js.mu.Unlock()
if paf.errCh == nil {
paf.errCh = make(chan error, 1)
if paf.err != nil {
paf.errCh <- paf.err
}
}
return paf.errCh
}
func (paf *pubAckFuture) Msg() *Msg {
paf.js.mu.RLock()
defer paf.js.mu.RUnlock()
return paf.msg
}
// For quick token lookup etc.
const aReplyTokensize = 6
func (js *js) newAsyncReply() string {
js.mu.Lock()
if js.rsub == nil {
// Create our wildcard reply subject.
sha := sha256.New()
sha.Write([]byte(nuid.Next()))
b := sha.Sum(nil)
for i := 0; i < aReplyTokensize; i++ {
b[i] = rdigits[int(b[i]%base)]
}
js.rpre = fmt.Sprintf("%s%s.", js.replyPrefix, b[:aReplyTokensize])
sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply)
if err != nil {
js.mu.Unlock()
return _EMPTY_
}
js.rsub = sub
js.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
}
if js.connStatusCh == nil {
js.connStatusCh = js.nc.StatusChanged(RECONNECTING, CLOSED)
go js.resetPendingAcksOnReconnect()
}
var sb strings.Builder
sb.WriteString(js.rpre)
rn := js.rr.Int63()
var b [aReplyTokensize]byte
for i, l := 0, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
l /= base
}
sb.Write(b[:])
js.mu.Unlock()
return sb.String()
}
func (js *js) resetPendingAcksOnReconnect() {
js.mu.Lock()
connStatusCh := js.connStatusCh
js.mu.Unlock()
for {
newStatus, ok := <-connStatusCh
if !ok || newStatus == CLOSED {
return
}
js.mu.Lock()
errCb := js.opts.aecb
for id, paf := range js.pafs {
paf.err = ErrDisconnected
if paf.errCh != nil {
paf.errCh <- paf.err
}
if errCb != nil {
// clear reply subject so that new one is created on republish
js.mu.Unlock()
errCb(js, paf.msg, ErrDisconnected)
js.mu.Lock()
}
delete(js.pafs, id)
}
if js.dch != nil {
close(js.dch)
js.dch = nil
}
js.mu.Unlock()
}
}
func (js *js) cleanupReplySub() {
js.mu.Lock()
if js.rsub != nil {
js.rsub.Unsubscribe()
js.rsub = nil
}
if js.connStatusCh != nil {
close(js.connStatusCh)
js.connStatusCh = nil
}
js.mu.Unlock()
}
// registerPAF will register for a PubAckFuture.
func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) {
js.mu.Lock()
if js.pafs == nil {
js.pafs = make(map[string]*pubAckFuture)
}
paf.js = js
js.pafs[id] = paf
np := len(js.pafs)
maxpa := js.opts.maxpa
js.mu.Unlock()
return np, maxpa
}
// Lock should be held.
func (js *js) getPAF(id string) *pubAckFuture {
if js.pafs == nil {
return nil
}
return js.pafs[id]
}
// clearPAF will remove a PubAckFuture that was registered.
func (js *js) clearPAF(id string) {
js.mu.Lock()
delete(js.pafs, id)
js.mu.Unlock()
}
// PublishAsyncPending returns how many PubAckFutures are pending.
func (js *js) PublishAsyncPending() int {
js.mu.RLock()
defer js.mu.RUnlock()
return len(js.pafs)
}
func (js *js) asyncStall() <-chan struct{} {
js.mu.Lock()
if js.stc == nil {
js.stc = make(chan struct{})
}
stc := js.stc
js.mu.Unlock()
return stc
}
// Handle an async reply from PublishAsync.
func (js *js) handleAsyncReply(m *Msg) {
if len(m.Subject) <= js.replyPrefixLen {
return
}
id := m.Subject[js.replyPrefixLen:]
js.mu.Lock()
paf := js.getPAF(id)
if paf == nil {
js.mu.Unlock()
return
}
// Remove
delete(js.pafs, id)
// Check on anyone stalled and waiting.
if js.stc != nil && len(js.pafs) < js.opts.maxpa {
close(js.stc)
js.stc = nil
}
// Check on anyone one waiting on done status.
if js.dch != nil && len(js.pafs) == 0 {
dch := js.dch
js.dch = nil
// Defer here so error is processed and can be checked.
defer close(dch)
}
doErr := func(err error) {
paf.err = err
if paf.errCh != nil {
paf.errCh <- paf.err
}
cb := js.opts.aecb
js.mu.Unlock()
if cb != nil {
cb(paf.js, paf.msg, err)
}
}
// Process no responders etc.
if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
doErr(ErrNoResponders)
return
}
var pa pubAckResponse
if err := json.Unmarshal(m.Data, &pa); err != nil {
doErr(ErrInvalidJSAck)
return
}
if pa.Error != nil {
doErr(pa.Error)
return
}
if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
doErr(ErrInvalidJSAck)
return
}
// So here we have received a proper puback.
paf.pa = pa.PubAck
if paf.doneCh != nil {
paf.doneCh <- paf.pa
}
js.mu.Unlock()
}
// MsgErrHandler is used to process asynchronous errors from
// JetStream PublishAsync. It will return the original
// message sent to the server for possible retransmitting and the error encountered.
type MsgErrHandler func(JetStream, *Msg, error)
// PublishAsyncErrHandler sets the error handler for async publishes in JetStream.
func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt {
return jsOptFn(func(js *jsOpts) error {
js.aecb = cb
return nil
})
}
// PublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.
func PublishAsyncMaxPending(max int) JSOpt {
return jsOptFn(func(js *jsOpts) error {
if max < 1 {
return errors.New("nats: max ack pending should be >= 1")
}
js.maxpa = max
return nil
})
}
// PublishAsync publishes a message to JetStream and returns a PubAckFuture
func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) {
return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
}
const defaultStallWait = 200 * time.Millisecond
func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
var o pubOpts
if len(opts) > 0 {
if m.Header == nil {
m.Header = Header{}
}
for _, opt := range opts {
if err := opt.configurePublish(&o); err != nil {
return nil, err
}
}
}
// Timeouts and contexts do not make sense for these.
if o.ttl != 0 || o.ctx != nil {
return nil, ErrContextAndTimeout
}
stallWait := defaultStallWait
if o.stallWait > 0 {
stallWait = o.stallWait
}
// FIXME(dlc) - Make common.
if o.id != _EMPTY_ {
m.Header.Set(MsgIdHdr, o.id)
}
if o.lid != _EMPTY_ {
m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
}
if o.str != _EMPTY_ {
m.Header.Set(ExpectedStreamHdr, o.str)
}
if o.seq != nil {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10))
}
if o.lss != nil {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
}
// Reply
if m.Reply != _EMPTY_ {
return nil, errors.New("nats: reply subject should be empty")
}
reply := m.Reply
m.Reply = js.newAsyncReply()
defer func() { m.Reply = reply }()
if m.Reply == _EMPTY_ {
return nil, errors.New("nats: error creating async reply handler")
}
id := m.Reply[js.replyPrefixLen:]
paf := &pubAckFuture{msg: m, st: time.Now()}
numPending, maxPending := js.registerPAF(id, paf)
if maxPending > 0 && numPending >= maxPending {
select {
case <-js.asyncStall():
case <-time.After(stallWait):
js.clearPAF(id)
return nil, errors.New("nats: stalled with too many outstanding async published messages")
}
}
if err := js.nc.PublishMsg(m); err != nil {
js.clearPAF(id)
return nil, err
}
return paf, nil
}
// PublishAsyncComplete returns a channel that will be closed when all outstanding messages have been ack'd.
func (js *js) PublishAsyncComplete() <-chan struct{} {
js.mu.Lock()
defer js.mu.Unlock()
if js.dch == nil {
js.dch = make(chan struct{})
}
dch := js.dch
if len(js.pafs) == 0 {
close(js.dch)
js.dch = nil
}
return dch
}
// MsgId sets the message ID used for deduplication.
func MsgId(id string) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.id = id
return nil
})
}
// ExpectStream sets the expected stream to respond from the publish.
func ExpectStream(stream string) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.str = stream
return nil
})
}
// ExpectLastSequence sets the expected sequence in the response from the publish.
func ExpectLastSequence(seq uint64) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.seq = &seq
return nil
})
}
// ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
func ExpectLastSequencePerSubject(seq uint64) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.lss = &seq
return nil
})
}
// ExpectLastMsgId sets the expected last msgId in the response from the publish.
func ExpectLastMsgId(id string) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.lid = id
return nil
})
}
// RetryWait sets the retry wait time when ErrNoResponders is encountered.
func RetryWait(dur time.Duration) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.rwait = dur
return nil
})
}
// RetryAttempts sets the retry number of attempts when ErrNoResponders is encountered.
func RetryAttempts(num int) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.rnum = num
return nil
})
}
// StallWait sets the max wait when the producer becomes stall producing messages.
func StallWait(ttl time.Duration) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
if ttl <= 0 {
return fmt.Errorf("nats: stall wait should be more than 0")
}
opts.stallWait = ttl
return nil
})
}
type ackOpts struct {
ttl time.Duration
ctx context.Context
nakDelay time.Duration
}
// AckOpt are the options that can be passed when acknowledge a message.
type AckOpt interface {
configureAck(opts *ackOpts) error
}
// MaxWait sets the maximum amount of time we will wait for a response.
type MaxWait time.Duration
func (ttl MaxWait) configureJSContext(js *jsOpts) error {
js.wait = time.Duration(ttl)
return nil
}
func (ttl MaxWait) configurePull(opts *pullOpts) error {
opts.ttl = time.Duration(ttl)
return nil
}
// AckWait sets the maximum amount of time we will wait for an ack.
type AckWait time.Duration
func (ttl AckWait) configurePublish(opts *pubOpts) error {
opts.ttl = time.Duration(ttl)
return nil
}
func (ttl AckWait) configureSubscribe(opts *subOpts) error {
opts.cfg.AckWait = time.Duration(ttl)
return nil
}
func (ttl AckWait) configureAck(opts *ackOpts) error {
opts.ttl = time.Duration(ttl)
return nil
}
// ContextOpt is an option used to set a context.Context.
type ContextOpt struct {
context.Context
}
func (ctx ContextOpt) configureJSContext(opts *jsOpts) error {
opts.ctx = ctx
return nil
}
func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
opts.ctx = ctx
return nil
}
func (ctx ContextOpt) configureSubscribe(opts *subOpts) error {
opts.ctx = ctx
return nil
}
func (ctx ContextOpt) configurePull(opts *pullOpts) error {
opts.ctx = ctx
return nil
}
func (ctx ContextOpt) configureAck(opts *ackOpts) error {
opts.ctx = ctx
return nil
}
// Context returns an option that can be used to configure a context for APIs
// that are context aware such as those part of the JetStream interface.
func Context(ctx context.Context) ContextOpt {
return ContextOpt{ctx}
}
type nakDelay time.Duration
func (d nakDelay) configureAck(opts *ackOpts) error {
opts.nakDelay = time.Duration(d)
return nil
}
// Subscribe
// ConsumerConfig is the configuration of a JetStream consumer.
type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
BackOff []time.Duration `json:"backoff,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
FilterSubjects []string `json:"filter_subjects,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`
// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
MaxRequestMaxBytes int `json:"max_bytes,omitempty"`
// Push based consumers.
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`
// Inactivity threshold.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
// Generally inherited by parent stream and other markers, now can be configured directly.
Replicas int `json:"num_replicas"`
// Force memory storage.
MemoryStorage bool `json:"mem_storage,omitempty"`
// Metadata is additional metadata for the Consumer.
// Keys starting with `_nats` are reserved.
// NOTE: Metadata requires nats-server v2.10.0+
Metadata map[string]string `json:"metadata,omitempty"`
}
// ConsumerInfo is the info from a JetStream consumer.
type ConsumerInfo struct {
Stream string `json:"stream_name"`
Name string `json:"name"`
Created time.Time `json:"created"`
Config ConsumerConfig `json:"config"`
Delivered SequenceInfo `json:"delivered"`
AckFloor SequenceInfo `json:"ack_floor"`
NumAckPending int `json:"num_ack_pending"`
NumRedelivered int `json:"num_redelivered"`
NumWaiting int `json:"num_waiting"`
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
}
// SequenceInfo has both the consumer and the stream sequence and last activity.
type SequenceInfo struct {
Consumer uint64 `json:"consumer_seq"`
Stream uint64 `json:"stream_seq"`
Last *time.Time `json:"last_active,omitempty"`
}
// SequencePair includes the consumer and stream sequence info from a JetStream consumer.
type SequencePair struct {
Consumer uint64 `json:"consumer_seq"`
Stream uint64 `json:"stream_seq"`
}
// nextRequest is for getting next messages for pull based consumers from JetStream.
type nextRequest struct {
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
}
// jsSub includes JetStream subscription info.
type jsSub struct {
js *js
// For pull subscribers, this is the next message subject to send requests to.
nms string
psubj string // the subject that was passed by user to the subscribe calls
consumer string
stream string
deliver string
pull bool
dc bool // Delete JS consumer
ackNone bool
// This is ConsumerInfo's Pending+Consumer.Delivered that we get from the
// add consumer response. Note that some versions of the server gather the
// consumer info *after* the creation of the consumer, which means that
// some messages may have been already delivered. So the sum of the two
// is a more accurate representation of the number of messages pending or
// in the process of being delivered to the subscription when created.
pending uint64
// Ordered consumers
ordered bool
dseq uint64
sseq uint64
ccreq *createConsumerRequest
// Heartbeats and Flow Control handling from push consumers.
hbc *time.Timer
hbi time.Duration
active bool
cmeta string
fcr string
fcd uint64
fciseq uint64
csfct *time.Timer
// Cancellation function to cancel context on drain/unsubscribe.
cancel func()
}
// Deletes the JS Consumer.
// No connection nor subscription lock must be held on entry.
func (sub *Subscription) deleteConsumer() error {
sub.mu.Lock()
jsi := sub.jsi
if jsi == nil {
sub.mu.Unlock()
return nil
}
if jsi.stream == _EMPTY_ || jsi.consumer == _EMPTY_ {
sub.mu.Unlock()
return nil
}
stream, consumer := jsi.stream, jsi.consumer
js := jsi.js
sub.mu.Unlock()
return js.DeleteConsumer(stream, consumer)
}
// SubOpt configures options for subscribing to JetStream consumers.
type SubOpt interface {
configureSubscribe(opts *subOpts) error
}
// subOptFn is a function option used to configure a JetStream Subscribe.
type subOptFn func(opts *subOpts) error
func (opt subOptFn) configureSubscribe(opts *subOpts) error {
return opt(opts)
}
// Subscribe creates an async Subscription for JetStream.
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
// you can provide the stream name with nats.BindStream().
// If no stream name is specified, the library will attempt to figure out which
// stream the subscription is for. See important notes below for more details.
//
// IMPORTANT NOTES:
// * If none of the options Bind() nor Durable() are specified, the library will
// send a request to the server to create an ephemeral JetStream consumer,
// which will be deleted after an Unsubscribe() or Drain(), or automatically
// by the server after a short period of time after the NATS subscription is
// gone.
// * If Durable() option is specified, the library will attempt to lookup a JetStream
// consumer with this name, and if found, will bind to it and not attempt to
// delete it. However, if not found, the library will send a request to create
// such durable JetStream consumer. The library will delete the JetStream consumer
// after an Unsubscribe() or Drain().
// * If Bind() option is provided, the library will attempt to lookup the
// consumer with the given name, and if successful, bind to it. If the lookup fails,
// then the Subscribe() call will return an error.
func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
if cb == nil {
return nil, ErrBadSubscription
}
return js.subscribe(subj, _EMPTY_, cb, nil, false, false, opts)
}
// SubscribeSync creates a Subscription that can be used to process messages synchronously.
// See important note in Subscribe()
func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
return js.subscribe(subj, _EMPTY_, nil, mch, true, false, opts)
}
// QueueSubscribe creates a Subscription with a queue group.
// If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
// See important note in Subscribe()
func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
if cb == nil {
return nil, ErrBadSubscription
}
return js.subscribe(subj, queue, cb, nil, false, false, opts)
}
// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
// See important note in QueueSubscribe()
func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
return js.subscribe(subj, queue, nil, mch, true, false, opts)
}
// ChanSubscribe creates channel based Subscription.
// Using ChanSubscribe without buffered capacity is not recommended since
// it will be prone to dropping messages with a slow consumer error. Make sure to give the channel enough
// capacity to handle bursts in traffic, for example other Subscribe APIs use a default of 512k capacity in comparison.
// See important note in Subscribe()
func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, _EMPTY_, nil, ch, false, false, opts)
}
// ChanQueueSubscribe creates channel based Subscription with a queue group.
// See important note in QueueSubscribe()
func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, queue, nil, ch, false, false, opts)
}
// PullSubscribe creates a Subscription that can fetch messages.
// See important note in Subscribe()
func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
if durable != "" {
opts = append(opts, Durable(durable))
}
return js.subscribe(subj, _EMPTY_, nil, mch, true, true, opts)
}
func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode bool, subj, queue string) (string, error) {
ccfg := &info.Config
// Make sure this new subject matches or is a subset.
if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
return _EMPTY_, ErrSubjectMismatch
}
// Prevent binding a subscription against incompatible consumer types.
if isPullMode && ccfg.DeliverSubject != _EMPTY_ {
return _EMPTY_, ErrPullSubscribeToPushConsumer
} else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ {
return _EMPTY_, ErrPullSubscribeRequired
}
// If pull mode, nothing else to check here.
if isPullMode {
return _EMPTY_, checkConfig(ccfg, userCfg)
}
// At this point, we know the user wants push mode, and the JS consumer is
// really push mode.
dg := info.Config.DeliverGroup
if dg == _EMPTY_ {
// Prevent an user from attempting to create a queue subscription on
// a JS consumer that was not created with a deliver group.
if queue != _EMPTY_ {
return _EMPTY_, fmt.Errorf("cannot create a queue subscription for a consumer without a deliver group")
} else if info.PushBound {
// Need to reject a non queue subscription to a non queue consumer
// if the consumer is already bound.
return _EMPTY_, fmt.Errorf("consumer is already bound to a subscription")
}
} else {
// If the JS consumer has a deliver group, we need to fail a non queue
// subscription attempt:
if queue == _EMPTY_ {
return _EMPTY_, fmt.Errorf("cannot create a subscription for a consumer with a deliver group %q", dg)
} else if queue != dg {
// Here the user's queue group name does not match the one associated
// with the JS consumer.
return _EMPTY_, fmt.Errorf("cannot create a queue subscription %q for a consumer with a deliver group %q",
queue, dg)
}
}
if err := checkConfig(ccfg, userCfg); err != nil {
return _EMPTY_, err
}
return ccfg.DeliverSubject, nil
}
func checkConfig(s, u *ConsumerConfig) error {
makeErr := func(fieldName string, usrVal, srvVal any) error {
return fmt.Errorf("configuration requests %s to be %v, but consumer's value is %v", fieldName, usrVal, srvVal)
}
if u.Durable != _EMPTY_ && u.Durable != s.Durable {
return makeErr("durable", u.Durable, s.Durable)
}
if u.Description != _EMPTY_ && u.Description != s.Description {
return makeErr("description", u.Description, s.Description)
}
if u.DeliverPolicy != deliverPolicyNotSet && u.DeliverPolicy != s.DeliverPolicy {
return makeErr("deliver policy", u.DeliverPolicy, s.DeliverPolicy)
}
if u.OptStartSeq > 0 && u.OptStartSeq != s.OptStartSeq {
return makeErr("optional start sequence", u.OptStartSeq, s.OptStartSeq)
}
if u.OptStartTime != nil && !u.OptStartTime.IsZero() && !(*u.OptStartTime).Equal(*s.OptStartTime) {
return makeErr("optional start time", u.OptStartTime, s.OptStartTime)
}
if u.AckPolicy != ackPolicyNotSet && u.AckPolicy != s.AckPolicy {
return makeErr("ack policy", u.AckPolicy, s.AckPolicy)
}
if u.AckWait > 0 && u.AckWait != s.AckWait {
return makeErr("ack wait", u.AckWait, s.AckWait)
}
if u.MaxDeliver > 0 && u.MaxDeliver != s.MaxDeliver {
return makeErr("max deliver", u.MaxDeliver, s.MaxDeliver)
}
if u.ReplayPolicy != replayPolicyNotSet && u.ReplayPolicy != s.ReplayPolicy {
return makeErr("replay policy", u.ReplayPolicy, s.ReplayPolicy)
}
if u.RateLimit > 0 && u.RateLimit != s.RateLimit {
return makeErr("rate limit", u.RateLimit, s.RateLimit)
}
if u.SampleFrequency != _EMPTY_ && u.SampleFrequency != s.SampleFrequency {
return makeErr("sample frequency", u.SampleFrequency, s.SampleFrequency)
}
if u.MaxWaiting > 0 && u.MaxWaiting != s.MaxWaiting {
return makeErr("max waiting", u.MaxWaiting, s.MaxWaiting)
}
if u.MaxAckPending > 0 && u.MaxAckPending != s.MaxAckPending {
return makeErr("max ack pending", u.MaxAckPending, s.MaxAckPending)
}
// For flow control, we want to fail if the user explicit wanted it, but
// it is not set in the existing consumer. If it is not asked by the user,
// the library still handles it and so no reason to fail.
if u.FlowControl && !s.FlowControl {
return makeErr("flow control", u.FlowControl, s.FlowControl)
}
if u.Heartbeat > 0 && u.Heartbeat != s.Heartbeat {
return makeErr("heartbeat", u.Heartbeat, s.Heartbeat)
}
if u.Replicas > 0 && u.Replicas != s.Replicas {
return makeErr("replicas", u.Replicas, s.Replicas)
}
if u.MemoryStorage && !s.MemoryStorage {
return makeErr("memory storage", u.MemoryStorage, s.MemoryStorage)
}
return nil
}
func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, isPullMode bool, opts []SubOpt) (*Subscription, error) {
cfg := ConsumerConfig{
DeliverPolicy: deliverPolicyNotSet,
AckPolicy: ackPolicyNotSet,
ReplayPolicy: replayPolicyNotSet,
}
o := subOpts{cfg: &cfg}
if len(opts) > 0 {
for _, opt := range opts {
if opt == nil {
continue
}
if err := opt.configureSubscribe(&o); err != nil {
return nil, err
}
}
}
// If no stream name is specified, the subject cannot be empty.
if subj == _EMPTY_ && o.stream == _EMPTY_ {
return nil, fmt.Errorf("nats: subject required")
}
// Note that these may change based on the consumer info response we may get.
hasHeartbeats := o.cfg.Heartbeat > 0
hasFC := o.cfg.FlowControl
// Some checks for pull subscribers
if isPullMode {
// No deliver subject should be provided
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, ErrPullSubscribeToPushConsumer
}
}
// Some check/setting specific to queue subs
if queue != _EMPTY_ {
// Queue subscriber cannot have HB or FC (since messages will be randomly dispatched
// to members). We may in the future have a separate NATS subscription that all members
// would subscribe to and server would send on.
if o.cfg.Heartbeat > 0 || o.cfg.FlowControl {
// Not making this a public ErrXXX in case we allow in the future.
return nil, fmt.Errorf("nats: queue subscription doesn't support idle heartbeat nor flow control")
}
// If this is a queue subscription and no consumer nor durable name was specified,
// then we will use the queue name as a durable name.
if o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ {
if err := checkConsumerName(queue); err != nil {
return nil, err
}
o.cfg.Durable = queue
}
}
var (
err error
shouldCreate bool
info *ConsumerInfo
deliver string
stream = o.stream
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
consumerBound = o.bound
ctx = o.ctx
skipCInfo = o.skipCInfo
notFoundErr bool
lookupErr bool
nc = js.nc
nms string
hbi time.Duration
ccreq *createConsumerRequest // In case we need to hold onto it for ordered consumers.
maxap int
)
// Do some quick checks here for ordered consumers. We do these here instead of spread out
// in the individual SubOpts.
if o.ordered {
// Make sure we are not durable.
if isDurable {
return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
}
// Check ack policy.
if o.cfg.AckPolicy != ackPolicyNotSet {
return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
}
// Check max deliver.
if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
}
// No deliver subject, we pick our own.
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
}
// Queue groups not allowed.
if queue != _EMPTY_ {
return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
}
// Check for bound consumers.
if consumer != _EMPTY_ {
return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
}
// Check for pull mode.
if isPullMode {
return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
}
// Setup how we need it to be here.
o.cfg.FlowControl = true
o.cfg.AckPolicy = AckNonePolicy
o.cfg.MaxDeliver = 1
o.cfg.AckWait = 22 * time.Hour // Just set to something known, not utilized.
// Force R1 and MemoryStorage for these.
o.cfg.Replicas = 1
o.cfg.MemoryStorage = true
if !hasHeartbeats {
o.cfg.Heartbeat = orderedHeartbeatsInterval
}
hasFC, hasHeartbeats = true, true
o.mack = true // To avoid auto-ack wrapping call below.
hbi = o.cfg.Heartbeat
}
// In case a consumer has not been set explicitly, then the
// durable name will be used as the consumer name.
if consumer == _EMPTY_ {
consumer = o.cfg.Durable
}
// Find the stream mapped to the subject if not bound to a stream already.
if stream == _EMPTY_ {
stream, err = js.StreamNameBySubject(subj)
if err != nil {
return nil, err
}
}
// With an explicit durable name, we can lookup the consumer first
// to which it should be attaching to.
// If SkipConsumerLookup was used, do not call consumer info.
if consumer != _EMPTY_ && !o.skipCInfo {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || errors.Is(err, ErrTimeout) || errors.Is(err, context.DeadlineExceeded)
}
switch {
case info != nil:
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
if err != nil {
return nil, err
}
icfg := &info.Config
hasFC, hbi = icfg.FlowControl, icfg.Heartbeat
hasHeartbeats = hbi > 0
maxap = icfg.MaxAckPending
case (err != nil && !notFoundErr) || (notFoundErr && consumerBound):
// If the consumer is being bound and we got an error on pull subscribe then allow the error.
if !(isPullMode && lookupErr && consumerBound) {
return nil, err
}
case skipCInfo:
// When skipping consumer info, need to rely on the manually passed sub options
// to match the expected behavior from the subscription.
hasFC, hbi = o.cfg.FlowControl, o.cfg.Heartbeat
hasHeartbeats = hbi > 0
maxap = o.cfg.MaxAckPending
deliver = o.cfg.DeliverSubject
if consumerBound {
break
}
// When not bound to a consumer already, proceed to create.
fallthrough
default:
// Attempt to create consumer if not found nor using Bind.
shouldCreate = true
if o.cfg.DeliverSubject != _EMPTY_ {
deliver = o.cfg.DeliverSubject
} else if !isPullMode {
deliver = nc.NewInbox()
cfg.DeliverSubject = deliver
}
// Do filtering always, server will clear as needed.
cfg.FilterSubject = subj
// Pass the queue to the consumer config
if queue != _EMPTY_ {
cfg.DeliverGroup = queue
}
// If not set, default to deliver all
if cfg.DeliverPolicy == deliverPolicyNotSet {
cfg.DeliverPolicy = DeliverAllPolicy
}
// If not set, default to ack explicit.
if cfg.AckPolicy == ackPolicyNotSet {
cfg.AckPolicy = AckExplicitPolicy
}
// If not set, default to instant
if cfg.ReplayPolicy == replayPolicyNotSet {
cfg.ReplayPolicy = ReplayInstantPolicy
}
// If we have acks at all and the MaxAckPending is not set go ahead
// and set to the internal max for channel based consumers
if cfg.MaxAckPending == 0 && ch != nil && cfg.AckPolicy != AckNonePolicy {
cfg.MaxAckPending = cap(ch)
}
// Create request here.
ccreq = &createConsumerRequest{
Stream: stream,
Config: &cfg,
}
hbi = cfg.Heartbeat
}
if isPullMode {
nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer)
deliver = nc.NewInbox()
// for pull consumers, create a wildcard subscription to differentiate pull requests
deliver += ".*"
}
// In case this has a context, then create a child context that
// is possible to cancel via unsubscribe / drain.
var cancel func()
if ctx != nil {
ctx, cancel = context.WithCancel(ctx)
}
jsi := &jsSub{
js: js,
stream: stream,
consumer: consumer,
deliver: deliver,
hbi: hbi,
ordered: o.ordered,
ccreq: ccreq,
dseq: 1,
pull: isPullMode,
nms: nms,
psubj: subj,
cancel: cancel,
ackNone: o.cfg.AckPolicy == AckNonePolicy,
}
// Auto acknowledge unless manual ack is set or policy is set to AckNonePolicy
if cb != nil && !o.mack && o.cfg.AckPolicy != AckNonePolicy {
ocb := cb
cb = func(m *Msg) { ocb(m); m.Ack() }
}
sub, err := nc.subscribe(deliver, queue, cb, ch, isSync, jsi)
if err != nil {
return nil, err
}
// If we fail and we had the sub we need to cleanup, but can't just do a straight Unsubscribe or Drain.
// We need to clear the jsi so we do not remove any durables etc.
cleanUpSub := func() {
if sub != nil {
sub.mu.Lock()
sub.jsi = nil
sub.mu.Unlock()
sub.Unsubscribe()
}
}
// If we are creating or updating let's process that request.
consName := o.cfg.Name
if shouldCreate {
if cfg.Durable != "" {
consName = cfg.Durable
} else if consName == "" {
consName = getHash(nuid.Next())
}
info, err := js.upsertConsumer(stream, consName, ccreq.Config)
if err != nil {
var apiErr *APIError
if ok := errors.As(err, &apiErr); !ok {
cleanUpSub()
return nil, err
}
if consumer == _EMPTY_ ||
(apiErr.ErrorCode != JSErrCodeConsumerAlreadyExists && apiErr.ErrorCode != JSErrCodeConsumerNameExists) {
cleanUpSub()
if errors.Is(apiErr, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, err
}
// We will not be using this sub here if we were push based.
if !isPullMode {
cleanUpSub()
}
info, err = js.ConsumerInfo(stream, consumer)
if err != nil {
return nil, err
}
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
if err != nil {
return nil, err
}
if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if isSync {
ch = make(chan *Msg, cap(ch))
} else if ch != nil {
// User provided (ChanSubscription), simply try to drain it.
for done := false; !done; {
select {
case <-ch:
default:
done = true
}
}
}
jsi.deliver = deliver
jsi.hbi = info.Config.Heartbeat
// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
return nil, err
}
hasFC = info.Config.FlowControl
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
sub.mu.Lock()
sub.jsi.dc = true
sub.jsi.pending = info.NumPending + info.Delivered.Consumer
// If this is an ephemeral, we did not have a consumer name, we get it from the info
// after the AddConsumer returns.
if consumer == _EMPTY_ {
sub.jsi.consumer = info.Name
if isPullMode {
sub.jsi.nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, info.Name)
}
}
sub.mu.Unlock()
}
// Capture max ack pending from the info response here which covers both
// success and failure followed by consumer lookup.
maxap = info.Config.MaxAckPending
}
// If maxap is greater than the default sub's pending limit, use that.
if maxap > DefaultSubPendingMsgsLimit {
// For bytes limit, use the min of maxp*1MB or DefaultSubPendingBytesLimit
bl := maxap * 1024 * 1024
if bl < DefaultSubPendingBytesLimit {
bl = DefaultSubPendingBytesLimit
}
if err := sub.SetPendingLimits(maxap, bl); err != nil {
return nil, err
}
}
// Do heartbeats last if needed.
if hasHeartbeats {
sub.scheduleHeartbeatCheck()
}
// For ChanSubscriptions, if we know that there is flow control, we will
// start a go routine that evaluates the number of delivered messages
// and process flow control.
if sub.Type() == ChanSubscription && hasFC {
sub.chanSubcheckForFlowControlResponse()
}
// Wait for context to get canceled if there is one.
if ctx != nil {
go func() {
<-ctx.Done()
sub.Unsubscribe()
}()
}
return sub, nil
}
// InitialConsumerPending returns the number of messages pending to be
// delivered to the consumer when the subscription was created.
func (sub *Subscription) InitialConsumerPending() (uint64, error) {
sub.mu.Lock()
defer sub.mu.Unlock()
if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ {
return 0, fmt.Errorf("%w: not a JetStream subscription", ErrTypeSubscription)
}
return sub.jsi.pending, nil
}
// This long-lived routine is used per ChanSubscription to check
// on the number of delivered messages and check for flow control response.
func (sub *Subscription) chanSubcheckForFlowControlResponse() {
sub.mu.Lock()
// We don't use defer since if we need to send an RC reply, we need
// to do it outside the sub's lock. So doing explicit unlock...
if sub.closed {
sub.mu.Unlock()
return
}
var fcReply string
var nc *Conn
jsi := sub.jsi
if jsi.csfct == nil {
jsi.csfct = time.AfterFunc(chanSubFCCheckInterval, sub.chanSubcheckForFlowControlResponse)
} else {
fcReply = sub.checkForFlowControlResponse()
nc = sub.conn
// Do the reset here under the lock, it's ok...
jsi.csfct.Reset(chanSubFCCheckInterval)
}
sub.mu.Unlock()
// This call will return an error (which we don't care here)
// if nc is nil or fcReply is empty.
nc.Publish(fcReply, nil)
}
// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
type ErrConsumerSequenceMismatch struct {
// StreamResumeSequence is the stream sequence from where the consumer
// should resume consuming from the stream.
StreamResumeSequence uint64
// ConsumerSequence is the sequence of the consumer that is behind.
ConsumerSequence uint64
// LastConsumerSequence is the sequence of the consumer when the heartbeat
// was received.
LastConsumerSequence uint64
}
func (ecs *ErrConsumerSequenceMismatch) Error() string {
return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
ecs.ConsumerSequence,
ecs.LastConsumerSequence-ecs.ConsumerSequence,
ecs.StreamResumeSequence,
)
}
// isJSControlMessage will return true if this is an empty control status message
// and indicate what type of control message it is, say jsCtrlHB or jsCtrlFC
func isJSControlMessage(msg *Msg) (bool, int) {
if len(msg.Data) > 0 || msg.Header.Get(statusHdr) != controlMsg {
return false, 0
}
val := msg.Header.Get(descrHdr)
if strings.HasPrefix(val, "Idle") {
return true, jsCtrlHB
}
if strings.HasPrefix(val, "Flow") {
return true, jsCtrlFC
}
return true, 0
}
// Keeps track of the incoming message's reply subject so that the consumer's
// state (deliver sequence, etc..) can be checked against heartbeats.
// We will also bump the incoming data message sequence that is used in FC cases.
// Runs under the subscription lock
func (sub *Subscription) trackSequences(reply string) {
// For flow control, keep track of incoming message sequence.
sub.jsi.fciseq++
sub.jsi.cmeta = reply
}
// Check to make sure messages are arriving in order.
// Returns true if the sub had to be replaced. Will cause upper layers to return.
// The caller has verified that sub.jsi != nil and that this is not a control message.
// Lock should be held.
func (sub *Subscription) checkOrderedMsgs(m *Msg) bool {
// Ignore msgs with no reply like HBs and flow control, they are handled elsewhere.
if m.Reply == _EMPTY_ {
return false
}
// Normal message here.
tokens, err := parser.GetMetadataFields(m.Reply)
if err != nil {
return false
}
sseq, dseq := parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]), parser.ParseNum(tokens[parser.AckConsumerSeqTokenPos])
jsi := sub.jsi
if dseq != jsi.dseq {
sub.resetOrderedConsumer(jsi.sseq + 1)
return true
}
// Update our tracking here.
jsi.dseq, jsi.sseq = dseq+1, sseq
return false
}
// Update and replace sid.
// Lock should be held on entry but will be unlocked to prevent lock inversion.
func (sub *Subscription) applyNewSID() (osid int64) {
nc := sub.conn
sub.mu.Unlock()
nc.subsMu.Lock()
osid = sub.sid
delete(nc.subs, osid)
// Place new one.
nc.ssid++
nsid := nc.ssid
nc.subs[nsid] = sub
nc.subsMu.Unlock()
sub.mu.Lock()
sub.sid = nsid
return osid
}
// We are here if we have detected a gap with an ordered consumer.
// We will create a new consumer and rewire the low level subscription.
// Lock should be held.
func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
nc := sub.conn
if sub.jsi == nil || nc == nil || sub.closed {
return
}
var maxStr string
// If there was an AUTO_UNSUB done, we need to adjust the new value
// to send after the SUB for the new sid.
if sub.max > 0 {
if sub.jsi.fciseq < sub.max {
adjustedMax := sub.max - sub.jsi.fciseq
maxStr = strconv.Itoa(int(adjustedMax))
} else {
// We are already at the max, so we should just unsub the
// existing sub and be done
go func(sid int64) {
nc.mu.Lock()
nc.bw.appendString(fmt.Sprintf(unsubProto, sid, _EMPTY_))
nc.kickFlusher()
nc.mu.Unlock()
}(sub.sid)
return
}
}
// Quick unsubscribe. Since we know this is a simple push subscriber we do in place.
osid := sub.applyNewSID()
// Grab new inbox.
newDeliver := nc.NewInbox()
sub.Subject = newDeliver
// Snapshot the new sid under sub lock.
nsid := sub.sid
// We are still in the low level readLoop for the connection so we need
// to spin a go routine to try to create the new consumer.
go func() {
// Unsubscribe and subscribe with new inbox and sid.
// Remap a new low level sub into this sub since its client accessible.
// This is done here in this go routine to prevent lock inversion.
nc.mu.Lock()
nc.bw.appendString(fmt.Sprintf(unsubProto, osid, _EMPTY_))
nc.bw.appendString(fmt.Sprintf(subProto, newDeliver, _EMPTY_, nsid))
if maxStr != _EMPTY_ {
nc.bw.appendString(fmt.Sprintf(unsubProto, nsid, maxStr))
}
nc.kickFlusher()
nc.mu.Unlock()
pushErr := func(err error) {
nc.handleConsumerSequenceMismatch(sub, fmt.Errorf("%w: recreating ordered consumer", err))
nc.unsubscribe(sub, 0, true)
}
sub.mu.Lock()
jsi := sub.jsi
// Reset some items in jsi.
jsi.dseq = 1
jsi.cmeta = _EMPTY_
jsi.fcr, jsi.fcd = _EMPTY_, 0
jsi.deliver = newDeliver
// Reset consumer request for starting policy.
cfg := jsi.ccreq.Config
cfg.DeliverSubject = newDeliver
cfg.DeliverPolicy = DeliverByStartSequencePolicy
cfg.OptStartSeq = sseq
// In case the consumer was created with a start time, we need to clear it
// since we are now using a start sequence.
cfg.OptStartTime = nil
js := jsi.js
sub.mu.Unlock()
sub.mu.Lock()
// Attempt to delete the existing consumer.
// We don't wait for the response since even if it's unsuccessful,
// inactivity threshold will kick in and delete it.
if jsi.consumer != _EMPTY_ {
go js.DeleteConsumer(jsi.stream, jsi.consumer)
}
jsi.consumer = ""
sub.mu.Unlock()
consName := getHash(nuid.Next())
cinfo, err := js.upsertConsumer(jsi.stream, consName, cfg)
if err != nil {
var apiErr *APIError
if errors.Is(err, ErrJetStreamNotEnabled) || errors.Is(err, ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
// if creating consumer failed, retry
return
} else if errors.As(err, &apiErr) && apiErr.ErrorCode == JSErrCodeInsufficientResourcesErr {
// retry for insufficient resources, as it may mean that client is connected to a running
// server in cluster while the server hosting R1 JetStream resources is restarting
return
}
pushErr(err)
return
}
sub.mu.Lock()
jsi.consumer = cinfo.Name
sub.mu.Unlock()
}()
}
// For jetstream subscriptions, returns the number of delivered messages.
// For ChanSubscription, this value is computed based on the known number
// of messages added to the channel minus the current size of that channel.
// Lock held on entry
func (sub *Subscription) getJSDelivered() uint64 {
if sub.typ == ChanSubscription {
return sub.jsi.fciseq - uint64(len(sub.mch))
}
return sub.delivered
}
// checkForFlowControlResponse will check to see if we should send a flow control response
// based on the subscription current delivered index and the target.
// Runs under subscription lock
func (sub *Subscription) checkForFlowControlResponse() string {
// Caller has verified that there is a sub.jsi and fc
jsi := sub.jsi
jsi.active = true
if sub.getJSDelivered() >= jsi.fcd {
fcr := jsi.fcr
jsi.fcr, jsi.fcd = _EMPTY_, 0
return fcr
}
return _EMPTY_
}
// Record an inbound flow control message.
// Runs under subscription lock
func (sub *Subscription) scheduleFlowControlResponse(reply string) {
sub.jsi.fcr, sub.jsi.fcd = reply, sub.jsi.fciseq
}
// Checks for activity from our consumer.
// If we do not think we are active send an async error.
func (sub *Subscription) activityCheck() {
sub.mu.Lock()
jsi := sub.jsi
if jsi == nil || sub.closed {
sub.mu.Unlock()
return
}
active := jsi.active
jsi.hbc.Reset(jsi.hbi * hbcThresh)
jsi.active = false
nc := sub.conn
sub.mu.Unlock()
if !active {
if !jsi.ordered || nc.Status() != CONNECTED {
nc.mu.Lock()
if errCB := nc.Opts.AsyncErrorCB; errCB != nil {
nc.ach.push(func() { errCB(nc, sub, ErrConsumerNotActive) })
}
nc.mu.Unlock()
return
}
sub.mu.Lock()
sub.resetOrderedConsumer(jsi.sseq + 1)
sub.mu.Unlock()
}
}
// scheduleHeartbeatCheck sets up the timer check to make sure we are active
// or receiving idle heartbeats..
func (sub *Subscription) scheduleHeartbeatCheck() {
sub.mu.Lock()
defer sub.mu.Unlock()
jsi := sub.jsi
if jsi == nil {
return
}
if jsi.hbc == nil {
jsi.hbc = time.AfterFunc(jsi.hbi*hbcThresh, sub.activityCheck)
} else {
jsi.hbc.Reset(jsi.hbi * hbcThresh)
}
}
// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, err) })
}
nc.mu.Unlock()
}
// checkForSequenceMismatch will make sure we have not missed any messages since last seen.
func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
// Process heartbeat received, get latest control metadata if present.
s.mu.Lock()
ctrl, ordered := jsi.cmeta, jsi.ordered
jsi.active = true
s.mu.Unlock()
if ctrl == _EMPTY_ {
return
}
tokens, err := parser.GetMetadataFields(ctrl)
if err != nil {
return
}
// Consumer sequence.
var ldseq string
dseq := tokens[parser.AckConsumerSeqTokenPos]
hdr := msg.Header[lastConsumerSeqHdr]
if len(hdr) == 1 {
ldseq = hdr[0]
}
// Detect consumer sequence mismatch and whether
// should restart the consumer.
if ldseq != dseq {
// Dispatch async error including details such as
// from where the consumer could be restarted.
sseq := parser.ParseNum(tokens[parser.AckStreamSeqTokenPos])
if ordered {
s.mu.Lock()
s.resetOrderedConsumer(jsi.sseq + 1)
s.mu.Unlock()
} else {
ecs := &ErrConsumerSequenceMismatch{
StreamResumeSequence: uint64(sseq),
ConsumerSequence: parser.ParseNum(dseq),
LastConsumerSequence: parser.ParseNum(ldseq),
}
nc.handleConsumerSequenceMismatch(s, ecs)
}
}
}
type streamRequest struct {
Subject string `json:"subject,omitempty"`
}
type streamNamesResponse struct {
apiResponse
apiPaged
Streams []string `json:"streams"`
}
type subOpts struct {
// For attaching.
stream, consumer string
// For creating or updating.
cfg *ConsumerConfig
// For binding a subscription to a consumer without creating it.
bound bool
// For manual ack
mack bool
// For an ordered consumer.
ordered bool
ctx context.Context
// To disable calling ConsumerInfo
skipCInfo bool
}
// SkipConsumerLookup will omit looking up consumer when [Bind], [Durable]
// or [ConsumerName] are provided.
//
// NOTE: This setting may cause an existing consumer to be overwritten. Also,
// because consumer lookup is skipped, all consumer options like AckPolicy,
// DeliverSubject etc. need to be provided even if consumer already exists.
func SkipConsumerLookup() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.skipCInfo = true
return nil
})
}
// OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages.
// There are no redeliveries and no acks, and flow control and heartbeats will be added but
// will be taken care of without additional client code.
func OrderedConsumer() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.ordered = true
return nil
})
}
// ManualAck disables auto ack functionality for async subscriptions.
func ManualAck() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.mack = true
return nil
})
}
// Description will set the description for the created consumer.
func Description(description string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.Description = description
return nil
})
}
// Durable defines the consumer name for JetStream durable subscribers.
// This function will return ErrInvalidConsumerName if the name contains
// any dot ".".
func Durable(consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if opts.cfg.Durable != _EMPTY_ {
return fmt.Errorf("nats: option Durable set more than once")
}
if opts.consumer != _EMPTY_ && opts.consumer != consumer {
return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer)
}
if err := checkConsumerName(consumer); err != nil {
return err
}
opts.cfg.Durable = consumer
return nil
})
}
// DeliverAll will configure a Consumer to receive all the
// messages from a Stream.
func DeliverAll() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverAllPolicy
return nil
})
}
// DeliverLast configures a Consumer to receive messages
// starting with the latest one.
func DeliverLast() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverLastPolicy
return nil
})
}
// DeliverLastPerSubject configures a Consumer to receive messages
// starting with the latest one for each filtered subject.
func DeliverLastPerSubject() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverLastPerSubjectPolicy
return nil
})
}
// DeliverNew configures a Consumer to receive messages
// published after the subscription.
func DeliverNew() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverNewPolicy
return nil
})
}
// StartSequence configures a Consumer to receive
// messages from a start sequence.
func StartSequence(seq uint64) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverByStartSequencePolicy
opts.cfg.OptStartSeq = seq
return nil
})
}
// StartTime configures a Consumer to receive
// messages from a start time.
func StartTime(startTime time.Time) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverByStartTimePolicy
opts.cfg.OptStartTime = &startTime
return nil
})
}
// AckNone requires no acks for delivered messages.
func AckNone() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.AckPolicy = AckNonePolicy
return nil
})
}
// AckAll when acking a sequence number, this implicitly acks all sequences
// below this one as well.
func AckAll() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.AckPolicy = AckAllPolicy
return nil
})
}
// AckExplicit requires ack or nack for all messages.
func AckExplicit() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.AckPolicy = AckExplicitPolicy
return nil
})
}
// MaxDeliver sets the number of redeliveries for a message.
func MaxDeliver(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxDeliver = n
return nil
})
}
// MaxAckPending sets the number of outstanding acks that are allowed before
// message delivery is halted.
func MaxAckPending(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxAckPending = n
return nil
})
}
// ReplayOriginal replays the messages at the original speed.
func ReplayOriginal() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.ReplayPolicy = ReplayOriginalPolicy
return nil
})
}
// ReplayInstant replays the messages as fast as possible.
func ReplayInstant() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.ReplayPolicy = ReplayInstantPolicy
return nil
})
}
// RateLimit is the Bits per sec rate limit applied to a push consumer.
func RateLimit(n uint64) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.RateLimit = n
return nil
})
}
// BackOff is an array of time durations that represent the time to delay based on delivery count.
func BackOff(backOff []time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.BackOff = backOff
return nil
})
}
// BindStream binds a consumer to a stream explicitly based on a name.
// When a stream name is not specified, the library uses the subscribe
// subject as a way to find the stream name. It is done by making a request
// to the server to get list of stream names that have a filter for this
// subject. If the returned list contains a single stream, then this
// stream name will be used, otherwise the `ErrNoMatchingStream` is returned.
// To avoid the stream lookup, provide the stream name with this function.
// See also `Bind()`.
func BindStream(stream string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if opts.stream != _EMPTY_ && opts.stream != stream {
return fmt.Errorf("nats: duplicate stream name (%s and %s)", opts.stream, stream)
}
opts.stream = stream
return nil
})
}
// Bind binds a subscription to an existing consumer from a stream without attempting to create.
// The first argument is the stream name and the second argument will be the consumer name.
func Bind(stream, consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if stream == _EMPTY_ {
return ErrStreamNameRequired
}
if consumer == _EMPTY_ {
return ErrConsumerNameRequired
}
// In case of pull subscribers, the durable name is a required parameter
// so check that they are not different.
if opts.cfg.Durable != _EMPTY_ && opts.cfg.Durable != consumer {
return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.cfg.Durable, consumer)
}
if opts.stream != _EMPTY_ && opts.stream != stream {
return fmt.Errorf("nats: duplicate stream name (%s and %s)", opts.stream, stream)
}
opts.stream = stream
opts.consumer = consumer
opts.bound = true
return nil
})
}
// EnableFlowControl enables flow control for a push based consumer.
func EnableFlowControl() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.FlowControl = true
return nil
})
}
// IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
// For pull consumers, idle heartbeat has to be set on each [Fetch] call.
func IdleHeartbeat(duration time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.Heartbeat = duration
return nil
})
}
// DeliverSubject specifies the JetStream consumer deliver subject.
//
// This option is used only in situations where the consumer does not exist
// and a creation request is sent to the server. If not provided, an inbox
// will be selected.
// If a consumer exists, then the NATS subscription will be created on
// the JetStream consumer's DeliverSubject, not necessarily this subject.
func DeliverSubject(subject string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverSubject = subject
return nil
})
}
// HeadersOnly() will instruct the consumer to only deliver headers and no payloads.
func HeadersOnly() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.HeadersOnly = true
return nil
})
}
// MaxRequestBatch sets the maximum pull consumer batch size that a Fetch()
// can request.
func MaxRequestBatch(max int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxRequestBatch = max
return nil
})
}
// MaxRequestExpires sets the maximum pull consumer request expiration that a
// Fetch() can request (using the Fetch's timeout value).
func MaxRequestExpires(max time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxRequestExpires = max
return nil
})
}
// MaxRequesMaxBytes sets the maximum pull consumer request bytes that a
// Fetch() can receive.
func MaxRequestMaxBytes(bytes int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxRequestMaxBytes = bytes
return nil
})
}
// InactiveThreshold indicates how long the server should keep a consumer
// after detecting a lack of activity. In NATS Server 2.8.4 and earlier, this
// option only applies to ephemeral consumers. In NATS Server 2.9.0 and later,
// this option applies to both ephemeral and durable consumers, allowing durable
// consumers to also be deleted automatically after the inactivity threshold has
// passed.
func InactiveThreshold(threshold time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
if threshold < 0 {
return fmt.Errorf("invalid InactiveThreshold value (%v), needs to be greater or equal to 0", threshold)
}
opts.cfg.InactiveThreshold = threshold
return nil
})
}
// ConsumerReplicas sets the number of replica count for a consumer.
func ConsumerReplicas(replicas int) SubOpt {
return subOptFn(func(opts *subOpts) error {
if replicas < 1 {
return fmt.Errorf("invalid ConsumerReplicas value (%v), needs to be greater than 0", replicas)
}
opts.cfg.Replicas = replicas
return nil
})
}
// ConsumerMemoryStorage sets the memory storage to true for a consumer.
func ConsumerMemoryStorage() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MemoryStorage = true
return nil
})
}
// ConsumerName sets the name for a consumer.
func ConsumerName(name string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.Name = name
return nil
})
}
// ConsumerFilterSubjects can be used to set multiple subject filters on the consumer.
// It has to be used in conjunction with [nats.BindStream] and
// with empty 'subject' parameter.
func ConsumerFilterSubjects(subjects ...string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.FilterSubjects = subjects
return nil
})
}
func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ {
sub.mu.Unlock()
return nil, ErrTypeSubscription
}
// Consumer info lookup should fail if in direct mode.
js := sub.jsi.js
stream, consumer := sub.jsi.stream, sub.jsi.consumer
sub.mu.Unlock()
return js.getConsumerInfo(stream, consumer)
}
type pullOpts struct {
maxBytes int
ttl time.Duration
ctx context.Context
hb time.Duration
}
// PullOpt are the options that can be passed when pulling a batch of messages.
type PullOpt interface {
configurePull(opts *pullOpts) error
}
// PullMaxWaiting defines the max inflight pull requests.
func PullMaxWaiting(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxWaiting = n
return nil
})
}
type PullHeartbeat time.Duration
func (h PullHeartbeat) configurePull(opts *pullOpts) error {
if h <= 0 {
return fmt.Errorf("%w: idle heartbeat has to be greater than 0", ErrInvalidArg)
}
opts.hb = time.Duration(h)
return nil
}
// PullMaxBytes defines the max bytes allowed for a fetch request.
type PullMaxBytes int
func (n PullMaxBytes) configurePull(opts *pullOpts) error {
opts.maxBytes = int(n)
return nil
}
var (
// errNoMessages is an error that a Fetch request using no_wait can receive to signal
// that there are no more messages available.
errNoMessages = errors.New("nats: no messages")
// errRequestsPending is an error that represents a sub.Fetch requests that was using
// no_wait and expires time got discarded by the server.
errRequestsPending = errors.New("nats: requests pending")
)
// Returns if the given message is a user message or not, and if
// `checkSts` is true, returns appropriate error based on the
// content of the status (404, etc..)
func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) {
// Assume user message
usrMsg = true
// If payload or no header, consider this a user message
if len(msg.Data) > 0 || len(msg.Header) == 0 {
return
}
// Look for status header
val := msg.Header.Get(statusHdr)
// If not present, then this is considered a user message
if val == _EMPTY_ {
return
}
// At this point, this is not a user message since there is
// no payload and a "Status" header.
usrMsg = false
// If we don't care about status, we are done.
if !checkSts {
return
}
// if it's a heartbeat message, report as not user msg
if isHb, _ := isJSControlMessage(msg); isHb {
return
}
switch val {
case noResponders:
err = ErrNoResponders
case noMessagesSts:
// 404 indicates that there are no messages.
err = errNoMessages
case reqTimeoutSts:
// In case of a fetch request with no wait request and expires time,
// need to skip 408 errors and retry.
if isNoWait {
err = errRequestsPending
} else {
// Older servers may send a 408 when a request in the server was expired
// and interest is still found, which will be the case for our
// implementation. Regardless, ignore 408 errors until receiving at least
// one message when making requests without no_wait.
err = ErrTimeout
}
case jetStream409Sts:
if strings.Contains(strings.ToLower(msg.Header.Get(descrHdr)), "consumer deleted") {
err = ErrConsumerDeleted
break
}
if strings.Contains(strings.ToLower(msg.Header.Get(descrHdr)), "leadership change") {
err = ErrConsumerLeadershipChanged
break
}
fallthrough
default:
err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
}
return
}
// Fetch pulls a batch of messages from a stream for a pull consumer.
func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
if sub == nil {
return nil, ErrBadSubscription
}
if batch < 1 {
return nil, ErrInvalidArg
}
var o pullOpts
for _, opt := range opts {
if err := opt.configurePull(&o); err != nil {
return nil, err
}
}
if o.ctx != nil && o.ttl != 0 {
return nil, ErrContextAndTimeout
}
sub.mu.Lock()
jsi := sub.jsi
// Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription,
// so check for jsi.pull boolean instead.
if jsi == nil || !jsi.pull {
sub.mu.Unlock()
return nil, ErrTypeSubscription
}
nc := sub.conn
nms := sub.jsi.nms
rply, _ := newFetchInbox(jsi.deliver)
js := sub.jsi.js
pmc := len(sub.mch) > 0
// All fetch requests have an expiration, in case of no explicit expiration
// then the default timeout of the JetStream context is used.
ttl := o.ttl
if ttl == 0 {
ttl = js.opts.wait
}
sub.mu.Unlock()
// Use the given context or setup a default one for the span
// of the pull batch request.
var (
ctx = o.ctx
err error
cancel context.CancelFunc
)
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), ttl)
} else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
// Prevent from passing the background context which will just block
// and cannot be canceled either.
if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() {
return nil, ErrNoDeadlineContext
}
// If the context did not have a deadline, then create a new child context
// that will use the default timeout from the JS context.
ctx, cancel = context.WithTimeout(ctx, ttl)
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer cancel()
// if heartbeat is set, validate it against the context timeout
if o.hb > 0 {
deadline, _ := ctx.Deadline()
if 2*o.hb >= time.Until(deadline) {
return nil, fmt.Errorf("%w: idle heartbeat value too large", ErrInvalidArg)
}
}
// Check if context not done already before making the request.
select {
case <-ctx.Done():
if o.ctx != nil { // Timeout or Cancel triggered by context object option
err = ctx.Err()
} else { // Timeout triggered by timeout option
err = ErrTimeout
}
default:
}
if err != nil {
return nil, err
}
var (
msgs = make([]*Msg, 0, batch)
msg *Msg
)
for pmc && len(msgs) < batch {
// Check next msg with booleans that say that this is an internal call
// for a pull subscribe (so don't reject it) and don't wait if there
// are no messages.
msg, err = sub.nextMsgWithContext(ctx, true, false)
if err != nil {
if errors.Is(err, errNoMessages) {
err = nil
}
break
}
// Check msg but just to determine if this is a user message
// or status message, however, we don't care about values of status
// messages at this point in the Fetch() call, so checkMsg can't
// return an error.
if usrMsg, _ := checkMsg(msg, false, false); usrMsg {
msgs = append(msgs, msg)
}
}
var hbTimer *time.Timer
var hbErr error
sub.mu.Lock()
subClosed := sub.closed || sub.draining
sub.mu.Unlock()
if subClosed {
err = errors.Join(ErrBadSubscription, ErrSubscriptionClosed)
}
hbLock := sync.Mutex{}
if err == nil && len(msgs) < batch && !subClosed {
// For batch real size of 1, it does not make sense to set no_wait in
// the request.
noWait := batch-len(msgs) > 1
var nr nextRequest
sendReq := func() error {
// The current deadline for the context will be used
// to set the expires TTL for a fetch request.
deadline, _ := ctx.Deadline()
ttl = time.Until(deadline)
// Check if context has already been canceled or expired.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Make our request expiration a bit shorter than the current timeout.
expires := ttl
if ttl >= 20*time.Millisecond {
expires = ttl - 10*time.Millisecond
}
nr.Batch = batch - len(msgs)
nr.Expires = expires
nr.NoWait = noWait
nr.MaxBytes = o.maxBytes
if 2*o.hb < expires {
nr.Heartbeat = o.hb
} else {
nr.Heartbeat = 0
}
req, _ := json.Marshal(nr)
if err := nc.PublishRequest(nms, rply, req); err != nil {
return err
}
if o.hb > 0 {
if hbTimer == nil {
hbTimer = time.AfterFunc(2*o.hb, func() {
hbLock.Lock()
hbErr = ErrNoHeartbeat
hbLock.Unlock()
cancel()
})
} else {
hbTimer.Reset(2 * o.hb)
}
}
return nil
}
err = sendReq()
for err == nil && len(msgs) < batch {
// Ask for next message and wait if there are no messages
msg, err = sub.nextMsgWithContext(ctx, true, true)
if err == nil {
if hbTimer != nil {
hbTimer.Reset(2 * o.hb)
}
var usrMsg bool
usrMsg, err = checkMsg(msg, true, noWait)
if err == nil && usrMsg {
msgs = append(msgs, msg)
} else if noWait && (errors.Is(err, errNoMessages) || errors.Is(err, errRequestsPending)) && len(msgs) == 0 {
// If we have a 404/408 for our "no_wait" request and have
// not collected any message, then resend request to
// wait this time.
noWait = false
err = sendReq()
} else if errors.Is(err, ErrTimeout) && len(msgs) == 0 {
// If we get a 408, we will bail if we already collected some
// messages, otherwise ignore and go back calling nextMsg.
err = nil
}
}
}
if hbTimer != nil {
hbTimer.Stop()
}
}
// If there is at least a message added to msgs, then need to return OK and no error
if err != nil && len(msgs) == 0 {
hbLock.Lock()
defer hbLock.Unlock()
if hbErr != nil {
return nil, hbErr
}
return nil, o.checkCtxErr(err)
}
return msgs, nil
}
// newFetchInbox returns subject used as reply subject when sending pull requests
// as well as request ID. For non-wildcard subject, request ID is empty and
// passed subject is not transformed
func newFetchInbox(subj string) (string, string) {
if !strings.HasSuffix(subj, ".*") {
return subj, ""
}
reqID := nuid.Next()
var sb strings.Builder
sb.WriteString(subj[:len(subj)-1])
sb.WriteString(reqID)
return sb.String(), reqID
}
func subjectMatchesReqID(subject, reqID string) bool {
subjectParts := strings.Split(subject, ".")
if len(subjectParts) < 2 {
return false
}
return subjectParts[len(subjectParts)-1] == reqID
}
// MessageBatch provides methods to retrieve messages consumed using [Subscribe.FetchBatch].
type MessageBatch interface {
// Messages returns a channel on which messages will be published.
Messages() <-chan *Msg
// Error returns an error encountered when fetching messages.
Error() error
// Done signals end of execution.
Done() <-chan struct{}
}
type messageBatch struct {
msgs chan *Msg
err error
done chan struct{}
}
func (mb *messageBatch) Messages() <-chan *Msg {
return mb.msgs
}
func (mb *messageBatch) Error() error {
return mb.err
}
func (mb *messageBatch) Done() <-chan struct{} {
return mb.done
}
// FetchBatch pulls a batch of messages from a stream for a pull consumer.
// Unlike [Subscription.Fetch], it is non blocking and returns [MessageBatch],
// allowing to retrieve incoming messages from a channel.
// The returned channel is always closed after all messages for a batch have been
// delivered by the server - it is safe to iterate over it using range.
//
// To avoid using default JetStream timeout as fetch expiry time, use [nats.MaxWait]
// or [nats.Context] (with deadline set).
//
// This method will not return error in case of pull request expiry (even if there are no messages).
// Any other error encountered when receiving messages will cause FetchBatch to stop receiving new messages.
func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error) {
if sub == nil {
return nil, ErrBadSubscription
}
if batch < 1 {
return nil, ErrInvalidArg
}
var o pullOpts
for _, opt := range opts {
if err := opt.configurePull(&o); err != nil {
return nil, err
}
}
if o.ctx != nil && o.ttl != 0 {
return nil, ErrContextAndTimeout
}
sub.mu.Lock()
jsi := sub.jsi
// Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription,
// so check for jsi.pull boolean instead.
if jsi == nil || !jsi.pull {
sub.mu.Unlock()
return nil, ErrTypeSubscription
}
nc := sub.conn
nms := sub.jsi.nms
rply, reqID := newFetchInbox(sub.jsi.deliver)
js := sub.jsi.js
pmc := len(sub.mch) > 0
// All fetch requests have an expiration, in case of no explicit expiration
// then the default timeout of the JetStream context is used.
ttl := o.ttl
if ttl == 0 {
ttl = js.opts.wait
}
sub.mu.Unlock()
// Use the given context or setup a default one for the span
// of the pull batch request.
var (
ctx = o.ctx
cancel context.CancelFunc
cancelContext = true
)
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), ttl)
} else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
// Prevent from passing the background context which will just block
// and cannot be canceled either.
if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() {
return nil, ErrNoDeadlineContext
}
// If the context did not have a deadline, then create a new child context
// that will use the default timeout from the JS context.
ctx, cancel = context.WithTimeout(ctx, ttl)
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer func() {
// only cancel the context here if we are sure the fetching goroutine has not been started yet
if cancelContext {
cancel()
}
}()
// if heartbeat is set, validate it against the context timeout
if o.hb > 0 {
deadline, _ := ctx.Deadline()
if 2*o.hb >= time.Until(deadline) {
return nil, fmt.Errorf("%w: idle heartbeat value too large", ErrInvalidArg)
}
}
// Check if context not done already before making the request.
select {
case <-ctx.Done():
if o.ctx != nil { // Timeout or Cancel triggered by context object option
return nil, ctx.Err()
} else { // Timeout triggered by timeout option
return nil, ErrTimeout
}
default:
}
result := &messageBatch{
msgs: make(chan *Msg, batch),
done: make(chan struct{}, 1),
}
var msg *Msg
for pmc && len(result.msgs) < batch {
// Check next msg with booleans that say that this is an internal call
// for a pull subscribe (so don't reject it) and don't wait if there
// are no messages.
msg, err := sub.nextMsgWithContext(ctx, true, false)
if err != nil {
if errors.Is(err, errNoMessages) {
err = nil
}
result.err = err
break
}
// Check msg but just to determine if this is a user message
// or status message, however, we don't care about values of status
// messages at this point in the Fetch() call, so checkMsg can't
// return an error.
if usrMsg, _ := checkMsg(msg, false, false); usrMsg {
result.msgs <- msg
}
}
sub.mu.Lock()
subClosed := sub.closed || sub.draining
sub.mu.Unlock()
if len(result.msgs) == batch || result.err != nil || subClosed {
close(result.msgs)
if subClosed && len(result.msgs) == 0 {
return nil, errors.Join(ErrBadSubscription, ErrSubscriptionClosed)
}
result.done <- struct{}{}
return result, nil
}
deadline, _ := ctx.Deadline()
ttl = time.Until(deadline)
// Make our request expiration a bit shorter than the current timeout.
expires := ttl
if ttl >= 20*time.Millisecond {
expires = ttl - 10*time.Millisecond
}
requestBatch := batch - len(result.msgs)
req := nextRequest{
Expires: expires,
Batch: requestBatch,
MaxBytes: o.maxBytes,
Heartbeat: o.hb,
}
reqJSON, err := json.Marshal(req)
if err != nil {
close(result.msgs)
result.done <- struct{}{}
result.err = err
return result, nil
}
if err := nc.PublishRequest(nms, rply, reqJSON); err != nil {
if len(result.msgs) == 0 {
return nil, err
}
close(result.msgs)
result.done <- struct{}{}
result.err = err
return result, nil
}
var hbTimer *time.Timer
var hbErr error
hbLock := sync.Mutex{}
if o.hb > 0 {
hbTimer = time.AfterFunc(2*o.hb, func() {
hbLock.Lock()
hbErr = ErrNoHeartbeat
hbLock.Unlock()
cancel()
})
}
cancelContext = false
go func() {
defer cancel()
var requestMsgs int
for requestMsgs < requestBatch {
// Ask for next message and wait if there are no messages
msg, err = sub.nextMsgWithContext(ctx, true, true)
if err != nil {
break
}
if hbTimer != nil {
hbTimer.Reset(2 * o.hb)
}
var usrMsg bool
usrMsg, err = checkMsg(msg, true, false)
if err != nil {
if errors.Is(err, ErrTimeout) {
if reqID != "" && !subjectMatchesReqID(msg.Subject, reqID) {
// ignore timeout message from server if it comes from a different pull request
continue
}
err = nil
}
break
}
if usrMsg {
result.msgs <- msg
requestMsgs++
}
}
if err != nil {
hbLock.Lock()
if hbErr != nil {
result.err = hbErr
} else {
result.err = o.checkCtxErr(err)
}
hbLock.Unlock()
}
close(result.msgs)
result.done <- struct{}{}
}()
return result, nil
}
// checkCtxErr is used to determine whether ErrTimeout should be returned in case of context timeout
func (o *pullOpts) checkCtxErr(err error) error {
if o.ctx == nil && errors.Is(err, context.DeadlineExceeded) {
return ErrTimeout
}
return err
}
func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), js.opts.wait)
defer cancel()
return js.getConsumerInfoContext(ctx, stream, consumer)
}
func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) {
ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
resp, err := js.apiRequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
if err != nil {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return nil, err
}
var info consumerResponse
if err := json.Unmarshal(resp.Data, &info); err != nil {
return nil, err
}
if info.Error != nil {
if errors.Is(info.Error, ErrConsumerNotFound) {
return nil, ErrConsumerNotFound
}
if errors.Is(info.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, info.Error
}
if info.Error == nil && info.ConsumerInfo == nil {
return nil, ErrConsumerNotFound
}
return info.ConsumerInfo, nil
}
// a RequestWithContext with tracing via TraceCB
func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
if js.opts.shouldTrace {
ctrace := js.opts.ctrace
if ctrace.RequestSent != nil {
ctrace.RequestSent(subj, data)
}
}
resp, err := js.nc.RequestWithContext(ctx, subj, data)
if err != nil {
return nil, err
}
if js.opts.shouldTrace {
ctrace := js.opts.ctrace
if ctrace.RequestSent != nil {
ctrace.ResponseReceived(subj, resp.Data, resp.Header)
}
}
return resp, nil
}
func (m *Msg) checkReply() error {
if m == nil || m.Sub == nil {
return ErrMsgNotBound
}
if m.Reply == _EMPTY_ {
return ErrMsgNoReply
}
return nil
}
// ackReply handles all acks. Will do the right thing for pull and sync mode.
// It ensures that an ack is only sent a single time, regardless of
// how many times it is being called to avoid duplicated acks.
func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
var o ackOpts
for _, opt := range opts {
if err := opt.configureAck(&o); err != nil {
return err
}
}
if err := m.checkReply(); err != nil {
return err
}
var ackNone bool
var js *js
sub := m.Sub
sub.mu.Lock()
nc := sub.conn
if jsi := sub.jsi; jsi != nil {
js = jsi.js
ackNone = jsi.ackNone
}
sub.mu.Unlock()
// Skip if already acked.
if atomic.LoadUint32(&m.ackd) == 1 {
return ErrMsgAlreadyAckd
}
if ackNone {
return ErrCantAckIfConsumerAckNone
}
usesCtx := o.ctx != nil
usesWait := o.ttl > 0
// Only allow either AckWait or Context option to set the timeout.
if usesWait && usesCtx {
return ErrContextAndTimeout
}
sync = sync || usesCtx || usesWait
ctx := o.ctx
wait := defaultRequestWait
if usesWait {
wait = o.ttl
} else if js != nil {
wait = js.opts.wait
}
var body []byte
var err error
// This will be > 0 only when called from NakWithDelay()
if o.nakDelay > 0 {
body = []byte(fmt.Sprintf("%s {\"delay\": %d}", ackType, o.nakDelay.Nanoseconds()))
} else {
body = ackType
}
if sync {
if usesCtx {
_, err = nc.RequestWithContext(ctx, m.Reply, body)
} else {
_, err = nc.Request(m.Reply, body, wait)
}
} else {
err = nc.Publish(m.Reply, body)
}
// Mark that the message has been acked unless it is ackProgress
// which can be sent many times.
if err == nil && !bytes.Equal(ackType, ackProgress) {
atomic.StoreUint32(&m.ackd, 1)
}
return err
}
// Ack acknowledges a message. This tells the server that the message was
// successfully processed and it can move on to the next message.
func (m *Msg) Ack(opts ...AckOpt) error {
return m.ackReply(ackAck, false, opts...)
}
// AckSync is the synchronous version of Ack. This indicates successful message
// processing.
func (m *Msg) AckSync(opts ...AckOpt) error {
return m.ackReply(ackAck, true, opts...)
}
// Nak negatively acknowledges a message. This tells the server to redeliver
// the message. You can configure the number of redeliveries by passing
// nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
func (m *Msg) Nak(opts ...AckOpt) error {
return m.ackReply(ackNak, false, opts...)
}
// Nak negatively acknowledges a message. This tells the server to redeliver
// the message after the give `delay` duration. You can configure the number
// of redeliveries by passing nats.MaxDeliver when you Subscribe.
// The default is infinite redeliveries.
func (m *Msg) NakWithDelay(delay time.Duration, opts ...AckOpt) error {
if delay > 0 {
opts = append(opts, nakDelay(delay))
}
return m.ackReply(ackNak, false, opts...)
}
// Term tells the server to not redeliver this message, regardless of the value
// of nats.MaxDeliver.
func (m *Msg) Term(opts ...AckOpt) error {
return m.ackReply(ackTerm, false, opts...)
}
// InProgress tells the server that this message is being worked on. It resets
// the redelivery timer on the server.
func (m *Msg) InProgress(opts ...AckOpt) error {
return m.ackReply(ackProgress, false, opts...)
}
// MsgMetadata is the JetStream metadata associated with received messages.
type MsgMetadata struct {
Sequence SequencePair
NumDelivered uint64
NumPending uint64
Timestamp time.Time
Stream string
Consumer string
Domain string
}
// Metadata retrieves the metadata from a JetStream message. This method will
// return an error for non-JetStream Msgs.
func (m *Msg) Metadata() (*MsgMetadata, error) {
if err := m.checkReply(); err != nil {
return nil, err
}
tokens, err := parser.GetMetadataFields(m.Reply)
if err != nil {
return nil, err
}
meta := &MsgMetadata{
Domain: tokens[parser.AckDomainTokenPos],
NumDelivered: parser.ParseNum(tokens[parser.AckNumDeliveredTokenPos]),
NumPending: parser.ParseNum(tokens[parser.AckNumPendingTokenPos]),
Timestamp: time.Unix(0, int64(parser.ParseNum(tokens[parser.AckTimestampSeqTokenPos]))),
Stream: tokens[parser.AckStreamTokenPos],
Consumer: tokens[parser.AckConsumerTokenPos],
}
meta.Sequence.Stream = parser.ParseNum(tokens[parser.AckStreamSeqTokenPos])
meta.Sequence.Consumer = parser.ParseNum(tokens[parser.AckConsumerSeqTokenPos])
return meta, nil
}
// AckPolicy determines how the consumer should acknowledge delivered messages.
type AckPolicy int
const (
// AckNonePolicy requires no acks for delivered messages.
AckNonePolicy AckPolicy = iota
// AckAllPolicy when acking a sequence number, this implicitly acks all
// sequences below this one as well.
AckAllPolicy
// AckExplicitPolicy requires ack or nack for all messages.
AckExplicitPolicy
// For configuration mismatch check
ackPolicyNotSet = 99
)
func jsonString(s string) string {
return "\"" + s + "\""
}
func (p *AckPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("none"):
*p = AckNonePolicy
case jsonString("all"):
*p = AckAllPolicy
case jsonString("explicit"):
*p = AckExplicitPolicy
default:
return fmt.Errorf("nats: can not unmarshal %q", data)
}
return nil
}
func (p AckPolicy) MarshalJSON() ([]byte, error) {
switch p {
case AckNonePolicy:
return json.Marshal("none")
case AckAllPolicy:
return json.Marshal("all")
case AckExplicitPolicy:
return json.Marshal("explicit")
default:
return nil, fmt.Errorf("nats: unknown acknowledgement policy %v", p)
}
}
func (p AckPolicy) String() string {
switch p {
case AckNonePolicy:
return "AckNone"
case AckAllPolicy:
return "AckAll"
case AckExplicitPolicy:
return "AckExplicit"
case ackPolicyNotSet:
return "Not Initialized"
default:
return "Unknown AckPolicy"
}
}
// ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.
type ReplayPolicy int
const (
// ReplayInstantPolicy will replay messages as fast as possible.
ReplayInstantPolicy ReplayPolicy = iota
// ReplayOriginalPolicy will maintain the same timing as the messages were received.
ReplayOriginalPolicy
// For configuration mismatch check
replayPolicyNotSet = 99
)
func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("instant"):
*p = ReplayInstantPolicy
case jsonString("original"):
*p = ReplayOriginalPolicy
default:
return fmt.Errorf("nats: can not unmarshal %q", data)
}
return nil
}
func (p ReplayPolicy) MarshalJSON() ([]byte, error) {
switch p {
case ReplayOriginalPolicy:
return json.Marshal("original")
case ReplayInstantPolicy:
return json.Marshal("instant")
default:
return nil, fmt.Errorf("nats: unknown replay policy %v", p)
}
}
var (
ackAck = []byte("+ACK")
ackNak = []byte("-NAK")
ackProgress = []byte("+WPI")
ackTerm = []byte("+TERM")
)
// DeliverPolicy determines how the consumer should select the first message to deliver.
type DeliverPolicy int
const (
// DeliverAllPolicy starts delivering messages from the very beginning of a
// stream. This is the default.
DeliverAllPolicy DeliverPolicy = iota
// DeliverLastPolicy will start the consumer with the last sequence
// received.
DeliverLastPolicy
// DeliverNewPolicy will only deliver new messages that are sent after the
// consumer is created.
DeliverNewPolicy
// DeliverByStartSequencePolicy will deliver messages starting from a given
// sequence.
DeliverByStartSequencePolicy
// DeliverByStartTimePolicy will deliver messages starting from a given
// time.
DeliverByStartTimePolicy
// DeliverLastPerSubjectPolicy will start the consumer with the last message
// for all subjects received.
DeliverLastPerSubjectPolicy
// For configuration mismatch check
deliverPolicyNotSet = 99
)
func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("all"), jsonString("undefined"):
*p = DeliverAllPolicy
case jsonString("last"):
*p = DeliverLastPolicy
case jsonString("new"):
*p = DeliverNewPolicy
case jsonString("by_start_sequence"):
*p = DeliverByStartSequencePolicy
case jsonString("by_start_time"):
*p = DeliverByStartTimePolicy
case jsonString("last_per_subject"):
*p = DeliverLastPerSubjectPolicy
}
return nil
}
func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
switch p {
case DeliverAllPolicy:
return json.Marshal("all")
case DeliverLastPolicy:
return json.Marshal("last")
case DeliverNewPolicy:
return json.Marshal("new")
case DeliverByStartSequencePolicy:
return json.Marshal("by_start_sequence")
case DeliverByStartTimePolicy:
return json.Marshal("by_start_time")
case DeliverLastPerSubjectPolicy:
return json.Marshal("last_per_subject")
default:
return nil, fmt.Errorf("nats: unknown deliver policy %v", p)
}
}
// RetentionPolicy determines how messages in a set are retained.
type RetentionPolicy int
const (
// LimitsPolicy (default) means that messages are retained until any given limit is reached.
// This could be one of MaxMsgs, MaxBytes, or MaxAge.
LimitsPolicy RetentionPolicy = iota
// InterestPolicy specifies that when all known observables have acknowledged a message it can be removed.
InterestPolicy
// WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.
WorkQueuePolicy
)
// DiscardPolicy determines how to proceed when limits of messages or bytes are
// reached.
type DiscardPolicy int
const (
// DiscardOld will remove older messages to return to the limits. This is
// the default.
DiscardOld DiscardPolicy = iota
//DiscardNew will fail to store new messages.
DiscardNew
)
const (
limitsPolicyString = "limits"
interestPolicyString = "interest"
workQueuePolicyString = "workqueue"
)
func (rp RetentionPolicy) String() string {
switch rp {
case LimitsPolicy:
return "Limits"
case InterestPolicy:
return "Interest"
case WorkQueuePolicy:
return "WorkQueue"
default:
return "Unknown Retention Policy"
}
}
func (rp RetentionPolicy) MarshalJSON() ([]byte, error) {
switch rp {
case LimitsPolicy:
return json.Marshal(limitsPolicyString)
case InterestPolicy:
return json.Marshal(interestPolicyString)
case WorkQueuePolicy:
return json.Marshal(workQueuePolicyString)
default:
return nil, fmt.Errorf("nats: can not marshal %v", rp)
}
}
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString(limitsPolicyString):
*rp = LimitsPolicy
case jsonString(interestPolicyString):
*rp = InterestPolicy
case jsonString(workQueuePolicyString):
*rp = WorkQueuePolicy
default:
return fmt.Errorf("nats: can not unmarshal %q", data)
}
return nil
}
func (dp DiscardPolicy) String() string {
switch dp {
case DiscardOld:
return "DiscardOld"
case DiscardNew:
return "DiscardNew"
default:
return "Unknown Discard Policy"
}
}
func (dp DiscardPolicy) MarshalJSON() ([]byte, error) {
switch dp {
case DiscardOld:
return json.Marshal("old")
case DiscardNew:
return json.Marshal("new")
default:
return nil, fmt.Errorf("nats: can not marshal %v", dp)
}
}
func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
switch strings.ToLower(string(data)) {
case jsonString("old"):
*dp = DiscardOld
case jsonString("new"):
*dp = DiscardNew
default:
return fmt.Errorf("nats: can not unmarshal %q", data)
}
return nil
}
// StorageType determines how messages are stored for retention.
type StorageType int
const (
// FileStorage specifies on disk storage. It's the default.
FileStorage StorageType = iota
// MemoryStorage specifies in memory only.
MemoryStorage
)
const (
memoryStorageString = "memory"
fileStorageString = "file"
)
func (st StorageType) String() string {
switch st {
case MemoryStorage:
return "Memory"
case FileStorage:
return "File"
default:
return "Unknown Storage Type"
}
}
func (st StorageType) MarshalJSON() ([]byte, error) {
switch st {
case MemoryStorage:
return json.Marshal(memoryStorageString)
case FileStorage:
return json.Marshal(fileStorageString)
default:
return nil, fmt.Errorf("nats: can not marshal %v", st)
}
}
func (st *StorageType) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString(memoryStorageString):
*st = MemoryStorage
case jsonString(fileStorageString):
*st = FileStorage
default:
return fmt.Errorf("nats: can not unmarshal %q", data)
}
return nil
}
type StoreCompression uint8
const (
NoCompression StoreCompression = iota
S2Compression
)
func (alg StoreCompression) String() string {
switch alg {
case NoCompression:
return "None"
case S2Compression:
return "S2"
default:
return "Unknown StoreCompression"
}
}
func (alg StoreCompression) MarshalJSON() ([]byte, error) {
var str string
switch alg {
case S2Compression:
str = "s2"
case NoCompression:
str = "none"
default:
return nil, fmt.Errorf("unknown compression algorithm")
}
return json.Marshal(str)
}
func (alg *StoreCompression) UnmarshalJSON(b []byte) error {
var str string
if err := json.Unmarshal(b, &str); err != nil {
return err
}
switch str {
case "s2":
*alg = S2Compression
case "none":
*alg = NoCompression
default:
return fmt.Errorf("unknown compression algorithm")
}
return nil
}
// Length of our hash used for named consumers.
const nameHashLen = 8
// Computes a hash for the given `name`.
func getHash(name string) string {
sha := sha256.New()
sha.Write([]byte(name))
b := sha.Sum(nil)
for i := 0; i < nameHashLen; i++ {
b[i] = rdigits[int(b[i]%base)]
}
return string(b[:nameHashLen])
}