Source code for c7n.resources.cloudtrail

# Copyright 2017-2019 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals

import logging
import operator

from c7n.actions import Action
from c7n.exceptions import PolicyValidationError
from c7n.filters import ValueFilter, Filter
from c7n.manager import resources
from c7n.query import QueryResourceManager
from c7n.utils import local_session, type_schema

from .aws import shape_validate, Arn

log = logging.getLogger('c7n.resources.cloudtrail')


[docs]@resources.register('cloudtrail') class CloudTrail(QueryResourceManager):
[docs] class resource_type(object): service = 'cloudtrail' enum_spec = ('describe_trails', 'trailList', None) filter_name = 'trailNameList' filter_type = 'list' arn = id = 'TrailARN' name = 'Name' dimension = None config_type = "AWS::CloudTrail::Trail"
[docs]@CloudTrail.filter_registry.register('is-shadow') class IsShadow(Filter): """Identify shadow trails (secondary copies), shadow trails can't be modified directly, the origin trail needs to be modified. Shadow trails are created for multi-region trails as well for organizational trails. """ schema = type_schema('is-shadow', state={'type': 'boolean'}) permissions = ('cloudtrail:DescribeTrails',) embedded = False
[docs] def process(self, resources, event=None): anded = lambda x: True # NOQA op = self.data.get('state', True) and anded or operator.__not__ rcount = len(resources) trails = [t for t in resources if op(self.is_shadow(t))] if len(trails) != rcount and self.embedded: self.log.info("implicitly filtering shadow trails %d -> %d", rcount, len(trails)) return trails
[docs] def is_shadow(self, t): if t.get('IsOrganizationTrail') and self.manager.config.account_id not in t['TrailARN']: return True if t.get('IsMultiRegionTrail') and t['HomeRegion'] not in t['TrailARN']: return True
[docs]@CloudTrail.filter_registry.register('status') class Status(ValueFilter): """Filter a cloudtrail by its status. :Example: .. code-block:: yaml policies: - name: cloudtrail-not-active resource: aws.cloudtrail filters: - type: status key: IsLogging value: False """ schema = type_schema('status', rinherit=ValueFilter.schema) permissions = ('cloudtrail:GetTrailStatus',) annotation_key = 'c7n:TrailStatus'
[docs] def process(self, resources, event=None): for r in resources: region = self.manager.config.region trail_arn = Arn.parse(r['TrailARN']) if (r.get('IsOrganizationTrail') and self.manager.config.account_id != trail_arn.account_id): continue if r.get('HomeRegion') and r['HomeRegion'] != region: region = trail_arn.region if self.annotation_key in r: continue client = local_session(self.manager.session_factory).client( 'cloudtrail', region_name=region) status = client.get_trail_status(Name=r['Name']) status.pop('ResponseMetadata') r[self.annotation_key] = status return super(Status, self).process(resources)
def __call__(self, r): return self.match(r['c7n:TrailStatus'])
[docs]@CloudTrail.action_registry.register('update-trail') class UpdateTrail(Action): """Update trail attributes. :Example: .. code-block:: yaml policies: - name: cloudtrail-set-log resource: aws.cloudtrail filters: - or: - KmsKeyId: empty - LogFileValidationEnabled: false actions: - type: update-trail attributes: KmsKeyId: arn:aws:kms:us-west-2:111122223333:key/1234abcd-12ab-34cd-56ef EnableLogFileValidation: true """ schema = type_schema( 'update-trail', attributes={'type': 'object'}, required=('attributes',)) shape = 'UpdateTrailRequest' permissions = ('cloudtrail:UpdateTrail',)
[docs] def validate(self): attrs = dict(self.data['attributes']) if 'Name' in attrs: raise PolicyValidationError( "Can't include Name in update-trail action") attrs['Name'] = 'PolicyValidation' return shape_validate( attrs, self.shape, self.manager.resource_type.service)
[docs] def process(self, resources): client = local_session(self.manager.session_factory).client('cloudtrail') shadow_check = IsShadow({'state': False}, self.manager) shadow_check.embedded = True resources = shadow_check.process(resources) for r in resources: client.update_trail( Name=r['Name'], **self.data['attributes'])
[docs]@CloudTrail.action_registry.register('set-logging') class SetLogging(Action): """Set the logging state of a trail :Example: .. code-block:: yaml policies: - name: cloudtrail-not-active resource: aws.cloudtrail filters: - type: status key: IsLogging value: False actions: - type: set-logging enabled: True """ schema = type_schema( 'set-logging', enabled={'type': 'boolean'})
[docs] def get_permissions(self): enable = self.data.get('enabled', True) if enable is True: return ('cloudtrail:StartLogging',) else: return ('cloudtrail:StopLogging',)
[docs] def process(self, resources): client = local_session(self.manager.session_factory).client('cloudtrail') shadow_check = IsShadow({'state': False}, self.manager) shadow_check.embedded = True resources = shadow_check.process(resources) enable = self.data.get('enabled', True) for r in resources: if enable: client.start_logging(Name=r['Name']) else: client.stop_logging(Name=r['Name'])