|e>kQ7dZddlZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl Z ddl mZejeZdZejddZeddzZejd d Zd Zd Zd ZGddZdefdZGddZeZ ej!e j"eZ#dS)u Lightweight message status publisher for asyncclient. Publishes MESSAGE_STATUS events to the local proxy which relays them to the EMQX broker via MQTT. Each call to report() submits an HTTP POST to a thread pool — no batching or internal queue. Usage:: publisher = MessageStatusPublisher() message_id_gen = Gen() msg = {...} message_id_gen.enrich(msg) # adds message_reporter_id / message_reporter_increment publisher.report(msg, reporter_id_gen) N) is_enabledz/var/imunify360/iaidIMUNIFY_PROXY_URLzhttp://127.0.0.1:11234/z/api/v1/mqtt-publishIMUNIFY_PROXY_API_KEYc6eZdZdZddZdefdZdeddfdZdS) Genz_ID + monotonic counter generator. Each instance has its own UUID and its own counter. returnNc~tjj|_d|_t j|_dS)Nr)uuiduuid4hexid_counter threadingLock_lockselfs g/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/internals/message_status_publisher.py__init__z Gen.__init__:s,*,," ^%% cv|j5|j}|xjdz c_|cdddS#1swxYwYdS)N)rr)rvalues r_nextz Gen._next?s Z  ME MMQ MM                  s .22msgcH|j|d<||d<dS)z>Add message_reporter_id and message_reporter_increment to msg.message_reporter_idmessage_reporter_incrementN)rr)rr s renrichz Gen.enrichEs(%)W !",0JJLL ()))rr N) __name__ __module__ __qualname____doc__rintrdictr$rrr r 4so &&&& s 9$94999999rr r c tt5}|cdddS#1swxYwYdS#t$rYdSwxYw)Nr)open _IAID_PATHreadstripOSError)fs r _read_iaidr4Ks *   $6688>>## $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ rrs3A&A  A A  AA A A$#A$cVeZdZd dZd dZdedededdfdZd eded eddfd Z d d Z dS)MessageStatusPublisherr Ncd|_tj|_d|_t jtd|_ tj t|_ dS)NrFz msg-status) max_workersthread_name_prefix) _iaidrr _init_lock _initialized concurrentfuturesThreadPoolExecutor _MAX_WORKERS_poolBoundedSemaphore _MAX_INFLIGHT _inflightrs rrzMessageStatusPublisher.__init__Ts] #.**!'::$+;   #3MBBrc |jrdS|j5|jr ddddSt|_|js.tdt  ddddSd|_ddddS#1swxYwYdS)NzImsg-status: iaid not available yet (file %s missing or empty), will retryT)r<r;r4r:loggerinfor/rs r_ensure_initializedz*MessageStatusPublisher._ensure_initialized^s     F _ % %   % % % % % % % %$DJ:  *  % % % % % % % %!%D  % % % % % % % % % % % % % % % % % %s B;B/BB Br reporter_genstagecvtdsdS|dd}|dsdSjdstd||dSt j|j||dd|d d ||d } j j |||}n*#t$rj YdSwxYw|fd dS) z3Publish a status record via HTTP POST to the proxy. mqtt_trackingNmethodrr"F)blockingz3msg-status: queue full, dropping stage=%s method=%sr#r) timestamp reporter_idreporter_incrementr"r# message_typerJc6jS)N)rDrelease)_rs rz/MessageStatusPublisher.report..s4>+A+A+C+Cr)rgetrDacquirerFwarningtimerrrAsubmit_do_post RuntimeErrorrTadd_done_callback)rr rIrJrMrecordfutures` rreportzMessageStatusPublisher.reportnse /**  F2&&ww,--  F ~%%u%55  NNE    F'?"."4"4"6"6#&77+@"#E#E*-'',a++#    Z&&t}feVLLFF    N " " $ $ $ FF    !C!C!C!CDDDDDs"C77#DDr_rMc,||jsdS|j|d< tj|}ddi}t r t |d<t jt||d}t j |t5}| ddddS#1swxYwYdS#t$r(}td|||Yd}~dSd}~wwxYw) Niaidz Content-Typezapplication/jsonz X-API-KeyPOST)dataheadersrM)timeoutz.msg-status: POST failed stage=%s method=%s: %r)rHr:jsondumpsencode_PROXY_API_KEYurllibrequestRequest_PUBLISH_ENDPOINTurlopen _POST_TIMEOUTr0 ExceptionrFrY) rr_rJrMpayloadrfreqrespes rr\zMessageStatusPublisher._do_posts}   """z  Fv j((//11G%'9:G 6'5 $.((! )C '']'CC t                       NN@           s<B C!2C C!CC!CC!! D+DDc<|jddS)NF)wait)rAshutdownrs rryzMessageStatusPublisher.shutdowns! '''''rr%) r&r'r(rrHr+r strrar\ryr,rrr6r6SsCCCC%%%% 'E$'Ec'E#'E$'E'E'E'ERtC4((((((rr6)$r)atexitconcurrent.futuresr=rhloggingosrrZ urllib.errorrlurllib.requestr'defence360agent.internals.feature_flagsr getLoggerr&rFr/environrW _PROXY_URLrstriprorkrqr@rCr rzr4r6 publisherregisterrymessage_id_genr,rrrs(     >>>>>>  8 $ $ # Z^^/1I J J %%c**-CC  7<<   99999999.C_(_(_(_(_(_(_(_(D # " $ $  "###r