Source code for c7n.actions.securityhub

# Copyright 2018 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import Counter
from datetime import datetime
from dateutil.tz import tzutc

import hashlib
import jmespath
import json

from .core import BaseAction
from c7n.utils import type_schema, local_session, chunks, dumps, filter_empty

from c7n.manager import resources as aws_resources
from c7n.version import version


FindingTypes = {
    "Software and Configuration Checks": [
        "Vulnerabilities",
        "Vulnerabilities/CVE",
        "AWS Security Best Practices",
        "AWS Security Best Practices/Network Reachability",
        "Industry and Regulatory Standards",
        "Industry and Regulatory Standards/CIS Host Hardening Benchmarks",
        "Industry and Regulatory Standards/CIS AWS Foundations Benchmark",
        "Industry and Regulatory Standards/PCI-DSS Controls",
        "Industry and Regulatory Standards/Cloud Security Alliance Controls",
        "Industry and Regulatory Standards/ISO 90001 Controls",
        "Industry and Regulatory Standards/ISO 27001 Controls",
        "Industry and Regulatory Standards/ISO 27017 Controls",
        "Industry and Regulatory Standards/ISO 27018 Controls",
        "Industry and Regulatory Standards/SOC 1",
        "Industry and Regulatory Standards/SOC 2",
        "Industry and Regulatory Standards/HIPAA Controls (USA)",
        "Industry and Regulatory Standards/NIST 800-53 Controls (USA)",
        "Industry and Regulatory Standards/NIST CSF Controls (USA)",
        "Industry and Regulatory Standards/IRAP Controls (Australia)",
        "Industry and Regulatory Standards/K-ISMS Controls (Korea)",
        "Industry and Regulatory Standards/MTCS Controls (Singapore)",
        "Industry and Regulatory Standards/FISC Controls (Japan)",
        "Industry and Regulatory Standards/My Number Act Controls (Japan)",
        "Industry and Regulatory Standards/ENS Controls (Spain)",
        "Industry and Regulatory Standards/Cyber Essentials Plus Controls (UK)",
        "Industry and Regulatory Standards/G-Cloud Controls (UK)",
        "Industry and Regulatory Standards/C5 Controls (Germany)",
        "Industry and Regulatory Standards/IT-Grundschutz Controls (Germany)",
        "Industry and Regulatory Standards/GDPR Controls (Europe)",
        "Industry and Regulatory Standards/TISAX Controls (Europe)",
    ],
    "TTPs": [
        "Initial Access",
        "Execution",
        "Persistence",
        "Privilege Escalation",
        "Defense Evasion",
        "Credential Access",
        "Discovery",
        "Lateral Movement",
        "Collection",
        "Command and Control",
    ],
    "Effects": [
        "Data Exposure",
        "Data Exfiltration",
        "Data Destruction",
        "Denial of Service",
        "Resource Consumption",
    ],
}


# Mostly undocumented value size limit
SECHUB_VALUE_SIZE_LIMIT = 1024


[docs]def build_vocabulary(): vocab = [] for ns, quals in FindingTypes.items(): for q in quals: vocab.append("{}/{}".format(ns, q)) return vocab
[docs]class PostFinding(BaseAction): """Report a finding to AWS Security Hub. Custodian acts as a finding provider, allowing users to craft policies that report to the AWS SecurityHub. For resources that are taggable, we will tag the resource with an identifier such that further findings generate updates. Example generate a finding for accounts that don't have shield enabled. :example: .. code-block:: yaml policies: - name: account-shield-enabled resource: account filters: - shield-enabled actions: - type: post-finding severity_normalized: 6 types: - "Software and Configuration Checks/Industry and Regulatory Standards/NIST CSF Controls (USA)" recommendation: "Enable shield" recommendation_url: "https://www.example.com/policies/AntiDDoS.html" confidence: 100 compliance_status: FAILED """ # NOQA FindingVersion = "2018-10-08" ProductName = "default" permissions = ('securityhub:BatchImportFindings',) schema_alias = True schema = type_schema( "post-finding", required=["types"], title={"type": "string"}, severity={"type": "number", 'default': 0}, severity_normalized={"type": "number", "min": 0, "max": 100, 'default': 0}, confidence={"type": "number", "min": 0, "max": 100}, criticality={"type": "number", "min": 0, "max": 100}, # Cross region aggregation region={'type': 'string', 'description': 'cross-region aggregation target'}, recommendation={"type": "string"}, recommendation_url={"type": "string"}, fields={"type": "object"}, batch_size={'type': 'integer', 'minimum': 1, 'maximum': 10}, types={ "type": "array", "items": {"type": "string", "enum": build_vocabulary()}, }, compliance_status={ "type": "string", "enum": ["PASSED", "WARNING", "FAILED", "NOT_AVAILABLE"], }, ) NEW_FINDING = 'New'
[docs] def get_finding_tag(self, resource): finding_tag = None tags = resource.get('Tags', []) finding_key = '{}:{}'.format('c7n:FindingId', self.data.get('title', self.manager.ctx.policy.name)) # Support Tags as dictionary if isinstance(tags, dict): return tags.get(finding_key) # Support Tags as list of {'Key': 'Value'} for t in tags: key = t['Key'] value = t['Value'] if key == finding_key: finding_tag = value return finding_tag
[docs] def group_resources(self, resources): grouped_resources = {} for r in resources: finding_tag = self.get_finding_tag(r) or self.NEW_FINDING grouped_resources.setdefault(finding_tag, []).append(r) return grouped_resources
[docs] def process(self, resources, event=None): region_name = self.data.get('region', self.manager.config.region) client = local_session( self.manager.session_factory).client( "securityhub", region_name=region_name) now = datetime.utcnow().replace(tzinfo=tzutc()).isoformat() # default batch size to one to work around security hub console issue # which only shows a single resource in a finding. batch_size = self.data.get('batch_size', 1) stats = Counter() for key, grouped_resources in self.group_resources(resources).items(): for resource_set in chunks(grouped_resources, batch_size): stats['Finding'] += 1 if key == self.NEW_FINDING: finding_id = None created_at = now updated_at = now else: finding_id, created_at = self.get_finding_tag( resource_set[0]).split(':', 1) updated_at = now finding = self.get_finding( resource_set, finding_id, created_at, updated_at) import_response = client.batch_import_findings( Findings=[finding]) if import_response['FailedCount'] > 0: stats['Failed'] += import_response['FailedCount'] self.log.error( "import_response=%s" % (import_response)) if key == self.NEW_FINDING: stats['New'] += len(resource_set) # Tag resources with new finding ids tag_action = self.manager.action_registry.get('tag') if tag_action is None: continue tag_action({ 'key': '{}:{}'.format( 'c7n:FindingId', self.data.get( 'title', self.manager.ctx.policy.name)), 'value': '{}:{}'.format( finding['Id'], created_at)}, self.manager).process(resource_set) else: stats['Update'] += len(resource_set) self.log.debug( "policy:%s securityhub %d findings resources %d new %d updated %d failed", self.manager.ctx.policy.name, stats['Finding'], stats['New'], stats['Update'], stats['Failed'])
[docs] def get_finding(self, resources, existing_finding_id, created_at, updated_at): policy = self.manager.ctx.policy model = self.manager.resource_type if existing_finding_id: finding_id = existing_finding_id else: finding_id = '{}/{}/{}/{}'.format( self.manager.config.region, self.manager.config.account_id, hashlib.md5(json.dumps( policy.data).encode('utf8')).hexdigest(), hashlib.md5(json.dumps(list(sorted( [r[model.id] for r in resources]))).encode( 'utf8')).hexdigest()) finding = { "SchemaVersion": self.FindingVersion, "ProductArn": "arn:aws:securityhub:{}:{}:product/{}/{}".format( self.manager.config.region, self.manager.config.account_id, self.manager.config.account_id, self.ProductName, ), "AwsAccountId": self.manager.config.account_id, "Description": self.data.get( "description", policy.data.get("description", "") ).strip(), "Title": self.data.get("title", policy.name), 'Id': finding_id, "GeneratorId": policy.name, 'CreatedAt': created_at, 'UpdatedAt': updated_at, "RecordState": "ACTIVE", } severity = {'Product': 0, 'Normalized': 0} if self.data.get("severity") is not None: severity["Product"] = self.data["severity"] if self.data.get("severity_normalized") is not None: severity["Normalized"] = self.data["severity_normalized"] if severity: finding["Severity"] = severity recommendation = {} if self.data.get("recommendation"): recommendation["Text"] = self.data["recommendation"] if self.data.get("recommendation_url"): recommendation["Url"] = self.data["recommendation_url"] if recommendation: finding["Remediation"] = {"Recommendation": recommendation} if "confidence" in self.data: finding["Confidence"] = self.data["confidence"] if "criticality" in self.data: finding["Criticality"] = self.data["criticality"] if "compliance_status" in self.data: finding["Compliance"] = {"Status": self.data["compliance_status"]} fields = { 'resource': policy.resource_type, 'ProviderName': 'CloudCustodian', 'ProviderVersion': version } if "fields" in self.data: fields.update(self.data["fields"]) else: tags = {} for t in policy.tags: if ":" in t: k, v = t.split(":", 1) else: k, v = t, "" tags[k] = v fields.update(tags) if fields: finding["ProductFields"] = fields finding_resources = [] for r in resources: finding_resources.append(self.format_resource(r)) finding["Resources"] = finding_resources finding["Types"] = list(self.data["types"]) return filter_empty(finding)
[docs] def format_resource(self, r): raise NotImplementedError("subclass responsibility")
[docs]class OtherResourcePostFinding(PostFinding): fields = ()
[docs] def format_resource(self, r): details = {} for k in r: if isinstance(k, (list, dict)): continue details[k] = r[k] for f in self.fields: value = jmespath.search(f['expr'], r) if not value: continue details[f['key']] = value for k, v in details.items(): if isinstance(v, datetime): v = v.isoformat() elif isinstance(v, (list, dict)): v = dumps(v) elif isinstance(v, (int, float, bool)): v = str(v) else: continue details[k] = v[:SECHUB_VALUE_SIZE_LIMIT] details['c7n:resource-type'] = self.manager.type other = { 'Type': 'Other', 'Id': self.manager.get_arns([r])[0], 'Region': self.manager.config.region, 'Details': {'Other': filter_empty(details)} } tags = {t['Key']: t['Value'] for t in r.get('Tags', [])} if tags: other['Tags'] = tags return other
[docs] @classmethod def register_resource(klass, registry, event): for rtype, resource_manager in registry.items(): if not resource_manager.has_arn(): continue if 'post-finding' in resource_manager.action_registry: continue resource_manager.action_registry.register('post-finding', klass)
aws_resources.subscribe( aws_resources.EVENT_FINAL, OtherResourcePostFinding.register_resource)