# Copyright 2016-2019 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
from botocore.exceptions import ClientError
from concurrent.futures import as_completed
from datetime import datetime
from c7n.actions import BaseAction, ModifyVpcSecurityGroupsAction
from c7n.filters.kms import KmsRelatedFilter
from c7n import query
from c7n.manager import resources
from c7n.tags import (
TagDelayedAction, RemoveTag, TagActionFilter, Tag, universal_augment,
register_universal_tags)
from c7n.utils import (
local_session, chunks, type_schema, snapshot_identifier)
from c7n.filters.vpc import SecurityGroupFilter, SubnetFilter
[docs]@resources.register('dynamodb-table')
class Table(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'dynamodb'
type = 'table'
enum_spec = ('list_tables', 'TableNames', None)
detail_spec = ("describe_table", "TableName", None, "Table")
id = 'TableName'
filter_name = None
name = 'TableName'
date = 'CreationDateTime'
dimension = 'TableName'
config_type = 'AWS::DynamoDB::Table'
permissions = ('dynamodb:ListTagsOfResource',)
[docs] def get_source(self, source_type):
if source_type == 'describe':
return DescribeTable(self)
elif source_type == 'config':
return ConfigTable(self)
raise ValueError('invalid source %s' % source_type)
register_universal_tags(Table.filter_registry, Table.action_registry, False)
[docs]class ConfigTable(query.ConfigSource):
[docs] def load_resource(self, item):
resource = super(ConfigTable, self).load_resource(item)
resource['CreationDateTime'] = datetime.fromtimestamp(resource['CreationDateTime'] / 1000.0)
if 'LastUpdateToPayPerRequestDateTime' in resource['BillingModeSummary']:
resource['BillingModeSummary'][
'LastUpdateToPayPerRequestDateTime'] = datetime.fromtimestamp(
resource['BillingModeSummary']['LastUpdateToPayPerRequestDateTime'] / 1000.0)
sse_info = resource.pop('Ssedescription', None)
if sse_info is None:
return resource
resource['SSEDescription'] = sse_info
for k, r in (('KmsmasterKeyArn', 'KMSMasterKeyArn'),
('Ssetype', 'SSEType')):
if k in sse_info:
sse_info[r] = sse_info.pop(k)
return resource
[docs]class DescribeTable(query.DescribeSource):
[docs] def augment(self, resources):
return universal_augment(
self.manager,
super(DescribeTable, self).augment(resources))
[docs]class StatusFilter(object):
"""Filter tables by status"""
valid_states = ()
[docs] def filter_table_state(self, tables, states=None):
states = states or self.valid_states
orig_count = len(tables)
result = [t for t in tables if t['TableStatus'] in states]
self.log.info("%s %d of %d tables" % (
self.__class__.__name__, len(result), orig_count))
return result
[docs] def filter_backup_state(self, tables, states=None):
states = states or self.valid_states
orig_count = len(tables)
result = [t for t in tables if t['BackupStatus'] in states]
self.log.info("%s %d of %d tables" % (
self.__class__.__name__, len(result), orig_count))
return result
[docs]@Table.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
"""
Filter a resource by its associcated kms key and optionally the aliasname
of the kms key by using 'c7n:AliasName'
:example:
.. code-block:: yaml
policies:
- name: dynamodb-kms-key-filters
resource: dynamodb-table
filters:
- type: kms-key
key: c7n:AliasName
value: "^(alias/aws/dynamodb)"
op: regex
"""
RelatedIdsExpression = 'SSEDescription.KMSMasterKeyArn'
[docs]@Table.action_registry.register('delete')
class DeleteTable(BaseAction, StatusFilter):
"""Action to delete dynamodb tables
:example:
.. code-block:: yaml
policies:
- name: delete-empty-tables
resource: dynamodb-table
filters:
- TableSizeBytes: 0
actions:
- delete
"""
valid_status = ('ACTIVE',)
schema = type_schema('delete')
permissions = ("dynamodb:DeleteTable",)
[docs] def delete_table(self, client, table_set):
for t in table_set:
client.delete_table(TableName=t['TableName'])
[docs] def process(self, resources):
resources = self.filter_table_state(
resources, self.valid_status)
if not len(resources):
return
futures = []
client = local_session(self.manager.session_factory).client('dynamodb')
with self.executor_factory(max_workers=2) as w:
for table_set in chunks(resources, 20):
futures.append(w.submit(self.delete_table, client, table_set))
for f in as_completed(futures):
if f.exception():
self.log.error(
"Exception deleting dynamodb table set \n %s"
% (f.exception()))
[docs]@Table.action_registry.register('set-stream')
class SetStream(BaseAction, StatusFilter):
"""Action to enable/disable streams on table.
:example:
.. code-block:: yaml
policies:
- name: stream-update
resource: dynamodb-table
filters:
- TableName: 'test'
- TableStatus: 'ACTIVE'
actions:
- type: set-stream
state: True
stream_view_type: 'NEW_IMAGE'
"""
valid_status = ('ACTIVE',)
schema = type_schema('set-stream',
state={'type': 'boolean'},
stream_view_type={'type': 'string'})
permissions = ("dynamodb:UpdateTable",)
[docs] def process(self, tables):
tables = self.filter_table_state(
tables, self.valid_status)
if not len(tables):
self.log.warning("Table not in ACTIVE state.")
return
state = self.data.get('state')
type = self.data.get('stream_view_type')
stream_spec = {"StreamEnabled": state}
if self.data.get('stream_view_type') is not None:
stream_spec.update({"StreamViewType": type})
c = local_session(self.manager.session_factory).client('dynamodb')
with self.executor_factory(max_workers=2) as w:
futures = {w.submit(c.update_table,
TableName=t['TableName'],
StreamSpecification=stream_spec): t for t in tables}
for f in as_completed(futures):
t = futures[f]
if f.exception():
self.log.error(
"Exception updating dynamodb table set \n %s"
% (f.exception()))
continue
if self.data.get('stream_view_type') is not None:
stream_state = \
f.result()['TableDescription']['StreamSpecification']['StreamEnabled']
stream_type = \
f.result()['TableDescription']['StreamSpecification']['StreamViewType']
t['c7n:StreamState'] = stream_state
t['c7n:StreamType'] = stream_type
[docs]@Table.action_registry.register('backup')
class CreateBackup(BaseAction, StatusFilter):
"""Creates a manual backup of a DynamoDB table. Use of the optional
prefix flag will attach a user specified prefix. Otherwise,
the backup prefix will default to 'Backup'.
:example:
.. code-block:: yaml
policies:
- name: dynamodb-create-backup
resource: dynamodb-table
actions:
- type: backup
prefix: custom
"""
valid_status = ('ACTIVE',)
schema = type_schema('backup',
prefix={'type': 'string'})
permissions = ('dynamodb:CreateBackup',)
[docs] def process(self, resources):
resources = self.filter_table_state(
resources, self.valid_status)
if not len(resources):
return
c = local_session(self.manager.session_factory).client('dynamodb')
futures = {}
prefix = self.data.get('prefix', 'Backup')
with self.executor_factory(max_workers=2) as w:
for t in resources:
futures[w.submit(
c.create_backup,
BackupName=snapshot_identifier(
prefix, t['TableName']),
TableName=t['TableName'])] = t
for f in as_completed(futures):
t = futures[f]
if f.exception():
self.manager.log.warning(
"Could not complete DynamoDB backup table:%s", t)
arn = f.result()['BackupDetails']['BackupArn']
t['c7n:BackupArn'] = arn
[docs]@resources.register('dynamodb-backup')
class Backup(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'dynamodb'
type = 'table'
enum_spec = ('list_backups', 'BackupSummaries', None)
detail_spec = None
id = 'Table'
filter_name = None
name = 'TableName'
date = 'BackupCreationDateTime'
dimension = 'TableName'
config_type = 'AWS::DynamoDB::Table'
[docs]@Backup.action_registry.register('delete')
class DeleteBackup(BaseAction, StatusFilter):
"""Deletes backups of a DynamoDB table
:example:
.. code-block:: yaml
policies:
- name: dynamodb-delete-backup
resource: dynamodb-backup
filters:
- type: value
key: BackupCreationDateTime
op: greater-than
value_type: age
value: 28
actions:
- type: delete
"""
valid_status = ('AVAILABLE',)
schema = type_schema('delete')
permissions = ('dynamodb:DeleteBackup',)
[docs] def process(self, backups):
backups = self.filter_backup_state(
backups, self.valid_status)
if not len(backups):
return
c = local_session(self.manager.session_factory).client('dynamodb')
for table_set in chunks(backups, 20):
self.process_dynamodb_backups(table_set, c)
[docs] def process_dynamodb_backups(self, table_set, c):
for t in table_set:
try:
c.delete_backup(
BackupArn=t['BackupArn'])
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
self.log.warning("Could not complete DynamoDB backup deletion for table:%s", t)
continue
raise
[docs]@resources.register('dynamodb-stream')
class Stream(query.QueryResourceManager):
# Note stream management takes place on the table resource
[docs] class resource_type(object):
service = 'dynamodbstreams'
# Note max rate of 5 calls per second
enum_spec = ('list_streams', 'Streams', None)
# Note max rate of 10 calls per second.
detail_spec = (
"describe_stream", "StreamArn", "StreamArn", "StreamDescription")
arn = id = 'StreamArn'
# TODO, we default to filtering by id, but the api takes table names, which
# require additional client side filtering as multiple streams may be present
# per table.
# filter_name = 'TableName'
filter_name = None
name = 'TableName'
date = 'CreationDateTime'
dimension = 'TableName'
[docs]@resources.register('dax')
class DynamoDbAccelerator(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'dax'
type = 'cluster'
enum_spec = ('describe_clusters', 'Clusters', None)
detail_spec = None
id = 'ClusterArn'
name = 'ClusterName'
config_type = 'AWS::DAX::Cluster'
filter_name = None
dimension = None
date = None
permissions = ('dax:ListTags',)
[docs] def get_source(self, source_type):
if source_type == 'describe':
return DescribeDaxCluster(self)
elif source_type == 'config':
return query.ConfigSource(self)
raise ValueError('invalid source %s' % source_type)
[docs] def get_resources(self, ids, cache=True, augment=True):
"""Override in order to disable the augment for serverless policies.
list_tags on dax resources always fail until the cluster is finished creating.
"""
return super(DynamoDbAccelerator, self).get_resources(ids, cache, augment=False)
[docs]class DescribeDaxCluster(query.DescribeSource):
[docs] def get_resources(self, ids, cache=True):
"""Retrieve dax resources for serverless policies or related resources
"""
client = local_session(self.manager.session_factory).client('dax')
return client.describe_clusters(ClusterNames=ids).get('Clusters')
[docs] def augment(self, clusters):
resources = super(DescribeDaxCluster, self).augment(clusters)
return list(filter(None, _dax_cluster_tags(
resources,
self.manager.session_factory,
self.manager.retry,
self.manager.log)))
def _dax_cluster_tags(tables, session_factory, retry, log):
client = local_session(session_factory).client('dax')
def process_tags(r):
try:
r['Tags'] = retry(
client.list_tags, ResourceName=r['ClusterArn'])['Tags']
return r
except (client.exceptions.ClusterNotFoundFault,
client.exceptions.InvalidClusterStateFault):
return None
return filter(None, list(map(process_tags, tables)))
DynamoDbAccelerator.filter_registry.register('marked-for-op', TagActionFilter)
[docs]@DynamoDbAccelerator.filter_registry.register('security-group')
class DaxSecurityGroupFilter(SecurityGroupFilter):
RelatedIdsExpression = "SecurityGroups[].SecurityGroupIdentifier"
[docs]@DynamoDbAccelerator.action_registry.register('tag')
class DaxTagging(Tag):
"""Action to create tag(s) on a resource
:example:
.. code-block:: yaml
policies:
- name: dax-cluster-tag
resource: dax
filters:
- "tag:target-tag": absent
actions:
- type: tag
key: target-tag
value: target-tag-value
"""
permissions = ('dax:TagResource',)
[docs] def process_resource_set(self, client, resources, tags):
mid = self.manager.resource_type.id
for r in resources:
try:
client.tag_resource(ResourceName=r[mid], Tags=tags)
except (client.exceptions.ClusterNotFoundFault,
client.exceptions.InvalidARNFault,
client.exceptions.InvalidClusterStateFault) as e:
self.log.warning('Exception tagging %s: \n%s', r['ClusterName'], e)
[docs]@DynamoDbAccelerator.action_registry.register('remove-tag')
class DaxRemoveTagging(RemoveTag):
"""Action to remove tag(s) on a resource
:example:
.. code-block:: yaml
policies:
- name: dax-remove-tag
resource: dax
filters:
- "tag:OutdatedTag": present
actions:
- type: remove-tag
tags: ["OutdatedTag"]
"""
permissions = ('dax:UntagResource',)
[docs] def process_resource_set(self, client, resources, tag_keys):
for r in resources:
try:
client.untag_resource(
ResourceName=r['ClusterArn'], TagKeys=tag_keys)
except (client.exceptions.ClusterNotFoundFault,
client.exceptions.TagNotFoundFault,
client.exceptions.InvalidClusterStateFault) as e:
self.log.warning('Exception removing tags on %s: \n%s', r['ClusterName'], e)
[docs]@DynamoDbAccelerator.action_registry.register('mark-for-op')
class DaxMarkForOp(TagDelayedAction):
"""Action to specify an action to occur at a later date
:example:
.. code-block:: yaml
policies:
- name: dax-mark-tag-compliance
resource: dax
filters:
- "tag:custodian_cleanup": absent
- "tag:OwnerName": absent
actions:
- type: mark-for-op
tag: custodian_cleanup
msg: "Missing tag 'OwnerName': {op}@{action_date}"
op: delete
days: 7
"""
[docs]@DynamoDbAccelerator.action_registry.register('delete')
class DaxDeleteCluster(BaseAction):
"""Action to delete a DAX cluster
:example:
.. code-block: yaml
policies:
- name: dax-delete-cluster
resource: dax
filters:
- "tag:DeleteMe": present
actions:
- type: delete
"""
permissions = ('dax:DeleteCluster',)
schema = type_schema('delete')
[docs] def process(self, resources):
client = local_session(self.manager.session_factory).client('dax')
for r in resources:
try:
client.delete_cluster(ClusterName=r['ClusterName'])
except (client.exceptions.ClusterNotFoundFault,
client.exceptions.InvalidARNFault,
client.exceptions.InvalidClusterStateFault) as e:
self.log.warning('Exception marking %s: \n%s', r['ClusterName'], e)
[docs]@DynamoDbAccelerator.action_registry.register('update-cluster')
class DaxUpdateCluster(BaseAction):
"""Updates a DAX cluster configuration
:example:
.. code-block: yaml
policies:
- name: dax-update-cluster
resource: dax
filters:
- ParameterGroup.ParameterGroupName: 'default.dax1.0'
actions:
- type: update-cluster
ParameterGroupName: 'testparamgroup'
"""
schema = {
'type': 'object',
'additionalProperties': False,
'properties': {
'type': {'enum': ['update-cluster']},
'Description': {'type': 'string'},
'PreferredMaintenanceWindow': {'type': 'string'},
'NotificationTopicArn': {'type': 'string'},
'NotificationTopicStatus': {'type': 'string'},
'ParameterGroupName': {'type': 'string'}
}
}
permissions = ('dax:UpdateCluster',)
[docs] def process(self, resources):
client = local_session(self.manager.session_factory).client('dax')
params = dict(self.data)
params.pop('type')
for r in resources:
params['ClusterName'] = r['ClusterName']
try:
client.update_cluster(**params)
except (client.exceptions.ClusterNotFoundFault,
client.exceptions.InvalidClusterStateFault) as e:
self.log.warning(
'Exception updating dax cluster %s: \n%s',
r['ClusterName'], e)
[docs]@DynamoDbAccelerator.action_registry.register('modify-security-groups')
class DaxModifySecurityGroup(ModifyVpcSecurityGroupsAction):
permissions = ('dax:UpdateCluster',)
[docs] def process(self, resources):
client = local_session(self.manager.session_factory).client('dax')
groups = super(DaxModifySecurityGroup, self).get_groups(resources)
for idx, r in enumerate(resources):
client.update_cluster(
ClusterName=r['ClusterName'], SecurityGroupIds=groups[idx])
[docs]@DynamoDbAccelerator.filter_registry.register('subnet')
class DaxSubnetFilter(SubnetFilter):
"""Filters DAX clusters based on their associated subnet group
:example:
.. code-block:: yaml
policies:
- name: dax-no-auto-public
resource: dax
filters:
- type: subnet
key: MapPublicIpOnLaunch
value: False
"""
RelatedIdsExpression = ""
[docs] def process(self, resources, event=None):
client = local_session(self.manager.session_factory).client('dax')
subnet_groups = client.describe_subnet_groups()['SubnetGroups']
self.groups = {s['SubnetGroupName']: s for s in subnet_groups}
return super(DaxSubnetFilter, self).process(resources)