Source code for c7n_gcp.policy

# Copyright 2018 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from dateutil.tz import tz

from c7n.exceptions import PolicyValidationError
from c7n.policy import execution, ServerlessExecutionMode, PullMode
from c7n.utils import local_session, type_schema

from c7n_gcp import mu


class FunctionMode(ServerlessExecutionMode):
    """Base execution mode for policies published as gcp cloud functions.

    Declares the schema options shared by the function-backed modes and
    implements provisioning / deprovisioning through
    ``mu.CloudFunctionManager``.  Subclasses must implement ``run`` and
    ``_get_function``.
    """

    schema = type_schema(
        'gcp',
        **{'execution-options': {'type': 'object'},
           'timeout': {'type': 'string'},
           'memory-size': {'type': 'integer'},
           'labels': {'type': 'object'},
           'network': {'type': 'string'},
           'region': {'type': 'string'},
           'max-instances': {'type': 'integer'},
           'service-account': {'type': 'string'},
           'environment': {'type': 'object'}})

    def __init__(self, policy):
        """Bind this mode to ``policy`` and set up the mode logger."""
        self.policy = policy
        self.log = logging.getLogger('custodian.gcp.funcexec')

    def run(self, event=None, context=None):
        """Execute the policy; subclasses must override.

        ``event`` and ``context`` are accepted so the base signature agrees
        with the ``run(event, context)`` signature of the subclasses; a
        call made in either shape reaches the ``NotImplementedError``
        rather than failing earlier with a ``TypeError``.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError("subclass responsibility")

    def provision(self):
        """Publish the policy's function and return the manager's result."""
        self.log.info("Provisioning policy function %s", self.policy.name)
        manager = mu.CloudFunctionManager(self.policy.session_factory)
        return manager.publish(self._get_function())

    def deprovision(self):
        """Remove the policy's function and return the manager's result."""
        manager = mu.CloudFunctionManager(self.policy.session_factory)
        return manager.remove(self._get_function())

    def validate(self):
        """No mode-level validation at this level; subclasses may override."""
        pass

    def _get_function(self):
        """Build the function object to publish/remove; subclasses must override.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError("subclass responsibility")
@execution.register('gcp-periodic')
class PeriodicMode(FunctionMode, PullMode):
    """Pull-style policy execution triggered by a periodic event.

    Requires a ``schedule``; optionally accepts a ``tz`` and a
    ``trigger-type`` of ``http`` or ``pubsub``.
    """

    schema = type_schema(
        'gcp-periodic',
        rinherit=FunctionMode.schema,
        required=['schedule'],
        **{
            'trigger-type': {'enum': ['http', 'pubsub']},
            'tz': {'type': 'string'},
            'schedule': {'type': 'string'},
        })

    def validate(self):
        """Reject a configured ``tz`` that cannot be resolved locally.

        :raises PolicyValidationError: if ``tz`` is set in the mode data
            and ``dateutil`` cannot resolve it.
        """
        mode_config = self.policy.data['mode']
        if 'tz' not in mode_config:
            return

        zone_name = mode_config['tz']
        # We can't catch every error statically: our local tz lookup may
        # differ from the one gcp uses, i.e. not all the same aliases are
        # defined on both sides.
        if tz.gettz(zone_name) is not None:
            return

        raise PolicyValidationError(
            "policy:%s gcp-periodic invalid tz:%s" % (
                self.policy.name, zone_name))

    def _get_function(self):
        """Return the policy function wired to a single periodic event."""
        session = local_session(self.policy.session_factory)
        periodic_event = mu.PeriodicEvent(session, self.policy.data['mode'])
        return mu.PolicyFunction(self.policy, events=[periodic_event])

    def run(self, event, context):
        """Run the policy in pull mode; ``event`` and ``context`` are unused."""
        return PullMode.run(self)
@execution.register('gcp-audit')
class ApiAuditMode(FunctionMode):
    """Custodian policy execution on gcp api audit logs
    """

    schema = type_schema(
        'gcp-audit',
        methods={'type': 'array', 'items': {'type': 'string'}},
        required=['methods'],
        rinherit=FunctionMode.schema)

    def resolve_resources(self, event):
        """Resolve a gcp resource from its audit trail metadata.

        :param event: the audit log event, as a dict.
        :returns: a one-element list holding the resource returned by the
            resource manager, or ``None`` when the event carries no
            resource labels.
        """
        manager = self.policy.resource_manager
        if manager.resource_type.get_requires_event:
            # The resource type wants the full event for its lookup.
            return [manager.get_resource(event)]

        resource_info = event.get('resource')
        if resource_info is None or 'labels' not in resource_info:
            self.policy.log.warning("Could not find resource information in event")
            return None

        # copy resource name, the api doesn't like resource ids, just names.
        # Read protoPayload defensively: an event without it should fall
        # through to a labels-only lookup rather than raise KeyError.
        payload = event.get('protoPayload') or {}
        if 'resourceName' in payload:
            # NOTE: this writes into the caller's event dict, as before.
            resource_info['labels']['resourceName'] = payload['resourceName']

        return [manager.get_resource(resource_info['labels'])]

    def _get_function(self):
        """Return the policy function wired to a single api subscriber."""
        session = local_session(self.policy.session_factory)
        subscriber = mu.ApiSubscriber(session, self.policy.data['mode'])
        return mu.PolicyFunction(self.policy, events=[subscriber])

    def validate(self):
        """Require that the policy's resource type defines ``get``.

        :raises PolicyValidationError: if the resource type's ``get``
            attribute is falsy.
        """
        if not self.policy.resource_manager.resource_type.get:
            raise PolicyValidationError(
                "Resource:%s does not implement retrieval method" % (
                    self.policy.resource_type))

    def run(self, event, context):
        """Execute a gcp serverless model

        Resolves the resource referenced by ``event``, applies the policy's
        filters, then runs each action on the matches.

        :returns: the filtered resources, or ``None`` when nothing was
            resolved or nothing matched.
        """
        resources = self.resolve_resources(event)
        if not resources:
            return None

        manager = self.policy.resource_manager
        resources = manager.filter_resources(resources, event)
        # Lazy logging args: same message, formatted only if emitted.
        self.policy.log.info("Filtered resources %d", len(resources))
        if not resources:
            return None

        for action in manager.actions:
            action.process(resources)
        return resources