# Copyright 2015-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
from botocore.exceptions import ClientError
import json
import logging
from c7n.actions import RemovePolicyBase, BaseAction
from c7n.filters import Filter, CrossAccountAccessFilter, ValueFilter
from c7n.manager import resources
from c7n.query import QueryResourceManager, RetryPageIterator
from c7n.utils import local_session, type_schema
from c7n.tags import universal_augment
log = logging.getLogger('custodian.kms')
[docs]@resources.register('kms')
class KeyAlias(QueryResourceManager):
[docs] class resource_type(object):
service = 'kms'
type = 'key-alias'
enum_spec = ('list_aliases', 'Aliases', None)
name = "AliasName"
id = "AliasArn"
dimension = None
filter_name = None
[docs] def augment(self, resources):
return [r for r in resources if 'TargetKeyId' in r]
[docs]@resources.register('kms-key')
class Key(QueryResourceManager):
[docs] class resource_type(object):
service = 'kms'
type = "key"
enum_spec = ('list_keys', 'Keys', None)
name = "KeyId"
id = "KeyArn"
dimension = None
filter_name = None
universal_taggable = True
[docs] def augment(self, resources):
client = local_session(self.session_factory).client('kms')
for r in resources:
try:
key_id = r.get('KeyArn')
info = client.describe_key(KeyId=key_id)['KeyMetadata']
r.update(info)
except ClientError as e:
if e.response['Error']['Code'] == 'AccessDeniedException':
self.log.warning(
"Access denied when describing key:%s",
key_id)
else:
raise
return universal_augment(self, resources)
[docs]@Key.filter_registry.register('key-rotation-status')
class KeyRotationStatus(ValueFilter):
"""Filters KMS keys by the rotation status
:example:
.. code-block:: yaml
policies:
- name: kms-key-disabled-rotation
resource: kms-key
filters:
- type: key-rotation-status
key: KeyRotationEnabled
value: false
"""
schema = type_schema('key-rotation-status', rinherit=ValueFilter.schema)
permissions = ('kms:GetKeyRotationStatus',)
[docs] def process(self, resources, event=None):
client = local_session(self.manager.session_factory).client('kms')
def _key_rotation_status(resource):
try:
resource['KeyRotationEnabled'] = client.get_key_rotation_status(
KeyId=resource['KeyId'])
except ClientError as e:
if e.response['Error']['Code'] == 'AccessDeniedException':
self.log.warning(
"Access denied when getting rotation status on key:%s",
resource.get('KeyArn'))
else:
raise
with self.executor_factory(max_workers=2) as w:
query_resources = [
r for r in resources if 'KeyRotationEnabled' not in r]
self.log.debug(
"Querying %d kms-keys' rotation status" % len(query_resources))
list(w.map(_key_rotation_status, query_resources))
return [r for r in resources if self.match(
r.get('KeyRotationEnabled', {}))]
[docs]@Key.filter_registry.register('cross-account')
@KeyAlias.filter_registry.register('cross-account')
class KMSCrossAccountAccessFilter(CrossAccountAccessFilter):
"""Filter KMS keys which have cross account permissions
:example:
.. code-block:: yaml
policies:
- name: kms-key-cross-account
resource: kms-key
filters:
- type: cross-account
"""
permissions = ('kms:GetKeyPolicy',)
[docs] def process(self, resources, event=None):
client = local_session(
self.manager.session_factory).client('kms')
def _augment(r):
key_id = r.get('TargetKeyId', r.get('KeyId'))
assert key_id, "Invalid key resources %s" % r
r['Policy'] = client.get_key_policy(
KeyId=key_id, PolicyName='default')['Policy']
return r
self.log.debug("fetching policy for %d kms keys" % len(resources))
with self.executor_factory(max_workers=1) as w:
resources = list(filter(None, w.map(_augment, resources)))
return super(KMSCrossAccountAccessFilter, self).process(
resources, event)
[docs]@KeyAlias.filter_registry.register('grant-count')
class GrantCount(Filter):
"""Filters KMS key grants
This can be used to ensure issues around grant limits are monitored
:example:
.. code-block:: yaml
policies:
- name: kms-grants
resource: kms
filters:
- type: grant-count
min: 100
"""
schema = type_schema(
'grant-count', min={'type': 'integer', 'minimum': 0})
permissions = ('kms:ListGrants',)
[docs] def process(self, keys, event=None):
client = local_session(self.manager.session_factory).client('kms')
results = []
for k in keys:
results.append(self.process_key(client, k))
return [r for r in results if r]
[docs] def process_key(self, client, key):
p = client.get_paginator('list_grants')
p.PAGE_ITERATOR_CLS = RetryPageIterator
grant_count = 0
for rp in p.paginate(KeyId=key['TargetKeyId']):
grant_count += len(rp['Grants'])
key['GrantCount'] = grant_count
grant_threshold = self.data.get('min', 5)
if grant_count < grant_threshold:
return None
self.manager.ctx.metrics.put_metric(
"ExtantGrants", grant_count, "Count",
Scope=key['AliasName'][6:])
return key
[docs]class ResourceKmsKeyAlias(ValueFilter):
schema = type_schema('kms-alias', rinherit=ValueFilter.schema)
[docs] def get_permissions(self):
return KeyAlias(self.manager.ctx, {}).get_permissions()
[docs] def get_matching_aliases(self, resources, event=None):
key_aliases = KeyAlias(self.manager.ctx, {}).resources()
key_aliases_dict = {a['TargetKeyId']: a for a in key_aliases}
matched = []
for r in resources:
if r.get('KmsKeyId'):
r['KeyAlias'] = key_aliases_dict.get(
r.get('KmsKeyId').split("key/", 1)[-1])
if self.match(r.get('KeyAlias')):
matched.append(r)
return matched
[docs]@Key.action_registry.register('remove-statements')
@KeyAlias.action_registry.register('remove-statements')
class RemovePolicyStatement(RemovePolicyBase):
"""Action to remove policy statements from KMS
:example:
.. code-block:: yaml
policies:
- name: kms-key-cross-account
resource: kms-key
filters:
- type: cross-account
actions:
- type: remove-statements
statement_ids: matched
"""
permissions = ('kms:GetKeyPolicy', 'kms:PutKeyPolicy')
[docs] def process(self, resources):
results = []
client = local_session(self.manager.session_factory).client('kms')
for r in resources:
key_id = r.get('TargetKeyId', r.get('KeyId'))
assert key_id, "Invalid key resources %s" % r
try:
results += filter(None, [self.process_resource(client, r, key_id)])
except Exception:
self.log.exception(
"Error processing sns:%s", key_id)
return results
[docs] def process_resource(self, client, resource, key_id):
if 'Policy' not in resource:
try:
resource['Policy'] = client.get_key_policy(
KeyId=key_id, PolicyName='default')['Policy']
except ClientError as e:
if e.response['Error']['Code'] != "NotFoundException":
raise
resource['Policy'] = None
if not resource['Policy']:
return
p = json.loads(resource['Policy'])
statements, found = self.process_policy(
p, resource, CrossAccountAccessFilter.annotation_key)
if not found:
return
# NB: KMS supports only one key policy 'default'
# http://docs.aws.amazon.com/kms/latest/developerguide/programming-key-policies.html#list-policies
client.put_key_policy(
KeyId=key_id,
PolicyName='default',
Policy=json.dumps(p)
)
return {'Name': key_id,
'State': 'PolicyRemoved',
'Statements': found}
[docs]@Key.action_registry.register('set-rotation')
class KmsKeyRotation(BaseAction):
"""Toggle KMS key rotation
:example:
.. code-block: yaml
policy:
- name: enable-cmk-rotation
resource: kms-key
filters:
- type: key-rotation-status
key: KeyRotationEnabled
value: False
actions:
- type: set-rotation
state: True
"""
permissions = ('kms:EnableKeyRotation',)
schema = type_schema('set-rotation', state={'type': 'boolean'})
[docs] def process(self, keys):
client = local_session(self.manager.session_factory).client('kms')
for k in keys:
if self.data.get('state', True):
client.enable_key_rotation(KeyId=k['KeyId'])
continue
client.disable_key_rotation(KeyId=k['KeyId'])