Initial EKS Implementaion (#3981)

* Implemented EKS list_clusters

* Implemented EKS create_cluster

* Implemented EKS describe_cluster

* Implemented EKS delete_cluster

* Implemented EKS list_nodegroups

* Implemented EKS create_nodegroup

* Implemented EKS describe_nodegroup

* Implemented EKS delete_nodegroup

* Implemented EKS Server Tests

* EKS - rework tests to use decorator everywhere

Co-authored-by: Bert Blommers <info@bertblommers.nl>
This commit is contained in:
D. Ferruzzi 2021-06-20 03:34:31 -07:00 committed by GitHub
commit 61e2204941
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 2492 additions and 0 deletions

View file

828
tests/test_eks/test_eks.py Normal file
View file

@ -0,0 +1,828 @@
from __future__ import unicode_literals
from copy import deepcopy
from unittest import SkipTest
import boto3
import mock
import pytest
import sure # noqa
from botocore.exceptions import ClientError
from freezegun import freeze_time
from moto import mock_eks, settings
from moto.core import ACCOUNT_ID
from moto.core.utils import iso_8601_datetime_without_milliseconds
from moto.eks.exceptions import (
InvalidParameterException,
InvalidRequestException,
ResourceInUseException,
ResourceNotFoundException,
)
from moto.eks.models import (
CLUSTER_EXISTS_MSG,
CLUSTER_IN_USE_MSG,
CLUSTER_NOT_FOUND_MSG,
CLUSTER_NOT_READY_MSG,
LAUNCH_TEMPLATE_WITH_DISK_SIZE_MSG,
LAUNCH_TEMPLATE_WITH_REMOTE_ACCESS_MSG,
NODEGROUP_EXISTS_MSG,
NODEGROUP_NOT_FOUND_MSG,
)
from moto.eks.responses import DEFAULT_MAX_RESULTS
from moto.utilities.utils import random_string
from .test_eks_constants import (
BatchCountSize,
ClusterAttributes,
ClusterInputs,
DISK_SIZE,
ErrorAttributes,
FROZEN_TIME,
INSTANCE_TYPES,
LAUNCH_TEMPLATE,
NodegroupAttributes,
NodegroupInputs,
PageCount,
PARTITIONS,
PossibleTestResults,
RegExTemplates,
REGION,
REMOTE_ACCESS,
ResponseAttributes,
SERVICE,
)
from .test_eks_utils import (
attributes_to_test,
generate_clusters,
generate_nodegroups,
is_valid_uri,
random_names,
region_matches_partition,
)
@pytest.fixture(scope="function")
def ClusterBuilder():
class ClusterTestDataFactory:
def __init__(self, client, count, minimal):
# Generate 'count' number of random Cluster objects.
self.cluster_names = generate_clusters(client, count, minimal)
# Get the name of the first generated Cluster.
first_name = self.cluster_names[0]
# Collect the output of describe_cluster() for the first Cluster.
self.cluster_describe_output = client.describe_cluster(name=first_name)[
ResponseAttributes.CLUSTER
]
# Pick a random Cluster name from the list and a name guaranteed not to be on the list.
(self.existing_cluster_name, self.nonexistent_cluster_name) = random_names(
self.cluster_names
)
# Generate a list of the Cluster attributes to be tested when validating results.
self.attributes_to_test = attributes_to_test(
ClusterInputs, self.existing_cluster_name
)
def _execute(count=1, minimal=True):
client = boto3.client(SERVICE, region_name=REGION)
return client, ClusterTestDataFactory(client, count, minimal)
yield _execute
@pytest.fixture(scope="function")
def NodegroupBuilder(ClusterBuilder):
class NodegroupTestDataFactory:
def __init__(self, client, cluster, count, minimal):
self.cluster_name = cluster.existing_cluster_name
# Generate 'count' number of random Nodegroup objects.
self.nodegroup_names = generate_nodegroups(
client, self.cluster_name, count, minimal
)
# Get the name of the first generated Nodegroup.
first_name = self.nodegroup_names[0]
# Collect the output of describe_nodegroup() for the first Nodegroup.
self.nodegroup_describe_output = client.describe_nodegroup(
clusterName=self.cluster_name, nodegroupName=first_name
)[ResponseAttributes.NODEGROUP]
# Pick a random Nodegroup name from the list and a name guaranteed not to be on the list.
(
self.existing_nodegroup_name,
self.nonexistent_nodegroup_name,
) = random_names(self.nodegroup_names)
_, self.nonexistent_cluster_name = random_names(self.cluster_name)
# Generate a list of the Nodegroup attributes to be tested when validating results.
self.attributes_to_test = attributes_to_test(
NodegroupInputs, self.existing_nodegroup_name
)
def _execute(count=1, minimal=True):
client, cluster = ClusterBuilder()
return client, NodegroupTestDataFactory(client, cluster, count, minimal)
return _execute
###
# This specific test does not use the fixture since
# it is intended to verify that there are no clusters
# in the list at initialization, which means the mock
# decorator must be used manually in this one case.
###
@mock_eks
def test_list_clusters_returns_empty_by_default():
client = boto3.client(SERVICE, region_name=REGION)
result = client.list_clusters()[ResponseAttributes.CLUSTERS]
result.should.be.empty
@mock_eks
def test_list_clusters_returns_sorted_cluster_names(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.SMALL)
expected_result = sorted(generated_test_data.cluster_names)
result = client.list_clusters()[ResponseAttributes.CLUSTERS]
assert_result_matches_expected_list(result, expected_result, BatchCountSize.SMALL)
@mock_eks
def test_list_clusters_returns_default_max_results(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.LARGE)
expected_len = DEFAULT_MAX_RESULTS
expected_result = (sorted(generated_test_data.cluster_names))[:expected_len]
result = client.list_clusters()[ResponseAttributes.CLUSTERS]
assert_result_matches_expected_list(result, expected_result, expected_len)
@mock_eks
def test_list_clusters_returns_custom_max_results(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.MEDIUM)
expected_len = PageCount.LARGE
expected_result = (sorted(generated_test_data.cluster_names))[:expected_len]
result = client.list_clusters(maxResults=expected_len)[ResponseAttributes.CLUSTERS]
assert_result_matches_expected_list(result, expected_result, expected_len)
@mock_eks
def test_list_clusters_returns_second_page_results(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.MEDIUM)
page1_len = PageCount.LARGE
expected_len = BatchCountSize.MEDIUM - page1_len
expected_result = (sorted(generated_test_data.cluster_names))[page1_len:]
token = client.list_clusters(maxResults=page1_len)[ResponseAttributes.NEXT_TOKEN]
result = client.list_clusters(nextToken=token)[ResponseAttributes.CLUSTERS]
assert_result_matches_expected_list(result, expected_result, expected_len)
@mock_eks
def test_list_clusters_returns_custom_second_page_results(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.MEDIUM)
page1_len = PageCount.LARGE
expected_len = PageCount.SMALL
expected_result = (sorted(generated_test_data.cluster_names))[
page1_len : page1_len + expected_len
]
token = client.list_clusters(maxResults=page1_len)[ResponseAttributes.NEXT_TOKEN]
result = client.list_clusters(maxResults=expected_len, nextToken=token)[
ResponseAttributes.CLUSTERS
]
assert_result_matches_expected_list(result, expected_result, expected_len)
@mock_eks
def test_create_cluster_throws_exception_when_cluster_exists(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.SMALL)
expected_exception = ResourceInUseException
expected_msg = CLUSTER_EXISTS_MSG.format(
clusterName=generated_test_data.existing_cluster_name,
)
with pytest.raises(ClientError) as raised_exception:
client.create_cluster(
name=generated_test_data.existing_cluster_name,
**dict(ClusterInputs.REQUIRED)
)
count_clusters_after_test = len(client.list_clusters()[ResponseAttributes.CLUSTERS])
count_clusters_after_test.should.equal(BatchCountSize.SMALL)
assert_expected_exception(raised_exception, expected_exception, expected_msg)
@mock_eks
def test_create_cluster_generates_valid_cluster_arn(ClusterBuilder):
_, generated_test_data = ClusterBuilder()
expected_arn_values = [
PARTITIONS,
REGION,
ACCOUNT_ID,
generated_test_data.cluster_names,
]
all_arn_values_should_be_valid(
expected_arn_values=expected_arn_values,
pattern=RegExTemplates.CLUSTER_ARN,
arn_under_test=generated_test_data.cluster_describe_output[
ClusterAttributes.ARN
],
)
@freeze_time(FROZEN_TIME)
@mock_eks
def test_create_cluster_generates_valid_cluster_created_timestamp(ClusterBuilder):
_, generated_test_data = ClusterBuilder()
result_time = iso_8601_datetime_without_milliseconds(
generated_test_data.cluster_describe_output[ClusterAttributes.CREATED_AT]
)
if settings.TEST_SERVER_MODE:
RegExTemplates.ISO8601_FORMAT.match(result_time).should.be.true
else:
result_time.should.equal(FROZEN_TIME)
@mock_eks
def test_create_cluster_generates_valid_cluster_endpoint(ClusterBuilder):
_, generated_test_data = ClusterBuilder()
result_endpoint = generated_test_data.cluster_describe_output[
ClusterAttributes.ENDPOINT
]
is_valid_uri(result_endpoint).should.be.true
result_endpoint.should.contain(REGION)
@mock_eks
def test_create_cluster_generates_valid_oidc_identity(ClusterBuilder):
_, generated_test_data = ClusterBuilder()
result_issuer = generated_test_data.cluster_describe_output[
ClusterAttributes.IDENTITY
][ClusterAttributes.OIDC][ClusterAttributes.ISSUER]
is_valid_uri(result_issuer).should.be.true
result_issuer.should.contain(REGION)
@mock_eks
def test_create_cluster_saves_provided_parameters(ClusterBuilder):
_, generated_test_data = ClusterBuilder(minimal=False)
for key, expected_value in generated_test_data.attributes_to_test:
generated_test_data.cluster_describe_output[key].should.equal(expected_value)
@mock_eks
def test_describe_cluster_throws_exception_when_cluster_not_found(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.SMALL)
expected_exception = ResourceNotFoundException
expected_msg = CLUSTER_NOT_FOUND_MSG.format(
clusterName=generated_test_data.nonexistent_cluster_name,
)
with pytest.raises(ClientError) as raised_exception:
client.describe_cluster(name=generated_test_data.nonexistent_cluster_name)
assert_expected_exception(raised_exception, expected_exception, expected_msg)
@mock_eks
def test_delete_cluster_returns_deleted_cluster(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.SMALL, False)
result = client.delete_cluster(name=generated_test_data.existing_cluster_name)[
ResponseAttributes.CLUSTER
]
for key, expected_value in generated_test_data.attributes_to_test:
result[key].should.equal(expected_value)
@mock_eks
def test_delete_cluster_removes_deleted_cluster(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.SMALL, False)
client.delete_cluster(name=generated_test_data.existing_cluster_name)
result_cluster_list = client.list_clusters()[ResponseAttributes.CLUSTERS]
len(result_cluster_list).should.equal(BatchCountSize.SMALL - 1)
result_cluster_list.should_not.contain(generated_test_data.existing_cluster_name)
@mock_eks
def test_delete_cluster_throws_exception_when_cluster_not_found(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.SMALL)
expected_exception = ResourceNotFoundException
expected_msg = CLUSTER_NOT_FOUND_MSG.format(
clusterName=generated_test_data.nonexistent_cluster_name,
)
with pytest.raises(ClientError) as raised_exception:
client.delete_cluster(name=generated_test_data.nonexistent_cluster_name)
count_clusters_after_test = len(client.list_clusters()[ResponseAttributes.CLUSTERS])
count_clusters_after_test.should.equal(BatchCountSize.SMALL)
assert_expected_exception(raised_exception, expected_exception, expected_msg)
@mock_eks
def test_list_nodegroups_returns_empty_by_default(ClusterBuilder):
client, generated_test_data = ClusterBuilder()
result = client.list_nodegroups(
clusterName=generated_test_data.existing_cluster_name
)[ResponseAttributes.NODEGROUPS]
result.should.be.empty
@mock_eks
def test_list_nodegroups_returns_sorted_nodegroup_names(NodegroupBuilder):
client, generated_test_data = NodegroupBuilder(BatchCountSize.SMALL)
expected_result = sorted(generated_test_data.nodegroup_names)
result = client.list_nodegroups(clusterName=generated_test_data.cluster_name)[
ResponseAttributes.NODEGROUPS
]
assert_result_matches_expected_list(result, expected_result, BatchCountSize.SMALL)
@mock_eks
def test_list_nodegroups_returns_default_max_results(NodegroupBuilder):
client, generated_test_data = NodegroupBuilder(BatchCountSize.LARGE)
expected_len = DEFAULT_MAX_RESULTS
expected_result = (sorted(generated_test_data.nodegroup_names))[:expected_len]
result = client.list_nodegroups(clusterName=generated_test_data.cluster_name)[
ResponseAttributes.NODEGROUPS
]
assert_result_matches_expected_list(result, expected_result, expected_len)
@mock_eks
def test_list_nodegroups_returns_custom_max_results(NodegroupBuilder):
client, generated_test_data = NodegroupBuilder(BatchCountSize.LARGE)
expected_len = BatchCountSize.LARGE
expected_result = (sorted(generated_test_data.nodegroup_names))[:expected_len]
result = client.list_nodegroups(
clusterName=generated_test_data.cluster_name, maxResults=expected_len
)[ResponseAttributes.NODEGROUPS]
assert_result_matches_expected_list(result, expected_result, expected_len)
@mock_eks
def test_list_nodegroups_returns_second_page_results(NodegroupBuilder):
client, generated_test_data = NodegroupBuilder(BatchCountSize.MEDIUM)
page1_len = PageCount.LARGE
expected_len = BatchCountSize.MEDIUM - page1_len
expected_result = (sorted(generated_test_data.nodegroup_names))[page1_len:]
token = client.list_nodegroups(
clusterName=generated_test_data.cluster_name, maxResults=page1_len
)[ResponseAttributes.NEXT_TOKEN]
result = client.list_nodegroups(
clusterName=generated_test_data.cluster_name, nextToken=token
)[ResponseAttributes.NODEGROUPS]
assert_result_matches_expected_list(result, expected_result, expected_len)
@mock_eks
def test_list_nodegroups_returns_custom_second_page_results(NodegroupBuilder):
client, generated_test_data = NodegroupBuilder(BatchCountSize.MEDIUM)
page1_len = PageCount.LARGE
expected_len = PageCount.SMALL
expected_result = (sorted(generated_test_data.nodegroup_names))[
page1_len : page1_len + expected_len
]
token = client.list_nodegroups(
clusterName=generated_test_data.cluster_name, maxResults=page1_len
)[ResponseAttributes.NEXT_TOKEN]
result = client.list_nodegroups(
clusterName=generated_test_data.cluster_name,
maxResults=expected_len,
nextToken=token,
)[ResponseAttributes.NODEGROUPS]
assert_result_matches_expected_list(result, expected_result, expected_len)
@mock_eks
def test_create_nodegroup_throws_exception_when_cluster_not_found():
client = boto3.client(SERVICE, region_name=REGION)
non_existent_cluster_name = random_string()
expected_exception = ResourceNotFoundException
expected_msg = CLUSTER_NOT_FOUND_MSG.format(clusterName=non_existent_cluster_name,)
with pytest.raises(ClientError) as raised_exception:
client.create_nodegroup(
clusterName=non_existent_cluster_name,
nodegroupName=random_string(),
**dict(NodegroupInputs.REQUIRED)
)
assert_expected_exception(raised_exception, expected_exception, expected_msg)
@mock_eks
def test_create_nodegroup_throws_exception_when_nodegroup_already_exists(
NodegroupBuilder,
):
client, generated_test_data = NodegroupBuilder(BatchCountSize.SMALL)
expected_exception = ResourceInUseException
expected_msg = NODEGROUP_EXISTS_MSG.format(
clusterName=generated_test_data.cluster_name,
nodegroupName=generated_test_data.existing_nodegroup_name,
)
with pytest.raises(ClientError) as raised_exception:
client.create_nodegroup(
clusterName=generated_test_data.cluster_name,
nodegroupName=generated_test_data.existing_nodegroup_name,
**dict(NodegroupInputs.REQUIRED)
)
count_nodegroups_after_test = len(
client.list_nodegroups(clusterName=generated_test_data.cluster_name)[
ResponseAttributes.NODEGROUPS
]
)
count_nodegroups_after_test.should.equal(BatchCountSize.SMALL)
assert_expected_exception(raised_exception, expected_exception, expected_msg)
@mock_eks
def test_create_nodegroup_throws_exception_when_cluster_not_active(
NodegroupBuilder, monkeypatch
):
if settings.TEST_SERVER_MODE:
raise SkipTest("Cant patch Cluster attributes in server mode.")
client, generated_test_data = NodegroupBuilder(BatchCountSize.SMALL)
expected_exception = InvalidRequestException
expected_msg = CLUSTER_NOT_READY_MSG.format(
clusterName=generated_test_data.cluster_name,
)
with mock.patch("moto.eks.models.Cluster.isActive", return_value=False):
with pytest.raises(ClientError) as raised_exception:
client.create_nodegroup(
clusterName=generated_test_data.cluster_name,
nodegroupName=random_string(),
**dict(NodegroupInputs.REQUIRED)
)
count_nodegroups_after_test = len(
client.list_nodegroups(clusterName=generated_test_data.cluster_name)[
ResponseAttributes.NODEGROUPS
]
)
count_nodegroups_after_test.should.equal(BatchCountSize.SMALL)
assert_expected_exception(raised_exception, expected_exception, expected_msg)
@mock_eks
def test_create_nodegroup_generates_valid_nodegroup_arn(NodegroupBuilder):
_, generated_test_data = NodegroupBuilder()
expected_arn_values = [
PARTITIONS,
REGION,
ACCOUNT_ID,
generated_test_data.cluster_name,
generated_test_data.nodegroup_names,
None,
]
all_arn_values_should_be_valid(
expected_arn_values=expected_arn_values,
pattern=RegExTemplates.NODEGROUP_ARN,
arn_under_test=generated_test_data.nodegroup_describe_output[
NodegroupAttributes.ARN
],
)
@freeze_time(FROZEN_TIME)
@mock_eks
def test_create_nodegroup_generates_valid_nodegroup_created_timestamp(NodegroupBuilder):
_, generated_test_data = NodegroupBuilder()
result_time = iso_8601_datetime_without_milliseconds(
generated_test_data.nodegroup_describe_output[NodegroupAttributes.CREATED_AT]
)
if settings.TEST_SERVER_MODE:
RegExTemplates.ISO8601_FORMAT.match(result_time).should.be.true
else:
result_time.should.equal(FROZEN_TIME)
@freeze_time(FROZEN_TIME)
@mock_eks
def test_create_nodegroup_generates_valid_nodegroup_modified_timestamp(
NodegroupBuilder,
):
client, generated_test_data = NodegroupBuilder()
result_time = iso_8601_datetime_without_milliseconds(
generated_test_data.nodegroup_describe_output[NodegroupAttributes.MODIFIED_AT]
)
if settings.TEST_SERVER_MODE:
RegExTemplates.ISO8601_FORMAT.match(result_time).should.be.true
else:
result_time.should.equal(FROZEN_TIME)
@mock_eks
def test_create_nodegroup_generates_valid_autoscaling_group_name(NodegroupBuilder):
_, generated_test_data = NodegroupBuilder()
result_resources = generated_test_data.nodegroup_describe_output[
NodegroupAttributes.RESOURCES
]
result_asg_name = result_resources[NodegroupAttributes.AUTOSCALING_GROUPS][0][
NodegroupAttributes.NAME
]
RegExTemplates.NODEGROUP_ASG_NAME_PATTERN.match(result_asg_name).should.be.true
@mock_eks
def test_create_nodegroup_generates_valid_security_group_name(NodegroupBuilder):
_, generated_test_data = NodegroupBuilder()
result_resources = generated_test_data.nodegroup_describe_output[
NodegroupAttributes.RESOURCES
]
result_security_group = result_resources[NodegroupAttributes.REMOTE_ACCESS_SG]
RegExTemplates.NODEGROUP_SECURITY_GROUP_NAME_PATTERN.match(
result_security_group
).should.be.true
@mock_eks
def test_create_nodegroup_saves_provided_parameters(NodegroupBuilder):
_, generated_test_data = NodegroupBuilder(minimal=False)
for key, expected_value in generated_test_data.attributes_to_test:
generated_test_data.nodegroup_describe_output[key].should.equal(expected_value)
@mock_eks
def test_describe_nodegroup_throws_exception_when_cluster_not_found(NodegroupBuilder):
client, generated_test_data = NodegroupBuilder()
expected_exception = ResourceNotFoundException
expected_msg = CLUSTER_NOT_FOUND_MSG.format(
clusterName=generated_test_data.nonexistent_cluster_name,
)
with pytest.raises(ClientError) as raised_exception:
client.describe_nodegroup(
clusterName=generated_test_data.nonexistent_cluster_name,
nodegroupName=generated_test_data.existing_nodegroup_name,
)
assert_expected_exception(raised_exception, expected_exception, expected_msg)
@mock_eks
def test_describe_nodegroup_throws_exception_when_nodegroup_not_found(NodegroupBuilder):
client, generated_test_data = NodegroupBuilder()
expected_exception = ResourceNotFoundException
expected_msg = NODEGROUP_NOT_FOUND_MSG.format(
nodegroupName=generated_test_data.nonexistent_nodegroup_name,
)
with pytest.raises(ClientError) as raised_exception:
client.describe_nodegroup(
clusterName=generated_test_data.cluster_name,
nodegroupName=generated_test_data.nonexistent_nodegroup_name,
)
assert_expected_exception(raised_exception, expected_exception, expected_msg)
@mock_eks
def test_delete_cluster_throws_exception_when_nodegroups_exist(NodegroupBuilder):
client, generated_test_data = NodegroupBuilder()
expected_exception = ResourceInUseException
expected_msg = CLUSTER_IN_USE_MSG
with pytest.raises(ClientError) as raised_exception:
client.delete_cluster(name=generated_test_data.cluster_name)
count_clusters_after_test = len(client.list_clusters()[ResponseAttributes.CLUSTERS])
count_clusters_after_test.should.equal(BatchCountSize.SINGLE)
assert_expected_exception(raised_exception, expected_exception, expected_msg)
@mock_eks
def test_delete_nodegroup_removes_deleted_nodegroup(NodegroupBuilder):
client, generated_test_data = NodegroupBuilder(BatchCountSize.SMALL)
client.delete_nodegroup(
clusterName=generated_test_data.cluster_name,
nodegroupName=generated_test_data.existing_nodegroup_name,
)
result = client.list_nodegroups(clusterName=generated_test_data.cluster_name)[
ResponseAttributes.NODEGROUPS
]
len(result).should.equal(BatchCountSize.SMALL - 1)
result.should_not.contain(generated_test_data.existing_nodegroup_name)
@mock_eks
def test_delete_nodegroup_returns_deleted_nodegroup(NodegroupBuilder):
client, generated_test_data = NodegroupBuilder(BatchCountSize.SMALL, False)
result = client.delete_nodegroup(
clusterName=generated_test_data.cluster_name,
nodegroupName=generated_test_data.existing_nodegroup_name,
)[ResponseAttributes.NODEGROUP]
for key, expected_value in generated_test_data.attributes_to_test:
result[key].should.equal(expected_value)
@mock_eks
def test_delete_nodegroup_throws_exception_when_cluster_not_found(NodegroupBuilder):
client, generated_test_data = NodegroupBuilder()
expected_exception = ResourceNotFoundException
expected_msg = CLUSTER_NOT_FOUND_MSG.format(
clusterName=generated_test_data.nonexistent_cluster_name,
)
with pytest.raises(ClientError) as raised_exception:
client.delete_nodegroup(
clusterName=generated_test_data.nonexistent_cluster_name,
nodegroupName=generated_test_data.existing_nodegroup_name,
)
assert_expected_exception(raised_exception, expected_exception, expected_msg)
@mock_eks
def test_delete_nodegroup_throws_exception_when_nodegroup_not_found(NodegroupBuilder):
client, generated_test_data = NodegroupBuilder()
expected_exception = ResourceNotFoundException
expected_msg = NODEGROUP_NOT_FOUND_MSG.format(
nodegroupName=generated_test_data.nonexistent_nodegroup_name,
)
with pytest.raises(ClientError) as raised_exception:
client.delete_nodegroup(
clusterName=generated_test_data.cluster_name,
nodegroupName=generated_test_data.nonexistent_nodegroup_name,
)
assert_expected_exception(raised_exception, expected_exception, expected_msg)
# If launch_template is specified, you can not specify instanceTypes, diskSize, or remoteAccess.
test_cases = [
# Happy Paths
(LAUNCH_TEMPLATE, None, None, None, PossibleTestResults.SUCCESS),
(None, INSTANCE_TYPES, DISK_SIZE, REMOTE_ACCESS, PossibleTestResults.SUCCESS),
(None, None, DISK_SIZE, REMOTE_ACCESS, PossibleTestResults.SUCCESS),
(None, INSTANCE_TYPES, None, REMOTE_ACCESS, PossibleTestResults.SUCCESS),
(None, INSTANCE_TYPES, DISK_SIZE, None, PossibleTestResults.SUCCESS),
(None, INSTANCE_TYPES, None, None, PossibleTestResults.SUCCESS),
(None, None, DISK_SIZE, None, PossibleTestResults.SUCCESS),
(None, None, None, REMOTE_ACCESS, PossibleTestResults.SUCCESS),
(None, None, None, None, PossibleTestResults.SUCCESS),
# Unhappy Paths
(LAUNCH_TEMPLATE, INSTANCE_TYPES, None, None, PossibleTestResults.FAILURE),
(LAUNCH_TEMPLATE, None, DISK_SIZE, None, PossibleTestResults.FAILURE),
(LAUNCH_TEMPLATE, None, None, REMOTE_ACCESS, PossibleTestResults.FAILURE),
(LAUNCH_TEMPLATE, INSTANCE_TYPES, DISK_SIZE, None, PossibleTestResults.FAILURE),
(LAUNCH_TEMPLATE, INSTANCE_TYPES, None, REMOTE_ACCESS, PossibleTestResults.FAILURE),
(LAUNCH_TEMPLATE, None, DISK_SIZE, REMOTE_ACCESS, PossibleTestResults.FAILURE),
(
LAUNCH_TEMPLATE,
INSTANCE_TYPES,
DISK_SIZE,
REMOTE_ACCESS,
PossibleTestResults.FAILURE,
),
]
@pytest.mark.parametrize(
"launch_template, instance_types, disk_size, remote_access, expected_result",
test_cases,
)
@mock_eks
def test_create_nodegroup_handles_launch_template_combinations(
ClusterBuilder,
launch_template,
instance_types,
disk_size,
remote_access,
expected_result,
):
client, generated_test_data = ClusterBuilder()
nodegroup_name = random_string()
expected_exception = InvalidParameterException
expected_msg = None
test_inputs = dict(
deepcopy(
# Required Constants
NodegroupInputs.REQUIRED
# Required Variables
+ [
(
ClusterAttributes.CLUSTER_NAME,
generated_test_data.existing_cluster_name,
),
(NodegroupAttributes.NODEGROUP_NAME, nodegroup_name),
]
# Test Case Values
+ [
_
for _ in [launch_template, instance_types, disk_size, remote_access]
if _
]
)
)
if expected_result == PossibleTestResults.SUCCESS:
result = client.create_nodegroup(**test_inputs)[ResponseAttributes.NODEGROUP]
for key, expected_value in test_inputs.items():
result[key].should.equal(expected_value)
else:
if launch_template and disk_size:
expected_msg = LAUNCH_TEMPLATE_WITH_DISK_SIZE_MSG
elif launch_template and remote_access:
expected_msg = LAUNCH_TEMPLATE_WITH_REMOTE_ACCESS_MSG
# Docs say this combination throws an exception but testing shows that
# instanceTypes overrides the launchTemplate instance values instead.
# Leaving here for easier correction if/when that gets fixed.
elif launch_template and instance_types:
pass
if expected_msg:
with pytest.raises(ClientError) as raised_exception:
client.create_nodegroup(**test_inputs)
assert_expected_exception(raised_exception, expected_exception, expected_msg)
def all_arn_values_should_be_valid(expected_arn_values, pattern, arn_under_test):
"""
Applies regex `pattern` to `arn_under_test` and asserts
that each group matches the provided expected value.
A list entry of None in the 'expected_arn_values' will
assert that the value exists but not match a specific value.
"""
findall = pattern.findall(arn_under_test)[0]
expected_values = deepcopy(expected_arn_values)
# findall() returns a list of matches from right to left so it must be reversed
# in order to match the logical order of the 'expected_arn_values' list.
for value in reversed(findall):
expected_value = expected_values.pop()
if expected_value:
value.should.be.within(expected_value)
else:
value.should.be.truthy
region_matches_partition(findall[1], findall[0]).should.be.true
def assert_expected_exception(raised_exception, expected_exception, expected_msg):
error = raised_exception.value.response[ErrorAttributes.ERROR]
error[ErrorAttributes.CODE].should.equal(expected_exception.TYPE)
error[ErrorAttributes.MESSAGE].should.equal(expected_msg)
def assert_result_matches_expected_list(result, expected_result, expected_len):
assert result == expected_result
assert len(result) == expected_len

View file

@ -0,0 +1,237 @@
"""
This file should only contain constants used for the EKS tests.
"""
import re
from enum import Enum
from boto3 import Session
from moto.eks import REGION as DEFAULT_REGION
DEFAULT_ENCODING = "utf-8"
DEFAULT_HTTP_HEADERS = {"Content-type": "application/json"}
FROZEN_TIME = "2013-11-27T01:42:00Z"
PARTITIONS = Session().get_available_partitions()
REGION = Session().region_name or DEFAULT_REGION
SERVICE = "eks"
SUBNET_IDS = ["subnet-12345ab", "subnet-67890cd"]
AMI_TYPE_KEY = "amiType"
AMI_TYPE_VALUE = "AL2_x86_64"
CLIENT_REQUEST_TOKEN_KEY = "clientRequestToken"
CLIENT_REQUEST_TOKEN_VALUE = "test_request_token"
DISK_SIZE_KEY = "diskSize"
DISK_SIZE_VALUE = 30
ENCRYPTION_CONFIG_KEY = "encryptionConfig"
ENCRYPTION_CONFIG_VALUE = [
{"resources": ["secrets"], "provider": {"keyArn": "arn:of:the:key"}}
]
INSTANCE_TYPES_KEY = "instanceTypes"
INSTANCE_TYPES_VALUE = ["t3.medium"]
KUBERNETES_NETWORK_CONFIG_KEY = "kubernetesNetworkConfig"
KUBERNETES_NETWORK_CONFIG_VALUE = {"serviceIpv4Cidr": "172.20.0.0/16"}
LABELS_KEY = "labels"
LABELS_VALUE = {"purpose": "example"}
LAUNCH_TEMPLATE_KEY = "launchTemplate"
LAUNCH_TEMPLATE_VALUE = {"name": "myTemplate", "version": "2", "id": "123456"}
LOGGING_KEY = "logging"
LOGGING_VALUE = {"clusterLogging": [{"types": ["api"], "enabled": True}]}
NODEROLE_ARN_KEY = "nodeRole"
NODEROLE_ARN_VALUE = "arn:aws:iam::123456789012:role/role_name"
REMOTE_ACCESS_KEY = "remoteAccess"
REMOTE_ACCESS_VALUE = {"ec2SshKey": "eksKeypair"}
RESOURCES_VPC_CONFIG_KEY = "resourcesVpcConfig"
RESOURCES_VPC_CONFIG_VALUE = {
"subnetIds": SUBNET_IDS,
"endpointPublicAccess": True,
"endpointPrivateAccess": False,
}
ROLE_ARN_KEY = "roleArn"
ROLE_ARN_VALUE = "arn:aws:iam::123456789012:role/role_name"
SCALING_CONFIG_KEY = "scalingConfig"
SCALING_CONFIG_VALUE = {"minSize": 2, "maxSize": 3, "desiredSize": 2}
STATUS_KEY = "status"
STATUS_VALUE = "ACTIVE"
SUBNETS_KEY = "subnets"
SUBNETS_VALUE = SUBNET_IDS
TAGS_KEY = "tags"
TAGS_VALUE = {"hello": "world"}
VERSION_KEY = "version"
VERSION_VALUE = "1"
AMI_TYPE = (AMI_TYPE_KEY, AMI_TYPE_VALUE)
CLIENT_REQUEST_TOKEN = (CLIENT_REQUEST_TOKEN_KEY, CLIENT_REQUEST_TOKEN_VALUE)
DISK_SIZE = (DISK_SIZE_KEY, DISK_SIZE_VALUE)
ENCRYPTION_CONFIG = (ENCRYPTION_CONFIG_KEY, ENCRYPTION_CONFIG_VALUE)
INSTANCE_TYPES = (INSTANCE_TYPES_KEY, INSTANCE_TYPES_VALUE)
KUBERNETES_NETWORK_CONFIG = (
KUBERNETES_NETWORK_CONFIG_KEY,
KUBERNETES_NETWORK_CONFIG_VALUE,
)
LABELS = (LABELS_KEY, LABELS_VALUE)
LAUNCH_TEMPLATE = (LAUNCH_TEMPLATE_KEY, LAUNCH_TEMPLATE_VALUE)
LOGGING = (LOGGING_KEY, LOGGING_VALUE)
NODEROLE_ARN = (NODEROLE_ARN_KEY, NODEROLE_ARN_VALUE)
REMOTE_ACCESS = (REMOTE_ACCESS_KEY, REMOTE_ACCESS_VALUE)
RESOURCES_VPC_CONFIG = (RESOURCES_VPC_CONFIG_KEY, RESOURCES_VPC_CONFIG_VALUE)
ROLE_ARN = (ROLE_ARN_KEY, ROLE_ARN_VALUE)
SCALING_CONFIG = (SCALING_CONFIG_KEY, SCALING_CONFIG_VALUE)
STATUS = (STATUS_KEY, STATUS_VALUE)
SUBNETS = (SUBNETS_KEY, SUBNETS_VALUE)
TAGS = (TAGS_KEY, TAGS_VALUE)
VERSION = (VERSION_KEY, VERSION_VALUE)
class ResponseAttributes:
CLUSTER = "cluster"
CLUSTERS = "clusters"
MESSAGE = "message"
NEXT_TOKEN = "nextToken"
NODEGROUP = "nodegroup"
NODEGROUPS = "nodegroups"
class ErrorAttributes:
CODE = "Code"
ERROR = "Error"
MESSAGE = "Message"
class ClusterInputs:
REQUIRED = [ROLE_ARN, RESOURCES_VPC_CONFIG]
OPTIONAL = [
CLIENT_REQUEST_TOKEN,
ENCRYPTION_CONFIG,
LOGGING,
KUBERNETES_NETWORK_CONFIG,
TAGS,
VERSION,
]
class NodegroupInputs:
REQUIRED = [NODEROLE_ARN, SUBNETS]
OPTIONAL = [
AMI_TYPE,
DISK_SIZE,
INSTANCE_TYPES,
LABELS,
REMOTE_ACCESS,
SCALING_CONFIG,
TAGS,
]
class PossibleTestResults(Enum):
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
class AddonAttributes:
ADDON_NAME = "addonName"
class ClusterAttributes:
ARN = "arn"
CLUSTER_NAME = "clusterName"
CREATED_AT = "createdAt"
ENDPOINT = "endpoint"
IDENTITY = "identity"
ISSUER = "issuer"
NAME = "name"
OIDC = "oidc"
class FargateAttributes:
PROFILE_NAME = "fargateProfileName"
class NodegroupAttributes:
ARN = "nodegroupArn"
AUTOSCALING_GROUPS = "autoScalingGroups"
CREATED_AT = "createdAt"
MODIFIED_AT = "modifiedAt"
NAME = "name"
NODEGROUP_NAME = "nodegroupName"
REMOTE_ACCESS_SG = "remoteAccessSecurityGroup"
RESOURCES = "resources"
class BatchCountSize:
SINGLE = 1
SMALL = 10
MEDIUM = 20
LARGE = 200
class PageCount:
SMALL = 3
LARGE = 10
NODEGROUP_UUID_PATTERN = "(?P<nodegroup_uuid>[-0-9a-z]{8}-[-0-9a-z]{4}-[-0-9a-z]{4}-[-0-9a-z]{4}-[-0-9a-z]{12})"
class RegExTemplates:
CLUSTER_ARN = re.compile(
"arn:"
+ "(?P<partition>.+):"
+ "eks:"
+ "(?P<region>[-0-9a-zA-Z]+):"
+ "(?P<account_id>[0-9]{12}):"
+ "cluster/"
+ "(?P<cluster_name>.+)"
)
ISO8601_FORMAT = re.compile(
r"^(-?(?:[1-9][0-9]*)?[0-9]{4})-(1[0-2]|0[1-9])-(3[01]|0[1-9]|[12][0-9])T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])(\.[0-9]+)?(Z|[+-](?:2[0-3]|[01][0-9]):[0-5][0-9])?$"
)
NODEGROUP_ARN = re.compile(
"arn:"
+ "(?P<partition>.+):"
+ "eks:"
+ "(?P<region>[-0-9a-zA-Z]+):"
+ "(?P<account_id>[0-9]{12}):"
+ "nodegroup/"
+ "(?P<cluster_name>.+)/"
+ "(?P<nodegroup_name>.+)/"
+ NODEGROUP_UUID_PATTERN
)
NODEGROUP_ASG_NAME_PATTERN = re.compile("eks-" + NODEGROUP_UUID_PATTERN)
NODEGROUP_SECURITY_GROUP_NAME_PATTERN = re.compile("sg-" + "([-0-9a-z]{17})")
class Endpoints:
CREATE_CLUSTER = "/clusters"
CREATE_NODEGROUP = "/clusters/{clusterName}/node-groups"
DESCRIBE_CLUSTER = "/clusters/{clusterName}"
DESCRIBE_NODEGROUP = "/clusters/{clusterName}/node-groups/{nodegroupName}"
DELETE_CLUSTER = "/clusters/{clusterName}"
DELETE_NODEGROUP = "/clusters/{clusterName}/node-groups/{nodegroupName}"
LIST_CLUSTERS = "/clusters?maxResults={maxResults}&nextToken={nextToken}"
LIST_NODEGROUPS = "/clusters/{clusterName}/node-groups?maxResults={maxResults}&nextToken={nextToken}"
class StatusCodes:
OK = 200
class HttpHeaders:
ErrorType = "x-amzn-ErrorType"

View file

@ -0,0 +1,110 @@
from copy import deepcopy
from random import randint
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from moto.utilities.utils import random_string
from tests.test_eks.test_eks_constants import (
ClusterAttributes,
ClusterInputs,
NodegroupAttributes,
NodegroupInputs,
ResponseAttributes,
STATUS,
)
generate_random_name = random_string
def attributes_to_test(inputs, name):
"""
Assembles the list of tuples which will be used to validate test results.
"""
result = deepcopy(inputs.REQUIRED + inputs.OPTIONAL + [STATUS])
if isinstance(inputs, ClusterInputs):
result += [(ClusterAttributes.NAME, name)]
elif isinstance(inputs, NodegroupInputs):
result += [(NodegroupAttributes.NODEGROUP_NAME, name)]
return result
def generate_clusters(client, num_clusters, minimal):
"""
Generates 'num_clusters' number of clusters with randomized data and adds them to the mocked backend.
If 'minimal' is True, only the required values are generated; if False all values are generated.
Returns a list of the names of the generated clusters.
"""
return [
client.create_cluster(
name=generate_random_name(), **_input_builder(ClusterInputs, minimal)
)[ResponseAttributes.CLUSTER][ClusterAttributes.NAME]
for _ in range(num_clusters)
]
def generate_nodegroups(client, cluster_name, num_nodegroups, minimal):
"""
Generates 'num_nodegroups' number of nodegroups with randomized data and adds them to the mocked backend.
If 'minimal' is True, only the required values are generated; if False, all values are generated.
Returns a list of the names of the generated nodegroups.
"""
return [
client.create_nodegroup(
nodegroupName=generate_random_name(),
clusterName=cluster_name,
**_input_builder(NodegroupInputs, minimal)
)[ResponseAttributes.NODEGROUP][NodegroupAttributes.NODEGROUP_NAME]
for _ in range(num_nodegroups)
]
def is_valid_uri(value):
"""
Returns true if a provided string has the form of a valid uri.
"""
result = urlparse(value)
return all([result.scheme, result.netloc, result.path])
def region_matches_partition(region, partition):
"""
Returns True if the provided region and partition are a valid pair.
"""
valid_matches = [
("cn-", "aws-cn"),
("us-gov-", "aws-us-gov"),
("us-gov-iso-", "aws-iso"),
("us-gov-iso-b-", "aws-iso-b"),
]
for prefix, expected_partition in valid_matches:
if region.startswith(prefix):
return partition == expected_partition
return partition == "aws"
def _input_builder(options, minimal):
"""
Assembles the inputs which will be used to generate test object into a dictionary.
"""
values = deepcopy(options.REQUIRED)
if not minimal:
values.extend(deepcopy(options.OPTIONAL))
return dict(values)
def random_names(name_list):
"""
Returns one value picked at random a list, and one value guaranteed not to be on the list.
"""
name_on_list = name_list[randint(0, len(name_list) - 1)]
name_not_on_list = generate_random_name()
while name_not_on_list in name_list:
name_not_on_list = generate_random_name()
return name_on_list, name_not_on_list

View file

@ -0,0 +1,524 @@
from __future__ import unicode_literals
import json
from copy import deepcopy
import pytest
import sure # noqa
import moto.server as server
from moto import mock_eks
from moto.core import ACCOUNT_ID
from moto.eks.exceptions import ResourceInUseException, ResourceNotFoundException
from moto.eks.models import (
CLUSTER_EXISTS_MSG,
CLUSTER_IN_USE_MSG,
CLUSTER_NOT_FOUND_MSG,
NODEGROUP_EXISTS_MSG,
NODEGROUP_NOT_FOUND_MSG,
)
from moto.eks.responses import DEFAULT_MAX_RESULTS, DEFAULT_NEXT_TOKEN
from tests.test_eks.test_eks import all_arn_values_should_be_valid
from tests.test_eks.test_eks_constants import (
AddonAttributes,
ClusterAttributes,
DEFAULT_ENCODING,
DEFAULT_HTTP_HEADERS,
DEFAULT_REGION,
Endpoints,
FargateAttributes,
HttpHeaders,
NodegroupAttributes,
NODEROLE_ARN_KEY,
NODEROLE_ARN_VALUE,
PARTITIONS,
RegExTemplates,
ResponseAttributes,
ROLE_ARN_KEY,
ROLE_ARN_VALUE,
SERVICE,
StatusCodes,
SUBNETS_KEY,
SUBNETS_VALUE,
)
"""
Test the different server responses
"""
NAME_LIST = ["foo", "bar", "baz", "qux"]
class TestCluster:
cluster_name = "example_cluster"
data = {ClusterAttributes.NAME: cluster_name, ROLE_ARN_KEY: ROLE_ARN_VALUE}
endpoint = Endpoints.CREATE_CLUSTER
expected_arn_values = [
PARTITIONS,
DEFAULT_REGION,
ACCOUNT_ID,
cluster_name,
]
class TestNodegroup:
cluster_name = TestCluster.cluster_name
nodegroup_name = "example_nodegroup"
data = {
ClusterAttributes.CLUSTER_NAME: cluster_name,
NodegroupAttributes.NODEGROUP_NAME: nodegroup_name,
NODEROLE_ARN_KEY: NODEROLE_ARN_VALUE,
SUBNETS_KEY: SUBNETS_VALUE,
}
endpoint = Endpoints.CREATE_NODEGROUP.format(clusterName=cluster_name)
expected_arn_values = [
PARTITIONS,
DEFAULT_REGION,
ACCOUNT_ID,
cluster_name,
nodegroup_name,
None,
]
@pytest.fixture(autouse=True)
def test_client():
backend = server.create_backend_app(SERVICE)
yield backend.test_client()
@pytest.fixture(scope="function")
def create_cluster(test_client):
def create_and_verify_cluster(client, name):
"""Creates one valid cluster and verifies return status code 200."""
data = deepcopy(TestCluster.data)
data.update(name=name)
response = client.post(
TestCluster.endpoint, data=json.dumps(data), headers=DEFAULT_HTTP_HEADERS,
)
response.status_code.should.equal(StatusCodes.OK)
return json.loads(response.data.decode(DEFAULT_ENCODING))[
ResponseAttributes.CLUSTER
]
def _execute(name=TestCluster.cluster_name):
return create_and_verify_cluster(test_client, name=name)
yield _execute
@pytest.fixture(scope="function", autouse=True)
def create_nodegroup(test_client):
def create_and_verify_nodegroup(client, name):
"""Creates one valid nodegroup and verifies return status code 200."""
data = deepcopy(TestNodegroup.data)
data.update(nodegroupName=name)
response = client.post(
TestNodegroup.endpoint, data=json.dumps(data), headers=DEFAULT_HTTP_HEADERS,
)
response.status_code.should.equal(StatusCodes.OK)
return json.loads(response.data.decode(DEFAULT_ENCODING))[
ResponseAttributes.NODEGROUP
]
def _execute(name=TestNodegroup.nodegroup_name):
return create_and_verify_nodegroup(test_client, name=name)
yield _execute
@mock_eks
def test_eks_create_single_cluster(create_cluster):
result_cluster = create_cluster()
result_cluster[ClusterAttributes.NAME].should.equal(TestCluster.cluster_name)
all_arn_values_should_be_valid(
expected_arn_values=TestCluster.expected_arn_values,
pattern=RegExTemplates.CLUSTER_ARN,
arn_under_test=result_cluster[ClusterAttributes.ARN],
)
@mock_eks
def test_eks_create_multiple_clusters_with_same_name(test_client, create_cluster):
create_cluster()
expected_exception = ResourceInUseException
expected_msg = CLUSTER_EXISTS_MSG.format(clusterName=TestCluster.cluster_name)
expected_data = {
ClusterAttributes.CLUSTER_NAME: TestCluster.cluster_name,
NodegroupAttributes.NODEGROUP_NAME: None,
AddonAttributes.ADDON_NAME: None,
ResponseAttributes.MESSAGE: expected_msg,
}
response = test_client.post(
TestCluster.endpoint,
data=json.dumps(TestCluster.data),
headers=DEFAULT_HTTP_HEADERS,
)
should_return_expected_exception(response, expected_exception, expected_data)
@mock_eks
def test_eks_create_nodegroup_without_cluster(test_client):
expected_exception = ResourceNotFoundException
expected_msg = CLUSTER_NOT_FOUND_MSG.format(clusterName=TestCluster.cluster_name)
expected_data = {
ClusterAttributes.CLUSTER_NAME: None,
NodegroupAttributes.NODEGROUP_NAME: None,
FargateAttributes.PROFILE_NAME: None,
AddonAttributes.ADDON_NAME: None,
ResponseAttributes.MESSAGE: expected_msg,
}
endpoint = Endpoints.CREATE_NODEGROUP.format(clusterName=TestCluster.cluster_name)
response = test_client.post(
endpoint, data=json.dumps(TestNodegroup.data), headers=DEFAULT_HTTP_HEADERS
)
should_return_expected_exception(response, expected_exception, expected_data)
@mock_eks
def test_eks_create_nodegroup_on_existing_cluster(create_cluster, create_nodegroup):
create_cluster()
result_data = create_nodegroup()
result_data[NodegroupAttributes.NODEGROUP_NAME].should.equal(
TestNodegroup.nodegroup_name
)
all_arn_values_should_be_valid(
expected_arn_values=TestNodegroup.expected_arn_values,
pattern=RegExTemplates.NODEGROUP_ARN,
arn_under_test=result_data[NodegroupAttributes.ARN],
)
@mock_eks
def test_eks_create_multiple_nodegroups_with_same_name(
test_client, create_cluster, create_nodegroup
):
create_cluster()
create_nodegroup()
expected_exception = ResourceInUseException
expected_msg = NODEGROUP_EXISTS_MSG.format(
clusterName=TestNodegroup.cluster_name,
nodegroupName=TestNodegroup.nodegroup_name,
)
expected_data = {
ClusterAttributes.CLUSTER_NAME: TestNodegroup.cluster_name,
NodegroupAttributes.NODEGROUP_NAME: TestNodegroup.nodegroup_name,
AddonAttributes.ADDON_NAME: None,
ResponseAttributes.MESSAGE: expected_msg,
}
response = test_client.post(
TestNodegroup.endpoint,
data=json.dumps(TestNodegroup.data),
headers=DEFAULT_HTTP_HEADERS,
)
should_return_expected_exception(response, expected_exception, expected_data)
@mock_eks
def test_eks_list_clusters(test_client, create_cluster):
[create_cluster(name) for name in NAME_LIST]
response = test_client.get(
Endpoints.LIST_CLUSTERS.format(
maxResults=DEFAULT_MAX_RESULTS, nextToken=DEFAULT_NEXT_TOKEN
)
)
result_data = json.loads(response.data.decode(DEFAULT_ENCODING))[
ResponseAttributes.CLUSTERS
]
response.status_code.should.equal(StatusCodes.OK)
len(result_data).should.equal(len(NAME_LIST))
sorted(result_data).should.equal(sorted(NAME_LIST))
@mock_eks
def test_eks_list_nodegroups(test_client, create_cluster, create_nodegroup):
create_cluster()
[create_nodegroup(name) for name in NAME_LIST]
response = test_client.get(
Endpoints.LIST_NODEGROUPS.format(
clusterName=TestCluster.cluster_name,
maxResults=DEFAULT_MAX_RESULTS,
nextToken=DEFAULT_NEXT_TOKEN,
)
)
result_data = json.loads(response.data.decode(DEFAULT_ENCODING))[
ResponseAttributes.NODEGROUPS
]
response.status_code.should.equal(StatusCodes.OK)
sorted(result_data).should.equal(sorted(NAME_LIST))
len(result_data).should.equal(len(NAME_LIST))
@mock_eks
def test_eks_describe_existing_cluster(test_client, create_cluster):
create_cluster()
response = test_client.get(
Endpoints.DESCRIBE_CLUSTER.format(clusterName=TestCluster.cluster_name)
)
result_data = json.loads(response.data.decode(DEFAULT_ENCODING))[
ResponseAttributes.CLUSTER
]
response.status_code.should.equal(StatusCodes.OK)
result_data[ClusterAttributes.NAME].should.equal(TestCluster.cluster_name)
all_arn_values_should_be_valid(
expected_arn_values=TestCluster.expected_arn_values,
pattern=RegExTemplates.CLUSTER_ARN,
arn_under_test=result_data[ClusterAttributes.ARN],
)
@mock_eks
def test_eks_describe_nonexisting_cluster(test_client):
expected_exception = ResourceNotFoundException
expected_msg = CLUSTER_NOT_FOUND_MSG.format(clusterName=TestCluster.cluster_name)
expected_data = {
ClusterAttributes.CLUSTER_NAME: None,
NodegroupAttributes.NODEGROUP_NAME: None,
FargateAttributes.PROFILE_NAME: None,
AddonAttributes.ADDON_NAME: None,
ResponseAttributes.MESSAGE: expected_msg,
}
response = test_client.get(
Endpoints.DESCRIBE_CLUSTER.format(clusterName=TestCluster.cluster_name)
)
should_return_expected_exception(response, expected_exception, expected_data)
@mock_eks
def test_eks_describe_existing_nodegroup(test_client, create_cluster, create_nodegroup):
create_cluster()
create_nodegroup()
response = test_client.get(
Endpoints.DESCRIBE_NODEGROUP.format(
clusterName=TestNodegroup.cluster_name,
nodegroupName=TestNodegroup.nodegroup_name,
)
)
result_data = json.loads(response.data.decode(DEFAULT_ENCODING))[
ResponseAttributes.NODEGROUP
]
response.status_code.should.equal(StatusCodes.OK)
result_data[ClusterAttributes.CLUSTER_NAME].should.equal(TestNodegroup.cluster_name)
result_data[NodegroupAttributes.NODEGROUP_NAME].should.equal(
TestNodegroup.nodegroup_name
)
all_arn_values_should_be_valid(
expected_arn_values=TestNodegroup.expected_arn_values,
pattern=RegExTemplates.NODEGROUP_ARN,
arn_under_test=result_data[NodegroupAttributes.ARN],
)
@mock_eks
def test_eks_describe_nonexisting_nodegroup(test_client, create_cluster):
create_cluster()
expected_exception = ResourceNotFoundException
expected_msg = NODEGROUP_NOT_FOUND_MSG.format(
clusterName=TestNodegroup.cluster_name,
nodegroupName=TestNodegroup.nodegroup_name,
)
expected_data = {
ClusterAttributes.CLUSTER_NAME: TestNodegroup.cluster_name,
NodegroupAttributes.NODEGROUP_NAME: TestNodegroup.nodegroup_name,
FargateAttributes.PROFILE_NAME: None,
AddonAttributes.ADDON_NAME: None,
ResponseAttributes.MESSAGE: expected_msg,
}
response = test_client.get(
Endpoints.DESCRIBE_NODEGROUP.format(
clusterName=TestCluster.cluster_name,
nodegroupName=TestNodegroup.nodegroup_name,
)
)
should_return_expected_exception(response, expected_exception, expected_data)
@mock_eks
def test_eks_describe_nodegroup_nonexisting_cluster(test_client):
expected_exception = ResourceNotFoundException
expected_msg = CLUSTER_NOT_FOUND_MSG.format(clusterName=TestNodegroup.cluster_name)
expected_data = {
ClusterAttributes.CLUSTER_NAME: TestNodegroup.cluster_name,
NodegroupAttributes.NODEGROUP_NAME: TestNodegroup.nodegroup_name,
FargateAttributes.PROFILE_NAME: None,
AddonAttributes.ADDON_NAME: None,
ResponseAttributes.MESSAGE: expected_msg,
}
response = test_client.get(
Endpoints.DESCRIBE_NODEGROUP.format(
clusterName=TestCluster.cluster_name,
nodegroupName=TestNodegroup.nodegroup_name,
)
)
should_return_expected_exception(response, expected_exception, expected_data)
@mock_eks
def test_eks_delete_cluster(test_client, create_cluster):
create_cluster()
response = test_client.delete(
Endpoints.DELETE_CLUSTER.format(clusterName=TestCluster.cluster_name)
)
result_data = json.loads(response.data.decode(DEFAULT_ENCODING))[
ResponseAttributes.CLUSTER
]
response.status_code.should.equal(StatusCodes.OK)
result_data[ClusterAttributes.NAME].should.equal(TestCluster.cluster_name)
all_arn_values_should_be_valid(
expected_arn_values=TestCluster.expected_arn_values,
pattern=RegExTemplates.CLUSTER_ARN,
arn_under_test=result_data[ClusterAttributes.ARN],
)
@mock_eks
def test_eks_delete_nonexisting_cluster(test_client):
expected_exception = ResourceNotFoundException
expected_msg = CLUSTER_NOT_FOUND_MSG.format(clusterName=TestCluster.cluster_name)
expected_data = {
ClusterAttributes.CLUSTER_NAME: None,
NodegroupAttributes.NODEGROUP_NAME: None,
FargateAttributes.PROFILE_NAME: None,
AddonAttributes.ADDON_NAME: None,
ResponseAttributes.MESSAGE: expected_msg,
}
response = test_client.delete(
Endpoints.DELETE_CLUSTER.format(clusterName=TestCluster.cluster_name)
)
should_return_expected_exception(response, expected_exception, expected_data)
@mock_eks
def test_eks_delete_cluster_with_nodegroups(
test_client, create_cluster, create_nodegroup
):
create_cluster()
create_nodegroup()
expected_exception = ResourceInUseException
expected_msg = CLUSTER_IN_USE_MSG.format(clusterName=TestCluster.cluster_name)
expected_data = {
ClusterAttributes.CLUSTER_NAME: TestCluster.cluster_name,
NodegroupAttributes.NODEGROUP_NAME: TestNodegroup.nodegroup_name,
AddonAttributes.ADDON_NAME: None,
ResponseAttributes.MESSAGE: expected_msg,
}
response = test_client.delete(
Endpoints.DELETE_CLUSTER.format(clusterName=TestCluster.cluster_name)
)
should_return_expected_exception(response, expected_exception, expected_data)
@mock_eks
def test_eks_delete_nodegroup(test_client, create_cluster, create_nodegroup):
create_cluster()
create_nodegroup()
response = test_client.delete(
Endpoints.DELETE_NODEGROUP.format(
clusterName=TestNodegroup.cluster_name,
nodegroupName=TestNodegroup.nodegroup_name,
)
)
result_data = json.loads(response.data.decode(DEFAULT_ENCODING))[
ResponseAttributes.NODEGROUP
]
response.status_code.should.equal(StatusCodes.OK)
result_data[ClusterAttributes.CLUSTER_NAME].should.equal(TestNodegroup.cluster_name)
result_data[NodegroupAttributes.NODEGROUP_NAME].should.equal(
TestNodegroup.nodegroup_name
)
all_arn_values_should_be_valid(
expected_arn_values=TestNodegroup.expected_arn_values,
pattern=RegExTemplates.NODEGROUP_ARN,
arn_under_test=result_data[NodegroupAttributes.ARN],
)
@mock_eks
def test_eks_delete_nonexisting_nodegroup(test_client, create_cluster):
create_cluster()
expected_exception = ResourceNotFoundException
expected_msg = NODEGROUP_NOT_FOUND_MSG.format(
clusterName=TestNodegroup.cluster_name,
nodegroupName=TestNodegroup.nodegroup_name,
)
expected_data = {
ClusterAttributes.CLUSTER_NAME: TestNodegroup.cluster_name,
NodegroupAttributes.NODEGROUP_NAME: TestNodegroup.nodegroup_name,
FargateAttributes.PROFILE_NAME: None,
AddonAttributes.ADDON_NAME: None,
ResponseAttributes.MESSAGE: expected_msg,
}
response = test_client.delete(
Endpoints.DELETE_NODEGROUP.format(
clusterName=TestNodegroup.cluster_name,
nodegroupName=TestNodegroup.nodegroup_name,
)
)
should_return_expected_exception(response, expected_exception, expected_data)
@mock_eks
def test_eks_delete_nodegroup_nonexisting_cluster(test_client):
expected_exception = ResourceNotFoundException
expected_msg = CLUSTER_NOT_FOUND_MSG.format(
clusterName=TestNodegroup.cluster_name,
nodegroupName=TestNodegroup.nodegroup_name,
)
expected_data = {
ClusterAttributes.CLUSTER_NAME: None,
NodegroupAttributes.NODEGROUP_NAME: None,
FargateAttributes.PROFILE_NAME: None,
AddonAttributes.ADDON_NAME: None,
ResponseAttributes.MESSAGE: expected_msg,
}
response = test_client.delete(
Endpoints.DELETE_NODEGROUP.format(
clusterName=TestNodegroup.cluster_name,
nodegroupName=TestNodegroup.nodegroup_name,
)
)
should_return_expected_exception(response, expected_exception, expected_data)
def should_return_expected_exception(response, expected_exception, expected_data):
result_data = json.loads(response.data.decode(DEFAULT_ENCODING))
response.status_code.should.equal(expected_exception.STATUS)
response.headers.get(HttpHeaders.ErrorType).should.equal(expected_exception.TYPE)
result_data.should.equal(expected_data)