Compare commits

...

22 Commits

Author SHA1 Message Date
taizan-hokuto
a790ab13a9 Merge branch 'release/v0.1.5' 2020-09-03 20:16:55 +09:00
taizan-hokuto
0456300d19 Increment version 2020-09-03 20:15:38 +09:00
taizan-hokuto
2ef1e7028f Restore setup 2020-09-03 19:59:18 +09:00
taizan-hokuto
9413c4a186 Merge branch 'feature/add_progressbar' into develop 2020-09-03 19:54:35 +09:00
taizan-hokuto
8a8cef399f Format 2020-09-03 19:48:34 +09:00
taizan-hokuto
3bcad12cf6 Add cli option 2020-09-03 19:31:34 +09:00
taizan-hokuto
4eb18279fe Add progress bar 2020-09-03 00:57:26 +09:00
taizan-hokuto
e9ed564e1b Merge branch 'feature/httpx' into develop 2020-08-30 22:17:57 +09:00
taizan-hokuto
95f975c93d Use httpx 2020-08-30 22:16:58 +09:00
taizan-hokuto
8012e1d191 Merge branch 'master' into feature/httpx 2020-08-22 12:41:57 +09:00
taizan-hokuto
f9480ea1eb Merge branch 'hotfix/cli_handle_live' 2020-08-21 22:25:48 +09:00
taizan-hokuto
404727c49c Merge tag 'cli_handle_live' into develop
v0.1.4
2020-08-21 22:25:48 +09:00
taizan-hokuto
6b924a88ef Increment version 2020-08-21 22:25:06 +09:00
taizan-hokuto
56294d6a67 Fix extracting video_id 2020-08-21 22:23:33 +09:00
taizan-hokuto
283443e374 Merge pull request #15 from EtianAM/patch-1
Fix videoinfo.py and CLI download
2020-08-21 19:34:19 +09:00
Etian Daniel Alavardo Mtz
89b51c420f Avoid changing the type of result.
However, if this argument is used elsewhere in the code it should be corrected.
2020-08-20 22:39:32 -05:00
Etian Daniel Alavardo Mtz
96474f10c6 Fix videoinfo.py
A bit ugly, but I couldn't solve it any other way. I'm bad with regex.
2020-08-20 22:29:59 -05:00
taizan-hokuto
5f78a99507 Merge tag 'exist_dir' into develop
v0.1.3
2020-08-06 00:32:07 +09:00
taizan-hokuto
497d84015e Merge branch 'master' into develop 2020-07-27 00:26:57 +09:00
taizan-hokuto
5d3c7b5abd Merge tag 'fix_color' into develop
v0.1.2
2020-07-24 22:43:09 +09:00
taizan-hokuto
f16ef60f11 Merge tag 'fix_cli' into develop
v0.1.1
2020-07-24 16:43:14 +09:00
taizan-hokuto
5fa4d051ee Merge tag 'v0.1.0' into develop
v0.1.0
2020-07-24 16:27:14 +09:00
28 changed files with 441 additions and 397 deletions

View File

@@ -2,7 +2,7 @@
pytchat is a lightweight python library to browse youtube livechat without Selenium or BeautifulSoup.
"""
__copyright__ = 'Copyright (C) 2019 taizan-hokuto'
__version__ = '0.1.3'
__version__ = '0.1.5'
__license__ = 'MIT'
__author__ = 'taizan-hokuto'
__author_email__ = '55448286+taizan-hokuto@users.noreply.github.com'

View File

@@ -1,12 +1,17 @@
import argparse
import os
import signal
from json.decoder import JSONDecodeError
from pathlib import Path
from pytchat.util.extract_video_id import extract_video_id
from .arguments import Arguments
from .. exceptions import InvalidVideoIdException, NoContents
from .progressbar import ProgressBar
from .. exceptions import InvalidVideoIdException, NoContents, PatternUnmatchError
from .. processors.html_archiver import HTMLArchiver
from .. tool.extract.extractor import Extractor
from .. tool.videoinfo import VideoInfo
from .. util.extract_video_id import extract_video_id
from .. import util
from .. import __version__
'''
@@ -29,44 +34,67 @@ def main():
help='Output directory (end with "/"). default="./"', default='./')
parser.add_argument(f'--{Arguments.Name.VERSION}', action='store_true',
help='Show version')
parser.add_argument(f'--{Arguments.Name.SAVE_ERROR_DATA}', action='store_true',
help='Save error data when error occurs(".dat" file)')
Arguments(parser.parse_args().__dict__)
if Arguments().print_version:
print(f'pytchat v{__version__} © 2019 taizan-hokuto')
return
# Extractor
if Arguments().video_ids:
for video_id in Arguments().video_ids:
if '[' in video_id:
video_id = video_id.replace('[', '').replace(']', '')
try:
if os.path.exists(Arguments().output):
path = Path(Arguments().output + video_id + '.html')
else:
raise FileNotFoundError
video_id = extract_video_id(video_id)
info = VideoInfo(video_id)
print(f"Extracting...\n"
f" video_id: {video_id}\n"
f" channel: {info.get_channel_name()}\n"
f" title: {info.get_title()}")
print(f" output path: {path.resolve()}")
Extractor(video_id,
processor=HTMLArchiver(
Arguments().output + video_id + '.html'),
callback=_disp_progress
).extract()
print("\nExtraction end.\n")
except InvalidVideoIdException:
print("Invalid Video ID or URL:", video_id)
except (TypeError, NoContents) as e:
print(e)
except FileNotFoundError:
print("The specified directory does not exist.:{}".format(Arguments().output ))
if not Arguments().video_ids:
parser.print_help()
return
parser.print_help()
for video_id in Arguments().video_ids:
if '[' in video_id:
video_id = video_id.replace('[', '').replace(']', '')
try:
video_id = extract_video_id(video_id)
if os.path.exists(Arguments().output):
path = Path(Arguments().output + video_id + '.html')
else:
raise FileNotFoundError
info = VideoInfo(video_id)
print(f"Extracting...\n"
f" video_id: {video_id}\n"
f" channel: {info.get_channel_name()}\n"
f" title: {info.get_title()}")
print(f" output path: {path.resolve()}")
duration = info.get_duration()
pbar = ProgressBar(duration)
ex = Extractor(video_id,
processor=HTMLArchiver(Arguments().output + video_id + '.html'),
callback=pbar._disp,
div=10)
signal.signal(signal.SIGINT, (lambda a, b: cancel(ex, pbar)))
ex.extract()
pbar.close()
if pbar.is_cancelled():
print("\nThe extraction process has been discontinued.\n")
return
print("\nThe extraction process has been completed.\n")
except InvalidVideoIdException:
print("Invalid Video ID or URL:", video_id)
except (TypeError, NoContents) as e:
print(e.with_traceback())
except FileNotFoundError:
print("The specified directory does not exist.:{}".format(Arguments().output))
except JSONDecodeError as e:
print(e.msg)
print("Cannot parse video information.:{}".format(video_id))
if Arguments().save_error_data:
util.save(e.doc, "ERR_JSON_DECODE", ".dat")
except PatternUnmatchError as e:
print(e.msg)
print("Cannot parse video information.:{}".format(video_id))
if Arguments().save_error_data:
util.save(e.doc, "ERR_PATTERN_UNMATCH", ".dat")
return
def _disp_progress(a, b):
print('.', end="", flush=True)
def cancel(ex: Extractor, pbar: ProgressBar):
ex.cancel()
pbar.cancel()

View File

@@ -18,6 +18,7 @@ class Arguments(metaclass=Singleton):
VERSION: str = 'version'
OUTPUT: str = 'output_dir'
VIDEO_IDS: str = 'video_id'
SAVE_ERROR_DATA: bool = 'save_error_data'
def __init__(self,
arguments: Optional[Dict[str, Union[str, bool, int]]] = None):
@@ -34,10 +35,8 @@ class Arguments(metaclass=Singleton):
self.print_version: bool = arguments[Arguments.Name.VERSION]
self.output: str = arguments[Arguments.Name.OUTPUT]
self.video_ids: List[int] = []
self.save_error_data: bool = arguments[Arguments.Name.SAVE_ERROR_DATA]
# Videos
if arguments[Arguments.Name.VIDEO_IDS]:
self.video_ids = [video_id
for video_id in arguments[Arguments.Name.VIDEO_IDS].split(',')]

View File

@@ -0,0 +1,41 @@
'''
This code for this progress bar is based on
vladignatyev/progress.py
https://gist.github.com/vladignatyev/06860ec2040cb497f0f3
(MIT License)
'''
import sys
class ProgressBar:
def __init__(self, duration):
self._duration = duration
self._count = 0
self._bar_len = 60
self._cancelled = False
def _disp(self, _, fetched):
self._progress(fetched / 1000, self._duration)
def _progress(self, fillin, total, status=''):
if total == 0 or self._cancelled:
return
self._count += fillin
filled_len = int(round(self._bar_len * self._count / float(total)))
percents = round(100.0 * self._count / float(total), 1)
if filled_len > self._bar_len:
filled_len = self._bar_len
percents = 100
bar = '=' * filled_len + ' ' * (self._bar_len - filled_len)
sys.stdout.write(' [%s] %s%s ...%s\r' % (bar, percents, '%', status))
sys.stdout.flush()
def close(self):
if not self._cancelled:
self._progress(self._duration, self._duration)
def cancel(self):
self._cancelled = True
def is_cancelled(self):
return self._cancelled

View File

@@ -1,7 +1,8 @@
import logging
from . import mylogger
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36'}
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36',
}
def logger(module_name: str, loglevel=None):

View File

@@ -1,13 +1,13 @@
import aiohttp
import asyncio
import httpx
import json
import signal
import time
import traceback
import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError
from asyncio import Queue
from concurrent.futures import CancelledError
from .buffer import Buffer
from ..parser.live import Parser
from .. import config
@@ -22,7 +22,7 @@ MAX_RETRY = 10
class LiveChatAsync:
'''asyncio(aiohttp)を利用してYouTubeのライブ配信のチャットデータを取得する。
'''asyncioを利用してYouTubeのライブ配信のチャットデータを取得する。
Parameter
---------
@@ -161,11 +161,11 @@ class LiveChatAsync:
parameter for next chat data
'''
try:
async with aiohttp.ClientSession() as session:
async with httpx.AsyncClient(http2=True) as client:
while(continuation and self._is_alive):
continuation = await self._check_pause(continuation)
contents = await self._get_contents(
continuation, session, headers)
continuation, client, headers)
metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs'] / 1000
@@ -210,7 +210,7 @@ class LiveChatAsync:
self._video_id, 3, self._topchat_only)
return continuation
async def _get_contents(self, continuation, session, headers):
async def _get_contents(self, continuation, client, headers):
'''Get 'continuationContents' from livechat json.
If contents is None at first fetching,
try to fetch archive chat data.
@@ -219,7 +219,7 @@ class LiveChatAsync:
-------
'continuationContents' which includes metadata & chatdata.
'''
livechat_json = await self._get_livechat_json(continuation, session, headers)
livechat_json = await self._get_livechat_json(continuation, client, headers)
contents = self._parser.get_contents(livechat_json)
if self._first_fetch:
if contents is None or self._is_replay:
@@ -229,18 +229,18 @@ class LiveChatAsync:
continuation = arcparam.getparam(
self._video_id, self.seektime, self._topchat_only)
livechat_json = (await self._get_livechat_json(
continuation, session, headers))
continuation, client, headers))
reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json))
if reload_continuation:
livechat_json = (await self._get_livechat_json(
reload_continuation, session, headers))
reload_continuation, client, headers))
contents = self._parser.get_contents(livechat_json)
self._is_replay = True
self._first_fetch = False
return contents
async def _get_livechat_json(self, continuation, session, headers):
async def _get_livechat_json(self, continuation, client, headers):
'''
Get json which includes chat data.
'''
@@ -249,14 +249,13 @@ class LiveChatAsync:
status_code = 0
url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
for _ in range(MAX_RETRY + 1):
async with session.get(url, headers=headers) as resp:
try:
text = await resp.text()
livechat_json = json.loads(text)
break
except (ClientConnectorError, json.JSONDecodeError):
await asyncio.sleep(1)
continue
try:
resp = await client.get(url, headers=headers)
livechat_json = resp.json()
break
except (httpx.HTTPError, json.JSONDecodeError):
await asyncio.sleep(1)
continue
else:
self._logger.error(f"[{self._video_id}]"
f"Exceeded retry count. status_code={status_code}")

View File

@@ -1,4 +1,4 @@
import requests
import httpx
import json
import signal
import time
@@ -153,10 +153,10 @@ class LiveChat:
parameter for next chat data
'''
try:
with requests.Session() as session:
with httpx.Client(http2=True) as client:
while(continuation and self._is_alive):
continuation = self._check_pause(continuation)
contents = self._get_contents(continuation, session, headers)
contents = self._get_contents(continuation, client, headers)
metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs'] / 1000
chat_component = {
@@ -199,7 +199,7 @@ class LiveChat:
continuation = liveparam.getparam(self._video_id, 3)
return continuation
def _get_contents(self, continuation, session, headers):
def _get_contents(self, continuation, client, headers):
'''Get 'continuationContents' from livechat json.
If contents is None at first fetching,
try to fetch archive chat data.
@@ -209,7 +209,7 @@ class LiveChat:
'continuationContents' which includes metadata & chat data.
'''
livechat_json = (
self._get_livechat_json(continuation, session, headers)
self._get_livechat_json(continuation, client, headers)
)
contents = self._parser.get_contents(livechat_json)
if self._first_fetch:
@@ -219,18 +219,18 @@ class LiveChat:
self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation="
continuation = arcparam.getparam(
self._video_id, self.seektime, self._topchat_only)
livechat_json = (self._get_livechat_json(continuation, session, headers))
livechat_json = (self._get_livechat_json(continuation, client, headers))
reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json))
if reload_continuation:
livechat_json = (self._get_livechat_json(
reload_continuation, session, headers))
reload_continuation, client, headers))
contents = self._parser.get_contents(livechat_json)
self._is_replay = True
self._first_fetch = False
return contents
def _get_livechat_json(self, continuation, session, headers):
def _get_livechat_json(self, continuation, client, headers):
'''
Get json which includes chat data.
'''
@@ -239,10 +239,9 @@ class LiveChat:
status_code = 0
url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
for _ in range(MAX_RETRY + 1):
with session.get(url, headers=headers) as resp:
with client:
try:
text = resp.text
livechat_json = json.loads(text)
livechat_json = client.get(url, headers=headers).json()
break
except json.JSONDecodeError:
time.sleep(1)

View File

@@ -62,3 +62,18 @@ class ReceivedUnknownContinuation(ChatParseException):
class FailedExtractContinuation(ChatDataFinished):
pass
class VideoInfoParseError(Exception):
'''
thrown when failed to parse video info
'''
class PatternUnmatchError(VideoInfoParseError):
'''
thrown when failed to parse video info with unmatched pattern
'''
def __init__(self, doc):
self.msg = "PatternUnmatchError"
self.doc = doc

View File

@@ -1,6 +1,6 @@
import os
import re
import requests
import httpx
from base64 import standard_b64encode
from .chat_processor import ChatProcessor
from .default.processor import DefaultProcessor
@@ -108,7 +108,7 @@ class HTMLArchiver(ChatProcessor):
for item in message_items)
def _encode_img(self, url):
resp = requests.get(url)
resp = httpx.get(url)
return standard_b64encode(resp.content).decode()
def _set_emoji_table(self, item: dict):

View File

@@ -1,6 +1,5 @@
import aiohttp
import httpx
import asyncio
import json
from . import parser
from . block import Block
from . worker import ExtractWorker
@@ -55,7 +54,7 @@ def ready_blocks(video_id, duration, div, callback):
raise ValueError
async def _get_blocks(video_id, duration, div, callback):
async with aiohttp.ClientSession() as session:
async with httpx.AsyncClient(http2=True) as session:
tasks = [_create_block(session, video_id, seektime, callback)
for seektime in _split(-1, duration, div)]
return await asyncio.gather(*tasks)
@@ -65,9 +64,8 @@ def ready_blocks(video_id, duration, div, callback):
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
for _ in range(MAX_RETRY_COUNT):
try:
async with session.get(url, headers=headers) as resp:
text = await resp.text()
next_continuation, actions = parser.parse(json.loads(text))
resp = await session.get(url, headers=headers)
next_continuation, actions = parser.parse(resp.json())
break
except JSONDecodeError:
await asyncio.sleep(3)
@@ -106,7 +104,7 @@ def fetch_patch(callback, blocks, video_id):
)
for block in blocks
]
async with aiohttp.ClientSession() as session:
async with httpx.AsyncClient() as session:
tasks = [worker.run(session) for worker in workers]
return await asyncio.gather(*tasks)
@@ -114,9 +112,8 @@ def fetch_patch(callback, blocks, video_id):
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
for _ in range(MAX_RETRY_COUNT):
try:
async with session.get(url, headers=config.headers) as resp:
chat_json = await resp.text()
continuation, actions = parser.parse(json.loads(chat_json))
resp = await session.get(url, headers=config.headers)
continuation, actions = parser.parse(resp.json())
break
except JSONDecodeError:
await asyncio.sleep(3)

View File

@@ -79,7 +79,7 @@ class Extractor:
def extract(self):
if self.duration == 0:
print("video is not archived.")
print("\nCannot extract chat data:\n The specified video has not yet been archived.")
return []
data = self._execute_extract_operations()
if self.processor is None:

View File

@@ -1,6 +1,7 @@
from . block import Block
from . patch import fill, split
from ... paramgen import arcparam
from typing import Tuple
class ExtractWorker:
@@ -76,7 +77,7 @@ def _search_new_block(worker) -> Block:
return new_block
def _get_undone_block(blocks) -> (int, Block):
def _get_undone_block(blocks) -> Tuple[int, Block]:
min_interval_ms = 120000
max_remaining = 0
undone_block = None

View File

@@ -1,12 +1,12 @@
import aiohttp
import httpx
import asyncio
import json
from . import parser
from . block import Block
from . worker import ExtractWorker
from . patch import Patch
from ... import config
from ... import config
from ... paramgen import arcparam_mining as arcparam
from concurrent.futures import CancelledError
from urllib.parse import quote
@@ -14,10 +14,12 @@ from urllib.parse import quote
headers = config.headers
REPLAY_URL = "https://www.youtube.com/live_chat_replay?continuation="
INTERVAL = 1
def _split(start, end, count, min_interval_sec = 120):
def _split(start, end, count, min_interval_sec=120):
"""
Split section from `start` to `end` into `count` pieces,
and returns the beginning of each piece.
and returns the beginning of each piece.
The `count` is adjusted so that the length of each piece
is no smaller than `min_interval`.
@@ -25,42 +27,43 @@ def _split(start, end, count, min_interval_sec = 120):
--------
List of the offset of each block's first chat data.
"""
if not (isinstance(start,int) or isinstance(start,float)) or \
not (isinstance(end,int) or isinstance(end,float)):
if not (isinstance(start, int) or isinstance(start, float)) or \
not (isinstance(end, int) or isinstance(end, float)):
raise ValueError("start/end must be int or float")
if not isinstance(count,int):
if not isinstance(count, int):
raise ValueError("count must be int")
if start>end:
if start > end:
raise ValueError("end must be equal to or greater than start.")
if count<1:
if count < 1:
raise ValueError("count must be equal to or greater than 1.")
if (end-start)/count < min_interval_sec:
count = int((end-start)/min_interval_sec)
if count == 0 : count = 1
interval= (end-start)/count
if (end - start) / count < min_interval_sec:
count = int((end - start) / min_interval_sec)
if count == 0:
count = 1
interval = (end - start) / count
if count == 1:
return [start]
return sorted( list(set( [int(start + interval*j)
for j in range(count) ])))
return sorted(list(set([int(start + interval * j)
for j in range(count)])))
def ready_blocks(video_id, duration, div, callback):
if div <= 0: raise ValueError
if div <= 0:
raise ValueError
async def _get_blocks( video_id, duration, div, callback):
async with aiohttp.ClientSession() as session:
tasks = [_create_block(session, video_id, seektime, callback)
for seektime in _split(0, duration, div)]
async def _get_blocks(video_id, duration, div, callback):
async with httpx.ClientSession() as session:
tasks = [_create_block(session, video_id, seektime, callback)
for seektime in _split(0, duration, div)]
return await asyncio.gather(*tasks)
async def _create_block(session, video_id, seektime, callback):
continuation = arcparam.getparam(video_id, seektime = seektime)
url=(f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
f"{int(seektime*1000)}&hidden=false&pbj=1")
async with session.get(url, headers = headers) as resp:
continuation = arcparam.getparam(video_id, seektime=seektime)
url = (f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
f"{int(seektime*1000)}&hidden=false&pbj=1")
async with session.get(url, headers=headers) as resp:
chat_json = await resp.text()
if chat_json is None:
return
@@ -70,39 +73,40 @@ def ready_blocks(video_id, duration, div, callback):
if callback:
callback(actions, INTERVAL)
return Block(
continuation = continuation,
chat_data = actions,
first = first,
last = seektime,
seektime = seektime
continuation=continuation,
chat_data=actions,
first=first,
last=seektime,
seektime=seektime
)
"""
fetch initial blocks.
"""
"""
loop = asyncio.get_event_loop()
blocks = loop.run_until_complete(
_get_blocks(video_id, duration, div, callback))
return blocks
def fetch_patch(callback, blocks, video_id):
async def _allocate_workers():
workers = [
ExtractWorker(
fetch = _fetch, block = block,
blocks = blocks, video_id = video_id
fetch=_fetch, block=block,
blocks=blocks, video_id=video_id
)
for block in blocks
]
async with aiohttp.ClientSession() as session:
async with httpx.ClientSession() as session:
tasks = [worker.run(session) for worker in workers]
return await asyncio.gather(*tasks)
return await asyncio.gather(*tasks)
async def _fetch(seektime,session) -> Patch:
continuation = arcparam.getparam(video_id, seektime = seektime)
url=(f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
f"{int(seektime*1000)}&hidden=false&pbj=1")
async with session.get(url,headers = config.headers) as resp:
async def _fetch(seektime, session) -> Patch:
continuation = arcparam.getparam(video_id, seektime=seektime)
url = (f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
f"{int(seektime*1000)}&hidden=false&pbj=1")
async with session.get(url, headers=config.headers) as resp:
chat_json = await resp.text()
actions = []
try:
@@ -113,21 +117,22 @@ def fetch_patch(callback, blocks, video_id):
pass
if callback:
callback(actions, INTERVAL)
return Patch(chats = actions, continuation = continuation,
seektime = seektime, last = seektime)
return Patch(chats=actions, continuation=continuation,
seektime=seektime, last=seektime)
"""
allocate workers and assign blocks.
"""
"""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_allocate_workers())
except CancelledError:
pass
async def _shutdown():
print("\nshutdown...")
tasks = [t for t in asyncio.all_tasks()
if t is not asyncio.current_task()]
if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
try:
@@ -135,7 +140,7 @@ async def _shutdown():
except asyncio.CancelledError:
pass
def cancel():
loop = asyncio.get_event_loop()
loop.create_task(_shutdown())

View File

@@ -1,13 +1,13 @@
import json
import re
import requests
import httpx
from .. import config
from ..exceptions import InvalidVideoIdException
from ..exceptions import InvalidVideoIdException, PatternUnmatchError
from ..util.extract_video_id import extract_video_id
headers = config.headers
pattern = re.compile(r"yt\.setConfig\({'PLAYER_CONFIG': ({.*})}\);")
pattern = re.compile(r"'PLAYER_CONFIG': ({.*}}})")
item_channel_id = [
"videoDetails",
@@ -85,13 +85,15 @@ class VideoInfo:
def _get_page_text(self, video_id):
url = f"https://www.youtube.com/embed/{video_id}"
resp = requests.get(url, headers=headers)
resp = httpx.get(url, headers=headers)
resp.raise_for_status()
return resp.text
def _parse(self, text):
result = re.search(pattern, text)
res = json.loads(result.group(1))
if result is None:
raise PatternUnmatchError(text)
res = json.loads(result.group(1)[:-1])
response = self._get_item(res, item_response)
if response is None:
self._check_video_is_private(res.get("args"))

View File

@@ -1,11 +1,11 @@
import requests
import httpx
import json
import datetime
from .. import config
def extract(url):
_session = requests.Session()
_session = httpx.Client(http2=True)
html = _session.get(url, headers=config.headers)
with open(str(datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
) + 'test.json', mode='w', encoding='utf-8') as f:

View File

@@ -1,5 +1,4 @@
aiohttp
protobuf
httpx==0.14.1
protobuf==3.13.0
pytz
requests
urllib3

View File

@@ -1,5 +1,4 @@
aioresponses
mock
mocker
pytest
pytest-mock
pytest_httpx

View File

@@ -1,5 +1,5 @@
import json
import requests
import httpx
import pytchat.config as config
from pytchat.paramgen import arcparam
from pytchat.parser.live import Parser
@@ -18,14 +18,15 @@ def test_arcparam_1(mocker):
def test_arcparam_2(mocker):
param = arcparam.getparam("SsjCnHOk-Sk", seektime=100)
url = f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?continuation={param}&pbj=1"
resp = requests.Session().get(url, headers=config.headers)
resp = httpx.Client(http2=True).get(url, headers=config.headers)
jsn = json.loads(resp.text)
parser = Parser(is_replay=True)
contents = parser.get_contents(jsn)
_ , chatdata = parser.parse(contents)
_, chatdata = parser.parse(contents)
test_id = chatdata[0]["addChatItemAction"]["item"]["liveChatTextMessageRenderer"]["id"]
assert test_id == "CjoKGkNMYXBzZTdudHVVQ0Zjc0IxZ0FkTnFnQjVREhxDSnlBNHV2bnR1VUNGV0dnd2dvZDd3NE5aZy0w"
def test_arcparam_3(mocker):
param = arcparam.getparam("01234567890")
assert param == "op2w0wQmGhxDZzhLRFFvTE1ERXlNelExTmpjNE9UQWdBUT09SARgAXICCAE%3D"

View File

@@ -1,6 +1,6 @@
from pytchat.tool.mining import parser
import pytchat.config as config
import requests
import httpx
import json
from pytchat.paramgen import arcparam_mining as arcparam
@@ -28,7 +28,7 @@ def test_arcparam_1(mocker):
def test_arcparam_2(mocker):
param = arcparam.getparam("PZz9NB0-Z64", 1)
url = f"https://www.youtube.com/live_chat_replay?continuation={param}&playerOffsetMs=1000&pbj=1"
resp = requests.Session().get(url, headers=config.headers)
resp = httpx.Client(http2=True).get(url, headers=config.headers)
jsn = json.loads(resp.text)
_, chatdata = parser.parse(jsn[1])
test_id = chatdata[0]["addChatItemAction"]["item"]["liveChatPaidMessageRenderer"]["id"]

View File

@@ -1,77 +0,0 @@
import aiohttp
import asyncio
import json
from pytchat.tool.extract import parser
import sys
import time
from aioresponses import aioresponses
from concurrent.futures import CancelledError
from pytchat.tool.extract import asyncdl
def _open_file(path):
with open(path,mode ='r',encoding = 'utf-8') as f:
return f.read()
def test_asyncdl_split():
ret = asyncdl._split(0,1000,1)
assert ret == [0]
ret = asyncdl._split(1000,1000,10)
assert ret == [1000]
ret = asyncdl._split(0,1000,5)
assert ret == [0,200,400,600,800]
ret = asyncdl._split(10.5, 700.3, 5)
assert ret == [10, 148, 286, 424, 562]
ret = asyncdl._split(0,500,5)
assert ret == [0,125,250,375]
ret = asyncdl._split(0,500,500)
assert ret == [0,125,250,375]
ret = asyncdl._split(-1,1000,5)
assert ret == [-1, 199, 399, 599, 799]
"""invalid argument order"""
try:
ret = asyncdl._split(500,0,5)
assert False
except ValueError:
assert True
"""invalid count"""
try:
ret = asyncdl._split(0,500,-1)
assert False
except ValueError:
assert True
try:
ret = asyncdl._split(0,500,0)
assert False
except ValueError:
assert True
"""invalid argument type"""
try:
ret = asyncdl._split(0,5000,5.2)
assert False
except ValueError:
assert True
try:
ret = asyncdl._split(0,5000,"test")
assert False
except ValueError:
assert True
try:
ret = asyncdl._split([0,1],5000,5)
assert False
except ValueError:
assert True

View File

@@ -1,60 +1,66 @@
import aiohttp
import asyncio
import json
import os, sys
import time
from pytchat.tool.extract import duplcheck
from pytchat.tool.extract import parser
from pytchat.tool.extract.block import Block
from pytchat.tool.extract.duplcheck import _dump
def _open_file(path):
with open(path,mode ='r',encoding = 'utf-8') as f:
return f.read()
def _open_file(path):
with open(path, mode='r', encoding='utf-8') as f:
return f.read()
def test_overlap():
"""
test overlap data
test overlap data
operation : [0] [2] [3] [4] -> last :align to end
[1] , [5] -> no change
"""
def load_chatdata(filename):
return parser.parse(
json.loads(_open_file("tests/testdata/extract_duplcheck/overlap/"+filename))
json.loads(_open_file(
"tests/testdata/extract_duplcheck/overlap/" + filename))
)[1]
blocks = (
Block(first = 0, last= 12771, end= 9890,chat_data = load_chatdata("dp0-0.json")),
Block(first = 9890, last= 15800, end= 20244,chat_data = load_chatdata("dp0-1.json")),
Block(first = 20244,last= 45146, end= 32476,chat_data = load_chatdata("dp0-2.json")),
Block(first = 32476,last= 50520, end= 41380,chat_data = load_chatdata("dp0-3.json")),
Block(first = 41380,last= 62875, end= 52568,chat_data = load_chatdata("dp0-4.json")),
Block(first = 52568,last= 62875, end= 54000,chat_data = load_chatdata("dp0-5.json"),is_last=True)
Block(first=0, last=12771, end=9890,
chat_data=load_chatdata("dp0-0.json")),
Block(first=9890, last=15800, end=20244,
chat_data=load_chatdata("dp0-1.json")),
Block(first=20244, last=45146, end=32476,
chat_data=load_chatdata("dp0-2.json")),
Block(first=32476, last=50520, end=41380,
chat_data=load_chatdata("dp0-3.json")),
Block(first=41380, last=62875, end=52568,
chat_data=load_chatdata("dp0-4.json")),
Block(first=52568, last=62875, end=54000,
chat_data=load_chatdata("dp0-5.json"), is_last=True)
)
result = duplcheck.remove_overlap(blocks)
#dp0-0.json has item offset time is 9890 (equals block[0].end = block[1].first),
#but must be aligne to the most close and smaller value:9779.
# dp0-0.json has item offset time is 9890 (equals block[0].end = block[1].first),
# but must be aligne to the most close and smaller value:9779.
assert result[0].last == 9779
assert result[1].last == 15800
assert result[2].last == 32196
assert result[3].last == 41116
assert result[4].last == 52384
#the last block must be always added to result.
# the last block must be always added to result.
assert result[5].last == 62875
def test_duplicate_head():
def load_chatdata(filename):
return parser.parse(
json.loads(_open_file("tests/testdata/extract_duplcheck/head/"+filename))
json.loads(_open_file(
"tests/testdata/extract_duplcheck/head/" + filename))
)[1]
"""
@@ -69,25 +75,26 @@ def test_duplicate_head():
result : [2] , [4] , [5]
"""
#chat data offsets are ignored.
# chat data offsets are ignored.
blocks = (
Block(first = 0, last = 2500, chat_data = load_chatdata("dp0-0.json")),
Block(first = 0, last =38771, chat_data = load_chatdata("dp0-1.json")),
Block(first = 0, last =45146, chat_data = load_chatdata("dp0-2.json")),
Block(first = 20244, last =60520, chat_data = load_chatdata("dp0-3.json")),
Block(first = 20244, last =62875, chat_data = load_chatdata("dp0-4.json")),
Block(first = 52568, last =62875, chat_data = load_chatdata("dp0-5.json"))
Block(first=0, last=2500, chat_data=load_chatdata("dp0-0.json")),
Block(first=0, last=38771, chat_data=load_chatdata("dp0-1.json")),
Block(first=0, last=45146, chat_data=load_chatdata("dp0-2.json")),
Block(first=20244, last=60520, chat_data=load_chatdata("dp0-3.json")),
Block(first=20244, last=62875, chat_data=load_chatdata("dp0-4.json")),
Block(first=52568, last=62875, chat_data=load_chatdata("dp0-5.json"))
)
_dump(blocks)
result = duplcheck.remove_duplicate_head(blocks)
assert len(result) == 3
assert result[0].first == blocks[2].first
assert result[0].last == blocks[2].last
assert result[0].last == blocks[2].last
assert result[1].first == blocks[4].first
assert result[1].last == blocks[4].last
assert result[1].last == blocks[4].last
assert result[2].first == blocks[5].first
assert result[2].last == blocks[5].last
assert result[2].last == blocks[5].last
def test_duplicate_tail():
"""
@@ -103,26 +110,25 @@ def test_duplicate_tail():
"""
def load_chatdata(filename):
return parser.parse(
json.loads(_open_file("tests/testdata/extract_duplcheck/head/"+filename))
json.loads(_open_file(
"tests/testdata/extract_duplcheck/head/" + filename))
)[1]
#chat data offsets are ignored.
# chat data offsets are ignored.
blocks = (
Block(first = 0,last = 2500, chat_data=load_chatdata("dp0-0.json")),
Block(first = 1500,last = 2500, chat_data=load_chatdata("dp0-1.json")),
Block(first = 10000,last = 45146, chat_data=load_chatdata("dp0-2.json")),
Block(first = 20244,last = 45146, chat_data=load_chatdata("dp0-3.json")),
Block(first = 20244,last = 62875, chat_data=load_chatdata("dp0-4.json")),
Block(first = 52568,last = 62875, chat_data=load_chatdata("dp0-5.json"))
Block(first=0, last=2500, chat_data=load_chatdata("dp0-0.json")),
Block(first=1500, last=2500, chat_data=load_chatdata("dp0-1.json")),
Block(first=10000, last=45146, chat_data=load_chatdata("dp0-2.json")),
Block(first=20244, last=45146, chat_data=load_chatdata("dp0-3.json")),
Block(first=20244, last=62875, chat_data=load_chatdata("dp0-4.json")),
Block(first=52568, last=62875, chat_data=load_chatdata("dp0-5.json"))
)
result = duplcheck.remove_duplicate_tail(blocks)
_dump(result)
assert len(result) == 3
assert result[0].first == blocks[0].first
assert result[0].last == blocks[0].last
assert result[0].last == blocks[0].last
assert result[1].first == blocks[2].first
assert result[1].last == blocks[2].last
assert result[1].last == blocks[2].last
assert result[2].first == blocks[4].first
assert result[2].last == blocks[4].last
assert result[2].last == blocks[4].last

View File

@@ -1,23 +1,19 @@
import aiohttp
import asyncio
import json
import os, sys
import time
from aioresponses import aioresponses
from pytchat.tool.extract import duplcheck
from pytchat.tool.extract import parser
from pytchat.tool.extract.block import Block
from pytchat.tool.extract.patch import Patch, fill, split, set_patch
from pytchat.tool.extract.duplcheck import _dump
from pytchat.tool.extract.patch import Patch, split
def _open_file(path):
with open(path,mode ='r',encoding = 'utf-8') as f:
with open(path, mode='r', encoding='utf-8') as f:
return f.read()
def load_chatdata(filename):
return parser.parse(
json.loads(_open_file("tests/testdata/fetch_patch/"+filename))
)[1]
return parser.parse(
json.loads(_open_file("tests/testdata/fetch_patch/" + filename))
)[1]
def test_split_0():
@@ -61,20 +57,23 @@ def test_split_0():
@fetched patch
|-- patch --|
"""
parent = Block(first=0, last=4000, end=60000, continuation='parent', during_split=True)
child = Block(first=0, last=0, end=60000, continuation='mean', during_split=True)
parent = Block(first=0, last=4000, end=60000,
continuation='parent', during_split=True)
child = Block(first=0, last=0, end=60000,
continuation='mean', during_split=True)
patch = Patch(chats=load_chatdata('pt0-5.json'),
first=32500, last=34000, continuation='patch')
split(parent,child,patch)
first=32500, last=34000, continuation='patch')
split(parent, child, patch)
assert child.continuation == 'patch'
assert parent.last < child.first
assert parent.end == child.first
assert child.first < child.last
assert child.last < child.end
assert parent.during_split == False
assert child.during_split == False
assert parent.during_split is False
assert child.during_split is False
def test_split_1():
"""patch.first <= parent_block.last
@@ -119,14 +118,15 @@ def test_split_1():
child = Block(first=0, last=0, end=60000, continuation='mean', during_split=True)
patch = Patch(chats=load_chatdata('pt0-5.json'),
first=32500, last=34000, continuation='patch')
split(parent,child,patch)
assert parent.last == 33000 #no change
assert parent.end == 60000 #no change
split(parent, child, patch)
assert parent.last == 33000 # no change
assert parent.end == 60000 # no change
assert child.continuation is None
assert parent.during_split == False
assert child.during_split == True #exclude during_split sequence
assert parent.during_split is False
assert child.during_split is True # exclude during_split sequence
def test_split_2():
"""child_block.end < patch.last:
@@ -174,7 +174,7 @@ def test_split_2():
patch = Patch(chats=load_chatdata('pt0-5.json'),
first=32500, last=34000, continuation='patch')
split(parent,child,patch)
split(parent, child, patch)
assert child.continuation is None
assert parent.last < child.first
@@ -182,8 +182,9 @@ def test_split_2():
assert child.first < child.last
assert child.last < child.end
assert child.continuation is None
assert parent.during_split == False
assert child.during_split == False
assert parent.during_split is False
assert child.during_split is False
def test_split_none():
"""patch.last <= parent_block.last
@@ -193,7 +194,7 @@ def test_split_none():
and parent.block.last exceeds patch.first.
In this case, fetched patch is all discarded,
and worker searches other processing block again.
and worker searches other processing block again.
~~~~~~ before ~~~~~~
@@ -229,10 +230,10 @@ def test_split_none():
patch = Patch(chats=load_chatdata('pt0-5.json'),
first=32500, last=34000, continuation='patch')
split(parent,child,patch)
split(parent, child, patch)
assert parent.last == 40000 #no change
assert parent.end == 60000 #no change
assert parent.last == 40000 # no change
assert parent.end == 60000 # no change
assert child.continuation is None
assert parent.during_split == False
assert child.during_split == True #exclude during_split sequence
assert parent.during_split is False
assert child.during_split is True # exclude during_split sequence

View File

@@ -1,5 +1,8 @@
import asyncio
import json
from aioresponses import aioresponses
from pytest_httpx import HTTPXMock
from concurrent.futures import CancelledError
from pytchat.core_multithread.livechat import LiveChat
from pytchat.core_async.livechat import LiveChatAsync
from pytchat.exceptions import ResponseContextError
@@ -9,34 +12,37 @@ def _open_file(path):
return f.read()
@aioresponses()
def test_Async(*mock):
vid = '__test_id__'
_text = _open_file('tests/testdata/paramgen_firstread.json')
_text = json.loads(_text)
mock[0].get(
f"https://www.youtube.com/live_chat?v={vid}&is_popout=1", status=200, body=_text)
def add_response_file(httpx_mock: HTTPXMock, jsonfile_path: str):
testdata = json.loads(_open_file(jsonfile_path))
httpx_mock.add_response(json=testdata)
def test_async(httpx_mock: HTTPXMock):
add_response_file(httpx_mock, 'tests/testdata/paramgen_firstread.json')
async def test_loop():
try:
chat = LiveChatAsync(video_id='__test_id__')
_ = await chat.get()
assert chat.is_alive()
chat.terminate()
assert not chat.is_alive()
except ResponseContextError:
assert False
loop = asyncio.get_event_loop()
try:
chat = LiveChatAsync(video_id='__test_id__')
loop.run_until_complete(test_loop())
except CancelledError:
assert True
def test_multithread(httpx_mock: HTTPXMock):
add_response_file(httpx_mock, 'tests/testdata/paramgen_firstread.json')
try:
chat = LiveChat(video_id='__test_id__')
_ = chat.get()
assert chat.is_alive()
chat.terminate()
assert not chat.is_alive()
except ResponseContextError:
assert not chat.is_alive()
def test_MultiThread(mocker):
_text = _open_file('tests/testdata/paramgen_firstread.json')
_text = json.loads(_text)
responseMock = mocker.Mock()
responseMock.status_code = 200
responseMock.text = _text
mocker.patch('requests.Session.get').return_value = responseMock
try:
chat = LiveChatAsync(video_id='__test_id__')
assert chat.is_alive()
chat.terminate()
assert not chat.is_alive()
except ResponseContextError:
chat.terminate()
assert not chat.is_alive()
assert False

View File

@@ -1,6 +1,6 @@
import asyncio
import re
from aioresponses import aioresponses
import json
from pytest_httpx import HTTPXMock
from concurrent.futures import CancelledError
from pytchat.core_multithread.livechat import LiveChat
from pytchat.core_async.livechat import LiveChatAsync
@@ -12,18 +12,18 @@ def _open_file(path):
return f.read()
@aioresponses()
def test_async_live_stream(*mock):
def add_response_file(httpx_mock: HTTPXMock, jsonfile_path: str):
testdata = json.loads(_open_file(jsonfile_path))
httpx_mock.add_response(json=testdata)
async def test_loop(*mock):
pattern = re.compile(
r'^https://www.youtube.com/live_chat/get_live_chat\?continuation=.*$')
_text = _open_file('tests/testdata/test_stream.json')
mock[0].get(pattern, status=200, body=_text)
def test_async_live_stream(httpx_mock: HTTPXMock):
add_response_file(httpx_mock, 'tests/testdata/test_stream.json')
async def test_loop():
chat = LiveChatAsync(video_id='__test_id__', processor=DummyProcessor())
chats = await chat.get()
rawdata = chats[0]["chatdata"]
# assert fetching livachat data
assert list(rawdata[0]["addChatItemAction"]["item"].keys())[
0] == "liveChatTextMessageRenderer"
assert list(rawdata[1]["addChatItemAction"]["item"].keys())[
@@ -41,25 +41,16 @@ def test_async_live_stream(*mock):
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(test_loop(*mock))
loop.run_until_complete(test_loop())
except CancelledError:
assert True
@aioresponses()
def test_async_replay_stream(*mock):
async def test_loop(*mock):
pattern_live = re.compile(
r'^https://www.youtube.com/live_chat/get_live_chat\?continuation=.*$')
pattern_replay = re.compile(
r'^https://www.youtube.com/live_chat_replay/get_live_chat_replay\?continuation=.*$')
# empty livechat -> switch to fetch replaychat
_text_live = _open_file('tests/testdata/finished_live.json')
_text_replay = _open_file('tests/testdata/chatreplay.json')
mock[0].get(pattern_live, status=200, body=_text_live)
mock[0].get(pattern_replay, status=200, body=_text_replay)
def test_async_replay_stream(httpx_mock: HTTPXMock):
add_response_file(httpx_mock, 'tests/testdata/finished_live.json')
add_response_file(httpx_mock, 'tests/testdata/chatreplay.json')
async def test_loop():
chat = LiveChatAsync(video_id='__test_id__', processor=DummyProcessor())
chats = await chat.get()
rawdata = chats[0]["chatdata"]
@@ -71,27 +62,16 @@ def test_async_replay_stream(*mock):
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(test_loop(*mock))
loop.run_until_complete(test_loop())
except CancelledError:
assert True
@aioresponses()
def test_async_force_replay(*mock):
def test_async_force_replay(httpx_mock: HTTPXMock):
add_response_file(httpx_mock, 'tests/testdata/test_stream.json')
add_response_file(httpx_mock, 'tests/testdata/chatreplay.json')
async def test_loop(*mock):
pattern_live = re.compile(
r'^https://www.youtube.com/live_chat/get_live_chat\?continuation=.*$')
pattern_replay = re.compile(
r'^https://www.youtube.com/live_chat_replay/get_live_chat_replay\?continuation=.*$')
# valid live data, but force_replay = True
_text_live = _open_file('tests/testdata/test_stream.json')
# valid replay data
_text_replay = _open_file('tests/testdata/chatreplay.json')
mock[0].get(pattern_live, status=200, body=_text_live)
mock[0].get(pattern_replay, status=200, body=_text_replay)
# force replay
async def test_loop():
chat = LiveChatAsync(
video_id='__test_id__', processor=DummyProcessor(), force_replay=True)
chats = await chat.get()
@@ -105,20 +85,13 @@ def test_async_force_replay(*mock):
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(test_loop(*mock))
loop.run_until_complete(test_loop())
except CancelledError:
assert True
def test_multithread_live_stream(mocker):
_text = _open_file('tests/testdata/test_stream.json')
responseMock = mocker.Mock()
responseMock.status_code = 200
responseMock.text = _text
mocker.patch(
'requests.Session.get').return_value.__enter__.return_value = responseMock
def test_multithread_live_stream(httpx_mock: HTTPXMock):
add_response_file(httpx_mock, 'tests/testdata/test_stream.json')
chat = LiveChat(video_id='__test_id__', processor=DummyProcessor())
chats = chat.get()
rawdata = chats[0]["chatdata"]

View File

@@ -1,21 +1,18 @@
from pytchat.parser.live import Parser
import json
from aioresponses import aioresponses
from pytchat.exceptions import NoContents
parser = Parser(is_replay=False)
def _open_file(path):
with open(path, mode='r', encoding='utf-8') as f:
return f.read()
parser = Parser(is_replay=False)
@aioresponses()
def test_finishedlive(*mock):
'''配信が終了した動画を正しく処理できるか'''
_text = _open_file('tests/testdata/finished_live.json')
_text = json.loads(_text)
@@ -26,10 +23,8 @@ def test_finishedlive(*mock):
assert True
@aioresponses()
def test_parsejson(*mock):
'''jsonを正常にパースできるか'''
_text = _open_file('tests/testdata/paramgen_firstread.json')
_text = json.loads(_text)

View File

@@ -1,5 +1,7 @@
from json.decoder import JSONDecodeError
from pytchat.tool.videoinfo import VideoInfo
from pytchat.exceptions import InvalidVideoIdException
from pytchat.exceptions import InvalidVideoIdException, PatternUnmatchError
from pytchat import util
def _open_file(path):
@@ -12,13 +14,13 @@ def _set_test_data(filepath, mocker):
response_mock = mocker.Mock()
response_mock.status_code = 200
response_mock.text = _text
mocker.patch('requests.get').return_value = response_mock
mocker.patch('httpx.get').return_value = response_mock
def test_archived_page(mocker):
_set_test_data('tests/testdata/videoinfo/archived_page.txt', mocker)
info = VideoInfo('__test_id__')
actual_thumbnail_url = 'https://i.ytimg.com/vi/fzI9FNjXQ0o/hqdefault.jpg'
actual_thumbnail_url = 'https://i.ytimg.com/vi/fzI9FNjXQ0o/hqdefault.jpg'
assert info.video_id == '__test_id__'
assert info.get_channel_name() == 'GitHub'
assert info.get_thumbnail() == actual_thumbnail_url
@@ -64,3 +66,25 @@ def test_no_info(mocker):
assert info.get_title() is None
assert info.get_channel_id() is None
assert info.get_duration() is None
def test_collapsed_data(mocker):
'''Test case the video page's info is collapsed.'''
_set_test_data(
'tests/testdata/videoinfo/collapsed_page.txt', mocker)
try:
_ = VideoInfo('__test_id__')
assert False
except JSONDecodeError:
assert True
def test_pattern_unmatch(mocker):
'''Test case the pattern for extraction is unmatched.'''
_set_test_data(
'tests/testdata/videoinfo/pattern_unmatch.txt', mocker)
try:
_ = VideoInfo('__test_id__')
assert False
except PatternUnmatchError:
assert True

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long