# Copyright 2017-2018 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import jmespath
import json
import logging
import six
from googleapiclient.errors import HttpError
from c7n.actions import ActionRegistry
from c7n.filters import FilterRegistry
from c7n.manager import ResourceManager
from c7n.query import sources
from c7n.utils import local_session
# Module-level logger shared by the query classes in this file.
log = logging.getLogger('c7n_gcp.query')
class ResourceQuery(object):
    """Enumerate resources of a resource type through its API client."""

    def __init__(self, session_factory):
        """Remember the factory used to obtain a session on each query."""
        self.session_factory = session_factory

    def filter(self, resource_manager, **params):
        """Run the enumeration call described by the manager's resource type.

        Arguments are layered in this order, later layers overriding
        earlier ones: caller supplied ``params``, the scope arguments
        derived from the session (project and, for zone scoped types,
        the default zone when one is set), then the ``extra_args``
        element of the type's ``enum_spec``.

        :param resource_manager: object exposing ``resource_type``.
        :param params: additional keyword arguments for the enum call.
        :returns: the items selected by the ``enum_spec`` path, or an
            empty list when nothing was selected.
        """
        m = resource_manager.resource_type
        session = local_session(self.session_factory)
        client = session.client(m.service, m.version, m.component)

        # Both project and zone scoped types need the project argument.
        if m.scope in ('project', 'zone'):
            project = session.get_default_project()
            if m.scope_template:
                project = m.scope_template.format(project)
            if m.scope_key:
                params[m.scope_key] = project
            else:
                params['project'] = project

        if m.scope == 'zone':
            # Look the zone up once so the value tested is the value sent.
            zone = session.get_default_zone()
            if zone:
                params['zone'] = zone

        enum_op, path, extra_args = m.enum_spec
        if extra_args:
            params.update(extra_args)
        return self._invoke_client_enum(client, enum_op, params, path)

    def _invoke_client_enum(self, client, enum_op, params, path):
        """Execute ``enum_op`` and extract the items found at ``path``.

        Paginated operations are drained page by page. Both branches
        return a list-like result; a response in which ``path`` selects
        nothing yields an empty list rather than ``None``.
        """
        if client.supports_pagination(enum_op):
            results = []
            for page in client.execute_paged_query(enum_op, params):
                page_items = jmespath.search(path, page)
                if page_items:
                    results.extend(page_items)
            return results

        response = client.execute_query(enum_op, verb_arguments=params)
        # Match the paginated branch: never hand ``None`` back to callers.
        return jmespath.search(path, response) or []
@sources.register('describe-gcp')
class DescribeSource(object):
    """Resource source registered as ``describe-gcp``.

    Delegates enumeration to a ``ResourceQuery`` built from the
    manager's session factory.
    """

    def __init__(self, manager):
        self.manager = manager
        self.query = ResourceQuery(manager.session_factory)

    def get_resources(self, query):
        """Return the resources for ``query``; ``None`` means no arguments."""
        enum_params = {} if query is None else query
        return self.query.filter(self.manager, **enum_params)

    def get_permissions(self):
        """Return the permissions needed by this source (none are declared)."""
        return ()

    def augment(self, resources):
        """Return ``resources`` unchanged."""
        return resources
@six.add_metaclass(QueryMeta)
class QueryResourceManager(ResourceManager):
    """Resource manager that loads its resources through a registered source.

    The source is selected by the policy's ``source`` key and defaults to
    ``describe-gcp``.
    """

    def __init__(self, data, options):
        super(QueryResourceManager, self).__init__(data, options)
        self.source = self.get_source(self.source_type)

    def get_permissions(self):
        """Return the permissions needed by this manager (none are declared)."""
        return ()

    def get_source(self, source_type):
        """Instantiate the source registered under ``source_type``."""
        source_factory = sources.get(source_type)
        return source_factory(self)

    def get_client(self):
        """Return an API client for this manager's resource type."""
        session = local_session(self.session_factory)
        model = self.resource_type
        return session.client(model.service, model.version, model.component)

    def get_model(self):
        """Return the resource type description of this manager."""
        return self.resource_type

    def get_cache_key(self, query):
        """Build the cache key for ``query`` against this resource type."""
        cache_key = {'source_type': self.source_type, 'query': query}
        model = self.resource_type
        cache_key['service'] = model.service
        cache_key['version'] = model.version
        cache_key['component'] = model.component
        return cache_key

    def get_resource(self, resource_info):
        """Fetch one resource via the resource type's ``get`` callable."""
        client = self.get_client()
        return self.resource_type.get(client, resource_info)

    @property
    def source_type(self):
        """Name of the source to use, taken from the policy data."""
        return self.data.get('source', 'describe-gcp')

    def get_resource_query(self):
        """Return the policy's ``query`` wrapped as a filter, if present."""
        if 'query' not in self.data:
            return None
        return {'filter': self.data.get('query')}

    def resources(self, query=None):
        """Fetch, cache and filter the resources matching ``query``."""
        effective_query = query or self.get_resource_query()
        cache_key = self.get_cache_key(effective_query)
        fetched = self._fetch_resources(effective_query)
        self._cache.save(cache_key, fetched)
        return self.filter_resources(fetched)

    def _fetch_resources(self, query):
        """Load and augment resources, tolerating a disabled service.

        An ``HttpError`` whose reason is ``accessNotConfigured`` is logged
        and treated as "no resources"; any other error is re-raised.
        """
        try:
            return self.augment(self.source.get_resources(query)) or []
        except HttpError as e:
            if extract_error(e) != 'accessNotConfigured':
                raise
            log.warning(
                "Resource:%s not available -> Service:%s not enabled on %s",
                self.type,
                self.resource_type.service,
                local_session(self.session_factory).get_default_project())
            return []

    def augment(self, resources):
        """Return ``resources`` unchanged."""
        return resources
class ChildResourceManager(QueryResourceManager):
    """Manager for resources that are enumerated per parent resource.

    The resource type's ``parent_spec`` names the parent resource and
    supplies two mapping lists, ``parent_get_params`` and
    ``child_enum_params``. Each mapping is indexed as
    ``(jmespath_expression, target_key)``.
    """

    def get_resource(self, resource_info):
        """Fetch a child resource and annotate it with its parent."""
        child_instance = super(ChildResourceManager, self).get_resource(resource_info)

        parent_resource = self.resource_type.parent_spec['resource']
        parent_instance = self.get_resource_manager(parent_resource).get_resource(
            self._get_parent_resource_info(child_instance)
        )

        annotation_key = self.resource_type.get_parent_annotation_key()
        child_instance[annotation_key] = parent_instance
        return child_instance

    def _fetch_resources(self, query):
        """Enumerate children of every parent, annotating each with its parent.

        The caller's ``query`` is never modified: the caller may already
        have embedded that same dict in a cache key, so the per-parent
        arguments are merged into a fresh copy for each parent.
        """
        base_query = dict(query) if query else {}

        resources = []
        annotation_key = self.resource_type.get_parent_annotation_key()
        parent_resource_manager = self.get_resource_manager(
            self.resource_type.parent_spec['resource']
        )

        for parent_instance in parent_resource_manager.resources():
            child_query = dict(base_query)
            child_query.update(self._get_child_enum_args(parent_instance))
            children = super(ChildResourceManager, self)._fetch_resources(child_query)

            for child_instance in children:
                child_instance[annotation_key] = parent_instance

            resources.extend(children)

        return resources

    def _get_parent_resource_info(self, child_instance):
        """Extract the arguments needed to fetch the parent of a child."""
        mappings = self.resource_type.parent_spec['parent_get_params']
        return self._extract_fields(child_instance, mappings)

    def _get_child_enum_args(self, parent_instance):
        """Extract the enumeration arguments contributed by a parent."""
        mappings = self.resource_type.parent_spec['child_enum_params']
        return self._extract_fields(parent_instance, mappings)

    @staticmethod
    def _extract_fields(source, mappings):
        """Map ``mapping[1]`` to the value found at JMESPath ``mapping[0]``."""
        return {
            mapping[1]: jmespath.search(mapping[0], source)
            for mapping in mappings
        }
@six.add_metaclass(TypeMeta)
class TypeInfo(object):
    """Declarative description of a resource type; subclasses override fields."""

    # Arguments handed to ``session.client(service, version, component)``.
    service = None
    version = None
    component = None

    # Enumeration scope; 'project' and 'zone' are the values checked when
    # the query arguments are built.
    scope = 'project'
    # (operation name, JMESPath to the items, extra arguments or None).
    enum_spec = ('list', 'items[]', None)
    # Argument name that receives the project value instead of 'project',
    # e.g. when the API expects it as a parent.
    scope_key = None
    # Optional format string applied to the project before it is passed.
    scope_template = None

    # Callable retrieving a single resource, for serverless policies.
    get = None
    # Set when the get method needs the full event payload.
    get_requires_event = False
class ChildTypeInfo(TypeInfo):
    """Resource type description for resources that hang off a parent."""

    # Mapping describing the parent; its 'resource' entry names the parent.
    parent_spec = None

    @classmethod
    def get_parent_annotation_key(cls):
        """Return the key under which a child stores its parent instance."""
        return 'c7n:{}'.format(cls.parent_spec['resource'])
# Pre-compiled JMESPath expression selecting the ``reason`` of the first
# entry under ``error.errors``.
# NOTE(review): presumably applied to HttpError payloads by extract_error,
# which is defined outside this view -- confirm.
ERROR_REASON = jmespath.compile('error.errors[0].reason')