Merge branch 'feature/httpx' into develop

This commit is contained in:
taizan-hokuto
2020-08-30 22:17:57 +09:00
20 changed files with 259 additions and 354 deletions

View File

@@ -1,9 +1,10 @@
import logging import logging
from . import mylogger from . import mylogger
headers = { headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36'} 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36',
}
def logger(module_name: str, loglevel=None): def logger(module_name: str, loglevel=logging.DEBUG):
module_logger = mylogger.get_logger(module_name, loglevel=loglevel) module_logger = mylogger.get_logger(module_name, loglevel=loglevel)
return module_logger return module_logger

View File

@@ -1,13 +1,13 @@
import aiohttp
import asyncio import asyncio
import httpx
import json import json
import signal import signal
import time import time
import traceback import traceback
import urllib.parse import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError
from asyncio import Queue from asyncio import Queue
from concurrent.futures import CancelledError
from .buffer import Buffer from .buffer import Buffer
from ..parser.live import Parser from ..parser.live import Parser
from .. import config from .. import config
@@ -22,7 +22,7 @@ MAX_RETRY = 10
class LiveChatAsync: class LiveChatAsync:
'''asyncio(aiohttp)を利用してYouTubeのライブ配信のチャットデータを取得する。 '''asyncioを利用してYouTubeのライブ配信のチャットデータを取得する。
Parameter Parameter
--------- ---------
@@ -161,11 +161,11 @@ class LiveChatAsync:
parameter for next chat data parameter for next chat data
''' '''
try: try:
async with aiohttp.ClientSession() as session: async with httpx.AsyncClient(http2=True) as client:
while(continuation and self._is_alive): while(continuation and self._is_alive):
continuation = await self._check_pause(continuation) continuation = await self._check_pause(continuation)
contents = await self._get_contents( contents = await self._get_contents(
continuation, session, headers) continuation, client, headers)
metadata, chatdata = self._parser.parse(contents) metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs'] / 1000 timeout = metadata['timeoutMs'] / 1000
@@ -210,7 +210,7 @@ class LiveChatAsync:
self._video_id, 3, self._topchat_only) self._video_id, 3, self._topchat_only)
return continuation return continuation
async def _get_contents(self, continuation, session, headers): async def _get_contents(self, continuation, client, headers):
'''Get 'continuationContents' from livechat json. '''Get 'continuationContents' from livechat json.
If contents is None at first fetching, If contents is None at first fetching,
try to fetch archive chat data. try to fetch archive chat data.
@@ -219,7 +219,7 @@ class LiveChatAsync:
------- -------
'continuationContents' which includes metadata & chatdata. 'continuationContents' which includes metadata & chatdata.
''' '''
livechat_json = await self._get_livechat_json(continuation, session, headers) livechat_json = await self._get_livechat_json(continuation, client, headers)
contents = self._parser.get_contents(livechat_json) contents = self._parser.get_contents(livechat_json)
if self._first_fetch: if self._first_fetch:
if contents is None or self._is_replay: if contents is None or self._is_replay:
@@ -229,18 +229,18 @@ class LiveChatAsync:
continuation = arcparam.getparam( continuation = arcparam.getparam(
self._video_id, self.seektime, self._topchat_only) self._video_id, self.seektime, self._topchat_only)
livechat_json = (await self._get_livechat_json( livechat_json = (await self._get_livechat_json(
continuation, session, headers)) continuation, client, headers))
reload_continuation = self._parser.reload_continuation( reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json)) self._parser.get_contents(livechat_json))
if reload_continuation: if reload_continuation:
livechat_json = (await self._get_livechat_json( livechat_json = (await self._get_livechat_json(
reload_continuation, session, headers)) reload_continuation, client, headers))
contents = self._parser.get_contents(livechat_json) contents = self._parser.get_contents(livechat_json)
self._is_replay = True self._is_replay = True
self._first_fetch = False self._first_fetch = False
return contents return contents
async def _get_livechat_json(self, continuation, session, headers): async def _get_livechat_json(self, continuation, client, headers):
''' '''
Get json which includes chat data. Get json which includes chat data.
''' '''
@@ -249,14 +249,13 @@ class LiveChatAsync:
status_code = 0 status_code = 0
url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1" url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
for _ in range(MAX_RETRY + 1): for _ in range(MAX_RETRY + 1):
async with session.get(url, headers=headers) as resp: try:
try: resp = await client.get(url, headers=headers)
text = await resp.text() livechat_json = resp.json()
livechat_json = json.loads(text) break
break except (httpx.HTTPError, json.JSONDecodeError):
except (ClientConnectorError, json.JSONDecodeError): await asyncio.sleep(1)
await asyncio.sleep(1) continue
continue
else: else:
self._logger.error(f"[{self._video_id}]" self._logger.error(f"[{self._video_id}]"
f"Exceeded retry count. status_code={status_code}") f"Exceeded retry count. status_code={status_code}")

View File

@@ -1,4 +1,4 @@
import requests import httpx
import json import json
import signal import signal
import time import time
@@ -153,10 +153,10 @@ class LiveChat:
parameter for next chat data parameter for next chat data
''' '''
try: try:
with requests.Session() as session: with httpx.Client(http2=True) as client:
while(continuation and self._is_alive): while(continuation and self._is_alive):
continuation = self._check_pause(continuation) continuation = self._check_pause(continuation)
contents = self._get_contents(continuation, session, headers) contents = self._get_contents(continuation, client, headers)
metadata, chatdata = self._parser.parse(contents) metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs'] / 1000 timeout = metadata['timeoutMs'] / 1000
chat_component = { chat_component = {
@@ -199,7 +199,7 @@ class LiveChat:
continuation = liveparam.getparam(self._video_id, 3) continuation = liveparam.getparam(self._video_id, 3)
return continuation return continuation
def _get_contents(self, continuation, session, headers): def _get_contents(self, continuation, client, headers):
'''Get 'continuationContents' from livechat json. '''Get 'continuationContents' from livechat json.
If contents is None at first fetching, If contents is None at first fetching,
try to fetch archive chat data. try to fetch archive chat data.
@@ -209,7 +209,7 @@ class LiveChat:
'continuationContents' which includes metadata & chat data. 'continuationContents' which includes metadata & chat data.
''' '''
livechat_json = ( livechat_json = (
self._get_livechat_json(continuation, session, headers) self._get_livechat_json(continuation, client, headers)
) )
contents = self._parser.get_contents(livechat_json) contents = self._parser.get_contents(livechat_json)
if self._first_fetch: if self._first_fetch:
@@ -219,18 +219,18 @@ class LiveChat:
self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation=" self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation="
continuation = arcparam.getparam( continuation = arcparam.getparam(
self._video_id, self.seektime, self._topchat_only) self._video_id, self.seektime, self._topchat_only)
livechat_json = (self._get_livechat_json(continuation, session, headers)) livechat_json = (self._get_livechat_json(continuation, client, headers))
reload_continuation = self._parser.reload_continuation( reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json)) self._parser.get_contents(livechat_json))
if reload_continuation: if reload_continuation:
livechat_json = (self._get_livechat_json( livechat_json = (self._get_livechat_json(
reload_continuation, session, headers)) reload_continuation, client, headers))
contents = self._parser.get_contents(livechat_json) contents = self._parser.get_contents(livechat_json)
self._is_replay = True self._is_replay = True
self._first_fetch = False self._first_fetch = False
return contents return contents
def _get_livechat_json(self, continuation, session, headers): def _get_livechat_json(self, continuation, client, headers):
''' '''
Get json which includes chat data. Get json which includes chat data.
''' '''
@@ -239,10 +239,9 @@ class LiveChat:
status_code = 0 status_code = 0
url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1" url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
for _ in range(MAX_RETRY + 1): for _ in range(MAX_RETRY + 1):
with session.get(url, headers=headers) as resp: with client:
try: try:
text = resp.text livechat_json = client.get(url, headers=headers).json()
livechat_json = json.loads(text)
break break
except json.JSONDecodeError: except json.JSONDecodeError:
time.sleep(1) time.sleep(1)

View File

@@ -1,6 +1,6 @@
import os import os
import re import re
import requests import httpx
from base64 import standard_b64encode from base64 import standard_b64encode
from .chat_processor import ChatProcessor from .chat_processor import ChatProcessor
from .default.processor import DefaultProcessor from .default.processor import DefaultProcessor
@@ -108,7 +108,7 @@ class HTMLArchiver(ChatProcessor):
for item in message_items) for item in message_items)
def _encode_img(self, url): def _encode_img(self, url):
resp = requests.get(url) resp = httpx.get(url)
return standard_b64encode(resp.content).decode() return standard_b64encode(resp.content).decode()
def _set_emoji_table(self, item: dict): def _set_emoji_table(self, item: dict):

View File

@@ -1,6 +1,5 @@
import aiohttp import httpx
import asyncio import asyncio
import json
from . import parser from . import parser
from . block import Block from . block import Block
from . worker import ExtractWorker from . worker import ExtractWorker
@@ -55,7 +54,7 @@ def ready_blocks(video_id, duration, div, callback):
raise ValueError raise ValueError
async def _get_blocks(video_id, duration, div, callback): async def _get_blocks(video_id, duration, div, callback):
async with aiohttp.ClientSession() as session: async with httpx.AsyncClient(http2=True) as session:
tasks = [_create_block(session, video_id, seektime, callback) tasks = [_create_block(session, video_id, seektime, callback)
for seektime in _split(-1, duration, div)] for seektime in _split(-1, duration, div)]
return await asyncio.gather(*tasks) return await asyncio.gather(*tasks)
@@ -65,9 +64,8 @@ def ready_blocks(video_id, duration, div, callback):
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1" url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
for _ in range(MAX_RETRY_COUNT): for _ in range(MAX_RETRY_COUNT):
try: try:
async with session.get(url, headers=headers) as resp: resp = await session.get(url, headers=headers)
text = await resp.text() next_continuation, actions = parser.parse(resp.json())
next_continuation, actions = parser.parse(json.loads(text))
break break
except JSONDecodeError: except JSONDecodeError:
await asyncio.sleep(3) await asyncio.sleep(3)
@@ -106,7 +104,7 @@ def fetch_patch(callback, blocks, video_id):
) )
for block in blocks for block in blocks
] ]
async with aiohttp.ClientSession() as session: async with httpx.AsyncClient() as session:
tasks = [worker.run(session) for worker in workers] tasks = [worker.run(session) for worker in workers]
return await asyncio.gather(*tasks) return await asyncio.gather(*tasks)
@@ -114,9 +112,8 @@ def fetch_patch(callback, blocks, video_id):
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1" url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
for _ in range(MAX_RETRY_COUNT): for _ in range(MAX_RETRY_COUNT):
try: try:
async with session.get(url, headers=config.headers) as resp: resp = await session.get(url, headers=config.headers)
chat_json = await resp.text() continuation, actions = parser.parse(resp.json())
continuation, actions = parser.parse(json.loads(chat_json))
break break
except JSONDecodeError: except JSONDecodeError:
await asyncio.sleep(3) await asyncio.sleep(3)

View File

@@ -1,6 +1,7 @@
from . block import Block from . block import Block
from . patch import fill, split from . patch import fill, split
from ... paramgen import arcparam from ... paramgen import arcparam
from typing import Tuple
class ExtractWorker: class ExtractWorker:
@@ -76,7 +77,7 @@ def _search_new_block(worker) -> Block:
return new_block return new_block
def _get_undone_block(blocks) -> (int, Block): def _get_undone_block(blocks) -> Tuple[int, Block]:
min_interval_ms = 120000 min_interval_ms = 120000
max_remaining = 0 max_remaining = 0
undone_block = None undone_block = None

View File

@@ -1,12 +1,12 @@
import aiohttp import httpx
import asyncio import asyncio
import json import json
from . import parser from . import parser
from . block import Block from . block import Block
from . worker import ExtractWorker from . worker import ExtractWorker
from . patch import Patch from . patch import Patch
from ... import config from ... import config
from ... paramgen import arcparam_mining as arcparam from ... paramgen import arcparam_mining as arcparam
from concurrent.futures import CancelledError from concurrent.futures import CancelledError
from urllib.parse import quote from urllib.parse import quote
@@ -14,10 +14,12 @@ from urllib.parse import quote
headers = config.headers headers = config.headers
REPLAY_URL = "https://www.youtube.com/live_chat_replay?continuation=" REPLAY_URL = "https://www.youtube.com/live_chat_replay?continuation="
INTERVAL = 1 INTERVAL = 1
def _split(start, end, count, min_interval_sec = 120):
def _split(start, end, count, min_interval_sec=120):
""" """
Split section from `start` to `end` into `count` pieces, Split section from `start` to `end` into `count` pieces,
and returns the beginning of each piece. and returns the beginning of each piece.
The `count` is adjusted so that the length of each piece The `count` is adjusted so that the length of each piece
is no smaller than `min_interval`. is no smaller than `min_interval`.
@@ -25,42 +27,43 @@ def _split(start, end, count, min_interval_sec = 120):
-------- --------
List of the offset of each block's first chat data. List of the offset of each block's first chat data.
""" """
if not (isinstance(start,int) or isinstance(start,float)) or \ if not (isinstance(start, int) or isinstance(start, float)) or \
not (isinstance(end,int) or isinstance(end,float)): not (isinstance(end, int) or isinstance(end, float)):
raise ValueError("start/end must be int or float") raise ValueError("start/end must be int or float")
if not isinstance(count,int): if not isinstance(count, int):
raise ValueError("count must be int") raise ValueError("count must be int")
if start>end: if start > end:
raise ValueError("end must be equal to or greater than start.") raise ValueError("end must be equal to or greater than start.")
if count<1: if count < 1:
raise ValueError("count must be equal to or greater than 1.") raise ValueError("count must be equal to or greater than 1.")
if (end-start)/count < min_interval_sec: if (end - start) / count < min_interval_sec:
count = int((end-start)/min_interval_sec) count = int((end - start) / min_interval_sec)
if count == 0 : count = 1 if count == 0:
interval= (end-start)/count count = 1
interval = (end - start) / count
if count == 1: if count == 1:
return [start] return [start]
return sorted( list(set( [int(start + interval*j) return sorted(list(set([int(start + interval * j)
for j in range(count) ]))) for j in range(count)])))
def ready_blocks(video_id, duration, div, callback): def ready_blocks(video_id, duration, div, callback):
if div <= 0: raise ValueError if div <= 0:
raise ValueError
async def _get_blocks( video_id, duration, div, callback): async def _get_blocks(video_id, duration, div, callback):
async with aiohttp.ClientSession() as session: async with httpx.ClientSession() as session:
tasks = [_create_block(session, video_id, seektime, callback) tasks = [_create_block(session, video_id, seektime, callback)
for seektime in _split(0, duration, div)] for seektime in _split(0, duration, div)]
return await asyncio.gather(*tasks) return await asyncio.gather(*tasks)
async def _create_block(session, video_id, seektime, callback): async def _create_block(session, video_id, seektime, callback):
continuation = arcparam.getparam(video_id, seektime = seektime) continuation = arcparam.getparam(video_id, seektime=seektime)
url=(f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs=" url = (f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
f"{int(seektime*1000)}&hidden=false&pbj=1") f"{int(seektime*1000)}&hidden=false&pbj=1")
async with session.get(url, headers = headers) as resp: async with session.get(url, headers=headers) as resp:
chat_json = await resp.text() chat_json = await resp.text()
if chat_json is None: if chat_json is None:
return return
@@ -70,39 +73,40 @@ def ready_blocks(video_id, duration, div, callback):
if callback: if callback:
callback(actions, INTERVAL) callback(actions, INTERVAL)
return Block( return Block(
continuation = continuation, continuation=continuation,
chat_data = actions, chat_data=actions,
first = first, first=first,
last = seektime, last=seektime,
seektime = seektime seektime=seektime
) )
""" """
fetch initial blocks. fetch initial blocks.
""" """
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
blocks = loop.run_until_complete( blocks = loop.run_until_complete(
_get_blocks(video_id, duration, div, callback)) _get_blocks(video_id, duration, div, callback))
return blocks return blocks
def fetch_patch(callback, blocks, video_id): def fetch_patch(callback, blocks, video_id):
async def _allocate_workers(): async def _allocate_workers():
workers = [ workers = [
ExtractWorker( ExtractWorker(
fetch = _fetch, block = block, fetch=_fetch, block=block,
blocks = blocks, video_id = video_id blocks=blocks, video_id=video_id
) )
for block in blocks for block in blocks
] ]
async with aiohttp.ClientSession() as session: async with httpx.ClientSession() as session:
tasks = [worker.run(session) for worker in workers] tasks = [worker.run(session) for worker in workers]
return await asyncio.gather(*tasks) return await asyncio.gather(*tasks)
async def _fetch(seektime,session) -> Patch: async def _fetch(seektime, session) -> Patch:
continuation = arcparam.getparam(video_id, seektime = seektime) continuation = arcparam.getparam(video_id, seektime=seektime)
url=(f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs=" url = (f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
f"{int(seektime*1000)}&hidden=false&pbj=1") f"{int(seektime*1000)}&hidden=false&pbj=1")
async with session.get(url,headers = config.headers) as resp: async with session.get(url, headers=config.headers) as resp:
chat_json = await resp.text() chat_json = await resp.text()
actions = [] actions = []
try: try:
@@ -113,21 +117,22 @@ def fetch_patch(callback, blocks, video_id):
pass pass
if callback: if callback:
callback(actions, INTERVAL) callback(actions, INTERVAL)
return Patch(chats = actions, continuation = continuation, return Patch(chats=actions, continuation=continuation,
seektime = seektime, last = seektime) seektime=seektime, last=seektime)
""" """
allocate workers and assign blocks. allocate workers and assign blocks.
""" """
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: try:
loop.run_until_complete(_allocate_workers()) loop.run_until_complete(_allocate_workers())
except CancelledError: except CancelledError:
pass pass
async def _shutdown(): async def _shutdown():
print("\nshutdown...") print("\nshutdown...")
tasks = [t for t in asyncio.all_tasks() tasks = [t for t in asyncio.all_tasks()
if t is not asyncio.current_task()] if t is not asyncio.current_task()]
for task in tasks: for task in tasks:
task.cancel() task.cancel()
try: try:
@@ -135,7 +140,7 @@ async def _shutdown():
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
def cancel(): def cancel():
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.create_task(_shutdown()) loop.create_task(_shutdown())

View File

@@ -1,6 +1,6 @@
import json import json
import re import re
import requests import httpx
from .. import config from .. import config
from ..exceptions import InvalidVideoIdException from ..exceptions import InvalidVideoIdException
from ..util.extract_video_id import extract_video_id from ..util.extract_video_id import extract_video_id
@@ -85,7 +85,7 @@ class VideoInfo:
def _get_page_text(self, video_id): def _get_page_text(self, video_id):
url = f"https://www.youtube.com/embed/{video_id}" url = f"https://www.youtube.com/embed/{video_id}"
resp = requests.get(url, headers=headers) resp = httpx.get(url, headers=headers)
resp.raise_for_status() resp.raise_for_status()
return resp.text return resp.text

View File

@@ -1,11 +1,11 @@
import requests import httpx
import json import json
import datetime import datetime
from .. import config from .. import config
def extract(url): def extract(url):
_session = requests.Session() _session = httpx.Client(http2=True)
html = _session.get(url, headers=config.headers) html = _session.get(url, headers=config.headers)
with open(str(datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S') with open(str(datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
) + 'test.json', mode='w', encoding='utf-8') as f: ) + 'test.json', mode='w', encoding='utf-8') as f:

View File

@@ -1,5 +1,4 @@
aiohttp httpx==0.14.1
protobuf protobuf==3.13.0
pytz pytz
requests
urllib3 urllib3

View File

@@ -1,5 +1,4 @@
aioresponses
mock mock
mocker mocker
pytest pytest
pytest-mock pytest_httpx

View File

@@ -1,5 +1,5 @@
import json import json
import requests import httpx
import pytchat.config as config import pytchat.config as config
from pytchat.paramgen import arcparam from pytchat.paramgen import arcparam
from pytchat.parser.live import Parser from pytchat.parser.live import Parser
@@ -18,14 +18,15 @@ def test_arcparam_1(mocker):
def test_arcparam_2(mocker): def test_arcparam_2(mocker):
param = arcparam.getparam("SsjCnHOk-Sk", seektime=100) param = arcparam.getparam("SsjCnHOk-Sk", seektime=100)
url = f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?continuation={param}&pbj=1" url = f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?continuation={param}&pbj=1"
resp = requests.Session().get(url, headers=config.headers) resp = httpx.Client(http2=True).get(url, headers=config.headers)
jsn = json.loads(resp.text) jsn = json.loads(resp.text)
parser = Parser(is_replay=True) parser = Parser(is_replay=True)
contents = parser.get_contents(jsn) contents = parser.get_contents(jsn)
_ , chatdata = parser.parse(contents) _, chatdata = parser.parse(contents)
test_id = chatdata[0]["addChatItemAction"]["item"]["liveChatTextMessageRenderer"]["id"] test_id = chatdata[0]["addChatItemAction"]["item"]["liveChatTextMessageRenderer"]["id"]
assert test_id == "CjoKGkNMYXBzZTdudHVVQ0Zjc0IxZ0FkTnFnQjVREhxDSnlBNHV2bnR1VUNGV0dnd2dvZDd3NE5aZy0w" assert test_id == "CjoKGkNMYXBzZTdudHVVQ0Zjc0IxZ0FkTnFnQjVREhxDSnlBNHV2bnR1VUNGV0dnd2dvZDd3NE5aZy0w"
def test_arcparam_3(mocker): def test_arcparam_3(mocker):
param = arcparam.getparam("01234567890") param = arcparam.getparam("01234567890")
assert param == "op2w0wQmGhxDZzhLRFFvTE1ERXlNelExTmpjNE9UQWdBUT09SARgAXICCAE%3D" assert param == "op2w0wQmGhxDZzhLRFFvTE1ERXlNelExTmpjNE9UQWdBUT09SARgAXICCAE%3D"

View File

@@ -1,6 +1,6 @@
from pytchat.tool.mining import parser from pytchat.tool.mining import parser
import pytchat.config as config import pytchat.config as config
import requests import httpx
import json import json
from pytchat.paramgen import arcparam_mining as arcparam from pytchat.paramgen import arcparam_mining as arcparam
@@ -28,7 +28,7 @@ def test_arcparam_1(mocker):
def test_arcparam_2(mocker): def test_arcparam_2(mocker):
param = arcparam.getparam("PZz9NB0-Z64", 1) param = arcparam.getparam("PZz9NB0-Z64", 1)
url = f"https://www.youtube.com/live_chat_replay?continuation={param}&playerOffsetMs=1000&pbj=1" url = f"https://www.youtube.com/live_chat_replay?continuation={param}&playerOffsetMs=1000&pbj=1"
resp = requests.Session().get(url, headers=config.headers) resp = httpx.Client(http2=True).get(url, headers=config.headers)
jsn = json.loads(resp.text) jsn = json.loads(resp.text)
_, chatdata = parser.parse(jsn[1]) _, chatdata = parser.parse(jsn[1])
test_id = chatdata[0]["addChatItemAction"]["item"]["liveChatPaidMessageRenderer"]["id"] test_id = chatdata[0]["addChatItemAction"]["item"]["liveChatPaidMessageRenderer"]["id"]

View File

@@ -1,77 +0,0 @@
import aiohttp
import asyncio
import json
from pytchat.tool.extract import parser
import sys
import time
from aioresponses import aioresponses
from concurrent.futures import CancelledError
from pytchat.tool.extract import asyncdl
def _open_file(path):
with open(path,mode ='r',encoding = 'utf-8') as f:
return f.read()
def test_asyncdl_split():
ret = asyncdl._split(0,1000,1)
assert ret == [0]
ret = asyncdl._split(1000,1000,10)
assert ret == [1000]
ret = asyncdl._split(0,1000,5)
assert ret == [0,200,400,600,800]
ret = asyncdl._split(10.5, 700.3, 5)
assert ret == [10, 148, 286, 424, 562]
ret = asyncdl._split(0,500,5)
assert ret == [0,125,250,375]
ret = asyncdl._split(0,500,500)
assert ret == [0,125,250,375]
ret = asyncdl._split(-1,1000,5)
assert ret == [-1, 199, 399, 599, 799]
"""invalid argument order"""
try:
ret = asyncdl._split(500,0,5)
assert False
except ValueError:
assert True
"""invalid count"""
try:
ret = asyncdl._split(0,500,-1)
assert False
except ValueError:
assert True
try:
ret = asyncdl._split(0,500,0)
assert False
except ValueError:
assert True
"""invalid argument type"""
try:
ret = asyncdl._split(0,5000,5.2)
assert False
except ValueError:
assert True
try:
ret = asyncdl._split(0,5000,"test")
assert False
except ValueError:
assert True
try:
ret = asyncdl._split([0,1],5000,5)
assert False
except ValueError:
assert True

View File

@@ -1,60 +1,66 @@
import aiohttp
import asyncio
import json import json
import os, sys
import time
from pytchat.tool.extract import duplcheck from pytchat.tool.extract import duplcheck
from pytchat.tool.extract import parser from pytchat.tool.extract import parser
from pytchat.tool.extract.block import Block from pytchat.tool.extract.block import Block
from pytchat.tool.extract.duplcheck import _dump from pytchat.tool.extract.duplcheck import _dump
def _open_file(path):
with open(path,mode ='r',encoding = 'utf-8') as f:
return f.read()
def _open_file(path):
with open(path, mode='r', encoding='utf-8') as f:
return f.read()
def test_overlap(): def test_overlap():
""" """
test overlap data test overlap data
operation : [0] [2] [3] [4] -> last :align to end operation : [0] [2] [3] [4] -> last :align to end
[1] , [5] -> no change [1] , [5] -> no change
""" """
def load_chatdata(filename): def load_chatdata(filename):
return parser.parse( return parser.parse(
json.loads(_open_file("tests/testdata/extract_duplcheck/overlap/"+filename)) json.loads(_open_file(
"tests/testdata/extract_duplcheck/overlap/" + filename))
)[1] )[1]
blocks = ( blocks = (
Block(first = 0, last= 12771, end= 9890,chat_data = load_chatdata("dp0-0.json")), Block(first=0, last=12771, end=9890,
Block(first = 9890, last= 15800, end= 20244,chat_data = load_chatdata("dp0-1.json")), chat_data=load_chatdata("dp0-0.json")),
Block(first = 20244,last= 45146, end= 32476,chat_data = load_chatdata("dp0-2.json")), Block(first=9890, last=15800, end=20244,
Block(first = 32476,last= 50520, end= 41380,chat_data = load_chatdata("dp0-3.json")), chat_data=load_chatdata("dp0-1.json")),
Block(first = 41380,last= 62875, end= 52568,chat_data = load_chatdata("dp0-4.json")), Block(first=20244, last=45146, end=32476,
Block(first = 52568,last= 62875, end= 54000,chat_data = load_chatdata("dp0-5.json"),is_last=True) chat_data=load_chatdata("dp0-2.json")),
Block(first=32476, last=50520, end=41380,
chat_data=load_chatdata("dp0-3.json")),
Block(first=41380, last=62875, end=52568,
chat_data=load_chatdata("dp0-4.json")),
Block(first=52568, last=62875, end=54000,
chat_data=load_chatdata("dp0-5.json"), is_last=True)
) )
result = duplcheck.remove_overlap(blocks) result = duplcheck.remove_overlap(blocks)
#dp0-0.json has item offset time is 9890 (equals block[0].end = block[1].first), # dp0-0.json has item offset time is 9890 (equals block[0].end = block[1].first),
#but must be aligne to the most close and smaller value:9779. # but must be aligne to the most close and smaller value:9779.
assert result[0].last == 9779 assert result[0].last == 9779
assert result[1].last == 15800 assert result[1].last == 15800
assert result[2].last == 32196 assert result[2].last == 32196
assert result[3].last == 41116 assert result[3].last == 41116
assert result[4].last == 52384 assert result[4].last == 52384
#the last block must be always added to result. # the last block must be always added to result.
assert result[5].last == 62875 assert result[5].last == 62875
def test_duplicate_head(): def test_duplicate_head():
def load_chatdata(filename): def load_chatdata(filename):
return parser.parse( return parser.parse(
json.loads(_open_file("tests/testdata/extract_duplcheck/head/"+filename)) json.loads(_open_file(
"tests/testdata/extract_duplcheck/head/" + filename))
)[1] )[1]
""" """
@@ -69,25 +75,26 @@ def test_duplicate_head():
result : [2] , [4] , [5] result : [2] , [4] , [5]
""" """
#chat data offsets are ignored. # chat data offsets are ignored.
blocks = ( blocks = (
Block(first = 0, last = 2500, chat_data = load_chatdata("dp0-0.json")), Block(first=0, last=2500, chat_data=load_chatdata("dp0-0.json")),
Block(first = 0, last =38771, chat_data = load_chatdata("dp0-1.json")), Block(first=0, last=38771, chat_data=load_chatdata("dp0-1.json")),
Block(first = 0, last =45146, chat_data = load_chatdata("dp0-2.json")), Block(first=0, last=45146, chat_data=load_chatdata("dp0-2.json")),
Block(first = 20244, last =60520, chat_data = load_chatdata("dp0-3.json")), Block(first=20244, last=60520, chat_data=load_chatdata("dp0-3.json")),
Block(first = 20244, last =62875, chat_data = load_chatdata("dp0-4.json")), Block(first=20244, last=62875, chat_data=load_chatdata("dp0-4.json")),
Block(first = 52568, last =62875, chat_data = load_chatdata("dp0-5.json")) Block(first=52568, last=62875, chat_data=load_chatdata("dp0-5.json"))
) )
_dump(blocks) _dump(blocks)
result = duplcheck.remove_duplicate_head(blocks) result = duplcheck.remove_duplicate_head(blocks)
assert len(result) == 3 assert len(result) == 3
assert result[0].first == blocks[2].first assert result[0].first == blocks[2].first
assert result[0].last == blocks[2].last assert result[0].last == blocks[2].last
assert result[1].first == blocks[4].first assert result[1].first == blocks[4].first
assert result[1].last == blocks[4].last assert result[1].last == blocks[4].last
assert result[2].first == blocks[5].first assert result[2].first == blocks[5].first
assert result[2].last == blocks[5].last assert result[2].last == blocks[5].last
def test_duplicate_tail(): def test_duplicate_tail():
""" """
@@ -103,26 +110,25 @@ def test_duplicate_tail():
""" """
def load_chatdata(filename): def load_chatdata(filename):
return parser.parse( return parser.parse(
json.loads(_open_file("tests/testdata/extract_duplcheck/head/"+filename)) json.loads(_open_file(
"tests/testdata/extract_duplcheck/head/" + filename))
)[1] )[1]
#chat data offsets are ignored. # chat data offsets are ignored.
blocks = ( blocks = (
Block(first = 0,last = 2500, chat_data=load_chatdata("dp0-0.json")), Block(first=0, last=2500, chat_data=load_chatdata("dp0-0.json")),
Block(first = 1500,last = 2500, chat_data=load_chatdata("dp0-1.json")), Block(first=1500, last=2500, chat_data=load_chatdata("dp0-1.json")),
Block(first = 10000,last = 45146, chat_data=load_chatdata("dp0-2.json")), Block(first=10000, last=45146, chat_data=load_chatdata("dp0-2.json")),
Block(first = 20244,last = 45146, chat_data=load_chatdata("dp0-3.json")), Block(first=20244, last=45146, chat_data=load_chatdata("dp0-3.json")),
Block(first = 20244,last = 62875, chat_data=load_chatdata("dp0-4.json")), Block(first=20244, last=62875, chat_data=load_chatdata("dp0-4.json")),
Block(first = 52568,last = 62875, chat_data=load_chatdata("dp0-5.json")) Block(first=52568, last=62875, chat_data=load_chatdata("dp0-5.json"))
) )
result = duplcheck.remove_duplicate_tail(blocks) result = duplcheck.remove_duplicate_tail(blocks)
_dump(result) _dump(result)
assert len(result) == 3 assert len(result) == 3
assert result[0].first == blocks[0].first assert result[0].first == blocks[0].first
assert result[0].last == blocks[0].last assert result[0].last == blocks[0].last
assert result[1].first == blocks[2].first assert result[1].first == blocks[2].first
assert result[1].last == blocks[2].last assert result[1].last == blocks[2].last
assert result[2].first == blocks[4].first assert result[2].first == blocks[4].first
assert result[2].last == blocks[4].last assert result[2].last == blocks[4].last

View File

@@ -1,23 +1,19 @@
import aiohttp
import asyncio
import json import json
import os, sys
import time
from aioresponses import aioresponses
from pytchat.tool.extract import duplcheck
from pytchat.tool.extract import parser from pytchat.tool.extract import parser
from pytchat.tool.extract.block import Block from pytchat.tool.extract.block import Block
from pytchat.tool.extract.patch import Patch, fill, split, set_patch from pytchat.tool.extract.patch import Patch, split
from pytchat.tool.extract.duplcheck import _dump
def _open_file(path): def _open_file(path):
with open(path,mode ='r',encoding = 'utf-8') as f: with open(path, mode='r', encoding='utf-8') as f:
return f.read() return f.read()
def load_chatdata(filename): def load_chatdata(filename):
return parser.parse( return parser.parse(
json.loads(_open_file("tests/testdata/fetch_patch/"+filename)) json.loads(_open_file("tests/testdata/fetch_patch/" + filename))
)[1] )[1]
def test_split_0(): def test_split_0():
@@ -61,20 +57,23 @@ def test_split_0():
@fetched patch @fetched patch
|-- patch --| |-- patch --|
""" """
parent = Block(first=0, last=4000, end=60000, continuation='parent', during_split=True) parent = Block(first=0, last=4000, end=60000,
child = Block(first=0, last=0, end=60000, continuation='mean', during_split=True) continuation='parent', during_split=True)
child = Block(first=0, last=0, end=60000,
continuation='mean', during_split=True)
patch = Patch(chats=load_chatdata('pt0-5.json'), patch = Patch(chats=load_chatdata('pt0-5.json'),
first=32500, last=34000, continuation='patch') first=32500, last=34000, continuation='patch')
split(parent,child,patch) split(parent, child, patch)
assert child.continuation == 'patch' assert child.continuation == 'patch'
assert parent.last < child.first assert parent.last < child.first
assert parent.end == child.first assert parent.end == child.first
assert child.first < child.last assert child.first < child.last
assert child.last < child.end assert child.last < child.end
assert parent.during_split == False assert parent.during_split is False
assert child.during_split == False assert child.during_split is False
def test_split_1(): def test_split_1():
"""patch.first <= parent_block.last """patch.first <= parent_block.last
@@ -119,14 +118,15 @@ def test_split_1():
child = Block(first=0, last=0, end=60000, continuation='mean', during_split=True) child = Block(first=0, last=0, end=60000, continuation='mean', during_split=True)
patch = Patch(chats=load_chatdata('pt0-5.json'), patch = Patch(chats=load_chatdata('pt0-5.json'),
first=32500, last=34000, continuation='patch') first=32500, last=34000, continuation='patch')
split(parent,child,patch)
assert parent.last == 33000 #no change split(parent, child, patch)
assert parent.end == 60000 #no change
assert parent.last == 33000 # no change
assert parent.end == 60000 # no change
assert child.continuation is None assert child.continuation is None
assert parent.during_split == False assert parent.during_split is False
assert child.during_split == True #exclude during_split sequence assert child.during_split is True # exclude during_split sequence
def test_split_2(): def test_split_2():
"""child_block.end < patch.last: """child_block.end < patch.last:
@@ -174,7 +174,7 @@ def test_split_2():
patch = Patch(chats=load_chatdata('pt0-5.json'), patch = Patch(chats=load_chatdata('pt0-5.json'),
first=32500, last=34000, continuation='patch') first=32500, last=34000, continuation='patch')
split(parent,child,patch) split(parent, child, patch)
assert child.continuation is None assert child.continuation is None
assert parent.last < child.first assert parent.last < child.first
@@ -182,8 +182,9 @@ def test_split_2():
assert child.first < child.last assert child.first < child.last
assert child.last < child.end assert child.last < child.end
assert child.continuation is None assert child.continuation is None
assert parent.during_split == False assert parent.during_split is False
assert child.during_split == False assert child.during_split is False
def test_split_none(): def test_split_none():
"""patch.last <= parent_block.last """patch.last <= parent_block.last
@@ -193,7 +194,7 @@ def test_split_none():
and parent.block.last exceeds patch.first. and parent.block.last exceeds patch.first.
In this case, fetched patch is all discarded, In this case, fetched patch is all discarded,
and worker searches other processing block again. and worker searches other processing block again.
~~~~~~ before ~~~~~~ ~~~~~~ before ~~~~~~
@@ -229,10 +230,10 @@ def test_split_none():
patch = Patch(chats=load_chatdata('pt0-5.json'), patch = Patch(chats=load_chatdata('pt0-5.json'),
first=32500, last=34000, continuation='patch') first=32500, last=34000, continuation='patch')
split(parent,child,patch) split(parent, child, patch)
assert parent.last == 40000 #no change assert parent.last == 40000 # no change
assert parent.end == 60000 #no change assert parent.end == 60000 # no change
assert child.continuation is None assert child.continuation is None
assert parent.during_split == False assert parent.during_split is False
assert child.during_split == True #exclude during_split sequence assert child.during_split is True # exclude during_split sequence

View File

@@ -1,5 +1,8 @@
import asyncio
import json import json
from aioresponses import aioresponses from pytest_httpx import HTTPXMock
from concurrent.futures import CancelledError
from pytchat.core_multithread.livechat import LiveChat
from pytchat.core_async.livechat import LiveChatAsync from pytchat.core_async.livechat import LiveChatAsync
from pytchat.exceptions import ResponseContextError from pytchat.exceptions import ResponseContextError
@@ -9,34 +12,37 @@ def _open_file(path):
return f.read() return f.read()
@aioresponses() def add_response_file(httpx_mock: HTTPXMock, jsonfile_path: str):
def test_Async(*mock): testdata = json.loads(_open_file(jsonfile_path))
vid = '__test_id__' httpx_mock.add_response(json=testdata)
_text = _open_file('tests/testdata/paramgen_firstread.json')
_text = json.loads(_text)
mock[0].get( def test_async(httpx_mock: HTTPXMock):
f"https://www.youtube.com/live_chat?v={vid}&is_popout=1", status=200, body=_text) add_response_file(httpx_mock, 'tests/testdata/paramgen_firstread.json')
async def test_loop():
try:
chat = LiveChatAsync(video_id='__test_id__')
_ = await chat.get()
assert chat.is_alive()
chat.terminate()
assert not chat.is_alive()
except ResponseContextError:
assert False
loop = asyncio.get_event_loop()
try: try:
chat = LiveChatAsync(video_id='__test_id__') loop.run_until_complete(test_loop())
except CancelledError:
assert True
def test_multithread(httpx_mock: HTTPXMock):
add_response_file(httpx_mock, 'tests/testdata/paramgen_firstread.json')
try:
chat = LiveChat(video_id='__test_id__')
_ = chat.get()
assert chat.is_alive() assert chat.is_alive()
chat.terminate() chat.terminate()
assert not chat.is_alive() assert not chat.is_alive()
except ResponseContextError: except ResponseContextError:
assert not chat.is_alive() assert False
def test_MultiThread(mocker):
_text = _open_file('tests/testdata/paramgen_firstread.json')
_text = json.loads(_text)
responseMock = mocker.Mock()
responseMock.status_code = 200
responseMock.text = _text
mocker.patch('requests.Session.get').return_value = responseMock
try:
chat = LiveChatAsync(video_id='__test_id__')
assert chat.is_alive()
chat.terminate()
assert not chat.is_alive()
except ResponseContextError:
chat.terminate()
assert not chat.is_alive()

View File

@@ -1,6 +1,6 @@
import asyncio import asyncio
import re import json
from aioresponses import aioresponses from pytest_httpx import HTTPXMock
from concurrent.futures import CancelledError from concurrent.futures import CancelledError
from pytchat.core_multithread.livechat import LiveChat from pytchat.core_multithread.livechat import LiveChat
from pytchat.core_async.livechat import LiveChatAsync from pytchat.core_async.livechat import LiveChatAsync
@@ -12,18 +12,18 @@ def _open_file(path):
return f.read() return f.read()
@aioresponses() def add_response_file(httpx_mock: HTTPXMock, jsonfile_path: str):
def test_async_live_stream(*mock): testdata = json.loads(_open_file(jsonfile_path))
httpx_mock.add_response(json=testdata)
async def test_loop(*mock):
pattern = re.compile( def test_async_live_stream(httpx_mock: HTTPXMock):
r'^https://www.youtube.com/live_chat/get_live_chat\?continuation=.*$') add_response_file(httpx_mock, 'tests/testdata/test_stream.json')
_text = _open_file('tests/testdata/test_stream.json')
mock[0].get(pattern, status=200, body=_text) async def test_loop():
chat = LiveChatAsync(video_id='__test_id__', processor=DummyProcessor()) chat = LiveChatAsync(video_id='__test_id__', processor=DummyProcessor())
chats = await chat.get() chats = await chat.get()
rawdata = chats[0]["chatdata"] rawdata = chats[0]["chatdata"]
# assert fetching livachat data
assert list(rawdata[0]["addChatItemAction"]["item"].keys())[ assert list(rawdata[0]["addChatItemAction"]["item"].keys())[
0] == "liveChatTextMessageRenderer" 0] == "liveChatTextMessageRenderer"
assert list(rawdata[1]["addChatItemAction"]["item"].keys())[ assert list(rawdata[1]["addChatItemAction"]["item"].keys())[
@@ -41,25 +41,16 @@ def test_async_live_stream(*mock):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: try:
loop.run_until_complete(test_loop(*mock)) loop.run_until_complete(test_loop())
except CancelledError: except CancelledError:
assert True assert True
@aioresponses() def test_async_replay_stream(httpx_mock: HTTPXMock):
def test_async_replay_stream(*mock): add_response_file(httpx_mock, 'tests/testdata/finished_live.json')
add_response_file(httpx_mock, 'tests/testdata/chatreplay.json')
async def test_loop(*mock):
pattern_live = re.compile(
r'^https://www.youtube.com/live_chat/get_live_chat\?continuation=.*$')
pattern_replay = re.compile(
r'^https://www.youtube.com/live_chat_replay/get_live_chat_replay\?continuation=.*$')
# empty livechat -> switch to fetch replaychat
_text_live = _open_file('tests/testdata/finished_live.json')
_text_replay = _open_file('tests/testdata/chatreplay.json')
mock[0].get(pattern_live, status=200, body=_text_live)
mock[0].get(pattern_replay, status=200, body=_text_replay)
async def test_loop():
chat = LiveChatAsync(video_id='__test_id__', processor=DummyProcessor()) chat = LiveChatAsync(video_id='__test_id__', processor=DummyProcessor())
chats = await chat.get() chats = await chat.get()
rawdata = chats[0]["chatdata"] rawdata = chats[0]["chatdata"]
@@ -71,27 +62,16 @@ def test_async_replay_stream(*mock):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: try:
loop.run_until_complete(test_loop(*mock)) loop.run_until_complete(test_loop())
except CancelledError: except CancelledError:
assert True assert True
@aioresponses() def test_async_force_replay(httpx_mock: HTTPXMock):
def test_async_force_replay(*mock): add_response_file(httpx_mock, 'tests/testdata/test_stream.json')
add_response_file(httpx_mock, 'tests/testdata/chatreplay.json')
async def test_loop(*mock): async def test_loop():
pattern_live = re.compile(
r'^https://www.youtube.com/live_chat/get_live_chat\?continuation=.*$')
pattern_replay = re.compile(
r'^https://www.youtube.com/live_chat_replay/get_live_chat_replay\?continuation=.*$')
# valid live data, but force_replay = True
_text_live = _open_file('tests/testdata/test_stream.json')
# valid replay data
_text_replay = _open_file('tests/testdata/chatreplay.json')
mock[0].get(pattern_live, status=200, body=_text_live)
mock[0].get(pattern_replay, status=200, body=_text_replay)
# force replay
chat = LiveChatAsync( chat = LiveChatAsync(
video_id='__test_id__', processor=DummyProcessor(), force_replay=True) video_id='__test_id__', processor=DummyProcessor(), force_replay=True)
chats = await chat.get() chats = await chat.get()
@@ -105,20 +85,13 @@ def test_async_force_replay(*mock):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: try:
loop.run_until_complete(test_loop(*mock)) loop.run_until_complete(test_loop())
except CancelledError: except CancelledError:
assert True assert True
def test_multithread_live_stream(mocker): def test_multithread_live_stream(httpx_mock: HTTPXMock):
add_response_file(httpx_mock, 'tests/testdata/test_stream.json')
_text = _open_file('tests/testdata/test_stream.json')
responseMock = mocker.Mock()
responseMock.status_code = 200
responseMock.text = _text
mocker.patch(
'requests.Session.get').return_value.__enter__.return_value = responseMock
chat = LiveChat(video_id='__test_id__', processor=DummyProcessor()) chat = LiveChat(video_id='__test_id__', processor=DummyProcessor())
chats = chat.get() chats = chat.get()
rawdata = chats[0]["chatdata"] rawdata = chats[0]["chatdata"]

View File

@@ -1,21 +1,18 @@
from pytchat.parser.live import Parser from pytchat.parser.live import Parser
import json import json
from aioresponses import aioresponses
from pytchat.exceptions import NoContents from pytchat.exceptions import NoContents
parser = Parser(is_replay=False)
def _open_file(path): def _open_file(path):
with open(path, mode='r', encoding='utf-8') as f: with open(path, mode='r', encoding='utf-8') as f:
return f.read() return f.read()
parser = Parser(is_replay=False)
@aioresponses()
def test_finishedlive(*mock): def test_finishedlive(*mock):
'''配信が終了した動画を正しく処理できるか''' '''配信が終了した動画を正しく処理できるか'''
_text = _open_file('tests/testdata/finished_live.json') _text = _open_file('tests/testdata/finished_live.json')
_text = json.loads(_text) _text = json.loads(_text)
@@ -26,10 +23,8 @@ def test_finishedlive(*mock):
assert True assert True
@aioresponses()
def test_parsejson(*mock): def test_parsejson(*mock):
'''jsonを正常にパースできるか''' '''jsonを正常にパースできるか'''
_text = _open_file('tests/testdata/paramgen_firstread.json') _text = _open_file('tests/testdata/paramgen_firstread.json')
_text = json.loads(_text) _text = json.loads(_text)

View File

@@ -12,13 +12,13 @@ def _set_test_data(filepath, mocker):
response_mock = mocker.Mock() response_mock = mocker.Mock()
response_mock.status_code = 200 response_mock.status_code = 200
response_mock.text = _text response_mock.text = _text
mocker.patch('requests.get').return_value = response_mock mocker.patch('httpx.get').return_value = response_mock
def test_archived_page(mocker): def test_archived_page(mocker):
_set_test_data('tests/testdata/videoinfo/archived_page.txt', mocker) _set_test_data('tests/testdata/videoinfo/archived_page.txt', mocker)
info = VideoInfo('__test_id__') info = VideoInfo('__test_id__')
actual_thumbnail_url = 'https://i.ytimg.com/vi/fzI9FNjXQ0o/hqdefault.jpg' actual_thumbnail_url = 'https://i.ytimg.com/vi/fzI9FNjXQ0o/hqdefault.jpg'
assert info.video_id == '__test_id__' assert info.video_id == '__test_id__'
assert info.get_channel_name() == 'GitHub' assert info.get_channel_name() == 'GitHub'
assert info.get_thumbnail() == actual_thumbnail_url assert info.get_thumbnail() == actual_thumbnail_url