Files
MultiChatOverlay/filename
2025-11-18 01:53:39 +01:00

75 lines
3.1 KiB
Plaintext

import asyncio
from typing import Dict
from chat_listener import TwitchBot
import security # To decrypt tokens
from config import settings # To get client_id and client_secret
class ListenerManager:
def __init__(self):
# This dictionary will hold our running listener tasks.
# The key will be the user_id and the value will be the asyncio.Task.
self.active_listeners: Dict[int, Dict] = {}
print("ListenerManager initialized.")
async def start_listener_for_user(self, user, websocket_manager):
"""
Starts a chat listener for a given user if one isn't already running.
"""
if user.id in self.active_listeners:
print(f"Listener for user {user.id} is already running.")
return
# Guard Clause: Ensure the user has a valid platform ID required by twitchio.
if not user.platform_user_id:
print(f"ERROR: Cannot start listener for user {user.id}. Missing platform_user_id.")
return
print(f"Starting listener for user {user.id} ({user.username})...")
try:
tokens = security.decrypt_tokens(user.encrypted_tokens)
access_token = tokens['access_token']
refresh_token = tokens['refresh_token']
# Initialize the bot object without credentials first.
bot = TwitchBot(
websocket_manager=websocket_manager,
db_user_id=user.id
)
# Create a task that runs our new start method with all credentials.
# If super().__init__ fails inside bot.start(), the exception will be
# caught by our try/except block here.
task = asyncio.create_task(bot.start(
access_token=access_token, refresh_token=refresh_token,
client_id=settings.TWITCH_CLIENT_ID, client_secret=settings.TWITCH_CLIENT_SECRET,
channel_name=user.username
))
# Store both the task and the bot instance for graceful shutdown
self.active_listeners[user.id] = {"task": task, "bot": bot}
except Exception as e:
# This will catch errors during bot instantiation (e.g., bad token)
print(f"ERROR: Failed to instantiate or start listener for user {user.id}: {e}")
async def stop_listener_for_user(self, user_id: int):
"""Stops a chat listener for a given user."""
if user_id not in self.active_listeners:
print(f"No active listener found for user {user_id}.")
return
print(f"Stopping listener for user {user_id}...")
listener_info = self.active_listeners.pop(user_id)
task = listener_info["task"]
bot = listener_info["bot"]
# Gracefully close the bot's connection
if bot and not bot.is_closed():
await bot.close()
# Cancel the asyncio task
task.cancel()
try:
await task
except asyncio.CancelledError:
print(f"Listener for user {user_id} successfully stopped.")