B \NM @ s d Z dZddlZddlZddlmZ ddlZddlmZ ddlZ ddl mZ ddlm Z ddlZddlZddlmZ ddlZddlZddlZe Zd aG d d dZdd ZdZdZG dd deZG dd dZdd ZG dd de Z!G dd de Z"G dd de Z#G dd de Z$dd Z%d d! Z&d2d"d#Z'd$d% Z(d&d' Z)d(d) Z*d a+da,d*d+ Z-d,d- Z.G d.d/ d/ej/Z0G d0d1 d1ej1Z2e3e dS )3a* Implements ProcessPoolExecutor. The follow diagram and text describe the data-flow through the system: |======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | | | | Call Q | | Process | | | +----------+ | | +-----------+ | Pool | | | | ... | | | | ... | +---------+ | | | 6 | => | | => | 5, call() | => | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------+ | | | | | 6: call() | | | | ... | | | | | | future | | | | 4, result | | | | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+ Executor.submit() called: - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict - adds the id of the _WorkItem to the "Work Ids" queue Local worker thread: - reads work ids from the "Work Ids" queue and looks up the corresponding WorkItem from the "Work Items" dict: if the work item has been cancelled then it is simply removed from the dict, otherwise it is repackaged as a _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). - reads _ResultItems from "Result Q", updates the future stored in the "Work Items" dict and deletes the dict entry Process #1..n: - reads _CallItems from "Call Q", executes the calls, and puts the resulting _ResultItems in "Result Q" z"Brian Quinlan (brian@sweetapp.com) N)_base)Full)wait)Queue)partialFc @ s, e Zd Zdd Zdd Zdd Zdd Zd S ) _ThreadWakeupc C s t jdd\| _| _d S )NF)Zduplex)mpZPipe_reader_writer)self r ?/opt/alt/python37/lib64/python3.7/concurrent/futures/process.py__init__R s z_ThreadWakeup.__init__c C s | j | j d S )N)r closer )r r r r r U s z_ThreadWakeup.closec C s | j d d S )N )r Z send_bytes)r r r r wakeupY s z_ThreadWakeup.wakeupc C s x| j r| j qW d S )N)r ZpollZ recv_bytes)r r r r clear\ s z_ThreadWakeup.clearN)__name__ __module____qualname__r r r r r r r r r Q s r c C sH da tt } x| D ]\}}| qW x| D ]\}}| q0W d S )NT)_global_shutdownlist_threads_wakeupsitemsr join)r _ thread_wakeuptr r r _python_exita s r = c @ s e Zd Zdd Zdd ZdS )_RemoteTracebackc C s || _ d S )N)tb)r r" r r r r z s z_RemoteTraceback.__init__c C s | j S )N)r" )r r r r __str__| s z_RemoteTraceback.__str__N)r r r r r# r r r r r! y s r! c @ s e Zd Zdd Zdd ZdS )_ExceptionWithTracebackc C s0 t t|||}d|}|| _d| | _d S )N z """ %s""") tracebackformat_exceptiontyper excr" )r r) r" r r r r s z _ExceptionWithTraceback.__init__c C s t | j| jffS )N)_rebuild_excr) r" )r r r r __reduce__ s z"_ExceptionWithTraceback.__reduce__N)r r r r r+ r r r r r$ s r$ c C s t || _| S )N)r! __cause__)r) r" r r r r* s r* c @ s e Zd Zdd ZdS ) _WorkItemc C s || _ || _|| _|| _d S )N)futurefnargskwargs)r r. r/ r0 r1 r r r r s z_WorkItem.__init__N)r r r r r r r r r- s r- c @ s e Zd ZdddZdS )_ResultItemNc C s || _ || _|| _d S )N)work_id exceptionresult)r r3 r4 r5 r r r r s z_ResultItem.__init__)NN)r r r r r r r r r2 s r2 c @ s e Zd Zdd ZdS ) _CallItemc C s || _ || _|| _|| _d S )N)r3 r/ r0 r1 )r r3 r/ r0 r1 r r r r s z_CallItem.__init__N)r r r r r r r r r6 s r6 c s. e Zd ZdZd fdd Z fddZ ZS ) _SafeQueuez=Safe Queue set exception to the future object linked to a jobr c s || _ t j||d d S )N)ctx)pending_work_itemssuperr )r max_sizer8 r9 ) __class__r r r s z_SafeQueue.__init__c sl t |trZtt|||j}tdd||_ | j |jd }|d k rh|j | nt || d S )Nz """ {}"""r% ) isinstancer6 r&