# Copyright 2016-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
import itertools
import operator
import zlib
import jmespath
from c7n.actions import BaseAction, ModifyVpcSecurityGroupsAction
from c7n.exceptions import PolicyValidationError, ClientError
from c7n.filters import (
DefaultVpcBase, Filter, ValueFilter)
import c7n.filters.vpc as net_filters
from c7n.filters.iamaccess import CrossAccountAccessFilter
from c7n.filters.related import RelatedResourceFilter
from c7n.filters.revisions import Diff
from c7n.filters.locked import Locked
from c7n import query, resolver
from c7n.manager import resources
from c7n.utils import chunks, local_session, type_schema, get_retry, parse_cidr
from c7n.resources.shield import IsShieldProtected, SetShieldProtection
[docs]@resources.register('vpc')
class Vpc(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'vpc'
enum_spec = ('describe_vpcs', 'Vpcs', None)
name = id = 'VpcId'
filter_name = 'VpcIds'
filter_type = 'list'
date = None
dimension = None
config_type = 'AWS::EC2::VPC'
id_prefix = "vpc-"
[docs]@Vpc.filter_registry.register('flow-logs')
class FlowLogFilter(Filter):
"""Are flow logs enabled on the resource.
ie to find all vpcs with flows logs disabled we can do this
:example:
.. code-block:: yaml
policies:
- name: flow-logs-enabled
resource: vpc
filters:
- flow-logs
or to find all vpcs with flow logs but that don't match a
particular configuration.
:example:
.. code-block:: yaml
policies:
- name: flow-mis-configured
resource: vpc
filters:
- not:
- type: flow-logs
enabled: true
set-op: or
op: equal
# equality operator applies to following keys
traffic-type: all
status: active
log-group: vpc-logs
"""
schema = type_schema(
'flow-logs',
**{'enabled': {'type': 'boolean', 'default': False},
'op': {'enum': ['equal', 'not-equal'], 'default': 'equal'},
'set-op': {'enum': ['or', 'and'], 'default': 'or'},
'status': {'enum': ['active']},
'deliver-status': {'enum': ['success', 'failure']},
'destination': {'type': 'string'},
'destination-type': {'enum': ['s3', 'cloud-watch-logs']},
'traffic-type': {'enum': ['accept', 'reject', 'all']},
'log-group': {'type': 'string'}})
permissions = ('ec2:DescribeFlowLogs',)
[docs] def process(self, resources, event=None):
client = local_session(self.manager.session_factory).client('ec2')
# TODO given subnet/nic level logs, we should paginate, but we'll
# need to add/update botocore pagination support.
logs = client.describe_flow_logs().get('FlowLogs', ())
m = self.manager.get_model()
resource_map = {}
for fl in logs:
resource_map.setdefault(fl['ResourceId'], []).append(fl)
enabled = self.data.get('enabled', False)
log_group = self.data.get('log-group')
traffic_type = self.data.get('traffic-type')
destination_type = self.data.get('destination-type')
destination = self.data.get('destination')
status = self.data.get('status')
delivery_status = self.data.get('deliver-status')
op = self.data.get('op', 'equal') == 'equal' and operator.eq or operator.ne
set_op = self.data.get('set-op', 'or')
results = []
# looping over vpc resources
for r in resources:
if r[m.id] not in resource_map:
# we didn't find a flow log for this vpc
if enabled:
# vpc flow logs not enabled so exclude this vpc from results
continue
results.append(r)
continue
flogs = resource_map[r[m.id]]
r['c7n:flow-logs'] = flogs
# config comparisons are pointless if we only want vpcs with no flow logs
if enabled:
fl_matches = []
for fl in flogs:
dest_type_match = (destination_type is None) or op(
fl['LogDestinationType'], destination_type)
dest_match = (destination is None) or op(
fl['LogDestination'], destination)
status_match = (status is None) or op(fl['FlowLogStatus'], status.upper())
delivery_status_match = (delivery_status is None) or op(
fl['DeliverLogsStatus'], delivery_status.upper())
traffic_type_match = (
traffic_type is None) or op(
fl['TrafficType'],
traffic_type.upper())
log_group_match = (log_group is None) or op(fl['LogGroupName'], log_group)
# combine all conditions to check if flow log matches the spec
fl_match = (status_match and traffic_type_match and dest_match and
log_group_match and dest_type_match and delivery_status_match)
fl_matches.append(fl_match)
if set_op == 'or':
if any(fl_matches):
results.append(r)
elif set_op == 'and':
if all(fl_matches):
results.append(r)
return results
[docs]@Vpc.filter_registry.register('security-group')
class VpcSecurityGroupFilter(RelatedResourceFilter):
"""Filter VPCs based on Security Group attributes
:example:
.. code-block:: yaml
policies:
- name: gray-vpcs
resource: vpc
filters:
- type: security-group
key: tag:Color
value: Gray
"""
schema = type_schema(
'security-group', rinherit=ValueFilter.schema,
**{'match-resource': {'type': 'boolean'},
'operator': {'enum': ['and', 'or']}})
RelatedResource = "c7n.resources.vpc.SecurityGroup"
RelatedIdsExpression = '[SecurityGroups][].GroupId'
AnnotationKey = "matched-vpcs"
[docs]@Vpc.filter_registry.register('subnet')
class VpcSubnetFilter(RelatedResourceFilter):
"""Filter VPCs based on Subnet attributes
:example:
.. code-block:: yaml
policies:
- name: gray-vpcs
resource: vpc
filters:
- type: subnet
key: tag:Color
value: Gray
"""
schema = type_schema(
'subnet', rinherit=ValueFilter.schema,
**{'match-resource': {'type': 'boolean'},
'operator': {'enum': ['and', 'or']}})
RelatedResource = "c7n.resources.vpc.Subnet"
RelatedIdsExpression = '[Subnets][].SubnetId'
AnnotationKey = "MatchedVpcsSubnets"
[docs]@Vpc.filter_registry.register('nat-gateway')
class VpcNatGatewayFilter(RelatedResourceFilter):
"""Filter VPCs based on NAT Gateway attributes
:example:
.. code-block:: yaml
policies:
- name: gray-vpcs
resource: vpc
filters:
- type: nat-gateway
key: tag:Color
value: Gray
"""
schema = type_schema(
'nat-gateway', rinherit=ValueFilter.schema,
**{'match-resource': {'type': 'boolean'},
'operator': {'enum': ['and', 'or']}})
RelatedResource = "c7n.resources.vpc.NATGateway"
RelatedIdsExpression = '[NatGateways][].NatGatewayId'
AnnotationKey = "MatchedVpcsNatGateways"
[docs]@Vpc.filter_registry.register('internet-gateway')
class VpcInternetGatewayFilter(RelatedResourceFilter):
"""Filter VPCs based on Internet Gateway attributes
:example:
.. code-block:: yaml
policies:
- name: gray-vpcs
resource: vpc
filters:
- type: internet-gateway
key: tag:Color
value: Gray
"""
schema = type_schema(
'internet-gateway', rinherit=ValueFilter.schema,
**{'match-resource': {'type': 'boolean'},
'operator': {'enum': ['and', 'or']}})
RelatedResource = "c7n.resources.vpc.InternetGateway"
RelatedIdsExpression = '[InternetGateways][].InternetGatewayId'
AnnotationKey = "MatchedVpcsIgws"
[docs]@Vpc.filter_registry.register('vpc-attributes')
class AttributesFilter(Filter):
"""Filters VPCs based on their DNS attributes
:example:
.. code-block:: yaml
policies:
- name: dns-hostname-enabled
resource: vpc
filters:
- type: vpc-attributes
dnshostnames: True
"""
schema = type_schema(
'vpc-attributes',
dnshostnames={'type': 'boolean'},
dnssupport={'type': 'boolean'})
permissions = ('ec2:DescribeVpcAttributes',)
[docs] def process(self, resources, event=None):
results = []
client = local_session(self.manager.session_factory).client('ec2')
dns_hostname = self.data.get('dnshostnames', None)
dns_support = self.data.get('dnssupport', None)
for r in resources:
if dns_hostname is not None:
hostname = client.describe_vpc_attribute(
VpcId=r['VpcId'],
Attribute='enableDnsHostnames'
)['EnableDnsHostnames']['Value']
if dns_support is not None:
support = client.describe_vpc_attribute(
VpcId=r['VpcId'],
Attribute='enableDnsSupport'
)['EnableDnsSupport']['Value']
if dns_hostname is not None and dns_support is not None:
if dns_hostname == hostname and dns_support == support:
results.append(r)
elif dns_hostname is not None and dns_support is None:
if dns_hostname == hostname:
results.append(r)
elif dns_support is not None and dns_hostname is None:
if dns_support == support:
results.append(r)
return results
[docs]@Vpc.filter_registry.register('dhcp-options')
class DhcpOptionsFilter(Filter):
"""Filter VPCs based on their dhcp options
:example:
.. code-block: yaml
policies:
- name: vpcs-in-domain
resource: vpc
filters:
- type: dhcp-options
domain-name: ec2.internal
if an option value is specified as a list, then all elements must be present.
if an option value is specified as a string, then that string must be present.
vpcs not matching a given option value can be found via specifying
a `present: false` parameter.
"""
option_keys = ('domain-name', 'domain-name-servers', 'ntp-servers')
schema = type_schema('dhcp-options', **{
k: {'oneOf': [
{'type': 'array', 'items': {'type': 'string'}},
{'type': 'string'}]}
for k in option_keys})
schema['properties']['present'] = {'type': 'boolean'}
permissions = ('ec2:DescribeDhcpOptions',)
[docs] def validate(self):
if not any([self.data.get(k) for k in self.option_keys]):
raise PolicyValidationError("one of %s required" % (self.option_keys,))
return self
[docs] def process(self, resources, event=None):
client = local_session(self.manager.session_factory).client('ec2')
option_ids = [r['DhcpOptionsId'] for r in resources]
options_map = {}
results = []
for options in client.describe_dhcp_options(
Filters=[{
'Name': 'dhcp-options-id',
'Values': option_ids}]).get('DhcpOptions', ()):
options_map[options['DhcpOptionsId']] = {
o['Key']: [v['Value'] for v in o['Values']]
for o in options['DhcpConfigurations']}
for vpc in resources:
if self.process_vpc(vpc, options_map[vpc['DhcpOptionsId']]):
results.append(vpc)
return results
[docs] def process_vpc(self, vpc, dhcp):
vpc['c7n:DhcpConfiguration'] = dhcp
found = True
for k in self.option_keys:
if k not in self.data:
continue
is_list = isinstance(self.data[k], list)
if k not in dhcp:
found = False
elif not is_list and self.data[k] not in dhcp[k]:
found = False
elif is_list and sorted(self.data[k]) != sorted(dhcp[k]):
found = False
if not self.data.get('present', True):
found = not found
return found
[docs]@resources.register('subnet')
class Subnet(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'subnet'
enum_spec = ('describe_subnets', 'Subnets', None)
name = id = 'SubnetId'
filter_name = 'SubnetIds'
filter_type = 'list'
date = None
dimension = None
config_type = 'AWS::EC2::Subnet'
id_prefix = "subnet-"
Subnet.filter_registry.register('flow-logs', FlowLogFilter)
[docs]@resources.register('security-group')
class SecurityGroup(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'security-group'
enum_spec = ('describe_security_groups', 'SecurityGroups', None)
detail_spec = None
name = id = 'GroupId'
filter_name = "GroupIds"
filter_type = 'list'
date = None
dimension = None
config_type = "AWS::EC2::SecurityGroup"
id_prefix = "sg-"
[docs] def get_source(self, source_type):
if source_type == 'config':
return ConfigSG(self)
return super(SecurityGroup, self).get_source(source_type)
[docs]class ConfigSG(query.ConfigSource):
[docs] def load_resource(self, item):
r = super(ConfigSG, self).load_resource(item)
for rset in ('IpPermissions', 'IpPermissionsEgress'):
for p in r.get(rset, ()):
if p.get('FromPort', '') is None:
p.pop('FromPort')
if p.get('ToPort', '') is None:
p.pop('ToPort')
if 'Ipv6Ranges' not in p:
p[u'Ipv6Ranges'] = []
for i in p.get('UserIdGroupPairs', ()):
for k, v in list(i.items()):
if v is None:
i.pop(k)
# legacy config form, still version 1.2
for attribute, element_key in (('IpRanges', u'CidrIp'),):
if attribute not in p:
continue
p[attribute] = [{element_key: v} for v in p[attribute]]
if 'Ipv4Ranges' in p:
p['IpRanges'] = p.pop('Ipv4Ranges')
return r
[docs]@SecurityGroup.filter_registry.register('locked')
class SecurityGroupLockedFilter(Locked):
[docs] def get_parent_id(self, resource, account_id):
return resource.get('VpcId', account_id)
[docs]@SecurityGroup.filter_registry.register('diff')
class SecurityGroupDiffFilter(Diff):
[docs] def diff(self, source, target):
differ = SecurityGroupDiff()
return differ.diff(source, target)
[docs]class SecurityGroupDiff(object):
"""Diff two versions of a security group
Immutable: GroupId, GroupName, Description, VpcId, OwnerId
Mutable: Tags, Rules
"""
[docs] def diff(self, source, target):
delta = {}
tag_delta = self.get_tag_delta(source, target)
if tag_delta:
delta['tags'] = tag_delta
ingress_delta = self.get_rule_delta('IpPermissions', source, target)
if ingress_delta:
delta['ingress'] = ingress_delta
egress_delta = self.get_rule_delta(
'IpPermissionsEgress', source, target)
if egress_delta:
delta['egress'] = egress_delta
if delta:
return delta
[docs] def get_tag_delta(self, source, target):
source_tags = {t['Key']: t['Value'] for t in source.get('Tags', ())}
target_tags = {t['Key']: t['Value'] for t in target.get('Tags', ())}
target_keys = set(target_tags.keys())
source_keys = set(source_tags.keys())
removed = source_keys.difference(target_keys)
added = target_keys.difference(source_keys)
changed = set()
for k in target_keys.intersection(source_keys):
if source_tags[k] != target_tags[k]:
changed.add(k)
return {k: v for k, v in {
'added': {k: target_tags[k] for k in added},
'removed': {k: source_tags[k] for k in removed},
'updated': {k: target_tags[k] for k in changed}}.items() if v}
[docs] def get_rule_delta(self, key, source, target):
source_rules = {
self.compute_rule_hash(r): r for r in source.get(key, ())}
target_rules = {
self.compute_rule_hash(r): r for r in target.get(key, ())}
source_keys = set(source_rules.keys())
target_keys = set(target_rules.keys())
removed = source_keys.difference(target_keys)
added = target_keys.difference(source_keys)
return {k: v for k, v in
{'removed': [source_rules[rid] for rid in sorted(removed)],
'added': [target_rules[rid] for rid in sorted(added)]}.items() if v}
RULE_ATTRS = (
('PrefixListIds', 'PrefixListId'),
('UserIdGroupPairs', 'GroupId'),
('IpRanges', 'CidrIp'),
('Ipv6Ranges', 'CidrIpv6')
)
[docs] def compute_rule_hash(self, rule):
buf = "%d-%d-%s-" % (
rule.get('FromPort', 0) or 0,
rule.get('ToPort', 0) or 0,
rule.get('IpProtocol', '-1') or '-1'
)
for a, ke in self.RULE_ATTRS:
if a not in rule:
continue
ev = [e[ke] for e in rule[a]]
ev.sort()
for e in ev:
buf += "%s-" % e
# mask to generate the same numeric value across all Python versions
return zlib.crc32(buf.encode('ascii')) & 0xffffffff
[docs]@SecurityGroup.action_registry.register('patch')
class SecurityGroupApplyPatch(BaseAction):
"""Modify a resource via application of a reverse delta.
"""
schema = type_schema('patch')
permissions = ('ec2:AuthorizeSecurityGroupIngress',
'ec2:AuthorizeSecurityGroupEgress',
'ec2:RevokeSecurityGroupIngress',
'ec2:RevokeSecurityGroupEgress',
'ec2:CreateTags',
'ec2:DeleteTags')
[docs] def validate(self):
diff_filters = [n for n in self.manager.iter_filters() if isinstance(
n, SecurityGroupDiffFilter)]
if not len(diff_filters):
raise PolicyValidationError(
"resource patching requires diff filter")
return self
[docs] def process(self, resources):
client = local_session(self.manager.session_factory).client('ec2')
differ = SecurityGroupDiff()
patcher = SecurityGroupPatch()
for r in resources:
# reverse the patch by computing fresh, the forward
# patch is for notifications
d = differ.diff(r, r['c7n:previous-revision']['resource'])
patcher.apply_delta(client, r, d)
[docs]class SecurityGroupPatch(object):
RULE_TYPE_MAP = {
'egress': ('IpPermissionsEgress',
'revoke_security_group_egress',
'authorize_security_group_egress'),
'ingress': ('IpPermissions',
'revoke_security_group_ingress',
'authorize_security_group_ingress')}
retry = staticmethod(get_retry((
'RequestLimitExceeded', 'Client.RequestLimitExceeded')))
[docs] def apply_delta(self, client, target, change_set):
if 'tags' in change_set:
self.process_tags(client, target, change_set['tags'])
if 'ingress' in change_set:
self.process_rules(
client, 'ingress', target, change_set['ingress'])
if 'egress' in change_set:
self.process_rules(
client, 'egress', target, change_set['egress'])
[docs] def process_rules(self, client, rule_type, group, delta):
key, revoke_op, auth_op = self.RULE_TYPE_MAP[rule_type]
revoke, authorize = getattr(
client, revoke_op), getattr(client, auth_op)
# Process removes
if 'removed' in delta:
self.retry(revoke, GroupId=group['GroupId'],
IpPermissions=[r for r in delta['removed']])
# Process adds
if 'added' in delta:
self.retry(authorize, GroupId=group['GroupId'],
IpPermissions=[r for r in delta['added']])
[docs]class SGUsage(Filter):
[docs] def get_permissions(self):
return list(itertools.chain(
[self.manager.get_resource_manager(m).get_permissions()
for m in
['lambda', 'eni', 'launch-config', 'security-group']]))
[docs] def filter_peered_refs(self, resources):
if not resources:
return resources
# Check that groups are not referenced across accounts
client = local_session(self.manager.session_factory).client('ec2')
peered_ids = set()
for resource_set in chunks(resources, 200):
for sg_ref in client.describe_security_group_references(
GroupId=[r['GroupId'] for r in resource_set]
)['SecurityGroupReferenceSet']:
peered_ids.add(sg_ref['GroupId'])
self.log.debug(
"%d of %d groups w/ peered refs", len(peered_ids), len(resources))
return [r for r in resources if r['GroupId'] not in peered_ids]
[docs] def scan_groups(self):
used = set()
for kind, scanner in (
("nics", self.get_eni_sgs),
("sg-perm-refs", self.get_sg_refs),
('lambdas', self.get_lambda_sgs),
("launch-configs", self.get_launch_config_sgs),
):
sg_ids = scanner()
new_refs = sg_ids.difference(used)
used = used.union(sg_ids)
self.log.debug(
"%s using %d sgs, new refs %s total %s",
kind, len(sg_ids), len(new_refs), len(used))
return used
[docs] def get_launch_config_sgs(self):
# Note assuming we also have launch config garbage collection
# enabled.
sg_ids = set()
for cfg in self.manager.get_resource_manager('launch-config').resources():
for g in cfg['SecurityGroups']:
sg_ids.add(g)
for g in cfg['ClassicLinkVPCSecurityGroups']:
sg_ids.add(g)
return sg_ids
[docs] def get_lambda_sgs(self):
sg_ids = set()
for func in self.manager.get_resource_manager('lambda').resources():
if 'VpcConfig' not in func:
continue
for g in func['VpcConfig']['SecurityGroupIds']:
sg_ids.add(g)
return sg_ids
[docs] def get_eni_sgs(self):
sg_ids = set()
for nic in self.manager.get_resource_manager('eni').resources():
for g in nic['Groups']:
sg_ids.add(g['GroupId'])
return sg_ids
[docs] def get_sg_refs(self):
sg_ids = set()
for sg in self.manager.get_resource_manager('security-group').resources():
for perm_type in ('IpPermissions', 'IpPermissionsEgress'):
for p in sg.get(perm_type, []):
for g in p.get('UserIdGroupPairs', ()):
sg_ids.add(g['GroupId'])
return sg_ids
[docs]@SecurityGroup.filter_registry.register('unused')
class UnusedSecurityGroup(SGUsage):
"""Filter to just vpc security groups that are not used.
We scan all extant enis in the vpc to get a baseline set of groups
in use. Then augment with those referenced by launch configs, and
lambdas as they may not have extant resources in the vpc at a
given moment. We also find any security group with references from
other security group either within the vpc or across peered
connections.
Note this filter does not support classic security groups atm.
:example:
.. code-block:: yaml
policies:
- name: security-groups-unused
resource: security-group
filters:
- unused
"""
schema = type_schema('unused')
[docs] def process(self, resources, event=None):
used = self.scan_groups()
unused = [
r for r in resources
if r['GroupId'] not in used and 'VpcId' in r]
return unused and self.filter_peered_refs(unused) or []
[docs]@SecurityGroup.filter_registry.register('used')
class UsedSecurityGroup(SGUsage):
"""Filter to security groups that are used.
This operates as a complement to the unused filter for multi-step
workflows.
:example:
.. code-block:: yaml
policies:
- name: security-groups-in-use
resource: security-group
filters:
- used
"""
schema = type_schema('used')
[docs] def process(self, resources, event=None):
used = self.scan_groups()
unused = [
r for r in resources
if r['GroupId'] not in used and 'VpcId' in r]
unused = set([g['GroupId'] for g in self.filter_peered_refs(unused)])
return [r for r in resources if r['GroupId'] not in unused]
[docs]@SecurityGroup.filter_registry.register('stale')
class Stale(Filter):
"""Filter to find security groups that contain stale references
to other groups that are either no longer present or traverse
a broken vpc peering connection. Note this applies to VPC
Security groups only and will implicitly filter security groups.
AWS Docs:
https://docs.aws.amazon.com/vpc/latest/peering/vpc-peering-security-groups.html
:example:
.. code-block:: yaml
policies:
- name: stale-security-groups
resource: security-group
filters:
- stale
"""
schema = type_schema('stale')
permissions = ('ec2:DescribeStaleSecurityGroups',)
[docs] def process(self, resources, event=None):
client = local_session(self.manager.session_factory).client('ec2')
vpc_ids = set([r['VpcId'] for r in resources if 'VpcId' in r])
group_map = {r['GroupId']: r for r in resources}
results = []
self.log.debug("Querying %d vpc for stale refs", len(vpc_ids))
stale_count = 0
for vpc_id in vpc_ids:
stale_groups = client.describe_stale_security_groups(
VpcId=vpc_id).get('StaleSecurityGroupSet', ())
stale_count += len(stale_groups)
for s in stale_groups:
if s['GroupId'] in group_map:
r = group_map[s['GroupId']]
if 'StaleIpPermissions' in s:
r['MatchedIpPermissions'] = s['StaleIpPermissions']
if 'StaleIpPermissionsEgress' in s:
r['MatchedIpPermissionsEgress'] = s[
'StaleIpPermissionsEgress']
results.append(r)
self.log.debug("Found %d stale security groups", stale_count)
return results
[docs]@SecurityGroup.filter_registry.register('default-vpc')
class SGDefaultVpc(DefaultVpcBase):
"""Filter that returns any security group that exists within the default vpc
:example:
.. code-block:: yaml
policies:
- name: security-group-default-vpc
resource: security-group
filters:
- default-vpc
"""
schema = type_schema('default-vpc')
def __call__(self, resource, event=None):
if 'VpcId' not in resource:
return False
return self.match(resource['VpcId'])
[docs]class SGPermission(Filter):
"""Filter for verifying security group ingress and egress permissions
All attributes of a security group permission are available as
value filters.
If multiple attributes are specified the permission must satisfy
all of them. Note that within an attribute match against a list value
of a permission we default to or.
If a group has any permissions that match all conditions, then it
matches the filter.
Permissions that match on the group are annotated onto the group and
can subsequently be used by the remove-permission action.
We have specialized handling for matching `Ports` in ingress/egress
permission From/To range. The following example matches on ingress
rules which allow for a range that includes all of the given ports.
.. code-block:: yaml
- type: ingress
Ports: [22, 443, 80]
As well for verifying that a rule only allows for a specific set of ports
as in the following example. The delta between this and the previous
example is that if the permission allows for any ports not specified here,
then the rule will match. ie. OnlyPorts is a negative assertion match,
it matches when a permission includes ports outside of the specified set.
.. code-block:: yaml
- type: ingress
OnlyPorts: [22]
For simplifying ipranges handling which is specified as a list on a rule
we provide a `Cidr` key which can be used as a value type filter evaluated
against each of the rules. If any iprange cidr match then the permission
matches.
.. code-block:: yaml
- type: ingress
IpProtocol: -1
FromPort: 445
We also have specialized handling for matching self-references in
ingress/egress permissions. The following example matches on ingress
rules which allow traffic its own same security group.
.. code-block:: yaml
- type: ingress
SelfReference: True
As well for assertions that a ingress/egress permission only matches
a given set of ports, *note* OnlyPorts is an inverse match.
.. code-block:: yaml
- type: egress
OnlyPorts: [22, 443, 80]
- type: egress
Cidr:
value_type: cidr
op: in
value: x.y.z
`Cidr` can match ipv4 rules and `CidrV6` can match ipv6 rules. In
this example we are blocking global inbound connections to SSH or
RDP.
.. code-block:: yaml
- type: ingress
Ports: [22, 3389]
Cidr:
value:
- "0.0.0.0/0"
- "::/0"
op: in
"""
perm_attrs = set((
'IpProtocol', 'FromPort', 'ToPort', 'UserIdGroupPairs',
'IpRanges', 'PrefixListIds'))
filter_attrs = set(('Cidr', 'CidrV6', 'Ports', 'OnlyPorts', 'SelfReference', 'Description'))
attrs = perm_attrs.union(filter_attrs)
attrs.add('match-operator')
[docs] def validate(self):
delta = set(self.data.keys()).difference(self.attrs)
delta.remove('type')
if delta:
raise PolicyValidationError("Unknown keys %s on %s" % (
", ".join(delta), self.manager.data))
return self
[docs] def process(self, resources, event=None):
self.vfilters = []
fattrs = list(sorted(self.perm_attrs.intersection(self.data.keys())))
self.ports = 'Ports' in self.data and self.data['Ports'] or ()
self.only_ports = (
'OnlyPorts' in self.data and self.data['OnlyPorts'] or ())
for f in fattrs:
fv = self.data.get(f)
if isinstance(fv, dict):
fv['key'] = f
else:
fv = {f: fv}
vf = ValueFilter(fv)
vf.annotate = False
self.vfilters.append(vf)
return super(SGPermission, self).process(resources, event)
[docs] def process_ports(self, perm):
found = None
if 'FromPort' in perm and 'ToPort' in perm:
for port in self.ports:
if port >= perm['FromPort'] and port <= perm['ToPort']:
found = True
break
found = False
only_found = False
for port in self.only_ports:
if port == perm['FromPort'] and port == perm['ToPort']:
only_found = True
if self.only_ports and not only_found:
found = found is None or found and True or False
if self.only_ports and only_found:
found = False
return found
def _process_cidr(self, cidr_key, cidr_type, range_type, perm):
found = None
ip_perms = perm.get(range_type, [])
if not ip_perms:
return False
match_range = self.data[cidr_key]
match_range['key'] = cidr_type
vf = ValueFilter(match_range)
vf.annotate = False
for ip_range in ip_perms:
found = vf(ip_range)
if found:
break
else:
found = False
return found
[docs] def process_cidrs(self, perm):
found_v6 = found_v4 = None
if 'CidrV6' in self.data:
found_v6 = self._process_cidr('CidrV6', 'CidrIpv6', 'Ipv6Ranges', perm)
if 'Cidr' in self.data:
found_v4 = self._process_cidr('Cidr', 'CidrIp', 'IpRanges', perm)
match_op = self.data.get('match-operator', 'and') == 'and' and all or any
cidr_match = [k for k in (found_v6, found_v4) if k is not None]
if not cidr_match:
return None
return match_op(cidr_match)
[docs] def process_description(self, perm):
if 'Description' not in self.data:
return None
d = dict(self.data['Description'])
d['key'] = 'Description'
vf = ValueFilter(d)
vf.annotate = False
for k in ('Ipv6Ranges', 'IpRanges', 'UserIdGroupPairs', 'PrefixListIds'):
if k not in perm or not perm[k]:
continue
return vf(perm[k][0])
return False
[docs] def process_self_reference(self, perm, sg_id):
found = None
ref_match = self.data.get('SelfReference')
if ref_match is not None:
found = False
if 'UserIdGroupPairs' in perm and 'SelfReference' in self.data:
self_reference = sg_id in [p['GroupId']
for p in perm['UserIdGroupPairs']]
if ref_match is False and not self_reference:
found = True
if ref_match is True and self_reference:
found = True
return found
[docs] def expand_permissions(self, permissions):
"""Expand each list of cidr, prefix list, user id group pair
by port/protocol as an individual rule.
The console ux automatically expands them out as addition/removal is
per this expansion, the describe calls automatically group them.
"""
for p in permissions:
np = dict(p)
values = {}
for k in (u'IpRanges',
u'Ipv6Ranges',
u'PrefixListIds',
u'UserIdGroupPairs'):
values[k] = np.pop(k, ())
np[k] = []
for k, v in values.items():
if not v:
continue
for e in v:
ep = dict(np)
ep[k] = [e]
yield ep
def __call__(self, resource):
matched = []
sg_id = resource['GroupId']
match_op = self.data.get('match-operator', 'and') == 'and' and all or any
for perm in self.expand_permissions(resource[self.ip_permissions_key]):
perm_matches = {}
for idx, f in enumerate(self.vfilters):
perm_matches[idx] = bool(f(perm))
perm_matches['description'] = self.process_description(perm)
perm_matches['ports'] = self.process_ports(perm)
perm_matches['cidrs'] = self.process_cidrs(perm)
perm_matches['self-refs'] = self.process_self_reference(perm, sg_id)
perm_match_values = list(filter(
lambda x: x is not None, perm_matches.values()))
# account for one python behavior any([]) == False, all([]) == True
if match_op == all and not perm_match_values:
continue
match = match_op(perm_match_values)
if match:
matched.append(perm)
if matched:
resource['Matched%s' % self.ip_permissions_key] = matched
return True
[docs]@SecurityGroup.filter_registry.register('ingress')
class IPPermission(SGPermission):
ip_permissions_key = "IpPermissions"
schema = {
'type': 'object',
# 'additionalProperties': True,
'properties': {
'type': {'enum': ['ingress']},
'match-operator': {'type': 'string', 'enum': ['or', 'and']},
'Ports': {'type': 'array', 'items': {'type': 'integer'}},
'SelfReference': {'type': 'boolean'}
},
'required': ['type']}
[docs]@SecurityGroup.filter_registry.register('egress')
class IPPermissionEgress(SGPermission):
ip_permissions_key = "IpPermissionsEgress"
schema = {
'type': 'object',
# 'additionalProperties': True,
'properties': {
'type': {'enum': ['egress']},
'match-operator': {'type': 'string', 'enum': ['or', 'and']},
'SelfReference': {'type': 'boolean'}
},
'required': ['type']}
[docs]@SecurityGroup.action_registry.register('delete')
class Delete(BaseAction):
"""Action to delete security group(s)
It is recommended to apply a filter to the delete policy to avoid the
deletion of all security groups returned.
:example:
.. code-block:: yaml
policies:
- name: security-groups-unused-delete
resource: security-group
filters:
- type: unused
actions:
- delete
"""
schema = type_schema('delete')
permissions = ('ec2:DeleteSecurityGroup',)
[docs] def process(self, resources):
client = local_session(self.manager.session_factory).client('ec2')
for r in resources:
client.delete_security_group(GroupId=r['GroupId'])
[docs]@SecurityGroup.action_registry.register('remove-permissions')
class RemovePermissions(BaseAction):
"""Action to remove ingress/egress rule(s) from a security group
:example:
.. code-block:: yaml
policies:
- name: security-group-revoke-8080
resource: security-group
filters:
- type: ingress
IpProtocol: tcp
Ports: [8080]
actions:
- type: remove-permissions
ingress: matched
"""
schema = type_schema(
'remove-permissions',
ingress={'type': 'string', 'enum': ['matched', 'all']},
egress={'type': 'string', 'enum': ['matched', 'all']})
permissions = ('ec2:RevokeSecurityGroupIngress',
'ec2:RevokeSecurityGroupEgress')
[docs] def process(self, resources):
i_perms = self.data.get('ingress', 'matched')
e_perms = self.data.get('egress', 'matched')
client = local_session(self.manager.session_factory).client('ec2')
for r in resources:
for label, perms in [('ingress', i_perms), ('egress', e_perms)]:
if perms == 'matched':
key = 'MatchedIpPermissions%s' % (
label == 'egress' and 'Egress' or '')
groups = r.get(key, ())
elif perms == 'all':
key = 'IpPermissions%s' % (
label == 'egress' and 'Egress' or '')
groups = r.get(key, ())
elif isinstance(perms, list):
groups = perms
else:
continue
if not groups:
continue
method = getattr(client, 'revoke_security_group_%s' % label)
method(GroupId=r['GroupId'], IpPermissions=groups)
[docs]@resources.register('eni')
class NetworkInterface(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'eni'
enum_spec = ('describe_network_interfaces', 'NetworkInterfaces', None)
name = id = 'NetworkInterfaceId'
filter_name = 'NetworkInterfaceIds'
filter_type = 'list'
dimension = None
date = None
config_type = "AWS::EC2::NetworkInterface"
id_prefix = "eni-"
[docs] def get_source(self, source_type):
if source_type == 'describe':
return DescribeENI(self)
elif source_type == 'config':
return query.ConfigSource(self)
raise ValueError("invalid source %s" % source_type)
[docs]class DescribeENI(query.DescribeSource):
[docs] def augment(self, resources):
for r in resources:
r['Tags'] = r.pop('TagSet', [])
return resources
NetworkInterface.filter_registry.register('flow-logs', FlowLogFilter)
NetworkInterface.filter_registry.register(
'network-location', net_filters.NetworkLocation)
[docs]@NetworkInterface.filter_registry.register('subnet')
class InterfaceSubnetFilter(net_filters.SubnetFilter):
"""Network interface subnet filter
:example:
.. code-block:: yaml
policies:
- name: network-interface-in-subnet
resource: eni
filters:
- type: subnet
key: CidrBlock
value: 10.0.2.0/24
"""
RelatedIdsExpression = "SubnetId"
[docs]@NetworkInterface.filter_registry.register('security-group')
class InterfaceSecurityGroupFilter(net_filters.SecurityGroupFilter):
"""Network interface security group filter
:example:
.. code-block:: yaml
policies:
- name: network-interface-ssh
resource: eni
filters:
- type: security-group
match-resource: true
key: FromPort
value: 22
"""
RelatedIdsExpression = "Groups[].GroupId"
[docs]@NetworkInterface.filter_registry.register('vpc')
class InterfaceVpcFilter(net_filters.VpcFilter):
RelatedIdsExpress = "VpcId"
[docs]@NetworkInterface.action_registry.register('modify-security-groups')
class InterfaceModifyVpcSecurityGroups(ModifyVpcSecurityGroupsAction):
"""Remove security groups from an interface.
Can target either physical groups as a list of group ids or
symbolic groups like 'matched' or 'all'. 'matched' uses
the annotations of the 'group' interface filter.
Note an interface always gets at least one security group, so
we also allow specification of an isolation/quarantine group
that can be specified if there would otherwise be no groups.
:example:
.. code-block:: yaml
policies:
- name: network-interface-remove-group
resource: eni
filters:
- type: security-group
match-resource: true
key: FromPort
value: 22
actions:
- type: modify-security-groups
isolation-group: sg-01ab23c4
add: []
"""
permissions = ('ec2:ModifyNetworkInterfaceAttribute',)
[docs] def process(self, resources):
client = local_session(self.manager.session_factory).client('ec2')
groups = super(
InterfaceModifyVpcSecurityGroups, self).get_groups(resources)
for idx, r in enumerate(resources):
client.modify_network_interface_attribute(
NetworkInterfaceId=r['NetworkInterfaceId'],
Groups=groups[idx])
[docs]@resources.register('route-table')
class RouteTable(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'route-table'
enum_spec = ('describe_route_tables', 'RouteTables', None)
name = id = 'RouteTableId'
filter_name = 'RouteTableIds'
filter_type = 'list'
date = None
dimension = None
id_prefix = "rtb-"
[docs]@RouteTable.filter_registry.register('subnet')
class SubnetRoute(net_filters.SubnetFilter):
"""Filter a route table by its associated subnet attributes."""
RelatedIdsExpression = "Associations[].SubnetId"
RelatedMapping = None
[docs]@RouteTable.filter_registry.register('route')
class Route(ValueFilter):
"""Filter a route table by its routes' attributes."""
schema = type_schema('route', rinherit=ValueFilter.schema)
[docs] def process(self, resources, event=None):
results = []
for r in resources:
matched = []
for route in r['Routes']:
if self.match(route):
matched.append(route)
if matched:
r.setdefault('c7n:matched-routes', []).extend(matched)
results.append(r)
return results
[docs]@resources.register('transit-gateway')
class TransitGateway(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
enum_spec = ('describe_transit_gateways', 'TransitGateways', None)
dimension = None
name = id = 'TransitGatewayId'
arn = "TransitGatewayArn"
filter_name = 'TransitGatewayIds'
filter_type = 'list'
[docs]class TransitGatewayAttachmentQuery(query.ChildResourceQuery):
[docs] def get_parent_parameters(self, params, parent_id, parent_key):
merged_params = dict(params)
merged_params.setdefault('Filters', []).append(
{'Name': parent_key, 'Values': [parent_id]})
return merged_params
[docs]@query.sources.register('transit-attachment')
class TransitAttachmentSource(query.ChildDescribeSource):
resource_query_factory = TransitGatewayAttachmentQuery
[docs]@resources.register('transit-attachment')
class TransitGatewayAttachment(query.ChildResourceManager):
child_source = 'transit-attachment'
[docs] class resource_type(object):
service = 'ec2'
enum_spec = ('describe_transit_gateway_attachments', 'TransitGatewayAttachments', None)
parent_spec = ('transit-gateway', 'transit-gateway-id', None)
dimension = None
name = id = 'TransitGatewayAttachmentId'
filter_name = None
filter_type = None
arn = False
[docs]@resources.register('peering-connection')
class PeeringConnection(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'vpc-peering-connection'
enum_spec = ('describe_vpc_peering_connections',
'VpcPeeringConnections', None)
name = id = 'VpcPeeringConnectionId'
filter_name = 'VpcPeeringConnectionIds'
filter_type = 'list'
date = None
dimension = None
id_prefix = "pcx-"
type = "vpc-peering-connection"
[docs]@PeeringConnection.filter_registry.register('cross-account')
class CrossAccountPeer(CrossAccountAccessFilter):
schema = type_schema(
'cross-account',
# white list accounts
whitelist_from=resolver.ValuesFrom.schema,
whitelist={'type': 'array', 'items': {'type': 'string'}})
permissions = ('ec2:DescribeVpcPeeringConnections',)
[docs] def process(self, resources, event=None):
results = []
accounts = self.get_accounts()
owners = map(jmespath.compile, (
'AccepterVpcInfo.OwnerId', 'RequesterVpcInfo.OwnerId'))
for r in resources:
for o_expr in owners:
account_id = o_expr.search(r)
if account_id and account_id not in accounts:
r.setdefault(
'c7n:CrossAccountViolations', []).append(account_id)
results.append(r)
return results
[docs]@PeeringConnection.filter_registry.register('missing-route')
class MissingRoute(Filter):
"""Return peers which are missing a route in route tables.
If the peering connection is between two vpcs in the same account,
the connection is returned unless it is in present route tables in
each vpc.
If the peering connection is between accounts, then the local vpc's
route table is checked.
"""
schema = type_schema('missing-route')
permissions = ('ec2:DescribeRouteTables',)
[docs] def process(self, resources, event=None):
tables = self.manager.get_resource_manager(
'route-table').resources()
routed_vpcs = {}
mid = 'VpcPeeringConnectionId'
for t in tables:
for r in t.get('Routes', ()):
if mid in r:
routed_vpcs.setdefault(r[mid], []).append(t['VpcId'])
results = []
for r in resources:
if r[mid] not in routed_vpcs:
results.append(r)
continue
for k in ('AccepterVpcInfo', 'RequesterVpcInfo'):
if r[k]['OwnerId'] != self.manager.config.account_id:
continue
if r[k].get('Region') and r['k']['Region'] != self.manager.config.region:
continue
if r[k]['VpcId'] not in routed_vpcs[r['VpcPeeringConnectionId']]:
results.append(r)
break
return results
[docs]@resources.register('network-acl')
class NetworkAcl(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'network-acl'
enum_spec = ('describe_network_acls', 'NetworkAcls', None)
name = id = 'NetworkAclId'
filter_name = 'NetworkAclIds'
filter_type = 'list'
date = None
dimension = None
config_type = "AWS::EC2::NetworkAcl"
id_prefix = "acl-"
[docs]@NetworkAcl.filter_registry.register('subnet')
class AclSubnetFilter(net_filters.SubnetFilter):
"""Filter network acls by the attributes of their attached subnets.
:example:
.. code-block:: yaml
policies:
- name: subnet-acl
resource: network-acl
filters:
- type: subnet
key: "tag:Location"
value: Public
"""
RelatedIdsExpression = "Associations[].SubnetId"
[docs]@NetworkAcl.filter_registry.register('s3-cidr')
class AclAwsS3Cidrs(Filter):
"""Filter network acls by those that allow access to s3 cidrs.
Defaults to filtering those nacls that do not allow s3 communication.
:example:
Find all nacls that do not allow communication with s3.
.. code-block:: yaml
policies:
- name: s3-not-allowed-nacl
resource: network-acl
filters:
- s3-cidr
"""
# TODO allow for port specification as range
schema = type_schema(
's3-cidr',
egress={'type': 'boolean', 'default': True},
ingress={'type': 'boolean', 'default': True},
present={'type': 'boolean', 'default': False})
permissions = ('ec2:DescribePrefixLists',)
[docs] def process(self, resources, event=None):
ec2 = local_session(self.manager.session_factory).client('ec2')
cidrs = jmespath.search(
"PrefixLists[].Cidrs[]", ec2.describe_prefix_lists())
cidrs = [parse_cidr(cidr) for cidr in cidrs]
results = []
check_egress = self.data.get('egress', True)
check_ingress = self.data.get('ingress', True)
present = self.data.get('present', False)
for r in resources:
matched = {cidr: None for cidr in cidrs}
for entry in r['Entries']:
if entry['Egress'] and not check_egress:
continue
if not entry['Egress'] and not check_ingress:
continue
entry_cidr = parse_cidr(entry['CidrBlock'])
for c in matched:
if c in entry_cidr and matched[c] is None:
matched[c] = (
entry['RuleAction'] == 'allow' and True or False)
if present and all(matched.values()):
results.append(r)
elif not present and not all(matched.values()):
results.append(r)
return results
[docs]@resources.register('network-addr')
class NetworkAddress(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'eip-allocation'
enum_spec = ('describe_addresses', 'Addresses', None)
name = 'PublicIp'
id = 'AllocationId'
filter_name = 'PublicIps'
filter_type = 'list'
date = None
dimension = None
config_type = "AWS::EC2::EIP"
NetworkAddress.filter_registry.register('shield-enabled', IsShieldProtected)
NetworkAddress.action_registry.register('set-shield', SetShieldProtection)
[docs]@NetworkAddress.action_registry.register('release')
class AddressRelease(BaseAction):
"""Action to release elastic IP address(es)
Use the force option to cause any attached elastic IPs to
also be released. Otherwise, only unattached elastic IPs
will be released.
:example:
.. code-block:: yaml
policies:
- name: release-network-addr
resource: network-addr
filters:
- AllocationId: ...
actions:
- type: release
force: True
"""
schema = type_schema('release', force={'type': 'boolean'})
permissions = ('ec2:ReleaseAddress', 'ec2:DisassociateAddress',)
[docs] def process_attached(self, client, associated_addrs):
for aa in list(associated_addrs):
try:
client.disassociate_address(AssociationId=aa['AssociationId'])
except ClientError as e:
# If its already been diassociated ignore, else raise.
if not(e.response['Error']['Code'] == 'InvalidAssocationID.NotFound' and
aa['AssocationId'] in e.response['Error']['Message']):
raise e
associated_addrs.remove(aa)
return associated_addrs
[docs] def process(self, network_addrs):
client = local_session(self.manager.session_factory).client('ec2')
force = self.data.get('force')
assoc_addrs = [addr for addr in network_addrs if 'AssociationId' in addr]
unassoc_addrs = [addr for addr in network_addrs if 'AssociationId' not in addr]
if len(assoc_addrs) and not force:
self.log.warning(
"Filtered %d attached eips of %d eips. Use 'force: true' to release them.",
len(assoc_addrs), len(network_addrs))
elif len(assoc_addrs) and force:
unassoc_addrs = itertools.chain(
unassoc_addrs, self.process_attached(client, assoc_addrs))
for r in unassoc_addrs:
try:
client.release_address(AllocationId=r['AllocationId'])
except ClientError as e:
# If its already been released, ignore, else raise.
if e.response['Error']['Code'] == 'InvalidAllocationID.NotFound':
raise
[docs]@resources.register('customer-gateway')
class CustomerGateway(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'customer-gateway'
enum_spec = ('describe_customer_gateways', 'CustomerGateways', None)
detail_spec = None
id = 'CustomerGatewayId'
filter_name = 'CustomerGatewayIds'
filter_type = 'list'
name = 'CustomerGatewayId'
date = None
dimension = None
id_prefix = "cgw-"
[docs]@resources.register('internet-gateway')
class InternetGateway(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'internet-gateway'
enum_spec = ('describe_internet_gateways', 'InternetGateways', None)
name = id = 'InternetGatewayId'
filter_name = 'InternetGatewayIds'
filter_type = 'list'
dimension = None
date = None
config_type = "AWS::EC2::InternetGateway"
id_prefix = "igw-"
[docs]@resources.register('nat-gateway')
class NATGateway(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'nat-gateway'
enum_spec = ('describe_nat_gateways', 'NatGateways', None)
name = id = 'NatGatewayId'
filter_name = 'NatGatewayIds'
filter_type = 'list'
dimension = None
date = 'CreateTime'
id_prefix = "nat-"
[docs]@NATGateway.action_registry.register('delete')
class DeleteNATGateway(BaseAction):
schema = type_schema('delete')
permissions = ('ec2:DeleteNatGateway',)
[docs] def process(self, resources):
client = local_session(self.manager.session_factory).client('ec2')
for r in resources:
client.delete_nat_gateway(NatGatewayId=r['NatGatewayId'])
[docs]@resources.register('vpn-connection')
class VPNConnection(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'vpc-connection'
enum_spec = ('describe_vpn_connections', 'VpnConnections', None)
name = id = 'VpnConnectionId'
filter_name = 'VpnConnectionIds'
filter_type = 'list'
dimension = None
date = None
config_type = 'AWS::EC2::VPNConnection'
id_prefix = "vpn-"
[docs]@resources.register('vpn-gateway')
class VPNGateway(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'vpc-gateway'
enum_spec = ('describe_vpn_gateways', 'VpnGateways', None)
name = id = 'VpnGatewayId'
filter_name = 'VpnGatewayIds'
filter_type = 'list'
dimension = None
date = None
config_type = 'AWS::EC2::VPNGateway'
id_prefix = "vgw-"
[docs]@resources.register('vpc-endpoint')
class VpcEndpoint(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'vpc-endpoint'
enum_spec = ('describe_vpc_endpoints', 'VpcEndpoints', None)
name = id = 'VpcEndpointId'
date = 'CreationTimestamp'
filter_name = 'VpcEndpointIds'
filter_type = 'list'
dimension = None
id_prefix = "vpce-"
taggable = False
[docs]@VpcEndpoint.filter_registry.register('cross-account')
class EndpointCrossAccountFilter(CrossAccountAccessFilter):
policy_attribute = 'PolicyDocument'
annotation_key = 'c7n:CrossAccountViolations'
permissions = ('ec2:DescribeVpcEndpoints',)
[docs]@VpcEndpoint.filter_registry.register('security-group')
class EndpointSecurityGroupFilter(net_filters.SecurityGroupFilter):
RelatedIdsExpression = "Groups[].GroupId"
[docs]@VpcEndpoint.filter_registry.register('subnet')
class EndpointSubnetFilter(net_filters.SubnetFilter):
RelatedIdsExpression = "SubnetIds[]"
[docs]@VpcEndpoint.filter_registry.register('vpc')
class EndpointVpcFilter(net_filters.VpcFilter):
RelatedIdsExpression = "VpcId"
[docs]@resources.register('key-pair')
class KeyPair(query.QueryResourceManager):
[docs] class resource_type(object):
service = 'ec2'
type = 'key-pair'
enum_spec = ('describe_key_pairs', 'KeyPairs', None)
detail_spec = None
id = 'KeyName'
filter_name = 'KeyNames'
name = 'KeyName'
date = None
dimension = None
taggable = False
[docs]@Vpc.action_registry.register('set-flow-log')
@Subnet.action_registry.register('set-flow-log')
@NetworkInterface.action_registry.register('set-flow-log')
class CreateFlowLogs(BaseAction):
"""Create flow logs for a network resource
:example:
.. code-block: yaml
policies:
- name: vpc-enable-flow-logs
resource: vpc
filters:
- type: flow-logs
enabled: false
actions:
- type: set-flow-log
DeliverLogsPermissionArn: arn:iam:role
LogGroupName: /custodian/vpc/flowlogs/
"""
permissions = ('ec2:CreateFlowLogs',)
schema = {
'type': 'object',
'additionalProperties': False,
'properties': {
'type': {'enum': ['set-flow-log']},
'state': {'type': 'boolean'},
'DeliverLogsPermissionArn': {'type': 'string'},
'LogGroupName': {'type': 'string'},
'LogDestination': {'type': 'string'},
'LogDestinationType': {'enum': ['s3', 'cloud-watch-logs']},
'TrafficType': {
'type': 'string',
'enum': ['ACCEPT', 'REJECT', 'ALL']
}
}
}
RESOURCE_ALIAS = {
'vpc': 'VPC',
'subnet': 'Subnet',
'eni': 'NetworkInterface'
}
[docs] def validate(self):
self.state = self.data.get('state', True)
if self.state:
if not self.data.get('DeliverLogsPermissionArn'):
raise PolicyValidationError(
'DeliverLogsPermissionArn required when '
'creating flow-logs on %s' % (self.manager.data,))
if (not self.data.get('LogGroupName') and not self.data.get('LogDestination')):
raise PolicyValidationError(
'Either LogGroupName or LogDestination required')
if (self.data.get('LogDestinationType') == 's3' and
not self.data.get('LogDestination')):
raise PolicyValidationError(
'LogDestination required when LogDestinationType is s3')
return self
[docs] def delete_flow_logs(self, client, rids):
flow_logs = client.describe_flow_logs(
Filters=[{'Name': 'resource-id', 'Values': rids}])['FlowLogs']
try:
results = client.delete_flow_logs(
FlowLogIds=[f['FlowLogId'] for f in flow_logs])
for r in results['Unsuccessful']:
self.log.exception(
'Exception: delete flow-log for %s: %s on %s',
r['ResourceId'], r['Error']['Message'])
except ClientError as e:
if e.response['Error']['Code'] == 'InvalidParameterValue':
self.log.exception(
'delete flow-log: %s', e.response['Error']['Message'])
else:
raise
[docs] def process(self, resources):
client = local_session(self.manager.session_factory).client('ec2')
params = dict(self.data)
params.pop('type')
if self.data.get('state'):
params.pop('state')
model = self.manager.get_model()
params['ResourceIds'] = [r[model.id] for r in resources]
if not self.state:
self.delete_flow_logs(client, params['ResourceIds'])
return
params['ResourceType'] = self.RESOURCE_ALIAS[model.type]
params['TrafficType'] = self.data.get('TrafficType', 'ALL').upper()
try:
results = client.create_flow_logs(**params)
for r in results['Unsuccessful']:
self.log.exception(
'Exception: create flow-log for %s: %s',
r['ResourceId'], r['Error']['Message'])
except ClientError as e:
if e.response['Error']['Code'] == 'FlowLogAlreadyExists':
self.log.exception(
'Exception: create flow-log: %s',
e.response['Error']['Message'])
else:
raise