U
    bh=                     @   sB  d Z ddlZddlZddlmZmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZmZmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddl m!Z!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z( zddl)Z)W n e*k
r   dZ)Y nX dZ+dZ,dZ-dZ.G dd dZ/dS )a  WorkController can be used to instantiate in-process workers.

The command-line interface for the worker is in :mod:`celery.bin.worker`,
while the worker program is in :mod:`celery.apps.worker`.

The worker program is responsible for adding signal handlers,
setting up logging, etc.  This is a bare-bones worker without
global side-effects (i.e., except for the global state stored in
:mod:`celery.worker.state`).

The worker consists of several components, all managed by bootsteps
(mod:`celery.bootsteps`).
    N)datetimetimezone)sleep)	cpu_count)detect_environment)	bootsteps)concurrency)signals)RUN	TERMINATE)ImproperlyConfiguredTaskRevokedErrorWorkerTerminate)
EX_FAILUREcreate_pidlock)reload_from_cwd)mlevel)worker_logger)default_nodenameworker_direct)str_to_list)default_socket_timeout   state)WorkControllerg      @z
Trying to select queue subset of {0!r}, but queue {1} isn't
defined in the `task_queues` setting.

If you want to automatically declare unknown queues you can
enable the `task_create_missing_queues` setting.
ze
Trying to deselect queue subset of {0!r}, but queue {1} isn't
defined in the `task_queues` setting.
c                   @   sZ  e Zd ZdZdZdZdZdZdZdZ	G dd de
jZdJddZdKddZd	d
 Zdd Zdd Zdd Zdd Zdd Zdd ZdLddZdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* ZdMd,d-ZdNd.d/Z dOd1d2Z!dPd3d4Z"dQd5d6Z#dRd7d8Z$d9d: Z%d;d< Z&d=d> Z'd?d@ Z(dAdB Z)e*dCdD Z+dSdFdGZ,dHdI Z-dS )Tr   zUnmanaged worker instance.Nc                   @   s&   e Zd ZdZdZddddddd	hZd
S )zWorkController.BlueprintzWorker bootstep blueprint.ZWorkerzcelery.worker.components:Hubzcelery.worker.components:Poolzcelery.worker.components:Beatzcelery.worker.components:Timerz celery.worker.components:StateDBz!celery.worker.components:Consumerz'celery.worker.autoscale:WorkerComponentN)__name__
__module____qualname____doc__nameZdefault_steps r!   r!   8/tmp/pip-unpacked-wheel-kcem4wq5/celery/worker/worker.py	BlueprintL   s   r#   c                 K   sl   |p| j | _ t|| _ttj| _| j j	  | j
f | | jf | | jf | | jf | jf | d S N)appr   hostnamer   nowr   utcstartup_timeloaderZinit_workeron_before_initsetup_defaultson_after_initsetup_instanceprepare_args)selfr%   r&   kwargsr!   r!   r"   __init__Z   s    
zWorkController.__init__c                 K   s   || _ | || | t| | jsNzt | _W n tk
rL   d| _Y nX t| j| _|pb| j	| _
| j | _|d kr|  n|| _|| _tjj| d t| j| _g | _|   | j| jjd | j| j| jd| _| jj| f| d S )N   ZsenderZworker)stepson_starton_close
on_stopped)pidfilesetup_queuessetup_includesr   r   r   NotImplementedErrorr   loglevelon_consumer_readyready_callbackr%   Zconnection_for_read	_conninfoshould_use_eventloopuse_eventloopoptionsr	   Zworker_initsend_concurrencyZget_implementationpool_clsr5   on_init_blueprintr#   r6   r7   r8   	blueprintapply)r0   queuesr?   r9   includerB   Zexclude_queuesr1   r!   r!   r"   r.   e   s4    
zWorkController.setup_instancec                 C   s   d S r$   r!   r0   r!   r!   r"   rG      s    z WorkController.on_init_blueprintc                 K   s   d S r$   r!   r0   r1   r!   r!   r"   r+      s    zWorkController.on_before_initc                 K   s   d S r$   r!   rM   r!   r!   r"   r-      s    zWorkController.on_after_initc                 C   s   | j rt| j | _d S r$   )r9   r   pidlockrL   r!   r!   r"   r6      s    zWorkController.on_startc                 C   s   d S r$   r!   )r0   consumerr!   r!   r"   r>      s    z WorkController.on_consumer_readyc                 C   s   | j j  d S r$   )r%   r*   Zshutdown_workerrL   r!   r!   r"   r7      s    zWorkController.on_closec                 C   s(   | j   | j  | jr$| j  d S r$   )ZtimerstoprO   shutdownrN   releaserL   r!   r!   r"   r8      s    

zWorkController.on_stoppedc              
   C   s   t |}t |}z| jjj| W n6 tk
rZ } ztt 	||W 5 d }~X Y nX z| jjj
| W n6 tk
r } ztt 	||W 5 d }~X Y nX | jjjr| jjjt| j d S r$   )r   r%   ZamqprJ   selectKeyErrorr   SELECT_UNKNOWN_QUEUEstripformatZdeselectDESELECT_UNKNOWN_QUEUEconfr   Z
select_addr&   )r0   rK   excludeexcr!   r!   r"   r:      s     
zWorkController.setup_queuesc                    sf   t  jjj}|r0|t |7 } fdd|D  | _dd  jj D }t t||B  jj_d S )Nc                    s   g | ]} j j|qS r!   )r%   r*   Zimport_task_module.0mrL   r!   r"   
<listcomp>   s     z1WorkController.setup_includes.<locals>.<listcomp>c                 S   s   h | ]}|j jqS r!   )	__class__r   )r]   Ztaskr!   r!   r"   	<setcomp>   s   z0WorkController.setup_includes.<locals>.<setcomp>)tupler%   rY   rK   Ztasksvaluesset)r0   Zincludesprevtask_modulesr!   rL   r"   r;      s    
zWorkController.setup_includesc                 K   s   |S r$   r!   rM   r!   r!   r"   r/      s    zWorkController.prepare_argsc                 C   s   t jj| d d S )Nr4   )r	   Zworker_shutdownrD   rL   r!   r!   r"   _send_worker_shutdown   s    z$WorkController._send_worker_shutdownc              
   C   s   z| j |  W n tk
r,   |   Y n tk
rh } z tjd|dd | jtd W 5 d }~X Y nN t	k
r } z| j|j
d W 5 d }~X Y n  tk
r   | jtd Y nX d S )NzUnrecoverable error: %rT)exc_info)exitcode)rH   startr   	terminate	ExceptionloggercriticalrP   r   
SystemExitcodeKeyboardInterrupt)r0   r[   r!   r!   r"   rj      s    zWorkController.startc                 C   s   | j j| d|fdd d S )Nregister_with_event_loopzhub.register)argsdescription)rH   Zsend_all)r0   Zhubr!   r!   r"   rr      s      z'WorkController.register_with_event_loopc                 C   s   |  | j|S r$   )Z_quick_acquire_process_taskr0   reqr!   r!   r"   _process_task_sem   s    z WorkController._process_task_semc                 C   sL   z| | j W n6 tk
rF   z|   W n tk
r@   Y nX Y nX dS )z2Process task by sending it to the pool of workers.N)Zexecute_using_poolpoolr   Z_quick_releaseAttributeErrorrv   r!   r!   r"   ru      s    zWorkController._process_taskc                 C   s(   z| j   W n tk
r"   Y nX d S r$   )rO   closerz   rL   r!   r!   r"   signal_consumer_close   s    z$WorkController.signal_consumer_closec                 C   s    t  dko| jjjjo| jj S )Ndefault)r   r@   	transportZ
implementsZasynchronousr%   Z
IS_WINDOWSrL   r!   r!   r"   rA      s
    

z#WorkController.should_use_eventloopFc                 C   sF   |dk	r|| _ | jjtkr:|   |r.| jjr:| jdd |   dS )z7Graceful shutdown of the worker server (Warm shutdown).NTwarm)	ri   rH   r   r
   r|   ry   signal_safe	_shutdownrg   )r0   in_sighandlerri   r!   r!   r"   rP      s    zWorkController.stopc                 C   s0   | j jtkr,|   |r | jjr,| jdd dS )z>Not so graceful shutdown of the worker server (Cold shutdown).Fr   N)rH   r   r   r|   ry   r   r   )r0   r   r!   r!   r"   rk      s    zWorkController.terminateTc              	   C   s>   | j d k	r:tt" | j j| | d | j   W 5 Q R X d S )N)rk   )rH   r   SHUTDOWN_SOCKET_TIMEOUTrP   join)r0   r   r!   r!   r"   r     s    

zWorkController._shutdownc                 C   sV   t | j|||d | jr.| j  | j  z| j  W n tk
rP   Y nX d S )N)force_reloadreloader)list_reload_modulesrO   Zupdate_strategiesZreset_rate_limitsry   Zrestartr<   )r0   modulesreloadr   r!   r!   r"   r     s      

zWorkController.reloadc                    s.    fddt |d kr jjjn|p&dD S )Nc                 3   s   | ]}j |f V  qd S r$   )_maybe_reload_moduler\   r1   r0   r!   r"   	<genexpr>  s   z1WorkController._reload_modules.<locals>.<genexpr>r!   )rd   r%   r*   rf   )r0   r   r1   r!   r   r"   r     s    zWorkController._reload_modulesc                 C   sH   |t jkr$td| | jj|S |rDtd| tt j| |S d S )Nzimporting module %szreloading module %s)sysr   rm   debugr%   r*   Zimport_from_cwdr   )r0   moduler   r   r!   r!   r"   r      s    
z#WorkController._maybe_reload_modulec                 C   s8   t tj| j }| jjt t	| j
jt| dS )N)totalpidclockuptime)r   r'   r   r(   r)   r   Ztotal_countosgetpidstrr%   r   roundtotal_seconds)r0   r   r!   r!   r"   info(  s    

zWorkController.infoc                 C   sb   t d krtdt t j}|j|j|j|j|j|j	|j
|j|j|j|j|j|j|j|j|jdS )Nz%rusage not supported by this platform)utimeZstimeZmaxrssZixrssZidrssZisrssZminfltZmajfltZnswapZinblockZoublockZmsgsndZmsgrcvZnsignalsZnvcswZnivcsw)resourcer<   Z	getrusageZRUSAGE_SELFZru_utimeZru_stimeZ	ru_maxrssZru_ixrssZru_idrssZru_isrssZ	ru_minfltZ	ru_majfltZru_nswapZ
ru_inblockZ
ru_oublockZ	ru_msgsndZ	ru_msgrcvZru_nsignalsZru_nvcswZ	ru_nivcsw)r0   sr!   r!   r"   rusage/  s(    zWorkController.rusagec                 C   sb   |   }|| j |  || jj | j z|  |d< W n tk
r\   d|d< Y nX |S )Nr   zN/A)r   updaterH   rO   r   r<   )r0   r   r!   r!   r"   statsF  s    zWorkController.statsc                 C   s   dj | | jr| j nddS )z``repr(worker)``.z#<Worker: {self.hostname} ({state})>ZINIT)r0   r   )rW   rH   Zhuman_staterL   r!   r!   r"   __repr__P  s    zWorkController.__repr__c                 C   s   | j S )z#``str(worker) == worker.hostname``.)r&   rL   r!   r!   r"   __str__W  s    zWorkController.__str__c                 C   s   t S r$   r   rL   r!   r!   r"   r   [  s    zWorkController.stateWARNc                 K   s  | j j}|| _|| _|d|| _|d|| _|d||| _|d|| _|d|| _|d|| _	|pd|| _
|d|	| _|d|
| _|d	|| _|d
||| _|d|| _|d||| _|d||| _|d||| _|d|| _|d|| _t|d|| _|d|| _|d|| _d S )NZworker_concurrencyZworker_send_task_eventsZworker_poolZworker_consumerZworker_timerZworker_timer_precisionZworker_autoscalerZworker_pool_putlocksZworker_pool_restartsZworker_state_dbZbeat_schedule_filenameZbeat_schedulertask_time_limittask_soft_time_limitZworker_max_tasks_per_childZworker_max_memory_per_childZworker_prefetch_multiplierZworker_disable_rate_limitsworker_lost_wait)r%   eitherr=   logfiler   task_eventsrF   consumer_cls	timer_clstimer_precisionoptimizationautoscaler_clspool_putlockspool_restartsstatedbschedule_filename	scheduler
time_limitsoft_time_limitmax_tasks_per_childmax_memory_per_childintprefetch_multiplierdisable_rate_limitsr   )r0   r   r=   r   r   ry   r   r   r   r   r   r   r   Or   r   r   r   rF   Zstate_dbr   r   Zscheduler_clsr   r   r   r   r   r   Z_kwr   r!   r!   r"   r,   _  sb     
         zWorkController.setup_defaultsc                 C   sV   | j }ttj}|jjrd}|jjdkrR|rRd|jj d}t| t	|jj dS )a  Wait :setting:`worker_soft_shutdown_timeout` if soft shutdown is enabled.

        To enable soft shutdown, set the :setting:`worker_soft_shutdown_timeout` in the
        configuration. Soft shutdown can be used to allow the worker to finish processing
        few more tasks before initiating a cold shutdown. This mechanism allows the worker
        to finish short tasks that are already in progress and requeue long-running tasks
        to be picked up by another worker.

        .. warning::
            If there are no tasks in the worker, the worker will not wait for the
            soft shutdown timeout even if it is set as it makes no sense to wait for
            the timeout when there are no tasks to process.
        Tr   z)Initiating Soft Shutdown, terminating in z secondsN)
r%   rb   r   Zactive_requestsrY   Z#worker_enable_soft_shutdown_on_idleZworker_soft_shutdown_timeoutrm   warningr   )r0   r%   requestslogr!   r!   r"   wait_for_soft_shutdown  s    

z%WorkController.wait_for_soft_shutdown)NN)NNNNNN)N)FN)F)T)NFN)N)FN)Nr   NNNNNNNNNNNNNNNNNNNNNNNNNN).r   r   r   r   r%   rN   rH   ry   Z	semaphoreri   r   r#   r2   r.   rG   r+   r-   r6   r>   r7   r8   r:   r;   r/   rg   rj   rr   rx   ru   r|   rA   rP   rk   r   r   r   r   r   r   r   r   r   propertyr   r,   r   r!   r!   r!   r"   r   ?   s   
      
(










                               
=r   )0r   r   r   r   r   timer   Zbilliardr   Zkombu.utils.compatr   Zceleryr   r   rE   r	   Zcelery.bootstepsr
   r   Zcelery.exceptionsr   r   r   Zcelery.platformsr   r   Zcelery.utils.importsr   Zcelery.utils.logr   r   rm   Zcelery.utils.nodenamesr   r   Zcelery.utils.textr   Zcelery.utils.threadsr    r   r   ImportError__all__r   rU   rX   r   r!   r!   r!   r"   <module>   s8   
