.Y'^ddlmZddlZddlZddlmZddlmZmZm Z m Z m Z m Z ddl mZddlmZddlmZmZmZerddlmZed ZeeZd ZeeZGd d ZdS) ) annotationsN) BytesParser) TYPE_CHECKINGAnyDictIterableListOptional)NoRespondersError)api)APIError NotFoundErrorServiceUnavailableError)NATSsNATS/1.0s ceZdZdZejdfdEd ZdFd ZdGdZdHdIdZ dHdJdZ dHdJdZ dKdZ dLdMdZ dHdNd#ZdOdPd&ZdOdQd(Z dRdSd+ZdTd,Z dHdUd/Z dHdVd0ZdHdWd3Z dXdYd:ZedZd;Zd[d=Z d\d]d>Z d^d_dDZdS)`JetStreamManagerzA JetStreamManager exposes management APIs for JetStream. connrprefixstrtimeoutfloatreturnNonecV||_||_||_t|_dSN)_prefix_nc_timeoutr _hdr_parser)selfrrrs o/builddir/build/BUILD/imunify360-venv-2.6.2/opt/imunify360/venv/lib/python3.11/site-packages/nats/js/manager.py__init__zJetStreamManager.__init__(s+   &==api.AccountInfocK||jdd|jd{V}tj|S)Nz.INFOr$r) _api_requestrrr AccountInfo from_response)r!resps r" account_infozJetStreamManager.account_info3sU&&$,'='='=sDM&ZZZZZZZZ,,T222r$subjectcK|jd}tjd|i}||||jd{V}|dst |ddS)zK Find the stream to which a subject belongs in an account. z .STREAM.NAMESr-r'Nstreamsr)rjsondumpsr(encoderr)r!r-req_subreq_datainfos r"find_stream_name_by_subjectz,JetStreamManager.find_stream_name_by_subject7s \000:y'233&&w0A0A4=&YYYYYYYYI  Iq!!r$Nnamesubjects_filter Optional[str]api.StreamInfocKd}|rtjd|i}||jd|||jd{V}t j|S)z; Get the latest StreamInfo by stream name. r8z .STREAM.INFO.r'N) r0r1r(rr2rr StreamInfor*)r!r7r8r4r+s r" stream_infozJetStreamManager.stream_infoCs  Hz#4o"FGGH&&| 0 0$ 0 0 OO  M'         ~++D111r$configOptional[api.StreamConfig]cf K|tj}|jd i|}|j t dt d}t fd|D}t d D}  }|s|s|rt d dtj | }| |j d | |j d{V}tj|S) z. add_stream creates a stream. Nnats: stream name is requiredz.*>/\c3 K|]}|vV dSr).0char stream_names r" z.JetStreamManager.add_stream.._s(NN 3NNNNNNr$c3>K|]}|VdSr)isspace)rErFs r"rHz.JetStreamManager.add_stream..`s*DDT\\^^DDDDDDr$znats: stream name (z) is invalid. Names cannot contain whitespace, '.', '*', '>', path separators (forward or backward slash), or non-printable characters.z.STREAM.CREATE.r'rD)r StreamConfigevolver7 ValueErrorsetany isprintabler0r1as_dictr(rr2rr=r*) r!r?params invalid_charshas_invalid_charshas_whitespaceis_not_printabledatar+rGs @r" add_streamzJetStreamManager.add_streamQs >%''F((((k  <== =H NNNN NNNNNDD DDDDD*66888   2B \k\\\  z&..**++&&| 9 9K 9 9 KKMMM'         ~++D111r$cK|tj}|jdi|}|jt dt j|}||j d|j| |j d{V}tj |S)z1 update_stream updates a stream. NrBz.STREAM.UPDATE.r'rD)r rKrLr7rMr0r1rQr(rr2rr=r*)r!r?rRrWr+s r" update_streamzJetStreamManager.update_streamqs >%''F(((( ; <== =z&..**++&&| 9 9FK 9 9 KKMMM'         ~++D111r$boolcnK||jd||jd{V}|dS)z* Delete a stream by name. z.STREAM.DELETE.r'Nsuccessr(rr)r!r7r+s r" delete_streamzJetStreamManager.delete_streamsP&&$,'M'Mt'M'MW[Wd&eeeeeeeeIr$seq Optional[int]keepcKi}|r||d<|r||d<|r||d<tj|}||jd|||jd{V}|dS)z) Purge a stream by name. r`filterrbz.STREAM.PURGE.r'Nr])r0r1r(rr2r)r!r7r`r-rb stream_reqreqr+s r" purge_streamzJetStreamManager.purge_streams&(  $ #Ju   +#*Jx  &!%Jv j$$&&$,'L'Ld'L'Lcjjlldhdq&rrrrrrrrIr$streamconsumerOptional[float]cK||j}||jd|d|d|d{V}tj|S)Nz.CONSUMER.INFO..r$r')rr(rr ConsumerInfor*)r!rhrirr+s r" consumer_infozJetStreamManager.consumer_infosp ?mG&&$,'Z'Zv'Z'ZPX'Z'Z\_ip&qqqqqqqq--d333r$rList[api.StreamInfo]c.K||jdtjd|i|jd{V}g}|dD]6}t j|}| |7|S)zS streams_info retrieves a list of streams with an optional offset. .STREAM.LISToffsetr'Nr/) r(rr0r1r2rr r=r*append)r!rrr+r/rhr>s r" streams_infozJetStreamManager.streams_infos&&| ) ) ) J&) * * 1 1 3 3M'         9o ( (F.66v>>K NN; ' ' ' 'r$Iterable[api.StreamInfo]cK||jdtjd|i|jd{V}t j|d|d|dS)zD streams_info retrieves a list of streams Iterator. rqrrr'Ntotalr/)r(rr0r1r2rr StreamsListIterator)r!rrr+s r"streams_info_iteratorz&JetStreamManager.streams_info_iterators&&| ) ) ) J&) * * 1 1 3 3M'         &tH~tG}d9oVVVr$Optional[api.ConsumerConfig]api.ConsumerInfocK|s|j}|tj}|jd i|}|j}||d}t j|}d}d} |j j } | j dko | j dk} | rK|j rD|jr(|jdkr|jd|d|j d|j} n3|jd|d|j } n|r|jd|d|} n |jd|} || || d{V}tj|S) N)rGr?r< >z.CONSUMER.CREATE.rlz.CONSUMER.DURABLE.CREATE.r'rD)rr ConsumerConfigrL durable_namerQr0r1r2rconnected_server_versionmajorminorr7filter_subjectrr(rmr*) r!rhr?rrRrrfr4r+r-versionconsumer_name_supporteds r" add_consumerzJetStreamManager.add_consumers $mG >'))F((((* $0@0@AA:c??))++(3")-1"4"K!9K " Av{ A$ S)>#)E)E!\jjFjjV[jjSYShjj!\RRFRRV[RR  AWWWWWWGG@@@@G&&w'&JJJJJJJJ--d333r$cvK||jd|d|d|jd{V}|dS)Nz.CONSUMER.DELETE.rlr$r'r]r^)r!rhrir+s r"delete_consumerz JetStreamManager.delete_consumersm&&| A Af A Ax A A M'         Ir$ pause_untilapi.ConsumerPausecK||j}d|i}tj|}||jd|d|||d{V}t j|S)a Pause a consumer until the specified time. Args: stream: The stream name consumer: The consumer name pause_until: RFC 3339 timestamp string (e.g., "2025-10-22T12:00:00Z") until which the consumer should be paused timeout: Request timeout in seconds Returns: ConsumerPause with paused status Note: Requires nats-server 2.11.0 or later Nrz.CONSUMER.PAUSE.rlr') rr0r1r2r(rr ConsumerPauser*)r!rhrirrrfr4r+s r"pause_consumerzJetStreamManager.pause_consumers. ?mGk*:c??))++&&| @ @V @ @h @ @ '          ..t444r$cBK|||d|d{VS)a Resume a paused consumer immediately. This is equivalent to calling pause_consumer with a timestamp in the past. Args: stream: The stream name consumer: The consumer name timeout: Request timeout in seconds Returns: ConsumerPause with paused=False Note: Requires nats-server 2.11.0 or later z1970-01-01T00:00:00ZN)r)r!rhrirs r"resume_consumerz JetStreamManager.resume_consumers6.((;QSZ[[[[[[[[[r$rrList[api.ConsumerInfo]c:K||jd||dn'tjd|i|jd{V}g}|dD]6}t j|}| |7|S)z consumers_info retrieves a list of consumers. Consumers list limit is 256 for more consider to use offset :param stream: stream to get consumers :param offset: consumers list offset z.CONSUMER.LIST.Nr$rrr' consumers) r(rr0r1r2rr rmr*rs)r!rhrrr+rrirns r"consumers_infozJetStreamManager.consumers_info+s&&| 4 4F 4 4>CCtz8V2D'E'E'L'L'N'NM'          [) , ,H,::8DDM   ] + + + +r$FrGdirectOptional[bool]nextapi.RawStreamMsgc,Kd}i}|r||d<|r d|d<|dd||d<|r%||d<d|d<|dd||d<tj|}|rx|r|d}|jd|d|}n |jd|}|j|||jd{V} t | } | S|jd |}| |||jd{V} tj | d } | jrtj| j} | t"t$zd} |j| }d}t+|d kr!i}|D] \}}|||< || _d}| jrtj| j}|| _| S) z< get_msg retrieves a message from a stream. Nr` last_by_subj next_by_subjr<z .DIRECT.GET.rlr'z.STREAM.MSG.GET.messager)popr0r1rrrequestr2rr_lift_msg_to_raw_msgr(r RawStreamMsgr*hdrsbase64 b64decodeNATS_HDR_LINE_SIZE _CRLF_LEN_r parsebyteslenitemsheadersrW)r!rGr`r-rr req_subjectrfrWr+raw_msg resp_datar raw_headersparsed_headersrkvmsg_datas r"get_msgzJetStreamManager.get_msg=so    CJ  *CJ GGE4 ")C   *CJ"&C  GGND ) ) )")C z#   ICK!%RR;RRRR !%HH;HH ))+t{{}}dm)\\\\\\\\D&;;DAAGNDD{DD ++KPTP]+^^^^^^^^ "0091EFF < &#GL11D1J>@@AK!-88EENG>''))**Q..*0022##DAq!"GAJJ%GO$( < 6' 55H r$cz|jsDd|_|jd}|r!|dkrtt j|t j}|jd}||_|jd}|rt||_ |j|_|j|_|S)NStatus404z Nats-Subjectz Nats-Sequence) rWrgetrr from_msgr rr-intr`)r!msgstatusrr-r`s r"rz%JetStreamManager._lift_msg_to_raw_msg{sx 1CH[__X..F 1U??''"+C000"$$+n-!kooo..  #c((GKx +r$rcK|jd|}d|i}tj|}|||d{V}|dS)zX delete_msg retrieves a message from a stream based on the sequence ID. z.STREAM.MSG.DELETE.r`Nr])rr0r1r(r2)r!rGr`rrfrWr+s r" delete_msgzJetStreamManager.delete_msgslGG+GG clz#&&{DKKMMBBBBBBBBIr$cBK||||d{VS)zH get_last_msg retrieves the last message from a stream. )r-rN)r)r!rGr-rs r" get_last_msgzJetStreamManager.get_last_msgs2\\+wv\NNNNNNNNNr$r$rrfbytesDict[str, Any]cK |j|||d{V}tj|j}n#t $rt wxYwd|vrtj|d|S)Nr'error) rrr0loadsrWr rr from_error)r!rrfrrr+s r"r(zJetStreamManager._api_requests  *((c7(KKKKKKKKC:ch''DD  * * *) ) * d??%d7m44 4 s rXrZr_rgrnrtryrrrrrr classmethodrrrr(rDr$r"rr#so( ) ) ) ) )3333 " " " " 2 2 2 2 222222@22222$"!%" ,44444      W W W W W04#' "4"4"4"4"4H$( "5"5"5"5"5P$( \\\\\2*"!%!&$ <<<<<|[,"' O O O O O r$r) __future__rrr0 email.parserrtypingrrrrr r nats.errorsr nats.jsr nats.js.errorsr rrnatsr bytearray NATS_HDR_LINErr_CRLF_rrrDr$r"rs6#""""" $$$$$$EEEEEEEEEEEEEEEE))))))KKKKKKKKKK +&& S''  S[[ TTTTTTTTTTr$