Merge pull request #1687 from sthuber90/master

Fix ECR and bug fixes
This commit is contained in:
Steve Pulec 2018-09-22 16:40:33 -04:00 committed by GitHub
commit 693c677b87
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 378 additions and 50 deletions

View file

@ -4,12 +4,12 @@ import hashlib
from copy import copy
from random import random
from botocore.exceptions import ParamValidationError
from moto.core import BaseBackend, BaseModel
from moto.ec2 import ec2_backends
from moto.ecr.exceptions import ImageNotFoundException, RepositoryNotFoundException
from botocore.exceptions import ParamValidationError
DEFAULT_REGISTRY_ID = '012345678910'
@ -97,13 +97,14 @@ class Repository(BaseObject):
class Image(BaseObject):
def __init__(self, tag, manifest, repository, registry_id=DEFAULT_REGISTRY_ID):
def __init__(self, tag, manifest, repository, digest=None, registry_id=DEFAULT_REGISTRY_ID):
self.image_tag = tag
self.image_tags = [tag] if tag is not None else []
self.image_manifest = manifest
self.image_size_in_bytes = 50 * 1024 * 1024
self.repository = repository
self.registry_id = registry_id
self.image_digest = None
self.image_digest = digest
self.image_pushed_at = None
def _create_digest(self):
@ -115,6 +116,14 @@ class Image(BaseObject):
self._create_digest()
return self.image_digest
def get_image_manifest(self):
return self.image_manifest
def update_tag(self, tag):
self.image_tag = tag
if tag not in self.image_tags and tag is not None:
self.image_tags.append(tag)
@property
def response_object(self):
response_object = self.gen_response_object()
@ -124,26 +133,26 @@ class Image(BaseObject):
response_object['imageManifest'] = self.image_manifest
response_object['repositoryName'] = self.repository
response_object['registryId'] = self.registry_id
return response_object
return {k: v for k, v in response_object.items() if v is not None and v != [None]}
@property
def response_list_object(self):
response_object = self.gen_response_object()
response_object['imageTag'] = self.image_tag
response_object['imageDigest'] = "i don't know"
return response_object
return {k: v for k, v in response_object.items() if v is not None and v != [None]}
@property
def response_describe_object(self):
response_object = self.gen_response_object()
response_object['imageTags'] = [self.image_tag]
response_object['imageTags'] = self.image_tags
response_object['imageDigest'] = self.get_image_digest()
response_object['imageManifest'] = self.image_manifest
response_object['repositoryName'] = self.repository
response_object['registryId'] = self.registry_id
response_object['imageSizeInBytes'] = self.image_size_in_bytes
response_object['imagePushedAt'] = '2017-05-09'
return response_object
return {k: v for k, v in response_object.items() if v is not None and v != []}
@property
def response_batch_get_image(self):
@ -154,7 +163,7 @@ class Image(BaseObject):
response_object['imageManifest'] = self.image_manifest
response_object['repositoryName'] = self.repository
response_object['registryId'] = self.registry_id
return response_object
return {k: v for k, v in response_object.items() if v is not None and v != [None]}
class ECRBackend(BaseBackend):
@ -231,7 +240,7 @@ class ECRBackend(BaseBackend):
found = False
for image in repository.images:
if (('imageDigest' in image_id and image.get_image_digest() == image_id['imageDigest']) or
('imageTag' in image_id and image.image_tag == image_id['imageTag'])):
('imageTag' in image_id and image_id['imageTag'] in image.image_tags)):
found = True
response.add(image)
if not found:
@ -257,9 +266,16 @@ class ECRBackend(BaseBackend):
else:
raise Exception("{0} is not a repository".format(repository_name))
image = Image(image_tag, image_manifest, repository_name)
repository.images.append(image)
return image
existing_images = list(filter(lambda x: x.response_object['imageManifest'] == image_manifest, repository.images))
if not existing_images:
# this image is not in ECR yet
image = Image(image_tag, image_manifest, repository_name)
repository.images.append(image)
return image
else:
# update existing image
existing_images[0].update_tag(image_tag)
return existing_images[0]
def batch_get_image(self, repository_name, registry_id=None, image_ids=None, accepted_media_types=None):
if repository_name in self.repositories:

View file

@ -1,14 +1,17 @@
from __future__ import unicode_literals
import time
import boto3
import string
import random
import hashlib
import uuid
import random
import re
from datetime import datetime
from moto.core import BaseBackend, BaseModel
import string
import time
import uuid
from collections import OrderedDict
from datetime import datetime
import boto3
from moto.core import BaseBackend, BaseModel
from .exceptions import (
ResourceNotFoundException,
InvalidRequestException,
@ -271,15 +274,37 @@ class IoTBackend(BaseBackend):
def list_thing_types(self, thing_type_name=None):
if thing_type_name:
# It's wierd but thing_type_name is filterd by forward match, not complete match
# It's weird but thing_type_name is filtered by forward match, not complete match
return [_ for _ in self.thing_types.values() if _.thing_type_name.startswith(thing_type_name)]
thing_types = self.thing_types.values()
return thing_types
return self.thing_types.values()
def list_things(self, attribute_name, attribute_value, thing_type_name):
# TODO: filter by attributess or thing_type
things = self.things.values()
return things
def list_things(self, attribute_name, attribute_value, thing_type_name, max_results, token):
all_things = [_.to_dict() for _ in self.things.values()]
if attribute_name is not None and thing_type_name is not None:
filtered_things = list(filter(lambda elem:
attribute_name in elem["attributes"] and
elem["attributes"][attribute_name] == attribute_value and
"thingTypeName" in elem and
elem["thingTypeName"] == thing_type_name, all_things))
elif attribute_name is not None and thing_type_name is None:
filtered_things = list(filter(lambda elem:
attribute_name in elem["attributes"] and
elem["attributes"][attribute_name] == attribute_value, all_things))
elif attribute_name is None and thing_type_name is not None:
filtered_things = list(
filter(lambda elem: "thingTypeName" in elem and elem["thingTypeName"] == thing_type_name, all_things))
else:
filtered_things = all_things
if token is None:
things = filtered_things[0:max_results]
next_token = str(max_results) if len(filtered_things) > max_results else None
else:
token = int(token)
things = filtered_things[token:token + max_results]
next_token = str(token + max_results) if len(filtered_things) > token + max_results else None
return things, next_token
def describe_thing(self, thing_name):
things = [_ for _ in self.things.values() if _.thing_name == thing_name]

View file

@ -1,7 +1,9 @@
from __future__ import unicode_literals
import json
from moto.core.responses import BaseResponse
from .models import iot_backends
import json
class IoTResponse(BaseResponse):
@ -32,30 +34,39 @@ class IoTResponse(BaseResponse):
return json.dumps(dict(thingTypeName=thing_type_name, thingTypeArn=thing_type_arn))
def list_thing_types(self):
# previous_next_token = self._get_param("nextToken")
# max_results = self._get_int_param("maxResults")
previous_next_token = self._get_param("nextToken")
max_results = self._get_int_param("maxResults", 50) # not the default, but makes testing easier
thing_type_name = self._get_param("thingTypeName")
thing_types = self.iot_backend.list_thing_types(
thing_type_name=thing_type_name
)
# TODO: implement pagination in the future
next_token = None
return json.dumps(dict(thingTypes=[_.to_dict() for _ in thing_types], nextToken=next_token))
thing_types = [_.to_dict() for _ in thing_types]
if previous_next_token is None:
result = thing_types[0:max_results]
next_token = str(max_results) if len(thing_types) > max_results else None
else:
token = int(previous_next_token)
result = thing_types[token:token + max_results]
next_token = str(token + max_results) if len(thing_types) > token + max_results else None
return json.dumps(dict(thingTypes=result, nextToken=next_token))
def list_things(self):
# previous_next_token = self._get_param("nextToken")
# max_results = self._get_int_param("maxResults")
previous_next_token = self._get_param("nextToken")
max_results = self._get_int_param("maxResults", 50) # not the default, but makes testing easier
attribute_name = self._get_param("attributeName")
attribute_value = self._get_param("attributeValue")
thing_type_name = self._get_param("thingTypeName")
things = self.iot_backend.list_things(
things, next_token = self.iot_backend.list_things(
attribute_name=attribute_name,
attribute_value=attribute_value,
thing_type_name=thing_type_name,
max_results=max_results,
token=previous_next_token
)
# TODO: implement pagination in the future
next_token = None
return json.dumps(dict(things=[_.to_dict() for _ in things], nextToken=next_token))
return json.dumps(dict(things=things, nextToken=next_token))
def describe_thing(self):
thing_name = self._get_param("thingName")