Source code for c7n.resources.sqs

# Copyright 2016-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals

from botocore.exceptions import ClientError

import json

from c7n.actions import RemovePolicyBase
from c7n.filters import CrossAccountAccessFilter, MetricsFilter
from c7n.filters.kms import KmsRelatedFilter
from c7n.manager import resources
from c7n.utils import local_session
from c7n.query import QueryResourceManager
from c7n.actions import BaseAction
from c7n.utils import type_schema
from c7n.tags import universal_augment, register_universal_tags


[docs]@resources.register('sqs') class SQS(QueryResourceManager):
[docs] class resource_type(object): service = 'sqs' type = None # type = 'queue' enum_spec = ('list_queues', 'QueueUrls', None) detail_spec = ("get_queue_attributes", "QueueUrl", None, "Attributes") id = 'QueueUrl' arn = "QueueArn" filter_name = 'QueueNamePrefix' filter_type = 'scalar' name = 'QueueUrl' date = 'CreatedTimestamp' dimension = 'QueueName' default_report_fields = ( 'QueueArn', 'CreatedTimestamp', 'ApproximateNumberOfMessages', )
[docs] def get_permissions(self): perms = super(SQS, self).get_permissions() perms.append('sqs:GetQueueAttributes') return perms
[docs] def get_resources(self, ids, cache=True): ids_normalized = [] for i in ids: if not i.startswith('https://'): ids_normalized.append(i) continue ids_normalized.append(i.rsplit('/', 1)[-1]) return super(SQS, self).get_resources(ids_normalized, cache)
[docs] def augment(self, resources): client = local_session(self.session_factory).client('sqs') def _augment(r): try: queue = self.retry( client.get_queue_attributes, QueueUrl=r, AttributeNames=['All'])['Attributes'] queue['QueueUrl'] = r except ClientError as e: if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue': return if e.response['Error']['Code'] == 'AccessDenied': self.log.warning("Denied access to sqs %s" % r) return raise return queue with self.executor_factory(max_workers=2) as w: return universal_augment( self, list(filter(None, w.map(_augment, resources))))
register_universal_tags( SQS.filter_registry, SQS.action_registry, compatibility=False)
[docs]@SQS.filter_registry.register('metrics') class MetricsFilter(MetricsFilter):
[docs] def get_dimensions(self, resource): return [ {'Name': 'QueueName', 'Value': resource['QueueUrl'].rsplit('/', 1)[-1]}]
[docs]@SQS.filter_registry.register('cross-account') class SQSCrossAccount(CrossAccountAccessFilter): """Filter SQS queues which have cross account permissions :example: .. code-block:: yaml policies: - name: sqs-cross-account resource: sqs filters: - type: cross-account """ permissions = ('sqs:GetQueueAttributes',)
[docs]@SQS.filter_registry.register('kms-key') class KmsFilter(KmsRelatedFilter): """ Filter a resource by its associcated kms key and optionally the aliasname of the kms key by using 'c7n:AliasName' The KmsMasterId returned for SQS sometimes has the alias name directly in the value. :example: .. code-block:: yaml policies: - name: efs-kms-key-filters resource: efs filters: - or: - type: value key: KmsMasterKeyId value: "^(alias/aws/)" op: regex - type: kms-key key: c7n:AliasName value: "^(alias/aws/)" op: regex """ RelatedIdsExpression = 'KmsMasterKeyId'
[docs]@SQS.action_registry.register('remove-statements') class RemovePolicyStatement(RemovePolicyBase): """Action to remove policy statements from SQS :example: .. code-block:: yaml policies: - name: sqs-cross-account resource: sqs filters: - type: cross-account actions: - type: remove-statements statement_ids: matched """ permissions = ('sqs:GetQueueAttributes', 'sqs:RemovePermission')
[docs] def process(self, resources): results = [] client = local_session(self.manager.session_factory).client('sqs') for r in resources: try: results += filter(None, [self.process_resource(client, r)]) except Exception: self.log.exception( "Error processing sqs:%s", r['QueueUrl']) return results
[docs] def process_resource(self, client, resource): p = resource.get('Policy') if p is None: return p = json.loads(resource['Policy']) statements, found = self.process_policy( p, resource, CrossAccountAccessFilter.annotation_key) if not found: return for f in found: client.remove_permission( QueueUrl=resource['QueueUrl'], Label=f['Sid']) return {'Name': resource['QueueUrl'], 'State': 'PolicyRemoved', 'Statements': found}
[docs]@SQS.action_registry.register('delete') class DeleteSqsQueue(BaseAction): """Action to delete a SQS queue To prevent unwanted deletion of SQS queues, it is recommended to include a filter :example: .. code-block:: yaml policies: - name: sqs-delete resource: sqs filters: - KmsMasterKeyId: absent actions: - type: delete """ schema = type_schema('delete') permissions = ('sqs:DeleteQueue',)
[docs] def process(self, queues): client = local_session(self.manager.session_factory).client('sqs') for q in queues: self.process_queue(client, q)
[docs] def process_queue(self, client, queue): try: client.delete_queue(QueueUrl=queue['QueueUrl']) except (client.exceptions.QueueDoesNotExist, client.exceptions.QueueDeletedRecently): pass
[docs]@SQS.action_registry.register('set-encryption') class SetEncryption(BaseAction): """Action to set encryption key on SQS queue :example: .. code-block:: yaml policies: - name: sqs-set-encrypt resource: sqs filters: - KmsMasterKeyId: absent actions: - type: set-encryption key: "<alias of kms key>" """ schema = type_schema( 'set-encryption', key={'type': 'string'}, required=('key',)) permissions = ('sqs:SetQueueAttributes',)
[docs] def process(self, queues): # get KeyId key = "alias/" + self.data.get('key') session = local_session(self.manager.session_factory) key_id = session.client( 'kms').describe_key(KeyId=key)['KeyMetadata']['KeyId'] client = session.client('sqs') for q in queues: self.process_queue(client, q, key_id)
[docs] def process_queue(self, client, queue, key_id): try: client.set_queue_attributes( QueueUrl=queue['QueueUrl'], Attributes={'KmsMasterKeyId': key_id} ) except (client.exceptions.QueueDoesNotExist,) as e: self.log.exception( "Exception modifying queue:\n %s" % e)
[docs]@SQS.action_registry.register('set-retention-period') class SetRetentionPeriod(BaseAction): """Action to set the retention period on an SQS queue (in seconds) :example: .. code-block:: yaml policies: - name: sqs-reduce-long-retention-period resource: sqs filters: - type: value key: MessageRetentionPeriod value_type: integer value: 345600 op: ge actions: - type: set-retention-period period: 86400 """ schema = type_schema( 'set-retention-period', period={'type': 'integer', 'minimum': 60, 'maximum': 1209600}) permissions = ('sqs:SetQueueAttributes',)
[docs] def process(self, queues): client = local_session(self.manager.session_factory).client('sqs') period = str(self.data.get('period', 345600)) for q in queues: client.set_queue_attributes( QueueUrl=q['QueueUrl'], Attributes={ 'MessageRetentionPeriod': period})