Source code for c7n.actions.network

# Copyright 2017-2018 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import jmespath
import six

from c7n.exceptions import PolicyExecutionError, PolicyValidationError
from c7n import utils

from .core import Action


[docs]class ModifyVpcSecurityGroupsAction(Action): """Common action for modifying security groups on a vpc attached resources. Security groups for add or remove can be specified via group id or name. Group removal also supports symbolic names such as 'matched', 'network-location' or 'all'. 'matched' uses the annotations/output of the 'security-group' filter filter. 'network-location' uses the annotations of the 'network-location' interface filter for `SecurityGroupMismatch`. Note a vpc attached resource requires at least one security group, this action will use the sg specified in `isolation-group` to ensure resources always have at least one security-group. type: modify-security-groups add: [] remove: [] | matched | network-location isolation-group: sg-xyz """ schema_alias = True schema = { 'type': 'object', 'additionalProperties': False, 'properties': { 'type': {'enum': ['modify-security-groups']}, 'add': {'oneOf': [ {'type': 'string'}, {'type': 'array', 'items': { 'type': 'string'}}]}, 'remove': {'oneOf': [ {'type': 'array', 'items': { 'type': 'string'}}, {'enum': [ 'matched', 'network-location', 'all', {'type': 'string'}]}]}, 'isolation-group': {'oneOf': [ {'type': 'string'}, {'type': 'array', 'items': { 'type': 'string'}}]}}, 'anyOf': [ {'required': ['isolation-group', 'remove', 'type']}, {'required': ['add', 'remove', 'type']}, {'required': ['add', 'type']}] } SYMBOLIC_SGS = set(('all', 'matched', 'network-location')) sg_expr = None vpc_expr = None
[docs] def validate(self): sg_filter = self.manager.filter_registry.get('security-group') if not sg_filter or not sg_filter.RelatedIdsExpression: raise PolicyValidationError(self._format_error(( "policy:{policy} resource:{resource_type} does " "not support {action_type} action"))) if self.get_action_group_names(): vpc_filter = self.manager.filter_registry.get('vpc') if not vpc_filter or not vpc_filter.RelatedIdsExpression: raise PolicyValidationError(self._format_error(( "policy:{policy} resource:{resource_type} does not support " "security-group names only ids in action:{action_type}"))) self.vpc_expr = jmespath.compile(vpc_filter.RelatedIdsExpression) if self.sg_expr is None: self.sg_expr = jmespath.compile( self.manager.filter_registry.get('security-group').RelatedIdsExpression) if 'all' in self._get_array('remove') and not self._get_array('isolation-group'): raise PolicyValidationError(self._format_error(( "policy:{policy} use of action:{action_type} with " "remove: all requires specifying isolation-group"))) return self
[docs] def get_group_names(self, groups): names = [] for g in groups: if g.startswith('sg-'): continue elif g in self.SYMBOLIC_SGS: continue names.append(g) return names
[docs] def get_action_group_names(self): """Return all the security group names configured in this action.""" return self.get_group_names( list(itertools.chain( *[self._get_array('add'), self._get_array('remove'), self._get_array('isolation-group')])))
def _format_error(self, msg, **kw): return msg.format( policy=self.manager.ctx.policy.name, resource_type=self.manager.type, action_type=self.type, **kw) def _get_array(self, k): v = self.data.get(k, []) if isinstance(v, six.string_types): return [v] return v
[docs] def get_groups_by_names(self, names): """Resolve security names to security groups resources.""" if not names: return [] client = utils.local_session( self.manager.session_factory).client('ec2') sgs = self.manager.retry( client.describe_security_groups, Filters=[{ 'Name': 'group-name', 'Values': names}]).get( 'SecurityGroups', []) unresolved = set(names) for s in sgs: if s['GroupName'] in unresolved: unresolved.remove(s['GroupName']) if unresolved: raise PolicyExecutionError(self._format_error( "policy:{policy} security groups not found " "requested: {names}, found: {groups}", names=list(unresolved), groups=[g['GroupId'] for g in sgs])) return sgs
[docs] def resolve_group_names(self, r, target_group_ids, groups): """Resolve any security group names to the corresponding group ids With the context of a given network attached resource. """ names = self.get_group_names(target_group_ids) if not names: return target_group_ids target_group_ids = list(target_group_ids) vpc_id = self.vpc_expr.search(r) if not vpc_id: raise PolicyExecutionError(self._format_error( "policy:{policy} non vpc attached resource used " "with modify-security-group: {resource_id}", resource_id=r[self.manager.resource_type.id])) found = False for n in names: for g in groups: if g['GroupName'] == n and g['VpcId'] == vpc_id: found = g['GroupId'] if not found: raise PolicyExecutionError(self._format_error(( "policy:{policy} could not resolve sg:{name} for " "resource:{resource_id} in vpc:{vpc}"), name=n, resource_id=r[self.manager.resource_type.id], vpc=vpc_id)) target_group_ids.remove(n) target_group_ids.append(found) return target_group_ids
[docs] def resolve_remove_symbols(self, r, target_group_ids, rgroups): """Resolve the resources security groups that need be modified. Specifically handles symbolic names that match annotations from policy filters for groups being removed. """ if 'matched' in target_group_ids: return r.get('c7n:matched-security-groups', ()) elif 'network-location' in target_group_ids: for reason in r.get('c7n:NetworkLocation', ()): if reason['reason'] == 'SecurityGroupMismatch': return list(reason['security-groups']) elif 'all' in target_group_ids: return rgroups return target_group_ids
[docs] def get_groups(self, resources): """Return lists of security groups to set on each resource For each input resource, parse the various add/remove/isolation- group policies for 'modify-security-groups' to find the resulting set of VPC security groups to attach to that resource. Returns a list of lists containing the resulting VPC security groups that should end up on each resource passed in. :param resources: List of resources containing VPC Security Groups :return: List of lists of security groups per resource """ resolved_groups = self.get_groups_by_names(self.get_action_group_names()) return_groups = [] for idx, r in enumerate(resources): rgroups = self.sg_expr.search(r) or [] add_groups = self.resolve_group_names( r, self._get_array('add'), resolved_groups) remove_groups = self.resolve_remove_symbols( r, self.resolve_group_names( r, self._get_array('remove'), resolved_groups), rgroups) isolation_groups = self.resolve_group_names( r, self._get_array('isolation-group'), resolved_groups) for g in remove_groups: if g in rgroups: rgroups.remove(g) for g in add_groups: if g not in rgroups: rgroups.append(g) if not rgroups: rgroups = list(isolation_groups) return_groups.append(rgroups) return return_groups