c7n package

Subpackages

Submodules

c7n.cache module

Provide basic caching services to avoid extraneous queries over multiple policies on the same resource type.

class c7n.cache.FileCacheManager(config)[source]

Bases: object

get(key)[source]
load()[source]
save(key, data)[source]
size()[source]
class c7n.cache.InMemoryCache[source]

Bases: object

get(key)[source]
load()[source]
save(key, data)[source]
size()[source]
class c7n.cache.NullCache(config)[source]

Bases: object

get(key)[source]
load()[source]
save(key, data)[source]
size()[source]
c7n.cache.factory(config)[source]

c7n.cli module

c7n.cli.main()[source]
c7n.cli.setproctitle(t)[source]
c7n.cli.setup_parser()[source]

c7n.commands module

class c7n.commands.DuplicateKeyCheckLoader(stream)[source]

Bases: yaml.cyaml.CSafeLoader

construct_mapping(node, deep=False)[source]
c7n.commands.logs(options, policies)[source]
c7n.commands.metrics_cmd(options, policies)[source]
c7n.commands.policy_command(f)[source]
c7n.commands.report(options, policies)[source]
c7n.commands.run(options, policies)[source]
c7n.commands.schema_cmd(options)[source]

Print info about the resources, actions and filters available.

c7n.commands.schema_completer(prefix)[source]

For tab-completion via argcomplete, return completion options.

For the given prefix so far, return the possible options. Note that filtering via startswith happens after this list is returned.

c7n.commands.validate(options)[source]
c7n.commands.version_cmd(options)[source]

c7n.config module

class c7n.config.Bag[source]

Bases: dict

class c7n.config.Config[source]

Bases: c7n.config.Bag

copy() → a shallow copy of D[source]
classmethod empty(**kw)[source]

c7n.credentials module

Authentication utilities

class c7n.credentials.SessionFactory(region, profile=None, assume_role=None, external_id=None)[source]

Bases: object

policy_name
set_subscribers(subscribers)[source]
update(session)[source]
c7n.credentials.assumed_session(role_arn, session_name, session=None, region=None, external_id=None)[source]

STS Role assume a boto3.Session

With automatic credential renewal.

Args:
role_arn: iam role arn to assume session_name: client session identifier session: an optional extant session, note session is captured in a function closure for renewing the sts assumed role.
Returns:a boto3 session using the sts assumed role credentials

Notes: We have to poke at botocore internals a few times

c7n.ctx module

class c7n.ctx.ExecutionContext(session_factory, policy, options)[source]

Bases: object

Policy Execution Context.

get_metadata(include=('sys-stats', 'api-stats', 'metrics'))[source]
initialize()[source]
log_dir

c7n.cwe module

class c7n.cwe.CloudWatchEvents[source]

Bases: object

A mapping of events to resource types.

classmethod get(event_name)[source]
classmethod get_ids(event, mode)[source]
classmethod get_trail_ids(event, mode)[source]

extract resources ids from a cloud trail event.

classmethod match(event)[source]

Match a given cwe event as cloudtrail with an api call

That has its information filled out.

trail_events = {'ConsoleLogin': {'ids': 'userIdentity.arn', 'source': 'signin.amazonaws.com'}, 'CreateAutoScalingGroup': {'ids': 'requestParameters.autoScalingGroupName', 'source': 'autoscaling.amazonaws.com'}, 'CreateBucket': {'ids': 'requestParameters.bucketName', 'source': 's3.amazonaws.com'}, 'CreateCluster': {'ids': 'requestParameters.clusterIdentifier', 'source': 'redshift.amazonaws.com'}, 'CreateDBInstance': {'ids': 'requestParameters.dBInstanceIdentifier', 'source': 'rds.amazonaws.com'}, 'CreateElasticsearchDomain': {'ids': 'requestParameters.domainName', 'source': 'es.amazonaws.com'}, 'CreateFunction': {'event': 'CreateFunction20150331', 'ids': 'requestParameters.functionName', 'source': 'lambda.amazonaws.com'}, 'CreateLoadBalancer': {'ids': 'requestParameters.loadBalancerName', 'source': 'elasticloadbalancing.amazonaws.com'}, 'CreateLoadBalancerPolicy': {'ids': 'requestParameters.loadBalancerName', 'source': 'elasticloadbalancing.amazonaws.com'}, 'CreateTable': {'ids': 'requestParameters.tableName', 'source': 'dynamodb.amazonaws.com'}, 'CreateVolume': {'ids': 'responseElements.volumeId', 'source': 'ec2.amazonaws.com'}, 'RunInstances': {'ids': 'responseElements.instancesSet.items[].instanceId', 'source': 'ec2.amazonaws.com'}, 'SetLoadBalancerPoliciesOfListener': {'ids': 'requestParameters.loadBalancerName', 'source': 'elasticloadbalancing.amazonaws.com'}, 'UpdateAutoScalingGroup': {'ids': 'requestParameters.autoScalingGroupName', 'source': 'autoscaling.amazonaws.com'}}

c7n.exceptions module

exception c7n.exceptions.CustodianError[source]

Bases: Exception

Custodian Exception Base Class

exception c7n.exceptions.DeprecationError[source]

Bases: c7n.exceptions.PolicySyntaxError

Policy using deprecated syntax

exception c7n.exceptions.InvalidOutputConfig[source]

Bases: c7n.exceptions.CustodianError

Invalid configuration for an output

exception c7n.exceptions.PolicyExecutionError[source]

Bases: c7n.exceptions.CustodianError

Error running a Policy.

exception c7n.exceptions.PolicySyntaxError[source]

Bases: c7n.exceptions.CustodianError

Policy Syntax Error

exception c7n.exceptions.PolicyValidationError[source]

Bases: c7n.exceptions.PolicySyntaxError

Policy Validation Error

exception c7n.exceptions.PolicyYamlError[source]

Bases: c7n.exceptions.PolicySyntaxError

Policy Yaml Structural Error

exception c7n.exceptions.ResourceLimitExceeded(msg, limit_type, limit, selection_count, population_count)[source]

Bases: c7n.exceptions.PolicyExecutionError

The policy would have affected more resources than its limit.

c7n.executor module

class c7n.executor.ExecutorRegistry(plugin_type)[source]

Bases: c7n.registry.PluginRegistry

class c7n.executor.MainThreadExecutor(*args, **kw)[source]

Bases: object

For running tests.

c7n_async == True -> catch exceptions and store them in the future. c7n_async == False -> let exceptions bubble up.

c7n_async = True
map(func, iterable)[source]
submit(func, *args, **kw)[source]
type = 'main'
class c7n.executor.MainThreadFuture(value, exception=None)[source]

Bases: object

add_done_callback(fn)[source]
cancel()[source]
cancelled()[source]
done()[source]
exception()[source]
result(timeout=None)[source]
c7n.executor.executor(name, **kw)[source]

c7n.handler module

Cloud-Custodian Lambda Entry Point

Mostly this serves to load up the policy and dispatch an event.

c7n.handler.dispatch_event(event, context)[source]
c7n.handler.get_local_output_dir()[source]

Create a local output directory per execution.

We’ve seen occassional (1/100000) perm issues with lambda on temp directory and changing unix execution users (2015-2018), so use a per execution temp space. With firecracker lambdas this may be outdated.

c7n.handler.init_config(policy_config)[source]

Get policy lambda execution configuration.

cli parameters are serialized into the policy lambda config, we merge those with any policy specific execution options.

–assume role and -s output directory get special handling, as to disambiguate any cli context.

account id is sourced from the config options or from api call and cached as a global

c7n.log module

Python Standard Logging integration with CloudWatch Logs

Double Buffered with background thread delivery.

We do an initial buffering on the log handler directly, to avoid some of the overhead of pushing to the queue (albeit dubious as std logging does default lock acquisition around handler emit). also uses a single thread for all outbound. Background thread uses a separate session.

class c7n.log.CloudWatchLogHandler(log_group='c7n.log', log_stream=None, session_factory=None)[source]

Bases: logging.Handler

Python Log Handler to Send to Cloud Watch Logs

https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/WhatIsCloudWatchLogs.html

batch_interval = 40
batch_min_buffer = 10
batch_size = 20
close()[source]

Tidy up any resources used by the handler.

This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.

emit(message)[source]

Send logs

flush()[source]

Ensure all logging output has been flushed.

flush_buffers(force=False)[source]
format_message(msg)[source]

format message.

start_transports()[source]

start thread transports.

class c7n.log.Error[source]

Bases: object

AlreadyAccepted = 'DataAlreadyAcceptedException'
InvalidToken = 'InvalidSequenceTokenException'
ResourceExists = 'ResourceAlreadyExistsException'
static code(e)[source]
class c7n.log.Transport(queue, batch_size, batch_interval, session_factory)[source]

Bases: object

create_stream(group, stream)[source]
loop()[source]
send()[source]
send_group(k, messages)[source]

c7n.logs_support module

Supporting utilities for various implementations of PolicyExecutionMode.get_logs()

c7n.logs_support.get_records(bucket, key, session_factory)[source]
c7n.logs_support.log_entries_from_group(session, group_name, start, end)[source]

Get logs for a specific log group

c7n.logs_support.log_entries_from_s3(session_factory, output, start, end)[source]
c7n.logs_support.log_entries_in_range(entries, start, end)[source]

filter out entries before start and after end

c7n.logs_support.normalized_log_entries(raw_entries)[source]

Mimic the format returned by LambdaManager.logs()

c7n.manager module

class c7n.manager.ResourceManager(ctx, data)[source]

Bases: object

action_registry = None
executor_factory

alias of concurrent.futures.thread.ThreadPoolExecutor

filter_registry = None
filter_resources(resources, event=None)[source]
format_json(resources, fh)[source]
get_model()[source]

Returns the resource meta-model.

classmethod get_permissions()[source]
get_resource_manager(resource_type, data=None)[source]

get a resource manager or a given resource type.

assumes the query is for the same underlying cloud provider.

get_resources(resource_ids)[source]

Retrieve a set of resources by id.

iter_filters(block_end=False)[source]
match_ids(ids)[source]

return ids that match this resource type’s id format.

resources()[source]
retry = None

c7n.mu module

Cloud Custodian Lambda Provisioning Support

docs/lambda.rst

class c7n.mu.AbstractLambdaFunction[source]

Bases: object

Abstract base class for lambda functions.

alias = None
concurrency
dead_letter_config
description
environment
get_archive()[source]

Return the lambda distribution archive object.

get_config()[source]
get_events(session_factory)[source]

event sources that should be bound to this lambda.

handler
kms_key_arn
layers
memory_size
name

Name for the lambda function

role
runtime
security_groups
subnets
tags
timeout
tracing_config
class c7n.mu.BucketLambdaNotification(data, session_factory, bucket)[source]

Bases: object

Subscribe a lambda to bucket notifications directly.

add(func)[source]
delta(src, tgt)[source]
remove(func)[source]
class c7n.mu.BucketSNSNotification(session_factory, bucket, topic=None)[source]

Bases: c7n.mu.SNSSubscription

Subscribe a lambda to bucket notifications via SNS.

get_topic(bucket)[source]
class c7n.mu.CloudWatchEventSource(data, session_factory)[source]

Bases: object

Subscribe a lambda to cloud watch events.

Cloud watch events supports a number of different event sources, from periodic timers with cron syntax, to real time instance state notifications, cloud trail events, and realtime asg membership changes.

Event Pattern for Instance State

{
  "source": ["aws.ec2"],
  "detail-type": ["EC2 Instance State-change Notification"],
  "detail": { "state": ["pending"]}
}

Event Pattern for Cloud Trail API

{
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
     "eventSource": ["s3.amazonaws.com"],
     "eventName": ["CreateBucket", "DeleteBucket"]
  }
}
ASG_EVENT_MAPPING = {'launch-failure': 'EC2 Instance Launch Unsuccessful', 'launch-success': 'EC2 Instance Launch Successful', 'terminate-failure': 'EC2 Instance Terminate Unsuccessful', 'terminate-success': 'EC2 Instance Terminate Successful'}
add(func)[source]
client
static delta(src, tgt)[source]

Given two cwe rules determine if the configuration is the same.

Name is already implied.

get(rule_name)[source]
pause(func)[source]
remove(func)[source]
render_event_pattern()[source]
resolve_cloudtrail_payload(payload)[source]
resume(func)[source]
session
update(func)[source]
class c7n.mu.CloudWatchLogSubscription(session_factory, log_groups, filter_pattern)[source]

Bases: object

Subscribe a lambda to a log group[s]

add(func)[source]
iam_delay = 1.5
remove(func)[source]
class c7n.mu.ConfigRule(data, session_factory)[source]

Bases: object

Use a lambda as a custom config rule.

add(func)[source]
static delta(rule, params)[source]
get(rule_name)[source]
get_rule_params(func)[source]
remove(func)[source]
class c7n.mu.LambdaFunction(func_data, archive)[source]

Bases: c7n.mu.AbstractLambdaFunction

concurrency
dead_letter_config
description
environment
get_archive()[source]

Return the lambda distribution archive object.

get_events(session_factory)[source]

event sources that should be bound to this lambda.

handler
kms_key_arn
layers
memory_size
name

Name for the lambda function

role
runtime
security_groups
subnets
tags
timeout
tracing_config
class c7n.mu.LambdaManager(session_factory, s3_asset_path=None)[source]

Bases: object

Provides CRUD operations around lambda functions

add(func, alias=None, role=None, s3_uri=None)
static delta_function(old_config, new_config)[source]
static diff_tags(old_tags, new_tags)[source]
get(func_name, qualifier=None)[source]
list_functions(prefix=None)[source]
logs(func, start, end)[source]
metrics(funcs, start, end, period=300)[source]
publish(func, alias=None, role=None, s3_uri=None)[source]
publish_alias(func_data, alias)[source]

Create or update an alias for the given function.

remove(func, alias=None)[source]
class c7n.mu.PolicyLambda(policy)[source]

Bases: c7n.mu.AbstractLambdaFunction

Wraps a custodian policy to turn it into a lambda function.

concurrency
dead_letter_config
description
environment
get_archive()[source]

Return the lambda distribution archive object.

get_events(session_factory)[source]

event sources that should be bound to this lambda.

handler = 'custodian_policy.run'
kms_key_arn
layers
memory_size
name

Name for the lambda function

packages
role
runtime
security_groups
subnets
tags
timeout
tracing_config
class c7n.mu.PythonPackageArchive(*modules)[source]

Bases: object

Creates a zip file for python lambda functions.

Parameters:modules (tuple) – the Python modules to add to the archive

Amazon doesn’t give us straightforward docs here, only an example, from which we can infer that they simply unzip the file into a directory on sys.path. So what we do is locate all of the modules specified, and add all of the .py files we find for these modules to a zip file.

In addition to the modules specified during instantiation, you can add arbitrary additional files to the archive using add_file() and add_contents(). For example, since we only add *.py files for you, you’ll need to manually add files for any compiled extension modules that your Lambda requires.

add_contents(dest, contents)[source]

Add file contents to the archive under dest.

If dest is a path, it will be added compressed and world-readable (user-writeable). You may also pass a ZipInfo for custom behavior.

add_directory(path, ignore=None)[source]

Add *.py files under the directory path to the archive.

add_file(src, dest=None)[source]

Add the file at src to the archive.

If dest is None then it is added under just the original filename. So add_file('foo/bar.txt') ends up at bar.txt in the archive, while add_file('bar.txt', 'foo/bar.txt') ends up at foo/bar.txt.

add_modules(ignore, *modules)[source]

Add the named Python modules to the archive. For consistency’s sake we only add *.py files, not *.pyc. We also don’t add other files, including compiled modules. You’ll have to add such files manually using add_file().

add_py_file(src, dest=None)[source]

This is a special case of add_file() that helps for adding a py when a pyc may be present as well. So for example, if __file__ is foo.pyc and you do:

archive.add_py_file(__file__)

then this method will add foo.py instead if it exists, and raise IOError if it doesn’t.

close()[source]

Close the zip file.

Note underlying tempfile is removed when archive is garbage collected.

get_bytes()[source]

Return the entire zip file as a byte string.

get_checksum(encoder=<function b64encode>, hasher=<built-in function openssl_sha256>)[source]

Return the b64 encoded sha256 checksum of the archive.

get_filenames()[source]

Return a list of filenames in the archive.

get_reader()[source]

Return a read-only ZipFile.

get_stream()[source]

Return the entire zip file as a stream.

path
remove()[source]

Dispose of the temp file for garbage collection.

size
zip_compression = 8
class c7n.mu.SNSSubscription(session_factory, topic_arns)[source]

Bases: object

Subscribe a lambda to one or more SNS topics.

add(func)[source]
iam_delay = 1.5
remove(func)[source]
class c7n.mu.SQSSubscription(session_factory, queue_arns, batch_size=10)[source]

Bases: object

Subscribe a lambda to one or more SQS queues.

add(func)[source]
remove(func)[source]
c7n.mu.checksum(fh, hasher, blocksize=65536)[source]
c7n.mu.custodian_archive(packages=None)[source]

Create a lambda code archive for running custodian.

Lambda archive currently always includes c7n and pkg_resources. Add additional packages in the mode block.

Example policy that includes additional packages

policy:
  name: lambda-archive-example
  resource: s3
  mode:
    packages:
      - botocore

packages: List of additional packages to include in the lambda archive.

c7n.mu.resource_exists(op, NotFound='ResourceNotFoundException', *args, **kw)[source]
c7n.mu.zinfo(fname)[source]

Amazon lambda exec environment setup can break itself if zip files aren’t constructed a particular way.

ie. It respects file perm attributes from the zip including those that prevent lambda from working. Namely lambda extracts code as one user, and executes code as a different user. Without permissions for the executing user to read the file the lambda function is broken.

Python’s default zipfile.writestr does a 0600 perm which we modify here as a workaround.

c7n.output module

Outputs metrics, logs, stats, traces, and structured records across a variety of sinks.

See docs/usage/outputs.rst

class c7n.output.BlobOutputRegistry(plugin_type)[source]

Bases: c7n.output.OutputRegistry

default_protocol = 'file'
class c7n.output.DeltaStats(ctx, config=None)[source]

Bases: object

Capture stats (dictionary of string->integer) as a stack.

Popping the stack automatically creates a delta of the last stack element to the current stats.

delta(before, after)[source]
get_snapshot()[source]
pop_snapshot()[source]
push_snapshot()[source]
class c7n.output.DirectoryOutput(ctx, config)[source]

Bases: object

compress()[source]
get_output_path(output_url)[source]
get_output_vars()[source]
get_resource_set()[source]
permissions = ()
type = 'file'
class c7n.output.LogFile(ctx, config=None)[source]

Bases: c7n.output.LogOutput

get_handler()[source]
log_path
type = 'default'
class c7n.output.LogMetrics(ctx, config=None)[source]

Bases: c7n.output.Metrics

Default metrics collection.

logs metrics, default handler should send to stderr

get_metadata()[source]
render_metric(m)[source]
type = 'default'
class c7n.output.LogOutput(ctx, config=None)[source]

Bases: object

get_handler()[source]
join_log()[source]
leave_log()[source]
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
class c7n.output.LogOutputRegistry(plugin_type)[source]

Bases: c7n.output.OutputRegistry

default_protocol = 'aws'
class c7n.output.Metrics(ctx, config=None)[source]

Bases: object

BUFFER_SIZE = 20
flush()[source]
get_metadata()[source]
namespace = 'CloudMaid'
permissions = ()
put_metric(key, value, unit, buffer=True, **dimensions)[source]
class c7n.output.MetricsRegistry(plugin_type)[source]

Bases: c7n.output.OutputRegistry

select(selector, ctx)[source]
class c7n.output.NullStats(ctx, config=None)[source]

Bases: object

Execution statistics/metrics collection.

Encompasses concrete implementations over system stats (memory, cpu, cache size) and api calls.

The api supports stack nested snapshots, with delta consumption to support tracing metadata annotation across nested subsegments.

get_metadata()[source]

Return default of current to last snapshot, without popping.

pop_snapshot()[source]

Remove a snapshot from the snack and return a delta of the current stats to it.

push_snapshot()[source]

Take a snapshot of the system stats and append to the stack.

type = 'default'
class c7n.output.NullTracer(ctx, config=None)[source]

Bases: object

Tracing provides for detailed analytics of a policy execution.

Uses native cloud provider integration (xray, stack driver trace).

subsegment(name)[source]

Create a named subsegment as a context manager

type = 'default'
class c7n.output.OutputRegistry(plugin_type)[source]

Bases: c7n.registry.PluginRegistry

default_protocol = None
select(selector, ctx)[source]
class c7n.output.SystemStats(ctx, config=None)[source]

Bases: c7n.output.DeltaStats

Collect process statistics via psutil as deltas over policy execution.

get_metadata()[source]
get_snapshot()[source]
type = 'psutil'

c7n.policy module

class c7n.policy.ASGInstanceState(policy)[source]

Bases: c7n.policy.LambdaMode

a lambda policy that executes on an asg’s ec2 instance state changes.

schema = {'additionalProperties': False, 'properties': {'concurrency': {'type': 'integer'}, 'dead_letter_config': {'type': 'object'}, 'environment': {'type': 'object'}, 'events': {'items': {'enum': ['launch-success', 'launch-failure', 'terminate-success', 'terminate-failure']}, 'type': 'array'}, 'execution-options': {'type': 'object'}, 'function-prefix': {'type': 'string'}, 'kms_key_arn': {'type': 'string'}, 'layers': {'items': {'type': 'string'}, 'type': 'array'}, 'member-role': {'type': 'string'}, 'memory': {'type': 'number'}, 'packages': {'items': {'type': 'string'}, 'type': 'array'}, 'role': {'type': 'string'}, 'runtime': {'enum': ['python2.7', 'python3.6', 'python3.7']}, 'security_groups': {'type': 'array'}, 'subnets': {'type': 'array'}, 'tags': {'type': 'object'}, 'timeout': {'type': 'number'}, 'tracing_config': {'type': 'object'}, 'type': {'enum': ['asg-instance-state']}}, 'required': ['type'], 'type': 'object'}
type = 'asg-instance-state'
class c7n.policy.CloudTrailMode(policy)[source]

Bases: c7n.policy.LambdaMode

A lambda policy using cloudwatch events rules on cloudtrail api logs.

schema = {'additionalProperties': False, 'properties': {'concurrency': {'type': 'integer'}, 'dead_letter_config': {'type': 'object'}, 'environment': {'type': 'object'}, 'events': {'items': {'oneOf': [{'type': 'string'}, {'type': 'object', 'required': ['event', 'source', 'ids'], 'properties': {'source': {'type': 'string'}, 'ids': {'type': 'string'}, 'event': {'type': 'string'}}}]}, 'type': 'array'}, 'execution-options': {'type': 'object'}, 'function-prefix': {'type': 'string'}, 'kms_key_arn': {'type': 'string'}, 'layers': {'items': {'type': 'string'}, 'type': 'array'}, 'member-role': {'type': 'string'}, 'memory': {'type': 'number'}, 'packages': {'items': {'type': 'string'}, 'type': 'array'}, 'role': {'type': 'string'}, 'runtime': {'enum': ['python2.7', 'python3.6', 'python3.7']}, 'security_groups': {'type': 'array'}, 'subnets': {'type': 'array'}, 'tags': {'type': 'object'}, 'timeout': {'type': 'number'}, 'tracing_config': {'type': 'object'}, 'type': {'enum': ['cloudtrail']}}, 'required': ['type'], 'type': 'object'}
type = 'cloudtrail'
validate()[source]

Validate configuration settings for execution mode.

class c7n.policy.ConfigRuleMode(policy)[source]

Bases: c7n.policy.LambdaMode

a lambda policy that executes as a config service rule. http://docs.aws.amazon.com/config/latest/APIReference/API_PutConfigRule.html

cfg_event = None
resolve_resources(event)[source]
run(event, lambda_context)[source]

Run policy in push mode against given event.

Lambda automatically generates cloud watch logs, and metrics for us, albeit with some deficienies, metrics no longer count against valid resources matches, but against execution.

If metrics execution option is enabled, custodian will generate metrics per normal.

schema = {'additionalProperties': False, 'properties': {'concurrency': {'type': 'integer'}, 'dead_letter_config': {'type': 'object'}, 'environment': {'type': 'object'}, 'execution-options': {'type': 'object'}, 'function-prefix': {'type': 'string'}, 'kms_key_arn': {'type': 'string'}, 'layers': {'items': {'type': 'string'}, 'type': 'array'}, 'member-role': {'type': 'string'}, 'memory': {'type': 'number'}, 'packages': {'items': {'type': 'string'}, 'type': 'array'}, 'role': {'type': 'string'}, 'runtime': {'enum': ['python2.7', 'python3.6', 'python3.7']}, 'security_groups': {'type': 'array'}, 'subnets': {'type': 'array'}, 'tags': {'type': 'object'}, 'timeout': {'type': 'number'}, 'tracing_config': {'type': 'object'}, 'type': {'enum': ['config-rule']}}, 'required': ['type'], 'type': 'object'}
type = 'config-rule'
class c7n.policy.EC2InstanceState(policy)[source]

Bases: c7n.policy.LambdaMode

A lambda policy that executes on ec2 instance state changes.

https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html

schema = {'additionalProperties': False, 'properties': {'concurrency': {'type': 'integer'}, 'dead_letter_config': {'type': 'object'}, 'environment': {'type': 'object'}, 'events': {'items': {'enum': ['pending', 'running', 'shutting-down', 'stopped', 'stopping', 'terminated']}, 'type': 'array'}, 'execution-options': {'type': 'object'}, 'function-prefix': {'type': 'string'}, 'kms_key_arn': {'type': 'string'}, 'layers': {'items': {'type': 'string'}, 'type': 'array'}, 'member-role': {'type': 'string'}, 'memory': {'type': 'number'}, 'packages': {'items': {'type': 'string'}, 'type': 'array'}, 'role': {'type': 'string'}, 'runtime': {'enum': ['python2.7', 'python3.6', 'python3.7']}, 'security_groups': {'type': 'array'}, 'subnets': {'type': 'array'}, 'tags': {'type': 'object'}, 'timeout': {'type': 'number'}, 'tracing_config': {'type': 'object'}, 'type': {'enum': ['ec2-instance-state']}}, 'required': ['type'], 'type': 'object'}
type = 'ec2-instance-state'
class c7n.policy.GuardDutyMode(policy)[source]

Bases: c7n.policy.LambdaMode

Incident Response for AWS Guard Duty

get_member_account_id(event)[source]
id_exprs = {'account': {'type': 'subexpression', 'children': [{'type': 'field', 'children': [], 'value': 'detail'}, {'type': 'field', 'children': [], 'value': 'accountId'}]}, 'ec2': {'type': 'subexpression', 'children': [{'type': 'field', 'children': [], 'value': 'detail'}, {'type': 'field', 'children': [], 'value': 'resource'}, {'type': 'field', 'children': [], 'value': 'instanceDetails'}, {'type': 'field', 'children': [], 'value': 'instanceId'}]}, 'iam-user': {'type': 'subexpression', 'children': [{'type': 'field', 'children': [], 'value': 'detail'}, {'type': 'field', 'children': [], 'value': 'resource'}, {'type': 'field', 'children': [], 'value': 'accessKeyDetails'}, {'type': 'field', 'children': [], 'value': 'userName'}]}}
provision()[source]

Provision any resources needed for the policy.

resolve_resources(event)[source]
schema = {'additionalProperties': False, 'properties': {'concurrency': {'type': 'integer'}, 'dead_letter_config': {'type': 'object'}, 'environment': {'type': 'object'}, 'execution-options': {'type': 'object'}, 'function-prefix': {'type': 'string'}, 'kms_key_arn': {'type': 'string'}, 'layers': {'items': {'type': 'string'}, 'type': 'array'}, 'member-role': {'type': 'string'}, 'memory': {'type': 'number'}, 'packages': {'items': {'type': 'string'}, 'type': 'array'}, 'role': {'type': 'string'}, 'runtime': {'enum': ['python2.7', 'python3.6', 'python3.7']}, 'security_groups': {'type': 'array'}, 'subnets': {'type': 'array'}, 'tags': {'type': 'object'}, 'timeout': {'type': 'number'}, 'tracing_config': {'type': 'object'}, 'type': {'enum': ['guard-duty']}}, 'required': ['type'], 'type': 'object'}
supported_resources = ('account', 'ec2', 'iam-user')
type = 'guard-duty'
validate()[source]

Validate configuration settings for execution mode.

class c7n.policy.LambdaMode(policy)[source]

Bases: c7n.policy.ServerlessExecutionMode

A policy that runs/executes in lambda.

POLICY_METRICS = ('ResourceCount',)
assume_member(event)[source]
get_logs(start, end)[source]

Retrieve logs for the policy

get_member_account_id(event)[source]
get_member_region(event)[source]
get_metrics(start, end, period)[source]

Retrieve any associated metrics for the policy.

provision()[source]

Provision any resources needed for the policy.

resolve_resources(event)[source]
run(event, lambda_context)[source]

Run policy in push mode against given event.

Lambda automatically generates cloud watch logs, and metrics for us, albeit with some deficienies, metrics no longer count against valid resources matches, but against execution.

If metrics execution option is enabled, custodian will generate metrics per normal.

schema = {'additionalProperties': False, 'properties': {'concurrency': {'type': 'integer'}, 'dead_letter_config': {'type': 'object'}, 'environment': {'type': 'object'}, 'execution-options': {'type': 'object'}, 'function-prefix': {'type': 'string'}, 'kms_key_arn': {'type': 'string'}, 'layers': {'items': {'type': 'string'}, 'type': 'array'}, 'member-role': {'type': 'string'}, 'memory': {'type': 'number'}, 'packages': {'items': {'type': 'string'}, 'type': 'array'}, 'role': {'type': 'string'}, 'runtime': {'enum': ['python2.7', 'python3.6', 'python3.7']}, 'security_groups': {'type': 'array'}, 'subnets': {'type': 'array'}, 'tags': {'type': 'object'}, 'timeout': {'type': 'number'}, 'tracing_config': {'type': 'object'}}, 'type': 'object'}
validate()[source]

Validate configuration settings for execution mode.

class c7n.policy.PHDMode(policy)[source]

Bases: c7n.policy.LambdaMode

Personal Health Dashboard event based policy execution.

static process_event_arns(client, event_arns)[source]
resolve_resources(event)[source]
schema = {'additionalProperties': False, 'properties': {'categories': {'items': {'enum': ['issue', 'accountNotification', 'scheduledChange']}, 'type': 'array'}, 'events': {'items': {'type': 'string'}, 'type': 'array'}, 'statuses': {'items': {'enum': ['open', 'upcoming', 'closed']}, 'type': 'array'}, 'type': {'enum': ['phd']}}, 'required': ['events', 'type'], 'type': 'object'}
type = 'phd'
validate()[source]

Validate configuration settings for execution mode.

class c7n.policy.PeriodicMode(policy)[source]

Bases: c7n.policy.LambdaMode, c7n.policy.PullMode

A policy that runs in pull mode within lambda.

POLICY_METRICS = ('ResourceCount', 'ResourceTime', 'ActionTime')
run(event, lambda_context)[source]

Run policy in push mode against given event.

Lambda automatically generates cloud watch logs, and metrics for us, albeit with some deficienies, metrics no longer count against valid resources matches, but against execution.

If metrics execution option is enabled, custodian will generate metrics per normal.

schema = {'additionalProperties': False, 'properties': {'concurrency': {'type': 'integer'}, 'dead_letter_config': {'type': 'object'}, 'environment': {'type': 'object'}, 'execution-options': {'type': 'object'}, 'function-prefix': {'type': 'string'}, 'kms_key_arn': {'type': 'string'}, 'layers': {'items': {'type': 'string'}, 'type': 'array'}, 'member-role': {'type': 'string'}, 'memory': {'type': 'number'}, 'packages': {'items': {'type': 'string'}, 'type': 'array'}, 'role': {'type': 'string'}, 'runtime': {'enum': ['python2.7', 'python3.6', 'python3.7']}, 'schedule': {'type': 'string'}, 'security_groups': {'type': 'array'}, 'subnets': {'type': 'array'}, 'tags': {'type': 'object'}, 'timeout': {'type': 'number'}, 'tracing_config': {'type': 'object'}, 'type': {'enum': ['periodic']}}, 'required': ['type'], 'type': 'object'}
type = 'periodic'
class c7n.policy.Policy(data, options, session_factory=None)[source]

Bases: object

end
execution_mode
expand_variables(variables)[source]

Expand variables in policy data.

Updates the policy data in-place.

get_cache()[source]
get_execution_mode()[source]
get_logs(start, end)[source]
get_metrics(start, end, period)[source]
get_permissions()[source]

get permissions needed by this policy

get_variables(variables=None)[source]

Get runtime variables for policy interpolation.

Runtime variables are merged with the passed in variables if any.

is_lambda
load_resource_manager()[source]
log = <Logger custodian.policy (DEBUG)>
max_resources
max_resources_percent
name
poll()[source]

Query resources and apply policy.

provider_name
provision()[source]

Provision policy as a lambda function.

push(event, lambda_ctx)[source]
region
resource_type
run()

Run policy in default mode

start
tags
tz
validate()[source]
validate_policy_start_stop()[source]
class c7n.policy.PolicyCollection(policies, options)[source]

Bases: object

filter(policy_name=None, resource_type=None)[source]
classmethod from_data(data, options)[source]
log = <Logger c7n.policies (DEBUG)>
resource_types

resource types used by the collection.

classmethod session_factory()[source]
class c7n.policy.PolicyExecutionMode(policy)[source]

Bases: object

Policy execution semantics

POLICY_METRICS = ('ResourceCount', 'ResourceTime', 'ActionTime')
get_logs(start, end)[source]

Retrieve logs for the policy

get_metrics(start, end, period)[source]

Retrieve any associated metrics for the policy.

provision()[source]

Provision any resources needed for the policy.

run(event=None, lambda_context=None)[source]

Run the actual policy.

validate()[source]

Validate configuration settings for execution mode.

class c7n.policy.PullMode(policy)[source]

Bases: c7n.policy.PolicyExecutionMode

Pull mode execution of a policy.

Queries resources from cloud provider for filtering and actions.

get_logs(start, end)[source]

Retrieve logs for the policy

is_runnable()[source]
run(*args, **kw)[source]

Run the actual policy.

schema = {'additionalProperties': False, 'properties': {'type': {'enum': ['pull']}}, 'required': ['type'], 'type': 'object'}
type = 'pull'
class c7n.policy.ServerlessExecutionMode(policy)[source]

Bases: c7n.policy.PolicyExecutionMode

get_logs(start, end)[source]

Retrieve logs for the policy

provision()[source]

Provision any resources needed for the policy.

run(event=None, lambda_context=None)[source]

Run the actual policy.

c7n.policy.load(options, path, format='yaml', validate=True, vars=None)[source]

c7n.provider module

class c7n.provider.Provider[source]

Bases: object

Provider Base Class

get_session_factory(options)[source]

Get a credential/session factory for api usage.

initialize(options)[source]

Perform any provider specific initialization

initialize_policies(policy_collection, options)[source]

Perform any initialization of policies.

Common usage is expanding policy collection for per region execution and filtering policies for applicable regions.

resource_prefix

resource prefix for this cloud provider in policy files.

resources

resources registry for this cloud provider

c7n.provider.resources(cloud_provider=None)[source]

c7n.query module

Query capability built on skew metamodel

tags_spec -> s3, elb, rds

class c7n.query.ChildDescribeSource(manager)[source]

Bases: c7n.query.DescribeSource

get_query()[source]
resource_query_factory

alias of ChildResourceQuery

type = 'describe-child'
class c7n.query.ChildResourceManager(data, options)[source]

Bases: c7n.query.QueryResourceManager

child_source = 'describe-child'
get_parent_manager()[source]
source_type
class c7n.query.ChildResourceQuery(session_factory, manager)[source]

Bases: c7n.query.ResourceQuery

A resource query for resources that must be queried with parent information.

Several resource types can only be queried in the context of their parents identifiers. ie. efs mount targets (parent efs), route53 resource records (parent hosted zone), ecs services (ecs cluster).

capture_parent_id = False
filter(resource_manager, **params)[source]

Query a set of resources.

get_parent_parameters(params, parent_id, parent_key)[source]
parent_key = 'c7n:parent-id'
class c7n.query.ConfigSource(manager)[source]

Bases: object

augment(resources)[source]
get_permissions()[source]
get_resources(ids, cache=True)[source]
load_resource(item)[source]
resources(query=None)[source]
static retry(func, *args, **kw)
type = 'config'
class c7n.query.DescribeSource(manager)[source]

Bases: object

QueryFactory

alias of ResourceQuery

augment(resources)[source]
get_permissions()[source]
get_resources(ids, cache=True)[source]
resources(query)[source]
type = 'describe'
class c7n.query.QueryMeta[source]

Bases: type

class c7n.query.QueryResourceManager(data, options)[source]

Bases: c7n.manager.ResourceManager

account_id

Return the current account ID.

This should now be passed in using the –account-id flag, but for a period of time we will support the old behavior of inferring this from IAM.

action_registry = <c7n.actions.core.ActionRegistry object>
augment(resources)[source]

subclasses may want to augment resources with additional information.

ie. we want tags by default (rds, elb), and policy, location, acl for s3 buckets.

check_resource_limit(selection_count, population_count)[source]

Check if policy’s execution affects more resources then its limit.

Ideally this would be at a higher level but we’ve hidden filtering behind the resource manager facade for default usage.

chunk_size = 20
filter_registry = <c7n.filters.core.FilterRegistry object>
generate_arn

Generates generic arn if ID is not already arn format.

get_arns(resources)[source]
get_cache_key(query)[source]
classmethod get_model()[source]

Returns the resource meta-model.

get_permissions()[source]
get_resources(ids, cache=True, augment=True)[source]

Retrieve a set of resources by id.

get_source(source_type)[source]
classmethod has_arn()[source]
classmethod match_ids(ids)[source]

return ids that match this resource type’s id format.

max_workers = 3
permissions = ()
region

Return the current region.

resource_type = ''
resources(query=None)[source]
static retry(func, *args, **kw)
source_type
class c7n.query.ResourceQuery(session_factory)[source]

Bases: object

filter(resource_manager, **params)[source]

Query a set of resources.

get(resource_manager, identities)[source]

Get resources by identities

static resolve(resource_type)[source]
class c7n.query.RetryPageIterator(method, input_token, output_token, more_results, result_keys, non_aggregate_keys, limit_key, max_items, starting_token, page_size, op_kwargs)[source]

Bases: botocore.paginate.PageIterator

static retry(func, *args, **kw)

c7n.registry module

class c7n.registry.PluginRegistry(plugin_type)[source]

Bases: object

A plugin registry

Custodian is intended to be innately pluggable both internally and externally, for resource types and their filters and actions.

This plugin registry abstraction provides the core mechanism for that. Its a simple string to class map, with python package entry_point loading for external plugins.

As an example of defining an external plugin using a python package

setup(
    name="custodian_cmdb",
    description="Custodian filters for interacting with internal CMDB"
    version='1.0',
    packages=find_packages(),
    entry_points={
         'console_scripts': [
              'custodian.ec2.filters = custodian_cmdb:filter_ec2']},
    )

For loading the plugins we can simply invoke method:load_plugins like so:

PluginRegistry('ec2.filters').load_plugins()
EVENTS = (0, 1)
EVENT_FINAL = 1
EVENT_REGISTER = 0
get(name)[source]
items()[source]
keys()[source]
load_plugins()[source]

Load external plugins.

Custodian is intended to interact with internal and external systems that are not suitable for embedding into the custodian code base.

notify(event, key=None)[source]
register(name, klass=None, condition=True, condition_message='Missing dependency for {}')[source]
subscribe(event, func)[source]
unregister(name)[source]

c7n.resolver module

class c7n.resolver.URIResolver(session_factory, cache)[source]

Bases: object

get_s3_uri(uri)[source]
resolve(uri)[source]
class c7n.resolver.ValuesFrom(data, manager)[source]

Bases: object

Retrieve values from a url.

Supports json, csv and line delimited text files and expressions to retrieve a subset of values.

Expression syntax - on json, a jmespath expr is evaluated - on csv, an integer column or jmespath expr can be specified - on csv2dict, a jmespath expr (the csv is parsed into a dictionary where the keys are the headers and the values are the remaining columns)

Text files are expected to be line delimited values.

Examples:

value_from:
   url: s3://bucket/xyz/foo.json
   expr: [].AppId

values_from:
   url: http://foobar.com/mydata
   format: json
   expr: Region."us-east-1"[].ImageId

value_from:
   url: s3://bucket/abc/foo.csv
   format: csv2dict
   expr: key[1]

 # inferred from extension
 format: [json, csv, csv2dict, txt]
get_contents()[source]
get_values()[source]
schema = {'additionalProperties': 'False', 'properties': {'expr': {'oneOf': [{'type': 'integer'}, {'type': 'string'}]}, 'format': {'enum': ['csv', 'json', 'txt', 'csv2dict']}, 'url': {'type': 'string'}}, 'required': ['url'], 'type': 'object'}
supported_formats = ('json', 'txt', 'csv', 'csv2dict')

c7n.schema module

Jsonschema validation of cloud custodian config.

We start with a walkthrough of the various class registries of resource types and assemble and generate the schema.

We do some specialization to reduce overall schema size via reference usage, although in some cases we prefer copies, due to issues with inheritance via reference ( allowedProperties and enum extension).

All filters and actions are annotated with schema typically using the utils.type_schema function.

c7n.schema.generate(resource_types=())[source]
c7n.schema.json_dump(resource=None)[source]
c7n.schema.process_resource(type_name, resource_type, resource_defs, alias_name=None, definitions=None)[source]
c7n.schema.resource_vocabulary(cloud_name=None, qualify_name=True)[source]
c7n.schema.specific_error(error)[source]

Try to find the best error for humans to resolve

The jsonschema.exceptions.best_match error is based purely on a mix of a strong match (ie. not anyOf, oneOf) and schema depth, this often yields odd results that are semantically confusing, instead we can use a bit of structural knowledge of schema to provide better results.

c7n.schema.summary(vocabulary)[source]
c7n.schema.validate(data, schema=None)[source]

c7n.sqsexec module

concurrent.futures implementation over sqs

Scatter/Gather or Map/Reduce style over two sqs queues.

class c7n.sqsexec.MessageIterator(client, queue_url, limit=0, timeout=10)[source]

Bases: object

ack(m)[source]
msg_attributes = ['sequence_id', 'op', 'ser']
next()
class c7n.sqsexec.SQSExecutor(session_factory, map_queue, reduce_queue)[source]

Bases: concurrent.futures._base.Executor

gather()[source]

Fetch results from separate queue

submit(func, *args, **kwargs)[source]

Submit a function for serialized execution on sqs

class c7n.sqsexec.SQSFuture(sequence_id)[source]

Bases: concurrent.futures._base.Future

marker = <object object>
class c7n.sqsexec.SQSWorker(session_factory, map_queue, reduce_queue, limit=0)[source]

Bases: object

process_message(m)[source]
run()[source]
stop()[source]
stopped = False
c7n.sqsexec.named(o)[source]
c7n.sqsexec.resolve(o)[source]

c7n.tags module

Generic EC2 Resource Tag / Filters and actions

These work for the whole family of resources associated to ec2 (subnets, vpc, security-groups, volumes, instances, snapshots).

class c7n.tags.CopyRelatedResourceTag(data=None, manager=None, log_dir=None)[source]

Bases: c7n.tags.Tag

Copy a related resource tag to its associated resource

In some scenarios, resource tags from a related resource should be applied to its child resource. For example, EBS Volume tags propogating to their snapshots. To use this action, specify the resource type that contains the tags that are to be copied, which can be found by using the custodian schema command.

Then, specify the key on the resource that references the related resource. In the case of ebs-snapshot, the VolumeId attribute would be the key that identifies the related resource, ebs.

Finally, specify a list of tag keys to copy from the related resource onto the original resource. The special character “*” can be used to signify that all tags from the related resource should be copied to the original resource.

To raise an error when related resources cannot be found, use the skip_missing option. By default, this is set to True.

Example:
policies:
    - name: copy-tags-from-ebs-volume-to-snapshot
      resource: ebs-snapshot
      actions:
        - type: copy-related-tag
          resource: ebs
          skip_missing: True
          key: VolumeId
          tags: '*'
get_permissions()[source]
get_resource_tag_map(r_type, ids)[source]

Returns a mapping of {resource_id: {tagkey: tagvalue}}

process(resources)[source]
process_resource(client, r, related_tags, tag_keys, tag_action)[source]
classmethod register_resources(registry, resource_class)[source]
schema = {'additionalProperties': False, 'properties': {'key': {'type': 'string'}, 'resource': {'type': 'string'}, 'skip_missing': {'type': 'boolean'}, 'tags': {'oneOf': [{'enum': ['*']}, {'type': 'array'}]}, 'type': {'enum': ['copy-related-tag']}}, 'required': ['tags', 'key', 'resource', 'type'], 'type': 'object'}
schema_alias = True
validate()[source]
class c7n.tags.NormalizeTag(data=None, manager=None, log_dir=None)[source]

Bases: c7n.actions.core.Action

Transform the value of a tag.

Set the tag value to uppercase, title, lowercase, or strip text from a tag key.

policies:
  - name: ec2-service-transform-lower
    resource: ec2
    comment: |
      ec2-service-tag-value-to-lower
    query:
      - instance-state-name: running
    filters:
      - "tag:testing8882": present
    actions:
      - type: normalize-tag
        key: lower_key
        action: lower

  - name: ec2-service-strip
    resource: ec2
    comment: |
      ec2-service-tag-strip-blah
    query:
      - instance-state-name: running
    filters:
      - "tag:testing8882": present
    actions:
      - type: normalize-tag
        key: strip_key
        action: strip
        value: blah
create_set(instances)[source]
create_tag(client, ids, key, value)[source]
filter_resources(resources)[source]
permissions = ('ec2:CreateTags',)
process(resources)[source]
process_transform(tag_value, resource_set)[source]

Transform tag value

  • Collect value from tag
  • Transform Tag value
  • Assign new value for key
schema = {'additionalProperties': False, 'properties': {'action': {'items': {'enum': ['upper', 'lower', 'titlestrip', 'replace']}, 'type': 'string'}, 'key': {'type': 'string'}, 'type': {'enum': ['normalize-tag']}, 'value': {'type': 'string'}}, 'required': ['type'], 'type': 'object'}
schema_alias = True
class c7n.tags.RemoveTag(data=None, manager=None, log_dir=None)[source]

Bases: c7n.actions.core.Action

Remove tags from ec2 resources.

batch_size = 100
concurrency = 2
get_client()[source]
permissions = ('ec2:DeleteTags',)
process(resources)[source]
process_resource_set(client, resource_set, tag_keys)[source]
schema = {'additionalProperties': False, 'properties': {'tags': {'items': {'type': 'string'}, 'type': 'array'}, 'type': {'enum': ['untag', 'unmark', 'remove-tag']}}, 'required': ['type'], 'type': 'object'}
class c7n.tags.RenameTag(data=None, manager=None, log_dir=None)[source]

Bases: c7n.actions.core.Action

Create a new tag with identical value & remove old tag

create_set(instances)[source]
create_tag(client, ids, key, value)[source]
delete_tag(client, ids, key, value)[source]
filter_resources(resources)[source]
get_client()[source]
permissions = ('ec2:CreateTags', 'ec2:DeleteTags')
process(resources)[source]
process_rename(client, tag_value, resource_set)[source]

Move source tag value to destination tag value

  • Collect value from old tag
  • Delete old tag
  • Create new tag & assign stored value
schema = {'additionalProperties': False, 'properties': {'new_key': {'type': 'string'}, 'old_key': {'type': 'string'}, 'type': {'enum': ['rename-tag']}}, 'required': ['type'], 'type': 'object'}
schema_alias = True
tag_count_max = 50
class c7n.tags.Tag(data=None, manager=None, log_dir=None)[source]

Bases: c7n.actions.core.Action

Tag an ec2 resource.

batch_size = 25
concurrency = 2
get_client()[source]
id_key = None
interpolate_values(tags)[source]
permissions = ('ec2:CreateTags',)
process(resources)[source]
process_resource_set(client, resource_set, tags)[source]
schema = {'additionalProperties': False, 'properties': {'key': {'type': 'string'}, 'tag': {'type': 'string'}, 'tags': {'type': 'object'}, 'type': {'enum': ['tag', 'mark']}, 'value': {'type': 'string'}}, 'required': ['type'], 'type': 'object'}
schema_alias = True
validate()[source]
class c7n.tags.TagActionFilter(data, manager=None)[source]

Bases: c7n.filters.core.Filter

Filter resources for tag specified future action

Filters resources by a ‘custodian_status’ tag which specifies a future date for an action.

The filter parses the tag values looking for an ‘op@date’ string. The date is parsed and compared to do today’s date, the filter succeeds if today’s date is gte to the target date.

The optional ‘skew’ parameter provides for incrementing today’s date a number of days into the future. An example use case might be sending a final notice email a few days before terminating an instance, or snapshotting a volume prior to deletion.

The optional ‘skew_hours’ parameter provides for incrementing the current time a number of hours into the future.

Optionally, the ‘tz’ parameter can get used to specify the timezone in which to interpret the clock (default value is ‘utc’)

- policies:
  - name: ec2-stop-marked
    resource: ec2
    filters:
      - type: marked-for-op
        # The default tag used is custodian_status
        # but that is configurable
        tag: custodian_status
        op: stop
        # Another optional tag is skew
        tz: utc
    actions:
      - stop
current_date = None
schema = {'additionalProperties': False, 'properties': {'op': {'type': 'string'}, 'skew': {'minimum': 0, 'type': 'number'}, 'skew_hours': {'minimum': 0, 'type': 'number'}, 'tag': {'type': 'string'}, 'type': {'enum': ['marked-for-op']}, 'tz': {'type': 'string'}}, 'required': ['type'], 'type': 'object'}
schema_alias = True
validate()[source]

validate filter config, return validation error or self

class c7n.tags.TagCountFilter(data, manager=None)[source]

Bases: c7n.filters.core.Filter

Simplify tag counting..

ie. these two blocks are equivalent

- filters:
    - type: value
      key: "[length(Tags)][0]"
      op: gte
      value: 8

- filters:
    - type: tag-count
      value: 8
schema = {'additionalProperties': False, 'properties': {'count': {'minimum': 0, 'type': 'integer'}, 'op': {'enum': ['eq', 'equal', 'ne', 'not-equal', 'gt', 'greater-than', 'ge', 'gte', 'le', 'lte', 'lt', 'less-than', 'glob', 'regex', 'regex-case', 'in', 'ni', 'not-in', 'contains', 'difference', 'intersect']}, 'type': {'enum': ['tag-count']}}, 'required': ['type'], 'type': 'object'}
schema_alias = True
class c7n.tags.TagDelayedAction(data=None, manager=None, log_dir=None)[source]

Bases: c7n.actions.core.Action

Tag resources for future action.

The optional ‘tz’ parameter can be used to adjust the clock to align with a given timezone. The default value is ‘utc’.

If neither ‘days’ nor ‘hours’ is specified, Cloud Custodian will default to marking the resource for action 4 days in the future.

policies:
  - name: ec2-mark-for-stop-in-future
    resource: ec2
    filters:
      - type: value
        key: Name
        value: instance-to-stop-in-four-days
    actions:
      - type: mark-for-op
        op: stop
batch_size = 200
concurrency = 2
default_template = 'Resource does not meet policy: {op}@{action_date}'
generate_timestamp(days, hours)[source]
get_client()[source]
get_permissions()[source]
process(resources)[source]
process_resource_set(client, resource_set, tags)[source]
schema = {'additionalProperties': False, 'properties': {'days': {'exclusiveMinimum': False, 'minimum': 0, 'type': 'integer'}, 'hours': {'exclusiveMinimum': False, 'minimum': 0, 'type': 'integer'}, 'msg': {'type': 'string'}, 'op': {'type': 'string'}, 'tag': {'type': 'string'}, 'type': {'enum': ['mark-for-op']}, 'tz': {'type': 'string'}}, 'required': ['type'], 'type': 'object'}
schema_alias = True
validate()[source]
class c7n.tags.TagTrim(data=None, manager=None, log_dir=None)[source]

Bases: c7n.actions.core.Action

Automatically remove tags from an ec2 resource.

EC2 Resources have a limit of 50 tags, in order to make additional tags space on a set of resources, this action can be used to remove enough tags to make the desired amount of space while preserving a given set of tags.

- policies:
   - name: ec2-tag-trim
     comment: |
       Any instances with 48 or more tags get tags removed until
       they match the target tag count, in this case 47 so we
       that we free up a tag slot for another usage.
     resource: ec2
     filters:
         # Filter down to resources which already have 8 tags
         # as we need space for 3 more, this also ensures that
         # metrics reporting is correct for the policy.
         type: value
         key: "[length(Tags)][0]"
         op: ge
         value: 48
     actions:
       - type: tag-trim
         space: 3
         preserve:
          - OwnerContact
          - ASV
          - CMDBEnvironment
          - downtime
          - custodian_status
max_tag_count = 50
permissions = ('ec2:DeleteTags',)
process(resources)[source]
process_resource(client, i)[source]
process_tag_removal(client, resource, tags)[source]
schema = {'additionalProperties': False, 'properties': {'preserve': {'items': {'type': 'string'}, 'type': 'array'}, 'space': {'type': 'integer'}, 'type': {'enum': ['tag-trim']}}, 'required': ['type'], 'type': 'object'}
schema_alias = True
class c7n.tags.UniversalTag(data=None, manager=None, log_dir=None)[source]

Bases: c7n.tags.Tag

Applies one or more tags to the specified resources.

batch_size = 20
concurrency = 1
get_client()[source]
permissions = ('resourcegroupstaggingapi:TagResources',)
process(resources)[source]
process_resource_set(client, resource_set, tags)[source]
class c7n.tags.UniversalTagDelayedAction(data=None, manager=None, log_dir=None)[source]

Bases: c7n.tags.TagDelayedAction

Tag resources for future action.

Example:
policies:
- name: ec2-mark-stop
  resource: ec2
  filters:
    - type: image-age
      op: ge
      days: 90
  actions:
    - type: mark-for-op
      tag: custodian_cleanup
      op: terminate
      days: 4
batch_size = 20
concurrency = 2
get_client()[source]
permissions = ('resourcegroupstaggingapi:TagResources',)
process(resources)[source]
process_resource_set(client, resource_set, tags)[source]
class c7n.tags.UniversalUntag(data=None, manager=None, log_dir=None)[source]

Bases: c7n.tags.RemoveTag

Removes the specified tags from the specified resources.

batch_size = 20
concurrency = 1
get_client()[source]
permissions = ('resourcegroupstaggingapi:UntagResources',)
process_resource_set(client, resource_set, tag_keys)[source]
c7n.tags.coalesce_copy_user_tags(resource, copy_tags, user_tags)[source]

Returns a list of tags from resource and user supplied in the format: [{‘Key’: ‘key’, ‘Value’: ‘value’}]

Due to drift on implementation on copy-tags/tags used throughout the code base, the following options are supported:

copy_tags (Tags to copy from the resource):
  • list of str, e.g. [‘key1’, ‘key2’, ‘*’]
  • bool
user_tags (User supplied tags to apply):
  • dict of key-value pairs, e.g. {Key: Value, Key2: Value}
  • list of dict e.g. [{‘Key’: k, ‘Value’: v}]

In the case that there is a conflict in a user supplied tag and an existing tag on the resource, the user supplied tags will take priority.

Additionally, a value of ‘*’ in copy_tags can be used to signify to copy all tags from the resource.

c7n.tags.register_ec2_tags(filters, actions)[source]
c7n.tags.register_universal_tags(filters, actions, compatibility=True)[source]
c7n.tags.universal_augment(self, resources)[source]
c7n.tags.universal_retry(method, ResourceARNList, **kw)[source]

Retry support for resourcegroup tagging apis.

The resource group tagging api typically returns a 200 status code with embedded resource specific errors. To enable resource specific retry on throttles, we extract those, perform backoff w/ jitter and continue. Other errors are immediately raised.

We do not aggregate unified resource responses across retries, only the last successful response is returned for a subset of the resources if a retry is performed.

c7n.testing module

class c7n.testing.TestUtils(methodName='runTest')[source]

Bases: unittest.case.TestCase

capture_logging(name=None, level=20, formatter=None, log_file=None)[source]
change_cwd(work_dir=None)[source]
change_environment(**kwargs)[source]

Change the environment to the given set of variables.

To clear an environment variable set it to None. Existing environment restored after test.

cleanUp()[source]
custodian_schema = None
get_context(config=None, session_factory=None, policy=None)[source]
get_temp_dir()[source]

Return a temporary directory that will get cleaned up.

load_policy(data, config=None, session_factory=None, validate=False, output_dir=None, cache=False)[source]
load_policy_set(data, config=None)[source]
patch(obj, attr, new)[source]
tearDown()[source]

Hook method for deconstructing the test fixture after testing it.

write_policy_file(policy, format='yaml')[source]

Write a policy file to disk in the specified format.

Input a dictionary and a format. Valid formats are yaml and json Returns the file path.

class c7n.testing.TextTestIO[source]

Bases: _io.StringIO

write(b)[source]

Write string to file.

Returns the number of characters written, which is always equal to the length of the string.

c7n.testing.mock_datetime_now(tgt, dt)[source]

c7n.utils module

class c7n.utils.DateTimeEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: json.encoder.JSONEncoder

default(obj)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
class c7n.utils.FormatDate(d=None)[source]

Bases: object

a datetime wrapper with extended pyformat syntax

date_increment = re.compile('\\+[0-9]+[Mdh]')
classmethod utcnow()[source]
class c7n.utils.IPv4Network(address, strict=True)[source]

Bases: c7n.ipaddress.IPv4Network

class c7n.utils.QueryParser[source]

Bases: object

QuerySchema = {}
multi_value = True
classmethod parse(data)[source]
type_name = ''
value_key = 'Values'
class c7n.utils.UnicodeWriter(f, dialect=<class 'csv.excel'>, **kwds)[source]

Bases: object

utf8 encoding csv writer.

writerow(row)[source]
writerows(rows)[source]
exception c7n.utils.VarsSubstitutionError[source]

Bases: Exception

c7n.utils.annotation(i, k)[source]
c7n.utils.backoff_delays(start, stop, factor=2.0, jitter=False)[source]

Geometric backoff sequence w/ jitter

c7n.utils.camelResource(obj)[source]

Some sources from apis return lowerCased where as describe calls

always return TitleCase, this function turns the former to the later

c7n.utils.chunks(iterable, size=50)[source]

Break an iterable into lists of size

c7n.utils.dumps(data, fh=None, indent=0)[source]
c7n.utils.filter_empty(d)[source]
c7n.utils.format_event(evt)[source]
c7n.utils.format_string_values(obj, err_fallback=(<class 'IndexError'>, <class 'KeyError'>), *args, **kwargs)[source]

Format all string values in an object. Return the updated object

c7n.utils.generate_arn(service, resource, partition='aws', region=None, account_id=None, resource_type=None, separator='/')[source]

Generate an Amazon Resource Name. See http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html.

c7n.utils.get_account_alias_from_sts(session)[source]
c7n.utils.get_account_id_from_sts(session)[source]
c7n.utils.get_retry(codes=(), max_attempts=8, min_delay=1, log_retries=False)[source]

Decorator for retry boto3 api call on transient errors.

https://www.awsarchitectureblog.com/2015/03/backoff.html https://en.wikipedia.org/wiki/Exponential_backoff

Parameters:
  • codes – A sequence of retryable error codes.
  • max_attempts – The max number of retries, by default the delay time is proportional to the max number of attempts.
  • log_retries – Whether we should log retries, if specified specifies the level at which the retry should be logged.
  • _max_delay – The maximum delay for any retry interval note this parameter is only exposed for unit testing, as its derived from the number of attempts.

Returns a function for invoking aws client calls that retries on retryable error codes.

c7n.utils.group_by(resources, key)[source]

Return a mapping of key value to resources with the corresponding value.

Key may be specified as dotted form for nested dictionary lookup

c7n.utils.load_file(path, format=None, vars=None)[source]
c7n.utils.loads(body)[source]
c7n.utils.local_session(factory)[source]

Cache a session thread local for up to 45m

c7n.utils.parse_cidr(value)[source]

Process cidr ranges.

c7n.utils.parse_s3(s3_path)[source]
c7n.utils.parse_url_config(url)[source]
c7n.utils.query_instances(session, client=None, **query)[source]

Return a list of ec2 instances for the query.

c7n.utils.reformat_schema(model)[source]

Reformat schema to be in a more displayable format.

c7n.utils.reset_session_cache()[source]
c7n.utils.set_annotation(i, k, v)[source]
>>> x = {}
>>> set_annotation(x, 'marker', 'a')
>>> annotation(x, 'marker')
['a']
c7n.utils.set_value_from_jmespath(source, expression, value, is_first=True)[source]
c7n.utils.snapshot_identifier(prefix, db_identifier)[source]

Return an identifier for a snapshot of a database or cluster.

c7n.utils.type_schema(type_name, inherits=None, rinherit=None, aliases=None, required=None, **props)[source]

jsonschema generation helper

params:
  • type_name: name of the type
  • inherits: list of document fragments that are required via anyOf[$ref]
  • rinherit: use another schema as a base for this, basically work around
    inherits issues with additionalProperties and type enums.
  • aliases: additional names this type maybe called
  • required: list of required properties, by default ‘type’ is required
  • props: additional key value properties
c7n.utils.worker(f)[source]

Generic wrapper to log uncaught exceptions in a function.

When we cross concurrent.futures executor boundaries we lose our traceback information, and when doing bulk operations we may tolerate transient failures on a partial subset. However we still want to have full accounting of the error in the logs, in a format that our error collection (cwl subscription) can still pickup.

c7n.utils.yaml_load(value)[source]

c7n.version module

Module contents