tԑuvddlZddlZddlZddlZddlZddlZddlZddlZddl m Z ddl m Z m Z mZmZddlmZddlmZddlmZmZmZddlmZmZddlmZmZdd lmZdd l m!Z!m"Z"m#Z#dd l$m%Z%ej&e'Z(eZ)eZ*eZ+Gd d Z,Gdde,eZ-dS)N) Generator)APIErrorAPIErrorTooManyRequests APITokenError send_message)license)Core)Message MessageList MessageType) MessageSinkexpect)Gen publisher)PersistentMessagesQueue)recurring_checksafe_cancel_taskScope)ServerJSONEncoderceZdZdZeejddZdZ dZ de j fdZ dd Zd Zed Zejdeejd d ffd Zeejdedd fdZee dZeddZdede fdZ!de defdZ"dZ#dZ$dZ%ddZ&d S)SendToServerClientaSend messages to server. * process Reportable messages; * add them to a pending messages list; * send all pending messages to server when list is full (contains _PENDING_MESSAGES_LIMIT items or more); * send all pending messages on plugin shutdown. IMUNIFYAV_MESSAGES_COUNT_TO_SENDi,2loopc~K||_t|_tj|_tj|_tj|_| | |_ | | |_ dSN)_loopr_pendingasyncioEvent _try_sendLock_lock_shutting_down create_task_send _sender_task_invoke_send_message_invoke_send_message_task)selfrs S/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/plugins/client.py create_sinkzSendToServerClient.create_sink7s /11  \^^ %moo ,,TZZ\\::)-)9)9  % % ' '* * &&&returnNcxK|j tj||jd{Vnh#tj$rVtd|j|j st|j d{VYnwxYw|j j dkrrtd|j j |j td|j dSdS)a~ When shutdown begins it signals any in-flight HTTP sends to abort immediately (via _shutting_down event), then gives 50 seconds to finish the stop() sequence. If stop() isn't done in 50 seconds it force-cancels the sender task. Finally, any messages still in the buffer are flushed to persistent storage so nothing is lost. Nz5Timeout (%ds) sending messages to server on shutdown.rz&Save %s messages to persistent storagezStored queue %r)r%setr wait_forstop_SHUTDOWN_SEND_TIMEOUT TimeoutErrorloggererrorr( cancelledrr buffer_sizewarningpush_buffer_to_storageqsizer+s r,shutdownzSendToServerClient.shutdownBsJ !!! :"499;;0KLL L L L L L L L L# : : : LLG+   $..00 :&t'8999999999 : = $q ( ( NN8 )    M 0 0 2 2 2 NN,dm.A.A.C.C D D D D D ) (s2AA"B54B5cKtdt|jd{Vtd|j4d{Vtdt|jd{V|j|d{Vdddd{VdS#1d{VswxYwYdS)aq Stop sending. 1. wait for the lock being available i.e., while _sender_task finishes the current round of sending message (if it takes too long, then the timeout in shutdown() is triggered 2. once the sending round complete (we got the lock), cancel the next iteration of the _sender_task (it exits) 3. send _pending messages (again, if it takes too long, the timeout in shutdown() is triggered and the coroutine is cancelled That method makes sure that the coroutine that was started in it has ended. It excludes a situation when: -> The result of a coroutine that started BEFORE shutdown() is started. -> And the process of sending messages from _pending is interrupted because of it z2SendToServer.stop cancel _invoke_send_message_taskNzSendToServer.stop wait lockz4SendToServer.stop lock acquired, cancel _sender_task) r6inforr*r$r(r%clear_send_pending_messagesr=s r,r3zSendToServerClient.stopes0  HIIIt=>>>>>>>>> 1222: 0 0 0 0 0 0 0 0 KKN O O O"4#455 5 5 5 5 5 5 5   % % ' ' '--// / / / / / / / 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0sA(C C#&C#c,|tj|tj|tj|Sr)set_product_namer LicenseCLNget_product_name set_server_id get_server_id set_license get_token)apis r,_set_api_attrsz!SendToServerClient._set_api_attrssn W/@@BBCCC ',::<<=== *4466777 r.c#*Ktjd}tjd5}t jtj ||}| |VddddS#1swxYwYdS)NIMUNIFYAV_API_BASE) max_workers)executor) osenvironget concurrentfuturesThreadPoolExecutorrSendMessageAPIr VERSIONrL)r+base_urlrQrKs r,_get_apizSendToServerClient._get_apis:>>"677   2 2q 2 A A +X- hC%%c** * * *  + + + + + + + + + + + + + + + + + +s9BB B messagec@Kd|vrtj|d<d|vrtjj|d<|j|||jtj |tddS)N timestamp message_idz agent-queuedstage) timeuuiduuid4hexrput_encode_data_to_put_in_queuer"r1rreport_reporter_gen_queued)r+r\s r,send_to_serverz!SendToServerClient.send_to_servers g % %#'9;;GK w & &$(JLL$4GL ! $;;GDDEEE "6nMMMMMMr.c<K|jdSr)r"r1r=s r,r)z'SendToServerClient._invoke_send_messages  r.rctK|jd{V|j|j|jkrt dd}|j4d{Vt d | d{Vn8#tj $r&}t d|}Yd}~nd}~wwxYwdddd{Vn#1d{VswxYwYt d|r|dSdS)NzSendToServer._send wait lockz SendToServer._send lock acquiredz&SendToServer._send cancelled unlockingz SendToServer._send lock released) r"waitrArr<_PENDING_MESSAGES_LIMITr6r@r$rBr CancelledError)r+need_to_canceles r,r'zSendToServerClient._sendsn!!#########  =   D$@ @ @ KK6 7 7 7!Nz ' ' ' ' ' ' ' ' >???'557777777777-'''KK HIII%&NNNNNN'  ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' KK: ; ; ; %$$ A @ % %s<D"B=<D=C2 C-(D-C22D DDdatacftj|tdz}|S)N)cls )jsondumpsrencode)r+rrmsgs r,rgz/SendToServerClient._encode_data_to_put_in_queues*j#4555<zz||r.ctj|}|dr"t|dSt |S)Nlist)rvloadsrTr r )r+r\rrs r,_decode_messagez"SendToServerClient._decode_messagesKz'"" 88F   1txx//00 0t}}r.c|dddz|d<|j|||dS)Napi_retries_countrr^)rTrrfrg)r+r\r^s r,_requeue_messagez#SendToServerClient._requeue_messages]'.{{3F'J'JQ'N#$   - -g 6 6)      r.clKtj||}tj|j}tj||htjd{V\}}|D]}t |d{V||vr|dSdS)zRace the HTTP send against the shutdown signal. Returns True on success, raises on API error, or returns False if shutdown interrupted the send. ) return_whenNTF)r ensure_futurerr%rmFIRST_COMPLETEDrresult)r+rKr\ send_task shutdown_taskdone pending_taskstasks r,_send_one_messagez$SendToServerClient._send_one_messages )#*:*:7*C*CDD -d.A.F.F.H.HII $+L  &/% % %       m" ) )D"4(( ( ( ( ( ( ( ( (         4ur.cK|jr8td|j||dS||}|d|dd} tj |td| ||d {V}|s8td |j||dStj |td t d |d S#ttf$r=}td|||||Yd }~dSd }~wt"$r=}td|||||Yd }~dSd }~wwxYw)zTry sending a single message. Returns (stop, failed) where *stop* is True if further sends should stop (shutdown or server-level error) and *failed* is True when the message could not be delivered. z3Shutdown signal received, saving remaining messagesr)TFmethodr_)rr_z agent-sendingr`Nz?Shutdown signal received during send, saving remaining messagesz agent-sentzmessage sent %s)FFz'Failed to send message %s to server: %s)TT)FT)r%is_setr6r:rrfr}rTrrh_reporter_gen_sendingr_reporter_gen_sentr@rrrr)r+rKr^ message_bytesr\msg_infosentexcs r, _try_send_onez SendToServerClient._try_send_ones   % % ' '  NNE    M  my  A A A;&&}55kk(++!++l33     .o    //W========D #1 !!-9!EEE"{  W&8 M M M M KK)8 4 4 4<'7    NN98S     ! !'9 5 5 5:::::    NN98S     ! !'9 5 5 5;;;;;  s+A0E7EG2F  G2GGcfK|j}tdt |d}d}|5}|j2|D]!\}}|j||" ddddS |D]\}}||||d{V\}}|dz }|r|dz }|rS||d} | D]!\} } |j| | "|r|t | z }t |}n||dD]!\}}|j||"n1#||dD]!\}}|j||"wxYw dddn #1swxYwYtd|dS)NzSending %s messagesrrrz Unsuccessful to send %s messages) rpop_allr6r@lenr[ server_idrfr) r+messages failure_count processedrKr^rr3failed remainingtsmbs r,rBz)SendToServerClient._send_pending_messagess=((** )3x==999  ]]__ J}$08JJ,I}M%%my%IIII  J J J J J J J J  J08,I})-););Y **$$$$$$LD&NI+%*  $,YZZ$8 &/@@FB M--bB-????!<*S^^;M$'MM  190DJJ,I}M%%my%IIIIJ0DJJ,I}M%%my%IIIIJJ7 J J J J J J J J J J J J J J J:  6 FFFFFs+-F B E  -F  .E;;F  FF)r/N)'__name__ __module__ __qualname____doc__intrRrSrTrn_SEND_MESSAGE_RECURRING_TIMEr4r AbstractEventLoopr-r>r3 staticmethodrL contextlibcontextmanagerrrrXr[rr Reportabler rjrr)r'bytesrgr}rrrrBr.r,rr's 77"c 92>>$*   g&?     !E!E!E!EF$0$0$0L\ +)L$?t$KL++++ VK "## NG N N N N$# N_12232_Q%%%$Uu    4---^"G"G"G"G"G"Gr.rc eZdZejZdZdS) SendToServeriN)rrrrAVSCOPESHUTDOWN_PRIORITYrr.r,rr>s HEr.r).r concurrent.futuresrUrrvloggingrRrbrctypingrdefence360agent.api.serverrrrrdefence360agent.contractsr defence360agent.contracts.configr "defence360agent.contracts.messagesr r r !defence360agent.contracts.pluginsr r2defence360agent.internals.message_status_publisherrr,defence360agent.internals.persistent_messagerdefence360agent.utilsrrrdefence360agent.utils.jsonr getLoggerrr6rirrrrrr.r,rs#   .-----111111 BAAAAAAAMMMMMMMMKJJJJJJJJJ888888  8 $ $suuSUUTGTGTGTGTGTGTGTGn%{r.