huesoporro/src/huesoporro/svc/refresh.py

27 lines
908 B
Python

import datetime
from pydantic import BaseModel
from src.huesoporro.infra.authenticator import TwitchAuthenticator
from src.huesoporro.infra.db import Database
from src.huesoporro.models import User
class RefreshTokenAuthenticator(BaseModel):
db: Database
authenticator: TwitchAuthenticator
@staticmethod
def get_four_hours_from_now() -> float:
now = datetime.datetime.now(datetime.UTC)
four_hours_later = now + datetime.timedelta(hours=4)
return four_hours_later.timestamp()
async def run(self, refresh_token: str) -> User:
auth = await self.authenticator.refresh_token(refresh_token)
username = await self.authenticator.validate_token(auth.access_token)
expires_at = self.get_four_hours_from_now()
user = User(user=username, expires_at=expires_at, twitch_auth=auth)
await self.db.save_user(user)
return user