Source code for c7n.resources.datapipeline

# Copyright 2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Data Pipeline
"""
from __future__ import absolute_import, division, print_function, unicode_literals

from botocore.exceptions import ClientError

from c7n.actions import BaseAction
from c7n.filters import FilterRegistry
from c7n.manager import resources
from c7n.query import QueryResourceManager
from c7n.utils import chunks, local_session, get_retry, type_schema
from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction


filters = FilterRegistry('datapipeline.filters')
filters.register('marked-for-op', TagActionFilter)


[docs]@resources.register('datapipeline') class DataPipeline(QueryResourceManager): retry = staticmethod(get_retry(('Throttled',))) filter_registry = filters
[docs] class resource_type(object): service = 'datapipeline' type = 'dataPipeline' id = 'id' name = 'name' date = None dimension = 'name' batch_detail_spec = ( 'describe_pipeline', 'pipelineIds', 'id', 'pipelineDescriptionList', None) enum_spec = ('list_pipelines', 'pipelineIdList', None) filter_name = None
[docs] def augment(self, resources): filter(None, _datapipeline_info( resources, self.session_factory, self.executor_factory, self.retry)) return resources
def _datapipeline_info(pipes, session_factory, executor_factory, retry): client = local_session(session_factory).client('datapipeline') def process_tags(pipe_set): pipe_map = {pipe['id']: pipe for pipe in pipe_set} while True: try: results = retry( client.describe_pipelines, pipelineIds=list(pipe_map.keys())) break except ClientError as e: if e.response['Error']['Code'] != 'PipelineNotFound': raise msg = e.response['Error']['Message'] _, lb_name = msg.strip().rsplit(' ', 1) pipe_map.pop(lb_name) if not pipe_map: results = {'TagDescriptions': []} break continue for pipe_desc in results['pipelineDescriptionList']: pipe = pipe_map[pipe_desc['pipelineId']] pipe['Tags'] = [ {'Key': t['key'], 'Value': t['value']} for t in pipe_desc['tags']] for field in pipe_desc['fields']: key = field['key'] if not key.startswith('@'): continue pipe[key[1:]] = field['stringValue'] with executor_factory(max_workers=2) as w: return list(w.map(process_tags, chunks(pipes, 20)))
[docs]@DataPipeline.action_registry.register('delete') class Delete(BaseAction): """Action to delete DataPipeline It is recommended to use a filter to avoid unwanted deletion of DataPipeline :example: .. code-block:: yaml policies: - name: datapipeline-delete resource: datapipeline actions: - delete """ schema = type_schema('delete') permissions = ("datapipeline:DeletePipeline",)
[docs] def process(self, pipelines): client = local_session( self.manager.session_factory).client('datapipeline') for p in pipelines: try: client.delete_pipeline(pipelineId=p['id']) except client.exceptions.PipelineNotFoundException: continue
[docs]@DataPipeline.action_registry.register('mark-for-op') class MarkForOpPipeline(TagDelayedAction): """Action to specify an action to occur at a later date :example: .. code-block:: yaml policies: - name: pipeline-delete-unused resource: datapipeline filters: - "tag:custodian_cleanup": absent actions: - type: mark-for-op tag: custodian_cleanup msg: "Unused data pipeline: {op}@{action_date}" op: delete days: 7 """
[docs]@DataPipeline.action_registry.register('tag') class TagPipeline(Tag): """Action to create tag(s) on a pipeline :example: .. code-block:: yaml policies: - name: tag-pipeline resource: datapipeline filters: - "tag:target-tag": absent actions: - type: tag key: target-tag value: target-tag-value """ permissions = ('datapipeline:AddTags',)
[docs] def process_resource_set(self, client, pipelines, tags): tag_array = [dict(key=t['Key'], value=t['Value']) for t in tags] for pipeline in pipelines: try: client.add_tags(pipelineId=pipeline['id'], tags=tag_array) except (client.exceptions.PipelineDeletedException, client.exceptions.PipelineNotFoundException): continue
[docs]@DataPipeline.action_registry.register('remove-tag') class UntagPipeline(RemoveTag): """Action to remove tag(s) on a pipeline :example: .. code-block:: yaml policies: - name: pipeline-remove-tag resource: datapipeline filters: - "tag:OutdatedTag": present actions: - type: remove-tag tags: ["OutdatedTag"] """ permissions = ('datapipeline:RemoveTags',)
[docs] def process_resource_set(self, client, pipelines, tags): for pipeline in pipelines: try: client.remove_tags(pipelineId=pipeline['id'], tagKeys=tags) except (client.exceptions.PipelineDeletedException, client.exceptions.PipelineNotFoundException): continue