# Copyright 2015-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import Counter
import logging
import itertools
import json
import time
from botocore.exceptions import ClientError
from concurrent.futures import as_completed
from dateutil.parser import parse as parse_date
import six
from c7n.actions import BaseAction
from c7n.exceptions import PolicyValidationError
from c7n.filters import (
CrossAccountAccessFilter, Filter, AgeFilter, ValueFilter,
ANNOTATION_KEY)
from c7n.filters.health import HealthEventFilter
from c7n.manager import resources
from c7n.resources.kms import ResourceKmsKeyAlias
from c7n.query import QueryResourceManager
from c7n.tags import Tag
from c7n.utils import (
camelResource,
chunks,
get_retry,
local_session,
set_annotation,
type_schema,
QueryParser,
)
from c7n.resources.ami import AMI
log = logging.getLogger('custodian.ebs')
@resources.register('ebs-snapshot')
class Snapshot(QueryResourceManager):
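    """EBS point-in-time snapshots (resource: ebs-snapshot).

    Policies can narrow the underlying describe call with a ``query``
    parameter, which is validated by ``SnapshotQueryParser`` below. An
    illustrative sketch (the policy name is a placeholder):

    .. code-block:: yaml

        policies:
          - name: ebs-snapshots-completed
            resource: ebs-snapshot
            query:
              - status: completed
    """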
    class resource_type(object):
service = 'ec2'
type = 'snapshot'
enum_spec = (
'describe_snapshots', 'Snapshots', None)
detail_spec = None
id = 'SnapshotId'
filter_name = 'SnapshotIds'
filter_type = 'list'
name = 'SnapshotId'
date = 'StartTime'
dimension = None
default_report_fields = (
'SnapshotId',
'VolumeId',
'tag:InstanceId',
'VolumeSize',
'StartTime',
'State',
)
    def resources(self, query=None):
qfilters = SnapshotQueryParser.parse(self.data.get('query', []))
query = query or {}
if qfilters:
query['Filters'] = qfilters
if query.get('OwnerIds') is None:
query['OwnerIds'] = ['self']
return super(Snapshot, self).resources(query=query)
    def get_resources(self, ids, cache=True, augment=True):
if cache:
resources = self._get_cached_resources(ids)
if resources is not None:
return resources
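        # Describe by id directly; on client errors drop the offending
        # snapshot id and retry, so one bad id doesn't fail the whole set.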
while ids:
try:
return self.source.get_resources(ids)
except ClientError as e:
bad_snap = ErrorHandler.extract_bad_snapshot(e)
if bad_snap:
ids.remove(bad_snap)
continue
raise
return []
class ErrorHandler(object):

    @staticmethod
def remove_snapshot(rid, resource_set):
found = None
for r in resource_set:
if r['SnapshotId'] == rid:
found = r
break
if found:
resource_set.remove(found)
    @staticmethod
def extract_bad_snapshot(e):
"""Handle various client side errors when describing snapshots"""
msg = e.response['Error']['Message']
error = e.response['Error']['Code']
e_snap_id = None
if error == 'InvalidSnapshot.NotFound':
e_snap_id = msg[msg.find("'") + 1:msg.rfind("'")]
log.warning("Snapshot not found %s" % e_snap_id)
elif error == 'InvalidSnapshotID.Malformed':
e_snap_id = msg[msg.find('"') + 1:msg.rfind('"')]
log.warning("Snapshot id malformed %s" % e_snap_id)
return e_snap_id
class SnapshotQueryParser(QueryParser):
QuerySchema = {
'description': six.string_types,
'owner-alias': ('amazon', 'amazon-marketplace', 'microsoft'),
'owner-id': six.string_types,
'progress': six.string_types,
'snapshot-id': six.string_types,
'start-time': six.string_types,
'status': ('pending', 'completed', 'error'),
'tag': six.string_types,
'tag-key': six.string_types,
'volume-id': six.string_types,
'volume-size': six.string_types,
}
type_name = 'EBS'
@Snapshot.action_registry.register('tag')
class SnapshotTag(Tag):
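    """Tag EBS snapshots, skipping any snapshot id the API rejects.

    An illustrative sketch (the tag key and value are placeholders):

    .. code-block:: yaml

        policies:
          - name: tag-ebs-snapshots
            resource: ebs-snapshot
            actions:
              - type: tag
                key: owner
                value: data-platform
    """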
permissions = ('ec2:CreateTags',)
    def process_resource_set(self, client, resource_set, tags):
while resource_set:
try:
return super(SnapshotTag, self).process_resource_set(
client, resource_set, tags)
except ClientError as e:
bad_snap = ErrorHandler.extract_bad_snapshot(e)
if bad_snap:
ErrorHandler.remove_snapshot(bad_snap, resource_set)
continue
raise
@Snapshot.filter_registry.register('age')
class SnapshotAge(AgeFilter):
"""EBS Snapshot Age Filter
Filters an EBS snapshot based on the age of the snapshot (in days)
:example:
.. code-block:: yaml
policies:
- name: ebs-snapshots-week-old
resource: ebs-snapshot
filters:
- type: age
days: 7
op: ge
"""
schema = type_schema(
'age',
days={'type': 'number'},
op={'$ref': '#/definitions/filters_common/comparison_operators'})
date_attribute = 'StartTime'
def _filter_ami_snapshots(self, snapshots):
if not self.data.get('value', True):
return snapshots
    # 'self' is the calling filter/action instance. Use the AMI resource
    # manager to list all snapshots referenced by AMIs and compare resources
    # against that list; this also populates the cache.
amis = self.manager.get_resource_manager('ami').resources()
ami_snaps = []
for i in amis:
        for dev in i.get('BlockDeviceMappings', ()):
if 'Ebs' in dev and 'SnapshotId' in dev['Ebs']:
ami_snaps.append(dev['Ebs']['SnapshotId'])
matches = []
for snap in snapshots:
if snap['SnapshotId'] not in ami_snaps:
matches.append(snap)
return matches
@Snapshot.filter_registry.register('cross-account')
class SnapshotCrossAccountAccess(CrossAccountAccessFilter):
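    """Find snapshots shared with accounts outside a given whitelist.

    Shared accounts are read from the snapshot's createVolumePermission
    attribute. An illustrative sketch (the account id is a placeholder):

    .. code-block:: yaml

        policies:
          - name: ebs-snapshot-cross-account
            resource: ebs-snapshot
            filters:
              - type: cross-account
                whitelist:
                  - "123456789012"
    """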
permissions = ('ec2:DescribeSnapshotAttribute',)
    def process(self, resources, event=None):
self.accounts = self.get_accounts()
results = []
client = local_session(self.manager.session_factory).client('ec2')
with self.executor_factory(max_workers=3) as w:
futures = []
for resource_set in chunks(resources, 50):
futures.append(w.submit(
self.process_resource_set, client, resource_set))
for f in as_completed(futures):
if f.exception():
self.log.error(
"Exception checking cross account access \n %s" % (
f.exception()))
continue
results.extend(f.result())
return results
    def process_resource_set(self, client, resource_set):
results = []
for r in resource_set:
attrs = self.manager.retry(
client.describe_snapshot_attribute,
SnapshotId=r['SnapshotId'],
Attribute='createVolumePermission')['CreateVolumePermissions']
shared_accounts = {
g.get('Group') or g.get('UserId') for g in attrs}
delta_accounts = shared_accounts.difference(self.accounts)
if delta_accounts:
r['c7n:CrossAccountViolations'] = list(delta_accounts)
results.append(r)
return results
@Snapshot.filter_registry.register('unused')
class SnapshotUnusedFilter(Filter):
"""Filters snapshots based on usage
true: snapshot is not used by launch-template, launch-config, or ami.
false: snapshot is being used by launch-template, launch-config, or ami.
:example:
.. code-block:: yaml
policies:
- name: snapshot-unused
resource: ebs-snapshot
filters:
- type: unused
value: true
"""
schema = type_schema('unused', value={'type': 'boolean'})
    def get_permissions(self):
        return list(itertools.chain(*[
            self.manager.get_resource_manager(m).get_permissions()
            for m in ('asg', 'launch-config', 'ami')]))
def _pull_asg_snapshots(self):
asgs = self.manager.get_resource_manager('asg').resources()
snap_ids = set()
lcfgs = set(a['LaunchConfigurationName'] for a in asgs if 'LaunchConfigurationName' in a)
lcfg_mgr = self.manager.get_resource_manager('launch-config')
if lcfgs:
for lc in lcfg_mgr.resources():
                for b in lc.get('BlockDeviceMappings', ()):
if 'Ebs' in b and 'SnapshotId' in b['Ebs']:
snap_ids.add(b['Ebs']['SnapshotId'])
tmpl_mgr = self.manager.get_resource_manager('launch-template-version')
for tversion in tmpl_mgr.get_resources(
list(tmpl_mgr.get_asg_templates(asgs).keys())):
for bd in tversion['LaunchTemplateData'].get('BlockDeviceMappings', ()):
if 'Ebs' in bd and 'SnapshotId' in bd['Ebs']:
snap_ids.add(bd['Ebs']['SnapshotId'])
return snap_ids
def _pull_ami_snapshots(self):
amis = self.manager.get_resource_manager('ami').resources()
ami_snaps = set()
for i in amis:
            for dev in i.get('BlockDeviceMappings', ()):
if 'Ebs' in dev and 'SnapshotId' in dev['Ebs']:
ami_snaps.add(dev['Ebs']['SnapshotId'])
return ami_snaps
    def process(self, resources, event=None):
snaps = self._pull_asg_snapshots().union(self._pull_ami_snapshots())
if self.data.get('value', True):
return [r for r in resources if r['SnapshotId'] not in snaps]
return [r for r in resources if r['SnapshotId'] in snaps]
@Snapshot.filter_registry.register('skip-ami-snapshots')
class SnapshotSkipAmiSnapshots(Filter):
"""
Filter to remove snapshots of AMIs from results
This filter is 'true' by default.
:example:
implicit with no parameters, 'true' by default
.. code-block:: yaml
policies:
- name: delete-stale-snapshots
resource: ebs-snapshot
filters:
- type: age
days: 28
op: ge
- skip-ami-snapshots
:example:
explicit with parameter
.. code-block:: yaml
policies:
- name: delete-snapshots
resource: ebs-snapshot
filters:
- type: age
days: 28
op: ge
- type: skip-ami-snapshots
value: false
"""
schema = type_schema('skip-ami-snapshots', value={'type': 'boolean'})
    def get_permissions(self):
return AMI(self.manager.ctx, {}).get_permissions()
    def process(self, snapshots, event=None):
resources = _filter_ami_snapshots(self, snapshots)
return resources
@Snapshot.action_registry.register('delete')
class SnapshotDelete(BaseAction):
"""Deletes EBS snapshots
:example:
.. code-block:: yaml
policies:
- name: delete-stale-snapshots
resource: ebs-snapshot
filters:
- type: age
days: 28
op: ge
actions:
- delete
"""
schema = type_schema(
'delete', **{'skip-ami-snapshots': {'type': 'boolean'}})
permissions = ('ec2:DeleteSnapshot',)
    def process(self, snapshots):
self.image_snapshots = set()
        # Image snapshots are filtered out by default to stay safe;
        # attempting to delete a snapshot still referenced by an AMI
        # would fail with an error anyway.
pre = len(snapshots)
snapshots = list(filter(None, _filter_ami_snapshots(self, snapshots)))
post = len(snapshots)
log.info("Deleting %d snapshots, auto-filtered %d ami-snapshots",
post, pre - post)
client = local_session(self.manager.session_factory).client('ec2')
with self.executor_factory(max_workers=2) as w:
futures = []
for snapshot_set in chunks(reversed(snapshots), size=50):
futures.append(
w.submit(self.process_snapshot_set, client, snapshot_set))
for f in as_completed(futures):
if f.exception():
self.log.error(
"Exception deleting snapshot set \n %s" % (
f.exception()))
return snapshots
    def process_snapshot_set(self, client, snapshots_set):
retry = get_retry((
'RequestLimitExceeded', 'Client.RequestLimitExceeded'))
for s in snapshots_set:
if s['SnapshotId'] in self.image_snapshots:
continue
try:
retry(client.delete_snapshot,
SnapshotId=s['SnapshotId'],
DryRun=self.manager.config.dryrun)
except ClientError as e:
if e.response['Error']['Code'] == "InvalidSnapshot.NotFound":
continue
raise
@Snapshot.action_registry.register('copy')
class CopySnapshot(BaseAction):
"""Copy a snapshot across regions
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-copy-snapshot.html
:example:
.. code-block:: yaml
policies:
- name: copy-snapshot-east-west
resource: ebs-snapshot
filters:
- type: age
days: 7
op: le
actions:
- type: copy
target_region: us-west-2
target_key: target_kms_key
encrypted: true
"""
schema = type_schema(
'copy',
target_region={'type': 'string'},
target_key={'type': 'string'},
encrypted={'type': 'boolean'},
)
permissions = (
'ec2:CreateTags', 'ec2:CopySnapshot', 'ec2:DescribeSnapshots')
    def validate(self):
if self.data.get('encrypted', True):
key = self.data.get('target_key')
if not key:
raise PolicyValidationError(
"Encrypted snapshot copy requires kms key on %s" % (
self.manager.data,))
return self
    def process(self, resources):
if self.data['target_region'] == self.manager.config.region:
self.log.info(
"Source and destination region are the same, skipping")
return
with self.executor_factory(max_workers=2) as w:
list(w.map(self.process_resource_set, chunks(resources, 20)))
    def process_resource_set(self, resource_set):
        client = self.manager.session_factory(
            region=self.data['target_region']).client('ec2')
        cross_region = self.data['target_region'] != self.manager.config.region
params = {}
params['Encrypted'] = self.data.get('encrypted', True)
if params['Encrypted']:
params['KmsKeyId'] = self.data['target_key']
for snapshot_set in chunks(resource_set, 5):
for r in snapshot_set:
snapshot_id = client.copy_snapshot(
SourceRegion=self.manager.config.region,
SourceSnapshotId=r['SnapshotId'],
Description=r.get('Description', ''),
**params)['SnapshotId']
if r.get('Tags'):
client.create_tags(
Resources=[snapshot_id], Tags=r['Tags'])
r['c7n:CopiedSnapshot'] = snapshot_id
if not cross_region or len(snapshot_set) < 5:
continue
copy_ids = [r['c7n:CopiedSnapshot'] for r in snapshot_set]
self.log.debug(
"Waiting on cross-region snapshot copy %s", ",".join(copy_ids))
waiter = client.get_waiter('snapshot_completed')
waiter.config.delay = 60
waiter.config.max_attempts = 60
waiter.wait(SnapshotIds=copy_ids)
self.log.debug(
"Cross region copy complete %s", ",".join(copy_ids))
@resources.register('ebs')
class EBS(QueryResourceManager):
    class resource_type(object):
service = 'ec2'
type = 'volume'
enum_spec = ('describe_volumes', 'Volumes', None)
name = id = 'VolumeId'
filter_name = 'VolumeIds'
filter_type = 'list'
date = 'createTime'
dimension = 'VolumeId'
metrics_namespace = 'AWS/EBS'
config_type = "AWS::EC2::Volume"
default_report_fields = (
'VolumeId',
'Attachments[0].InstanceId',
'Size',
'VolumeType',
'KmsKeyId'
)
@EBS.action_registry.register('detach')
class VolumeDetach(BaseAction):
"""
Detach an EBS volume from an Instance.
    If the 'force' parameter is true, the volume will be forcefully
    detached. 'force' defaults to false.
:example:
.. code-block:: yaml
policies:
- name: instance-ebs-volumes
resource: ebs
filters:
              - VolumeId: volumeid
actions:
- detach
"""
schema = type_schema('detach', force={'type': 'boolean'})
permissions = ('ec2:DetachVolume',)
    def process(self, volumes, event=None):
client = local_session(self.manager.session_factory).client('ec2')
for vol in volumes:
for attachment in vol.get('Attachments', []):
client.detach_volume(InstanceId=attachment['InstanceId'],
VolumeId=attachment['VolumeId'],
Force=self.data.get('force', False))
@EBS.filter_registry.register('instance')
class AttachedInstanceFilter(ValueFilter):
"""Filter volumes based on filtering on their attached instance
:example:
.. code-block:: yaml
policies:
- name: instance-ebs-volumes
resource: ebs
filters:
- instance
"""
schema = type_schema('instance', rinherit=ValueFilter.schema)
    def get_permissions(self):
return self.manager.get_resource_manager('ec2').get_permissions()
    def process(self, resources, event=None):
original_count = len(resources)
resources = [r for r in resources if r.get('Attachments')]
self.log.debug('Filtered from %d volumes to %d attached volumes' % (
original_count, len(resources)))
self.instance_map = self.get_instance_mapping(resources)
return list(filter(self, resources))
def __call__(self, r):
instance = self.instance_map[r['Attachments'][0]['InstanceId']]
if self.match(instance):
r['Instance'] = instance
set_annotation(r, ANNOTATION_KEY, "instance-%s" % self.k)
return True
    def get_instance_mapping(self, resources):
instance_ids = [r['Attachments'][0]['InstanceId'] for r in resources]
instances = self.manager.get_resource_manager(
'ec2').get_resources(instance_ids)
self.log.debug("Queried %d instances for %d volumes" % (
len(instances), len(resources)))
return {i['InstanceId']: i for i in instances}
@EBS.filter_registry.register('kms-alias')
class KmsKeyAlias(ResourceKmsKeyAlias):
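    """Filter EBS volumes by the alias of their KMS encryption key.

    An illustrative sketch (the alias value is a placeholder):

    .. code-block:: yaml

        policies:
          - name: ebs-kms-alias-check
            resource: ebs
            filters:
              - type: kms-alias
                key: AliasName
                value: alias/aws/ebs
    """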
    def process(self, resources, event=None):
return self.get_matching_aliases(resources)
@EBS.filter_registry.register('fault-tolerant')
class FaultTolerantSnapshots(Filter):
"""
This filter will return any EBS volume that does/does not have a
snapshot within the last 7 days. 'Fault-Tolerance' in this instance
means that, in the event of a failure, the volume can be restored
    from a snapshot with (reasonable) data loss.

    :example:

    .. code-block:: yaml

        policies:
          - name: ebs-volume-tolerance
            resource: ebs
            filters:
              - type: fault-tolerant
                tolerant: true
"""
schema = type_schema('fault-tolerant', tolerant={'type': 'boolean'})
check_id = 'H7IgTzjTYb'
permissions = ('support:RefreshTrustedAdvisorCheck',
'support:DescribeTrustedAdvisorCheckResult')
    def pull_check_results(self):
result = set()
client = local_session(self.manager.session_factory).client('support')
client.refresh_trusted_advisor_check(checkId=self.check_id)
results = client.describe_trusted_advisor_check_result(
checkId=self.check_id, language='en')['result']
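        # metadata[1] is the volume id column in this Trusted Advisor check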
for r in results['flaggedResources']:
result.update([r['metadata'][1]])
return result
    def process(self, resources, event=None):
flagged = self.pull_check_results()
if self.data.get('tolerant', True):
return [r for r in resources if r['VolumeId'] not in flagged]
return [r for r in resources if r['VolumeId'] in flagged]
@EBS.filter_registry.register('health-event')
class HealthFilter(HealthEventFilter):
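    """Filter EBS volumes by associated AWS health events.

    An illustrative sketch:

    .. code-block:: yaml

        policies:
          - name: ebs-volume-lost
            resource: ebs
            filters:
              - type: health-event
                types:
                  - AWS_EBS_VOLUME_LOST
    """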
schema = type_schema(
'health-event',
types={'type': 'array', 'items': {
'type': 'string',
'enum': ['AWS_EBS_DEGRADED_EBS_VOLUME_PERFORMANCE',
'AWS_EBS_VOLUME_LOST']}},
statuses={'type': 'array', 'items': {
'type': 'string',
'enum': ['open', 'upcoming', 'closed']
}})
permissions = HealthEventFilter.permissions + (
'config:GetResourceConfigHistory',)
    def process(self, resources, event=None):
        if 'AWS_EBS_VOLUME_LOST' not in self.data.get('types', []):
return super(HealthFilter, self).process(resources, event)
if not resources:
return resources
client = local_session(self.manager.session_factory).client(
'health', region_name='us-east-1')
f = self.get_filter_parameters()
resource_map = {}
paginator = client.get_paginator('describe_events')
        events = list(itertools.chain(
            *[p['events'] for p in paginator.paginate(filter=f)]))
entities = self.process_event(client, events)
event_map = {e['arn']: e for e in events}
config = local_session(self.manager.session_factory).client('config')
for e in entities:
rid = e['entityValue']
if not resource_map.get(rid):
resource_map[rid] = self.load_resource(config, rid)
resource_map[rid].setdefault(
'c7n:HealthEvent', []).append(event_map[e['eventArn']])
return list(resource_map.values())
    def load_resource(self, config, rid):
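        # A lost volume may already be deleted, so recover its last known
        # configuration from AWS Config history rather than a describe call.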
resources_histories = config.get_resource_config_history(
resourceType='AWS::EC2::Volume',
resourceId=rid,
limit=2)['configurationItems']
for r in resources_histories:
if r['configurationItemStatus'] != u'ResourceDeleted':
return camelResource(json.loads(r['configuration']))
return {"VolumeId": rid}
@EBS.action_registry.register('encrypt-instance-volumes')
class EncryptInstanceVolumes(BaseAction):
"""Encrypt extant volumes attached to an instance
- Requires instance restart
- Not suitable for autoscale groups.
Multistep process:
- Stop instance (if running)
- For each volume
- Create snapshot
- Wait on snapshot creation
- Copy Snapshot to create encrypted snapshot
- Wait on snapshot creation
- Create encrypted volume from snapshot
- Wait on volume creation
- Delete transient snapshots
- Detach Unencrypted Volume
- Attach Encrypted Volume
- Set DeleteOnTermination instance attribute equal to source volume
- For each volume
- Delete unencrypted volume
- Start Instance (if originally running)
- For each newly encrypted volume
- Delete transient tags
:example:
.. code-block:: yaml
policies:
- name: encrypt-unencrypted-ebs
resource: ebs
filters:
- Encrypted: false
actions:
- type: encrypt-instance-volumes
key: alias/encrypted
"""
schema = type_schema(
'encrypt-instance-volumes',
required=['key'],
key={'type': 'string'},
delay={'type': 'number'},
verbose={'type': 'boolean'})
permissions = (
'ec2:CopySnapshot',
'ec2:CreateSnapshot',
'ec2:CreateVolume',
'ec2:DescribeInstances',
'ec2:DescribeSnapshots',
'ec2:DescribeVolumes',
'ec2:StopInstances',
'ec2:StartInstances',
'ec2:ModifyInstanceAttribute',
'ec2:DeleteTags')
    def validate(self):
self.verbose = self.data.get('verbose', False)
return self
    def process(self, volumes):
        original_count = len(volumes)
        volumes = [v for v in volumes
                   if not v['Encrypted'] and v['Attachments']]
log.debug(
"EncryptVolumes filtered from %d to %d "
" unencrypted attached volumes" % (
original_count, len(volumes)))
# Group volumes by instance id
instance_vol_map = {}
for v in volumes:
instance_id = v['Attachments'][0]['InstanceId']
instance_vol_map.setdefault(instance_id, []).append(v)
# Query instances to find current instance state
self.instance_map = {
i['InstanceId']: i for i in
self.manager.get_resource_manager('ec2').get_resources(
list(instance_vol_map.keys()), cache=False)}
client = local_session(self.manager.session_factory).client('ec2')
with self.executor_factory(max_workers=3) as w:
futures = {}
for instance_id, vol_set in instance_vol_map.items():
futures[w.submit(
self.process_volume, client,
instance_id, vol_set)] = instance_id
for f in as_completed(futures):
if f.exception():
instance_id = futures[f]
log.error(
"Exception processing instance:%s volset: %s \n %s" % (
instance_id, instance_vol_map[instance_id],
f.exception()))
    def process_volume(self, client, instance_id, vol_set):
"""Encrypt attached unencrypted ebs volumes
vol_set corresponds to all the unencrypted volumes on a given instance.
"""
key_id = self.get_encryption_key()
if self.verbose:
self.log.debug("Using encryption key: %s" % key_id)
# Only stop and start the instance if it was running.
instance_running = self.stop_instance(client, instance_id)
if instance_running is None:
return
# Create all the volumes before patching the instance.
paired = []
for v in vol_set:
vol_id = self.create_encrypted_volume(client, v, key_id, instance_id)
paired.append((v, vol_id))
# Next detach and reattach
for v, vol_id in paired:
client.detach_volume(
InstanceId=instance_id, VolumeId=v['VolumeId'])
# 5/8/2016 The detach isn't immediately consistent
time.sleep(self.data.get('delay', 15))
client.attach_volume(
InstanceId=instance_id, VolumeId=vol_id,
Device=v['Attachments'][0]['Device'])
# Set DeleteOnTermination attribute the same as source volume
if v['Attachments'][0]['DeleteOnTermination']:
client.modify_instance_attribute(
InstanceId=instance_id,
BlockDeviceMappings=[
{
'DeviceName': v['Attachments'][0]['Device'],
'Ebs': {
'VolumeId': vol_id,
'DeleteOnTermination': True
}
}
]
)
if instance_running:
client.start_instances(InstanceIds=[instance_id])
if self.verbose:
self.log.debug(
"Deleting unencrypted volumes for: %s" % instance_id)
for v in vol_set:
client.delete_volume(VolumeId=v['VolumeId'])
# Clean-up transient tags on newly created encrypted volume.
for v, vol_id in paired:
client.delete_tags(
Resources=[vol_id],
Tags=[
{'Key': 'maid-crypt-remediation'},
{'Key': 'maid-origin-volume'},
{'Key': 'maid-instance-device'}
]
)
    def stop_instance(self, client, instance_id):
instance_state = self.instance_map[instance_id]['State']['Name']
if instance_state in ('shutting-down', 'terminated'):
self.log.debug('Skipping terminating instance: %s' % instance_id)
return
elif instance_state in ('running',):
client.stop_instances(InstanceIds=[instance_id])
self.wait_on_resource(client, instance_id=instance_id)
return True
return False
    def create_encrypted_volume(self, ec2, v, key_id, instance_id):
# Create a current snapshot
results = ec2.create_snapshot(
VolumeId=v['VolumeId'],
Description="maid transient snapshot for encryption",)
transient_snapshots = [results['SnapshotId']]
ec2.create_tags(
Resources=[results['SnapshotId']],
Tags=[
{'Key': 'maid-crypto-remediation', 'Value': 'true'}])
self.wait_on_resource(ec2, snapshot_id=results['SnapshotId'])
# Create encrypted snapshot from current
results = ec2.copy_snapshot(
SourceSnapshotId=results['SnapshotId'],
SourceRegion=v['AvailabilityZone'][:-1],
Description='maid transient snapshot for encryption',
Encrypted=True,
KmsKeyId=key_id)
transient_snapshots.append(results['SnapshotId'])
ec2.create_tags(
Resources=[results['SnapshotId']],
Tags=[
{'Key': 'maid-crypto-remediation', 'Value': 'true'}
])
self.wait_on_resource(ec2, snapshot_id=results['SnapshotId'])
# Create encrypted volume, also tag so we can recover
results = ec2.create_volume(
Size=v['Size'],
VolumeType=v['VolumeType'],
SnapshotId=results['SnapshotId'],
AvailabilityZone=v['AvailabilityZone'],
Encrypted=True)
ec2.create_tags(
Resources=[results['VolumeId']],
Tags=[
{'Key': 'maid-crypt-remediation', 'Value': instance_id},
{'Key': 'maid-origin-volume', 'Value': v['VolumeId']},
{'Key': 'maid-instance-device',
'Value': v['Attachments'][0]['Device']}])
# Wait on encrypted volume creation
self.wait_on_resource(ec2, volume_id=results['VolumeId'])
# Delete transient snapshots
for sid in transient_snapshots:
ec2.delete_snapshot(SnapshotId=sid)
return results['VolumeId']
    def get_encryption_key(self):
kms = local_session(self.manager.session_factory).client('kms')
key_alias = self.data.get('key')
result = kms.describe_key(KeyId=key_alias)
key_id = result['KeyMetadata']['KeyId']
return key_id
    def wait_on_resource(self, *args, **kw):
# Sigh this is dirty, but failure in the middle of our workflow
# due to overly long resource creation is complex to unwind,
        # with multi-volume instances. Wait up to three times (actual
        # wait time is a per-resource-type configuration).
# Note we wait for all resource creation before attempting to
# patch an instance, so even on resource creation failure, the
# instance is not modified
try:
return self._wait_on_resource(*args, **kw)
except Exception:
try:
return self._wait_on_resource(*args, **kw)
except Exception:
return self._wait_on_resource(*args, **kw)
def _wait_on_resource(
self, client, snapshot_id=None, volume_id=None, instance_id=None):
        # boto client waiters poll every 15 seconds up to a max of 600s (10m)
if snapshot_id:
if self.verbose:
self.log.debug(
"Waiting on snapshot completion %s" % snapshot_id)
waiter = client.get_waiter('snapshot_completed')
waiter.wait(SnapshotIds=[snapshot_id])
if self.verbose:
self.log.debug("Snapshot: %s completed" % snapshot_id)
elif volume_id:
if self.verbose:
self.log.debug("Waiting on volume creation %s" % volume_id)
waiter = client.get_waiter('volume_available')
waiter.wait(VolumeIds=[volume_id])
if self.verbose:
self.log.debug("Volume: %s created" % volume_id)
elif instance_id:
if self.verbose:
self.log.debug("Waiting on instance stop")
waiter = client.get_waiter('instance_stopped')
waiter.wait(InstanceIds=[instance_id])
if self.verbose:
self.log.debug("Instance: %s stopped" % instance_id)
@EBS.action_registry.register('snapshot')
class CreateSnapshot(BaseAction):
"""Snapshot an EBS volume
:example:
.. code-block:: yaml
policies:
- name: snapshot-volumes
resource: ebs
filters:
- Attachments: []
- State: available
actions:
- snapshot
"""
permissions = ('ec2:CreateSnapshot',)
schema = type_schema('snapshot')
    def process(self, volumes):
client = local_session(self.manager.session_factory).client('ec2')
retry = get_retry(['Throttled'], max_attempts=5)
for vol in volumes:
vol_id = vol['VolumeId']
retry(self.process_volume, client=client, volume=vol_id)
    def process_volume(self, client, volume):
try:
client.create_snapshot(VolumeId=volume)
except ClientError as e:
if e.response['Error']['Code'] == 'InvalidVolume.NotFound':
return
raise
@EBS.action_registry.register('delete')
class Delete(BaseAction):
"""Delete an ebs volume.
If the force boolean is true, we will detach an attached volume
from an instance. Note this cannot be done for running instance
root volumes.
:example:
.. code-block:: yaml
policies:
- name: delete-unattached-volumes
resource: ebs
filters:
- Attachments: []
- State: available
actions:
- delete
"""
schema = type_schema('delete', force={'type': 'boolean'})
permissions = (
'ec2:DetachVolume', 'ec2:DeleteVolume', 'ec2:DescribeVolumes')
    def process(self, volumes):
client = local_session(self.manager.session_factory).client('ec2')
with self.executor_factory(max_workers=3) as w:
futures = {}
for v in volumes:
futures[
w.submit(self.process_volume, client, v)] = v
for f in as_completed(futures):
v = futures[f]
if f.exception():
self.log.error(
"Error processing volume:%s error:%s",
v['VolumeId'], f.exception())
    def process_volume(self, client, volume):
try:
if self.data.get('force') and len(volume['Attachments']):
client.detach_volume(VolumeId=volume['VolumeId'], Force=True)
waiter = client.get_waiter('volume_available')
waiter.wait(VolumeIds=[volume['VolumeId']])
self.manager.retry(
client.delete_volume, VolumeId=volume['VolumeId'])
except ClientError as e:
if e.response['Error']['Code'] == "InvalidVolume.NotFound":
return
raise
@EBS.filter_registry.register('modifyable')
class ModifyableVolume(Filter):
"""Check if an ebs volume is modifyable online.
Considerations:
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/limitations.html
Consideration Summary
- only current instance types are supported (one exception m3.medium)
Current Generation Instances (2017-2)
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-types.html#current-gen-instances
- older magnetic volume types are not supported
- shrinking volumes is not supported
- must wait at least 6hrs between modifications to the same volume.
    - volumes must have been attached after Nov 1st, 2016.
See `custodian schema ebs.actions.modify` for examples.
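
    :example:

    An illustrative pairing with the ``modify`` action (a sketch; the
    policy name is a placeholder):

    .. code-block:: yaml

        policies:
          - name: ebs-modifyable-to-gp2
            resource: ebs
            filters:
              - modifyable
              - VolumeType: io1
            actions:
              - type: modify
                volume-type: gp2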
"""
schema = type_schema('modifyable')
older_generation = set((
'm1.small', 'm1.medium', 'm1.large', 'm1.xlarge',
'c1.medium', 'c1.xlarge', 'cc2.8xlarge',
'm2.xlarge', 'm2.2xlarge', 'm2.4xlarge', 'cr1.8xlarge',
'hi1.4xlarge', 'hs1.8xlarge', 'cg1.4xlarge', 't1.micro',
        # not all current generation instance types support modification either
'm3.large', 'm3.xlarge', 'm3.2xlarge'
))
permissions = ("ec2:DescribeInstances",)
    def process(self, resources, event=None):
results = []
filtered = []
attached = []
stats = Counter()
marker_date = parse_date('2016-11-01T00:00:00+00:00')
# Filter volumes
for r in resources:
# unsupported type
if r['VolumeType'] == 'standard':
stats['vol-type'] += 1
filtered.append(r['VolumeId'])
continue
# unattached are easy
if not r.get('Attachments'):
results.append(r)
continue
            # check for attachment date older than the supported date
if r['Attachments'][0]['AttachTime'] < marker_date:
stats['attach-time'] += 1
filtered.append(r['VolumeId'])
continue
attached.append(r)
# Filter volumes attached to unsupported instance types
ec2 = self.manager.get_resource_manager('ec2')
instance_map = {}
for v in attached:
instance_map.setdefault(
v['Attachments'][0]['InstanceId'], []).append(v)
instances = ec2.get_resources(list(instance_map.keys()))
for i in instances:
if i['InstanceType'] in self.older_generation:
stats['instance-type'] += len(instance_map[i['InstanceId']])
filtered.extend([v['VolumeId'] for v in instance_map.pop(i['InstanceId'])])
else:
results.extend(instance_map.pop(i['InstanceId']))
# Filter volumes that are currently under modification
client = local_session(self.manager.session_factory).client('ec2')
modifying = set()
for vol_set in chunks(list(results), 200):
vol_ids = [v['VolumeId'] for v in vol_set]
mutating = client.describe_volumes_modifications(
Filters=[
{'Name': 'volume-id',
'Values': vol_ids},
{'Name': 'modification-state',
'Values': ['modifying', 'optimizing', 'failed']}])
for vm in mutating.get('VolumesModifications', ()):
stats['vol-mutation'] += 1
filtered.append(vm['VolumeId'])
modifying.add(vm['VolumeId'])
self.log.debug(
"filtered %d of %d volumes due to %s",
len(filtered), len(resources), sorted(stats.items()))
return [r for r in results if r['VolumeId'] not in modifying]
@EBS.action_registry.register('modify')
class ModifyVolume(BaseAction):
"""Modify an ebs volume online.
    **Note: this action requires use of the `modifyable` filter.**
Intro Blog & Use Cases:
https://aws.amazon.com/blogs/aws/amazon-ebs-update-new-elastic-volumes-change-everything/
Docs:
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-modify-volume.html
Considerations:
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/limitations.html
:example:
Find under utilized provisioned iops volumes older than a week
and change their type.
.. code-block:: yaml
policies:
- name: ebs-remove-piops
resource: ebs
filters:
- type: value
                key: CreateTime
value_type: age
value: 7
op: greater-than
- VolumeType: io1
- type: metrics
name: VolumeConsumedReadWriteOps
statistics: Maximum
value: 100
op: less-than
days: 7
- modifyable
actions:
- type: modify
volume-type: gp2
    `iops-percent` and `size-percent` can be used to modify,
    respectively, the iops of io1 volumes and the volume size.
When converting to io1, `iops-percent` is used to set the iops
allocation for the new volume against the extant value for the old
volume.
:example:
Double storage and quadruple iops for all io1 volumes.
.. code-block:: yaml
policies:
- name: ebs-remove-piops
resource: ebs
filters:
- VolumeType: io1
- modifyable
actions:
- type: modify
size-percent: 200
iops-percent: 400
**Note** resizing down aka shrinking requires OS and FS support
and potentially additional preparation, else data-loss may occur.
To prevent accidents, shrinking must be explicitly enabled by also
setting `shrink: true` on the action.
"""
schema = type_schema(
'modify',
**{'volume-type': {'enum': ['io1', 'gp2', 'st1', 'sc1']},
'shrink': False,
'size-percent': {'type': 'number'},
'iops-percent': {'type': 'number'}})
    # Assumption: this is the closest documented permission for modify_volume.
permissions = ("ec2:ModifyVolumeAttribute",)
    def validate(self):
if 'modifyable' not in self.manager.data.get('filters', ()):
raise PolicyValidationError(
"modify action requires modifyable filter in policy")
if self.data.get('size-percent', 100) < 100 and not self.data.get('shrink', False):
raise PolicyValidationError((
"shrinking volumes requires os/fs support "
"or data-loss may ensue, use `shrink: true` to override"))
return self
    def process(self, resources):
client = local_session(self.manager.session_factory).client('ec2')
for resource_set in chunks(resources, 50):
self.process_resource_set(client, resource_set)
    def process_resource_set(self, client, resource_set):
vtype = self.data.get('volume-type')
psize = self.data.get('size-percent')
piops = self.data.get('iops-percent')
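        # Worked example (illustrative): iops-percent: 400 on an io1 volume
        # provisioned at 500 iops yields max(int(500 * 400 / 100.0), 100)
        # == 2000 iops; size-percent: 200 on a 100 GiB volume yields
        # max(int(100 * 200 / 100.0), 1) == 200 GiB.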
for r in resource_set:
params = {'VolumeId': r['VolumeId']}
if piops and ('io1' in (vtype, r['VolumeType'])):
                # apply an iops floor when converting to/from io1;
                # io1 volumes require at least 100 provisioned iops
params['Iops'] = max(int(r.get('Iops', 10) * piops / 100.0), 100)
if psize:
params['Size'] = max(int(r['Size'] * psize / 100.0), 1)
if vtype:
params['VolumeType'] = vtype
self.manager.retry(client.modify_volume, **params)