parent
e66916d5f1
commit
d24099c401
4 changed files with 90 additions and 15 deletions
|
|
@ -1,5 +1,6 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
from collections import OrderedDict
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
|
|
@ -13,6 +14,7 @@ from moto.sqs import sqs_backends
|
|||
from .utils import make_arn_for_topic, make_arn_for_subscription
|
||||
|
||||
DEFAULT_ACCOUNT_ID = 123456789012
|
||||
DEFAULT_PAGE_SIZE = 100
|
||||
|
||||
|
||||
class Topic(object):
|
||||
|
|
@ -32,7 +34,7 @@ class Topic(object):
|
|||
|
||||
def publish(self, message):
|
||||
message_id = six.text_type(uuid.uuid4())
|
||||
subscriptions = self.sns_backend.list_subscriptions(self.arn)
|
||||
subscriptions, _ = self.sns_backend.list_subscriptions(self.arn)
|
||||
for subscription in subscriptions:
|
||||
subscription.publish(message, message_id)
|
||||
return message_id
|
||||
|
|
@ -77,16 +79,27 @@ class Subscription(object):
|
|||
|
||||
class SNSBackend(BaseBackend):
|
||||
def __init__(self):
|
||||
self.topics = {}
|
||||
self.subscriptions = {}
|
||||
self.topics = OrderedDict()
|
||||
self.subscriptions = OrderedDict()
|
||||
|
||||
def create_topic(self, name):
|
||||
topic = Topic(name, self)
|
||||
self.topics[topic.arn] = topic
|
||||
return topic
|
||||
|
||||
def list_topics(self):
|
||||
return self.topics.values()
|
||||
def _get_values_nexttoken(self, values_map, next_token=None):
|
||||
if next_token is None:
|
||||
next_token = 0
|
||||
next_token = int(next_token)
|
||||
values = list(values_map.values())[next_token: next_token + DEFAULT_PAGE_SIZE]
|
||||
if len(values) == DEFAULT_PAGE_SIZE:
|
||||
next_token = next_token + DEFAULT_PAGE_SIZE
|
||||
else:
|
||||
next_token = None
|
||||
return values, next_token
|
||||
|
||||
def list_topics(self, next_token=None):
|
||||
return self._get_values_nexttoken(self.topics, next_token)
|
||||
|
||||
def delete_topic(self, arn):
|
||||
self.topics.pop(arn)
|
||||
|
|
@ -107,12 +120,13 @@ class SNSBackend(BaseBackend):
|
|||
def unsubscribe(self, subscription_arn):
|
||||
self.subscriptions.pop(subscription_arn)
|
||||
|
||||
def list_subscriptions(self, topic_arn=None):
|
||||
def list_subscriptions(self, topic_arn=None, next_token=None):
|
||||
if topic_arn:
|
||||
topic = self.get_topic(topic_arn)
|
||||
return [sub for sub in self.subscriptions.values() if sub.topic == topic]
|
||||
filtered = OrderedDict([(k, sub) for k, sub in self.subscriptions.items() if sub.topic == topic])
|
||||
return self._get_values_nexttoken(filtered, next_token)
|
||||
else:
|
||||
return self.subscriptions.values()
|
||||
return self._get_values_nexttoken(self.subscriptions, next_token)
|
||||
|
||||
def publish(self, topic_arn, message):
|
||||
topic = self.get_topic(topic_arn)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue