# Source code for c7n.ctx

# Copyright 2015-2018 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals

import time
import uuid
import os


from c7n.output import (
    api_stats_outputs,
    blob_outputs,
    log_outputs,
    metrics_outputs,
    sys_stats_outputs,
    tracer_outputs)

from c7n.utils import reset_session_cache, dumps, local_session
from c7n.version import version


class ExecutionContext(object):
    """Policy Execution Context.

    Holds the outputs selected for one policy run (blob storage, logs,
    metrics, api stats, system stats, tracer) and acts as a context
    manager: ``__enter__`` selects and enters the outputs, ``__exit__``
    writes ``metadata.json`` through the policy and exits them.
    """

    def __init__(self, session_factory, policy, options):
        """Record the collaborators; most outputs are selected lazily.

        :param session_factory: object that receives ``policy_name`` while
            the context is active; an optional ``update`` attribute is
            called on entry.
        :param policy: policy being executed; this class reads ``name``,
            ``provider_name`` and ``data`` and calls ``_write_file``.
        :param options: run options; this class reads ``metrics_enabled``,
            ``tracer``, ``output_dir`` and ``log_group`` and converts the
            whole object with ``dict()`` for the metadata.
        """
        self.policy = policy
        self.options = options
        self.session_factory = session_factory

        # Runtime initialized during policy execution
        # We treat policies as a fly weight pre-execution.
        self.start_time = None
        self.execution_id = None
        self.output = None
        self.api_stats = None
        self.sys_stats = None
        # Declared here so the attributes exist before initialize() runs;
        # initialize() assigns the real values.
        self.logs = None
        self.output_logs = None

        # A few tests patch on metrics flush
        self.metrics = metrics_outputs.select(self.options.metrics_enabled, self)

        # Tracer is wired into core filtering code / which is getting
        # invoked sans execution context entry in tests
        self.tracer = tracer_outputs.select(self.options.tracer, self)

    def initialize(self):
        """Select the per-run outputs and stamp the start time and run id."""
        self.output = blob_outputs.select(self.options.output_dir, self)
        self.logs = log_outputs.select(self.options.log_group, self)

        # Always do file/blob storage outputs: when the selected log output
        # is not the default one, select an additional one with ``None``.
        self.output_logs = None
        if not isinstance(self.logs, log_outputs['default']):
            self.output_logs = log_outputs.select(None, self)

        # Look for customizations, but fallback to default
        for api_stats_type in (self.policy.provider_name, 'default'):
            if api_stats_type in api_stats_outputs:
                self.api_stats = api_stats_outputs.select(api_stats_type, self)
                break

        for sys_stats_type in ('psutil', 'default'):
            if sys_stats_type in sys_stats_outputs:
                self.sys_stats = sys_stats_outputs.select(sys_stats_type, self)
                break

        self.start_time = time.time()
        self.execution_id = str(uuid.uuid4())

    @property
    def log_dir(self):
        """Root directory of the blob output (``self.output.root_dir``)."""
        return self.output.root_dir

    def __enter__(self):
        """Initialize the context, enter every output and return ``self``."""
        self.initialize()
        self.session_factory.policy_name = self.policy.name
        self.sys_stats.__enter__()
        self.output.__enter__()
        self.logs.__enter__()
        if self.output_logs:
            self.output_logs.__enter__()

        self.api_stats.__enter__()
        self.tracer.__enter__()

        # Api stats and user agent modification by policy require updating
        # in place the cached session thread local.
        update_session = getattr(self.session_factory, 'update', None)
        if update_session:
            update_session(local_session(self.session_factory))
        return self

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        """Write run metadata, exit the outputs and reset session state.

        Returns ``None``, so an exception raised inside the ``with`` body
        is never suppressed.  The session factory's ``policy_name`` is
        cleared in a ``finally`` clause so a failure while writing metadata
        or exiting an output cannot leave a stale policy name behind.  An
        output whose exit is reached after such a failure is still skipped.
        """
        try:
            if exc_type is not None and self.metrics:
                self.metrics.put_metric('PolicyException', 1, "Count")
            self.policy._write_file(
                'metadata.json', dumps(self.get_metadata(), indent=2))
            self.api_stats.__exit__(exc_type, exc_value, exc_traceback)

            with self.tracer.subsegment('output'):
                self.metrics.flush()
                self.logs.__exit__(exc_type, exc_value, exc_traceback)
                if self.output_logs:
                    self.output_logs.__exit__(
                        exc_type, exc_value, exc_traceback)
                self.output.__exit__(exc_type, exc_value, exc_traceback)

            # NOTE(review): sys_stats is entered in __enter__ but has no
            # matching exit call here; kept as in the original -- confirm
            # this is intentional.
            self.tracer.__exit__()
        finally:
            self.session_factory.policy_name = None
            # IMPORTANT: multi-account execution (c7n-org and others) need
            # to manually reset this. Why: Not doing this means we get
            # excessive memory usage from client reconstruction for
            # dynamic-gen sdks.
            if os.environ.get('C7N_TEST_RUN'):
                reset_session_cache()

    def get_metadata(self, include=('sys-stats', 'api-stats', 'metrics')):
        """Return a dict describing this execution.

        :param include: names of the optional sections to add; a section is
            added only when it is listed and the matching output is truthy.
        :returns: dict with ``policy``, ``version``, ``execution`` and
            ``config`` keys plus the requested optional sections.
            ``execution['duration']`` is ``None`` when the context has not
            been initialized yet (``start_time`` is still ``None``).
        """
        now = time.time()
        started = self.start_time
        # Before initialize() there is no start time to measure from.
        duration = None if started is None else now - started

        md = {
            'policy': self.policy.data,
            'version': version,
            'execution': {
                'id': self.execution_id,
                'start': started,
                'end_time': now,
                'duration': duration},
            'config': dict(self.options)
        }

        if 'sys-stats' in include and self.sys_stats:
            md['sys-stats'] = self.sys_stats.get_metadata()
        if 'api-stats' in include and self.api_stats:
            md['api-stats'] = self.api_stats.get_metadata()
        if 'metrics' in include and self.metrics:
            md['metrics'] = self.metrics.get_metadata()
        return md