U
    bh|L                     @   s   d Z ddlmZ ddlmZ ddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZ ddlmZ d	d
lmZ zddlZddlmZ W n ek
r   d ZZY nX dZeddZeeZG dd deZdS )z"AWS DynamoDB result store backend.    )
namedtuple)
ip_address)sleeptime)AnyDict)
_parse_url)ImproperlyConfigured)
get_logger   )KeyValueStoreBackendN)ClientError)DynamoDBBackendDynamoDBAttributename	data_typec                       sj  e Zd ZdZdZdZdZdZdZdZ	dZ
edddZed	d
dZedddZedddZedddZdZdZd? fdd	Zedd Zd@ddZdd Zdd Zdd Zdd Zdd Zdd  Zd!d" ZdAd$d%Zd&d' Zd(d) Z e!e"e!e#f d*d+d,Z$e!e"e!e#f d*d-d.Z%d/d0 Z&e'd1d2 Z(d3d4 Z)d5d6 Z*d7d8 Z+d9d: Z,e-e.d*d;d<Z/ fd=d>Z0  Z1S )Br   zAWS DynamoDB result backend.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`boto3` is not available.
    Zceleryr   NTidSr   resultBZchord_countN	timestampttlc              
      s  t  j|| || _|p| j| _ts,tdd}d }d }|d k	r|t|\}}	}
}}}}|}|}|d k	}|d k	}||krtd|}|	dkst|	rd|	 d|
 | _	d| _
td| j	 n|	| _
| jjj}|d	}|r|| _	t|d
| j| _t|d| j| _|d| j}|rnzt|| _W n> tk
rl } ztjd| d|d |W 5 d }~X Y nX |px| j| _| j| j| jf| _d | _|r| j||d d S )NzBYou need to install the boto3 library to use the DynamoDB backend.Fz6You need to specify both the Access Key ID and Secret.	localhostzhttp://:z	us-east-1z*Using local-only DynamoDB endpoint URL: {}Zdynamodb_endpoint_urlreadwriteZttl_secondszTTL must be a number; got "")exc_info)access_key_idsecret_access_key)super__init__url
table_nameboto3r	   	parse_urlr   _is_valid_ipendpoint_url
aws_regionloggerwarningformatZappconfgetintread_capacity_unitswrite_capacity_unitstime_to_live_seconds
ValueErrorerror
_key_field_value_field_timestamp_field_available_fields_client_get_client)selfr$   r%   argskwargsZaws_credentials_givenaws_access_key_idaws_secret_access_keyschemeZregionportusernamepasswordtablequeryZaccess_key_givenZsecret_key_given_getZconfig_endpoint_urlr   e	__class__ </tmp/pip-unpacked-wheel-kcem4wq5/celery/backends/dynamodb.pyr#   D   s    


zDynamoDBBackend.__init__c                 C   s*   zt |  W dS  tk
r$   Y dS X d S )NTF)r   r4   )iprK   rK   rL   r(      s
    zDynamoDBBackend._is_valid_ipc                 C   sx   | j dkrrd| ji}|dk	r,|||d | jdk	r@| j|d< tjd|| _ |   |  dk	rr|   | 	  | j S )zGet client connection.NZregion_name)r?   r@   r)   dynamodb)rN   )
r:   r*   updater)   r&   client_get_or_create_table_has_ttl_validate_ttl_methods_set_table_ttl)r<   r    r!   Zclient_parametersrK   rK   rL   r;      s*    
 

 zDynamoDBBackend._get_clientc                 C   s6   | j j| j jdg| j| j jddg| j| jddS )z=Get the boto3 structure describing the DynamoDB table schema.)AttributeNameZAttributeTypeHASH)rU   ZKeyType)ZReadCapacityUnitsZWriteCapacityUnits)ZAttributeDefinitions	TableNameZ	KeySchemaZProvisionedThroughput)r6   r   r   r%   r1   r2   r<   rK   rK   rL   _get_table_schema   s    z!DynamoDBBackend._get_table_schemac              
   C   s   |   }z| jj| jdW S  tk
r } zj|jd dd}|dkr| jjf |}t	d
| j | d t	d
| j | W Y 
S |W 5 d	}~X Y nX d	S )
z=Create table if not exists, otherwise return the description.rW   ErrorCodeUnknownZResourceNotFoundExceptionz*DynamoDB Table {} did not exist, creating.ACTIVEz#DynamoDB Table {} is now available.N)rY   r:   describe_tabler%   r   responser/   Zcreate_tabler+   infor-   _wait_for_table_status)r<   Ztable_schemarH   
error_codetable_descriptionrK   rK   rL   rQ      s(    
z$DynamoDBBackend._get_or_create_tablec                 C   s   | j dkrdS | j dkS )zReturn the desired Time to Live config.

        - True:  Enable TTL on the table; use expiry.
        - False: Disable TTL on the table; don't use expiry.
        - None:  Ignore TTL on the table; don't use expiry.
        Nr   )r3   rX   rK   rK   rL   rR      s    zDynamoDBBackend._has_ttlc                 C   sb   d}g }t |D ]}t| j|s|| q|r^tdjd|d tdjd|ddS )z:Verify boto support for the DynamoDB Time to Live methods.)update_time_to_livedescribe_time_to_livezdboto3 method(s) {methods} not found; ensure that boto3>=1.9.178 and botocore>=1.12.178 are installed,)methodsz#boto3 method(s) {methods} not foundN)	listhasattrr:   appendr+   r5   r-   joinAttributeError)r<   Zrequired_methodsZmissing_methodsmethodrK   rK   rL   rS      s"    z%DynamoDBBackend._validate_ttl_methodsc                 C   s   | j |  |ddS )zBGet the boto3 structure describing the DynamoDB TTL specification.)ZEnabledrU   )rW   ZTimeToLiveSpecification)r%   rR   )r<   ttl_attr_namerK   rK   rL   _get_ttl_specification  s
    z&DynamoDBBackend._get_ttl_specificationc              
   C   s|   z| j j| jd}W nb tk
rv } zD|jd dd}|jd dd}tdj| j||d |W 5 d }~X Y nX |S )NrZ   r[   r\   r]   MessagezJError describing Time to Live on DynamoDB table {table}: {code}: {message})rE   codemessage)	r:   rf   r%   r   r`   r/   r+   r5   r-   )r<   descriptionrH   rc   error_messagerK   rK   rL   _get_table_ttl_description!  s     
z*DynamoDBBackend._get_table_ttl_descriptionc           	      C   s|  |   }|d d }|dkrd|d d }|  r|| jjkrtdj|dkrPdnd| jd	 |S nN|d
kr|  stdj|dkrdnd| jd	 |S ntdj|| jd |dkr|n| jj}z<| j	j
f | j|d}tdj| j|  | jjd |W S  tk
rv } zT|jd dd}|jd dd}tdj|  rPdnd| j||d |W 5 d}~X Y nX dS )z,Enable or disable Time to Live on the table.ZTimeToLiveDescriptionZTimeToLiveStatus)ENABLEDZENABLINGrU   z5DynamoDB Time to Live is {situation} on table {table}rw   zalready enabledzcurrently being enabled)Z	situationrE   )DISABLEDZ	DISABLINGrx   zalready disabledzcurrently being disabledzWUnknown DynamoDB Time to Live status {status} on table {table}. Attempting to continue.)statusrE   )ro   zUDynamoDB table Time to Live updated: table={table} enabled={enabled} attribute={attr})rE   Zenabledattrr[   r\   r]   rq   zHError {action} Time to Live on DynamoDB table {table}: {code}: {message}ZenablingZ	disabling)actionrE   rr   rs   N)rv   rR   
_ttl_fieldr   r+   debugr-   r%   r,   r:   re   rp   ra   r   r`   r/   r5   )	r<   rt   ry   Zcur_attr_name	attr_namespecificationrH   rc   ru   rK   rK   rL   rT   6  s|    
		&
	zDynamoDBBackend._set_table_ttlr^   c                 C   sN   d}|sJ| j j| jd}td| j| |d d }||k}td qdS )z#Poll for the expected table status.FrZ   z+Waiting for DynamoDB table {} to become {}.ZTableZTableStatusr   N)rP   r_   r%   r+   r}   r-   r   )r<   expectedZachieved_staterd   Zcurrent_statusrK   rK   rL   rb     s    z&DynamoDBBackend._wait_for_table_statusc                 C   s   | j | jj| jj|iidS )z0Construct the item retrieval request parameters.)rW   Key)r%   r6   r   r   r<   keyrK   rK   rL   _prepare_get_request  s     z$DynamoDBBackend._prepare_get_requestc              	   C   s~   t  }| j| jj| jj|i| jj| jj|i| jj| jjt|iid}|  rz|d 	| j
j| j
jtt|| j ii |S )z/Construct the item creation request parameters.rW   Itemr   )r   r%   r6   r   r   r7   r8   strrR   rO   r|   r0   r3   )r<   r   valuer   Zput_requestrK   rK   rL   _prepare_put_request  s0       z$DynamoDBBackend._prepare_put_request)r   returnc              	   C   sF   t  }| j| jj| jj|i| jj| jjdi| jj| jjt|iidS )z7Construct the counter initialization request parameters0r   )r   r%   r6   r   r   _count_filedr8   r   )r<   r   r   rK   rK   rL   _prepare_init_count_request  s        z+DynamoDBBackend._prepare_init_count_requestc                 C   s@   | j | jj| jj|iid| jj d| jj ddddiiddS )	z2Construct the counter increment request parameterszset z = z + :numz:numr   1ZUPDATED_NEW)rW   r   ZUpdateExpressionZExpressionAttributeValuesZReturnValues)r%   r6   r   r   r   r   rK   rK   rL   _prepare_inc_count_request  s      z*DynamoDBBackend._prepare_inc_count_requestc                    s    d kri S  fdd| j D S )z1Convert get_item() response to field-value pairs.r   c                    s$   i | ]}|j  d  |j  |j qS )r   r   ).0fieldraw_responserK   rL   
<dictcomp>  s    z1DynamoDBBackend._item_to_dict.<locals>.<dictcomp>)r9   )r<   r   rK   r   rL   _item_to_dict  s
    
zDynamoDBBackend._item_to_dictc                 C   s   |   S N)r;   rX   rK   rK   rL   rP     s    zDynamoDBBackend.clientc                 C   s8   t |}| |}| jjf |}| |}|| jjS r   )r   r   rP   Zget_itemr   r/   r7   r   )r<   r   request_parametersitem_responseitemrK   rK   rL   r/     s
    

zDynamoDBBackend.getc                 C   s&   t |}| ||}| jjf | d S r   )r   r   rP   put_item)r<   r   r   r   rK   rK   rL   set  s    zDynamoDBBackend.setc                    s    fdd|D S )Nc                    s   g | ]}  |qS rK   )r/   )r   r   rX   rK   rL   
<listcomp>  s     z(DynamoDBBackend.mget.<locals>.<listcomp>rK   )r<   keysrK   rX   rL   mget  s    zDynamoDBBackend.mgetc                 C   s$   t |}| |}| jjf | d S r   )r   r   rP   Zdelete_item)r<   r   r   rK   rK   rL   delete  s    
zDynamoDBBackend.deletec                 C   s@   t |}| |}| jjf |}|d | jj | jj }t|S )z<Atomically increase the chord_count and return the new countZ
Attributes)r   r   rP   Zupdate_itemr   r   r   r0   )r<   r   r   r   Z	new_countrK   rK   rL   incr  s
    
zDynamoDBBackend.incrc                    s<   |  |d }| t|}| jjf | t j||f|S )Nr   )Zget_key_for_chordr   r   rP   r   r"   _apply_chord_incr)r<   Zheader_result_argsbodyr>   Z	chord_keyZinit_count_requestrI   rK   rL   r   '  s     z!DynamoDBBackend._apply_chord_incr)NN)NN)r^   )2__name__
__module____qualname____doc__r%   r1   r2   r*   r)   r3   Zsupports_autoexpirer   r6   r7   r   r8   r|   r9   Zimplements_incrr#   staticmethodr(   r;   rY   rQ   rR   rS   rp   rv   rT   rb   r   r   r   r   r   r   r   r   propertyrP   r/   r   r   r   bytesr0   r   r   __classcell__rK   rK   rI   rL   r      sP   Y



p
	
r   )r   collectionsr   	ipaddressr   r   r   typingr   r   Zkombu.utils.urlr   r'   Zcelery.exceptionsr	   Zcelery.utils.logr
   baser   r&   Zbotocore.exceptionsr   ImportError__all__r   r   r+   r   rK   rK   rK   rL   <module>   s"   
