Added basic support for SQS MessageAttributes.

This commit is contained in:
Ralfas 2014-09-28 20:59:14 +01:00
commit 76aa9a8b22
5 changed files with 134 additions and 3 deletions

View file

@ -8,7 +8,7 @@ import sure # noqa
import time
from moto import mock_sqs
from tests.helpers import requires_boto_gte
@mock_sqs
def test_create_queue():
@ -92,6 +92,32 @@ def test_send_message():
messages[1].get_body().should.equal(body_two)
@requires_boto_gte("2.28")
@mock_sqs
def test_send_message_with_attributes():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=60)
queue.set_message_class(RawMessage)
body = 'this is a test message'
message = queue.new_message(body)
message_attributes = {
'test.attribute_name' : {'data_type' : 'String', 'string_value' : 'attribute value'},
'test.binary_attribute' : {'data_type' : 'Binary', 'binary_value' : 'binary value'},
'test.number_attribute' : {'data_type' : 'Number', 'string_value' : 'string value'}
}
message.message_attributes = message_attributes
queue.write(message)
messages = conn.receive_message(queue)
messages[0].get_body().should.equal(body)
for name, value in message_attributes.items():
dict(messages[0].message_attributes[name]).should.equal(value)
@mock_sqs
def test_send_message_with_delay():
conn = boto.connect_sqs('the_key', 'the_secret')
@ -257,6 +283,23 @@ def test_send_batch_operation():
messages = queue.get_messages(2)
@requires_boto_gte("2.28")
@mock_sqs
def test_send_batch_operation_with_message_attributes():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=60)
queue.set_message_class(RawMessage)
message_tuple = ("my_first_message", 'test message 1', 0, {'name1': {'data_type': 'String', 'string_value': 'foo'}})
queue.write_batch([message_tuple])
messages = queue.get_messages()
messages[0].get_body().should.equal("test message 1")
for name, value in message_tuple[3].items():
dict(messages[0].message_attributes[name]).should.equal(value)
@mock_sqs
def test_delete_batch_operation():
conn = boto.connect_sqs('the_key', 'the_secret')