U
    bh"                     @  s  d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZmZ dd	lmZ d
dlmZ zddlmZ W n ek
r   dZY nX zddlmZmZ W n ek
r   dZdZY nX dd ejD ZG dd dejZG dd dejZdS )a  Azure Storage Queues transport module for kombu.

More information about Azure Storage Queues:
https://azure.microsoft.com/en-us/services/storage/queues/

Features
========
* Type: Virtual
* Supports Direct: *Unreviewed*
* Supports Topic: *Unreviewed*
* Supports Fanout: *Unreviewed*
* Supports Priority: *Unreviewed*
* Supports TTL: *Unreviewed*

Connection String
=================

Connection string has the following formats:

.. code-block::

    azurestoragequeues://<STORAGE_ACCOUNT_ACCESS_KEY>@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://<SAS_TOKEN>@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>

Note that if the access key for the storage account contains a forward slash
(``/``), it will have to be regenerated before it can be used in the connection
URL.

.. code-block::

    azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>

If you wish to use an `Azure Managed Identity` you may use the
``DefaultAzureCredential`` format of the connection string which will use
``DefaultAzureCredential`` class in the azure-identity package. You may want to
read the `azure-identity documentation` for more information on how the
``DefaultAzureCredential`` works.

.. _azure-identity documentation:
https://learn.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python
.. _Azure Managed Identity:
https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview

Transport Options
=================

* ``queue_name_prefix``
    )annotationsN)Empty)Any)ResourceExistsError)safe_str)dumpsloads)cached_property   )virtual)QueueServiceClient)DefaultAzureCredentialManagedIdentityCredentialc                 C  s   i | ]}t |d qS )-   )ord).0c r   F/tmp/pip-unpacked-wheel-earovzxb/kombu/transport/azurestoragequeues.py
<dictcomp>Q   s     r   c                      s   e Zd ZU dZdZded< dZded< i Zded	< d
Zded< e	 Z
ded<  fddZ fddZefddddZdd Zdd Zdd Zd+ddZdd Zd d! Zed"dd#d$Zed%d& Zed'd( Zeddd)d*Z  ZS ),ChannelzAzure Storage Queues channel.zkombu%(vhost)sstrdomain_formatNzQueueServiceClient | None_queue_servicezdict[Any, Any]_queue_name_cacheTboolno_ackzset[Any]_noack_queuesc                   sV   t d krtdt j|| t| jj\| _| _	| j
 D ]}|| j|d < q>d S )NzGAzure Storage Queues transport requires the azure-storage-queue libraryname)r   ImportErrorsuper__init__	Transport	parse_uriconninfohostname_credential_urlqueue_serviceZlist_queuesr   )selfargskwargsqueue	__class__r   r   r!   _   s    zChannel.__init__c                   s&   |r| j | t j||f||S N)r   addr    basic_consume)r)   r,   r   r*   r+   r-   r   r   r1   m   s    zChannel.basic_consume)returnc                 C  s   t t||S )z=Format AMQP queue name into a valid Azure Storage Queue name.)r   r   	translate)r)   r   tabler   r   r   entity_namet   s    zChannel.entity_namec                 C  s   |  | j| }z| jj| j| d}W nV tk
r~   z| j|}W n" tk
rj   | jj|d}Y nX |	 | j|< Y nX |S )zEnsure a queue exists.)r,   )
r5   queue_name_prefixr   Zget_queue_clientr   KeyErrorr(   Zcreate_queuer   get_queue_propertiesr)   r,   qr   r   r   _ensure_queuex   s    
zChannel._ensure_queuec                 O  s(   |  |}| j|d | j| dS )zDelete queue by name.N)r5   r   popr(   Zdelete_queue)r)   r,   r*   r+   
queue_namer   r   r   _delete   s    
zChannel._deletec                 K  s    |  |}t|}|| dS )zPut message onto queue.N)r;   r   send_message)r)   r,   messager+   r:   Zencoded_messager   r   r   _put   s    
zChannel._putc                 C  sZ   |  |}|jd|d}zt|}W n tk
r>   t Y nX t|j}|j|d |S )z/Try to retrieve a single message off ``queue``.r
   )Zmessages_per_pagetimeout)r@   )r;   Zreceive_messagesnextStopIterationr   r   contentZdelete_message)r)   r,   rB   r:   messagesr@   rE   r   r   r   _get   s    

zChannel._getc                 C  s   |  |}| jS )z)Return the number of messages in a queue.)r;   r8   Zapproximate_message_countr9   r   r   r   _size   s    
zChannel._sizec                 C  s"   |  |}| |j}|  |S )z'Delete all current messages in a queue.)r;   rH   r=   Zclear_messages)r)   r,   r:   nr   r   r   _purge   s    
zChannel._purger   c                 C  s"   | j d krt| j| jd| _ | j S )N)Zaccount_url
credential)r   r   r'   r&   r)   r   r   r   r(      s    
 zChannel.queue_servicec                 C  s   | j jS r/   )
connectionclientrL   r   r   r   r$      s    zChannel.conninfoc                 C  s
   | j jjS r/   )rM   rN   transport_optionsrL   r   r   r   rO      s    zChannel.transport_optionsc                 C  s   | j ddS )Nr6    )rO   getrL   r   r   r   r6      s    zChannel.queue_name_prefix)N)__name__
__module____qualname____doc__r   __annotations__r   r   r   setr   r!   r1   CHARS_REPLACE_TABLEr5   r;   r>   rA   rG   rH   rJ   propertyr(   r$   rO   r	   r6   __classcell__r   r   r-   r   r   V   s.   



r   c                   @  sh   e Zd ZU dZeZdZded< dZded< dZd	ed
< e	dddddZ
eddd	dddddZdS )r"   zAzure Storage Queues transport.r
   intpolling_intervalNz
int | Nonedefault_portTr   can_parse_urlr   ztuple[str | dict, str])urir2   c                 C  s   z|  dd} | dd\}}d | krFtd kr>tdt }nBd | krntd krftdt }nd	|krd
|krd	|d}t||gstW n tk
r   t	dY nX ||fS )Nzazurestoragequeues://rP   @r
   r   z`Azure Storage Queues transport with a DefaultAzureCredential requires the azure-identity libraryr   zcAzure Storage Queues transport with a ManagedIdentityCredential requires the azure-identity libraryZdevstoreaccount1z.core.windows.net)Zaccount_nameZaccount_keyzNeed a URI like azurestoragequeues://{SAS or access key}@{URL}, azurestoragequeues://DefaultAzureCredential@{URL}, , or azurestoragequeues://ManagedIdentityCredential@{URL})
replacersplitlowerr   r   r   allAssertionError	Exception
ValueError)r_   rK   urlr   r   r   r#      s*    
zTransport.parse_uriF**)r_   include_passwordmaskr2   c                 C  s"   |  |\}}d|r|n||S )Nzazurestoragequeues://{}@{})r#   format)clsr_   rj   rk   rK   rh   r   r   r   as_uri   s
    
 zTransport.as_uri)Fri   )rR   rS   rT   rU   r   r\   rV   r]   r^   staticmethodr#   classmethodrn   r   r   r   r   r"      s   
0   r"   )rU   
__future__r   stringr,   r   typingr   Zazure.core.exceptionsr   Zkombu.utils.encodingr   Zkombu.utils.jsonr   r   Zkombu.utils.objectsr	   rP   r   Zazure.storage.queuer   r   Zazure.identityr   r   punctuationrX   r   r"   r   r   r   r   <module>   s.   4

p