U
    bh9                     @  s"  d Z ddlmZ ddlZddlZddlmZmZ ddlm	Z	 ddl
m
Z
 ddlmZ ddlmZ d	d
lmZmZmZmZ d	dlmZ d	dlmZmZ d	dlmZ d	dlmZ d	dlmZ d	dlmZm Z  d	dl!m"Z" d	dl#m$Z$ dZ%dZ&dZ'ee(Z)e)j*e)j+ Z*Z+G dd dZ,G dd dZ-dS )zGeneric process mailbox.    )annotationsN)defaultdictdeque)contextmanager)copy)count)time   )ConsumerExchangeProducerQueue)LamportClock)maybe_declareoid_from)InconsistencyError)
get_logger)match)maybe_evaluatereprcall)cached_property)uuid
   zA node named {node.hostname} is already using this process mailbox!

Maybe you forgot to shutdown the other node or did not do so properly?
Or if you meant to start multiple nodes on the same host please make sure
you give each node a unique node name!
)NodeMailboxc                   @  s   e Zd ZdZdZdZdZdZdZdddZ	dddZ
dd	 Zd
d ZdddZdddZdddZdd Zdd ZdddZeZdd ZdS ) r   zMailbox node.Nc                 C  s:   || _ || _|| _|| _| jjj| _|d kr0i }|| _d S N)channelmailboxhostnamestateclockadjustadjust_clockhandlers)selfr   r   r   r#   r    r%   0/tmp/pip-unpacked-wheel-earovzxb/kombu/pidbox.py__init__8   s    zNode.__init__Tc                   sP    j  j} fdd}||_t|p* j|gf||d krB j jn|d|S )Nc                   s   |rt tj d d S )N)node)warningswarnW_PIDBOX_IN_USEformat)namemessagesZ	consumersr$   r%   r&   verify_exclusiveF   s    z'Node.Consumer.<locals>.verify_exclusive)no_ackaccept)r   	get_queuer   Zon_declaredr
   r   r2   )r$   r   r1   r2   optionsqueuer0   r%   r/   r&   r
   C   s     zNode.Consumerc                 C  s   || j |j< |S r   )r#   __name__)r$   Zfunr%   r%   r&   handlerQ   s    zNode.handlerc                 C  s   t d|dd d S )NzCannot decode message: %rr	   exc_info)error)r$   messageexcr%   r%   r&   on_decode_errorU   s    zNode.on_decode_errorc                 C  s&   | j ||p| jg| jd}|  |S )N)r   	callbacksr=   )r
   handle_messager=   consume)r$   r   callbackconsumerr%   r%   r&   listenX   s    
zNode.listenc           	   
   K  s   |pi }t dt|d|d|| |r*| jp.| j}z|||}W nP tk
rT    Y n< tk
r } ztd|dd dt|i}W 5 d }~X Y nX |r| j| j	|i|d |d	 |d
 |S )Nz1pidbox received method %s [reply_to:%s ticket:%s]r%   )kwargszpidbox command error: %rr	   r8   r:   exchangerouting_key)rE   rF   ticket)
debugr   handle_callhandle_cast
SystemExit	Exceptionr:   reprreplyr   )	r$   method	argumentsreply_torG   rD   handlerN   r<   r%   r%   r&   dispatch_   s*      zNode.dispatchc                 C  s    |si n|}| j | | jf|S r   )r#   r   r$   rO   rP   r%   r%   r&   rR   t   s    zNode.handlec                 C  s   |  ||S r   rR   rT   r%   r%   r&   rI   x   s    zNode.handle_callc                 C  s   |  ||S r   rU   rT   r%   r%   r&   rJ   {   s    zNode.handle_castc                 C  s   | d}| d}| d}|r8| |j dp4d | j}d}|rT||krrd}n|rn|rnt|||rrd}nd}|r| jf |S d S )Ndestinationpatternmatcherr    r   FT)getr"   headersr   r   rS   )r$   bodyr;   rV   rW   rX   r   Zrun_dispatchr%   r%   r&   r?   ~   s     


zNode.handle_messagec                 K  s"   | j j||||| j| j jd d S )N)r   
serializer)r   _publish_replyr   r\   )r$   datarE   rF   rG   rD   r%   r%   r&   rN      s    z
Node.reply)NNNN)NTN)NN)NNN)N)N)r6   
__module____qualname____doc__r   r   r#   r   r   r'   r
   r7   r=   rC   rS   rR   rI   rJ   r?   Zdispatch_from_messagerN   r%   r%   r%   r&   r   &   s0       


    


r   c                
   @  s   e Zd ZdZeZdZdZdZdZ	dZ
dZdZdgZdZd0dd	Zd
d Zd1ddZd2ddZd3ddZd4ddZd5ddZdd Zedd Zdd Zed6ddZd7dd Zd8d!d"Zd9d$d%Zd:d&d'Zd(d) Zd*d+ Z e!d,d- Z"ed.d/ Z#dS );r   zProcess Mailbox.z	%s.pidboxzreply.%s.pidboxNdirectjson      $@c                 C  s   || _ || _|| _|d kr t n|| _| | j | j| _| | j | _t	t
| _|d kr^| jn|| _|d krr| jn|| _|| _|	| _|
| _|| _|| _d S r   )	namespace
connectiontyper   r    _get_exchangerE   _get_reply_exchangereply_exchanger   r   	unclaimedr2   r\   	queue_ttlqueue_expiresreply_queue_ttlreply_queue_expires_producer_pool)r$   re   rg   rf   r    r2   r\   producer_poolrl   rm   rn   ro   r%   r%   r&   r'      s    
zMailbox.__init__c                 C  s   t | }||_|S r   )r   rf   )r$   rf   boundr%   r%   r&   __call__   s    zMailbox.__call__c                 C  s    |p
t  }| j||||| dS )N)r   )socketgethostnamenode_cls)r$   r   r   r   r#   r%   r%   r&   r      s    zMailbox.Nodec              	   C  s$   |si n|}| j |||d|||dS )NT)rN   timeoutrA   r   
_broadcast)r$   rV   commandrD   rw   rA   r   r%   r%   r&   call   s    
 zMailbox.callc                 C  s   |si n|}| j |||ddS NF)rN   rx   )r$   rV   rz   rD   r%   r%   r&   cast   s    zMailbox.castc                 C  s   |si n|}| j ||ddS r|   rx   )r$   rz   rD   r%   r%   r&   abcast   s    zMailbox.abcastr	   c              	   C  s$   |si n|}| j ||d||||dS )NT)rN   rw   limitrA   r   rx   )r$   rz   rD   rw   r   rA   r   r%   r%   r&   
multi_call   s    
 zMailbox.multi_callc              	   C  s0   | j }t| d| jj | j|dd| j| jdS )N.FT)rE   rF   durableauto_deleteexpiresmessage_ttl)oidr   rj   r-   ro   rn   )r$   r   r%   r%   r&   get_reply_queue   s    zMailbox.get_reply_queuec                 C  s   |   S r   )r   r/   r%   r%   r&   reply_queue   s    zMailbox.reply_queuec                 C  s(   t | d| j d| jdd| j| jdS )Nr   z.pidboxFT)rE   r   r   r   r   )r   re   rE   rm   rl   )r$   r   r%   r%   r&   r3      s    zMailbox.get_queuec              	   c  sB   |r|V  n2| j r0| j  }|V  W 5 Q R X nt|ddV  d S )NF)Zauto_declare)rq   acquirer   )r$   producerr   r%   r%   r&   producer_or_acquire  s    zMailbox.producer_or_acquirec           	   	   K  s~   |p
| j j}t|dddd}| ||L}z0|j|f|||g|| j ddd| W n tk
rn   Y nX W 5 Q R X d S )Nrb   	transientF)Zexchange_typedelivery_moder   )rG   r    T)rE   rF   declarerZ   retry)rf   default_channelr   r   publishr    forwardr   )	r$   rN   rE   rF   rG   r   r   optschanr%   r%   r&   r]     s,      
zMailbox._publish_replyc              	   C  s   ||||	|
d}|p| j j}| j}|rNt| | |j|| jj| jdd |pV| j	}| 
||:}|j||j|g| j |rt | ndd|dd W 5 Q R X d S )N)rO   rP   rV   rW   rX   )rE   rF   )rG   rQ   r   )r    r   T)rE   r   rZ   r\   r   )rf   r   rE   r   r   updaterj   r-   r   r\   r   r   r    r   r   )r$   rg   rP   rV   reply_ticketr   rw   r\   r   rW   rX   r;   r   rE   r%   r%   r&   _publish   s6    
   zMailbox._publishFc                 C  s   |d k	r(t |ttfs(tdt||
d k	rdt |
tsd|d k	rdt |tsdtdt|
t||pji }|rvt pxd }|p| jj	}|d kr|r|rt
|pd }|	p| j}	| j|||||||	|
|d	 |r| j|||||dS d S )Nz'destination must be a list/tuple not {}z.pattern and matcher must be strings not {}, {})rV   r   r   rw   r\   rW   rX   )r   rw   rA   r   )
isinstancelisttuple
ValueErrorr,   rg   strr   rf   r   lenr\   r   _collect)r$   rz   rP   rV   rN   rw   r   rA   r   r\   rW   rX   r   r   r%   r%   r&   ry   8  sN     

zMailbox._broadcastc              
     s  |d kr| j }|p| jj}| j}t||g|dd}	g | j| jj zW S  t	k
rd   Y nX  fdd}
|	
|
 zl|	^ |rt|pt D ]4}z| jj|d W q tjk
r   Y  qY qX qW  5 Q R  W S Q R X W 5 ||j X d S )NT)r2   r1   c                   sn   |j j} |dpd |d}|r2t |kr2d S |d}|kr\rP|  |  n| |  d S )Nr    r   r   rG   )rZ   rY   r   append)r[   r;   headerr   Zthis_idr"   rA   	responsesrG   rk   r%   r&   
on_messagep  s    
z$Mailbox._collect.<locals>.on_message)rw   )r2   rf   r   r   r
   rk   r    r!   popKeyErrorZregister_callbackZafter_reply_message_receivedr-   ranger   Zdrain_eventsrt   rw   )r$   rG   r   rw   rA   r   r2   r   r5   rB   r   ir%   r   r&   r   _  s.    
zMailbox._collectc                 C  s   t | j| |dddS )NFr   rg   r   r   )r   exchange_fmt)r$   re   rg   r%   r%   r&   rh     s
    
zMailbox._get_exchangec                 C  s   t | j| ddddS )Nrb   Fr   r   )r   reply_exchange_fmt)r$   re   r%   r%   r&   ri     s
    
zMailbox._get_reply_exchangec                 C  s   t | S r   )r   r/   r%   r%   r&   r     s    zMailbox.oidc                 C  s
   t | jS r   )r   rp   r/   r%   r%   r&   rq     s    zMailbox.producer_pool)
rb   NNNNNNNNrd   )NNNN)NNNN)N)N)Nr	   NNN)NN)NN)NNNNNNNN)
NNFr	   NNNNNN)Nr	   NNN)$r6   r_   r`   ra   r   rv   r   r   re   rf   rg   rE   rj   r2   r\   r'   rs   r{   r}   r~   r   r   r   r   r3   r   r   r]   r   ry   r   rh   ri   propertyr   rq   r%   r%   r%   r&   r      s                   

      


      



   
             
              
(        
,
r   ).ra   
__future__r   rt   r)   collectionsr   r   
contextlibr   r   	itertoolsr   r    r
   r   r   r   Zclocksr   commonr   r   
exceptionsr   logr   rX   r   Zutils.functionalr   r   Zutils.objectsr   Z
utils.uuidr   ZREPLY_QUEUE_EXPIRESr+   __all__r6   loggerrH   r:   r   r   r%   r%   r%   r&   <module>   s0   r