Skip to content

Commit 3fff2bd

Browse files
authored
Fix potential busy loop in plain NATS receive (#73)
1 parent b995418 commit 3fff2bd

File tree

1 file changed

+36
-21
lines changed

1 file changed

+36
-21
lines changed

connections/plain.go

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (c *plainConnection) CreateSubscription(ctx context.Context, opts *Subscrip
8888
}
8989

9090
return &natsConsumer{consumer: subsc, isQueueGroup: true,
91-
batchFetchTimeout: opts.ConsumerConfig.MaxRequestExpires,
91+
batchFetchTimeout: opts.ReceiveWaitTimeOut,
9292
useV1Decoding: useV1Decoding}, nil
9393
}
9494

@@ -100,7 +100,7 @@ func (c *plainConnection) CreateSubscription(ctx context.Context, opts *Subscrip
100100
}
101101

102102
return &natsConsumer{consumer: subsc, isQueueGroup: false,
103-
batchFetchTimeout: opts.ConsumerConfig.MaxRequestExpires,
103+
batchFetchTimeout: opts.ReceiveWaitTimeOut,
104104
useV1Decoding: useV1Decoding, connector: connector}, nil
105105

106106
}
@@ -195,17 +195,20 @@ func (q *natsConsumer) ReceiveMessages(ctx context.Context, batchCount int) ([]*
195195
batchCount = 1
196196
}
197197

198-
// Use the context's deadline if available, otherwise fall back to the configured timeout
198+
// Use the context's deadline if available, otherwise fall back to the configured timeout.
199+
// A fetchTimeout of 0 means "wait indefinitely until the context ends".
199200
fetchTimeout := q.batchFetchTimeout
200-
if deadline, ok := ctx.Deadline(); ok {
201-
// Use the remaining time from the context, but don't exceed our configured timeout
202-
remainingTime := time.Until(deadline)
203-
if remainingTime < fetchTimeout {
204-
fetchTimeout = remainingTime
205-
}
206-
// Ensure we have at least a minimal timeout to prevent spinning
207-
if fetchTimeout <= 0 {
208-
fetchTimeout = time.Millisecond
201+
if fetchTimeout > 0 {
202+
if deadline, ok := ctx.Deadline(); ok {
203+
// Use the remaining time from the context, but don't exceed our configured timeout
204+
remainingTime := time.Until(deadline)
205+
if remainingTime < fetchTimeout {
206+
fetchTimeout = remainingTime
207+
}
208+
// Ensure we have at least a minimal timeout to prevent spinning
209+
if fetchTimeout <= 0 {
210+
fetchTimeout = time.Millisecond
211+
}
209212
}
210213
}
211214

@@ -217,20 +220,32 @@ func (q *natsConsumer) ReceiveMessages(ctx context.Context, batchCount int) ([]*
217220
return messages, errorutil.Wrap(err, "context canceled while receiving messages")
218221
}
219222

220-
// Calculate timeout for this fetch attempt (use shorter timeouts for subsequent messages)
221-
attemptTimeout := fetchTimeout
222-
if i > 0 {
223-
// Use progressively shorter timeouts for subsequent messages
224-
// This enables quick return when no more messages are available
225-
attemptTimeout = fetchTimeout / time.Duration(i*2+1)
223+
var (
224+
msg *nats.Msg
225+
err error
226+
)
227+
228+
if fetchTimeout == 0 && i == 0 {
229+
// Block until a message arrives or the context ends.
230+
msg, err = q.consumer.NextMsgWithContext(ctx)
231+
} else {
232+
// Calculate timeout for this fetch attempt (use shorter timeouts for subsequent messages)
233+
attemptTimeout := fetchTimeout
234+
if fetchTimeout == 0 {
235+
attemptTimeout = time.Millisecond
236+
} else if i > 0 {
237+
// Use progressively shorter timeouts for subsequent messages
238+
// This enables quick return when no more messages are available
239+
attemptTimeout = fetchTimeout / time.Duration(i*2+1)
240+
}
226241
if attemptTimeout < time.Millisecond {
227242
attemptTimeout = time.Millisecond
228243
}
229-
}
230244

231-
msg, err := q.consumer.NextMsg(attemptTimeout)
245+
msg, err = q.consumer.NextMsg(attemptTimeout)
246+
}
232247
if err != nil {
233-
// Not an error if we timeout or context is cancelled
248+
// Not an error if we timeout
234249
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
235250
// Just return what we have so far
236251
return messages, nil

0 commit comments

Comments
 (0)