U
    bh=                     @  s   d Z ddlmZ ddlZddlmZ ddlZddlmZmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ ddlmZ dZdZG dd dZG dd dej Z G dd dej!Z!dS )a  MongoDB transport module for kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
 *Unreviewed*

Transport Options
=================

* ``connect_timeout``,
* ``ssl``,
* ``ttl``,
* ``capped_queue_size``,
* ``default_hostname``,
* ``default_port``,
* ``default_database``,
* ``messages_collection``,
* ``routing_collection``,
* ``broadcast_collection``,
* ``queues_collection``,
* ``calc_queue_size``,
    )annotationsN)Empty)MongoClienterrors
uri_parser)
CursorType)VersionMismatch)_detect_environment)bytes_to_str)dumpsloads)cached_property)maybe_sanitize_url   )virtualto_rabbitmq_queue_argumentsz3Kombu requires MongoDB version 1.3+ (server is {0})zKKombu requires MongoDB version 2.2+ (server is {0}) for TTL indexes supportc                   @  sF   e Zd ZdZdd Zdd Zdd Zdd	d
Zdd Zdd Z	e	Z
dS )BroadcastCursorzCursor for broadcast queues.c                 C  s   || _ d| _| jdd d S )Nr   F)rewind)_cursor_offsetpurge)selfcursor r   ;/tmp/pip-unpacked-wheel-earovzxb/kombu/transport/mongodb.py__init__C   s    zBroadcastCursor.__init__c                 C  s   | j ji | j S N)r   
collectioncount_documentsr   r   r   r   r   get_sizeH   s    zBroadcastCursor.get_sizec                 C  s   | j   d S r   )r   closer    r   r   r   r"   K   s    zBroadcastCursor.closeTc                 C  s2   |r| j   | j ji | _| j | j| _ d S r   )r   r   r   r   r   skip)r   r   r   r   r   r   N   s    
zBroadcastCursor.purgec                 C  s   | S r   r   r    r   r   r   __iter__V   s    zBroadcastCursor.__iter__c              
   C  sj   zt | j}W qX tjjk
rR } z"dt|kr@|   W Y q  W 5 d }~X Y q X qXq |  jd7  _|S )Nznot valid at serverr   )nextr   pymongor   OperationFailurestrr   r   )r   msgexcr   r   r   __next__Y   s    zBroadcastCursor.__next__N)T)__name__
__module____qualname____doc__r   r!   r"   r   r$   r+   r%   r   r   r   r   r   @   s   
r   c                      s\  e Zd ZdZdZi ZdZdZdZdZ	dZ
dZdZdZd	Zd
ZdZdZejjd Z fddZdd Zdd Z fddZdd Zdd Zdd Zdd Zdd Z fd d!ZdGd#d$Zd%d& Z d'd( Z!dHd)d*Z"d+d, Z#d-d. Z$d/d0 Z%e&d1d2 Z'e&d3d4 Z(e&d5d6 Z)e&d7d8 Z*e&d9d: Z+d;d< Z,d=d> Z-d?d@ Z.dAdB Z/dCdD Z0dEdF Z1  Z2S )IChannelzMongoDB Channel.TFNi z	127.0.0.1ii  Zkombu_defaultmessageszmessages.routingzmessages.broadcastzmessages.queues)connect_timeoutsslttlcapped_queue_sizedefault_hostnamedefault_portdefault_databasemessages_collectionrouting_collectionbroadcast_collectionqueues_collectioncalc_queue_sizec                   s   t  j|| i | _| j d S r   )superr   _broadcast_cursorsclient)r   Zvargskwargs	__class__r   r   r      s    zChannel.__init__c              	   K  s4   | j r0| jjd|id||| |ddidd d S )N_id$set	x-expires)rD   options	expire_atTZupsert)r4   queues
update_one_get_queue_expire)r   queuerA   r   r   r   
_new_queue   s     	zChannel._new_queuec                 C  s   || j kr8zt| |}W qT tk
r4   d }Y qTX n| jjd|idtjfgd}| jrd| 	| |d krrt
 tt|d S )NrM   priority)sortpayload)_fanout_queuesr%   _get_broadcast_cursorStopIterationr1   Zfind_one_and_deleter&   Z	ASCENDINGr4   _update_queues_expirer   r   r
   )r   rM   r)   r   r   r   _get   s    


zChannel._getc                   s:   | j st |S || jkr*| | S | jd|iS NrM   )r=   r>   _sizerR   rS   r!   r1   r   r   rM   rB   r   r   rX      s
    
zChannel._sizec                 K  sr   t ||| j|ddd}| jrb| |d|d< | |}|d k	rb|d d ksZ||d k rb||d< | j| d S )NT)reverse)rQ   rM   rO   zx-message-ttlrH   )r   Z_get_message_priorityr4   rL   _get_message_expirer1   
insert_one)r   rM   messagerA   dataZ
msg_expirer   r   r   _put   s    


zChannel._putc                 K  s   | j t||d d S )N)rQ   rM   )	broadcastr\   r   )r   exchanger]   routing_keyrA   r   r   r   _put_fanout   s    zChannel._put_fanoutc                 C  s8   |  |}|| jkr$| |  n| jd|i |S rW   )rX   rR   rS   r   r1   delete_many)r   rM   sizer   r   r   _purge   s
    

zChannel._purgec                 C  s:   t | jj| d }| jd|i}|t dd |D B S )Ntablera   c                 s  s$   | ]}|d  |d |d fV  qdS )rb   patternrM   Nr   ).0rr   r   r   	<genexpr>   s   z$Channel.get_table.<locals>.<genexpr>)	frozensetstateZ	exchangesroutingfind)r   ra   ZlocalRoutesZbrokerRoutesr   r   r   	get_table   s    
zChannel.get_tablec                 C  sp   |  |jdkr*| |||| || j|< ||||d}| }| jrV| |d|d< | jj|d|idd d S )Nfanout)ra   rM   rb   rh   rF   rH   rE   TrI   )	Ztypeoftype_create_broadcast_cursorrR   copyr4   rL   rn   rK   )r   ra   rb   rh   rM   lookupr^   r   r   r   _queue_bind   s"       
zChannel._queue_bindc                   s~   | j d|i | jr&| jd|i t j|f| || jkrzz| j	|}W n t
k
rd   Y nX |  | j	| d S )NrM   rD   )rn   rd   r4   rJ   Z
delete_oner>   queue_deleterR   r?   popKeyErrorr"   )r   rM   rA   r   rB   r   r   rw     s    
zChannel.queue_delete
mongodb://c                 C  s0  | j j}|j}|dr$d}d| }||s6|| }|t|d  sP|| j7 }|jrd|kr|d\}}|j}|jr|d|j 7 }|d | d | }|j	r|j	n| j
}tj||dd}|d	 p|j}	|	d
kr| j}	d| j| jrt| jd nd d}
|
|d  | |
}
d|
kr&|
d ||	|
fS )Nzsrv://zmongodb+srv://zmongodb+@z://:F)validatedatabase)/NTi  )auto_start_requestr3   ZconnectTimeoutMSrG   Ztlsr3   )
connectionr@   hostname
startswithlenr6   Zuseridsplitpasswordportr7   r   	parse_uriZvirtual_hostr8   r3   r2   intupdate_prepare_client_optionsrx   )r   schemer@   r   headtailcredentialsr   parseddbnamerG   r   r   r   
_parse_uri  s>    





zChannel._parse_uric                 C  sB   t jdkr>|dd  t|dtr>t jj}||d  |d< |S )N   r   Zreadpreference)r&   version_tuplerx   
isinstancegetr   Zread_preferencesZ_MONGOS_MODES)r   rG   modesr   r   r   r   K  s    
zChannel._prepare_client_optionsc                 K  s   t |f|S r   r   )r   	argumentsrA   r   r   r   prepare_queue_argumentsS  s    zChannel.prepare_queue_argumentsc                 C  s   | j |d\}}}||d< t }|dkr>ddlm} |  n|dkrXddlm} |  tf |}|| }	| d }
|
	d	d }
t
tt|
	d
}|dk rtt|
n| jr|dk rtt|
|	S )N)r   hostgeventr   )monkeyeventlet)monkey_patchversion-.)r   r   )   r   )r   r	   r   r   Z	patch_allr   r   r   Zserver_infor   tuplemapr   r   E_SERVER_VERSIONformatr4   E_NO_TTL_INDEXES)r   r   r   r   confenvr   r   Z	mongoconnr~   version_strr   r   r   r   _openV  s&    

zChannel._openc                 C  s*   | j | krdS |j| j | jdd dS )z0Create capped collection for broadcast messages.NT)re   Zcapped)r;   Zlist_collection_namesZcreate_collectionr5   r   r~   r   r   r   _create_broadcastq  s    zChannel._create_broadcastc                 C  s   || j  }|jdddgdd || j dg || j }|ddg | jr|jdgdd	 |jdgdd	 || j jdgdd	 d
S )zEnsure indexes on collections.)rM   r   )rO   r   )rD   r   T)Z
background)ra   r   )rH   r   r   )ZexpireAfterSecondsN)r9   Zcreate_indexr;   r:   r4   r<   )r   r~   r1   rn   r   r   r   _ensure_indexesz  s    
 

 zChannel._ensure_indexesc                 C  s    |   }| | | | |S )zActually creates connection.)r   r   r   r   r   r   r   _create_client  s    

zChannel._create_clientc                 C  s   |   S r   )r   r    r   r   r   r@     s    zChannel.clientc                 C  s   | j | j S r   )r@   r9   r    r   r   r   r1     s    zChannel.messagesc                 C  s   | j | j S r   )r@   r:   r    r   r   r   rn     s    zChannel.routingc                 C  s   | j | j S r   )r@   r;   r    r   r   r   r`     s    zChannel.broadcastc                 C  s   | j | j S r   )r@   r<   r    r   r   r   rJ     s    zChannel.queuesc              	   C  s<   z| j | W S  tk
r6   | | j| d d | Y S X d S r   )r?   ry   rs   rR   rY   r   r   r   rS     s       zChannel._get_broadcast_cursorc                 C  sN   t jdkrd|itjd}nd|idd}| jjf |}t| }| j|< |S )Nr   rM   )filterZcursor_typeT)queryZtailable)r&   r   r   ZTAILABLEr`   ro   r   r?   )r   ra   rb   rh   rM   r   r   retr   r   r   rs     s    
z Channel._create_broadcast_cursorc                 C  s6   | di  d}|d k	r2|  tjt|d S d S )NZ
propertiesZ
expirationZmilliseconds)r   get_nowdatetime	timedeltar   )r   r]   valuer   r   r   r[     s    zChannel._get_message_expirec              	   C  sp   t |tr,| jd|i}|s"dS |d }n|}z|d | }W n ttfk
rZ   Y dS X |  tj|d S )zGet expiration header named `argument` of queue definition.

        Note:
        ----
            `queue` must be either queue name or options itself.
        rD   NrG   r   r   )	r   r(   rJ   Zfind_onery   	TypeErrorr   r   r   )r   rM   argumentdocr^   r   r   r   r   rL     s    

zChannel._get_queue_expirec                 C  sL   |  |d}|sdS | jd|idd|ii | jd|idd|ii dS )z,Update expiration field on queues documents.rF   NrM   rE   rH   rD   )rL   rn   Zupdate_manyrJ   )r   rM   rH   r   r   r   rU     s     
 
zChannel._update_queues_expirec                 C  s
   t j  S )zReturn current time in UTC.)r   utcnowr    r   r   r   r     s    zChannel.get_now)rz   )rz   )3r,   r-   r.   r/   Zsupports_fanoutrR   r3   r4   r2   r5   r=   r6   r7   r8   r9   r:   r;   r<   r   r0   Zfrom_transport_optionsr   rN   rV   rX   r_   rc   rf   rp   rv   rw   r   r   r   r   r   r   r   r   r@   r1   rn   r`   rJ   rS   rs   r[   rL   rU   r   __classcell__r   r   rB   r   r0   o   sb   


2
	




r0   c                   @  s   e Zd ZdZeZdZdZejZej	j
ejf Z
ej	jejejf ZdZdZej	jjedddgd	Zd
d ZddddddZdS )	TransportzMongoDB Transport.Tr   Zmongodbr&   directZtopicrq   )Zexchange_typec                 C  s   t jS r   )r&   r   r    r   r   r   driver_version	  s    zTransport.driver_versionF**r(   )urireturnc                 C  sB   |sdS |r|S d|kr t |S |dd\}}dt ||gS )Nrz   ,r   )r   r   join)r   r   Zinclude_passwordmaskZuri1	remainderr   r   r   as_uri  s    zTransport.as_uriN)Fr   )r,   r-   r.   r/   r0   Zcan_parse_urlZpolling_intervalr7   r   r   Zconnection_errorsr   ZConnectionFailureZchannel_errorsr'   Zdriver_typeZdriver_nameZ
implementsextendrl   r   r   r   r   r   r   r     s&   r   )"r/   
__future__r   r   rM   r   r&   r   r   r   Zpymongo.cursorr   Zkombu.exceptionsr   Zkombu.utils.compatr	   Zkombu.utils.encodingr
   Zkombu.utils.jsonr   r   Zkombu.utils.objectsr   Zkombu.utils.urlr    r   baser   r   r   r   r0   r   r   r   r   r   <module>   s,    /   