L #ddlmZddlZddlZddlZddlmZddlmZddl m Z m Z m Z m Z mZmZmZddlZddlZddlmZddlmZddlmZdd lmZmZmZmZmZdd lm Z dd l!m"Z"dd l#m$Z$m%Z%m&Z&m'Z'm(Z(e rdd lm)Z)dZ*e+dZ,e-e,Z.dZ/e-e/Z0dZ1dZ2e dge dfZ3dZ4dZ5dZ6Gdde"Z7dS)) annotationsN) BytesParser) token_hex) TYPE_CHECKINGAny AwaitableCallableDictListOptional)Msg) Subscription)api)BadBucketErrorBucketNotFoundErrorFetchTimeoutErrorInvalidBucketNameError NotFoundError)KeyValue)JetStreamManager)OBJ_ALL_CHUNKS_PRE_TEMPLATEOBJ_ALL_META_PRE_TEMPLATEOBJ_STREAM_TEMPLATEVALID_BUCKET_RE ObjectStore)NATS503sNATS/1.0s z KV_{bucket}z $KV.{bucket}.r ii@ceZdZdZejdddfdbdZedcdZdddZ dedZ dfdgd"Z dfdhd&Z did'Z ddd(Zdddddd)d)dd)eedddfdjd=Zdd)d)eefdkd@ZedldCZdddeedfdmdGZdddeeddfdndIZedodKZedpdMZedqdNZedqdOZedrdQZGdRdSZGdTd>>  ''---"6qqq9""4(((h  !4!;!;!=!=$BZ [[[[[[[[[[[r=msgr cK|jt|jjdzdzd}|j|}|sdS|rdS|jr]|jtj j tkr+| tjjjdS t#j|j}d|vrFtjjj|d}| |dStj|}||dS#t2jt2jf$rYdSwxYw)Nerror)subjectlenr/rEr3getdoneheadersrHeaderSTATUSNO_RESPONDERS_STATUS set_exceptionnatsjserrorsNoStreamResponseErrorjsonloadsdataAPIError from_errorPubAck from_response set_resultr4CancelledErrorInvalidStateError)r:rNtokenfutureresperracks r;rKz$JetStreamContext._handle_async_replysc C 677".handle_done sj  ' + +ELLNND A A A4.//144377999  1 9 9 ; ; ; ; ;r=)replyrW) r2rMrrXrwr$r*rxr4wait_forr9acquire TimeoutErrorrhr\r]r^TooManyStalledMsgsErrorr/rGrHrFrencodeFutureadd_done_callbackr3rJr6is_setclearr|) r:rSror}rrrWrtr{inboxrkrrjs ` @r; publish_asynczJetStreamContext.publish_asyncs((' +((** * * * * * * *''''  )C.4C * +  )C&)#g,,&7&7C " # 9"4#H#P#P#R#R\fggg g g g g g g g g$g&<= 9 9 9'.8 8 9 ##%% Yq\\((**+++(+ U!(!1!1 < < < < < <   ---6<#ELLNN3  . 5 5 7 7 8  / 5 5 7 7 7hwu||~~sSSSSSSSSS s 3B442C&c*t|jS)z@ returns the number of pending async publishes. )rTr3r?s r;publish_async_pendingz&JetStreamContext.publish_async_pendings4.///r=cHK|jd{VdS)zH waits for all pending async publishes to be completed. N)r6waitr?s r;publish_async_completedz(JetStreamContext.publish_async_completed%s516688888888888r=FqueuerDOptional[Callback]durableconfigOptional[api.ConsumerConfig] manual_ackboolordered_consumeridle_heartbeat flow_controlpending_msgs_limitpending_bytes_limitdeliver_policyOptional[api.DeliverPolicy] headers_onlyOptional[bool]inactive_thresholdPushSubscriptionc K| |j|d{V}d}d}|r5|r1||kr+tjjd|d|d|}d}| }|rF |j||d{V}|}n!#tjjj$rd}YnwxYw||j}|jj }|sS|r$tjjd|j r$tjjdni|s'tjjd|||kr*tjjd |d |n|r |tj }|j s||_ |j s||_ |js||_| r| |_|r||_|j |j}||_|js||_| |_| r| |_n |jpd } |r@d|_tjj|_d |_d |_| |_d |_d|_|j||d{V}|j }|tCd|tCd|"||||||| | d{VS)aCreate consumer if needed and push-subscribe to it. 1. Check if consumer exists. 2. Creates consumer if needed. 3. Calls `subscribe_bind`. :param subject: Subject from a stream from JetStream. :param queue: Deliver group name from a set a of queue subscribers. :param durable: Name of the durable consumer to which the the subscription should be bound. :param stream: Name of the stream to which the subscription should be bound. If not set, then the client will automatically look it up based on the subject. :param manual_ack: Disables auto acking for async subscriptions. :param ordered_consumer: Enable ordered consumer mode. :param idle_heartbeat: Enable Heartbeats for a consumer to detect failures. :param flow_control: Enable Flow Control for a consumer. :: import asyncio import nats async def main(): nc = await nats.connect() js = nc.jetstream() await js.add_stream(name='hello', subjects=['hello']) await js.publish('hello', b'Hello JS!') async def cb(msg): print('Received:', msg) # Ephemeral Async Subscribe await js.subscribe('hello', cb=cb) # Durable Async Subscribe # NOTE: Only one subscription can be bound to a durable name. It also auto acks by default. await js.subscribe('hello', cb=cb, durable='foo') # Durable Sync Subscribe # NOTE: Sync subscribers do not auto ack. await js.subscribe('hello', durable='bar') # Queue Async Subscribe # NOTE: Here 'workers' becomes deliver_group, durable name and queue name. await js.subscribe('hello', 'workers', cb=cb) if __name__ == '__main__': asyncio.run(main()) Nz"cannot create queue subscription 'z' to consumer ''TzIcannot create a queue subscription for a consumer without a deliver groupz+consumer is already bound to a subscriptionzAcannot create a subscription for a consumer with a deliver group z#cannot create a queue subscription z% for a consumer with a deliver group r!i`5rzcannot detect consumerz0config is required for existing durable consumer)rDrrrrrconsumerrr)#r@find_stream_name_by_subjectr\r]r^Error consumer_inforr deliver_group push_boundrConsumerConfig durable_namerrrdeliver_subjectr/ new_inboxfilter_subjectsfilter_subjectrr AckPolicyNONE ack_policy max_deliverack_wait num_replicas mem_storage add_consumername TypeErrorsubscribe_bind)r:rSrrDrrrrrrrrrrrrrdeliverrr should_creaters r;rIzJetStreamContext.subscribe+sH >9@@IIIIIIIIF   7e++gn**+pPU+p+pfm+p+p+pqqq #  % %&*i&=&=fg&N&N N N N N N N "7>/ % % % $  %  $")F*0>M   ^'...c#-^'.../\]]]^ '...k\ikkm++'...@e@@0=@@, , *~+--& .&-#' -',$& 3&2# 7(6%! ?,>)%-(,,..)0&) 0(/%#/F  <(6%%!'!6!;!  *&*#$'M$6!%&""+(6%&'#%)""&)"8"8"8"O"OOOOOOOM$)H  455 5 >NOO O((!-1 3)         s(#B B*)B*api.ConsumerConfigrc pK|r/|s-|jtjjur||}|jt d|j|j|j pd|||d{V} t || ||} t ||j||| | || _ |jr5tj| j | j _|r5tj| j | j _| S)z'Push-subscribe to an existing consumer.Nz"config.deliver_subject is required)rSrrDrr)r]r"rrorderedpsubsubccreq)rrrr_auto_ack_callbackrrr/rIrr r_JSI_jsirr4 create_taskactivity_check_hbtaskcheck_flow_control_response_fctask) r:rrrrrDrrrrrrs r;rzJetStreamContext.subscribe_bindsZ"  -z -(9AS(S(S((,,B  ! )@AA AH&&*&,"1 3 '         00sFHMM$(($)     N&2383J3J3L3LMMCH   [&2383W3W3Y3YZZCH  r=callbackCallbackcdfd }|S)NrNr r+r,cK|d{V |d{VdS#tjj$rYdSwxYwN)rnr\r^MsgAlreadyAckdError)rNrs r; new_callbackz9JetStreamContext._auto_ack_callback..new_callbacksp(3--        ggii;2    s2A  A rNr r+r,)rrs` r;rz#JetStreamContext._auto_ack_callbacks)      r= inbox_prefixOptional[bytes]!JetStreamContext.PullSubscriptionc>K| |j|d{V}d} |r#|j||d{Vd}n#tjjj$rYnwxYw|} |r|tj}|j s||_ |r||_ ||_ n7|j j} | |_ |j||d{V||||||| d{VS)a*Create consumer and pull subscription. 1. Find stream name by subject if `stream` is not passed. 2. Create consumer with the given `config` if not created. 3. Call `pull_subscribe_bind`. :: import asyncio import nats async def main(): nc = await nats.connect() js = nc.jetstream() await js.add_stream(name='mystream', subjects=['foo']) await js.publish('foo', b'Hello World!') sub = await js.pull_subscribe('foo', stream='mystream') msgs = await sub.fetch() msg = msgs[0] await msg.ack() await nc.close() if __name__ == '__main__': asyncio.run(main()) NTFr)rrrrrrr)r@rrr\r]r^rrrrrrrr/rGrHrJrpull_subscribe_bind) r:rSrrrrrrrr consumer_names r;pull_subscribezJetStreamContext.pull_subscribesP >9@@IIIIIIIIF   &i--fg>>>>>>>>> % w~+    D   @~+--) 0(/% ,% &-## $ 3 3 5 5 < < > > + )(((?? ? ? ? ? ? ? ?--% 31 .         s%AA*)A*rczK|std|$t|jjdddz}||jjz}|j|||d{V} d} |r|} n|r|} n|} t || || |S)a pull_subscribe returns a `PullSubscription` that can be delivered messages from a JetStream pull based consumer by calling `sub.fetch`. :: import asyncio import nats async def main(): nc = await nats.connect() js = nc.jetstream() await js.add_stream(name='mystream', subjects=['foo']) await js.publish('foo', b'Hello World!') msgs = await sub.fetch() msg = msgs[0] await msg.ack() await nc.close() if __name__ == '__main__': asyncio.run(main()) znats: stream name is requiredNrB)rr)r]rrrrr) ValueErrorrpr/rErGrHrIrJr PullSubscription) r:rrrrrrrrrrrs r;rz$JetStreamContext.pull_subscribe_bindksH ><== =   !7!:;;dBL!4!4!6!66H&& NN  1 3'            %#MM  % MM$M00" 1   r= Optional[Msg]cj||jdS|jtjjSr)rWrUrrXrY)clsrNs r; is_status_msgzJetStreamContext.is_status_msgs, ;#+-4{sz0111r=statusc|sdSt|rdStjjj|NTF)r _is_temporary_errorr\r]r^rcfrom_msg)rrrNs r;_is_processable_msgz$JetStreamContext._is_processable_msgsE 4  / / 7 7 5gn%..s333r=c|tjjks*|tjjks|tjjkrdSdSr)r StatusCode NO_MESSAGESCONFLICTREQUEST_TIMEOUTrrs r;rz$JetStreamContext._is_temporary_errors> cn0 0 000077745r=c4|tjjkrdSdSr)rrCONTROL_MESSAGErs r; _is_heartbeatzJetStreamContext._is_heartbeats S^3 3 345r= start_timec<|dS|tj|z z Sr)time monotonic)rr'rs r; _time_untilzJetStreamContext._time_untils$ ?4$.**Z788r=cPeZdZd!dZd"dZd"dZdZdZdZd#dZ d$dZ d%dZ d S)&JetStreamContext._JSIr]r r"rrrr$rrr!JetStreamContext.PushSubscriptionrrrrr+r,c||_||_||_||_||_||_||_d|_d|_|r|j r |j |_d|_ d|_ d|_ d|_ d|_d|_d|_d|_dSNrrT)_conn_js_stream_ordered_psub_sub_ccreqr_hbir_dseq_sseq_cmeta_fcr_fcd_fciseq_activer)r:r]r"rrrrrrs r;r<zJetStreamContext._JSI.__init__sDJDH!DL#DMDJDIDK DLDI 1- 1!0 DJDJ)-DK'+DIDIDL+/DLDLLLr=rc4|xjdz c_||_dS)Nr)rrr:rs r;track_sequencesz%JetStreamContext._JSI.track_sequencess LLA LLDKKKr=c:d|_||_|j|_dS)NT)rrrrrs r;schedule_flow_control_responsez4JetStreamContext._JSI.schedule_flow_control_responsesDLDI DIIIr=c~|jjr |jjS|j|jjz Sr)r _cb deliveredr_pending_queueqsizer?s r;get_js_deliveredz&JetStreamContext._JSI.get_js_delivereds7y} +y**<$)":"@"@"B"BB Br=cKd} |jjrdStj|j|zd{V|j}d|_|s*|jr#||jdzd{Vn#tj $rYdSwxYw)NrQTFr) r is_closedr4sleepr rrreset_ordered_consumerrrh)r: hbc_thresholdactives r;rz$JetStreamContext._JSI.activity_checksM z+ "- M(ABBBBBBBBB!\F#(DL!N=N"&"="=dj1n"M"MMMMMMMM-EE s A2AA22BBczK |jjrdS|j|jjz |jkrI|j} |r |j|d{Vn#t$rYnwxYwd|_d|_tj dd{Vn#tj $rYdSwxYw)NTrg?) rr rr rrrrr| Exceptionr4r!rh)r:fc_replys r;rz1JetStreamContext._JSI.check_flow_control_responses z+ tz'@'F'F'H'HHTYVV#'9!'C&*j&8&8&B&B B B B B B B B(!!! D!$( $% !------------EE s: B&6B& "A-,B&- A:7B&9A::+B&&B98B9rNr c,Kd|_|jsdS||j}t|d}d}|jr:|jt jj}|rt|}d}||krt|d}|j r$| |j dzd{V}nGtj j|||}|j|d{V|S)NTr!r)stream_resume_sequenceconsumer_sequencelast_consumer_sequence)rr_get_metadata_fieldsr*rWrUrrX LAST_CONSUMERrr"rr\r]r^ConsumerSequenceMismatchErrorr _error_cb) r:rNtokensdseqldseq ldseq_str did_resetsseqecss r;check_for_sequence_mismatchz1JetStreamContext._JSI.check_for_sequence_mismatch,s&DL; t--dk::Fvay>>DE{ +KOOCJ,DEE + NNEI}}6!9~~=4&*&A&A$*q.&Q&Q Q Q Q Q Q QII'.FF/3*./4GC *..s333333333 r=r6 Optional[int]rcK|jj}|j||j}|jxjdz c_|jj}|j|jj|<||j_||j_|j|d{V||j_ |j |jd{Vtj dd{Vd|_ d|_|j}||_t"jj|_||_||_tj|dSr)r _idr _remove_subr_sid_subsr _send_unsubscribe_subject_send_subscriber4r!rr r rr DeliverPolicyBY_START_SEQUENCEr opt_start_seqrrecreate_consumer)r:r6osid new_delivernsidrs r;r"z,JetStreamContext._JSI.reset_ordered_consumerHs`9=D J " "4 ( ( (*..00K JOOq OO:?D%)YDJ T " DIM!DJN*..t44 4 4 4 4 4 4 4"-DI *,,TY77 7 7 7 7 7 7 7-"" " " " " " " "DKDJ[F%0F "$'$5$GF !#'F  DK   6 6 8 8 9 9 94r=cK |jj|j|j|jjd{V}|j|j_dS#t$r+}|j |d{VYd}~dSd}~wwxYw)N)rr') rr@rrr r0rr  _consumerr&rr0)r:cinforms r;rEz'JetStreamContext._JSI.recreate_consumerss 0"hm88dkcgckct8uuuuuuuu',z $$$ 0 0 0j**3/////////////// 0sA A B BBN)r]r r"rrrr$rrrrrrrrr+r,)rr$r+r,)rNr r+r)r6r9r+rr+r,) __name__ __module__ __qualname__r<rrrrrr8r"rErr=r;rrs D     % % % %  C C C    (   &    8) ) ) ) V 0 0 0 0 0 0r=rceZdZdZdd Zdd ZeddZejdZedZ e jdZ dddZ d d!fd Z xZ S)"rzP PushSubscription is a subscription that is delivered messages. r]r rrrrr$rr+r,c||_||_||_||_|j|_|j|_|j|_|j|_|j|_|j |_ |j |_ |j |_ |j |_ |j |_ |j|_|j|_|j|_|j|_|j|_|j|_dSr)rrrJr rr;r@_queue _max_msgs _receivedr_future_closed_pending_msgs_limit_pending_bytes_limitr _pending_size_wait_for_msgs_task_message_iterator_pending_next_msgs_calls)r:r]rrrrs r;r<z*JetStreamContext.PushSubscription.__init__sDH!DL%DNDIDJwDHLDM*DK ]DN ]DNwDH;DL;DL(+'>D $(+(@D %"%"4D !$!2D '*'>D $%(%:D ",/,HD ) ) )r=api.ConsumerInfocjK|jj|j|jd{V}|Sze consumer_info gets the current info of the consumer from this subscription. Nrr@rrrJr:infos r;rz/JetStreamContext.PushSubscription.consumer_infosP44 DKr=r*c|jjSzS Number of delivered messages to this subscription so far. r rTr?s r;rz+JetStreamContext.PushSubscription.delivered 9& &r=c||j_dSrrer:values r;rz+JetStreamContext.PushSubscription.delivereds"'DI   r=c|jjSrr rYr?s r;rYz/JetStreamContext.PushSubscription._pending_sizes 9* *r=c||j_dSrrkrhs r;rYz/JetStreamContext.PushSubscription._pending_sizes&+DI # # #r=?r'rqr c|K|j|d{V}|jr|jjrd|jj_|jj|jjjkrD|jjj}|r1|j|d{Vd|jj_|S)aJ :params timeout: Time in seconds to wait for next message before timing out. :raises nats.errors.TimeoutError: next_msg can be used to retrieve the next message from a stream of messages using await syntax, this only works when not passing a callback on `subscribe`:: NT) r next_msgrrrrrrr|)r:r'rNr's r;roz*JetStreamContext.PushSubscription.next_msgs **733333333Cy 3TY^ 3)- &9>2244 8NNN#y~2H3"j00:::::::::.2 +Jr=rlimitc.Kt|d{V|jjjr#|jjj|jjjr%|jjjdSdS)z Unsubscribes from a subscription, canceling any heartbeat and flow control tasks, and optionally limits the number of messages to process before unsubscribing. N)super unsubscriber rrcancelr)r:rp __class__s r;rsz-JetStreamContext.PushSubscription.unsubscribes ''%%e,, , , , , , , ,y~% 0 &--///y~% 0 &--///// 0 0r=) r]r rrrrr$rr$r+r,r+r]r+r*)rm)r'rqr+r )r)rpr*) rMrNrO__doc__r<rpropertyrsetterrYrors __classcell__)rus@r;rz!JetStreamContext.PushSubscriptionzs    I I I I>      ' ' '  '   ( (   (  + +  +   , ,   ,     ( 0 0 0 0 0 0 0 0 0 0 0r=ceZdZdZd#d Zed$dZed$dZed$dZd%dZ d&dZ d'd(dZ d)d*d!Z d)d+d"Z dS),rzM PullSubscription is a subscription that can fetch messages. r]r rrrrr$rrrpr+r,c||_|j|_||_||_||_|jj}|d|d||_||_dS)Nz.CONSUMER.MSG.NEXT..) rr/r rrJr._nmsrJ_deliver)r:r]rrrrrr#s r;r<z*JetStreamContext.PullSubscription.__init__sgDHvDHDI!DL%DNX%F!IIfIIxIIDI#NN,,DMMMr=r*c>|jjS)z Number of delivered messages by the NATS Server that are being buffered in the pending queue. )r rrr?s r; pending_msgsz.JetStreamContext.PullSubscription.pending_msgss 9+1133 3r=c|jjS)zw Size of data sent by the NATS Server that is being buffered in the pending queue. rkr?s r; pending_bytesz/JetStreamContext.PullSubscription.pending_bytess 9* *r=c|jjSrdrer?s r;rz+JetStreamContext.PullSubscription.deliveredrfr=ctK|jtd|jd{VdS)z unsubscribe destroys the inboxes of the pull subscription making it unable to continue to receive messages. Nnats: invalid subscription)r rrsr?s r;rsz-JetStreamContext.PullSubscription.unsubscribe sK y  !=>>>)'')) ) ) ) ) ) ) ) ) )r=r]cjK|jj|j|jd{V}|Sr_r`ras r;rz/JetStreamContext.PullSubscription.consumer_infos<44T\4>RRRRRRRRDKr=rr!Nbatchr'rq heartbeat List[Msg]cHK|jtd|dkrtd||dkrtd|rt|dzdz nd}|dkr ||||d{V}|gS|||||d{V}|S) a fetch makes a request to JetStream to be delivered a set of messages. :param batch: Number of messages to fetch from server. :param timeout: Max duration of the fetch request before it expires. :param heartbeat: Idle Heartbeat interval in seconds for the fetch request. :: import asyncio import nats async def main(): nc = await nats.connect() js = nc.jetstream() await js.add_stream(name='mystream', subjects=['foo']) await js.publish('foo', b'Hello World!') msgs = await sub.fetch(5) for msg in msgs: await msg.ack() await nc.close() if __name__ == '__main__': asyncio.run(main()) Nrrznats: invalid batch sizerznats: invalid fetch timeoutʚ;i)r rr* _fetch_one_fetch_n)r:rr'rexpiresrNmsgss r;fetchz'JetStreamContext.PullSubscription.fetchsDy  !=>>>qyy !;<<<"w!|| !>???@GQc'M122W<YnwxYw) NrrrrrFTrr) r rempty get_nowaitrYrTrbr rr&r*r/r|rr`dumpsrrrrrrorrr\r^rr]rcrr4r) r:rr'rrrNrnext_reqrgot_any_responsedeadlines r;rz,JetStreamContext.PullSubscription._fetch_oneOs I,Ekkmm  **,,CI++s38}}<++-;;C@@F!!J Dkkmm H !HW  3&)'ll# L-0]1J-K-K)*(""  8$$++--         ))J$  /;;GZPPH $ 2 28 2 D DDDDDDDC.;;C@@F #+99&AA%/3,$,??GGH"&+"::#''."9"B"B3"G"GG" +   /;;GZPPH+1 ,4"33 + s2AA><A>> B  B ;A4H0AHt|d kr)tDj#j$j%&| Z t7| D]}t||}| |d kr|cS|j| d{V} t | } t| rd}t| | r| dz} | | n#t*j$rYnwxYwt|d kr |rtB|S) NFrrrrrTno_waitrr)'r rrrrrrYrTrbr rappendr&r*r/r|rr`rrrr4r!rorrrrangerrrrrrr\r]r^rcr)r:rrr'rrrrrneededrNrrir_s r;rz*JetStreamContext.PullSubscription._fetch_ns/DI,E))J$ F kkmm  **,,CI++s38}}<++-;;C@@F!!aKFKK$$$$ Dkkmm  H &HW  .&-# L-0]1J-K-K)*"&HY (""  8$$++--         -"" " " " " " " "  I..w77777777'    KKK   % %33C88F--f55 $( !55fcBB  C   ! "1f----#3#?#?#T#T$(I$6$6x$6$H$HHHHHHH!1!?!?!D!D!S^%???6S^MkCkCk"E-;;FCC-/3,$-AA&#NN-"aKF KK,,,+D 4yy1}} H &HW  .&-# L-0]1J-K-K)*(""  8$$++--         -"" " " " " " " "C& DQ;;K+77LLt99>>$(I$6$6x$6$H$HHHHHHH"/+4"33 $(I$6$6x$6$H$HHHHHHH"/D-;;C@@F'55f==!+/( ! D!  C(((3>#====Ta"gn5>>sCCCM& DR v ) )A/;;GZPPH+1 # $ 2 28 2 D DDDDDDDC-;;C@@F'55f==!+/( ';;FCHH)!  C((( )'     4yyA~~"2~''KstAB0B00 B=<B=) F F!F!C'K==LL0!PP,0!QQ$#Q$7W BWW-,W-) r]r rrrrr$rr$rrpr+r,rwrLrv)rr!N)rr*r'rqrrqr+rr)rr9r'rqrrqr+r ) rr*rr9r'rqrrqr+r)rMrNrOrxr<ryrrrrsrrrrrr=r;rz!JetStreamContext.PullSubscriptions(   - - - -(  4 4 4  4  + + +  +  ' ' '  '  * * * *    '()- 0 0 0 0 0 l*. E E E E E X*. o o o o o o o r=rbucketrc Ktj|tt|} ||d{V}n#t $rtwxYw|jj dkrtt||t||t|jjS)Nrrrrrprer]direct)rmatchrKV_STREAM_TEMPLATEformat stream_inforrrmax_msgs_per_subjectrrKV_PRE_TEMPLATEr allow_direct)r:rrrsis r; key_valuezJetStreamContext.key_valueMs   ( ( 0( (#**&*99 &''////////BB & & &% % & 9 )A - - &&f&55 .//     s AA(Optional[api.KeyValueConfig]c RK|tj|d}|jdi|}tj|jt d}|jr|j|kr|j}|jdkrtj j j tj didt|jd|jdd |jd gd |jd d dd dd dtjjd|d|jd|jddd|jddd|jd|jd|jd|j}||d{V}|jJt7|j|jt8|j|t;|jjS)z] create_key_value takes an api.KeyValueConfig and creates a KV in JetStream. Nrrxrr descriptionsubjectsz$KV.z.>rallow_rollup_hdrsT allow_msg_ttl deny_deletediscardduplicate_windowmax_age max_bytes max_consumers max_msg_sizemax_msgsrrstorage republishrr) rKeyValueConfigevolverrrrttlhistoryr\r]r^KeyHistoryTooLargeError StreamConfigrrrr DiscardPolicyNEWrmax_value_sizereplicasrr add_streamrrrrrr)r:rparamsrrrrs r;create_key_valuez!JetStreamContext.create_key_valueasC >'vh/?@@@F((((   / / 7( ("( : *&*'777%z  >B  '.8 8!   #**&-*@@@ ** /V]...//    #d  $   %)) .- JJ && "  .. R "(  ! "NN# $&&% (??6** * * * * * *{&&&;&&fm&<< .//     r=cKtj|tt|}||d{VS)zr delete_key_value deletes a JetStream KeyValue store by destroying the associated stream. Nr)rrrrr delete_streamr:rrrs r;delete_key_valuez!JetStreamContext.delete_key_values[   ( ( 0( (#**&*99''/////////r=rcKtj|ttj|} ||d{Vn#t $rtwxYwt|||S)Nrrrrr]) rrrrrrrrrrs r; object_storezJetStreamContext.object_stores   ( ( 0( ($+6::: &""6** * * * * * * * * & & &% % &    s AA"Optional[api.ObjectStoreConfig]cbK|tj|}n||_|jdi|}t j|jt |j}tj|}tj|}|j }|dkrd}tj tj|j|j ||g|j|d|j|j|jtjjdd }||d{V|jJt-|j|j|S) zd create_object_store takes an api.ObjectStoreConfig and creates a OBJ in JetStream. NrrrT) rrrrrrrr placementrrrrr)rObjectStoreConfigrrrrrrrrrrrrrrrrrrrrr) r:rrrrchunksmetarrrs r;create_object_storez$JetStreamContext.create_object_storesU >*&999FF"FM((((   / / 7( (},34@@@(/t<<<$ >>I!$+6=AAA*d^JN&%)"    oof%%%%%%%%%{&&&;    r=cKtj|ttj|}||d{VS)z] delete_object_store will delete the underlying stream for the named object. Nr)rrrrrrrs r;delete_object_storez$JetStreamContext.delete_object_storesX   ( ( 0( ($+6:::''/////////r=) r"rr#r$r%r&r'r(r)r*r+r,)r+rrLr)r=NNNN)rSr$rorpr'rqrrr&rWrsrtrqr+ru)rSr$rorpr}rqrrr&rWr~rtrqr+rrw) rSr$rr&rDrrr&rrr&rrrrrrrrqrrrr*rr*rrrrrrqr+r)rrr$rrrr$rDrrrrrrr*rr*r+r)rrr+r)rSr$rr&rrr&rrrr*rr*rrr+r)rr&rrr&rrrr*rr*rr&rr&r+r)rNrr+r&)rr&rNr r+r)rr&r+r)r'rqrr(r+rq)rr$r+rr)rrr+r)rr$r+r)rr$r+r)NN)rr$rrr+r)'rMrNrOrxrDEFAULT_PREFIXr<ryr@rMrKr|rrr!DEFAULT_JS_SUB_PENDING_MSGS_LIMIT"DEFAULT_JS_SUB_PENDING_BYTES_LIMITrIr staticmethodrrr classmethodrrrrrrrrrrrrrrrrr=r;r r Ks@( $)- ]]]]].   X  \ \ \ \>#' $,0#'+.+.+.+.+.`&* $"&#'?????B0000 9999 $!%!% $/3 !&*.""C#E6:'+.2!s s s s s t"& !&"C#E/////b\"& $/3"C#E(,M M M M M b#' $(,"C#E"!%@ @ @ @ @ D222[2 444[4[[ 999[9 d0d0d0d0d0d0d0d0L`0`0`0`0`0<`0`0`0Diiiiiiiib     ,044 4 4 4 4 l 0 0 0 0"    $26/ / / / / b000000r=r )8 __future__rr4r`r email.parserrsecretsrtypingrrrr r r r nats.errorsr\nats.js.errors nats.aio.msgr nats.aio.subscriptionrnats.jsrrrrrr nats.js.kvrnats.js.managerrnats.js.object_storerrrrrrrZ bytearray NATS_HDR_LINErTNATS_HDR_LINE_SIZE_CRLF_ _CRLF_LEN_rrrrrKV_MAX_HISTORYr rr=r;rs{#""""" $$$$$$...... ,,,,,, +&& S''  S[[ "! UGYt_, -%/!%6"f0f0f0f0f0'f0f0f0f0f0r=