2525from nats .aio .msg import Msg
2626from nats .aio .subscription import Subscription
2727from nats .js import api
28- from nats .js .errors import BadBucketError , BucketNotFoundError , InvalidBucketNameError , NotFoundError
28+ from nats .js .errors import BadBucketError , BucketNotFoundError , InvalidBucketNameError , NotFoundError , FetchTimeoutError
2929from nats .js .kv import KeyValue
3030from nats .js .manager import JetStreamManager
3131from nats .js .object_store import (
@@ -547,6 +547,13 @@ def _is_temporary_error(cls, status: Optional[str]) -> bool:
547547 else :
548548 return False
549549
@classmethod
def _is_heartbeat(cls, status: Optional[str]) -> bool:
    """Tell whether a status value marks a heartbeat (control) message.

    :param status: Status value of a received message, or ``None`` when
        the message carries no status.
    :return: ``True`` only when ``status`` equals
        ``api.StatusCode.CONTROL_MESSAGE``; any other value, including
        ``None``, yields ``False``.
    """
    # The comparison already produces the boolean result, so no
    # if/else wrapper is needed around it.
    return status == api.StatusCode.CONTROL_MESSAGE
556+
550557 @classmethod
551558 def _time_until (cls , timeout : Optional [float ],
552559 start_time : float ) -> Optional [float ]:
@@ -620,9 +627,7 @@ async def activity_check(self):
620627 self ._active = False
621628 if not active :
622629 if self ._ordered :
623- await self .reset_ordered_consumer (
624- self ._sseq + 1
625- )
630+ await self .reset_ordered_consumer (self ._sseq + 1 )
626631 except asyncio .CancelledError :
627632 break
628633
@@ -882,14 +887,18 @@ async def consumer_info(self) -> api.ConsumerInfo:
882887 )
883888 return info
884889
885- async def fetch (self ,
886- batch : int = 1 ,
887- timeout : Optional [float ] = 5 ) -> List [Msg ]:
890+ async def fetch (
891+ self ,
892+ batch : int = 1 ,
893+ timeout : Optional [float ] = 5 ,
894+ heartbeat : Optional [float ] = None
895+ ) -> List [Msg ]:
888896 """
889897 fetch makes a request to JetStream to be delivered a set of messages.
890898
891899 :param batch: Number of messages to fetch from server.
892900 :param timeout: Max duration of the fetch request before it expires.
901+ :param heartbeat: Idle Heartbeat interval in seconds for the fetch request.
893902
894903 ::
895904
@@ -925,15 +934,16 @@ async def main():
925934 timeout * 1_000_000_000
926935 ) - 100_000 if timeout else None
927936 if batch == 1 :
928- msg = await self ._fetch_one (expires , timeout )
937+ msg = await self ._fetch_one (expires , timeout , heartbeat )
929938 return [msg ]
930- msgs = await self ._fetch_n (batch , expires , timeout )
939+ msgs = await self ._fetch_n (batch , expires , timeout , heartbeat )
931940 return msgs
932941
933942 async def _fetch_one (
934943 self ,
935944 expires : Optional [int ],
936945 timeout : Optional [float ],
946+ heartbeat : Optional [float ] = None
937947 ) -> Msg :
938948 queue = self ._sub ._pending_queue
939949
@@ -957,37 +967,66 @@ async def _fetch_one(
957967 next_req ['batch' ] = 1
958968 if expires :
959969 next_req ['expires' ] = int (expires )
970+ if heartbeat :
971+ next_req ['idle_heartbeat' ] = int (
972+ heartbeat * 1_000_000_000
973+ ) # to nanoseconds
960974
961975 await self ._nc .publish (
962976 self ._nms ,
963977 json .dumps (next_req ).encode (),
964978 self ._deliver ,
965979 )
966980
967- # Wait for the response or raise timeout.
968- msg = await self ._sub .next_msg (timeout )
969-
970- # Should have received at least a processable message at this point,
971- status = JetStreamContext .is_status_msg (msg )
981+ start_time = time .monotonic ()
982+ got_any_response = False
983+ while True :
984+ try :
985+ deadline = JetStreamContext ._time_until (
986+ timeout , start_time
987+ )
988+ # Wait for the response or raise timeout.
989+ msg = await self ._sub .next_msg (timeout = deadline )
972990
973- if status :
974- # In case of a temporary error, treat it as a timeout to retry.
975- if JetStreamContext ._is_temporary_error (status ):
976- raise nats .errors .TimeoutError
977- else :
978- # Any other type of status message is an error.
979- raise nats .js .errors .APIError .from_msg (msg )
980- return msg
991+ # Should have received at least a processable message at this point,
992+ status = JetStreamContext .is_status_msg (msg )
993+ if status :
994+ if JetStreamContext ._is_heartbeat (status ):
995+ got_any_response = True
996+ continue
997+
998+ # In case of a temporary error, treat it as a timeout to retry.
999+ if JetStreamContext ._is_temporary_error (status ):
1000+ raise nats .errors .TimeoutError
1001+ else :
1002+ # Any other type of status message is an error.
1003+ raise nats .js .errors .APIError .from_msg (msg )
1004+ else :
1005+ return msg
1006+ except asyncio .TimeoutError :
1007+ deadline = JetStreamContext ._time_until (
1008+ timeout , start_time
1009+ )
1010+ if deadline is not None and deadline < 0 :
1011+ # No response from the consumer could have been
1012+ # due to a reconnect during the fetch request,
1013+ # the JS API not responding in time, or simply
1014+ # because there were no messages yet.
1015+ if got_any_response :
1016+ raise FetchTimeoutError
1017+ raise
9811018
9821019 async def _fetch_n (
9831020 self ,
9841021 batch : int ,
9851022 expires : Optional [int ],
9861023 timeout : Optional [float ],
1024+ heartbeat : Optional [float ] = None
9871025 ) -> List [Msg ]:
9881026 msgs = []
9891027 queue = self ._sub ._pending_queue
9901028 start_time = time .monotonic ()
1029+ got_any_response = False
9911030 needed = batch
9921031
9931032 # Fetch as many as needed from the internal pending queue.
@@ -1013,6 +1052,10 @@ async def _fetch_n(
10131052 next_req ['batch' ] = needed
10141053 if expires :
10151054 next_req ['expires' ] = expires
1055+ if heartbeat :
1056+ next_req ['idle_heartbeat' ] = int (
1057+ heartbeat * 1_000_000_000
1058+ ) # to nanoseconds
10161059 next_req ['no_wait' ] = True
10171060 await self ._nc .publish (
10181061 self ._nms ,
@@ -1024,12 +1067,20 @@ async def _fetch_n(
10241067 try :
10251068 msg = await self ._sub .next_msg (timeout )
10261069 except asyncio .TimeoutError :
1070+ # Return any message that was already available in the internal queue.
10271071 if msgs :
10281072 return msgs
10291073 raise
10301074
1075+ got_any_response = False
1076+
10311077 status = JetStreamContext .is_status_msg (msg )
1032- if JetStreamContext ._is_processable_msg (status , msg ):
1078+ if JetStreamContext ._is_heartbeat (status ):
1079+ # Mark that we got any response from the server so this is not
1080+ # a possible i/o timeout error or due to a disconnection.
1081+ got_any_response = True
1082+ pass
1083+ elif JetStreamContext ._is_processable_msg (status , msg ):
10331084 # First processable message received, do not raise error from now.
10341085 msgs .append (msg )
10351086 needed -= 1
@@ -1045,6 +1096,10 @@ async def _fetch_n(
10451096 # No more messages after this so fallthrough
10461097 # after receiving the rest.
10471098 break
1099+ elif JetStreamContext ._is_heartbeat (status ):
1100+ # Skip heartbeats.
1101+ got_any_response = True
1102+ continue
10481103 elif JetStreamContext ._is_processable_msg (status , msg ):
10491104 needed -= 1
10501105 msgs .append (msg )
@@ -1063,6 +1118,11 @@ async def _fetch_n(
10631118 next_req ['batch' ] = needed
10641119 if expires :
10651120 next_req ['expires' ] = expires
1121+ if heartbeat :
1122+ next_req ['idle_heartbeat' ] = int (
1123+ heartbeat * 1_000_000_000
1124+ ) # to nanoseconds
1125+
10661126 await self ._nc .publish (
10671127 self ._nms ,
10681128 json .dumps (next_req ).encode (),
@@ -1083,7 +1143,12 @@ async def _fetch_n(
10831143 if len (msgs ) == 0 :
10841144 # Not a single processable message has been received so far,
10851145 # if this timed out then let the error be raised.
1086- msg = await self ._sub .next_msg (timeout = deadline )
1146+ try :
1147+ msg = await self ._sub .next_msg (timeout = deadline )
1148+ except asyncio .TimeoutError :
1149+ if got_any_response :
1150+ raise FetchTimeoutError
1151+ raise
10871152 else :
10881153 try :
10891154 msg = await self ._sub .next_msg (timeout = deadline )
@@ -1093,6 +1158,10 @@ async def _fetch_n(
10931158
10941159 if msg :
10951160 status = JetStreamContext .is_status_msg (msg )
1161+ if JetStreamContext ._is_heartbeat (status ):
1162+ got_any_response = True
1163+ continue
1164+
10961165 if not status :
10971166 needed -= 1
10981167 msgs .append (msg )
@@ -1116,6 +1185,9 @@ async def _fetch_n(
11161185
11171186 msg = await self ._sub .next_msg (timeout = deadline )
11181187 status = JetStreamContext .is_status_msg (msg )
1188+ if JetStreamContext ._is_heartbeat (status ):
1189+ got_any_response = True
1190+ continue
11191191 if JetStreamContext ._is_processable_msg (status , msg ):
11201192 needed -= 1
11211193 msgs .append (msg )
@@ -1124,6 +1196,9 @@ async def _fetch_n(
11241196 # at least one message has already arrived.
11251197 pass
11261198
1199+ if len (msgs ) == 0 and got_any_response :
1200+ raise FetchTimeoutError
1201+
11271202 return msgs
11281203
11291204 ######################
0 commit comments