"""Backend for a YouTube live-chat web overlay.

Scrapes a live stream's chat with ``pytchat`` and keeps the most recent
messages in memory. A small Flask app serves the overlay page, its static
assets and a ``/chat`` JSON endpoint that the page polls.
"""

import json
import logging
import os
import re
import sys
import threading
import time
from datetime import datetime

import pytchat
from flask import Flask, jsonify, request, send_from_directory
from flask_cors import CORS
from rich.console import Console

# rich is used for internal logging only, never for the overlay output.
console = Console()

# --- Emoji handling ---

# One or more "base" emoji characters, each optionally followed by variation
# selectors, zero-width joiners or the keycap combiner, so a whole ZWJ
# sequence is wrapped as a single match. Skin-tone modifiers
# (U+1F3FB-U+1F3FF) already fall inside the U+1F300-U+1F5FF range.
emoji_pattern = re.compile(
    "(?:"
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # regional indicators (flags)
    "\U0001F900-\U0001F9FF"  # supplemental symbols & pictographs
    "\U0001FA70-\U0001FAFF"  # symbols & pictographs extended-A
    "\u2500-\u2BEF"  # box drawing .. misc symbols, dingbats, arrows
    "\u24C2\u3030\u303D\u3297\u3299"  # other common single emojis
    "\U0001F004\U0001F0CF"  # mahjong red dragon, playing-card joker
    "\U0001F170-\U0001F251"  # enclosed alphanumerics / ideographs
    "]"
    "[\uFE0F\u200D\u20E3]*"  # variation selector, ZWJ, enclosing keycap
    ")+",
    flags=re.UNICODE,
)

# Matches ":shortcode:" style text emotes.
EMOTE_PATTERN = re.compile(r'(:[a-zA-Z0-9_-]+:)')

# Text emote -> Unicode emoji. Values are written as escapes so the table
# survives being saved or transferred with the wrong file encoding.
EMOTE_TO_UNICODE_MAP = {
    ":smiling_face_with_heart_eyes:": "\U0001F60D",
    ":kissing_face:": "\U0001F617",
    ":grinning_face:": "\U0001F600",
    ":thumbs_up:": "\U0001F44D",
    ":thumbs_down:": "\U0001F44E",
    ":fire:": "\U0001F525",
    ":heart:": "\u2764\uFE0F",
    ":100:": "\U0001F4AF",
    ":clap:": "\U0001F44F",
    ":thinking_face:": "\U0001F914",
    ":face_with_tears_of_joy:": "\U0001F602",
    ":rolling_on_the_floor_laughing:": "\U0001F923",
    ":slightly_smiling_face:": "\U0001F642",
    ":winking_face:": "\U0001F609",
    ":pensive_face:": "\U0001F614",
    ":angry_face:": "\U0001F620",
    ":crying_face:": "\U0001F622",
    ":loudly_crying_face:": "\U0001F62D",
    ":face_with_open_mouth:": "\U0001F62E",
    ":hugging_face:": "\U0001F917",
    ":star-struck:": "\U0001F929",
    ":zany_face:": "\U0001F92A",
    ":face_with_raised_eyebrow:": "\U0001F928",
    ":shushing_face:": "\U0001F92B",
    ":face_with_hand_over_mouth:": "\U0001F92D",
    ":face_with_symbols_on_mouth:": "\U0001F92C",
    ":exploding_head:": "\U0001F92F",
    ":face_vomiting:": "\U0001F92E",
    ":sneezing_face:": "\U0001F927",
    ":hot_face:": "\U0001F975",
    ":cold_face:": "\U0001F976",
    ":woozy_face:": "\U0001F974",
    ":dizzy_face:": "\U0001F635",
    ":face_with_monocle:": "\U0001F9D0",
    ":cowboy_hat_face:": "\U0001F920",
    ":partying_face:": "\U0001F973",
    ":disguised_face:": "\U0001F978",
    ":smiling_face_with_tear:": "\U0001F972",
    ":pleading_face:": "\U0001F97A",
    ":face_holding_back_tears:": "\U0001F979",
    ":saluting_face:": "\U0001FAE1",
    ":melting_face:": "\U0001FAE0",
    ":face_with_peeking_eye:": "\U0001FAE3",
    ":face_with_diagonal_mouth:": "\U0001FAE4",
    ":dotted_line_face:": "\U0001FAE5",
    ":face_with_open_eyes_and_hand_over_mouth:": "\U0001FAE2",
    ":face_with_head_shaking_horizontally:": "\U0001FAE8",
    # NOTE(review): the bytes previously stored here decode to U+1FAF8
    # (rightwards pushing hand), not a head-shaking face - confirm intent.
    ":face_with_head_shaking_vertically:": "\U0001FAF8",
}


def colour_emoji(txt):
    """Wrap every emoji run in *txt* in ``[magenta]...[/magenta]`` markup."""
    return emoji_pattern.sub(r'[magenta]\g<0>[/magenta]', txt)


def _replace_emote_with_unicode(match):
    """Return the emoji for a matched ``:shortcode:``, or the text unchanged."""
    emote_text = match.group(0)
    return EMOTE_TO_UNICODE_MAP.get(emote_text, emote_text)


def _format_message(message_text):
    """Convert known text emotes to emoji, then colour all emoji in the text."""
    with_emoji = EMOTE_PATTERN.sub(_replace_emote_with_unicode, message_text)
    return colour_emoji(with_emoji)


# --- User Color Management ---

# Relative path: resolved against the current working directory.
USER_COLORS_FILE = "user_colors.json"

# A palette of distinct colors for usernames.
COLOR_PALETTE = [
    "#FF0000", "#00FF00", "#0000FF", "#FFFF00", "#00FFFF", "#FF00FF",
    "#FFA500", "#800080", "#008000", "#FFC0CB", "#808000", "#008080",
    "#C0C0C0", "#800000", "#000080", "#FFD700", "#ADFF2F", "#FF69B4",
]

# Maps author channel id -> hex color string.
user_color_map = {}


def load_user_colors():
    """Load the persisted author-to-color mapping into ``user_color_map``.

    A missing file leaves the current mapping untouched. A file that is not
    valid JSON, or whose top-level value is not an object, resets the
    mapping to empty and logs an error.
    """
    global user_color_map
    if not os.path.exists(USER_COLORS_FILE):
        return
    try:
        with open(USER_COLORS_FILE, 'r', encoding="utf-8") as f:
            loaded = json.load(f)
    except json.JSONDecodeError:
        loaded = None
    if isinstance(loaded, dict):
        user_color_map = loaded
    else:
        console.print(
            f"[red]Error loading {USER_COLORS_FILE}. File might be corrupted. "
            "Starting with empty colors.[/red]"
        )
        user_color_map = {}


def save_user_colors():
    """Write ``user_color_map`` to ``USER_COLORS_FILE`` as indented JSON."""
    with open(USER_COLORS_FILE, 'w', encoding="utf-8") as f:
        json.dump(user_color_map, f, indent=4)


def get_user_color(author_id):
    """Return the color assigned to *author_id*, assigning one if needed.

    New authors get a palette color nobody else holds while one is left;
    after that, palette colors are reused and a warning is logged. The
    mapping is written to disk only when a new assignment is made.
    """
    if author_id not in user_color_map:
        used_colors = set(user_color_map.values())
        available_colors = [c for c in COLOR_PALETTE if c not in used_colors]
        if available_colors:
            index = len(user_color_map) % len(available_colors)
            user_color_map[author_id] = available_colors[index]
        else:
            index = len(user_color_map) % len(COLOR_PALETTE)
            user_color_map[author_id] = COLOR_PALETTE[index]
            console.print(
                "[yellow]Warning: All colors in palette are reserved. "
                "Cycling through existing colors.[/yellow]"
            )
        save_user_colors()
    return user_color_map[author_id]


# --- Flask App Setup ---

if getattr(sys, 'frozen', False):
    # Running as a PyInstaller bundle: bundled resources are unpacked into
    # the temporary directory exposed as sys._MEIPASS.
    base_dir = sys._MEIPASS
else:
    base_dir = os.path.abspath(os.path.dirname(__file__))

STATIC_FOLDER = os.path.join(base_dir, 'static')
TEMPLATE_FOLDER = os.path.join(base_dir, 'templates')

HOST = '127.0.0.1'
PORT = 5000
MAX_CHAT_MESSAGES = 100  # cap on messages kept in memory for the overlay

app = Flask(__name__, static_folder=STATIC_FOLDER, template_folder=TEMPLATE_FOLDER)
CORS(app)  # Enable CORS for all routes

chat_messages = []  # Most recent chat messages, oldest first
chat_messages_lock = threading.Lock()  # Guards every access to chat_messages


@app.route('/')
def serve_index():
    """Serve the overlay page."""
    return send_from_directory(app.template_folder, 'index.html')


@app.route('/<path:filename>')
def serve_static(filename):
    """Serve a static asset (CSS, JS) from the static folder."""
    return send_from_directory(app.static_folder, filename)


@app.route('/chat')
def get_chat():
    """Return the currently buffered chat messages as a JSON array."""
    with chat_messages_lock:
        snapshot = list(chat_messages)
    return jsonify(snapshot)


# --- Chat Scraper Thread ---

def chat_scraper_thread(video_id):
    """Poll the live chat of *video_id* and buffer messages for the overlay.

    Runs until the chat is no longer alive. Each message is emoji-formatted,
    tagged with its author's color and a local ``HH:MM:SS`` timestamp, and
    appended to ``chat_messages``; only the newest ``MAX_CHAT_MESSAGES``
    entries are kept. Any exception is logged and ends the loop.
    """
    load_user_colors()  # Load colors once at startup of the scraper
    try:
        livechat = pytchat.create(video_id=video_id)
        while livechat.is_alive():
            for c in livechat.get().sync_items():
                entry = {
                    "timestamp": datetime.now().strftime("%H:%M:%S"),
                    "author": c.author.name,
                    "author_color": get_user_color(c.author.channelId),
                    "message": _format_message(c.message),
                }
                with chat_messages_lock:
                    chat_messages.append(entry)
                    # Trim in place so every reference to the list stays valid.
                    if len(chat_messages) > MAX_CHAT_MESSAGES:
                        del chat_messages[:-MAX_CHAT_MESSAGES]
            time.sleep(1)  # Poll every second to avoid excessive CPU usage
    except Exception as e:  # top-level boundary: report instead of crashing
        console.print(f"[red]Chat scraper error: {e}[/red]")


def main():
    """Prompt for a video id, start the web server, then run the scraper."""
    video_id = input("Enter the YouTube Live Stream Video ID for the overlay: ").strip()

    # Only log werkzeug errors, not per-request access lines.
    logging.getLogger('werkzeug').setLevel(logging.ERROR)

    # Daemon thread: the server exits together with the main thread.
    flask_thread = threading.Thread(
        target=app.run,
        kwargs={"debug": False, "host": HOST, "port": PORT},
        daemon=True,
    )
    flask_thread.start()

    console.print(f"[green]Starting Flask server on http://{HOST}:{PORT}[/green]")
    console.print(f"[yellow]Overlay URL: http://{HOST}:{PORT}/[/yellow]")
    console.print(
        "[yellow]Please copy this URL and paste it into your browser or "
        "streaming software (e.g., OBS Studio Browser Source).[/yellow]"
    )

    # Run the chat scraper in the main thread.
    chat_scraper_thread(video_id)


if __name__ == '__main__':
    main()