Source code for c7n_azure.output

# Copyright 2015-2018 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Provides output support for Azure Blob Storage using
the 'azure://' prefix

"""
import logging
import os
import shutil
import tempfile

from c7n_azure.storage_utils import StorageUtilities
from c7n_azure.utils import AppInsightsHelper
from c7n.output import (
    blob_outputs,
    log_outputs,
    metrics_outputs,
    DirectoryOutput,
    LogOutput,
    Metrics
)
from c7n.utils import local_session

from applicationinsights import TelemetryClient
from applicationinsights.logging import LoggingHandler
from azure.common import AzureHttpError


[docs]@blob_outputs.register('azure') class AzureStorageOutput(DirectoryOutput): """ Usage: .. code-block:: python with AzureStorageOutput(session_factory, 'azure://bucket/prefix'): log.info('xyz') # -> log messages sent to custodian-run.log.gz """ DEFAULT_BLOB_FOLDER_PREFIX = '{policy_name}/{now:%Y/%m/%d/%H/}' def __init__(self, ctx, config=None): self.ctx = ctx self.config = config self.log = logging.getLogger('custodian.output') self.root_dir = tempfile.mkdtemp() self.output_dir = self.get_output_path(self.ctx.options.output_dir) self.blob_service, self.container, self.file_prefix = \ self.get_blob_client_wrapper(self.output_dir, ctx) def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None): if exc_type is not None: self.log.exception("Error while executing policy") self.log.debug("Uploading policy logs") self.compress() self.upload() shutil.rmtree(self.root_dir) self.log.debug("Policy Logs uploaded")
[docs] def get_output_path(self, output_url): # if pyformat is not specified, then use the policy name and formatted date if '{' not in output_url: output_url = self.join(output_url, self.DEFAULT_BLOB_FOLDER_PREFIX) return output_url.format(**self.get_output_vars())
[docs] def upload(self): for root, dirs, files in os.walk(self.root_dir): for f in files: blob_name = self.join(self.file_prefix, root[len(self.root_dir):], f) blob_name.strip('/') try: self.blob_service.create_blob_from_path( self.container, blob_name, os.path.join(root, f)) except AzureHttpError as e: self.log.error("Error writing output. Confirm output storage URL is correct " "and that 'Storage Blob Contributor' role is assigned. \n" + str(e)) self.log.debug("%s uploaded" % blob_name)
[docs] @staticmethod def join(*parts): return "/".join([s.strip('/') for s in parts if s != ''])
[docs] @staticmethod def get_blob_client_wrapper(output_path, ctx): # provides easier test isolation s = local_session(ctx.session_factory) return StorageUtilities.get_blob_client_by_uri(output_path, s)
[docs]@metrics_outputs.register('azure') class MetricsOutput(Metrics): """Send metrics data to app insights """ def __init__(self, ctx, config=None): super(MetricsOutput, self).__init__(ctx, config) self.namespace = self.ctx.policy.name self.tc = None def _initialize(self): if self.tc is not None: return self.instrumentation_key = AppInsightsHelper.get_instrumentation_key(self.config['url']) self.tc = TelemetryClient(self.instrumentation_key) self.subscription_id = local_session(self.ctx.policy.session_factory).get_subscription_id() def _format_metric(self, key, value, unit, dimensions): self._initialize() d = { 'Name': key, 'Value': value, 'Dimensions': { 'Policy': self.ctx.policy.name, 'ResType': self.ctx.policy.resource_type, 'SubscriptionId': self.subscription_id, 'ExecutionId': self.ctx.execution_id, 'Unit': unit } } for k, v in dimensions.items(): d['Dimensions'][k] = v return d def _put_metrics(self, ns, metrics): self._initialize() for m in metrics: self.tc.track_metric(name=m['Name'], value=m['Value'], properties=m['Dimensions']) self.tc.flush()
[docs]class AppInsightsLogHandler(LoggingHandler): def __init__(self, instrumentation_key, policy_name, subscription_id, execution_id): super(AppInsightsLogHandler, self).__init__(instrumentation_key) self.policy_name = policy_name self.subscription_id = subscription_id self.execution_id = execution_id
[docs] def emit(self, record): properties = { 'Process': record.processName, 'Module': record.module, 'FileName': record.filename, 'LineNumber': record.lineno, 'Level': record.levelname, 'Policy': self.policy_name, 'SubscriptionId': self.subscription_id, 'ExecutionId': self.execution_id } if record.exc_info: self.client.track_exception(*record.exc_info, properties=properties) return formatted_message = self.format(record) self.client.track_trace(formatted_message, properties=properties, severity=record.levelname)
[docs]@log_outputs.register('azure') class AppInsightsLogOutput(LogOutput): log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
[docs] def get_handler(self): self.instrumentation_key = AppInsightsHelper.get_instrumentation_key(self.config['url']) self.subscription_id = local_session(self.ctx.policy.session_factory).get_subscription_id() return AppInsightsLogHandler(self.instrumentation_key, self.ctx.policy.name, self.subscription_id, self.ctx.execution_id)