Organizations - implement Delegated Administrator functionality (#3200)

* Add organizations.register_delegated_administrator

* Add organizations.list_delegated_administrators

* Add organizations.list_delegated_services_for_account

* Add organizations.deregister_delegated_administrator

* Fix Python2 incompatibility
This commit is contained in:
Anton Grübel 2020-07-31 17:32:57 +02:00 committed by GitHub
commit 8162947ebb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 619 additions and 58 deletions

View file

@ -2,6 +2,54 @@ from __future__ import unicode_literals
from moto.core.exceptions import JsonRESTError
class AccountAlreadyRegisteredException(JsonRESTError):
code = 400
def __init__(self):
super(AccountAlreadyRegisteredException, self).__init__(
"AccountAlreadyRegisteredException",
"The provided account is already a delegated administrator for your organization.",
)
class AccountNotRegisteredException(JsonRESTError):
code = 400
def __init__(self):
super(AccountNotRegisteredException, self).__init__(
"AccountNotRegisteredException",
"The provided account is not a registered delegated administrator for your organization.",
)
class AccountNotFoundException(JsonRESTError):
code = 400
def __init__(self):
super(AccountNotFoundException, self).__init__(
"AccountNotFoundException", "You specified an account that doesn't exist."
)
class AWSOrganizationsNotInUseException(JsonRESTError):
code = 400
def __init__(self):
super(AWSOrganizationsNotInUseException, self).__init__(
"AWSOrganizationsNotInUseException",
"Your account is not a member of an organization.",
)
class ConstraintViolationException(JsonRESTError):
code = 400
def __init__(self, message):
super(ConstraintViolationException, self).__init__(
"ConstraintViolationException", message
)
class InvalidInputException(JsonRESTError):
code = 400

View file

@ -4,7 +4,7 @@ import datetime
import re
import json
from moto.core import BaseBackend, BaseModel
from moto.core import BaseBackend, BaseModel, ACCOUNT_ID
from moto.core.exceptions import RESTError
from moto.core.utils import unix_time
from moto.organizations import utils
@ -12,6 +12,11 @@ from moto.organizations.exceptions import (
InvalidInputException,
DuplicateOrganizationalUnitException,
DuplicatePolicyException,
AccountNotFoundException,
ConstraintViolationException,
AccountAlreadyRegisteredException,
AWSOrganizationsNotInUseException,
AccountNotRegisteredException,
)
@ -85,15 +90,13 @@ class FakeAccount(BaseModel):
def describe(self):
return {
"Account": {
"Id": self.id,
"Arn": self.arn,
"Email": self.email,
"Name": self.name,
"Status": self.status,
"JoinedMethod": self.joined_method,
"JoinedTimestamp": unix_time(self.create_time),
}
"Id": self.id,
"Arn": self.arn,
"Email": self.email,
"Name": self.name,
"Status": self.status,
"JoinedMethod": self.joined_method,
"JoinedTimestamp": unix_time(self.create_time),
}
@ -221,6 +224,56 @@ class FakeServiceAccess(BaseModel):
return service_principal in FakeServiceAccess.TRUSTED_SERVICES
class FakeDelegatedAdministrator(BaseModel):
# List of services, which support a different Account to ba a delegated administrator
# https://docs.aws.amazon.com/organizations/latest/userguide/orgs_integrated-services-list.html
SUPPORTED_SERVICES = [
"config-multiaccountsetup.amazonaws.com",
"guardduty.amazonaws.com",
"access-analyzer.amazonaws.com",
"macie.amazonaws.com",
"servicecatalog.amazonaws.com",
"ssm.amazonaws.com",
]
def __init__(self, account):
self.account = account
self.enabled_date = datetime.datetime.utcnow()
self.services = {}
def add_service_principal(self, service_principal):
if service_principal in self.services:
raise AccountAlreadyRegisteredException
if not self.supported_service(service_principal):
raise InvalidInputException(
"You specified an unrecognized service principal."
)
self.services[service_principal] = {
"ServicePrincipal": service_principal,
"DelegationEnabledDate": unix_time(datetime.datetime.utcnow()),
}
def remove_service_principal(self, service_principal):
if service_principal not in self.services:
raise InvalidInputException(
"You specified an unrecognized service principal."
)
self.services.pop(service_principal)
def describe(self):
admin = self.account.describe()
admin["DelegationEnabledDate"] = unix_time(self.enabled_date)
return admin
@staticmethod
def supported_service(service_principal):
return service_principal in FakeDelegatedAdministrator.SUPPORTED_SERVICES
class OrganizationsBackend(BaseBackend):
def __init__(self):
self.org = None
@ -228,6 +281,7 @@ class OrganizationsBackend(BaseBackend):
self.ou = []
self.policies = []
self.services = []
self.admins = []
def create_organization(self, **kwargs):
self.org = FakeOrganization(kwargs["FeatureSet"])
@ -259,10 +313,7 @@ class OrganizationsBackend(BaseBackend):
def describe_organization(self):
if not self.org:
raise RESTError(
"AWSOrganizationsNotInUseException",
"Your account is not a member of an organization.",
)
raise AWSOrganizationsNotInUseException
return self.org.describe()
def list_roots(self):
@ -325,10 +376,7 @@ class OrganizationsBackend(BaseBackend):
(account for account in self.accounts if account.id == account_id), None
)
if account is None:
raise RESTError(
"AccountNotFoundException",
"You specified an account that doesn't exist.",
)
raise AccountNotFoundException
return account
def get_account_by_attr(self, attr, value):
@ -341,15 +389,12 @@ class OrganizationsBackend(BaseBackend):
None,
)
if account is None:
raise RESTError(
"AccountNotFoundException",
"You specified an account that doesn't exist.",
)
raise AccountNotFoundException
return account
def describe_account(self, **kwargs):
account = self.get_account_by_id(kwargs["AccountId"])
return account.describe()
return dict(Account=account.describe())
def describe_create_account_status(self, **kwargs):
account = self.get_account_by_attr(
@ -358,15 +403,13 @@ class OrganizationsBackend(BaseBackend):
return account.create_account_status
def list_accounts(self):
return dict(
Accounts=[account.describe()["Account"] for account in self.accounts]
)
return dict(Accounts=[account.describe() for account in self.accounts])
def list_accounts_for_parent(self, **kwargs):
parent_id = self.validate_parent_id(kwargs["ParentId"])
return dict(
Accounts=[
account.describe()["Account"]
account.describe()
for account in self.accounts
if account.parent_id == parent_id
]
@ -399,7 +442,7 @@ class OrganizationsBackend(BaseBackend):
elif kwargs["ChildType"] == "ORGANIZATIONAL_UNIT":
obj_list = self.ou
else:
raise RESTError("InvalidInputException", "You specified an invalid value.")
raise InvalidInputException("You specified an invalid value.")
return dict(
Children=[
{"Id": obj.id, "Type": kwargs["ChildType"]}
@ -427,7 +470,7 @@ class OrganizationsBackend(BaseBackend):
"You specified a policy that doesn't exist.",
)
else:
raise RESTError("InvalidInputException", "You specified an invalid value.")
raise InvalidInputException("You specified an invalid value.")
return policy.describe()
def get_policy_by_id(self, policy_id):
@ -472,12 +515,9 @@ class OrganizationsBackend(BaseBackend):
account.attached_policies.append(policy)
policy.attachments.append(account)
else:
raise RESTError(
"AccountNotFoundException",
"You specified an account that doesn't exist.",
)
raise AccountNotFoundException
else:
raise RESTError("InvalidInputException", "You specified an invalid value.")
raise InvalidInputException("You specified an invalid value.")
def list_policies(self, **kwargs):
return dict(
@ -510,12 +550,9 @@ class OrganizationsBackend(BaseBackend):
elif re.compile(utils.ACCOUNT_ID_REGEX).match(kwargs["TargetId"]):
obj = next((a for a in self.accounts if a.id == kwargs["TargetId"]), None)
if obj is None:
raise RESTError(
"AccountNotFoundException",
"You specified an account that doesn't exist.",
)
raise AccountNotFoundException
else:
raise RESTError("InvalidInputException", "You specified an invalid value.")
raise InvalidInputException("You specified an invalid value.")
return dict(
Policies=[
p.describe()["Policy"]["PolicySummary"] for p in obj.attached_policies
@ -533,7 +570,7 @@ class OrganizationsBackend(BaseBackend):
"You specified a policy that doesn't exist.",
)
else:
raise RESTError("InvalidInputException", "You specified an invalid value.")
raise InvalidInputException("You specified an invalid value.")
objects = [
{"TargetId": obj.id, "Arn": obj.arn, "Name": obj.name, "Type": obj.type}
for obj in policy.attachments
@ -606,5 +643,95 @@ class OrganizationsBackend(BaseBackend):
if service_principal:
self.services.remove(service_principal)
def register_delegated_administrator(self, **kwargs):
account_id = kwargs["AccountId"]
if account_id == ACCOUNT_ID:
raise ConstraintViolationException(
"You cannot register master account/yourself as delegated administrator for your organization."
)
account = self.get_account_by_id(account_id)
admin = next(
(admin for admin in self.admins if admin.account.id == account_id), None
)
if admin is None:
admin = FakeDelegatedAdministrator(account)
self.admins.append(admin)
admin.add_service_principal(kwargs["ServicePrincipal"])
def list_delegated_administrators(self, **kwargs):
admins = self.admins
service = kwargs.get("ServicePrincipal")
if service:
if not FakeDelegatedAdministrator.supported_service(service):
raise InvalidInputException(
"You specified an unrecognized service principal."
)
admins = [admin for admin in admins if service in admin.services]
delegated_admins = [admin.describe() for admin in admins]
return dict(DelegatedAdministrators=delegated_admins)
def list_delegated_services_for_account(self, **kwargs):
admin = next(
(admin for admin in self.admins if admin.account.id == kwargs["AccountId"]),
None,
)
if admin is None:
account = next(
(
account
for account in self.accounts
if account.id == kwargs["AccountId"]
),
None,
)
if account:
raise AccountNotRegisteredException
raise AWSOrganizationsNotInUseException
services = [service for service in admin.services.values()]
return dict(DelegatedServices=services)
def deregister_delegated_administrator(self, **kwargs):
account_id = kwargs["AccountId"]
service = kwargs["ServicePrincipal"]
if account_id == ACCOUNT_ID:
raise ConstraintViolationException(
"You cannot register master account/yourself as delegated administrator for your organization."
)
admin = next(
(admin for admin in self.admins if admin.account.id == account_id), None,
)
if admin is None:
account = next(
(
account
for account in self.accounts
if account.id == kwargs["AccountId"]
),
None,
)
if account:
raise AccountNotRegisteredException
raise AccountNotFoundException
admin.remove_service_principal(service)
# remove account, when no services attached
if not admin.services:
self.admins.remove(admin)
organizations_backend = OrganizationsBackend()

View file

@ -163,3 +163,31 @@ class OrganizationsResponse(BaseResponse):
return json.dumps(
self.organizations_backend.disable_aws_service_access(**self.request_params)
)
def register_delegated_administrator(self):
return json.dumps(
self.organizations_backend.register_delegated_administrator(
**self.request_params
)
)
def list_delegated_administrators(self):
return json.dumps(
self.organizations_backend.list_delegated_administrators(
**self.request_params
)
)
def list_delegated_services_for_account(self):
return json.dumps(
self.organizations_backend.list_delegated_services_for_account(
**self.request_params
)
)
def deregister_delegated_administrator(self):
return json.dumps(
self.organizations_backend.deregister_delegated_administrator(
**self.request_params
)
)