# Copyright 2018 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# import base64
from collections import namedtuple
import json
import logging
import hashlib
from c7n_gcp.client import errors
from c7n.mu import custodian_archive as base_archive
from c7n.utils import local_session
from googleapiclient.errors import HttpError
# Module logger for cloud function deployment messages.
log = logging.getLogger('c7n_gcp.mu')
# Fallback region used when an event source config omits 'region'.
DEFAULT_REGION = 'us-central1'
def custodian_archive(packages=None):
    """Build the source archive for a custodian GCP cloud function.

    :param packages: optional iterable of extra package names to bundle.
        ``c7n_gcp`` is always added. The caller's sequence is never modified.
    :returns: the archive produced by ``c7n.mu.custodian_archive``, with a
        ``requirements.txt`` listing the pip requirements added to it.
    """
    # Copy rather than append in place so a caller-owned list is not mutated
    # (and tuples/other iterables are accepted).
    packages = list(packages or ()) + ['c7n_gcp']
    archive = base_archive(packages)

    # Requirements are fetched server-side, which helps for binary extensions
    # but for pure python packages, if we have a local install and its
    # relatively small, it might be faster to just upload.
    requirements = {
        'jmespath',
        'retrying',
        'python-dateutil',
        'ratelimiter>=1.2.0.post0',
        'google-auth>=1.4.1',
        'google-auth-httplib2>=0.0.3',
        'google-api-python-client>=1.7.3',
    }
    archive.add_contents(
        'requirements.txt',
        '\n'.join(sorted(requirements)))
    return archive
class CloudFunctionManager(object):
    """Deploy, inspect and remove cloud functions in a single region.

    Uses the ``cloudfunctions`` v1 ``projects.locations.functions`` client
    from a session created by ``session_factory``.
    """

    def __init__(self, session_factory, region="us-central1"):
        self.session_factory = session_factory
        self.session = local_session(session_factory)
        self.client = self.session.client(
            'cloudfunctions', 'v1', 'projects.locations.functions')
        self.region = region

    def list_functions(self, prefix=None):
        """List extant cloud functions.

        :param prefix: optional short-name prefix. When given, only functions
            whose final name segment starts with it are returned.
        :returns: list of function resource dicts (empty when there are none).
        """
        # NOTE(review): only the first response is read; a ``nextPageToken``
        # in the response is not followed.
        functions = self.client.execute_command(
            'list',
            {'parent': "projects/{}/locations/{}".format(
                self.session.get_default_project(),
                self.region)}
        ).get('functions', [])
        if prefix is None:
            return functions
        # Names are fully qualified (projects/../functions/<name>), so match
        # the prefix against the last path segment only.
        return [f for f in functions
                if f.get('name', '').rsplit('/', 1)[-1].startswith(prefix)]

    def remove(self, func):
        """Remove the function's event sources, then delete the function.

        A 404 on delete (function already gone) is ignored. Returns the delete
        API response, or None when the function did not exist.
        """
        project = self.session.get_default_project()
        # delete event sources
        for e in func.events:
            e.remove(func)
        func_name = "projects/{}/locations/{}/functions/{}".format(
            project, self.region, func.name)
        try:
            return self.client.execute_command('delete', {'name': func_name})
        except errors.HttpError as e:
            if e.resp.status != 404:
                raise

    def publish(self, func):
        """publish the given function.

        The source archive is uploaded when the function does not exist yet,
        or when the deployed source's md5 differs from the local archive.
        Event sources are provisioned (``add``) before the function is
        created, or patched with only the changed config keys.

        :returns: the create/patch API response, or None when the deployed
            config already matches.
        """
        project = self.session.get_default_project()
        func_name = "projects/{}/locations/{}/functions/{}".format(
            project, self.region, func.name)
        func_info = self.get(func.name)
        source_url = None

        archive = func.get_archive()
        if not func_info or self._delta_source(archive, func_name):
            source_url = self._upload(archive, self.region)

        config = func.get_config()
        config['name'] = func_name
        if source_url:
            config['sourceUploadUrl'] = source_url

        # todo - we'll really need before() and after() for pre-provisioning of
        # resources (ie topic for function stream on create) and post provisioning (schedule
        # invocation of extant function).
        #
        # convergent event source creation
        for e in func.events:
            e.add(func)

        if func_info is None:
            log.info("creating function")
            response = self.client.execute_command(
                'create', {
                    'location': "projects/{}/locations/{}".format(
                        project, self.region),
                    'body': config})
        else:
            # httpsTrigger is ignored when diffing the deployed config.
            delta = delta_resource(func_info, config, ('httpsTrigger',))
            if not delta:
                response = None
            else:
                update_mask = ','.join(delta)
                log.info("updating function config %s", update_mask)
                response = self.client.execute_command(
                    'patch', {
                        'name': func_name,
                        'body': config,
                        'updateMask': update_mask})
        return response

    def metrics(self, funcs, start, end, period=5 * 60):
        """Get the metrics for a set of functions.

        Not implemented: currently returns None.
        """

    def logs(self, func, start, end):
        """Get the logs for a given function.

        Not implemented: currently returns None.
        """

    def get(self, func_name, qualifier=None):
        """Get the details on a given function.

        :param func_name: short function name (qualified with project/region here).
        :param qualifier: unused.
        :returns: the function resource dict, or None on a 404.
        """
        project = self.session.get_default_project()
        func_name = "projects/{}/locations/{}/functions/{}".format(
            project, self.region, func_name)
        try:
            return self.client.execute_query('get', {'name': func_name})
        except errors.HttpError as e:
            if e.resp.status != 404:
                raise

    def _get_http_client(self, client):
        # Upload source, we need a class sans credentials as we're
        # posting to a presigned url.
        return self.client.get_http()

    @staticmethod
    def _parse_goog_hash(header):
        """Parse an ``x-goog-hash`` header value into ``{algorithm: digest}``.

        e.g. ``'crc32c=tIfQ9A==, md5=DqrN06/NbVGsG+3CdrVK+Q=='`` becomes
        ``{'crc32c': 'tIfQ9A==', 'md5': 'DqrN06/NbVGsG+3CdrVK+Q=='}``.
        An empty or missing header yields an empty dict.
        """
        hashes = {}
        for part in (header or '').split(','):
            # partition on the first '=' only: base64 digests end in '=' padding.
            algo, sep, digest = part.strip().partition('=')
            if sep:
                hashes[algo] = digest
        return hashes

    def _delta_source(self, archive, func_name):
        """Return True when the deployed source differs from ``archive``.

        Compares the archive's base64 md5 with the md5 reported in the
        deployed source's ``x-goog-hash`` header. When no md5 is reported,
        the source is treated as modified.
        """
        checksum = archive.get_checksum(hasher=hashlib.md5)
        source_info = self.client.execute_command(
            'generateDownloadUrl', {'name': func_name, 'body': {}})
        http = self._get_http_client(self.client)
        source_headers, _ = http.request(source_info['downloadUrl'], 'HEAD')
        # Look the md5 up by name instead of relying on its position in the header.
        deployed_checksum = self._parse_goog_hash(
            source_headers.get('x-goog-hash')).get('md5')
        modified = deployed_checksum != checksum
        log.debug("archive modified:%s checksum %r deployed checksum %r",
                  modified, checksum, deployed_checksum)
        return modified

    def _upload(self, archive, region):
        """Upload function source and return source url

        :raises RuntimeError: when the upload does not return HTTP 200.
        """
        # Generate source upload url
        url = self.client.execute_command(
            'generateUploadUrl',
            {'parent': 'projects/{}/locations/{}'.format(
                self.session.get_default_project(),
                region)}).get('uploadUrl')
        log.debug("uploading function code %s", url)
        http = self._get_http_client(self.client)
        # Close the archive file once the request completes.
        with open(archive.path, 'rb') as fh:
            headers, response = http.request(
                url, method='PUT',
                headers={
                    'content-type': 'application/zip',
                    'Content-Length': '%d' % archive.size,
                    'x-goog-content-length-range': '0,104857600'
                },
                body=fh
            )
        if headers['status'] != '200':
            raise RuntimeError("%s\n%s" % (headers, response))
        log.info("function code uploaded")
        return url
def delta_resource(old_config, new_config, ignore=()):
    """Return the keys of ``new_config`` whose values differ from ``old_config``.

    :param old_config: currently deployed resource dict.
    :param new_config: desired resource dict.
    :param ignore: keys to skip when comparing.
    :returns: list of changed keys, in ``new_config`` iteration order. A key
        missing from ``old_config`` is compared as None, so newly added
        settings are reported instead of raising KeyError. Keys present only
        in ``old_config`` are not reported.
    """
    return [k for k in new_config
            if k not in ignore and new_config[k] != old_config.get(k)]
class CloudFunction(object):
    """A deployable cloud function described by a plain ``func_data`` dict.

    Recognized ``func_data`` keys: ``name`` (required), ``timeout``,
    ``memory-size``, ``service-account``, ``runtime``, ``labels``,
    ``environment``, ``network``, ``max-instances`` and ``events`` (event
    source objects whose ``get_config(func)`` contributes config keys).
    """

    def __init__(self, func_data, archive=None):
        self.func_data = func_data
        self.archive = archive

    def _setting(self, key, default=None):
        # Single lookup point for optional func_data settings.
        return self.func_data.get(key, default)

    @property
    def name(self):
        return self.func_data['name']

    @property
    def timeout(self):
        return self._setting('timeout', '60s')

    @property
    def memory_size(self):
        return self._setting('memory-size', 512)

    @property
    def service_account(self):
        return self._setting('service-account')

    @property
    def runtime(self):
        return self._setting('runtime', 'python37')

    @property
    def labels(self):
        # Always a fresh copy, so callers may add keys without touching func_data.
        return dict(self._setting('labels', {}))

    @property
    def environment(self):
        return self._setting('environment', {})

    @property
    def network(self):
        return self._setting('network')

    @property
    def max_instances(self):
        return self._setting('max-instances')

    @property
    def events(self):
        return list(self._setting('events', ()))

    def get_archive(self):
        return self.archive

    def get_config(self):
        """Build the cloud functions API body for this function.

        Optional settings are included only when truthy. Each event source's
        config is merged in last.
        """
        conf = {
            'name': self.name,
            'timeout': self.timeout,
            'entryPoint': 'handler',
            'runtime': self.runtime,
            'labels': dict(self.labels, **{'deployment-tool': 'custodian'}),
            'availableMemoryMb': self.memory_size,
        }
        optional = (
            ('environmentVariables', self.environment),
            ('network', self.network),
            ('maxInstances', self.max_instances),
            ('serviceAccountEmail', self.service_account),
        )
        conf.update((key, value) for key, value in optional if value)
        for source in self.events:
            conf.update(source.get_config(self))
        return conf
PolicyHandlerTemplate = """\
import base64
import json
import traceback
import os
import logging
import sys
def run(event, context=None):
logging.info("starting function execution")
trigger_type = os.environ.get('FUNCTION_TRIGGER_TYPE', '')
if trigger_type == 'HTTP_TRIGGER':
event = {'request': event}
else:
event = json.loads(base64.b64decode(event['data']).decode('utf-8'))
print("Event: %s" % (event,))
try:
from c7n_gcp.handler import run
result = run(event, context)
logging.info("function execution complete")
if trigger_type == 'HTTP_TRIGGER':
return json.dumps(result), 200, (('Content-Type', 'application/json'),)
return result
except Exception as e:
traceback.print_exc()
raise
"""
class PolicyFunction(CloudFunction):
    """Cloud function that runs a single custodian policy.

    Function settings are read from the policy's ``mode`` block. The archive
    defaults to a fresh ``custodian_archive()`` when none is given.
    """

    def __init__(self, policy, archive=None, events=()):
        self.policy = policy
        self.func_data = policy.data['mode']
        self.archive = archive if archive else custodian_archive()
        self._events = events

    @property
    def name(self):
        return self.policy.name

    @property
    def events(self):
        return self._events

    def get_archive(self):
        """Add the handler and policy config to the archive, close it and return it."""
        archive = self.archive
        archive.add_contents('main.py', PolicyHandlerTemplate)
        policy_config = {'policies': [self.policy.data]}
        archive.add_contents('config.json', json.dumps(policy_config, indent=2))
        archive.close()
        return archive

    def get_config(self):
        """Base function config with the entry point switched to ``run``."""
        config = super(PolicyFunction, self).get_config()
        config.update(entryPoint='run')
        return config
class EventSource(object):
    """Base class for cloud function event sources.

    Subclasses override ``add``/``remove`` to provision or tear down
    supporting resources, and ``get_config`` to contribute keys to the
    function's deployment config. The base implementations do nothing.
    """

    def __init__(self, session, data=None):
        self.data = data
        self.session = session

    @property
    def prefix(self):
        """Name prefix for resources created by custodian.

        Read from ``data['prefix']``, defaulting to ``'custodian-auto-'``,
        including when the source was built without ``data``. Subclasses use
        it in ``remove`` to decide whether a resource may be deleted.
        """
        # Guard against the default data=None, which previously raised AttributeError.
        return (self.data or {}).get('prefix', 'custodian-auto-')

    def add(self, func):
        """Default no-op
        """

    def remove(self, func):
        """Default no-op
        """

    def get_config(self, func):
        """Return no additional function config."""
        return {}
class HTTPEvent(EventSource):
    """Internet exposed http endpoint for cloud function"""

    def get_config(self, func):
        """Return an empty ``httpsTrigger`` stanza (it takes no settings)."""
        return dict(httpsTrigger={})
class BucketEvent(EventSource):
    """Trigger a cloud function from storage object events on a bucket.

    ``data['bucket']`` names the bucket. ``data['event']`` selects the event
    type and defaults to ``trigger``.
    """

    trigger = 'google.storage.object.finalize'
    collection_id = 'cloudfunctions.projects.buckets'

    # Storage event types; finalize is basically on write.
    events = [
        'google.storage.object.finalize',
        'google.storage.object.archive',
        'google.storage.object.delete',
        'google.storage.object.metadataUpdate',
        'providers/cloud.storage/eventTypes/object.change']

    def get_config(self, func):
        event_type = self.data.get('event', self.trigger)
        bucket = self.data['bucket']
        return {'eventTrigger': {'eventType': event_type, 'resource': bucket}}
class PubSubSource(EventSource):
    """Trigger a cloud function from messages published to a pub/sub topic.

    ``data['topic']`` is the topic's short name. It is qualified with the
    session's default project.
    """

    trigger = 'providers/cloud.pubsub/eventTypes/topic.publish'
    collection_id = 'pubsub.projects.topics'

    def get_config(self, func):
        return {
            'eventTrigger': {
                'eventType': self.trigger,
                'failurePolicy': {},
                'service': 'pubsub.googleapis.com',
                'resource': self.get_topic_param()}}

    def get_topic_param(self, topic=None, project=None):
        """Return ``projects/<project>/topics/<topic>``, defaulting both parts."""
        return 'projects/{}/topics/{}'.format(
            project or self.session.get_default_project(),
            topic or self.data['topic'])

    def ensure_topic(self):
        """Verify the pub/sub topic exists, creating it on a 404.

        Returns the topic qualified name.
        """
        client = self.session.client('pubsub', 'v1', 'projects.topics')
        topic = self.get_topic_param()
        try:
            client.execute_command('get', {'topic': topic})
        except HttpError as e:
            if e.resp.status != 404:
                raise
        else:
            return topic

        # bug in discovery doc.. apis say body must be empty but its required in the
        # discovery api for create.
        client.execute_command('create', {'name': topic, 'body': {}})
        return topic

    def ensure_iam(self, publisher=None):
        """Ensure the given identity holds roles/pubsub.publisher on the topic.

        Returns early without writing when the identity is already bound.
        Otherwise it is added to the existing publisher binding, or a new
        binding is appended, and the policy is written back.
        """
        role = 'roles/pubsub.publisher'
        topic = self.get_topic_param()
        client = self.session.client('pubsub', 'v1', 'projects.topics')
        policy = client.execute_command('getIamPolicy', {'resource': topic})
        # The etag is dropped before the policy is written back.
        policy.pop('etag', None)

        # bindings is a list of {'role': ..., 'members': [...]} dicts.
        bindings = policy.setdefault('bindings', [])
        found = None
        for binding in bindings:
            if binding['role'] != role:
                continue
            if publisher in binding['members']:
                return
            found = binding

        if found is None:
            # Append (not setdefault) so the binding is added even when
            # bindings for other roles already exist.
            bindings.append({'members': [publisher], 'role': role})
        else:
            found['members'].append(publisher)

        client.execute_command('setIamPolicy', {'resource': topic, 'body': {'policy': policy}})

    def add(self, func=None):
        """Ensure the topic exists.

        ``func`` is accepted for parity with ``EventSource.add`` (callers pass
        it) and is unused.
        """
        self.ensure_topic()

    def remove(self, func=None):
        """Delete the topic if it was auto-created (name starts with ``prefix``).

        ``func`` is accepted for parity with ``EventSource.remove`` and is
        unused. A 404 (topic already gone) is ignored.
        """
        topic_name = self.data.get('topic') or ''
        if not topic_name.startswith(self.prefix):
            return
        client = self.session.client('pubsub', 'v1', 'projects.topics')
        try:
            client.execute_command('delete', {'topic': self.get_topic_param()})
        except HttpError as e:
            if e.resp.status != 404:
                raise
class PeriodicEvent(EventSource):
    """Periodic serverless execution.

    Supports both http and pub/sub triggers.

    Note periodic requires the setup of app engine and is restricted
    to app engine locations.

    https://cloud.google.com/scheduler/docs/setup

    Schedule can be specified in either cron syntax or app engine schedule expression.

    https://cloud.google.com/scheduler/docs/configuring/cron-job-schedules

    Examples of schedule expressions.

    https://cloud.google.com/appengine/docs/standard/python/config/cronref

    Config keys read from ``data``: ``schedule`` (required), ``target-type``
    ('http' or 'pubsub'), ``region``, ``name``, ``tz`` and ``prefix``.
    """

    def __init__(self, session, data):
        self.session = session
        self.data = data

    @property
    def target_type(self):
        """Scheduler target kind, ``data['target-type']`` or ``'http'``."""
        return self.data.get('target-type', 'http')

    def get_config(self, func):
        """Delegate the function trigger config to the target event source."""
        return self.get_target(func).get_config(func)

    def add(self, func):
        """Provision the target, then create or patch the scheduler job.

        Returns the patch/create response, or None when the job is current.
        """
        target = self.get_target(func)
        target.add(func)
        job = self.get_job_config(func, target)
        client = self.session.client(
            'cloudscheduler', 'v1beta1', 'projects.locations.jobs')

        delta = self.diff_job(client, job)
        if delta:
            log.info("update periodic function - %s", ", ".join(delta))
            return client.execute_command(
                'patch', {
                    'name': job['name'],
                    'updateMask': ','.join(delta),
                    'body': job})
        elif delta is not None:
            # Job exists and matches the desired config.
            return
        return client.execute_command(
            'create', {
                'parent': 'projects/{}/locations/{}'.format(
                    self.session.get_default_project(),
                    self.data.get('region', DEFAULT_REGION)),
                'body': job})

    def remove(self, func):
        """Remove the target, then delete the job if its name has ``prefix``."""
        target = self.get_target(func)
        target.remove(func)
        job = self.get_job_config(func, target)
        if not job['name'].rsplit('/', 1)[-1].startswith(self.prefix):
            return
        client = self.session.client(
            'cloudscheduler', 'v1beta1', 'projects.locations.jobs')
        return client.execute_command('delete', {'name': job['name']})

    # Periodic impl

    def diff_job(self, client, target_job):
        """Compare the deployed job with ``target_job``.

        :returns: None if the job does not exist (404), False if nothing
            differs, otherwise the list of differing keys.
        """
        try:
            job = client.execute_query('get', {'name': target_job['name']})
        except HttpError as e:
            if e.resp.status != 404:
                raise
            return None

        # Key names must match those emitted by get_job_config ('pubsubTarget',
        # not 'pubSubTarget'), otherwise the ignore has no effect.
        delta = delta_resource(job, target_job, ignore=('httpTarget', 'pubsubTarget'))
        if not delta:
            return False
        return delta

    def get_target(self, func):
        """Return the event source the scheduler job invokes.

        :raises ValueError: for an unknown ``target-type``.
        """
        if self.target_type == 'http':
            return HTTPEvent(self.session, self.data)
        elif self.target_type == 'pubsub':
            config = dict(self.data)
            config['topic'] = '{}{}'.format(self.prefix, func.name)
            return PubSubSource(self.session, config)
        else:
            raise ValueError("Unknown periodic target: %s" % self.target_type)

    def get_job_config(self, func, target):
        """Build the cloud scheduler job body for ``func``."""
        job = {
            'name': "projects/{}/locations/{}/jobs/{}".format(
                self.session.get_default_project(),
                self.data.get('region', DEFAULT_REGION),
                self.data.get('name', '{}{}'.format(self.prefix, func.name))),
            'schedule': self.data['schedule'],
            'timeZone': self.data.get('tz', 'Etc/UTC')}

        if self.target_type == 'http':
            job['httpTarget'] = {
                'uri': 'https://{}-{}.cloudfunctions.net/{}'.format(
                    self.data.get('region', DEFAULT_REGION),
                    self.session.get_default_project(),
                    func.name)
            }
        elif self.target_type == 'pubsub':
            job['pubsubTarget'] = {
                'topicName': target.get_topic_param(),
            }
        return job
# Parsed form of a log resource name, as built by LogSubscriber.get_log.
LogInfo = namedtuple('LogInfo', ['name', 'scope_type', 'scope_id', 'id'])
class LogSubscriber(EventSource):
    """Composite as a log sink

    Routes log entries through a log sink into a pub/sub topic, which
    triggers the function. Recognized ``data`` keys: ``log``, ``filter``,
    ``topic``, ``name``, optional ``scope`` and ``scope_id`` (when scope is
    not the default), and ``prefix``.

    Example::

        subscriber = LogSubscriber(session, dict(
            log='projects/custodian-1291/logs/cloudaudit.googleapis.com%2Factivity',
            topic='custodian-auto-logs', name='custodian-auto-logs'))
        function = CloudFunction(dict(name='log-sub', events=[subscriber]))
    """

    def __init__(self, session, data):
        self.data = data
        self.session = session
        self.pubsub = PubSubSource(session, data)

    def get_log(self):
        """Split ``data['log']`` ('<scope_type>/<scope_id>/<..>/<log_id>') into a LogInfo."""
        log_name = self.data['log']
        scope_type, scope_id, _, log_id = log_name.split('/', 3)
        return LogInfo(
            scope_type=scope_type, scope_id=scope_id,
            id=log_id, name=log_name)

    def get_log_filter(self):
        return self.data.get('filter')

    def get_parent(self, log_info):
        """Get the parent container for the log sink

        :raises ValueError: for an unsupported scope, or a 'log' scope whose
            log is not project-level.
        """
        scope = self.data.get('scope', 'log')
        if scope == 'log':
            if log_info.scope_type != 'projects':
                raise ValueError("Invalid log subscriber scope")
            return "%s/%s" % (log_info.scope_type, log_info.scope_id)
        if scope == 'project':
            return 'projects/{}'.format(
                self.data.get('scope_id', self.session.get_default_project()))
        template = {
            'organization': 'organizations/{}',
            'folder': 'folders/{}',
            'billing': 'billingAccounts/{}',
        }.get(scope)
        if template is None:
            raise ValueError(
                'invalid log subscriber scope %s' % (self.data))
        return template.format(self.data['scope_id'])

    def get_sink(self, topic_info=""):
        """Build the sink request for the configured log and topic.

        :returns: ``(scope, sink_path, sink)``, where scope is the parent's
            first path segment (e.g. 'projects') and sink holds the create
            parameters.
        """
        log_info = self.get_log()
        parent = self.get_parent(log_info)
        log_filter = self.get_log_filter()
        scope = parent.split('/', 1)[0]

        body = {
            'name': self.data['name'],
            'destination': "pubsub.googleapis.com/%s" % topic_info,
        }
        if log_filter is not None:
            body['filter'] = log_filter
        # Non-project parents include children and use a unique writer identity.
        above_project = scope != 'projects'
        if above_project:
            body['includeChildren'] = True

        sink = {
            'parent': parent,
            'uniqueWriterIdentity': above_project,
            'body': body,
        }
        sink_path = '%s/sinks/%s' % (parent, body['name'])
        return scope, sink_path, sink

    def ensure_sink(self):
        """Ensure the log sink and its pub sub topic exist.

        Creates the sink on a 404, or updates it when its config differs.
        After a create/update the sink's writer identity is granted publish
        rights on the topic. Returns the sink path.
        """
        topic_info = self.pubsub.ensure_topic()
        scope, sink_path, sink_info = self.get_sink(topic_info)
        client = self.session.client('logging', 'v2', '%s.sinks' % scope)
        try:
            sink = client.execute_command('get', {'sinkName': sink_path})
        except HttpError as e:
            if e.resp.status != 404:
                raise
            sink = client.execute_command('create', sink_info)
        else:
            delta = delta_resource(sink, sink_info['body'])
            if not delta:
                return sink_path
            sink_info['updateMask'] = ','.join(delta)
            sink_info['sinkName'] = sink_path
            sink_info.pop('parent')
            sink = client.execute_command('update', sink_info)

        self.pubsub.ensure_iam(publisher=sink['writerIdentity'])
        return sink_path

    def add(self, func):
        """Create any configured log sink if doesn't exist."""
        return self.ensure_sink()

    def remove(self, func):
        """Remove any provisioned log sink if auto created"""
        if not self.data['name'].startswith(self.prefix):
            return
        scope, sink_path, _ = self.get_sink()
        client = self.session.client('logging', 'v2', '%s.sinks' % scope)
        try:
            client.execute_command(
                'delete', {'sinkName': sink_path})
        except HttpError as e:
            if e.resp.status != 404:
                raise

    def get_config(self, func):
        """Trigger config comes from the backing pub/sub topic."""
        return self.pubsub.get_config(func)
class ApiSubscriber(EventSource):
    """Subscribe to individual api calls

    via audit log -> filtered sink -> pub/sub topic -> cloud function.

    ``data['methods']`` lists the audit-logged method names to match.
    ``data['scope']`` defaults to 'projects'.
    """

    # https://cloud.google.com/logging/docs/reference/audit/auditlog/rest/Shared.Types/AuditLog
    # scope - project
    # api calls

    def __init__(self, session, data):
        self.data = data
        self.session = session

    def get_subscription(self, func):
        """Build the LogSubscriber data (topic, sink name, log, filter) for ``func``."""
        log_name = "{}/{}/logs/cloudaudit.googleapis.com%2Factivity".format(
            self.data.get('scope', 'projects'),
            self.session.get_default_project())
        method_clause = ' OR '.join('"%s"' % method for method in self.data['methods'])
        log_filter = 'logName = "%s" AND protoPayload.methodName = (%s)' % (
            log_name, method_clause)
        # The topic and the sink share one auto-prefixed name.
        resource_name = '{}audit-{}'.format(self.prefix, func.name)
        return {
            'topic': resource_name,
            'name': resource_name,
            'log': log_name,
            'filter': log_filter}

    def _log_subscriber(self, func):
        # Each operation delegates to a LogSubscriber built from the subscription.
        return LogSubscriber(self.session, self.get_subscription(func))

    def add(self, func):
        return self._log_subscriber(func).add(func)

    def remove(self, func):
        return self._log_subscriber(func).remove(func)

    def get_config(self, func):
        return self._log_subscriber(func).get_config(func)