Added redshift.get_cluster_credentials (#3611)
* Added redshift.get_cluster_credentials * Marked endpoint in list * Removed f string from tests * Python 2.7 compat changes * Fixed parameter retrieval * Formatting * Removed try/catch in favor of if * Changed to existing random_string util Co-authored-by: Andrea Amorosi <aamorosi@amazon.es>
This commit is contained in:
parent
d7f218bfec
commit
5a41866f71
4 changed files with 148 additions and 2 deletions
|
|
@ -1,5 +1,6 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import time
|
||||
import datetime
|
||||
|
||||
import boto
|
||||
|
|
@ -1487,3 +1488,107 @@ def test_resize_cluster():
|
|||
)
|
||||
ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue")
|
||||
ex.value.response["Error"]["Message"].should.contain("Invalid cluster type")
|
||||
|
||||
|
||||
@mock_redshift
|
||||
def test_get_cluster_credentials_non_existent_cluster():
|
||||
client = boto3.client("redshift", region_name="us-east-1")
|
||||
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.get_cluster_credentials(ClusterIdentifier="non-existent")
|
||||
ex.value.response["Error"]["Code"].should.equal("ClusterNotFound")
|
||||
ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.")
|
||||
|
||||
|
||||
@mock_redshift
|
||||
def test_get_cluster_credentials_non_existent_cluster():
|
||||
client = boto3.client("redshift", region_name="us-east-1")
|
||||
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.get_cluster_credentials(
|
||||
ClusterIdentifier="non-existent", DbUser="some_user"
|
||||
)
|
||||
ex.value.response["Error"]["Code"].should.equal("ClusterNotFound")
|
||||
ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.")
|
||||
|
||||
|
||||
@mock_redshift
|
||||
def test_get_cluster_credentials_invalid_duration():
|
||||
client = boto3.client("redshift", region_name="us-east-1")
|
||||
|
||||
cluster_identifier = "my_cluster"
|
||||
client.create_cluster(
|
||||
ClusterIdentifier=cluster_identifier,
|
||||
ClusterType="single-node",
|
||||
DBName="test",
|
||||
MasterUsername="user",
|
||||
MasterUserPassword="password",
|
||||
NodeType="ds2.xlarge",
|
||||
)
|
||||
|
||||
db_user = "some_user"
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.get_cluster_credentials(
|
||||
ClusterIdentifier=cluster_identifier, DbUser=db_user, DurationSeconds=899
|
||||
)
|
||||
ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue")
|
||||
ex.value.response["Error"]["Message"].should.contain(
|
||||
"Token duration must be between 900 and 3600 seconds"
|
||||
)
|
||||
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.get_cluster_credentials(
|
||||
ClusterIdentifier=cluster_identifier, DbUser=db_user, DurationSeconds=3601
|
||||
)
|
||||
ex.value.response["Error"]["Code"].should.equal("InvalidParameterValue")
|
||||
ex.value.response["Error"]["Message"].should.contain(
|
||||
"Token duration must be between 900 and 3600 seconds"
|
||||
)
|
||||
|
||||
|
||||
@mock_redshift
|
||||
def test_get_cluster_credentials():
|
||||
client = boto3.client("redshift", region_name="us-east-1")
|
||||
|
||||
cluster_identifier = "my_cluster"
|
||||
client.create_cluster(
|
||||
ClusterIdentifier=cluster_identifier,
|
||||
ClusterType="single-node",
|
||||
DBName="test",
|
||||
MasterUsername="user",
|
||||
MasterUserPassword="password",
|
||||
NodeType="ds2.xlarge",
|
||||
)
|
||||
|
||||
expected_expiration = time.mktime(
|
||||
(datetime.datetime.now() + datetime.timedelta(0, 900)).timetuple()
|
||||
)
|
||||
db_user = "some_user"
|
||||
response = client.get_cluster_credentials(
|
||||
ClusterIdentifier=cluster_identifier, DbUser=db_user,
|
||||
)
|
||||
response["DbUser"].should.equal("IAM:%s" % db_user)
|
||||
assert time.mktime((response["Expiration"]).timetuple()) == pytest.approx(
|
||||
expected_expiration
|
||||
)
|
||||
response["DbPassword"].should.have.length_of(32)
|
||||
|
||||
response = client.get_cluster_credentials(
|
||||
ClusterIdentifier=cluster_identifier, DbUser=db_user, AutoCreate=True
|
||||
)
|
||||
response["DbUser"].should.equal("IAMA:%s" % db_user)
|
||||
|
||||
response = client.get_cluster_credentials(
|
||||
ClusterIdentifier=cluster_identifier, DbUser="some_other_user", AutoCreate=False
|
||||
)
|
||||
response["DbUser"].should.equal("IAM:%s" % "some_other_user")
|
||||
|
||||
expected_expiration = time.mktime(
|
||||
(datetime.datetime.now() + datetime.timedelta(0, 3000)).timetuple()
|
||||
)
|
||||
response = client.get_cluster_credentials(
|
||||
ClusterIdentifier=cluster_identifier, DbUser=db_user, DurationSeconds=3000,
|
||||
)
|
||||
assert time.mktime(response["Expiration"].timetuple()) == pytest.approx(
|
||||
expected_expiration
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue