Python: пример AWS boto3 SQS и SES

Автор: | 19/03/2016
 

Pythonboto3 – набор библиотек для Python, для работы с AWS.

AWS CLI “под капотом” использует boto3 для работы с ядром Amazon Web Services.

Данный пост – ни разу не HowTo, а скорее просто набор заметок и быстрых примеров + пример скрипта, использующего boto3 для работы с AWS SQS и SES.

Документация: https://boto3.readthedocs.org/en/latest

Документация по модулю Sessionhttp://boto3.readthedocs.org/en/latest/reference/core/session.html

Документация по модулю sqshttps://boto3.readthedocs.org/en/latest/guide/sqs.html

Документация по модулю seshttps://boto3.readthedocs.org/en/latest/reference/services/ses.html

Установка:

$ sudo pip install boto3

Для авторизации boto3 использует ~/.aws/credentials, ~/.aws/config либо данные, указанные в самом коде при создании сессии session.Session().

Проверяем:

>>> import boto3
>>> ec2 = boto3.resource('ec2')
>>> help(ec2)

Результат:

Help on ec2.ServiceResource in module boto3.resources.factory object:

class ec2.ServiceResource(boto3.resources.base.ServiceResource)
 |  Method resolution order:
 |      ec2.ServiceResource
 |      boto3.resources.base.ServiceResource
 |      __builtin__.object
 |
 |  Methods defined here:
 |
 |  ClassicAddress(self, *args, **kwargs)
 |      Creates a ClassicAddress resource.::

...

Создать сессию и просмотреть имеющиеся инстансы EC2 можно так:

>>> from boto3.session import Session
>>> session = Session(aws_access_key_id='AKI***D2A',
aws_secret_access_key='E4Y***uR7', region_name='eu-west-1')
>>> ec2 = session.resource('ec2')
>>> for instance in ec2.instances.all():
...   print(instance.id) 
...       
i-82a14b08

Использование фильтров:

>>> instances = ec2.instances.filter(
...   Filters=[{'Name': 'instance-state-name', 'Values': ['running']}])
>>> for instance in instances:
...   print(instance.id, instance.instance_type)
...
('i-82a14b08', 't2.micro') 

Статус:

>>> for status in ec2.meta.client.describe_instance_status()['InstanceStatuses']:
...   print(status)
... 
{u'InstanceId': 'i-82a14b08', u'InstanceState': {u'Code': 16, u'Name': 'running'}, u'AvailabilityZone': 'eu-west-1a', u'SystemStatus': {u'Status': 'ok', u'Details': [{u'Status': 'passed', u'Name': 'reachability'}]}, u'InstanceStatus': {u'Status': 'ok', u'Details': [{u'Status': 'passed', u'Name': 'reachability'}]}}

Работа с AWS SQS

Создаем новую очередь:

>>> sqs = session.resource('sqs')
>>> queue = sqs.create_queue(QueueName='test', Attributes={'DelaySeconds': '5'})

Вот так она выглядит в консоли AWS:

Screen Shot 2016-03-03 at 12.41.05

Или в AWS CLI:

$ aws sqs list-queues
{
    "QueueUrls": [
        "https://eu-west-1.queue.amazonaws.com/264418146286/test"
    ]
}

Чтение из очереди:

>>> print(queue.url)
https://eu-west-1.queue.amazonaws.com/264418146286/test
>>> print(queue.attributes.get('DelaySeconds'))
5

Поиск очереди по имени:

>>> queue = sqs.get_queue_by_name(QueueName='test')
>>> print(queue)
sqs.Queue(url='https://eu-west-1.queue.amazonaws.com/264418146286/test')
>>> queue = sqs.get_queue_by_name(QueueName='NOtest')
Traceback (most recent call last):
    ...
botocore.exceptions.ClientError: An error occurred (AWS.SimpleQueueService.NonExistentQueue) when calling the GetQueueUrl operation: The specified queue does not exist for this wsdl version.

Чтение из очереди:

>>> queue = sqs.create_queue(QueueName='test2', Attributes={'DelaySeconds': '5'})
>>> sqs = session.resource('sqs')
>>> for queue in sqs.queues.all():
...   print(queue.url)
...
https://eu-west-1.queue.amazonaws.com/264418146286/test
https://eu-west-1.queue.amazonaws.com/264418146286/test2

Отправка сообщения в очередь:

>>> sqs = session.resource('sqs')
>>> queue = sqs.get_queue_by_name(QueueName='test')
>>> response = queue.send_message(MessageBody='world')
>>> print(response.get('MessageId'))
1d134a72-91a2-4f48-aa1b-38d66b414b9b
>>> print(response.get('MD5OfMessageBody'))
7d793037a0760186574b0282f2f435e7

Еще немного документации:

http://docs.aws.amazon.com/cli/latest/reference/sqs/list-queues.html

https://boto3.readthedocs.org/en/latest/guide/sqs.html

Работа с AWS SES

Используем resource вместо client.

Для отправки писем можно использовать методы send_email() и send_raw_email().
Пример с send_mail():

ses = session.client('ses')

print(ses)

response = ses.send_email(
    Source='[email protected]',
    Destination={
        'ToAddresses': [
            '[email protected]', '[email protected]'
        ],
        'CcAddresses': [
            '[email protected]', '[email protected]',
        ]
    },
    Message={
        'Subject': {
            'Data': 'Test SES Boto3 - Subj'
        },
        'Body': {
            'Text': {
                'Data': 'Test SES Boto3 - Body'
            }
        }
    },
    ReplyToAddresses=[
        '[email protected]'
    ],
    SourceArn='arn:aws:ses:eu-west-1:***:identity/***@***.com'
)

print(response)</pre>
Результат:
$ ./queues.py
&lt;botocore.client.SES object at 0x1037abf10&gt;
{'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '3d200e82-e137-11e5-9b70-93e5e7cade1b'}, u'MessageId': '010201533c57e697-87c0b5a9-204c-4a8e-98df-1a5b5e247be4-000000'};

И пример: скрипт пересылки запускается PHP-скриптом приложения, обращается к очереди, пытается читать сообщения. Если сообщения есть – он парсит его, выделяя поля to, subject и content. Из поля content – берется ссылка на PHP-шаблон письма, в котором с помощью Python string.Template() заменяются переменные $email (логин пользователя) и $password, после чего письмо отправляется адресату с его данными доступа.

#!/usr/bin/env python

import urllib2
import json
import os
import sys
import logging
import collections

from boto3.session import Session
from string import Template
from urlparse import urljoin

# for raw_send
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

class Logger(object):
    def __init__(self, logpath):

        """Create Logger"""

        self.logpath = logpath

        if not os.path.isdir(os.path.join(self.logpath, 'logs')):
            os.mkdir(os.path.join(self.logpath, 'logs'))

    def logger(self, modname):

        self.log = logging.getLogger(modname)

        formatter = logging.Formatter('%(asctime)s  - %(filename)s[LINE:%(lineno)d] - %(name)s.%(funcName)s() - %(message)s')

        self.log.setLevel(logging.DEBUG)

        filehandler = logging.FileHandler(os.path.join(self.logpath, 'logs', 'courier.log'))
        filehandler.setLevel(logging.DEBUG)
        filehandler.setFormatter(formatter)

        consolehandler = logging.StreamHandler()
        consolehandler.setLevel(logging.INFO)

        self.log.addHandler(filehandler)
        self.log.addHandler(consolehandler)


logger = Logger('.')
logger.logger(__file__)

session = Session(aws_access_key_id='***',
                  aws_secret_access_key='***',
                  region_name='eu-west-1')

sqs = session.client('sqs')
ses = session.client('ses')


def convert(data):

    """ UTF8 =&gt; ASCII converter for body['parameters']"""

    if isinstance(data, basestring):
        return str(data)
    elif isinstance(data, collections.Mapping):
        return dict(map(convert, data.iteritems()))
    elif isinstance(data, collections.Iterable):
        return type(data)(map(convert, data))
    else:
        return data


def ses_send(m_to, m_subj, m_text):

    """Obsolete but leave it here"""

    logger.log.info('Sent email to: {}'.format(m_to))

    response = ses.send_email(
    Source='[email protected]',
    Destination={
        'ToAddresses': [
            m_to
        ]
    },
    Message={
        'Subject': {
            'Data': m_subj
        },
        'Body': {
            'Text': {
                'Data': 'Content text'
            },
            'Html': {
                'Data': m_text
            }
        }
    },
    ReplyToAddresses=[
        '[email protected]'
    ],
    SourceArn='arn:aws:ses:eu-west-1:***:identity/[email protected]'
)

    return response['ResponseMetadata']['HTTPStatusCode']


def ses_raw_send(m_to, m_subj, m_text):

    to = []
    to.append(m_to)

    logger.log.info('Sent email to: {}'.format(m_to))

    msg = MIMEMultipart()
    msg['Subject'] = m_subj
    msg['From'] = 'Sender Name &lt;[email protected]&gt;'
    msg['To'] = m_to
    msg.attach(MIMEText(m_text,'html'))

    response = ses.send_raw_email(
        RawMessage={
            'Data': msg.as_string()
        },
        Destinations=to,
        SourceArn='arn:aws:ses:eu-west-1:***:identity/[email protected]'
    )

    # 200 if OK
    return response['ResponseMetadata']['HTTPStatusCode']


if __name__ == '__main__':

    # call as
    # ./courier.py fl-dev-emails
    # ./courier.py fl-prod-emails
    if len(sys.argv) == 2:
        q_url = urljoin('https://sqs.eu-west-1.amazonaws.com/***/', sys.argv[1])
    else:
        logger.log.error('ERROR: SQS name must be specified as first argument. Exit.')
        exit(1)

    logger.log.info('Started with URL {}'.format(q_url))

    while True:

        message = sqs.receive_message(QueueUrl=q_url)

        try:

            body = json.loads(message.get('Messages')[0]["Body"])

            m_id = message.get('Messages')[0]["ReceiptHandle"]
            m_to = body['to']
            m_subj = body['title']
            m_url = body['content']

            data = urllib2.urlopen('{}'.format(m_url))
            m_content = """{}""".format(data.read())

            # try fetch 'parameters' as some SQS messages may contain only static data
            try:
                tmpl = Template(m_content)
                m_params = convert(body['parameters'])
                m_text = tmpl.substitute(m_params)
            except KeyError as error:
                logger.log.info('No Params: {}'.format(error))
                m_text = m_content

            if ses_raw_send(m_to, m_subj, m_text) == 200:
                sqs.delete_message(QueueUrl=q_url, ReceiptHandle=m_id)
                logger.log.info('Message {} deleted from queue'.format(message.get('Messages')[0]["MessageId"]))
            else:
                print 'ERR'
                exit(1)
        # concerns about timeouts
        # 'NoneType' object has no attribute '__getitem__'
        except TypeError as error:
            logger.log.info('Seems queue empty, exiting')
            break

Пример скрипта для отправки тестового сообщения в SQS, которое потом обрабатывается скриптом выше:

#!/usr/bin/env python

import time

from boto3.session import Session

session = Session(aws_access_key_id='***',
                  aws_secret_access_key='***',
                  region_name='eu-west-1')

sqs = session.resource('sqs')
#sqs = session.client('ses')
queue = sqs.get_queue_by_name(QueueName='fl-dev-emails')

count = 10
a = 1

while a &lt;= count:

    response = queue.send_messages(Entries=[
        {
            'Id': '{}'.format(a),
           'MessageBody': '{"to": "[email protected]", '
                           '"title": "Test - Entry welcome", '
                           '"content": "http:\/\/entry.dev.domain.com\/api\/application\/welcome.php",'
                            '"parameters": {"email": "[email protected]", "password": "p@ssw0rd"}'
                           '}'
        }
    ])

    a += 1</pre>

 

UPD Наткнулся на неплохой обзор с примерами по boto3 тут>>>.