Merge remote-tracking branch 'upstream/master'
This commit is contained in:
commit
6d12c83b89
24 changed files with 629 additions and 45 deletions
|
|
@ -1,17 +1,17 @@
|
|||
from __future__ import unicode_literals, print_function
|
||||
|
||||
from decimal import Decimal
|
||||
|
||||
import six
|
||||
import boto
|
||||
import boto3
|
||||
from boto3.dynamodb.conditions import Attr
|
||||
from boto3.dynamodb.conditions import Attr, Key
|
||||
import sure # noqa
|
||||
import requests
|
||||
from pytest import raises
|
||||
from moto import mock_dynamodb2, mock_dynamodb2_deprecated
|
||||
from moto.dynamodb2 import dynamodb_backend2
|
||||
from boto.exception import JSONResponseError
|
||||
from botocore.exceptions import ClientError
|
||||
from boto3.dynamodb.conditions import Key
|
||||
from tests.helpers import requires_boto_gte
|
||||
import tests.backport_assert_raises
|
||||
|
||||
|
|
@ -1119,7 +1119,7 @@ def test_update_item_on_map():
|
|||
})
|
||||
|
||||
# Test nested value for a nonexistent attribute.
|
||||
with raises(client.exceptions.ConditionalCheckFailedException):
|
||||
with assert_raises(client.exceptions.ConditionalCheckFailedException):
|
||||
table.update_item(Key={
|
||||
'forum_name': 'the-key',
|
||||
'subject': '123'
|
||||
|
|
@ -1200,3 +1200,95 @@ def test_update_if_not_exists():
|
|||
resp = table.scan()
|
||||
# Still the original value
|
||||
assert resp['Items'][0]['created_at'] == 123
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_query_global_secondary_index_when_created_via_update_table_resource():
|
||||
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
|
||||
|
||||
# Create the DynamoDB table.
|
||||
dynamodb.create_table(
|
||||
TableName='users',
|
||||
KeySchema=[
|
||||
{
|
||||
'AttributeName': 'user_id',
|
||||
'KeyType': 'HASH'
|
||||
},
|
||||
],
|
||||
AttributeDefinitions=[
|
||||
{
|
||||
'AttributeName': 'user_id',
|
||||
'AttributeType': 'N',
|
||||
},
|
||||
{
|
||||
'AttributeName': 'forum_name',
|
||||
'AttributeType': 'S'
|
||||
},
|
||||
{
|
||||
'AttributeName': 'subject',
|
||||
'AttributeType': 'S'
|
||||
},
|
||||
],
|
||||
ProvisionedThroughput={
|
||||
'ReadCapacityUnits': 5,
|
||||
'WriteCapacityUnits': 5
|
||||
},
|
||||
)
|
||||
table = dynamodb.Table('users')
|
||||
table.update(
|
||||
AttributeDefinitions=[
|
||||
{
|
||||
'AttributeName': 'forum_name',
|
||||
'AttributeType': 'S'
|
||||
},
|
||||
],
|
||||
GlobalSecondaryIndexUpdates=[
|
||||
{'Create':
|
||||
{
|
||||
'IndexName': 'forum_name_index',
|
||||
'KeySchema': [
|
||||
{
|
||||
'AttributeName': 'forum_name',
|
||||
'KeyType': 'HASH',
|
||||
},
|
||||
],
|
||||
'Projection': {
|
||||
'ProjectionType': 'ALL',
|
||||
},
|
||||
'ProvisionedThroughput': {
|
||||
'ReadCapacityUnits': 5,
|
||||
'WriteCapacityUnits': 5
|
||||
},
|
||||
}
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
next_user_id = 1
|
||||
for my_forum_name in ['cats', 'dogs']:
|
||||
for my_subject in ['my pet is the cutest', 'wow look at what my pet did', "don't you love my pet?"]:
|
||||
table.put_item(Item={'user_id': next_user_id, 'forum_name': my_forum_name, 'subject': my_subject})
|
||||
next_user_id += 1
|
||||
|
||||
# get all the cat users
|
||||
forum_only_query_response = table.query(
|
||||
IndexName='forum_name_index',
|
||||
Select='ALL_ATTRIBUTES',
|
||||
KeyConditionExpression=Key('forum_name').eq('cats'),
|
||||
)
|
||||
forum_only_items = forum_only_query_response['Items']
|
||||
assert len(forum_only_items) == 3
|
||||
for item in forum_only_items:
|
||||
assert item['forum_name'] == 'cats'
|
||||
|
||||
# query all cat users with a particular subject
|
||||
forum_and_subject_query_results = table.query(
|
||||
IndexName='forum_name_index',
|
||||
Select='ALL_ATTRIBUTES',
|
||||
KeyConditionExpression=Key('forum_name').eq('cats'),
|
||||
FilterExpression=Attr('subject').eq('my pet is the cutest'),
|
||||
)
|
||||
forum_and_subject_items = forum_and_subject_query_results['Items']
|
||||
assert len(forum_and_subject_items) == 1
|
||||
assert forum_and_subject_items[0] == {'user_id': Decimal('1'), 'forum_name': 'cats',
|
||||
'subject': 'my pet is the cutest'}
|
||||
|
|
|
|||
|
|
@ -689,6 +689,31 @@ def test_authorize_and_revoke_in_bulk():
|
|||
sg01.ip_permissions_egress.shouldnt.contain(ip_permission)
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_security_group_ingress_without_multirule():
|
||||
ec2 = boto3.resource('ec2', 'ca-central-1')
|
||||
sg = ec2.create_security_group(Description='Test SG', GroupName='test-sg')
|
||||
|
||||
assert len(sg.ip_permissions) == 0
|
||||
sg.authorize_ingress(CidrIp='192.168.0.1/32', FromPort=22, ToPort=22, IpProtocol='tcp')
|
||||
|
||||
# Fails
|
||||
assert len(sg.ip_permissions) == 1
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_security_group_ingress_without_multirule_after_reload():
|
||||
ec2 = boto3.resource('ec2', 'ca-central-1')
|
||||
sg = ec2.create_security_group(Description='Test SG', GroupName='test-sg')
|
||||
|
||||
assert len(sg.ip_permissions) == 0
|
||||
sg.authorize_ingress(CidrIp='192.168.0.1/32', FromPort=22, ToPort=22, IpProtocol='tcp')
|
||||
|
||||
# Also Fails
|
||||
sg_after = ec2.SecurityGroup(sg.id)
|
||||
assert len(sg_after.ip_permissions) == 1
|
||||
|
||||
|
||||
@mock_ec2_deprecated
|
||||
def test_get_all_security_groups_filter_with_same_vpc_id():
|
||||
conn = boto.connect_ec2('the_key', 'the_secret')
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import json
|
|||
from moto.ec2 import utils as ec2_utils
|
||||
from uuid import UUID
|
||||
|
||||
from moto import mock_cloudformation
|
||||
from moto import mock_cloudformation, mock_elbv2
|
||||
from moto import mock_ecs
|
||||
from moto import mock_ec2
|
||||
from nose.tools import assert_raises
|
||||
|
|
@ -2015,3 +2015,62 @@ def _fetch_container_instance_resources(container_instance_description):
|
|||
registered_resources['PORTS'] = \
|
||||
[x['stringSetValue'] for x in registered_resources_list if x['name'] == 'PORTS'][0]
|
||||
return remaining_resources, registered_resources
|
||||
|
||||
|
||||
@mock_ecs
|
||||
def test_create_service_load_balancing():
|
||||
client = boto3.client('ecs', region_name='us-east-1')
|
||||
client.create_cluster(
|
||||
clusterName='test_ecs_cluster'
|
||||
)
|
||||
client.register_task_definition(
|
||||
family='test_ecs_task',
|
||||
containerDefinitions=[
|
||||
{
|
||||
'name': 'hello_world',
|
||||
'image': 'docker/hello-world:latest',
|
||||
'cpu': 1024,
|
||||
'memory': 400,
|
||||
'essential': True,
|
||||
'environment': [{
|
||||
'name': 'AWS_ACCESS_KEY_ID',
|
||||
'value': 'SOME_ACCESS_KEY'
|
||||
}],
|
||||
'logConfiguration': {'logDriver': 'json-file'}
|
||||
}
|
||||
]
|
||||
)
|
||||
response = client.create_service(
|
||||
cluster='test_ecs_cluster',
|
||||
serviceName='test_ecs_service',
|
||||
taskDefinition='test_ecs_task',
|
||||
desiredCount=2,
|
||||
loadBalancers=[
|
||||
{
|
||||
'targetGroupArn': 'test_target_group_arn',
|
||||
'loadBalancerName': 'test_load_balancer_name',
|
||||
'containerName': 'test_container_name',
|
||||
'containerPort': 123
|
||||
}
|
||||
]
|
||||
)
|
||||
response['service']['clusterArn'].should.equal(
|
||||
'arn:aws:ecs:us-east-1:012345678910:cluster/test_ecs_cluster')
|
||||
response['service']['desiredCount'].should.equal(2)
|
||||
len(response['service']['events']).should.equal(0)
|
||||
len(response['service']['loadBalancers']).should.equal(1)
|
||||
response['service']['loadBalancers'][0]['targetGroupArn'].should.equal(
|
||||
'test_target_group_arn')
|
||||
response['service']['loadBalancers'][0]['loadBalancerName'].should.equal(
|
||||
'test_load_balancer_name')
|
||||
response['service']['loadBalancers'][0]['containerName'].should.equal(
|
||||
'test_container_name')
|
||||
response['service']['loadBalancers'][0]['containerPort'].should.equal(123)
|
||||
response['service']['pendingCount'].should.equal(0)
|
||||
response['service']['runningCount'].should.equal(0)
|
||||
response['service']['serviceArn'].should.equal(
|
||||
'arn:aws:ecs:us-east-1:012345678910:service/test_ecs_service')
|
||||
response['service']['serviceName'].should.equal('test_ecs_service')
|
||||
response['service']['status'].should.equal('ACTIVE')
|
||||
response['service']['taskDefinition'].should.equal(
|
||||
'arn:aws:ecs:us-east-1:012345678910:task-definition/test_ecs_task:1')
|
||||
|
|
|
|||
|
|
@ -1,4 +1,6 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import json
|
||||
import os
|
||||
import boto3
|
||||
import botocore
|
||||
|
|
@ -6,7 +8,7 @@ from botocore.exceptions import ClientError
|
|||
from nose.tools import assert_raises
|
||||
import sure # noqa
|
||||
|
||||
from moto import mock_elbv2, mock_ec2, mock_acm
|
||||
from moto import mock_elbv2, mock_ec2, mock_acm, mock_cloudformation
|
||||
from moto.elbv2 import elbv2_backends
|
||||
|
||||
|
||||
|
|
@ -416,6 +418,7 @@ def test_create_target_group_and_listeners():
|
|||
response = conn.describe_target_groups()
|
||||
response.get('TargetGroups').should.have.length_of(0)
|
||||
|
||||
|
||||
@mock_elbv2
|
||||
@mock_ec2
|
||||
def test_create_target_group_without_non_required_parameters():
|
||||
|
|
@ -454,6 +457,7 @@ def test_create_target_group_without_non_required_parameters():
|
|||
target_group = response.get('TargetGroups')[0]
|
||||
target_group.should_not.be.none
|
||||
|
||||
|
||||
@mock_elbv2
|
||||
@mock_ec2
|
||||
def test_create_invalid_target_group():
|
||||
|
|
@ -1105,6 +1109,50 @@ def test_describe_invalid_target_group():
|
|||
conn.describe_target_groups(Names=['invalid'])
|
||||
|
||||
|
||||
@mock_elbv2
|
||||
@mock_ec2
|
||||
def test_describe_target_groups_no_arguments():
|
||||
conn = boto3.client('elbv2', region_name='us-east-1')
|
||||
ec2 = boto3.resource('ec2', region_name='us-east-1')
|
||||
|
||||
security_group = ec2.create_security_group(
|
||||
GroupName='a-security-group', Description='First One')
|
||||
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
|
||||
subnet1 = ec2.create_subnet(
|
||||
VpcId=vpc.id,
|
||||
CidrBlock='172.28.7.192/26',
|
||||
AvailabilityZone='us-east-1a')
|
||||
subnet2 = ec2.create_subnet(
|
||||
VpcId=vpc.id,
|
||||
CidrBlock='172.28.7.192/26',
|
||||
AvailabilityZone='us-east-1b')
|
||||
|
||||
response = conn.create_load_balancer(
|
||||
Name='my-lb',
|
||||
Subnets=[subnet1.id, subnet2.id],
|
||||
SecurityGroups=[security_group.id],
|
||||
Scheme='internal',
|
||||
Tags=[{'Key': 'key_name', 'Value': 'a_value'}])
|
||||
|
||||
response.get('LoadBalancers')[0].get('LoadBalancerArn')
|
||||
|
||||
conn.create_target_group(
|
||||
Name='a-target',
|
||||
Protocol='HTTP',
|
||||
Port=8080,
|
||||
VpcId=vpc.id,
|
||||
HealthCheckProtocol='HTTP',
|
||||
HealthCheckPort='8080',
|
||||
HealthCheckPath='/',
|
||||
HealthCheckIntervalSeconds=5,
|
||||
HealthCheckTimeoutSeconds=5,
|
||||
HealthyThresholdCount=5,
|
||||
UnhealthyThresholdCount=2,
|
||||
Matcher={'HttpCode': '200'})
|
||||
|
||||
assert len(conn.describe_target_groups()['TargetGroups']) == 1
|
||||
|
||||
|
||||
@mock_elbv2
|
||||
def test_describe_account_limits():
|
||||
client = boto3.client('elbv2', region_name='eu-central-1')
|
||||
|
|
@ -1473,3 +1521,68 @@ def test_modify_listener_http_to_https():
|
|||
{'Type': 'forward', 'TargetGroupArn': target_group_arn}
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@mock_ec2
|
||||
@mock_elbv2
|
||||
@mock_cloudformation
|
||||
def test_create_target_groups_through_cloudformation():
|
||||
cfn_conn = boto3.client('cloudformation', region_name='us-east-1')
|
||||
elbv2_client = boto3.client('elbv2', region_name='us-east-1')
|
||||
|
||||
# test that setting a name manually as well as letting cloudformation create a name both work
|
||||
# this is a special case because test groups have a name length limit of 22 characters, and must be unique
|
||||
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-elasticloadbalancingv2-targetgroup.html#cfn-elasticloadbalancingv2-targetgroup-name
|
||||
template = {
|
||||
"AWSTemplateFormatVersion": "2010-09-09",
|
||||
"Description": "ECS Cluster Test CloudFormation",
|
||||
"Resources": {
|
||||
"testVPC": {
|
||||
"Type": "AWS::EC2::VPC",
|
||||
"Properties": {
|
||||
"CidrBlock": "10.0.0.0/16",
|
||||
},
|
||||
},
|
||||
"testGroup1": {
|
||||
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
|
||||
"Properties": {
|
||||
"Port": 80,
|
||||
"Protocol": "HTTP",
|
||||
"VpcId": {"Ref": "testVPC"},
|
||||
},
|
||||
},
|
||||
"testGroup2": {
|
||||
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
|
||||
"Properties": {
|
||||
"Port": 90,
|
||||
"Protocol": "HTTP",
|
||||
"VpcId": {"Ref": "testVPC"},
|
||||
},
|
||||
},
|
||||
"testGroup3": {
|
||||
"Type": "AWS::ElasticLoadBalancingV2::TargetGroup",
|
||||
"Properties": {
|
||||
"Name": "MyTargetGroup",
|
||||
"Port": 70,
|
||||
"Protocol": "HTTPS",
|
||||
"VpcId": {"Ref": "testVPC"},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
template_json = json.dumps(template)
|
||||
cfn_conn.create_stack(
|
||||
StackName="test-stack",
|
||||
TemplateBody=template_json,
|
||||
)
|
||||
|
||||
describe_target_groups_response = elbv2_client.describe_target_groups()
|
||||
target_group_dicts = describe_target_groups_response['TargetGroups']
|
||||
assert len(target_group_dicts) == 3
|
||||
|
||||
# there should be 2 target groups with the same prefix of 10 characters (since the random suffix is 12)
|
||||
# and one named MyTargetGroup
|
||||
assert len([tg for tg in target_group_dicts if tg['TargetGroupName'] == 'MyTargetGroup']) == 1
|
||||
assert len(
|
||||
[tg for tg in target_group_dicts if tg['TargetGroupName'].startswith('test-stack')]
|
||||
) == 2
|
||||
|
|
|
|||
106
tests/test_s3/test_s3_storageclass.py
Normal file
106
tests/test_s3/test_s3_storageclass.py
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import boto
|
||||
import boto3
|
||||
from boto.exception import S3CreateError, S3ResponseError
|
||||
from boto.s3.lifecycle import Lifecycle, Transition, Expiration, Rule
|
||||
|
||||
import sure # noqa
|
||||
from botocore.exceptions import ClientError
|
||||
from datetime import datetime
|
||||
from nose.tools import assert_raises
|
||||
|
||||
from moto import mock_s3_deprecated, mock_s3
|
||||
|
||||
|
||||
@mock_s3
|
||||
def test_s3_storage_class_standard():
|
||||
s3 = boto3.client("s3")
|
||||
s3.create_bucket(Bucket="Bucket")
|
||||
|
||||
# add an object to the bucket with standard storage
|
||||
|
||||
s3.put_object(Bucket="Bucket", Key="my_key", Body="my_value")
|
||||
|
||||
list_of_objects = s3.list_objects(Bucket="Bucket")
|
||||
|
||||
list_of_objects['Contents'][0]["StorageClass"].should.equal("STANDARD")
|
||||
|
||||
|
||||
@mock_s3
|
||||
def test_s3_storage_class_infrequent_access():
|
||||
s3 = boto3.client("s3")
|
||||
s3.create_bucket(Bucket="Bucket")
|
||||
|
||||
# add an object to the bucket with standard storage
|
||||
|
||||
s3.put_object(Bucket="Bucket", Key="my_key_infrequent", Body="my_value_infrequent", StorageClass="STANDARD_IA")
|
||||
|
||||
D = s3.list_objects(Bucket="Bucket")
|
||||
|
||||
D['Contents'][0]["StorageClass"].should.equal("STANDARD_IA")
|
||||
|
||||
|
||||
@mock_s3
|
||||
def test_s3_storage_class_copy():
|
||||
s3 = boto3.client("s3")
|
||||
s3.create_bucket(Bucket="Bucket")
|
||||
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body", StorageClass="STANDARD")
|
||||
|
||||
s3.create_bucket(Bucket="Bucket2")
|
||||
# second object is originally of storage class REDUCED_REDUNDANCY
|
||||
s3.put_object(Bucket="Bucket2", Key="Second_Object", Body="Body2")
|
||||
|
||||
s3.copy_object(CopySource = {"Bucket": "Bucket", "Key": "First_Object"}, Bucket="Bucket2", Key="Second_Object", StorageClass="ONEZONE_IA")
|
||||
|
||||
list_of_copied_objects = s3.list_objects(Bucket="Bucket2")
|
||||
|
||||
# checks that a copied object can be properly copied
|
||||
list_of_copied_objects["Contents"][0]["StorageClass"].should.equal("ONEZONE_IA")
|
||||
|
||||
|
||||
@mock_s3
|
||||
def test_s3_invalid_copied_storage_class():
|
||||
s3 = boto3.client("s3")
|
||||
s3.create_bucket(Bucket="Bucket")
|
||||
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body", StorageClass="STANDARD")
|
||||
|
||||
s3.create_bucket(Bucket="Bucket2")
|
||||
s3.put_object(Bucket="Bucket2", Key="Second_Object", Body="Body2", StorageClass="REDUCED_REDUNDANCY")
|
||||
|
||||
# Try to copy an object with an invalid storage class
|
||||
with assert_raises(ClientError) as err:
|
||||
s3.copy_object(CopySource = {"Bucket": "Bucket", "Key": "First_Object"}, Bucket="Bucket2", Key="Second_Object", StorageClass="STANDARD2")
|
||||
|
||||
e = err.exception
|
||||
e.response["Error"]["Code"].should.equal("InvalidStorageClass")
|
||||
e.response["Error"]["Message"].should.equal("The storage class you specified is not valid")
|
||||
|
||||
|
||||
@mock_s3
|
||||
def test_s3_invalid_storage_class():
|
||||
s3 = boto3.client("s3")
|
||||
s3.create_bucket(Bucket="Bucket")
|
||||
|
||||
# Try to add an object with an invalid storage class
|
||||
with assert_raises(ClientError) as err:
|
||||
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body", StorageClass="STANDARDD")
|
||||
|
||||
e = err.exception
|
||||
e.response["Error"]["Code"].should.equal("InvalidStorageClass")
|
||||
e.response["Error"]["Message"].should.equal("The storage class you specified is not valid")
|
||||
|
||||
@mock_s3
|
||||
def test_s3_default_storage_class():
|
||||
s3 = boto3.client("s3")
|
||||
s3.create_bucket(Bucket="Bucket")
|
||||
|
||||
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body")
|
||||
|
||||
list_of_objects = s3.list_objects(Bucket="Bucket")
|
||||
|
||||
# tests that the default storage class is still STANDARD
|
||||
list_of_objects["Contents"][0]["StorageClass"].should.equal("STANDARD")
|
||||
|
||||
|
||||
|
||||
13
tests/test_secretsmanager/test_secretsmanager.py
Normal file
13
tests/test_secretsmanager/test_secretsmanager.py
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import boto3
|
||||
|
||||
from moto import mock_secretsmanager
|
||||
import sure # noqa
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_get_secret_value():
|
||||
conn = boto3.client('secretsmanager', region_name='us-west-2')
|
||||
|
||||
result = conn.get_secret_value(SecretId='java-util-test-password')
|
||||
assert result['SecretString'] == 'mysecretstring'
|
||||
27
tests/test_secretsmanager/test_server.py
Normal file
27
tests/test_secretsmanager/test_server.py
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import json
|
||||
import sure # noqa
|
||||
|
||||
import moto.server as server
|
||||
from moto import mock_secretsmanager
|
||||
|
||||
'''
|
||||
Test the different server responses
|
||||
'''
|
||||
|
||||
|
||||
@mock_secretsmanager
|
||||
def test_get_secret_value():
|
||||
|
||||
backend = server.create_backend_app("secretsmanager")
|
||||
test_client = backend.test_client()
|
||||
|
||||
res = test_client.post('/',
|
||||
data={"SecretId": "test", "VersionStage": "AWSCURRENT"},
|
||||
headers={
|
||||
"X-Amz-Target": "secretsmanager.GetSecretValue"},
|
||||
)
|
||||
|
||||
json_data = json.loads(res.data.decode("utf-8"))
|
||||
assert json_data['SecretString'] == "mysecretstring"
|
||||
|
|
@ -10,6 +10,7 @@ import sure # noqa
|
|||
|
||||
import responses
|
||||
from botocore.exceptions import ClientError
|
||||
from nose.tools import assert_raises
|
||||
from moto import mock_sns, mock_sqs
|
||||
|
||||
|
||||
|
|
@ -308,6 +309,20 @@ def test_publish_subject():
|
|||
raise RuntimeError('Should have raised an InvalidParameter exception')
|
||||
|
||||
|
||||
@mock_sns
|
||||
def test_publish_message_too_long():
|
||||
sns = boto3.resource('sns', region_name='us-east-1')
|
||||
topic = sns.create_topic(Name='some-topic')
|
||||
|
||||
with assert_raises(ClientError):
|
||||
topic.publish(
|
||||
Message="".join(["." for i in range(0, 262145)]))
|
||||
|
||||
# message short enough - does not raise an error
|
||||
topic.publish(
|
||||
Message="".join(["." for i in range(0, 262144)]))
|
||||
|
||||
|
||||
def _setup_filter_policy_test(filter_policy):
|
||||
sns = boto3.resource('sns', region_name='us-east-1')
|
||||
topic = sns.create_topic(Name='some-topic')
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue