Source code for c7n.policy

# Copyright 2015-2018 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals

from datetime import datetime
from dateutil import parser, tz as tzutil
import json
import fnmatch
import itertools
import logging
import os
import time

import jmespath
import six

from c7n.cwe import CloudWatchEvents
from c7n.ctx import ExecutionContext
from c7n.exceptions import PolicyValidationError, ClientError, ResourceLimitExceeded
from c7n.output import DEFAULT_NAMESPACE
from c7n.resources import load_resources
from c7n.registry import PluginRegistry
from c7n.provider import clouds
from c7n import utils
from c7n.version import version

log = logging.getLogger('c7n.policy')


[docs]def load(options, path, format='yaml', validate=True, vars=None): # should we do os.path.expanduser here? if not os.path.exists(path): raise IOError("Invalid path for config %r" % path) load_resources() data = utils.load_file(path, format=format, vars=vars) if format == 'json': validate = False if isinstance(data, list): log.warning('yaml in invalid format. The "policies:" line is probably missing.') return None # Test for empty policy file if not data or data.get('policies') is None: return None if validate: from c7n.schema import validate errors = validate(data) if errors: raise PolicyValidationError( "Failed to validate policy %s \n %s" % ( errors[1], errors[0])) collection = PolicyCollection.from_data(data, options) if validate: # non schema validation of policies [p.validate() for p in collection] return collection
[docs]class PolicyCollection(object): log = logging.getLogger('c7n.policies') def __init__(self, policies, options): self.options = options self.policies = policies
[docs] @classmethod def from_data(cls, data, options): policies = [Policy(p, options, session_factory=cls.session_factory()) for p in data.get('policies', ())] return cls(policies, options)
def __add__(self, other): return self.__class__(self.policies + other.policies, self.options)
[docs] def filter(self, policy_name=None, resource_type=None): results = [] for policy in self.policies: if resource_type: if policy.resource_type != resource_type: continue if policy_name: if not fnmatch.fnmatch(policy.name, policy_name): continue results.append(policy) return PolicyCollection(results, self.options)
def __iter__(self): return iter(self.policies) def __contains__(self, policy_name): for p in self.policies: if p.name == policy_name: return True return False def __len__(self): return len(self.policies) @property def resource_types(self): """resource types used by the collection.""" rtypes = set() for p in self.policies: rtypes.add(p.resource_type) return rtypes # cli/collection tests patch this
[docs] @classmethod def session_factory(cls): return None
[docs]class PolicyExecutionMode(object): """Policy execution semantics""" POLICY_METRICS = ('ResourceCount', 'ResourceTime', 'ActionTime') def __init__(self, policy): self.policy = policy
[docs] def run(self, event=None, lambda_context=None): """Run the actual policy.""" raise NotImplementedError("subclass responsibility")
[docs] def provision(self): """Provision any resources needed for the policy."""
[docs] def get_logs(self, start, end): """Retrieve logs for the policy""" raise NotImplementedError("subclass responsibility")
[docs] def validate(self): """Validate configuration settings for execution mode."""
[docs] def get_metrics(self, start, end, period): """Retrieve any associated metrics for the policy.""" values = {} default_dimensions = { 'Policy': self.policy.name, 'ResType': self.policy.resource_type, 'Scope': 'Policy'} metrics = list(self.POLICY_METRICS) # Support action, and filter custom metrics for el in itertools.chain( self.policy.resource_manager.actions, self.policy.resource_manager.filters): if el.metrics: metrics.extend(el.metrics) session = utils.local_session(self.policy.session_factory) client = session.client('cloudwatch') for m in metrics: if isinstance(m, six.string_types): dimensions = default_dimensions else: m, m_dimensions = m dimensions = dict(default_dimensions) dimensions.update(m_dimensions) results = client.get_metric_statistics( Namespace=DEFAULT_NAMESPACE, Dimensions=[ {'Name': k, 'Value': v} for k, v in dimensions.items()], Statistics=['Sum', 'Average'], StartTime=start, EndTime=end, Period=period, MetricName=m) values[m] = results['Datapoints'] return values
[docs]class ServerlessExecutionMode(PolicyExecutionMode):
[docs] def run(self, event=None, lambda_context=None): """Run the actual policy.""" raise NotImplementedError("subclass responsibility")
[docs] def get_logs(self, start, end): """Retrieve logs for the policy""" raise NotImplementedError("subclass responsibility")
[docs] def provision(self): """Provision any resources needed for the policy.""" raise NotImplementedError("subclass responsibility")
execution = PluginRegistry('c7n.execution')
[docs]@execution.register('pull') class PullMode(PolicyExecutionMode): """Pull mode execution of a policy. Queries resources from cloud provider for filtering and actions. """ schema = utils.type_schema('pull')
[docs] def run(self, *args, **kw): if not self.is_runnable(): return with self.policy.ctx: self.policy.log.debug( "Running policy:%s resource:%s region:%s c7n:%s", self.policy.name, self.policy.resource_type, self.policy.options.region or 'default', version) s = time.time() try: resources = self.policy.resource_manager.resources() except ResourceLimitExceeded as e: self.policy.log.error(str(e)) self.policy.ctx.metrics.put_metric( 'ResourceLimitExceeded', e.selection_count, "Count") raise rt = time.time() - s self.policy.log.info( "policy:%s resource:%s region:%s count:%d time:%0.2f" % ( self.policy.name, self.policy.resource_type, self.policy.options.region, len(resources), rt)) self.policy.ctx.metrics.put_metric( "ResourceCount", len(resources), "Count", Scope="Policy") self.policy.ctx.metrics.put_metric( "ResourceTime", rt, "Seconds", Scope="Policy") self.policy._write_file( 'resources.json', utils.dumps(resources, indent=2)) if not resources: return [] if self.policy.options.dryrun: self.policy.log.debug("dryrun: skipping actions") return resources at = time.time() for a in self.policy.resource_manager.actions: s = time.time() with self.policy.ctx.tracer.subsegment('action:%s' % a.type): results = a.process(resources) self.policy.log.info( "policy:%s action:%s" " resources:%d" " execution_time:%0.2f" % ( self.policy.name, a.name, len(resources), time.time() - s)) if results: self.policy._write_file( "action-%s" % a.name, utils.dumps(results)) self.policy.ctx.metrics.put_metric( "ActionTime", time.time() - at, "Seconds", Scope="Policy") return resources
[docs] def get_logs(self, start, end): from c7n import logs_support log_source = self.policy.ctx.output log_gen = () if self.policy.options.log_group is not None: session = utils.local_session(self.policy.session_factory) log_gen = logs_support.log_entries_from_group( session, self.policy.options.log_group, start, end, ) elif log_source.type == 's3': raw_entries = logs_support.log_entries_from_s3( self.policy.session_factory, log_source, start, end, ) # log files can be downloaded out of order, so sort on timestamp # log_gen isn't really a generator once we do this, but oh well log_gen = sorted( logs_support.normalized_log_entries(raw_entries), key=lambda e: e.get('timestamp', 0), ) else: log_path = os.path.join(log_source.root_dir, 'custodian-run.log') with open(log_path) as log_fh: raw_entries = log_fh.readlines() log_gen = logs_support.normalized_log_entries(raw_entries) return logs_support.log_entries_in_range( log_gen, start, end, )
[docs] def is_runnable(self): now = datetime.now(self.policy.tz) if self.policy.start and self.policy.start > now: self.policy.log.info( "Skipping policy:%s start-date:%s is after current-date:%s", self.policy.name, self.policy.start, now) return False if self.policy.end and self.policy.end < now: self.policy.log.info( "Skipping policy:%s end-date:%s is before current-date:%s", self.policy.name, self.policy.end, now) return False if self.policy.region and ( self.policy.region != self.policy.options.region): self.policy.log.info( "Skipping policy:%s target-region:%s current-region:%s", self.policy.name, self.policy.region, self.policy.options.region) return False return True
[docs]class LambdaMode(ServerlessExecutionMode): """A policy that runs/executes in lambda.""" POLICY_METRICS = ('ResourceCount',) schema = { 'type': 'object', 'additionalProperties': False, 'properties': { 'execution-options': {'type': 'object'}, 'function-prefix': {'type': 'string'}, 'member-role': {'type': 'string'}, 'packages': {'type': 'array', 'items': {'type': 'string'}}, # Lambda passthrough config 'layers': {'type': 'array', 'items': {'type': 'string'}}, 'concurrency': {'type': 'integer'}, 'runtime': {'enum': ['python2.7', 'python3.6', 'python3.7']}, 'role': {'type': 'string'}, 'timeout': {'type': 'number'}, 'memory': {'type': 'number'}, 'environment': {'type': 'object'}, 'tags': {'type': 'object'}, 'dead_letter_config': {'type': 'object'}, 'kms_key_arn': {'type': 'string'}, 'tracing_config': {'type': 'object'}, 'security_groups': {'type': 'array'}, 'subnets': {'type': 'array'} } }
[docs] def validate(self): super(LambdaMode, self).validate() prefix = self.policy.data.get('function-prefix', 'custodian-') if len(prefix + self.policy.name) > 64: raise PolicyValidationError( "Custodian Lambda policies have a max length with prefix of 64" " policy:%s prefix:%s" % (prefix, self.policy.name))
[docs] def get_metrics(self, start, end, period): from c7n.mu import LambdaManager, PolicyLambda manager = LambdaManager(self.policy.session_factory) values = manager.metrics( [PolicyLambda(self.policy)], start, end, period)[0] values.update( super(LambdaMode, self).get_metrics(start, end, period)) return values
[docs] def get_member_account_id(self, event): return event.get('account')
[docs] def get_member_region(self, event): return event.get('region')
[docs] def assume_member(self, event): # if a member role is defined we're being run out of the master, and we need # to assume back into the member for policy execution. member_role = self.policy.data['mode'].get('member-role') member_id = self.get_member_account_id(event) region = self.get_member_region(event) if member_role and member_id and region: # In the master account we might be multiplexing a hot lambda across # multiple member accounts for each event/invocation. member_role = member_role.format(account_id=member_id) utils.reset_session_cache() self.policy.options['account_id'] = member_id self.policy.session_factory.region = region self.policy.session_factory.assume_role = member_role self.policy.log.info( "Assuming member role:%s", member_role) return True return False
[docs] def resolve_resources(self, event): self.assume_member(event) mode = self.policy.data.get('mode', {}) resource_ids = CloudWatchEvents.get_ids(event, mode) if resource_ids is None: raise ValueError("Unknown push event mode %s", self.data) self.policy.log.info('Found resource ids:%s', resource_ids) # Handle multi-resource type events, like ec2 CreateTags resource_ids = self.policy.resource_manager.match_ids(resource_ids) if not resource_ids: self.policy.log.warning("Could not find resource ids") return [] resources = self.policy.resource_manager.get_resources(resource_ids) if 'debug' in event: self.policy.log.info("Resources %s", resources) return resources
[docs] def run(self, event, lambda_context): """Run policy in push mode against given event. Lambda automatically generates cloud watch logs, and metrics for us, albeit with some deficienies, metrics no longer count against valid resources matches, but against execution. If metrics execution option is enabled, custodian will generate metrics per normal. """ from c7n.actions import EventAction mode = self.policy.data.get('mode', {}) if not bool(mode.get("log", True)): root = logging.getLogger() map(root.removeHandler, root.handlers[:]) root.handlers = [logging.NullHandler()] resources = self.resolve_resources(event) if not resources: return resources resources = self.policy.resource_manager.filter_resources( resources, event) if 'debug' in event: self.policy.log.info("Filtered resources %d" % len(resources)) if not resources: self.policy.log.info( "policy:%s resources:%s no resources matched" % ( self.policy.name, self.policy.resource_type)) return with self.policy.ctx: self.policy.ctx.metrics.put_metric( 'ResourceCount', len(resources), 'Count', Scope="Policy", buffer=False) if 'debug' in event: self.policy.log.info( "Invoking actions %s", self.policy.resource_manager.actions) self.policy._write_file( 'resources.json', utils.dumps(resources, indent=2)) for action in self.policy.resource_manager.actions: self.policy.log.info( "policy:%s invoking action:%s resources:%d", self.policy.name, action.name, len(resources)) if isinstance(action, EventAction): results = action.process(resources, event) else: results = action.process(resources) self.policy._write_file( "action-%s" % action.name, utils.dumps(results)) return resources
[docs] def provision(self): from c7n import mu with self.policy.ctx: self.policy.log.info( "Provisioning policy lambda %s", self.policy.name) try: manager = mu.LambdaManager(self.policy.session_factory) except ClientError: # For cli usage by normal users, don't assume the role just use # it for the lambda manager = mu.LambdaManager( lambda assume=False: self.policy.session_factory(assume)) return manager.publish( mu.PolicyLambda(self.policy), role=self.policy.options.assume_role)
[docs] def get_logs(self, start, end): from c7n import mu, logs_support manager = mu.LambdaManager(self.policy.session_factory) log_gen = manager.logs(mu.PolicyLambda(self.policy), start, end) return logs_support.log_entries_in_range( log_gen, start, end, )
[docs]@execution.register('periodic') class PeriodicMode(LambdaMode, PullMode): """A policy that runs in pull mode within lambda.""" POLICY_METRICS = ('ResourceCount', 'ResourceTime', 'ActionTime') schema = utils.type_schema( 'periodic', schedule={'type': 'string'}, rinherit=LambdaMode.schema)
[docs] def run(self, event, lambda_context): return PullMode.run(self)
[docs]@execution.register('phd') class PHDMode(LambdaMode): """Personal Health Dashboard event based policy execution.""" schema = utils.type_schema( 'phd', required=['events'], events={'type': 'array', 'items': {'type': 'string'}}, categories={'type': 'array', 'items': { 'enum': ['issue', 'accountNotification', 'scheduledChange']}}, statuses={'type': 'array', 'items': { 'enum': ['open', 'upcoming', 'closed']}}, rinherit=LambdaMode.schema)
[docs] def validate(self): super(PHDMode, self).validate() if self.policy.resource_type == 'account': return if 'health-event' not in self.policy.resource_manager.filter_registry: raise PolicyValidationError( "policy:%s phd event mode not supported for resource:%s" % ( self.policy.name, self.policy.resource_type))
[docs] @staticmethod def process_event_arns(client, event_arns): entities = [] paginator = client.get_paginator('describe_affected_entities') for event_set in utils.chunks(event_arns, 10): entities.extend(list(itertools.chain( *[p['entities'] for p in paginator.paginate( filter={'eventArns': event_arns})]))) return entities
[docs] def resolve_resources(self, event): session = utils.local_session(self.policy.resource_manager.session_factory) health = session.client('health') he_arn = event['detail']['eventArn'] resource_arns = self.process_event_arns(health, [he_arn]) m = self.policy.resource_manager.get_model() if 'arn' in m.id.lower(): resource_ids = [r['entityValue'].rsplit('/', 1)[-1] for r in resource_arns] else: resource_ids = [r['entityValue'] for r in resource_arns] resources = self.policy.resource_manager.get_resources(resource_ids) for r in resources: r.setdefault('c7n:HealthEvent', []).append(he_arn) return resources
[docs]@execution.register('cloudtrail') class CloudTrailMode(LambdaMode): """A lambda policy using cloudwatch events rules on cloudtrail api logs.""" schema = utils.type_schema( 'cloudtrail', events={'type': 'array', 'items': { 'oneOf': [ {'type': 'string'}, {'type': 'object', 'required': ['event', 'source', 'ids'], 'properties': { 'source': {'type': 'string'}, 'ids': {'type': 'string'}, 'event': {'type': 'string'}}}] }}, rinherit=LambdaMode.schema)
[docs] def validate(self): super(CloudTrailMode, self).validate() from c7n import query events = self.policy.data['mode'].get('events') assert events, "cloud trail mode requires specifiying events to subscribe" for e in events: if isinstance(e, six.string_types): assert e in CloudWatchEvents.trail_events, "event shortcut not defined: %s" % e if isinstance(e, dict): jmespath.compile(e['ids']) if isinstance(self.policy.resource_manager, query.ChildResourceManager): if not getattr(self.policy.resource_manager.resource_type, 'supports_trailevents', False): raise ValueError( "resource:%s does not support cloudtrail mode policies" % ( self.policy.resource_type))
[docs]@execution.register('ec2-instance-state') class EC2InstanceState(LambdaMode): """ A lambda policy that executes on ec2 instance state changes. https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html """ schema = utils.type_schema( 'ec2-instance-state', rinherit=LambdaMode.schema, events={'type': 'array', 'items': { 'enum': ['pending', 'running', 'shutting-down', 'stopped', 'stopping', 'terminated']}})
[docs]@execution.register('asg-instance-state') class ASGInstanceState(LambdaMode): """a lambda policy that executes on an asg's ec2 instance state changes.""" schema = utils.type_schema( 'asg-instance-state', rinherit=LambdaMode.schema, events={'type': 'array', 'items': { 'enum': ['launch-success', 'launch-failure', 'terminate-success', 'terminate-failure']}})
[docs]@execution.register('guard-duty') class GuardDutyMode(LambdaMode): """Incident Response for AWS Guard Duty""" schema = utils.type_schema('guard-duty', rinherit=LambdaMode.schema) supported_resources = ('account', 'ec2', 'iam-user') id_exprs = { 'account': jmespath.compile('detail.accountId'), 'ec2': jmespath.compile('detail.resource.instanceDetails.instanceId'), 'iam-user': jmespath.compile('detail.resource.accessKeyDetails.userName')}
[docs] def get_member_account_id(self, event): return event['detail']['accountId']
[docs] def resolve_resources(self, event): self.assume_member(event) rid = self.id_exprs[self.policy.resource_type].search(event) resources = self.policy.resource_manager.get_resources([rid]) # For iam users annotate with the access key specified in the finding event if resources and self.policy.resource_type == 'iam-user': resources[0]['c7n:AccessKeys'] = { 'AccessKeyId': event['detail']['resource']['accessKeyDetails']['accessKeyId']} return resources
[docs] def validate(self): super(GuardDutyMode, self).validate() if self.policy.data['resource'] not in self.supported_resources: raise ValueError( "Policy:%s resource:%s Guard duty mode only supported for %s" % ( self.policy.data['name'], self.policy.data['resource'], self.supported_resources))
[docs] def provision(self): if self.policy.data['resource'] == 'ec2': self.policy.data['mode']['resource-filter'] = 'Instance' elif self.policy.data['resource'] == 'iam-user': self.policy.data['mode']['resource-filter'] = 'AccessKey' return super(GuardDutyMode, self).provision()
[docs]@execution.register('config-rule') class ConfigRuleMode(LambdaMode): """a lambda policy that executes as a config service rule. http://docs.aws.amazon.com/config/latest/APIReference/API_PutConfigRule.html """ cfg_event = None schema = utils.type_schema('config-rule', rinherit=LambdaMode.schema)
[docs] def resolve_resources(self, event): source = self.policy.resource_manager.get_source('config') return [source.load_resource(self.cfg_event['configurationItem'])]
[docs] def run(self, event, lambda_context): self.cfg_event = json.loads(event['invokingEvent']) cfg_item = self.cfg_event['configurationItem'] evaluation = None resources = [] # TODO config resource type matches policy check if event['eventLeftScope'] or cfg_item['configurationItemStatus'] in ( "ResourceDeleted", "ResourceNotRecorded", "ResourceDeletedNotRecorded"): evaluation = { 'annotation': 'The rule does not apply.', 'compliance_type': 'NOT_APPLICABLE'} if evaluation is None: resources = super(ConfigRuleMode, self).run(event, lambda_context) match = self.policy.data['mode'].get('match-compliant', False) self.policy.log.info( "found resources:%d match-compliant:%s", len(resources or ()), match) if (match and resources) or (not match and not resources): evaluation = { 'compliance_type': 'COMPLIANT', 'annotation': 'The resource is compliant with policy:%s.' % ( self.policy.name)} else: evaluation = { 'compliance_type': 'NON_COMPLIANT', 'annotation': 'Resource is not compliant with policy:%s' % ( self.policy.name) } client = utils.local_session( self.policy.session_factory).client('config') client.put_evaluations( Evaluations=[{ 'ComplianceResourceType': cfg_item['resourceType'], 'ComplianceResourceId': cfg_item['resourceId'], 'ComplianceType': evaluation['compliance_type'], 'Annotation': evaluation['annotation'], # TODO ? if not applicable use current timestamp 'OrderingTimestamp': cfg_item[ 'configurationItemCaptureTime']}], ResultToken=event.get('resultToken', 'No token found.')) return resources
[docs]class Policy(object): log = logging.getLogger('custodian.policy') def __init__(self, data, options, session_factory=None): self.data = data self.options = options assert "name" in self.data if session_factory is None: session_factory = clouds[self.provider_name]().get_session_factory(options) self.session_factory = session_factory self.ctx = ExecutionContext(self.session_factory, self, self.options) self.resource_manager = self.load_resource_manager() def __repr__(self): return "<Policy resource:%s name:%s region:%s>" % ( self.resource_type, self.name, self.options.region) @property def name(self): return self.data['name'] @property def resource_type(self): return self.data['resource'] @property def provider_name(self): if '.' in self.resource_type: provider_name, resource_type = self.resource_type.split('.', 1) else: provider_name = 'aws' return provider_name @property def region(self): return self.data.get('region') @property def tz(self): return tzutil.gettz(self.data.get('tz', 'UTC')) @property def start(self): if self.data.get('start'): return parser.parse(self.data.get('start'), ignoretz=True).replace(tzinfo=self.tz) return None @property def end(self): if self.data.get('end'): return parser.parse(self.data.get('end'), ignoretz=True).replace(tzinfo=self.tz) return None @property def max_resources(self): return self.data.get('max-resources') @property def max_resources_percent(self): return self.data.get('max-resources-percent') @property def tags(self): return self.data.get('tags', ())
[docs] def get_cache(self): return self.resource_manager._cache
@property def execution_mode(self): return self.data.get('mode', {'type': 'pull'})['type']
[docs] def get_execution_mode(self): exec_mode = execution[self.execution_mode] if exec_mode is None: return None return exec_mode(self)
@property def is_lambda(self): if 'mode' not in self.data: return False return True
[docs] def validate(self): m = self.get_execution_mode() if m is None: raise PolicyValidationError( "Invalid Execution mode in policy %s" % (self.data,)) m.validate() self.validate_policy_start_stop() self.resource_manager.validate() for f in self.resource_manager.filters: f.validate() for a in self.resource_manager.actions: a.validate()
[docs] def get_variables(self, variables=None): """Get runtime variables for policy interpolation. Runtime variables are merged with the passed in variables if any. """ # Global policy variable expansion, we have to carry forward on # various filter/action local vocabularies. Where possible defer # by using a format string. # # See https://github.com/cloud-custodian/cloud-custodian/issues/2330 if not variables: variables = {} if 'mode' in self.data: if 'role' in self.data['mode'] and not self.data['mode']['role'].startswith("arn:aws"): self.data['mode']['role'] = "arn:aws:iam::%s:role/%s" % \ (self.options.account_id, self.data['mode']['role']) variables.update({ # standard runtime variables for interpolation 'account': '{account}', 'account_id': self.options.account_id, 'region': self.options.region, # non-standard runtime variables from local filter/action vocabularies # # notify action 'policy': self.data, 'event': '{event}', # mark for op action 'op': '{op}', 'action_date': '{action_date}', # tag action pyformat-date handling 'now': utils.FormatDate(datetime.utcnow()), # account increase limit action 'service': '{service}', # s3 set logging action :-( see if we can revisit this one. 'bucket_region': '{bucket_region}', 'bucket_name': '{bucket_name}', 'source_bucket_name': '{source_bucket_name}', 'target_bucket_name': '{target_bucket_name}', 'target_prefix': '{target_prefix}', 'LoadBalancerName': '{LoadBalancerName}' }) return variables
[docs] def expand_variables(self, variables): """Expand variables in policy data. Updates the policy data in-place. """ # format string values returns a copy updated = utils.format_string_values(self.data, **variables) # Several keys should only be expanded at runtime, perserve them. if 'member-role' in updated.get('mode', {}): updated['mode']['member-role'] = self.data['mode']['member-role'] # Update ourselves in place self.data = updated # Reload filters/actions using updated data, we keep a reference # for some compatiblity preservation work. m = self.resource_manager self.resource_manager = self.load_resource_manager() # XXX: Compatiblity hack # Preserve notify action subject lines which support # embedded jinja2 as a passthrough to the mailer. for old_a, new_a in zip(m.actions, self.resource_manager.actions): if old_a.type == 'notify' and 'subject' in old_a.data: new_a.data['subject'] = old_a.data['subject']
[docs] def push(self, event, lambda_ctx): mode = self.get_execution_mode() return mode.run(event, lambda_ctx)
[docs] def provision(self): """Provision policy as a lambda function.""" mode = self.get_execution_mode() return mode.provision()
[docs] def poll(self): """Query resources and apply policy.""" mode = self.get_execution_mode() return mode.run()
[docs] def get_logs(self, start, end): mode = self.get_execution_mode() return mode.get_logs(start, end)
[docs] def get_metrics(self, start, end, period): mode = self.get_execution_mode() return mode.get_metrics(start, end, period)
[docs] def get_permissions(self): """get permissions needed by this policy""" permissions = set() permissions.update(self.resource_manager.get_permissions()) for f in self.resource_manager.filters: permissions.update(f.get_permissions()) for a in self.resource_manager.actions: permissions.update(a.get_permissions()) return permissions
def __call__(self): """Run policy in default mode""" mode = self.get_execution_mode() if self.options.dryrun: resources = PullMode(self).run() elif isinstance(mode, ServerlessExecutionMode): resources = mode.provision() else: resources = mode.run() # clear out resource manager post run, to clear cache self.resource_manager = self.load_resource_manager() return resources run = __call__ def _write_file(self, rel_path, value): with open(os.path.join(self.ctx.log_dir, rel_path), 'w') as fh: fh.write(value)
[docs] def load_resource_manager(self): resource_type = self.data.get('resource') provider = clouds.get(self.provider_name) if provider is None: raise ValueError( "Invalid cloud provider: %s" % self.provider_name) factory = provider.resources.get( resource_type.rsplit('.', 1)[-1]) if not factory: raise ValueError( "Invalid resource type: %s" % resource_type) return factory(self.ctx, self.data)
[docs] def validate_policy_start_stop(self): policy_name = self.data.get('name') policy_tz = self.data.get('tz') policy_start = self.data.get('start') policy_end = self.data.get('end') if policy_tz: try: p_tz = tzutil.gettz(policy_tz) except Exception as e: raise ValueError( "Policy: %s TZ not parsable: %s, %s" % (policy_name, policy_tz, e)) # Type will be tzwin on windows, but tzwin is null on linux if not (isinstance(p_tz, tzutil.tzfile) or (tzutil.tzwin and isinstance(p_tz, tzutil.tzwin))): raise ValueError( "Policy: %s TZ not parsable: %s" % (policy_name, policy_tz)) for i in [policy_start, policy_end]: if i: try: parser.parse(i) except Exception as e: raise ValueError( "Policy: %s Date/Time not parsable: %s, %s" % (policy_name, i, e))