Source code for c7n.resources.redshift

# Copyright 2016-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals

import functools
import json
import logging
import itertools

from botocore.exceptions import ClientError
from concurrent.futures import as_completed

from c7n.actions import ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction
from c7n.exceptions import PolicyValidationError
from c7n.filters import (
    FilterRegistry, ValueFilter, DefaultVpcBase, AgeFilter,
    CrossAccountAccessFilter)
import c7n.filters.vpc as net_filters
from c7n.filters.kms import KmsRelatedFilter

from c7n.manager import resources
from c7n.resolver import ValuesFrom
from c7n.query import QueryResourceManager
from c7n import tags
from c7n.utils import (
    type_schema, local_session, chunks, generate_arn, get_retry,
    snapshot_identifier)

log = logging.getLogger('custodian.redshift')

filters = FilterRegistry('redshift.filters')
actions = ActionRegistry('redshift.actions')
filters.register('marked-for-op', tags.TagActionFilter)


@resources.register('redshift')
class Redshift(QueryResourceManager):

    class resource_type(object):
        service = 'redshift'
        type = 'cluster'
        enum_spec = ('describe_clusters', 'Clusters', None)
        detail_spec = None
        name = id = 'ClusterIdentifier'
        filter_name = 'ClusterIdentifier'
        filter_type = 'scalar'
        date = 'ClusterCreateTime'
        dimension = 'ClusterIdentifier'
        config_type = "AWS::Redshift::Cluster"

    filter_registry = filters
    action_registry = actions
    retry = staticmethod(get_retry(('Throttling',)))
    permissions = ('iam:ListRoles',)  # account id retrieval
    _generate_arn = None

    @property
    def generate_arn(self):
        if self._generate_arn is None:
            self._generate_arn = functools.partial(
                generate_arn,
                'redshift',
                region=self.config.region,
                account_id=self.account_id,
                resource_type='cluster',
                separator=':')
        return self._generate_arn


@filters.register('default-vpc')
class DefaultVpc(DefaultVpcBase):
    """Matches if a redshift database is in the default vpc

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-default-vpc
                resource: redshift
                filters:
                  - default-vpc
    """

    schema = type_schema('default-vpc')

    def __call__(self, redshift):
        return (redshift.get('VpcId') and
                self.match(redshift.get('VpcId')) or False)


@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):

    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"


@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):

    RelatedIdsExpression = ""

    def get_permissions(self):
        return RedshiftSubnetGroup(self.manager.ctx, {}).get_permissions()

    def process(self, resources, event=None):
        self.groups = {
            r['ClusterSubnetGroupName']: r for r in
            RedshiftSubnetGroup(self.manager.ctx, {}).resources()}
        return super(SubnetFilter, self).process(resources, event)

filters.register('network-location', net_filters.NetworkLocation)


@filters.register('param')
class Parameter(ValueFilter):
    """Filter redshift clusters based on parameter values

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-no-ssl
                resource: redshift
                filters:
                  - type: param
                    key: require_ssl
                    value: false
                    op: eq
    """

    schema = type_schema('param', rinherit=ValueFilter.schema)
    group_params = ()
    permissions = ("redshift:DescribeClusterParameters",)

    def process(self, clusters, event=None):
        groups = {}
        for r in clusters:
            for pg in r['ClusterParameterGroups']:
                groups.setdefault(pg['ParameterGroupName'], []).append(
                    r['ClusterIdentifier'])

        def get_params(group_name):
            c = local_session(self.manager.session_factory).client('redshift')
            paginator = c.get_paginator('describe_cluster_parameters')
            param_group = list(itertools.chain(*[
                p['Parameters'] for p in
                paginator.paginate(ParameterGroupName=group_name)]))
            params = {}
            for p in param_group:
                v = p['ParameterValue']
                if v != 'default' and p['DataType'] in ('integer', 'boolean'):
                    # overkill..
                    v = json.loads(v)
                params[p['ParameterName']] = v
            return params

        with self.executor_factory(max_workers=3) as w:
            group_names = groups.keys()
            self.group_params = dict(
                zip(group_names, w.map(get_params, group_names)))
        return super(Parameter, self).process(clusters, event)

    def __call__(self, db):
        params = {}
        for pg in db['ClusterParameterGroups']:
            params.update(self.group_params[pg['ParameterGroupName']])
        return self.match(params)


@filters.register('kms-key')
class KmsFilter(KmsRelatedFilter):
    """
    Filter a resource by its associated kms key and optionally the alias
    name of the kms key by using 'c7n:AliasName'

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-kms-key-filters
                resource: redshift
                filters:
                  - type: kms-key
                    key: c7n:AliasName
                    value: "^(alias/aws/)"
                    op: regex
    """
    RelatedIdsExpression = 'KmsKeyId'


@actions.register('delete')
class Delete(BaseAction):
    """Action to delete a redshift cluster

    To prevent unwanted deletion of redshift clusters, it is recommended
    to apply a filter to the rule

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-no-ssl
                resource: redshift
                filters:
                  - type: param
                    key: require_ssl
                    value: false
                    op: eq
                actions:
                  - type: delete
    """

    schema = type_schema(
        'delete', **{'skip-snapshot': {'type': 'boolean'}})
    permissions = ('redshift:DeleteCluster',)

    def process(self, clusters):
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for db_set in chunks(clusters, size=5):
                futures.append(
                    w.submit(self.process_db_set, db_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception deleting redshift set \n %s",
                        f.exception())

    def process_db_set(self, db_set):
        skip = self.data.get('skip-snapshot', False)
        c = local_session(self.manager.session_factory).client('redshift')
        for db in db_set:
            params = {'ClusterIdentifier': db['ClusterIdentifier']}
            if skip:
                params['SkipFinalClusterSnapshot'] = True
            else:
                params['FinalClusterSnapshotIdentifier'] = snapshot_identifier(
                    'Final', db['ClusterIdentifier'])
            try:
                c.delete_cluster(**params)
            except ClientError as e:
                if e.response['Error']['Code'] == "InvalidClusterState":
                    self.log.warning(
                        "Cannot delete cluster when not 'Available' state: %s",
                        db['ClusterIdentifier'])
                    continue
                raise



@actions.register('retention')
class RetentionWindow(BaseAction):
    """Action to set the snapshot retention period (in days)

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-snapshot-retention
                resource: redshift
                filters:
                  - type: value
                    key: AutomatedSnapshotRetentionPeriod
                    value: 21
                    op: ne
                actions:
                  - type: retention
                    days: 21
    """

    date_attribute = 'AutomatedSnapshotRetentionPeriod'
    schema = type_schema(
        'retention', **{'days': {'type': 'number'}})
    permissions = ('redshift:ModifyCluster',)

    def process(self, clusters):
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for cluster in clusters:
                futures.append(w.submit(
                    self.process_snapshot_retention,
                    cluster))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting Redshift retention \n %s",
                        f.exception())

    def process_snapshot_retention(self, cluster):
        current_retention = int(cluster.get(self.date_attribute, 0))
        new_retention = self.data['days']

        if current_retention < new_retention:
            self.set_retention_window(
                cluster,
                max(current_retention, new_retention))
            return cluster

    def set_retention_window(self, cluster, retention):
        c = local_session(self.manager.session_factory).client('redshift')
        c.modify_cluster(
            ClusterIdentifier=cluster['ClusterIdentifier'],
            AutomatedSnapshotRetentionPeriod=retention)


@actions.register('snapshot')
class Snapshot(BaseAction):
    """Action to take a snapshot of a redshift cluster

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-snapshot
                resource: redshift
                filters:
                  - type: value
                    key: ClusterStatus
                    value: available
                    op: eq
                actions:
                  - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('redshift:CreateClusterSnapshot',)

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('redshift')
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for cluster in clusters:
                futures.append(w.submit(
                    self.process_cluster_snapshot,
                    client, cluster))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception creating Redshift snapshot \n %s",
                        f.exception())
        return clusters

    def process_cluster_snapshot(self, client, cluster):
        cluster_tags = cluster.get('Tags')
        client.create_cluster_snapshot(
            SnapshotIdentifier=snapshot_identifier(
                'Backup',
                cluster['ClusterIdentifier']),
            ClusterIdentifier=cluster['ClusterIdentifier'],
            Tags=cluster_tags)



@actions.register('enable-vpc-routing')
class EnhancedVpcRoutine(BaseAction):
    """Action to enable enhanced vpc routing on a redshift cluster

    More: https://docs.aws.amazon.com/redshift/latest/mgmt/enhanced-vpc-routing.html

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-enable-enhanced-routing
                resource: redshift
                filters:
                  - type: value
                    key: EnhancedVpcRouting
                    value: false
                    op: eq
                actions:
                  - type: enable-vpc-routing
                    value: true
    """

    schema = type_schema(
        'enable-vpc-routing',
        value={'type': 'boolean'})
    permissions = ('redshift:ModifyCluster',)

    def process(self, clusters):
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for cluster in clusters:
                futures.append(w.submit(
                    self.process_vpc_routing,
                    cluster))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception changing Redshift VPC routing \n %s",
                        f.exception())
        return clusters

    def process_vpc_routing(self, cluster):
        current_routing = bool(cluster.get('EnhancedVpcRouting', False))
        new_routing = self.data.get('value', True)

        if current_routing != new_routing:
            c = local_session(self.manager.session_factory).client('redshift')
            c.modify_cluster(
                ClusterIdentifier=cluster['ClusterIdentifier'],
                EnhancedVpcRouting=new_routing)


@actions.register('set-public-access')
class RedshiftSetPublicAccess(BaseAction):
    """
    Action to set the 'PubliclyAccessible' setting on a redshift cluster

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-set-public-access
                resource: redshift
                filters:
                  - PubliclyAccessible: true
                actions:
                  - type: set-public-access
                    state: false
    """

    schema = type_schema(
        'set-public-access',
        state={'type': 'boolean'})
    permissions = ('redshift:ModifyCluster',)

    def set_access(self, c):
        client = local_session(self.manager.session_factory).client('redshift')
        client.modify_cluster(
            ClusterIdentifier=c['ClusterIdentifier'],
            PubliclyAccessible=self.data.get('state', False))

    def process(self, clusters):
        with self.executor_factory(max_workers=2) as w:
            futures = {w.submit(self.set_access, c): c for c in clusters}
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting Redshift public access on %s \n %s",
                        futures[f]['ClusterIdentifier'], f.exception())
        return clusters



@actions.register('mark-for-op')
class TagDelayedAction(tags.TagDelayedAction):
    """Action to create an action to be performed at a later time

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-terminate-unencrypted
                resource: redshift
                filters:
                  - "tag:custodian_cleanup": absent
                  - type: value
                    key: Encrypted
                    value: false
                    op: eq
                actions:
                  - type: mark-for-op
                    tag: custodian_cleanup
                    op: delete
                    days: 5
                    msg: "Unencrypted Redshift cluster: {op}@{action_date}"
    """


@actions.register('tag')
class Tag(tags.Tag):
    """Action to add tag/tags to a redshift cluster

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-tag
                resource: redshift
                filters:
                  - "tag:RedshiftTag": absent
                actions:
                  - type: tag
                    key: RedshiftTag
                    value: "Redshift Tag Value"
    """

    concurrency = 2
    batch_size = 5
    permissions = ('redshift:CreateTags',)

    def process_resource_set(self, client, resources, tags):
        for rarn, r in zip(self.manager.get_arns(resources), resources):
            client.create_tags(ResourceName=rarn, Tags=tags)


@actions.register('unmark')
@actions.register('remove-tag')
class RemoveTag(tags.RemoveTag):
    """Action to remove tag/tags from a redshift cluster

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-remove-tag
                resource: redshift
                filters:
                  - "tag:RedshiftTag": present
                actions:
                  - type: remove-tag
                    tags: ["RedshiftTags"]
    """

    concurrency = 2
    batch_size = 5
    permissions = ('redshift:DeleteTags',)

    def process_resource_set(self, client, resources, tag_keys):
        for rarn, r in zip(self.manager.get_arns(resources), resources):
            client.delete_tags(ResourceName=rarn, TagKeys=tag_keys)



@actions.register('tag-trim')
class TagTrim(tags.TagTrim):
    """Action to remove tags from a redshift cluster

    This can be used to prevent reaching the ceiling limit of tags on a
    resource

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-tag-trim
                resource: redshift
                filters:
                  - type: value
                    key: "length(Tags)"
                    op: ge
                    value: 10
                actions:
                  - type: tag-trim
                    space: 1
                    preserve:
                      - RequiredTag1
                      - RequiredTag2
    """

    max_tag_count = 10
    permissions = ('redshift:DeleteTags',)

    def process_tag_removal(self, client, resource, candidates):
        arn = self.manager.generate_arn(resource['ClusterIdentifier'])
        client.delete_tags(ResourceName=arn, TagKeys=candidates)



@resources.register('redshift-subnet-group')
class RedshiftSubnetGroup(QueryResourceManager):
    """Redshift subnet group."""

    class resource_type(object):
        service = 'redshift'
        type = 'redshift-subnet-group'
        id = name = 'ClusterSubnetGroupName'
        enum_spec = (
            'describe_cluster_subnet_groups', 'ClusterSubnetGroups', None)
        filter_name = 'ClusterSubnetGroupName'
        filter_type = 'scalar'
        dimension = None
        date = None
        config_type = "AWS::Redshift::ClusterSubnetGroup"

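
# Illustrative policy for the resource above (a sketch, not part of the
# original module): `redshift-subnet-group` can be queried like any other
# resource type, e.g. to inventory all subnet groups in an account:
#
#   policies:
#     - name: redshift-subnet-group-inventory
#       resource: redshift-subnet-group
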


@resources.register('redshift-snapshot')
class RedshiftSnapshot(QueryResourceManager):
    """Resource manager for Redshift snapshots."""

    filter_registry = FilterRegistry('redshift-snapshot.filters')
    action_registry = ActionRegistry('redshift-snapshot.actions')

    filter_registry.register('marked-for-op', tags.TagActionFilter)

    _generate_arn = None

    @property
    def generate_arn(self):
        if self._generate_arn is None:
            self._generate_arn = functools.partial(
                generate_arn,
                'redshift',
                region=self.config.region,
                account_id=self.account_id,
                resource_type='snapshot',
                separator=':')
        return self._generate_arn

    class resource_type(object):
        service = 'redshift'
        type = 'redshift-snapshot'
        enum_spec = ('describe_cluster_snapshots', 'Snapshots', None)
        name = id = 'SnapshotIdentifier'
        filter_name = None
        filter_type = None
        dimension = None
        date = 'SnapshotCreateTime'
        config_type = "AWS::Redshift::ClusterSnapshot"

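
# Illustrative policy for the resource above (a sketch, not part of the
# original module): `redshift-snapshot` supports the filters and actions
# registered on it below, e.g. tagging snapshots older than three weeks:
#
#   policies:
#     - name: redshift-snapshot-tag-old
#       resource: redshift-snapshot
#       filters:
#         - type: age
#           days: 21
#           op: gt
#       actions:
#         - type: tag
#           key: c7n_status
#           value: old-snapshot
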


@actions.register('modify-security-groups')
class RedshiftModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify security groups on a Redshift cluster"""

    permissions = ('redshift:ModifyCluster',)

    def process(self, clusters):
        client = local_session(self.manager.session_factory).client('redshift')
        groups = super(
            RedshiftModifyVpcSecurityGroups, self).get_groups(clusters)

        for idx, c in enumerate(clusters):
            client.modify_cluster(
                ClusterIdentifier=c['ClusterIdentifier'],
                VpcSecurityGroupIds=groups[idx])

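
# Illustrative policy for the `modify-security-groups` action above (a sketch
# assuming the generic `add`/`remove`/`isolation-group` schema inherited from
# ModifyVpcSecurityGroupsAction; not part of the original module):
#
#   policies:
#     - name: redshift-remove-matched-security-groups
#       resource: redshift
#       filters:
#         - type: security-group
#           key: GroupName
#           value: default
#       actions:
#         - type: modify-security-groups
#           remove: matched
#           isolation-group: sg-0123456789abcdef0
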


@RedshiftSnapshot.filter_registry.register('age')
class RedshiftSnapshotAge(AgeFilter):
    """Filters redshift snapshots based on age (in days)

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-old-snapshots
                resource: redshift-snapshot
                filters:
                  - type: age
                    days: 21
                    op: gt
    """

    schema = type_schema(
        'age', days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    date_attribute = 'SnapshotCreateTime'


@RedshiftSnapshot.filter_registry.register('cross-account')
class RedshiftSnapshotCrossAccount(CrossAccountAccessFilter):
    """Filter snapshots that grant restore access to non-whitelisted accounts
    """

    permissions = ('redshift:DescribeClusterSnapshots',)
    schema = type_schema(
        'cross-account',
        whitelist={'type': 'array', 'items': {'type': 'string'}},
        whitelist_from=ValuesFrom.schema)

    def process(self, snapshots, event=None):
        accounts = self.get_accounts()
        snapshots = [s for s in snapshots if s.get('AccountsWithRestoreAccess')]
        results = []
        for s in snapshots:
            s_accounts = {a.get('AccountId') for a in s[
                'AccountsWithRestoreAccess']}
            delta_accounts = s_accounts.difference(accounts)
            if delta_accounts:
                s['c7n:CrossAccountViolations'] = list(delta_accounts)
                results.append(s)
        return results



@RedshiftSnapshot.action_registry.register('delete')
class RedshiftSnapshotDelete(BaseAction):
    """Action to delete redshift snapshots, typically used with an age filter

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-delete-old-snapshots
                resource: redshift-snapshot
                filters:
                  - type: age
                    days: 21
                    op: gt
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('redshift:DeleteClusterSnapshot',)

    def process(self, snapshots):
        log.info("Deleting %d Redshift snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for snapshot_set in chunks(reversed(snapshots), size=50):
                futures.append(
                    w.submit(self.process_snapshot_set, snapshot_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        f.exception())
        return snapshots

    def process_snapshot_set(self, snapshots_set):
        c = local_session(self.manager.session_factory).client('redshift')
        for s in snapshots_set:
            c.delete_cluster_snapshot(
                SnapshotIdentifier=s['SnapshotIdentifier'],
                SnapshotClusterIdentifier=s['ClusterIdentifier'])



@RedshiftSnapshot.action_registry.register('mark-for-op')
class RedshiftSnapshotTagDelayedAction(tags.TagDelayedAction):
    """Action to create a delayed action to be performed on a redshift snapshot

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-snapshot-expiring
                resource: redshift-snapshot
                filters:
                  - "tag:custodian_cleanup": absent
                  - type: age
                    days: 14
                    op: eq
                actions:
                  - type: mark-for-op
                    tag: custodian_cleanup
                    msg: "Snapshot expiring: {op}@{action_date}"
                    op: delete
                    days: 7
    """



@RedshiftSnapshot.action_registry.register('tag')
class RedshiftSnapshotTag(tags.Tag):
    """Action to add tag/tags to a redshift snapshot

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-required-tags
                resource: redshift-snapshot
                filters:
                  - "tag:RequiredTag1": absent
                actions:
                  - type: tag
                    key: RequiredTag1
                    value: RequiredValue1
    """

    concurrency = 2
    batch_size = 5
    permissions = ('redshift:CreateTags',)

    def process_resource_set(self, client, resources, tags):
        for r in resources:
            arn = self.manager.generate_arn(
                r['ClusterIdentifier'] + '/' + r['SnapshotIdentifier'])
            client.create_tags(ResourceName=arn, Tags=tags)


@RedshiftSnapshot.action_registry.register('unmark')
@RedshiftSnapshot.action_registry.register('remove-tag')
class RedshiftSnapshotRemoveTag(tags.RemoveTag):
    """Action to remove tag/tags from a redshift snapshot

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-remove-tags
                resource: redshift-snapshot
                filters:
                  - "tag:UnusedTag1": present
                actions:
                  - type: remove-tag
                    tags: ["UnusedTag1"]
    """

    concurrency = 2
    batch_size = 5
    permissions = ('redshift:DeleteTags',)

    def process_resource_set(self, client, resources, tag_keys):
        for r in resources:
            arn = self.manager.generate_arn(
                r['ClusterIdentifier'] + '/' + r['SnapshotIdentifier'])
            client.delete_tags(ResourceName=arn, TagKeys=tag_keys)



@RedshiftSnapshot.action_registry.register('revoke-access')
class RedshiftSnapshotRevokeAccess(BaseAction):
    """Revokes the ability of accounts to restore a snapshot

    :example:

    .. code-block:: yaml

            policies:
              - name: redshift-snapshot-revoke-access
                resource: redshift-snapshot
                filters:
                  - type: cross-account
                    whitelist:
                      - 012345678910
                actions:
                  - type: revoke-access
    """

    permissions = ('redshift:RevokeSnapshotAccess',)
    schema = type_schema('revoke-access')

    def validate(self):
        for f in self.manager.iter_filters():
            if isinstance(f, RedshiftSnapshotCrossAccount):
                return self
        raise PolicyValidationError(
            '`revoke-access` may only be used in '
            'conjunction with `cross-account` filter on %s' % (self.manager.data,))

    def process_snapshot_set(self, client, snapshot_set):
        for s in snapshot_set:
            for a in s.get('c7n:CrossAccountViolations', []):
                try:
                    self.manager.retry(
                        client.revoke_snapshot_access,
                        SnapshotIdentifier=s['SnapshotIdentifier'],
                        AccountWithRestoreAccess=a)
                except ClientError as e:
                    if e.response['Error']['Code'] == 'ClusterSnapshotNotFound':
                        continue
                    raise

    def process(self, snapshots):
        client = local_session(self.manager.session_factory).client('redshift')
        with self.executor_factory(max_workers=2) as w:
            futures = {}
            for snapshot_set in chunks(snapshots, 25):
                futures[w.submit(
                    self.process_snapshot_set, client, snapshot_set)
                ] = snapshot_set
            for f in as_completed(futures):
                if f.exception():
                    self.log.exception(
                        'Exception while revoking access on %s: %s' % (
                            ', '.join(
                                [s['SnapshotIdentifier'] for s in futures[f]]),
                            f.exception()))