Source code for c7n_azure.policy

# Copyright 2015-2018 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
import sys

import six
from azure.mgmt.eventgrid.models import \
    StorageQueueEventSubscriptionDestination, StringInAdvancedFilter, EventSubscriptionFilter

from c7n import utils
from c7n.actions import EventAction
from c7n.policy import PullMode, ServerlessExecutionMode, execution
from c7n.utils import local_session
from c7n_azure.azure_events import AzureEvents, AzureEventSubscription
from c7n_azure.function_package import FunctionPackage
from c7n_azure.constants import (FUNCTION_EVENT_TRIGGER_MODE,
                                 FUNCTION_TIME_TRIGGER_MODE)
from c7n_azure.functionapp_utils import FunctionAppUtilities
from c7n_azure.storage_utils import StorageUtilities
from c7n_azure.utils import ResourceIdParser, StringUtils


[docs]class AzureFunctionMode(ServerlessExecutionMode): """A policy that runs/executes in azure functions.""" schema = { 'type': 'object', 'additionalProperties': False, 'properties': { 'provision-options': { 'type': 'object', 'appInsights': { 'type': 'object', 'oneOf': [ {'type': 'string'}, {'type': 'object', 'properties': { 'name': 'string', 'location': 'string', 'resourceGroupName': 'string'} } ] }, 'storageAccount': { 'type': 'object', 'oneOf': [ {'type': 'string'}, {'type': 'object', 'properties': { 'name': 'string', 'location': 'string', 'resourceGroupName': 'string'} } ] }, 'servicePlan': { 'type': 'object', 'oneOf': [ {'type': 'string'}, {'type': 'object', 'properties': { 'name': 'string', 'location': 'string', 'resourceGroupName': 'string', 'skuTier': 'string', 'skuName': 'string'} } ] }, }, 'execution-options': {'type': 'object'} } } POLICY_METRICS = ('ResourceCount', 'ResourceTime', 'ActionTime') default_storage_name = "custodian" def __init__(self, policy): self.policy = policy self.log = logging.getLogger('custodian.azure.AzureFunctionMode') self.policy_name = self.policy.data['name'].replace(' ', '-').lower() self.function_params = None self.function_app = None self.target_subscription_ids = []
[docs] def get_function_app_params(self): session = local_session(self.policy.session_factory) provision_options = self.policy.data['mode'].get('provision-options', {}) # Service plan is parsed first, location might be shared with storage & insights service_plan = AzureFunctionMode.extract_properties( provision_options, 'servicePlan', { 'name': 'cloud-custodian', 'location': 'eastus', 'resource_group_name': 'cloud-custodian', 'sku_tier': 'Dynamic', # consumption plan 'sku_name': 'Y1' }) # Metadata used for automatic naming location = service_plan.get('location', 'eastus') rg_name = service_plan['resource_group_name'] sub_id = session.get_subscription_id() target_sub_name = session.get_function_target_subscription_name() function_suffix = StringUtils.naming_hash(rg_name + target_sub_name) storage_suffix = StringUtils.naming_hash(rg_name + sub_id) storage_account = AzureFunctionMode.extract_properties( provision_options, 'storageAccount', { 'name': self.default_storage_name + storage_suffix, 'location': location, 'resource_group_name': rg_name }) app_insights = AzureFunctionMode.extract_properties( provision_options, 'appInsights', { 'name': service_plan['name'], 'location': location, 'resource_group_name': rg_name }) function_app_name = FunctionAppUtilities.get_function_name(self.policy_name, function_suffix) FunctionAppUtilities.validate_function_name(function_app_name) params = FunctionAppUtilities.FunctionAppInfrastructureParameters( app_insights=app_insights, service_plan=service_plan, storage_account=storage_account, function_app_resource_group_name=service_plan['resource_group_name'], function_app_name=function_app_name) return params
[docs] @staticmethod def extract_properties(options, name, properties): settings = options.get(name, {}) result = {} # str type implies settings is a resource id if isinstance(settings, six.string_types): result['id'] = settings result['name'] = ResourceIdParser.get_resource_name(settings) result['resource_group_name'] = ResourceIdParser.get_resource_group(settings) else: for key in properties.keys(): result[key] = settings.get(StringUtils.snake_to_camel(key), properties[key]) return result
[docs] def run(self, event=None, lambda_context=None): """Run the actual policy.""" raise NotImplementedError("subclass responsibility")
[docs] def provision(self): # Make sure we have auth data for function provisioning session = local_session(self.policy.session_factory) session.get_functions_auth_string("") if sys.version_info[0] < 3: self.log.error("Python 2.7 is not supported for deploying Azure Functions.") sys.exit(1) self.target_subscription_ids = session.get_function_target_subscription_ids() self.function_params = self.get_function_app_params() self.function_app = FunctionAppUtilities.deploy_function_app(self.function_params)
[docs] def get_logs(self, start, end): """Retrieve logs for the policy""" raise NotImplementedError("subclass responsibility")
[docs] def build_functions_package(self, queue_name=None, target_subscription_ids=None): self.log.info("Building function package for %s" % self.function_params.function_app_name) package = FunctionPackage(self.policy_name, target_subscription_ids=target_subscription_ids) package.build(self.policy.data, modules=['c7n', 'c7n-azure', 'applicationinsights'], non_binary_packages=['pyyaml', 'pycparser', 'tabulate', 'pyrsistent'], excluded_packages=['azure-cli-core', 'distlib', 'future', 'futures'], queue_name=queue_name) package.close() self.log.info("Function package built, size is %dMB" % (package.pkg.size / (1024 * 1024))) return package
[docs]@execution.register(FUNCTION_TIME_TRIGGER_MODE) class AzurePeriodicMode(AzureFunctionMode, PullMode): """A policy that runs/execute s in azure functions at specified time intervals.""" schema = utils.type_schema(FUNCTION_TIME_TRIGGER_MODE, schedule={'type': 'string'}, rinherit=AzureFunctionMode.schema)
[docs] def provision(self): super(AzurePeriodicMode, self).provision() package = self.build_functions_package(target_subscription_ids=self.target_subscription_ids) FunctionAppUtilities.publish_functions_package(self.function_params, package)
[docs] def run(self, event=None, lambda_context=None): """Run the actual policy.""" return PullMode.run(self)
[docs] def get_logs(self, start, end): """Retrieve logs for the policy""" raise NotImplementedError("error - not implemented")
[docs]@execution.register(FUNCTION_EVENT_TRIGGER_MODE) class AzureEventGridMode(AzureFunctionMode): """A policy that runs/executes in azure functions from an azure event.""" schema = utils.type_schema(FUNCTION_EVENT_TRIGGER_MODE, events={'type': 'array', 'items': { 'oneOf': [ {'type': 'string'}, {'type': 'object', 'required': ['resourceProvider', 'event'], 'properties': { 'resourceProvider': {'type': 'string'}, 'event': {'type': 'string'}}}] }}, required=['events'], rinherit=AzureFunctionMode.schema)
[docs] def provision(self): super(AzureEventGridMode, self).provision() session = local_session(self.policy.session_factory) # queue name is restricted to lowercase letters, numbers, and single hyphens queue_name = re.sub(r'(-{2,})+', '-', self.function_params.function_app_name.lower()) storage_account = self._create_storage_queue(queue_name, session) self._create_event_subscription(storage_account, queue_name, session) package = self.build_functions_package(queue_name=queue_name) FunctionAppUtilities.publish_functions_package(self.function_params, package)
[docs] def run(self, event=None, lambda_context=None): """Run the actual policy.""" resources = self.policy.resource_manager.get_resources([event['subject']]) resources = self.policy.resource_manager.filter_resources( resources, event) if not resources: self.policy.log.info( "policy: %s resources: %s no resources found" % ( self.policy.name, self.policy.resource_type)) return resources = self.policy.resource_manager.filter_resources( resources, event) with self.policy.ctx: self.policy.ctx.metrics.put_metric( 'ResourceCount', len(resources), 'Count', Scope="Policy", buffer=False) self.policy._write_file( 'resources.json', utils.dumps(resources, indent=2)) for action in self.policy.resource_manager.actions: self.policy.log.info( "policy: %s invoking action: %s resources: %d", self.policy.name, action.name, len(resources)) if isinstance(action, EventAction): results = action.process(resources, event) else: results = action.process(resources) self.policy._write_file( "action-%s" % action.name, utils.dumps(results)) return resources
[docs] def get_logs(self, start, end): """Retrieve logs for the policy""" raise NotImplementedError("error - not implemented")
def _create_storage_queue(self, queue_name, session): self.log.info("Creating storage queue") storage_client = session.client('azure.mgmt.storage.StorageManagementClient') storage_account = storage_client.storage_accounts.get_properties( self.function_params.storage_account['resource_group_name'], self.function_params.storage_account['name']) try: StorageUtilities.create_queue_from_storage_account(storage_account, queue_name, session) self.log.info("Storage queue creation succeeded") return storage_account except Exception as e: self.log.error('Queue creation failed with error: %s' % e) raise SystemExit def _create_event_subscription(self, storage_account, queue_name, session): self.log.info('Creating event grid subscription') destination = StorageQueueEventSubscriptionDestination(resource_id=storage_account.id, queue_name=queue_name) # filter specific events subscribed_events = AzureEvents.get_event_operations( self.policy.data['mode'].get('events')) advance_filter = StringInAdvancedFilter(key='Data.OperationName', values=subscribed_events) event_filter = EventSubscriptionFilter(advanced_filters=[advance_filter]) for subscription_id in self.target_subscription_ids: try: AzureEventSubscription.create(destination, queue_name, subscription_id, session, event_filter) self.log.info('Event grid subscription creation succeeded: subscription_id=%s' % subscription_id) except Exception as e: self.log.error('Event Subscription creation failed with error: %s' % e) raise SystemExit