Add mfa device endpoints to iam backend.
- Add mfa device class - Add mfa devices dictionary to user class - Add responses, endpoints and tests
This commit is contained in:
parent
bba197e29f
commit
8b9d685f1c
3 changed files with 129 additions and 0 deletions
|
|
@ -11,6 +11,19 @@ from .utils import random_access_key, random_alphanumeric, random_resource_id, r
|
|||
ACCOUNT_ID = 123456789012
|
||||
|
||||
|
||||
class MFADevice(object):
|
||||
"""MFA Device class."""
|
||||
|
||||
def __init__(self,
|
||||
serial_number,
|
||||
authentication_code_1,
|
||||
authentication_code_2):
|
||||
self.enable_date = datetime.now(pytz.utc)
|
||||
self.serial_number = serial_number
|
||||
self.authentication_code_1 = authentication_code_1
|
||||
self.authentication_code_2 = authentication_code_2
|
||||
|
||||
|
||||
class Policy(BaseModel):
|
||||
|
||||
is_attachable = False
|
||||
|
|
@ -226,6 +239,7 @@ class User(BaseModel):
|
|||
datetime.utcnow(),
|
||||
"%Y-%m-%d-%H-%M-%S"
|
||||
)
|
||||
self.mfa_devices = {}
|
||||
self.policies = {}
|
||||
self.access_keys = []
|
||||
self.password = None
|
||||
|
|
@ -251,6 +265,9 @@ class User(BaseModel):
|
|||
def put_policy(self, policy_name, policy_json):
|
||||
self.policies[policy_name] = policy_json
|
||||
|
||||
def deactivate_mfa_device(self, serial_number):
|
||||
self.mfa_devices.pop(serial_number)
|
||||
|
||||
def delete_policy(self, policy_name):
|
||||
if policy_name not in self.policies:
|
||||
raise IAMNotFoundException(
|
||||
|
|
@ -263,6 +280,16 @@ class User(BaseModel):
|
|||
self.access_keys.append(access_key)
|
||||
return access_key
|
||||
|
||||
def enable_mfa_device(self,
|
||||
serial_number,
|
||||
authentication_code_1,
|
||||
authentication_code_2):
|
||||
self.mfa_devices[serial_number] = MFADevice(
|
||||
serial_number,
|
||||
authentication_code_1,
|
||||
authentication_code_2
|
||||
)
|
||||
|
||||
def get_all_access_keys(self):
|
||||
return self.access_keys
|
||||
|
||||
|
|
@ -724,6 +751,39 @@ class IAMBackend(BaseBackend):
|
|||
user = self.get_user(user_name)
|
||||
user.delete_access_key(access_key_id)
|
||||
|
||||
def enable_mfa_device(self,
|
||||
user_name,
|
||||
serial_number,
|
||||
authentication_code_1,
|
||||
authentication_code_2):
|
||||
"""Enable MFA Device for user."""
|
||||
user = self.get_user(user_name)
|
||||
if serial_number in user.mfa_devices:
|
||||
raise IAMConflictException(
|
||||
"EntityAlreadyExists",
|
||||
"Device {0} already exists".format(serial_number)
|
||||
)
|
||||
|
||||
user.enable_mfa_device(
|
||||
serial_number,
|
||||
authentication_code_1,
|
||||
authentication_code_2
|
||||
)
|
||||
|
||||
def deactivate_mfa_device(self, user_name, serial_number):
|
||||
"""Deactivate and detach MFA Device from user if device exists."""
|
||||
user = self.get_user(user_name)
|
||||
if serial_number not in user.mfa_devices:
|
||||
raise IAMNotFoundException(
|
||||
"Device {0} not found".format(serial_number)
|
||||
)
|
||||
|
||||
user.deactivate_mfa_device(serial_number)
|
||||
|
||||
def list_mfa_devices(self, user_name):
|
||||
user = self.get_user(user_name)
|
||||
return user.mfa_devices.values()
|
||||
|
||||
def delete_user(self, user_name):
|
||||
try:
|
||||
del self.users[user_name]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue