ԆEWzddlmZddlZddlmZmZmZmZmZm Z ddl m Z ddl m Z ddlmZerddlmZdZd ZGd d ZGd d ZdS)) annotationsN) TYPE_CHECKING AsyncIterator AwaitableCallableListOptional)uuid4)errors)Msg)JetStreamContextiiceZdZdZddddddeefd'dZed(dZed(dZ ed)dZ ed*dZ ed*dZ ed*dZ d+d,dZd Zd!Zd-d"Zd.d/d$Zd-d%Zd-d&ZdS)0 Subscriptiona A Subscription represents interest in a particular subject. A Subscription should not be constructed directly, rather `connection.subscribe()` should be used to get a subscription. :: nc = await nats.connect() # Async Subscription async def cb(msg): print('Received', msg) await nc.subscribe('foo', cb=cb) # Sync Subscription sub = nc.subscribe('foo') msg = await sub.next_msg() print('Received', msg) rNidintsubjectstrqueuecb*Optional[Callable[[Msg], Awaitable[None]]]futureOptional[asyncio.Future]max_msgspending_msgs_limitpending_bytes_limitreturnNonec .||_||_||_||_||_d|_||_||_d|_||_ | |_ tj ||_ |i|_nd|_d|_d|_d|_d|_dS)NrF)maxsize)_conn_id_subject_queue _max_msgs _received_cb_future_closed_pending_msgs_limit_pending_bytes_limitasyncioQueue_pending_queue_pending_next_msgs_calls _pending_size_wait_for_msgs_task_message_iterator_jsi) selfconnrrrrrrrrs u/builddir/build/BUILD/imunify360-venv-2.6.2/opt/imunify360/venv/lib/python3.11/site-packages/nats/aio/subscription.py__init__zSubscription.__init__?s   !  $6 $7!29-HZ2[2[2[ :,.D ) ),0D )#' !%6: c|jS)z< Returns the subject of the `Subscription`. )r#r4s r6rzSubscription.subjectfs }r8c|jS)zX Returns the queue name of the `Subscription` if part of a queue group. )r$r:s r6rzSubscription.queuems {r8AsyncIterator[Msg]cF|jstjd|jS)a Retrieves an async iterator for the messages from the subscription. This is only available if a callback isn't provided when creating a subscription. :: nc = await nats.connect() sub = await nc.subscribe('foo') # Use `async for` which implicitly awaits messages async for msg in sub.messages: print('Received', msg) zCcannot iterate over messages with a non iteration subscription type)r2r Errorr:s r6messageszSubscription.messagests* % f,dee e%%r8c4|jS)zw Number of delivered messages by the NATS Server that are being buffered in the pending queue. )r.qsizer:s r6 pending_msgszSubscription.pending_msgss "((***r8c|jS)zk Size of data sent by the NATS Server that is being buffered in the pending queue. )r0r:s r6 pending_byteszSubscription.pending_bytess !!r8c|jS)zK Number of delivered messages to this subscription so far. )r&r:s r6 deliveredzSubscription.delivereds ~r8?timeoutOptional[float]r cKdfd }jjr tjjrtjdt t} tj |}|j |<|d{V}xj t|j zc_ j|j |dS#tj$r%jjr tjtjtj$rjjr tjwxYw#j |dwxYw)a :params timeout: Time in seconds to wait for next message before timing out. :raises nats.errors.TimeoutError: next_msg can be used to retrieve the next message from a stream of messages using await syntax, this only works when not passing a callback on `subscribe`:: sub = await nc.subscribe('hello') msg = await sub.next_msg(timeout=1) rr clKtjjd{VSN)r,wait_forr.get)r4rHsr6 timed_getz(Subscription.next_msg..timed_gets; )$*=*A*A*C*CWMMMMMMMM Mr8z4nats: next_msg cannot be used in async subscriptionsNrr )r! is_closedr ConnectionClosedErrorr'r>rr r, create_taskr/r0lendatar. task_donepop TimeoutErrorCancelledError)r4rHrO task_namermsgs`` r6next_msgzSubscription.next_msgs N N N N N N N :  /. . 8 W,UVV VLL  ?(55F7=D )) 4,,,,,,C   #ch-- /     ) ) + + +  ) - -i > > > >!# & & &z# 322% %%   z# 322    ) - -i > > > >s.C! get_running_looprS_wait_for_msgsr1r(_SubscriptionMessageIteratorr2)r4error_cbs r6_startzSubscription._starts 8 H.tx88 Q&)) Q.5.I$(-.X.X Ql#OPPP'.'?'A'A'M'MdNaNabjNkNk'l'lD $ $ $ \ H D%A$%G%GD " " "r8cK|jjr tj|jjr tj|jr tj|d{VdS)zU Removes interest in a subject, but will process remaining messages. N) r!rQr rR is_drainingConnectionDrainingErrorr)BadSubscriptionError_drainr:s r6drainzSubscription.drainsm :  /. . : ! 10 0 < .- -kkmmr8cK |j|jd{V|jd{V|jr|jd{V||j|jn#tj $rwxYw d|_ dS#d|_ wxYwNT) r!_send_unsubscriber"flushr.join_stop_processing _remove_subr,rYr)r:s r6rjzSubscription._drains  *..tx88 8 8 8 8 8 8 8*""$$ $ $ $ $ $ $ $" 1)..000000000  ! ! # # # J " "48 , , , ,%      - DLLL4DL    sBB"!C"B33C C limitcK|jjr tj|jjr tj|jr tj||_|dks$|j |krS|j r:d|_| |j |j|jjs)|j|j|d{VdSdS)aX :param limit: Max number of messages to receive before unsubscribing. Removes interest in a subject, remaining messages will be discarded. If `limit` is greater than zero, interest is not immediately removed, rather, interest will be automatically removed after `limit` messages are received. rT)rsN)r!rQr rRrgrhr)rir%r&r.emptyrqrrr"is_reconnectingrn)r4rss r6 unsubscribezSubscription.unsubscribes :  /. . : ! 10 0 < .- - A::$.E11d6I6O6O6Q6Q1DL  ! ! # # # J " "48 , , ,z) F*..txu.EE E E E E E E E E E F Fr8c|jr2|js|j|jr|jdSdS)zF Stops the subscription from processing new messages. N)r1donecancelr2_cancelr:s r6rqzSubscription._stop_processingsj  # .D,D,I,I,K,K .  $ + + - - -  ! -  " * * , , , , , - -r8cK|js Jd |jd{V}|xjt |jzc_ ||d{VnT#t j$rY|jdSt$r}|r||d{VYd}~nd}~wwxYw|jn#|jwxYw|j dkr0|j |j kr |jj r| n#t j$rYdSwxYw?)zz A coroutine to read and process messages if a callback is provided. Should be called as a task. z-_wait_for_msgs can be called only from _startTNr)r'r.rNr0rTrUr,rYrV Exceptionr%r&rurq)r4rdr[es r6rbzSubscription._wait_for_msgs's xHHHHHx   /3355555555""c#(mm3""4((3--''''''''-'1133333!*** *&hqkk))))))) *'113333D'113333>A%%$.DN*J*JtObOh*J))+++)    1 s`AD<A43C"4CC"D< C(C;C"CC"D<"C==>D<<EE)rrrrrrrrrrrrrrrrrr)rr)rr<)rr)rG)rHrIrr rr)r)rsr)__name__ __module__ __qualname____doc__DEFAULT_SUB_PENDING_MSGS_LIMITDEFAULT_SUB_PENDING_BYTES_LIMITr7propertyrrr?rBrDrFr\rerkrjrwrqrbr8r6rr(s29=+/"@#B%:%:%:%:%:NX X &&&X&(+++X+"""X"X +?+?+?+?+?ZHHH$       2FFFFF4----      r8rc.eZdZd dZd dZd dZdd Zd S)rcsubrrrc\||_|j|_tj|_dSrL)_subr.r$r,Future_unsubscribed_future)r4rs r6r7z%_SubscriptionMessageIterator.__init__Ks)"% *-*< :A.:J:J!!!r8cp|js|jddSdSrm)rry set_resultr:s r6r{z$_SubscriptionMessageIterator._cancelPs@(--// 7  % 0 0 6 6 6 6 6 7 7r8c|SrLrr:s r6 __aiter__z&_SubscriptionMessageIterator.__aiter__Ts r8r clKtj|j}||jg}tj|tjd{V\}}|j}||vr|j | }|jxj t|j zc_ |jdkr$|j|jkr||S|jr|t&)N) return_whenr)r,rarSr$rNrwaitFIRST_COMPLETEDrrVresultr0rTrUr%r&r{ryrzStopAsyncIteration)r4get_tasktasksfinished_rr[s r6 __anext__z&_SubscriptionMessageIterator.__anext__Ws+--99$+//:K:KLL'/1J&K#LGrs5#""""")((((((!+"3________D !!!!!!!!!!r8