z\݋ddlZddlZddlZddlZddlZddlZddlZddlmZddl m Z m Z ddl m Z ddlmZmZddlmZddlmZmZmZddlmZejeZeZejd d d gZGd d e ZGddeZ dZ!Gdde"Z#Gdde"Z$Gddej%Z&dS)N) attrgetter)MessageReject)BaseMessageProcessor)Gen publisher)safe_cancel_task)DAY ServiceBase rate_limit)gProcessingMessagemessage start_timec2eZdZdZdZdZdZdZdZdS)TheSinkct|td|_||_t |t |j|_|t_dS)NPROCESSING_ORDER)key) sortedr_sinks_ordered_loop TaskManagerMessageProcessor _task_managerr sink)self sink_listloops W/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/internals/the_sink.py__init__zTheSink.__init__s`$ :&899    ( "4#677  c8|jjd|jjS)N.) __class__ __module____name__rs r __repr__zTheSink.__repr__%s .333T^5L5LMMr"cfd|jD}t|dks Jdtt|dS)ze introspection: decompose a specific role :return classobj: instance or None c4g|]}t||S) isinstance).0rclassobjs r z%TheSink.decompose..-s8   JtX4N4N    r"zAmbiguous requestN)rlennextiter)rr/optionss ` r decomposezTheSink.decompose(sf     !0   7||q   "5   DMM4(((r"c8|jdS)z Make sure to run message processing bus only when every MessageSource (or MessageSource+MessageSink mix) got initialized N)rstartr(s r r8z TheSink.start3s   """""r"cXKtd|jtd|jdd{Vtd|jd{VdS)Nzshutdown the sink startedzwait for current taskstimeoutzfinish wait task)loggerinfor should_stopwait_current_taskswaitr(s r shutdownzTheSink.shutdown;s /000 &&((( ,--- 33A3>>>>>>>>> &''' %%'''''''''''r"cJK|j|d{VdSN)rpush_msg)rrs r process_messagezTheSink.process_messageCs5 ))'22222222222r"N) r'r& __qualname__r!r)r6r8rBrFr,r"r rrsqNNN ) ) )###(((33333r"rcpeZdZdZdZdZfdZdZedZ d dZ d Z d Z e d ZxZS) rir:c~t|t|j|_|j|_|j|_||_ tj |_ tttj|_|tj|_dS)N)maxsize)periodon_drop)superr! MessageQueueMAXSIZE_queue CONCURRENCY _concurrencyTIMEOUT_process_message_timeout_msg_processorweakrefWeakSettasksr r r=warning_throttled_loggererrorthrottled_log_error)rr msg_processorr%s r r!zTaskManager.__init__Os "4<888  ,(, %+_&& !+3!O!O!O#'#9#9&,#G#G   r"c,K|js/|jt|d{VdS|jjrd|j|j|f}n"d|j|j|f}|j|dS)z&Push message unless the queue is full.NzNMessage queue is full %s. Current processing messages: %s. Message ignored: %szZMessage queue is full. Queue size: %s Current processing messages: %s. Message ignored: %s) rQfullputMessageComparabler[should_be_calledcurrent_processing_messagesqsizer])rmsgargss r rEzTaskManager.push_msgYs{!! ,+//"3C"8"899 9 9 9 9 9 9 9 9 9%6 OK4OK%%''4 %D $d + + + +r"c>td|jDS)Nc3K|]R}||jjtt j|jjz dfVSdS)N)doneprocessing_msgrroundtime monotonicr)r.tasks r z:TaskManager.current_processing_messages..{sq  99;;  #+dn&&)<)GGKK       r")tuplerYr(s r rdz'TaskManager.current_processing_messagests6         r"NcK|jrOd|jD}td|t j|j|d{VdSdS)Ncjg|]0\}}|d|d|f1S)method message_id)get)r.mlastings r r0z2TaskManager.wait_current_tasks..sIAwx!%% "5"5w?r"z#Waiting for %r processing to finishr;)rYrdr=r>asynciorA)rr<msg_to_processs r r@zTaskManager.wait_current_taskss : <"&"BN KK5   ,tz7;;; ; ; ; ; ; ; ; ; ; < z"TaskManager._run..si.?.?.A.Ar"z3There is still %s unprocessed messages in the queue Error during message processing:)rzBoundedSemaphorerS _should_stopr=debugrQre_TaskManager__limit_concurrencyrwCancelledErrorr create_taskrVrfrrnrorladd_done_callback_on_msg_processedrYaddrZ exception)rmsg_comparablet unprocessedrs @r _runzTaskManager._runs,T->??  A' " 5t{7H7H7J7JKKK229=========+/;??+<+<%<%<%<%<%<% %r)filerzCMessage %r was not processed in the %r plugin in %ss; Traceback: %szError processing %r in %rz%s processed in %.4f secondszE%s message took longer to process than expected (%.4f sec > %.4f sec))rreport_reporter_gen_sinkrnrorrzrrFrshieldTIMEOUT_TO_SINK_PROCESSr-rrrrr=r>strrioStringIO print_stackseekr\read ExceptionrPROCESSING_TIME_THRESHOLDr]) rrfr8rprocess_message_task processedrstackprocessing_times r rzMessageProcessor._call_unlockedsM08MNNNN  J* 8* 8D) 8'.':((--(($#*"2 N#788 8 ### Di11$#C7)   8""6777777777777    0#a&&#>>>2""67777777777771'     $00e0<<< 1  $0JJLL ""6777777777777     !)klassrfr)formatr%r'rfrr(s r r)zMessageComparable.__repr__%s28??.)]@   r") r'r&rG__doc__rrrrr)rrs@r rbrbsm-- E\444       r"rbc4eZdZfdZdeffd ZdZxZS)rOctj|i|tj|_d|j_d|j_dS)N2i)rNr!reprlibRepr_repr maxstringmaxtuple)rrgkwargsr%s r r!zMessageQueue.__init__.sF$)&)))\^^ ! " r"itemcVKt|d{VSrD)rNra)rrr%s r razMessageQueue.put4s/WW[[&&&&&&&&&r"cttjd|jDdd}d|jd|d|j|dS) Nc0g|]}|jjjSr,)rfr%rG)r.rs r r0z(MessageQueue.__str__..;s IIIT#0IIIr"c|dSrr,)rs r rz&MessageQueue.__str__..=s T!Wr"T)rreversez) r collectionsCounterrQitemsrKrerrepr)r msg_countss r __str__zMessageQueue.__str__7s  IIT[III  egg$$     rs$  >>>>>>>>BBBBBBMMMMMMMM222222EEEEEEEEEE444444  8 $ $SUU*K*)\2 *3*3*3*3*3"*3*3*3ZwMwMwMwMwM+wMwMwMt%%% KKKKKvKKK\        2     7(     r"