Implement support for SSM parameter store

This commit adds initial support for the Simple System Manager client.

It currently only mocks the following api endpoints:
 - delete_parameter()
 - put_parameter()
 - get_parameters()
This commit is contained in:
Michael van Tellingen 2017-04-18 19:09:10 +02:00
commit 783a1d73b4
8 changed files with 255 additions and 0 deletions

6
moto/ssm/__init__.py Normal file
View file

@ -0,0 +1,6 @@
from __future__ import unicode_literals
from .models import ssm_backends
from ..core.models import base_decorator
ssm_backend = ssm_backends['us-east-1']
mock_ssm = base_decorator(ssm_backends)

65
moto/ssm/models.py Normal file
View file

@ -0,0 +1,65 @@
from __future__ import unicode_literals
from moto.core import BaseBackend, BaseModel
from moto.ec2 import ec2_backends
class Parameter(BaseModel):
def __init__(self, name, value, type, description, keyid):
self.name = name
self.type = type
self.description = description
self.keyid = keyid
if self.type == 'SecureString':
self.value = self.encrypt(value)
else:
self.value = value
def encrypt(self, value):
return 'kms:{}:'.format(self.keyid or 'default') + value
def decrypt(self, value):
if self.type != 'SecureString':
return value
prefix = 'kms:{}:'.format(self.keyid or 'default')
if value.startswith(prefix):
return value[len(prefix):]
def response_object(self, decrypt=False):
return {
'Name': self.name,
'Type': self.type,
'Value': self.decrypt(self.value) if decrypt else self.value
}
class SimpleSystemManagerBackend(BaseBackend):
def __init__(self):
self._parameters = {}
def delete_parameter(self, name):
try:
del self._parameters[name]
except KeyError:
pass
def get_parameters(self, names, with_decryption):
result = []
for name in names:
if name in self._parameters:
result.append(self._parameters[name])
return result
def put_parameter(self, name, description, value, type, keyid, overwrite):
if not overwrite and name in self._parameters:
return
self._parameters[name] = Parameter(
name, value, type, description, keyid)
ssm_backends = {}
for region, ec2_backend in ec2_backends.items():
ssm_backends[region] = SimpleSystemManagerBackend()

56
moto/ssm/responses.py Normal file
View file

@ -0,0 +1,56 @@
from __future__ import unicode_literals
import json
from moto.core.responses import BaseResponse
from .models import ssm_backends
class SimpleSystemManagerResponse(BaseResponse):
@property
def ssm_backend(self):
return ssm_backends[self.region]
@property
def request_params(self):
try:
return json.loads(self.body)
except ValueError:
return {}
def _get_param(self, param, default=None):
return self.request_params.get(param, default)
def delete_parameter(self):
name = self._get_param('Name')
self.ssm_backend.delete_parameter(name)
return json.dumps({})
def get_parameters(self):
names = self._get_param('Names')
with_decryption = self._get_param('WithDecryption')
result = self.ssm_backend.get_parameters(names, with_decryption)
response = {
'Parameters': [],
'InvalidParameters': [],
}
for parameter in result:
param_data = parameter.response_object(with_decryption)
response['Parameters'].append(param_data)
return json.dumps(response)
def put_parameter(self):
name = self._get_param('Name')
description = self._get_param('Description')
value = self._get_param('Value')
type_ = self._get_param('Type')
keyid = self._get_param('KeyId')
overwrite = self._get_param('Overwrite', False)
self.ssm_backend.put_parameter(
name, description, value, type_, keyid, overwrite)
return json.dumps({})

10
moto/ssm/urls.py Normal file
View file

@ -0,0 +1,10 @@
from __future__ import unicode_literals
from .responses import SimpleSystemManagerResponse
url_bases = [
"https?://ssm.(.+).amazonaws.com",
]
url_paths = {
'{0}/$': SimpleSystemManagerResponse.dispatch,
}