Working on: The Twitch authentication
This commit is contained in:
@@ -1,9 +1,12 @@
|
|||||||
|
import logging
|
||||||
import twitchio
|
import twitchio
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
from database import SessionLocal
|
from database import SessionLocal
|
||||||
import models
|
import models
|
||||||
import security
|
import security
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class TwitchBot(twitchio.Client):
|
class TwitchBot(twitchio.Client):
|
||||||
def __init__(self, websocket_manager, db_user_id: int):
|
def __init__(self, websocket_manager, db_user_id: int):
|
||||||
self.websocket_manager = websocket_manager
|
self.websocket_manager = websocket_manager
|
||||||
@@ -16,7 +19,7 @@ class TwitchBot(twitchio.Client):
|
|||||||
A custom start method that also handles initialization. This makes the
|
A custom start method that also handles initialization. This makes the
|
||||||
entire setup process an awaitable, atomic operation.
|
entire setup process an awaitable, atomic operation.
|
||||||
"""
|
"""
|
||||||
print(f"DIAGNOSTIC: Initializing and connecting for user {self.db_user_id}...")
|
logger.info(f"DIAGNOSTIC: Initializing and connecting for user {self.db_user_id}...")
|
||||||
|
|
||||||
# The sensitive __init__ call is now inside the awaitable task.
|
# The sensitive __init__ call is now inside the awaitable task.
|
||||||
super().__init__(token=access_token, client_id=client_id, client_secret=client_secret,
|
super().__init__(token=access_token, client_id=client_id, client_secret=client_secret,
|
||||||
@@ -24,19 +27,22 @@ class TwitchBot(twitchio.Client):
|
|||||||
self.channel_name = channel_name
|
self.channel_name = channel_name
|
||||||
self.is_initialized = True
|
self.is_initialized = True
|
||||||
|
|
||||||
|
try:
|
||||||
await self.connect()
|
await self.connect()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Twitch connection failed for user {self.db_user_id}: {e}")
|
||||||
|
|
||||||
async def event_ready(self):
|
async def event_ready(self):
|
||||||
"""Called once when the bot goes online."""
|
"""Called once when the bot goes online."""
|
||||||
# Diagnostic Logging: Confirming the bot is ready and joined the channel.
|
# Diagnostic Logging: Confirming the bot is ready and joined the channel.
|
||||||
print(f"DIAGNOSTIC: Listener connected and ready for user_id: {self.db_user_id}, channel: #{self.channel_name}")
|
logger.info(f"DIAGNOSTIC: Listener connected and ready for user_id: {self.db_user_id}, channel: #{self.channel_name}")
|
||||||
|
|
||||||
async def event_token_refresh(self, token: str, refresh_token: str):
|
async def event_token_refresh(self, token: str, refresh_token: str):
|
||||||
"""
|
"""
|
||||||
Called when twitchio automatically refreshes the token.
|
Called when twitchio automatically refreshes the token.
|
||||||
We must save the new tokens back to our database.
|
We must save the new tokens back to our database.
|
||||||
"""
|
"""
|
||||||
print(f"DIAGNOSTIC: Token refreshed for user {self.db_user_id}. Saving new tokens to database.")
|
logger.info(f"DIAGNOSTIC: Token refreshed for user {self.db_user_id}. Saving new tokens to database.")
|
||||||
db: Session = SessionLocal()
|
db: Session = SessionLocal()
|
||||||
try:
|
try:
|
||||||
user = db.query(models.User).filter(models.User.id == self.db_user_id).first()
|
user = db.query(models.User).filter(models.User.id == self.db_user_id).first()
|
||||||
@@ -49,7 +55,7 @@ class TwitchBot(twitchio.Client):
|
|||||||
async def event_message(self, message): # Mandate: Type hint removed to prevent import errors.
|
async def event_message(self, message): # Mandate: Type hint removed to prevent import errors.
|
||||||
"""Runs every time a message is sent in chat."""
|
"""Runs every time a message is sent in chat."""
|
||||||
# Diagnostic Logging: Checkpoint 1 - A raw message is received from Twitch.
|
# Diagnostic Logging: Checkpoint 1 - A raw message is received from Twitch.
|
||||||
print(f"DIAGNOSTIC: Message received for user {self.db_user_id} in channel {self.channel_name}: '{message.content}'")
|
logger.info(f"DIAGNOSTIC: Message received for user {self.db_user_id} in channel {self.channel_name}: '{message.content}'")
|
||||||
|
|
||||||
# Ignore messages sent by the bot itself to prevent loops.
|
# Ignore messages sent by the bot itself to prevent loops.
|
||||||
if message.echo:
|
if message.echo:
|
||||||
@@ -62,11 +68,11 @@ class TwitchBot(twitchio.Client):
|
|||||||
"platform": "twitch"
|
"platform": "twitch"
|
||||||
}
|
}
|
||||||
# Diagnostic Logging: Checkpoint 2 - The message data has been prepared for broadcasting.
|
# Diagnostic Logging: Checkpoint 2 - The message data has been prepared for broadcasting.
|
||||||
print(f"DIAGNOSTIC: Prepared chat_data for user {self.db_user_id}: {chat_data}")
|
logger.info(f"DIAGNOSTIC: Prepared chat_data for user {self.db_user_id}: {chat_data}")
|
||||||
|
|
||||||
# Broadcast the message to the specific user's overlay
|
# Broadcast the message to the specific user's overlay
|
||||||
# We need the user's ID to know which WebSocket connection to send to.
|
# We need the user's ID to know which WebSocket connection to send to.
|
||||||
user_id = self.db_user_id
|
user_id = self.db_user_id
|
||||||
await self.websocket_manager.broadcast_to_user(user_id, chat_data)
|
await self.websocket_manager.broadcast_to_user(user_id, chat_data)
|
||||||
# Diagnostic Logging: Checkpoint 3 - The broadcast function was called.
|
# Diagnostic Logging: Checkpoint 3 - The broadcast function was called.
|
||||||
print(f"DIAGNOSTIC: Broadcast called for user {self.db_user_id}.")
|
logger.info(f"DIAGNOSTIC: Broadcast called for user {self.db_user_id}.")
|
||||||
@@ -1,31 +1,34 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import logging
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
|
|
||||||
from chat_listener import TwitchBot
|
from chat_listener import TwitchBot
|
||||||
import security # To decrypt tokens
|
import security # To decrypt tokens
|
||||||
from config import settings # To get client_id and client_secret
|
from config import settings # To get client_id and client_secret
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class ListenerManager:
|
class ListenerManager:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
# This dictionary will hold our running listener tasks.
|
# This dictionary will hold our running listener tasks.
|
||||||
# The key will be the user_id and the value will be the asyncio.Task.
|
# The key will be the user_id and the value will be the asyncio.Task.
|
||||||
self.active_listeners: Dict[int, Dict] = {}
|
self.active_listeners: Dict[int, Dict] = {}
|
||||||
print("ListenerManager initialized.")
|
logger.info("ListenerManager initialized.")
|
||||||
|
|
||||||
async def start_listener_for_user(self, user, websocket_manager):
|
async def start_listener_for_user(self, user, websocket_manager):
|
||||||
"""
|
"""
|
||||||
Starts a chat listener for a given user if one isn't already running.
|
Starts a chat listener for a given user if one isn't already running.
|
||||||
"""
|
"""
|
||||||
if user.id in self.active_listeners:
|
if user.id in self.active_listeners:
|
||||||
print(f"Listener for user {user.id} is already running.")
|
logger.info(f"Listener for user {user.id} is already running.")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Guard Clause: Ensure the user has a valid platform ID required by twitchio.
|
# Guard Clause: Ensure the user has a valid platform ID required by twitchio.
|
||||||
if not user.platform_user_id:
|
if not user.platform_user_id:
|
||||||
print(f"ERROR: Cannot start listener for user {user.id}. Missing platform_user_id.")
|
logger.error(f"Cannot start listener for user {user.id}. Missing platform_user_id.")
|
||||||
return
|
return
|
||||||
|
|
||||||
print(f"Starting listener for user {user.id} ({user.username})...")
|
logger.info(f"Starting listener for user {user.id} ({user.username})...")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
tokens = security.decrypt_tokens(user.encrypted_tokens)
|
tokens = security.decrypt_tokens(user.encrypted_tokens)
|
||||||
@@ -50,15 +53,15 @@ class ListenerManager:
|
|||||||
self.active_listeners[user.id] = {"task": task, "bot": bot}
|
self.active_listeners[user.id] = {"task": task, "bot": bot}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# This will catch errors during bot instantiation (e.g., bad token)
|
# This will catch errors during bot instantiation (e.g., bad token)
|
||||||
print(f"ERROR: Failed to instantiate or start listener for user {user.id}: {e}")
|
logger.error(f"Failed to instantiate or start listener for user {user.id}: {e}")
|
||||||
|
|
||||||
async def stop_listener_for_user(self, user_id: int):
|
async def stop_listener_for_user(self, user_id: int):
|
||||||
"""Stops a chat listener for a given user."""
|
"""Stops a chat listener for a given user."""
|
||||||
if user_id not in self.active_listeners:
|
if user_id not in self.active_listeners:
|
||||||
print(f"No active listener found for user {user_id}.")
|
logger.info(f"No active listener found for user {user_id}.")
|
||||||
return
|
return
|
||||||
|
|
||||||
print(f"Stopping listener for user {user_id}...")
|
logger.info(f"Stopping listener for user {user_id}...")
|
||||||
listener_info = self.active_listeners.pop(user_id)
|
listener_info = self.active_listeners.pop(user_id)
|
||||||
task = listener_info["task"]
|
task = listener_info["task"]
|
||||||
bot = listener_info["bot"]
|
bot = listener_info["bot"]
|
||||||
@@ -73,4 +76,4 @@ class ListenerManager:
|
|||||||
try:
|
try:
|
||||||
await task
|
await task
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
print(f"Listener for user {user_id} successfully stopped.")
|
logger.info(f"Listener for user {user_id} successfully stopped.")
|
||||||
17
main.py
17
main.py
@@ -1,4 +1,5 @@
|
|||||||
import os
|
import os
|
||||||
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
from fastapi import FastAPI, Request, Depends, HTTPException
|
from fastapi import FastAPI, Request, Depends, HTTPException
|
||||||
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
|
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
|
||||||
@@ -25,9 +26,13 @@ BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|||||||
STATIC_DIR = os.path.join(BASE_DIR, "static")
|
STATIC_DIR = os.path.join(BASE_DIR, "static")
|
||||||
TEMPLATES_DIR = os.path.join(BASE_DIR, "templates")
|
TEMPLATES_DIR = os.path.join(BASE_DIR, "templates")
|
||||||
|
|
||||||
|
# --- Logging Configuration ---
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
async def background_listener_startup(app: FastAPI):
|
async def background_listener_startup(app: FastAPI):
|
||||||
"""A non-blocking task to start listeners after the app has started."""
|
"""A non-blocking task to start listeners after the app has started."""
|
||||||
print("Background task: Starting listeners for all users...")
|
logger.info("Background task: Starting listeners for all users...")
|
||||||
db = SessionLocal()
|
db = SessionLocal()
|
||||||
users = db.query(models.User).all()
|
users = db.query(models.User).all()
|
||||||
db.close()
|
db.close()
|
||||||
@@ -36,23 +41,23 @@ async def background_listener_startup(app: FastAPI):
|
|||||||
try:
|
try:
|
||||||
await app.state.listener_manager.start_listener_for_user(user, app.state.websocket_manager)
|
await app.state.listener_manager.start_listener_for_user(user, app.state.websocket_manager)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"ERROR: Failed to start listener for user {user.id} ({user.username}): {e}")
|
logger.error(f"Failed to start listener for user {user.id} ({user.username}): {e}")
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
# This code runs on startup
|
# This code runs on startup
|
||||||
print("Application startup: Creating database tables...")
|
logger.info("Application startup: Creating database tables...")
|
||||||
app.state.websocket_manager = WebSocketManager()
|
app.state.websocket_manager = WebSocketManager()
|
||||||
app.state.listener_manager = ListenerManager()
|
app.state.listener_manager = ListenerManager()
|
||||||
models.Base.metadata.create_all(bind=engine)
|
models.Base.metadata.create_all(bind=engine)
|
||||||
print("Application startup: Database tables created.")
|
logger.info("Application startup: Database tables created.")
|
||||||
|
|
||||||
# Decouple listener startup from the main application startup
|
# Decouple listener startup from the main application startup
|
||||||
asyncio.create_task(background_listener_startup(app))
|
asyncio.create_task(background_listener_startup(app))
|
||||||
yield
|
yield
|
||||||
|
|
||||||
# This code runs on shutdown
|
# This code runs on shutdown
|
||||||
print("Application shutdown: Stopping all listeners...")
|
logger.info("Application shutdown: Stopping all listeners...")
|
||||||
manager = app.state.listener_manager
|
manager = app.state.listener_manager
|
||||||
# Create a copy of keys to avoid runtime errors from changing dict size
|
# Create a copy of keys to avoid runtime errors from changing dict size
|
||||||
for user_id in list(manager.active_listeners.keys()):
|
for user_id in list(manager.active_listeners.keys()):
|
||||||
@@ -203,4 +208,4 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
|||||||
await websocket.receive_text()
|
await websocket.receive_text()
|
||||||
except Exception:
|
except Exception:
|
||||||
manager.disconnect(user_id, websocket)
|
manager.disconnect(user_id, websocket)
|
||||||
print(f"WebSocket for user {user_id} disconnected.")
|
logger.info(f"WebSocket for user {user_id} disconnected.")
|
||||||
Reference in New Issue
Block a user