# Copyright 2016-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
import functools
import logging
import re
from datetime import datetime
from concurrent.futures import as_completed
from dateutil.tz import tzutc
from dateutil.parser import parse
from c7n.actions import (
ActionRegistry, BaseAction, ModifyVpcSecurityGroupsAction)
from c7n.filters import FilterRegistry, AgeFilter
import c7n.filters.vpc as net_filters
from c7n.manager import resources
from c7n.query import QueryResourceManager
from c7n.tags import universal_augment
from c7n.utils import (
local_session, generate_arn,
get_retry, chunks, snapshot_identifier, type_schema)
log = logging.getLogger('custodian.elasticache')
filters = FilterRegistry('elasticache.filters')
actions = ActionRegistry('elasticache.actions')
TTYPE = re.compile('cache.t1')
[docs]@resources.register('cache-cluster')
class ElastiCacheCluster(QueryResourceManager):
[docs] class resource_type(object):
service = 'elasticache'
type = 'cluster'
enum_spec = ('describe_cache_clusters',
'CacheClusters[]', None)
name = id = 'CacheClusterId'
filter_name = 'CacheClusterId'
filter_type = 'scalar'
date = 'CacheClusterCreateTime'
dimension = 'CacheClusterId'
universal_taggable = True
filter_registry = filters
action_registry = actions
_generate_arn = None
retry = staticmethod(get_retry(('Throttled',)))
permissions = ('elasticache:ListTagsForResource',)
augment = universal_augment
@property
def generate_arn(self):
if self._generate_arn is None:
self._generate_arn = functools.partial(
generate_arn,
'elasticache',
region=self.config.region,
account_id=self.account_id,
resource_type='cluster',
separator=':')
return self._generate_arn
[docs]@filters.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
RelatedIdsExpression = "SecurityGroups[].SecurityGroupId"
[docs]@filters.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
"""Filters elasticache clusters based on their associated subnet
:example:
.. code-block:: yaml
policies:
- name: elasticache-in-subnet-x
resource: cache-cluster
filters:
- type: subnet
key: SubnetId
value: subnet-12ab34cd
"""
RelatedIdsExpression = ""
[docs] def process(self, resources, event=None):
self.groups = {
r['CacheSubnetGroupName']: r for r in
self.manager.get_resource_manager(
'cache-subnet-group').resources()}
return super(SubnetFilter, self).process(resources, event)
filters.register('network-location', net_filters.NetworkLocation)
[docs]@actions.register('delete')
class DeleteElastiCacheCluster(BaseAction):
"""Action to delete an elasticache cluster
To prevent unwanted deletion of elasticache clusters, it is recommended
to include a filter
:example:
.. code-block:: yaml
policies:
- name: elasticache-delete-stale-clusters
resource: cache-cluster
filters:
- type: value
value_type: age
key: CacheClusterCreateTime
op: ge
value: 90
actions:
- type: delete
skip-snapshot: false
"""
schema = type_schema(
'delete', **{'skip-snapshot': {'type': 'boolean'}})
permissions = ('elasticache:DeleteCacheCluster',
'elasticache:DeleteReplicationGroup')
[docs] def process(self, clusters):
skip = self.data.get('skip-snapshot', False)
client = local_session(
self.manager.session_factory).client('elasticache')
clusters_to_delete = []
replication_groups_to_delete = set()
for cluster in clusters:
if cluster.get('ReplicationGroupId', ''):
replication_groups_to_delete.add(cluster['ReplicationGroupId'])
else:
clusters_to_delete.append(cluster)
# added if statement to handle differences in parameters if snapshot is skipped
for cluster in clusters_to_delete:
params = {'CacheClusterId': cluster['CacheClusterId']}
if _cluster_eligible_for_snapshot(cluster) and not skip:
params['FinalSnapshotIdentifier'] = snapshot_identifier(
'Final', cluster['CacheClusterId'])
self.log.debug(
"Taking final snapshot of %s", cluster['CacheClusterId'])
else:
self.log.debug(
"Skipping final snapshot of %s", cluster['CacheClusterId'])
client.delete_cache_cluster(**params)
self.log.info(
'Deleted ElastiCache cluster: %s',
cluster['CacheClusterId'])
for replication_group in replication_groups_to_delete:
params = {'ReplicationGroupId': replication_group,
'RetainPrimaryCluster': False}
if not skip:
params['FinalSnapshotIdentifier'] = snapshot_identifier(
'Final', replication_group)
client.delete_replication_group(**params)
self.log.info(
'Deleted ElastiCache replication group: %s',
replication_group)
[docs]@actions.register('snapshot')
class SnapshotElastiCacheCluster(BaseAction):
"""Action to snapshot an elasticache cluster
:example:
.. code-block:: yaml
policies:
- name: elasticache-cluster-snapshot
resource: cache-cluster
filters:
- type: value
key: CacheClusterStatus
op: not-in
value: ["deleted","deleting","creating"]
actions:
- snapshot
"""
schema = type_schema('snapshot')
permissions = ('elasticache:CreateSnapshot',)
[docs] def process(self, clusters):
set_size = len(clusters)
clusters = [c for c in clusters if _cluster_eligible_for_snapshot(c)]
if set_size != len(clusters):
self.log.info(
"action:snapshot implicitly filtered from %d to %d clusters for snapshot support",
set_size, len(clusters))
with self.executor_factory(max_workers=2) as w:
futures = []
client = local_session(self.manager.session_factory).client('elasticache')
for cluster in clusters:
futures.append(
w.submit(self.process_cluster_snapshot, client, cluster))
for f in as_completed(futures):
if f.exception():
self.log.error(
"Exception creating cache cluster snapshot \n %s",
f.exception())
return clusters
[docs] def process_cluster_snapshot(self, client, cluster):
client.create_snapshot(
SnapshotName=snapshot_identifier(
'Backup',
cluster['CacheClusterId']),
CacheClusterId=cluster['CacheClusterId'])
[docs]@actions.register('modify-security-groups')
class ElasticacheClusterModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
"""Modify security groups on an Elasticache cluster.
Looks at the individual clusters and modifies the Replication
Group's configuration for Security groups so all nodes get
affected equally
"""
permissions = ('elasticache:ModifyReplicationGroup',)
[docs] def process(self, clusters):
replication_group_map = {}
client = local_session(
self.manager.session_factory).client('elasticache')
groups = super(
ElasticacheClusterModifyVpcSecurityGroups, self).get_groups(
clusters)
for idx, c in enumerate(clusters):
# build map of Replication Groups to Security Groups
replication_group_map[c['ReplicationGroupId']] = groups[idx]
for idx, r in enumerate(replication_group_map.keys()):
client.modify_replication_group(
ReplicationGroupId=r,
SecurityGroupIds=replication_group_map[r])
[docs]@resources.register('cache-subnet-group')
class ElastiCacheSubnetGroup(QueryResourceManager):
[docs] class resource_type(object):
service = 'elasticache'
type = 'subnet-group'
enum_spec = ('describe_cache_subnet_groups',
'CacheSubnetGroups', None)
name = id = 'CacheSubnetGroupName'
filter_name = 'CacheSubnetGroupName'
filter_type = 'scalar'
date = None
dimension = None
[docs]@resources.register('cache-snapshot')
class ElastiCacheSnapshot(QueryResourceManager):
[docs] class resource_type(object):
service = 'elasticache'
type = 'snapshot'
enum_spec = ('describe_snapshots', 'Snapshots', None)
name = id = 'SnapshotName'
filter_name = 'SnapshotName'
filter_type = 'scalar'
date = 'StartTime'
dimension = None
universal_taggable = True
permissions = ('elasticache:ListTagsForResource',)
filter_registry = FilterRegistry('elasticache-snapshot.filters')
action_registry = ActionRegistry('elasticache-snapshot.actions')
_generate_arn = None
retry = staticmethod(get_retry(('Throttled',)))
augment = universal_augment
@property
def generate_arn(self):
if self._generate_arn is None:
self._generate_arn = functools.partial(
generate_arn,
'elasticache',
region=self.config.region,
account_id=self.account_id,
resource_type='snapshot',
separator=':')
return self._generate_arn
[docs]@ElastiCacheSnapshot.filter_registry.register('age')
class ElastiCacheSnapshotAge(AgeFilter):
"""Filters elasticache snapshots based on their age (in days)
:example:
.. code-block:: yaml
policies:
- name: elasticache-stale-snapshots
resource: cache-snapshot
filters:
- type: age
days: 30
op: ge
"""
schema = type_schema(
'age', days={'type': 'number'},
op={'$ref': '#/definitions/filters_common/comparison_operators'})
date_attribute = 'dummy'
[docs] def get_resource_date(self, snapshot):
""" Override superclass method as there is no single snapshot date attribute.
"""
def to_datetime(v):
if not isinstance(v, datetime):
v = parse(v)
if not v.tzinfo:
v = v.replace(tzinfo=tzutc())
return v
# Return the earliest of the node snaphot creation times.
return min([to_datetime(ns['SnapshotCreateTime'])
for ns in snapshot['NodeSnapshots']])
[docs]@ElastiCacheSnapshot.action_registry.register('delete')
class DeleteElastiCacheSnapshot(BaseAction):
"""Action to delete elasticache snapshots
To prevent unwanted deletion of elasticache snapshots, it is recommended to
apply a filter
:example:
.. code-block:: yaml
policies:
- name: elasticache-stale-snapshots
resource: cache-snapshot
filters:
- type: age
days: 30
op: ge
actions:
- delete
"""
schema = type_schema('delete')
permissions = ('elasticache:DeleteSnapshot',)
[docs] def process(self, snapshots):
log.info("Deleting %d ElastiCache snapshots", len(snapshots))
with self.executor_factory(max_workers=3) as w:
futures = []
client = local_session(self.manager.session_factory).client('elasticache')
for snapshot_set in chunks(reversed(snapshots), size=50):
futures.append(
w.submit(self.process_snapshot_set, client, snapshot_set))
for f in as_completed(futures):
if f.exception():
self.log.error(
"Exception deleting snapshot set \n %s",
f.exception())
return snapshots
[docs] def process_snapshot_set(self, client, snapshots_set):
for s in snapshots_set:
client.delete_snapshot(SnapshotName=s['SnapshotName'])
def _cluster_eligible_for_snapshot(cluster):
# added regex search to filter unsupported cachenode types
return (
cluster['Engine'] != 'memcached' and not
TTYPE.match(cluster['CacheNodeType'])
)