# Copyright 2015-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Actions to perform on Azure resources
"""
import abc
import datetime
import logging
from email.utils import parseaddr
from datetime import timedelta
import jmespath
import six
from c7n_azure import constants
from c7n_azure.storage_utils import StorageUtilities
from c7n_azure.tags import TagHelper
from c7n_azure.utils import utcnow, ThreadHelper, StringUtils
from dateutil import tz as tzutils
from msrestazure.azure_exceptions import CloudError
from c7n import utils
from c7n.actions import BaseAction, BaseNotify, EventAction
from c7n.filters import FilterValidationError
from c7n.filters.core import PolicyValidationError
from c7n.filters.offhours import Time
from c7n.resolver import ValuesFrom
from c7n.utils import type_schema
[docs]@six.add_metaclass(abc.ABCMeta)
class AzureBaseAction(BaseAction):
session = None
max_workers = constants.DEFAULT_MAX_THREAD_WORKERS
chunk_size = constants.DEFAULT_CHUNK_SIZE
log = logging.getLogger('custodian.azure.AzureBaseAction')
[docs] def process(self, resources, event=None):
self.session = self.manager.get_session()
results, exceptions = self.process_in_parallel(resources, event)
if len(exceptions) > 0:
self.handle_exceptions(exceptions)
return results
[docs] def handle_exceptions(self, exceptions):
"""raising one exception re-raises the last exception and maintains
the stack trace"""
raise exceptions[0]
[docs] def process_in_parallel(self, resources, event):
return ThreadHelper.execute_in_parallel(
resources=resources,
event=event,
execution_method=self._process_resources,
executor_factory=self.executor_factory,
log=self.log,
max_workers=self.max_workers,
chunk_size=self.chunk_size
)
def _process_resources(self, resources, event):
self._prepare_processing()
for r in resources:
try:
self._process_resource(r)
except CloudError as e:
self.log.error("Failed to process resource.\n"
"Type: {0}.\n"
"Name: {1}.\n"
"Error: {2}".format(r['type'], r['name'], e))
def _prepare_processing(self):
pass
@abc.abstractmethod
def _process_resource(self, resource):
raise NotImplementedError(
"Base action class does not implement this behavior")
[docs]@six.add_metaclass(abc.ABCMeta)
class AzureEventAction(EventAction, AzureBaseAction):
def _process_resources(self, resources, event):
self._prepare_processing()
for r in resources:
try:
self._process_resource(r, event)
except CloudError as e:
self.log.error("Failed to process resource.\n"
"Type: {0}.\n"
"Name: {1}.\n"
"Error: {2}".format(r['type'], r['name'], e))
@abc.abstractmethod
def _process_resource(self, resource, event):
raise NotImplementedError(
"Base action class does not implement this behavior")
[docs]class Tag(AzureBaseAction):
"""Adds tags to Azure resources
.. code-block:: yaml
policies:
- name: azure-tag-resourcegroups
resource: azure.resourcegroup
description: |
Tag all existing resource groups with a value such as Environment
actions:
- type: tag
tag: Environment
value: Test
"""
schema = utils.type_schema(
'tag',
**{
'tag': {'type': 'string'},
'value': {'type': 'string'},
'tags': {'type': 'object'}
}
)
def __init__(self, data=None, manager=None, log_dir=None):
super(Tag, self).__init__(data, manager, log_dir)
[docs] def validate(self):
if not self.data.get('tags') and not (self.data.get('tag') and self.data.get('value')):
raise FilterValidationError(
"Must specify either tags or a tag and value")
if self.data.get('tags') and self.data.get('tag'):
raise FilterValidationError(
"Can't specify both tags and tag, choose one")
return self
def _prepare_processing(self,):
self.new_tags = self.data.get('tags') or {self.data.get('tag'): self.data.get('value')}
def _process_resource(self, resource):
TagHelper.add_tags(self, resource, self.new_tags)
[docs]class RemoveTag(AzureBaseAction):
"""Removes tags from Azure resources
.. code-block:: yaml
policies:
- name: azure-remove-tag-resourcegroups
resource: azure.resourcegroup
description: |
Remove tag for all existing resource groups with a key such as Environment
actions:
- type: untag
tags: ['Environment']
"""
schema = utils.type_schema(
'untag',
tags={'type': 'array', 'items': {'type': 'string'}})
def __init__(self, data=None, manager=None, log_dir=None):
super(RemoveTag, self).__init__(data, manager, log_dir)
[docs] def validate(self):
if not self.data.get('tags'):
raise FilterValidationError("Must specify tags")
return self
def _prepare_processing(self,):
self.tags_to_delete = self.data.get('tags')
def _process_resource(self, resource):
TagHelper.remove_tags(self, resource, self.tags_to_delete)
[docs]class AutoTagUser(AzureEventAction):
"""Attempts to tag a resource with the first user who created/modified it.
.. code-block:: yaml
policies:
- name: azure-auto-tag-creator
resource: azure.resourcegroup
description: |
Tag all existing resource groups with the 'CreatorEmail' tag
actions:
- type: auto-tag-user
tag: CreatorEmail
This action searches from the earliest 'write' operation's caller
in the activity logs for a particular resource.
Note: activity logs are only held for the last 90 days.
"""
default_user = "Unknown"
query_select = "eventTimestamp, operationName, caller"
max_query_days = 90
# compiled JMES paths
service_admin_jmes_path = jmespath.compile(constants.EVENT_GRID_SERVICE_ADMIN_JMES_PATH)
sp_jmes_path = jmespath.compile(constants.EVENT_GRID_SP_NAME_JMES_PATH)
upn_jmes_path = jmespath.compile(constants.EVENT_GRID_UPN_CLAIM_JMES_PATH)
principal_role_jmes_path = jmespath.compile(constants.EVENT_GRID_PRINCIPAL_ROLE_JMES_PATH)
principal_type_jmes_path = jmespath.compile(constants.EVENT_GRID_PRINCIPAL_TYPE_JMES_PATH)
schema = utils.type_schema(
'auto-tag-user',
required=['tag'],
**{'update': {'type': 'boolean'},
'tag': {'type': 'string'},
'days': {'type': 'integer'}})
def __init__(self, data=None, manager=None, log_dir=None):
super(AutoTagUser, self).__init__(data, manager, log_dir)
self.log = logging.getLogger('custodian.azure.actions.auto-tag-user')
[docs] def validate(self):
if self.manager.action_registry.get('tag') is None:
raise FilterValidationError("Resource does not support tagging")
if self.manager.data.get('mode', {}).get('type') == 'azure-event-grid' \
and self.data.get('days') is not None:
raise PolicyValidationError(
"Auto tag user in event mode does not use days.")
if (self.data.get('days') is not None and
(self.data.get('days') < 1 or self.data.get('days') > 90)):
raise FilterValidationError("Days must be between 1 and 90")
return self
def _prepare_processing(self):
self.session = self.manager.get_session()
self.client = self.manager.get_client('azure.mgmt.monitor.MonitorManagementClient')
self.tag_key = self.data['tag']
self.should_update = self.data.get('update', False)
def _process_resource(self, resource, event):
# if the auto-tag-user policy set update to False (or it's unset) then we
# will skip writing their UserName tag and not overwrite pre-existing values
if not self.should_update and resource.get('tags', {}).get(self.tag_key, None):
return
user = self.default_user
if event:
user = self._get_user_from_event(event) or user
else:
user = self._get_user_from_resource_logs(resource) or user
# issue tag action to label user
TagHelper.add_tags(self, resource, {self.tag_key: user})
def _get_user_from_event(self, event):
principal_role = self.principal_role_jmes_path.search(event)
principal_type = self.principal_type_jmes_path.search(event)
user = None
# The Subscription Admins role does not have a principal type
if StringUtils.equal(principal_role, 'Subscription Admin'):
user = self.service_admin_jmes_path.search(event)
# ServicePrincipal type
elif StringUtils.equal(principal_type, 'ServicePrincipal'):
user = self.sp_jmes_path.search(event)
# Other types and main fallback (e.g. User, Office 365 Groups, and Security Groups)
if not user and self.upn_jmes_path.search(event):
user = self.upn_jmes_path.search(event)
# Last effort search for an email address in the claims
if not user:
claims = event['data']['claims']
for c in claims:
value = claims[c]
if self._is_email(value):
user = value
if not user:
self.log.error('Principal could not be determined.')
return user
def _is_email(self, target):
if target is None:
return False
elif parseaddr(target)[1] and '@' in target and '.' in target:
return True
else:
return False
def _get_user_from_resource_logs(self, resource):
# Calculate start time
delta_days = self.data.get('days', self.max_query_days)
start_time = utcnow() - datetime.timedelta(days=delta_days)
# resource group type
if self.manager.type == 'resourcegroup':
resource_type = "Microsoft.Resources/subscriptions/resourcegroups"
query_filter = " and ".join([
"eventTimestamp ge '%s'" % start_time,
"resourceGroupName eq '%s'" % resource['name'],
"eventChannels eq 'Operation'"
])
# other Azure resources
else:
resource_type = resource['type']
query_filter = " and ".join([
"eventTimestamp ge '%s'" % start_time,
"resourceUri eq '%s'" % resource['id'],
"eventChannels eq 'Operation'"
])
# fetch activity logs
logs = self.client.activity_logs.list(
filter=query_filter,
select=self.query_select
)
# get the user who issued the first operation
operation_name = "%s/write" % resource_type
first_op = self.get_first_operation(logs, operation_name)
return first_op.caller if first_op else None
[docs] @staticmethod
def get_first_operation(logs, operation_name):
first_operation = None
for l in logs:
if l.operation_name.value and l.operation_name.value.lower() == operation_name.lower():
first_operation = l
return first_operation
[docs]class TagTrim(AzureBaseAction):
"""Automatically remove tags from an azure resource.
Azure Resources and Resource Groups have a limit of 15 tags.
In order to make additional tag space on a set of resources,
this action can be used to remove enough tags to make the
desired amount of space while preserving a given set of tags.
Setting the space value to 0 removes all tags but those
listed to preserve.
.. code-block :: yaml
policies:
- name: azure-tag-trim
comment: |
Any instances with 14 or more tags get tags removed until
they match the target tag count, in this case 13, so
that we free up tag slots for another usage.
resource: azure.resourcegroup
filters:
# Filter down to resources that do not have the space
# to add additional required tags. For example, if an
# additional 2 tags need to be added to a resource, with
# 15 tags as the limit, then filter down to resources that
# have 14 or more tags since they will need to have tags
# removed for the 2 extra. This also ensures that metrics
# reporting is correct for the policy.
- type: value
key: "length(Tags)"
op: ge
value: 14
actions:
- type: tag-trim
space: 2
preserve:
- OwnerContact
- Environment
- downtime
- custodian_status
"""
max_tag_count = 15
schema = utils.type_schema(
'tag-trim',
space={'type': 'integer'},
preserve={'type': 'array', 'items': {'type': 'string'}})
def __init__(self, data=None, manager=None, log_dir=None):
super(TagTrim, self).__init__(data, manager, log_dir)
self.preserve = set(self.data.get('preserve', {}))
self.space = self.data.get('space', 1)
[docs] def validate(self):
if self.space < 0 or self.space > 15:
raise FilterValidationError("Space must be between 0 and 15")
return self
def _process_resource(self, resource):
tags = resource.get('tags', {})
if self.space and len(tags) + self.space <= self.max_tag_count:
return
# delete tags
keys = set(tags)
tags_to_preserve = self.preserve.intersection(keys)
candidates = keys - tags_to_preserve
if self.space:
# Free up slots to fit
remove = (len(candidates) -
(self.max_tag_count - (self.space + len(tags_to_preserve))))
candidates = list(sorted(candidates))[:remove]
if not candidates:
self.log.warning(
"Could not find any candidates to trim %s" % resource['id'])
return
TagHelper.remove_tags(self, resource, candidates)
[docs]class Notify(BaseNotify):
batch_size = 50
schema = {
'type': 'object',
'anyOf': [
{'required': ['type', 'transport', 'to']},
{'required': ['type', 'transport', 'to_from']}],
'properties': {
'type': {'enum': ['notify']},
'to': {'type': 'array', 'items': {'type': 'string'}},
'owner_absent_contact': {'type': 'array', 'items': {'type': 'string'}},
'to_from': ValuesFrom.schema,
'cc': {'type': 'array', 'items': {'type': 'string'}},
'cc_from': ValuesFrom.schema,
'cc_manager': {'type': 'boolean'},
'from': {'type': 'string'},
'subject': {'type': 'string'},
'template': {'type': 'string'},
'transport': {
'oneOf': [
{'type': 'object',
'required': ['type', 'queue'],
'properties': {
'queue': {'type': 'string'},
'type': {'enum': ['asq']}
}}],
},
}
}
def __init__(self, data=None, manager=None, log_dir=None):
super(Notify, self).__init__(data, manager, log_dir)
[docs] def process(self, resources, event=None):
session = utils.local_session(self.manager.session_factory)
subscription_id = session.get_subscription_id()
message = {
'event': event,
'account_id': subscription_id,
'account': subscription_id,
'region': 'all',
'policy': self.manager.data}
message['action'] = self.expand_variables(message)
for batch in utils.chunks(resources, self.batch_size):
message['resources'] = batch
receipt = self.send_data_message(message, session)
self.log.info("sent message:%s policy:%s template:%s count:%s" % (
receipt, self.manager.data['name'],
self.data.get('template', 'default'), len(batch)))
[docs] def send_data_message(self, message, session):
if self.data['transport']['type'] == 'asq':
queue_uri = self.data['transport']['queue']
return self.send_to_azure_queue(queue_uri, message, session)
[docs] def send_to_azure_queue(self, queue_uri, message, session):
queue_service, queue_name = StorageUtilities.get_queue_client_by_uri(queue_uri, session)
return StorageUtilities.put_queue_message(queue_service, queue_name, self.pack(message)).id
DEFAULT_TAG = "custodian_status"
[docs]class TagDelayedAction(AzureBaseAction):
"""Tag resources for future action.
The optional 'tz' parameter can be used to adjust the clock to align
with a given timezone. The default value is 'utc'.
If neither 'days' nor 'hours' is specified, Cloud Custodian will default
to marking the resource for action 4 days in the future.
.. code-block :: yaml
policies:
- name: vm-mark-for-stop
resource: azure.vm
filters:
- type: value
key: Name
value: instance-to-stop-in-four-days
actions:
- type: mark-for-op
op: stop
"""
schema = utils.type_schema(
'mark-for-op',
tag={'type': 'string'},
msg={'type': 'string'},
days={'type': 'integer', 'minimum': 0, 'exclusiveMinimum': False},
hours={'type': 'integer', 'minimum': 0, 'exclusiveMinimum': False},
tz={'type': 'string'},
op={'type': 'string'})
default_template = 'Resource does not meet policy: {op}@{action_date}'
def __init__(self, data=None, manager=None, log_dir=None):
super(TagDelayedAction, self).__init__(data, manager, log_dir)
self.tz = tzutils.gettz(
Time.TZ_ALIASES.get(self.data.get('tz', 'utc')))
msg_tmpl = self.data.get('msg', self.default_template)
op = self.data.get('op', 'stop')
days = self.data.get('days', 0)
hours = self.data.get('hours', 0)
action_date = self.generate_timestamp(days, hours)
self.tag = self.data.get('tag', DEFAULT_TAG)
self.msg = msg_tmpl.format(
op=op, action_date=action_date)
[docs] def validate(self):
op = self.data.get('op')
if self.manager and op not in self.manager.action_registry.keys():
raise PolicyValidationError(
"mark-for-op specifies invalid op:%s in %s" % (
op, self.manager.data))
self.tz = tzutils.gettz(
Time.TZ_ALIASES.get(self.data.get('tz', 'utc')))
if not self.tz:
raise PolicyValidationError(
"Invalid timezone specified %s in %s" % (
self.tz, self.manager.data))
return self
[docs] def generate_timestamp(self, days, hours):
from c7n_azure.utils import now
n = now(tz=self.tz)
if days is None or hours is None:
# maintains default value of days being 4 if nothing is provided
days = 4
action_date = (n + timedelta(days=days, hours=hours))
if hours > 0:
action_date_string = action_date.strftime('%Y/%m/%d %H%M %Z')
else:
action_date_string = action_date.strftime('%Y/%m/%d')
return action_date_string
def _process_resource(self, resource):
tags = resource.get('tags', {})
# add new tag
tags[self.tag] = self.msg
TagHelper.update_resource_tags(self, resource, tags)
[docs]class DeleteAction(AzureBaseAction):
schema = type_schema('delete')
def _prepare_processing(self,):
self.client = self.manager.get_client('azure.mgmt.resource.ResourceManagementClient')
def _process_resource(self, resource):
self.client.resources.delete_by_id(resource['id'],
self.session.resource_api_version(resource['id']))