# Copyright 2016-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
from c7n.exceptions import PolicyValidationError
from c7n.utils import local_session, type_schema
from .core import Filter, ValueFilter
from .related import RelatedResourceFilter
[docs]class MatchResourceValidator(object):
[docs] def validate(self):
if self.data.get('match-resource'):
self.required_keys = set('key',)
return super(MatchResourceValidator, self).validate()
[docs]class SecurityGroupFilter(MatchResourceValidator, RelatedResourceFilter):
"""Filter a resource by its associated security groups."""
schema = type_schema(
'security-group', rinherit=ValueFilter.schema,
**{'match-resource': {'type': 'boolean'},
'operator': {'enum': ['and', 'or']}})
schema_alias = True
RelatedResource = "c7n.resources.vpc.SecurityGroup"
AnnotationKey = "matched-security-groups"
[docs]class SubnetFilter(MatchResourceValidator, RelatedResourceFilter):
"""Filter a resource by its associated subnets."""
schema = type_schema(
'subnet', rinherit=ValueFilter.schema,
**{'match-resource': {'type': 'boolean'},
'operator': {'enum': ['and', 'or']}})
schema_alias = True
RelatedResource = "c7n.resources.vpc.Subnet"
AnnotationKey = "matched-subnets"
[docs]class VpcFilter(MatchResourceValidator, RelatedResourceFilter):
"""Filter a resource by its associated vpc."""
schema = type_schema(
'vpc', rinherit=ValueFilter.schema,
**{'match-resource': {'type': 'boolean'},
'operator': {'enum': ['and', 'or']}})
schema_alias = True
RelatedResource = "c7n.resources.vpc.Vpc"
AnnotationKey = "matched-vpcs"
[docs]class DefaultVpcBase(Filter):
"""Filter to resources in a default vpc."""
vpcs = None
default_vpc = None
permissions = ('ec2:DescribeVpcs',)
[docs] def match(self, vpc_id):
if self.default_vpc is None:
self.log.debug("querying default vpc %s" % vpc_id)
client = local_session(self.manager.session_factory).client('ec2')
vpcs = [v['VpcId'] for v
in client.describe_vpcs()['Vpcs']
if v['IsDefault']]
if vpcs:
self.default_vpc = vpcs.pop()
return vpc_id == self.default_vpc and True or False
[docs]class NetworkLocation(Filter):
"""On a network attached resource, determine intersection of
security-group attributes, subnet attributes, and resource attributes.
The use case is a bit specialized, for most use cases using `subnet`
and `security-group` filters suffice. but say for example you wanted to
verify that an ec2 instance was only using subnets and security groups
with a given tag value, and that tag was not present on the resource.
:Example:
.. code-block:: yaml
policies:
- name: ec2-mismatched-sg-remove
resource: ec2
filters:
- type: network-location
compare: ["resource","security-group"]
key: "tag:TEAM_NAME"
ignore:
- "tag:TEAM_NAME": Enterprise
actions:
- type: modify-security-groups
remove: network-location
isolation-group: sg-xxxxxxxx
"""
schema = type_schema(
'network-location',
**{'missing-ok': {
'type': 'boolean',
'default': False,
'description': (
"How to handle missing keys on elements, by default this causes"
"resources to be considered not-equal")},
'match': {'type': 'string', 'enum': ['equal', 'not-equal'],
'default': 'non-equal'},
'compare': {
'type': 'array',
'description': (
'Which elements of network location should be considered when'
' matching.'),
'default': ['resource', 'subnet', 'security-group'],
'items': {
'enum': ['resource', 'subnet', 'security-group']}},
'key': {
'type': 'string',
'description': 'The attribute expression that should be matched on'},
'max-cardinality': {
'type': 'integer', 'default': 1,
'title': ''},
'ignore': {'type': 'array', 'items': {'type': 'object'}},
'required': ['key'],
})
schema_alias = True
permissions = ('ec2:DescribeSecurityGroups', 'ec2:DescribeSubnets')
[docs] def validate(self):
rfilters = self.manager.filter_registry.keys()
if 'subnet' not in rfilters:
raise PolicyValidationError(
"network-location requires resource subnet filter availability on %s" % (
self.manager.data))
if 'security-group' not in rfilters:
raise PolicyValidationError(
"network-location requires resource security-group filter availability on %s" % (
self.manager.data))
return self
[docs] def process(self, resources, event=None):
self.sg = self.manager.filter_registry.get('security-group')({}, self.manager)
related_sg = self.sg.get_related(resources)
self.subnet = self.manager.filter_registry.get('subnet')({}, self.manager)
related_subnet = self.subnet.get_related(resources)
self.sg_model = self.manager.get_resource_manager('security-group').get_model()
self.subnet_model = self.manager.get_resource_manager('subnet').get_model()
self.vf = self.manager.filter_registry.get('value')({}, self.manager)
# filter options
key = self.data.get('key')
self.compare = self.data.get('compare', ['subnet', 'security-group', 'resource'])
self.max_cardinality = self.data.get('max-cardinality', 1)
self.match = self.data.get('match', 'not-equal')
self.missing_ok = self.data.get('missing-ok', False)
results = []
for r in resources:
resource_sgs = self.filter_ignored(
[related_sg[sid] for sid in self.sg.get_related_ids([r])])
resource_subnets = self.filter_ignored([
related_subnet[sid] for sid in self.subnet.get_related_ids([r])])
found = self.process_resource(r, resource_sgs, resource_subnets, key)
if found:
results.append(found)
return results
[docs] def filter_ignored(self, resources):
ignores = self.data.get('ignore', ())
results = []
for r in resources:
found = False
for i in ignores:
for k, v in i.items():
if self.vf.get_resource_value(k, r) == v:
found = True
if found is True:
break
if found is True:
continue
results.append(r)
return results
[docs] def process_resource(self, r, resource_sgs, resource_subnets, key):
evaluation = []
sg_space = set()
subnet_space = set()
if 'subnet' in self.compare and resource_subnets:
subnet_values = {
rsub[self.subnet_model.id]: self.subnet.get_resource_value(key, rsub)
for rsub in resource_subnets}
if not self.missing_ok and None in subnet_values.values():
evaluation.append({
'reason': 'SubnetLocationAbsent',
'subnets': subnet_values})
subnet_space = set(filter(None, subnet_values.values()))
if len(subnet_space) > self.max_cardinality:
evaluation.append({
'reason': 'SubnetLocationCardinality',
'subnets': subnet_values})
if 'security-group' in self.compare and resource_sgs:
sg_values = {
rsg[self.sg_model.id]: self.sg.get_resource_value(key, rsg)
for rsg in resource_sgs}
if not self.missing_ok and None in sg_values.values():
evaluation.append({
'reason': 'SecurityGroupLocationAbsent',
'security-groups': sg_values})
sg_space = set(filter(None, sg_values.values()))
if len(sg_space) > self.max_cardinality:
evaluation.append({
'reason': 'SecurityGroupLocationCardinality',
'security-groups': sg_values})
if ('subnet' in self.compare and
'security-group' in self.compare and
sg_space != subnet_space):
evaluation.append({
'reason': 'LocationMismatch',
'subnets': subnet_values,
'security-groups': sg_values})
if 'resource' in self.compare:
r_value = self.vf.get_resource_value(key, r)
if not self.missing_ok and r_value is None:
evaluation.append({
'reason': 'ResourceLocationAbsent',
'resource': r_value})
elif 'security-group' in self.compare and resource_sgs and r_value not in sg_space:
evaluation.append({
'reason': 'ResourceLocationMismatch',
'resource': r_value,
'security-groups': sg_values})
elif 'subnet' in self.compare and resource_subnets and r_value not in subnet_space:
evaluation.append({
'reason': 'ResourceLocationMismatch',
'resource': r_value,
'subnet': subnet_values})
if 'security-group' in self.compare and resource_sgs:
mismatched_sgs = {sg_id: sg_value
for sg_id, sg_value in sg_values.items()
if sg_value != r_value}
if mismatched_sgs:
evaluation.append({
'reason': 'SecurityGroupMismatch',
'resource': r_value,
'security-groups': mismatched_sgs})
if evaluation and self.match == 'not-equal':
r['c7n:NetworkLocation'] = evaluation
return r
elif not evaluation and self.match == 'equal':
return r