Add support for RDS resource filtering (#3669)

* Add support for RDS resource filtering

* Extensive testing was performed against real AWS endpoints in order to
  nail down the filter behavior under various scenarios, ensuring that
  `moto` returns the proper response or error.
* Full test coverage of all utility functions as well as several
  filter/parameter combinations.

* Split up filter tests, per PR feedback

* Remove unused import

* Fix pytest teardown failure on Python 2.7
This commit is contained in:
Brian Pandola 2021-02-10 01:06:54 -08:00 committed by GitHub
commit 8557fb8439
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 788 additions and 23 deletions

View file

@ -0,0 +1,359 @@
from __future__ import unicode_literals
import boto3
import pytest
import sure # noqa
from botocore.exceptions import ClientError
from moto import mock_rds2
class TestDBInstanceFilters(object):
mock_rds = mock_rds2()
@classmethod
def setup_class(cls):
cls.mock_rds.start()
client = boto3.client("rds", region_name="us-west-2")
for i in range(10):
identifier = "db-instance-{}".format(i)
engine = "postgres" if (i % 3) else "mysql"
client.create_db_instance(
DBInstanceIdentifier=identifier,
Engine=engine,
DBInstanceClass="db.m1.small",
)
cls.client = client
@classmethod
def teardown_class(cls):
cls.mock_rds.stop()
def test_invalid_filter_name_raises_error(self):
with pytest.raises(ClientError) as ex:
self.client.describe_db_instances(
Filters=[{"Name": "invalid-filter-name", "Values": []}]
)
ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue")
ex.value.response["Error"]["Message"].should.equal(
"Unrecognized filter name: invalid-filter-name"
)
def test_empty_filter_values_raises_error(self):
with pytest.raises(ClientError) as ex:
self.client.describe_db_instances(
Filters=[{"Name": "db-instance-id", "Values": []}]
)
ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination")
ex.value.response["Error"]["Message"].should.contain("must not be empty")
def test_db_instance_id_filter(self):
resp = self.client.describe_db_instances()
db_instance_identifier = resp["DBInstances"][0]["DBInstanceIdentifier"]
db_instances = self.client.describe_db_instances(
Filters=[{"Name": "db-instance-id", "Values": [db_instance_identifier]}]
).get("DBInstances")
db_instances.should.have.length_of(1)
db_instances[0]["DBInstanceIdentifier"].should.equal(db_instance_identifier)
def test_db_instance_id_filter_works_with_arns(self):
resp = self.client.describe_db_instances()
db_instance_arn = resp["DBInstances"][0]["DBInstanceArn"]
db_instances = self.client.describe_db_instances(
Filters=[{"Name": "db-instance-id", "Values": [db_instance_arn]}]
).get("DBInstances")
db_instances.should.have.length_of(1)
db_instances[0]["DBInstanceArn"].should.equal(db_instance_arn)
def test_dbi_resource_id_filter(self):
resp = self.client.describe_db_instances()
dbi_resource_identifier = resp["DBInstances"][0]["DbiResourceId"]
db_instances = self.client.describe_db_instances(
Filters=[{"Name": "dbi-resource-id", "Values": [dbi_resource_identifier]}]
).get("DBInstances")
for db_instance in db_instances:
db_instance["DbiResourceId"].should.equal(dbi_resource_identifier)
def test_engine_filter(self):
db_instances = self.client.describe_db_instances(
Filters=[{"Name": "engine", "Values": ["postgres"]}]
).get("DBInstances")
for db_instance in db_instances:
db_instance["Engine"].should.equal("postgres")
db_instances = self.client.describe_db_instances(
Filters=[{"Name": "engine", "Values": ["oracle"]}]
).get("DBInstances")
db_instances.should.have.length_of(0)
def test_multiple_filters(self):
resp = self.client.describe_db_instances(
Filters=[
{
"Name": "db-instance-id",
"Values": ["db-instance-0", "db-instance-1", "db-instance-3"],
},
{"Name": "engine", "Values": ["mysql", "oracle"]},
]
)
returned_identifiers = [
db["DBInstanceIdentifier"] for db in resp["DBInstances"]
]
returned_identifiers.should.have.length_of(2)
"db-instance-0".should.be.within(returned_identifiers)
"db-instance-3".should.be.within(returned_identifiers)
def test_invalid_db_instance_identifier_with_exclusive_filter(self):
# Passing a non-existent DBInstanceIdentifier will not raise an error
# if the resulting filter matches other resources.
resp = self.client.describe_db_instances(
DBInstanceIdentifier="non-existent",
Filters=[{"Name": "db-instance-id", "Values": ["db-instance-1"]}],
)
resp["DBInstances"].should.have.length_of(1)
resp["DBInstances"][0]["DBInstanceIdentifier"].should.equal("db-instance-1")
def test_invalid_db_instance_identifier_with_non_matching_filter(self):
# Passing a non-existent DBInstanceIdentifier will raise an error if
# the resulting filter does not match any resources.
with pytest.raises(ClientError) as ex:
self.client.describe_db_instances(
DBInstanceIdentifier="non-existent",
Filters=[{"Name": "engine", "Values": ["mysql"]}],
)
ex.value.response["Error"]["Code"].should.equal("DBInstanceNotFound")
ex.value.response["Error"]["Message"].should.equal(
"Database non-existent not found."
)
def test_valid_db_instance_identifier_with_exclusive_filter(self):
# Passing a valid DBInstanceIdentifier with a filter it does not match
# but does match other resources will return those other resources.
resp = self.client.describe_db_instances(
DBInstanceIdentifier="db-instance-0",
Filters=[
{"Name": "db-instance-id", "Values": ["db-instance-1"]},
{"Name": "engine", "Values": ["postgres"]},
],
)
returned_identifiers = [
db["DBInstanceIdentifier"] for db in resp["DBInstances"]
]
"db-instance-0".should_not.be.within(returned_identifiers)
"db-instance-1".should.be.within(returned_identifiers)
def test_valid_db_instance_identifier_with_inclusive_filter(self):
# Passing a valid DBInstanceIdentifier with a filter it matches but also
# matches other resources will return all matching resources.
resp = self.client.describe_db_instances(
DBInstanceIdentifier="db-instance-0",
Filters=[
{"Name": "db-instance-id", "Values": ["db-instance-1"]},
{"Name": "engine", "Values": ["mysql", "postgres"]},
],
)
returned_identifiers = [
db["DBInstanceIdentifier"] for db in resp["DBInstances"]
]
"db-instance-0".should.be.within(returned_identifiers)
"db-instance-1".should.be.within(returned_identifiers)
def test_valid_db_instance_identifier_with_non_matching_filter(self):
# Passing a valid DBInstanceIdentifier will raise an error if the
# resulting filter does not match any resources.
with pytest.raises(ClientError) as ex:
self.client.describe_db_instances(
DBInstanceIdentifier="db-instance-0",
Filters=[{"Name": "engine", "Values": ["postgres"]}],
)
ex.value.response["Error"]["Code"].should.equal("DBInstanceNotFound")
ex.value.response["Error"]["Message"].should.equal(
"Database db-instance-0 not found."
)
class TestDBSnapshotFilters(object):
mock_rds = mock_rds2()
@classmethod
def setup_class(cls):
cls.mock_rds.start()
client = boto3.client("rds", region_name="us-west-2")
# We'll set up two instances (one postgres, one mysql)
# with two snapshots each.
for i in range(2):
identifier = "db-instance-{}".format(i)
engine = "postgres" if i else "mysql"
client.create_db_instance(
DBInstanceIdentifier=identifier,
Engine=engine,
DBInstanceClass="db.m1.small",
)
for j in range(2):
client.create_db_snapshot(
DBInstanceIdentifier=identifier,
DBSnapshotIdentifier="{}-snapshot-{}".format(identifier, j),
)
cls.client = client
@classmethod
def teardown_class(cls):
cls.mock_rds.stop()
def test_invalid_filter_name_raises_error(self):
with pytest.raises(ClientError) as ex:
self.client.describe_db_snapshots(
Filters=[{"Name": "invalid-filter-name", "Values": []}]
)
ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue")
ex.value.response["Error"]["Message"].should.equal(
"Unrecognized filter name: invalid-filter-name"
)
def test_empty_filter_values_raises_error(self):
with pytest.raises(ClientError) as ex:
self.client.describe_db_snapshots(
Filters=[{"Name": "db-snapshot-id", "Values": []}]
)
ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination")
ex.value.response["Error"]["Message"].should.contain("must not be empty")
def test_db_snapshot_id_filter(self):
snapshots = self.client.describe_db_snapshots(
Filters=[{"Name": "db-snapshot-id", "Values": ["db-instance-1-snapshot-0"]}]
).get("DBSnapshots")
snapshots.should.have.length_of(1)
snapshots[0]["DBSnapshotIdentifier"].should.equal("db-instance-1-snapshot-0")
def test_db_instance_id_filter(self):
resp = self.client.describe_db_instances()
db_instance_identifier = resp["DBInstances"][0]["DBInstanceIdentifier"]
snapshots = self.client.describe_db_snapshots(
Filters=[{"Name": "db-instance-id", "Values": [db_instance_identifier]}]
).get("DBSnapshots")
for snapshot in snapshots:
snapshot["DBInstanceIdentifier"].should.equal(db_instance_identifier)
def test_db_instance_id_filter_works_with_arns(self):
resp = self.client.describe_db_instances()
db_instance_identifier = resp["DBInstances"][0]["DBInstanceIdentifier"]
db_instance_arn = resp["DBInstances"][0]["DBInstanceArn"]
snapshots = self.client.describe_db_snapshots(
Filters=[{"Name": "db-instance-id", "Values": [db_instance_arn]}]
).get("DBSnapshots")
for snapshot in snapshots:
snapshot["DBInstanceIdentifier"].should.equal(db_instance_identifier)
def test_dbi_resource_id_filter(self):
resp = self.client.describe_db_instances()
dbi_resource_identifier = resp["DBInstances"][0]["DbiResourceId"]
snapshots = self.client.describe_db_snapshots(
Filters=[{"Name": "dbi-resource-id", "Values": [dbi_resource_identifier]}]
).get("DBSnapshots")
for snapshot in snapshots:
snapshot["DbiResourceId"].should.equal(dbi_resource_identifier)
def test_engine_filter(self):
snapshots = self.client.describe_db_snapshots(
Filters=[{"Name": "engine", "Values": ["postgres"]}]
).get("DBSnapshots")
for snapshot in snapshots:
snapshot["Engine"].should.equal("postgres")
snapshots = self.client.describe_db_snapshots(
Filters=[{"Name": "engine", "Values": ["oracle"]}]
).get("DBSnapshots")
snapshots.should.have.length_of(0)
def test_multiple_filters(self):
snapshots = self.client.describe_db_snapshots(
Filters=[
{"Name": "db-snapshot-id", "Values": ["db-instance-0-snapshot-1"]},
{
"Name": "db-instance-id",
"Values": ["db-instance-1", "db-instance-0"],
},
{"Name": "engine", "Values": ["mysql"]},
]
).get("DBSnapshots")
snapshots.should.have.length_of(1)
snapshots[0]["DBSnapshotIdentifier"].should.equal("db-instance-0-snapshot-1")
def test_invalid_snapshot_id_with_db_instance_id_and_filter(self):
# Passing a non-existent DBSnapshotIdentifier will return an empty list
# if DBInstanceIdentifier is also passed in.
resp = self.client.describe_db_snapshots(
DBSnapshotIdentifier="non-existent",
DBInstanceIdentifier="a-db-instance-identifier",
Filters=[{"Name": "db-instance-id", "Values": ["db-instance-1"]}],
)
resp["DBSnapshots"].should.have.length_of(0)
def test_invalid_snapshot_id_with_non_matching_filter(self):
# Passing a non-existent DBSnapshotIdentifier will raise an error if
# the resulting filter does not match any resources.
with pytest.raises(ClientError) as ex:
self.client.describe_db_snapshots(
DBSnapshotIdentifier="non-existent",
Filters=[{"Name": "engine", "Values": ["oracle"]}],
)
ex.value.response["Error"]["Code"].should.equal("DBSnapshotNotFound")
ex.value.response["Error"]["Message"].should.equal(
"DBSnapshot non-existent not found."
)
def test_valid_snapshot_id_with_exclusive_filter(self):
# Passing a valid DBSnapshotIdentifier with a filter it does not match
# but does match other resources will return those other resources.
resp = self.client.describe_db_snapshots(
DBSnapshotIdentifier="db-instance-0-snapshot-0",
Filters=[
{"Name": "db-snapshot-id", "Values": ["db-instance-1-snapshot-1"]},
{"Name": "db-instance-id", "Values": ["db-instance-1"]},
{"Name": "engine", "Values": ["postgres"]},
],
)
resp["DBSnapshots"].should.have.length_of(1)
resp["DBSnapshots"][0]["DBSnapshotIdentifier"].should.equal(
"db-instance-1-snapshot-1"
)
def test_valid_snapshot_id_with_inclusive_filter(self):
# Passing a valid DBSnapshotIdentifier with a filter it matches but also
# matches other resources will return all matching resources.
snapshots = self.client.describe_db_snapshots(
DBSnapshotIdentifier="db-instance-0-snapshot-0",
Filters=[
{"Name": "db-snapshot-id", "Values": ["db-instance-1-snapshot-1"]},
{
"Name": "db-instance-id",
"Values": ["db-instance-1", "db-instance-0"],
},
{"Name": "engine", "Values": ["mysql", "postgres"]},
],
).get("DBSnapshots")
returned_identifiers = [ss["DBSnapshotIdentifier"] for ss in snapshots]
returned_identifiers.should.have.length_of(2)
"db-instance-0-snapshot-0".should.be.within(returned_identifiers)
"db-instance-1-snapshot-1".should.be.within(returned_identifiers)
def test_valid_snapshot_id_with_non_matching_filter(self):
# Passing a valid DBSnapshotIdentifier will raise an error if the
# resulting filter does not match any resources.
with pytest.raises(ClientError) as ex:
self.client.describe_db_snapshots(
DBSnapshotIdentifier="db-instance-0-snapshot-0",
Filters=[{"Name": "engine", "Values": ["postgres"]}],
)
ex.value.response["Error"]["Code"].should.equal("DBSnapshotNotFound")
ex.value.response["Error"]["Message"].should.equal(
"DBSnapshot db-instance-0-snapshot-0 not found."
)

View file

@ -0,0 +1,174 @@
from __future__ import unicode_literals
import pytest
from moto.rds2.utils import (
FilterDef,
apply_filter,
merge_filters,
filters_from_querystring,
validate_filters,
)
class TestFilterValidation(object):
@classmethod
def setup_class(cls):
cls.filter_defs = {
"not-implemented": FilterDef(None, ""),
"identifier": FilterDef(["identifier"], "Object Identifiers"),
}
def test_unrecognized_filter_raises_exception(self):
filters = {"invalid-filter-name": ["test-value"]}
with pytest.raises(KeyError) as ex:
validate_filters(filters, self.filter_defs)
assert "Unrecognized filter name: invalid-filter-name" in str(ex)
def test_empty_filter_values_raises_exception(self):
filters = {"identifier": []}
with pytest.raises(ValueError) as ex:
validate_filters(filters, self.filter_defs)
assert "Object Identifiers must not be empty" in str(ex)
def test_unimplemented_filter_raises_exception(self):
filters = {"not-implemented": ["test-value"]}
with pytest.raises(NotImplementedError):
validate_filters(filters, self.filter_defs)
class Resource(object):
def __init__(self, identifier, **kwargs):
self.identifier = identifier
self.__dict__.update(kwargs)
class TestResourceFiltering(object):
@classmethod
def setup_class(cls):
cls.filter_defs = {
"identifier": FilterDef(["identifier"], "Object Identifiers"),
"nested-resource": FilterDef(["nested.identifier"], "Nested Identifiers"),
"common-attr": FilterDef(["common_attr"], ""),
"multiple-attrs": FilterDef(["common_attr", "uncommon_attr"], ""),
}
cls.resources = {
"identifier-0": Resource("identifier-0"),
"identifier-1": Resource("identifier-1", common_attr="common"),
"identifier-2": Resource("identifier-2"),
"identifier-3": Resource("identifier-3", nested=Resource("nested-id-1")),
"identifier-4": Resource("identifier-4", common_attr="common"),
"identifier-5": Resource("identifier-5", uncommon_attr="common"),
}
def test_filtering_on_nested_attribute(self):
filters = {"nested-resource": ["nested-id-1"]}
filtered_resources = apply_filter(self.resources, filters, self.filter_defs)
filtered_resources.should.have.length_of(1)
filtered_resources.should.have.key("identifier-3")
def test_filtering_on_common_attribute(self):
filters = {"common-attr": ["common"]}
filtered_resources = apply_filter(self.resources, filters, self.filter_defs)
filtered_resources.should.have.length_of(2)
filtered_resources.should.have.key("identifier-1")
filtered_resources.should.have.key("identifier-4")
def test_filtering_on_multiple_attributes(self):
filters = {"multiple-attrs": ["common"]}
filtered_resources = apply_filter(self.resources, filters, self.filter_defs)
filtered_resources.should.have.length_of(3)
filtered_resources.should.have.key("identifier-1")
filtered_resources.should.have.key("identifier-4")
filtered_resources.should.have.key("identifier-5")
def test_filters_with_multiple_values(self):
filters = {"identifier": ["identifier-0", "identifier-3", "identifier-5"]}
filtered_resources = apply_filter(self.resources, filters, self.filter_defs)
filtered_resources.should.have.length_of(3)
filtered_resources.should.have.key("identifier-0")
filtered_resources.should.have.key("identifier-3")
filtered_resources.should.have.key("identifier-5")
def test_multiple_filters(self):
filters = {
"identifier": ["identifier-1", "identifier-3", "identifier-5"],
"common-attr": ["common"],
"multiple-attrs": ["common"],
}
filtered_resources = apply_filter(self.resources, filters, self.filter_defs)
filtered_resources.should.have.length_of(1)
filtered_resources.should.have.key("identifier-1")
class TestMergingFilters(object):
def test_when_filters_to_update_is_none(self):
filters_to_update = {"filter-name": ["value1"]}
merged = merge_filters(filters_to_update, None)
assert merged == filters_to_update
def test_when_filters_to_merge_is_none(self):
filters_to_merge = {"filter-name": ["value1"]}
merged = merge_filters(None, filters_to_merge)
assert merged == filters_to_merge
def test_when_both_filters_are_none(self):
merged = merge_filters(None, None)
assert merged == {}
def test_values_are_merged(self):
filters_to_update = {"filter-name": ["value1"]}
filters_to_merge = {"filter-name": ["value2"]}
merged = merge_filters(filters_to_update, filters_to_merge)
assert merged == {"filter-name": ["value1", "value2"]}
def test_complex_merge(self):
filters_to_update = {
"filter-name-1": ["value1"],
"filter-name-2": ["value1", "value2"],
"filter-name-3": ["value1"],
}
filters_to_merge = {
"filter-name-1": ["value2"],
"filter-name-3": ["value2"],
"filter-name-4": ["value1", "value2"],
}
merged = merge_filters(filters_to_update, filters_to_merge)
assert len(merged.keys()) == 4
for key in merged.keys():
assert merged[key] == ["value1", "value2"]
class TestParsingFiltersFromQuerystring(object):
def test_parse_empty_list(self):
# The AWS query protocol serializes empty lists as an empty string.
querystring = {
"Filters.Filter.1.Name": ["empty-filter"],
"Filters.Filter.1.Value.1": [""],
}
filters = filters_from_querystring(querystring)
assert filters == {"empty-filter": []}
def test_multiple_values(self):
querystring = {
"Filters.Filter.1.Name": ["multi-value"],
"Filters.Filter.1.Value.1": ["value1"],
"Filters.Filter.1.Value.2": ["value2"],
}
filters = filters_from_querystring(querystring)
values = filters["multi-value"]
assert len(values) == 2
assert "value1" in values
assert "value2" in values
def test_multiple_filters(self):
querystring = {
"Filters.Filter.1.Name": ["filter-1"],
"Filters.Filter.1.Value.1": ["value1"],
"Filters.Filter.2.Name": ["filter-2"],
"Filters.Filter.2.Value.1": ["value2"],
}
filters = filters_from_querystring(querystring)
assert len(filters.keys()) == 2
assert filters["filter-1"] == ["value1"]
assert filters["filter-2"] == ["value2"]