secretsanta/secretsanta/repo.py
2023-12-09 11:15:39 +01:00

49 lines
1.6 KiB
Python

import json
import logging
from contextlib import asynccontextmanager

import aioboto3
from botocore.exceptions import ClientError

from secretsanta.domain import Group, IRepo
from secretsanta.settings import Config, get_config
class S3Repo(IRepo):
    """Repository that persists ``Group`` objects as JSON documents in S3.

    Every group is stored in the bucket named by ``config.s3_bucket_name``
    under the key ``secretsanta/<uuid>.json``. Credentials, region and an
    optional custom endpoint URL are all taken from the ``Config`` object.
    """

    # Error codes that mean "the requested object does not exist". Any other
    # ClientError code points at a different problem (permissions, bucket,
    # throttling, ...) and is logged instead of being dropped silently.
    _NOT_FOUND_CODES = frozenset({"NoSuchKey", "404"})

    def __init__(self, config: Config | None = None):
        """Build the aioboto3 session used by all subsequent operations.

        Args:
            config: Settings to use; when omitted (or falsy) the result of
                ``get_config()`` is used instead.
        """
        self.config = config or get_config()
        self.session = aioboto3.Session(
            aws_access_key_id=self.config.s3_access_key_id,
            aws_secret_access_key=self.config.s3_secret_access_key,
            aws_session_token=self.config.s3_session_token,
            region_name=self.config.s3_region,
        )

    @staticmethod
    def _object_key(group_uuid) -> str:
        """Return the S3 object key under which a group's JSON is stored."""
        return f"secretsanta/{group_uuid}.json"

    @asynccontextmanager
    async def get_s3(self):
        """Yield an S3 client from the session, closing it on exit.

        ``endpoint_url`` is only passed to the client when the config defines
        ``s3_endpoint_url``; otherwise the client's default endpoint is used.
        """
        kwargs = {}
        if self.config.s3_endpoint_url:
            kwargs["endpoint_url"] = str(self.config.s3_endpoint_url)
        async with self.session.client("s3", **kwargs) as s3:
            yield s3

    async def get(self, group_uuid: str) -> Group | None:
        """Load the group stored under ``group_uuid``.

        Args:
            group_uuid: Identifier used to build the object key.

        Returns:
            The deserialized ``Group``, or ``None`` when the object could not
            be fetched. A missing object is the expected reason; any other
            client error is logged as a warning and also yields ``None`` so
            existing callers keep seeing the same contract.
        """
        key = self._object_key(group_uuid)
        async with self.get_s3() as s3:
            try:
                s3_obj = await s3.get_object(
                    Bucket=self.config.s3_bucket_name,
                    Key=key,
                )
            except ClientError as err:
                code = err.response.get("Error", {}).get("Code")
                if code not in self._NOT_FOUND_CODES:
                    # Not a plain "missing object": surface it in the logs so
                    # misconfiguration is not mistaken for an unknown group.
                    logging.getLogger(__name__).warning(
                        "S3 get_object failed for key %s with error code %s; "
                        "treating group as missing",
                        key,
                        code,
                    )
                return None
            group_raw = await s3_obj["Body"].read()
            return Group(**json.loads(group_raw))

    async def create(self, group: Group) -> Group:
        """Write ``group`` to the bucket as JSON and return it unchanged.

        Args:
            group: The group to persist; its ``uuid`` determines the key.

        Returns:
            The same ``group`` instance that was passed in.
        """
        async with self.get_s3() as s3:
            await s3.put_object(
                Bucket=self.config.s3_bucket_name,
                Key=self._object_key(group.uuid),
                Body=group.model_dump_json(),
            )
        return group