Files
youtube-chat-overlay/overlay_backend.py

229 lines
9.6 KiB
Python

import pytchat
import os
import re
import json
from datetime import datetime
import time
import threading
import sys
from flask import Flask, jsonify, request, send_from_directory
from flask_cors import CORS
from rich.console import Console # Keep rich.console for internal logging, not for main output
console = Console() # Use a separate console for internal logging
# Define the emoji pattern and coloring function
emoji_pattern = re.compile(
"["
"\U0001F600-\U0001F64F" # emoticons
"\U0001F300-\U0001F5FF" # symbols & pictographs
"\U0001F680-\U0001F6FF" # transport & map symbols
"\U0001F1E0-\U0001F1FF" # flags (iOS)
"\u2600-\u26FF\u2700-\u27BF" # Miscellaneous Symbols and Dingbats
"\u2B50\u2B55\u3030\u303D\u3297\u3299" # Other common emojis
"\U00002500-\U00002BEF" # Chinese/Japanese characters (often used for emojis)
"\U00002702-\U000027B0"
"\U000024C2-\U0001F251"
"]+"
"\uFE0F?" # Variation Selector
"\U0001F3FB-\U0001F3FF?" # Skin tones
"\u200D?" # Zero Width Joiner
"\u20E3?" # Combining Enclosing Keycap
, flags=re.UNICODE)
def colour_emoji(txt):
return emoji_pattern.sub(r'[magenta]\g<0>[/magenta]', txt)
# --- User Color Management ---
USER_COLORS_FILE = "user_colors.json" # This file will be in the overlay project directory
# A palette of distinct colors for usernames
COLOR_PALETTE = [
"#FF0000", "#00FF00", "#0000FF", "#FFFF00", "#00FFFF", "#FF00FF",
"#FFA500", "#800080", "#008000", "#FFC0CB", "#808000", "#008080",
"#C0C0C0", "#800000", "#000080", "#FFD700", "#ADFF2F", "#FF69B4"
]
user_color_map = {}
def load_user_colors():
global user_color_map
if os.path.exists(USER_COLORS_FILE):
try:
with open(USER_COLORS_FILE, 'r') as f:
user_color_map = json.load(f)
except json.JSONDecodeError:
console.print(f"[red]Error loading {USER_COLORS_FILE}. File might be corrupted. Starting with empty colors.[/red]")
user_color_map = {}
def save_user_colors():
with open(USER_COLORS_FILE, 'w') as f:
json.dump(user_color_map, f, indent=4)
def get_user_color(author_id):
global user_color_map
if author_id not in user_color_map:
used_colors = set(user_color_map.values())
available_colors = [color for color in COLOR_PALETTE if color not in used_colors]
if available_colors:
next_color_index = len(user_color_map) % len(available_colors)
user_color_map[author_id] = available_colors[next_color_index]
else:
next_color_index = len(user_color_map) % len(COLOR_PALETTE)
user_color_map[author_id] = COLOR_PALETTE[next_color_index]
console.print("[yellow]Warning: All colors in palette are reserved. Cycling through existing colors.[/yellow]")
save_user_colors()
return user_color_map[author_id]
# --- Flask App Setup ---
# Determine if running as a PyInstaller bundle
if getattr(sys, 'frozen', False):
# If the application is run as a bundle, the PyInstaller bootloader
# extends the sys.path by a temporary directory and sets sys._MEIPASS
base_dir = sys._MEIPASS
else:
base_dir = os.path.abspath(os.path.dirname(__file__))
STATIC_FOLDER = os.path.join(base_dir, 'static')
TEMPLATE_FOLDER = os.path.join(base_dir, 'templates')
app = Flask(__name__, static_folder=STATIC_FOLDER, template_folder=TEMPLATE_FOLDER)
CORS(app) # Enable CORS for all routes
chat_messages = [] # Global list to store chat messages for the web overlay
chat_messages_lock = threading.Lock() # Lock for thread-safe access to chat_messages
@app.route('/')
def serve_index():
return send_from_directory(app.template_folder, 'index.html')
@app.route('/<path:filename>')
def serve_static(filename):
# Serve static files (CSS, JS) from the static folder
return send_from_directory(app.static_folder, filename)
@app.route('/chat')
def get_chat():
with chat_messages_lock:
# console.print(f"[blue]Sending {len(chat_messages)} messages to frontend.[/blue]") # Suppressed for cleaner output
return jsonify(chat_messages)
# --- Chat Scraper Thread ---
def chat_scraper_thread(video_id):
global chat_messages
load_user_colors() # Load colors once at startup of the scraper thread
try:
livechat = pytchat.create(video_id=video_id)
# console.print(f"[green]Chat scraper started for video ID: {video_id}[/green]") # Suppressed for cleaner output
message_count = 0
while livechat.is_alive():
for c in livechat.get().sync_items():
author_display_name = c.author.name
author_channel_id = c.author.channelId
message_text = c.message
# Mapping for common text-based emotes to Unicode emojis
# This dictionary should be defined once, outside the loop for efficiency
emote_to_unicode_map = {
":smiling_face_with_heart_eyes:": "😍",
":kissing_face:": "😗",
":grinning_face:": "😀",
":thumbs_up:": "👍",
":thumbs_down:": "👎",
":fire:": "🔥",
":heart:": "❤️",
":100:": "💯",
":clap:": "👏",
":thinking_face:": "🤔",
":face_with_tears_of_joy:": "😂",
":rolling_on_the_floor_laughing:": "🤣",
":slightly_smiling_face:": "🙂",
":winking_face:": "😉",
":pensive_face:": "😔",
":angry_face:": "😠",
":crying_face:": "😢",
":loudly_crying_face:": "😭",
":face_with_open_mouth:": "😮",
":hugging_face:": "🤗",
":star-struck:": "🤩",
":zany_face:": "🤪",
":face_with_raised_eyebrow:": "🤨",
":shushing_face:": "🤫",
":face_with_hand_over_mouth:": "🤭",
":face_with_symbols_on_mouth:": "🤬",
":exploding_head:": "🤯",
":face_vomiting:": "🤮",
":sneezing_face:": "🤧",
":hot_face:": "🥵",
":cold_face:": "🥶",
":woozy_face:": "🥴",
":dizzy_face:": "😵",
":face_with_monocle:": "🧐",
":cowboy_hat_face:": "🤠",
":partying_face:": "🥳",
":disguised_face:": "🥸",
":smiling_face_with_tear:": "🥲",
":pleading_face:": "🥺",
":face_holding_back_tears:": "🥹",
":saluting_face:": "🫡",
":melting_face:": "🫠",
":face_with_peeking_eye:": "🫣",
":face_with_diagonal_mouth:": "🫤",
":dotted_line_face:": "🫥",
":face_with_open_eyes_and_hand_over_mouth:": "🫢",
":face_with_head_shaking_horizontally:": "🫨",
":face_with_head_shaking_vertically:": "🫸",
}
def replace_emote_with_unicode(match):
emote_text = match.group(0)
return emote_to_unicode_map.get(emote_text, emote_text)
message_text = re.sub(r'(:[a-zA-Z0-9_-]+:)', replace_emote_with_unicode, message_text)
# Now, apply magenta coloring to the *actual* Unicode emojis
message_text = colour_emoji(message_text)
user_color = get_user_color(author_channel_id)
# Store message in a format suitable for JSON and web display
with chat_messages_lock:
chat_messages.append({
"timestamp": datetime.now().strftime("%H:%M:%S"),
"author": author_display_name,
"author_color": user_color,
"message": message_text
})
# Keep only the last N messages to prevent memory issues
if len(chat_messages) > 100: # Keep last 100 messages
chat_messages = chat_messages[-100:]
message_count += 1
time.sleep(1) # Poll every second to avoid excessive CPU usage
except Exception as e:
console.print(f"[red]Chat scraper error: {e}[/red]")
# finally:
# console.print("[yellow]Chat scraper stopped.[/yellow]") # Suppressed for cleaner output
if __name__ == '__main__':
# This part will be run when backend.py is executed directly
video_id = input("Enter the YouTube Live Stream Video ID for the overlay: ")
# Suppress Flask's default access logs
import logging
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR) # Only log errors, not access messages
# Run the Flask app
flask_thread = threading.Thread(target=lambda: app.run(debug=False, host='127.0.0.1', port=5000))
flask_thread.daemon = True # Daemonize thread so it exits when main thread exits
flask_thread.start()
console.print(f"[green]Starting Flask server on http://127.0.0.1:5000[/green]")
console.print(f"[yellow]Overlay URL: http://127.0.0.1:5000/[/yellow]")
console.print("[yellow]Please copy this URL and paste it into your browser or streaming software (e.g., OBS Studio Browser Source).[/yellow]")
# Run the chat scraper in the main thread
chat_scraper_thread(video_id)