Added several new endpoints for accessing iam groups and users,

include unit tests.
This commit is contained in:
Rory-Finnegan 2014-08-19 16:30:11 -05:00 committed by Rory-Finnegan
commit ac74af4085
4 changed files with 469 additions and 3 deletions

View file

@ -1,7 +1,9 @@
from __future__ import unicode_literals
from moto.core import BaseBackend
from .utils import random_resource_id
from boto.exception import BotoServerError
from moto.core import BaseBackend
from .utils import random_access_key, random_alhpnumeric, random_resource_id
from datetime import datetime
class Role(object):
@ -65,12 +67,83 @@ class Certificate(object):
return self.name
class AccessKey(object):
def __init__(self, user_name):
self.user_name = user_name
self.access_key_id = random_access_key()
self.secret_access_key = random_alhpnumeric(32)
self.status = 'Active'
self.create_date = datetime.strftime(
datetime.utcnow(),
"%Y-%m-%d-%H-%M-%S"
)
class Group(object):
def __init__(self, name, path='/'):
self.name = name
self.id = random_resource_id()
self.path = path
self.created = datetime.strftime(
datetime.utcnow(),
"%Y-%m-%d-%H-%M-%S"
)
self.users = []
class User(object):
def __init__(self, name, path='/'):
self.name = name
self.id = random_resource_id()
self.path = path
self.created = datetime.strftime(
datetime.utcnow(),
"%Y-%m-%d-%H-%M-%S"
)
self.policies = {}
self.access_keys = []
def get_policy(self, policy_name):
policy_json = None
try:
policy_json = self.policies[policy_name]
except:
raise BotoServerError(404, 'Not Found')
return {
'policy_name': policy_name,
'policy_document': policy_json,
'user_name': self.name,
}
def put_policy(self, policy_name, policy_json):
self.policies[policy_name] = policy_json
def delete_policy(self, policy_name):
if policy_name not in self.policies:
raise BotoServerError(404, 'Not Found')
del self.policies[policy_name]
def create_access_key(self):
access_key = AccessKey(self.name)
self.access_keys.append(access_key)
return access_key
def get_all_access_keys(self):
return self.access_keys
class IAMBackend(BaseBackend):
def __init__(self):
self.instance_profiles = {}
self.roles = {}
self.certificates = {}
self.groups = {}
self.users = {}
super(IAMBackend, self).__init__()
def create_role(self, role_name, assume_role_policy_document, path, policies):
@ -125,4 +198,105 @@ class IAMBackend(BaseBackend):
if name == cert.cert_name:
return cert
def create_group(self, group_name, path='/'):
if group_name in self.groups:
raise BotoServerError(409, 'Conflict')
group = Group(group_name, path)
self.groups[group_name] = group
return group
def get_group(self, group_name, marker=None, max_items=None):
group = None
try:
group = self.groups[group_name]
except KeyError:
raise BotoServerError(404, 'Not Found')
return group
def create_user(self, user_name, path='/'):
if user_name in self.users:
raise BotoServerError(409, 'Conflict')
user = User(user_name, path)
self.users[user_name] = user
return user
def add_user_to_group(self, group_name, user_name):
group = None
user = None
try:
group = self.groups[group_name]
user = self.users[user_name]
except KeyError:
raise BotoServerError(404, 'Not Found')
group.users.append(user)
def remove_user_from_group(self, group_name, user_name):
group = None
user = None
try:
group = self.groups[group_name]
user = self.users[user_name]
group.users.remove(user)
except (KeyError, ValueError):
raise BotoServerError(404, 'Not Found')
def get_user_policy(self, user_name, policy_name):
policy = None
try:
user = self.users[user_name]
policy = user.get_policy(policy_name)
except KeyError:
raise BotoServerError(404, 'Not Found')
return policy
def put_user_policy(self, user_name, policy_name, policy_json):
try:
user = self.users[user_name]
user.put_policy(policy_name, policy_json)
except KeyError:
raise BotoServerError(404, 'Not Found')
def delete_user_policy(self, user_name, policy_name):
try:
user = self.users[user_name]
user.delete_policy(policy_name)
except KeyError:
raise BotoServerError(404, 'Not Found')
def create_access_key(self, user_name=None):
key = None
try:
user = self.users[user_name]
key = user.create_access_key()
except KeyError:
raise BotoServerError(404, 'Not Found')
print('username={}'.format(key.user_name))
return key
def get_all_access_keys(self, user_name, marker=None, max_items=None):
keys = None
try:
user = self.users[user_name]
keys = user.get_all_access_keys()
except KeyError:
raise BotoServerError(404, 'Not Found')
return keys
def delete_user(self, user_name):
try:
del self.users[user_name]
except KeyError:
raise BotoServerError(404, 'Not Found')
iam_backend = IAMBackend()