# Copyright 2018-2019 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import hashlib
from six.moves.urllib.parse import urlparse
from c7n.exceptions import PolicyExecutionError, PolicyValidationError
from c7n.utils import local_session, type_schema
from .core import MethodAction
from c7n_gcp.provider import resources as gcp_resources
[docs]class PostFinding(MethodAction):
"""Post finding for matched resources to Cloud Security Command Center.
:Example:
.. code-block:: yaml
policies:
- name: gcp-instances-with-label
resource: gcp.instance
filters:
- "tag:name": "bad-instance"
actions:
- type: post-finding
org-domain: example.io
category: MEDIUM_INTERNET_SECURITY
The source for custodian can either be specified inline to the policy, or
custodian can generate one at runtime if it doesn't exist given a org-domain
or org-id.
Finding updates are not currently supported, due to upstream api issues.
"""
schema = type_schema(
'post-finding',
**{
'source': {
'type': 'string',
'description': 'qualified name of source to post to CSCC as'},
'org-domain': {'type': 'string'},
'org-id': {'type': 'integer'},
'category': {'type': 'string'}})
method_spec = {'op': 'create', 'result': 'name', 'annotation_key': 'c7n:Finding'}
# create throws error if already exists, patch method has bad docs.
ignore_error_codes = (409,)
CustodianSourceName = 'CloudCustodian'
DefaultCategory = 'Custodian'
Service = 'securitycenter'
ServiceVersion = 'v1beta1'
_source = None
[docs] def validate(self):
if not any([self.data.get(k) for k in ('source', 'org-domain', 'org-id')]):
raise PolicyValidationError(
"policy:%s CSCC post-finding requires one of source, org-domain, org-id" % (
self.manager.ctx.policy.name))
[docs] def process(self, resources):
self.initialize_source()
return super(PostFinding, self).process(resources)
[docs] def get_client(self, session, model):
return session.client(
self.Service, self.ServiceVersion, 'organizations.sources.findings')
[docs] def get_resource_params(self, model, resource):
return self.get_finding(resource)
[docs] def initialize_source(self):
# Ideally we'll be given a source, but we'll attempt to auto create it
# if given an org_domain or org_id.
if self._source:
return self._source
elif 'source' in self.data:
self._source = self.data['source']
return self._source
session = local_session(self.manager.session_factory)
# Resolve Organization Id
if 'org-id' in self.data:
org_id = self.data['org-id']
else:
orgs = session.client('cloudresourcemanager', 'v1', 'organizations')
res = orgs.execute_query(
'search', {'body': {
'filter': 'domain:%s' % self.data['org-domain']}}).get(
'organizations')
if not res:
raise PolicyExecutionError("Could not determine organization id")
org_id = res[0]['name'].rsplit('/', 1)[-1]
# Resolve Source
client = session.client(self.Service, self.ServiceVersion, 'organizations.sources')
source = None
res = [s for s in
client.execute_query(
'list', {'parent': 'organizations/{}'.format(org_id)}).get('sources')
if s['displayName'] == self.CustodianSourceName]
if res:
source = res[0]['name']
if source is None:
source = client.execute_command(
'create',
{'parent': 'organizations/{}'.format(org_id),
'body': {
'displayName': self.CustodianSourceName,
'description': 'Cloud Management Rules Engine'}}).get('name')
self.log.info(
"policy:%s resolved cscc source: %s, update policy with this source value",
self.manager.ctx.policy.name,
source)
self._source = source
return self._source
[docs] def get_name(self, r):
"""Given an arbitrary resource attempt to resolve back to a qualified name."""
namer = ResourceNameAdapters[self.manager.resource_type.service]
return namer(r)
[docs] def get_finding(self, resource):
policy = self.manager.ctx.policy
resource_name = self.get_name(resource)
# ideally we could be using shake, but its py3.6+ only
finding_id = hashlib.sha256(
b"%s%s" % (
policy.name.encode('utf8'),
resource_name.encode('utf8'))).hexdigest()[:32]
finding = {
'name': '{}/findings/{}'.format(self._source, finding_id),
'resourceName': resource_name,
'state': 'ACTIVE',
'category': self.data.get('category', self.DefaultCategory),
'eventTime': datetime.datetime.utcnow().isoformat('T') + 'Z',
'sourceProperties': {
'resource-type': self.manager.type,
'title': policy.data.get('title', policy.name),
'policy-name': policy.name,
'policy': json.dumps(policy.data)
}
}
request = {
'parent': self._source,
'findingId': finding_id[:31],
'body': finding}
return request
[docs] @classmethod
def register_resource(klass, registry, event):
for rtype, resource_manager in registry.items():
if resource_manager.resource_type.service not in ResourceNameAdapters:
continue
elif 'post-finding' in resource_manager.action_registry:
continue
resource_manager.action_registry.register('post-finding', klass)
# CSCC uses its own notion of resource id, if we want our findings on
# a resource to be linked from the asset view we need to post w/ the
# same resource name. If this conceptulization of resource name is
# standard, then we should move these to resource types with
# appropriate hierarchies by service.
[docs]def name_compute(r):
prefix = urlparse(r['selfLink']).path.strip('/').split('/')[2:][:-1]
return "//compute.googleapis.com/{}/{}".format(
"/".join(prefix),
r['id'])
[docs]def name_iam(r):
return "//iam.googleapis.com/projects/{}/serviceAccounts/{}".format(
r['projectId'],
r['uniqueId'])
[docs]def name_resourcemanager(r):
rid = r.get('projectNumber')
if rid is not None:
rtype = 'projects'
else:
rid = r.get('organizationId')
rtype = 'organizations'
return "//cloudresourcemanager.googleapis.com/{}/{}".format(
rtype, rid)
[docs]def name_container(r):
return "//container.googleapis.com/{}".format(
"/".join(urlparse(r['selfLink']).path.strip('/').split('/')[1:]))
[docs]def name_storage(r):
return "//storage.googleapis.com/{}".format(r['name'])
[docs]def name_appengine(r):
return "//appengine.googleapis.com/{}".format(r['name'])
ResourceNameAdapters = {
'appengine': name_appengine,
'cloudresourcemanager': name_resourcemanager,
'compute': name_compute,
'container': name_container,
'iam': name_iam,
'storage': name_storage,
}
gcp_resources.subscribe(
gcp_resources.EVENT_FINAL, PostFinding.register_resource)