
    Pho              	          U d Z ddlmZ ddlmZ ddlZddlmZ ddlm	Z	 ddl
mZmZmZmZmZmZmZmZmZmZ ddlZdd	lmZ dd
lmZmZmZ g dZee   ed<   eee   ee	f   Z ereed      Z!eee"eedef   e df   f      Z#neZ!eZ# G d d      Z$de!de#dejJ                  ddfdZ&deejJ                     deee!   ee#   f   fdZ'edeejJ                     deeee!   ee#   f   ddf   fd       Z(y)z'Multithreading in pipeline parallelism.    )contextmanager)QueueN)Thread)TracebackType)
TYPE_CHECKINGCallableDict	GeneratorListOptionalTupleTypeUnioncast   )Batch)AbstractStream
use_device
use_stream)Taskworkercreate_workersspawn_workers__all__r   c            	       `    e Zd ZdZdedeg ef   deeegdf      ddfdZdefdZ	d	eddfd
Z
y)r   a>  A task represents how to compute a micro-batch on a partition.

    It consists of two parts: :meth:`compute` and :meth:`finalize`.
    :meth:`compute` should be executed in worker threads concurrently.
    :meth:`finalize` should be executed after when worker threads complete to
    execute :meth:`compute`.

    :meth:`compute` might be boosted by worker threads. Because it produces
    several CUDA API calls by user code. In PyTorch, parallel CUDA API calls
    are not serialized through GIL. So more than one CUDA API call can be
    produced at the same time.

    streamcomputefinalizeNreturnc                `    || _         || _        || _        t        j                         | _        y N)r   _compute	_finalizetorchis_grad_enabled_grad_enabled)selfr   r   r   s       qC:\Users\daisl\Desktop\realtime-object-detection\venv\Lib\site-packages\torch/distributed/pipeline/sync/worker.py__init__zTask.__init__2   s*     !"224    c                     t        | j                        5  t        j                  | j                        5  | j                         cd d d        cd d d        S # 1 sw Y   nxY wd d d        y # 1 sw Y   y xY wr!   )r   r   r$   set_grad_enabledr&   r"   )r'   s    r(   r   zTask.compute:   sJ    $e&<&<T=O=O&P==? 'Q&P$$&P&P$$$s"    A.A	A.A"	A..A7batchc                     | j                   y t        | j                        5  t        j                  | j
                        5  | j                  |       d d d        d d d        y # 1 sw Y   xY w# 1 sw Y   y xY wr!   )r#   r   r   r$   r,   r&   )r'   r-   s     r(   r   zTask.finalize>   sT    >>!$e&<&<T=O=O&PNN5! 'Q$$&P&P$$s#    A2A&A2&A/	+A22A;)__name__
__module____qualname____doc__r   r   r   r   r)   r   r    r*   r(   r   r   #   si    5$52:2u92E5QYZbdicjlpcpZqQr5	5# #"e " "r*   in_queue	out_queuedevicer   c                 j   t        |      5  	 | j                         }|n'	 |j                         }|j                  d||ff       :	 ddd       d}|j                  |       y# t        $ r8 t	        t
        t        j                               }|j                  d|f       Y w xY w# 1 sw Y   axY w)zMain loop of a worker thread.TNF)FN)	r   getr   	Exceptionr   ExcInfosysexc_infoput)r4   r5   r6   taskr-   r<   dones          r(   r   r   E   s    	F	<<>D| MM4$/0   
  DMM$  8uh/0 
	s-   B)A%B)%>B&#B)%B&&B))B2devicesc                 p   g }g }i }dt         j                  dt         j                  fd}| D ]5  } ||      }	 ||   \  }}|j                  |       |j                  |       7 ||fS # t        $ rC t               }t               }||f||<   t	        t
        |||fd      }|j                          Y sw xY w)z<Spawns worker threads. A worker thread is bound to a device.r6   r   c                    | j                   dk(  r?| j                  3t        j                  dt        j                  j                               S | j                   dk(  r!| j                  t        j                  d      S | S )Ncuda)indexcpu)typerD   r$   r6   rC   current_device)r6   s    r(   normalize_devicez(create_workers.<locals>.normalize_devicec   s^    ;;& V\\%9<<ejj.G.G.IJJ;;%FLL$<<<&&r*   T)targetargsdaemon)r$   r6   KeyErrorr   r   r   startappend)	r@   	in_queues
out_queuesworkersrH   r6   r4   r5   ts	            r(   r   r   [   s    !I!#J =?G %,,  !&)	")&/Hi 	")$   z""  	wHI'3GFOfHi+HQUWAGGI	s   A))A	B54B5c              #   @   K   	 t        |       \  }}||f y # w xY wwr!   )r   )r@   rO   rP   s      r(   r   r   ~   s+     "0"9J*%%s    ))r2   
contextlibr   queuer   r;   	threadingr   typesr   typingr   r   r	   r
   r   r   r   r   r   r   r$   
microbatchr   r   r   r   r   r   str__annotations__BaseExceptionr:   InQueueboolOutQueuer   r6   r   r   r   r3   r*   r(   <module>r`      sH   . %  
   e e e   : :Jc J ]#]MA
B HV$%GU4uVU]';Wd'J!KKLMHGH" "DW  5<< D ,!#D. !#E$w-h:W4X !#F 4- 9U4=RVW_R`C`=acgim=m3n  r*   