List dependencies for services - add integration test to verify

This commit is contained in:
Bert Blommers 2020-09-13 16:08:23 +01:00
commit db1d7123f6
41 changed files with 1405 additions and 1245 deletions

View file

@ -2,7 +2,7 @@ import boto3
import sure # noqa
from moto import mock_codecommit
from moto.iam.models import ACCOUNT_ID
from moto.core import ACCOUNT_ID
from botocore.exceptions import ClientError
from nose.tools import assert_raises

View file

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import boto3
import sure # noqa
from botocore.exceptions import ClientError
from nose.tools import assert_raises

View file

@ -12,7 +12,7 @@ import sure # noqa
from moto import mock_ec2_deprecated, mock_ec2
from moto.ec2.models import AMIS, OWNER_ID
from moto.iam.models import ACCOUNT_ID
from moto.core import ACCOUNT_ID
from tests.helpers import requires_boto_gte

View file

@ -0,0 +1,100 @@
from moto import mock_cloudformation_deprecated, mock_ec2_deprecated
from moto import mock_cloudformation, mock_ec2
from tests.test_cloudformation.fixtures import vpc_eni
import boto
import boto3
import json
import sure # noqa
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_elastic_network_interfaces_cloudformation():
template = vpc_eni.template
template_json = json.dumps(template)
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack("test_stack", template_body=template_json)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
eni = ec2_conn.get_all_network_interfaces()[0]
eni.private_ip_addresses.should.have.length_of(1)
stack = conn.describe_stacks()[0]
resources = stack.describe_resources()
cfn_eni = [
resource
for resource in resources
if resource.resource_type == "AWS::EC2::NetworkInterface"
][0]
cfn_eni.physical_resource_id.should.equal(eni.id)
outputs = {output.key: output.value for output in stack.outputs}
outputs["ENIIpAddress"].should.equal(eni.private_ip_addresses[0].private_ip_address)
@mock_ec2
@mock_cloudformation
def test_volume_size_through_cloudformation():
ec2 = boto3.client("ec2", region_name="us-east-1")
cf = boto3.client("cloudformation", region_name="us-east-1")
volume_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"testInstance": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId": "ami-d3adb33f",
"KeyName": "dummy",
"InstanceType": "t2.micro",
"BlockDeviceMappings": [
{"DeviceName": "/dev/sda2", "Ebs": {"VolumeSize": "50"}}
],
"Tags": [
{"Key": "foo", "Value": "bar"},
{"Key": "blah", "Value": "baz"},
],
},
}
},
}
template_json = json.dumps(volume_template)
cf.create_stack(StackName="test_stack", TemplateBody=template_json)
instances = ec2.describe_instances()
volume = instances["Reservations"][0]["Instances"][0]["BlockDeviceMappings"][0][
"Ebs"
]
volumes = ec2.describe_volumes(VolumeIds=[volume["VolumeId"]])
volumes["Volumes"][0]["Size"].should.equal(50)
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_subnet_tags_through_cloudformation():
vpc_conn = boto.vpc.connect_to_region("us-west-1")
vpc = vpc_conn.create_vpc("10.0.0.0/16")
subnet_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"testSubnet": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"VpcId": vpc.id,
"CidrBlock": "10.0.0.0/24",
"AvailabilityZone": "us-west-1b",
"Tags": [
{"Key": "foo", "Value": "bar"},
{"Key": "blah", "Value": "baz"},
],
},
}
},
}
cf_conn = boto.cloudformation.connect_to_region("us-west-1")
template_json = json.dumps(subnet_template)
cf_conn.create_stack("test_stack", template_body=template_json)
subnet = vpc_conn.get_all_subnets(filters={"cidrBlock": "10.0.0.0/24"})[0]
subnet.tags["foo"].should.equal("bar")
subnet.tags["blah"].should.equal("baz")

View file

@ -7,15 +7,12 @@ from nose.tools import assert_raises
import boto3
from botocore.exceptions import ClientError
import boto
import boto.cloudformation
import boto.ec2
from boto.exception import EC2ResponseError
import sure # noqa
from moto import mock_ec2, mock_cloudformation_deprecated, mock_ec2_deprecated
from moto import mock_ec2, mock_ec2_deprecated
from tests.helpers import requires_boto_gte
from tests.test_cloudformation.fixtures import vpc_eni
import json
@mock_ec2_deprecated
@ -501,27 +498,3 @@ def test_elastic_network_interfaces_describe_network_interfaces_with_filter():
eni1.private_ip_address
)
response["NetworkInterfaces"][0]["Description"].should.equal(eni1.description)
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_elastic_network_interfaces_cloudformation():
template = vpc_eni.template
template_json = json.dumps(template)
conn = boto.cloudformation.connect_to_region("us-west-1")
conn.create_stack("test_stack", template_body=template_json)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
eni = ec2_conn.get_all_network_interfaces()[0]
eni.private_ip_addresses.should.have.length_of(1)
stack = conn.describe_stacks()[0]
resources = stack.describe_resources()
cfn_eni = [
resource
for resource in resources
if resource.resource_type == "AWS::EC2::NetworkInterface"
][0]
cfn_eni.physical_resource_id.should.equal(eni.id)
outputs = {output.key: output.value for output in stack.outputs}
outputs["ENIIpAddress"].should.equal(eni.private_ip_addresses[0].private_ip_address)

View file

@ -7,19 +7,17 @@ import tests.backport_assert_raises
from nose.tools import assert_raises
import base64
import datetime
import ipaddress
import json
import six
import boto
import boto3
from boto.ec2.instance import Reservation, InstanceAttribute
from boto.exception import EC2ResponseError, EC2ResponseError
from boto.exception import EC2ResponseError
from freezegun import freeze_time
import sure # noqa
from moto import mock_ec2_deprecated, mock_ec2, mock_cloudformation
from moto import mock_ec2_deprecated, mock_ec2
from tests.helpers import requires_boto_gte
@ -1673,40 +1671,3 @@ def test_describe_instance_attribute():
invalid_instance_attribute=invalid_instance_attribute
)
ex.exception.response["Error"]["Message"].should.equal(message)
@mock_ec2
@mock_cloudformation
def test_volume_size_through_cloudformation():
ec2 = boto3.client("ec2", region_name="us-east-1")
cf = boto3.client("cloudformation", region_name="us-east-1")
volume_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"testInstance": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId": "ami-d3adb33f",
"KeyName": "dummy",
"InstanceType": "t2.micro",
"BlockDeviceMappings": [
{"DeviceName": "/dev/sda2", "Ebs": {"VolumeSize": "50"}}
],
"Tags": [
{"Key": "foo", "Value": "bar"},
{"Key": "blah", "Value": "baz"},
],
},
}
},
}
template_json = json.dumps(volume_template)
cf.create_stack(StackName="test_stack", TemplateBody=template_json)
instances = ec2.describe_instances()
volume = instances["Reservations"][0]["Instances"][0]["BlockDeviceMappings"][0][
"Ebs"
]
volumes = ec2.describe_volumes(VolumeIds=[volume["VolumeId"]])
volumes["Volumes"][0]["Size"].should.equal(50)

View file

@ -9,8 +9,8 @@ from botocore.exceptions import ClientError
import pytz
import sure # noqa
from moto import mock_ec2, mock_ec2_deprecated
from moto.backends import get_model
from moto import mock_ec2, mock_ec2_deprecated, settings
from moto.ec2.models import ec2_backends
from moto.core.utils import iso_8601_datetime_with_milliseconds
@ -184,13 +184,14 @@ def test_request_spot_instances_fulfilled():
request.state.should.equal("open")
get_model("SpotInstanceRequest", "us-east-1")[0].state = "active"
if not settings.TEST_SERVER_MODE:
ec2_backends["us-east-1"].spot_instance_requests[request.id].state = "active"
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
request = requests[0]
requests = conn.get_all_spot_instance_requests()
requests.should.have.length_of(1)
request = requests[0]
request.state.should.equal("active")
request.state.should.equal("active")
@mock_ec2_deprecated
@ -247,10 +248,11 @@ def test_request_spot_instances_setting_instance_id():
conn = boto.ec2.connect_to_region("us-east-1")
request = conn.request_spot_instances(price=0.5, image_id="ami-abcd1234")
req = get_model("SpotInstanceRequest", "us-east-1")[0]
req.state = "active"
req.instance_id = "i-12345678"
if not settings.TEST_SERVER_MODE:
req = ec2_backends["us-east-1"].spot_instance_requests[request[0].id]
req.state = "active"
req.instance_id = "i-12345678"
request = conn.get_all_spot_instance_requests()[0]
assert request.state == "active"
assert request.instance_id == "i-12345678"
request = conn.get_all_spot_instance_requests()[0]
assert request.state == "active"
assert request.instance_id == "i-12345678"

View file

@ -9,11 +9,10 @@ import boto
import boto.vpc
from boto.exception import EC2ResponseError
from botocore.exceptions import ParamValidationError, ClientError
import json
import sure # noqa
import random
from moto import mock_cloudformation_deprecated, mock_ec2, mock_ec2_deprecated
from moto import mock_ec2, mock_ec2_deprecated
@mock_ec2_deprecated
@ -311,38 +310,6 @@ def test_get_subnets_filtering():
).should.throw(NotImplementedError)
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_subnet_tags_through_cloudformation():
vpc_conn = boto.vpc.connect_to_region("us-west-1")
vpc = vpc_conn.create_vpc("10.0.0.0/16")
subnet_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"testSubnet": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"VpcId": vpc.id,
"CidrBlock": "10.0.0.0/24",
"AvailabilityZone": "us-west-1b",
"Tags": [
{"Key": "foo", "Value": "bar"},
{"Key": "blah", "Value": "baz"},
],
},
}
},
}
cf_conn = boto.cloudformation.connect_to_region("us-west-1")
template_json = json.dumps(subnet_template)
cf_conn.create_stack("test_stack", template_body=template_json)
subnet = vpc_conn.get_all_subnets(filters={"cidrBlock": "10.0.0.0/24"})[0]
subnet.tags["foo"].should.equal("bar")
subnet.tags["blah"].should.equal("baz")
@mock_ec2
def test_create_subnet_response_fields():
ec2 = boto3.resource("ec2", region_name="us-west-1")

View file

@ -1,8 +1,6 @@
from __future__ import unicode_literals
from datetime import datetime
from copy import deepcopy
from botocore.exceptions import ClientError
import boto3
import sure # noqa
@ -10,7 +8,6 @@ import json
from moto.ec2 import utils as ec2_utils
from uuid import UUID
from moto import mock_cloudformation, mock_elbv2
from moto import mock_ecs
from moto import mock_ec2
from nose.tools import assert_raises
@ -1649,120 +1646,6 @@ def test_resource_reservation_and_release_memory_reservation():
container_instance_description["runningTasksCount"].should.equal(0)
@mock_ecs
@mock_cloudformation
def test_create_cluster_through_cloudformation():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster"},
}
},
}
template_json = json.dumps(template)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(0)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_create_cluster_through_cloudformation_no_name():
# cloudformation should create a cluster name for you if you do not provide it
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ecs-cluster.html#cfn-ecs-cluster-clustername
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {"testCluster": {"Type": "AWS::ECS::Cluster"}},
}
template_json = json.dumps(template)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_update_cluster_name_through_cloudformation_should_trigger_a_replacement():
template1 = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster1"},
}
},
}
template2 = deepcopy(template1)
template2["Resources"]["testCluster"]["Properties"]["ClusterName"] = "testcluster2"
template1_json = json.dumps(template1)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
stack_resp = cfn_conn.create_stack(
StackName="test_stack", TemplateBody=template1_json
)
template2_json = json.dumps(template2)
cfn_conn.update_stack(StackName=stack_resp["StackId"], TemplateBody=template2_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1)
resp["clusterArns"][0].endswith("testcluster2").should.be.true
@mock_ecs
@mock_cloudformation
def test_create_task_definition_through_cloudformation():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
}
},
}
template_json = json.dumps(template)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
stack_name = "test_stack"
cfn_conn.create_stack(StackName=stack_name, TemplateBody=template_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_task_definitions()
len(resp["taskDefinitionArns"]).should.equal(1)
task_definition_arn = resp["taskDefinitionArns"][0]
task_definition_details = cfn_conn.describe_stack_resource(
StackName=stack_name, LogicalResourceId="testTaskDefinition"
)["StackResourceDetail"]
task_definition_details["PhysicalResourceId"].should.equal(task_definition_arn)
@mock_ec2
@mock_ecs
def test_task_definitions_unable_to_be_placed():
@ -1877,142 +1760,6 @@ def test_task_definitions_with_port_clash():
response["tasks"][0]["stoppedReason"].should.equal("")
@mock_ecs
@mock_cloudformation
def test_update_task_definition_family_through_cloudformation_should_trigger_a_replacement():
template1 = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"Family": "testTaskDefinition1",
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
}
},
}
template1_json = json.dumps(template1)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template1_json)
template2 = deepcopy(template1)
template2["Resources"]["testTaskDefinition"]["Properties"][
"Family"
] = "testTaskDefinition2"
template2_json = json.dumps(template2)
cfn_conn.update_stack(StackName="test_stack", TemplateBody=template2_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_task_definitions(familyPrefix="testTaskDefinition2")
len(resp["taskDefinitionArns"]).should.equal(1)
resp["taskDefinitionArns"][0].endswith("testTaskDefinition2:1").should.be.true
@mock_ecs
@mock_cloudformation
def test_create_service_through_cloudformation():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster"},
},
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
},
"testService": {
"Type": "AWS::ECS::Service",
"Properties": {
"Cluster": {"Ref": "testCluster"},
"DesiredCount": 10,
"TaskDefinition": {"Ref": "testTaskDefinition"},
},
},
},
}
template_json = json.dumps(template)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_services(cluster="testcluster")
len(resp["serviceArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_update_service_through_cloudformation_should_trigger_replacement():
template1 = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster"},
},
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
},
"testService": {
"Type": "AWS::ECS::Service",
"Properties": {
"Cluster": {"Ref": "testCluster"},
"TaskDefinition": {"Ref": "testTaskDefinition"},
"DesiredCount": 10,
},
},
},
}
template_json1 = json.dumps(template1)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json1)
template2 = deepcopy(template1)
template2["Resources"]["testService"]["Properties"]["DesiredCount"] = 5
template2_json = json.dumps(template2)
cfn_conn.update_stack(StackName="test_stack", TemplateBody=template2_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_services(cluster="testcluster")
len(resp["serviceArns"]).should.equal(1)
@mock_ec2
@mock_ecs
def test_attributes():

View file

@ -0,0 +1,253 @@
import boto3
import json
from copy import deepcopy
from moto import mock_cloudformation, mock_ecs
@mock_ecs
@mock_cloudformation
def test_update_task_definition_family_through_cloudformation_should_trigger_a_replacement():
template1 = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"Family": "testTaskDefinition1",
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
}
},
}
template1_json = json.dumps(template1)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template1_json)
template2 = deepcopy(template1)
template2["Resources"]["testTaskDefinition"]["Properties"][
"Family"
] = "testTaskDefinition2"
template2_json = json.dumps(template2)
cfn_conn.update_stack(StackName="test_stack", TemplateBody=template2_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_task_definitions(familyPrefix="testTaskDefinition2")
len(resp["taskDefinitionArns"]).should.equal(1)
resp["taskDefinitionArns"][0].endswith("testTaskDefinition2:1").should.be.true
@mock_ecs
@mock_cloudformation
def test_create_service_through_cloudformation():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster"},
},
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
},
"testService": {
"Type": "AWS::ECS::Service",
"Properties": {
"Cluster": {"Ref": "testCluster"},
"DesiredCount": 10,
"TaskDefinition": {"Ref": "testTaskDefinition"},
},
},
},
}
template_json = json.dumps(template)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_services(cluster="testcluster")
len(resp["serviceArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_update_service_through_cloudformation_should_trigger_replacement():
template1 = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster"},
},
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
},
"testService": {
"Type": "AWS::ECS::Service",
"Properties": {
"Cluster": {"Ref": "testCluster"},
"TaskDefinition": {"Ref": "testTaskDefinition"},
"DesiredCount": 10,
},
},
},
}
template_json1 = json.dumps(template1)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json1)
template2 = deepcopy(template1)
template2["Resources"]["testService"]["Properties"]["DesiredCount"] = 5
template2_json = json.dumps(template2)
cfn_conn.update_stack(StackName="test_stack", TemplateBody=template2_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_services(cluster="testcluster")
len(resp["serviceArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_create_cluster_through_cloudformation():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster"},
}
},
}
template_json = json.dumps(template)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(0)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_create_cluster_through_cloudformation_no_name():
# cloudformation should create a cluster name for you if you do not provide it
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ecs-cluster.html#cfn-ecs-cluster-clustername
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {"testCluster": {"Type": "AWS::ECS::Cluster"}},
}
template_json = json.dumps(template)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
cfn_conn.create_stack(StackName="test_stack", TemplateBody=template_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1)
@mock_ecs
@mock_cloudformation
def test_update_cluster_name_through_cloudformation_should_trigger_a_replacement():
template1 = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "testcluster1"},
}
},
}
template2 = deepcopy(template1)
template2["Resources"]["testCluster"]["Properties"]["ClusterName"] = "testcluster2"
template1_json = json.dumps(template1)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
stack_resp = cfn_conn.create_stack(
StackName="test_stack", TemplateBody=template1_json
)
template2_json = json.dumps(template2)
cfn_conn.update_stack(StackName=stack_resp["StackId"], TemplateBody=template2_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_clusters()
len(resp["clusterArns"]).should.equal(1)
resp["clusterArns"][0].endswith("testcluster2").should.be.true
@mock_ecs
@mock_cloudformation
def test_create_task_definition_through_cloudformation():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"ContainerDefinitions": [
{
"Name": "ecs-sample",
"Image": "amazon/amazon-ecs-sample",
"Cpu": "200",
"Memory": "500",
"Essential": "true",
}
],
"Volumes": [],
},
}
},
}
template_json = json.dumps(template)
cfn_conn = boto3.client("cloudformation", region_name="us-west-1")
stack_name = "test_stack"
cfn_conn.create_stack(StackName=stack_name, TemplateBody=template_json)
ecs_conn = boto3.client("ecs", region_name="us-west-1")
resp = ecs_conn.list_task_definitions()
len(resp["taskDefinitionArns"]).should.equal(1)
task_definition_arn = resp["taskDefinitionArns"][0]
task_definition_details = cfn_conn.describe_stack_resource(
StackName=stack_name, LogicalResourceId="testTaskDefinition"
)["StackResourceDetail"]
task_definition_details["PhysicalResourceId"].should.equal(task_definition_arn)

View file

@ -1,6 +1,5 @@
from __future__ import unicode_literals
import json
import os
import boto3
import botocore
@ -8,7 +7,7 @@ from botocore.exceptions import ClientError, ParamValidationError
from nose.tools import assert_raises
import sure # noqa
from moto import mock_elbv2, mock_ec2, mock_acm, mock_cloudformation
from moto import mock_elbv2, mock_ec2, mock_acm
from moto.elbv2 import elbv2_backends
from moto.core import ACCOUNT_ID
@ -1667,82 +1666,6 @@ def test_modify_listener_http_to_https():
)
@mock_ec2
@mock_elbv2
@mock_cloudformation
def test_create_target_groups_through_cloudformation():
cfn_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
# test that setting a name manually as well as letting cloudformation create a name both work
# this is a special case because test groups have a name length limit of 22 characters, and must be unique
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-elasticloadbalancingv2-targetgroup.html#cfn-elasticloadbalancingv2-targetgroup-name
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"testGroup1": {
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
"Properties": {
"Port": 80,
"Protocol": "HTTP",
"VpcId": {"Ref": "testVPC"},
},
},
"testGroup2": {
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
"Properties": {
"Port": 90,
"Protocol": "HTTP",
"VpcId": {"Ref": "testVPC"},
},
},
"testGroup3": {
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
"Properties": {
"Name": "MyTargetGroup",
"Port": 70,
"Protocol": "HTTPS",
"VpcId": {"Ref": "testVPC"},
},
},
},
}
template_json = json.dumps(template)
cfn_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_target_groups_response = elbv2_client.describe_target_groups()
target_group_dicts = describe_target_groups_response["TargetGroups"]
assert len(target_group_dicts) == 3
# there should be 2 target groups with the same prefix of 10 characters (since the random suffix is 12)
# and one named MyTargetGroup
assert (
len(
[
tg
for tg in target_group_dicts
if tg["TargetGroupName"] == "MyTargetGroup"
]
)
== 1
)
assert (
len(
[
tg
for tg in target_group_dicts
if tg["TargetGroupName"].startswith("test-stack")
]
)
== 2
)
@mock_elbv2
@mock_ec2
def test_redirect_action_listener_rule():
@ -1816,95 +1739,6 @@ def test_redirect_action_listener_rule():
modify_listener_actions.should.equal(expected_default_actions)
@mock_elbv2
@mock_cloudformation
def test_redirect_action_listener_rule_cloudformation():
cnf_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
},
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [
{
"Type": "redirect",
"RedirectConfig": {
"Port": "443",
"Protocol": "HTTPS",
"StatusCode": "HTTP_301",
},
}
],
},
},
},
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(
Names=["my-lb"]
)
describe_load_balancers_response["LoadBalancers"].should.have.length_of(1)
load_balancer_arn = describe_load_balancers_response["LoadBalancers"][0][
"LoadBalancerArn"
]
describe_listeners_response = elbv2_client.describe_listeners(
LoadBalancerArn=load_balancer_arn
)
describe_listeners_response["Listeners"].should.have.length_of(1)
describe_listeners_response["Listeners"][0]["DefaultActions"].should.equal(
[
{
"Type": "redirect",
"RedirectConfig": {
"Port": "443",
"Protocol": "HTTPS",
"StatusCode": "HTTP_301",
},
}
]
)
@mock_elbv2
@mock_ec2
def test_cognito_action_listener_rule():
@ -1962,97 +1796,6 @@ def test_cognito_action_listener_rule():
describe_listener_actions.should.equal(action)
@mock_elbv2
@mock_cloudformation
def test_cognito_action_listener_rule_cloudformation():
cnf_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
},
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [
{
"Type": "authenticate-cognito",
"AuthenticateCognitoConfig": {
"UserPoolArn": "arn:aws:cognito-idp:us-east-1:{}:userpool/us-east-1_ABCD1234".format(
ACCOUNT_ID
),
"UserPoolClientId": "abcd1234abcd",
"UserPoolDomain": "testpool",
},
}
],
},
},
},
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(
Names=["my-lb"]
)
load_balancer_arn = describe_load_balancers_response["LoadBalancers"][0][
"LoadBalancerArn"
]
describe_listeners_response = elbv2_client.describe_listeners(
LoadBalancerArn=load_balancer_arn
)
describe_listeners_response["Listeners"].should.have.length_of(1)
describe_listeners_response["Listeners"][0]["DefaultActions"].should.equal(
[
{
"Type": "authenticate-cognito",
"AuthenticateCognitoConfig": {
"UserPoolArn": "arn:aws:cognito-idp:us-east-1:{}:userpool/us-east-1_ABCD1234".format(
ACCOUNT_ID
),
"UserPoolClientId": "abcd1234abcd",
"UserPoolDomain": "testpool",
},
}
]
)
@mock_elbv2
@mock_ec2
def test_fixed_response_action_listener_rule():
@ -2108,93 +1851,6 @@ def test_fixed_response_action_listener_rule():
describe_listener_actions.should.equal(action)
@mock_elbv2
@mock_cloudformation
def test_fixed_response_action_listener_rule_cloudformation():
cnf_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
},
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [
{
"Type": "fixed-response",
"FixedResponseConfig": {
"ContentType": "text/plain",
"MessageBody": "This page does not exist",
"StatusCode": "404",
},
}
],
},
},
},
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(
Names=["my-lb"]
)
load_balancer_arn = describe_load_balancers_response["LoadBalancers"][0][
"LoadBalancerArn"
]
describe_listeners_response = elbv2_client.describe_listeners(
LoadBalancerArn=load_balancer_arn
)
describe_listeners_response["Listeners"].should.have.length_of(1)
describe_listeners_response["Listeners"][0]["DefaultActions"].should.equal(
[
{
"Type": "fixed-response",
"FixedResponseConfig": {
"ContentType": "text/plain",
"MessageBody": "This page does not exist",
"StatusCode": "404",
},
}
]
)
@mock_elbv2
@mock_ec2
def test_fixed_response_action_listener_rule_validates_status_code():

View file

@ -0,0 +1,348 @@
import boto3
import json
from moto import mock_elbv2, mock_ec2, mock_cloudformation
from moto.core import ACCOUNT_ID
@mock_elbv2
@mock_cloudformation
def test_redirect_action_listener_rule_cloudformation():
cnf_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
},
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [
{
"Type": "redirect",
"RedirectConfig": {
"Port": "443",
"Protocol": "HTTPS",
"StatusCode": "HTTP_301",
},
}
],
},
},
},
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(
Names=["my-lb"]
)
describe_load_balancers_response["LoadBalancers"].should.have.length_of(1)
load_balancer_arn = describe_load_balancers_response["LoadBalancers"][0][
"LoadBalancerArn"
]
describe_listeners_response = elbv2_client.describe_listeners(
LoadBalancerArn=load_balancer_arn
)
describe_listeners_response["Listeners"].should.have.length_of(1)
describe_listeners_response["Listeners"][0]["DefaultActions"].should.equal(
[
{
"Type": "redirect",
"RedirectConfig": {
"Port": "443",
"Protocol": "HTTPS",
"StatusCode": "HTTP_301",
},
}
]
)
@mock_elbv2
@mock_cloudformation
def test_cognito_action_listener_rule_cloudformation():
cnf_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
},
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [
{
"Type": "authenticate-cognito",
"AuthenticateCognitoConfig": {
"UserPoolArn": "arn:aws:cognito-idp:us-east-1:{}:userpool/us-east-1_ABCD1234".format(
ACCOUNT_ID
),
"UserPoolClientId": "abcd1234abcd",
"UserPoolDomain": "testpool",
},
}
],
},
},
},
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(
Names=["my-lb"]
)
load_balancer_arn = describe_load_balancers_response["LoadBalancers"][0][
"LoadBalancerArn"
]
describe_listeners_response = elbv2_client.describe_listeners(
LoadBalancerArn=load_balancer_arn
)
describe_listeners_response["Listeners"].should.have.length_of(1)
describe_listeners_response["Listeners"][0]["DefaultActions"].should.equal(
[
{
"Type": "authenticate-cognito",
"AuthenticateCognitoConfig": {
"UserPoolArn": "arn:aws:cognito-idp:us-east-1:{}:userpool/us-east-1_ABCD1234".format(
ACCOUNT_ID
),
"UserPoolClientId": "abcd1234abcd",
"UserPoolDomain": "testpool",
},
}
]
)
@mock_ec2
@mock_elbv2
@mock_cloudformation
def test_create_target_groups_through_cloudformation():
cfn_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
# test that setting a name manually as well as letting cloudformation create a name both work
# this is a special case because test groups have a name length limit of 22 characters, and must be unique
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-elasticloadbalancingv2-targetgroup.html#cfn-elasticloadbalancingv2-targetgroup-name
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"testGroup1": {
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
"Properties": {
"Port": 80,
"Protocol": "HTTP",
"VpcId": {"Ref": "testVPC"},
},
},
"testGroup2": {
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
"Properties": {
"Port": 90,
"Protocol": "HTTP",
"VpcId": {"Ref": "testVPC"},
},
},
"testGroup3": {
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
"Properties": {
"Name": "MyTargetGroup",
"Port": 70,
"Protocol": "HTTPS",
"VpcId": {"Ref": "testVPC"},
},
},
},
}
template_json = json.dumps(template)
cfn_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_target_groups_response = elbv2_client.describe_target_groups()
target_group_dicts = describe_target_groups_response["TargetGroups"]
assert len(target_group_dicts) == 3
# there should be 2 target groups with the same prefix of 10 characters (since the random suffix is 12)
# and one named MyTargetGroup
assert (
len(
[
tg
for tg in target_group_dicts
if tg["TargetGroupName"] == "MyTargetGroup"
]
)
== 1
)
assert (
len(
[
tg
for tg in target_group_dicts
if tg["TargetGroupName"].startswith("test-stack")
]
)
== 2
)
@mock_elbv2
@mock_cloudformation
def test_fixed_response_action_listener_rule_cloudformation():
cnf_conn = boto3.client("cloudformation", region_name="us-east-1")
elbv2_client = boto3.client("elbv2", region_name="us-east-1")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {"CidrBlock": "10.0.0.0/16"},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
},
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [
{
"Type": "fixed-response",
"FixedResponseConfig": {
"ContentType": "text/plain",
"MessageBody": "This page does not exist",
"StatusCode": "404",
},
}
],
},
},
},
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(
Names=["my-lb"]
)
load_balancer_arn = describe_load_balancers_response["LoadBalancers"][0][
"LoadBalancerArn"
]
describe_listeners_response = elbv2_client.describe_listeners(
LoadBalancerArn=load_balancer_arn
)
describe_listeners_response["Listeners"].should.have.length_of(1)
describe_listeners_response["Listeners"][0]["DefaultActions"].should.equal(
[
{
"Type": "fixed-response",
"FixedResponseConfig": {
"ContentType": "text/plain",
"MessageBody": "This page does not exist",
"StatusCode": "404",
},
}
]
)

View file

@ -0,0 +1,383 @@
import base64
import boto3
import json
import time
import zlib
from botocore.exceptions import ClientError
from io import BytesIO
from moto import mock_logs, mock_lambda, mock_iam
from nose.tools import assert_raises
from zipfile import ZipFile, ZIP_DEFLATED
@mock_lambda
@mock_logs
def test_put_subscription_filter_update():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
log_stream_name = "stream"
client_logs.create_log_group(logGroupName=log_group_name)
client_logs.create_log_stream(
logGroupName=log_group_name, logStreamName=log_stream_name
)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
# when
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(1)
filter = response["subscriptionFilters"][0]
creation_time = filter["creationTime"]
creation_time.should.be.a(int)
filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test"
filter["distribution"] = "ByLogStream"
filter["logGroupName"] = "/test"
filter["filterName"] = "test"
filter["filterPattern"] = ""
# when
# to update an existing subscription filter the 'filerName' must be identical
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="[]",
destinationArn=function_arn,
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(1)
filter = response["subscriptionFilters"][0]
filter["creationTime"].should.equal(creation_time)
filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test"
filter["distribution"] = "ByLogStream"
filter["logGroupName"] = "/test"
filter["filterName"] = "test"
filter["filterPattern"] = "[]"
# when
# only one subscription filter can be associated with a log group
with assert_raises(ClientError) as e:
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test-2",
filterPattern="",
destinationArn=function_arn,
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("LimitExceededException")
ex.response["Error"]["Message"].should.equal("Resource limit exceeded.")
@mock_lambda
@mock_logs
def test_put_subscription_filter_with_lambda():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
log_stream_name = "stream"
client_logs.create_log_group(logGroupName=log_group_name)
client_logs.create_log_stream(
logGroupName=log_group_name, logStreamName=log_stream_name
)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
# when
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(1)
filter = response["subscriptionFilters"][0]
filter["creationTime"].should.be.a(int)
filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test"
filter["distribution"] = "ByLogStream"
filter["logGroupName"] = "/test"
filter["filterName"] = "test"
filter["filterPattern"] = ""
# when
client_logs.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[
{"timestamp": 0, "message": "test"},
{"timestamp": 0, "message": "test 2"},
],
)
# then
msg_showed_up, received_message = _wait_for_log_msg(
client_logs, "/aws/lambda/test", "awslogs"
)
assert msg_showed_up, "CloudWatch log event was not found. All logs: {}".format(
received_message
)
data = json.loads(received_message)["awslogs"]["data"]
response = json.loads(
zlib.decompress(base64.b64decode(data), 16 + zlib.MAX_WBITS).decode("utf-8")
)
response["messageType"].should.equal("DATA_MESSAGE")
response["owner"].should.equal("123456789012")
response["logGroup"].should.equal("/test")
response["logStream"].should.equal("stream")
response["subscriptionFilters"].should.equal(["test"])
log_events = sorted(response["logEvents"], key=lambda log_event: log_event["id"])
log_events.should.have.length_of(2)
log_events[0]["id"].should.be.a(int)
log_events[0]["message"].should.equal("test")
log_events[0]["timestamp"].should.equal(0)
log_events[1]["id"].should.be.a(int)
log_events[1]["message"].should.equal("test 2")
log_events[1]["timestamp"].should.equal(0)
@mock_lambda
@mock_logs
def test_delete_subscription_filter_errors():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
client_logs.create_log_group(logGroupName=log_group_name)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# when
client_logs.delete_subscription_filter(
logGroupName="/test", filterName="test",
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(0)
@mock_lambda
@mock_logs
def test_delete_subscription_filter_errors():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
client_logs.create_log_group(logGroupName=log_group_name)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# when
with assert_raises(ClientError) as e:
client_logs.delete_subscription_filter(
logGroupName="not-existing-log-group", filterName="test",
)
# then
ex = e.exception
ex.operation_name.should.equal("DeleteSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The specified log group does not exist"
)
# when
with assert_raises(ClientError) as e:
client_logs.delete_subscription_filter(
logGroupName="/test", filterName="wrong-filter-name",
)
# then
ex = e.exception
ex.operation_name.should.equal("DeleteSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The specified subscription filter does not exist."
)
@mock_logs
def test_put_subscription_filter_errors():
# given
client = boto3.client("logs", "us-east-1")
log_group_name = "/test"
client.create_log_group(logGroupName=log_group_name)
# when
with assert_raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="not-existing-log-group",
filterName="test",
filterPattern="",
destinationArn="arn:aws:lambda:us-east-1:123456789012:function:test",
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The specified log group does not exist"
)
# when
with assert_raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="/test",
filterName="test",
filterPattern="",
destinationArn="arn:aws:lambda:us-east-1:123456789012:function:not-existing",
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterException")
ex.response["Error"]["Message"].should.equal(
"Could not execute the lambda function. "
"Make sure you have given CloudWatch Logs permission to execute your function."
)
# when
with assert_raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="/test",
filterName="test",
filterPattern="",
destinationArn="arn:aws:lambda:us-east-1:123456789012:function:not-existing",
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterException")
ex.response["Error"]["Message"].should.equal(
"Could not execute the lambda function. "
"Make sure you have given CloudWatch Logs permission to execute your function."
)
def _get_role_name(region_name):
with mock_iam():
iam = boto3.client("iam", region_name=region_name)
try:
return iam.get_role(RoleName="test-role")["Role"]["Arn"]
except ClientError:
return iam.create_role(
RoleName="test-role", AssumeRolePolicyDocument="test policy", Path="/",
)["Role"]["Arn"]
def _get_test_zip_file():
func_str = """
def lambda_handler(event, context):
return event
"""
zip_output = BytesIO()
zip_file = ZipFile(zip_output, "w", ZIP_DEFLATED)
zip_file.writestr("lambda_function.py", func_str)
zip_file.close()
zip_output.seek(0)
return zip_output.read()
def _wait_for_log_msg(client, log_group_name, expected_msg_part):
received_messages = []
start = time.time()
while (time.time() - start) < 10:
result = client.describe_log_streams(logGroupName=log_group_name)
log_streams = result.get("logStreams")
if not log_streams:
time.sleep(1)
continue
for log_stream in log_streams:
result = client.get_log_events(
logGroupName=log_group_name, logStreamName=log_stream["logStreamName"],
)
received_messages.extend(
[event["message"] for event in result.get("events")]
)
for message in received_messages:
if expected_msg_part in message:
return True, message
time.sleep(1)
return False, received_messages

View file

@ -1,17 +1,10 @@
import base64
import json
import time
import zlib
from io import BytesIO
from zipfile import ZipFile, ZIP_DEFLATED
import boto3
import os
import sure # noqa
import six
from botocore.exceptions import ClientError
from moto import mock_logs, settings, mock_lambda, mock_iam
from moto import mock_logs, settings
from nose.tools import assert_raises
from nose import SkipTest
@ -465,375 +458,3 @@ def test_describe_subscription_filters_errors():
ex.response["Error"]["Message"].should.equal(
"The specified log group does not exist"
)
@mock_lambda
@mock_logs
def test_put_subscription_filter_update():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
log_stream_name = "stream"
client_logs.create_log_group(logGroupName=log_group_name)
client_logs.create_log_stream(
logGroupName=log_group_name, logStreamName=log_stream_name
)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
# when
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(1)
filter = response["subscriptionFilters"][0]
creation_time = filter["creationTime"]
creation_time.should.be.a(int)
filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test"
filter["distribution"] = "ByLogStream"
filter["logGroupName"] = "/test"
filter["filterName"] = "test"
filter["filterPattern"] = ""
# when
# to update an existing subscription filter the 'filerName' must be identical
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="[]",
destinationArn=function_arn,
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(1)
filter = response["subscriptionFilters"][0]
filter["creationTime"].should.equal(creation_time)
filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test"
filter["distribution"] = "ByLogStream"
filter["logGroupName"] = "/test"
filter["filterName"] = "test"
filter["filterPattern"] = "[]"
# when
# only one subscription filter can be associated with a log group
with assert_raises(ClientError) as e:
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test-2",
filterPattern="",
destinationArn=function_arn,
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("LimitExceededException")
ex.response["Error"]["Message"].should.equal("Resource limit exceeded.")
@mock_lambda
@mock_logs
def test_put_subscription_filter_with_lambda():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
log_stream_name = "stream"
client_logs.create_log_group(logGroupName=log_group_name)
client_logs.create_log_stream(
logGroupName=log_group_name, logStreamName=log_stream_name
)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
# when
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(1)
filter = response["subscriptionFilters"][0]
filter["creationTime"].should.be.a(int)
filter["destinationArn"] = "arn:aws:lambda:us-east-1:123456789012:function:test"
filter["distribution"] = "ByLogStream"
filter["logGroupName"] = "/test"
filter["filterName"] = "test"
filter["filterPattern"] = ""
# when
client_logs.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[
{"timestamp": 0, "message": "test"},
{"timestamp": 0, "message": "test 2"},
],
)
# then
msg_showed_up, received_message = _wait_for_log_msg(
client_logs, "/aws/lambda/test", "awslogs"
)
assert msg_showed_up, "CloudWatch log event was not found. All logs: {}".format(
received_message
)
data = json.loads(received_message)["awslogs"]["data"]
response = json.loads(
zlib.decompress(base64.b64decode(data), 16 + zlib.MAX_WBITS).decode("utf-8")
)
response["messageType"].should.equal("DATA_MESSAGE")
response["owner"].should.equal("123456789012")
response["logGroup"].should.equal("/test")
response["logStream"].should.equal("stream")
response["subscriptionFilters"].should.equal(["test"])
log_events = sorted(response["logEvents"], key=lambda log_event: log_event["id"])
log_events.should.have.length_of(2)
log_events[0]["id"].should.be.a(int)
log_events[0]["message"].should.equal("test")
log_events[0]["timestamp"].should.equal(0)
log_events[1]["id"].should.be.a(int)
log_events[1]["message"].should.equal("test 2")
log_events[1]["timestamp"].should.equal(0)
@mock_logs
def test_put_subscription_filter_errors():
# given
client = boto3.client("logs", "us-east-1")
log_group_name = "/test"
client.create_log_group(logGroupName=log_group_name)
# when
with assert_raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="not-existing-log-group",
filterName="test",
filterPattern="",
destinationArn="arn:aws:lambda:us-east-1:123456789012:function:test",
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The specified log group does not exist"
)
# when
with assert_raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="/test",
filterName="test",
filterPattern="",
destinationArn="arn:aws:lambda:us-east-1:123456789012:function:not-existing",
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterException")
ex.response["Error"]["Message"].should.equal(
"Could not execute the lambda function. "
"Make sure you have given CloudWatch Logs permission to execute your function."
)
# when
with assert_raises(ClientError) as e:
client.put_subscription_filter(
logGroupName="/test",
filterName="test",
filterPattern="",
destinationArn="arn:aws:lambda:us-east-1:123456789012:function:not-existing",
)
# then
ex = e.exception
ex.operation_name.should.equal("PutSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterException")
ex.response["Error"]["Message"].should.equal(
"Could not execute the lambda function. "
"Make sure you have given CloudWatch Logs permission to execute your function."
)
@mock_lambda
@mock_logs
def test_delete_subscription_filter_errors():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
client_logs.create_log_group(logGroupName=log_group_name)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# when
client_logs.delete_subscription_filter(
logGroupName="/test", filterName="test",
)
# then
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
response["subscriptionFilters"].should.have.length_of(0)
@mock_lambda
@mock_logs
def test_delete_subscription_filter_errors():
# given
region_name = "us-east-1"
client_lambda = boto3.client("lambda", region_name)
client_logs = boto3.client("logs", region_name)
log_group_name = "/test"
client_logs.create_log_group(logGroupName=log_group_name)
function_arn = client_lambda.create_function(
FunctionName="test",
Runtime="python3.8",
Role=_get_role_name(region_name),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": _get_test_zip_file()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)["FunctionArn"]
client_logs.put_subscription_filter(
logGroupName=log_group_name,
filterName="test",
filterPattern="",
destinationArn=function_arn,
)
# when
with assert_raises(ClientError) as e:
client_logs.delete_subscription_filter(
logGroupName="not-existing-log-group", filterName="test",
)
# then
ex = e.exception
ex.operation_name.should.equal("DeleteSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The specified log group does not exist"
)
# when
with assert_raises(ClientError) as e:
client_logs.delete_subscription_filter(
logGroupName="/test", filterName="wrong-filter-name",
)
# then
ex = e.exception
ex.operation_name.should.equal("DeleteSubscriptionFilter")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The specified subscription filter does not exist."
)
def _get_role_name(region_name):
with mock_iam():
iam = boto3.client("iam", region_name=region_name)
try:
return iam.get_role(RoleName="test-role")["Role"]["Arn"]
except ClientError:
return iam.create_role(
RoleName="test-role", AssumeRolePolicyDocument="test policy", Path="/",
)["Role"]["Arn"]
def _get_test_zip_file():
func_str = """
def lambda_handler(event, context):
return event
"""
zip_output = BytesIO()
zip_file = ZipFile(zip_output, "w", ZIP_DEFLATED)
zip_file.writestr("lambda_function.py", func_str)
zip_file.close()
zip_output.seek(0)
return zip_output.read()
def _wait_for_log_msg(client, log_group_name, expected_msg_part):
received_messages = []
start = time.time()
while (time.time() - start) < 10:
result = client.describe_log_streams(logGroupName=log_group_name)
log_streams = result.get("logStreams")
if not log_streams:
time.sleep(1)
continue
for log_stream in log_streams:
result = client.get_log_events(
logGroupName=log_group_name, logStreamName=log_stream["logStreamName"],
)
received_messages.extend(
[event["message"] for event in result.get("events")]
)
for message in received_messages:
if expected_msg_part in message:
return True, message
time.sleep(1)
return False, received_messages

View file

@ -3,7 +3,6 @@ from __future__ import unicode_literals
import base64
import json
import os
import time
import uuid
@ -17,34 +16,13 @@ from boto.exception import SQSError
from boto.sqs.message import Message, RawMessage
from botocore.exceptions import ClientError
from freezegun import freeze_time
from moto import mock_sqs, mock_sqs_deprecated, mock_cloudformation, settings
from moto import mock_sqs, mock_sqs_deprecated, mock_lambda, mock_logs, settings
from nose import SkipTest
from nose.tools import assert_raises
from tests.helpers import requires_boto_gte
from tests.test_awslambda.test_lambda import get_test_zip_file1, get_role_name
from moto.core import ACCOUNT_ID
sqs_template_with_tags = """
{
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"SQSQueue": {
"Type": "AWS::SQS::Queue",
"Properties": {
"Tags" : [
{
"Key" : "keyname1",
"Value" : "value1"
},
{
"Key" : "keyname2",
"Value" : "value2"
}
]
}
}
}
}"""
TEST_POLICY = """
{
"Version":"2012-10-17",
@ -2042,15 +2020,54 @@ def test_send_messages_to_fifo_without_message_group_id():
)
@mock_logs
@mock_lambda
@mock_sqs
@mock_cloudformation
def test_create_from_cloudformation_json_with_tags():
cf = boto3.client("cloudformation", region_name="us-east-1")
client = boto3.client("sqs", region_name="us-east-1")
def test_invoke_function_from_sqs_exception():
logs_conn = boto3.client("logs", region_name="us-east-1")
sqs = boto3.resource("sqs", region_name="us-east-1")
queue = sqs.create_queue(QueueName="test-sqs-queue1")
cf.create_stack(StackName="test-sqs", TemplateBody=sqs_template_with_tags)
conn = boto3.client("lambda", region_name="us-east-1")
func = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file1()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
)
queue_url = client.list_queues()["QueueUrls"][0]
response = conn.create_event_source_mapping(
EventSourceArn=queue.attributes["QueueArn"], FunctionName=func["FunctionArn"]
)
queue_tags = client.list_queue_tags(QueueUrl=queue_url)["Tags"]
queue_tags.should.equal({"keyname1": "value1", "keyname2": "value2"})
assert response["EventSourceArn"] == queue.attributes["QueueArn"]
assert response["State"] == "Enabled"
entries = [{"Id": "1", "MessageBody": json.dumps({"uuid": str(uuid.uuid4()), "test": "test"})}]
queue.send_messages(Entries=entries)
start = time.time()
while (time.time() - start) < 30:
result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction")
log_streams = result.get("logStreams")
if not log_streams:
time.sleep(1)
continue
assert len(log_streams) >= 1
result = logs_conn.get_log_events(
logGroupName="/aws/lambda/testFunction",
logStreamName=log_streams[0]["logStreamName"],
)
for event in result.get("events"):
if "custom log event" in event["message"]:
return
time.sleep(1)
assert False, "Test Failed"

View file

@ -0,0 +1,38 @@
import boto3
from moto import mock_sqs, mock_cloudformation
sqs_template_with_tags = """
{
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"SQSQueue": {
"Type": "AWS::SQS::Queue",
"Properties": {
"Tags" : [
{
"Key" : "keyname1",
"Value" : "value1"
},
{
"Key" : "keyname2",
"Value" : "value2"
}
]
}
}
}
}"""
@mock_sqs
@mock_cloudformation
def test_create_from_cloudformation_json_with_tags():
cf = boto3.client("cloudformation", region_name="us-east-1")
client = boto3.client("sqs", region_name="us-east-1")
cf.create_stack(StackName="test-sqs", TemplateBody=sqs_template_with_tags)
queue_url = client.list_queues()["QueueUrls"][0]
queue_tags = client.list_queue_tags(QueueUrl=queue_url)["Tags"]
queue_tags.should.equal({"keyname1": "value1", "keyname2": "value2"})

View file

@ -7,12 +7,11 @@ import botocore.exceptions
import sure # noqa
import datetime
import uuid
import json
from botocore.exceptions import ClientError, ParamValidationError
from nose.tools import assert_raises
from moto import mock_ssm, mock_cloudformation
from moto import mock_ssm
@mock_ssm
@ -1714,68 +1713,3 @@ def test_get_command_invocation():
invocation_response = client.get_command_invocation(
CommandId=cmd_id, InstanceId=instance_id, PluginName="FAKE"
)
@mock_ssm
@mock_cloudformation
def test_get_command_invocations_from_stack():
stack_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Test Stack",
"Resources": {
"EC2Instance1": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId": "ami-test-image-id",
"KeyName": "test",
"InstanceType": "t2.micro",
"Tags": [
{"Key": "Test Description", "Value": "Test tag"},
{"Key": "Test Name", "Value": "Name tag for tests"},
],
},
}
},
"Outputs": {
"test": {
"Description": "Test Output",
"Value": "Test output value",
"Export": {"Name": "Test value to export"},
},
"PublicIP": {"Value": "Test public ip"},
},
}
cloudformation_client = boto3.client("cloudformation", region_name="us-east-1")
stack_template_str = json.dumps(stack_template)
response = cloudformation_client.create_stack(
StackName="test_stack",
TemplateBody=stack_template_str,
Capabilities=("CAPABILITY_IAM",),
)
client = boto3.client("ssm", region_name="us-east-1")
ssm_document = "AWS-RunShellScript"
params = {"commands": ["#!/bin/bash\necho 'hello world'"]}
response = client.send_command(
Targets=[
{"Key": "tag:aws:cloudformation:stack-name", "Values": ("test_stack",)}
],
DocumentName=ssm_document,
Parameters=params,
OutputS3Region="us-east-2",
OutputS3BucketName="the-bucket",
OutputS3KeyPrefix="pref",
)
cmd = response["Command"]
cmd_id = cmd["CommandId"]
instance_ids = cmd["InstanceIds"]
invocation_response = client.get_command_invocation(
CommandId=cmd_id, InstanceId=instance_ids[0], PluginName="aws:runShellScript"
)

View file

@ -0,0 +1,70 @@
import boto3
import json
from moto import mock_ssm, mock_cloudformation
@mock_ssm
@mock_cloudformation
def test_get_command_invocations_from_stack():
stack_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Test Stack",
"Resources": {
"EC2Instance1": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId": "ami-test-image-id",
"KeyName": "test",
"InstanceType": "t2.micro",
"Tags": [
{"Key": "Test Description", "Value": "Test tag"},
{"Key": "Test Name", "Value": "Name tag for tests"},
],
},
}
},
"Outputs": {
"test": {
"Description": "Test Output",
"Value": "Test output value",
"Export": {"Name": "Test value to export"},
},
"PublicIP": {"Value": "Test public ip"},
},
}
cloudformation_client = boto3.client("cloudformation", region_name="us-east-1")
stack_template_str = json.dumps(stack_template)
response = cloudformation_client.create_stack(
StackName="test_stack",
TemplateBody=stack_template_str,
Capabilities=("CAPABILITY_IAM",),
)
client = boto3.client("ssm", region_name="us-east-1")
ssm_document = "AWS-RunShellScript"
params = {"commands": ["#!/bin/bash\necho 'hello world'"]}
response = client.send_command(
Targets=[
{"Key": "tag:aws:cloudformation:stack-name", "Values": ("test_stack",)}
],
DocumentName=ssm_document,
Parameters=params,
OutputS3Region="us-east-2",
OutputS3BucketName="the-bucket",
OutputS3KeyPrefix="pref",
)
cmd = response["Command"]
cmd_id = cmd["CommandId"]
instance_ids = cmd["InstanceIds"]
invocation_response = client.get_command_invocation(
CommandId=cmd_id, InstanceId=instance_ids[0], PluginName="aws:runShellScript"
)