U
    bh,                     @   s  d Z ddlmZmZmZ ddlmZ ddlmZ ddlm	Z	m
Z
 ddlmZ ddlmZ dd	lmZ zdd
lZW n ek
r   d
ZY nX erzddlmZ W n  ek
r   ddlmZ Y nX ddlmZ nd
ZG dd deZdZeddgZG dd deZd
S )zMongoDB result store backend.    )datetime	timedeltatimezone)EncodeError)cached_property)maybe_sanitize_urlurlparse)states)ImproperlyConfigured   )BaseBackendN)Binary)InvalidDocumentc                   @   s   e Zd ZdS )r   N)__name__
__module____qualname__ r   r   ;/tmp/pip-unpacked-wheel-kcem4wq5/celery/backends/mongodb.pyr      s   r   )MongoBackendpicklemsgpackc                       s  e Zd ZdZdZdZdZdZdZdZ	dZ
dZdZdZd	ZdZd3 fd
d	Zedd Zdd Zdd Z fddZ fddZd4ddZdd Zdd Zdd Zdd Zd d! Zd"d# Zd5 fd%d&	Zd'd( Ze d)d* Z!e d+d, Z"e d-d. Z#e d/d0 Z$d6d1d2Z%  Z&S )7r   zMongoDB result backend.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`pymongo` is not available.
    N	localhostii  celeryZcelery_taskmetaZcelery_groupmeta
   Fc                    s  i | _ t j|f| ts"td|   D ]\}}| j || q.| jr| 	| j| _tj
| j}dd |d D }|d | _|d | _|| _|d r|d | _| j |d  | jjd	}|d k	rt|tstd
t|}d|ksd|krd | _|d| j| _|d| j| _|d| j| _|d| j| _|d| j| _|d| j| _|d| j| _|d| j| _| j |di  | j | d S )NzCYou need to install the pymongo library to use the MongoDB backend.c                 S   s"   g | ]}|d   d|d  qS )r   :r   r   ).0xr   r   r   
<listcomp>N   s    z)MongoBackend.__init__.<locals>.<listcomp>ZnodelistusernamepassworddatabaseoptionsZmongodb_backend_settingsz4MongoDB backend settings should be grouped in a dicthostport
mongo_hostusertaskmeta_collectiongroupmeta_collection)r!   super__init__pymongor
   _prepare_client_optionsitems
setdefaulturl_ensure_mongodb_uri_complianceZ
uri_parser	parse_urir%   r   r$   database_nameupdateappconfget
isinstancedictpopr"   r#   r&   r'   )selfr3   kwargskeyvalueZuri_dataZ	hostslistconfig	__class__r   r   r)   :   sZ    




  zMongoBackend.__init__c                 C   s2   t | }|jdsd|  } | dkr.| d7 } | S )NZmongodbzmongodb+
mongodb://r   )r   scheme
startswith)r.   
parsed_urlr   r   r   r/   v   s    
z+MongoBackend._ensure_mongodb_uri_compliancec                 C   s$   t jdkrd| jiS | jddS d S )N)   ZmaxPoolSizeF)max_pool_sizeZauto_start_request)r*   Zversion_tuplerE   r9   r   r   r   r+      s
    

z$MongoBackend._prepare_client_optionsc                 C   s   | j dkrddlm} | j}|sL| j}t|trL|dsLd| d| j }t	| j
}||d< | jrn| j|d< | jr~| j|d< |f || _ | j S )	zConnect to the MongoDB server.Nr   )MongoClientr@   r   r"   r   r   )_connectionr*   rG   r$   r"   r6   strrB   r#   r7   r!   r%   r   )r9   rG   r"   r4   r   r   r   _get_connection   s"    




zMongoBackend._get_connectionc                    s0   | j dkr|S t |}| j tkr,t|}|S NZbson)
serializerr(   encodeBINARY_CODECSr   )r9   datapayloadr>   r   r   rM      s    

zMongoBackend.encodec                    s   | j dkr|S t |S rK   )rL   r(   decode)r9   rO   r>   r   r   rQ      s    
zMongoBackend.decodec           	   
   K   sl   | j | ||||dd}||d< z| jjd|i|dd W n* tk
rf } zt|W 5 d}~X Y nX |S )z1Store return value and state of an executed task.F)resultstate	tracebackrequestZformat_date_idTZupsertN)Z_get_result_metarM   
collectionreplace_oner   r   )	r9   task_idrR   rS   rT   rU   r:   metaexcr   r   r   _store_result   s     zMongoBackend._store_resultc                 C   s   | j d|i}|r| jjddr~| |d |d |d |d |d |d |d	 |d
 |d |d |d | |d dS | |d |d | |d |d |d |d dS tjddS )z$Get task meta-data for a task by id.rV   extendedrR   nameargsqueuer:   statusworkerretrieschildren	date_donerT   )r_   r`   rZ   ra   r:   rb   rc   rd   re   rf   rT   rR   )rZ   rb   rR   rf   rT   re   N)rb   rR   )	rX   find_oner3   r4   Zfind_value_for_keyZmeta_from_decodedrQ   r	   ZPENDING)r9   rZ   objr   r   r   _get_task_meta_for   s4    zMongoBackend._get_task_meta_forc                 C   s>   ||  dd |D ttjd}| jjd|i|dd |S )zSave the group result.c                 S   s   g | ]
}|j qS r   )id)r   ir   r   r   r      s     z,MongoBackend._save_group.<locals>.<listcomp>)rV   rR   rf   rV   TrW   )rM   r   nowr   utcgroup_collectionrY   )r9   group_idrR   r[   r   r   r   _save_group   s    
zMongoBackend._save_groupc                    sD    j d|i}|r@|d |d  fdd |d D dS dS )z!Get the result for a group by id.rV   rf   c                    s   g | ]} j |qS r   )r3   ZAsyncResult)r   ZtaskrF   r   r   r      s   z/MongoBackend._restore_group.<locals>.<listcomp>rR   )rZ   rf   rR   N)rn   rg   rQ   )r9   ro   rh   r   rF   r   _restore_group   s    
zMongoBackend._restore_groupc                 C   s   | j d|i dS )zDelete a group by id.rV   N)rn   
delete_one)r9   ro   r   r   r   _delete_group   s    zMongoBackend._delete_groupc                 C   s   | j d|i dS )zRemove result from MongoDB.

        Raises:
            pymongo.exceptions.OperationsError:
                if the task_id could not be removed.
        rV   N)rX   rr   )r9   rZ   r   r   r   _forget   s    
zMongoBackend._forgetc                 C   sN   | j s
dS | jdd| j | j ii | jdd| j | j ii dS )zDelete expired meta-data.Nrf   z$lt)expiresrX   Zdelete_manyr3   rl   expires_deltarn   rF   r   r   r   cleanup	  s    zMongoBackend.cleanupr   c                    s(   |si n|}t  |t|| j| jdS )N)ru   r.   )r(   
__reduce__r7   ru   r.   )r9   r`   r:   r>   r   r   rx     s
     zMongoBackend.__reduce__c                 C   s   |   }|| j S )N)rJ   r1   )r9   connr   r   r   _get_database  s    zMongoBackend._get_databasec                 C   s   |   S )z]Get database from MongoDB connection.

        performs authentication if necessary.
        )rz   rF   r   r   r   r      s    zMongoBackend.databasec                 C   s   | j | j }|jddd |S z"Get the meta-data task collection.rf   T)Z
background)r    r&   create_indexr9   rX   r   r   r   rX   &  s    zMongoBackend.collectionc                 C   s   | j | j }|jddd |S r{   )r    r'   r|   r}   r   r   r   rn   0  s    zMongoBackend.group_collectionc                 C   s   t | jdS )N)seconds)r   ru   rF   r   r   r   rv   :  s    zMongoBackend.expires_deltac                 C   sL   | j s
dS |r| j S d| j kr(t| j S | j dd\}}dt||gS )z~Return the backend as an URI.

        Arguments:
            include_password (bool): Password censored if disabled.
        r@   ,r   )r.   r   splitjoin)r9   Zinclude_passwordZuri1	remainderr   r   r   as_uri>  s    

zMongoBackend.as_uri)N)NN)r   N)F)'r   r   r   __doc__r$   r"   r#   r%   r   r1   r&   r'   rE   r!   Zsupports_autoexpirerH   r)   staticmethodr/   r+   rJ   rM   rQ   r]   ri   rp   rq   rs   rt   rw   rx   rz   r   r    rX   rn   rv   r   __classcell__r   r   r>   r   r   #   sP   <

   



	
	
r   )r   r   r   r   Zkombu.exceptionsr   Zkombu.utils.objectsr   Zkombu.utils.urlr   r   r   r	   Zcelery.exceptionsr
   baser   r*   ImportErrorZbson.binaryr   Zpymongo.binaryZpymongo.errorsr   	Exception__all__	frozensetrN   r   r   r   r   r   <module>   s,   
