Feat: Implement exponential backoff for YouTube API calls to handle quota errors.

This commit is contained in:
2025-10-29 13:18:18 +01:00
parent d343375790
commit 00e03209de

68
main.py
View File

@@ -45,33 +45,53 @@ def get_authenticated_service():
API_SERVICE_NAME, API_VERSION, credentials=credentials) API_SERVICE_NAME, API_VERSION, credentials=credentials)
def get_live_chat_id(youtube, video_id): def get_live_chat_id(youtube, video_id):
try: retries = 0
response = youtube.videos().list( max_retries = 5
part='liveStreamingDetails', while retries < max_retries:
id=video_id try:
).execute() response = youtube.videos().list(
part='liveStreamingDetails',
id=video_id
).execute()
if 'items' in response and response['items']: if 'items' in response and response['items']:
video = response['items'][0] video = response['items'][0]
if 'liveStreamingDetails' in video and 'activeLiveChatId' in video['liveStreamingDetails']: if 'liveStreamingDetails' in video and 'activeLiveChatId' in video['liveStreamingDetails']:
return video['liveStreamingDetails']['activeLiveChatId'] return video['liveStreamingDetails']['activeLiveChatId']
return None return None
except googleapiclient.errors.HttpError as e: except googleapiclient.errors.HttpError as e:
console.print(f"[red]An HTTP error {e.resp.status} occurred: {e.content}[/red]") if e.resp.status == 403 and "quotaExceeded" in str(e.content):
return None console.print(f"[red]Quota exceeded. Retrying in {2**retries} seconds...[/red]")
time.sleep(2**retries)
retries += 1
else:
console.print(f"[red]An HTTP error {e.resp.status} occurred: {e.content}[/red]")
return None
console.print(f"[red]Failed to get live chat ID after {max_retries} retries due to quota issues.[/red]")
return None
def fetch_live_chat_messages(youtube, live_chat_id, page_token=None): def fetch_live_chat_messages(youtube, live_chat_id, page_token=None):
try: retries = 0
request = youtube.liveChatMessages().list( max_retries = 5
liveChatId=live_chat_id, while retries < max_retries:
part='snippet,authorDetails', try:
pageToken=page_token request = youtube.liveChatMessages().list(
) liveChatId=live_chat_id,
response = request.execute() part='snippet,authorDetails',
return response pageToken=page_token
except googleapiclient.errors.HttpError as e: )
console.print(f"[red]An HTTP error {e.resp.status} occurred: {e.content}[/red]") response = request.execute()
return None return response
except googleapiclient.errors.HttpError as e:
if e.resp.status == 403 and "quotaExceeded" in str(e.content):
console.print(f"[red]Quota exceeded. Retrying in {2**retries} seconds...[/red]")
time.sleep(2**retries)
retries += 1
else:
console.print(f"[red]An HTTP error {e.resp.status} occurred: {e.content}[/red]")
return None
console.print(f"[red]Failed to fetch live chat messages after {max_retries} retries due to quota issues.[/red]")
return None
def main(): def main():
youtube = get_authenticated_service() youtube = get_authenticated_service()