Merge pull request #2894 from microe/add_sts_assume_role_with_saml

Add the STS call assume_role_with_saml
This commit is contained in:
Steve Pulec 2020-04-26 15:25:38 -05:00 committed by GitHub
commit 60bcb46729
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 207 additions and 0 deletions

View file

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import argparse
import io
import json
import re
import sys
@ -29,6 +30,7 @@ UNSIGNED_REQUESTS = {
"AWSCognitoIdentityService": ("cognito-identity", "us-east-1"),
"AWSCognitoIdentityProviderService": ("cognito-idp", "us-east-1"),
}
UNSIGNED_ACTIONS = {"AssumeRoleWithSAML": ("sts", "us-east-1")}
class DomainDispatcherApplication(object):
@ -77,9 +79,13 @@ class DomainDispatcherApplication(object):
else:
# Unsigned request
target = environ.get("HTTP_X_AMZ_TARGET")
action = self.get_action_from_body(environ)
if target:
service, _ = target.split(".", 1)
service, region = UNSIGNED_REQUESTS.get(service, DEFAULT_SERVICE_REGION)
elif action and action in UNSIGNED_ACTIONS:
# See if we can match the Action to a known service
service, region = UNSIGNED_ACTIONS.get(action)
else:
# S3 is the last resort when the target is also unknown
service, region = DEFAULT_SERVICE_REGION
@ -130,6 +136,26 @@ class DomainDispatcherApplication(object):
self.app_instances[backend] = app
return app
def get_action_from_body(self, environ):
body = None
try:
# AWS requests use querystrings as the body (Action=x&Data=y&...)
simple_form = environ["CONTENT_TYPE"].startswith(
"application/x-www-form-urlencoded"
)
request_body_size = int(environ["CONTENT_LENGTH"])
if simple_form and request_body_size:
body = environ["wsgi.input"].read(request_body_size).decode("utf-8")
body_dict = dict(x.split("=") for x in body.split("&"))
return body_dict["Action"]
except (KeyError, ValueError):
pass
finally:
if body:
# We've consumed the body = need to reset it
environ["wsgi.input"] = io.StringIO(body)
return None
def __call__(self, environ, start_response):
backend_app = self.get_application(environ)
return backend_app(environ, start_response)

View file

@ -1,5 +1,7 @@
from __future__ import unicode_literals
from base64 import b64decode
import datetime
import xmltodict
from moto.core import BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_with_milliseconds
from moto.core import ACCOUNT_ID
@ -79,5 +81,24 @@ class STSBackend(BaseBackend):
def assume_role_with_web_identity(self, **kwargs):
return self.assume_role(**kwargs)
def assume_role_with_saml(self, **kwargs):
del kwargs["principal_arn"]
saml_assertion_encoded = kwargs.pop("saml_assertion")
saml_assertion_decoded = b64decode(saml_assertion_encoded)
saml_assertion = xmltodict.parse(saml_assertion_decoded.decode("utf-8"))
kwargs["duration"] = int(
saml_assertion["samlp:Response"]["Assertion"]["AttributeStatement"][
"Attribute"
][2]["AttributeValue"]
)
kwargs["role_session_name"] = saml_assertion["samlp:Response"]["Assertion"][
"AttributeStatement"
]["Attribute"][0]["AttributeValue"]
kwargs["external_id"] = None
kwargs["policy"] = None
role = AssumedRole(**kwargs)
self.assumed_roles.append(role)
return role
sts_backend = STSBackend()

View file

@ -71,6 +71,19 @@ class TokenResponse(BaseResponse):
template = self.response_template(ASSUME_ROLE_WITH_WEB_IDENTITY_RESPONSE)
return template.render(role=role)
def assume_role_with_saml(self):
role_arn = self.querystring.get("RoleArn")[0]
principal_arn = self.querystring.get("PrincipalArn")[0]
saml_assertion = self.querystring.get("SAMLAssertion")[0]
role = sts_backend.assume_role_with_saml(
role_arn=role_arn,
principal_arn=principal_arn,
saml_assertion=saml_assertion,
)
template = self.response_template(ASSUME_ROLE_WITH_SAML_RESPONSE)
return template.render(role=role)
def get_caller_identity(self):
template = self.response_template(GET_CALLER_IDENTITY_RESPONSE)
@ -168,6 +181,30 @@ ASSUME_ROLE_WITH_WEB_IDENTITY_RESPONSE = """<AssumeRoleWithWebIdentityResponse x
</AssumeRoleWithWebIdentityResponse>"""
ASSUME_ROLE_WITH_SAML_RESPONSE = """<AssumeRoleWithSAMLResponse xmlns="https://sts.amazonaws.com/doc/2011-06-15/">
<AssumeRoleWithSAMLResult>
<Audience>https://signin.aws.amazon.com/saml</Audience>
<AssumedRoleUser>
<AssumedRoleId>{{ role.user_id }}</AssumedRoleId>
<Arn>{{ role.arn }}</Arn>
</AssumedRoleUser>
<Credentials>
<AccessKeyId>{{ role.access_key_id }}</AccessKeyId>
<SecretAccessKey>{{ role.secret_access_key }}</SecretAccessKey>
<SessionToken>{{ role.session_token }}</SessionToken>
<Expiration>{{ role.expiration_ISO8601 }}</Expiration>
</Credentials>
<Subject>{{ role.user_id }}</Subject>
<NameQualifier>B64EncodedStringOfHashOfIssuerAccountIdAndUserId=</NameQualifier>
<SubjectType>persistent</SubjectType>
<Issuer>http://localhost:3000/</Issuer>
</AssumeRoleWithSAMLResult>
<ResponseMetadata>
<RequestId>c6104cbe-af31-11e0-8154-cbc7ccf896c7</RequestId>
</ResponseMetadata>
</AssumeRoleWithSAMLResponse>"""
GET_CALLER_IDENTITY_RESPONSE = """<GetCallerIdentityResponse xmlns="https://sts.amazonaws.com/doc/2011-06-15/">
<GetCallerIdentityResult>
<Arn>{{ arn }}</Arn>