Source code for c7n.filters.iamaccess

# Copyright 2016-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
IAM Resource Policy Checker
---------------------------

When securing resources with iam policies, we want to parse and evaluate
the resource's policy for any cross account or public access grants that
are not intended.

In general, iam policies can be complex, and where possible using iam
simulate is preferrable, but requires passing the caller's arn, which
is not feasible when we're evaluating who the valid set of callers
are.


References

- IAM Policy Evaluation
  https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_evaluation-logic.html

- IAM Policy Reference
  https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html

"""
from __future__ import absolute_import, division, print_function, unicode_literals

import fnmatch
import logging
import json

import six

from c7n.filters import Filter
from c7n.resolver import ValuesFrom
from c7n.utils import type_schema

log = logging.getLogger('custodian.iamaccess')


def _account(arn):
    # we could try except but some minor runtime cost, basically flag
    # invalids values
    if ':' not in arn:
        return arn
    return arn.split(':', 5)[4]


[docs]class PolicyChecker(object): """ checker_config: - check_actions: only check one of the specified actions - everyone_only: only check for wildcard permission grants - allowed_accounts: permission grants to these accounts are okay - whitelist_conditions: a list of conditions that are considered sufficient enough to whitelist the statement. """ def __init__(self, checker_config): self.checker_config = checker_config # Config properties @property def allowed_accounts(self): return self.checker_config.get('allowed_accounts', ()) @property def everyone_only(self): return self.checker_config.get('everyone_only', False) @property def check_actions(self): return self.checker_config.get('check_actions', ()) @property def whitelist_conditions(self): return self.checker_config.get('whitelist_conditions', ()) @property def allowed_vpce(self): return self.checker_config.get('allowed_vpce', ()) @property def allowed_vpc(self): return self.checker_config.get('allowed_vpc', ()) @property def allowed_orgid(self): return self.checker_config.get('allowed_orgid', ()) # Policy statement handling
[docs] def check(self, policy_text): if isinstance(policy_text, six.string_types): policy = json.loads(policy_text) else: policy = policy_text violations = [] for s in policy.get('Statement', ()): if self.handle_statement(s): violations.append(s) return violations
[docs] def handle_statement(self, s): if (all((self.handle_principal(s), self.handle_effect(s), self.handle_action(s))) and not self.handle_conditions(s)): return s
[docs] def handle_action(self, s): if self.check_actions: actions = s.get('Action') actions = isinstance(actions, six.string_types) and (actions,) or actions for a in actions: if fnmatch.filter(self.check_actions, a): return True return False return True
[docs] def handle_effect(self, s): if s['Effect'] == 'Allow': return True
[docs] def handle_principal(self, s): if 'NotPrincipal' in s: return True if 'Principal' not in s: return True # Skip service principals if 'Service' in s['Principal']: s['Principal'].pop('Service') if not s['Principal']: return False assert len(s['Principal']) == 1, "Too many principals %s" % s if isinstance(s['Principal'], six.string_types): p = s['Principal'] elif 'AWS' in s['Principal']: p = s['Principal']['AWS'] elif 'Federated' in s['Principal']: p = s['Principal']['Federated'] else: return True principal_ok = True p = isinstance(p, six.string_types) and (p,) or p for pid in p: if pid == '*': principal_ok = False elif self.everyone_only: continue elif pid.startswith('arn:aws:iam::cloudfront:user'): continue else: account_id = _account(pid) if account_id not in self.allowed_accounts: principal_ok = False return not principal_ok
[docs] def handle_conditions(self, s): conditions = self.normalize_conditions(s) if not conditions: return False results = [] for c in conditions: results.append(self.handle_condition(s, c)) return all(results)
[docs] def handle_condition(self, s, c): if not c['op']: return False if c['key'] in self.whitelist_conditions: return True handler_name = "handle_%s" % c['key'].replace('-', '_').replace(':', '_') handler = getattr(self, handler_name, None) if handler is None: log.warning("no handler:%s op:%s key:%s values:%s" % ( handler_name, c['op'], c['key'], c['values'])) return return not handler(s, c)
[docs] def normalize_conditions(self, s): s_cond = [] if 'Condition' not in s: return s_cond conditions = ( 'StringEquals', 'StringEqualsIgnoreCase', 'StringLike', 'ArnEquals', 'ArnLike', 'IpAddress', 'NotIpAddress') set_conditions = ('ForAllValues', 'ForAnyValues') for s_cond_op in list(s['Condition'].keys()): cond = {'op': s_cond_op} if s_cond_op not in conditions: if not any(s_cond_op.startswith(c) for c in set_conditions): continue cond['key'] = list(s['Condition'][s_cond_op].keys())[0] cond['values'] = s['Condition'][s_cond_op][cond['key']] cond['values'] = ( isinstance(cond['values'], six.string_types) and (cond['values'],) or cond['values']) cond['key'] = cond['key'].lower() s_cond.append(cond) return s_cond
# Condition handlers # kms specific
[docs] def handle_kms_calleraccount(self, s, c): return bool(set(map(_account, c['values'])).difference(self.allowed_accounts))
# sns default policy
[docs] def handle_aws_sourceowner(self, s, c): return bool(set(map(_account, c['values'])).difference(self.allowed_accounts))
# s3 logging
[docs] def handle_aws_sourcearn(self, s, c): return bool(set(map(_account, c['values'])).difference(self.allowed_accounts))
[docs] def handle_aws_sourceip(self, s, c): return False
[docs] def handle_aws_sourcevpce(self, s, c): if not self.allowed_vpce: return False return bool(set(map(_account, c['values'])).difference(self.allowed_vpce))
[docs] def handle_aws_sourcevpc(self, s, c): if not self.allowed_vpc: return False return bool(set(map(_account, c['values'])).difference(self.allowed_vpc))
[docs] def handle_aws_principalorgid(self, s, c): if not self.allowed_orgid: return False return bool(set(map(_account, c['values'])).difference(self.allowed_orgid))
[docs]class CrossAccountAccessFilter(Filter): """Check a resource's embedded iam policy for cross account access. """ schema = type_schema( 'cross-account', # only consider policies that grant one of the given actions. actions={'type': 'array', 'items': {'type': 'string'}}, # only consider policies which grant to * everyone_only={'type': 'boolean'}, # disregard statements using these conditions. whitelist_conditions={'type': 'array', 'items': {'type': 'string'}}, # white list accounts whitelist_from={'ref': '#/definitions/filters_common/value_from'}, whitelist={'type': 'array', 'items': {'type': 'string'}}, whitelist_orgids_from={'ref': '#/definitions/filters_common/value_from'}, whitelist_orgids={'type': 'array', 'items': {'type': 'string'}}, whitelist_vpce_from={'ref': '#/definitions/filters_common/value_from'}, whitelist_vpce={'type': 'array', 'items': {'type': 'string'}}, whitelist_vpc_from={'ref': '#/definitions/filters_common/value_from'}, whitelist_vpc={'type': 'array', 'items': {'type': 'string'}}) policy_attribute = 'Policy' annotation_key = 'CrossAccountViolations' checker_factory = PolicyChecker
[docs] def process(self, resources, event=None): self.everyone_only = self.data.get('everyone_only', False) self.conditions = set(self.data.get( 'whitelist_conditions', ("aws:userid", "aws:username"))) self.actions = self.data.get('actions', ()) self.accounts = self.get_accounts() self.vpcs = self.get_vpcs() self.vpces = self.get_vpces() self.orgid = self.get_orgids() self.checker_config = getattr(self, 'checker_config', None) or {} self.checker_config.update( {'allowed_accounts': self.accounts, 'allowed_vpc': self.vpcs, 'allowed_vpce': self.vpces, 'allowed_orgid': self.orgid, 'check_actions': self.actions, 'everyone_only': self.everyone_only, 'whitelist_conditions': self.conditions}) self.checker = self.checker_factory(self.checker_config) return super(CrossAccountAccessFilter, self).process(resources, event)
[docs] def get_accounts(self): owner_id = self.manager.config.account_id accounts = set(self.data.get('whitelist', ())) if 'whitelist_from' in self.data: values = ValuesFrom(self.data['whitelist_from'], self.manager) accounts = accounts.union(values.get_values()) accounts.add(owner_id) return accounts
[docs] def get_vpcs(self): vpc = set(self.data.get('whitelist_vpc', ())) if 'whitelist_vpc_from' in self.data: values = ValuesFrom(self.data['whitelist_vpc_from'], self.manager) vpc = vpc.union(values.get_values()) return vpc
[docs] def get_vpces(self): vpce = set(self.data.get('whitelist_vpce', ())) if 'whitelist_vpce_from' in self.data: values = ValuesFrom(self.data['whitelist_vpce_from'], self.manager) vpce = vpce.union(values.get_values()) return vpce
[docs] def get_orgids(self): org_ids = set(self.data.get('whitelist_orgids', ())) if 'whitelist_orgids_from' in self.data: values = ValuesFrom(self.data['whitelist_orgids_from'], self.manager) org_ids = org_ids.union(values.get_values()) return org_ids
[docs] def get_resource_policy(self, r): return r.get(self.policy_attribute, None)
def __call__(self, r): p = self.get_resource_policy(r) if p is None: return False violations = self.checker.check(p) if violations: r[self.annotation_key] = violations return True