Merge branch 'master' into get-caller-identity

This commit is contained in:
Bendegúz Ács 2019-08-21 12:36:40 +02:00 committed by GitHub
commit 24dcdb7453
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
82 changed files with 3063 additions and 309 deletions

View file

@ -988,13 +988,30 @@ def test_api_keys():
apikey['name'].should.equal(apikey_name)
len(apikey['value']).should.equal(40)
apikey_name = 'TESTKEY3'
payload = {'name': apikey_name }
response = client.create_api_key(**payload)
apikey_id = response['id']
patch_operations = [
{'op': 'replace', 'path': '/name', 'value': 'TESTKEY3_CHANGE'},
{'op': 'replace', 'path': '/customerId', 'value': '12345'},
{'op': 'replace', 'path': '/description', 'value': 'APIKEY UPDATE TEST'},
{'op': 'replace', 'path': '/enabled', 'value': 'false'},
]
response = client.update_api_key(apiKey=apikey_id, patchOperations=patch_operations)
response['name'].should.equal('TESTKEY3_CHANGE')
response['customerId'].should.equal('12345')
response['description'].should.equal('APIKEY UPDATE TEST')
response['enabled'].should.equal(False)
response = client.get_api_keys()
len(response['items']).should.equal(2)
len(response['items']).should.equal(3)
client.delete_api_key(apiKey=apikey_id)
response = client.get_api_keys()
len(response['items']).should.equal(1)
len(response['items']).should.equal(2)
@mock_apigateway
def test_usage_plans():

View file

@ -7,11 +7,13 @@ from boto.ec2.autoscale.group import AutoScalingGroup
from boto.ec2.autoscale import Tag
import boto.ec2.elb
import sure # noqa
from botocore.exceptions import ClientError
from nose.tools import assert_raises
from moto import mock_autoscaling, mock_ec2_deprecated, mock_elb_deprecated, mock_elb, mock_autoscaling_deprecated, mock_ec2
from tests.helpers import requires_boto_gte
from utils import setup_networking, setup_networking_deprecated
from utils import setup_networking, setup_networking_deprecated, setup_instance_with_networking
@mock_autoscaling_deprecated
@ -724,6 +726,67 @@ def test_create_autoscaling_group_boto3():
response['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
@mock_autoscaling
def test_create_autoscaling_group_from_instance():
autoscaling_group_name = 'test_asg'
image_id = 'ami-0cc293023f983ed53'
instance_type = 't2.micro'
mocked_instance_with_networking = setup_instance_with_networking(image_id, instance_type)
client = boto3.client('autoscaling', region_name='us-east-1')
response = client.create_auto_scaling_group(
AutoScalingGroupName=autoscaling_group_name,
InstanceId=mocked_instance_with_networking['instance'],
MinSize=1,
MaxSize=3,
DesiredCapacity=2,
Tags=[
{'ResourceId': 'test_asg',
'ResourceType': 'auto-scaling-group',
'Key': 'propogated-tag-key',
'Value': 'propogate-tag-value',
'PropagateAtLaunch': True
},
{'ResourceId': 'test_asg',
'ResourceType': 'auto-scaling-group',
'Key': 'not-propogated-tag-key',
'Value': 'not-propogate-tag-value',
'PropagateAtLaunch': False
}],
VPCZoneIdentifier=mocked_instance_with_networking['subnet1'],
NewInstancesProtectedFromScaleIn=False,
)
response['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
describe_launch_configurations_response = client.describe_launch_configurations()
describe_launch_configurations_response['LaunchConfigurations'].should.have.length_of(1)
launch_configuration_from_instance = describe_launch_configurations_response['LaunchConfigurations'][0]
launch_configuration_from_instance['LaunchConfigurationName'].should.equal('test_asg')
launch_configuration_from_instance['ImageId'].should.equal(image_id)
launch_configuration_from_instance['InstanceType'].should.equal(instance_type)
@mock_autoscaling
def test_create_autoscaling_group_from_invalid_instance_id():
invalid_instance_id = 'invalid_instance'
mocked_networking = setup_networking()
client = boto3.client('autoscaling', region_name='us-east-1')
with assert_raises(ClientError) as ex:
client.create_auto_scaling_group(
AutoScalingGroupName='test_asg',
InstanceId=invalid_instance_id,
MinSize=9,
MaxSize=15,
DesiredCapacity=12,
VPCZoneIdentifier=mocked_networking['subnet1'],
NewInstancesProtectedFromScaleIn=False,
)
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(400)
ex.exception.response['Error']['Code'].should.equal('ValidationError')
ex.exception.response['Error']['Message'].should.equal('Instance [{0}] is invalid.'.format(invalid_instance_id))
@mock_autoscaling
def test_describe_autoscaling_groups_boto3():
mocked_networking = setup_networking()
@ -823,6 +886,62 @@ def test_update_autoscaling_group_boto3():
group['NewInstancesProtectedFromScaleIn'].should.equal(False)
@mock_autoscaling
def test_update_autoscaling_group_min_size_desired_capacity_change():
mocked_networking = setup_networking()
client = boto3.client('autoscaling', region_name='us-east-1')
client.create_launch_configuration(
LaunchConfigurationName='test_launch_configuration'
)
client.create_auto_scaling_group(
AutoScalingGroupName='test_asg',
LaunchConfigurationName='test_launch_configuration',
MinSize=2,
MaxSize=20,
DesiredCapacity=3,
VPCZoneIdentifier=mocked_networking['subnet1'],
)
client.update_auto_scaling_group(
AutoScalingGroupName='test_asg',
MinSize=5,
)
response = client.describe_auto_scaling_groups(
AutoScalingGroupNames=['test_asg'])
group = response['AutoScalingGroups'][0]
group['DesiredCapacity'].should.equal(5)
group['MinSize'].should.equal(5)
group['Instances'].should.have.length_of(5)
@mock_autoscaling
def test_update_autoscaling_group_max_size_desired_capacity_change():
mocked_networking = setup_networking()
client = boto3.client('autoscaling', region_name='us-east-1')
client.create_launch_configuration(
LaunchConfigurationName='test_launch_configuration'
)
client.create_auto_scaling_group(
AutoScalingGroupName='test_asg',
LaunchConfigurationName='test_launch_configuration',
MinSize=2,
MaxSize=20,
DesiredCapacity=10,
VPCZoneIdentifier=mocked_networking['subnet1'],
)
client.update_auto_scaling_group(
AutoScalingGroupName='test_asg',
MaxSize=5,
)
response = client.describe_auto_scaling_groups(
AutoScalingGroupNames=['test_asg'])
group = response['AutoScalingGroups'][0]
group['DesiredCapacity'].should.equal(5)
group['MaxSize'].should.equal(5)
group['Instances'].should.have.length_of(5)
@mock_autoscaling
def test_autoscaling_taqs_update_boto3():
mocked_networking = setup_networking()
@ -1269,3 +1388,36 @@ def test_set_desired_capacity_down_boto3():
instance_ids = {instance['InstanceId'] for instance in group['Instances']}
set(protected).should.equal(instance_ids)
set(unprotected).should_not.be.within(instance_ids) # only unprotected killed
@mock_autoscaling
@mock_ec2
def test_terminate_instance_in_autoscaling_group():
mocked_networking = setup_networking()
client = boto3.client('autoscaling', region_name='us-east-1')
_ = client.create_launch_configuration(
LaunchConfigurationName='test_launch_configuration'
)
_ = client.create_auto_scaling_group(
AutoScalingGroupName='test_asg',
LaunchConfigurationName='test_launch_configuration',
MinSize=1,
MaxSize=20,
VPCZoneIdentifier=mocked_networking['subnet1'],
NewInstancesProtectedFromScaleIn=False
)
response = client.describe_auto_scaling_groups(AutoScalingGroupNames=['test_asg'])
original_instance_id = next(
instance['InstanceId']
for instance in response['AutoScalingGroups'][0]['Instances']
)
ec2_client = boto3.client('ec2', region_name='us-east-1')
ec2_client.terminate_instances(InstanceIds=[original_instance_id])
response = client.describe_auto_scaling_groups(AutoScalingGroupNames=['test_asg'])
replaced_instance_id = next(
instance['InstanceId']
for instance in response['AutoScalingGroups'][0]['Instances']
)
replaced_instance_id.should_not.equal(original_instance_id)

View file

@ -31,3 +31,18 @@ def setup_networking_deprecated():
"10.11.2.0/24",
availability_zone='us-east-1b')
return {'vpc': vpc.id, 'subnet1': subnet1.id, 'subnet2': subnet2.id}
@mock_ec2
def setup_instance_with_networking(image_id, instance_type):
mock_data = setup_networking()
ec2 = boto3.resource('ec2', region_name='us-east-1')
instances = ec2.create_instances(
ImageId=image_id,
InstanceType=instance_type,
MaxCount=1,
MinCount=1,
SubnetId=mock_data['subnet1']
)
mock_data['instance'] = instances[0].id
return mock_data

View file

@ -642,6 +642,87 @@ def test_describe_task_definition():
len(resp['jobDefinitions']).should.equal(3)
@mock_logs
@mock_ec2
@mock_ecs
@mock_iam
@mock_batch
def test_submit_job_by_name():
ec2_client, iam_client, ecs_client, logs_client, batch_client = _get_clients()
vpc_id, subnet_id, sg_id, iam_arn = _setup(ec2_client, iam_client)
compute_name = 'test_compute_env'
resp = batch_client.create_compute_environment(
computeEnvironmentName=compute_name,
type='UNMANAGED',
state='ENABLED',
serviceRole=iam_arn
)
arn = resp['computeEnvironmentArn']
resp = batch_client.create_job_queue(
jobQueueName='test_job_queue',
state='ENABLED',
priority=123,
computeEnvironmentOrder=[
{
'order': 123,
'computeEnvironment': arn
},
]
)
queue_arn = resp['jobQueueArn']
job_definition_name = 'sleep10'
batch_client.register_job_definition(
jobDefinitionName=job_definition_name,
type='container',
containerProperties={
'image': 'busybox',
'vcpus': 1,
'memory': 128,
'command': ['sleep', '10']
}
)
batch_client.register_job_definition(
jobDefinitionName=job_definition_name,
type='container',
containerProperties={
'image': 'busybox',
'vcpus': 1,
'memory': 256,
'command': ['sleep', '10']
}
)
resp = batch_client.register_job_definition(
jobDefinitionName=job_definition_name,
type='container',
containerProperties={
'image': 'busybox',
'vcpus': 1,
'memory': 512,
'command': ['sleep', '10']
}
)
job_definition_arn = resp['jobDefinitionArn']
resp = batch_client.submit_job(
jobName='test1',
jobQueue=queue_arn,
jobDefinition=job_definition_name
)
job_id = resp['jobId']
resp_jobs = batch_client.describe_jobs(jobs=[job_id])
# batch_client.terminate_job(jobId=job_id)
len(resp_jobs['jobs']).should.equal(1)
resp_jobs['jobs'][0]['jobId'].should.equal(job_id)
resp_jobs['jobs'][0]['jobQueue'].should.equal(queue_arn)
resp_jobs['jobs'][0]['jobDefinition'].should.equal(job_definition_arn)
# SLOW TESTS
@expected_failure
@mock_logs

View file

@ -68,7 +68,7 @@ def test_get_open_id_token_for_developer_identity():
},
TokenDuration=123
)
assert len(result['Token'])
assert len(result['Token']) > 0
assert result['IdentityId'] == '12345'
@mock_cognitoidentity
@ -83,3 +83,15 @@ def test_get_open_id_token_for_developer_identity_when_no_explicit_identity_id()
)
assert len(result['Token']) > 0
assert len(result['IdentityId']) > 0
@mock_cognitoidentity
def test_get_open_id_token():
conn = boto3.client('cognito-identity', 'us-west-2')
result = conn.get_open_id_token(
IdentityId='12345',
Logins={
'someurl': '12345'
}
)
assert len(result['Token']) > 0
assert result['IdentityId'] == '12345'

View file

@ -133,6 +133,22 @@ def test_create_user_pool_domain():
result["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
@mock_cognitoidp
def test_create_user_pool_domain_custom_domain_config():
conn = boto3.client("cognito-idp", "us-west-2")
domain = str(uuid.uuid4())
custom_domain_config = {
"CertificateArn": "arn:aws:acm:us-east-1:123456789012:certificate/123456789012",
}
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
result = conn.create_user_pool_domain(
UserPoolId=user_pool_id, Domain=domain, CustomDomainConfig=custom_domain_config
)
result["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
result["CloudFrontDomain"].should.equal("e2c343b3293ee505.cloudfront.net")
@mock_cognitoidp
def test_describe_user_pool_domain():
conn = boto3.client("cognito-idp", "us-west-2")
@ -162,6 +178,23 @@ def test_delete_user_pool_domain():
result["DomainDescription"].keys().should.have.length_of(0)
@mock_cognitoidp
def test_update_user_pool_domain():
conn = boto3.client("cognito-idp", "us-west-2")
domain = str(uuid.uuid4())
custom_domain_config = {
"CertificateArn": "arn:aws:acm:us-east-1:123456789012:certificate/123456789012",
}
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
conn.create_user_pool_domain(UserPoolId=user_pool_id, Domain=domain)
result = conn.update_user_pool_domain(
UserPoolId=user_pool_id, Domain=domain, CustomDomainConfig=custom_domain_config
)
result["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
result["CloudFrontDomain"].should.equal("e2c343b3293ee505.cloudfront.net")
@mock_cognitoidp
def test_create_user_pool_client():
conn = boto3.client("cognito-idp", "us-west-2")

View file

@ -123,6 +123,526 @@ def test_put_configuration_recorder():
assert "maximum number of configuration recorders: 1 is reached." in ce.exception.response['Error']['Message']
@mock_config
def test_put_configuration_aggregator():
client = boto3.client('config', region_name='us-west-2')
# With too many aggregation sources:
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
'111111111111',
'222222222222'
],
'AwsRegions': [
'us-east-1',
'us-west-2'
]
},
{
'AccountIds': [
'012345678910',
'111111111111',
'222222222222'
],
'AwsRegions': [
'us-east-1',
'us-west-2'
]
}
]
)
assert 'Member must have length less than or equal to 1' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# With an invalid region config (no regions defined):
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
'111111111111',
'222222222222'
],
'AllAwsRegions': False
}
]
)
assert 'Your request does not specify any regions' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidParameterValueException'
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
OrganizationAggregationSource={
'RoleArn': 'arn:aws:iam::012345678910:role/SomeRole'
}
)
assert 'Your request does not specify any regions' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidParameterValueException'
# With both region flags defined:
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
'111111111111',
'222222222222'
],
'AwsRegions': [
'us-east-1',
'us-west-2'
],
'AllAwsRegions': True
}
]
)
assert 'You must choose one of these options' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidParameterValueException'
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
OrganizationAggregationSource={
'RoleArn': 'arn:aws:iam::012345678910:role/SomeRole',
'AwsRegions': [
'us-east-1',
'us-west-2'
],
'AllAwsRegions': True
}
)
assert 'You must choose one of these options' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidParameterValueException'
# Name too long:
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='a' * 257,
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
]
)
assert 'configurationAggregatorName' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Too many tags (>50):
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
],
Tags=[{'Key': '{}'.format(x), 'Value': '{}'.format(x)} for x in range(0, 51)]
)
assert 'Member must have length less than or equal to 50' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Tag key is too big (>128 chars):
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
],
Tags=[{'Key': 'a' * 129, 'Value': 'a'}]
)
assert 'Member must have length less than or equal to 128' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Tag value is too big (>256 chars):
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
],
Tags=[{'Key': 'tag', 'Value': 'a' * 257}]
)
assert 'Member must have length less than or equal to 256' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Duplicate Tags:
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
],
Tags=[{'Key': 'a', 'Value': 'a'}, {'Key': 'a', 'Value': 'a'}]
)
assert 'Duplicate tag keys found.' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidInput'
# Invalid characters in the tag key:
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
],
Tags=[{'Key': '!', 'Value': 'a'}]
)
assert 'Member must satisfy regular expression pattern:' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# If it contains both the AccountAggregationSources and the OrganizationAggregationSource
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': False
}
],
OrganizationAggregationSource={
'RoleArn': 'arn:aws:iam::012345678910:role/SomeRole',
'AllAwsRegions': False
}
)
assert 'AccountAggregationSource and the OrganizationAggregationSource' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidParameterValueException'
# If it contains neither:
with assert_raises(ClientError) as ce:
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
)
assert 'AccountAggregationSource or the OrganizationAggregationSource' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidParameterValueException'
# Just make one:
account_aggregation_source = {
'AccountIds': [
'012345678910',
'111111111111',
'222222222222'
],
'AwsRegions': [
'us-east-1',
'us-west-2'
],
'AllAwsRegions': False
}
result = client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[account_aggregation_source],
)
assert result['ConfigurationAggregator']['ConfigurationAggregatorName'] == 'testing'
assert result['ConfigurationAggregator']['AccountAggregationSources'] == [account_aggregation_source]
assert 'arn:aws:config:us-west-2:123456789012:config-aggregator/config-aggregator-' in \
result['ConfigurationAggregator']['ConfigurationAggregatorArn']
assert result['ConfigurationAggregator']['CreationTime'] == result['ConfigurationAggregator']['LastUpdatedTime']
# Update the existing one:
original_arn = result['ConfigurationAggregator']['ConfigurationAggregatorArn']
account_aggregation_source.pop('AwsRegions')
account_aggregation_source['AllAwsRegions'] = True
result = client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[account_aggregation_source]
)
assert result['ConfigurationAggregator']['ConfigurationAggregatorName'] == 'testing'
assert result['ConfigurationAggregator']['AccountAggregationSources'] == [account_aggregation_source]
assert result['ConfigurationAggregator']['ConfigurationAggregatorArn'] == original_arn
# Make an org one:
result = client.put_configuration_aggregator(
ConfigurationAggregatorName='testingOrg',
OrganizationAggregationSource={
'RoleArn': 'arn:aws:iam::012345678910:role/SomeRole',
'AwsRegions': ['us-east-1', 'us-west-2']
}
)
assert result['ConfigurationAggregator']['ConfigurationAggregatorName'] == 'testingOrg'
assert result['ConfigurationAggregator']['OrganizationAggregationSource'] == {
'RoleArn': 'arn:aws:iam::012345678910:role/SomeRole',
'AwsRegions': [
'us-east-1',
'us-west-2'
],
'AllAwsRegions': False
}
@mock_config
def test_describe_configuration_aggregators():
client = boto3.client('config', region_name='us-west-2')
# Without any config aggregators:
assert not client.describe_configuration_aggregators()['ConfigurationAggregators']
# Make 10 config aggregators:
for x in range(0, 10):
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing{}'.format(x),
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
]
)
# Describe with an incorrect name:
with assert_raises(ClientError) as ce:
client.describe_configuration_aggregators(ConfigurationAggregatorNames=['DoesNotExist'])
assert 'The configuration aggregator does not exist.' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'NoSuchConfigurationAggregatorException'
# Error describe with more than 1 item in the list:
with assert_raises(ClientError) as ce:
client.describe_configuration_aggregators(ConfigurationAggregatorNames=['testing0', 'DoesNotExist'])
assert 'At least one of the configuration aggregators does not exist.' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'NoSuchConfigurationAggregatorException'
# Get the normal list:
result = client.describe_configuration_aggregators()
assert not result.get('NextToken')
assert len(result['ConfigurationAggregators']) == 10
# Test filtered list:
agg_names = ['testing0', 'testing1', 'testing2']
result = client.describe_configuration_aggregators(ConfigurationAggregatorNames=agg_names)
assert not result.get('NextToken')
assert len(result['ConfigurationAggregators']) == 3
assert [agg['ConfigurationAggregatorName'] for agg in result['ConfigurationAggregators']] == agg_names
# Test Pagination:
result = client.describe_configuration_aggregators(Limit=4)
assert len(result['ConfigurationAggregators']) == 4
assert result['NextToken'] == 'testing4'
assert [agg['ConfigurationAggregatorName'] for agg in result['ConfigurationAggregators']] == \
['testing{}'.format(x) for x in range(0, 4)]
result = client.describe_configuration_aggregators(Limit=4, NextToken='testing4')
assert len(result['ConfigurationAggregators']) == 4
assert result['NextToken'] == 'testing8'
assert [agg['ConfigurationAggregatorName'] for agg in result['ConfigurationAggregators']] == \
['testing{}'.format(x) for x in range(4, 8)]
result = client.describe_configuration_aggregators(Limit=4, NextToken='testing8')
assert len(result['ConfigurationAggregators']) == 2
assert not result.get('NextToken')
assert [agg['ConfigurationAggregatorName'] for agg in result['ConfigurationAggregators']] == \
['testing{}'.format(x) for x in range(8, 10)]
# Test Pagination with Filtering:
result = client.describe_configuration_aggregators(ConfigurationAggregatorNames=['testing2', 'testing4'], Limit=1)
assert len(result['ConfigurationAggregators']) == 1
assert result['NextToken'] == 'testing4'
assert result['ConfigurationAggregators'][0]['ConfigurationAggregatorName'] == 'testing2'
result = client.describe_configuration_aggregators(ConfigurationAggregatorNames=['testing2', 'testing4'], Limit=1, NextToken='testing4')
assert not result.get('NextToken')
assert result['ConfigurationAggregators'][0]['ConfigurationAggregatorName'] == 'testing4'
# Test with an invalid filter:
with assert_raises(ClientError) as ce:
client.describe_configuration_aggregators(NextToken='WRONG')
assert 'The nextToken provided is invalid' == ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidNextTokenException'
@mock_config
def test_put_aggregation_authorization():
client = boto3.client('config', region_name='us-west-2')
# Too many tags (>50):
with assert_raises(ClientError) as ce:
client.put_aggregation_authorization(
AuthorizedAccountId='012345678910',
AuthorizedAwsRegion='us-west-2',
Tags=[{'Key': '{}'.format(x), 'Value': '{}'.format(x)} for x in range(0, 51)]
)
assert 'Member must have length less than or equal to 50' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Tag key is too big (>128 chars):
with assert_raises(ClientError) as ce:
client.put_aggregation_authorization(
AuthorizedAccountId='012345678910',
AuthorizedAwsRegion='us-west-2',
Tags=[{'Key': 'a' * 129, 'Value': 'a'}]
)
assert 'Member must have length less than or equal to 128' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Tag value is too big (>256 chars):
with assert_raises(ClientError) as ce:
client.put_aggregation_authorization(
AuthorizedAccountId='012345678910',
AuthorizedAwsRegion='us-west-2',
Tags=[{'Key': 'tag', 'Value': 'a' * 257}]
)
assert 'Member must have length less than or equal to 256' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Duplicate Tags:
with assert_raises(ClientError) as ce:
client.put_aggregation_authorization(
AuthorizedAccountId='012345678910',
AuthorizedAwsRegion='us-west-2',
Tags=[{'Key': 'a', 'Value': 'a'}, {'Key': 'a', 'Value': 'a'}]
)
assert 'Duplicate tag keys found.' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidInput'
# Invalid characters in the tag key:
with assert_raises(ClientError) as ce:
client.put_aggregation_authorization(
AuthorizedAccountId='012345678910',
AuthorizedAwsRegion='us-west-2',
Tags=[{'Key': '!', 'Value': 'a'}]
)
assert 'Member must satisfy regular expression pattern:' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'ValidationException'
# Put a normal one there:
result = client.put_aggregation_authorization(AuthorizedAccountId='012345678910', AuthorizedAwsRegion='us-east-1',
Tags=[{'Key': 'tag', 'Value': 'a'}])
assert result['AggregationAuthorization']['AggregationAuthorizationArn'] == 'arn:aws:config:us-west-2:123456789012:' \
'aggregation-authorization/012345678910/us-east-1'
assert result['AggregationAuthorization']['AuthorizedAccountId'] == '012345678910'
assert result['AggregationAuthorization']['AuthorizedAwsRegion'] == 'us-east-1'
assert isinstance(result['AggregationAuthorization']['CreationTime'], datetime)
creation_date = result['AggregationAuthorization']['CreationTime']
# And again:
result = client.put_aggregation_authorization(AuthorizedAccountId='012345678910', AuthorizedAwsRegion='us-east-1')
assert result['AggregationAuthorization']['AggregationAuthorizationArn'] == 'arn:aws:config:us-west-2:123456789012:' \
'aggregation-authorization/012345678910/us-east-1'
assert result['AggregationAuthorization']['AuthorizedAccountId'] == '012345678910'
assert result['AggregationAuthorization']['AuthorizedAwsRegion'] == 'us-east-1'
assert result['AggregationAuthorization']['CreationTime'] == creation_date
@mock_config
def test_describe_aggregation_authorizations():
client = boto3.client('config', region_name='us-west-2')
# With no aggregation authorizations:
assert not client.describe_aggregation_authorizations()['AggregationAuthorizations']
# Make 10 account authorizations:
for i in range(0, 10):
client.put_aggregation_authorization(AuthorizedAccountId='{}'.format(str(i) * 12), AuthorizedAwsRegion='us-west-2')
result = client.describe_aggregation_authorizations()
assert len(result['AggregationAuthorizations']) == 10
assert not result.get('NextToken')
for i in range(0, 10):
assert result['AggregationAuthorizations'][i]['AuthorizedAccountId'] == str(i) * 12
# Test Pagination:
result = client.describe_aggregation_authorizations(Limit=4)
assert len(result['AggregationAuthorizations']) == 4
assert result['NextToken'] == ('4' * 12) + '/us-west-2'
assert [auth['AuthorizedAccountId'] for auth in result['AggregationAuthorizations']] == ['{}'.format(str(x) * 12) for x in range(0, 4)]
result = client.describe_aggregation_authorizations(Limit=4, NextToken=('4' * 12) + '/us-west-2')
assert len(result['AggregationAuthorizations']) == 4
assert result['NextToken'] == ('8' * 12) + '/us-west-2'
assert [auth['AuthorizedAccountId'] for auth in result['AggregationAuthorizations']] == ['{}'.format(str(x) * 12) for x in range(4, 8)]
result = client.describe_aggregation_authorizations(Limit=4, NextToken=('8' * 12) + '/us-west-2')
assert len(result['AggregationAuthorizations']) == 2
assert not result.get('NextToken')
assert [auth['AuthorizedAccountId'] for auth in result['AggregationAuthorizations']] == ['{}'.format(str(x) * 12) for x in range(8, 10)]
# Test with an invalid filter:
with assert_raises(ClientError) as ce:
client.describe_aggregation_authorizations(NextToken='WRONG')
assert 'The nextToken provided is invalid' == ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'InvalidNextTokenException'
@mock_config
def test_delete_aggregation_authorization():
client = boto3.client('config', region_name='us-west-2')
client.put_aggregation_authorization(AuthorizedAccountId='012345678910', AuthorizedAwsRegion='us-west-2')
# Delete it:
client.delete_aggregation_authorization(AuthorizedAccountId='012345678910', AuthorizedAwsRegion='us-west-2')
# Verify that none are there:
assert not client.describe_aggregation_authorizations()['AggregationAuthorizations']
# Try it again -- nothing should happen:
client.delete_aggregation_authorization(AuthorizedAccountId='012345678910', AuthorizedAwsRegion='us-west-2')
@mock_config
def test_delete_configuration_aggregator():
client = boto3.client('config', region_name='us-west-2')
client.put_configuration_aggregator(
ConfigurationAggregatorName='testing',
AccountAggregationSources=[
{
'AccountIds': [
'012345678910',
],
'AllAwsRegions': True
}
]
)
client.delete_configuration_aggregator(ConfigurationAggregatorName='testing')
# And again to confirm that it's deleted:
with assert_raises(ClientError) as ce:
client.delete_configuration_aggregator(ConfigurationAggregatorName='testing')
assert 'The configuration aggregator does not exist.' in ce.exception.response['Error']['Message']
assert ce.exception.response['Error']['Code'] == 'NoSuchConfigurationAggregatorException'
@mock_config
def test_describe_configurations():
client = boto3.client('config', region_name='us-west-2')

View file

@ -0,0 +1,12 @@
import sure # noqa
import boto3
from moto import mock_sqs, settings
def test_context_manager_returns_mock():
with mock_sqs() as sqs_mock:
conn = boto3.client("sqs", region_name='us-west-1')
conn.create_queue(QueueName="queue1")
if not settings.TEST_SERVER_MODE:
list(sqs_mock.backends['us-west-1'].queues.keys()).should.equal(['queue1'])

View file

@ -38,12 +38,6 @@ def test_domain_dispatched():
keys[0].should.equal('EmailResponse.dispatch')
def test_domain_without_matches():
dispatcher = DomainDispatcherApplication(create_backend_app)
dispatcher.get_application.when.called_with(
{"HTTP_HOST": "not-matching-anything.com"}).should.throw(RuntimeError)
def test_domain_dispatched_with_service():
# If we pass a particular service, always return that.
dispatcher = DomainDispatcherApplication(create_backend_app, service="s3")

View file

@ -1342,6 +1342,46 @@ def test_query_missing_expr_names():
resp['Items'][0]['client']['S'].should.equal('test2')
# https://github.com/spulec/moto/issues/2328
@mock_dynamodb2
def test_update_item_with_list():
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
# Create the DynamoDB table.
dynamodb.create_table(
TableName='Table',
KeySchema=[
{
'AttributeName': 'key',
'KeyType': 'HASH'
}
],
AttributeDefinitions=[
{
'AttributeName': 'key',
'AttributeType': 'S'
},
],
ProvisionedThroughput={
'ReadCapacityUnits': 1,
'WriteCapacityUnits': 1
}
)
table = dynamodb.Table('Table')
table.update_item(
Key={'key': 'the-key'},
AttributeUpdates={
'list': {'Value': [1, 2], 'Action': 'PUT'}
}
)
resp = table.get_item(Key={'key': 'the-key'})
resp['Item'].should.equal({
'key': 'the-key',
'list': [1, 2]
})
# https://github.com/spulec/moto/issues/1342
@mock_dynamodb2
def test_update_item_on_map():
@ -1964,6 +2004,36 @@ def test_condition_expression__attr_doesnt_exist():
update_if_attr_doesnt_exist()
@mock_dynamodb2
def test_condition_expression__or_order():
client = boto3.client('dynamodb', region_name='us-east-1')
client.create_table(
TableName='test',
KeySchema=[{'AttributeName': 'forum_name', 'KeyType': 'HASH'}],
AttributeDefinitions=[
{'AttributeName': 'forum_name', 'AttributeType': 'S'},
],
ProvisionedThroughput={'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1},
)
# ensure that the RHS of the OR expression is not evaluated if the LHS
# returns true (as it would result an error)
client.update_item(
TableName='test',
Key={
'forum_name': {'S': 'the-key'},
},
UpdateExpression='set #ttl=:ttl',
ConditionExpression='attribute_not_exists(#ttl) OR #ttl <= :old_ttl',
ExpressionAttributeNames={'#ttl': 'ttl'},
ExpressionAttributeValues={
':ttl': {'N': '6'},
':old_ttl': {'N': '5'},
}
)
@mock_dynamodb2
def test_query_gsi_with_range_key():
dynamodb = boto3.client('dynamodb', region_name='us-east-1')

View file

@ -10,7 +10,7 @@ from nose.tools import assert_raises
import sure # noqa
from moto import mock_ec2_deprecated, mock_ec2
from moto.ec2.models import AMIS
from moto.ec2.models import AMIS, OWNER_ID
from tests.helpers import requires_boto_gte
@ -152,6 +152,29 @@ def test_ami_copy():
cm.exception.request_id.should_not.be.none
@mock_ec2
def test_copy_image_changes_owner_id():
conn = boto3.client('ec2', region_name='us-east-1')
# this source AMI ID is from moto/ec2/resources/amis.json
source_ami_id = "ami-03cf127a"
# confirm the source ami owner id is different from the default owner id.
# if they're ever the same it means this test is invalid.
check_resp = conn.describe_images(ImageIds=[source_ami_id])
check_resp["Images"][0]["OwnerId"].should_not.equal(OWNER_ID)
copy_resp = conn.copy_image(
SourceImageId=source_ami_id,
Name="new-image",
Description="a copy of an image",
SourceRegion="us-east-1")
describe_resp = conn.describe_images(Owners=["self"])
describe_resp["Images"][0]["OwnerId"].should.equal(OWNER_ID)
describe_resp["Images"][0]["ImageId"].should.equal(copy_resp["ImageId"])
@mock_ec2_deprecated
def test_ami_tagging():
conn = boto.connect_vpc('the_key', 'the_secret')

View file

@ -12,6 +12,7 @@ from freezegun import freeze_time
import sure # noqa
from moto import mock_ec2_deprecated, mock_ec2
from moto.ec2.models import OWNER_ID
@mock_ec2_deprecated
@ -395,7 +396,7 @@ def test_snapshot_filters():
).should.equal({snapshot3.id})
snapshots_by_owner_id = conn.get_all_snapshots(
filters={'owner-id': '123456789012'})
filters={'owner-id': OWNER_ID})
set([snap.id for snap in snapshots_by_owner_id]
).should.equal({snapshot1.id, snapshot2.id, snapshot3.id})

View file

@ -161,7 +161,7 @@ def test_elastic_network_interfaces_filtering():
subnet.id, groups=[security_group1.id, security_group2.id])
eni2 = conn.create_network_interface(
subnet.id, groups=[security_group1.id])
eni3 = conn.create_network_interface(subnet.id)
eni3 = conn.create_network_interface(subnet.id, description='test description')
all_enis = conn.get_all_network_interfaces()
all_enis.should.have.length_of(3)
@ -189,6 +189,12 @@ def test_elastic_network_interfaces_filtering():
enis_by_group.should.have.length_of(1)
set([eni.id for eni in enis_by_group]).should.equal(set([eni1.id]))
# Filter by Description
enis_by_description = conn.get_all_network_interfaces(
filters={'description': eni3.description })
enis_by_description.should.have.length_of(1)
enis_by_description[0].description.should.equal(eni3.description)
# Unsupported filter
conn.get_all_network_interfaces.when.called_with(
filters={'not-implemented-filter': 'foobar'}).should.throw(NotImplementedError)
@ -343,6 +349,106 @@ def test_elastic_network_interfaces_get_by_subnet_id():
enis.should.have.length_of(0)
@mock_ec2
def test_elastic_network_interfaces_get_by_description():
ec2 = boto3.resource('ec2', region_name='us-west-2')
ec2_client = boto3.client('ec2', region_name='us-west-2')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
eni1 = ec2.create_network_interface(
SubnetId=subnet.id, PrivateIpAddress='10.0.10.5', Description='test interface')
# The status of the new interface should be 'available'
waiter = ec2_client.get_waiter('network_interface_available')
waiter.wait(NetworkInterfaceIds=[eni1.id])
filters = [{'Name': 'description', 'Values': [eni1.description]}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(1)
filters = [{'Name': 'description', 'Values': ['bad description']}]
enis = list(ec2.network_interfaces.filter(Filters=filters))
enis.should.have.length_of(0)
@mock_ec2
def test_elastic_network_interfaces_describe_network_interfaces_with_filter():
ec2 = boto3.resource('ec2', region_name='us-west-2')
ec2_client = boto3.client('ec2', region_name='us-west-2')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock='10.0.0.0/24', AvailabilityZone='us-west-2a')
eni1 = ec2.create_network_interface(
SubnetId=subnet.id, PrivateIpAddress='10.0.10.5', Description='test interface')
# The status of the new interface should be 'available'
waiter = ec2_client.get_waiter('network_interface_available')
waiter.wait(NetworkInterfaceIds=[eni1.id])
# Filter by network-interface-id
response = ec2_client.describe_network_interfaces(
Filters=[{'Name': 'network-interface-id', 'Values': [eni1.id]}])
response['NetworkInterfaces'].should.have.length_of(1)
response['NetworkInterfaces'][0]['NetworkInterfaceId'].should.equal(eni1.id)
response['NetworkInterfaces'][0]['PrivateIpAddress'].should.equal(eni1.private_ip_address)
response['NetworkInterfaces'][0]['Description'].should.equal(eni1.description)
response = ec2_client.describe_network_interfaces(
Filters=[{'Name': 'network-interface-id', 'Values': ['bad-id']}])
response['NetworkInterfaces'].should.have.length_of(0)
# Filter by private-ip-address
response = ec2_client.describe_network_interfaces(
Filters=[{'Name': 'private-ip-address', 'Values': [eni1.private_ip_address]}])
response['NetworkInterfaces'].should.have.length_of(1)
response['NetworkInterfaces'][0]['NetworkInterfaceId'].should.equal(eni1.id)
response['NetworkInterfaces'][0]['PrivateIpAddress'].should.equal(eni1.private_ip_address)
response['NetworkInterfaces'][0]['Description'].should.equal(eni1.description)
response = ec2_client.describe_network_interfaces(
Filters=[{'Name': 'private-ip-address', 'Values': ['11.11.11.11']}])
response['NetworkInterfaces'].should.have.length_of(0)
# Filter by sunet-id
response = ec2_client.describe_network_interfaces(
Filters=[{'Name': 'subnet-id', 'Values': [eni1.subnet.id]}])
response['NetworkInterfaces'].should.have.length_of(1)
response['NetworkInterfaces'][0]['NetworkInterfaceId'].should.equal(eni1.id)
response['NetworkInterfaces'][0]['PrivateIpAddress'].should.equal(eni1.private_ip_address)
response['NetworkInterfaces'][0]['Description'].should.equal(eni1.description)
response = ec2_client.describe_network_interfaces(
Filters=[{'Name': 'subnet-id', 'Values': ['sn-bad-id']}])
response['NetworkInterfaces'].should.have.length_of(0)
# Filter by description
response = ec2_client.describe_network_interfaces(
Filters=[{'Name': 'description', 'Values': [eni1.description]}])
response['NetworkInterfaces'].should.have.length_of(1)
response['NetworkInterfaces'][0]['NetworkInterfaceId'].should.equal(eni1.id)
response['NetworkInterfaces'][0]['PrivateIpAddress'].should.equal(eni1.private_ip_address)
response['NetworkInterfaces'][0]['Description'].should.equal(eni1.description)
response = ec2_client.describe_network_interfaces(
Filters=[{'Name': 'description', 'Values': ['bad description']}])
response['NetworkInterfaces'].should.have.length_of(0)
# Filter by multiple filters
response = ec2_client.describe_network_interfaces(
Filters=[{'Name': 'private-ip-address', 'Values': [eni1.private_ip_address]},
{'Name': 'network-interface-id', 'Values': [eni1.id]},
{'Name': 'subnet-id', 'Values': [eni1.subnet.id]}])
response['NetworkInterfaces'].should.have.length_of(1)
response['NetworkInterfaces'][0]['NetworkInterfaceId'].should.equal(eni1.id)
response['NetworkInterfaces'][0]['PrivateIpAddress'].should.equal(eni1.private_ip_address)
response['NetworkInterfaces'][0]['Description'].should.equal(eni1.description)
@mock_ec2_deprecated
@mock_cloudformation_deprecated
def test_elastic_network_interfaces_cloudformation():

View file

@ -1,4 +1,5 @@
from __future__ import unicode_literals
from datetime import datetime
from copy import deepcopy
@ -94,6 +95,10 @@ def test_register_task_definition():
}],
'logConfiguration': {'logDriver': 'json-file'}
}
],
tags=[
{'key': 'createdBy', 'value': 'moto-unittest'},
{'key': 'foo', 'value': 'bar'},
]
)
type(response['taskDefinition']).should.be(dict)
@ -473,6 +478,8 @@ def test_describe_services():
response['services'][0]['deployments'][0]['pendingCount'].should.equal(2)
response['services'][0]['deployments'][0]['runningCount'].should.equal(0)
response['services'][0]['deployments'][0]['status'].should.equal('PRIMARY')
(datetime.now() - response['services'][0]['deployments'][0]["createdAt"].replace(tzinfo=None)).seconds.should.be.within(0, 10)
(datetime.now() - response['services'][0]['deployments'][0]["updatedAt"].replace(tzinfo=None)).seconds.should.be.within(0, 10)
@mock_ecs
@ -2304,3 +2311,52 @@ def test_create_service_load_balancing():
response['service']['status'].should.equal('ACTIVE')
response['service']['taskDefinition'].should.equal(
'arn:aws:ecs:us-east-1:012345678910:task-definition/test_ecs_task:1')
@mock_ecs
def test_list_tags_for_resource():
client = boto3.client('ecs', region_name='us-east-1')
response = client.register_task_definition(
family='test_ecs_task',
containerDefinitions=[
{
'name': 'hello_world',
'image': 'docker/hello-world:latest',
'cpu': 1024,
'memory': 400,
'essential': True,
'environment': [{
'name': 'AWS_ACCESS_KEY_ID',
'value': 'SOME_ACCESS_KEY'
}],
'logConfiguration': {'logDriver': 'json-file'}
}
],
tags=[
{'key': 'createdBy', 'value': 'moto-unittest'},
{'key': 'foo', 'value': 'bar'},
]
)
type(response['taskDefinition']).should.be(dict)
response['taskDefinition']['revision'].should.equal(1)
response['taskDefinition']['taskDefinitionArn'].should.equal(
'arn:aws:ecs:us-east-1:012345678910:task-definition/test_ecs_task:1')
task_definition_arn = response['taskDefinition']['taskDefinitionArn']
response = client.list_tags_for_resource(resourceArn=task_definition_arn)
type(response['tags']).should.be(list)
response['tags'].should.equal([
{'key': 'createdBy', 'value': 'moto-unittest'},
{'key': 'foo', 'value': 'bar'},
])
@mock_ecs
def test_list_tags_for_resource_unknown():
client = boto3.client('ecs', region_name='us-east-1')
task_definition_arn = 'arn:aws:ecs:us-east-1:012345678910:task-definition/unknown:1'
try:
client.list_tags_for_resource(resourceArn=task_definition_arn)
except ClientError as err:
err.response['Error']['Code'].should.equal('ClientException')

View file

@ -667,6 +667,91 @@ def test_register_targets():
response.get('TargetHealthDescriptions').should.have.length_of(1)
@mock_ec2
@mock_elbv2
def test_stopped_instance_target():
target_group_port = 8080
conn = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.0/26',
AvailabilityZone='us-east-1b')
conn.create_load_balancer(
Name='my-lb',
Subnets=[subnet1.id, subnet2.id],
SecurityGroups=[security_group.id],
Scheme='internal',
Tags=[{'Key': 'key_name', 'Value': 'a_value'}])
response = conn.create_target_group(
Name='a-target',
Protocol='HTTP',
Port=target_group_port,
VpcId=vpc.id,
HealthCheckProtocol='HTTP',
HealthCheckPath='/',
HealthCheckIntervalSeconds=5,
HealthCheckTimeoutSeconds=5,
HealthyThresholdCount=5,
UnhealthyThresholdCount=2,
Matcher={'HttpCode': '200'})
target_group = response.get('TargetGroups')[0]
# No targets registered yet
response = conn.describe_target_health(
TargetGroupArn=target_group.get('TargetGroupArn'))
response.get('TargetHealthDescriptions').should.have.length_of(0)
response = ec2.create_instances(
ImageId='ami-1234abcd', MinCount=1, MaxCount=1)
instance = response[0]
target_dict = {
'Id': instance.id,
'Port': 500
}
response = conn.register_targets(
TargetGroupArn=target_group.get('TargetGroupArn'),
Targets=[target_dict])
response = conn.describe_target_health(
TargetGroupArn=target_group.get('TargetGroupArn'))
response.get('TargetHealthDescriptions').should.have.length_of(1)
target_health_description = response.get('TargetHealthDescriptions')[0]
target_health_description['Target'].should.equal(target_dict)
target_health_description['HealthCheckPort'].should.equal(str(target_group_port))
target_health_description['TargetHealth'].should.equal({
'State': 'healthy'
})
instance.stop()
response = conn.describe_target_health(
TargetGroupArn=target_group.get('TargetGroupArn'))
response.get('TargetHealthDescriptions').should.have.length_of(1)
target_health_description = response.get('TargetHealthDescriptions')[0]
target_health_description['Target'].should.equal(target_dict)
target_health_description['HealthCheckPort'].should.equal(str(target_group_port))
target_health_description['TargetHealth'].should.equal({
'State': 'unused',
'Reason': 'Target.InvalidState',
'Description': 'Target is in the stopped state'
})
@mock_ec2
@mock_elbv2
def test_target_group_attributes():
@ -1726,3 +1811,132 @@ def test_redirect_action_listener_rule_cloudformation():
'Port': '443', 'Protocol': 'HTTPS', 'StatusCode': 'HTTP_301',
}
},])
@mock_elbv2
@mock_ec2
def test_cognito_action_listener_rule():
conn = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.128/26',
AvailabilityZone='us-east-1b')
response = conn.create_load_balancer(
Name='my-lb',
Subnets=[subnet1.id, subnet2.id],
SecurityGroups=[security_group.id],
Scheme='internal',
Tags=[{'Key': 'key_name', 'Value': 'a_value'}])
load_balancer_arn = response.get('LoadBalancers')[0].get('LoadBalancerArn')
action = {
'Type': 'authenticate-cognito',
'AuthenticateCognitoConfig': {
'UserPoolArn': 'arn:aws:cognito-idp:us-east-1:123456789012:userpool/us-east-1_ABCD1234',
'UserPoolClientId': 'abcd1234abcd',
'UserPoolDomain': 'testpool',
}
}
response = conn.create_listener(LoadBalancerArn=load_balancer_arn,
Protocol='HTTP',
Port=80,
DefaultActions=[action])
listener = response.get('Listeners')[0]
listener.get('DefaultActions')[0].should.equal(action)
listener_arn = listener.get('ListenerArn')
describe_rules_response = conn.describe_rules(ListenerArn=listener_arn)
describe_rules_response['Rules'][0]['Actions'][0].should.equal(action)
describe_listener_response = conn.describe_listeners(ListenerArns=[listener_arn, ])
describe_listener_actions = describe_listener_response['Listeners'][0]['DefaultActions'][0]
describe_listener_actions.should.equal(action)
@mock_elbv2
@mock_cloudformation
def test_cognito_action_listener_rule_cloudformation():
cnf_conn = boto3.client('cloudformation', region_name='us-east-1')
elbv2_client = boto3.client('elbv2', region_name='us-east-1')
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECS Cluster Test CloudFormation",
"Resources": {
"testVPC": {
"Type": "AWS::EC2::VPC",
"Properties": {
"CidrBlock": "10.0.0.0/16",
},
},
"subnet1": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.0.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"subnet2": {
"Type": "AWS::EC2::Subnet",
"Properties": {
"CidrBlock": "10.0.1.0/24",
"VpcId": {"Ref": "testVPC"},
"AvalabilityZone": "us-east-1b",
},
},
"testLb": {
"Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
"Properties": {
"Name": "my-lb",
"Subnets": [{"Ref": "subnet1"}, {"Ref": "subnet2"}],
"Type": "application",
"SecurityGroups": [],
}
},
"testListener": {
"Type": "AWS::ElasticLoadBalancingV2::Listener",
"Properties": {
"LoadBalancerArn": {"Ref": "testLb"},
"Port": 80,
"Protocol": "HTTP",
"DefaultActions": [{
"Type": "authenticate-cognito",
"AuthenticateCognitoConfig": {
'UserPoolArn': 'arn:aws:cognito-idp:us-east-1:123456789012:userpool/us-east-1_ABCD1234',
'UserPoolClientId': 'abcd1234abcd',
'UserPoolDomain': 'testpool',
}
}]
}
}
}
}
template_json = json.dumps(template)
cnf_conn.create_stack(StackName="test-stack", TemplateBody=template_json)
describe_load_balancers_response = elbv2_client.describe_load_balancers(Names=['my-lb',])
load_balancer_arn = describe_load_balancers_response['LoadBalancers'][0]['LoadBalancerArn']
describe_listeners_response = elbv2_client.describe_listeners(LoadBalancerArn=load_balancer_arn)
describe_listeners_response['Listeners'].should.have.length_of(1)
describe_listeners_response['Listeners'][0]['DefaultActions'].should.equal([{
'Type': 'authenticate-cognito',
"AuthenticateCognitoConfig": {
'UserPoolArn': 'arn:aws:cognito-idp:us-east-1:123456789012:userpool/us-east-1_ABCD1234',
'UserPoolClientId': 'abcd1234abcd',
'UserPoolDomain': 'testpool',
}
},])

View file

@ -419,6 +419,63 @@ def test_get_partition():
partition['Values'].should.equal(values[1])
@mock_glue
def test_batch_get_partition():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
values = [['2018-10-01'], ['2018-09-01']]
helpers.create_partition(client, database_name, table_name, values=values[0])
helpers.create_partition(client, database_name, table_name, values=values[1])
partitions_to_get = [
{'Values': values[0]},
{'Values': values[1]},
]
response = client.batch_get_partition(DatabaseName=database_name, TableName=table_name, PartitionsToGet=partitions_to_get)
partitions = response['Partitions']
partitions.should.have.length_of(2)
partition = partitions[1]
partition['TableName'].should.equal(table_name)
partition['Values'].should.equal(values[1])
@mock_glue
def test_batch_get_partition_missing_partition():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
values = [['2018-10-01'], ['2018-09-01'], ['2018-08-01']]
helpers.create_partition(client, database_name, table_name, values=values[0])
helpers.create_partition(client, database_name, table_name, values=values[2])
partitions_to_get = [
{'Values': values[0]},
{'Values': values[1]},
{'Values': values[2]},
]
response = client.batch_get_partition(DatabaseName=database_name, TableName=table_name, PartitionsToGet=partitions_to_get)
partitions = response['Partitions']
partitions.should.have.length_of(2)
partitions[0]['Values'].should.equal(values[0])
partitions[1]['Values'].should.equal(values[2])
@mock_glue
def test_update_partition_not_found_moving():
client = boto3.client('glue', region_name='us-east-1')

View file

@ -11,21 +11,29 @@ import sure # noqa
from moto import mock_kms, mock_kms_deprecated
from nose.tools import assert_raises
from freezegun import freeze_time
from datetime import date
from datetime import datetime
from dateutil.tz import tzutc
@mock_kms_deprecated
@mock_kms
def test_create_key():
conn = boto.kms.connect_to_region("us-west-2")
conn = boto3.client('kms', region_name='us-east-1')
with freeze_time("2015-01-01 00:00:00"):
key = conn.create_key(policy="my policy",
description="my key", key_usage='ENCRYPT_DECRYPT')
key = conn.create_key(Policy="my policy",
Description="my key",
KeyUsage='ENCRYPT_DECRYPT',
Tags=[
{
'TagKey': 'project',
'TagValue': 'moto',
},
])
key['KeyMetadata']['Description'].should.equal("my key")
key['KeyMetadata']['KeyUsage'].should.equal("ENCRYPT_DECRYPT")
key['KeyMetadata']['Enabled'].should.equal(True)
key['KeyMetadata']['CreationDate'].should.equal("1420070400")
key['KeyMetadata']['CreationDate'].should.be.a(date)
@mock_kms_deprecated
@ -183,6 +191,7 @@ def test_decrypt():
conn = boto.kms.connect_to_region('us-west-2')
response = conn.decrypt('ZW5jcnlwdG1l'.encode('utf-8'))
response['Plaintext'].should.equal(b'encryptme')
response['KeyId'].should.equal('key_id')
@mock_kms_deprecated

View file

@ -162,3 +162,63 @@ def test_delete_retention_policy():
response = conn.delete_log_group(logGroupName=log_group_name)
@mock_logs
def test_get_log_events():
conn = boto3.client('logs', 'us-west-2')
log_group_name = 'test'
log_stream_name = 'stream'
conn.create_log_group(logGroupName=log_group_name)
conn.create_log_stream(
logGroupName=log_group_name,
logStreamName=log_stream_name
)
events = [{'timestamp': x, 'message': str(x)} for x in range(20)]
conn.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=events
)
resp = conn.get_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
limit=10)
resp['events'].should.have.length_of(10)
resp.should.have.key('nextForwardToken')
resp.should.have.key('nextBackwardToken')
for i in range(10):
resp['events'][i]['timestamp'].should.equal(i)
resp['events'][i]['message'].should.equal(str(i))
next_token = resp['nextForwardToken']
resp = conn.get_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
nextToken=next_token,
limit=10)
resp['events'].should.have.length_of(10)
resp.should.have.key('nextForwardToken')
resp.should.have.key('nextBackwardToken')
resp['nextForwardToken'].should.equal(next_token)
for i in range(10):
resp['events'][i]['timestamp'].should.equal(i+10)
resp['events'][i]['message'].should.equal(str(i+10))
resp = conn.get_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
nextToken=resp['nextBackwardToken'],
limit=10)
resp['events'].should.have.length_of(10)
resp.should.have.key('nextForwardToken')
resp.should.have.key('nextBackwardToken')
for i in range(10):
resp['events'][i]['timestamp'].should.equal(i)
resp['events'][i]['message'].should.equal(str(i))

View file

@ -1,7 +1,6 @@
from __future__ import unicode_literals
import six
import sure # noqa
import datetime
from moto.organizations import utils

View file

@ -3,7 +3,6 @@ from __future__ import unicode_literals
import boto3
import json
import six
import sure # noqa
from botocore.exceptions import ClientError
from nose.tools import assert_raises
@ -27,6 +26,25 @@ def test_create_organization():
validate_organization(response)
response['Organization']['FeatureSet'].should.equal('ALL')
response = client.list_accounts()
len(response['Accounts']).should.equal(1)
response['Accounts'][0]['Name'].should.equal('master')
response['Accounts'][0]['Id'].should.equal(utils.MASTER_ACCOUNT_ID)
response['Accounts'][0]['Email'].should.equal(utils.MASTER_ACCOUNT_EMAIL)
response = client.list_policies(Filter='SERVICE_CONTROL_POLICY')
len(response['Policies']).should.equal(1)
response['Policies'][0]['Name'].should.equal('FullAWSAccess')
response['Policies'][0]['Id'].should.equal(utils.DEFAULT_POLICY_ID)
response['Policies'][0]['AwsManaged'].should.equal(True)
response = client.list_targets_for_policy(PolicyId=utils.DEFAULT_POLICY_ID)
len(response['Targets']).should.equal(2)
root_ou = [t for t in response['Targets'] if t['Type'] == 'ROOT'][0]
root_ou['Name'].should.equal('Root')
master_account = [t for t in response['Targets'] if t['Type'] == 'ACCOUNT'][0]
master_account['Name'].should.equal('master')
@mock_organizations
def test_describe_organization():
@ -177,11 +195,11 @@ def test_list_accounts():
response = client.list_accounts()
response.should.have.key('Accounts')
accounts = response['Accounts']
len(accounts).should.equal(5)
len(accounts).should.equal(6)
for account in accounts:
validate_account(org, account)
accounts[3]['Name'].should.equal(mockname + '3')
accounts[2]['Email'].should.equal(mockname + '2' + '@' + mockdomain)
accounts[4]['Name'].should.equal(mockname + '3')
accounts[3]['Email'].should.equal(mockname + '2' + '@' + mockdomain)
@mock_organizations
@ -291,8 +309,10 @@ def test_list_children():
response02 = client.list_children(ParentId=root_id, ChildType='ORGANIZATIONAL_UNIT')
response03 = client.list_children(ParentId=ou01_id, ChildType='ACCOUNT')
response04 = client.list_children(ParentId=ou01_id, ChildType='ORGANIZATIONAL_UNIT')
response01['Children'][0]['Id'].should.equal(account01_id)
response01['Children'][0]['Id'].should.equal(utils.MASTER_ACCOUNT_ID)
response01['Children'][0]['Type'].should.equal('ACCOUNT')
response01['Children'][1]['Id'].should.equal(account01_id)
response01['Children'][1]['Type'].should.equal('ACCOUNT')
response02['Children'][0]['Id'].should.equal(ou01_id)
response02['Children'][0]['Type'].should.equal('ORGANIZATIONAL_UNIT')
response03['Children'][0]['Id'].should.equal(account02_id)
@ -591,4 +611,3 @@ def test_list_targets_for_policy_exception():
ex.operation_name.should.equal('ListTargetsForPolicy')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('InvalidInputException')

View file

@ -18,7 +18,8 @@ def test_create_database():
MasterUsername='root',
MasterUserPassword='hunter2',
Port=1234,
DBSecurityGroups=["my_sg"])
DBSecurityGroups=["my_sg"],
VpcSecurityGroupIds=['sg-123456'])
db_instance = database['DBInstance']
db_instance['AllocatedStorage'].should.equal(10)
db_instance['DBInstanceClass'].should.equal("db.m1.small")
@ -35,6 +36,7 @@ def test_create_database():
db_instance['DbiResourceId'].should.contain("db-")
db_instance['CopyTagsToSnapshot'].should.equal(False)
db_instance['InstanceCreateTime'].should.be.a("datetime.datetime")
db_instance['VpcSecurityGroups'][0]['VpcSecurityGroupId'].should.equal('sg-123456')
@mock_rds2
@ -260,9 +262,11 @@ def test_modify_db_instance():
instances['DBInstances'][0]['AllocatedStorage'].should.equal(10)
conn.modify_db_instance(DBInstanceIdentifier='db-master-1',
AllocatedStorage=20,
ApplyImmediately=True)
ApplyImmediately=True,
VpcSecurityGroupIds=['sg-123456'])
instances = conn.describe_db_instances(DBInstanceIdentifier='db-master-1')
instances['DBInstances'][0]['AllocatedStorage'].should.equal(20)
instances['DBInstances'][0]['VpcSecurityGroups'][0]['VpcSecurityGroupId'].should.equal('sg-123456')
@mock_rds2

View file

@ -36,6 +36,7 @@ def test_create_cluster_boto3():
response['Cluster']['NodeType'].should.equal('ds2.xlarge')
create_time = response['Cluster']['ClusterCreateTime']
create_time.should.be.lower_than(datetime.datetime.now(create_time.tzinfo))
create_time.should.be.greater_than(datetime.datetime.now(create_time.tzinfo) - datetime.timedelta(minutes=1))
@mock_redshift

View file

@ -2,7 +2,11 @@ from __future__ import unicode_literals
import boto3
import sure # noqa
from moto import mock_resourcegroupstaggingapi, mock_s3, mock_ec2, mock_elbv2
from moto import mock_ec2
from moto import mock_elbv2
from moto import mock_kms
from moto import mock_resourcegroupstaggingapi
from moto import mock_s3
@mock_s3
@ -225,10 +229,12 @@ def test_get_tag_values_ec2():
@mock_ec2
@mock_elbv2
@mock_kms
@mock_resourcegroupstaggingapi
def test_get_resources_elbv2():
conn = boto3.client('elbv2', region_name='us-east-1')
def test_get_many_resources():
elbv2 = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
kms = boto3.client('kms', region_name='us-east-1')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
@ -242,7 +248,7 @@ def test_get_resources_elbv2():
CidrBlock='172.28.7.0/26',
AvailabilityZone='us-east-1b')
conn.create_load_balancer(
elbv2.create_load_balancer(
Name='my-lb',
Subnets=[subnet1.id, subnet2.id],
SecurityGroups=[security_group.id],
@ -259,13 +265,27 @@ def test_get_resources_elbv2():
]
)
conn.create_load_balancer(
elbv2.create_load_balancer(
Name='my-other-lb',
Subnets=[subnet1.id, subnet2.id],
SecurityGroups=[security_group.id],
Scheme='internal',
)
kms.create_key(
KeyUsage='ENCRYPT_DECRYPT',
Tags=[
{
'TagKey': 'key_name',
'TagValue': 'a_value'
},
{
'TagKey': 'key_2',
'TagValue': 'val2'
}
]
)
rtapi = boto3.client('resourcegroupstaggingapi', region_name='us-east-1')
resp = rtapi.get_resources(ResourceTypeFilters=['elasticloadbalancer:loadbalancer'])

View file

@ -652,6 +652,114 @@ def test_change_resource_record_sets_crud_valid():
response = conn.list_resource_record_sets(HostedZoneId=hosted_zone_id)
len(response['ResourceRecordSets']).should.equal(0)
@mock_route53
def test_change_weighted_resource_record_sets():
conn = boto3.client('route53', region_name='us-east-2')
conn.create_hosted_zone(
Name='test.vpc.internal.',
CallerReference=str(hash('test'))
)
zones = conn.list_hosted_zones_by_name(
DNSName='test.vpc.internal.'
)
hosted_zone_id = zones['HostedZones'][0]['Id']
#Create 2 weighted records
conn.change_resource_record_sets(
HostedZoneId=hosted_zone_id,
ChangeBatch={
'Changes': [
{
'Action': 'CREATE',
'ResourceRecordSet': {
'Name': 'test.vpc.internal',
'Type': 'A',
'SetIdentifier': 'test1',
'Weight': 50,
'AliasTarget': {
'HostedZoneId': 'Z3AADJGX6KTTL2',
'DNSName': 'internal-test1lb-447688172.us-east-2.elb.amazonaws.com.',
'EvaluateTargetHealth': True
}
}
},
{
'Action': 'CREATE',
'ResourceRecordSet': {
'Name': 'test.vpc.internal',
'Type': 'A',
'SetIdentifier': 'test2',
'Weight': 50,
'AliasTarget': {
'HostedZoneId': 'Z3AADJGX6KTTL2',
'DNSName': 'internal-testlb2-1116641781.us-east-2.elb.amazonaws.com.',
'EvaluateTargetHealth': True
}
}
}
]
}
)
response = conn.list_resource_record_sets(HostedZoneId=hosted_zone_id)
record = response['ResourceRecordSets'][0]
#Update the first record to have a weight of 90
conn.change_resource_record_sets(
HostedZoneId=hosted_zone_id,
ChangeBatch={
'Changes' : [
{
'Action' : 'UPSERT',
'ResourceRecordSet' : {
'Name' : record['Name'],
'Type' : record['Type'],
'SetIdentifier' : record['SetIdentifier'],
'Weight' : 90,
'AliasTarget' : {
'HostedZoneId' : record['AliasTarget']['HostedZoneId'],
'DNSName' : record['AliasTarget']['DNSName'],
'EvaluateTargetHealth' : record['AliasTarget']['EvaluateTargetHealth']
}
}
},
]
}
)
record = response['ResourceRecordSets'][1]
#Update the second record to have a weight of 10
conn.change_resource_record_sets(
HostedZoneId=hosted_zone_id,
ChangeBatch={
'Changes' : [
{
'Action' : 'UPSERT',
'ResourceRecordSet' : {
'Name' : record['Name'],
'Type' : record['Type'],
'SetIdentifier' : record['SetIdentifier'],
'Weight' : 10,
'AliasTarget' : {
'HostedZoneId' : record['AliasTarget']['HostedZoneId'],
'DNSName' : record['AliasTarget']['DNSName'],
'EvaluateTargetHealth' : record['AliasTarget']['EvaluateTargetHealth']
}
}
},
]
}
)
response = conn.list_resource_record_sets(HostedZoneId=hosted_zone_id)
for record in response['ResourceRecordSets']:
if record['SetIdentifier'] == 'test1':
record['Weight'].should.equal(90)
if record['SetIdentifier'] == 'test2':
record['Weight'].should.equal(10)
@mock_route53
def test_change_resource_record_invalid():

View file

@ -639,7 +639,7 @@ def test_delete_keys():
@mock_s3_deprecated
def test_delete_keys_with_invalid():
def test_delete_keys_invalid():
conn = boto.connect_s3('the_key', 'the_secret')
bucket = conn.create_bucket('foobar')
@ -648,6 +648,7 @@ def test_delete_keys_with_invalid():
Key(bucket=bucket, name='file3').set_contents_from_string('abc')
Key(bucket=bucket, name='file4').set_contents_from_string('abc')
# non-existing key case
result = bucket.delete_keys(['abc', 'file3'])
result.deleted.should.have.length_of(1)
@ -656,6 +657,18 @@ def test_delete_keys_with_invalid():
keys.should.have.length_of(3)
keys[0].name.should.equal('file1')
# empty keys
result = bucket.delete_keys([])
result.deleted.should.have.length_of(0)
result.errors.should.have.length_of(0)
@mock_s3
def test_boto3_delete_empty_keys_list():
with assert_raises(ClientError) as err:
boto3.client('s3').delete_objects(Bucket='foobar', Delete={'Objects': []})
assert err.exception.response["Error"]["Code"] == "MalformedXML"
@mock_s3_deprecated
def test_bucket_name_with_dot():
@ -1671,6 +1684,42 @@ def test_boto3_multipart_etag():
resp['ETag'].should.equal(EXPECTED_ETAG)
@mock_s3
@reduced_min_part_size
def test_boto3_multipart_part_size():
s3 = boto3.client('s3', region_name='us-east-1')
s3.create_bucket(Bucket='mybucket')
mpu = s3.create_multipart_upload(Bucket='mybucket', Key='the-key')
mpu_id = mpu["UploadId"]
parts = []
n_parts = 10
for i in range(1, n_parts + 1):
part_size = REDUCED_PART_SIZE + i
body = b'1' * part_size
part = s3.upload_part(
Bucket='mybucket',
Key='the-key',
PartNumber=i,
UploadId=mpu_id,
Body=body,
ContentLength=len(body),
)
parts.append({"PartNumber": i, "ETag": part["ETag"]})
s3.complete_multipart_upload(
Bucket='mybucket',
Key='the-key',
UploadId=mpu_id,
MultipartUpload={"Parts": parts},
)
for i in range(1, n_parts + 1):
obj = s3.head_object(Bucket='mybucket', Key='the-key', PartNumber=i)
assert obj["ContentLength"] == REDUCED_PART_SIZE + i
@mock_s3
def test_boto3_put_object_with_tagging():
s3 = boto3.client('s3', region_name='us-east-1')

View file

@ -1,16 +1,12 @@
from __future__ import unicode_literals
import boto
import boto3
from boto.exception import S3CreateError, S3ResponseError
from boto.s3.lifecycle import Lifecycle, Transition, Expiration, Rule
import sure # noqa
from botocore.exceptions import ClientError
from datetime import datetime
from nose.tools import assert_raises
from moto import mock_s3_deprecated, mock_s3
from moto import mock_s3
@mock_s3
@ -41,6 +37,18 @@ def test_s3_storage_class_infrequent_access():
D['Contents'][0]["StorageClass"].should.equal("STANDARD_IA")
@mock_s3
def test_s3_storage_class_intelligent_tiering():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
s3.put_object(Bucket="Bucket", Key="my_key_infrequent", Body="my_value_infrequent", StorageClass="INTELLIGENT_TIERING")
objects = s3.list_objects(Bucket="Bucket")
objects['Contents'][0]["StorageClass"].should.equal("INTELLIGENT_TIERING")
@mock_s3
def test_s3_storage_class_copy():
s3 = boto3.client("s3")
@ -90,6 +98,7 @@ def test_s3_invalid_storage_class():
e.response["Error"]["Code"].should.equal("InvalidStorageClass")
e.response["Error"]["Message"].should.equal("The storage class you specified is not valid")
@mock_s3
def test_s3_default_storage_class():
s3 = boto3.client("s3")
@ -103,4 +112,27 @@ def test_s3_default_storage_class():
list_of_objects["Contents"][0]["StorageClass"].should.equal("STANDARD")
@mock_s3
def test_s3_copy_object_error_for_glacier_storage_class():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body", StorageClass="GLACIER")
with assert_raises(ClientError) as exc:
s3.copy_object(CopySource={"Bucket": "Bucket", "Key": "First_Object"}, Bucket="Bucket", Key="Second_Object")
exc.exception.response["Error"]["Code"].should.equal("ObjectNotInActiveTierError")
@mock_s3
def test_s3_copy_object_error_for_deep_archive_storage_class():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="Bucket")
s3.put_object(Bucket="Bucket", Key="First_Object", Body="Body", StorageClass="DEEP_ARCHIVE")
with assert_raises(ClientError) as exc:
s3.copy_object(CopySource={"Bucket": "Bucket", "Key": "First_Object"}, Bucket="Bucket", Key="Second_Object")
exc.exception.response["Error"]["Code"].should.equal("ObjectNotInActiveTierError")

View file

@ -3,11 +3,15 @@ import json
import boto
import boto3
from botocore.client import ClientError
from freezegun import freeze_time
from nose.tools import assert_raises
import sure # noqa
from moto import mock_sts, mock_sts_deprecated, mock_iam
from moto.iam.models import ACCOUNT_ID
from moto.sts.responses import MAX_FEDERATION_TOKEN_POLICY_LENGTH
@freeze_time("2012-01-01 12:00:00")
@ -80,6 +84,41 @@ def test_assume_role():
assume_role_response['AssumedRoleUser']['AssumedRoleId'].should.have.length_of(21 + 1 + len(session_name))
@freeze_time("2012-01-01 12:00:00")
@mock_sts_deprecated
def test_assume_role_with_web_identity():
conn = boto.connect_sts()
policy = json.dumps({
"Statement": [
{
"Sid": "Stmt13690092345534",
"Action": [
"S3:ListBucket"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::foobar-tester"
]
},
]
})
s3_role = "arn:aws:iam::123456789012:role/test-role"
role = conn.assume_role_with_web_identity(
s3_role, "session-name", policy, duration_seconds=123)
credentials = role.credentials
credentials.expiration.should.equal('2012-01-01T12:02:03.000Z')
credentials.session_token.should.have.length_of(356)
assert credentials.session_token.startswith("FQoGZXIvYXdzE")
credentials.access_key.should.have.length_of(20)
assert credentials.access_key.startswith("ASIA")
credentials.secret_key.should.have.length_of(40)
role.user.arn.should.equal("arn:aws:iam::123456789012:role/test-role")
role.user.assume_role_id.should.contain("session-name")
@mock_sts
def test_get_caller_identity_with_default_credentials():
identity = boto3.client(
@ -137,3 +176,32 @@ def test_get_caller_identity_with_assumed_role_credentials():
identity['Arn'].should.equal(assumed_role['AssumedRoleUser']['Arn'])
identity['UserId'].should.equal(assumed_role['AssumedRoleUser']['AssumedRoleId'])
identity['Account'].should.equal(str(ACCOUNT_ID))
@mock_sts
def test_federation_token_with_too_long_policy():
"Trying to get a federation token with a policy longer than 2048 character should fail"
cli = boto3.client("sts", region_name='us-east-1')
resource_tmpl = 'arn:aws:s3:::yyyy-xxxxx-cloud-default/my_default_folder/folder-name-%s/*'
statements = []
for num in range(30):
statements.append(
{
'Effect': 'Allow',
'Action': ['s3:*'],
'Resource': resource_tmpl % str(num)
}
)
policy = {
'Version': '2012-10-17',
'Statement': statements
}
json_policy = json.dumps(policy)
assert len(json_policy) > MAX_FEDERATION_TOKEN_POLICY_LENGTH
with assert_raises(ClientError) as exc:
cli.get_federation_token(Name='foo', DurationSeconds=3600, Policy=json_policy)
exc.exception.response['Error']['Code'].should.equal('ValidationError')
exc.exception.response['Error']['Message'].should.contain(
str(MAX_FEDERATION_TOKEN_POLICY_LENGTH)
)