Added support for WaitTimeSeconds in SQS #392

This commit is contained in:
Jot 2015-08-10 22:23:17 +02:00 committed by jsmiejczak
commit 178d1c3a93
3 changed files with 81 additions and 17 deletions

View file

@ -103,11 +103,15 @@ class Queue(object):
'MessageRetentionPeriod',
'QueueArn',
'ReceiveMessageWaitTimeSeconds',
'VisibilityTimeout']
'VisibilityTimeout',
'WaitTimeSeconds']
def __init__(self, name, visibility_timeout):
def __init__(self, name, visibility_timeout, wait_time_seconds):
self.name = name
self.visibility_timeout = visibility_timeout or 30
# wait_time_seconds will be set to immediate return messages
self.wait_time_seconds = wait_time_seconds or 0
self._messages = []
now = time.time()
@ -128,6 +132,7 @@ class Queue(object):
return sqs_backend.create_queue(
name=properties['QueueName'],
visibility_timeout=properties.get('VisibilityTimeout'),
wait_time_seconds=properties.get('WaitTimeSeconds')
)
@classmethod
@ -139,6 +144,9 @@ class Queue(object):
queue = sqs_backend.get_queue(queue_name)
if 'VisibilityTimeout' in properties:
queue.visibility_timeout = int(properties['VisibilityTimeout'])
if 'WaitTimeSeconds' in properties:
queue.wait_time_seconds = int(properties['WaitTimeSeconds'])
return queue
@classmethod
@ -192,10 +200,10 @@ class SQSBackend(BaseBackend):
self.queues = {}
super(SQSBackend, self).__init__()
def create_queue(self, name, visibility_timeout):
def create_queue(self, name, visibility_timeout, wait_time_seconds):
queue = self.queues.get(name)
if queue is None:
queue = Queue(name, visibility_timeout)
queue = Queue(name, visibility_timeout, wait_time_seconds)
self.queues[name] = queue
return queue
@ -246,7 +254,7 @@ class SQSBackend(BaseBackend):
return message
def receive_messages(self, queue_name, count):
def receive_messages(self, queue_name, count, wait_seconds_timeout):
"""
Attempt to retrieve visible messages from a queue.
@ -260,13 +268,20 @@ class SQSBackend(BaseBackend):
"""
queue = self.get_queue(queue_name)
result = []
polling_end = time.time() + wait_seconds_timeout
# queue.messages only contains visible messages
for message in queue.messages:
message.mark_received(
visibility_timeout=queue.visibility_timeout
)
result.append(message)
if len(result) >= count:
while True:
for message in queue.messages:
message.mark_received(
visibility_timeout=queue.visibility_timeout
)
result.append(message)
if len(result) >= count:
break
if time.time() > polling_end:
break
return result