most of testing is done

This commit is contained in:
Alex Bainbridge 2020-06-29 18:20:57 -04:00
commit bdc1e93a4f
5 changed files with 595 additions and 53 deletions

View file

@ -14,6 +14,7 @@ import uuid
import itertools
import json
import yaml
import hashlib
from .utils import parameter_arn
from .exceptions import (
@ -116,8 +117,19 @@ def generate_ssm_doc_param_list(parameters):
return None
param_list = []
for param_name, param_info in parameters.items():
param_info["Name"] = param_name
param_list.append(param_info)
final_dict = {}
final_dict["Name"] = param_name
final_dict["Type"] = param_info["type"]
final_dict["Description"] = param_info["description"]
if param_info["type"] == "StringList" or param_info["type"] == "StringMap" or param_info["type"] == "MapList":
final_dict["DefaultValue"] = json.dumps(param_info["default"])
else:
final_dict["DefaultValue"] = str(param_info["default"])
param_list.append(final_dict)
return param_list
@ -137,7 +149,7 @@ class Document(BaseModel):
self.status = "Active"
self.document_version = document_version
self.owner = ACCOUNT_ID
self.created_date = datetime.datetime.now()
self.created_date = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
if document_format == "JSON":
try:
@ -155,12 +167,12 @@ class Document(BaseModel):
self.content_json = content_json
try:
self.schema_version = content_json["schemaVersion"]
self.schema_version = str(content_json["schemaVersion"])
self.description = content_json.get("description")
self.outputs = content_json.get("outputs")
self.files = content_json.get("files")
# TODO add platformType
self.platform_types = "Not Implemented (moto)"
self.platform_types = ["Not Implemented (moto)"]
self.parameter_list = generate_ssm_doc_param_list(content_json.get("parameters"))
if self.schema_version == "0.3" or self.schema_version == "2.0" or self.schema_version == "2.2":
@ -430,9 +442,8 @@ class SimpleSystemManagerBackend(BaseBackend):
latest = self._documents[document.name]['latest_version']
default_version = self._documents[document.name]["default_version"]
return {
"Hash": hash,
base = {
"Hash": hashlib.sha256(document.content.encode('utf-8')).hexdigest(),
"HashType": "Sha256",
"Name": document.name,
"Owner": document.owner,
@ -442,11 +453,20 @@ class SimpleSystemManagerBackend(BaseBackend):
"Description": document.description,
"Parameters": document.parameter_list,
"PlatformTypes": document.platform_types,
"DocumentType": document.document_type,
"SchemaVersion": document.schema_version,
"LatestVersion": latest,
"DefaultVersion": default_version,
"DocumentFormat": document.document_format
}
if document.version_name:
base["VersionName"] = document.version_name
if document.target_type:
base["TargetType"] = document.target_type
if document.tags:
base["Tags"] = document.tags
return base
def _generate_document_information(self, ssm_document, document_format):
base = {
@ -502,12 +522,13 @@ class SimpleSystemManagerBackend(BaseBackend):
document_format=document_format, requires=requires, attachments=attachments,
target_type=target_type, tags=tags)
_validate_document_info(content=content, name=name, document_type=document_type)
_validate_document_info(content=content, name=name, document_type=document_type,
document_format=document_format)
if self._documents.get(ssm_document.Name):
raise DocumentAlreadyExists(f"Document with same name {name} already exists")
if self._documents.get(ssm_document.name):
raise DocumentAlreadyExists(f"The specified document already exists.")
self._documents[ssm_document.Name] = {
self._documents[ssm_document.name] = {
"documents": {
ssm_document.document_version: ssm_document
},
@ -522,21 +543,24 @@ class SimpleSystemManagerBackend(BaseBackend):
keys_to_delete = set()
if documents:
if documents[0].document_type == "ApplicationConfigurationSchema" and not force:
default_version = self._documents[name]["default_version"]
if documents[default_version].document_type == "ApplicationConfigurationSchema" and not force:
raise InvalidDocumentOperation("You attempted to delete a document while it is still shared. "
"You must stop sharing the document before you can delete it.")
if document_version and document_version == self._documents[name]["default_version"]:
if document_version and document_version == default_version:
raise InvalidDocumentOperation("Default version of the document can't be deleted.")
if document_version or version_name:
for doc_version, document in documents.items():
if document_version and doc_version == document_version:
keys_to_delete.add(document_version)
continue
if version_name and document.version_name == version_name:
keys_to_delete.add(document_version)
continue
# We delete only a specific version
delete_doc = self._find_document(name, document_version, version_name)
if delete_doc:
keys_to_delete.add(document_version)
else:
raise InvalidDocument("The specified document does not exist.")
else:
# We are deleting all versions
keys_to_delete = set(documents.keys())
for key in keys_to_delete:
@ -549,7 +573,7 @@ class SimpleSystemManagerBackend(BaseBackend):
def _find_document(self, name, document_version=None, version_name=None, strict=True):
if not self._documents.get(name):
raise InvalidDocument(f"Document with name {name} does not exist.")
raise InvalidDocument(f"The specified document does not exist.")
documents = self._documents[name]["documents"]
ssm_document = None
@ -575,37 +599,43 @@ class SimpleSystemManagerBackend(BaseBackend):
break
if strict and not ssm_document:
raise InvalidDocument(f"Document with name {name} does not exist.")
raise InvalidDocument(f"The specified document does not exist.")
return ssm_document
def get_document(self, name, document_version, version_name, document_format):
_validate_document_format(document_format=document_format)
ssm_document = self._find_document(name, document_version, version_name)
if not document_format:
document_format = ssm_document.document_format
else:
_validate_document_format(document_format=document_format)
return self._generate_document_information(ssm_document, document_format)
def update_document_default_version(self, name, document_version):
ssm_document = self._find_document(name, document_version=document_version)
self._documents[name]["default_version"] = document_version
base = {
'Name': ssm_document.name,
'DefaultVersion': document_version,
"Name": ssm_document.name,
"DefaultVersion": document_version,
}
if ssm_document.version_name:
base['DefaultVersionName'] = ssm_document.version_name
base["DefaultVersionName"] = ssm_document.version_name
return base
def update_document(self, content, attachments, name, version_name, document_version, document_format, target_type):
_validate_document_info(content=content, name=name, document_type=None, strict=False)
_validate_document_info(content=content, name=name, document_type=None, document_format=document_format,
strict=False)
if not self._documents.get(name):
raise InvalidDocument("The specified document does not exist.")
if self._documents.get[name]['latest_version'] != document_version or document_version != "$LATEST":
if self._documents[name]['latest_version'] != document_version and document_version != "$LATEST":
raise InvalidDocumentVersion("The document version is not valid or does not exist.")
if self._find_document(name, version_name=version_name, strict=False):
if version_name and self._find_document(name, version_name=version_name, strict=False):
raise DuplicateDocumentVersionName(f"The specified version name is a duplicate.")
old_ssm_document = self._find_document(name)
@ -614,13 +644,14 @@ class SimpleSystemManagerBackend(BaseBackend):
document_type=old_ssm_document.document_type, document_format=document_format,
requires=old_ssm_document.requires, attachments=attachments,
target_type=target_type, tags=old_ssm_document.tags,
document_version=self._documents.get[name]['latest_version'])
document_version=str(int(self._documents[name]['latest_version']) + 1))
for doc_version, document in self._documents[name].items():
for doc_version, document in self._documents[name]['documents'].items():
if document.content == new_ssm_document.content:
raise DuplicateDocumentContent("The content of the association document matches another document. "
"Change the content of the document and try again.")
self._documents[name]["latest_version"] = str(int(self._documents[name]["latest_version"]) + 1)
self._documents[name]["documents"][new_ssm_document.document_version] = new_ssm_document
return self._generate_document_description(new_ssm_document)
@ -629,16 +660,22 @@ class SimpleSystemManagerBackend(BaseBackend):
ssm_document = self._find_document(name, document_version, version_name)
return self._generate_document_description(ssm_document)
def list_documents(self, document_filter_list, filters, max_results=10, next_token=0):
def list_documents(self, document_filter_list, filters, max_results=10, next_token="0"):
if document_filter_list:
raise ValidationException(
"DocumentFilterList is deprecated. Instead use Filters."
)
next_token = int(next_token)
results = []
dummy_token_tracker = 0
# Sort to maintain next token adjacency
for document_name, document_bundle in sorted(self._documents.items()):
if len(results) == max_results:
# There's still more to go so we need a next token
return results, str(next_token + len(results))
if dummy_token_tracker < next_token:
dummy_token_tracker = dummy_token_tracker + 1
continue
@ -651,10 +688,8 @@ class SimpleSystemManagerBackend(BaseBackend):
else:
results.append(self._generate_document_list_information(ssm_doc))
if len(results) == max_results:
return results, next_token + max_results
return results
# If we've fallen out of the loop, theres no more documents. No next token.
return results, ""
def delete_parameter(self, name):
return self._parameters.pop(name, None)

View file

@ -24,7 +24,7 @@ class SimpleSystemManagerResponse(BaseResponse):
name = self._get_param("Name")
version_name = self._get_param("VersionName")
document_type = self._get_param("DocumentType")
document_format = self._get_param("DocumentFormat")
document_format = self._get_param("DocumentFormat", "JSON")
target_type = self._get_param("TargetType")
tags = self._get_param("Tags")
@ -32,9 +32,9 @@ class SimpleSystemManagerResponse(BaseResponse):
name=name, version_name=version_name, document_type=document_type,
document_format=document_format, target_type=target_type, tags=tags)
return {
return json.dumps({
'DocumentDescription': result
}
})
def delete_document(self):
name = self._get_param("Name")
@ -44,18 +44,18 @@ class SimpleSystemManagerResponse(BaseResponse):
self.ssm_backend.delete_document(name=name, document_version=document_version,
version_name=version_name, force=force)
return {}
return json.dumps({})
def get_document(self):
name = self._get_param("Name")
version_name = self._get_param("VersionName")
document_version = self._get_param("DocumentVersion")
document_format = self._get_param("DocumentFormat")
document_format = self._get_param("DocumentFormat", "JSON")
document = self.ssm_backend.get_document(name=name, document_version=document_version,
document_format=document_format, version_name=version_name)
return document
return json.dumps(document)
def describe_document(self):
name = self._get_param("Name")
@ -65,9 +65,9 @@ class SimpleSystemManagerResponse(BaseResponse):
result = self.ssm_backend.describe_document(name=name, document_version=document_version,
version_name=version_name)
return {
return json.dumps({
'Document': result
}
})
def update_document(self):
content = self._get_param("Content")
@ -75,39 +75,39 @@ class SimpleSystemManagerResponse(BaseResponse):
name = self._get_param("Name")
version_name = self._get_param("VersionName")
document_version = self._get_param("DocumentVersion")
document_format = self._get_param("DocumentFormat")
document_format = self._get_param("DocumentFormat", "JSON")
target_type = self._get_param("TargetType")
result = self.ssm_backend.update_document(content=content, attachments=attachments, name=name,
version_name=version_name, document_version=document_version,
document_format=document_format, target_type=target_type)
return {
return json.dumps({
'DocumentDescription': result
}
})
def update_document_default_version(self):
name = self._get_param("Name")
document_version = self._get_param("DocumentVersion")
result = self.ssm_backend.update_document_default_version(name=name, document_version=document_version)
return {
return json.dumps({
'Description': result
}
})
def list_documents(self):
document_filter_list = self._get_param("DocumentFilterList")
filters = self._get_param("Filters")
max_results = self._get_param("MaxResults", 10)
next_token = self._get_param("NextToken")
next_token = self._get_param("NextToken", "0")
documents, token = self.ssm_backend.list_documents(document_filter_list=document_filter_list, filters=filters,
max_results=max_results, next_token=next_token)
return {
return json.dumps({
"DocumentIdentifiers": documents,
"NextToken": token
}
})
def _get_param(self, param, default=None):
return self.request_params.get(param, default)