feat: remove !h and make the bot have an in-memory dict of greeted users instead of using the backoff service

This commit is contained in:
cătălin 2025-02-26 11:53:18 +01:00
commit b2185f4174
No known key found for this signature in database
52 changed files with 404 additions and 353 deletions

View file

View file

View file

@ -0,0 +1,68 @@
from litestar import Request
from litestar.exceptions import HTTPException
from huesoporro.actions.authenticate import AuthenticateAction
from huesoporro.actions.get_user_by_jwt import GetUserByJWTAction
from huesoporro.infra.authenticator import TwitchAuthenticator
from huesoporro.infra.db import Database
from huesoporro.infra.repos import UserRepo
from huesoporro.libs.db import MarkovDatabase
from huesoporro.models import User
from huesoporro.settings import Settings
from huesoporro.svc.get_chatbot_settings import ChatbotSettingsGetterSvc
from huesoporro.svc.store import SentenceStorerSvc
from huesoporro.svc.store_settings import ChatbotSettingsStorerSvc
def get_settings() -> Settings:
return Settings.get()
def get_authenticator(s: Settings) -> TwitchAuthenticator:
return TwitchAuthenticator(s=s)
def get_db(s: Settings):
return Database(s=s)
async def get_get_user_by_jwt_action(
user_repo: UserRepo, authenticator: TwitchAuthenticator, s: Settings
):
return GetUserByJWTAction(user_repo=user_repo, authenticator=authenticator, s=s)
async def authenticate(
request: Request, get_user_by_jwt_action: GetUserByJWTAction
) -> User:
token = request.query_params.get("huesoporro_token")
if token:
return await get_user_by_jwt_action.run(token)
cookies = request.cookies.get("huesoporroAuth")
if cookies:
return await get_user_by_jwt_action.run(cookies)
raise HTTPException(status_code=401, detail="Unauthorized")
async def get_chatbot_settings_svc(db: Database):
return ChatbotSettingsGetterSvc(db=db)
async def store_chatbot_settings_svc(db: Database):
return ChatbotSettingsStorerSvc(db=db)
async def get_sentences_storer_svc(db: MarkovDatabase):
return SentenceStorerSvc(db=db)
async def get_user_repo(s: Settings):
return UserRepo(s=s)
async def get_authenticate_action(
user_repo: UserRepo, authenticator: TwitchAuthenticator, s: Settings
):
return AuthenticateAction(user_repo=user_repo, authenticator=authenticator, s=s)

View file

@ -0,0 +1,45 @@
import httpx
from litestar import MediaType, Request, Response
from litestar.exceptions import HTTPException
from litestar.response import Redirect
from litestar.status_codes import HTTP_500_INTERNAL_SERVER_ERROR
from loguru import logger
def http_exception_handler(_: Request, exc: HTTPException) -> Response:
status_code = getattr(exc, "status_code", HTTP_500_INTERNAL_SERVER_ERROR)
detail = getattr(exc, "detail", "")
if isinstance(exc, HTTPException) and (exc.status_code in [401, 403]):
logger.warning("User could not authenticate. Redirecting to /login page")
return Redirect("/login")
return Response(
media_type=MediaType.TEXT,
content=detail,
status_code=status_code,
)
def httpx_status_error_handler(_: Request, exc: httpx.HTTPStatusError):
logger.error(f"HTTPX error occurred: {exc}")
return Response(
media_type=MediaType.TEXT,
content=f"HTTPX error occurred: {exc}",
status_code=exc.response.status_code,
)
async def after_exception_handler(exc: Exception, scope: "Scope") -> None: # type: ignore[name-defined] # noqa: F821
"""Hook function that will be invoked after each exception."""
state = scope["app"].state
if not hasattr(state, "error_count"):
state.error_count = 1
else:
state.error_count += 1
logger.error(
f"an exception of type {type(exc).__name__} has occurred for requested path {scope['path']} and the application error count is {state.error_count}.",
)
import traceback
traceback.print_exc()

View file

@ -0,0 +1,95 @@
import httpx
from litestar import Litestar, get
from litestar.contrib.jinja import JinjaTemplateEngine
from litestar.di import Provide
from litestar.exceptions import HTTPException
from litestar.static_files import StaticFilesConfig
from litestar.template import TemplateConfig
from apps.httpapi.litestar.dependencies import (
authenticate,
get_authenticate_action,
get_authenticator,
get_chatbot_settings_svc,
get_db,
get_get_user_by_jwt_action,
get_sentences_storer_svc,
get_settings,
get_user_repo,
store_chatbot_settings_svc,
)
from apps.httpapi.litestar.errors import (
after_exception_handler,
http_exception_handler,
httpx_status_error_handler,
)
from apps.httpapi.litestar.routes.api import (
get_bot_settings,
get_bot_status,
get_index,
get_tts_overlay,
get_tts_permalink,
manage_bot,
save_bot_settings,
)
from apps.httpapi.litestar.routes.auth import get_code, login
from huesoporro.bot import BotsManager
from huesoporro.settings import Settings
@get("/healthz")
def get_health() -> dict:
return {"status": "ok"}
def create_app():
return Litestar(
route_handlers=[
get_health,
login,
get_index,
get_tts_overlay,
get_tts_permalink,
get_code,
manage_bot,
get_bot_status,
save_bot_settings,
get_bot_settings,
],
static_files_config=(
StaticFilesConfig(
path="/tts_files",
directories=[Settings.get().tts_cache_path],
),
StaticFilesConfig(
path="static",
directories=[Settings.get().static_files_path],
),
),
template_config=TemplateConfig(
directory=Settings.get().templates_files_path,
engine=JinjaTemplateEngine,
),
exception_handlers={
HTTPException: http_exception_handler,
httpx.HTTPStatusError: httpx_status_error_handler,
},
after_exception=[after_exception_handler],
dependencies={
"s": Provide(get_settings, use_cache=True),
"a": Provide(get_authenticator, use_cache=True),
"user": Provide(authenticate),
"db": Provide(get_db, use_cache=True),
"bm": Provide(BotsManager, use_cache=True),
"gbs": Provide(get_chatbot_settings_svc),
"sbs": Provide(store_chatbot_settings_svc),
"sss": Provide(get_sentences_storer_svc),
"authenticator": Provide(get_authenticator),
"authenticate_action": Provide(get_authenticate_action),
"user_repo": Provide(get_user_repo),
"get_user_by_jwt_action": Provide(get_get_user_by_jwt_action),
},
)
app = create_app()

View file

@ -0,0 +1,106 @@
from typing import Literal
from litestar import MediaType, Response, get, put
from litestar.datastructures import UploadFile
from litestar.response import Template
from pydantic import BaseModel, ConfigDict
from huesoporro.bot import BotsManager
from huesoporro.models import ChatbotSettings, User
from huesoporro.svc.get_chatbot_settings import ChatbotSettingsGetterSvc
from huesoporro.svc.store_settings import ChatbotSettingsStorerSvc
class ManageBotDTO(BaseModel):
command: Literal["start", "stop"]
channel_name: str | None = None
class ImportTextFileDTO(BaseModel):
file: UploadFile
channel_name: str
model_config = ConfigDict(arbitrary_types_allowed=True)
@get(
"/tts",
media_type=MediaType.HTML,
)
async def get_tts_overlay(user: User) -> Template:
return Template(template_name="tts.html")
@get(
"/tts/permalink",
media_type=MediaType.HTML,
)
async def get_tts_permalink(access_token: str) -> Template:
"""Handler for the /tts permalink endpoint to be used by apps that can only give the authentication as a query
param and not as a cookie, i.e. OBS"""
return Template(
template_name="tts.html",
)
@get(
"/",
media_type=MediaType.HTML,
)
async def get_index(user: User, gbs: ChatbotSettingsGetterSvc) -> Template:
chatbot_settings = await gbs.run(user=user)
return Template(
template_name="index.html",
context=chatbot_settings.model_dump() if chatbot_settings else {},
)
@put("/api/v1/bot")
async def manage_bot(
user: User,
data: ManageBotDTO,
gbs: ChatbotSettingsGetterSvc,
sbs: ChatbotSettingsStorerSvc,
bm: BotsManager,
) -> Response:
chatbot_settings = await gbs.run(user=user)
if not chatbot_settings:
await sbs.run(user=user, bot_settings=ChatbotSettings())
chatbot_settings = await gbs.run(user=user)
if data.command == "start":
if not data.channel_name:
return Response({"message": "Channel name is required"}, status_code=400)
bm.add_bot(user, data.channel_name, chatbot_settings=chatbot_settings) # type: ignore[arg-type]
if user.user in bm.bots:
await bm.run_user_bot(user)
return Response({"message": "Bot started"})
if data.command == "stop" and user.user in bm.bots:
await bm.stop_user_bot(user)
return Response({"message": "Bot stopped"})
return Response({"message": "Invalid command"}, status_code=400)
@get("/api/v1/bot")
async def get_bot_status(user: User, bm: BotsManager) -> dict:
if user.user not in bm.bots:
return {"status": "ko"}
return {"status": "ok"}
@get("/api/v1/bot/settings")
async def get_bot_settings(
user: User, gbs: ChatbotSettingsGetterSvc
) -> ChatbotSettings | dict:
cbs = await gbs.run(user=user)
if not cbs:
return {"status": "Not found"}
return cbs
@put("/api/v1/bot/settings")
async def save_bot_settings(
user: User, data: ChatbotSettings, sbs: ChatbotSettingsStorerSvc
) -> dict:
await sbs.run(user=user, bot_settings=data)
return {"status": "ok"}

View file

@ -0,0 +1,41 @@
import secrets
from litestar import MediaType, get
from litestar.datastructures.cookie import Cookie
from litestar.response import Redirect, Template
from huesoporro.actions.authenticate import AuthenticateAction
from huesoporro.settings import Settings
@get(path="/o/code")
async def get_code(code: str, authenticate_action: AuthenticateAction) -> Redirect:
token = await authenticate_action.run(code)
return Redirect(
"/",
cookies=[
Cookie(
key="huesoporroAuth",
value=token,
expires=604800, # 1 week
)
],
)
@get(
"/login",
media_type=MediaType.HTML,
)
async def login(s: Settings) -> Template:
scopes = "+".join(s.twitch_scopes)
return Template(
"login.html",
context={
"twitch_login_url": "https://id.twitch.tv/oauth2/authorize?response_type=code"
f"&client_id={s.twitch_client_id}"
f"&redirect_uri={s.server_hostname}o/code"
f"&scope={scopes}"
f"&state={secrets.token_urlsafe(32)}"
},
)