U
    bh
                     @  sx   d Z ddlmZ ddlmZmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ d	Ze	eZejZG d
d dejZdS )zWorker Task Consumer Bootstep.    )annotations)QoSignore_errors)	bootsteps)
get_logger)detect_quorum_queues   )Mingle)Tasksc                      sT   e Zd ZdZefZ fddZdd Zdd Zdd	 Z	d
d Z
ddddZ  ZS )r
   z,Bootstep starting the task message consumer.c                   s    d  |_ |_t j|f| d S )N)task_consumerqossuper__init__)selfckwargs	__class__ @/tmp/pip-unpacked-wheel-kcem4wq5/celery/worker/consumer/tasks.pyr      s    zTasks.__init__c                   s^       |   jjd j  jjj j j	d _
 fdd}t| j _dS )zStart task consumer.r   )on_decode_errorc                   s    j j| dS )N)prefetch_countZapply_global)r   r   )r   r   
qos_globalr   r   set_prefetch_count,   s    z'Tasks.start.<locals>.set_prefetch_countN)Zupdate_strategiesr   
connectionZdefault_channelZ	basic_qosZinitial_prefetch_countappZamqpZTaskConsumerr   r   r   r   )r   r   r   r   r   r   start   s    
   zTasks.startc                 C  s    |j rtd t||j j dS )zStop task consumer.zCanceling task consumer...N)r   debugr   cancelr   r   r   r   r   stop3   s    z
Tasks.stopc                 C  s0   |j r,| | td t||j j d|_ dS )zShutdown task consumer.zClosing consumer channel...N)r   r!   r   r   closer    r   r   r   shutdown9   s
    
zTasks.shutdownc                 C  s   d|j r|j jndiS )zReturn task consumer info.r   zN/A)r   valuer    r   r   r   infoA   s    z
Tasks.infobool)returnc                 C  s@   |j j }|jjjr<t|j|j jj\}}|r<d}t	d |S )zDetermine if global QoS should be applied.

        Additional information:
            https://www.rabbitmq.com/docs/consumer-prefetch
            https://www.rabbitmq.com/docs/quorum-queues#global-qos
        Fz5Global QoS is disabled. Prefetch count in now static.)
r   Zqos_semantics_matches_specr   confZworker_detect_quorum_queuesr   	transportZdriver_typeloggerr%   )r   r   r   Zusing_quorum_queuesqnamer   r   r   r   E   s    



zTasks.qos_global)__name__
__module____qualname____doc__r	   requiresr   r   r!   r#   r%   r   __classcell__r   r   r   r   r
      s   r
   N)r/   
__future__r   Zkombu.commonr   r   Zceleryr   Zcelery.utils.logr   Zcelery.utils.quorum_queuesr   Zmingler	   __all__r,   r*   r   ZStartStopStepr
   r   r   r   r   <module>   s   