Source code for c7n_azure.session

# Copyright 2018 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import importlib
import inspect
import json
import logging
import os
import sys
import types

import jwt
from azure.common.credentials import (BasicTokenAuthentication,
                                      ServicePrincipalCredentials)
from c7n_azure import constants
from c7n_azure.utils import (ResourceIdParser, StringUtils, custodian_azure_send_override,
                             ManagedGroupHelper)
from msrestazure.azure_active_directory import MSIAuthentication
from azure.keyvault import KeyVaultAuthentication, AccessToken

try:
    from azure.cli.core._profile import Profile
except Exception:
    Profile = None


[docs]class Session(object): def __init__(self, subscription_id=None, authorization_file=None, resource=constants.RESOURCE_ACTIVE_DIRECTORY): """ :param subscription_id: If provided overrides environment variables. :param authorization_file: Path to file populated from 'get_functions_auth_string' :param resource: Resource endpoint for OAuth token. """ self.log = logging.getLogger('custodian.azure.session') self._provider_cache = {} self.subscription_id_override = subscription_id self.credentials = None self.subscription_id = None self.tenant_id = None self.resource_namespace = resource self._is_token_auth = False self._is_cli_auth = False self.authorization_file = authorization_file def _initialize_session(self): """ Creates a session using available authentication type. Auth priority: 1. Token Auth 2. Tenant Auth 3. Azure CLI Auth """ # Only run once if self.credentials is not None: return tenant_auth_variables = [ constants.ENV_TENANT_ID, constants.ENV_SUB_ID, constants.ENV_CLIENT_ID, constants.ENV_CLIENT_SECRET ] token_auth_variables = [ constants.ENV_ACCESS_TOKEN, constants.ENV_SUB_ID ] msi_auth_variables = [ constants.ENV_USE_MSI, constants.ENV_SUB_ID ] if self.authorization_file: self.credentials, self.subscription_id = self.load_auth_file(self.authorization_file) self.log.info("Creating session with authorization file") elif all(k in os.environ for k in token_auth_variables): # Token authentication self.credentials = BasicTokenAuthentication( token={ 'access_token': os.environ[constants.ENV_ACCESS_TOKEN] }) self.subscription_id = os.environ[constants.ENV_SUB_ID] self.log.info("Creating session with Token Authentication") self._is_token_auth = True elif all(k in os.environ for k in tenant_auth_variables): # Tenant (service principal) authentication self.credentials = ServicePrincipalCredentials( client_id=os.environ[constants.ENV_CLIENT_ID], secret=os.environ[constants.ENV_CLIENT_SECRET], tenant=os.environ[constants.ENV_TENANT_ID], resource=self.resource_namespace) self.subscription_id = os.environ[constants.ENV_SUB_ID] self.tenant_id = os.environ[constants.ENV_TENANT_ID] self.log.info("Creating session with Service Principal Authentication") elif all(k in os.environ for k in msi_auth_variables): # MSI authentication if constants.ENV_CLIENT_ID in os.environ: self.credentials = MSIAuthentication( client_id=os.environ[constants.ENV_CLIENT_ID], resource=self.resource_namespace) else: self.credentials = MSIAuthentication( resource=self.resource_namespace) self.subscription_id = os.environ[constants.ENV_SUB_ID] self.log.info("Creating session with MSI Authentication") else: # Azure CLI authentication self._is_cli_auth = True (self.credentials, self.subscription_id, self.tenant_id) = Profile().get_login_credentials( resource=self.resource_namespace) self.log.info("Creating session with Azure CLI Authentication") # TODO: cleanup this workaround when issue resolved. # https://github.com/Azure/azure-sdk-for-python/issues/5096 if self.resource_namespace == constants.RESOURCE_VAULT: access_token = AccessToken(token=self.get_bearer_token()) self.credentials = KeyVaultAuthentication(lambda _1, _2, _3: access_token) # Let provided id parameter override everything else if self.subscription_id_override is not None: self.subscription_id = self.subscription_id_override self.log.info("Session using Subscription ID: %s" % self.subscription_id) if self.credentials is None: self.log.error('Unable to locate credentials for Azure session.')
[docs] def get_session_for_resource(self, resource): return Session( subscription_id=self.subscription_id_override, authorization_file=self.authorization_file, resource=resource)
[docs] def client(self, client): self._initialize_session() service_name, client_name = client.rsplit('.', 1) svc_module = importlib.import_module(service_name) klass = getattr(svc_module, client_name) klass_parameters = None if sys.version_info[0] < 3: import funcsigs klass_parameters = funcsigs.signature(klass).parameters else: klass_parameters = inspect.signature(klass).parameters client = None if 'subscription_id' in klass_parameters: client = klass(credentials=self.credentials, subscription_id=self.subscription_id) else: client = klass(credentials=self.credentials) # Override send() method to log request limits & custom retries service_client = client._client service_client.orig_send = service_client.send service_client.send = types.MethodType(custodian_azure_send_override, service_client) # Don't respect retry_after_header to implement custom retries service_client.config.retry_policy.policy.respect_retry_after_header = False return client
[docs] def get_credentials(self): self._initialize_session() return self.credentials
[docs] def get_subscription_id(self): self._initialize_session() return self.subscription_id
[docs] def get_function_target_subscription_name(self): self._initialize_session() if constants.ENV_FUNCTION_MANAGEMENT_GROUP_NAME in os.environ: return os.environ[constants.ENV_FUNCTION_MANAGEMENT_GROUP_NAME] return os.environ.get(constants.ENV_FUNCTION_SUB_ID, self.subscription_id)
[docs] def get_function_target_subscription_ids(self): self._initialize_session() if constants.ENV_FUNCTION_MANAGEMENT_GROUP_NAME in os.environ: return ManagedGroupHelper.get_subscriptions_list( os.environ[constants.ENV_FUNCTION_MANAGEMENT_GROUP_NAME], self.get_credentials()) return [os.environ.get(constants.ENV_FUNCTION_SUB_ID, self.subscription_id)]
[docs] def resource_api_version(self, resource_id): """ latest non-preview api version for resource """ namespace = ResourceIdParser.get_namespace(resource_id) resource_type = ResourceIdParser.get_resource_type(resource_id) cache_id = namespace + resource_type if cache_id in self._provider_cache: return self._provider_cache[cache_id] resource_client = self.client('azure.mgmt.resource.ResourceManagementClient') provider = resource_client.providers.get(namespace) rt = next((t for t in provider.resource_types if StringUtils.equal(t.resource_type, resource_type)), None) if rt and rt.api_versions: versions = [v for v in rt.api_versions if 'preview' not in v.lower()] api_version = versions[0] if versions else rt.api_versions[0] self._provider_cache[cache_id] = api_version return api_version
[docs] def get_tenant_id(self): self._initialize_session() if self._is_token_auth: decoded = jwt.decode(self.credentials.token['access_token'], verify=False) return decoded['tid'] return self.tenant_id
[docs] def get_bearer_token(self): self._initialize_session() if self._is_cli_auth: return self.credentials._token_retriever()[1] return self.credentials.token['access_token']
[docs] def load_auth_file(self, path): with open(path) as json_file: data = json.load(json_file) self.tenant_id = data['credentials']['tenant'] return (ServicePrincipalCredentials( client_id=data['credentials']['client_id'], secret=data['credentials']['secret'], tenant=self.tenant_id, resource=self.resource_namespace ), data.get('subscription', None))
[docs] def get_functions_auth_string(self, target_subscription_id): """ Build auth json string for deploying Azure Functions. Look for dedicated Functions environment variables or fall back to normal Service Principal variables. """ self._initialize_session() function_auth_variables = [ constants.ENV_FUNCTION_TENANT_ID, constants.ENV_FUNCTION_CLIENT_ID, constants.ENV_FUNCTION_CLIENT_SECRET ] # Use dedicated function env vars if available if all(k in os.environ for k in function_auth_variables): auth = { 'credentials': { 'client_id': os.environ[constants.ENV_FUNCTION_CLIENT_ID], 'secret': os.environ[constants.ENV_FUNCTION_CLIENT_SECRET], 'tenant': os.environ[constants.ENV_FUNCTION_TENANT_ID] }, 'subscription': target_subscription_id } elif type(self.credentials) is ServicePrincipalCredentials: auth = { 'credentials': { 'client_id': os.environ[constants.ENV_CLIENT_ID], 'secret': os.environ[constants.ENV_CLIENT_SECRET], 'tenant': os.environ[constants.ENV_TENANT_ID] }, 'subscription': target_subscription_id } else: raise NotImplementedError( "Service Principal credentials are the only " "supported auth mechanism for deploying functions.") return json.dumps(auth, indent=2)