Source code for c7n.resources.kinesis

# Copyright 2016-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals

import jmespath

from c7n.actions import Action
from c7n.manager import resources
from c7n.query import QueryResourceManager
from c7n.tags import universal_augment
from c7n.utils import local_session, type_schema, get_retry


[docs]@resources.register('kinesis') class KinesisStream(QueryResourceManager): retry = staticmethod( get_retry(( 'LimitExceededException',)))
[docs] class resource_type(object): service = 'kinesis' type = 'stream' enum_spec = ('list_streams', 'StreamNames', None) detail_spec = ( 'describe_stream', 'StreamName', None, 'StreamDescription') name = id = 'StreamName' filter_name = None filter_type = None date = None dimension = 'StreamName' universal_taggable = True
[docs] def augment(self, resources): return universal_augment( self, super(KinesisStream, self).augment(resources))
[docs]@KinesisStream.action_registry.register('encrypt') class Encrypt(Action): schema = type_schema('encrypt', key={'type': 'string'}, required=('key',)) permissions = ("kinesis:UpdateStream",)
[docs] def process(self, resources): # get KeyId key = "alias/" + self.data.get('key') self.key_id = local_session(self.manager.session_factory).client( 'kms').describe_key(KeyId=key)['KeyMetadata']['KeyId'] client = local_session(self.manager.session_factory).client('kinesis') for r in resources: if not r['StreamStatus'] == 'ACTIVE': continue client.start_stream_encryption( StreamName=r['StreamName'], EncryptionType='KMS', KeyId=self.key_id )
[docs]@KinesisStream.action_registry.register('delete') class Delete(Action): schema = type_schema('delete') permissions = ("kinesis:DeleteStream",)
[docs] def process(self, resources): client = local_session(self.manager.session_factory).client('kinesis') not_active = [r['StreamName'] for r in resources if r['StreamStatus'] != 'ACTIVE'] self.log.warning( "The following streams cannot be deleted (wrong state): %s" % ( ", ".join(not_active))) for r in resources: if not r['StreamStatus'] == 'ACTIVE': continue client.delete_stream( StreamName=r['StreamName'])
[docs]@resources.register('firehose') class DeliveryStream(QueryResourceManager):
[docs] class resource_type(object): service = 'firehose' type = 'deliverystream' enum_spec = ('list_delivery_streams', 'DeliveryStreamNames', None) detail_spec = ( 'describe_delivery_stream', 'DeliveryStreamName', None, 'DeliveryStreamDescription') name = id = 'DeliveryStreamName' filter_name = None filter_type = None date = 'CreateTimestamp' dimension = 'DeliveryStreamName'
[docs]@DeliveryStream.action_registry.register('delete') class FirehoseDelete(Action): schema = type_schema('delete') permissions = ("firehose:DeleteDeliveryStream",)
[docs] def process(self, resources): client = local_session(self.manager.session_factory).client('firehose') creating = [r['DeliveryStreamName'] for r in resources if r['DeliveryStreamStatus'] == 'CREATING'] if creating: self.log.warning( "These delivery streams can't be deleted (wrong state): %s" % ( ", ".join(creating))) for r in resources: if not r['DeliveryStreamStatus'] == 'ACTIVE': continue client.delete_delivery_stream( DeliveryStreamName=r['DeliveryStreamName'])
[docs]@DeliveryStream.action_registry.register('encrypt-s3-destination') class FirehoseEncryptS3Destination(Action): """Action to set encryption key a Firehose S3 destination :example: .. code-block:: yaml policies: - name: encrypt-s3-destination resource: firehose filters: - KmsMasterKeyId: absent actions: - type: encrypt-s3-destination key_arn: <arn of KMS key/alias> """ schema = type_schema( 'encrypt-s3-destination', key_arn={'type': 'string'}, required=('key_arn',)) permissions = ("firehose:UpdateDestination",) DEST_MD = { 'SplunkDestinationDescription': { 'update': 'SplunkDestinationUpdate', 'clear': ['S3BackupMode'], 'encrypt_path': 'S3DestinationDescription.EncryptionConfiguration', 'remap': [('S3DestinationDescription', 'S3Update')] }, 'ElasticsearchDestinationDescription': { 'update': 'ElasticsearchDestinationUpdate', 'clear': ['S3BackupMode'], 'encrypt_path': 'S3DestinationDescription.EncryptionConfiguration', 'remap': [('S3DestinationDescription', 'S3Update')], }, 'ExtendedS3DestinationDescription': { 'update': 'ExtendedS3DestinationUpdate', 'clear': ['S3BackupMode'], 'encrypt_path': 'EncryptionConfiguration', 'remap': [] }, 'RedshiftDestinationDescription': { 'update': 'RedshiftDestinationUpdate', 'clear': ['S3BackupMode', "ClusterJDBCURL", "CopyCommand", "Username"], 'encrypt_path': 'S3DestinationDescription.EncryptionConfiguration', 'remap': [('S3DestinationDescription', 'S3Update')] }, }
[docs] def process(self, resources): client = local_session(self.manager.session_factory).client('firehose') key = self.data.get('key_arn') for r in resources: if not r['DeliveryStreamStatus'] == 'ACTIVE': continue version = r['VersionId'] name = r['DeliveryStreamName'] d = r['Destinations'][0] destination_id = d['DestinationId'] for dtype, dmetadata in self.DEST_MD.items(): if dtype not in d: continue dinfo = d[dtype] for k in dmetadata['clear']: dinfo.pop(k, None) if dmetadata['encrypt_path']: encrypt_info = jmespath.search(dmetadata['encrypt_path'], dinfo) else: encrypt_info = dinfo encrypt_info.pop('NoEncryptionConfig', None) encrypt_info['KMSEncryptionConfig'] = {'AWSKMSKeyARN': key} for old_k, new_k in dmetadata['remap']: if old_k in dinfo: dinfo[new_k] = dinfo.pop(old_k) params = dict(DeliveryStreamName=name, DestinationId=destination_id, CurrentDeliveryStreamVersionId=version) params[dmetadata['update']] = dinfo client.update_destination(**params)
[docs]@resources.register('kinesis-analytics') class AnalyticsApp(QueryResourceManager):
[docs] class resource_type(object): service = "kinesisanalytics" enum_spec = ('list_applications', 'ApplicationSummaries', None) detail_spec = ('describe_application', 'ApplicationName', 'ApplicationName', 'ApplicationDetail') name = "ApplicationName" arn = id = "ApplicationARN" dimension = None filter_name = None filter_type = None
[docs]@AnalyticsApp.action_registry.register('delete') class AppDelete(Action): schema = type_schema('delete') permissions = ("kinesisanalytics:DeleteApplication",)
[docs] def process(self, resources): client = local_session( self.manager.session_factory).client('kinesisanalytics') for r in resources: client.delete_application( ApplicationName=r['ApplicationName'], CreateTimestamp=r['CreateTimestamp'])