Source code for c7n.resources.emr

# Copyright 2016-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals

import logging
import time

import six

from c7n.actions import ActionRegistry, BaseAction
from c7n.exceptions import PolicyValidationError
from c7n.filters import FilterRegistry, MetricsFilter
from c7n.manager import resources
from c7n.query import QueryResourceManager
from c7n.utils import (
    local_session, type_schema, get_retry)
from c7n.tags import (
    TagDelayedAction, RemoveTag, TagActionFilter, Tag)

filters = FilterRegistry('emr.filters')
actions = ActionRegistry('emr.actions')
log = logging.getLogger('custodian.emr')

filters.register('marked-for-op', TagActionFilter)


[docs]@resources.register('emr') class EMRCluster(QueryResourceManager): """Resource manager for Elastic MapReduce clusters """
[docs] class resource_type(object): service = 'emr' type = 'emr' cluster_states = ['WAITING', 'BOOTSTRAPPING', 'RUNNING', 'STARTING'] enum_spec = ('list_clusters', 'Clusters', {'ClusterStates': cluster_states}) name = 'Name' id = 'Id' date = "Status.Timeline.CreationDateTime" filter_name = None dimension = None
action_registry = actions filter_registry = filters retry = staticmethod(get_retry(('ThrottlingException',))) def __init__(self, ctx, data): super(EMRCluster, self).__init__(ctx, data) self.queries = QueryFilter.parse( self.data.get('query', [ {'ClusterStates': [ 'running', 'bootstrapping', 'waiting']}]))
[docs] @classmethod def get_permissions(cls): return ("elasticmapreduce:ListClusters", "elasticmapreduce:DescribeCluster")
[docs] def get_resources(self, ids): # no filtering by id set supported at the api client = local_session(self.session_factory).client('emr') results = [] for jid in ids: results.append( client.describe_cluster(ClusterId=jid)['Cluster']) return results
[docs] def resources(self, query=None): q = self.consolidate_query_filter() if q is not None: query = query or {} for i in range(0, len(q)): query[q[i]['Name']] = q[i]['Values'] return super(EMRCluster, self).resources(query=query)
[docs] def consolidate_query_filter(self): result = [] names = set() # allow same name to be specified multiple times and append the queries # under the same name for q in self.queries: query_filter = q.query() if query_filter['Name'] in names: for filt in result: if query_filter['Name'] == filt['Name']: filt['Values'].extend(query_filter['Values']) else: names.add(query_filter['Name']) result.append(query_filter) if 'ClusterStates' not in names: # include default query result.append( { 'Name': 'ClusterStates', 'Values': ['WAITING', 'RUNNING', 'BOOTSTRAPPING'], } ) return result
[docs] def augment(self, resources): client = local_session( self.get_resource_manager('emr').session_factory).client('emr') result = [] # remap for cwmetrics for r in resources: cluster = self.retry( client.describe_cluster, ClusterId=r['Id'])['Cluster'] result.append(cluster) return result
[docs]@EMRCluster.filter_registry.register('metrics') class EMRMetrics(MetricsFilter):
[docs] def get_dimensions(self, resource): # Job flow id is legacy name for cluster id return [{'Name': 'JobFlowId', 'Value': resource['Id']}]
[docs]@actions.register('mark-for-op') class TagDelayedAction(TagDelayedAction): """Action to specify an action to occur at a later date :example: .. code-block:: yaml policies: - name: emr-mark-for-op resource: emr filters: - "tag:Name": absent actions: - type: mark-for-op tag: custodian_cleanup op: terminate days: 4 msg: "Cluster does not have required tags" """
[docs]@actions.register('tag') class TagTable(Tag): """Action to create tag(s) on a resource :example: .. code-block:: yaml policies: - name: emr-tag-table resource: emr filters: - "tag:target-tag": absent actions: - type: tag key: target-tag value: target-tag-value """ permissions = ('elasticmapreduce:AddTags',) batch_size = 1 retry = staticmethod(get_retry(('ThrottlingException',)))
[docs] def process_resource_set(self, client, resources, tags): for r in resources: self.retry(client.add_tags, ResourceId=r['Id'], Tags=tags)
[docs]@actions.register('remove-tag') class UntagTable(RemoveTag): """Action to remove tag(s) on a resource :example: .. code-block:: yaml policies: - name: emr-remove-tag resource: emr filters: - "tag:target-tag": present actions: - type: remove-tag tags: ["target-tag"] """ concurrency = 2 batch_size = 5 permissions = ('elasticmapreduce:RemoveTags',)
[docs] def process_resource_set(self, client, resources, tag_keys): for r in resources: client.remove_tags(ResourceId=r['Id'], TagKeys=tag_keys)
[docs]@actions.register('terminate') class Terminate(BaseAction): """Action to terminate EMR cluster(s) It is recommended to apply a filter to the terminate action to avoid termination of all EMR clusters :example: .. code-block:: yaml policies: - name: emr-terminate resource: emr query: - ClusterStates: [STARTING, BOOTSTRAPPING, RUNNING, WAITING] actions: - terminate """ schema = type_schema('terminate', force={'type': 'boolean'}) permissions = ("elasticmapreduce:TerminateJobFlows",) delay = 5
[docs] def process(self, emrs): client = local_session(self.manager.session_factory).client('emr') cluster_ids = [emr['Id'] for emr in emrs] if self.data.get('force'): client.set_termination_protection( JobFlowIds=cluster_ids, TerminationProtected=False) time.sleep(self.delay) client.terminate_job_flows(JobFlowIds=cluster_ids) self.log.info("Deleted emrs: %s", cluster_ids) return emrs
# Valid EMR Query Filters EMR_VALID_FILTERS = set(('CreatedAfter', 'CreatedBefore', 'ClusterStates'))
[docs]class QueryFilter(object):
[docs] @classmethod def parse(cls, data): results = [] for d in data: if not isinstance(d, dict): raise PolicyValidationError( "EMR Query Filter Invalid structure %s" % d) results.append(cls(d).validate()) return results
def __init__(self, data): self.data = data self.key = None self.value = None
[docs] def validate(self): if not len(list(self.data.keys())) == 1: raise PolicyValidationError( "EMR Query Filter Invalid %s" % self.data) self.key = list(self.data.keys())[0] self.value = list(self.data.values())[0] if self.key not in EMR_VALID_FILTERS and not self.key.startswith( 'tag:'): raise PolicyValidationError( "EMR Query Filter invalid filter name %s" % (self.data)) if self.value is None: raise PolicyValidationError( "EMR Query Filters must have a value, use tag-key" " w/ tag name as value for tag present checks" " %s" % self.data) return self
[docs] def query(self): value = self.value if isinstance(self.value, six.string_types): value = [self.value] return {'Name': self.key, 'Values': value}