# Copyright 2016-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Generic EC2 Resource Tag / Filters and actions
These work for the whole family of resources associated
to ec2 (subnets, vpc, security-groups, volumes, instances,
snapshots).
"""
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import Counter
from concurrent.futures import as_completed
from datetime import datetime, timedelta
from dateutil import tz as tzutil
from dateutil.parser import parse
import itertools
import jmespath
import time
from c7n.manager import resources as aws_resources
from c7n.actions import BaseAction as Action, AutoTagUser
from c7n.exceptions import PolicyValidationError, PolicyExecutionError
from c7n.filters import Filter, OPERATORS
from c7n.filters.offhours import Time
from c7n import utils
DEFAULT_TAG = "maid_status"
[docs]def universal_augment(self, resources):
# Resource Tagging API Support
# https://docs.aws.amazon.com/awsconsolehelpdocs/latest/gsg/supported-resources.html
# Bail on empty set
if not resources:
return resources
# For global resources, tags don't populate in the get_resources call
# unless the call is being made to us-east-1
region = getattr(self.resource_type, 'global_resource', None) and 'us-east-1' or self.region
client = utils.local_session(
self.session_factory).client('resourcegroupstaggingapi', region_name=region)
# Lazy for non circular :-(
from c7n.query import RetryPageIterator
paginator = client.get_paginator('get_resources')
paginator.PAGE_ITERATOR_CLS = RetryPageIterator
resource_type = getattr(self.get_model(), 'resource_type', None)
if not resource_type:
resource_type = self.get_model().service
if self.get_model().type:
resource_type += ":" + self.get_model().type
resource_tag_map_list = list(itertools.chain(
*[p['ResourceTagMappingList'] for p in paginator.paginate(
ResourceTypeFilters=[resource_type])]))
resource_tag_map = {
r['ResourceARN']: r['Tags'] for r in resource_tag_map_list}
for arn, r in zip(self.get_arns(resources), resources):
if 'Tags' in r:
continue
r['Tags'] = resource_tag_map.get(arn, [])
return resources
def _common_tag_processer(executor_factory, batch_size, concurrency, client,
process_resource_set, id_key, resources, tags,
log):
error = None
with executor_factory(max_workers=concurrency) as w:
futures = []
for resource_set in utils.chunks(resources, size=batch_size):
futures.append(
w.submit(process_resource_set, client, resource_set, tags))
for f in as_completed(futures):
if f.exception():
error = f.exception()
log.error(
"Exception with tags: %s %s", tags, f.exception())
if error:
raise error
[docs]class TagTrim(Action):
"""Automatically remove tags from an ec2 resource.
EC2 Resources have a limit of 50 tags, in order to make
additional tags space on a set of resources, this action can
be used to remove enough tags to make the desired amount of
space while preserving a given set of tags.
.. code-block :: yaml
policies:
- name: ec2-tag-trim
comment: |
Any instances with 48 or more tags get tags removed until
they match the target tag count, in this case 47 so we
that we free up a tag slot for another usage.
resource: ec2
filters:
# Filter down to resources which already have 8 tags
# as we need space for 3 more, this also ensures that
# metrics reporting is correct for the policy.
- type: value
key: "length(Tags)"
op: ge
value: 48
actions:
- type: tag-trim
space: 3
preserve:
- OwnerContact
- ASV
- CMDBEnvironment
- downtime
- custodian_status
"""
max_tag_count = 50
schema = utils.type_schema(
'tag-trim',
space={'type': 'integer'},
preserve={'type': 'array', 'items': {'type': 'string'}})
schema_alias = True
permissions = ('ec2:DeleteTags',)
[docs] def process(self, resources):
self.id_key = self.manager.get_model().id
self.preserve = set(self.data.get('preserve'))
self.space = self.data.get('space', 3)
client = utils.local_session(
self.manager.session_factory).client(self.manager.resource_type.service)
futures = {}
mid = self.manager.get_model().id
with self.executor_factory(max_workers=2) as w:
for r in resources:
futures[w.submit(self.process_resource, client, r)] = r
for f in as_completed(futures):
if f.exception():
self.log.warning(
"Error processing tag-trim on resource:%s",
futures[f][mid])
[docs] def process_resource(self, client, i):
# Can't really go in batch parallel without some heuristics
# without some more complex matching wrt to grouping resources
# by common tags populations.
tag_map = {
t['Key']: t['Value'] for t in i.get('Tags', [])
if not t['Key'].startswith('aws:')}
# Space == 0 means remove all but specified
if self.space and len(tag_map) + self.space <= self.max_tag_count:
return
keys = set(tag_map)
preserve = self.preserve.intersection(keys)
candidates = keys - self.preserve
if self.space:
# Free up slots to fit
remove = len(candidates) - (
self.max_tag_count - (self.space + len(preserve)))
candidates = list(sorted(candidates))[:remove]
if not candidates:
self.log.warning(
"Could not find any candidates to trim %s" % i[self.id_key])
return
self.process_tag_removal(i, candidates)
[docs] def process_tag_removal(self, client, resource, tags):
self.manager.retry(
client.delete_tags,
Tags=[{'Key': c} for c in tags],
Resources=[resource[self.id_key]],
DryRun=self.manager.config.dryrun)
[docs]class TagActionFilter(Filter):
"""Filter resources for tag specified future action
Filters resources by a 'custodian_status' tag which specifies a future
date for an action.
The filter parses the tag values looking for an 'op@date'
string. The date is parsed and compared to do today's date, the
filter succeeds if today's date is gte to the target date.
The optional 'skew' parameter provides for incrementing today's
date a number of days into the future. An example use case might
be sending a final notice email a few days before terminating an
instance, or snapshotting a volume prior to deletion.
The optional 'skew_hours' parameter provides for incrementing the current
time a number of hours into the future.
Optionally, the 'tz' parameter can get used to specify the timezone
in which to interpret the clock (default value is 'utc')
.. code-block :: yaml
policies:
- name: ec2-stop-marked
resource: ec2
filters:
- type: marked-for-op
# The default tag used is custodian_status
# but that is configurable
tag: custodian_status
op: stop
# Another optional tag is skew
tz: utc
actions:
- type: stop
"""
schema = utils.type_schema(
'marked-for-op',
tag={'type': 'string'},
tz={'type': 'string'},
skew={'type': 'number', 'minimum': 0},
skew_hours={'type': 'number', 'minimum': 0},
op={'type': 'string'})
schema_alias = True
current_date = None
[docs] def validate(self):
op = self.data.get('op')
if self.manager and op not in self.manager.action_registry.keys():
raise PolicyValidationError(
"Invalid marked-for-op op:%s in %s" % (op, self.manager.data))
tz = tzutil.gettz(Time.TZ_ALIASES.get(self.data.get('tz', 'utc')))
if not tz:
raise PolicyValidationError(
"Invalid timezone specified '%s' in %s" % (
self.data.get('tz'), self.manager.data))
return self
def __call__(self, i):
tag = self.data.get('tag', DEFAULT_TAG)
op = self.data.get('op', 'stop')
skew = self.data.get('skew', 0)
skew_hours = self.data.get('skew_hours', 0)
tz = tzutil.gettz(Time.TZ_ALIASES.get(self.data.get('tz', 'utc')))
v = None
for n in i.get('Tags', ()):
if n['Key'] == tag:
v = n['Value']
break
if v is None:
return False
if ':' not in v or '@' not in v:
return False
msg, tgt = v.rsplit(':', 1)
action, action_date_str = tgt.strip().split('@', 1)
if action != op:
return False
try:
action_date = parse(action_date_str)
except Exception:
self.log.warning("could not parse tag:%s value:%s on %s" % (
tag, v, i['InstanceId']))
if self.current_date is None:
self.current_date = datetime.now()
if action_date.tzinfo:
# if action_date is timezone aware, set to timezone provided
action_date = action_date.astimezone(tz)
self.current_date = datetime.now(tz=tz)
return self.current_date >= (
action_date - timedelta(days=skew, hours=skew_hours))
[docs]class TagCountFilter(Filter):
"""Simplify tag counting..
ie. these two blocks are equivalent
.. code-block :: yaml
- filters:
- type: value
op: gte
count: 8
- filters:
- type: tag-count
count: 8
"""
schema = utils.type_schema(
'tag-count',
count={'type': 'integer', 'minimum': 0},
op={'enum': list(OPERATORS.keys())})
schema_alias = True
def __call__(self, i):
count = self.data.get('count', 10)
op_name = self.data.get('op', 'gte')
op = OPERATORS.get(op_name)
tag_count = len([
t['Key'] for t in i.get('Tags', [])
if not t['Key'].startswith('aws:')])
return op(tag_count, count)
[docs]class Tag(Action):
"""Tag an ec2 resource.
"""
batch_size = 25
concurrency = 2
schema = utils.type_schema(
'tag', aliases=('mark',),
tags={'type': 'object'},
key={'type': 'string'},
value={'type': 'string'},
tag={'type': 'string'},
)
schema_alias = True
permissions = ('ec2:CreateTags',)
id_key = None
[docs] def validate(self):
if self.data.get('key') and self.data.get('tag'):
raise PolicyValidationError(
"Can't specify both key and tag, choose one in %s" % (
self.manager.data,))
return self
[docs] def process(self, resources):
# Legacy
msg = self.data.get('msg')
msg = self.data.get('value') or msg
tag = self.data.get('tag', DEFAULT_TAG)
tag = self.data.get('key') or tag
# Support setting multiple tags in a single go with a mapping
tags = self.data.get('tags')
if tags is None:
tags = []
else:
tags = [{'Key': k, 'Value': v} for k, v in tags.items()]
if msg:
tags.append({'Key': tag, 'Value': msg})
self.interpolate_values(tags)
batch_size = self.data.get('batch_size', self.batch_size)
client = self.get_client()
_common_tag_processer(
self.executor_factory, batch_size, self.concurrency, client,
self.process_resource_set, self.id_key, resources, tags, self.log)
[docs] def process_resource_set(self, client, resource_set, tags):
mid = self.manager.get_model().id
self.manager.retry(
client.create_tags,
Resources=[v[mid] for v in resource_set],
Tags=tags,
DryRun=self.manager.config.dryrun)
[docs] def interpolate_values(self, tags):
params = {
'account_id': self.manager.config.account_id,
'now': utils.FormatDate.utcnow(),
'region': self.manager.config.region}
for t in tags:
t['Value'] = t['Value'].format(**params)
[docs] def get_client(self):
return utils.local_session(self.manager.session_factory).client(
self.manager.resource_type.service)
[docs]class RemoveTag(Action):
"""Remove tags from ec2 resources.
"""
batch_size = 100
concurrency = 2
schema = utils.type_schema(
'untag', aliases=('unmark', 'remove-tag'),
tags={'type': 'array', 'items': {'type': 'string'}})
schema_alias = True
permissions = ('ec2:DeleteTags',)
[docs] def process(self, resources):
self.id_key = self.manager.get_model().id
tags = self.data.get('tags', [DEFAULT_TAG])
batch_size = self.data.get('batch_size', self.batch_size)
client = self.get_client()
_common_tag_processer(
self.executor_factory, batch_size, self.concurrency, client,
self.process_resource_set, self.id_key, resources, tags, self.log)
[docs] def process_resource_set(self, client, resource_set, tag_keys):
return self.manager.retry(
client.delete_tags,
Resources=[v[self.id_key] for v in resource_set],
Tags=[{'Key': k} for k in tag_keys],
DryRun=self.manager.config.dryrun)
[docs] def get_client(self):
return utils.local_session(self.manager.session_factory).client(
self.manager.resource_type.service)
[docs]class TagDelayedAction(Action):
"""Tag resources for future action.
The optional 'tz' parameter can be used to adjust the clock to align
with a given timezone. The default value is 'utc'.
If neither 'days' nor 'hours' is specified, Cloud Custodian will default
to marking the resource for action 4 days in the future.
.. code-block :: yaml
policies:
- name: ec2-mark-for-stop-in-future
resource: ec2
filters:
- type: value
key: Name
value: instance-to-stop-in-four-days
actions:
- type: mark-for-op
op: stop
"""
schema = utils.type_schema(
'mark-for-op',
tag={'type': 'string'},
msg={'type': 'string'},
days={'type': 'integer', 'minimum': 0, 'exclusiveMinimum': False},
hours={'type': 'integer', 'minimum': 0, 'exclusiveMinimum': False},
tz={'type': 'string'},
op={'type': 'string'})
schema_alias = True
batch_size = 200
concurrency = 2
default_template = 'Resource does not meet policy: {op}@{action_date}'
[docs] def get_permissions(self):
return self.manager.action_registry['tag'].permissions
[docs] def validate(self):
op = self.data.get('op')
if self.manager and op not in self.manager.action_registry.keys():
raise PolicyValidationError(
"mark-for-op specifies invalid op:%s in %s" % (
op, self.manager.data))
self.tz = tzutil.gettz(
Time.TZ_ALIASES.get(self.data.get('tz', 'utc')))
if not self.tz:
raise PolicyValidationError(
"Invalid timezone specified %s in %s" % (
self.tz, self.manager.data))
return self
[docs] def generate_timestamp(self, days, hours):
n = datetime.now(tz=self.tz)
if days is None or hours is None:
# maintains default value of days being 4 if nothing is provided
days = 4
action_date = (n + timedelta(days=days, hours=hours))
if hours > 0:
action_date_string = action_date.strftime('%Y/%m/%d %H%M %Z')
else:
action_date_string = action_date.strftime('%Y/%m/%d')
return action_date_string
[docs] def process(self, resources):
self.tz = tzutil.gettz(
Time.TZ_ALIASES.get(self.data.get('tz', 'utc')))
self.id_key = self.manager.get_model().id
# Move this to policy? / no resources bypasses actions?
if not len(resources):
return
msg_tmpl = self.data.get('msg', self.default_template)
op = self.data.get('op', 'stop')
tag = self.data.get('tag', DEFAULT_TAG)
days = self.data.get('days', 0)
hours = self.data.get('hours', 0)
action_date = self.generate_timestamp(days, hours)
msg = msg_tmpl.format(
op=op, action_date=action_date)
self.log.info("Tagging %d resources for %s on %s" % (
len(resources), op, action_date))
tags = [{'Key': tag, 'Value': msg}]
# if the tag implementation has a specified batch size, it's typically
# due to some restraint on the api so we defer to that.
batch_size = getattr(
self.manager.action_registry.get('tag'), 'batch_size', self.batch_size)
client = self.get_client()
_common_tag_processer(
self.executor_factory, batch_size, self.concurrency, client,
self.process_resource_set, self.id_key, resources, tags, self.log)
[docs] def process_resource_set(self, client, resource_set, tags):
tagger = self.manager.action_registry['tag']({}, self.manager)
tagger.process_resource_set(client, resource_set, tags)
[docs] def get_client(self):
return utils.local_session(
self.manager.session_factory).client(
self.manager.resource_type.service)
[docs]class NormalizeTag(Action):
"""Transform the value of a tag.
Set the tag value to uppercase, title, lowercase, or strip text
from a tag key.
.. code-block :: yaml
policies:
- name: ec2-service-transform-lower
resource: ec2
comment: |
ec2-service-tag-value-to-lower
query:
- instance-state-name: running
filters:
- "tag:testing8882": present
actions:
- type: normalize-tag
key: lower_key
action: lower
- name: ec2-service-strip
resource: ec2
comment: |
ec2-service-tag-strip-blah
query:
- instance-state-name: running
filters:
- "tag:testing8882": present
actions:
- type: normalize-tag
key: strip_key
action: strip
value: blah
"""
schema_alias = True
schema = utils.type_schema(
'normalize-tag',
key={'type': 'string'},
action={'type': 'string',
'items': {
'enum': ['upper', 'lower', 'title' 'strip', 'replace']}},
value={'type': 'string'})
permissions = ('ec2:CreateTags',)
[docs] def create_tag(self, client, ids, key, value):
self.manager.retry(
client.create_tags,
Resources=ids,
Tags=[{'Key': key, 'Value': value}])
[docs] def create_set(self, instances):
key = self.data.get('key', None)
resource_set = {}
for r in instances:
tags = {t['Key']: t['Value'] for t in r.get('Tags', [])}
if tags[key] not in resource_set:
resource_set[tags[key]] = []
resource_set[tags[key]].append(r)
return resource_set
[docs] def filter_resources(self, resources):
key = self.data.get('key', None)
res = 0
for r in resources:
tags = {t['Key']: t['Value'] for t in r.get('Tags', [])}
if key not in tags.keys():
resources.pop(res)
res += 1
return resources
[docs] def process(self, resources):
count = len(resources)
resources = self.filter_resources(resources)
self.log.info(
"Filtered from %s resources to %s" % (count, len(resources)))
self.id_key = self.manager.get_model().id
resource_set = self.create_set(resources)
with self.executor_factory(max_workers=3) as w:
futures = []
for r in resource_set:
action = self.data.get('action')
value = self.data.get('value')
new_value = False
if action == 'lower' and not r.islower():
new_value = r.lower()
elif action == 'upper' and not r.isupper():
new_value = r.upper()
elif action == 'title' and not r.istitle():
new_value = r.title()
elif action == 'strip' and value and value in r:
new_value = r.strip(value)
if new_value:
futures.append(
w.submit(self.process_transform, new_value, resource_set[r]))
for f in as_completed(futures):
if f.exception():
self.log.error(
"Exception renaming tag set \n %s" % (
f.exception()))
return resources
[docs]class UniversalTag(Tag):
"""Applies one or more tags to the specified resources.
"""
batch_size = 20
concurrency = 1
permissions = ('resourcegroupstaggingapi:TagResources',)
[docs] def process(self, resources):
self.id_key = self.manager.get_model().id
# Legacy
msg = self.data.get('msg')
msg = self.data.get('value') or msg
tag = self.data.get('tag', DEFAULT_TAG)
tag = self.data.get('key') or tag
# Support setting multiple tags in a single go with a mapping
tags = self.data.get('tags', {})
if msg:
tags[tag] = msg
batch_size = self.data.get('batch_size', self.batch_size)
client = self.get_client()
_common_tag_processer(
self.executor_factory, batch_size, self.concurrency, client,
self.process_resource_set, self.id_key, resources, tags, self.log)
[docs] def process_resource_set(self, client, resource_set, tags):
arns = self.manager.get_arns(resource_set)
return universal_retry(
client.tag_resources, ResourceARNList=arns, Tags=tags)
[docs] def get_client(self):
return utils.local_session(
self.manager.session_factory).client('resourcegroupstaggingapi')
[docs]class UniversalUntag(RemoveTag):
"""Removes the specified tags from the specified resources.
"""
batch_size = 20
concurrency = 1
permissions = ('resourcegroupstaggingapi:UntagResources',)
[docs] def get_client(self):
return utils.local_session(
self.manager.session_factory).client('resourcegroupstaggingapi')
[docs] def process_resource_set(self, client, resource_set, tag_keys):
arns = self.manager.get_arns(resource_set)
return universal_retry(
client.untag_resources, ResourceARNList=arns, TagKeys=tag_keys)
[docs]class UniversalTagDelayedAction(TagDelayedAction):
"""Tag resources for future action.
:example:
.. code-block :: yaml
policies:
- name: ec2-mark-stop
resource: ec2
filters:
- type: image-age
op: ge
days: 90
actions:
- type: mark-for-op
tag: custodian_cleanup
op: terminate
days: 4
"""
batch_size = 20
concurrency = 2
permissions = ('resourcegroupstaggingapi:TagResources',)
[docs] def process(self, resources):
self.tz = tzutil.gettz(
Time.TZ_ALIASES.get(self.data.get('tz', 'utc')))
self.id_key = self.manager.get_model().id
# Move this to policy? / no resources bypasses actions?
if not len(resources):
return
msg_tmpl = self.data.get('msg', self.default_template)
op = self.data.get('op', 'stop')
tag = self.data.get('tag', DEFAULT_TAG)
days = self.data.get('days', 0)
hours = self.data.get('hours', 0)
action_date = self.generate_timestamp(days, hours)
msg = msg_tmpl.format(
op=op, action_date=action_date)
self.log.info("Tagging %d resources for %s on %s" % (
len(resources), op, action_date))
tags = {tag: msg}
batch_size = self.data.get('batch_size', self.batch_size)
client = self.get_client()
_common_tag_processer(
self.executor_factory, batch_size, self.concurrency, client,
self.process_resource_set, self.id_key, resources, tags, self.log)
[docs] def process_resource_set(self, client, resource_set, tags):
arns = self.manager.get_arns(resource_set)
return universal_retry(
client.tag_resources, ResourceARNList=arns, Tags=tags)
[docs] def get_client(self):
return utils.local_session(
self.manager.session_factory).client('resourcegroupstaggingapi')
aws_resources.subscribe(
aws_resources.EVENT_REGISTER, CopyRelatedResourceTag.register_resources)
[docs]def universal_retry(method, ResourceARNList, **kw):
"""Retry support for resourcegroup tagging apis.
The resource group tagging api typically returns a 200 status code
with embedded resource specific errors. To enable resource specific
retry on throttles, we extract those, perform backoff w/ jitter and
continue. Other errors are immediately raised.
We do not aggregate unified resource responses across retries, only the
last successful response is returned for a subset of the resources if
a retry is performed.
"""
max_attempts = 6
for idx, delay in enumerate(
utils.backoff_delays(1.5, 2 ** 8, jitter=True)):
response = method(ResourceARNList=ResourceARNList, **kw)
failures = response.get('FailedResourcesMap', {})
if not failures:
return response
errors = {}
throttles = set()
for f_arn in failures:
error_code = failures[f_arn]['ErrorCode']
if error_code == 'ThrottlingException':
throttles.add(f_arn)
elif error_code == 'ResourceNotFoundException':
continue
else:
errors[f_arn] = error_code
if errors:
raise Exception("Resource Tag Errors %s" % (errors))
if idx == max_attempts - 1:
raise Exception("Resource Tag Throttled %s" % (", ".join(throttles)))
time.sleep(delay)
ResourceARNList = list(throttles)