# Copyright 2015-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
RDS Resource Manager
====================
Example Policies
----------------
Find rds instances that are publicly available
.. code-block:: yaml
policies:
- name: rds-public
resource: rds
filters:
- PubliclyAccessible: true
Find rds instances that are not encrypted
.. code-block:: yaml
policies:
- name: rds-non-encrypted
resource: rds
filters:
- type: value
key: StorageEncrypted
value: true
op: ne
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import functools
import itertools
import logging
import operator
import re
from decimal import Decimal as D, ROUND_HALF_UP
from distutils.version import LooseVersion
from botocore.exceptions import ClientError
from concurrent.futures import as_completed
from c7n.actions import (
ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction)
from c7n.actions.securityhub import OtherResourcePostFinding
from c7n.exceptions import PolicyValidationError
from c7n.filters import (
CrossAccountAccessFilter, FilterRegistry, Filter, ValueFilter, AgeFilter)
from c7n.filters.offhours import OffHour, OnHour
import c7n.filters.vpc as net_filters
from c7n.manager import resources
from c7n.query import QueryResourceManager, DescribeSource, ConfigSource
from c7n import tags
from c7n.tags import universal_augment, register_universal_tags
from c7n.utils import (
local_session, type_schema,
get_retry, chunks, generate_arn, snapshot_identifier)
from c7n.resources.kms import ResourceKmsKeyAlias
# Module-level logger plus the shared filter/action registries for the
# ``rds`` resource; the filters/actions defined below register themselves
# into these via decorators.
log = logging.getLogger('custodian.rds')
filters = FilterRegistry('rds.filters')
actions = ActionRegistry('rds.actions')
@resources.register('rds')
class RDS(QueryResourceManager):
    """Resource manager for RDS DB instances."""

    class resource_type(object):
        # Metadata describing how to enumerate and identify rds instances.
        service = 'rds'
        type = 'db'
        enum_spec = ('describe_db_instances', 'DBInstances', None)
        id = 'DBInstanceIdentifier'
        name = 'Endpoint.Address'
        filter_name = 'DBInstanceIdentifier'
        filter_type = 'scalar'
        date = 'InstanceCreateTime'
        dimension = 'DBInstanceIdentifier'
        config_type = 'AWS::RDS::DBInstance'
        arn = 'DBInstanceArn'

        default_report_fields = (
            'DBInstanceIdentifier',
            'DBName',
            'Engine',
            'EngineVersion',
            'MultiAZ',
            'AllocatedStorage',
            'StorageEncrypted',
            'PubliclyAccessible',
            'InstanceCreateTime',
        )

    filter_registry = filters
    action_registry = actions
    _generate_arn = None

    # NOTE: the redundant ``__init__`` that only delegated to super()
    # was removed; behavior is unchanged.

    @property
    def generate_arn(self):
        """Partial for building db-instance arns in this account/region."""
        return functools.partial(
            generate_arn, 'rds',
            region=self.config.region,
            account_id=self.config.account_id,
            resource_type='db', separator=':')

    def get_source(self, source_type):
        """Return the resource loader for a ``describe`` or ``config`` source.

        Raises ValueError for any other source type.
        """
        if source_type == 'describe':
            return DescribeRDS(self)
        elif source_type == 'config':
            return ConfigRDS(self)
        raise ValueError("Unsupported source: %s for %s" % (
            source_type, self.resource_type.config_type))
class DescribeRDS(DescribeSource):
    """Describe-api source for rds instances, with universal tag augment."""

    def augment(self, dbs):
        # Run the base describe augmentation first, then attach
        # resource-group tagging data.
        augmented = super(DescribeRDS, self).augment(dbs)
        return universal_augment(self.manager, augmented)
class ConfigRDS(ConfigSource):
    """Config-service source for rds instances."""

    def load_resource(self, item):
        resource = super(ConfigRDS, self).load_resource(item)
        # Config stores tags under supplementaryConfiguration with
        # lower-case keys; normalize to the describe-api Tag shape.
        normalized = []
        for t in item['supplementaryConfiguration']['Tags']:
            normalized.append({u'Key': t['key'], u'Value': t['value']})
        resource['Tags'] = normalized
        return resource
# Attach the standard (universal) tag filters/actions to rds instances.
register_universal_tags(
    RDS.filter_registry,
    RDS.action_registry)
def _db_instance_eligible_for_backup(resource):
db_instance_id = resource['DBInstanceIdentifier']
# Database instance is not in available state
if resource.get('DBInstanceStatus', '') != 'available':
log.debug(
"DB instance %s is not in available state",
db_instance_id)
return False
# The specified DB Instance is a member of a cluster and its
# backup retention should not be modified directly. Instead,
# modify the backup retention of the cluster using the
# ModifyDbCluster API
if resource.get('DBClusterIdentifier', ''):
log.debug(
"DB instance %s is a cluster member",
db_instance_id)
return False
# DB Backups not supported on a read replica for engine postgres
if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and
resource.get('Engine', '') == 'postgres'):
log.debug(
"DB instance %s is a postgres read-replica",
db_instance_id)
return False
# DB Backups not supported on a read replica running a mysql
# version before 5.6
if (resource.get('ReadReplicaSourceDBInstanceIdentifier', '') and
resource.get('Engine', '') == 'mysql'):
engine_version = resource.get('EngineVersion', '')
# Assume "<major>.<minor>.<whatever>"
match = re.match(r'(?P<major>\d+)\.(?P<minor>\d+)\..*', engine_version)
if (match and int(match.group('major')) < 5 or
(int(match.group('major')) == 5 and int(match.group('minor')) < 6)):
log.debug(
"DB instance %s is a version %s mysql read-replica",
db_instance_id,
engine_version)
return False
return True
def _db_instance_eligible_for_final_snapshot(resource):
status = resource.get('DBInstanceStatus', '')
# If the DB instance you are deleting has a status of "Creating,"
# you will not be able to have a final DB snapshot taken
# If the DB instance is in a failure state with a status of "failed,"
# "incompatible-restore," or "incompatible-network," you can only delete
# the instance when the SkipFinalSnapshot parameter is set to "true."
eligible_for_final_snapshot = True
if status in ['creating', 'failed', 'incompatible-restore', 'incompatible-network']:
eligible_for_final_snapshot = False
# FinalDBSnapshotIdentifier can not be specified when deleting a
# replica instance
if resource.get('ReadReplicaSourceDBInstanceIdentifier', ''):
eligible_for_final_snapshot = False
# if it's a rds-cluster, don't try to run the rds instance snapshot api call
if resource.get('DBClusterIdentifier', False):
eligible_for_final_snapshot = False
if not eligible_for_final_snapshot:
log.debug('DB instance is not eligible for a snapshot:/n %s', resource)
return eligible_for_final_snapshot
def _get_available_engine_upgrades(client, major=False):
    """Returns all extant rds engine upgrades.

    As a nested mapping of engine type to known versions
    and their upgrades.

    Defaults to minor upgrades, but configurable to major.

    Example::

      >>> _get_available_engine_upgrades(client)
      {
         'oracle-se2': {'12.1.0.2.v2': '12.1.0.2.v5',
                        '12.1.0.2.v3': '12.1.0.2.v5'},
         'postgres': {'9.3.1': '9.3.14',
                      '9.3.10': '9.3.14',
                      '9.3.12': '9.3.14',
                      '9.3.2': '9.3.14'}
      }
    """
    results = {}
    engine_versions = client.describe_db_engine_versions()['DBEngineVersions']
    for v in engine_versions:
        if v['Engine'] not in results:
            results[v['Engine']] = {}
        # Skip versions with no (or an empty) upgrade target list.
        if not v.get('ValidUpgradeTarget'):
            continue
        for t in v['ValidUpgradeTarget']:
            if not major and t['IsMajorVersionUpgrade']:
                continue
            # Keep only the highest upgrade target seen for this version.
            if LooseVersion(t['EngineVersion']) > LooseVersion(
                    results[v['Engine']].get(v['EngineVersion'], '0.0.0')):
                results[v['Engine']][v['EngineVersion']] = t['EngineVersion']
    return results
@filters.register('offhour')
class RDSOffHour(OffHour):
    """Scheduled action on rds instance.

    Off-hours scheduling filter (see ``c7n.filters.offhours``).
    """
@filters.register('onhour')
class RDSOnHour(OnHour):
    """Scheduled action on rds instance.

    On-hours scheduling filter (see ``c7n.filters.offhours``).
    """
@filters.register('default-vpc')
class DefaultVpc(net_filters.DefaultVpcBase):
    """Matches if an rds database is in the default vpc

    :example:

    .. code-block:: yaml

            policies:
              - name: default-vpc-rds
                resource: rds
                filters:
                  - type: default-vpc
    """

    schema = type_schema('default-vpc')

    def __call__(self, rdb):
        # Match on the vpc of the database's subnet group.
        vpc_id = rdb['DBSubnetGroup']['VpcId']
        return self.match(vpc_id)
@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    # JMESPath locating the instance's vpc security-group ids.
    RelatedIdsExpression = "VpcSecurityGroups[].VpcSecurityGroupId"
@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    # JMESPath locating the subnet ids of the instance's subnet group.
    RelatedIdsExpression = "DBSubnetGroup.Subnets[].SubnetIdentifier"
@filters.register('vpc')
class VpcFilter(net_filters.VpcFilter):
    # JMESPath locating the vpc id(s) via the subnet group's subnets.
    RelatedIdsExpression = "DBSubnetGroup.Subnets[].VpcId"
# Expose the generic network-location filter for rds instances.
filters.register('network-location', net_filters.NetworkLocation)
@filters.register('kms-alias')
class KmsKeyAlias(ResourceKmsKeyAlias):
    """Filter rds instances by the alias of their kms key."""

    def process(self, dbs, event=None):
        matched = self.get_matching_aliases(dbs)
        return matched
@actions.register('auto-patch')
class AutoPatch(BaseAction):
    """Toggle AutoMinorUpgrade flag on RDS instance

    'window' parameter needs to be in the format 'ddd:hh:mm-ddd:hh:mm' and
    have at least 30 minutes between start & end time.
    If 'window' is not specified, AWS will assign a random maintenance window
    to each instance selected.

    :example:

    .. code-block:: yaml

            policies:
              - name: enable-rds-autopatch
                resource: rds
                filters:
                  - AutoMinorVersionUpgrade: false
                actions:
                  - type: auto-patch
                    minor: true
                    window: Mon:23:00-Tue:01:00
    """

    schema = type_schema(
        'auto-patch',
        minor={'type': 'boolean'}, window={'type': 'string'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        client = local_session(
            self.manager.session_factory).client('rds')

        # Build the shared modify parameters once, outside the loop.
        params = {'AutoMinorVersionUpgrade': self.data.get('minor', True)}
        window = self.data.get('window')
        if window:
            params['PreferredMaintenanceWindow'] = window

        for db in dbs:
            client.modify_db_instance(
                DBInstanceIdentifier=db['DBInstanceIdentifier'], **params)
@filters.register('upgrade-available')
class UpgradeAvailable(Filter):
    """Scan DB instances for available engine upgrades

    This will pull DB instances & check their specific engine for any
    engine version with higher release numbers than the current one

    This will also annotate the rds instance with 'target_engine' which is
    the most recent version of the engine available

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-upgrade-available
                resource: rds
                filters:
                  - type: upgrade-available
                    major: False
    """

    schema = type_schema('upgrade-available',
                         major={'type': 'boolean'},
                         value={'type': 'boolean'})
    permissions = ('rds:DescribeDBEngineVersions',)

    def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('rds')
        check_upgrade_extant = self.data.get('value', True)
        check_major = self.data.get('major', False)
        engine_upgrades = _get_available_engine_upgrades(
            client, major=check_major)

        results = []
        for r in resources:
            # Look up the best target for this instance's engine/version.
            engine_targets = engine_upgrades.get(r['Engine'], {})
            target_upgrade = engine_targets.get(r['EngineVersion'])
            if target_upgrade is None:
                # value: false inverts the filter (match no-upgrade).
                if check_upgrade_extant is False:
                    results.append(r)
                continue
            r['c7n-rds-engine-upgrade'] = target_upgrade
            results.append(r)
        return results
@actions.register('upgrade')
class UpgradeMinor(BaseAction):
    """Upgrades a RDS instance to the latest major/minor version available

    Use of the 'immediate' flag (default False) will automatically upgrade
    the RDS engine disregarding the existing maintenance window.

    :example:

    .. code-block:: yaml

            policies:
              - name: upgrade-rds-minor
                resource: rds
                actions:
                  - type: upgrade
                    major: False
                    immediate: False
    """

    schema = type_schema(
        'upgrade',
        major={'type': 'boolean'},
        immediate={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # Fetched lazily: only needed when a resource was not already
        # annotated by the upgrade-available filter.
        engine_upgrades = None
        for db in resources:
            if 'EngineVersion' in db['PendingModifiedValues']:
                # Upgrade has already been scheduled
                continue
            if 'c7n-rds-engine-upgrade' not in db:
                if engine_upgrades is None:
                    engine_upgrades = _get_available_engine_upgrades(
                        client, major=self.data.get('major', False))
                target = engine_upgrades.get(
                    db['Engine'], {}).get(db['EngineVersion'])
                if target is None:
                    log.debug(
                        "implicit filter no upgrade on %s",
                        db['DBInstanceIdentifier'])
                    continue
                db['c7n-rds-engine-upgrade'] = target
            client.modify_db_instance(
                DBInstanceIdentifier=db['DBInstanceIdentifier'],
                EngineVersion=db['c7n-rds-engine-upgrade'],
                ApplyImmediately=self.data.get('immediate', False))
@actions.register('tag-trim')
class TagTrim(tags.TagTrim):
    """Trim tags on rds instances (see ``c7n.tags.TagTrim``)."""

    permissions = ('rds:RemoveTagsFromResource',)

    def process_tag_removal(self, client, resource, candidates):
        # Drop the candidate tag keys from the instance.
        arn = resource['DBInstanceArn']
        client.remove_tags_from_resource(
            ResourceName=arn, TagKeys=candidates)
# Engines eligible for the rds stop/start instance apis; see
# https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
START_STOP_ELIGIBLE_ENGINES = {
    'postgres', 'sqlserver-ee',
    'oracle-se2', 'mariadb', 'oracle-ee',
    'sqlserver-ex', 'sqlserver-se', 'oracle-se',
    'mysql', 'oracle-se1', 'sqlserver-web'}
def _eligible_start_stop(db, state="available"):
# See conditions noted here
# https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
# Note that this doesn't really specify what happens for all the nosql engines
# that are available as rds engines.
if db.get('DBInstanceStatus') != state:
return False
if db.get('MultiAZ') and db['Engine'].startswith('sqlserver-'):
return False
if db['Engine'] not in START_STOP_ELIGIBLE_ENGINES:
return False
if db.get('ReadReplicaDBInstanceIdentifiers'):
return False
if db.get('ReadReplicaSourceDBInstanceIdentifier'):
return False
# TODO is SQL Server mirror is detectable.
return True
@actions.register('stop')
class Stop(BaseAction):
    """Stop an rds instance.

    https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_StopInstance.html
    """

    schema = type_schema('stop')
    permissions = ("rds:StopDBInstance",)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        for db in resources:
            # Skip instances that can't be stopped in their current state.
            if not _eligible_start_stop(db):
                continue
            try:
                client.stop_db_instance(
                    DBInstanceIdentifier=db['DBInstanceIdentifier'])
            except ClientError as e:
                log.exception(
                    "Error stopping db instance:%s err:%s",
                    db['DBInstanceIdentifier'], e)
@actions.register('start')
class Start(BaseAction):
    """Start an rds instance.
    """

    schema = type_schema('start')
    permissions = ("rds:StartDBInstance",)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        for db in resources:
            # Only instances currently stopped are eligible to start.
            if not _eligible_start_stop(db, state='stopped'):
                continue
            try:
                client.start_db_instance(
                    DBInstanceIdentifier=db['DBInstanceIdentifier'])
            except ClientError as e:
                log.exception(
                    "Error starting db instance:%s err:%s",
                    db['DBInstanceIdentifier'], e)
@actions.register('delete')
class Delete(BaseAction):
    """Deletes selected RDS instances

    This will delete RDS instances. It is recommended to apply with a filter
    to avoid deleting all RDS instances in the account.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-delete
                resource: rds
                filters:
                  - default-vpc
                actions:
                  - type: delete
                    skip-snapshot: true
    """

    schema = type_schema('delete', **{
        'skip-snapshot': {'type': 'boolean'},
        'copy-restore-info': {'type': 'boolean'}
    })

    permissions = ('rds:DeleteDBInstance', 'rds:AddTagsToResource')

    def validate(self):
        # copy-restore-info requires a final snapshot to record the tags
        # on, so combining it with skip-snapshot is a contradiction.
        if self.data.get('skip-snapshot', False) and self.data.get(
                'copy-restore-info'):
            raise PolicyValidationError(
                "skip-snapshot cannot be specified with copy-restore-info on %s" % (
                    self.manager.data,))
        return self

    def process(self, dbs):
        skip = self.data.get('skip-snapshot', False)

        # Concurrency feels like overkill here.
        client = local_session(self.manager.session_factory).client('rds')

        for db in dbs:
            params = dict(
                DBInstanceIdentifier=db['DBInstanceIdentifier'])
            # Take a final snapshot unless skipped or the instance state
            # makes a final snapshot impossible.
            if skip or not _db_instance_eligible_for_final_snapshot(db):
                params['SkipFinalSnapshot'] = True
            else:
                params['FinalDBSnapshotIdentifier'] = snapshot_identifier(
                    'Final', db['DBInstanceIdentifier'])
            if self.data.get('copy-restore-info', False):
                self.copy_restore_info(client, db)
                # Ensure instance tags propagate to the final snapshot.
                if not db['CopyTagsToSnapshot']:
                    client.modify_db_instance(
                        DBInstanceIdentifier=db['DBInstanceIdentifier'],
                        CopyTagsToSnapshot=True)
            self.log.info(
                "Deleting rds: %s snapshot: %s",
                db['DBInstanceIdentifier'],
                params.get('FinalDBSnapshotIdentifier', False))

            try:
                client.delete_db_instance(**params)
            except ClientError as e:
                # Instance already transitioning/deleting; skip it.
                if e.response['Error']['Code'] == "InvalidDBInstanceState":
                    continue
                raise

        return dbs

    def copy_restore_info(self, client, instance):
        """Record instance settings as tags so the snapshot can later be
        restored with the same configuration (see RestoreInstance)."""
        tags = []
        tags.append({
            'Key': 'VPCSecurityGroups',
            'Value': ''.join([
                g['VpcSecurityGroupId'] for g in instance['VpcSecurityGroups']
            ])})
        tags.append({
            'Key': 'OptionGroupName',
            'Value': instance['OptionGroupMemberships'][0]['OptionGroupName']})
        tags.append({
            'Key': 'ParameterGroupName',
            'Value': instance['DBParameterGroups'][0]['DBParameterGroupName']})
        tags.append({
            'Key': 'InstanceClass',
            'Value': instance['DBInstanceClass']})
        tags.append({
            'Key': 'StorageType',
            'Value': instance['StorageType']})
        tags.append({
            'Key': 'MultiAZ',
            'Value': str(instance['MultiAZ'])})
        tags.append({
            'Key': 'DBSubnetGroupName',
            'Value': instance['DBSubnetGroup']['DBSubnetGroupName']})
        client.add_tags_to_resource(
            ResourceName=self.manager.generate_arn(
                instance['DBInstanceIdentifier']),
            Tags=tags)
@RDS.action_registry.register("post-finding")
class DbInstanceFinding(OtherResourcePostFinding):
    # Additional fields to surface on the security hub finding payload.
    fields = [
        {'key': 'DBSubnetGroupName', 'expr': 'DBSubnetGroup.DBSubnetGroupName'},
        {'key': 'VpcId', 'expr': 'DBSubnetGroup.VpcId'},
    ]
@actions.register('snapshot')
class Snapshot(BaseAction):
    """Creates a manual snapshot of a RDS instance

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot
                resource: rds
                actions:
                  - snapshot
    """

    schema = type_schema('snapshot')
    permissions = ('rds:CreateDBSnapshot',)

    def process(self, dbs):
        with self.executor_factory(max_workers=3) as w:
            # One future per database; errors are logged, not raised.
            futures = [
                w.submit(self.process_rds_snapshot, db) for db in dbs]
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception creating rds snapshot \n %s",
                        f.exception())
        return dbs

    def process_rds_snapshot(self, resource):
        # Quietly skip instances that can't be snapshotted.
        if not _db_instance_eligible_for_backup(resource):
            return
        client = local_session(self.manager.session_factory).client('rds')
        snap_id = snapshot_identifier(
            self.data.get('snapshot-prefix', 'Backup'),
            resource['DBInstanceIdentifier'])
        client.create_db_snapshot(
            DBSnapshotIdentifier=snap_id,
            DBInstanceIdentifier=resource['DBInstanceIdentifier'])
@actions.register('resize')
class ResizeInstance(BaseAction):
    """Change the allocated storage of an rds instance.

    :example:

    This will find databases using over 85% of their allocated
    storage, and resize them to have an additional 30% storage
    the resize here is async during the next maintenance.

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-retention
                resource: rds
                filters:
                  - type: metrics
                    name: FreeStorageSpace
                    percent-attr: AllocatedStorage
                    attr-multiplier: 1073741824
                    value: 90
                    op: greater-than
                actions:
                  - type: resize
                    percent: 30

    This will find databases using under 20% of their allocated
    storage, and resize them to be 30% smaller, the resize here
    is configured to be immediate.

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-retention
                resource: rds
                filters:
                  - type: metrics
                    name: FreeStorageSpace
                    percent-attr: AllocatedStorage
                    attr-multiplier: 1073741824
                    value: 90
                    op: greater-than
                actions:
                  - type: resize
                    percent: -30
                    immediate: true
    """

    schema = type_schema(
        'resize',
        percent={'type': 'number'},
        immediate={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        hundred = D(100)
        pct = D(self.data['percent'])
        apply_now = self.data.get('immediate', False)
        for r in resources:
            current = D(r['AllocatedStorage'])
            # Scale by (100 + percent)/100 and round half-up to whole GB.
            target = ((hundred + pct) / hundred) * current
            rounded = int(target.quantize(D('0'), ROUND_HALF_UP))
            client.modify_db_instance(
                DBInstanceIdentifier=r['DBInstanceIdentifier'],
                AllocatedStorage=rounded,
                ApplyImmediately=apply_now)
@actions.register('retention')
class RetentionWindow(BaseAction):
    """
    Sets the 'BackupRetentionPeriod' value for automated snapshots,
    enforce (min, max, exact) sets retention days accordingly.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-retention
                resource: rds
                filters:
                  - type: value
                    key: BackupRetentionPeriod
                    value: 7
                    op: lt
                actions:
                  - type: retention
                    days: 7
                    copy-tags: true
                    enforce: exact
    """

    date_attribute = "BackupRetentionPeriod"
    schema = type_schema(
        'retention', **{'days': {'type': 'number'},
                        'copy-tags': {'type': 'boolean'},
                        'enforce': {'type': 'string', 'enum': [
                            'min', 'max', 'exact']}})
    permissions = ('rds:ModifyDBInstance',)

    def process(self, dbs):
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for db in dbs:
                futures.append(w.submit(
                    self.process_snapshot_retention,
                    db))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting rds retention \n %s",
                        f.exception())
        return dbs

    def process_snapshot_retention(self, resource):
        """Apply the configured retention to one instance.

        enforce semantics: 'min' raises retention to at least `days`
        (max of current/new), 'max' caps it at `days` (min of
        current/new), 'exact' sets it to `days`.
        """
        current_retention = int(resource.get('BackupRetentionPeriod', 0))
        current_copy_tags = resource['CopyTagsToSnapshot']
        new_retention = self.data['days']
        new_copy_tags = self.data.get('copy-tags', True)
        retention_type = self.data.get('enforce', 'min').lower()

        # NOTE(review): each branch also fires on a copy-tags mismatch
        # regardless of the configured enforce mode, so a mismatch is
        # handled by the *first* branch (min-style max()) — confirm this
        # coupling is intended.
        if ((retention_type == 'min' or
             current_copy_tags != new_copy_tags) and
                _db_instance_eligible_for_backup(resource)):
            self.set_retention_window(
                resource,
                max(current_retention, new_retention),
                new_copy_tags)
            return resource

        if ((retention_type == 'max' or
             current_copy_tags != new_copy_tags) and
                _db_instance_eligible_for_backup(resource)):
            self.set_retention_window(
                resource,
                min(current_retention, new_retention),
                new_copy_tags)
            return resource

        if ((retention_type == 'exact' or
             current_copy_tags != new_copy_tags) and
                _db_instance_eligible_for_backup(resource)):
            self.set_retention_window(resource, new_retention, new_copy_tags)
            return resource

    def set_retention_window(self, resource, retention, copy_tags):
        # Issue the modify call with the computed retention/copy-tags.
        c = local_session(self.manager.session_factory).client('rds')
        c.modify_db_instance(
            DBInstanceIdentifier=resource['DBInstanceIdentifier'],
            BackupRetentionPeriod=retention,
            CopyTagsToSnapshot=copy_tags)
@actions.register('set-public-access')
class RDSSetPublicAvailability(BaseAction):
    """
    This action allows for toggling an RDS instance
    'PubliclyAccessible' flag to true or false

    :example:

    .. code-block:: yaml

            policies:
              - name: disable-rds-public-accessibility
                resource: rds
                filters:
                  - PubliclyAccessible: true
                actions:
                  - type: set-public-access
                    state: false
    """

    schema = type_schema(
        "set-public-access",
        state={'type': 'boolean'})
    permissions = ('rds:ModifyDBInstance',)

    def set_accessibility(self, r):
        # A client is created per invocation via local_session.
        rds_client = local_session(self.manager.session_factory).client('rds')
        rds_client.modify_db_instance(
            DBInstanceIdentifier=r['DBInstanceIdentifier'],
            PubliclyAccessible=self.data.get('state', False))

    def process(self, rds):
        with self.executor_factory(max_workers=2) as w:
            futures = {}
            for r in rds:
                futures[w.submit(self.set_accessibility, r)] = r
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception setting public access on %s \n %s",
                        futures[f]['DBInstanceIdentifier'], f.exception())
        return rds
@resources.register('rds-subscription')
class RDSSubscription(QueryResourceManager):
    """Resource manager for rds event subscriptions."""

    class resource_type(object):
        service = 'rds'
        type = 'rds-subscription'
        enum_spec = (
            'describe_event_subscriptions', 'EventSubscriptionsList', None)
        name = id = "EventSubscriptionArn"
        date = "SubscriptionCreateTime"
        config_type = "AWS::DB::EventSubscription"
        dimension = None
        # SubscriptionName isn't part of describe events results?! all the
        # other subscription apis.
        # filter_name = 'SubscriptionName'
        # filter_type = 'scalar'
        filter_name = None
        filter_type = None
@resources.register('rds-snapshot')
class RDSSnapshot(QueryResourceManager):
    """Resource manager for RDS DB snapshots.
    """

    class resource_type(object):
        service = 'rds'
        type = 'rds-snapshot'
        enum_spec = ('describe_db_snapshots', 'DBSnapshots', None)
        name = id = 'DBSnapshotIdentifier'
        filter_name = None
        filter_type = None
        dimension = None
        date = 'SnapshotCreateTime'
        config_type = "AWS::RDS::DBSnapshot"
        # Need resource_type for Universal Tagging
        resource_type = "rds:snapshot"

    filter_registry = FilterRegistry('rds-snapshot.filters')
    action_registry = ActionRegistry('rds-snapshot.actions')

    _generate_arn = None
    retry = staticmethod(get_retry(('Throttled',)))

    @property
    def generate_arn(self):
        # Memoized partial for constructing snapshot arns.
        if self._generate_arn is None:
            self._generate_arn = functools.partial(
                generate_arn, 'rds', region=self.config.region,
                account_id=self.account_id, resource_type='snapshot',
                separator=':')
        return self._generate_arn

    def get_source(self, source_type):
        # Dispatch on source type; mirrors RDS.get_source.
        source_map = {
            'describe': DescribeRDSSnapshot,
            'config': ConfigRDSSnapshot,
        }
        factory = source_map.get(source_type)
        if factory is not None:
            return factory(self)
        raise ValueError("Unsupported source: %s for %s" % (
            source_type, self.resource_type.config_type))
class DescribeRDSSnapshot(DescribeSource):
    """Describe-api source for rds snapshots, with universal tag augment."""

    def augment(self, snaps):
        augmented = super(DescribeRDSSnapshot, self).augment(snaps)
        return universal_augment(self.manager, augmented)
class ConfigRDSSnapshot(ConfigSource):
    """Config-service source for rds snapshots."""

    def load_resource(self, item):
        resource = super(ConfigRDSSnapshot, self).load_resource(item)
        # Normalize config's lower-case tag keys to the describe shape.
        normalized = []
        for t in item['supplementaryConfiguration']['Tags']:
            normalized.append({u'Key': t['key'], u'Value': t['value']})
        resource['Tags'] = normalized
        # TODO: Load DBSnapshotAttributes into annotation
        return resource
# Attach the standard (universal) tag filters/actions to rds snapshots.
register_universal_tags(
    RDSSnapshot.filter_registry,
    RDSSnapshot.action_registry)
@RDSSnapshot.filter_registry.register('onhour')
class RDSSnapshotOnHour(OnHour):
    """Scheduled action on rds snapshot."""
@RDSSnapshot.filter_registry.register('latest')
class LatestSnapshot(Filter):
    """Return the latest snapshot for each database.
    """
    schema = type_schema('latest', automatic={'type': 'boolean'})
    permissions = ('rds:DescribeDBSnapshots',)

    def process(self, resources, event=None):
        results = []
        # automatic: false restricts to manual snapshots only.
        if not self.data.get('automatic', True):
            resources = [r for r in resources if r['SnapshotType'] == 'manual']
        # itertools.groupby only groups *consecutive* equal keys, so the
        # input must be sorted by the group key first; otherwise a
        # database whose snapshots are interleaved in the input would
        # yield multiple (and non-latest) results.
        for db_identifier, snapshots in itertools.groupby(
                sorted(resources,
                       key=operator.itemgetter('DBInstanceIdentifier')),
                operator.itemgetter('DBInstanceIdentifier')):
            results.append(
                sorted(snapshots,
                       key=operator.itemgetter('SnapshotCreateTime'))[-1])
        return results
@RDSSnapshot.filter_registry.register('age')
class RDSSnapshotAge(AgeFilter):
    """Filters RDS snapshots based on age (in days)

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-expired
                resource: rds-snapshot
                filters:
                  - type: age
                    days: 28
                    op: ge
                actions:
                  - delete
    """

    schema = type_schema(
        'age', days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})

    # Attribute the age comparison is computed against.
    date_attribute = 'SnapshotCreateTime'
@RDSSnapshot.action_registry.register('restore')
class RestoreInstance(BaseAction):
    """Restore an rds instance from a snapshot.

    Note this requires the snapshot or db deletion be taken
    with the `copy-restore-info` boolean flag set to true, as
    various instance metadata is stored on the snapshot as tags.

    additional parameters to restore db instance api call be overriden
    via `restore_options` settings. various modify db instance parameters
    can be specified via `modify_options` settings.
    """

    schema = type_schema(
        'restore',
        restore_options={'type': 'object'},
        modify_options={'type': 'object'})

    permissions = (
        'rds:ModifyDBInstance',
        'rds:ModifyDBParameterGroup',
        'rds:ModifyOptionGroup',
        'rds:RebootDBInstance',
        'rds:RestoreDBInstanceFromDBSnapshot')

    # Seconds between waiter polls while the restored instance comes up.
    poll_period = 60
    # Tag keys written by Delete.copy_restore_info that must exist on a
    # snapshot before we attempt a restore from it.
    restore_keys = set((
        'VPCSecurityGroups', 'MultiAZ', 'DBSubnetGroupName',
        'InstanceClass', 'StorageType', 'ParameterGroupName',
        'OptionGroupName'))

    def validate(self):
        # Require the 'latest' filter so we only restore from the most
        # recent snapshot of each database.
        found = False
        for f in self.manager.iter_filters():
            if isinstance(f, LatestSnapshot):
                found = True
        if not found:
            # do we really need this...
            raise PolicyValidationError(
                "must filter by latest to use restore action %s" % (
                    self.manager.data,))
        return self

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # restore up to 10 in parallel, we have to wait on each.
        with self.executor_factory(
                max_workers=min(10, len(resources) or 1)) as w:
            futures = {}
            for r in resources:
                tags = {t['Key']: t['Value'] for t in r['Tags']}
                # Skip snapshots without the full restore-info tag set.
                if not set(tags).issuperset(self.restore_keys):
                    self.log.warning(
                        "snapshot:%s missing restore tags",
                        r['DBSnapshotIdentifier'])
                    continue
                futures[w.submit(self.process_instance, client, r)] = r
            for f in as_completed(futures):
                r = futures[f]
                if f.exception():
                    self.log.warning(
                        "Error restoring db:%s from:%s error:\n%s",
                        r['DBInstanceIdentifier'], r['DBSnapshotIdentifier'],
                        f.exception())
                    continue

    def process_instance(self, client, r):
        """Restore one instance, wait for availability, apply modify
        options, then reboot so parameter-group changes take effect."""
        # NOTE(review): get_restore_from_tags is not defined within this
        # view; presumably provided elsewhere on this class — confirm.
        params, post_modify = self.get_restore_from_tags(r)
        self.manager.retry(
            client.restore_db_instance_from_db_snapshot, **params)
        waiter = client.get_waiter('db_instance_available')
        # wait up to 40m
        waiter.config.delay = self.poll_period
        waiter.wait(DBInstanceIdentifier=params['DBInstanceIdentifier'])
        self.manager.retry(
            client.modify_db_instance,
            DBInstanceIdentifier=params['DBInstanceIdentifier'],
            ApplyImmediately=True,
            **post_modify)
        self.manager.retry(
            client.reboot_db_instance,
            DBInstanceIdentifier=params['DBInstanceIdentifier'],
            ForceFailover=False)
@RDSSnapshot.filter_registry.register('cross-account')
class CrossAccountAccess(CrossAccountAccessFilter):
    """Find rds snapshots shared with accounts outside the allowed set."""

    permissions = ('rds:DescribeDBSnapshotAttributes',)

    def process(self, resources, event=None):
        self.accounts = self.get_accounts()
        results = []
        with self.executor_factory(max_workers=2) as w:
            # Batch the attribute lookups across a small thread pool.
            futures = [
                w.submit(self.process_resource_set, resource_set)
                for resource_set in chunks(resources, 20)]
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception checking cross account access\n %s" % (
                            f.exception()))
                    continue
                results.extend(f.result())
        return results

    def process_resource_set(self, resource_set):
        client = local_session(self.manager.session_factory).client('rds')
        results = []
        for snap in resource_set:
            response = client.describe_db_snapshot_attributes(
                DBSnapshotIdentifier=snap['DBSnapshotIdentifier'])
            attr_list = response[
                'DBSnapshotAttributesResult']['DBSnapshotAttributes']
            attrs = {a['AttributeName']: a['AttributeValues']
                     for a in attr_list}
            snap['c7n:attributes'] = attrs
            # The 'restore' attribute lists accounts the snapshot is
            # shared with; anything not whitelisted is a violation.
            delta_accounts = set(
                attrs.get('restore', [])).difference(self.accounts)
            if delta_accounts:
                snap['c7n:CrossAccountViolations'] = list(delta_accounts)
                results.append(snap)
        return results
@RDSSnapshot.action_registry.register('region-copy')
class RegionCopySnapshot(BaseAction):
    """Copy a snapshot across regions.

    Note there is a max in flight for cross region rds snapshots
    of 5 per region. This action will attempt to retry automatically
    for an hr.

    Example::

      - name: copy-encrypted-snapshots
        description: |
          copy snapshots under 1 day old to dr region with kms
        resource: rds-snapshot
        region: us-east-1
        filters:
         - Status: available
         - type: value
           key: SnapshotCreateTime
           value_type: age
           value: 1
           op: less-than
        actions:
         - type: region-copy
           target_region: us-east-2
           target_key: arn:aws:kms:us-east-2:0000:key/cb291f53-c9cf61
           copy_tags: true
           tags:
             OriginRegion: us-east-1
    """

    schema = type_schema(
        'region-copy',
        target_region={'type': 'string'},
        target_key={'type': 'string'},
        copy_tags={'type': 'boolean'},
        tags={'type': 'object'},
        required=('target_region',))

    permissions = ('rds:CopyDBSnapshot',)
    # Retry pacing for the per-region in-flight copy quota.
    min_delay = 120
    max_attempts = 30

    def validate(self):
        # Lambda execution time is bounded; waiting out the copy quota
        # may exceed it, so disallow this action in lambda modes.
        if self.data.get('target_region') and self.manager.data.get('mode'):
            raise PolicyValidationError(
                "cross region snapshot may require waiting for "
                "longer then lambda runtime allows %s" % (self.manager.data,))
        return self

    def process(self, resources):
        if self.data['target_region'] == self.manager.config.region:
            self.log.warning(
                "Source and destination region are the same, skipping copy")
            return
        for resource_set in chunks(resources, 20):
            self.process_resource_set(resource_set)

    def process_resource(self, target, key, tags, snapshot):
        """Copy a single snapshot to the target region's client."""
        p = {}
        if key:
            p['KmsKeyId'] = key
        # Snapshot identifiers may not contain ':' in the target.
        p['TargetDBSnapshotIdentifier'] = snapshot[
            'DBSnapshotIdentifier'].replace(':', '-')
        p['SourceRegion'] = self.manager.config.region
        p['SourceDBSnapshotIdentifier'] = snapshot['DBSnapshotArn']

        if self.data.get('copy_tags', True):
            p['CopyTags'] = True
        if tags:
            p['Tags'] = tags

        retry = get_retry(
            ('SnapshotQuotaExceeded',),
            # TODO make this configurable, class defaults to 1hr
            min_delay=self.min_delay,
            max_attempts=self.max_attempts,
            log_retries=logging.DEBUG)
        try:
            result = retry(target.copy_db_snapshot, **p)
        except ClientError as e:
            # Idempotent behavior: a pre-existing copy is not an error.
            if e.response['Error']['Code'] == 'DBSnapshotAlreadyExists':
                self.log.warning(
                    "Snapshot %s already exists in target region",
                    snapshot['DBSnapshotIdentifier'])
                return
            raise
        snapshot['c7n:CopiedSnapshot'] = result[
            'DBSnapshot']['DBSnapshotArn']

    def process_resource_set(self, resource_set):
        target_client = self.manager.session_factory(
            region=self.data['target_region']).client('rds')
        target_key = self.data.get('target_key')
        tags = [{'Key': k, 'Value': v} for k, v
                in self.data.get('tags', {}).items()]

        for snapshot_set in chunks(resource_set, 5):
            for r in snapshot_set:
                # If tags are supplied, copy tags are ignored, and
                # we need to augment the tag set with the original
                # resource tags to preserve the common case.
                rtags = tags and list(tags) or None
                if tags and self.data.get('copy_tags', True):
                    rtags.extend(r['Tags'])
                self.process_resource(target_client, target_key, rtags, r)
@RDSSnapshot.action_registry.register('delete')
class RDSSnapshotDelete(BaseAction):
    """Deletes a RDS snapshot resource

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-snapshot-delete-stale
                resource: rds-snapshot
                filters:
                  - type: age
                    days: 28
                    op: ge
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBSnapshot',)

    def process(self, snapshots):
        log.info("Deleting %d rds snapshots", len(snapshots))
        with self.executor_factory(max_workers=3) as w:
            # Chunk the deletes and run them across a small thread pool.
            futures = [
                w.submit(self.process_snapshot_set, snapshot_set)
                for snapshot_set in chunks(reversed(snapshots), size=50)]
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception deleting snapshot set \n %s",
                        f.exception())
        return snapshots

    def process_snapshot_set(self, snapshots_set):
        client = local_session(self.manager.session_factory).client('rds')
        for snap in snapshots_set:
            client.delete_db_snapshot(
                DBSnapshotIdentifier=snap['DBSnapshotIdentifier'])
@actions.register('modify-security-groups')
class RDSModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
    """Modify the vpc security groups on rds instances / their clusters."""

    permissions = ('rds:ModifyDBInstance', 'rds:ModifyDBCluster')

    def process(self, rds_instances):
        """Apply resolved security groups per instance or per cluster."""
        replication_group_map = {}
        client = local_session(self.manager.session_factory).client('rds')
        groups = super(RDSModifyVpcSecurityGroups, self).get_groups(
            rds_instances)

        # either build map for DB cluster or modify DB instance directly
        for idx, i in enumerate(rds_instances):
            if i.get('DBClusterIdentifier'):
                # build map of Replication Groups to Security Groups
                replication_group_map[i['DBClusterIdentifier']] = groups[idx]
            else:
                client.modify_db_instance(
                    DBInstanceIdentifier=i['DBInstanceIdentifier'],
                    VpcSecurityGroupIds=groups[idx])

        # handle DB cluster, if necessary
        # (fix: iterate items directly — the previous enumerate index
        # was unused)
        for cluster_id, sg_ids in replication_group_map.items():
            client.modify_db_cluster(
                DBClusterIdentifier=cluster_id,
                VpcSecurityGroupIds=sg_ids)
@resources.register('rds-subnet-group')
class RDSSubnetGroup(QueryResourceManager):
    """RDS subnet group."""

    class resource_type(object):
        # Resource enumeration metadata consumed by QueryResourceManager.
        service = 'rds'
        type = 'rds-subnet-group'
        id = name = 'DBSubnetGroupName'
        enum_spec = (
            'describe_db_subnet_groups', 'DBSubnetGroups', None)
        filter_name = 'DBSubnetGroupName'
        filter_type = 'scalar'
        dimension = None
        date = None

    def augment(self, resources):
        # Attach 'Tags' to each group dict; the helper mutates the dicts
        # in place, so its filtered return value is ignored here.
        _db_subnet_group_tags(
            resources, self.session_factory, self.executor_factory, self.retry)
        return resources
def _db_subnet_group_tags(subnet_groups, session_factory, executor_factory, retry):
    """Annotate each subnet group dict with its 'Tags' list (in place).

    Groups deleted out from under us are skipped and omitted from the
    returned list. ``executor_factory`` and ``retry`` are accepted for
    interface compatibility with other tag helpers but are not used here.
    """
    client = local_session(session_factory).client('rds')
    tagged = []
    for group in subnet_groups:
        try:
            group['Tags'] = client.list_tags_for_resource(
                ResourceName=group['DBSubnetGroupArn'])['TagList']
        except client.exceptions.DBSubnetGroupNotFoundFault:
            continue
        tagged.append(group)
    return tagged
@RDSSubnetGroup.action_registry.register('delete')
class RDSSubnetGroupDeleteAction(BaseAction):
    """Action to delete RDS Subnet Group

    It is recommended to apply a filter to the delete policy to avoid unwanted
    deletion of any rds subnet groups.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subnet-group-delete-unused
                resource: rds-subnet-group
                filters:
                  - Instances: []
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('rds:DeleteDBSubnetGroup',)

    def process(self, subnet_group):
        # Fan out deletions across a small thread pool; list() forces the
        # lazy map so all deletions run before the executor shuts down.
        with self.executor_factory(max_workers=2) as w:
            list(w.map(self.process_subnetgroup, subnet_group))

    def process_subnetgroup(self, subnet_group):
        # Per-thread client: local_session gives each worker its own session.
        client = local_session(self.manager.session_factory).client('rds')
        client.delete_db_subnet_group(DBSubnetGroupName=subnet_group['DBSubnetGroupName'])
@RDSSubnetGroup.filter_registry.register('unused')
class UnusedRDSSubnetGroup(Filter):
    """Filters all launch rds subnet groups that are not in use but exist

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-subnet-group-delete-unused
                resource: rds-subnet-group
                filters:
                  - unused
    """

    schema = type_schema('unused')

    def get_permissions(self):
        return self.manager.get_resource_manager('rds').get_permissions()

    def process(self, configs, event=None):
        instances = self.manager.get_resource_manager('rds').resources()
        # Record every subnet group name referenced by a live db instance.
        self.used = {
            i.get('DBSubnetGroupName', i['DBInstanceIdentifier'])
            for i in instances}
        return super(UnusedRDSSubnetGroup, self).process(configs)

    def __call__(self, config):
        # A group is "unused" when no instance references its name.
        return config['DBSubnetGroupName'] not in self.used
@filters.register('db-parameter')
class ParameterFilter(ValueFilter):
    """
    Applies value type filter on set db parameter values.

    :example:

    .. code-block:: yaml

            policies:
              - name: rds-pg
                resource: rds
                filters:
                  - type: db-parameter
                    key: someparam
                    op: eq
                    value: someval
    """

    schema = type_schema('db-parameter', rinherit=ValueFilter.schema)
    permissions = ('rds:DescribeDBInstances', 'rds:DescribeDBParameters', )

    @staticmethod
    def recast(val, datatype):
        """ Re-cast the value based upon an AWS supplied datatype
            and treat nulls sensibly.
        """
        ret_val = val
        if datatype == 'string':
            ret_val = str(val)
        elif datatype == 'boolean':
            # AWS returns 1s and 0s for boolean for most of the cases
            if val.isdigit():
                ret_val = bool(int(val))
            # AWS returns 'TRUE,FALSE' for Oracle engine
            elif val == 'TRUE':
                ret_val = True
            elif val == 'FALSE':
                ret_val = False
        elif datatype == 'integer':
            if val.isdigit():
                ret_val = int(val)
        elif datatype == 'float':
            ret_val = float(val) if val else 0.0
        return ret_val

    def process(self, resources, event=None):
        """Return resources whose effective db parameters match the filter.

        Parameter group values are fetched once per distinct group (with
        region/account-scoped caching) and matched against each resource's
        groups.
        """
        results = []
        paramcache = {}

        client = local_session(self.manager.session_factory).client('rds')
        paginator = client.get_paginator('describe_db_parameters')

        # Fix: collect every parameter group referenced by each instance.
        # Previously only DBParameterGroups[0] was collected, which raised
        # KeyError below for instances with more than one group.
        param_groups = {pg['DBParameterGroupName']
                        for db in resources
                        for pg in db['DBParameterGroups']}

        for pg in param_groups:
            cache_key = {
                'region': self.manager.config.region,
                'account_id': self.manager.config.account_id,
                'rds-pg': pg}
            pg_values = self.manager._cache.get(cache_key)
            if pg_values is not None:
                paramcache[pg] = pg_values
                continue
            param_list = list(itertools.chain(*[p['Parameters']
                for p in paginator.paginate(DBParameterGroupName=pg)]))
            # Only parameters with an explicit value are considered "set".
            paramcache[pg] = {
                p['ParameterName']: self.recast(p['ParameterValue'], p['DataType'])
                for p in param_list if 'ParameterValue' in p}
            self.manager._cache.save(cache_key, paramcache[pg])

        for resource in resources:
            for pg in resource['DBParameterGroups']:
                pg_values = paramcache[pg['DBParameterGroupName']]
                if self.match(pg_values):
                    resource.setdefault('c7n:MatchedDBParameter', []).append(
                        self.data.get('key'))
                    results.append(resource)
                    break
        return results
@actions.register('modify-db')
class ModifyDb(BaseAction):
    """Modifies an RDS instance based on specified parameter
    using ModifyDbInstance.

    'update' is an array with key value pairs that should be set to
    the property and value you wish to modify.

    'immediate' determines whether the modification is applied immediately
    or not. If 'immediate' is not specified, default is false.

    :example:

    .. code-block:: yaml

            policies:
              - name: disable-rds-deletion-protection
                resource: rds
                filters:
                  - DeletionProtection: true
                  - PubliclyAccessible: true
                actions:
                  - type: modify-db
                    update:
                      - property: 'DeletionProtection'
                        value: false
                      - property: 'PubliclyAccessible'
                        value: false
                    immediate: true
    """

    schema = type_schema(
        'modify-db',
        immediate={"type": 'boolean'},
        update={
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'property': {'type': 'string', 'enum': [
                        'AllocatedStorage',
                        'DBInstanceClass',
                        'DBSubnetGroupName',
                        'DBSecurityGroups',
                        'VpcSecurityGroupIds',
                        'MasterUserPassword',
                        'DBParameterGroupName',
                        'BackupRetentionPeriod',
                        'PreferredBackupWindow',
                        'PreferredMaintenanceWindow',
                        'MultiAZ',
                        'EngineVersion',
                        'AllowMajorVersionUpgrade',
                        'AutoMinorVersionUpgrade',
                        'LicenseModel',
                        'Iops',
                        'OptionGroupName',
                        'NewDBInstanceIdentifier',
                        'StorageType',
                        'TdeCredentialArn',
                        'TdeCredentialPassword',
                        'CACertificateIdentifier',
                        'Domain',
                        'CopyTagsToSnapshot',
                        'MonitoringInterval',
                        'DBPortNumber',
                        'PubliclyAccessible',
                        'DomainIAMRoleName',
                        'PromotionTier',
                        'EnableIAMDatabaseAuthentication',
                        'EnablePerformanceInsights',
                        'PerformanceInsightsKMSKeyId',
                        'PerformanceInsightsRetentionPeriod',
                        'CloudwatchLogsExportConfiguration',
                        'UseDefaultProcessorFeatures',
                        'DeletionProtection']},
                    'value': {}
                },
            },
        },
        required=('update',))

    permissions = ('rds:ModifyDBInstance',)

    def process(self, resources):
        """Apply the configured property updates to each instance.

        Only properties whose current value differs from the desired value
        are sent; instances already in the desired state are skipped.
        """
        c = local_session(self.manager.session_factory).client('rds')
        for r in resources:
            # NOTE(review): r[property] raises KeyError for properties not
            # present in the describe output (e.g. MasterUserPassword) —
            # confirm whether callers only use describable properties.
            param = {
                u['property']: u['value']
                for u in self.data.get('update')
                if r[u['property']] != u['value']}
            if not param:
                continue
            param['ApplyImmediately'] = self.data.get('immediate', False)
            param['DBInstanceIdentifier'] = r['DBInstanceIdentifier']
            # Fix: the previous `except DBInstanceNotFoundFault: raise`
            # handler was a no-op (catch-and-bare-reraise); let the
            # exception propagate directly.
            c.modify_db_instance(**param)
@resources.register('rds-reserved')
class ReservedRDS(QueryResourceManager):
    """Reserved RDS instances."""

    class resource_type(object):
        # Resource enumeration metadata consumed by QueryResourceManager.
        service = 'rds'
        name = id = 'ReservedDBInstanceId'
        date = 'StartTime'
        enum_spec = (
            'describe_reserved_db_instances', 'ReservedDBInstances', None)
        filter_name = 'ReservedDBInstances'
        filter_type = 'list'
        dimension = None
        type = "reserved-db"