Source code for c7n.resources.dms

# Copyright 2017 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent.futures import as_completed

from c7n.actions import BaseAction
from c7n.manager import resources
from c7n.query import QueryResourceManager, DescribeSource
from c7n.utils import local_session, chunks, type_schema, get_retry
from c7n.filters.vpc import SecurityGroupFilter, SubnetFilter, VpcFilter
from c7n.filters.kms import KmsRelatedFilter
from c7n.filters import FilterRegistry
from c7n.tags import RemoveTag, Tag, TagActionFilter, TagDelayedAction


[docs]@resources.register('dms-instance') class ReplicationInstance(QueryResourceManager):
[docs] class resource_type(object): service = 'dms' type = 'rep' enum_spec = ( 'describe_replication_instances', 'ReplicationInstances', None) name = id = 'ReplicationInstanceIdentifier' arn = 'ReplicationInstanceArn' date = 'InstanceCreateTime' dimension = None # The api supports filtering which we handle via describe source. filter_name = filter_type = None
filters = FilterRegistry('dms-instance.filters') filters.register('marked-for-op', TagActionFilter) filter_registry = filters retry = staticmethod(get_retry(('Throttled',)))
[docs] def get_source(self, source_type): if source_type == 'describe': return InstanceDescribe(self) return super(ReplicationInstance, self).get_source(source_type)
[docs]@resources.register('dms-endpoint') class DmsEndpoints(QueryResourceManager):
[docs] class resource_type(object): service = 'dms' enum_spec = ('describe_endpoints', 'Endpoints', None) detail_spec = None arn = id = 'EndpointArn' name = 'EndpointIdentifier' date = None dimension = None filter_name = None
[docs]class InstanceDescribe(DescribeSource):
[docs] def get_resources(self, resource_ids): return self.query.filter( self.manager, **{ 'Filters': [ {'Name': 'replication-instance-id', 'Values': resource_ids}]})
[docs] def augment(self, resources): client = local_session(self.manager.session_factory).client('dms') with self.manager.executor_factory(max_workers=2) as w: futures = [] for resource_set in chunks(resources, 20): futures.append( w.submit(self.process_resource_set, client, resources)) for f in as_completed(futures): if f.exception(): self.manager.log.warning( "Error retrieving replinstance tags: %s", f.exception()) return resources
[docs] def process_resource_set(self, client, resources): for arn, r in zip(self.manager.get_arns(resources), resources): self.manager.log.info("arn %s" % arn) try: r['Tags'] = client.list_tags_for_resource( ResourceArn=arn).get('TagList', []) except client.exceptions.ResourceNotFoundFault: continue
[docs]@ReplicationInstance.filter_registry.register('kms-key') class KmsFilter(KmsRelatedFilter): RelatedIdsExpression = 'KmsKeyId'
[docs]@ReplicationInstance.filter_registry.register('subnet') class Subnet(SubnetFilter): RelatedIdsExpression = 'ReplicationSubnetGroup.Subnets[].SubnetIdentifier'
[docs]@ReplicationInstance.filter_registry.register('security-group') class SecurityGroup(SecurityGroupFilter): RelatedIdsExpression = 'VpcSecurityGroups[].VpcSecurityGroupId'
[docs]@ReplicationInstance.filter_registry.register('vpc') class Vpc(VpcFilter): RelatedIdsExpression = 'ReplicationSubnetGroup.VpcId'
[docs]@ReplicationInstance.action_registry.register('delete') class InstanceDelete(BaseAction): schema = type_schema('delete') permissions = ('dms:DeleteReplicationInstance',)
[docs] def process(self, resources): client = local_session(self.manager.session_factory).client('dms') for arn, r in zip(self.manager.get_arns(resources), resources): client.delete_replication_instance(ReplicationInstanceArn=arn)
[docs]@ReplicationInstance.action_registry.register('modify-instance') class ModifyReplicationInstance(BaseAction): """Modify replication instance(s) to apply new settings :example: .. code-block: yaml policies: - name: enable-minor-version-upgrade resource: dms-instance filters: - AutoMinorVersionUpgrade: False actions: - type: modify-instance ApplyImmediately: True AutoMinorVersionUpgrade: True PreferredMaintenanceWindow: mon:23:00-mon:23:59 AWS ModifyReplicationInstance Documentation: https://docs.aws.amazon.com/dms/latest/APIReference/API_ModifyReplicationInstance.html """ schema = { 'type': 'object', 'additionalProperties': False, 'properties': { 'type': {'enum': ['modify-instance']}, 'ReplicationInstanceArn': {'type': 'string'}, 'AllocatedStorage': {'type': 'integer'}, 'ApplyImmediately': {'type': 'boolean'}, 'ReplicationInstanceClass': {'type': 'string'}, 'VpcSecurityGroupIds': { 'type': 'array', 'items': {'type': 'string'} }, 'PreferredMaintenanceWindow': {'type': 'string'}, 'MultiAZ': {'type': 'boolean'}, 'EngineVersion': {'type': 'string'}, 'AllowMajorVersionUpgrade': {'type': 'boolean'}, 'AutoMinorVersionUpgrade': {'type': 'boolean'}, 'ReplicationInstanceIdentifier': {'type': 'string'} } } permissions = ('dms:ModifyReplicationInstance',)
[docs] def process(self, resources): client = local_session(self.manager.session_factory).client('dms') params = dict(self.data) params.pop('type') for r in resources: params['ReplicationInstanceArn'] = r['ReplicationInstanceArn'] try: client.modify_replication_instance(**params) except (client.exceptions.InvalidResourceStateFault, client.exceptions.ResourceNotFoundFault, client.exceptions.ResourceAlreadyExistsFault, client.exceptions.UpgradeDependencyFailureFault): continue
[docs]@ReplicationInstance.action_registry.register('tag') class InstanceTag(Tag): """ Add tag(s) to a replication instance :example: .. code-block:: yaml policies: - name: tag-dms-required resource: dms-instance filters: - "tag:RequireTag": absent actions: - type: tag key: RequiredTag value: RequiredTagValue """ permissions = ('dms:AddTagsToResource',)
[docs] def process_resource_set(self, client, resources, tags): client = local_session(self.manager.session_factory).client('dms') for r in resources: try: client.add_tags_to_resource( ResourceArn=r['ReplicationInstanceArn'], Tags=tags) except client.exceptions.ResourceNotFoundFault: continue
[docs]@ReplicationInstance.action_registry.register('remove-tag') class InstanceRemoveTag(RemoveTag): """ Remove tag(s) from a replication instance :example: .. code-block:: yaml policies: - name: delete-single-az-dms resource: dms-instance filters: - "tag:InvalidTag": present actions: - type: remove-tag tags: ["InvalidTag"] """ permissions = ('dms:RemoveTagsFromResource',)
[docs] def process_resource_set(self, client, resources, tags): for r in resources: try: client.remove_tags_from_resource( ResourceArn=r['ReplicationInstanceArn'], TagKeys=tags) except client.exceptions.ResourceNotFoundFault: continue
[docs]@ReplicationInstance.action_registry.register('mark-for-op') class InstanceMarkForOp(TagDelayedAction): """ Tag a replication instance for action at a later time :example: .. code-block:: yaml policies: - name: delete-single-az-dms resource: dms-instance filters: - MultiAZ: False actions: - type: mark-for-op tag: custodian_dms_cleanup op: delete days: 7 """
[docs]@DmsEndpoints.action_registry.register('modify-endpoint') class ModifyDmsEndpoint(BaseAction): """Modify the attributes of a DMS endpoint :example: .. code-block: yaml policies: - name: dms-endpoint-modify resource: dms-endpoint filters: - EngineName: sqlserver - SslMode: none actions: - type: modify-endpoint SslMode: require AWS ModifyEndpoint Documentation https://docs.aws.amazon.com/dms/latest/APIReference/API_ModifyEndpoint.html """ schema = { 'type': 'object', 'additionalProperties': False, 'properties': { 'type': {'enum': ['modify-endpoint']}, 'Port': {'type': 'integer', 'minimum': 1, 'maximum': 65536}, 'ServerName': {'type': 'string'}, 'SslMode': {'type': 'string', 'enum': [ 'none', 'require', 'verify-ca', 'verify-full']}, 'CertificateArn': {'type': 'string'}, 'DatabaseName': {'type': 'string'}, 'EndpointIdentifier': {'type': 'string'}, 'EngineName': {'enum': [ 'mysql', 'oracle', 'postgres', 'mariadb', 'aurora', 'redshift', 'S3', 'sybase', 'dynamodb', 'mongodb', 'sqlserver']}, 'ExtraConnectionAttributes': {'type': 'string'}, 'Username': {'type': 'string'}, 'Password': {'type': 'string'}, 'DynamoDbSettings': { 'type': 'object', 'additionalProperties': False, 'required': ['ServiceAccessRoleArn'], 'properties': {'ServiceAccessRoleArn': {'type': 'string'}} }, 'S3Settings': { 'type': 'object', 'additionalProperties': False, 'properties': { 'BucketFolder': {'type': 'string'}, 'BucketName': {'type': 'string'}, 'CompressionType': { 'type': 'string', 'enum': ['none', 'gzip'] }, 'CsvDelimiter': {'type': 'string'}, 'CsvRowDelimiter': {'type': 'string'}, 'ExternalTableDefinition': {'type': 'string'}, 'ServiceAccessRoleArn': {'type': 'string'} } }, 'MongoDbSettings': { 'type': 'object', 'additionalProperties': False, 'properties': { 'AuthMechanism': { 'type': 'string', 'enum': [ 'default', 'mongodb_cr', 'scram_sha_1'] }, 'AuthSource': {'type': 'string'}, 'Username': {'type': 'string'}, 'Password': {'type': 'string'}, 'DatabaseName': {'type': 'string'}, 'DocsToInvestigate': {'type': 'integer', 'minimum': 1}, 'ExtractDocId': {'type': 'string'}, 'NestingLevel': { 'type': 'string', 'enum': [ 'NONE', 'none', 'ONE', 'one']}, 'Port': { 'type': 'integer', 'minimum': 1, 'maximum': 65535}, 'ServerName': {'type': 'string'} } } } } permissions = ('dms:ModifyEndpoint',)
[docs] def process(self, endpoints): client = local_session(self.manager.session_factory).client('dms') params = dict(self.data) params.pop('type') for e in endpoints: params['EndpointArn'] = e['EndpointArn'] params['EndpointIdentifier'] = params.get( 'EndpointIdentifier', e['EndpointIdentifier']) params['EngineName'] = params.get('EngineName', e['EngineName']) try: client.modify_endpoint(**params) except (client.exceptions.InvalidResourceStateFault, client.exceptions.ResourceAlreadyExistsFault, client.exceptions.ResourceNotFoundFault): continue
[docs]@DmsEndpoints.action_registry.register('delete') class DeleteDmsEndpoint(BaseAction): """Delete a DMS endpoint :example: .. code-block: yaml policies: - name: dms-endpoint-no-ssl-delete resource: dms-endpoint filters: - EngineName: mariadb - SslMode: none actions: - delete """ schema = type_schema('delete') permissions = ('dms:DeleteEndpoint',)
[docs] def process(self, endpoints): client = local_session(self.manager.session_factory).client('dms') for e in endpoints: EndpointArn = e['EndpointArn'] try: client.delete_endpoint(EndpointArn=EndpointArn) except client.exceptions.ResourceNotFoundFault: continue