Compare commits
12 Commits
830333cf6e
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 03e8019f0d | |||
| 00e03209de | |||
| d343375790 | |||
| 01469f9b77 | |||
| 0801a3413d | |||
| 81a8e3a56b | |||
| b29290d6ba | |||
| e41c64fee6 | |||
| 0a68509f46 | |||
| 60fc22d112 | |||
| a94d9293f3 | |||
| 88ee64bf04 |
26
README.md
26
README.md
@@ -1,7 +1,19 @@
|
||||
# YouTube Live Chat in Terminal
|
||||
|
||||
---
|
||||
|
||||
> **ARCHIVED PROJECT**
|
||||
>
|
||||
> This project is considered archived and is no longer in active development.
|
||||
>
|
||||
> **Reason:** It relies on the YouTube Data API v3, which has significant quota limitations (10,000 points/day). Continuous polling for live chat messages quickly exhausts this quota, making this approach unsustainable for long-term or frequent use.
|
||||
>
|
||||
> **Recommendation:** For a more sustainable and quota-friendly solution, please refer to the **`youtube-chat-webhook-v2`** project. That project explores alternative methods for fetching YouTube live chat data that do not rely on the official API and its restrictive quotas.
|
||||
|
||||
---
|
||||
|
||||
## Project Description
|
||||
This project aims to provide a customizable way to view YouTube Live Chat directly in your terminal. Designed primarily for Linux content creators, it offers an alternative to YouTube's default chat interface, with future plans for enhanced features like custom styling and a web overlay.
|
||||
This project aims to provide a customizable way to view YouTube Live Chat directly in your terminal. It is implemented in `main.py`.
|
||||
|
||||
## Features
|
||||
* **Real-time Live Chat Display:** Fetches and displays messages from YouTube Live Streams.
|
||||
@@ -16,15 +28,15 @@ This project aims to provide a customizable way to view YouTube Live Chat direct
|
||||
|
||||
## Setup Instructions
|
||||
|
||||
### 1. Clone the Repository (Future Step)
|
||||
Once this project is on GitHub, you would clone it using:
|
||||
### 1. Clone the Repository
|
||||
Clone the project from your Gitea instance:
|
||||
```bash
|
||||
git clone [repository-url]
|
||||
cd youtube_chat_terminal
|
||||
git clone https://gitea.ramforth.net/RamTech/youtube-chat-terminal.git
|
||||
cd youtube-chat-terminal
|
||||
```
|
||||
|
||||
### 2. Create and Activate a Python Virtual Environment
|
||||
It's highly recommended to use a virtual environment to manage project dependencies.
|
||||
### 2. Gitea Repository
|
||||
This project is hosted on your Gitea instance at: `https://gitea.ramforth.net/RamTech/youtube-chat-terminal`
|
||||
|
||||
```bash
|
||||
cd /home/joe/Cloud9/Documents/Obisdian/youtube_chat_terminal
|
||||
|
||||
203
main.py
Normal file
203
main.py
Normal file
@@ -0,0 +1,203 @@
|
||||
from datetime import datetime
|
||||
import re
|
||||
import os
|
||||
import google.oauth2.credentials
|
||||
import google_auth_oauthlib.flow
|
||||
import googleapiclient.discovery
|
||||
import googleapiclient.errors
|
||||
import time
|
||||
from rich.console import Console
|
||||
from rich.style import Style
|
||||
from rich.panel import Panel
|
||||
from rich.live import Live
|
||||
from rich.text import Text
|
||||
|
||||
# The CLIENT_SECRETS_FILE contains your Client ID and Client Secret.
|
||||
CLIENT_SECRETS_FILE = "client_secret.json"
|
||||
|
||||
# This scope allows read-only access to YouTube analytics data.
|
||||
SCOPES = ['https://www.googleapis.com/auth/youtube.readonly']
|
||||
API_SERVICE_NAME = 'youtube'
|
||||
API_VERSION = 'v3'
|
||||
|
||||
console = Console()
|
||||
|
||||
def get_authenticated_service():
|
||||
credentials = None
|
||||
|
||||
# Check if we have saved credentials
|
||||
if os.path.exists('token.json'):
|
||||
credentials = google.oauth2.credentials.Credentials.from_authorized_user_file('token.json', SCOPES)
|
||||
|
||||
# If no valid credentials available, let the user log in.
|
||||
if not credentials or not credentials.valid:
|
||||
if credentials and credentials.expired and credentials.refresh_token:
|
||||
credentials.refresh(google.auth.transport.requests.Request())
|
||||
else:
|
||||
flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file(
|
||||
CLIENT_SECRETS_FILE, SCOPES)
|
||||
credentials = flow.run_local_server(port=0)
|
||||
# Save the credentials for the next run
|
||||
with open('token.json', 'w') as token:
|
||||
token.write(credentials.to_json())
|
||||
|
||||
return googleapiclient.discovery.build(
|
||||
API_SERVICE_NAME, API_VERSION, credentials=credentials)
|
||||
|
||||
def get_live_chat_id(youtube, video_id):
|
||||
retries = 0
|
||||
max_retries = 5
|
||||
while retries < max_retries:
|
||||
try:
|
||||
response = youtube.videos().list(
|
||||
part='liveStreamingDetails',
|
||||
id=video_id
|
||||
).execute()
|
||||
|
||||
if 'items' in response and response['items']:
|
||||
video = response['items'][0]
|
||||
if 'liveStreamingDetails' in video and 'activeLiveChatId' in video['liveStreamingDetails']:
|
||||
return video['liveStreamingDetails']['activeLiveChatId']
|
||||
return None
|
||||
except googleapiclient.errors.HttpError as e:
|
||||
if e.resp.status == 403 and "quotaExceeded" in str(e.content):
|
||||
console.print(f"[red]Quota exceeded. Retrying in {2**retries} seconds...[/red]")
|
||||
time.sleep(2**retries)
|
||||
retries += 1
|
||||
else:
|
||||
console.print(f"[red]An HTTP error {e.resp.status} occurred: {e.content}[/red]")
|
||||
return None
|
||||
console.print(f"[red]Failed to get live chat ID after {max_retries} retries due to quota issues.[/red]")
|
||||
return None
|
||||
|
||||
def fetch_live_chat_messages(youtube, live_chat_id, page_token=None):
|
||||
retries = 0
|
||||
max_retries = 5
|
||||
while retries < max_retries:
|
||||
try:
|
||||
request = youtube.liveChatMessages().list(
|
||||
liveChatId=live_chat_id,
|
||||
part='snippet,authorDetails',
|
||||
pageToken=page_token
|
||||
)
|
||||
response = request.execute()
|
||||
return response
|
||||
except googleapiclient.errors.HttpError as e:
|
||||
if e.resp.status == 403 and "quotaExceeded" in str(e.content):
|
||||
console.print(f"[red]Quota exceeded. Retrying in {2**retries} seconds...[/red]")
|
||||
time.sleep(2**retries)
|
||||
retries += 1
|
||||
else:
|
||||
console.print(f"[red]An HTTP error {e.resp.status} occurred: {e.content}[/red]")
|
||||
return None
|
||||
console.print(f"[red]Failed to fetch live chat messages after {max_retries} retries due to quota issues.[/red]")
|
||||
return None
|
||||
|
||||
def main():
|
||||
youtube = get_authenticated_service()
|
||||
console.print("[green]Successfully authenticated with YouTube API![/green]")
|
||||
|
||||
# Clear the terminal screen
|
||||
os.system('clear')
|
||||
|
||||
video_id = input("Enter the YouTube Live Stream Video ID: ")
|
||||
live_chat_id = get_live_chat_id(youtube, video_id)
|
||||
|
||||
if not live_chat_id:
|
||||
console.print("[red]Could not find an active live chat for the given video ID.[/red]")
|
||||
return
|
||||
|
||||
console.print(f"[green]Found live chat ID: {live_chat_id}[/green]")
|
||||
|
||||
# Setup chat logging
|
||||
log_dir = "chat_logs"
|
||||
os.makedirs(log_dir, exist_ok=True)
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
log_filename = os.path.join(log_dir, f"chat_{video_id}_{timestamp}.log")
|
||||
log_file = open(log_filename, "w", encoding="utf-8")
|
||||
console.print(f"[green]Chat will be logged to {log_filename}[/green]")
|
||||
|
||||
# Wait for 5 seconds, then clear the screen again
|
||||
time.sleep(5)
|
||||
os.system('clear')
|
||||
|
||||
next_page_token = None
|
||||
message_count = 0
|
||||
|
||||
try:
|
||||
while True:
|
||||
# Fetch new messages
|
||||
response = fetch_live_chat_messages(youtube, live_chat_id, next_page_token)
|
||||
if response:
|
||||
for item in response.get('items', []):
|
||||
author_display_name = item['authorDetails']['displayName']
|
||||
message_text = item['snippet']['displayMessage']
|
||||
|
||||
# Process emotes provided by the API
|
||||
if 'textMessageDetails' in item['snippet'] and 'emotes' in item['snippet']['textMessageDetails']:
|
||||
emotes = item['snippet']['textMessageDetails']['emotes']
|
||||
# Sort emotes by offset to replace them correctly from right to left
|
||||
emotes.sort(key=lambda x: x['offsets'][0]['startIndex'], reverse=True)
|
||||
|
||||
for emote in emotes:
|
||||
start = emote['offsets'][0]['startIndex']
|
||||
end = emote['offsets'][0]['endIndex'] + 1 # +1 because end index is inclusive
|
||||
emote_id = emote['emoteId']
|
||||
# Replace the original text with the emote ID surrounded by colons and styled
|
||||
message_text = message_text[:start] + f"[blue]:{emote_id}:[/blue]" + message_text[end:]
|
||||
|
||||
# After processing API-provided emotes, use regex for text-based emotes (e.g., :face-purple-sweating:)
|
||||
# This catches emotes that are just part of the messageText without explicit emote data.
|
||||
message_text = re.sub(r'(:[a-zA-Z0-9_-]+:)', r'[blue]\1[/blue]', message_text)
|
||||
|
||||
# Add coloring for standard emojis
|
||||
# This regex matches a broader range of common emoji unicode characters and sequences.
|
||||
# It's important to compile with re.UNICODE for proper Unicode character matching.
|
||||
emoji_pattern = re.compile(
|
||||
"(" # Start capturing group
|
||||
"\U0001F600-\U0001F64F" # emoticons
|
||||
"\U0001F300-\U0001F5FF" # symbols & pictographs
|
||||
"\U0001F680-\U0001F6FF" # transport & map symbols
|
||||
"\U0001F1E0-\U0001F1FF" # flags (iOS)
|
||||
"\U00002702-\U000027B0" # Dingbats
|
||||
"\U000024C2-\U0001F251" # Enclosed characters
|
||||
"\U0001F900-\U0001F9FF" # Supplemental Symbols and Pictographs
|
||||
"\U0000200D" # Zero Width Joiner (for emoji sequences)
|
||||
"\U0000FE0F" # Variation Selector-16 (for emoji presentation)
|
||||
")+"
|
||||
, re.UNICODE)
|
||||
message_text = emoji_pattern.sub(r'[magenta]\1[/magenta]', message_text)
|
||||
|
||||
# Alternate background styles
|
||||
background_style = Style(bgcolor="#2B2B2B") if message_count % 2 == 0 else Style(bgcolor="#3A3A3A")
|
||||
|
||||
# Simple color for username (can be expanded to unique colors per user)
|
||||
username_style = Style(color="#4CAF50", bold=True)
|
||||
|
||||
# Format message for terminal and log file
|
||||
formatted_message = f"{author_display_name}: {message_text}"
|
||||
console.print(formatted_message, style=background_style)
|
||||
log_file.write(f"{datetime.now().strftime("%H:%M:%S")} {formatted_message}\n")
|
||||
message_count += 1
|
||||
|
||||
next_page_token = response.get('nextPageToken')
|
||||
polling_interval_millis = response['pollingIntervalMillis']
|
||||
# Ensure a minimum sleep duration to conserve API quota
|
||||
sleep_time = max(5, polling_interval_millis / 1000.0)
|
||||
time.sleep(sleep_time)
|
||||
else:
|
||||
console.print("[yellow]No new messages or an error occurred. Retrying...[/yellow]")
|
||||
time.sleep(5) # Wait 5 seconds before retrying on error
|
||||
except KeyboardInterrupt:
|
||||
pass # Exit gracefully on Ctrl+C
|
||||
finally:
|
||||
log_file.close()
|
||||
save_log = input(f"\nDo you want to save the chat log to {log_filename}? (y/n): ").lower()
|
||||
if save_log != 'y' and save_log != 'yes':
|
||||
os.remove(log_filename)
|
||||
console.print(f"[red]Chat log not saved. {log_filename} deleted.[/red]")
|
||||
else:
|
||||
console.print(f"[green]Chat log saved to {log_filename}[/green]")
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user