Source code for c7n.executor

# Copyright 2015-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals

from concurrent.futures import (
    ProcessPoolExecutor, ThreadPoolExecutor)

from c7n.registry import PluginRegistry

import threading


[docs]class ExecutorRegistry(PluginRegistry): def __init__(self, plugin_type): super(ExecutorRegistry, self).__init__(plugin_type) self.register('process', ProcessPoolExecutor) self.register('thread', ThreadPoolExecutor) self.register('main', MainThreadExecutor)
[docs]def executor(name, **kw): factory = executors.get(name) # post element refactoring # factory.validate(kw) if factory is None: raise ValueError("No Such Executor %s" % name) return factory(**kw)
[docs]class MainThreadExecutor(object): """ For running tests. c7n_async == True -> catch exceptions and store them in the future. c7n_async == False -> let exceptions bubble up. """ c7n_async = True # For Dev/Unit Testing with concurrent.futures def __init__(self, *args, **kw): self.args = args self.kw = kw
[docs] def map(self, func, iterable): for args in iterable: yield func(args)
[docs] def submit(self, func, *args, **kw): try: return MainThreadFuture(func(*args, **kw)) except Exception as e: if self.c7n_async: return MainThreadFuture(None, exception=e) raise
def __enter__(self): return self def __exit__(self, *args): return False
[docs]class MainThreadFuture(object): # For Dev/Unit Testing with concurrent.futures def __init__(self, value, exception=None): self.value = value self._exception = exception # Sigh concurrent.futures pokes at privates self._state = 'FINISHED' self._waiters = [] self._condition = threading.Condition()
[docs] def cancel(self): return False
[docs] def cancelled(self): return False
[docs] def exception(self): return self._exception
[docs] def done(self): return True
[docs] def result(self, timeout=None): if self._exception: raise self._exception return self.value
[docs] def add_done_callback(self, fn): return fn(self)
executors = ExecutorRegistry('executor') executors.load_plugins()