
# Copyright 2015-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals

from collections import Counter
import logging
import itertools
import json
import time

from botocore.exceptions import ClientError
from concurrent.futures import as_completed
from dateutil.parser import parse as parse_date
import six

from c7n.actions import BaseAction
from c7n.exceptions import PolicyValidationError
from c7n.filters import (
    CrossAccountAccessFilter, Filter, AgeFilter, ValueFilter,
    ANNOTATION_KEY)
from c7n.filters.health import HealthEventFilter

from c7n.manager import resources
from c7n.resources.kms import ResourceKmsKeyAlias
from c7n.query import QueryResourceManager
from c7n.tags import Tag
from c7n.utils import (
    camelResource,
    chunks,
    get_retry,
    local_session,
    set_annotation,
    type_schema,
    QueryParser,
)
from c7n.resources.ami import AMI

log = logging.getLogger('custodian.ebs')


@resources.register('ebs-snapshot')
class Snapshot(QueryResourceManager):

    class resource_type(object):
        service = 'ec2'
        type = 'snapshot'
        enum_spec = ('describe_snapshots', 'Snapshots', None)
        detail_spec = None
        id = 'SnapshotId'
        filter_name = 'SnapshotIds'
        filter_type = 'list'
        name = 'SnapshotId'
        date = 'StartTime'
        dimension = None
        default_report_fields = (
            'SnapshotId',
            'VolumeId',
            'tag:InstanceId',
            'VolumeSize',
            'StartTime',
            'State',
        )
    def resources(self, query=None):
        qfilters = SnapshotQueryParser.parse(self.data.get('query', []))
        query = query or {}
        if qfilters:
            query['Filters'] = qfilters
        if query.get('OwnerIds') is None:
            query['OwnerIds'] = ['self']
        return super(Snapshot, self).resources(query=query)
    def get_resources(self, ids, cache=True, augment=True):
        if cache:
            resources = self._get_cached_resources(ids)
            if resources is not None:
                return resources
        while ids:
            try:
                return self.source.get_resources(ids)
            except ClientError as e:
                bad_snap = ErrorHandler.extract_bad_snapshot(e)
                if bad_snap:
                    ids.remove(bad_snap)
                    continue
                raise
        return []
class ErrorHandler(object):

    @staticmethod
    def remove_snapshot(rid, resource_set):
        found = None
        for r in resource_set:
            if r['SnapshotId'] == rid:
                found = r
                break
        if found:
            resource_set.remove(found)
    @staticmethod
    def extract_bad_snapshot(e):
        """Handle various client side errors when describing snapshots"""
        msg = e.response['Error']['Message']
        error = e.response['Error']['Code']
        e_snap_id = None
        if error == 'InvalidSnapshot.NotFound':
            e_snap_id = msg[msg.find("'") + 1:msg.rfind("'")]
            log.warning("Snapshot not found %s" % e_snap_id)
        elif error == 'InvalidSnapshotID.Malformed':
            e_snap_id = msg[msg.find('"') + 1:msg.rfind('"')]
            log.warning("Snapshot id malformed %s" % e_snap_id)
        return e_snap_id
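
# A hedged sketch of the error messages the slicing above expects; the quote
# styles are inferred from the find()/rfind() calls, not from any API contract:
#   InvalidSnapshot.NotFound    -> "The snapshot 'snap-0123456789abcdef0' does not exist."
#   InvalidSnapshotID.Malformed -> 'Invalid id: "snap-malformed"'
# In both cases the offending snapshot id is the quoted substring, which
# callers strip from their request set before retrying.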
class SnapshotQueryParser(QueryParser):

    QuerySchema = {
        'description': six.string_types,
        'owner-alias': ('amazon', 'amazon-marketplace', 'microsoft'),
        'owner-id': six.string_types,
        'progress': six.string_types,
        'snapshot-id': six.string_types,
        'start-time': six.string_types,
        'status': ('pending', 'completed', 'error'),
        'tag': six.string_types,
        'tag-key': six.string_types,
        'volume-id': six.string_types,
        'volume-size': six.string_types,
    }

    type_name = 'EBS'
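
# Snapshot.resources() feeds the parsed query straight into
# describe_snapshots(Filters=...), so the parser's output is the standard
# boto3 filter shape, e.g. (a sketch):
#   [{'Name': 'status', 'Values': ['completed']},
#    {'Name': 'tag-key', 'Values': ['Backup']}]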
@Snapshot.action_registry.register('tag')
class SnapshotTag(Tag):

    permissions = ('ec2:CreateTags',)

    def process_resource_set(self, client, resource_set, tags):
        while resource_set:
            try:
                return super(SnapshotTag, self).process_resource_set(
                    client, resource_set, tags)
            except ClientError as e:
                bad_snap = ErrorHandler.extract_bad_snapshot(e)
                if bad_snap:
                    ErrorHandler.remove_snapshot(bad_snap, resource_set)
                    continue
                raise
@Snapshot.filter_registry.register('age')
class SnapshotAge(AgeFilter):
    """EBS Snapshot Age Filter

    Filters an EBS snapshot based on the age of the snapshot (in days)

    :example:

    .. code-block:: yaml

            policies:
              - name: ebs-snapshots-week-old
                resource: ebs-snapshot
                filters:
                  - type: age
                    days: 7
                    op: ge
    """

    schema = type_schema(
        'age', days={'type': 'number'},
        op={'$ref': '#/definitions/filters_common/comparison_operators'})
    date_attribute = 'StartTime'
def _filter_ami_snapshots(self, snapshots):
    if not self.data.get('value', True):
        return snapshots
    # Use the cache first to get a listing of all AMI snapshots and compare
    # resources to that list. This will populate the cache.
    amis = self.manager.get_resource_manager('ami').resources()
    ami_snaps = []
    for i in amis:
        for dev in i.get('BlockDeviceMappings'):
            if 'Ebs' in dev and 'SnapshotId' in dev['Ebs']:
                ami_snaps.append(dev['Ebs']['SnapshotId'])
    matches = []
    for snap in snapshots:
        if snap['SnapshotId'] not in ami_snaps:
            matches.append(snap)
    return matches
@Snapshot.filter_registry.register('cross-account')
class SnapshotCrossAccountAccess(CrossAccountAccessFilter):

    permissions = ('ec2:DescribeSnapshotAttribute',)

    def process(self, resources, event=None):
        self.accounts = self.get_accounts()
        results = []
        client = local_session(self.manager.session_factory).client('ec2')
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for resource_set in chunks(resources, 50):
                futures.append(w.submit(
                    self.process_resource_set, client, resource_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception checking cross account access \n %s" % (
                            f.exception()))
                    continue
                results.extend(f.result())
        return results
    def process_resource_set(self, client, resource_set):
        results = []
        for r in resource_set:
            attrs = self.manager.retry(
                client.describe_snapshot_attribute,
                SnapshotId=r['SnapshotId'],
                Attribute='createVolumePermission')['CreateVolumePermissions']
            shared_accounts = {
                g.get('Group') or g.get('UserId') for g in attrs}
            delta_accounts = shared_accounts.difference(self.accounts)
            if delta_accounts:
                r['c7n:CrossAccountViolations'] = list(delta_accounts)
                results.append(r)
        return results
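
# The attribute response consumed above looks like (a sketch per the EC2 API):
#   {'CreateVolumePermissions': [{'UserId': '123456789012'}, {'Group': 'all'}]}
# A 'Group' of 'all' marks the snapshot as public; any user id or group not in
# the filter's whitelisted accounts is annotated as a violation.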
@Snapshot.filter_registry.register('unused')
class SnapshotUnusedFilter(Filter):
    """Filters snapshots based on usage

    true: snapshot is not used by launch-template, launch-config, or ami.

    false: snapshot is being used by launch-template, launch-config, or ami.

    :example:

    .. code-block:: yaml

            policies:
              - name: snapshot-unused
                resource: ebs-snapshot
                filters:
                  - type: unused
                    value: true
    """

    schema = type_schema('unused', value={'type': 'boolean'})
    def get_permissions(self):
        return list(itertools.chain(*[
            self.manager.get_resource_manager(m).get_permissions()
            for m in ('asg', 'launch-config', 'ami')]))
    def _pull_asg_snapshots(self):
        asgs = self.manager.get_resource_manager('asg').resources()
        snap_ids = set()
        lcfgs = set(a['LaunchConfigurationName'] for a in asgs
                    if 'LaunchConfigurationName' in a)
        lcfg_mgr = self.manager.get_resource_manager('launch-config')

        if lcfgs:
            for lc in lcfg_mgr.resources():
                for b in lc.get('BlockDeviceMappings'):
                    if 'Ebs' in b and 'SnapshotId' in b['Ebs']:
                        snap_ids.add(b['Ebs']['SnapshotId'])

        tmpl_mgr = self.manager.get_resource_manager('launch-template-version')
        for tversion in tmpl_mgr.get_resources(
                list(tmpl_mgr.get_asg_templates(asgs).keys())):
            for bd in tversion['LaunchTemplateData'].get(
                    'BlockDeviceMappings', ()):
                if 'Ebs' in bd and 'SnapshotId' in bd['Ebs']:
                    snap_ids.add(bd['Ebs']['SnapshotId'])
        return snap_ids

    def _pull_ami_snapshots(self):
        amis = self.manager.get_resource_manager('ami').resources()
        ami_snaps = set()
        for i in amis:
            for dev in i.get('BlockDeviceMappings'):
                if 'Ebs' in dev and 'SnapshotId' in dev['Ebs']:
                    ami_snaps.add(dev['Ebs']['SnapshotId'])
        return ami_snaps
    def process(self, resources, event=None):
        snaps = self._pull_asg_snapshots().union(self._pull_ami_snapshots())
        if self.data.get('value', True):
            return [r for r in resources if r['SnapshotId'] not in snaps]
        return [r for r in resources if r['SnapshotId'] in snaps]
@Snapshot.filter_registry.register('skip-ami-snapshots')
class SnapshotSkipAmiSnapshots(Filter):
    """Filter to remove snapshots of AMIs from results

    This filter is 'true' by default.

    :example:

    implicit with no parameters, 'true' by default

    .. code-block:: yaml

            policies:
              - name: delete-stale-snapshots
                resource: ebs-snapshot
                filters:
                  - type: age
                    days: 28
                    op: ge
                  - skip-ami-snapshots

    :example:

    explicit with parameter

    .. code-block:: yaml

            policies:
              - name: delete-snapshots
                resource: ebs-snapshot
                filters:
                  - type: age
                    days: 28
                    op: ge
                  - type: skip-ami-snapshots
                    value: false
    """

    schema = type_schema('skip-ami-snapshots', value={'type': 'boolean'})
    def get_permissions(self):
        return AMI(self.manager.ctx, {}).get_permissions()

    def process(self, snapshots, event=None):
        resources = _filter_ami_snapshots(self, snapshots)
        return resources
@Snapshot.action_registry.register('delete')
class SnapshotDelete(BaseAction):
    """Deletes EBS snapshots

    :example:

    .. code-block:: yaml

            policies:
              - name: delete-stale-snapshots
                resource: ebs-snapshot
                filters:
                  - type: age
                    days: 28
                    op: ge
                actions:
                  - delete
    """

    schema = type_schema(
        'delete', **{'skip-ami-snapshots': {'type': 'boolean'}})
    permissions = ('ec2:DeleteSnapshot',)
    def process(self, snapshots):
        self.image_snapshots = set()
        # Be careful re image snapshots, we do this by default
        # to keep things safe by default, albeit we'd get an error
        # if we did try to delete something associated to an image.
        pre = len(snapshots)
        snapshots = list(filter(None, _filter_ami_snapshots(self, snapshots)))
        post = len(snapshots)
        log.info("Deleting %d snapshots, auto-filtered %d ami-snapshots",
                 post, pre - post)

        client = local_session(self.manager.session_factory).client('ec2')
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for snapshot_set in chunks(reversed(snapshots), size=50):
                futures.append(
                    w.submit(self.process_snapshot_set, client, snapshot_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception deleting snapshot set \n %s" % (
                            f.exception()))
        return snapshots
    def process_snapshot_set(self, client, snapshots_set):
        retry = get_retry((
            'RequestLimitExceeded', 'Client.RequestLimitExceeded'))
        for s in snapshots_set:
            if s['SnapshotId'] in self.image_snapshots:
                continue
            try:
                retry(client.delete_snapshot,
                      SnapshotId=s['SnapshotId'],
                      DryRun=self.manager.config.dryrun)
            except ClientError as e:
                if e.response['Error']['Code'] == "InvalidSnapshot.NotFound":
                    continue
                raise
@Snapshot.action_registry.register('copy')
class CopySnapshot(BaseAction):
    """Copy a snapshot across regions

    https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-copy-snapshot.html

    :example:

    .. code-block:: yaml

            policies:
              - name: copy-snapshot-east-west
                resource: ebs-snapshot
                filters:
                  - type: age
                    days: 7
                    op: le
                actions:
                  - type: copy
                    target_region: us-west-2
                    target_key: target_kms_key
                    encrypted: true
    """

    schema = type_schema(
        'copy',
        target_region={'type': 'string'},
        target_key={'type': 'string'},
        encrypted={'type': 'boolean'},
    )
    permissions = (
        'ec2:CreateTags', 'ec2:CopySnapshot', 'ec2:DescribeSnapshots')
    def validate(self):
        if self.data.get('encrypted', True):
            key = self.data.get('target_key')
            if not key:
                raise PolicyValidationError(
                    "Encrypted snapshot copy requires kms key on %s" % (
                        self.manager.data,))
        return self
    def process(self, resources):
        if self.data['target_region'] == self.manager.config.region:
            self.log.info(
                "Source and destination region are the same, skipping")
            return

        with self.executor_factory(max_workers=2) as w:
            list(w.map(self.process_resource_set, chunks(resources, 20)))
    def process_resource_set(self, resource_set):
        client = self.manager.session_factory(
            region=self.data['target_region']).client('ec2')

        cross_region = self.data['target_region'] != self.manager.config.region

        params = {}
        params['Encrypted'] = self.data.get('encrypted', True)
        if params['Encrypted']:
            params['KmsKeyId'] = self.data['target_key']

        for snapshot_set in chunks(resource_set, 5):
            for r in snapshot_set:
                snapshot_id = client.copy_snapshot(
                    SourceRegion=self.manager.config.region,
                    SourceSnapshotId=r['SnapshotId'],
                    Description=r.get('Description', ''),
                    **params)['SnapshotId']
                if r.get('Tags'):
                    client.create_tags(
                        Resources=[snapshot_id], Tags=r['Tags'])
                r['c7n:CopiedSnapshot'] = snapshot_id

            if not cross_region or len(snapshot_set) < 5:
                continue

            copy_ids = [r['c7n:CopiedSnapshot'] for r in snapshot_set]
            self.log.debug(
                "Waiting on cross-region snapshot copy %s", ",".join(copy_ids))
            waiter = client.get_waiter('snapshot_completed')
            waiter.config.delay = 60
            waiter.config.max_attempts = 60
            waiter.wait(SnapshotIds=copy_ids)
            self.log.debug(
                "Cross region copy complete %s", ",".join(copy_ids))
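
# The chunk size of 5 above appears to track AWS's limit on concurrent
# snapshot copies to a single destination region (historically five); waiting
# on each full batch before starting the next keeps the action under that
# limit. This is an inference from the code, not a documented constant.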
@resources.register('ebs')
class EBS(QueryResourceManager):

    class resource_type(object):
        service = 'ec2'
        type = 'volume'
        enum_spec = ('describe_volumes', 'Volumes', None)
        name = id = 'VolumeId'
        filter_name = 'VolumeIds'
        filter_type = 'list'
        date = 'createTime'
        dimension = 'VolumeId'
        metrics_namespace = 'AWS/EBS'
        config_type = "AWS::EC2::Volume"
        default_report_fields = (
            'VolumeId',
            'Attachments[0].InstanceId',
            'Size',
            'VolumeType',
            'KmsKeyId'
        )
@EBS.action_registry.register('detach')
class VolumeDetach(BaseAction):
    """Detach an EBS volume from an Instance.

    If the 'force' param is True, a forceful detach of the volume is
    performed. The default value for 'force' is False.

    :example:

    .. code-block:: yaml

            policies:
              - name: instance-ebs-volumes
                resource: ebs
                filters:
                  - VolumeId: volumeid
                actions:
                  - detach
    """

    schema = type_schema('detach', force={'type': 'boolean'})
    permissions = ('ec2:DetachVolume',)
    def process(self, volumes, event=None):
        client = local_session(self.manager.session_factory).client('ec2')
        for vol in volumes:
            for attachment in vol.get('Attachments', []):
                client.detach_volume(
                    InstanceId=attachment['InstanceId'],
                    VolumeId=attachment['VolumeId'],
                    Force=self.data.get('force', False))
@EBS.filter_registry.register('instance')
class AttachedInstanceFilter(ValueFilter):
    """Filter volumes based on filtering on their attached instance

    :example:

    .. code-block:: yaml

            policies:
              - name: instance-ebs-volumes
                resource: ebs
                filters:
                  - instance
    """

    schema = type_schema('instance', rinherit=ValueFilter.schema)
    def get_permissions(self):
        return self.manager.get_resource_manager('ec2').get_permissions()

    def process(self, resources, event=None):
        original_count = len(resources)
        resources = [r for r in resources if r.get('Attachments')]
        self.log.debug('Filtered from %d volumes to %d attached volumes' % (
            original_count, len(resources)))
        self.instance_map = self.get_instance_mapping(resources)
        return list(filter(self, resources))

    def __call__(self, r):
        instance = self.instance_map[r['Attachments'][0]['InstanceId']]
        if self.match(instance):
            r['Instance'] = instance
            set_annotation(r, ANNOTATION_KEY, "instance-%s" % self.k)
            return True

    def get_instance_mapping(self, resources):
        instance_ids = [r['Attachments'][0]['InstanceId'] for r in resources]
        instances = self.manager.get_resource_manager(
            'ec2').get_resources(instance_ids)
        self.log.debug("Queried %d instances for %d volumes" % (
            len(instances), len(resources)))
        return {i['InstanceId']: i for i in instances}
@EBS.filter_registry.register('kms-alias')
class KmsKeyAlias(ResourceKmsKeyAlias):

    def process(self, resources, event=None):
        return self.get_matching_aliases(resources)
@EBS.filter_registry.register('fault-tolerant')
class FaultTolerantSnapshots(Filter):
    """Returns any EBS volume that does/does not have a snapshot
    within the last 7 days.

    'Fault-Tolerance' here means that, in the event of a failure, the
    volume can be restored from a snapshot with (reasonable) data loss.

    .. code-block:: yaml

            policies:
              - name: ebs-volume-tolerance
                resource: ebs
                filters:
                  - type: fault-tolerant
                    tolerant: true
    """

    schema = type_schema('fault-tolerant', tolerant={'type': 'boolean'})
    check_id = 'H7IgTzjTYb'
    permissions = ('support:RefreshTrustedAdvisorCheck',
                   'support:DescribeTrustedAdvisorCheckResult')
    def pull_check_results(self):
        result = set()
        client = local_session(self.manager.session_factory).client('support')
        client.refresh_trusted_advisor_check(checkId=self.check_id)
        results = client.describe_trusted_advisor_check_result(
            checkId=self.check_id, language='en')['result']
        for r in results['flaggedResources']:
            result.update([r['metadata'][1]])
        return result

    def process(self, resources, event=None):
        flagged = self.pull_check_results()
        if self.data.get('tolerant', True):
            return [r for r in resources if r['VolumeId'] not in flagged]
        return [r for r in resources if r['VolumeId'] in flagged]
@EBS.filter_registry.register('health-event')
class HealthFilter(HealthEventFilter):

    schema = type_schema(
        'health-event',
        types={'type': 'array', 'items': {
            'type': 'string',
            'enum': ['AWS_EBS_DEGRADED_EBS_VOLUME_PERFORMANCE',
                     'AWS_EBS_VOLUME_LOST']}},
        statuses={'type': 'array', 'items': {
            'type': 'string',
            'enum': ['open', 'upcoming', 'closed']
        }})

    permissions = HealthEventFilter.permissions + (
        'config:GetResourceConfigHistory',)
    def process(self, resources, event=None):
        if 'AWS_EBS_VOLUME_LOST' not in self.data['types']:
            return super(HealthFilter, self).process(resources, event)
        if not resources:
            return resources

        client = local_session(self.manager.session_factory).client(
            'health', region_name='us-east-1')
        f = self.get_filter_parameters()
        resource_map = {}

        paginator = client.get_paginator('describe_events')
        events = list(itertools.chain(
            *[p['events'] for p in paginator.paginate(filter=f)]))
        entities = self.process_event(client, events)

        event_map = {e['arn']: e for e in events}
        config = local_session(self.manager.session_factory).client('config')
        for e in entities:
            rid = e['entityValue']
            if not resource_map.get(rid):
                resource_map[rid] = self.load_resource(config, rid)
            resource_map[rid].setdefault(
                'c7n:HealthEvent', []).append(event_map[e['eventArn']])
        return list(resource_map.values())

    def load_resource(self, config, rid):
        resources_histories = config.get_resource_config_history(
            resourceType='AWS::EC2::Volume',
            resourceId=rid,
            limit=2)['configurationItems']
        for r in resources_histories:
            if r['configurationItemStatus'] != u'ResourceDeleted':
                return camelResource(json.loads(r['configuration']))
        return {"VolumeId": rid}
@EBS.action_registry.register('copy-instance-tags')
class CopyInstanceTags(BaseAction):
    """Copy instance tags to its attached volume.

    Useful for cost allocation to ebs volumes and tracking usage
    info for volumes.

    Mostly useful for volumes not set to delete on termination, which
    are otherwise candidates for garbage collection. Copying the
    instance tags gives us more semantic information to determine if
    they're useful, as well as letting us know the last time the
    volume was actually used.

    :example:

    .. code-block:: yaml

            policies:
              - name: ebs-copy-instance-tags
                resource: ebs
                filters:
                  - type: value
                    key: "Attachments[0].Device"
                    value: not-null
                actions:
                  - type: copy-instance-tags
                    tags:
                      - Name
    """

    schema = type_schema(
        'copy-instance-tags',
        tags={'type': 'array', 'items': {'type': 'string'}})
    def get_permissions(self):
        perms = self.manager.get_resource_manager('ec2').get_permissions()
        perms.append('ec2:CreateTags')
        return perms

    def process(self, volumes):
        vol_count = len(volumes)
        volumes = [v for v in volumes if v['Attachments']]
        if len(volumes) != vol_count:
            self.log.warning(
                "ebs copy tags action implicitly filtered from %d to %d",
                vol_count, len(volumes))
        self.initialize(volumes)
        client = local_session(self.manager.session_factory).client('ec2')
        with self.executor_factory(max_workers=10) as w:
            futures = []
            for instance_set in chunks(sorted(
                    self.instance_map.keys(), reverse=True), size=100):
                futures.append(
                    w.submit(self.process_instance_set, client, instance_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception copying instance tags \n %s" % (
                            f.exception()))
    def initialize(self, volumes):
        instance_vol_map = {}
        for v in volumes:
            instance_vol_map.setdefault(
                v['Attachments'][0]['InstanceId'], []).append(v)
        instance_map = {
            i['InstanceId']: i for i in
            self.manager.get_resource_manager('ec2').get_resources(
                list(instance_vol_map.keys()))}
        self.instance_vol_map = instance_vol_map
        self.instance_map = instance_map
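
    # Shapes built above (a sketch):
    #   instance_vol_map: {'i-0abc...': [<volume>, <volume>], ...}
    #   instance_map:     {'i-0abc...': <instance>, ...}
    # Volumes are grouped per instance so each instance is described once.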
    def process_instance_set(self, client, instance_ids):
        for i in instance_ids:
            try:
                self.process_instance_volumes(
                    client, self.instance_map[i], self.instance_vol_map[i])
            except Exception as e:
                self.log.exception(
                    "Error copy instance:%s tags to volumes: %s \n %s",
                    i, ",".join(
                        [v['VolumeId'] for v in self.instance_vol_map[i]]),
                    e)
    def process_instance_volumes(self, client, instance, volumes):
        for v in volumes:
            copy_tags = self.get_volume_tags(v, instance, v['Attachments'][0])
            if not copy_tags:
                continue
            # Can't add more tags than the resource supports; we could try
            # to delete extant ones inline, else use the trim-tags action.
            if len(copy_tags) > 40:
                log.warning(
                    "action:%s volume:%s instance:%s too many tags to copy" % (
                        self.__class__.__name__.lower(),
                        v['VolumeId'], instance['InstanceId']))
                continue
            try:
                self.manager.retry(
                    client.create_tags,
                    Resources=[v['VolumeId']],
                    Tags=copy_tags,
                    DryRun=self.manager.config.dryrun)
            except ClientError as e:
                if e.response['Error']['Code'] == "InvalidVolume.NotFound":
                    continue
                raise
    def get_volume_tags(self, volume, instance, attachment):
        only_tags = self.data.get('tags', [])  # specify which tags to copy
        copy_tags = []
        extant_tags = dict([
            (t['Key'], t['Value']) for t in volume.get('Tags', [])])

        for t in instance.get('Tags', ()):
            if only_tags and t['Key'] not in only_tags:
                continue
            if t['Key'] in extant_tags and t['Value'] == extant_tags[t['Key']]:
                continue
            if t['Key'].startswith('aws:'):
                continue
            copy_tags.append(t)

        # Don't add attachment tags if we're already current
        if 'LastAttachInstance' in extant_tags \
                and extant_tags['LastAttachInstance'] == attachment['InstanceId']:
            return copy_tags

        copy_tags.append(
            {'Key': 'LastAttachTime',
             'Value': attachment['AttachTime'].isoformat()})
        copy_tags.append(
            {'Key': 'LastAttachInstance', 'Value': attachment['InstanceId']})
        return copy_tags
@EBS.action_registry.register('encrypt-instance-volumes')
class EncryptInstanceVolumes(BaseAction):
    """Encrypt extant volumes attached to an instance

    - Requires instance restart
    - Not suitable for autoscale groups.

    Multistep process:

    - Stop instance (if running)
    - For each volume
       - Create snapshot
       - Wait on snapshot creation
       - Copy Snapshot to create encrypted snapshot
       - Wait on snapshot creation
       - Create encrypted volume from snapshot
       - Wait on volume creation
       - Delete transient snapshots
       - Detach Unencrypted Volume
       - Attach Encrypted Volume
       - Set DeleteOnTermination instance attribute equal to source volume
    - For each volume
       - Delete unencrypted volume
    - Start Instance (if originally running)
    - For each newly encrypted volume
       - Delete transient tags

    :example:

    .. code-block:: yaml

            policies:
              - name: encrypt-unencrypted-ebs
                resource: ebs
                filters:
                  - Encrypted: false
                actions:
                  - type: encrypt-instance-volumes
                    key: alias/encrypted
    """

    schema = type_schema(
        'encrypt-instance-volumes',
        required=['key'],
        key={'type': 'string'},
        delay={'type': 'number'},
        verbose={'type': 'boolean'})

    permissions = (
        'ec2:CopySnapshot',
        'ec2:CreateSnapshot',
        'ec2:CreateVolume',
        'ec2:DescribeInstances',
        'ec2:DescribeSnapshots',
        'ec2:DescribeVolumes',
        'ec2:StopInstances',
        'ec2:StartInstances',
        'ec2:ModifyInstanceAttribute',
        'ec2:DeleteTags')
    def validate(self):
        self.verbose = self.data.get('verbose', False)
        return self
    def process(self, volumes):
        original_count = len(volumes)
        volumes = [v for v in volumes
                   if not v['Encrypted'] or not v['Attachments']]
        log.debug(
            "EncryptVolumes filtered from %d to %d "
            "unencrypted attached volumes" % (
                original_count, len(volumes)))

        # Group volumes by instance id
        instance_vol_map = {}
        for v in volumes:
            instance_id = v['Attachments'][0]['InstanceId']
            instance_vol_map.setdefault(instance_id, []).append(v)

        # Query instances to find current instance state
        self.instance_map = {
            i['InstanceId']: i for i in
            self.manager.get_resource_manager('ec2').get_resources(
                list(instance_vol_map.keys()), cache=False)}

        client = local_session(self.manager.session_factory).client('ec2')

        with self.executor_factory(max_workers=3) as w:
            futures = {}
            for instance_id, vol_set in instance_vol_map.items():
                futures[w.submit(
                    self.process_volume, client,
                    instance_id, vol_set)] = instance_id

            for f in as_completed(futures):
                if f.exception():
                    instance_id = futures[f]
                    log.error(
                        "Exception processing instance:%s volset: %s \n %s" % (
                            instance_id, instance_vol_map[instance_id],
                            f.exception()))
    def process_volume(self, client, instance_id, vol_set):
        """Encrypt attached unencrypted ebs volumes

        vol_set corresponds to all the unencrypted volumes on a given
        instance.
        """
        key_id = self.get_encryption_key()
        if self.verbose:
            self.log.debug("Using encryption key: %s" % key_id)

        # Only stop and start the instance if it was running.
        instance_running = self.stop_instance(client, instance_id)
        if instance_running is None:
            return

        # Create all the volumes before patching the instance.
        paired = []
        for v in vol_set:
            vol_id = self.create_encrypted_volume(
                client, v, key_id, instance_id)
            paired.append((v, vol_id))

        # Next detach and reattach
        for v, vol_id in paired:
            client.detach_volume(
                InstanceId=instance_id, VolumeId=v['VolumeId'])
            # 5/8/2016 The detach isn't immediately consistent
            time.sleep(self.data.get('delay', 15))
            client.attach_volume(
                InstanceId=instance_id, VolumeId=vol_id,
                Device=v['Attachments'][0]['Device'])

            # Set DeleteOnTermination attribute the same as source volume
            if v['Attachments'][0]['DeleteOnTermination']:
                client.modify_instance_attribute(
                    InstanceId=instance_id,
                    BlockDeviceMappings=[
                        {
                            'DeviceName': v['Attachments'][0]['Device'],
                            'Ebs': {
                                'VolumeId': vol_id,
                                'DeleteOnTermination': True
                            }
                        }
                    ])

        if instance_running:
            client.start_instances(InstanceIds=[instance_id])

        if self.verbose:
            self.log.debug(
                "Deleting unencrypted volumes for: %s" % instance_id)

        for v in vol_set:
            client.delete_volume(VolumeId=v['VolumeId'])

        # Clean-up transient tags on newly created encrypted volume.
        for v, vol_id in paired:
            client.delete_tags(
                Resources=[vol_id],
                Tags=[
                    {'Key': 'maid-crypt-remediation'},
                    {'Key': 'maid-origin-volume'},
                    {'Key': 'maid-instance-device'}
                ])
    def stop_instance(self, client, instance_id):
        instance_state = self.instance_map[instance_id]['State']['Name']
        if instance_state in ('shutting-down', 'terminated'):
            self.log.debug('Skipping terminating instance: %s' % instance_id)
            return
        elif instance_state in ('running',):
            client.stop_instances(InstanceIds=[instance_id])
            self.wait_on_resource(client, instance_id=instance_id)
            return True
        return False
    def create_encrypted_volume(self, ec2, v, key_id, instance_id):
        # Create a current snapshot
        results = ec2.create_snapshot(
            VolumeId=v['VolumeId'],
            Description="maid transient snapshot for encryption")
        transient_snapshots = [results['SnapshotId']]
        ec2.create_tags(
            Resources=[results['SnapshotId']],
            Tags=[
                {'Key': 'maid-crypto-remediation', 'Value': 'true'}])
        self.wait_on_resource(ec2, snapshot_id=results['SnapshotId'])

        # Create encrypted snapshot from current
        results = ec2.copy_snapshot(
            SourceSnapshotId=results['SnapshotId'],
            SourceRegion=v['AvailabilityZone'][:-1],
            Description='maid transient snapshot for encryption',
            Encrypted=True,
            KmsKeyId=key_id)
        transient_snapshots.append(results['SnapshotId'])
        ec2.create_tags(
            Resources=[results['SnapshotId']],
            Tags=[
                {'Key': 'maid-crypto-remediation', 'Value': 'true'}
            ])
        self.wait_on_resource(ec2, snapshot_id=results['SnapshotId'])

        # Create encrypted volume, also tag so we can recover
        results = ec2.create_volume(
            Size=v['Size'],
            VolumeType=v['VolumeType'],
            SnapshotId=results['SnapshotId'],
            AvailabilityZone=v['AvailabilityZone'],
            Encrypted=True)
        ec2.create_tags(
            Resources=[results['VolumeId']],
            Tags=[
                {'Key': 'maid-crypt-remediation', 'Value': instance_id},
                {'Key': 'maid-origin-volume', 'Value': v['VolumeId']},
                {'Key': 'maid-instance-device',
                 'Value': v['Attachments'][0]['Device']}])

        # Wait on encrypted volume creation
        self.wait_on_resource(ec2, volume_id=results['VolumeId'])

        # Delete transient snapshots
        for sid in transient_snapshots:
            ec2.delete_snapshot(SnapshotId=sid)
        return results['VolumeId']
    def get_encryption_key(self):
        kms = local_session(self.manager.session_factory).client('kms')
        key_alias = self.data.get('key')
        result = kms.describe_key(KeyId=key_alias)
        key_id = result['KeyMetadata']['KeyId']
        return key_id
    def wait_on_resource(self, *args, **kw):
        # Sigh this is dirty, but failure in the middle of our workflow
        # due to overly long resource creation is complex to unwind,
        # with multi-volume instances. Wait up to three times (actual
        # wait time is a per resource type configuration).

        # Note we wait for all resource creation before attempting to
        # patch an instance, so even on resource creation failure, the
        # instance is not modified
        try:
            return self._wait_on_resource(*args, **kw)
        except Exception:
            try:
                return self._wait_on_resource(*args, **kw)
            except Exception:
                return self._wait_on_resource(*args, **kw)

    def _wait_on_resource(
            self, client, snapshot_id=None, volume_id=None, instance_id=None):
        # boto client waiters poll every 15 seconds up to a max of 600s (10m)
        if snapshot_id:
            if self.verbose:
                self.log.debug(
                    "Waiting on snapshot completion %s" % snapshot_id)
            waiter = client.get_waiter('snapshot_completed')
            waiter.wait(SnapshotIds=[snapshot_id])
            if self.verbose:
                self.log.debug("Snapshot: %s completed" % snapshot_id)
        elif volume_id:
            if self.verbose:
                self.log.debug("Waiting on volume creation %s" % volume_id)
            waiter = client.get_waiter('volume_available')
            waiter.wait(VolumeIds=[volume_id])
            if self.verbose:
                self.log.debug("Volume: %s created" % volume_id)
        elif instance_id:
            if self.verbose:
                self.log.debug("Waiting on instance stop")
            waiter = client.get_waiter('instance_stopped')
            waiter.wait(InstanceIds=[instance_id])
            if self.verbose:
                self.log.debug("Instance: %s stopped" % instance_id)
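
    # With the triple retry in wait_on_resource, each wait tolerates roughly
    # three waiter timeouts (about 30 minutes at boto3 defaults) before the
    # failure propagates; this arithmetic assumes default waiter settings.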
@EBS.action_registry.register('snapshot')
class CreateSnapshot(BaseAction):
    """Snapshot an EBS volume

    :example:

    .. code-block:: yaml

            policies:
              - name: snapshot-volumes
                resource: ebs
                filters:
                  - Attachments: []
                  - State: available
                actions:
                  - snapshot
    """

    permissions = ('ec2:CreateSnapshot',)
    schema = type_schema('snapshot')
    def process(self, volumes):
        client = local_session(self.manager.session_factory).client('ec2')
        retry = get_retry(['Throttled'], max_attempts=5)
        for vol in volumes:
            vol_id = vol['VolumeId']
            retry(self.process_volume, client=client, volume=vol_id)

    def process_volume(self, client, volume):
        try:
            client.create_snapshot(VolumeId=volume)
        except ClientError as e:
            if e.response['Error']['Code'] == 'InvalidVolume.NotFound':
                return
            raise
@EBS.action_registry.register('delete')
class Delete(BaseAction):
    """Delete an ebs volume.

    If the force boolean is true, we will detach an attached volume
    from an instance. Note this cannot be done for running instance
    root volumes.

    :example:

    .. code-block:: yaml

            policies:
              - name: delete-unattached-volumes
                resource: ebs
                filters:
                  - Attachments: []
                  - State: available
                actions:
                  - delete
    """

    schema = type_schema('delete', force={'type': 'boolean'})
    permissions = (
        'ec2:DetachVolume', 'ec2:DeleteVolume', 'ec2:DescribeVolumes')
    def process(self, volumes):
        client = local_session(self.manager.session_factory).client('ec2')
        with self.executor_factory(max_workers=3) as w:
            futures = {}
            for v in volumes:
                futures[
                    w.submit(self.process_volume, client, v)] = v
            for f in as_completed(futures):
                v = futures[f]
                if f.exception():
                    self.log.error(
                        "Error processing volume:%s error:%s",
                        v['VolumeId'], f.exception())

    def process_volume(self, client, volume):
        try:
            if self.data.get('force') and len(volume['Attachments']):
                client.detach_volume(
                    VolumeId=volume['VolumeId'], Force=True)
                waiter = client.get_waiter('volume_available')
                waiter.wait(VolumeIds=[volume['VolumeId']])
            self.manager.retry(
                client.delete_volume, VolumeId=volume['VolumeId'])
        except ClientError as e:
            if e.response['Error']['Code'] == "InvalidVolume.NotFound":
                return
            raise
@EBS.filter_registry.register('modifyable')
class ModifyableVolume(Filter):
    """Check if an ebs volume is modifyable online.

    Considerations:
      https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/limitations.html

    Consideration Summary

    - only current instance types are supported (one exception m3.medium)
      Current Generation Instances (2017-2)
      https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-types.html#current-gen-instances
    - older magnetic volume types are not supported
    - shrinking volumes is not supported
    - must wait at least 6hrs between modifications to the same volume.
    - volumes must have been attached after Nov 1st, 2016.

    See `custodian schema ebs.actions.modify` for examples.
    """

    schema = type_schema('modifyable')

    older_generation = set((
        'm1.small', 'm1.medium', 'm1.large', 'm1.xlarge',
        'c1.medium', 'c1.xlarge', 'cc2.8xlarge',
        'm2.xlarge', 'm2.2xlarge', 'm2.4xlarge',
        'cr1.8xlarge', 'hi1.4xlarge', 'hs1.8xlarge',
        'cg1.4xlarge', 't1.micro',
        # two legs good, not all current gen work either.
        'm3.large', 'm3.xlarge', 'm3.2xlarge'
    ))

    permissions = ("ec2:DescribeInstances",)
    def process(self, resources, event=None):
        results = []
        filtered = []
        attached = []
        stats = Counter()
        marker_date = parse_date('2016-11-01T00:00:00+00:00')

        # Filter volumes
        for r in resources:
            # unsupported type
            if r['VolumeType'] == 'standard':
                stats['vol-type'] += 1
                filtered.append(r['VolumeId'])
                continue

            # unattached are easy
            if not r.get('Attachments'):
                results.append(r)
                continue

            # check for attachment date older than supported date
            if r['Attachments'][0]['AttachTime'] < marker_date:
                stats['attach-time'] += 1
                filtered.append(r['VolumeId'])
                continue

            attached.append(r)

        # Filter volumes attached to unsupported instance types
        ec2 = self.manager.get_resource_manager('ec2')
        instance_map = {}
        for v in attached:
            instance_map.setdefault(
                v['Attachments'][0]['InstanceId'], []).append(v)

        instances = ec2.get_resources(list(instance_map.keys()))
        for i in instances:
            if i['InstanceType'] in self.older_generation:
                stats['instance-type'] += len(instance_map[i['InstanceId']])
                filtered.extend(
                    [v['VolumeId'] for v in instance_map.pop(i['InstanceId'])])
            else:
                results.extend(instance_map.pop(i['InstanceId']))

        # Filter volumes that are currently under modification
        client = local_session(self.manager.session_factory).client('ec2')
        modifying = set()

        for vol_set in chunks(list(results), 200):
            vol_ids = [v['VolumeId'] for v in vol_set]
            mutating = client.describe_volumes_modifications(
                Filters=[
                    {'Name': 'volume-id',
                     'Values': vol_ids},
                    {'Name': 'modification-state',
                     'Values': ['modifying', 'optimizing', 'failed']}])
            for vm in mutating.get('VolumesModifications', ()):
                stats['vol-mutation'] += 1
                filtered.append(vm['VolumeId'])
                modifying.add(vm['VolumeId'])

        self.log.debug(
            "filtered %d of %d volumes due to %s",
            len(filtered), len(resources), sorted(stats.items()))

        return [r for r in results if r['VolumeId'] not in modifying]
@EBS.action_registry.register('modify')
class ModifyVolume(BaseAction):
    """Modify an ebs volume online.

    **Note this action requires use of modifyable filter**

    Intro Blog & Use Cases:
      https://aws.amazon.com/blogs/aws/amazon-ebs-update-new-elastic-volumes-change-everything/
    Docs:
      https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-modify-volume.html
    Considerations:
      https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/limitations.html

    :example:

      Find under-utilized provisioned iops volumes older than a week
      and change their type.

    .. code-block:: yaml

           policies:
            - name: ebs-remove-piops
              resource: ebs
              filters:
               - type: value
                 key: CreateDate
                 value_type: age
                 value: 7
                 op: greater-than
               - VolumeType: io1
               - type: metrics
                 name: VolumeConsumedReadWriteOps
                 statistics: Maximum
                 value: 100
                 op: less-than
                 days: 7
               - modifyable
              actions:
               - type: modify
                 volume-type: gp2

    `iops-percent` and `size-percent` can be used to modify,
    respectively, iops on io1 volumes and volume size.

    When converting to io1, `iops-percent` is used to set the iops
    allocation for the new volume against the extant value for the old
    volume.

    :example:

      Double storage and quadruple iops for all io1 volumes.

    .. code-block:: yaml

           policies:
            - name: ebs-upsize-piops
              resource: ebs
              filters:
                - VolumeType: io1
                - modifyable
              actions:
                - type: modify
                  size-percent: 200
                  iops-percent: 400

    **Note** resizing down aka shrinking requires OS and FS support
    and potentially additional preparation, else data-loss may occur.
    To prevent accidents, shrinking must be explicitly enabled by also
    setting `shrink: true` on the action.
    """

    schema = type_schema(
        'modify',
        **{'volume-type': {'enum': ['io1', 'gp2', 'st1', 'sc1']},
           'shrink': False,
           'size-percent': {'type': 'number'},
           'iops-percent': {'type': 'number'}})

    # assumption, as it's the closest i can find.
    permissions = ("ec2:ModifyVolumeAttribute",)
    def validate(self):
        if 'modifyable' not in self.manager.data.get('filters', ()):
            raise PolicyValidationError(
                "modify action requires modifyable filter in policy")
        if self.data.get('size-percent', 100) < 100 \
                and not self.data.get('shrink', False):
            raise PolicyValidationError((
                "shrinking volumes requires os/fs support "
                "or data-loss may ensue, use `shrink: true` to override"))
        return self
    def process(self, resources):
        client = local_session(self.manager.session_factory).client('ec2')
        for resource_set in chunks(resources, 50):
            self.process_resource_set(client, resource_set)

    def process_resource_set(self, client, resource_set):
        vtype = self.data.get('volume-type')
        psize = self.data.get('size-percent')
        piops = self.data.get('iops-percent')

        for r in resource_set:
            params = {'VolumeId': r['VolumeId']}
            if piops and ('io1' in (vtype, r['VolumeType'])):
                # default here if we're changing to io1
                params['Iops'] = max(
                    int(r.get('Iops', 10) * piops / 100.0), 100)
            if psize:
                params['Size'] = max(int(r['Size'] * psize / 100.0), 1)
            if vtype:
                params['VolumeType'] = vtype
            self.manager.retry(client.modify_volume, **params)
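
    # Worked example (a sketch): an io1 volume with Iops=1000 and Size=500
    # under `iops-percent: 400, size-percent: 200` produces
    # modify_volume(VolumeId=..., Iops=4000, Size=1000); results are
    # floored at 100 IOPS and 1 GiB respectively by the max() calls above.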