straighten out filter logic

This commit is contained in:
Nick Stocchero 2020-08-06 17:18:57 -06:00
commit d8cea0213d
3 changed files with 252 additions and 70 deletions

View file

@ -3217,19 +3217,24 @@ def test_role_config_client():
result = config_client.list_discovered_resources(resourceType="AWS::IAM::Role")
assert not result["resourceIdentifiers"]
role_id = iam_client.create_role(
Path="/",
RoleName="mytestrole",
Description="mytestrole",
AssumeRolePolicyDocument=json.dumps("{ }"),
)["Role"]["RoleId"]
# Make 10 policies
roles = []
num_roles = 10
for ix in range(1, num_roles + 1):
this_policy = iam_client.create_role(
RoleName="role{}".format(ix),
Path="/",
Description="role{}".format(ix),
AssumeRolePolicyDocument=json.dumps("{ }"),
)
roles.append(
{
"id": this_policy["Role"]["RoleId"],
"name": this_policy["Role"]["RoleName"],
}
)
iam_client.create_role(
Path="/",
RoleName="mytestrole2",
Description="zmytestrole",
AssumeRolePolicyDocument=json.dumps("{ }"),
)
assert len(roles) == num_roles
# Test non-aggregated query: (everything is getting a random id, so we can't test names by ordering)
result = config_client.list_discovered_resources(
@ -3269,12 +3274,77 @@ def test_role_config_client():
!= first_agg_result
)
# Test non-aggregated resource name/id filter
assert (
config_client.list_discovered_resources(
resourceType="AWS::IAM::Role", resourceName=roles[1]["name"], limit=1,
)["resourceIdentifiers"][0]["resourceName"]
== roles[1]["name"]
)
assert (
config_client.list_discovered_resources(
resourceType="AWS::IAM::Role", resourceIds=[roles[0]["id"]], limit=1,
)["resourceIdentifiers"][0]["resourceName"]
== roles[0]["name"]
)
# Test aggregated resource name/id filter
assert (
config_client.list_aggregate_discovered_resources(
ConfigurationAggregatorName="test_aggregator",
ResourceType="AWS::IAM::Role",
Filters={"ResourceName": roles[5]["name"]},
Limit=1,
)["ResourceIdentifiers"][0]["ResourceName"]
== roles[5]["name"]
)
assert (
config_client.list_aggregate_discovered_resources(
ConfigurationAggregatorName="test_aggregator",
ResourceType="AWS::IAM::Role",
Filters={"ResourceId": roles[4]["id"]},
Limit=1,
)["ResourceIdentifiers"][0]["ResourceName"]
== roles[4]["name"]
)
# Test name/id filter with pagination
first_call = config_client.list_discovered_resources(
resourceType="AWS::IAM::Role",
resourceIds=[roles[1]["id"], roles[2]["id"]],
limit=1,
)
assert first_call["nextToken"] in [roles[1]["id"], roles[2]["id"]]
assert first_call["resourceIdentifiers"][0]["resourceName"] in [
roles[1]["name"],
roles[2]["name"],
]
second_call = config_client.list_discovered_resources(
resourceType="AWS::IAM::Role",
resourceIds=[roles[1]["id"], roles[2]["id"]],
limit=1,
nextToken=first_call["nextToken"],
)
assert "nextToken" not in second_call
assert first_call["resourceIdentifiers"][0]["resourceName"] in [
roles[1]["name"],
roles[2]["name"],
]
assert (
first_call["resourceIdentifiers"][0]["resourceName"]
!= second_call["resourceIdentifiers"][0]["resourceName"]
)
# Test non-aggregated batch get
assert (
config_client.batch_get_resource_config(
resourceKeys=[{"resourceType": "AWS::IAM::Role", "resourceId": role_id}]
resourceKeys=[
{"resourceType": "AWS::IAM::Role", "resourceId": roles[0]["id"]}
]
)["baseConfigurationItems"][0]["resourceName"]
== "mytestrole"
== roles[0]["name"]
)
# Test aggregated batch get
@ -3285,12 +3355,12 @@ def test_role_config_client():
{
"SourceAccountId": ACCOUNT_ID,
"SourceRegion": "global",
"ResourceId": role_id,
"ResourceId": roles[1]["id"],
"ResourceType": "AWS::IAM::Role",
}
],
)["BaseConfigurationItems"][0]["resourceName"]
== "mytestrole"
== roles[1]["name"]
)
@ -3312,7 +3382,7 @@ def test_policy_list_config_discovered_resources():
],
}
# Create a role
# Create a policy
policy_config_query.backends["global"].create_policy(
description="mypolicy",
path="",
@ -3320,6 +3390,12 @@ def test_policy_list_config_discovered_resources():
policy_name="mypolicy",
)
# We expect the backend to have arns as their keys
for backend_key in list(
policy_config_query.backends["global"].managed_policies.keys()
):
assert backend_key.startswith("arn:aws:iam::")
result = policy_config_query.list_config_service_resources(None, None, 100, None)[0]
assert len(result) == 1
@ -3465,20 +3541,24 @@ def test_policy_config_client():
result = config_client.list_discovered_resources(resourceType="AWS::IAM::Policy")
assert not result["resourceIdentifiers"]
policy_id = iam_client.create_policy(
PolicyName="mypolicy",
Path="/",
PolicyDocument=json.dumps(basic_policy),
Description="mypolicy",
)["Policy"]["PolicyId"]
# Make 10 policies
policies = []
num_policies = 10
for ix in range(1, num_policies + 1):
this_policy = iam_client.create_policy(
PolicyName="policy{}".format(ix),
Path="/",
PolicyDocument=json.dumps(basic_policy),
Description="policy{}".format(ix),
)
policies.append(
{
"id": this_policy["Policy"]["PolicyId"],
"name": this_policy["Policy"]["PolicyName"],
}
)
# second policy
iam_client.create_policy(
PolicyName="zmypolicy",
Path="/",
PolicyDocument=json.dumps(basic_policy),
Description="zmypolicy",
)
assert len(policies) == num_policies
# Test non-aggregated query: (everything is getting a random id, so we can't test names by ordering)
result = config_client.list_discovered_resources(
@ -3518,12 +3598,77 @@ def test_policy_config_client():
!= first_agg_result
)
# Test non-aggregated resource name/id filter
assert (
config_client.list_discovered_resources(
resourceType="AWS::IAM::Policy", resourceName=policies[1]["name"], limit=1,
)["resourceIdentifiers"][0]["resourceName"]
== policies[1]["name"]
)
assert (
config_client.list_discovered_resources(
resourceType="AWS::IAM::Policy", resourceIds=[policies[0]["id"]], limit=1,
)["resourceIdentifiers"][0]["resourceName"]
== policies[0]["name"]
)
# Test aggregated resource name/id filter
assert (
config_client.list_aggregate_discovered_resources(
ConfigurationAggregatorName="test_aggregator",
ResourceType="AWS::IAM::Policy",
Filters={"ResourceName": policies[5]["name"]},
Limit=1,
)["ResourceIdentifiers"][0]["ResourceName"]
== policies[5]["name"]
)
assert (
config_client.list_aggregate_discovered_resources(
ConfigurationAggregatorName="test_aggregator",
ResourceType="AWS::IAM::Policy",
Filters={"ResourceId": policies[4]["id"]},
Limit=1,
)["ResourceIdentifiers"][0]["ResourceName"]
== policies[4]["name"]
)
# Test name/id filter with pagination
first_call = config_client.list_discovered_resources(
resourceType="AWS::IAM::Policy",
resourceIds=[policies[1]["id"], policies[2]["id"]],
limit=1,
)
assert first_call["nextToken"] in [policies[1]["id"], policies[2]["id"]]
assert first_call["resourceIdentifiers"][0]["resourceName"] in [
policies[1]["name"],
policies[2]["name"],
]
second_call = config_client.list_discovered_resources(
resourceType="AWS::IAM::Policy",
resourceIds=[policies[1]["id"], policies[2]["id"]],
limit=1,
nextToken=first_call["nextToken"],
)
assert "nextToken" not in second_call
assert first_call["resourceIdentifiers"][0]["resourceName"] in [
policies[1]["name"],
policies[2]["name"],
]
assert (
first_call["resourceIdentifiers"][0]["resourceName"]
!= second_call["resourceIdentifiers"][0]["resourceName"]
)
# Test non-aggregated batch get
assert (
config_client.batch_get_resource_config(
resourceKeys=[{"resourceType": "AWS::IAM::Policy", "resourceId": policy_id}]
resourceKeys=[
{"resourceType": "AWS::IAM::Policy", "resourceId": policies[7]["id"]}
]
)["baseConfigurationItems"][0]["resourceName"]
== "mypolicy"
== policies[7]["name"]
)
# Test aggregated batch get
@ -3534,10 +3679,10 @@ def test_policy_config_client():
{
"SourceAccountId": ACCOUNT_ID,
"SourceRegion": "global",
"ResourceId": policy_id,
"ResourceId": policies[8]["id"],
"ResourceType": "AWS::IAM::Policy",
}
],
)["BaseConfigurationItems"][0]["resourceName"]
== "mypolicy"
== policies[8]["name"]
)