@@ -88,7 +88,7 @@ func (c *plainConnection) CreateSubscription(ctx context.Context, opts *Subscrip
8888 }
8989
9090 return & natsConsumer {consumer : subsc , isQueueGroup : true ,
91- batchFetchTimeout : opts .ConsumerConfig . MaxRequestExpires ,
91+ batchFetchTimeout : opts .ReceiveWaitTimeOut ,
9292 useV1Decoding : useV1Decoding }, nil
9393 }
9494
@@ -100,7 +100,7 @@ func (c *plainConnection) CreateSubscription(ctx context.Context, opts *Subscrip
100100 }
101101
102102 return & natsConsumer {consumer : subsc , isQueueGroup : false ,
103- batchFetchTimeout : opts .ConsumerConfig . MaxRequestExpires ,
103+ batchFetchTimeout : opts .ReceiveWaitTimeOut ,
104104 useV1Decoding : useV1Decoding , connector : connector }, nil
105105
106106}
@@ -195,17 +195,20 @@ func (q *natsConsumer) ReceiveMessages(ctx context.Context, batchCount int) ([]*
195195 batchCount = 1
196196 }
197197
198- // Use the context's deadline if available, otherwise fall back to the configured timeout
198+ // Use the context's deadline if available, otherwise fall back to the configured timeout.
199+ // A fetchTimeout of 0 means "wait indefinitely until the context ends".
199200 fetchTimeout := q .batchFetchTimeout
200- if deadline , ok := ctx .Deadline (); ok {
201- // Use the remaining time from the context, but don't exceed our configured timeout
202- remainingTime := time .Until (deadline )
203- if remainingTime < fetchTimeout {
204- fetchTimeout = remainingTime
205- }
206- // Ensure we have at least a minimal timeout to prevent spinning
207- if fetchTimeout <= 0 {
208- fetchTimeout = time .Millisecond
201+ if fetchTimeout > 0 {
202+ if deadline , ok := ctx .Deadline (); ok {
203+ // Use the remaining time from the context, but don't exceed our configured timeout
204+ remainingTime := time .Until (deadline )
205+ if remainingTime < fetchTimeout {
206+ fetchTimeout = remainingTime
207+ }
208+ // Ensure we have at least a minimal timeout to prevent spinning
209+ if fetchTimeout <= 0 {
210+ fetchTimeout = time .Millisecond
211+ }
209212 }
210213 }
211214
@@ -217,20 +220,32 @@ func (q *natsConsumer) ReceiveMessages(ctx context.Context, batchCount int) ([]*
217220 return messages , errorutil .Wrap (err , "context canceled while receiving messages" )
218221 }
219222
220- // Calculate timeout for this fetch attempt (use shorter timeouts for subsequent messages)
221- attemptTimeout := fetchTimeout
222- if i > 0 {
223- // Use progressively shorter timeouts for subsequent messages
224- // This enables quick return when no more messages are available
225- attemptTimeout = fetchTimeout / time .Duration (i * 2 + 1 )
223+ var (
224+ msg * nats.Msg
225+ err error
226+ )
227+
228+ if fetchTimeout == 0 && i == 0 {
229+ // Block until a message arrives or the context ends.
230+ msg , err = q .consumer .NextMsgWithContext (ctx )
231+ } else {
232+ // Calculate timeout for this fetch attempt (use shorter timeouts for subsequent messages)
233+ attemptTimeout := fetchTimeout
234+ if fetchTimeout == 0 {
235+ attemptTimeout = time .Millisecond
236+ } else if i > 0 {
237+ // Use progressively shorter timeouts for subsequent messages
238+ // This enables quick return when no more messages are available
239+ attemptTimeout = fetchTimeout / time .Duration (i * 2 + 1 )
240+ }
226241 if attemptTimeout < time .Millisecond {
227242 attemptTimeout = time .Millisecond
228243 }
229- }
230244
231- msg , err := q .consumer .NextMsg (attemptTimeout )
245+ msg , err = q .consumer .NextMsg (attemptTimeout )
246+ }
232247 if err != nil {
233- // Not an error if we timeout or context is cancelled
248+ // Not an error if we timeout
234249 if errors .Is (err , nats .ErrTimeout ) || errors .Is (err , context .DeadlineExceeded ) {
235250 // Just return what we have so far
236251 return messages , nil
0 commit comments