# Copyright 2016-2019 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import functools
from concurrent.futures import as_completed
from c7n.actions import BaseAction
from c7n.filters import AgeFilter
import c7n.filters.vpc as net_filters
from c7n.manager import resources
from c7n.query import QueryResourceManager
from c7n import tags
from .aws import shape_validate
from c7n.exceptions import PolicyValidationError
from c7n.utils import (
type_schema, local_session, snapshot_identifier, chunks,
get_retry, generate_arn)
# Module-level logger shared by the helpers and actions in this file.
log = logging.getLogger('custodian.rds-cluster')
@resources.register('rds-cluster')
class RDSCluster(QueryResourceManager):
    """Resource manager for RDS clusters.

    Clusters are enumerated through ``describe_db_clusters`` and every
    cluster is augmented with its tag list (see :meth:`augment`).
    """

    class resource_type(object):
        # Static metadata describing how clusters are listed and identified.
        service = 'rds'
        type = 'cluster'
        enum_spec = ('describe_db_clusters', 'DBClusters', None)
        name = 'DBClusterIdentifier'
        id = 'DBClusterIdentifier'
        filter_name = None
        filter_type = None
        dimension = 'DBClusterIdentifier'
        date = None

    # Retry wrapper applied to throttled API calls made during augmentation.
    retry = staticmethod(get_retry(('Throttled',)))

    @property
    def generate_arn(self):
        """Return a callable that builds a cluster ARN from an identifier.

        The callable is built once and cached on ``self._generate_arn``.
        NOTE(review): ``_generate_arn`` is not initialised in this class; it
        is presumably defaulted to ``None`` by the base manager -- confirm.
        """
        cached = self._generate_arn
        if cached is not None:
            return cached
        self._generate_arn = functools.partial(
            generate_arn,
            'rds',
            region=self.config.region,
            account_id=self.account_id,
            resource_type=self.resource_type.type,
            separator=':')
        return self._generate_arn

    def augment(self, dbs):
        """Attach tags to ``dbs`` and return the clusters that are left.

        Tag lookup is delegated to ``_rds_cluster_tags``; falsy entries in
        its result are dropped.
        """
        tagged = _rds_cluster_tags(
            self.get_model(),
            dbs,
            self.session_factory,
            self.generate_arn,
            self.retry)
        return [db for db in tagged if db]
# Expose the shared tag filters on the rds-cluster resource under the names
# 'tag-count' and 'marked-for-op'.
RDSCluster.filter_registry.register('tag-count', tags.TagCountFilter)
RDSCluster.filter_registry.register('marked-for-op', tags.TagActionFilter)
def _rds_cluster_tags(model, dbs, session_factory, generator, retry):
    """Augment rds clusters with their respective tags.

    :param model: resource model; ``model.id`` names the identifier key.
    :param dbs: iterable of cluster dicts; each is mutated in place to gain
        a ``Tags`` entry.
    :param session_factory: factory handed to ``local_session``.
    :param generator: callable turning a cluster identifier into an ARN.
    :param retry: callable invoked as ``retry(api_method, **kwargs)``.
    :returns: list of the clusters whose tags could be fetched; clusters
        for which the API reports ``DBClusterNotFoundFault`` are omitted.
    """
    client = local_session(session_factory).client('rds')
    tagged = []
    # Rds maintains a low api call limit, so this can take some time :-(
    for db in dbs:
        try:
            response = retry(
                client.list_tags_for_resource,
                ResourceName=generator(db[model.id]))
            db['Tags'] = response['TagList']
        except client.exceptions.DBClusterNotFoundFault:
            # The cluster vanished between listing and tagging; skip it.
            continue
        tagged.append(db)
    return tagged
@RDSCluster.action_registry.register('mark-for-op')
class TagDelayedAction(tags.TagDelayedAction):
    """Tag an RDS cluster so a custodian action is applied to it later.

    :example:

    .. code-block:: yaml

        policies:
          - name: mark-for-delete
            resource: rds-cluster
            filters:
              - type: value
                key: default-vpc
                value: True
            actions:
              - type: mark-for-op
                op: delete
                days: 7
    """
@RDSCluster.action_registry.register('tag')
@RDSCluster.action_registry.register('mark')
class Tag(tags.Tag):
    """Apply a key/value tag to an RDS cluster.

    Registered under both ``tag`` and ``mark``.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-owner-tag
            resource: rds-cluster
            filters:
              - "tag:OwnerName": absent
            actions:
              - type: tag
                key: OwnerName
                value: OwnerName
    """

    # NOTE(review): presumably read by the base tag action to size its
    # worker pool and resource batches -- confirm against tags.Tag.
    concurrency = 2
    batch_size = 5
    permissions = ('rds:AddTagsToResource',)

    def process_resource_set(self, client, dbs, ts):
        """Add the tags ``ts`` to every cluster in ``dbs``.

        One ``add_tags_to_resource`` call is issued per cluster, addressed
        by the ARN built from its ``DBClusterIdentifier``.
        """
        for cluster in dbs:
            client.add_tags_to_resource(
                ResourceName=self.manager.generate_arn(
                    cluster['DBClusterIdentifier']),
                Tags=ts)
@RDSCluster.action_registry.register('remove-tag')
@RDSCluster.action_registry.register('unmark')
class RemoveTag(tags.RemoveTag):
    """Remove one or more tags from RDS clusters.

    Registered under both ``remove-tag`` and ``unmark``.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-unmark-cluster
            resource: rds-cluster
            filters:
              - "tag:ExpiredTag": present
            actions:
              - type: unmark
                tags: ["ExpiredTag"]
    """

    # NOTE(review): presumably read by the base remove-tag action to size
    # its worker pool and resource batches -- confirm against tags.RemoveTag.
    concurrency = 2
    batch_size = 5
    permissions = ('rds:RemoveTagsFromResource',)

    def process_resource_set(self, client, dbs, tag_keys):
        """Strip ``tag_keys`` from every cluster in ``dbs``.

        One ``remove_tags_from_resource`` call is issued per cluster.
        """
        for cluster in dbs:
            arn = self.manager.generate_arn(cluster['DBClusterIdentifier'])
            client.remove_tags_from_resource(
                ResourceName=arn, TagKeys=tag_keys)
@RDSCluster.filter_registry.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):

    # JMESPath-style expression selecting the security group ids attached
    # to a cluster record.
    RelatedIdsExpression = 'VpcSecurityGroups[].VpcSecurityGroupId'
@RDSCluster.filter_registry.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):

    # Left empty here; subnet groups are loaded into ``self.groups`` by
    # ``process`` instead.
    RelatedIdsExpression = ''

    def get_permissions(self):
        """Return the permissions of the ``rds-subnet-group`` manager."""
        subnet_group_manager = self.manager.get_resource_manager(
            'rds-subnet-group')
        return subnet_group_manager.get_permissions()

    def process(self, resources, event=None):
        """Index subnet groups by name, then run the base subnet filter.

        ``self.groups`` maps ``DBSubnetGroupName`` to the subnet group
        record for every resource of the ``rds-subnet-group`` manager.
        """
        subnet_groups = self.manager.get_resource_manager(
            'rds-subnet-group').resources()
        by_name = {}
        for group in subnet_groups:
            by_name[group['DBSubnetGroupName']] = group
        self.groups = by_name
        return super(SubnetFilter, self).process(resources, event)
# Expose the shared network-location filter on the rds-cluster resource.
RDSCluster.filter_registry.register('network-location', net_filters.NetworkLocation)
@RDSCluster.action_registry.register('delete')
class Delete(BaseAction):
    """Delete an RDS cluster, optionally with its member instances.

    Deletion is destructive, so pair this action with a filter that
    narrows the policy to the clusters you really intend to remove.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-delete-unused
            resource: rds-cluster
            filters:
              - type: metrics
                name: CPUUtilization
                days: 21
                value: 1.0
                op: le
            actions:
              - type: delete
                skip-snapshot: false
                delete-instances: true
    """

    schema = type_schema(
        'delete',
        **{'skip-snapshot': {'type': 'boolean'},
           'delete-instances': {'type': 'boolean'}})

    permissions = ('rds:DeleteDBCluster',)

    def process(self, clusters):
        """Delete each cluster in ``clusters``.

        When ``delete-instances`` is true (the default) every member
        instance is deleted first, without a final snapshot. The cluster is
        then deleted, taking a final snapshot unless ``skip-snapshot`` is
        set. Not-found errors on the cluster delete are ignored and
        invalid-state errors are logged as warnings.
        """
        skip_snapshot = self.data.get('skip-snapshot', False)
        remove_members = self.data.get('delete-instances', True)
        client = local_session(self.manager.session_factory).client('rds')

        for cluster in clusters:
            members = (
                cluster.get('DBClusterMembers', []) if remove_members else [])
            for member in members:
                client.delete_db_instance(
                    DBInstanceIdentifier=member['DBInstanceIdentifier'],
                    SkipFinalSnapshot=True)
                self.log.info(
                    'Deleted RDS instance: %s',
                    member['DBInstanceIdentifier'])

            params = dict(DBClusterIdentifier=cluster['DBClusterIdentifier'])
            if not skip_snapshot:
                params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
                    'Final', cluster['DBClusterIdentifier'])
            else:
                params['SkipFinalSnapshot'] = True

            _run_cluster_method(
                client.delete_db_cluster,
                params,
                (client.exceptions.DBClusterNotFoundFault,
                 client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)
@RDSCluster.action_registry.register('retention')
class RetentionWindow(BaseAction):
    """Set the backup retention period on RDS clusters.

    ``enforce`` (``min``, ``max`` or ``exact``; default ``min``) controls
    how the requested ``days`` is combined with the cluster's current
    retention period.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-backup-retention
            resource: rds-cluster
            filters:
              - type: value
                key: BackupRetentionPeriod
                value: 21
                op: ne
            actions:
              - type: retention
                days: 21
                enforce: min
    """

    date_attribute = "BackupRetentionPeriod"
    # Tag copy not yet available for Aurora:
    # https://forums.aws.amazon.com/thread.jspa?threadID=225812
    schema = type_schema(
        'retention',
        **{'days': {'type': 'number'},
           'enforce': {'type': 'string',
                       'enum': ['min', 'max', 'exact']}})
    permissions = ('rds:ModifyDBCluster',)

    def process(self, clusters):
        """Apply the retention policy to every cluster in ``clusters``."""
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            self.process_snapshot_retention(client, cluster)

    def process_snapshot_retention(self, client, cluster):
        """Work out the target retention for ``cluster`` and apply it.

        ``min`` keeps the larger of the current and requested values,
        ``max`` keeps the smaller, and ``exact`` uses the requested value.
        Any other ``enforce`` value results in no API call.
        """
        current = int(cluster.get('BackupRetentionPeriod', 0))
        requested = self.data['days']
        mode = self.data.get('enforce', 'min').lower()

        if mode == 'min':
            target = max(current, requested)
        elif mode == 'max':
            target = min(current, requested)
        elif mode == 'exact':
            target = requested
        else:
            return
        self.set_retention_window(client, cluster, target)

    def set_retention_window(self, client, cluster, retention):
        """Call ``modify_db_cluster`` to set ``retention`` on ``cluster``.

        The cluster's existing backup and maintenance windows are passed
        back unchanged. Not-found errors are ignored and invalid-state
        errors are logged as warnings.
        """
        params = {
            'DBClusterIdentifier': cluster['DBClusterIdentifier'],
            'BackupRetentionPeriod': retention,
            'PreferredBackupWindow': cluster['PreferredBackupWindow'],
            'PreferredMaintenanceWindow': cluster['PreferredMaintenanceWindow'],
        }
        _run_cluster_method(
            client.modify_db_cluster,
            params,
            (client.exceptions.DBClusterNotFoundFault,
             client.exceptions.ResourceNotFoundFault),
            client.exceptions.InvalidDBClusterStateFault)
@RDSCluster.action_registry.register('stop')
class Stop(BaseAction):
    """Stop a running db cluster.

    Not-found errors are ignored; invalid-state errors are logged as
    warnings.
    """

    schema = type_schema('stop')
    permissions = ('rds:StopDBCluster',)

    def process(self, clusters):
        """Issue ``stop_db_cluster`` for every cluster in ``clusters``."""
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            params = {'DBClusterIdentifier': cluster['DBClusterIdentifier']}
            _run_cluster_method(
                client.stop_db_cluster,
                params,
                (client.exceptions.DBClusterNotFoundFault,
                 client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)
@RDSCluster.action_registry.register('start')
class Start(BaseAction):
    """Start a stopped db cluster.

    Not-found errors are ignored; invalid-state errors are logged as
    warnings.
    """

    schema = type_schema('start')
    permissions = ('rds:StartDBCluster',)

    def process(self, clusters):
        """Issue ``start_db_cluster`` for every cluster in ``clusters``."""
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            params = {'DBClusterIdentifier': cluster['DBClusterIdentifier']}
            _run_cluster_method(
                client.start_db_cluster,
                params,
                (client.exceptions.DBClusterNotFoundFault,
                 client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)
def _run_cluster_method(method, params, ignore=(), warn=(), method_name=""):
try:
method(**params)
except ignore:
pass
except warn as e:
log.warning(
"error %s on cluster %s error %s",
method_name or method.__name__, params['DBClusterIdentifier'], e)
@RDSCluster.action_registry.register('snapshot')
class Snapshot(BaseAction):
    """Create a snapshot of an RDS cluster.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshot
            resource: rds-cluster
            actions:
              - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('rds:CreateDBClusterSnapshot',)

    def process(self, clusters):
        """Issue ``create_db_cluster_snapshot`` for each cluster.

        The snapshot identifier comes from
        ``snapshot_identifier('Backup', <cluster id>)``. Not-found errors
        are ignored; invalid-state errors are logged as warnings.
        """
        client = local_session(self.manager.session_factory).client('rds')
        for cluster in clusters:
            params = {
                'DBClusterSnapshotIdentifier': snapshot_identifier(
                    'Backup', cluster['DBClusterIdentifier']),
                'DBClusterIdentifier': cluster['DBClusterIdentifier'],
            }
            _run_cluster_method(
                client.create_db_cluster_snapshot,
                params,
                (client.exceptions.DBClusterNotFoundFault,
                 client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)
@RDSCluster.action_registry.register('modify-db-cluster')
class ModifyDbCluster(BaseAction):
    """Modify RDS clusters using the ModifyDBCluster API.

    Every key/value under ``attributes`` is passed as a keyword argument to
    ``modify_db_cluster`` for each matched cluster. ``DBClusterIdentifier``
    must not be listed; it is filled in per cluster. The attributes are
    validated against the ``ModifyDBClusterMessage`` shape.

    This action defines no separate "immediate" option. To apply a change
    outside the maintenance window, set the API's own ``ApplyImmediately``
    attribute in ``attributes``.

    :example:

    .. code-block:: yaml

        policies:
          - name: disable-db-cluster-deletion-protection
            resource: rds-cluster
            filters:
              - DeletionProtection: true
              - PubliclyAccessible: true
            actions:
              - type: modify-db-cluster
                attributes:
                  CopyTagsToSnapshot: true
                  DeletionProtection: false
    """

    schema = type_schema(
        'modify-db-cluster',
        attributes={'type': 'object'},
        required=('attributes',))

    permissions = ('rds:ModifyDBCluster',)
    shape = 'ModifyDBClusterMessage'

    def validate(self):
        """Validate ``attributes`` against the ModifyDBCluster input shape.

        :raises PolicyValidationError: if ``DBClusterIdentifier`` is given
            in ``attributes``.
        :returns: whatever ``shape_validate`` returns for the attributes.
        """
        attrs = dict(self.data['attributes'])
        if 'DBClusterIdentifier' in attrs:
            raise PolicyValidationError(
                "Can't include DBClusterIdentifier in modify-db-cluster action")
        # A placeholder identifier is added only so the copy can be checked
        # against the shape; the policy data itself is left untouched.
        attrs['DBClusterIdentifier'] = 'PolicyValidation'
        return shape_validate(attrs, self.shape, 'rds')

    def process(self, clusters):
        """Apply the configured attributes to every cluster in ``clusters``.

        Calls go through ``_run_cluster_method`` like the other cluster
        actions in this module: a cluster that no longer exists is skipped
        and an invalid-state error is logged as a warning, so one failing
        cluster does not abort the rest of the batch.
        """
        client = local_session(self.manager.session_factory).client('rds')
        attributes = self.data['attributes']
        for cluster in clusters:
            params = dict(
                attributes,
                DBClusterIdentifier=cluster['DBClusterIdentifier'])
            _run_cluster_method(
                client.modify_db_cluster,
                params,
                (client.exceptions.DBClusterNotFoundFault,
                 client.exceptions.ResourceNotFoundFault),
                client.exceptions.InvalidDBClusterStateFault)
@resources.register('rds-cluster-snapshot')
class RDSClusterSnapshot(QueryResourceManager):
    """Resource manager for RDS cluster snapshots.

    Snapshots are enumerated through ``describe_db_cluster_snapshots``.
    """

    class resource_type(object):
        # Static metadata describing how snapshots are listed and identified.
        service = 'rds'
        type = 'rds-cluster-snapshot'
        enum_spec = (
            'describe_db_cluster_snapshots',
            'DBClusterSnapshots',
            None,
        )
        name = 'DBClusterSnapshotIdentifier'
        id = 'DBClusterSnapshotIdentifier'
        filter_name = None
        filter_type = None
        dimension = None
        date = 'SnapshotCreateTime'
@RDSClusterSnapshot.filter_registry.register('age')
class RDSSnapshotAge(AgeFilter):
    """Filter RDS cluster snapshots by their age in days.

    Age is measured from the snapshot's ``SnapshotCreateTime``.

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshots-expired
            resource: rds-cluster-snapshot
            filters:
              - type: age
                days: 30
                op: gt
    """

    schema = type_schema(
        'age',
        **{'days': {'type': 'number'},
           'op': {'$ref': '#/definitions/filters_common/comparison_operators'}})

    date_attribute = 'SnapshotCreateTime'
@RDSClusterSnapshot.action_registry.register('delete')
class RDSClusterSnapshotDelete(BaseAction):
    """Action to delete rds cluster snapshots

    To prevent unwanted deletion of rds cluster snapshots, it is recommended
    to apply a filter to the rule

    :example:

    .. code-block:: yaml

        policies:
          - name: rds-cluster-snapshots-expired-delete
            resource: rds-cluster-snapshot
            filters:
              - type: age
                days: 30
                op: gt
            actions:
              - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBClusterSnapshot',)

    def process(self, snapshots):
        """Delete ``snapshots`` in chunks of 50 across two workers.

        Every chunk is attempted even if another one fails. Failures are
        logged and the last exception seen is re-raised once all chunks
        have finished.

        :returns: the ``snapshots`` list that was passed in.
        """
        log.info("Deleting %d RDS cluster snapshots", len(snapshots))
        client = local_session(self.manager.session_factory).client('rds')
        error = None
        with self.executor_factory(max_workers=2) as w:
            futures = [
                w.submit(self.process_snapshot_set, client, snapshot_set)
                for snapshot_set in chunks(reversed(snapshots), size=50)]
            for f in as_completed(futures):
                exc = f.exception()
                if exc:
                    error = exc
                    self.log.error(
                        "Exception deleting snapshot set \n %s", exc)
        if error:
            raise error
        return snapshots

    def process_snapshot_set(self, client, snapshots_set):
        """Delete each snapshot in ``snapshots_set``, best effort.

        A snapshot that is already gone, or is in a state that does not
        allow deletion, is skipped so the rest of the set is still handled.
        """
        for s in snapshots_set:
            try:
                client.delete_db_cluster_snapshot(
                    DBClusterSnapshotIdentifier=s['DBClusterSnapshotIdentifier'])
            # delete_db_cluster_snapshot reports the *cluster* snapshot
            # faults; the DBSnapshot* faults belong to the instance-snapshot
            # API and are never raised by this call.
            except (client.exceptions.DBClusterSnapshotNotFoundFault,
                    client.exceptions.InvalidDBClusterSnapshotStateFault):
                continue