Merge remote-tracking branch 'upstream/master'
This commit is contained in:
commit
0527e88d46
541 changed files with 75504 additions and 51429 deletions
|
|
@ -1,30 +1,27 @@
|
|||
import boto
|
||||
from boto.ec2.cloudwatch.alarm import MetricAlarm
|
||||
import boto3
|
||||
from datetime import datetime, timedelta
|
||||
import pytz
|
||||
import sure # noqa
|
||||
|
||||
from moto import mock_cloudwatch_deprecated
|
||||
|
||||
|
||||
def alarm_fixture(name="tester", action=None):
|
||||
action = action or ['arn:alarm']
|
||||
action = action or ["arn:alarm"]
|
||||
return MetricAlarm(
|
||||
name=name,
|
||||
namespace="{0}_namespace".format(name),
|
||||
metric="{0}_metric".format(name),
|
||||
comparison='>=',
|
||||
comparison=">=",
|
||||
threshold=2.0,
|
||||
period=60,
|
||||
evaluation_periods=5,
|
||||
statistic='Average',
|
||||
description='A test',
|
||||
dimensions={'InstanceId': ['i-0123456,i-0123457']},
|
||||
statistic="Average",
|
||||
description="A test",
|
||||
dimensions={"InstanceId": ["i-0123456,i-0123457"]},
|
||||
alarm_actions=action,
|
||||
ok_actions=['arn:ok'],
|
||||
insufficient_data_actions=['arn:insufficient'],
|
||||
unit='Seconds',
|
||||
ok_actions=["arn:ok"],
|
||||
insufficient_data_actions=["arn:insufficient"],
|
||||
unit="Seconds",
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -38,21 +35,20 @@ def test_create_alarm():
|
|||
alarms = conn.describe_alarms()
|
||||
alarms.should.have.length_of(1)
|
||||
alarm = alarms[0]
|
||||
alarm.name.should.equal('tester')
|
||||
alarm.namespace.should.equal('tester_namespace')
|
||||
alarm.metric.should.equal('tester_metric')
|
||||
alarm.comparison.should.equal('>=')
|
||||
alarm.name.should.equal("tester")
|
||||
alarm.namespace.should.equal("tester_namespace")
|
||||
alarm.metric.should.equal("tester_metric")
|
||||
alarm.comparison.should.equal(">=")
|
||||
alarm.threshold.should.equal(2.0)
|
||||
alarm.period.should.equal(60)
|
||||
alarm.evaluation_periods.should.equal(5)
|
||||
alarm.statistic.should.equal('Average')
|
||||
alarm.description.should.equal('A test')
|
||||
dict(alarm.dimensions).should.equal(
|
||||
{'InstanceId': ['i-0123456,i-0123457']})
|
||||
list(alarm.alarm_actions).should.equal(['arn:alarm'])
|
||||
list(alarm.ok_actions).should.equal(['arn:ok'])
|
||||
list(alarm.insufficient_data_actions).should.equal(['arn:insufficient'])
|
||||
alarm.unit.should.equal('Seconds')
|
||||
alarm.statistic.should.equal("Average")
|
||||
alarm.description.should.equal("A test")
|
||||
dict(alarm.dimensions).should.equal({"InstanceId": ["i-0123456,i-0123457"]})
|
||||
list(alarm.alarm_actions).should.equal(["arn:alarm"])
|
||||
list(alarm.ok_actions).should.equal(["arn:ok"])
|
||||
list(alarm.insufficient_data_actions).should.equal(["arn:insufficient"])
|
||||
alarm.unit.should.equal("Seconds")
|
||||
|
||||
|
||||
@mock_cloudwatch_deprecated
|
||||
|
|
@ -79,19 +75,18 @@ def test_put_metric_data():
|
|||
conn = boto.connect_cloudwatch()
|
||||
|
||||
conn.put_metric_data(
|
||||
namespace='tester',
|
||||
name='metric',
|
||||
namespace="tester",
|
||||
name="metric",
|
||||
value=1.5,
|
||||
dimensions={'InstanceId': ['i-0123456,i-0123457']},
|
||||
dimensions={"InstanceId": ["i-0123456,i-0123457"]},
|
||||
)
|
||||
|
||||
metrics = conn.list_metrics()
|
||||
metrics.should.have.length_of(1)
|
||||
metric = metrics[0]
|
||||
metric.namespace.should.equal('tester')
|
||||
metric.name.should.equal('metric')
|
||||
dict(metric.dimensions).should.equal(
|
||||
{'InstanceId': ['i-0123456,i-0123457']})
|
||||
metric.namespace.should.equal("tester")
|
||||
metric.name.should.equal("metric")
|
||||
dict(metric.dimensions).should.equal({"InstanceId": ["i-0123456,i-0123457"]})
|
||||
|
||||
|
||||
@mock_cloudwatch_deprecated
|
||||
|
|
@ -110,8 +105,7 @@ def test_describe_alarms():
|
|||
alarms.should.have.length_of(4)
|
||||
alarms = conn.describe_alarms(alarm_name_prefix="nfoo")
|
||||
alarms.should.have.length_of(2)
|
||||
alarms = conn.describe_alarms(
|
||||
alarm_names=["nfoobar", "nbarfoo", "nbazfoo"])
|
||||
alarms = conn.describe_alarms(alarm_names=["nfoobar", "nbarfoo", "nbazfoo"])
|
||||
alarms.should.have.length_of(3)
|
||||
alarms = conn.describe_alarms(action_prefix="afoo")
|
||||
alarms.should.have.length_of(2)
|
||||
|
|
|
|||
515
tests/test_cloudwatch/test_cloudwatch_boto3.py
Executable file → Normal file
515
tests/test_cloudwatch/test_cloudwatch_boto3.py
Executable file → Normal file
|
|
@ -1,224 +1,291 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
from datetime import datetime, timedelta
|
||||
import pytz
|
||||
import sure # noqa
|
||||
|
||||
from moto import mock_cloudwatch
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_put_list_dashboard():
|
||||
client = boto3.client('cloudwatch', region_name='eu-central-1')
|
||||
widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
|
||||
|
||||
client.put_dashboard(DashboardName='test1', DashboardBody=widget)
|
||||
resp = client.list_dashboards()
|
||||
|
||||
len(resp['DashboardEntries']).should.equal(1)
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_put_list_prefix_nomatch_dashboard():
|
||||
client = boto3.client('cloudwatch', region_name='eu-central-1')
|
||||
widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
|
||||
|
||||
client.put_dashboard(DashboardName='test1', DashboardBody=widget)
|
||||
resp = client.list_dashboards(DashboardNamePrefix='nomatch')
|
||||
|
||||
len(resp['DashboardEntries']).should.equal(0)
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_delete_dashboard():
|
||||
client = boto3.client('cloudwatch', region_name='eu-central-1')
|
||||
widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
|
||||
|
||||
client.put_dashboard(DashboardName='test1', DashboardBody=widget)
|
||||
client.put_dashboard(DashboardName='test2', DashboardBody=widget)
|
||||
client.put_dashboard(DashboardName='test3', DashboardBody=widget)
|
||||
client.delete_dashboards(DashboardNames=['test2', 'test1'])
|
||||
|
||||
resp = client.list_dashboards(DashboardNamePrefix='test3')
|
||||
len(resp['DashboardEntries']).should.equal(1)
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_delete_dashboard_fail():
|
||||
client = boto3.client('cloudwatch', region_name='eu-central-1')
|
||||
widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
|
||||
|
||||
client.put_dashboard(DashboardName='test1', DashboardBody=widget)
|
||||
client.put_dashboard(DashboardName='test2', DashboardBody=widget)
|
||||
client.put_dashboard(DashboardName='test3', DashboardBody=widget)
|
||||
# Doesnt delete anything if all dashboards to be deleted do not exist
|
||||
try:
|
||||
client.delete_dashboards(DashboardNames=['test2', 'test1', 'test_no_match'])
|
||||
except ClientError as err:
|
||||
err.response['Error']['Code'].should.equal('ResourceNotFound')
|
||||
else:
|
||||
raise RuntimeError('Should of raised error')
|
||||
|
||||
resp = client.list_dashboards()
|
||||
len(resp['DashboardEntries']).should.equal(3)
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_get_dashboard():
|
||||
client = boto3.client('cloudwatch', region_name='eu-central-1')
|
||||
widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
|
||||
client.put_dashboard(DashboardName='test1', DashboardBody=widget)
|
||||
|
||||
resp = client.get_dashboard(DashboardName='test1')
|
||||
resp.should.contain('DashboardArn')
|
||||
resp.should.contain('DashboardBody')
|
||||
resp['DashboardName'].should.equal('test1')
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_get_dashboard_fail():
|
||||
client = boto3.client('cloudwatch', region_name='eu-central-1')
|
||||
|
||||
try:
|
||||
client.get_dashboard(DashboardName='test1')
|
||||
except ClientError as err:
|
||||
err.response['Error']['Code'].should.equal('ResourceNotFound')
|
||||
else:
|
||||
raise RuntimeError('Should of raised error')
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_alarm_state():
|
||||
client = boto3.client('cloudwatch', region_name='eu-central-1')
|
||||
|
||||
client.put_metric_alarm(
|
||||
AlarmName='testalarm1',
|
||||
MetricName='cpu',
|
||||
Namespace='blah',
|
||||
Period=10,
|
||||
EvaluationPeriods=5,
|
||||
Statistic='Average',
|
||||
Threshold=2,
|
||||
ComparisonOperator='GreaterThanThreshold',
|
||||
)
|
||||
client.put_metric_alarm(
|
||||
AlarmName='testalarm2',
|
||||
MetricName='cpu',
|
||||
Namespace='blah',
|
||||
Period=10,
|
||||
EvaluationPeriods=5,
|
||||
Statistic='Average',
|
||||
Threshold=2,
|
||||
ComparisonOperator='GreaterThanThreshold',
|
||||
)
|
||||
|
||||
# This is tested implicitly as if it doesnt work the rest will die
|
||||
client.set_alarm_state(
|
||||
AlarmName='testalarm1',
|
||||
StateValue='ALARM',
|
||||
StateReason='testreason',
|
||||
StateReasonData='{"some": "json_data"}'
|
||||
)
|
||||
|
||||
resp = client.describe_alarms(
|
||||
StateValue='ALARM'
|
||||
)
|
||||
len(resp['MetricAlarms']).should.equal(1)
|
||||
resp['MetricAlarms'][0]['AlarmName'].should.equal('testalarm1')
|
||||
resp['MetricAlarms'][0]['StateValue'].should.equal('ALARM')
|
||||
|
||||
resp = client.describe_alarms(
|
||||
StateValue='OK'
|
||||
)
|
||||
len(resp['MetricAlarms']).should.equal(1)
|
||||
resp['MetricAlarms'][0]['AlarmName'].should.equal('testalarm2')
|
||||
resp['MetricAlarms'][0]['StateValue'].should.equal('OK')
|
||||
|
||||
# Just for sanity
|
||||
resp = client.describe_alarms()
|
||||
len(resp['MetricAlarms']).should.equal(2)
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_put_metric_data_no_dimensions():
|
||||
conn = boto3.client('cloudwatch', region_name='us-east-1')
|
||||
|
||||
conn.put_metric_data(
|
||||
Namespace='tester',
|
||||
MetricData=[
|
||||
dict(
|
||||
MetricName='metric',
|
||||
Value=1.5,
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
metrics = conn.list_metrics()['Metrics']
|
||||
metrics.should.have.length_of(1)
|
||||
metric = metrics[0]
|
||||
metric['Namespace'].should.equal('tester')
|
||||
metric['MetricName'].should.equal('metric')
|
||||
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_put_metric_data_with_statistics():
|
||||
conn = boto3.client('cloudwatch', region_name='us-east-1')
|
||||
|
||||
conn.put_metric_data(
|
||||
Namespace='tester',
|
||||
MetricData=[
|
||||
dict(
|
||||
MetricName='statmetric',
|
||||
Timestamp=datetime(2015, 1, 1),
|
||||
# no Value to test https://github.com/spulec/moto/issues/1615
|
||||
StatisticValues=dict(
|
||||
SampleCount=123.0,
|
||||
Sum=123.0,
|
||||
Minimum=123.0,
|
||||
Maximum=123.0
|
||||
),
|
||||
Unit='Milliseconds',
|
||||
StorageResolution=123
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
metrics = conn.list_metrics()['Metrics']
|
||||
metrics.should.have.length_of(1)
|
||||
metric = metrics[0]
|
||||
metric['Namespace'].should.equal('tester')
|
||||
metric['MetricName'].should.equal('statmetric')
|
||||
# TODO: test statistics - https://github.com/spulec/moto/issues/1615
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_get_metric_statistics():
|
||||
conn = boto3.client('cloudwatch', region_name='us-east-1')
|
||||
utc_now = datetime.now(tz=pytz.utc)
|
||||
|
||||
conn.put_metric_data(
|
||||
Namespace='tester',
|
||||
MetricData=[
|
||||
dict(
|
||||
MetricName='metric',
|
||||
Value=1.5,
|
||||
Timestamp=utc_now
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
stats = conn.get_metric_statistics(
|
||||
Namespace='tester',
|
||||
MetricName='metric',
|
||||
StartTime=utc_now - timedelta(seconds=60),
|
||||
EndTime=utc_now + timedelta(seconds=60),
|
||||
Period=60,
|
||||
Statistics=['SampleCount', 'Sum']
|
||||
)
|
||||
|
||||
stats['Datapoints'].should.have.length_of(1)
|
||||
datapoint = stats['Datapoints'][0]
|
||||
datapoint['SampleCount'].should.equal(1.0)
|
||||
datapoint['Sum'].should.equal(1.5)
|
||||
# from __future__ import unicode_literals
|
||||
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
from datetime import datetime, timedelta
|
||||
from nose.tools import assert_raises
|
||||
from uuid import uuid4
|
||||
import pytz
|
||||
import sure # noqa
|
||||
|
||||
from moto import mock_cloudwatch
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_put_list_dashboard():
|
||||
client = boto3.client("cloudwatch", region_name="eu-central-1")
|
||||
widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
|
||||
|
||||
client.put_dashboard(DashboardName="test1", DashboardBody=widget)
|
||||
resp = client.list_dashboards()
|
||||
|
||||
len(resp["DashboardEntries"]).should.equal(1)
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_put_list_prefix_nomatch_dashboard():
|
||||
client = boto3.client("cloudwatch", region_name="eu-central-1")
|
||||
widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
|
||||
|
||||
client.put_dashboard(DashboardName="test1", DashboardBody=widget)
|
||||
resp = client.list_dashboards(DashboardNamePrefix="nomatch")
|
||||
|
||||
len(resp["DashboardEntries"]).should.equal(0)
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_delete_dashboard():
|
||||
client = boto3.client("cloudwatch", region_name="eu-central-1")
|
||||
widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
|
||||
|
||||
client.put_dashboard(DashboardName="test1", DashboardBody=widget)
|
||||
client.put_dashboard(DashboardName="test2", DashboardBody=widget)
|
||||
client.put_dashboard(DashboardName="test3", DashboardBody=widget)
|
||||
client.delete_dashboards(DashboardNames=["test2", "test1"])
|
||||
|
||||
resp = client.list_dashboards(DashboardNamePrefix="test3")
|
||||
len(resp["DashboardEntries"]).should.equal(1)
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_delete_dashboard_fail():
|
||||
client = boto3.client("cloudwatch", region_name="eu-central-1")
|
||||
widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
|
||||
|
||||
client.put_dashboard(DashboardName="test1", DashboardBody=widget)
|
||||
client.put_dashboard(DashboardName="test2", DashboardBody=widget)
|
||||
client.put_dashboard(DashboardName="test3", DashboardBody=widget)
|
||||
# Doesnt delete anything if all dashboards to be deleted do not exist
|
||||
try:
|
||||
client.delete_dashboards(DashboardNames=["test2", "test1", "test_no_match"])
|
||||
except ClientError as err:
|
||||
err.response["Error"]["Code"].should.equal("ResourceNotFound")
|
||||
else:
|
||||
raise RuntimeError("Should of raised error")
|
||||
|
||||
resp = client.list_dashboards()
|
||||
len(resp["DashboardEntries"]).should.equal(3)
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_get_dashboard():
|
||||
client = boto3.client("cloudwatch", region_name="eu-central-1")
|
||||
widget = '{"widgets": [{"type": "text", "x": 0, "y": 7, "width": 3, "height": 3, "properties": {"markdown": "Hello world"}}]}'
|
||||
client.put_dashboard(DashboardName="test1", DashboardBody=widget)
|
||||
|
||||
resp = client.get_dashboard(DashboardName="test1")
|
||||
resp.should.contain("DashboardArn")
|
||||
resp.should.contain("DashboardBody")
|
||||
resp["DashboardName"].should.equal("test1")
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_get_dashboard_fail():
|
||||
client = boto3.client("cloudwatch", region_name="eu-central-1")
|
||||
|
||||
try:
|
||||
client.get_dashboard(DashboardName="test1")
|
||||
except ClientError as err:
|
||||
err.response["Error"]["Code"].should.equal("ResourceNotFound")
|
||||
else:
|
||||
raise RuntimeError("Should of raised error")
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_alarm_state():
|
||||
client = boto3.client("cloudwatch", region_name="eu-central-1")
|
||||
|
||||
client.put_metric_alarm(
|
||||
AlarmName="testalarm1",
|
||||
MetricName="cpu",
|
||||
Namespace="blah",
|
||||
Period=10,
|
||||
EvaluationPeriods=5,
|
||||
Statistic="Average",
|
||||
Threshold=2,
|
||||
ComparisonOperator="GreaterThanThreshold",
|
||||
)
|
||||
client.put_metric_alarm(
|
||||
AlarmName="testalarm2",
|
||||
MetricName="cpu",
|
||||
Namespace="blah",
|
||||
Period=10,
|
||||
EvaluationPeriods=5,
|
||||
Statistic="Average",
|
||||
Threshold=2,
|
||||
ComparisonOperator="GreaterThanThreshold",
|
||||
)
|
||||
|
||||
# This is tested implicitly as if it doesnt work the rest will die
|
||||
client.set_alarm_state(
|
||||
AlarmName="testalarm1",
|
||||
StateValue="ALARM",
|
||||
StateReason="testreason",
|
||||
StateReasonData='{"some": "json_data"}',
|
||||
)
|
||||
|
||||
resp = client.describe_alarms(StateValue="ALARM")
|
||||
len(resp["MetricAlarms"]).should.equal(1)
|
||||
resp["MetricAlarms"][0]["AlarmName"].should.equal("testalarm1")
|
||||
resp["MetricAlarms"][0]["StateValue"].should.equal("ALARM")
|
||||
|
||||
resp = client.describe_alarms(StateValue="OK")
|
||||
len(resp["MetricAlarms"]).should.equal(1)
|
||||
resp["MetricAlarms"][0]["AlarmName"].should.equal("testalarm2")
|
||||
resp["MetricAlarms"][0]["StateValue"].should.equal("OK")
|
||||
|
||||
# Just for sanity
|
||||
resp = client.describe_alarms()
|
||||
len(resp["MetricAlarms"]).should.equal(2)
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_put_metric_data_no_dimensions():
|
||||
conn = boto3.client("cloudwatch", region_name="us-east-1")
|
||||
|
||||
conn.put_metric_data(
|
||||
Namespace="tester", MetricData=[dict(MetricName="metric", Value=1.5)]
|
||||
)
|
||||
|
||||
metrics = conn.list_metrics()["Metrics"]
|
||||
metrics.should.have.length_of(1)
|
||||
metric = metrics[0]
|
||||
metric["Namespace"].should.equal("tester")
|
||||
metric["MetricName"].should.equal("metric")
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_put_metric_data_with_statistics():
|
||||
conn = boto3.client("cloudwatch", region_name="us-east-1")
|
||||
utc_now = datetime.now(tz=pytz.utc)
|
||||
|
||||
conn.put_metric_data(
|
||||
Namespace="tester",
|
||||
MetricData=[
|
||||
dict(
|
||||
MetricName="statmetric",
|
||||
Timestamp=utc_now,
|
||||
# no Value to test https://github.com/spulec/moto/issues/1615
|
||||
StatisticValues=dict(
|
||||
SampleCount=123.0, Sum=123.0, Minimum=123.0, Maximum=123.0
|
||||
),
|
||||
Unit="Milliseconds",
|
||||
StorageResolution=123,
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
metrics = conn.list_metrics()["Metrics"]
|
||||
metrics.should.have.length_of(1)
|
||||
metric = metrics[0]
|
||||
metric["Namespace"].should.equal("tester")
|
||||
metric["MetricName"].should.equal("statmetric")
|
||||
# TODO: test statistics - https://github.com/spulec/moto/issues/1615
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_get_metric_statistics():
|
||||
conn = boto3.client("cloudwatch", region_name="us-east-1")
|
||||
utc_now = datetime.now(tz=pytz.utc)
|
||||
|
||||
conn.put_metric_data(
|
||||
Namespace="tester",
|
||||
MetricData=[dict(MetricName="metric", Value=1.5, Timestamp=utc_now)],
|
||||
)
|
||||
|
||||
stats = conn.get_metric_statistics(
|
||||
Namespace="tester",
|
||||
MetricName="metric",
|
||||
StartTime=utc_now - timedelta(seconds=60),
|
||||
EndTime=utc_now + timedelta(seconds=60),
|
||||
Period=60,
|
||||
Statistics=["SampleCount", "Sum"],
|
||||
)
|
||||
|
||||
stats["Datapoints"].should.have.length_of(1)
|
||||
datapoint = stats["Datapoints"][0]
|
||||
datapoint["SampleCount"].should.equal(1.0)
|
||||
datapoint["Sum"].should.equal(1.5)
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_list_metrics():
|
||||
cloudwatch = boto3.client("cloudwatch", "eu-west-1")
|
||||
# Verify namespace has to exist
|
||||
res = cloudwatch.list_metrics(Namespace="unknown/")["Metrics"]
|
||||
res.should.be.empty
|
||||
# Create some metrics to filter on
|
||||
create_metrics(cloudwatch, namespace="list_test_1/", metrics=4, data_points=2)
|
||||
create_metrics(cloudwatch, namespace="list_test_2/", metrics=4, data_points=2)
|
||||
# Verify we can retrieve everything
|
||||
res = cloudwatch.list_metrics()["Metrics"]
|
||||
len(res).should.equal(16) # 2 namespaces * 4 metrics * 2 data points
|
||||
# Verify we can filter by namespace/metric name
|
||||
res = cloudwatch.list_metrics(Namespace="list_test_1/")["Metrics"]
|
||||
len(res).should.equal(8) # 1 namespace * 4 metrics * 2 data points
|
||||
res = cloudwatch.list_metrics(Namespace="list_test_1/", MetricName="metric1")[
|
||||
"Metrics"
|
||||
]
|
||||
len(res).should.equal(2) # 1 namespace * 1 metrics * 2 data points
|
||||
# Verify format
|
||||
res.should.equal(
|
||||
[
|
||||
{u"Namespace": "list_test_1/", u"Dimensions": [], u"MetricName": "metric1"},
|
||||
{u"Namespace": "list_test_1/", u"Dimensions": [], u"MetricName": "metric1"},
|
||||
]
|
||||
)
|
||||
# Verify unknown namespace still has no results
|
||||
res = cloudwatch.list_metrics(Namespace="unknown/")["Metrics"]
|
||||
res.should.be.empty
|
||||
|
||||
|
||||
@mock_cloudwatch
|
||||
def test_list_metrics_paginated():
|
||||
cloudwatch = boto3.client("cloudwatch", "eu-west-1")
|
||||
# Verify that only a single page of metrics is returned
|
||||
cloudwatch.list_metrics()["Metrics"].should.be.empty
|
||||
# Verify we can't pass a random NextToken
|
||||
with assert_raises(ClientError) as e:
|
||||
cloudwatch.list_metrics(NextToken=str(uuid4()))
|
||||
e.exception.response["Error"]["Message"].should.equal(
|
||||
"Request parameter NextToken is invalid"
|
||||
)
|
||||
# Add a boatload of metrics
|
||||
create_metrics(cloudwatch, namespace="test", metrics=100, data_points=1)
|
||||
# Verify that a single page is returned until we've reached 500
|
||||
first_page = cloudwatch.list_metrics()
|
||||
first_page["Metrics"].shouldnt.be.empty
|
||||
len(first_page["Metrics"]).should.equal(100)
|
||||
create_metrics(cloudwatch, namespace="test", metrics=200, data_points=2)
|
||||
first_page = cloudwatch.list_metrics()
|
||||
len(first_page["Metrics"]).should.equal(500)
|
||||
first_page.shouldnt.contain("NextToken")
|
||||
# Verify that adding more data points results in pagination
|
||||
create_metrics(cloudwatch, namespace="test", metrics=60, data_points=10)
|
||||
first_page = cloudwatch.list_metrics()
|
||||
len(first_page["Metrics"]).should.equal(500)
|
||||
first_page["NextToken"].shouldnt.be.empty
|
||||
# Retrieve second page - and verify there's more where that came from
|
||||
second_page = cloudwatch.list_metrics(NextToken=first_page["NextToken"])
|
||||
len(second_page["Metrics"]).should.equal(500)
|
||||
second_page.should.contain("NextToken")
|
||||
# Last page should only have the last 100 results, and no NextToken (indicating that pagination is finished)
|
||||
third_page = cloudwatch.list_metrics(NextToken=second_page["NextToken"])
|
||||
len(third_page["Metrics"]).should.equal(100)
|
||||
third_page.shouldnt.contain("NextToken")
|
||||
# Verify that we can't reuse an existing token
|
||||
with assert_raises(ClientError) as e:
|
||||
cloudwatch.list_metrics(NextToken=first_page["NextToken"])
|
||||
e.exception.response["Error"]["Message"].should.equal(
|
||||
"Request parameter NextToken is invalid"
|
||||
)
|
||||
|
||||
|
||||
def create_metrics(cloudwatch, namespace, metrics=5, data_points=5):
|
||||
for i in range(0, metrics):
|
||||
metric_name = "metric" + str(i)
|
||||
for j in range(0, data_points):
|
||||
cloudwatch.put_metric_data(
|
||||
Namespace=namespace,
|
||||
MetricData=[{"MetricName": metric_name, "Value": j, "Unit": "Seconds"}],
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue