Merge pull request #1848 from ashb/expand-glue-catalog-mocking

Mock more of the Glue Data Catalog APIs
This commit is contained in:
Steve Pulec 2018-10-15 01:14:03 -04:00 committed by GitHub
commit dfc793916d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 673 additions and 61 deletions

View file

@ -29,3 +29,28 @@ TABLE_INPUT = {
},
'TableType': 'EXTERNAL_TABLE',
}
PARTITION_INPUT = {
# 'DatabaseName': 'dbname',
'StorageDescriptor': {
'BucketColumns': [],
'Columns': [],
'Compressed': False,
'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
'Location': 's3://.../partition=value',
'NumberOfBuckets': -1,
'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
'Parameters': {},
'SerdeInfo': {
'Parameters': {'path': 's3://...', 'serialization.format': '1'},
'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'},
'SkewedInfo': {'SkewedColumnNames': [],
'SkewedColumnValueLocationMaps': {},
'SkewedColumnValues': []},
'SortColumns': [],
'StoredAsSubDirectories': False,
},
# 'TableName': 'source_table',
# 'Values': ['2018-06-26'],
}

View file

@ -2,7 +2,7 @@ from __future__ import unicode_literals
import copy
from .fixtures.datacatalog import TABLE_INPUT
from .fixtures.datacatalog import TABLE_INPUT, PARTITION_INPUT
def create_database(client, database_name):
@ -17,22 +17,38 @@ def get_database(client, database_name):
return client.get_database(Name=database_name)
def create_table_input(table_name, s3_location, columns=[], partition_keys=[]):
def create_table_input(database_name, table_name, columns=[], partition_keys=[]):
table_input = copy.deepcopy(TABLE_INPUT)
table_input['Name'] = table_name
table_input['PartitionKeys'] = partition_keys
table_input['StorageDescriptor']['Columns'] = columns
table_input['StorageDescriptor']['Location'] = s3_location
table_input['StorageDescriptor']['Location'] = 's3://my-bucket/{database_name}/{table_name}'.format(
database_name=database_name,
table_name=table_name
)
return table_input
def create_table(client, database_name, table_name, table_input):
def create_table(client, database_name, table_name, table_input=None, **kwargs):
if table_input is None:
table_input = create_table_input(database_name, table_name, **kwargs)
return client.create_table(
DatabaseName=database_name,
TableInput=table_input
)
def update_table(client, database_name, table_name, table_input=None, **kwargs):
if table_input is None:
table_input = create_table_input(database_name, table_name, **kwargs)
return client.update_table(
DatabaseName=database_name,
TableInput=table_input,
)
def get_table(client, database_name, table_name):
return client.get_table(
DatabaseName=database_name,
@ -44,3 +60,60 @@ def get_tables(client, database_name):
return client.get_tables(
DatabaseName=database_name
)
def get_table_versions(client, database_name, table_name):
return client.get_table_versions(
DatabaseName=database_name,
TableName=table_name
)
def get_table_version(client, database_name, table_name, version_id):
return client.get_table_version(
DatabaseName=database_name,
TableName=table_name,
VersionId=version_id,
)
def create_partition_input(database_name, table_name, values=[], columns=[]):
root_path = 's3://my-bucket/{database_name}/{table_name}'.format(
database_name=database_name,
table_name=table_name
)
part_input = copy.deepcopy(PARTITION_INPUT)
part_input['Values'] = values
part_input['StorageDescriptor']['Columns'] = columns
part_input['StorageDescriptor']['SerdeInfo']['Parameters']['path'] = root_path
return part_input
def create_partition(client, database_name, table_name, partiton_input=None, **kwargs):
if partiton_input is None:
partiton_input = create_partition_input(database_name, table_name, **kwargs)
return client.create_partition(
DatabaseName=database_name,
TableName=table_name,
PartitionInput=partiton_input
)
def update_partition(client, database_name, table_name, old_values=[], partiton_input=None, **kwargs):
if partiton_input is None:
partiton_input = create_partition_input(database_name, table_name, **kwargs)
return client.update_partition(
DatabaseName=database_name,
TableName=table_name,
PartitionInput=partiton_input,
PartitionValueList=old_values,
)
def get_partition(client, database_name, table_name, values):
return client.get_partition(
DatabaseName=database_name,
TableName=table_name,
PartitionValues=values,
)

View file

@ -1,10 +1,15 @@
from __future__ import unicode_literals
import sure # noqa
import re
from nose.tools import assert_raises
import boto3
from botocore.client import ClientError
from datetime import datetime
import pytz
from moto import mock_glue
from . import helpers
@ -30,7 +35,19 @@ def test_create_database_already_exists():
with assert_raises(ClientError) as exc:
helpers.create_database(client, database_name)
exc.exception.response['Error']['Code'].should.equal('DatabaseAlreadyExistsException')
exc.exception.response['Error']['Code'].should.equal('AlreadyExistsException')
@mock_glue
def test_get_database_not_exits():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'nosuchdatabase'
with assert_raises(ClientError) as exc:
helpers.get_database(client, database_name)
exc.exception.response['Error']['Code'].should.equal('EntityNotFoundException')
exc.exception.response['Error']['Message'].should.match('Database nosuchdatabase not found')
@mock_glue
@ -40,12 +57,7 @@ def test_create_table():
helpers.create_database(client, database_name)
table_name = 'myspecialtable'
s3_location = 's3://my-bucket/{database_name}/{table_name}'.format(
database_name=database_name,
table_name=table_name
)
table_input = helpers.create_table_input(table_name, s3_location)
table_input = helpers.create_table_input(database_name, table_name)
helpers.create_table(client, database_name, table_name, table_input)
response = helpers.get_table(client, database_name, table_name)
@ -63,18 +75,12 @@ def test_create_table_already_exists():
helpers.create_database(client, database_name)
table_name = 'cantcreatethistabletwice'
s3_location = 's3://my-bucket/{database_name}/{table_name}'.format(
database_name=database_name,
table_name=table_name
)
table_input = helpers.create_table_input(table_name, s3_location)
helpers.create_table(client, database_name, table_name, table_input)
helpers.create_table(client, database_name, table_name)
with assert_raises(ClientError) as exc:
helpers.create_table(client, database_name, table_name, table_input)
helpers.create_table(client, database_name, table_name)
exc.exception.response['Error']['Code'].should.equal('TableAlreadyExistsException')
exc.exception.response['Error']['Code'].should.equal('AlreadyExistsException')
@mock_glue
@ -87,11 +93,7 @@ def test_get_tables():
table_inputs = {}
for table_name in table_names:
s3_location = 's3://my-bucket/{database_name}/{table_name}'.format(
database_name=database_name,
table_name=table_name
)
table_input = helpers.create_table_input(table_name, s3_location)
table_input = helpers.create_table_input(database_name, table_name)
table_inputs[table_name] = table_input
helpers.create_table(client, database_name, table_name, table_input)
@ -99,10 +101,326 @@ def test_get_tables():
tables = response['TableList']
assert len(tables) == 3
tables.should.have.length_of(3)
for table in tables:
table_name = table['Name']
table_name.should.equal(table_inputs[table_name]['Name'])
table['StorageDescriptor'].should.equal(table_inputs[table_name]['StorageDescriptor'])
table['PartitionKeys'].should.equal(table_inputs[table_name]['PartitionKeys'])
@mock_glue
def test_get_table_versions():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
helpers.create_database(client, database_name)
table_name = 'myfirsttable'
version_inputs = {}
table_input = helpers.create_table_input(database_name, table_name)
helpers.create_table(client, database_name, table_name, table_input)
version_inputs["1"] = table_input
columns = [{'Name': 'country', 'Type': 'string'}]
table_input = helpers.create_table_input(database_name, table_name, columns=columns)
helpers.update_table(client, database_name, table_name, table_input)
version_inputs["2"] = table_input
# Updateing with an indentical input should still create a new version
helpers.update_table(client, database_name, table_name, table_input)
version_inputs["3"] = table_input
response = helpers.get_table_versions(client, database_name, table_name)
vers = response['TableVersions']
vers.should.have.length_of(3)
vers[0]['Table']['StorageDescriptor']['Columns'].should.equal([])
vers[-1]['Table']['StorageDescriptor']['Columns'].should.equal(columns)
for n, ver in enumerate(vers):
n = str(n + 1)
ver['VersionId'].should.equal(n)
ver['Table']['Name'].should.equal(table_name)
ver['Table']['StorageDescriptor'].should.equal(version_inputs[n]['StorageDescriptor'])
ver['Table']['PartitionKeys'].should.equal(version_inputs[n]['PartitionKeys'])
response = helpers.get_table_version(client, database_name, table_name, "3")
ver = response['TableVersion']
ver['VersionId'].should.equal("3")
ver['Table']['Name'].should.equal(table_name)
ver['Table']['StorageDescriptor']['Columns'].should.equal(columns)
@mock_glue
def test_get_table_version_not_found():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
with assert_raises(ClientError) as exc:
helpers.get_table_version(client, database_name, 'myfirsttable', "20")
exc.exception.response['Error']['Code'].should.equal('EntityNotFoundException')
exc.exception.response['Error']['Message'].should.match('version', re.I)
@mock_glue
def test_get_table_version_invalid_input():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
with assert_raises(ClientError) as exc:
helpers.get_table_version(client, database_name, 'myfirsttable', "10not-an-int")
exc.exception.response['Error']['Code'].should.equal('InvalidInputException')
@mock_glue
def test_get_table_not_exits():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
helpers.create_database(client, database_name)
with assert_raises(ClientError) as exc:
helpers.get_table(client, database_name, 'myfirsttable')
exc.exception.response['Error']['Code'].should.equal('EntityNotFoundException')
exc.exception.response['Error']['Message'].should.match('Table myfirsttable not found')
@mock_glue
def test_get_table_when_database_not_exits():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'nosuchdatabase'
with assert_raises(ClientError) as exc:
helpers.get_table(client, database_name, 'myfirsttable')
exc.exception.response['Error']['Code'].should.equal('EntityNotFoundException')
exc.exception.response['Error']['Message'].should.match('Database nosuchdatabase not found')
@mock_glue
def test_get_partitions_empty():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
response = client.get_partitions(DatabaseName=database_name, TableName=table_name)
response['Partitions'].should.have.length_of(0)
@mock_glue
def test_create_partition():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
values = ['2018-10-01']
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
before = datetime.now(pytz.utc)
part_input = helpers.create_partition_input(database_name, table_name, values=values)
helpers.create_partition(client, database_name, table_name, part_input)
after = datetime.now(pytz.utc)
response = client.get_partitions(DatabaseName=database_name, TableName=table_name)
partitions = response['Partitions']
partitions.should.have.length_of(1)
partition = partitions[0]
partition['TableName'].should.equal(table_name)
partition['StorageDescriptor'].should.equal(part_input['StorageDescriptor'])
partition['Values'].should.equal(values)
partition['CreationTime'].should.be.greater_than(before)
partition['CreationTime'].should.be.lower_than(after)
@mock_glue
def test_create_partition_already_exist():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
values = ['2018-10-01']
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
helpers.create_partition(client, database_name, table_name, values=values)
with assert_raises(ClientError) as exc:
helpers.create_partition(client, database_name, table_name, values=values)
exc.exception.response['Error']['Code'].should.equal('AlreadyExistsException')
@mock_glue
def test_get_partition_not_found():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
values = ['2018-10-01']
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
with assert_raises(ClientError) as exc:
helpers.get_partition(client, database_name, table_name, values)
exc.exception.response['Error']['Code'].should.equal('EntityNotFoundException')
exc.exception.response['Error']['Message'].should.match('partition')
@mock_glue
def test_get_partition():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
values = [['2018-10-01'], ['2018-09-01']]
helpers.create_partition(client, database_name, table_name, values=values[0])
helpers.create_partition(client, database_name, table_name, values=values[1])
response = client.get_partition(DatabaseName=database_name, TableName=table_name, PartitionValues=values[1])
partition = response['Partition']
partition['TableName'].should.equal(table_name)
partition['Values'].should.equal(values[1])
@mock_glue
def test_update_partition_not_found_moving():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
with assert_raises(ClientError) as exc:
helpers.update_partition(client, database_name, table_name, old_values=['0000-00-00'], values=['2018-10-02'])
exc.exception.response['Error']['Code'].should.equal('EntityNotFoundException')
exc.exception.response['Error']['Message'].should.match('partition')
@mock_glue
def test_update_partition_not_found_change_in_place():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
values = ['2018-10-01']
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
with assert_raises(ClientError) as exc:
helpers.update_partition(client, database_name, table_name, old_values=values, values=values)
exc.exception.response['Error']['Code'].should.equal('EntityNotFoundException')
exc.exception.response['Error']['Message'].should.match('partition')
@mock_glue
def test_update_partition_cannot_overwrite():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
values = [['2018-10-01'], ['2018-09-01']]
helpers.create_partition(client, database_name, table_name, values=values[0])
helpers.create_partition(client, database_name, table_name, values=values[1])
with assert_raises(ClientError) as exc:
helpers.update_partition(client, database_name, table_name, old_values=values[0], values=values[1])
exc.exception.response['Error']['Code'].should.equal('AlreadyExistsException')
@mock_glue
def test_update_partition():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
values = ['2018-10-01']
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
helpers.create_partition(client, database_name, table_name, values=values)
response = helpers.update_partition(
client,
database_name,
table_name,
old_values=values,
values=values,
columns=[{'Name': 'country', 'Type': 'string'}],
)
response = client.get_partition(DatabaseName=database_name, TableName=table_name, PartitionValues=values)
partition = response['Partition']
partition['TableName'].should.equal(table_name)
partition['StorageDescriptor']['Columns'].should.equal([{'Name': 'country', 'Type': 'string'}])
@mock_glue
def test_update_partition_move():
client = boto3.client('glue', region_name='us-east-1')
database_name = 'myspecialdatabase'
table_name = 'myfirsttable'
values = ['2018-10-01']
new_values = ['2018-09-01']
helpers.create_database(client, database_name)
helpers.create_table(client, database_name, table_name)
helpers.create_partition(client, database_name, table_name, values=values)
response = helpers.update_partition(
client,
database_name,
table_name,
old_values=values,
values=new_values,
columns=[{'Name': 'country', 'Type': 'string'}],
)
with assert_raises(ClientError) as exc:
helpers.get_partition(client, database_name, table_name, values)
# Old partition shouldn't exist anymore
exc.exception.response['Error']['Code'].should.equal('EntityNotFoundException')
response = client.get_partition(DatabaseName=database_name, TableName=table_name, PartitionValues=new_values)
partition = response['Partition']
partition['TableName'].should.equal(table_name)
partition['StorageDescriptor']['Columns'].should.equal([{'Name': 'country', 'Type': 'string'}])