>>> queue = sqs.get_queue_by_name(QueueName='test')
>>> print(queue)
sqs.Queue(url='https://eu-west-1.queue.amazonaws.com/264418146286/test')
>>> queue = sqs.get_queue_by_name(QueueName='NOtest')
Traceback (most recent call last):
...
botocore.exceptions.ClientError: An error occurred (AWS.SimpleQueueService.NonExistentQueue) when calling the GetQueueUrl operation: The specified queue does not exist for this wsdl version.
Чтение из очереди:
>>> queue = sqs.create_queue(QueueName='test2', Attributes={'DelaySeconds': '5'})
>>> sqs = session.resource('sqs')
>>> for queue in sqs.queues.all():
... print(queue.url)
...
https://eu-west-1.queue.amazonaws.com/264418146286/test
https://eu-west-1.queue.amazonaws.com/264418146286/test2
И пример: скрипт пересылки запускается PHP-скриптом приложения, обращается к очереди, пытается читать сообщения. Если сообщения есть — он парсит его, выделяя поля to, subject и content. Из поля content — берется ссылка на PHP-шаблон письма, в котором с помощью Pythonstring.Template() заменяются переменные $email (логин пользователя) и $password, после чего письмо отправляется адресату с его данными доступа.
#!/usr/bin/env python
import urllib2
import json
import os
import sys
import logging
import collections
from boto3.session import Session
from string import Template
from urlparse import urljoin
# for raw_send
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
class Logger(object):
def __init__(self, logpath):
"""Create Logger"""
self.logpath = logpath
if not os.path.isdir(os.path.join(self.logpath, 'logs')):
os.mkdir(os.path.join(self.logpath, 'logs'))
def logger(self, modname):
self.log = logging.getLogger(modname)
formatter = logging.Formatter('%(asctime)s - %(filename)s[LINE:%(lineno)d] - %(name)s.%(funcName)s() - %(message)s')
self.log.setLevel(logging.DEBUG)
filehandler = logging.FileHandler(os.path.join(self.logpath, 'logs', 'courier.log'))
filehandler.setLevel(logging.DEBUG)
filehandler.setFormatter(formatter)
consolehandler = logging.StreamHandler()
consolehandler.setLevel(logging.INFO)
self.log.addHandler(filehandler)
self.log.addHandler(consolehandler)
logger = Logger('.')
logger.logger(__file__)
session = Session(aws_access_key_id='***',
aws_secret_access_key='***',
region_name='eu-west-1')
sqs = session.client('sqs')
ses = session.client('ses')
def convert(data):
""" UTF8 => ASCII converter for body['parameters']"""
if isinstance(data, basestring):
return str(data)
elif isinstance(data, collections.Mapping):
return dict(map(convert, data.iteritems()))
elif isinstance(data, collections.Iterable):
return type(data)(map(convert, data))
else:
return data
def ses_send(m_to, m_subj, m_text):
"""Obsolete but leave it here"""
logger.log.info('Sent email to: {}'.format(m_to))
response = ses.send_email(
Source='fromaddr@domain.com',
Destination={
'ToAddresses': [
m_to
]
},
Message={
'Subject': {
'Data': m_subj
},
'Body': {
'Text': {
'Data': 'Content text'
},
'Html': {
'Data': m_text
}
}
},
ReplyToAddresses=[
'fromaddr@domain.com'
],
SourceArn='arn:aws:ses:eu-west-1:***:identity/fromaddr@domain.com'
)
return response['ResponseMetadata']['HTTPStatusCode']
def ses_raw_send(m_to, m_subj, m_text):
to = []
to.append(m_to)
logger.log.info('Sent email to: {}'.format(m_to))
msg = MIMEMultipart()
msg['Subject'] = m_subj
msg['From'] = 'Sender Name <fromaddr@domain.com>'
msg['To'] = m_to
msg.attach(MIMEText(m_text,'html'))
response = ses.send_raw_email(
RawMessage={
'Data': msg.as_string()
},
Destinations=to,
SourceArn='arn:aws:ses:eu-west-1:***:identity/fromaddr@domain.com'
)
# 200 if OK
return response['ResponseMetadata']['HTTPStatusCode']
if __name__ == '__main__':
# call as
# ./courier.py fl-dev-emails
# ./courier.py fl-prod-emails
if len(sys.argv) == 2:
q_url = urljoin('https://sqs.eu-west-1.amazonaws.com/***/', sys.argv[1])
else:
logger.log.error('ERROR: SQS name must be specified as first argument. Exit.')
exit(1)
logger.log.info('Started with URL {}'.format(q_url))
while True:
message = sqs.receive_message(QueueUrl=q_url)
try:
body = json.loads(message.get('Messages')[0]["Body"])
m_id = message.get('Messages')[0]["ReceiptHandle"]
m_to = body['to']
m_subj = body['title']
m_url = body['content']
data = urllib2.urlopen('{}'.format(m_url))
m_content = """{}""".format(data.read())
# try fetch 'parameters' as some SQS messages may contain only static data
try:
tmpl = Template(m_content)
m_params = convert(body['parameters'])
m_text = tmpl.substitute(m_params)
except KeyError as error:
logger.log.info('No Params: {}'.format(error))
m_text = m_content
if ses_raw_send(m_to, m_subj, m_text) == 200:
sqs.delete_message(QueueUrl=q_url, ReceiptHandle=m_id)
logger.log.info('Message {} deleted from queue'.format(message.get('Messages')[0]["MessageId"]))
else:
print 'ERR'
exit(1)
# concerns about timeouts
# 'NoneType' object has no attribute '__getitem__'
except TypeError as error:
logger.log.info('Seems queue empty, exiting')
break
Пример скрипта для отправки тестового сообщения в SQS, которое потом обрабатывается скриптом выше: