U
    bhM                  
   @   s   d Z ddlZddlZddlmZmZmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ ddlmZ dZdZ ee!Z"edddddddddgZ#dd Z$dd Z%G dd  d eZ&d!d" Z'd#d$ Z(e( d%d& Z)e(d'd(d)efgd*dd,d-Z*d.d/ Z+e(d0d1d2d3d4 Z,ej-j.fd5d6Z/ej0j1ej2j1fd7d8Z3e'd9d1d2dd:d;Z4e'd<d=d2dd>d?Z5dd@dAZ6e'd9dBe7fgdCdDdEdF Z8e'dGe7fdHe7fgdIdJdKdH Z9e'dGe7fdLe:fdMe:fgdNdJddOdPZ;e( dQdR Z<e' ddSdTZ=e' dUdV Z>e' dWdX Z?e' dYdZ Z@e(d+d[dd\d]ZAe(d^d_d`da ZBe( dbdc ZCe(dddedfdg ZDdhdi ZEe(djdedkdl ZFe(dmdeddndoZGe(dpdedqdr ZHe(dsdtdudvddwdxZIe(dyde7fdzeJfd{eJfgd|d}dddZKe( dd ZLe(deJfgddJdddZMe'deJfgddJdddZNe'deJfgddJdddZOe' dddZPe'deJfdeJfgddJdddZQe' dddZRe'de7fde7fde7fde7fgddJdddZSe'de7fgddJdd ZTe( dd ZUdS )z.Worker remote control command implementations.    N)UserDictdefaultdict
namedtuple)TERM_SIGNAME)	safe_repr)WorkerShutdown)EX_OK)signals)
maybe_list)
get_logger)jsonify	strtobool)rate   state)Request)Panel)exchangerouting_key
rate_limitcontroller_info_taliastypevisibledefault_timeouthelp	signatureargsvariadicc                 C   s   d| iS )Nok valuer!   r!   9/tmp/pip-unpacked-wheel-kcem4wq5/celery/worker/control.pyr       s    r    c                 C   s   d| iS )Nerrorr!   r"   r!   r!   r$   nok#   s    r&   c                
   @   s2   e Zd ZdZi Zi Zedd Zed
dd	ZdS )r   z+Global registry of remote control commands.c                 O   s    |r| j f || S | j f |S N)	_register)clsr   kwargsr!   r!   r$   register-   s    zPanel.registerNcontrolT      ?c
              
      s"    	f
dd}
|
S )Nc              	      s^   p| j }p$| jpd dd }| j|< t 	|j|<  rZ| j < | S )N 
r   )__name____doc__stripsplitdatar   meta)ZfunZcontrol_nameZ_help
r   r   r)   r   r   namer   r   r   r   r!   r$   _inner8   s     

      

zPanel._register.<locals>._innerr!   )r)   r7   r   r   r   r   r   r   r   r   r8   r!   r6   r$   r(   3   s    
zPanel._register)	NNr,   Tr-   NNNN)	r0   
__module____qualname__r1   r4   r5   classmethodr+   r(   r!   r!   r!   r$   r   '   s   
           r   c                  K   s   t jf ddi| S )Nr   r,   r   r+   r*   r!   r!   r$   control_commandE   s    r>   c                  K   s   t jf ddi| S )Nr   inspectr<   r=   r!   r!   r$   inspect_commandI   s    r@   c                 C   s   t | j S )z6Information about Celery installation for bug reports.)r    appZ	bugreportr   r!   r!   r$   reportO   s    rB   Z	dump_confz[include_defaults=False]with_defaults)r   r   r   Fc                 K   s   t | jjj|dttdS )zList configuration.)rC   )Z	keyfilterZunknown_type_filter)r   rA   conftable_wanted_config_keyr   )r   rC   r*   r!   r!   r$   rD   U   s    rD   c                 C   s   t | to| d S )N__)
isinstancestr
startswith)keyr!   r!   r$   rF   a   s    rF   idsz[id1 [id2 [... [idN]]]])r   r   c                 K   s   dd t t|D S )z!Query for task information by id.c                 S   s    i | ]}|j t|| fqS r!   )id_state_of_taskinfo).0reqr!   r!   r$   
<dictcomp>m   s    zquery_task.<locals>.<dictcomp>)_find_requests_by_idr
   )r   rL   r*   r!   r!   r$   
query_taskg   s    
rT   c              	   c   s2   | D ](}z||V  W q t k
r*   Y qX qd S r'   )KeyError)rL   get_requesttask_idr!   r!   r$   rS   s   s
    rS   c                 C   s   || rdS || rdS dS )Nactivereservedreadyr!   )requestZ	is_activeis_reservedr!   r!   r$   rN   |   s
    rN   rW   c                 K   sN   t t|pg d }}t| |||f|}t|tr>d|kr>|S td| dS )zRevoke task by task id (or list of ids).

    Keyword Arguments:
        terminate (bool): Also terminate the process if the task is active.
        signal (str): Name of signal to use for terminate (e.g., ``KILL``).
    Nr    ztasks z flagged as revoked)setr
   _revokerH   dictr    )r   rW   	terminatesignalr*   task_idsr!   r!   r$   revoke   s
    rc   headersz/[key1=value1 [key2=value2 [... [keyN=valueN]]]]c                 K   s.  t |p
t}t|tr&dd |D }| D ]2\}}ttj	|pFg tt| }|tj|< q.|svt
d| dS ttj}	tt}
|	D ]z}t|dr|jr| D ]\\}}||jkrt|}t|j| }t|t|@ }|r|
| | |j| jj|d qq|
st
d| dS t
d|
 dS )	a  Revoke task by header (or list of headers).

    Keyword Arguments:
        headers(dictionary): Dictionary that contains stamping scheme name as keys and stamps as values.
                             If headers is a list, it will be converted to a dictionary.
        terminate (bool): Also terminate the process if the task is active.
        signal (str): Name of signal to use for terminate (e.g., ``KILL``).
    Sample headers input:
        {'mtask_id': [id1, id2, id3]}
    c                 S   s&   i | ]}| d d | d d qS )=r   r   )r3   )rP   hr!   r!   r$   rR      s      z-revoke_by_stamped_headers.<locals>.<dictcomp>zheaders z' flagged as revoked, but not terminatedstampsra   z were not terminatedz revoked)_signalssignumr   rH   listitemsr
   worker_stateZrevoked_stampsgetr    active_requestsr   r]   hasattrrg   updater`   consumerpool)r   rd   r`   ra   r*   rj   headerrg   Zupdated_stampsro   Z#terminated_scheme_to_stamps_mappingrQ   Zexpected_header_keyZexpected_header_valueZactual_headerZmatching_stamps_for_requestr!   r!   r$   revoke_by_stamped_headers   s.    
 

ru   c           
      K   s   t |}t }tj| |rt|p(t}t|D ]L}|j	|kr4|
|j	 td|j	| |j| jj|d t ||kr4 qq4|stdS tdd|S d|}	td|	 |S )NzTerminating %s (%s)rh   zterminate: tasks unknownzterminate: {}z, zTasks flagged as revoked: %s)lenr]   rm   revokedrq   ri   rj   r   rS   rM   addloggerrO   r`   rr   rs   r    formatjoin)
r   rb   r`   ra   r*   sizeZ
terminatedrj   r[   Zidstrr!   r!   r$   r^      s$    

r^   ra   z <signal> [id1 [id2 [... [idN]]]])r   r   r   c                 K   s   t | |d|dS )z+Terminate task by task id (or list of ids).T)r`   ra   )rc   )r   ra   rW   r*   r!   r!   r$   r`      s    r`   	task_namer   z0<task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>)r   r   c              
   K   s   zt | W n4 tk
r@ } ztd| W Y S d}~X Y nX z|| jj| _W n, tk
r   tjd|dd td Y S X | j	
  |std| tdS td	|| td
S )zTell worker(s) to modify the rate limit for a task by type.

    See Also:
        :attr:`celery.app.task.Task.rate_limit`.

    Arguments:
        task_name (str): Type of task to set rate limit for.
        rate_limit (int, str): New rate limit.
    zInvalid rate limit string: Nz&Rate limit attempt for unknown task %sTexc_infounknown taskz)Rate limits disabled for tasks of type %sz rate limit disabled successfullyz(New rate limit for tasks of type %s: %s.znew rate limit set successfully)r   
ValueErrorr&   rA   tasksr   rU   ry   r%   rr   Zreset_rate_limitsrO   r    )r   r}   r   r*   excr!   r!   r$   r      s*    $ 
 softhardz#<task_name> <soft_secs> [hard_secs]c                 K   sb   z| j j| }W n, tk
r<   tjd|dd td Y S X ||_||_td||| t	dS )zTell worker(s) to modify the time limit for task by type.

    Arguments:
        task_name (str): Name of task to change.
        hard (float): Hard time limit.
        soft (float): Soft time limit.
    z-Change time limit attempt for unknown task %sTr~   r   z5New time limits for tasks of type %s: soft=%s hard=%sztime limits set successfully)
rA   r   rU   ry   r%   r&   Zsoft_time_limit
time_limitrO   r    )r   r}   r   r   r*   taskr!   r!   r$   r      s        r   c                 K   s   d| j jjiS )z Get current logical clock value.clock)rA   r   r#   r   r*   r!   r!   r$   r   >  s    r   c                 K   s   | j jr| j j||| dS )zHold election.

    Arguments:
        id (str): Unique election id.
        topic (str): Election topic.
        action (str): Action to take for elected actor.
    N)rr   Zgossipelection)r   rM   Ztopicactionr*   r!   r!   r$   r   D  s    	r   c                 C   s>   | j j}|jr6d|jkr6|jd td tdS tdS )z+Tell worker(s) to send task-related events.r   z)Events of group {task} enabled by remote.ztask events enabledztask events already enabled)rr   event_dispatchergroupsrx   ry   rO   r    r   
dispatcherr!   r!   r$   enable_eventsQ  s    
r   c                 C   s8   | j j}d|jkr0|jd td tdS tdS )z3Tell worker(s) to stop sending task-related events.r   z*Events of group {task} disabled by remote.ztask events disabledztask events already disabled)rr   r   r   discardry   rO   r    r   r!   r!   r$   disable_events\  s    

r   c                 C   s,   t d | jj}|jddditj dS )z3Tell worker(s) to send event heartbeat immediately.zHeartbeat requested by remote.worker-heartbeatfreq   N)r   )ry   debugrr   r   sendrm   ZSOFTWARE_INFOr   r!   r!   r$   	heartbeatg  s    
r   )r   c                 K   sJ   || j krFtd| |r&tj| tj  tjj| jj	
 dS dS )zRequest mingle sync-data.zsync with %s)rw   r   N)hostnamery   rO   rm   rw   rq   purge_datarA   r   Zforward)r   Z	from_noderw   r*   r!   r!   r$   helloq  s    


r   g?)r   c                 K   s   t dS )zPing worker(s).Zpong)r    r   r!   r!   r$   ping  s    r   c                 K   s   | j j S )z&Request worker statistics/information.)rr   
controllerstatsr   r!   r!   r$   r     s    r   Zdump_schedule)r   c                 K   s   t t| jjS )z0List of currently scheduled ETA/countdown tasks.)rk   _iter_schedule_requestsrr   timerr   r!   r!   r$   	scheduled  s    r   c              
   c   sn   | j jD ]`}z|jjd }W n ttfk
r8   Y qY qX t|tr|jrT|j	 nd |j
| dV  qd S )Nr   )etapriorityr[   )schedulequeueentryr   
IndexError	TypeErrorrH   r   r   	isoformatr   rO   )r   ZwaitingZarg0r!   r!   r$   r     s    

r   Zdump_reservedc                 K   s.   |  tj|  tj }|s g S dd |D S )zAList of currently reserved tasks, not including scheduled/active.c                 S   s   g | ]}|  qS r!   rO   rP   r[   r!   r!   r$   
<listcomp>  s     zreserved.<locals>.<listcomp>)tsetrm   reserved_requestsro   )r   r*   Zreserved_tasksr!   r!   r$   rY     s    

rY   Zdump_activec                    s    fdd|  tjD S )z'List of tasks currently being executed.c                    s   g | ]}|j  d qS )safer   r   r   r!   r$   r     s   zactive.<locals>.<listcomp>)r   rm   ro   )r   r   r*   r!   r   r$   rX     s    

rX   Zdump_revokedc                 K   s
   t tjS )zList of revoked task-ids.)rk   rm   rw   r   r!   r!   r$   rw     s    rw   Z
dump_taskstaskinfoitemsz[attr1 [attr2 [... [attrN]]]])r   r   r   c                    sJ   | j jpt|rndd D }fdd  fddt|D S )zList of registered tasks.

    Arguments:
        taskinfoitems (Sequence[str]): List of task attributes to include.
            Defaults to ``exchange,routing_key,rate_limit``.
        builtins (bool): Also include built-in tasks.
    c                 s   s   | ]}| d s|V  qdS )zcelery.N)rJ   rP   r   r!   r!   r$   	<genexpr>  s    
 zregistered.<locals>.<genexpr>c                    sB    fddD }|r<dd |  D }d jd|S  jS )Nc                    s.   i | ]&}t  |d d k	r|tt  |d qS r'   )getattrrI   )rP   fieldr   r!   r$   rR     s    z5registered.<locals>._extract_info.<locals>.<dictcomp>c                 S   s   g | ]}d  |qS )re   )r{   )rP   fr!   r!   r$   r     s     z5registered.<locals>._extract_info.<locals>.<listcomp>z{} [{}] )rl   rz   r7   r{   )r   fieldsrO   )r   r   r$   _extract_info  s    
z!registered.<locals>._extract_infoc                    s   g | ]} | qS r!   r!   r   )r   regr!   r$   r     s     zregistered.<locals>.<listcomp>)rA   r   DEFAULT_TASK_INFO_ITEMSsorted)r   r   builtinsr*   r   r!   )r   r   r   r$   
registered  s    
r   g      N@num	max_depthz.[object_type=Request] [num=200 [max_depth=10]])r   r   r      
   r   c              
      s   zddl }W n tk
r(   tdY nX td| tjddddF}||d|  |j | fd	d
|jd d|jiW  5 Q R  S Q R X dS )a  Create graph of uncollected objects (memory-leak debugging).

    Arguments:
        num (int): Max number of objects to graph.
        max_depth (int): Traverse at most n levels deep.
        type (str): Name of object to graph.  Default is ``"Request"``.
    r   NzRequires the objgraph libraryzDumping graph for type %rZcobjgz.pngF)prefixsuffixdeletec                    s   |  kS r'   r!   )vZobjectsr!   r$   <lambda>      zobjgraph.<locals>.<lambda>)r   Z	highlightfilenamer   )	objgraphImportErrorry   rO   tempfileNamedTemporaryFileZby_typeZshow_backrefsr7   )r   r   r   r   Z	_objgraphfhr!   r   r$   r     s$      
r   c                 K   s   ddl m} | S )z Sample current RSS memory usage.r   )
sample_mem)Zcelery.utils.debugr   )r   r*   r   r!   r!   r$   	memsample  s    r   samplesz[n_samples=10]c                 K   s(   ddl m} t }|j|d | S )z/Dump statistics of previous memsample requests.r   )r   )file)Zcelery.utilsr   ioStringIOmemdumpgetvalue)r   r   r*   r   outr!   r!   r$   r     s    r   nz[N=1]c                 K   s4   | j jjrtdS | j j| | j | tdS )z!Grow pool by n processes/threads.zJpool_grow is not supported with autoscale. Adjust autoscale range instead.zpool will grow)rr   r   
autoscalerr&   rs   Zgrow_update_prefetch_countr    r   r   r*   r!   r!   r$   	pool_grow  s
    
r   c                 K   s6   | j jjrtdS | j j| | j |  tdS )z#Shrink pool by n processes/threads.zLpool_shrink is not supported with autoscale. Adjust autoscale range instead.zpool will shrink)rr   r   r   r&   rs   shrinkr   r    r   r!   r!   r$   pool_shrink  s
    
r   c                 K   s2   | j jjr&| jjj|||d tdS tddS )zRestart execution pool.)reloaderzreload startedzPool restarts not enabledN)rA   rD   Zworker_pool_restartsrr   r   reloadr    r   )r   modulesr   r   r*   r!   r!   r$   pool_restart-  s    
r   maxminz[max [min]]c                 C   s>   | j jj}|r2|||\}}td| d| S tddS )zModify autoscale settings.zautoscale now max=z min=zAutoscale not enabledN)rr   r   r   rq   r    r   )r   r   r   r   Zmax_Zmin_r!   r!   r$   	autoscale7  s
    
r   Got shutdown from remotec                 K   s   t | ttdS )zShutdown worker(s).N)ry   warningr   r   )r   msgr*   r!   r!   r$   shutdownD  s    
r   r   r   exchange_typer   z'<queue> [exchange [type [routing_key]]]c                 K   s.   | j j| j j|||pd|f| td| S )z2Tell worker(s) to consume from task queue by name.directzadd consumer )rr   	call_soonZadd_task_queuer    )r   r   r   r   r   optionsr!   r!   r$   add_consumerM  s       r   z<queue>c                 K   s    | j | j j| td| S )z9Tell worker(s) to stop consuming from task queue by name.zno longer consuming from )rr   r   Zcancel_task_queuer    )r   r   _r!   r!   r$   cancel_consumer_  s
     r   c                 C   s    | j jrdd | j jjD S g S )z:List the task queues a worker is currently consuming from.c                 S   s   g | ]}t |jd dqS )T)Zrecurse)r_   Zas_dict)rP   r   r!   r!   r$   r   o  s   z!active_queues.<locals>.<listcomp>)rr   Ztask_consumerZqueuesr   r!   r!   r$   active_queuesk  s
    r   )F)FN)FN)FN)NNN)N)N)F)NF)r   r   r   )r   )r   )r   )NFN)NN)r   )NNN)Vr1   r   r   collectionsr   r   r   Zbilliard.commonr   Zkombu.utils.encodingr   Zcelery.exceptionsr   Zcelery.platformsr   r	   ri   Zcelery.utils.functionalr
   Zcelery.utils.logr   Zcelery.utils.serializationr   r   Zcelery.utils.timer   r.   r   rm   r[   r   __all__r   r0   ry   r   r    r&   r   r>   r@   rB   rD   rF   rT   requests__getitem__rS   ro   __contains__r   rN   rc   ru   r^   rI   r`   r   floatr   r   r   r   r   r   r   r   r   r   r   rY   rX   rw   r   intr   r   r   r   r   r   r   r   r   r   r   r!   r!   r!   r$   <module>   s>        

	



6


$





	








			  	
