229 lines
9.6 KiB
Python
229 lines
9.6 KiB
Python
import pytchat
|
|
import os
|
|
import re
|
|
import json
|
|
from datetime import datetime
|
|
import time
|
|
import threading
|
|
import sys
|
|
from flask import Flask, jsonify, request, send_from_directory
|
|
from flask_cors import CORS
|
|
from rich.console import Console # Keep rich.console for internal logging, not for main output
|
|
|
|
console = Console() # Use a separate console for internal logging
|
|
|
|
# Define the emoji pattern and coloring function
|
|
emoji_pattern = re.compile(
|
|
"["
|
|
"\U0001F600-\U0001F64F" # emoticons
|
|
"\U0001F300-\U0001F5FF" # symbols & pictographs
|
|
"\U0001F680-\U0001F6FF" # transport & map symbols
|
|
"\U0001F1E0-\U0001F1FF" # flags (iOS)
|
|
"\u2600-\u26FF\u2700-\u27BF" # Miscellaneous Symbols and Dingbats
|
|
"\u2B50\u2B55\u3030\u303D\u3297\u3299" # Other common emojis
|
|
"\U00002500-\U00002BEF" # Chinese/Japanese characters (often used for emojis)
|
|
"\U00002702-\U000027B0"
|
|
"\U000024C2-\U0001F251"
|
|
"]+"
|
|
"\uFE0F?" # Variation Selector
|
|
"\U0001F3FB-\U0001F3FF?" # Skin tones
|
|
"\u200D?" # Zero Width Joiner
|
|
"\u20E3?" # Combining Enclosing Keycap
|
|
, flags=re.UNICODE)
|
|
|
|
def colour_emoji(txt):
|
|
return emoji_pattern.sub(r'[magenta]\g<0>[/magenta]', txt)
|
|
|
|
# --- User Color Management ---
|
|
USER_COLORS_FILE = "user_colors.json" # This file will be in the overlay project directory
|
|
|
|
# A palette of distinct colors for usernames
|
|
COLOR_PALETTE = [
|
|
"#FF0000", "#00FF00", "#0000FF", "#FFFF00", "#00FFFF", "#FF00FF",
|
|
"#FFA500", "#800080", "#008000", "#FFC0CB", "#808000", "#008080",
|
|
"#C0C0C0", "#800000", "#000080", "#FFD700", "#ADFF2F", "#FF69B4"
|
|
]
|
|
|
|
user_color_map = {}
|
|
|
|
def load_user_colors():
|
|
global user_color_map
|
|
if os.path.exists(USER_COLORS_FILE):
|
|
try:
|
|
with open(USER_COLORS_FILE, 'r') as f:
|
|
user_color_map = json.load(f)
|
|
except json.JSONDecodeError:
|
|
console.print(f"[red]Error loading {USER_COLORS_FILE}. File might be corrupted. Starting with empty colors.[/red]")
|
|
user_color_map = {}
|
|
|
|
def save_user_colors():
|
|
with open(USER_COLORS_FILE, 'w') as f:
|
|
json.dump(user_color_map, f, indent=4)
|
|
|
|
def get_user_color(author_id):
|
|
global user_color_map
|
|
if author_id not in user_color_map:
|
|
used_colors = set(user_color_map.values())
|
|
available_colors = [color for color in COLOR_PALETTE if color not in used_colors]
|
|
|
|
if available_colors:
|
|
next_color_index = len(user_color_map) % len(available_colors)
|
|
user_color_map[author_id] = available_colors[next_color_index]
|
|
else:
|
|
next_color_index = len(user_color_map) % len(COLOR_PALETTE)
|
|
user_color_map[author_id] = COLOR_PALETTE[next_color_index]
|
|
console.print("[yellow]Warning: All colors in palette are reserved. Cycling through existing colors.[/yellow]")
|
|
|
|
save_user_colors()
|
|
return user_color_map[author_id]
|
|
|
|
# --- Flask App Setup ---
|
|
# Determine if running as a PyInstaller bundle
|
|
if getattr(sys, 'frozen', False):
|
|
# If the application is run as a bundle, the PyInstaller bootloader
|
|
# extends the sys.path by a temporary directory and sets sys._MEIPASS
|
|
base_dir = sys._MEIPASS
|
|
else:
|
|
base_dir = os.path.abspath(os.path.dirname(__file__))
|
|
|
|
STATIC_FOLDER = os.path.join(base_dir, 'static')
|
|
TEMPLATE_FOLDER = os.path.join(base_dir, 'templates')
|
|
|
|
app = Flask(__name__, static_folder=STATIC_FOLDER, template_folder=TEMPLATE_FOLDER)
|
|
CORS(app) # Enable CORS for all routes
|
|
chat_messages = [] # Global list to store chat messages for the web overlay
|
|
chat_messages_lock = threading.Lock() # Lock for thread-safe access to chat_messages
|
|
|
|
@app.route('/')
|
|
def serve_index():
|
|
return send_from_directory(app.template_folder, 'index.html')
|
|
|
|
@app.route('/<path:filename>')
|
|
def serve_static(filename):
|
|
# Serve static files (CSS, JS) from the static folder
|
|
return send_from_directory(app.static_folder, filename)
|
|
|
|
@app.route('/chat')
|
|
def get_chat():
|
|
with chat_messages_lock:
|
|
# console.print(f"[blue]Sending {len(chat_messages)} messages to frontend.[/blue]") # Suppressed for cleaner output
|
|
return jsonify(chat_messages)
|
|
|
|
# --- Chat Scraper Thread ---
|
|
def chat_scraper_thread(video_id):
|
|
global chat_messages
|
|
load_user_colors() # Load colors once at startup of the scraper thread
|
|
|
|
try:
|
|
livechat = pytchat.create(video_id=video_id)
|
|
# console.print(f"[green]Chat scraper started for video ID: {video_id}[/green]") # Suppressed for cleaner output
|
|
|
|
message_count = 0
|
|
while livechat.is_alive():
|
|
for c in livechat.get().sync_items():
|
|
author_display_name = c.author.name
|
|
author_channel_id = c.author.channelId
|
|
message_text = c.message
|
|
|
|
# Mapping for common text-based emotes to Unicode emojis
|
|
# This dictionary should be defined once, outside the loop for efficiency
|
|
emote_to_unicode_map = {
|
|
":smiling_face_with_heart_eyes:": "😍",
|
|
":kissing_face:": "😗",
|
|
":grinning_face:": "😀",
|
|
":thumbs_up:": "👍",
|
|
":thumbs_down:": "👎",
|
|
":fire:": "🔥",
|
|
":heart:": "❤️",
|
|
":100:": "💯",
|
|
":clap:": "👏",
|
|
":thinking_face:": "🤔",
|
|
":face_with_tears_of_joy:": "😂",
|
|
":rolling_on_the_floor_laughing:": "🤣",
|
|
":slightly_smiling_face:": "🙂",
|
|
":winking_face:": "😉",
|
|
":pensive_face:": "😔",
|
|
":angry_face:": "😠",
|
|
":crying_face:": "😢",
|
|
":loudly_crying_face:": "😭",
|
|
":face_with_open_mouth:": "😮",
|
|
":hugging_face:": "🤗",
|
|
":star-struck:": "🤩",
|
|
":zany_face:": "🤪",
|
|
":face_with_raised_eyebrow:": "🤨",
|
|
":shushing_face:": "🤫",
|
|
":face_with_hand_over_mouth:": "🤭",
|
|
":face_with_symbols_on_mouth:": "🤬",
|
|
":exploding_head:": "🤯",
|
|
":face_vomiting:": "🤮",
|
|
":sneezing_face:": "🤧",
|
|
":hot_face:": "🥵",
|
|
":cold_face:": "🥶",
|
|
":woozy_face:": "🥴",
|
|
":dizzy_face:": "😵",
|
|
":face_with_monocle:": "🧐",
|
|
":cowboy_hat_face:": "🤠",
|
|
":partying_face:": "🥳",
|
|
":disguised_face:": "🥸",
|
|
":smiling_face_with_tear:": "🥲",
|
|
":pleading_face:": "🥺",
|
|
":face_holding_back_tears:": "🥹",
|
|
":saluting_face:": "🫡",
|
|
":melting_face:": "🫠",
|
|
":face_with_peeking_eye:": "🫣",
|
|
":face_with_diagonal_mouth:": "🫤",
|
|
":dotted_line_face:": "🫥",
|
|
":face_with_open_eyes_and_hand_over_mouth:": "🫢",
|
|
":face_with_head_shaking_horizontally:": "🫨",
|
|
":face_with_head_shaking_vertically:": "🫸",
|
|
}
|
|
|
|
def replace_emote_with_unicode(match):
|
|
emote_text = match.group(0)
|
|
return emote_to_unicode_map.get(emote_text, emote_text)
|
|
|
|
message_text = re.sub(r'(:[a-zA-Z0-9_-]+:)', replace_emote_with_unicode, message_text)
|
|
|
|
# Now, apply magenta coloring to the *actual* Unicode emojis
|
|
message_text = colour_emoji(message_text)
|
|
|
|
user_color = get_user_color(author_channel_id)
|
|
|
|
# Store message in a format suitable for JSON and web display
|
|
with chat_messages_lock:
|
|
chat_messages.append({
|
|
"timestamp": datetime.now().strftime("%H:%M:%S"),
|
|
"author": author_display_name,
|
|
"author_color": user_color,
|
|
"message": message_text
|
|
})
|
|
# Keep only the last N messages to prevent memory issues
|
|
if len(chat_messages) > 100: # Keep last 100 messages
|
|
chat_messages = chat_messages[-100:]
|
|
|
|
message_count += 1
|
|
time.sleep(1) # Poll every second to avoid excessive CPU usage
|
|
except Exception as e:
|
|
console.print(f"[red]Chat scraper error: {e}[/red]")
|
|
# finally:
|
|
# console.print("[yellow]Chat scraper stopped.[/yellow]") # Suppressed for cleaner output
|
|
|
|
if __name__ == '__main__':
|
|
# This part will be run when backend.py is executed directly
|
|
video_id = input("Enter the YouTube Live Stream Video ID for the overlay: ")
|
|
|
|
# Suppress Flask's default access logs
|
|
import logging
|
|
log = logging.getLogger('werkzeug')
|
|
log.setLevel(logging.ERROR) # Only log errors, not access messages
|
|
|
|
# Run the Flask app
|
|
flask_thread = threading.Thread(target=lambda: app.run(debug=False, host='127.0.0.1', port=5000))
|
|
flask_thread.daemon = True # Daemonize thread so it exits when main thread exits
|
|
flask_thread.start()
|
|
console.print(f"[green]Starting Flask server on http://127.0.0.1:5000[/green]")
|
|
console.print(f"[yellow]Overlay URL: http://127.0.0.1:5000/[/yellow]")
|
|
console.print("[yellow]Please copy this URL and paste it into your browser or streaming software (e.g., OBS Studio Browser Source).[/yellow]")
|
|
|
|
# Run the chat scraper in the main thread
|
|
chat_scraper_thread(video_id) |