
    Ph,                        U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZ d dlmZ d dlmZmZmZmZ d d	l m!Z! d dl"Z"d dl#Z"d dl$Z"d dl%m&Z' d dl(m)Z) d d
l*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3 d dl4m5Z5m6Z6m7Z7  ejp                  ejr                          ejt                  e;      Z< G d de      Z=i d e=dd      d e=dd      d e=dd      d e=dd      d e=dd      d e=dd       d! e=d"d#      d$ e=d%d&      d' e=d(d)      d* e=d+d,      d- e=d.d/      d0 e=d1d2      d3 e=d4d5      d6 e=d7d8      d9 e=d:d;      d< e=d=d>      d? e=d@dA      Z>e G dB dC             Z?dD Z@dE ZAdF ZBdG ZCdH ZDdI ZEdJ ZFdK ZGdL ZHdM ZIdN ZJdO ZKdP ZLdQ ZMdR ZNdS ZOdT ZPe.dUdVdW edXY      dWdZdZfd[       ZQe2rd\ZRn eS ej                  d]d^            ZRd_d`iZUe1rdaeUdb<   ddcZVddeSfdeZWedf        ZXddgeSdheSdieSfdjZYdheSdkeZfdlZ[da\eej                     e^dm<   ddneeZ   dddfdoZ_ddpZ`dqZa G dr dse3      ZbdacddedfdtZedu ZfdeReafdvZg G dw dxe3      Zh G dy dze)j                        Zj G d{ d|e)j                        Zkedd}       Zl G d~ de"j                  j                  jf                        Zo G d deb      Zpy)    N)contextmanager)	dataclass)	timedelta)Enum)partialreducewraps)StringIO)Dict
NamedTupleOptionalUnion)patch)	FILE_SCHEMAfind_free_portIS_SANDCASTLEretry_on_connect_failuresskip_but_pass_in_sandcastleskip_but_pass_in_sandcastle_ifTEST_WITH_ROCMTEST_WITH_TSANTestCase)_install_threaded_pg_uninstall_threaded_pgProcessLocalGroup)levelc                   "    e Zd ZU eed<   eed<   y)TestSkip	exit_codemessageN)__name__
__module____qualname__int__annotations__str     uC:\Users\daisl\Desktop\realtime-object-detection\venv\Lib\site-packages\torch/testing/_internal/common_distributed.pyr   r   1   s    NLr(   r   backend_unavailableH   z5Skipped because distributed backend is not available.small_worldsizeI   z Skipped due to small world size.odd_worldsizeW   zSkipped due to odd world size.no_cudaJ   zCUDA is not available.zmulti-gpu-1K   zNeed at least 1 CUDA devicezmulti-gpu-2M   zNeed at least 2 CUDA deviceszmulti-gpu-3P   zNeed at least 3 CUDA deviceszmulti-gpu-4Q   zNeed at least 4 CUDA deviceszmulti-gpu-5R   zNeed at least 5 CUDA deviceszmulti-gpu-6S   zNeed at least 6 CUDA deviceszmulti-gpu-7T   zNeed at least 7 CUDA deviceszmulti-gpu-8U   zNeed at least 8 CUDA devicesncclL   z#c10d not compiled with NCCL support
skipIfRocmN   zTest skipped for ROCmno_peer_accessO   z'Test skipped because no GPU peer accessgenericV   zHTest skipped at subprocess level, look at subprocess log for skip reasonimporterrorX   z"Test skipped due to missing importc                       e Zd Zi Zh ded<    e       ed<   ddhed<   ddhed<   i Zh ded	<   h ded
<   h ded<   h ded<    e       ed<   y)DistTestCases>   mpiuccr:   allgather_coalescedr   r:   rG   zsendrecv anysourcezcpu barrier>   rG   gloor:   gpucudaddpsubgrouppluginN)r!   r"   r#   skip_collectivesetbackend_featurer'   r(   r)   rE   rE   O   sx     O-CO)* #OH-3UOO()&,e_OM" O4OE5OF4OE"9OJ #OHr(   rE   c                 .     t                fd       }|S )zSkips if the world size exceeds the number of GPUs, ensuring that if the
    test is run, each rank has its own GPU via ``torch.cuda.device(rank)``.c                  h   t         j                  j                         s&t        j                  t
        d   j                         t        t        j                  d         }t         j                  j                         |k  r)t        j                  t
        d|    j                          | i |S )Nr0   
WORLD_SIZE
multi-gpu-)torchrK   is_availablesysexit
TEST_SKIPSr   r$   osenvirondevice_count)argskwargs
world_sizefuncs      r)   wrapperzskip_if_no_gpu.<locals>.wrappere   s    zz&&(HHZ	*445L12
::""$z1HHZ*ZL 9:DDET$V$$r(   r	   ra   rb   s   ` r)   skip_if_no_gpure   a   s"     4[% % Nr(   c                 .     t                fd       }|S )Nc                      t         j                  d   dk7  rEt        t         j                  d         dk  r&t        j                  t
        d   j                          | i |S )NBACKENDrF   rT      r,   r[   r\   r$   rX   rY   rZ   r   r^   r_   ra   s     r)   rb   z(skip_if_small_worldsize.<locals>.wrappers   sR    JJy!U*BJJ|4L0MQR0RHHZ 12<<=T$V$$r(   rc   rd   s   ` r)   skip_if_small_worldsizerl   r        
4[% % Nr(   c                 .     t                fd       }|S )Nc                      t         j                  d   dk7  rHt        t         j                  d         dz  dk(  r&t        j                  t
        d   j                          | i |S )Nrh   rF   rT   ri      r.   rj   rk   s     r)   rb   z&skip_if_odd_worldsize.<locals>.wrapper~   sW    JJy!U*BJJ|4L0MPQ0QUV0VHHZ0::;T$V$$r(   rc   rd   s   ` r)   skip_if_odd_worldsizerq   }   rm   r(   c                       fd}|S )Nc                 4     t                fd       }|S )Nc                      dk(  rKt         j                  j                         k  r*t        j                  t
        d    j                         y  | i |S Nr:   rU   )rV   rK   r]   rX   rY   rZ   r   )r^   r_   backendra   ns     r)   rb   zCrequire_n_gpus_for_nccl_backend.<locals>.decorator.<locals>.wrapper   sM    & UZZ%<%<%>%Bj$45??@T,V,,r(   rc   )ra   rb   rv   rw   s   ` r)   	decoratorz2require_n_gpus_for_nccl_backend.<locals>.decorator   s     	t	- 
	- r(   r'   )rw   rv   rx   s   `` r)   require_n_gpus_for_nccl_backendry      s     r(   c                      d } | S )Nc                 .     t                fd       }|S )Nc                      	 ddl m}m}  | i |S # t        $ r) t	        j
                  t        d   j                         Y y w xY w)Nr   )AutoModelForMaskedLM
BertConfigrB   )transformersr}   r~   ImportErrorrX   rY   rZ   r   )r^   r_   r}   r~   ra   s       r)   rb   z?import_transformers_or_skip.<locals>.decorator.<locals>.wrapper   sE    >
 T,V,, >M2<<=>s    /AArc   rd   s   ` r)   rx   z.import_transformers_or_skip.<locals>.decorator   s     	t		> 
		> r(   r'   )rx   s    r)   import_transformers_or_skipr      s     r(   c                       fd}|S )Nc                 2     t                fd       }|S )Nc                      t         j                  j                         r)t         j                  j                         k\  r | i |S t	        j
                  t        d    j                         y )NrU   rV   rK   rW   r]   rX   rY   rZ   r   )r^   r_   ra   xs     r)   rb   z4skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapper   sU    zz&&(UZZ-D-D-F!-KT,V,,HHZ*QC 01;;<r(   rc   )ra   rb   r   s   ` r)   rx   z#skip_if_lt_x_gpu.<locals>.decorator   s     	t	= 
	=
 r(   r'   )r   rx   s   ` r)   skip_if_lt_x_gpur      s     r(   c                       fd}|S )Nc                 4     t                fd       }|S )Nc                      dk7  r | i |S t         j                  j                         r)t         j                  j                         k\  r | i |S t	        j
                  t        d    j                         y ru   r   )r^   r_   rv   ra   r   s     r)   rb   z9nccl_skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapper   sm    & T,V,,zz&&(UZZ-D-D-F!-KT,V,,HHZ*QC 01;;<r(   rc   )ra   rb   rv   r   s   ` r)   rx   z(nccl_skip_if_lt_x_gpu.<locals>.decorator   s     	t	= 
	= r(   r'   )rv   r   rx   s   `` r)   nccl_skip_if_lt_x_gpur      s    	 r(   c                     | j                         }d|v sJ d|v sJ d|v sJ |d   }|j                  d      dk(  r|n|j                  d      d   }||v sJ d| d|        y )	N	iteration	has_errorerrorz
Exception raised from r   zDid not find expected z in ddp logging data error: )_get_ddp_logging_datafindsplit)	model_DDP
err_substrddp_logging_datalogging_erractuals        r)   verify_ddp_error_loggedr      s     668********&&&&"7+K ??56"< 	89!<  	+R	x'CK=QRr(   c                 .     t                fd       }|S )aJ  
    Convenience decorator to set/unset TORCH_NCCL_BLOCKING_WAIT flag. Note that use of
    this decorator will override the setting of TORCH_NCCL_ASYNC_ERROR_HANDLING for
    the particular test. After the test, both TORCH_NCCL_BLOCKING_WAIT and
    TORCH_NCCL_ASYNC_ERROR_HANDLING will be restored to their original values.
    c                     	 t         j                  d   }t         j                  d= 	 t         j                  d   }dt         j                  d<   	  | i |}|||t         j                  d<   ||t         j                  d<   S S # t        $ r d }Y jw xY w# t        $ r d }Y gw xY w# dt         j                  d<   w xY w# ||t         j                  d<   ||t         j                  d<   w w xY w)NTORCH_NCCL_ASYNC_ERROR_HANDLINGTORCH_NCCL_BLOCKING_WAIT1)r[   r\   KeyError)r^   r_    cached_nccl_async_error_handlingcached_nccl_blocking_waitretra   s        r)   rb   z(with_nccl_blocking_wait.<locals>.wrapper   s   	4AC1B, 

<=	9:<***;% 69BJJ12	S''C 0; 5 

5 )49R

56 51  	4/3,	4  	-(,%	- 69BJJ12 0; 5 

5 )49R

56 5s@   $B B 	B> BBB# B& "B##B& &B;>-C+rc   rd   s   ` r)   with_nccl_blocking_waitr      s%     4[ S  SD Nr(   c                       fd}|S )zK
    Runs a test for each distributed debug level specified in levels.
    c                 2     t                fd       }|S )Nc                     t         j                  j                  dd       }D ][  }|t         j                  d<   t        j                           | i |}t        j
                          |I|t         j                  d<   ] S )NTORCH_DISTRIBUTED_DEBUG)r[   r\   getc10dset_debug_level_from_envbarrier)r^   r_   	old_levelr   r   ra   levelss        r)   rb   z:with_dist_debug_levels.<locals>.decorator.<locals>.wrapper  sq    

'@$GI8=

45--/D+F+(<EBJJ89   Jr(   rc   )ra   rb   r   s   ` r)   rx   z)with_dist_debug_levels.<locals>.decorator  s     	t	 
	 r(   r'   )r   rx   s   ` r)   with_dist_debug_levelsr     s    
$ r(   c                  @    t        t        j                          d      S )Nz+c10d was not compiled with the Gloo backend)r   r   is_gloo_availabler'   r(   r)   requires_gloor      !    )""$$5 r(   c                    t        j                         st        d      S t        t        j
                  j                  j                         | k  dj                  | t        j
                  j                  j                         |            S )N+c10d was not compiled with the NCCL backendzIRequires NCCL version greater than or equal to: {}, found: {}, reason: {})	r   is_nccl_availabler   r   rV   rK   r:   versionformat)r   msgs     r)   requires_nccl_versionr   '  sl    !!#*9
 	
 .JJOO##%/W^^002C
 	
r(   c                  @    t        t        j                          d      S )Nr   )r   r   r   r'   r(   r)   requires_ncclr   5  r   r(   c                  @    t        t        j                          d      S )Nz*c10d was not compiled with the UCC backend)r   r   is_ucc_availabler'   r(   r)   requires_uccr   ;  !    )!!##4 r(   c                  @    t        t        j                          d      S )Nz*c10d was not compiled with the MPI backend)r   r   is_mpi_availabler'   r(   r)   requires_mpir   A  r   r(   c                 <     d _         t                fd       }|S )zSkips a test for ROCmTc                  n    t         s | i |S t        j                  t        d   j                         y )Nr<   )r   rX   rY   rZ   r   rk   s     r)   rb   zskip_if_rocm.<locals>.wrapperL  s-    (((L)334r(   )skip_if_rocmr	   rd   s   ` r)   r   r   H  s(    D
4[5 5
 Nr(   c                  <    t        t        j                  dk(  d      S )Nwin32z8This unit test case is not supported on Windows platform)r   rX   platformr'   r(   r)   skip_if_win32r   U  s    )B r(   	localhostrp   T   )minutesFc                     t               }|rEt        |t        d      z        }t        j                  j
                  j                  | ||||      S t        j                  | |||||      S )zL
    Creates a TCP store. Retries if the chosen port is already in use.
    rp   )milliseconds)wait_for_workers	use_libuv)r   r$   r   rV   classes	dist_c10dTCPStorer   )	addrr`   	is_mastertimeoutr   	jit_classr   porttimeout_milliseconds	            r)   create_tcp_storer   \  so     D!'I1,E"EF}}&&//$
I/B
 	
 }}$
I@P\e
 	
r(   i  !DISTRIBUTED_TESTS_DEFAULT_TIMEOUT300test_ddp_uneven_inputsi     test_join_kwargsc                     t         j                  dk(  s|  t        j                  j	                  d      S t        j                  j	                  |       S )Nr   z	127.0.0.1)hostname	interface)rX   r   r   ProcessGroupGloocreate_devicer   s    r)   r   r     sG    
||w)"3$$22K2HH$$22Y2GGr(   returnc                 Z    t         j                  | j                  d      d   t              S N.r   )TIMEOUT_OVERRIDEr   r   TIMEOUT_DEFAULT)test_ids    r)   get_timeoutr     s#    c 22 6HHr(   c               #   N  K   t               t               }} t        j                  t        j                  }}	 | |ct        _        t        _        t        j                  t        j                  f ||ct        _        t        _        y # ||ct        _        t        _        w xY wwN)r
   rX   stdoutstderr)new_outnew_errold_outold_errs       r)   captured_outputr     sl     z8:WGzz3::WG2!('
CJjj#**$$!('
CJ'
CJs   5B%9B	 1B%	B""B%rankr`   
num_inputsc                    ddt         dt         dt         dt         fd}dt         fd}t        |d      t        |d	      t        |d
      t        |d      t        |d	      t        |d
      fD cg c]N  }t        |      D cg c]  } ||| z  |z   ||z         c}t        |      D cg c]  } ||||z         c}fP c}}S c c}w c c}w c c}}w )z
    Generate a number of basic test cases for sparse reduction.
    These cover tensors with a varying number of sparse dimensions and a varying
    number of dense dimensions. The only reduction operation we support is sum.
    rp   r   r`   sparse_dims
dense_dimsc           	         t        j                  t        j                  | dz         d| dz   f      }|gt        |      D cg c]  }d c}z   }t        |dz
        D ]A  }t        j                  |t        j
                  d| dz         f      }|j                  |       C t        j                  | dz   gt        |      D cg c]  }d c}z         }t        j                  |||      S c c}w c c}w )Nrp   ri   )	rV   reshapearangerangecatzerosappendonessparse_coo_tensor)r   r`   r   r   indices_shapevaluess           r)   generatez,simple_sparse_reduce_tests.<locals>.generate  s     --TAX 6D1HF5+<=+<a+<=={Q'Aii%++a*B CDGLL$ ( TAXJU:5F)G5F!5F)GGH&&w>>  > *Hs   	C+	C0
c           
      b    t        d t        |      D cg c]  } | ||       c}      S c c}w )Nc                     | |z   S r   r'   )abs     r)   <lambda>zAsimple_sparse_reduce_tests.<locals>.compute_sum.<locals>.<lambda>  s    Qr(   )r   r   )fnr`   r   s      r)   compute_sumz/simple_sparse_reduce_tests.<locals>.compute_sum  s4    %
BS TBS$D*!5BS T
 	
 Ts   ,
)r   ri      )r   )rp   r   )r$   r   r   )r   r`   r   r	  r  r  is          r)   simple_sparse_reduce_testsr    s   
?s 
? 
?# 
?s 
?
C 
 H!,H!,H!,H+H+H+

B	 z**A :$q(*z*AB* @EZ?PQ?P![Z*45?PQ	

  Rs$   5CC C/CC
Crv   c           
          t         j                  j                         }t        |      }d}| |kD  r|| z  }t        |       D ci c]  }|t	        |||z  |dz   |z          }}|S c c}w )zMultigpu tests are designed to simulate the multi nodes with multi
    GPUs on each node. Nccl backend requires equal #GPUs in each process.
    On a single node, all visible GPUs are evenly
    divided to subsets, each process only uses a subset.
    rp   )rV   rK   r]   r   list)r`   rv   nGPUsvisible_devicesnGPUs_per_processr  rank_to_GPUs          r)   init_multigpu_helperr    s     JJ##%EElO E!Z/ z""A 	
4$5 5QBS8STUU"   	s   A&tmp_dirinit_methodc                    t        j                         at        j                  t        j
                  d<   t	        j                  t        j                  j                  t        j                  d             t	        j                  t        j                  j                  t        j                  d             t        j                  j                  t        j                  d      }t	        j                  |       | | t        j
                  d<   y t        t        j                  j                  |d      z   t        j
                  d<   y )NTEMP_DIRr   test_dirinit_dirINIT_METHODshared_init_file)
tempfileTemporaryDirectoryr  namer[   r\   mkdirpathjoinr   )r  init_dir_paths     r)   initialize_temp_directoriesr*    s    ))+G$\\BJJzHHRWW\\',,	23HHRWW\\',,
34GGLLz:MHH]$/

=!$/"'',,-3
 %


=!r(   c                  :    t         t         j                          y y r   )r  cleanupr'   r(   r)   cleanup_temp_dirr-    s     r(      c            	       "    e Zd ZdZdZdefdZedefd       Z	d Z
ddeddf fd	Zd fd
Zd fdZdefdZddZddZ G d de      Zedefd       Zededededdfd       ZdeddfdZddZddZddZddZedefd       Z xZS )MultiProcessTestCaser   
   r   c                      y)NFr'   selfs    r)   _should_stop_test_suitez,MultiProcessTestCase._should_stop_test_suite
  s    r(   c                     t         S r   DEFAULT_WORLD_SIZEr3  s    r)   r`   zMultiProcessTestCase.world_size      !!r(   c                 V    t              fd       }t        j                  ||       S )Nc                 j    | j                   | j                  k(  r| j                         y          y r   )r   MAIN_PROCESS_RANK_join_processesr4  r  s    r)   rb   z1MultiProcessTestCase.join_or_run.<locals>.wrapper  s(    yyD222$$R(r(   r	   types
MethodTyper4  r  rb   s    ` r)   join_or_runz MultiProcessTestCase.join_or_run  .    	r	 
	 ..r(   method_nameNc                 t    t         |   |       t        | |      }t        | || j	                  |             y r   super__init__getattrsetattrrC  )r4  rE  r  	__class__s      r)   rI  zMultiProcessTestCase.__init__  s3    %T;'k4#3#3B#78r(   c                     t         |           g | _        g | _        | j                  | _        t        j                  d      j                  | _	        i | _
        y )NF)delete)rH  setUpskip_return_code_checks	processesr<  r   r#  NamedTemporaryFiler%  	file_namepid_to_piper4  rL  s    r)   rO  zMultiProcessTestCase.setUp$  sH    ')$**	!44EBGGr(   c                 r    t         |           | j                  D ]  }|j                           g | _        y r   )rH  tearDownrQ  	terminate)r4  prL  s     r)   rW  zMultiProcessTestCase.tearDown-  s.    AKKM   r(   c                 F    | j                         j                  d      d   S r   idr   r3  s    r)   _current_test_namez'MultiProcessTestCase._current_test_name7  s    wwys#B''r(   c           	         g | _         t        t        | j                              D ]  }t        j
                  j                         \  }} || j                  j                  dt        |      z   || j                         | j                  |f      }|j                          t        j                  d||j                         || j                   |j                  <   | j                   j#                  |        y )Nzprocess )targetr%  r^   zStarted process %s with pid %s)rQ  r   r$   r`   rV   multiprocessingPiperL  _runr&   r]  rS  startloggerinfopidrT  r  )r4  procr   parent_conn
child_connprocesss         r)   _start_processesz%MultiProcessTestCase._start_processes;  s    #doo./D&+&;&;&@&@&B#K~~**#d)+D335t~~zRG
 MMOKK8$L,7DW[[)NN!!'* 0r(   c                 x    t         j                  j                  d      j                  }| j	                  |       y )Nspawn)rV   r`  get_contextProcessrk  )r4  rg  s     r)   _spawn_processesz%MultiProcessTestCase._spawn_processesI  s,    $$009AAd#r(   c                       e Zd ZdZy)MultiProcessTestCase.Eventrp   N)r!   r"   r#   GET_TRACEBACKr'   r(   r)   Eventrr  M  s    r(   rt  r   c                    t         j                  d|       	 t        j                  j	                  | |g      }| |v r| j
                  rt         j                  d|       y | j                         }t         j                  d||       |t        j                  j                  k(  rt        j                  d      5 }t        j                  |       |j                          |j                  d       | j!                  |j#                                t         j                  d|       d d d        ||v ry # 1 sw Y   xY w)Nz*Starting event listener thread for rank %sz:Pipe closed for process %s, stopping event listener threadzReceived event %s on process %szr+)moder   zProcess %s sent traceback)rd  re  r`  
connectionwaitclosedrecvr0  rt  rs  r#  rR  faulthandlerdump_tracebackflushseeksendread)parent_pipesignal_piper   ready_pipeseventtmp_files         r)   _event_listenerz$MultiProcessTestCase._event_listenerP  s   @$G)4499;:TUKk)%%KKTVZ #((*=udK066DDD!44$?8$33H= ( a(#((9$?F @ k)5   @?s   :A,D55D>	test_namerS  c                 T     | |      }||_         ||_        |j                  ||       y r   r   rS  run_testclsr   r  rS  r  r4  s         r)   rb  zMultiProcessTestCase._runo  s'    9~	"i-r(   c           	         t         j                  j                  d      \  }}t        j                  t
        j                  ||| j                  fd      }|j                          t        j                  dk7  r2t        j                  dk7  rt         j                  j                  d       dt        j                  d<   	  t        | |              ||j;                  d        |J |j=                          |j?                          y # t         j"                  $ r[}t$        j'                  d	| j                  |t)        |             t        j*                  t,        d
   j.                         Y d }~d }~wt0        $ r}t$        j3                  dt5        j6                         | j                  t
        j8                         |j;                  t5        j6                                t        j*                  t
        j8                         Y d }~;d }~ww xY w# ||j;                  d        |J |j=                          |j?                          w xY w)NF)duplexT)r_  r^   daemonr   darwinr   TORCH_SHOW_CPP_STACKTRACESz4Process %s skipping test %s for following reason: %sr@   z;Caught exception: 
%s exiting process %s with exit code: %s) rV   r`  ra  	threadingThreadr0  r  r   rc  rX   r   _C'_set_print_stack_traces_on_fatal_signalr[   r\   rJ  unittestSkipTestrd  re  r&   rY   rZ   r   	Exceptionr   	traceback
format_excTEST_ERROR_EXIT_CODEr  r(  close)r4  r  r  signal_recv_pipesignal_send_pipeevent_listener_threadsees           r)   r  zMultiProcessTestCase.run_testv  s   -2-B-B-G-Gu-G-U** ) 0 0'77/;!

 	##%<<7"s||x'? HH<<TB36

/0	 $GD)$&   + %%d+(444!&&(+    	6KKF		S\^abd^e HHZ	*4455 	@LL0$$&		3G3\3\ Y1134HH)>>??	@  + %%d+(444!&&(s>    D	 	HAE2-H 2H>BHH HH 9Ic                    g }t        | j                        D ]h  \  }}|j                  | j                  |j                     }	 |j                  t        j                  j                         |j                  ||f       j |D ]x  \  }}	 |j                  d      rK|j                  rt        j                  d|       ;|j!                         }t        j                  d||       nt        j                  d|       z y # t        $ r"}t        j                  d||       Y d }~d }~ww xY w# t        $ r!}t        j                  d||       Y d }~d }~ww xY w)NzBEncountered error while trying to get traceback for process %s: %sr   z5Pipe closed for process %s, cannot retrieve tracebackz)Process %s timed out with traceback: 

%sz6Could not retrieve traceback for timed out process: %s)	enumeraterQ  exitcoderT  rf  r  r0  rt  rs  r  ConnectionErrorrd  r   pollry  re  rz  )r4  pipesr  rj  piper  r   r  s           r)   _get_timedout_process_tracebackz4MultiProcessTestCase._get_timedout_process_traceback  s4   #DNN3JAw'''4II288FFGLL!T+ 4  JD$99Q<{{SUY ! $		ILLEtY LLPRV   ' LL\^_ab 0 # XZ^`a s6   <D3D/ >D/	D,
D''D,/	E8EEc                    t        | j                               }t        j                         }d}	 	 t        | j                        D ]w  \  }}|j
                  t        j                  k(  s$t        d| d|j
                   d       t        j                  j                         }|D ]  }|j                           d} n |rnt        d | j                  D              rntt        j                         |z
  }	|	|kD  rA| j                          t        d| d       | j                  D ]  }|j                           nt        j                  d	       #t        j                         |z
  }
|| j                   v r| j#                  |
       n| j%                  |
       | j&                  j)                         D ]  }|j+                           y # | j&                  j)                         D ]  }|j+                           w xY w)
NFTProcess z terminated with exit code z", terminating remaining processes.c              3   8   K   | ]  }|j                   d u  y wr   )r  ).0rY  s     r)   	<genexpr>z7MultiProcessTestCase._join_processes.<locals>.<genexpr>  s     F~!qzz-~s   zTiming out after z" seconds and killing subprocesses.g?)r   r\  timer  rQ  r  r0  r  printrV   r`  active_childrenrX  allr  sleeprP  _check_no_test_errors_check_return_codesrT  r  r  )r4  r  r   
start_timesubprocess_errorr  rY  r  acelapsedelapsed_timer  s               r)   r=  z$MultiProcessTestCase._join_processes  s   dggi(YY[
 *	'7FQ zz%9%N%NN&qc)DQZZLPrs +0*?*?*O*O*Q"1BLLN #2+/( 8 $Ft~~FF))+
2W$88:+G94VW "^^ ,

3= @  99;3LT111**<8((6 ((//1

 2((//1

 2s   9G +D2G 1G>c                     t        | j                        D ]I  \  }}|j                  t        d| d| d      | j	                  | j
                  |j                         K y)zV
        Checks that we didn't have any errors thrown in the child processes.
        Nr  z timed out after  seconds)r  rQ  r  RuntimeErrorassertNotEqualr  )r4  r  r  rY  s       r)   r  z*MultiProcessTestCase._check_no_test_errors  sa     dnn-DAqzz!"qc!2<.I   9 91::F .r(   c           
      V   | j                   st        j                  d       y| j                   d   }t        | j                         D cg c]&  \  }}|j                  t
        j                  k(  r||f( }}}|rbd}|D ]P  \  }}| j                  |j                     j                         }|dj                  |t
        j                  |      z  }R t        |      t        | j                         D ]p  \  }}|j                  t        d| d| d      | j                  |j                  |j                  d	j                  ||j                  |j                        
       r t        j                         D ]q  }	|j                  |	j                  k(  st         r1t        j#                  d| j%                         |	j&                          yt)        j*                  |	j&                         | j                  |j                  dd|j                   d|j                   
       yc c}}w )z
        Checks that the return codes of all spawned processes match, and skips
        tests if they returned a return code indicating a skipping condition.
        z<Note: no subprocesses were spawned, test was likely skipped.Nr    z7Process {} exited with error code {} and exception:
{}
r   terminated or timed out after r  zJExpect process {} exit code to match Process 0 exit code of {}, but got {})r   6Skipping %s on sandcastle for the following reason: %sz Expected zero exit code but got z
 for pid: )rQ  rd  warningr  r  r0  r  rT  rf  rz  r   r  assertEqualrZ   r  r   r   re  r\  r    r  r  )
r4  r  first_processr  rY  errored_processesr   rj  error_messageskips
             r)   r  z(MultiProcessTestCase._check_return_codes  s    ~~NNYZq) "$..1
11zz1FFF F1 	 

 E/
7 $ 0 0 = B B DOVV/DDm 0 u%% dnn-DAqzz!"qc!@hW  

&&`gg}--qzz   . %%'D%%7 
 KKPRVRYRYR[]a]i]i "++DLL99 ( 	""2=3I3I2J*UbUfUfTgh 	 	
Y
s   
+H%c                      | j                   dk(  S )Nr   r   r3  s    r)   r   zMultiProcessTestCase.is_masterD  s    yyA~r(   runTestr   N)r!   r"   r#   r<  r  boolr5  propertyr$   r`   rC  r&   rI  rO  rW  r]  rk  rp  r   rt  staticmethodr  classmethodrb  r  r  r=  r  r  r   __classcell__rL  s   @r)   r0  r0    s   
   "C " "/9C 9 9
(C (+$    < . . . .T . .) # ) t ) V#J.`	GA
F 4  r(   r0  c                      t         t         S 	 t        j                  g dd      j                  dk(  a t         S # t        $ r
 da Y t         S w xY w)a   
    If shell command `fi_info -p efa -t FI_EP_RDM` returns exit code 0 then we assume that the machine has
    Libfabric EFA interfaces and EFA software components installed,
    see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html.
    )fi_infoz-pefaz-t	FI_EP_RDMF)checkr   )EFA_PROBE_RESULT
subprocessrun
returncodeFileNotFoundErrorr'   r(   r)   has_efar  M  sZ     #!NNFeT__cdd 	
   ! !s   &: AAc                  "    t               rddgS dS )a  
    If the machine has Libfabric EFA interfaces and EFA software components installed it may cause
    'RuntimeError: In operator() at tensorpipe/common/ibv.h:172 "": Operation not supported' if tensorpipe
    uses InfiniBand transport, so we exclude it from tensorpipe transports,
    see https://github.com/pytorch/pytorch/issues/73885 and https://github.com/pytorch/pytorch/issues/65022
    shmuvN)r  r'   r(   r)   tp_transportsr  `  s     $IE4=/4/r(   c                 d      t        t        |      S d t                fd       }|S )z+
    Wrapper to use with a test method
    )r   r`   c                      t               t        j                         }fd fd}g }t               D ]=  }t	        j
                  |||f      }|j                          |j                  |       ? |S )Nc                  >     t         j                  j                  k(  S r   r   distributed_c10d_worldworlds   r)   world_is_validzaspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.world_is_validz      D118888r(   c                 ~   t        j                  d| |       	                  rt        j                          y y # t        $ rR}t        j                  j                  | t        j                         f       t        j                  |       Y d }~sd }~ww xY w#         rt        j                          w w xY w)Nthreadedrv   r   r`   store)r   init_process_groupBaseExceptionMultiThreadedTestCaseexception_queueputrX   exc_infor   exception_handledestroy_process_group)r   world_pgr  excallbackr  r`   s       r)   workerzYspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.worker}  s    ##"*E1
 "#..0 $ ! 7%55994:PQ!222667
 "#..0 $s*   A   	B	ABB BB B<r_  r^   )r   r   	HashStorer   r  r  rc  r  )	r`   r  global_storer  threadsr   tr  r  s	   ``     @@r)   #_run_test_method_with_multi_threadszIspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threadsv  sl    $&~~'	9	1 *%D  dE<5PQAGGINN1 &
 r(   c                 V       fd      }t         j                  |       y )Nc                       g i S r   r'   )r^   ra   r_   r4  s   r)   r  z?spawn_threads_and_init_comms.<locals>.wrapper.<locals>.<lambda>  s    $tJeVZJe^dJer(   )r  _join_threads)r4  r^   r_   r   r  ra   r`   s   ``` r)   rb   z-spawn_threads_and_init_comms.<locals>.wrapper  s"     6jBef++GT:r(   )r   spawn_threads_and_init_commsr	   )ra   r   r`   rb   r  s   ` ` @r)   r  r  j  sB     |('j
 	

< 4[; ; Nr(   c                        e Zd ZdZ ej
                         ZdZd Zdde	ddf fdZ
d Zd	 Zd fd
Z fdZd Zed        Zd Zed        Zed        Zedefd       Zede	fd       ZddddZddddZ xZS )r  a5  
    Test runner that runs all tests with the in-proc process group using
    multiple threads with the threaded process group.

    Each test spawns world_size threads and run the test method in each thread.

    Difference from regular MultiProcess test runner:
    Must explicitly defines SetUp and call self._spawn_threads() to run the tests.
    Cannot use setUp / tearDown (must use perThreadSetup / perThreadShutdown)
        to set up / tear down each thread when running each test.
    No global state possible
        How bad of a limitation is this?
    r   c                 V    t              fd       }t        j                  ||       S )Nc                     | j                   | j                  k(  r| j                  | j                         y          y r   )r   MAIN_THREAD_RANKr  r   r>  s    r)   rb   z2MultiThreadedTestCase.join_or_run.<locals>.wrapper  s.    yyD111""4<<4r(   r?  rB  s    ` r)   rC  z!MultiThreadedTestCase.join_or_run  rD  r(   rE  r   Nc                 v    t         |   |       t        | |d       }t        | || j	                  |             y r   rG  )r4  rE  test_fnrL  s      r)   rI  zMultiThreadedTestCase.__init__  s5    %$T2k4#3#3G#<=r(   c                      y r   r'   r3  s    r)   perThreadSetUpz$MultiThreadedTestCase.perThreadSetUp  s    r(   c                      y r   r'   r3  s    r)   perThreadTearDownz'MultiThreadedTestCase.perThreadTearDown  s    r(   c                 x    t         |           | j                  | _        g | _        dt
        j                  d<   y)z
        setUp only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadSetUp
        r   r  N)rH  rO  r
  r   r   r[   r\   rU  s    r)   rO  zMultiThreadedTestCase.setUp  s1    
 	))	36

/0r(   c                 0    t         |           g | _        y)z
        tearDown only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadTearDown
        N)rH  rW  r   rU  s    r)   rW  zMultiThreadedTestCase.tearDown  s    
 	r(   c                    | j                   }t               t        j                         | j                  _        fd} |       st        d      t        | j                        D ]e  }t        j                  | j                  j                  ||| j                  f      }|j                          | j                  j                  |       g y)zk
        class method to spawn threads and run test, use this method in the SetUp of your TestCase
        c                  >     t         j                  j                  k(  S r   r  r  s   r)   r  z<MultiThreadedTestCase._spawn_threads.<locals>.world_is_valid  r  r(   zInvalid worldr  N)r]  r   r   r  rL  r  r  r   r`   r  r  rb  rc  r   r  )r4  r  r  r   r  r  s        @r)   _spawn_threadsz$MultiThreadedTestCase._spawn_threads  s     ++	$&&*nn&6#	9 //$//*D  (;(;9dTXTcTcBdeAGGILL" +r(   c                     | |      }||_         t        |d      rWt        j                         |_        t
        j                  |j                  _        t
        j                  |j                  _	        |j                  |||       y )N_tls)r   hasattrr  localr  r   
_precision	precision_rel_tolrel_tolrun_test_with_threaded_pg)r  r  r   r`   r4  s        r)   rb  zMultiThreadedTestCase._run  sb    9~	 4 !)DI"*"5"5DII ( 1 1DII&&y$
Cr(   c                    t        j                  d||| j                  j                         | j	                          	  t        | |              t        j                          | j                          y# t        $ rN}| j                  j                  |t        j                         f       t        j                  |       Y d}~wd}~ww xY w# t        j                          | j                          w xY w)zd
        Run the current test associated with `test_name` using the threaded process group.
        r  r  N)r   r  rL  r  r  rJ  r  r  r  rX   r  r   r  r  r  )r4  r  r   r`   r  s        r)   r  z/MultiThreadedTestCase.run_test_with_threaded_pg  s     	TjHcHc	
 		%$GD)$&
 &&(""$  	3  $$dCLLN%;<..r22	3 &&(""$s*   A5 5	C>ACC CC &C5c           
      >   t         }	 t        |      D ]f  \  }}|j                  t        d|             |j	                         s2t
        j                  j                  |t        t        d| d      d ff       h t        j                          g }| j                  j                         sF| j                  j                         }|j                  |       | j                  j                         sFt                | j                  |||       y # t                w xY w)Nr   zRank failed to join in under r  )r   r  r(  maxis_aliver  r  r  TimeoutErrorr   resetemptyr   r  r   r  )r  r   r  r   idxthreadfailed_ranksfailures           r)   r  z#MultiThreadedTestCase._join_threads  s    !	%(1VC7O,??$)99== , ,&CG9H$U!" !%	  2 ##%L))//1--113##G, ))//1 #$gr: #$s   <D B,D Dc           	         d}d}|D ]1  \  }}|d   }t        |t        j                        r;t        j	                  d||t        |             |dk  sMt        d   j                  }at        |t              r)d| d| d	}	t        j                  |	       t        |	      t        |t              rEdj                  t        j                  |       }	t        j                  d
|	|       |d| d|	 dz  }t        |t              st!        |j"                        t$        k(  s|dk  s&|j"                  }4 t'        |      dkD  rt        |      |dkD  rqt        j)                         D ]Y  }
||
j                  k(  st*        r#t        j	                  d||
j,                          y t        j                  |
j,                         y y )Nr  r   rp   z3Thread %s skipping test %s for following reason: %sr   r@   zThread r  z	 seconds
z'Caught exception: 
%s exiting thread %sz exited with exception:

r  )
isinstancer  r  rd  re  r&   rZ   r   r#  r   r  r  r(  r  format_exception
SystemExittypecoder$   lenr  r   r    )r  r(  r   r  	error_msg	skip_coder   r  excr   r  s              r)   r  z)MultiThreadedTestCase._check_return_codes-  s    		*ND(1+C#x001I4QSUXY\U] q= *9 5 ? ?IC.v%DWIZXS!"3''C+ggi88(CD>T dV#<SED	 C,>S(Y] #I- +2 y>Ay))q="))+.$TVXZ^ZfZf &//== , r(   c                     t         S r   r7  r3  s    r)   r`   z MultiThreadedTestCase.world_size^  r9  r(   c                 F    | j                         j                  d      d   S r   r[  r3  s    r)   r]  z(MultiThreadedTestCase._current_test_nameb  s     wwys#B''r(   r   r  c                J    | j                   |k(  r| j                  |||       yy)z
        The reason why we have this util function instead of
        self.assertEqual is all threads are sharing one CPU RNG
        so the assertion result is only reliable on rank 0
        N)r   r  r4  r   yr   r   s        r)   assertEqualOnRankz'MultiThreadedTestCase.assertEqualOnRankg  s'     99Q3' r(   c                H    | j                   |k(  r| j                  ||       y y r   )r   r  r8  s        r)   assertNotEqualOnRankz*MultiThreadedTestCase.assertNotEqualOnRankp  s#    991% r(   r  r  r   )r!   r"   r#   __doc__queueQueuer  r
  rC  r&   rI  r  r  rO  rW  r  r  rb  r  r  r  r  r$   r`   r]  r:  r<  r  r  s   @r)   r  r    s     "ekkmO/>C > >
	7#( D D%& ; ;8 .> .>` "C " " (C ( (( (&1 & &r(   r  c                        e Zd Zdeej
                  ej                  f   deddf fdZ	dej                  dej                  fdZ
 xZS )SaveForwardInputsModuleforward_inputscast_forward_inputsr   Nc                 t    t         |           t        j                  dd      | _        || _        || _        y )Nd   )rH  rI  nnLinearlrB  rC  r4  rB  rC  rL  s      r)   rI  z SaveForwardInputsModule.__init__v  s2    
 	3$,#6 r(   r   c                     || j                   | <   | j                  | j                  r3|j                  | j                  j                  j
                              S |      S r   )rB  rH  rC  toweightdtyper4  r   s     r)   forwardzSaveForwardInputsModule.forward  sI    $%D!vv43K3Kadd466==../SSQRSSr(   r!   r"   r#   r   rF  ModulerV   Tensorr  rI  rO  r  r  s   @r)   rA  rA  u  sT    7RYY457 "7 
	7T T%,, Tr(   rA  c                        e Zd Zdeej
                  ej                  f   deddf fdZ	dej                  dej                  fdZ
 xZS )SaveForwardInputsModelrB  rC  r   Nc                 t    t         |           t        ||      | _        t        ||      | _        || _        y r   )rH  rI  rA  c1c2rB  rI  s      r)   rI  zSaveForwardInputsModel.__init__  s6    
 	).:MN).:MN,r(   r   c                 `    || j                   | <   | j                  | j                  |            S r   )rB  rW  rV  rN  s     r)   rO  zSaveForwardInputsModel.forward  s)    $%D!wwtwwqz""r(   rP  r  s   @r)   rT  rT    sQ    -RYY45- "- 
	-# #%,, #r(   rT  c              #     K   t         j                  j                  |        dt        j                  d<   dt        j                  d<   |rt        j                  d| |       t         j                  j                          t         j                  j                  j                  j                          	 d  t         j                  j                          t         j                  j                  j                  j                          |rt        j                          y y # t         j                  j                          t         j                  j                  j                  j                          |rt        j                          w w xY ww)Nr   MASTER_ADDR6789MASTER_PORTr:   r   r`   )rV   rK   
set_devicer[   r\   r   r  _dynamor$  utilscountersclearr  )r   r`   init_pgs      r)   _dynamo_dist_per_rank_initrd    s     
JJ$ +BJJ} &BJJ}TjI	MM	MM  &&()$$**,&&(  	$$**,&&( s    B0F3D 7A(FA)FFc                   @     e Zd ZdZe fd       Ze fd       Z xZS )#DynamoDistributedSingleProcTestCasez
    Test harness for single-process dynamo distributed tests,
    initializes dist process group.

    Prefer this for simple tests, as it's easier to debug.
    c                 `   t         |           | j                  j                  t	        j
                  t        j                  ddd             d| _        d| j                   | _	        d| j                  v rd n| j                  g| _
        t        j                  d| j                  d	       y )
Nr   12355)rZ  r\  r   zcuda:rK   r:   rp   r]  )rH  
setUpClass_exit_stackenter_contextr   dictr[   r\   r   device
device_idsr   r  r  rL  s    r)   ri  z.DynamoDistributedSingleProcTestCase.setUpClass  s    %%JJ

#.#*	
 SXXJ'
!'3::!5CHH:SXX!Dr(   c                 J    t        j                          t        |           y r   )r   r  rH  tearDownClassro  s    r)   rq  z1DynamoDistributedSingleProcTestCase.tearDownClass  s    ""$r(   )r!   r"   r#   r=  r  ri  rq  r  r  s   @r)   rf  rf    s2     E E"    r(   rf  c            	       d     e Zd ZdZ fdZ fdZedefd       Ze	dede
de
dd	fd
       Z xZS )"DynamoDistributedMultiProcTestCasea   
    Use this for tests that actually run on multiple GPUs.

    Decorate tests with @skip_if_lt_x_gpu(ngpu)

    Note: MultiProcTestCase spawns processes per test and is slow.
    Prefer MultiThreadedTestCase for most tests. Perhaps use this one
    sparingly for integration tests.
    c                 B    t         |           | j                          y r   )rH  rO  rp  rU  s    r)   rO  z(DynamoDistributedMultiProcTestCase.setUp  s    r(   c                     t         |           	 t        j                  | j                         y # t
        $ r Y y w xY wr   )rH  rW  r[   removerS  OSErrorrU  s    r)   rW  z+DynamoDistributedMultiProcTestCase.tearDown  s5    	IIdnn% 		s   1 	==r   c                 >    t         j                  j                         S r   )rV   rK   r]   r3  s    r)   r`   z-DynamoDistributedMultiProcTestCase.world_size  s    zz&&((r(   r   r  rS  Nc                 T     | |      }||_         ||_        |j                  ||       y r   r  r  s         r)   rb  z'DynamoDistributedMultiProcTestCase._run  s)     9~	"i-r(   )r!   r"   r#   r=  rO  rW  r  r$   r`   r  r&   rb  r  r  s   @r)   rs  rs    s]      )C ) ) . . . .T . .r(   rs  r   )rp   r  )T)qr{  loggingr`  r[   r>  r  rX   r#  r  r  r  r@  r  
contextlibr   dataclassesr   datetimer   enumr   	functoolsr   r   r	   ior
   typingr   r   r   r   unittest.mockr   rV   torch._dynamo.test_casetorch.cuda.nccltorch.distributeddistributedr   torch.nnrF  $torch.testing._internal.common_utilsr   r   r   r   r   r   r   r   r   5torch.testing._internal.distributed.multi_threaded_pgr   r   r   basicConfigINFO	getLoggerr!   rd  r   rZ   rE   re   rl   rq   ry   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r$   getenvr   r   r   r   r  r&   r  r  r$  r%   r*  r-  r8  r0  r  r  r  r  r  r  rQ  rA  rT  rd  r_  	test_caserf  rs  r'   r(   r)   <module>r     s      	   
       % !   , ,  4 4       
 
 
    ',, '			8	$z 
8
C x$FG	
 Xb"BC x45 8B => 8B >? 8B >? 8B >? 8B >? 8B >? 8B >? 8B >? HR>?  (267!" hr#LM#$ x
V%* 8B DE+
2 & & &""&
R$+\4

 	a 
 
0 O)"))$GOPO,c2  +.'(HIC I 2 2(S (c (s (XS 3 * 26(--	. 5
Xc] 
d 
"  E8 ER
   &0 
3E1hT&H T&nTbii T #RYY # ) )& %--*A*A*J*J  @.)= .r(   