U
    bh1                     @   s`  d Z ddlZddlZddlZddlmZ ddlmZmZm	Z	m
Z
 ddlZddlmZmZ ddlmZmZ ddlmZ ddlmZ ejd	d
Zedi dZedddhdZeddhdZG dd dejZeddeddddfddZeddededdfeee e
e ef e	e  ee!e"eej d	ddZ#eddedfddZ$ee
e ef e dddd Z%dS )!z'Embedded workers for integration tests.    N)contextmanager)AnyIterableOptionalUnion)Celeryworker)_set_task_join_will_blockallow_join_result)Signal)anon_nodenameWORKER_LOGLEVELerrortest_worker_starting)nameZproviding_argstest_worker_startedr   consumertest_worker_stoppedc                       sX   e Zd ZdZdZdZ fddZG dd dejj	Z	 fdd	Z
d
d Zdd Z  ZS )TestWorkControllerz3Worker that can synchronize on being fully started.FNc                    s   t  | _t j|| | jjdd dkrddlm	} | | _
t | _zddlm} |  W n tk
rx   Y nX tj| j
t | _| j  d S )N.Zpreforkr   )Queue)pickling_support)	threadingEvent_on_startedsuper__init__pool_cls
__module__splitZbilliardr   logger_queueosgetpidpidZtblibr   installImportErrorlogginghandlersQueueListener	getLoggerZqueue_listenerstart)selfargskwargsr   r   	__class__ A/tmp/pip-unpacked-wheel-kcem4wq5/celery/contrib/testing/worker.pyr   '   s    

zTestWorkController.__init__c                   @   s   e Zd Zdd Zdd ZdS )zTestWorkController.QueueHandlerc                 C   s
   d|_ |S )NT)
from_queuer,   recordr1   r1   r2   prepare>   s    z'TestWorkController.QueueHandler.preparec                 C   s   t jr d S )N)r'   raiseExceptionsr4   r1   r1   r2   handleErrorC   s    z+TestWorkController.QueueHandler.handleErrorN)__name__r   __qualname__r6   r8   r1   r1   r1   r2   QueueHandler=   s   r;   c                    s@    j r6  j }| fdd t }|| t  S )Nc                    s   | j  jkot| dd S )Nr3   F)processr$   getattr)rr,   r1   r2   <lambda>J       z*TestWorkController.start.<locals>.<lambda>)r!   r;   	addFilterr'   r*   
addHandlerr   r+   )r,   handlerloggerr/   r?   r2   r+   G   s    
zTestWorkController.startc                 C   s    | j   tj| j| |d dS )z=Callback called when the Consumer blueprint is fully started.)senderr   r   N)r   setr   sendapp)r,   r   r1   r1   r2   on_consumer_readyO   s    
  z$TestWorkController.on_consumer_readyc                 C   s   | j   dS )zWait for worker to be fully up and running.

        Warning:
            Worker must be started within a thread for this to work,
            or it will block forever.
        N)r   waitr?   r1   r1   r2   ensure_startedV   s    z!TestWorkController.ensure_started)r9   r   r:   __doc__Z__test__r!   r   r'   r(   r;   r+   rJ   rL   __classcell__r1   r1   r/   r2   r      s   
r      ZsoloTg      $@c              
   k   s   t j| d d}	znt| f||||||d|F}	|rlddlm}
 t  |
 j|ddksbt	W 5 Q R X |	V  W 5 Q R X W 5 tj| |	d X dS )	z[Start embedded worker.

    Yields:
        celery.app.worker.Worker: worker instance.
    )rF   N)rF   r   )concurrencypoolloglevellogfileperform_ping_checkshutdown_timeoutrO   )ping)timeoutZpong)
r   rH   r   _start_worker_threadtasksrV   r
   delaygetAssertionError)rI   rP   rQ   rR   rS   rT   Zping_task_timeoutrU   r.   r   rV   r1   r1   r2   start_workera   s(    "r]   )	rI   rP   rQ   rR   rS   WorkControllerrT   rU   returnc                 k   s   t | || |rd| jkst| jtjdd}	|	jj W 5 Q R X |f | ||	dt
 |||d|	ddddd
|}
tj|
jdd	}|  |
  td
 z
|
V  W 5 ddlm} d|_|| | rtdd|_X dS )zaStart Celery worker in a thread.

    Yields:
        celery.worker.Worker: worker instance.
    zcelery.pingZTEST_BROKER)hostnamer`   Nwithout_heartbeatT)
rI   rP   r`   rQ   rR   rS   Zready_callbackra   Zwithout_mingleZwithout_gossip)targetdaemonFr   )statezWorker thread failed to exit within the allocated timeout. Consider raising `shutdown_timeout` if your tasks take longer to execute.)setup_app_for_workerrY   r\   
connectionr"   environr[   Zdefault_channelZqueue_declarepopr   r   Threadr+   rL   r	   Zcelery.workerrd   Zshould_terminatejoinis_aliveRuntimeError)rI   rP   rQ   rR   rS   r^   rT   rU   r.   connr   trd   r1   r1   r2   rX      sB    


rX   c           	      k   sH   ddl m}m} |   ||dg}|  z
dV  W 5 |  X dS )zfStart worker in separate process.

    Yields:
        celery.app.worker.Worker: worker instance.
    r   )ClusterNodeztestworker1@%hN)Zcelery.apps.multiro   rp   set_currentr+   Zstopwait)	rI   rP   rQ   rR   rS   r.   ro   rp   Zclusterr1   r1   r2   _start_worker_process   s    
rr   )rI   rR   rS   r_   c                 C   s8   |    |   |   dt| j_| jj||d dS )z9Setup the app to be used for starting an embedded worker.F)rR   rS   N)finalizerq   set_defaulttypelog_setupsetup)rI   rR   rS   r1   r1   r2   re      s
    re   )&rM   r'   r"   r   
contextlibr   typingr   r   r   r   Zcelery.worker.consumerZceleryr   r   Zcelery.resultr	   r
   Zcelery.utils.dispatchr   Zcelery.utils.nodenamesr   rg   r[   r   r   r   r   r^   r   r]   intstrboolfloatrX   rr   re   r1   r1   r1   r2   <module>   st   C'
7