Merge branch 'feature/url_pattern' into develop

This commit is contained in:
taizan-hokuto
2020-07-24 14:52:06 +09:00
13 changed files with 135 additions and 38 deletions

View File

@@ -7,7 +7,7 @@ pytchat is a python library for fetching youtube live chat.
pytchat is a python library for fetching youtube live chat pytchat is a python library for fetching youtube live chat
without using youtube api, Selenium or BeautifulSoup. without using youtube api, Selenium or BeautifulSoup.
pytchatはAPIを使わずにYouTubeチャットを取得するためのpythonライブラリです。 pytchatはYouTubeチャットを閲覧するためのpythonライブラリです。
Other features: Other features:
+ Customizable [chat data processors](https://github.com/taizan-hokuto/pytchat/wiki/ChatProcessor) including youtube api compatible one. + Customizable [chat data processors](https://github.com/taizan-hokuto/pytchat/wiki/ChatProcessor) including youtube api compatible one.
@@ -30,10 +30,9 @@ One-liner command.
Save chat data to html, with embedded custom emojis. Save chat data to html, with embedded custom emojis.
```bash ```bash
$ pytchat -v ZJ6Q4U_Vg6s -o "c:/temp/" $ pytchat -v https://www.youtube.com/watch?v=ZJ6Q4U_Vg6s -o "c:/temp/"
# options: # options:
# -v : video_id # -v : Video ID or URL that includes ID
# -o : output directory (default path: './') # -o : output directory (default path: './')
# saved filename is [video_id].html # saved filename is [video_id].html
``` ```
@@ -43,7 +42,8 @@ $ pytchat -v ZJ6Q4U_Vg6s -o "c:/temp/"
```python ```python
from pytchat import LiveChat from pytchat import LiveChat
livechat = LiveChat(video_id = "Zvp1pJpie4I") livechat = LiveChat(video_id = "Zvp1pJpie4I")
# It is also possible to specify a URL that includes the video ID:
# livechat = LiveChat("https://www.youtube.com/watch?v=Zvp1pJpie4I")
while livechat.is_alive(): while livechat.is_alive():
try: try:
chatdata = livechat.get() chatdata = livechat.get()

View File

@@ -1,5 +1,6 @@
import argparse import argparse
from pathlib import Path from pathlib import Path
from pytchat.util.extract_video_id import extract_video_id
from .arguments import Arguments from .arguments import Arguments
from .. exceptions import InvalidVideoIdException, NoContents from .. exceptions import InvalidVideoIdException, NoContents
from .. processors.html_archiver import HTMLArchiver from .. processors.html_archiver import HTMLArchiver
@@ -19,16 +20,19 @@ https://github.com/PetterKraabol/Twitch-Chat-Downloader
def main(): def main():
# Arguments # Arguments
parser = argparse.ArgumentParser(description=f'pytchat v{__version__}') parser = argparse.ArgumentParser(description=f'pytchat v{__version__}')
parser.add_argument('-v', f'--{Arguments.Name.VIDEO}', type=str, # parser.add_argument('VideoID_or_URL', type=str, default='__NONE__',nargs='?',
help='Video IDs separated by commas without space.\n' # help='Video ID, or URL that includes id.\n'
# 'If ID starts with a hyphen (-), enclose the ID in square brackets.')
parser.add_argument('-v', f'--{Arguments.Name.VIDEO_IDS}', type=str,
help='Video ID (or URL that includes Video ID). You can specify multiple video IDs by separating them with commas without spaces.\n'
'If ID starts with a hyphen (-), enclose the ID in square brackets.') 'If ID starts with a hyphen (-), enclose the ID in square brackets.')
parser.add_argument('-o', f'--{Arguments.Name.OUTPUT}', type=str, parser.add_argument('-o', f'--{Arguments.Name.OUTPUT}', type=str,
help='Output directory (end with "/"). default="./"', default='./') help='Output directory (end with "/"). default="./"', default='./')
parser.add_argument(f'--{Arguments.Name.VERSION}', action='store_true', parser.add_argument(f'--{Arguments.Name.VERSION}', action='store_true',
help='Settings version') help='Show version')
Arguments(parser.parse_args().__dict__) Arguments(parser.parse_args().__dict__)
if Arguments().print_version: if Arguments().print_version:
print(f'pytchat v{__version__}') print(f'pytchat v{__version__} © 2019 taizan-hokuto')
return return
# Extractor # Extractor
@@ -50,7 +54,9 @@ def main():
callback=_disp_progress callback=_disp_progress
).extract() ).extract()
print("\nExtraction end.\n") print("\nExtraction end.\n")
except (InvalidVideoIdException, NoContents) as e: except InvalidVideoIdException:
print("Invalid Video ID or URL:", video_id)
except (TypeError, NoContents) as e:
print(e) print(e)
return return
parser.print_help() parser.print_help()

View File

@@ -16,8 +16,8 @@ class Arguments(metaclass=Singleton):
class Name: class Name:
VERSION: str = 'version' VERSION: str = 'version'
OUTPUT: str = 'output' OUTPUT: str = 'output_dir'
VIDEO: str = 'video' VIDEO_IDS: str = 'video_id'
def __init__(self, def __init__(self,
arguments: Optional[Dict[str, Union[str, bool, int]]] = None): arguments: Optional[Dict[str, Union[str, bool, int]]] = None):
@@ -35,6 +35,9 @@ class Arguments(metaclass=Singleton):
self.output: str = arguments[Arguments.Name.OUTPUT] self.output: str = arguments[Arguments.Name.OUTPUT]
self.video_ids: List[int] = [] self.video_ids: List[int] = []
# Videos # Videos
if arguments[Arguments.Name.VIDEO]: if arguments[Arguments.Name.VIDEO_IDS]:
self.video_ids = [video_id self.video_ids = [video_id
for video_id in arguments[Arguments.Name.VIDEO].split(',')] for video_id in arguments[Arguments.Name.VIDEO_IDS].split(',')]

View File

@@ -15,6 +15,7 @@ from .. import exceptions
from ..paramgen import liveparam, arcparam from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator from ..processors.combinator import Combinator
from ..util.extract_video_id import extract_video_id
headers = config.headers headers = config.headers
MAX_RETRY = 10 MAX_RETRY = 10
@@ -86,7 +87,7 @@ class LiveChatAsync:
topchat_only=False, topchat_only=False,
logger=config.logger(__name__), logger=config.logger(__name__),
): ):
self._video_id = video_id self._video_id = extract_video_id(video_id)
self.seektime = seektime self.seektime = seektime
if isinstance(processor, tuple): if isinstance(processor, tuple):
self.processor = Combinator(processor) self.processor = Combinator(processor)

View File

@@ -14,6 +14,7 @@ from .. import exceptions
from ..paramgen import liveparam, arcparam from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator from ..processors.combinator import Combinator
from ..util.extract_video_id import extract_video_id
headers = config.headers headers = config.headers
MAX_RETRY = 10 MAX_RETRY = 10
@@ -84,7 +85,7 @@ class LiveChat:
topchat_only=False, topchat_only=False,
logger=config.logger(__name__) logger=config.logger(__name__)
): ):
self._video_id = video_id self._video_id = extract_video_id(video_id)
self.seektime = seektime self.seektime = seektime
if isinstance(processor, tuple): if isinstance(processor, tuple):
self.processor = Combinator(processor) self.processor = Combinator(processor)

View File

@@ -47,7 +47,7 @@ class HTMLArchiver(ChatProcessor):
super().__init__() super().__init__()
self.save_path = self._checkpath(save_path) self.save_path = self._checkpath(save_path)
self.processor = DefaultProcessor() self.processor = DefaultProcessor()
self.emoji_table = {} # table for custom emojis. key: emoji_id, value: base64 encoded image binary. self.emoji_table = {} # table for custom emojis. key: emoji_id, value: base64 encoded image binary.
self.header = [HEADER_HTML] self.header = [HEADER_HTML]
self.body = ['<body>\n', '<table class="css">\n', self._parse_table_header(fmt_headers)] self.body = ['<body>\n', '<table class="css">\n', self._parse_table_header(fmt_headers)]

View File

@@ -3,6 +3,7 @@ from . import duplcheck
from .. videoinfo import VideoInfo from .. videoinfo import VideoInfo
from ... import config from ... import config
from ... exceptions import InvalidVideoIdException from ... exceptions import InvalidVideoIdException
from ... util.extract_video_id import extract_video_id
logger = config.logger(__name__) logger = config.logger(__name__)
headers = config.headers headers = config.headers
@@ -14,7 +15,7 @@ class Extractor:
raise ValueError('div must be positive integer.') raise ValueError('div must be positive integer.')
elif div > 10: elif div > 10:
div = 10 div = 10
self.video_id = video_id self.video_id = extract_video_id(video_id)
self.div = div self.div = div
self.callback = callback self.callback = callback
self.processor = processor self.processor = processor

View File

@@ -3,6 +3,7 @@ import re
import requests import requests
from .. import config from .. import config
from ..exceptions import InvalidVideoIdException from ..exceptions import InvalidVideoIdException
from ..util.extract_video_id import extract_video_id
headers = config.headers headers = config.headers
@@ -78,8 +79,8 @@ class VideoInfo:
''' '''
def __init__(self, video_id): def __init__(self, video_id):
self.video_id = video_id self.video_id = extract_video_id(video_id)
text = self._get_page_text(video_id) text = self._get_page_text(self.video_id)
self._parse(text) self._parse(text)
def _get_page_text(self, video_id): def _get_page_text(self, video_id):

View File

@@ -0,0 +1,25 @@
import re
from .. exceptions import InvalidVideoIdException
# Finds a run of ID characters ([\w-]+) that directly follows one of the
# markers "v/", "V/", "be/", "?v=", "&v=" or "embed/".  The ID itself is
# capture group 4; groups 1-3 only exist because of the look-behind
# alternation.
PATTERN = re.compile(r"((?<=(v|V)/)|(?<=be/)|(?<=(\?|\&)v=)|(?<=embed/))([\w-]+)")

# Number of characters a video ID must have to be accepted.
YT_VIDEO_ID_LENGTH = 11

# Character set allowed in a bare video ID (same class PATTERN uses).
_BARE_ID_PATTERN = re.compile(r"[\w-]+")


def extract_video_id(url_or_id: str) -> str:
    """Return the video ID contained in ``url_or_id``.

    ``url_or_id`` may be a bare ID or a URL (or URL fragment) in which the
    ID follows one of the markers recognised by ``PATTERN``.

    Args:
        url_or_id: A bare video ID, or a string that embeds one.

    Returns:
        The ``YT_VIDEO_ID_LENGTH``-character video ID.

    Raises:
        TypeError: If ``url_or_id`` is not a ``str`` (subclasses of ``str``
            are accepted).
        InvalidVideoIdException: If no ID of the required length can be
            found in ``url_or_id``.
    """
    if not isinstance(url_or_id, str):
        raise TypeError(
            f"{url_or_id}: URL or VideoID must be str, but {type(url_or_id)} is passed.")

    # A bare ID: right length AND made only of ID characters.  Checking the
    # length alone would let strings such as "hello world" through as IDs.
    if (len(url_or_id) == YT_VIDEO_ID_LENGTH
            and _BARE_ID_PATTERN.fullmatch(url_or_id)):
        return url_or_id

    match = PATTERN.search(url_or_id)
    if match is None:
        raise InvalidVideoIdException(url_or_id)

    # Group 4 is mandatory in PATTERN, so it is always set on a match.
    video_id = match.group(4)
    if len(video_id) != YT_VIDEO_ID_LENGTH:
        raise InvalidVideoIdException(url_or_id)
    return video_id

View File

@@ -0,0 +1,55 @@
from pytchat.util.extract_video_id import extract_video_id
from pytchat.exceptions import InvalidVideoIdException
# (input, expected video ID) pairs; test_extract_valid_pattern asserts that
# extract_video_id(input) equals the second element.
VALID_TEST_PATTERNS = (
("ABC_EFG_IJK", "ABC_EFG_IJK"),
("vid_test_be", "vid_test_be"),
("https://www.youtube.com/watch?v=123_456_789", "123_456_789"),
("https://www.youtube.com/watch?v=123_456_789&t=123s", "123_456_789"),
("www.youtube.com/watch?v=123_456_789", "123_456_789"),
("watch?v=123_456_789", "123_456_789"),
("youtube.com/watch?v=123_456_789", "123_456_789"),
("http://youtu.be/ABC_EFG_IJK", "ABC_EFG_IJK"),
("youtu.be/ABC_EFG_IJK", "ABC_EFG_IJK"),
("https://www.youtube.com/watch?v=ABC_EFG_IJK&list=XYZ_ABC_12345&start_radio=1&t=1", "ABC_EFG_IJK"),
("https://www.youtube.com/embed/ABC_EFG_IJK", "ABC_EFG_IJK"),
("www.youtube.com/embed/ABC_EFG_IJK", "ABC_EFG_IJK"),
("youtube.com/embed/ABC_EFG_IJK", "ABC_EFG_IJK")
)
# Inputs expected to raise InvalidVideoIdException.  Only the first element
# of each pair is used by test_extract_invalid_pattern.
INVALID_TEST_PATTERNS = (
("", ""),
("0123456789", "0123456789"), # ID shorter than 11 characters
("more_than_11_letter_string", "more_than_11_letter_string"),
("https://www.youtube.com/watch?v=more_than_11_letter_string", "more_than_11_letter_string"),
("https://www.youtube.com/channel/123_456_789", "123_456_789"),
)
# Inputs expected to raise TypeError.  Only the first element of each pair
# is used by test_extract_typeerror_pattern.
TYPEERROR_TEST_PATTERNS = (
(100, 100), # not a string
(["123_456_789"], "123_456_789"), # not a string
)
def test_extract_valid_pattern():
    """Each valid input must yield its expected video ID."""
    for case in VALID_TEST_PATTERNS:
        source, expected = case[0], case[1]
        assert extract_video_id(source) == expected
def test_extract_invalid_pattern():
    """Each invalid input must raise InvalidVideoIdException."""
    for case in INVALID_TEST_PATTERNS:
        source = case[0]
        try:
            extract_video_id(source)
        except InvalidVideoIdException:
            # Expected outcome; move on to the next input.
            continue
        # Reached only when no exception was raised.
        assert False
def test_extract_typeerror_pattern():
    """Each non-string input must raise TypeError."""
    for case in TYPEERROR_TEST_PATTERNS:
        source = case[0]
        try:
            extract_video_id(source)
        except TypeError:
            # Expected outcome; move on to the next input.
            continue
        # Reached only when no exception was raised.
        assert False

View File

@@ -11,13 +11,13 @@ def _open_file(path):
@aioresponses() @aioresponses()
def test_Async(*mock): def test_Async(*mock):
vid = '' vid = '__test_id__'
_text = _open_file('tests/testdata/paramgen_firstread.json') _text = _open_file('tests/testdata/paramgen_firstread.json')
_text = json.loads(_text) _text = json.loads(_text)
mock[0].get( mock[0].get(
f"https://www.youtube.com/live_chat?v={vid}&is_popout=1", status=200, body=_text) f"https://www.youtube.com/live_chat?v={vid}&is_popout=1", status=200, body=_text)
try: try:
chat = LiveChatAsync(video_id='') chat = LiveChatAsync(video_id='__test_id__')
assert chat.is_alive() assert chat.is_alive()
chat.terminate() chat.terminate()
assert not chat.is_alive() assert not chat.is_alive()
@@ -33,7 +33,7 @@ def test_MultiThread(mocker):
responseMock.text = _text responseMock.text = _text
mocker.patch('requests.Session.get').return_value = responseMock mocker.patch('requests.Session.get').return_value = responseMock
try: try:
chat = LiveChatAsync(video_id='') chat = LiveChatAsync(video_id='__test_id__')
assert chat.is_alive() assert chat.is_alive()
chat.terminate() chat.terminate()
assert not chat.is_alive() assert not chat.is_alive()

View File

@@ -20,7 +20,7 @@ def test_async_live_stream(*mock):
r'^https://www.youtube.com/live_chat/get_live_chat\?continuation=.*$') r'^https://www.youtube.com/live_chat/get_live_chat\?continuation=.*$')
_text = _open_file('tests/testdata/test_stream.json') _text = _open_file('tests/testdata/test_stream.json')
mock[0].get(pattern, status=200, body=_text) mock[0].get(pattern, status=200, body=_text)
chat = LiveChatAsync(video_id='', processor=DummyProcessor()) chat = LiveChatAsync(video_id='__test_id__', processor=DummyProcessor())
chats = await chat.get() chats = await chat.get()
rawdata = chats[0]["chatdata"] rawdata = chats[0]["chatdata"]
# assert fetching livechat data # assert fetching livechat data
@@ -60,7 +60,7 @@ def test_async_replay_stream(*mock):
mock[0].get(pattern_live, status=200, body=_text_live) mock[0].get(pattern_live, status=200, body=_text_live)
mock[0].get(pattern_replay, status=200, body=_text_replay) mock[0].get(pattern_replay, status=200, body=_text_replay)
chat = LiveChatAsync(video_id='', processor=DummyProcessor()) chat = LiveChatAsync(video_id='__test_id__', processor=DummyProcessor())
chats = await chat.get() chats = await chat.get()
rawdata = chats[0]["chatdata"] rawdata = chats[0]["chatdata"]
# assert fetching replaychat data # assert fetching replaychat data
@@ -93,7 +93,7 @@ def test_async_force_replay(*mock):
mock[0].get(pattern_replay, status=200, body=_text_replay) mock[0].get(pattern_replay, status=200, body=_text_replay)
# force replay # force replay
chat = LiveChatAsync( chat = LiveChatAsync(
video_id='', processor=DummyProcessor(), force_replay=True) video_id='__test_id__', processor=DummyProcessor(), force_replay=True)
chats = await chat.get() chats = await chat.get()
rawdata = chats[0]["chatdata"] rawdata = chats[0]["chatdata"]
# assert fetching replaychat data # assert fetching replaychat data
@@ -119,7 +119,7 @@ def test_multithread_live_stream(mocker):
mocker.patch( mocker.patch(
'requests.Session.get').return_value.__enter__.return_value = responseMock 'requests.Session.get').return_value.__enter__.return_value = responseMock
chat = LiveChat(video_id='test_id', processor=DummyProcessor()) chat = LiveChat(video_id='__test_id__', processor=DummyProcessor())
chats = chat.get() chats = chat.get()
rawdata = chats[0]["chatdata"] rawdata = chats[0]["chatdata"]
# assert fetching livechat data # assert fetching livechat data

View File

@@ -1,11 +1,12 @@
from pytchat.tool.videoinfo import VideoInfo from pytchat.tool.videoinfo import VideoInfo
from pytchat.exceptions import InvalidVideoIdException from pytchat.exceptions import InvalidVideoIdException
import pytest
def _open_file(path): def _open_file(path):
with open(path, mode='r', encoding='utf-8') as f: with open(path, mode='r', encoding='utf-8') as f:
return f.read() return f.read()
def _set_test_data(filepath, mocker): def _set_test_data(filepath, mocker):
_text = _open_file(filepath) _text = _open_file(filepath)
response_mock = mocker.Mock() response_mock = mocker.Mock()
@@ -13,23 +14,25 @@ def _set_test_data(filepath, mocker):
response_mock.text = _text response_mock.text = _text
mocker.patch('requests.get').return_value = response_mock mocker.patch('requests.get').return_value = response_mock
def test_archived_page(mocker): def test_archived_page(mocker):
_set_test_data('tests/testdata/videoinfo/archived_page.txt', mocker) _set_test_data('tests/testdata/videoinfo/archived_page.txt', mocker)
info = VideoInfo('test_id') info = VideoInfo('__test_id__')
actual_thumbnail_url = 'https://i.ytimg.com/vi/fzI9FNjXQ0o/hqdefault.jpg' actual_thumbnail_url = 'https://i.ytimg.com/vi/fzI9FNjXQ0o/hqdefault.jpg'
assert info.video_id == 'test_id' assert info.video_id == '__test_id__'
assert info.get_channel_name() == 'GitHub' assert info.get_channel_name() == 'GitHub'
assert info.get_thumbnail() == actual_thumbnail_url assert info.get_thumbnail() == actual_thumbnail_url
assert info.get_title() == 'GitHub Arctic Code Vault' assert info.get_title() == 'GitHub Arctic Code Vault'
assert info.get_channel_id() == 'UC7c3Kb6jYCRj4JOHHZTxKsQ' assert info.get_channel_id() == 'UC7c3Kb6jYCRj4JOHHZTxKsQ'
assert info.get_duration() == 148 assert info.get_duration() == 148
def test_live_page(mocker): def test_live_page(mocker):
_set_test_data('tests/testdata/videoinfo/live_page.txt', mocker) _set_test_data('tests/testdata/videoinfo/live_page.txt', mocker)
info = VideoInfo('test_id') info = VideoInfo('__test_id__')
'''live page :duration = 0''' '''live page :duration = 0'''
assert info.get_duration() == 0 assert info.get_duration() == 0
assert info.video_id == 'test_id' assert info.video_id == '__test_id__'
assert info.get_channel_name() == 'BGM channel' assert info.get_channel_name() == 'BGM channel'
assert info.get_thumbnail() == \ assert info.get_thumbnail() == \
'https://i.ytimg.com/vi/fEvM-OUbaKs/hqdefault_live.jpg' 'https://i.ytimg.com/vi/fEvM-OUbaKs/hqdefault_live.jpg'
@@ -38,25 +41,26 @@ def test_live_page(mocker):
' - 24/7 Live Stream - Slow Jazz') ' - 24/7 Live Stream - Slow Jazz')
assert info.get_channel_id() == 'UCQINXHZqCU5i06HzxRkujfg' assert info.get_channel_id() == 'UCQINXHZqCU5i06HzxRkujfg'
def test_invalid_video_id(mocker): def test_invalid_video_id(mocker):
'''Test case invalid video_id is specified.''' '''Test case invalid video_id is specified.'''
_set_test_data( _set_test_data(
'tests/testdata/videoinfo/invalid_video_id_page.txt', mocker) 'tests/testdata/videoinfo/invalid_video_id_page.txt', mocker)
try: try:
_ = VideoInfo('test_id') _ = VideoInfo('__test_id__')
assert False assert False
except InvalidVideoIdException: except InvalidVideoIdException:
assert True assert True
def test_no_info(mocker): def test_no_info(mocker):
'''Test case the video page has renderer, but no info.''' '''Test case the video page has renderer, but no info.'''
_set_test_data( _set_test_data(
'tests/testdata/videoinfo/no_info_page.txt', mocker) 'tests/testdata/videoinfo/no_info_page.txt', mocker)
info = VideoInfo('test_id') info = VideoInfo('__test_id__')
assert info.video_id == 'test_id' assert info.video_id == '__test_id__'
assert info.get_channel_name() is None assert info.get_channel_name() is None
assert info.get_thumbnail() is None assert info.get_thumbnail() is None
assert info.get_title() is None assert info.get_title() is None
assert info.get_channel_id() is None assert info.get_channel_id() is None
assert info.get_duration() is None assert info.get_duration() is None