huesoporro/src/huesoporro/tts.py
2024-12-11 17:08:17 +01:00

131 lines
4.4 KiB
Python

import asyncio
from collections import deque
from hashlib import sha512
from pathlib import Path
from gtts import gTTS
from litestar import WebSocket
from loguru import logger
from src.huesoporro.settings import Settings
class TTSManager:
TEXT_MAX_LENGTH: int = 400
def __init__(self, max_queue_size=10):
self.queue: deque = deque(maxlen=max_queue_size)
# Connected WebSocket clients
self.clients: list[WebSocket] = []
# Currently playing audio
self.current_audio = None
# Lock to prevent race conditions
self._lock = asyncio.Lock()
self._tasks = []
self.s = Settings.get()
def generate_tts(self, text, language="pt", tld="com.br"):
# Generate unique filename
text = text[0 : self.TEXT_MAX_LENGTH]
filename = (
self.s.tts_cache_path / f"{sha512(text.lower().encode()).hexdigest()}.mp3"
)
if filename.exists():
logger.info(
f"TTS already exists for '{text[:50]}' at {filename}. Returning it"
)
return {
"filename": filename.name,
"text": text,
"filepath": str(filename),
"language": language,
"tld": tld,
}
logger.info(f"Generating TTS for '{text[:50]}'")
# Generate TTS
tts = gTTS(text=text, lang=language, tld=tld)
tts.save(str(filename))
return {
"filename": filename.name,
"text": text,
"filepath": filename,
"language": language,
"tld": tld,
}
async def add_to_queue(self, text, language="pt", tld="com.br"):
"""Add TTS request to queue and start processing if not already running"""
async with self._lock:
# Generate TTS file
audio_info = self.generate_tts(text, language, tld)
# Add to queue
self.queue.append(audio_info)
# If this is the only item, start processing
if len(self.queue) == 1:
self._tasks.append(asyncio.create_task(self.process_queue()))
return audio_info
async def process_queue(self):
"""Process queue and stream audio to connected clients"""
while True:
async with self._lock:
# Check if queue is empty
if not self.queue:
return
# Get next audio file
audio_info = self.queue[0]
try:
# Read the entire audio file
audio_path = Path(audio_info["filepath"])
with audio_path.open("rb") as audio_file:
file_size = audio_path.stat().st_size
logger.info(
f"Streaming file: {audio_info['filename']}, Size: {file_size} bytes"
)
# Stream audio to all connected clients
for client in self.clients:
try:
# Reset file pointer to beginning
audio_file.seek(0)
# Send file size first (as a header)
await client.send_text(f"FILE_HEADER:{file_size}")
# Stream file in chunks
chunk = audio_file.read(128) # Larger chunk size
chunk_count = 0
while chunk:
logger.info(f"Streamed {chunk_count} chunks")
chunk_count += 1
await client.send_bytes(chunk)
chunk = audio_file.read(128)
# Send file footer
await client.send_text("FILE_FOOTER")
except Exception: # noqa: BLE001
logger.error(
f"Error streaming to client {client.client}. Removing it."
)
if client in self.clients:
self.clients.remove(client)
except Exception as e: # noqa: BLE001
logger.error(f"Error processing audio file: {e}")
# Remove the processed item from the queue
async with self._lock:
if self.queue and self.queue[0] == audio_info:
self.queue.popleft()