Source code for c7n.sqsexec

# Copyright 2015-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""

concurrent.futures implementation over sqs


Scatter/Gather or Map/Reduce style over two sqs queues.

"""
from __future__ import absolute_import, division, print_function, unicode_literals

import random
import logging
import inspect

from c7n import utils

from concurrent.futures import Executor, Future

log = logging.getLogger('custodian.sqsexec')


[docs]def named(o): assert inspect.isfunction(o) return "%s:%s" % (o.__module__, o.__name__)
[docs]def resolve(o): name, func = o.rsplit(':', 1) module = __import__(name, fromlist=[True]) return getattr(module, func)
[docs]class SQSExecutor(Executor): def __init__(self, session_factory, map_queue, reduce_queue): self.session_factory = session_factory self.map_queue = map_queue self.reduce_queue = reduce_queue self.sqs = utils.local_session(self.session_factory).client('sqs') self.op_sequence = self.op_sequence_start = int(random.random() * 1000000) self.futures = {}
[docs] def submit(self, func, *args, **kwargs): """Submit a function for serialized execution on sqs """ self.op_sequence += 1 self.sqs.send_message( QueueUrl=self.map_queue, MessageBody=utils.dumps({'args': args, 'kwargs': kwargs}), MessageAttributes={ 'sequence_id': { 'StringValue': str(self.op_sequence), 'DataType': 'Number'}, 'op': { 'StringValue': named(func), 'DataType': 'String', }, 'ser': { 'StringValue': 'json', 'DataType': 'String'}} ) self.futures[self.op_sequence] = f = SQSFuture( self.op_sequence) return f
[docs] def gather(self): """Fetch results from separate queue """ limit = self.op_sequence - self.op_sequence_start results = MessageIterator(self.sqs, self.reduce_queue, limit) for m in results: # sequence_id from above msg_id = int(m['MessageAttributes']['sequence_id']['StringValue']) if (not msg_id > self.op_sequence_start or not msg_id <= self.op_sequence or msg_id not in self.futures): raise RuntimeError( "Concurrent queue user from different " "process or previous results") f = self.futures[msg_id] f.set_result(m) results.ack(m)
def __enter__(self): return self def __exit__(self, *args): return False
[docs]class MessageIterator(object): msg_attributes = ['sequence_id', 'op', 'ser'] def __init__(self, client, queue_url, limit=0, timeout=10): self.client = client self.queue_url = queue_url self.limit = limit or limit self.timeout = timeout self.messages = [] def __iter__(self): return self def __next__(self): if self.messages: return self.messages.pop(0) response = self.client.receive_message( QueueUrl=self.queue_url, WaitTimeSeconds=self.timeout, MessageAttributeNames=self.msg_attributes) msgs = response.get('Messages', []) for m in msgs: self.messages.append(m) if self.messages: return self.messages.pop(0) raise StopIteration() next = __next__ # back-compat
[docs] def ack(self, m): self.client.delete_message( QueueUrl=self.queue_url, ReceiptHandle=m['ReceiptHandle'])
[docs]class SQSWorker(object): stopped = False def __init__(self, session_factory, map_queue, reduce_queue, limit=0): self.session_factory = session_factory self.client = utils.local_session(self.session_factory).client('sqs') self.receiver = MessageIterator(self.client, map_queue, limit)
[docs] def run(self): for m in self.receiver: while not self.stopped: self.process_message(m) self.receiver.ack(m)
[docs] def stop(self): self.stopped = True
[docs] def process_message(self, m): msg = utils.loads(m['Body']) op_name = m['MessageAttributes']['op']['StringValue'] func = resolve(op_name) try: func(*msg['args'], **msg['kwargs']) except Exception as e: log.exception( "Error invoking %s %s" % ( op_name, e)) return
[docs]class SQSFuture(Future): marker = object() def __init__(self, sequence_id): super(SQSFuture, self).__init__() self.sequence_id = sequence_id