# Copyright 2016-2018 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Application Load Balancers
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import json
import logging
import six
from collections import defaultdict
from c7n.actions import ActionRegistry, BaseAction
from c7n.exceptions import PolicyValidationError
from c7n.filters import (
Filter, FilterRegistry, DefaultVpcBase, MetricsFilter, ValueFilter)
import c7n.filters.vpc as net_filters
from c7n import tags
from c7n.manager import resources
from c7n.query import QueryResourceManager, DescribeSource, ConfigSource
from c7n.utils import (
local_session, chunks, type_schema, get_retry, set_annotation)
from c7n.resources.shield import IsShieldProtected, SetShieldProtection
log = logging.getLogger('custodian.app-elb')
@resources.register('app-elb')
class AppELB(QueryResourceManager):
    """Resource manager for v2 ELBs (AKA ALBs).

    Resources are identified by their ``LoadBalancerArn`` and can be loaded
    through either the ``describe`` or the ``config`` source.
    """

    class resource_type(object):
        # Metadata describing the elbv2 load balancer resource.
        service = 'elbv2'
        type = 'loadbalancer/app'
        enum_spec = ('describe_load_balancers', 'LoadBalancers', None)
        name = 'LoadBalancerName'
        id = 'LoadBalancerArn'
        filter_name = "Names"
        filter_type = "list"
        dimension = "LoadBalancer"
        date = 'CreatedTime'
        config_type = 'AWS::ElasticLoadBalancingV2::LoadBalancer'

    # Retry helper configured for 'Throttling' errors.
    retry = staticmethod(get_retry(('Throttling',)))

    @classmethod
    def get_permissions(cls):
        """Return the IAM permissions needed to enumerate load balancers."""
        # override as the service is not the iam prefix
        iam_prefix = "elasticloadbalancing"
        return (
            "%s:DescribeLoadBalancers" % iam_prefix,
            "%s:DescribeLoadBalancerAttributes" % iam_prefix,
            "%s:DescribeTags" % iam_prefix)

    def get_arn(self, r):
        """Return the ARN of resource ``r`` (its id field is already an ARN)."""
        arn_field = self.resource_type.id
        return r[arn_field]

    def get_source(self, source_type):
        """Return the resource source for ``source_type``.

        Raises ``ValueError`` for anything other than ``describe``/``config``.
        """
        if source_type == 'config':
            return ConfigAppElb(self)
        if source_type == 'describe':
            return DescribeAppElb(self)
        raise ValueError("Unsupported source: %s for %s" % (
            source_type, self.resource_type.config_type))
class DescribeAppElb(DescribeSource):
    """Describe-API backed source for app-elb resources."""

    def get_resources(self, ids, cache=True):
        """Support server side filtering on arns or names

        The kind of identifier is decided from the first entry of ``ids``.
        """
        by_arn = ids[0].startswith('arn:')
        param_name = 'LoadBalancerArns' if by_arn else 'Names'
        return self.query.filter(self.manager, **{param_name: ids})

    def augment(self, albs):
        """Annotate each load balancer with its tags and return the list."""
        manager = self.manager
        _describe_appelb_tags(
            albs,
            manager.session_factory,
            manager.executor_factory,
            manager.retry)
        return albs
class ConfigAppElb(ConfigSource):
    """AWS Config backed source for app-elb resources."""

    def load_resource(self, item):
        """Build a resource from a config item, adding Tags and Attributes.

        Tags and load balancer attributes are read from the item's
        ``supplementaryConfiguration`` section.
        """
        resource = super(ConfigAppElb, self).load_resource(item)

        def _decoded(value):
            # Config originally stored supplementaryconfig on elbv2 as json
            # strings. Support that format for historical queries.
            if isinstance(value, six.string_types):
                return json.loads(value)
            return value

        tag_items = _decoded(item['supplementaryConfiguration']['Tags'])
        resource['Tags'] = [
            {'Key': entry['key'], 'Value': entry['value']}
            for entry in tag_items]

        attr_items = _decoded(
            item['supplementaryConfiguration']['LoadBalancerAttributes'])
        # Matches annotation of AppELBAttributeFilterBase filter
        resource['Attributes'] = {
            entry['key']: parse_attribute_value(entry['value'])
            for entry in attr_items}
        return resource
def _describe_appelb_tags(albs, session_factory, executor_factory, retry):
    """Fetch tags for ``albs`` and store them under each resource's 'Tags'.

    Load balancers are processed in chunks of 20 per ``describe_tags`` call,
    using an executor with two workers. Resources are modified in place.
    """
    client = local_session(session_factory).client('elbv2')

    def _process_tags(alb_set):
        by_arn = {}
        for alb in alb_set:
            by_arn[alb['LoadBalancerArn']] = alb
        response = retry(
            client.describe_tags, ResourceArns=list(by_arn.keys()))
        for description in response['TagDescriptions']:
            if 'ResourceArn' not in description:
                continue
            arn = description['ResourceArn']
            if arn not in by_arn:
                continue
            by_arn[arn]['Tags'] = description['Tags']

    with executor_factory(max_workers=2) as w:
        # Consume the iterator so every chunk is processed.
        list(w.map(_process_tags, chunks(albs, 20)))
# Register filters and actions implemented in other modules (tags, shield,
# vpc network location) on the app-elb resource under their policy names.
AppELB.filter_registry.register('tag-count', tags.TagCountFilter)
AppELB.filter_registry.register('marked-for-op', tags.TagActionFilter)
AppELB.filter_registry.register('shield-enabled', IsShieldProtected)
AppELB.filter_registry.register('network-location', net_filters.NetworkLocation)
AppELB.action_registry.register('set-shield', SetShieldProtection)
@AppELB.filter_registry.register('metrics')
class AppElbMetrics(MetricsFilter):
    """Filter app load balancer by metric values.

    See available metrics here
    https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-cloudwatch-metrics.html

    Custodian defaults to specifying dimensions for the app elb only.
    Target Group dimension not supported atm.
    """

    def get_dimensions(self, resource):
        """Return the metric dimension for ``resource``.

        The value has the form ``app/<name>/<last segment of the arn>``.
        """
        lb_name = resource[self.model.name]
        lb_suffix = resource[self.model.id].rsplit('/')[-1]
        dimension = {
            'Name': self.model.dimension,
            'Value': 'app/%s/%s' % (lb_name, lb_suffix)}
        return [dimension]
@AppELB.filter_registry.register('security-group')
class SecurityGroupFilter(net_filters.SecurityGroupFilter):
    """Security group filter for app-elb resources."""

    # Expression locating the related security group ids on a resource.
    RelatedIdsExpression = "SecurityGroups[]"
@AppELB.filter_registry.register('subnet')
class SubnetFilter(net_filters.SubnetFilter):
    """Subnet filter for app-elb resources."""

    # Expression locating the related subnet ids on a resource.
    RelatedIdsExpression = "AvailabilityZones[].SubnetId"
@AppELB.filter_registry.register('vpc')
class VpcFilter(net_filters.VpcFilter):
    """VPC filter for app-elb resources."""

    # Expression locating the related vpc id on a resource.
    RelatedIdsExpression = "VpcId"
@AppELB.filter_registry.register('waf-enabled')
class WafEnabled(Filter):
    """Filter load balancers on their regional WAF web ACL association.

    ``web-acl`` is optional and may be a web ACL name or id; when omitted,
    association with any web ACL counts as a match. ``state`` (default
    ``False``) selects whether the associated (``True``) or the
    unassociated (``False``) resources are returned.

    :example:

    .. code-block:: yaml

            policies:
              - name: app-elb-without-waf
                resource: app-elb
                filters:
                  - type: waf-enabled
                    state: false
                    web-acl: my-web-acl
    """

    schema = type_schema(
        'waf-enabled', **{
            'web-acl': {'type': 'string'},
            'state': {'type': 'boolean'}})

    permissions = ('waf-regional:ListResourcesForWebACL', 'waf-regional:ListWebACLs')

    # TODO verify name uniqueness within region/account
    # TODO consider associated resource fetch in augment
    def process(self, resources, event=None):
        """Return the resources whose web ACL association equals ``state``.

        Matching resources that have any web ACL are annotated with its id
        under ``c7n_webacl``.

        Raises ``ValueError`` if ``web-acl`` is given but no such ACL exists.
        """
        client = local_session(self.manager.session_factory).client(
            'waf-regional')

        target_acl = self.data.get('web-acl')
        state = self.data.get('state', False)

        name_id_map = {}
        resource_map = {}

        wafs = self.manager.get_resource_manager('waf-regional').resources()

        for w in wafs:
            # Associated resource arns are cached on the web acl resource.
            if 'c7n:AssociatedResources' not in w:
                arns = client.list_resources_for_web_acl(
                    WebACLId=w['WebACLId']).get('ResourceArns', [])
                w['c7n:AssociatedResources'] = arns
            name_id_map[w['Name']] = w['WebACLId']
            for r in w['c7n:AssociatedResources']:
                resource_map[r] = w['WebACLId']

        # Accept either a name (translated to its id) or an id directly.
        target_acl_id = name_id_map.get(target_acl, target_acl)

        # generally frown on runtime validation errors, but also frown on
        # api calls during validation.
        # Only validate when an acl was requested: `web-acl` is optional and
        # without it the filter matches association with any acl.
        if target_acl and target_acl_id not in name_id_map.values():
            raise ValueError("Invalid target acl:%s, acl not found" % target_acl)

        arn_key = self.manager.resource_type.id

        state_map = {}
        for r in resources:
            arn = r[arn_key]
            if arn not in resource_map:
                state_map[arn] = False
                continue
            r['c7n_webacl'] = resource_map[arn]
            state_map[arn] = (
                not target_acl or resource_map[arn] == target_acl_id)

        return [r for r in resources if state_map[r[arn_key]] == state]
@AppELB.action_registry.register('set-waf')
class SetWaf(BaseAction):
    """Enable/Disable waf protection on applicable resource.

    ``web-acl`` (required) is a regional web ACL name or id. ``state``
    (default ``True``) associates the ACL when true and removes the
    resource's web ACL association when false.
    """
    permissions = (
        'waf-regional:AssociateWebACL',
        'waf-regional:DisassociateWebACL',
        'waf-regional:ListWebACLs')

    schema = type_schema(
        'set-waf', required=['web-acl'], **{
            'web-acl': {'type': 'string'},
            # 'force': {'type': 'boolean'},
            'state': {'type': 'boolean'}})

    def validate(self):
        """Require a ``waf-enabled`` filter in the policy.

        Raises ``PolicyValidationError`` when none is present.
        """
        found = any(
            isinstance(f, WafEnabled) for f in self.manager.iter_filters())
        if not found:
            # try to ensure idempotent usage
            raise PolicyValidationError(
                "set-waf should be used in conjunction with waf-enabled filter on %s" % (
                    self.manager.data,))
        return self

    def process(self, resources):
        """Associate or disassociate the web ACL for each resource.

        Raises ``ValueError`` when enabling with an ACL that does not exist.
        """
        wafs = self.manager.get_resource_manager('waf-regional').resources()
        name_id_map = {w['Name']: w['WebACLId'] for w in wafs}
        target_acl = self.data.get('web-acl')
        # Accept either a name (translated to its id) or an id directly.
        target_acl_id = name_id_map.get(target_acl, target_acl)
        state = self.data.get('state', True)

        if state and target_acl_id not in name_id_map.values():
            raise ValueError("invalid web acl: %s" % (target_acl_id))

        client = local_session(
            self.manager.session_factory).client('waf-regional')

        arn_key = self.manager.resource_type.id

        # TODO implement force to reassociate.
        # TODO investigate limits on waf association.
        for r in resources:
            if state:
                client.associate_web_acl(
                    WebACLId=target_acl_id, ResourceArn=r[arn_key])
            else:
                # DisassociateWebACL only takes the resource arn; passing a
                # WebACLId is rejected by client-side parameter validation.
                client.disassociate_web_acl(ResourceArn=r[arn_key])
@AppELB.action_registry.register('set-s3-logging')
class SetS3Logging(BaseAction):
    """Action to enable/disable S3 logging for an application loadbalancer.

    When enabling, ``prefix`` is a format template; it may reference the
    load balancer's tag keys as well as ``DNSName``, ``AccountId`` and
    ``LoadBalancerName``.

    :example:

    .. code-block:: yaml

            policies:
              - name: elbv2-test
                resource: app-elb
                filters:
                  - type: value
                    key: Attributes."access_logs.s3.enabled"
                    value: False
                actions:
                  - type: set-s3-logging
                    bucket: elbv2logtest
                    prefix: dahlogs
                    state: enabled
    """
    schema = type_schema(
        'set-s3-logging',
        state={'enum': ['enabled', 'disabled']},
        bucket={'type': 'string'},
        prefix={'type': 'string'},
        required=('state',))

    permissions = ("elasticloadbalancing:ModifyLoadBalancerAttributes",)

    def validate(self):
        """Require ``bucket`` and ``prefix`` when logging is being enabled.

        Raises ``PolicyValidationError`` if either is missing.
        """
        if self.data.get('state') == 'enabled':
            if 'bucket' not in self.data or 'prefix' not in self.data:
                raise PolicyValidationError((
                    "alb logging enablement requires `bucket` "
                    "and `prefix` specification on %s" % (self.manager.data,)))
        return self

    def process(self, resources):
        """Apply the access log attributes to every load balancer."""
        client = local_session(self.manager.session_factory).client('elbv2')
        enabled = self.data.get('state') == 'enabled'

        for elb in resources:
            elb_arn = elb['LoadBalancerArn']
            # The attribute value must be the string 'true' or 'false'.
            attributes = [{
                'Key': 'access_logs.s3.enabled',
                'Value': 'true' if enabled else 'false'}]

            if enabled:
                attributes.append({
                    'Key': 'access_logs.s3.bucket',
                    'Value': self.data['bucket']})

                # Variables available to the prefix template: the resource's
                # tags plus a few load balancer properties (which win on
                # a name clash with a tag key).
                prefix_template = self.data['prefix']
                info = {t['Key']: t['Value'] for t in elb.get('Tags', ())}
                info['DNSName'] = elb.get('DNSName', '')
                info['AccountId'] = elb['LoadBalancerArn'].split(':')[4]
                info['LoadBalancerName'] = elb['LoadBalancerName']

                attributes.append({
                    'Key': 'access_logs.s3.prefix',
                    'Value': prefix_template.format(**info)})

            self.manager.retry(
                client.modify_load_balancer_attributes,
                LoadBalancerArn=elb_arn, Attributes=attributes)
@AppELB.action_registry.register('mark-for-op')
class AppELBMarkForOpAction(tags.TagDelayedAction):
    """Action to tag an ELB so that an operation runs on it at a later date

    :example:

    .. code-block:: yaml

            policies:
              - name: appelb-failed-mark-for-op
                resource: app-elb
                filters:
                  - "tag:custodian_elb_cleanup": absent
                  - State: failed
                actions:
                  - type: mark-for-op
                    tag: custodian_elb_cleanup
                    msg: "AppElb failed: {op}@{action_date}"
                    op: delete
                    days: 1
    """

    # Batch size setting for this action.
    batch_size = 1
@AppELB.action_registry.register('tag')
class AppELBTagAction(tags.Tag):
    """Action to add one or more tags to an ELB

    :example:

    .. code-block:: yaml

            policies:
              - name: appelb-create-required-tag
                resource: app-elb
                filters:
                  - "tag:RequiredTag": absent
                actions:
                  - type: tag
                    key: RequiredTag
                    value: RequiredValue
    """

    batch_size = 1
    permissions = ("elasticloadbalancing:AddTags",)

    def process_resource_set(self, client, resource_set, ts):
        """Add tags ``ts`` to every load balancer in ``resource_set``."""
        arns = [alb['LoadBalancerArn'] for alb in resource_set]
        client.add_tags(ResourceArns=arns, Tags=ts)
@AppELB.action_registry.register('remove-tag')
class AppELBRemoveTagAction(tags.RemoveTag):
    """Action to remove one or more tags from an ELB

    :example:

    .. code-block:: yaml

            policies:
              - name: appelb-delete-expired-tag
                resource: app-elb
                filters:
                  - "tag:ExpiredTag": present
                actions:
                  - type: remove-tag
                    tags: ["ExpiredTag"]
    """

    batch_size = 1
    permissions = ("elasticloadbalancing:RemoveTags",)

    def process_resource_set(self, client, resource_set, tag_keys):
        """Remove ``tag_keys`` from every load balancer in ``resource_set``."""
        arns = [alb['LoadBalancerArn'] for alb in resource_set]
        client.remove_tags(ResourceArns=arns, TagKeys=tag_keys)
@AppELB.action_registry.register('delete')
class AppELBDeleteAction(BaseAction):
    """Action to delete an ELB

    To avoid unwanted deletions of ELB, it is recommended to apply a filter
    to the rule. With ``force`` set, deletion protection is switched off
    on the load balancer before it is deleted.

    :example:

    .. code-block:: yaml

            policies:
              - name: appelb-delete-failed-elb
                resource: app-elb
                filters:
                  - State: failed
                actions:
                  - delete
    """

    schema = type_schema('delete', force={'type': 'boolean'})
    permissions = (
        "elasticloadbalancing:DeleteLoadBalancer",
        "elasticloadbalancing:ModifyLoadBalancerAttributes",)

    def process(self, load_balancers):
        """Delete each of the given load balancers in turn."""
        elbv2 = local_session(self.manager.session_factory).client('elbv2')
        for load_balancer in load_balancers:
            self.process_alb(elbv2, load_balancer)

    def process_alb(self, client, alb):
        """Delete a single load balancer.

        A load balancer that no longer exists is ignored; an operation that
        is not permitted is logged as a warning.
        """
        arn = alb['LoadBalancerArn']
        try:
            if self.data.get('force'):
                protection_off = [{
                    'Key': 'deletion_protection.enabled',
                    'Value': 'false',
                }]
                client.modify_load_balancer_attributes(
                    LoadBalancerArn=arn, Attributes=protection_off)
            self.manager.retry(
                client.delete_load_balancer, LoadBalancerArn=arn)
        except client.exceptions.LoadBalancerNotFoundException:
            pass
        except client.exceptions.OperationNotPermittedException as e:
            self.log.warning(
                "Exception trying to delete ALB: %s error: %s",
                arn, e)
class AppELBListenerFilterBase(object):
    """ Mixin base class for filters that query LB listeners.
    """

    permissions = ("elasticloadbalancing:DescribeListeners",)

    def initialize(self, albs):
        """Populate ``self.listener_map`` (arn -> listeners) for ``albs``.

        Load balancers that are no longer found are skipped.
        """
        client = local_session(self.manager.session_factory).client('elbv2')
        self.listener_map = defaultdict(list)
        for alb in albs:
            arn = alb['LoadBalancerArn']
            try:
                response = client.describe_listeners(LoadBalancerArn=arn)
            except client.exceptions.LoadBalancerNotFoundException:
                continue
            else:
                self.listener_map[arn] = response['Listeners']
def parse_attribute_value(v):
    """Cast a load balancer attribute string to a native value.

    Digit-only strings become ``int``, ``'true'``/``'false'`` become
    booleans, and every other string is returned unchanged.
    """
    if v == 'true':
        return True
    if v == 'false':
        return False
    if v.isdigit():
        # isdigit() also accepts characters such as superscript digits that
        # int() cannot parse; leave those values as strings.
        try:
            return int(v)
        except ValueError:
            return v
    return v
class AppELBAttributeFilterBase(object):
    """ Mixin base class for filters that query LB attributes.
    """

    def initialize(self, albs):
        """Ensure every load balancer in ``albs`` has an 'Attributes' dict.

        Resources that already carry 'Attributes' are left untouched; the
        others are described using an executor with two workers.
        """
        client = local_session(self.manager.session_factory).client('elbv2')

        def _process_attributes(alb):
            if 'Attributes' in alb:
                return
            attributes = alb['Attributes'] = {}
            response = client.describe_load_balancer_attributes(
                LoadBalancerArn=alb['LoadBalancerArn'])
            # flatten out the list of dicts and cast
            for pair in response['Attributes']:
                attributes[pair['Key']] = parse_attribute_value(pair['Value'])

        with self.manager.executor_factory(max_workers=2) as w:
            list(w.map(_process_attributes, albs))
@AppELB.filter_registry.register('is-logging')
class IsLoggingFilter(Filter, AppELBAttributeFilterBase):
    """ Matches AppELBs that are logging to S3.
        bucket and prefix are optional

    :example:

    .. code-block:: yaml

            policies:
              - name: alb-is-logging-test
                resource: app-elb
                filters:
                  - type: is-logging

              - name: alb-is-logging-bucket-and-prefix-test
                resource: app-elb
                filters:
                  - type: is-logging
                    bucket: prodlogs
                    prefix: alblogs
    """

    permissions = ("elasticloadbalancing:DescribeLoadBalancerAttributes",)
    schema = type_schema(
        'is-logging',
        bucket={'type': 'string'},
        prefix={'type': 'string'})

    def process(self, resources, event=None):
        """Return load balancers with access logging enabled.

        When ``bucket`` and/or ``prefix`` are configured they must also equal
        the load balancer's logging bucket / prefix.
        """
        self.initialize(resources)
        wanted_bucket = self.data.get('bucket', None)
        wanted_prefix = self.data.get('prefix', None)

        def _is_logging(alb):
            attrs = alb['Attributes']
            if not attrs['access_logs.s3.enabled']:
                return False
            if wanted_bucket and wanted_bucket != attrs.get(
                    'access_logs.s3.bucket', None):
                return False
            if wanted_prefix and wanted_prefix != attrs.get(
                    'access_logs.s3.prefix', None):
                return False
            return True

        return [alb for alb in resources if _is_logging(alb)]
@AppELB.filter_registry.register('is-not-logging')
class IsNotLoggingFilter(Filter, AppELBAttributeFilterBase):
    """ Matches AppELBs that are NOT logging to S3.
        or do not match the optional bucket and/or prefix.

    :example:

    .. code-block:: yaml

            policies:
              - name: alb-is-not-logging-test
                resource: app-elb
                filters:
                  - type: is-not-logging

              - name: alb-is-not-logging-bucket-and-prefix-test
                resource: app-elb
                filters:
                  - type: is-not-logging
                    bucket: prodlogs
                    prefix: alblogs
    """

    permissions = ("elasticloadbalancing:DescribeLoadBalancerAttributes",)
    schema = type_schema(
        'is-not-logging',
        bucket={'type': 'string'},
        prefix={'type': 'string'})

    def process(self, resources, event=None):
        """Return 'application' type load balancers that are not logging.

        A load balancer also matches when logging is enabled but its bucket
        or prefix differs from the configured ``bucket`` / ``prefix``.
        """
        self.initialize(resources)
        wanted_bucket = self.data.get('bucket', None)
        wanted_prefix = self.data.get('prefix', None)

        def _is_not_logging(alb):
            if alb['Type'] != 'application':
                return False
            attrs = alb['Attributes']
            if not attrs['access_logs.s3.enabled']:
                return True
            if wanted_bucket and wanted_bucket != attrs.get(
                    'access_logs.s3.bucket', None):
                return True
            if wanted_prefix and wanted_prefix != attrs.get(
                    'access_logs.s3.prefix', None):
                return True
            return False

        return [alb for alb in resources if _is_not_logging(alb)]
class AppELBTargetGroupFilterBase(object):
    """ Mixin base class for filters that query LB target groups.
    """

    def initialize(self, albs):
        """Build ``self.target_group_map``: load balancer arn -> target groups.

        All target groups of the ``app-elb-target-group`` resource manager
        are indexed; ``albs`` itself is not consulted.
        """
        group_manager = self.manager.get_resource_manager(
            'app-elb-target-group')
        self.target_group_map = defaultdict(list)
        for group in group_manager.resources():
            for lb_arn in group['LoadBalancerArns']:
                self.target_group_map[lb_arn].append(group)
@AppELB.filter_registry.register('listener')
class AppELBListenerFilter(ValueFilter, AppELBListenerFilterBase):
    """Filter ALB based on matching listener attributes

    Adding the `matched` flag will filter on previously matched listeners

    :example:

    .. code-block:: yaml

            policies:
              - name: app-elb-invalid-ciphers
                resource: app-elb
                filters:
                  - type: listener
                    key: Protocol
                    value: HTTPS
                  - type: listener
                    key: SslPolicy
                    value: ['ELBSecurityPolicy-TLS-1-1-2017-01','ELBSecurityPolicy-TLS-1-2-2017-01']
                    op: ni
                    matched: true
                actions:
                  - type: modify-listener
                    sslpolicy: "ELBSecurityPolicy-TLS-1-2-2017-01"
    """

    schema = type_schema(
        'listener', rinherit=ValueFilter.schema, matched={'type': 'boolean'})
    # The filter's only API call is describe_listeners (see the mixin).
    permissions = ("elasticloadbalancing:DescribeListeners",)

    def validate(self):
        """Check that a ``matched`` filter follows a plain listener filter.

        Raises ``PolicyValidationError`` when ``matched`` is set and no
        earlier listener filter without ``matched`` exists in the policy.
        """
        if not self.data.get('matched'):
            return self
        filters = list(self.manager.iter_filters())
        # Only a preceding *listener* filter produces the matched-listener
        # annotation this filter consumes; other filter types do not count.
        found = any(
            isinstance(f, AppELBListenerFilter) and
            not f.data.get('matched', False)
            for f in filters[:filters.index(self)])
        if not found:
            raise PolicyValidationError(
                "matched listener filter, requires preceding listener filter on %s " % (
                    self.manager.data,))
        return self

    def process(self, albs, event=None):
        """Load listeners for ``albs`` and then apply the value filter."""
        self.initialize(albs)
        return super(AppELBListenerFilter, self).process(albs, event)

    def __call__(self, alb):
        """Return True if any candidate listener of ``alb`` matches.

        Matching listeners are recorded under ``c7n:MatchedListeners``.
        """
        listeners = self.listener_map[alb['LoadBalancerArn']]
        if self.data.get('matched', False):
            # Narrow to the listeners matched earlier; popping lets the
            # annotation be rebuilt from this filter's matches only.
            listeners = alb.pop('c7n:MatchedListeners', [])

        found_listeners = False
        for listener in listeners:
            if self.match(listener):
                set_annotation(alb, 'c7n:MatchedListeners', listener)
                found_listeners = True
        return found_listeners
@AppELB.action_registry.register('modify-listener')
class AppELBModifyListenerPolicy(BaseAction):
    """Action to modify the matched listeners of an App ELB

    :example:

    .. code-block:: yaml

            policies:
              - name: appelb-modify-listener
                resource: app-elb
                filters:
                  - type: listener
                    key: Protocol
                    value: HTTP
                actions:
                  - type: modify-listener
                    protocol: HTTPS
                    sslpolicy: "ELBSecurityPolicy-TLS-1-2-2017-01"
                    certificate: "arn:aws:acm:region:123456789012:certificate/12345678-\
                    1234-1234-1234-123456789012"
    """

    schema = type_schema(
        'modify-listener',
        port={'type': 'integer'},
        protocol={'enum': ['HTTP', 'HTTPS']},
        sslpolicy={'type': 'string'},
        certificate={'type': 'string'}
    )

    permissions = ("elasticloadbalancing:ModifyListener",)

    def validate(self):
        """Require a ``listener`` filter in the policy.

        Raises ``PolicyValidationError`` when none is present.
        """
        has_listener_filter = any(
            f.type == 'listener' for f in self.manager.iter_filters())
        if has_listener_filter:
            return self
        raise PolicyValidationError(
            "modify-listener action requires the listener filter %s" % (
                self.manager.data,))

    def process(self, load_balancers):
        """Modify every listener recorded under ``c7n:MatchedListeners``."""
        # Policy option -> modify_listener parameter, for plain values.
        simple_options = (
            ('port', 'Port'),
            ('protocol', 'Protocol'),
            ('sslpolicy', 'SslPolicy'))
        args = {
            param: self.data.get(option)
            for option, param in simple_options
            if option in self.data}
        if 'certificate' in self.data:
            args['Certificates'] = [
                {'CertificateArn': self.data.get('certificate')}]

        client = local_session(self.manager.session_factory).client('elbv2')

        for alb in load_balancers:
            for listener in alb.get('c7n:MatchedListeners', ()):
                client.modify_listener(
                    ListenerArn=listener['ListenerArn'], **args)
@AppELB.filter_registry.register('healthcheck-protocol-mismatch')
class AppELBHealthCheckProtocolMismatchFilter(Filter,
                                              AppELBTargetGroupFilterBase):
    """Filter AppELBs with mismatched health check protocols

    A mismatched health check protocol is where the protocol on the target
    group does not match the target group's health check protocol

    :example:

    .. code-block:: yaml

            policies:
              - name: appelb-healthcheck-mismatch
                resource: app-elb
                filters:
                  - healthcheck-protocol-mismatch
    """

    schema = type_schema('healthcheck-protocol-mismatch')
    permissions = ("elasticloadbalancing:DescribeTargetGroups",)

    def process(self, albs, event=None):
        """Return load balancers having at least one mismatched target group."""

        def _has_mismatch(alb):
            groups = self.target_group_map[alb['LoadBalancerArn']]
            return any(
                group['Protocol'] != group['HealthCheckProtocol']
                for group in groups)

        self.initialize(albs)
        return [alb for alb in albs if _has_mismatch(alb)]
@AppELB.filter_registry.register('target-group')
class AppELBTargetGroupFilter(ValueFilter, AppELBTargetGroupFilterBase):
    """Filter ALB based on matching target group value"""

    schema = type_schema('target-group', rinherit=ValueFilter.schema)
    permissions = ("elasticloadbalancing:DescribeTargetGroups",)

    def process(self, albs, event=None):
        """Index target groups by load balancer, then apply the value filter."""
        self.initialize(albs)
        return super(AppELBTargetGroupFilter, self).process(albs, event)

    def __call__(self, alb):
        """Match against the list of target groups attached to ``alb``."""
        attached_groups = self.target_group_map[alb['LoadBalancerArn']]
        return self.match(attached_groups)
@AppELB.filter_registry.register('default-vpc')
class AppELBDefaultVpcFilter(DefaultVpcBase):
    """Filter all ELB that exist within the default vpc

    :example:

    .. code-block:: yaml

            policies:
              - name: appelb-in-default-vpc
                resource: app-elb
                filters:
                  - default-vpc
    """

    schema = type_schema('default-vpc')

    def __call__(self, alb):
        """Return the match result for the ALB's vpc, or False without one."""
        if not alb.get('VpcId'):
            return False
        return self.match(alb.get('VpcId')) or False
@resources.register('app-elb-target-group')
class AppELBTargetGroup(QueryResourceManager):
    """Resource manager for v2 ELB target groups.
    """

    class resource_type(object):
        # Metadata describing the elbv2 target group resource.
        service = 'elbv2'
        type = 'app-elb-target-group'
        enum_spec = ('describe_target_groups', 'TargetGroups', None)
        name = 'TargetGroupName'
        id = 'TargetGroupArn'
        filter_name = None
        filter_type = None
        dimension = None
        date = None

    filter_registry = FilterRegistry('app-elb-target-group.filters')
    action_registry = ActionRegistry('app-elb-target-group.actions')
    # Retry helper configured for 'Throttling' errors.
    retry = staticmethod(get_retry(('Throttling',)))

    filter_registry.register('tag-count', tags.TagCountFilter)
    filter_registry.register('marked-for-op', tags.TagActionFilter)

    @classmethod
    def get_permissions(cls):
        """Return the IAM permissions needed to enumerate target groups."""
        # override as the service is not the iam prefix
        iam_prefix = "elasticloadbalancing"
        return (
            "%s:DescribeTargetGroups" % iam_prefix,
            "%s:DescribeTags" % iam_prefix)

    def augment(self, target_groups):
        """Add 'TargetHealthDescriptions' and 'Tags' to each target group."""
        client = local_session(self.session_factory).client('elbv2')

        def _describe_target_group_health(target_group):
            response = self.retry(
                client.describe_target_health,
                TargetGroupArn=target_group['TargetGroupArn'])
            health = response['TargetHealthDescriptions']
            target_group['TargetHealthDescriptions'] = health

        with self.executor_factory(max_workers=2) as w:
            list(w.map(_describe_target_group_health, target_groups))

        _describe_target_group_tags(
            target_groups,
            self.session_factory,
            self.executor_factory,
            self.retry)
        return target_groups
def _describe_target_group_tags(target_groups, session_factory,
                                executor_factory, retry):
    """Fetch tags for ``target_groups`` and store them under 'Tags'.

    Target groups are processed in chunks of 20 per ``describe_tags`` call,
    using an executor with two workers. Resources are modified in place.
    """
    client = local_session(session_factory).client('elbv2')

    def _process_tags(target_group_set):
        by_arn = {}
        for group in target_group_set:
            by_arn[group['TargetGroupArn']] = group
        response = retry(
            client.describe_tags, ResourceArns=list(by_arn.keys()))
        for description in response['TagDescriptions']:
            if 'ResourceArn' not in description:
                continue
            arn = description['ResourceArn']
            if arn not in by_arn:
                continue
            by_arn[arn]['Tags'] = description['Tags']

    with executor_factory(max_workers=2) as w:
        # Consume the iterator so every chunk is processed.
        list(w.map(_process_tags, chunks(target_groups, 20)))
@AppELBTargetGroup.action_registry.register('mark-for-op')
class AppELBTargetGroupMarkForOpAction(tags.TagDelayedAction):
    """Action to specify a delayed action on an ELB target group

    All behavior is inherited from ``tags.TagDelayedAction``.
    """
@AppELBTargetGroup.action_registry.register('tag')
class AppELBTargetGroupTagAction(tags.Tag):
    """Action to add one or more tags to an ELB target group

    :example:

    .. code-block:: yaml

            policies:
              - name: appelb-targetgroup-add-required-tag
                resource: app-elb-target-group
                filters:
                  - "tag:RequiredTag": absent
                actions:
                  - type: tag
                    key: RequiredTag
                    value: RequiredValue
    """

    batch_size = 1
    permissions = ("elasticloadbalancing:AddTags",)

    def process_resource_set(self, client, resource_set, ts):
        """Add tags ``ts`` to every target group in ``resource_set``."""
        arns = [tgroup['TargetGroupArn'] for tgroup in resource_set]
        client.add_tags(ResourceArns=arns, Tags=ts)
@AppELBTargetGroup.action_registry.register('remove-tag')
class AppELBTargetGroupRemoveTagAction(tags.RemoveTag):
    """Action to remove one or more tags from an ELB target group

    :example:

    .. code-block:: yaml

            policies:
              - name: appelb-targetgroup-remove-expired-tag
                resource: app-elb-target-group
                filters:
                  - "tag:ExpiredTag": present
                actions:
                  - type: remove-tag
                    tags: ["ExpiredTag"]
    """

    batch_size = 1
    permissions = ("elasticloadbalancing:RemoveTags",)

    def process_resource_set(self, client, resource_set, tag_keys):
        """Remove ``tag_keys`` from every target group in ``resource_set``."""
        arns = [tgroup['TargetGroupArn'] for tgroup in resource_set]
        client.remove_tags(ResourceArns=arns, TagKeys=tag_keys)
@AppELBTargetGroup.filter_registry.register('default-vpc')
class AppELBTargetGroupDefaultVpcFilter(DefaultVpcBase):
    """Filter all application elb target groups within the default vpc

    :example:

    .. code-block:: yaml

            policies:
              - name: appelb-targetgroups-default-vpc
                resource: app-elb-target-group
                filters:
                  - default-vpc
    """

    schema = type_schema('default-vpc')

    def __call__(self, target_group):
        """Return the match result for the group's vpc, or False without one."""
        if not target_group.get('VpcId'):
            return False
        return self.match(target_group.get('VpcId')) or False
@AppELBTargetGroup.action_registry.register('delete')
class AppELBTargetGroupDeleteAction(BaseAction):
    """Action to delete ELB target group

    It is recommended to apply a filter to the delete policy to avoid unwanted
    deletion of any app elb target groups.

    :example:

    .. code-block:: yaml

            policies:
              - name: appelb-targetgroups-delete-unused
                resource: app-elb-target-group
                filters:
                  - "tag:SomeTag": absent
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('elasticloadbalancing:DeleteTargetGroup',)

    def process(self, resources):
        """Delete each of the given target groups in turn."""
        elbv2 = local_session(self.manager.session_factory).client('elbv2')
        for group in resources:
            self.process_target_group(elbv2, group)

    def process_target_group(self, client, target_group):
        """Delete a single target group through the manager's retry helper."""
        group_arn = target_group['TargetGroupArn']
        self.manager.retry(
            client.delete_target_group, TargetGroupArn=group_arn)