Source code for c7n.logs_support

# Copyright 2016-2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
Supporting utilities for various implementations
of PolicyExecutionMode.get_logs()
'''
from __future__ import absolute_import, division, print_function, unicode_literals

import io
import logging
import re
import time
from concurrent.futures import as_completed
from datetime import datetime
from dateutil import parser
from dateutil import tz
from gzip import GzipFile

from c7n.exceptions import ClientError
from c7n.executor import ThreadPoolExecutor
from c7n.utils import local_session


log = logging.getLogger('custodian.logs')


def _timestamp_from_string(date_text):
    '''Parse a date string into epoch milliseconds (0 if unparseable)'''
    try:
        date_dt = parser.parse(date_text)
        date_ts = time.mktime(date_dt.timetuple())
        return int(date_ts * 1000)
    except (AttributeError, TypeError, ValueError):
        return 0
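
# A minimal usage sketch (illustrative; the exact value returned depends on
# the local timezone, since time.mktime() treats the parsed datetime as
# local time):
#
#   _timestamp_from_string('2017/01/01 00:00')  # -> epoch milliseconds (int)
#   _timestamp_from_string(None)                # -> 0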


def normalized_log_entries(raw_entries):
    '''Mimic the format returned by LambdaManager.logs()'''
    entry_start = r'([0-9:, \-]+) - .* - (\w+) - (.*)$'
    entry = None
    # process start/end here - avoid parsing log entries twice
    for line in raw_entries:
        m = re.match(entry_start, line)
        if m:
            # this is the start of a new entry:
            # spit out the one previously built up (if any)
            if entry is not None:
                yield entry
            (log_time, log_level, log_text) = m.groups()
            # convert time to epoch milliseconds
            log_timestamp = _timestamp_from_string(log_time)
            # join level and first line of message
            msg = '[{}] {}'.format(log_level, log_text)
            entry = {
                'timestamp': log_timestamp,
                'message': msg,
            }
        elif entry is not None:
            # additional line(s) for the current entry (e.g. a stack trace)
            entry['message'] = entry['message'] + line
        # a continuation line seen before any entry start is dropped
    if entry is not None:
        # yield the final entry
        yield entry
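

# A minimal usage sketch (illustrative; the log lines are hypothetical but
# follow the 'time - logger - level - message' layout the regex expects;
# continuation lines are appended to the preceding entry's message):
#
#   raw = [
#       '2017-01-01 12:00:00,000 - custodian.policy - ERROR - policy failed\n',
#       'Traceback (most recent call last):\n',
#   ]
#   entries = list(normalized_log_entries(raw))
#   # -> one entry: {'timestamp': <epoch ms>, 'message': '[ERROR] policy failed...'}

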
def log_entries_in_range(entries, start, end):
    '''Filter out entries before start and after end'''
    start = _timestamp_from_string(start)
    end = _timestamp_from_string(end)
    for entry in entries:
        log_timestamp = entry.get('timestamp', 0)
        if start <= log_timestamp <= end:
            yield entry
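

# Illustrative sketch: keep only entries that fall inside a window
# (start/end are date strings, parsed with the same helper used above):
#
#   in_range = list(log_entries_in_range(
#       entries, '2017/01/01 00:00', '2017/01/02 00:00'))

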
def log_entries_from_s3(session_factory, output, start, end):
    client = local_session(session_factory).client('s3')
    key_prefix = output.key_prefix.strip('/')
    local_tz = tz.tzlocal()
    # 'start' stays naive (it is only used to build the key marker below);
    # 'end' is made tz-aware so it can be compared against the tz-aware
    # LastModified values returned by S3
    start = datetime.fromtimestamp(
        _timestamp_from_string(start) / 1000
    )
    end = datetime.fromtimestamp(
        _timestamp_from_string(end) / 1000
    ).replace(tzinfo=local_tz)
    records = []
    key_count = 0
    log_filename = 'custodian-run.log.gz'
    # start listing at the first hour of the start day
    marker = '{}/{}/{}'.format(
        key_prefix,
        start.strftime('%Y/%m/%d/00'),
        log_filename,
    )
    p = client.get_paginator('list_objects_v2').paginate(
        Bucket=output.bucket,
        Prefix=key_prefix + '/',
        StartAfter=marker,
    )
    with ThreadPoolExecutor(max_workers=20) as w:
        for key_set in p:
            if 'Contents' not in key_set:
                continue
            log_keys = [k for k in key_set['Contents']
                        if k['Key'].endswith(log_filename)]
            keys = [k for k in log_keys if k['LastModified'] < end]
            if len(log_keys) >= 1 and len(keys) == 0:
                # there were logs, but we're now past the end date
                break
            key_count += len(keys)
            futures = map(
                lambda k: w.submit(
                    get_records, output.bucket, k, session_factory),
                keys,
            )
            for f in as_completed(futures):
                records.extend(f.result())
    log.info('Fetched {} records across {} files'.format(
        len(records),
        key_count,
    ))
    return records
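

# Illustrative sketch (hypothetical values; 'output' is expected to expose
# .bucket and .key_prefix, like custodian's S3 output object):
#
#   records = log_entries_from_s3(
#       session_factory,   # e.g. a boto3-session factory callable
#       output,            # output.bucket='my-logs', output.key_prefix='/policies/foo'
#       '2017/01/01', '2017/01/02',
#   )

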
def get_records(bucket, key, session_factory):
    client = local_session(session_factory).client('s3')
    result = client.get_object(Bucket=bucket, Key=key['Key'])
    # the object body is gzipped bytes, so wrap it in a BytesIO
    # (a StringIO here would fail under Python 3)
    blob = io.BytesIO(result['Body'].read())
    records = GzipFile(fileobj=blob).readlines()
    log.debug("bucket: %s key: %s records: %d",
              bucket, key['Key'], len(records))
    return records
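

# Illustrative sketch (hypothetical bucket/key; note 'key' is a dict as
# returned by list_objects_v2, not a plain string):
#
#   lines = get_records(
#       'my-logs',
#       {'Key': 'policies/foo/2017/01/01/00/custodian-run.log.gz'},
#       session_factory,
#   )

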
def log_entries_from_group(session, group_name, start, end):
    '''Get logs for a specific log group'''
    logs = session.client('logs')
    log.info("Fetching logs from group: %s", group_name)
    # bail out quietly if the group (or its streams) no longer exists
    try:
        logs.describe_log_groups(logGroupNamePrefix=group_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFoundException':
            return
        raise
    try:
        # only the three most recently active streams are examined
        log_streams = logs.describe_log_streams(
            logGroupName=group_name,
            orderBy="LastEventTime",
            limit=3,
            descending=True,
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFoundException':
            return
        raise
    start = _timestamp_from_string(start)
    end = _timestamp_from_string(end)
    for s in reversed(log_streams['logStreams']):
        result = logs.get_log_events(
            logGroupName=group_name,
            logStreamName=s['logStreamName'],
            startTime=start,
            endTime=end,
        )
        for e in result['events']:
            yield e
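

# Illustrative sketch (hypothetical group name; 'session' is a boto3-style
# session object with a .client() method):
#
#   for event in log_entries_from_group(
#           session, '/aws/lambda/custodian-foo',
#           '2017/01/01', '2017/01/02'):
#       print(event['message'])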