U
    bhk                     @  s
  d Z ddlmZ ddlZddlZddlZddlZddlmZm	Z	m
Z
mZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlmZmZm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl'm*Z+ ddl,m*Z- ddl.m*Z/ ddl0m1Z2 ddl3m4Z4 ddl5m6Z6 ddl7m8Z8m9Z9 ddl:m;Z;m<Z< ddl=m>Z> ddl?m@Z@ e6dZAeBejCdddh ZDeEdeEdidd  eDD ZFG d!d" d"ZGG d#d$ d$ZHejIG d%d& d&ZJG d'd( d(e@jKZKG d)d* d*e@jLZLdS )+ap  GCP Pub/Sub transport module for kombu.

More information about GCP Pub/Sub:
https://cloud.google.com/pubsub

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: No
* Supports Fanout: Yes
* Supports Priority: No
* Supports TTL: No

Connection String
=================

Connection string has the following formats:

.. code-block::

    gcpubsub://projects/project-name

Transport Options
=================
* ``queue_name_prefix``: (str) Prefix for queue names.
* ``ack_deadline_seconds``: (int) The maximum time after receiving a message
  and acknowledging it before pub/sub redelivers the message.
* ``expiration_seconds``: (int) Subscriptions without any subscriber
  activity or changes made to their properties are removed after this period.
  Examples of subscriber activities include open connections,
  active pulls, or successful pushes.
* ``wait_time_seconds``: (int) The maximum time to wait for new messages.
  Defaults to 10.
* ``retry_timeout_seconds``: (int) The maximum time to wait before retrying.
* ``bulk_max_messages``: (int) The maximum number of messages to pull in bulk.
  Defaults to 32.
    )annotationsN)FIRST_COMPLETEDFutureThreadPoolExecutorwait)suppress)getpid)Empty)Lock)	monotonicsleep)NAMESPACE_OIDuuid3)gethostnametimeout)AlreadyExistsDeadlineExceededPermissionDenied)Retry)monitoring_v3)query)PublisherClientSubscriberClient)
exceptions)gapic_version)TRANSIENT_DELIVERY_MODE)
get_logger)bytes_to_strsafe_str)dumpsloads)cached_property   )virtualzkombu.transport.gcpubsub_.-c                 C  s   i | ]}t |t d qS )r%   )ord).0c r+   </tmp/pip-unpacked-wheel-earovzxb/kombu/transport/gcpubsub.py
<dictcomp>S   s      r-   c                   @  sP   e Zd ZdZdd Zdd Zdddd	ZdddZdd Zdd Z	dd Z
dS )
UnackedIdszThreadsafe list of ack_ids.c                 C  s   g | _ t | _d S N)_listr
   _lockselfr+   r+   r,   __init__Z   s    zUnackedIds.__init__c                 C  s   | j | d S r/   )r0   appendr3   valr+   r+   r,   r5   ^   s    zUnackedIds.appendlist)valsc                 C  s   | j | d S r/   )r0   extend)r3   r9   r+   r+   r,   r:   b   s    zUnackedIds.extendc              
   C  s*   | j  | j|W  5 Q R  S Q R X d S r/   )r1   r0   pop)r3   indexr+   r+   r,   r<   f   s    zUnackedIds.popc              
   C  s6   | j & tt | j| W 5 Q R X W 5 Q R X d S r/   )r1   r   
ValueErrorr0   remover6   r+   r+   r,   r?   j   s    zUnackedIds.removec              
   C  s(   | j  t| jW  5 Q R  S Q R X d S r/   )r1   lenr0   r2   r+   r+   r,   __len__n   s    zUnackedIds.__len__c                 C  s
   | j | S r/   )r0   )r3   itemr+   r+   r,   __getitem__r   s    zUnackedIds.__getitem__N)r;   )__name__
__module____qualname____doc__r4   r5   r:   r<   r?   rA   rC   r+   r+   r+   r,   r.   W   s   
r.   c                   @  s6   e Zd ZdZdddZdddZddd	Zd
d ZdS )AtomicCounterzIThreadsafe counter.

    Returns the value after inc/dec operations.
    r   c                 C  s   || _ t | _d S r/   )_valuer
   r1   )r3   initialr+   r+   r,   r4   }   s    zAtomicCounter.__init__r#   c              
   C  s2   | j " |  j|7  _| jW  5 Q R  S Q R X d S r/   r1   rI   r3   nr+   r+   r,   inc   s    zAtomicCounter.incc              
   C  s2   | j " |  j|8  _| jW  5 Q R  S Q R X d S r/   rK   rL   r+   r+   r,   dec   s    zAtomicCounter.decc              
   C  s$   | j  | jW  5 Q R  S Q R X d S r/   rK   r2   r+   r+   r,   get   s    zAtomicCounter.getN)r   )r#   )r#   )rD   rE   rF   rG   r4   rN   rO   rP   r+   r+   r+   r,   rH   w   s
   


rH   c                   @  sF   e Zd ZU dZded< ded< ded< ded< ejedZded	< d
S )QueueDescriptorzPub/Sub queue descriptor.strname
topic_pathsubscription_idsubscription_path)default_factoryr.   unacked_idsN)	rD   rE   rF   rG   __annotations__dataclassesfieldr.   rX   r+   r+   r+   r,   rQ      s   
rQ   c                      s*  e Zd ZU dZdZdZdZdZdZdZ	dZ
dZe Zd	Zd
ed< e Ze Zi Zded< e Zded<  fddZefdddddZdd ZdbdddddddZdddddZdcddddddd d!d"Zd#d$ Zd%d& Zd'd( Z dddd)d*d+d,Z!d-d.d/d0Z"dd)d*d1d2Z#dd3d4d5Z$de fd6d7	Z%ddd8d9d:Z&df fd;d<	Z'd=dd>d?d@Z(ddAdBdCZ)dDdE Z*ddAdFdGZ+e,dHdI Z-e,dJdK Z.e,dLdM Z/e0dNdO Z1e0dPdQ Z2e,dRdS Z3e,dTdU Z4e,dVdW Z5e,dXdY Z6e,dZd[ Z7e,d\d] Z8 fd^d_Z9e:d`da Z;  Z<S )gChannelzGCP Pub/Sub channel.TF
      iQ i,      Nzthreading.Thread_unacked_extenderzdict[str, QueueDescriptor]_queue_cachezset[str]_tmp_subscriptionsc                   sn   t  j|| t | _td| jj t	| jj| _
| j dkrjtj| jddt_| j  tj  d S )Nznew GCP pub/sub channel: %sr#   T)targetdaemon)superr4   r   poolloggerinfoconninfohostname	Transport	parse_uri
project_id_n_channelsrN   	threadingThread_extend_unacked_deadliner\   r`   _stop_extenderclearstart)r3   argskwargs	__class__r+   r,   r4      s    
zChannel.__init__rR   )rS   returnc                 C  s(   | | js| j| }tt||S )z7Format AMQP queue name into a valid Pub/Sub queue name.)
startswithqueue_name_prefixrR   r   	translate)r3   rS   tabler+   r+   r,   entity_name   s    
zChannel.entity_namec                 C  s
  |  |j}| |}td|||| i }|dkr\dd| di}| j| j|}| j}nn|dkrt	t
t  dt   }	| d|	 }
| j| j|
}| j| | j| d	}ntd
| d| | j||}| j||||d t||||d}|| j|< d S )Nz9binding queue: %s to %s exchange: %s with routing_key: %sdirectfilterzattributes.routing_key=""fanoutr&   r'   iX  zexchange type z not implemented)rT   rV   filter_argsmsg_retention)rS   rT   rU   rV   )typeoftyper~   rg   debug
subscriberrV   rm   expiration_secondsr   r   r   r   rb   add_fanout_exchangesNotImplementedError_create_topic_create_subscriptionrQ   ra   )r3   exchangerouting_keypatternqueueexchange_typer   rV   message_retention_durationuidZuniq_sub_nameZexchange_topicqdescr+   r+   r,   _queue_bind   sd    
 
  
  zChannel._queue_bindint)rm   topic_idr   ry   c                 C  sz   | j ||}| |r(td| |S z8td| d|i}|rP| d|d< | j j|d W n tk
rt   Y nX |S )Nztopic: %s existszcreating topic: %srS   sr   request)	publisherrT   _is_topic_existsrg   r   Zcreate_topicr   )r3   rm   r   r   rT   r   r+   r+   r,   r     s     
zChannel._create_topicbool)rT   ry   c                 C  s8   | j jdd| j id}|D ]}|j|kr dS qdS )Nprojectz	projects/r   TF)r   Zlist_topicsrm   rS   )r3   rT   Ztopicstr+   r+   r,   r     s    
zChannel._is_topic_exists)rm   r   rT   rV   r   ry   c                 C  s   |p| j | j|}|p$| j||}zTtd||| |p@| j}| j j||| j	d| j di| dd|ppi d W n t
k
r   Y nX |S )Nz0creating subscription: %s, topic: %s, filter: %sZttlr   )rS   Ztopicack_deadline_secondsZexpiration_policyr   r   )r   rV   rm   r   rT   rg   r   r   Zcreate_subscriptionr   r   )r3   rm   r   rT   rV   r   r   r+   r+   r,   r   #  s<    

 
 

zChannel._create_subscriptionc                 O  sP   |  |}td| | j|}|s*dS | jjd|jid | j|d dS )zDelete a queue by name.zdeleting queue: %sNsubscriptionr   )	r~   rg   rh   ra   rP   r   delete_subscriptionrV   r<   )r3   r   ru   rv   r   r+   r+   r,   _deleteK  s    
zChannel._deletec                 K  sV   |  |}| j| }| |}td||j| t|}| jj|j|	d|d dS )zPut a message onto the queue.z8putting message to queue: %s, topic: %s, routing_key: %sutf-8)r   N)
r~   ra   _get_routing_keyrg   r   rT   r    r   publishencode)r3   r   messagerv   r   r   encoded_messager+   r+   r,   _putW  s    


zChannel._putc                 K  sV   |  || | j| j|}td|| t|}| jj||dt	| j
dd dS )z#Put a message onto fanout exchange.z-putting msg to fanout exchange: %s, topic: %sr   deadline)retryN)_lookupr   rT   rm   rg   r   r    r   r   r   retry_timeout_seconds)r3   r   r   r   rv   rT   r   r+   r+   r,   _put_fanouti  s    
zChannel._put_fanoutfloat)r   r   c           	      C  s   |  |}| j| }z,| jj|jddt| jd|p8| jd}W n tk
rZ   t	 Y nX t
|jdkrpt	 |jd }|j}t|jj}|d d }td|||d  | |d rtd	| | |g|j n$|||jj|jd
|d< |j| |S )z(Retrieves a single message from a queue.r#   r   max_messagesr   r   r   r   r   
propertiesdelivery_infoz-queue:%s got message, ack_id: %s, payload: %szauto acking message ack_id: %sr   ack_id
message_idrV   gcpubsub_message)r~   ra   r   pullrV   r   r   wait_time_secondsr   r	   r@   received_messagesr   r!   r   datarg   r   _is_auto_ack_do_ackr   rX   r5   )	r3   r   r   r   responser   r   payloadr   r+   r+   r,   _gety  sD    





zChannel._getdict)payload_propertiesc                 C  s&   |d d }|d }|t kp$|| jkS )Nr   r   delivery_mode)r   r   )r3   r   r   r   r+   r+   r,   r     s
    zChannel._is_auto_ackc                 C  s<  |  |}| j| }|  }|s&t z,| jj|j|dt| jd|pJ| j	d}W n t
k
rl   t Y nX |j}t|dkrt g }g }	tdt|| |D ]n}
|
j}tt|
jj}|d d }|||
jj|jd|d	< | |d r|| n|j| |	| q|r4td
| | ||j ||	fS )z(Retrieves bulk of messages from a queue.r   r   r   r   z#batching %d messages from queue: %sr   r   r   r   zauto acking ack_ids: %s)r~   ra   _get_max_messages_estimater	   r   r   rV   r   r   r   r   r   r@   rg   r   r   r!   r   r   r   r   r   r5   rX   r   )r3   r   r   Zprefixed_queuer   r   r   r   Zauto_ack_idsZret_payloadsr   r   r   r   r+   r+   r,   	_get_bulk  sT    




zChannel._get_bulk)ry   c                 C  s    | j  }| j}|d kr|S |S r/   )qosZcan_consume_max_estimatebulk_max_messages)r3   Zmax_allowedZmax_if_unlimitedr+   r+   r,   r     s    
z"Channel._get_max_messages_estimatec                   sh   | j j|i }|s$t |||S | || ||||}|rH|S t	d| | 
||| |gS )Nz3no queues bound to exchange: %s, binding on the fly)stateZ	exchangesrP   re   r   r   lookupZ	get_tablerg   r   Z
queue_bind)r3   r   r   defaultZexchange_inforetrw   r+   r,   r     s"    
zChannel._lookup)r   ry   c              
   C  s|   |  |}|| jkrdS | j| }tj| j| jdtj ddj|j	d}t
t  tdd |D W  5 Q R  S Q R X dS )	zReturn the number of messages in a queue.

        This is a *rough* estimation, as Pub/Sub doesn't provide
        an exact API.
        r   z;pubsub.googleapis.com/subscription/num_undelivered_messagesr#   )Zend_timeminutes)rU   c                 s  s   | ]}|j d  jjV  qdS )r   N)ZpointsvalueZint64_value)r)   contentr+   r+   r,   	<genexpr>  s    z Channel._size.<locals>.<genexpr>r;   )r~   ra   r   ZQuerymonitorrm   datetimenowZselect_resourcesrU   r   r   sum)r3   r   r   resultr+   r+   r,   _size  s$    



zChannel._sizec           	        s|   |rt d| j|j}|d }|d }|d }td|| |d }| |g| | j| }|j	| t
 | dS )zAcknowledge one message.zmultiple acks not implementedr   r   r   z!ack message. queue: %s ack_id: %srV   N)r   r   rP   r   rg   r   r   ra   rX   r?   re   	basic_ack)	r3   Zdelivery_tagmultipler   Zpubsub_messager   r   rV   r   rw   r+   r,   r     s    
zChannel.basic_ackz	list[str])ack_idsrV   c                 C  s"   | j j||dt| jdd d S )N)r   r   r   )r   r   )r   Zacknowledger   r   )r3   r   rV   r+   r+   r,   r   #  s    
zChannel._do_ack)r   c                 C  sH   |  |}| j|}|sdS | |}| jj|jtj dd |S )z'Delete all current messages in a queue.N)r   timer   )	r~   ra   rP   r   r   seekrV   r   r   )r3   r   r   rM   r+   r+   r,   _purge)  s    

zChannel._purgec              	   C  s   t  }td| | jd }t|| jd }| j|s| j	
 D ]d}t|jdkrhtd||j qDtd||jt|jt|j | jj|jt|j| jdd qDq.td	| d S )
Nz/unacked deadline extension thread: [%s] started      r   z'thread [%s]: no unacked messages for %sz5thread [%s]: extend ack deadline for %s: %d msgs [%s])r   r   r   r   z.unacked deadline extension thread [%s] stopped)ro   get_native_idrg   rh   _min_ack_deadlinemaxr   rr   r   ra   valuesr@   rX   r   rV   r8   r   Zmodify_ack_deadline)r3   Z	thread_idZmin_deadline_sleepZ
sleep_timer   r+   r+   r,   rq   9  sB    

 z Channel._extend_unacked_deadlinec                 C  s8   |  |}| j| j|}td|| | j| d S )Nz0after_reply_message_received: queue: %s, sub: %s)r~   r   rV   rm   rg   r   rb   r   )r3   r   subr+   r+   r,   after_reply_message_received\  s    
  z$Channel.after_reply_message_receivedc                 C  s   t  S r/   )r   r2   r+   r+   r,   r   d  s    zChannel.subscriberc                 C  s   t  S r/   )r   r2   r+   r+   r,   r   h  s    zChannel.publisherc                 C  s   t  S r/   )r   ZMetricServiceClientr2   r+   r+   r,   r   l  s    zChannel.monitorc                 C  s   | j jS r/   )
connectionclientr2   r+   r+   r,   ri   p  s    zChannel.conninfoc                 C  s
   | j jjS r/   )r   r   transport_optionsr2   r+   r+   r,   r   t  s    zChannel.transport_optionsc                 C  s   | j d| jS )Nr   )r   rP   default_wait_time_secondsr2   r+   r+   r,   r   x  s     zChannel.wait_time_secondsc                 C  s   | j d| jS )Nr   )r   rP   default_retry_timeout_secondsr2   r+   r+   r,   r   ~  s     zChannel.retry_timeout_secondsc                 C  s   | j d| jS )Nr   )r   rP   default_ack_deadline_secondsr2   r+   r+   r,   r     s     zChannel.ack_deadline_secondsc                 C  s   | j ddS )Nr{   zkombu-)r   rP   r2   r+   r+   r,   r{     s    zChannel.queue_name_prefixc                 C  s   | j d| jS )Nr   )r   rP   default_expiration_secondsr2   r+   r+   r,   r     s     zChannel.expiration_secondsc                 C  s   | j d| jS )Nr   )r   rP   default_bulk_max_messagesr2   r+   r+   r,   r     s     zChannel.bulk_max_messagesc              	     sz   t d | jrN| j }tt$ t d| | jjd|id W 5 Q R X q
| j	 sl| j
  tj  t   dS )zClose the channel.zclosing channelzdeleting subscription: %sr   r   N)rg   r   rb   r<   r   	Exceptionr   r   rn   rO   rr   setr\   r`   joinre   close)r3   r   rw   r+   r,   r     s    





zChannel.closec                 C  s   | d  di  dd}|S )Nr   r   r    )rP   )r   r   r+   r+   r,   r     s      zChannel._get_routing_key)N)NNNNNN)N)N)F)=rD   rE   rF   rG   Zsupports_fanoutZ
do_restorer   r   r   r   r   r   r   r   r`   rY   ro   Eventrr   rH   rn   ra   rb   r4   CHARS_REPLACE_TABLEr~   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rq   r   r"   r   r   r   propertyri   r   r   r   r   r{   r   r   r   staticmethodr   __classcell__r+   r+   rw   r,   r\      s   
C       (+3#










r\   c                      s   e Zd ZdZeZdZdZejj	e
jf Z	ejjejejejejejf ejf ZdZdZejjjeddgdZ fd	d
Zdd ZedddddZeddddddZdddZdd Z dd Z!dd Z"  Z#S ) rk   zGCP Pub/Sub transport.Tg?ZgcpubsubZ	pubsub_v1r   r   )r   c                   s$   t  j|f| t | _t | _d S r/   )re   r4   r   _poolr   _get_bulk_future_to_queue)r3   r   rv   rw   r+   r,   r4     s    zTransport.__init__c                 C  s   t jS r/   )package_version__version__r2   r+   r+   r,   driver_version  s    zTransport.driver_versionrR   )uriry   c                 C  s   |  dd }|dS )Nzgcpubsub://projects/r#   /)splitstrip)r   r   r+   r+   r,   rl     s    zTransport.parse_uriF**c                 C  s   |pdS )Nzgcpubsub://r+   )r3   r   Zinclude_passwordmaskr+   r+   r,   as_uri  s    zTransport.as_uriNc                 C  sr   t  }| j}|r |r ||kr |}z| j|d W qn tk
rh   |rXt  | |krXt |rdt| Y q X qnq d S )Nr   )r   polling_interval_drain_from_active_queuesr	   socket_timeoutr   )r3   r   r   Z
time_startr  r+   r+   r,   drain_events  s    zTransport.drain_eventsc           	      C  s   |    | jdd t| j|td\}}dd |D }||8 }|D ]}| j|d  qB|s`t tdt	| |D ]X}|
 \}}|D ]4}td| || jkrtd| q| || q| j|d  qtd S )	Nr]   r   )r   Zreturn_whenc                 S  s   h | ]}|  r|qS r+   )	exceptionr)   fr+   r+   r,   	<setcomp>  s      z6Transport._drain_from_active_queues.<locals>.<setcomp>zgot %d done get_bulk tasksz consuming message from queue: %sz&Message for queue %s without consumers)_rm_empty_bulk_requests_submit_get_bulk_requestsr   r   r   r<   r	   rg   r   r@   r   Z
_callbackswarningZ_deliver)	r3   r   doner%   emptyr  r   Zpayloadsr   r+   r+   r,   r    s4    

 z#Transport._drain_from_active_queuesc                 C  s,   dd | j D }|D ]}| j |d  qd S )Nc                 S  s    h | ]}|  r| r|qS r+   )r  r  r  r+   r+   r,   r    s    z4Transport._rm_empty_bulk_requests.<locals>.<setcomp>)r   r<   )r3   r  r  r+   r+   r,   r    s
    z!Transport._rm_empty_bulk_requestsc                 C  sP   t | j }| jD ]6}|jD ]*}||kr,q| j|j||}|| j|< qqd S r/   )r   r   r   ZchannelsZ_active_queuesr   Zsubmitr   )r3   r   Zqueues_with_submitted_get_bulkZchannelr   futurer+   r+   r,   r     s    

z#Transport._submit_get_bulk_requests)Fr  )N)$rD   rE   rF   rG   r\   Zcan_parse_urlr  r$   rk   Zconnection_errorspubsub_exceptionsTimeoutErrorZchannel_errorspublisher_exceptionsZFlowControlLimitErrorZMessageTooLargeErrorZPublishErrorZ#PublishToPausedOrderingKeyExceptionsubscriber_exceptionsZAcknowledgeErrorZdriver_typeZdriver_nameZ
implementsr:   	frozensetr4   r   r   rl   classmethodr  r
  r  r  r  r   r+   r+   rw   r,   rk     sB   

#	rk   )MrG   
__future__r   rZ   r   stringro   concurrent.futuresr   r   r   r   
contextlibr   osr   r   r	   r
   r   r   r   uuidr   r   _socketr   r   r	  Zgoogle.api_core.exceptionsr   r   r   Zgoogle.api_core.retryr   Zgoogle.cloudr   Zgoogle.cloud.monitoring_v3r   Zgoogle.cloud.pubsub_v1r   r   r   r  Z google.cloud.pubsub_v1.publisherr  Z!google.cloud.pubsub_v1.subscriberr  Zgoogle.pubsub_v1r   r   Zkombu.entityr   Z	kombu.logr   Zkombu.utils.encodingr   r   Zkombu.utils.jsonr    r!   Zkombu.utils.objectsr"   r   r$   rg   r   punctuationZPUNCTUATIONS_TO_REPLACEr(   r   r.   rH   Z	dataclassrQ   r\   rk   r+   r+   r+   r,   <module>   s\   '  
    