Merge branch 'master' into Bug-Fix-Secondary-Indexes-Ignored
This commit is contained in:
commit
5c7f01ab29
20 changed files with 1723 additions and 52 deletions
|
|
@ -124,11 +124,22 @@ class Item(object):
|
|||
def update_with_attribute_updates(self, attribute_updates):
|
||||
for attribute_name, update_action in attribute_updates.items():
|
||||
action = update_action['Action']
|
||||
if action == 'DELETE' and not 'Value' in update_action:
|
||||
if attribute_name in self.attrs:
|
||||
del self.attrs[attribute_name]
|
||||
continue
|
||||
new_value = list(update_action['Value'].values())[0]
|
||||
if action == 'PUT':
|
||||
# TODO deal with other types
|
||||
if isinstance(new_value, list) or isinstance(new_value, set):
|
||||
self.attrs[attribute_name] = DynamoType({"SS": new_value})
|
||||
elif isinstance(new_value, dict):
|
||||
self.attrs[attribute_name] = DynamoType({"M": new_value})
|
||||
elif update_action['Value'].keys() == ['N']:
|
||||
self.attrs[attribute_name] = DynamoType({"N": new_value})
|
||||
elif update_action['Value'].keys() == ['NULL']:
|
||||
if attribute_name in self.attrs:
|
||||
del self.attrs[attribute_name]
|
||||
else:
|
||||
self.attrs[attribute_name] = DynamoType({"S": new_value})
|
||||
|
||||
|
|
@ -278,20 +289,63 @@ class Table(object):
|
|||
except KeyError:
|
||||
return None
|
||||
|
||||
def query(self, hash_key, range_comparison, range_objs):
|
||||
def query(self, hash_key, range_comparison, range_objs, index_name=None):
|
||||
results = []
|
||||
last_page = True # Once pagination is implemented, change this
|
||||
|
||||
possible_results = [item for item in list(self.all_items()) if isinstance(item, Item) and item.hash_key == hash_key]
|
||||
if index_name:
|
||||
all_indexes = (self.global_indexes or []) + (self.indexes or [])
|
||||
indexes_by_name = dict((i['IndexName'], i) for i in all_indexes)
|
||||
if index_name not in indexes_by_name:
|
||||
raise ValueError('Invalid index: %s for table: %s. Available indexes are: %s' % (
|
||||
index_name, self.name, ', '.join(indexes_by_name.keys())
|
||||
))
|
||||
|
||||
index = indexes_by_name[index_name]
|
||||
try:
|
||||
index_hash_key = [key for key in index['KeySchema'] if key['KeyType'] == 'HASH'][0]
|
||||
except IndexError:
|
||||
raise ValueError('Missing Hash Key. KeySchema: %s' % index['KeySchema'])
|
||||
|
||||
possible_results = []
|
||||
for item in self.all_items():
|
||||
if not isinstance(item, Item):
|
||||
continue
|
||||
item_hash_key = item.attrs.get(index_hash_key['AttributeName'])
|
||||
if item_hash_key and item_hash_key == hash_key:
|
||||
possible_results.append(item)
|
||||
else:
|
||||
possible_results = [item for item in list(self.all_items()) if isinstance(item, Item) and item.hash_key == hash_key]
|
||||
|
||||
if index_name:
|
||||
try:
|
||||
index_range_key = [key for key in index['KeySchema'] if key['KeyType'] == 'RANGE'][0]
|
||||
except IndexError:
|
||||
index_range_key = None
|
||||
|
||||
if range_comparison:
|
||||
for result in possible_results:
|
||||
if result.range_key.compare(range_comparison, range_objs):
|
||||
results.append(result)
|
||||
if index_name and not index_range_key:
|
||||
raise ValueError('Range Key comparison but no range key found for index: %s' % index_name)
|
||||
|
||||
elif index_name:
|
||||
for result in possible_results:
|
||||
if result.attrs.get(index_range_key['AttributeName']).compare(range_comparison, range_objs):
|
||||
results.append(result)
|
||||
else:
|
||||
for result in possible_results:
|
||||
if result.range_key.compare(range_comparison, range_objs):
|
||||
results.append(result)
|
||||
else:
|
||||
# If we're not filtering on range key, return all values
|
||||
results = possible_results
|
||||
|
||||
results.sort(key=lambda item: item.range_key)
|
||||
if index_name:
|
||||
|
||||
if index_range_key:
|
||||
results.sort(key=lambda item: item.attrs[index_range_key['AttributeName']].value
|
||||
if item.attrs.get(index_range_key['AttributeName']) else None)
|
||||
else:
|
||||
results.sort(key=lambda item: item.range_key)
|
||||
return results, last_page
|
||||
|
||||
def all_items(self):
|
||||
|
|
@ -361,6 +415,38 @@ class DynamoDBBackend(BaseBackend):
|
|||
table.throughput = throughput
|
||||
return table
|
||||
|
||||
def update_table_global_indexes(self, name, global_index_updates):
|
||||
table = self.tables[name]
|
||||
gsis_by_name = dict((i['IndexName'], i) for i in table.global_indexes)
|
||||
for gsi_update in global_index_updates:
|
||||
gsi_to_create = gsi_update.get('Create')
|
||||
gsi_to_update = gsi_update.get('Update')
|
||||
gsi_to_delete = gsi_update.get('Delete')
|
||||
|
||||
if gsi_to_delete:
|
||||
index_name = gsi_to_delete['IndexName']
|
||||
if index_name not in gsis_by_name:
|
||||
raise ValueError('Global Secondary Index does not exist, but tried to delete: %s' %
|
||||
gsi_to_delete['IndexName'])
|
||||
|
||||
del gsis_by_name[index_name]
|
||||
|
||||
if gsi_to_update:
|
||||
index_name = gsi_to_update['IndexName']
|
||||
if index_name not in gsis_by_name:
|
||||
raise ValueError('Global Secondary Index does not exist, but tried to update: %s' %
|
||||
gsi_to_update['IndexName'])
|
||||
gsis_by_name[index_name].update(gsi_to_update)
|
||||
|
||||
if gsi_to_create:
|
||||
if gsi_to_create['IndexName'] in gsis_by_name:
|
||||
raise ValueError('Global Secondary Index already exists: %s' % gsi_to_create['IndexName'])
|
||||
|
||||
gsis_by_name[gsi_to_create['IndexName']] = gsi_to_create
|
||||
|
||||
table.global_indexes = gsis_by_name.values()
|
||||
return table
|
||||
|
||||
def put_item(self, table_name, item_attrs, expected=None, overwrite=False):
|
||||
table = self.tables.get(table_name)
|
||||
if not table:
|
||||
|
|
@ -400,7 +486,7 @@ class DynamoDBBackend(BaseBackend):
|
|||
hash_key, range_key = self.get_keys_value(table, keys)
|
||||
return table.get_item(hash_key, range_key)
|
||||
|
||||
def query(self, table_name, hash_key_dict, range_comparison, range_value_dicts):
|
||||
def query(self, table_name, hash_key_dict, range_comparison, range_value_dicts, index_name=None):
|
||||
table = self.tables.get(table_name)
|
||||
if not table:
|
||||
return None, None
|
||||
|
|
@ -408,7 +494,7 @@ class DynamoDBBackend(BaseBackend):
|
|||
hash_key = DynamoType(hash_key_dict)
|
||||
range_values = [DynamoType(range_value) for range_value in range_value_dicts]
|
||||
|
||||
return table.query(hash_key, range_comparison, range_values)
|
||||
return table.query(hash_key, range_comparison, range_values, index_name)
|
||||
|
||||
def scan(self, table_name, filters):
|
||||
table = self.tables.get(table_name)
|
||||
|
|
@ -425,12 +511,20 @@ class DynamoDBBackend(BaseBackend):
|
|||
def update_item(self, table_name, key, update_expression, attribute_updates):
|
||||
table = self.get_table(table_name)
|
||||
|
||||
if table.hash_key_attr in key:
|
||||
# Sometimes the key is wrapped in a dict with the key name
|
||||
key = key[table.hash_key_attr]
|
||||
if all([table.hash_key_attr in key, table.range_key_attr in key]):
|
||||
# Covers cases where table has hash and range keys, ``key`` param will be a dict
|
||||
hash_value = DynamoType(key[table.hash_key_attr])
|
||||
range_value = DynamoType(key[table.range_key_attr])
|
||||
elif table.hash_key_attr in key:
|
||||
# Covers tables that have a range key where ``key`` param is a dict
|
||||
hash_value = DynamoType(key[table.hash_key_attr])
|
||||
range_value = None
|
||||
else:
|
||||
# Covers other cases
|
||||
hash_value = DynamoType(key)
|
||||
range_value = None
|
||||
|
||||
hash_value = DynamoType(key)
|
||||
item = table.get_item(hash_value)
|
||||
item = table.get_item(hash_value, range_value)
|
||||
if update_expression:
|
||||
item.update(update_expression)
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -125,8 +125,11 @@ class DynamoHandler(BaseResponse):
|
|||
|
||||
def update_table(self):
|
||||
name = self.body['TableName']
|
||||
throughput = self.body["ProvisionedThroughput"]
|
||||
table = dynamodb_backend2.update_table_throughput(name, throughput)
|
||||
if 'GlobalSecondaryIndexUpdates' in self.body:
|
||||
table = dynamodb_backend2.update_table_global_indexes(name, self.body['GlobalSecondaryIndexUpdates'])
|
||||
if 'ProvisionedThroughput' in self.body:
|
||||
throughput = self.body["ProvisionedThroughput"]
|
||||
table = dynamodb_backend2.update_table_throughput(name, throughput)
|
||||
return dynamo_json_dump(table.describe)
|
||||
|
||||
def describe_table(self):
|
||||
|
|
@ -241,11 +244,31 @@ class DynamoHandler(BaseResponse):
|
|||
if key_condition_expression:
|
||||
value_alias_map = self.body['ExpressionAttributeValues']
|
||||
|
||||
table = dynamodb_backend2.get_table(name)
|
||||
index_name = self.body.get('IndexName')
|
||||
if index_name:
|
||||
all_indexes = (table.global_indexes or []) + (table.indexes or [])
|
||||
indexes_by_name = dict((i['IndexName'], i) for i in all_indexes)
|
||||
if index_name not in indexes_by_name:
|
||||
raise ValueError('Invalid index: %s for table: %s. Available indexes are: %s' % (
|
||||
index_name, name, ', '.join(indexes_by_name.keys())
|
||||
))
|
||||
|
||||
index = indexes_by_name[index_name]['KeySchema']
|
||||
else:
|
||||
index = table.schema
|
||||
|
||||
key_map = [column for _, column in sorted((k, v) for k, v in self.body['ExpressionAttributeNames'].items())]
|
||||
|
||||
if " AND " in key_condition_expression:
|
||||
expressions = key_condition_expression.split(" AND ", 1)
|
||||
hash_key_expression = expressions[0]
|
||||
|
||||
index_hash_key = [key for key in index if key['KeyType'] == 'HASH'][0]
|
||||
hash_key_index_in_key_map = key_map.index(index_hash_key['AttributeName'])
|
||||
|
||||
hash_key_expression = expressions.pop(hash_key_index_in_key_map).strip('()')
|
||||
# TODO implement more than one range expression and OR operators
|
||||
range_key_expression = expressions[1].replace(")", "")
|
||||
range_key_expression = expressions[0].strip('()')
|
||||
range_key_expression_components = range_key_expression.split()
|
||||
range_comparison = range_key_expression_components[1]
|
||||
if 'AND' in range_key_expression:
|
||||
|
|
@ -293,24 +316,26 @@ class DynamoHandler(BaseResponse):
|
|||
range_comparison = None
|
||||
range_values = []
|
||||
|
||||
items, last_page = dynamodb_backend2.query(name, hash_key, range_comparison, range_values)
|
||||
index_name = self.body.get('IndexName')
|
||||
items, last_page = dynamodb_backend2.query(name, hash_key, range_comparison, range_values, index_name=index_name)
|
||||
if items is None:
|
||||
er = 'com.amazonaws.dynamodb.v20111205#ResourceNotFoundException'
|
||||
return self.error(er)
|
||||
|
||||
limit = self.body.get("Limit")
|
||||
if limit:
|
||||
items = items[:limit]
|
||||
|
||||
reversed = self.body.get("ScanIndexForward")
|
||||
if reversed is False:
|
||||
items.reverse()
|
||||
|
||||
limit = self.body.get("Limit")
|
||||
if limit:
|
||||
items = items[:limit]
|
||||
|
||||
result = {
|
||||
"Count": len(items),
|
||||
"Items": [item.attrs for item in items],
|
||||
"ConsumedCapacityUnits": 1,
|
||||
}
|
||||
if self.body.get('Select', '').upper() != 'COUNT':
|
||||
result["Items"] = [item.attrs for item in items]
|
||||
|
||||
# Implement this when we do pagination
|
||||
# if not last_page:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue