Compare commits
24 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a5c7ba52c8 | ||
|
|
c37201fa03 | ||
|
|
a474899268 | ||
|
|
3f72eb0e00 | ||
|
|
4652a56bc6 | ||
|
|
35218a66da | ||
|
|
3432609588 | ||
|
|
48669e5f53 | ||
|
|
f46df3ae42 | ||
|
|
96c028bd5d | ||
|
|
402dc15d7a | ||
|
|
6088ab6932 | ||
|
|
d98d34d8b3 | ||
|
|
24fa104e84 | ||
|
|
b4dad8c641 | ||
|
|
3550cd6d91 | ||
|
|
2815b48e0e | ||
|
|
650e6ccb65 | ||
|
|
4a00a19a43 | ||
|
|
b067eda7b6 | ||
|
|
1b6bc86e76 | ||
|
|
da2b513bcc | ||
|
|
6adae578ef | ||
|
|
086a14115f |
@@ -2,7 +2,7 @@
|
||||
pytchat is a lightweight python library to browse youtube livechat without Selenium or BeautifulSoup.
|
||||
"""
|
||||
__copyright__ = 'Copyright (C) 2019, 2020 taizan-hokuto'
|
||||
__version__ = '0.4.4'
|
||||
__version__ = '0.4.6'
|
||||
__license__ = 'MIT'
|
||||
__author__ = 'taizan-hokuto'
|
||||
__author_email__ = '55448286+taizan-hokuto@users.noreply.github.com'
|
||||
|
||||
@@ -1,31 +1,21 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
try:
|
||||
from asyncio import CancelledError
|
||||
except ImportError:
|
||||
from asyncio.futures import CancelledError
|
||||
import os
|
||||
import signal
|
||||
from json.decoder import JSONDecodeError
|
||||
from pathlib import Path
|
||||
from httpcore import ReadTimeout as HCReadTimeout, NetworkError as HCNetworkError
|
||||
from .arguments import Arguments
|
||||
from .echo import Echo
|
||||
from .progressbar import ProgressBar
|
||||
from .. exceptions import InvalidVideoIdException, NoContents, PatternUnmatchError, UnknownConnectionError
|
||||
from .. processors.html_archiver import HTMLArchiver
|
||||
from .. tool.extract.extractor import Extractor
|
||||
from .. tool.videoinfo import VideoInfo
|
||||
from .. util.extract_video_id import extract_video_id
|
||||
from .. import util
|
||||
from .. exceptions import InvalidVideoIdException
|
||||
from .. import __version__
|
||||
from .cli_extractor import CLIExtractor
|
||||
|
||||
|
||||
'''
|
||||
Most of CLI modules refer to
|
||||
Petter Kraabøl's Twitch-Chat-Downloader
|
||||
https://github.com/PetterKraabol/Twitch-Chat-Downloader
|
||||
(MIT License)
|
||||
|
||||
'''
|
||||
|
||||
|
||||
@@ -38,20 +28,19 @@ def main():
|
||||
'If ID starts with a hyphen (-), enclose the ID in square brackets.')
|
||||
parser.add_argument('-o', f'--{Arguments.Name.OUTPUT}', type=str,
|
||||
help='Output directory (end with "/"). default="./"', default='./')
|
||||
parser.add_argument(f'--{Arguments.Name.SAVE_ERROR_DATA}', action='store_true',
|
||||
help='Save error data when error occurs(".dat" file)')
|
||||
parser.add_argument(f'--{Arguments.Name.DEBUG}', action='store_true',
|
||||
help='Debug mode. Stop when exceptions have occurred and save error data (".dat" file).')
|
||||
parser.add_argument(f'--{Arguments.Name.VERSION}', action='store_true',
|
||||
help='Show version')
|
||||
help='Show version.')
|
||||
parser.add_argument(f'--{Arguments.Name.ECHO}', action='store_true',
|
||||
help='Show chats of specified video')
|
||||
help='Display chats of specified video.')
|
||||
|
||||
Arguments(parser.parse_args().__dict__)
|
||||
|
||||
if Arguments().print_version:
|
||||
print(f'pytchat v{__version__} © 2019,2020 taizan-hokuto')
|
||||
print(f'pytchat v{__version__} © 2019, 2020 taizan-hokuto')
|
||||
return
|
||||
|
||||
# Extractor
|
||||
if not Arguments().video_ids:
|
||||
parser.print_help()
|
||||
return
|
||||
@@ -59,7 +48,7 @@ def main():
|
||||
# Echo
|
||||
if Arguments().echo:
|
||||
if len(Arguments().video_ids) > 1:
|
||||
print("You can specify only one video ID.")
|
||||
print("When using --echo option, only one video ID can be specified.")
|
||||
return
|
||||
try:
|
||||
Echo(Arguments().video_ids[0]).run()
|
||||
@@ -67,111 +56,16 @@ def main():
|
||||
print("Invalid video id:", str(e))
|
||||
except Exception as e:
|
||||
print(type(e), str(e))
|
||||
if Arguments().debug:
|
||||
raise
|
||||
finally:
|
||||
return
|
||||
|
||||
# Extractor
|
||||
if not os.path.exists(Arguments().output):
|
||||
print("\nThe specified directory does not exist.:{}\n".format(Arguments().output))
|
||||
return
|
||||
try:
|
||||
Runner().run()
|
||||
CLIExtractor().run()
|
||||
except CancelledError as e:
|
||||
print(str(e))
|
||||
|
||||
|
||||
class Runner:
|
||||
|
||||
def run(self) -> None:
|
||||
ex = None
|
||||
pbar = None
|
||||
for counter, video_id in enumerate(Arguments().video_ids):
|
||||
if len(Arguments().video_ids) > 1:
|
||||
print(f"\n{'-' * 10} video:{counter + 1} of {len(Arguments().video_ids)} {'-' * 10}")
|
||||
|
||||
try:
|
||||
video_id = extract_video_id(video_id)
|
||||
separated_path = str(Path(Arguments().output)) + os.path.sep
|
||||
path = util.checkpath(separated_path + video_id + '.html')
|
||||
try:
|
||||
info = VideoInfo(video_id)
|
||||
except (PatternUnmatchError, JSONDecodeError) as e:
|
||||
print("Cannot parse video information.:{} {}".format(video_id, type(e)))
|
||||
if Arguments().save_error_data:
|
||||
util.save(str(e.doc), "ERR", ".dat")
|
||||
continue
|
||||
except Exception as e:
|
||||
print("Cannot parse video information.:{} {}".format(video_id, type(e)))
|
||||
continue
|
||||
|
||||
print(f"\n"
|
||||
f" video_id: {video_id}\n"
|
||||
f" channel: {info.get_channel_name()}\n"
|
||||
f" title: {info.get_title()}\n"
|
||||
f" output path: {path}")
|
||||
|
||||
duration = info.get_duration()
|
||||
pbar = ProgressBar(total=(duration * 1000), status_txt="Extracting")
|
||||
ex = Extractor(video_id,
|
||||
callback=pbar.disp,
|
||||
div=10)
|
||||
signal.signal(signal.SIGINT, (lambda a, b: self.cancel(ex, pbar)))
|
||||
|
||||
data = ex.extract()
|
||||
if data == []:
|
||||
continue
|
||||
pbar.reset("#", "=", total=len(data), status_txt="Rendering ")
|
||||
processor = HTMLArchiver(path, callback=pbar.disp)
|
||||
processor.process(
|
||||
[{'video_id': None,
|
||||
'timeout': 1,
|
||||
'chatdata': (action["replayChatItemAction"]["actions"][0] for action in data)}]
|
||||
)
|
||||
processor.finalize()
|
||||
pbar.reset('#', '#', status_txt='Completed ')
|
||||
pbar.close()
|
||||
print()
|
||||
if pbar.is_cancelled():
|
||||
print("\nThe extraction process has been discontinued.\n")
|
||||
except InvalidVideoIdException:
|
||||
print("Invalid Video ID or URL:", video_id)
|
||||
except NoContents as e:
|
||||
print(f"Abort:{str(e)}:[{video_id}]")
|
||||
except (JSONDecodeError, PatternUnmatchError) as e:
|
||||
print("{}:{}".format(e.msg, video_id))
|
||||
if Arguments().save_error_data:
|
||||
util.save(e.doc, "ERR_", ".dat")
|
||||
except (UnknownConnectionError, HCNetworkError, HCReadTimeout) as e:
|
||||
print(f"An unknown network error occurred during the processing of [{video_id}]. : " + str(e))
|
||||
except Exception as e:
|
||||
print(f"Abort:{str(type(e))} {str(e)[:80]}")
|
||||
finally:
|
||||
clear_tasks()
|
||||
|
||||
return
|
||||
|
||||
def cancel(self, ex=None, pbar=None) -> None:
|
||||
'''Called when keyboard interrupted has occurred.
|
||||
'''
|
||||
print("\nKeyboard interrupted.\n")
|
||||
if ex and pbar:
|
||||
ex.cancel()
|
||||
pbar.cancel()
|
||||
|
||||
|
||||
def clear_tasks():
|
||||
'''
|
||||
Clear remained tasks.
|
||||
Called when internal exception has occurred or
|
||||
after each extraction process is completed.
|
||||
'''
|
||||
async def _shutdown():
|
||||
tasks = [t for t in asyncio.all_tasks()
|
||||
if t is not asyncio.current_task()]
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(_shutdown())
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
@@ -18,7 +18,7 @@ class Arguments(metaclass=Singleton):
|
||||
VERSION: str = 'version'
|
||||
OUTPUT: str = 'output_dir'
|
||||
VIDEO_IDS: str = 'video_id'
|
||||
SAVE_ERROR_DATA: bool = 'save_error_data'
|
||||
DEBUG: bool = 'debug'
|
||||
ECHO: bool = 'echo'
|
||||
|
||||
def __init__(self,
|
||||
@@ -36,10 +36,10 @@ class Arguments(metaclass=Singleton):
|
||||
self.print_version: bool = arguments[Arguments.Name.VERSION]
|
||||
self.output: str = arguments[Arguments.Name.OUTPUT]
|
||||
self.video_ids: List[int] = []
|
||||
self.save_error_data: bool = arguments[Arguments.Name.SAVE_ERROR_DATA]
|
||||
self.debug: bool = arguments[Arguments.Name.DEBUG]
|
||||
self.echo: bool = arguments[Arguments.Name.ECHO]
|
||||
|
||||
# Videos
|
||||
|
||||
if arguments[Arguments.Name.VIDEO_IDS]:
|
||||
self.video_ids = [video_id
|
||||
for video_id in arguments[Arguments.Name.VIDEO_IDS].split(',')]
|
||||
|
||||
121
pytchat/cli/cli_extractor.py
Normal file
121
pytchat/cli/cli_extractor.py
Normal file
@@ -0,0 +1,121 @@
|
||||
import asyncio
|
||||
import os
|
||||
import signal
|
||||
import traceback
|
||||
from httpcore import ReadTimeout as HCReadTimeout, NetworkError as HCNetworkError
|
||||
from json.decoder import JSONDecodeError
|
||||
from pathlib import Path
|
||||
from .arguments import Arguments
|
||||
from .progressbar import ProgressBar
|
||||
from .. import util
|
||||
from .. exceptions import InvalidVideoIdException, NoContents, PatternUnmatchError, UnknownConnectionError
|
||||
from .. processors.html_archiver import HTMLArchiver
|
||||
from .. tool.extract.extractor import Extractor
|
||||
from .. tool.videoinfo import VideoInfo
|
||||
from .. util.extract_video_id import extract_video_id
|
||||
|
||||
|
||||
class CLIExtractor:
|
||||
|
||||
def run(self) -> None:
|
||||
ex = None
|
||||
pbar = None
|
||||
for counter, video_id in enumerate(Arguments().video_ids):
|
||||
if len(Arguments().video_ids) > 1:
|
||||
print(f"\n{'-' * 10} video:{counter + 1} of {len(Arguments().video_ids)} {'-' * 10}")
|
||||
|
||||
try:
|
||||
video_id = extract_video_id(video_id)
|
||||
separated_path = str(Path(Arguments().output)) + os.path.sep
|
||||
path = util.checkpath(separated_path + video_id + '.html')
|
||||
try:
|
||||
info = VideoInfo(video_id)
|
||||
except (PatternUnmatchError, JSONDecodeError) as e:
|
||||
print("Cannot parse video information.:{} {}".format(video_id, type(e)))
|
||||
if Arguments().debug:
|
||||
util.save(str(e.doc), "ERR", ".dat")
|
||||
continue
|
||||
except Exception as e:
|
||||
print("Cannot parse video information.:{} {}".format(video_id, type(e)))
|
||||
continue
|
||||
|
||||
print(f"\n"
|
||||
f" video_id: {video_id}\n"
|
||||
f" channel: {info.get_channel_name()}\n"
|
||||
f" title: {info.get_title()}\n"
|
||||
f" output path: {path}")
|
||||
|
||||
duration = info.get_duration()
|
||||
pbar = ProgressBar(total=(duration * 1000), status_txt="Extracting")
|
||||
ex = Extractor(video_id,
|
||||
callback=pbar.disp,
|
||||
div=10)
|
||||
signal.signal(signal.SIGINT, (lambda a, b: self.cancel(ex, pbar)))
|
||||
|
||||
data = ex.extract()
|
||||
if data == [] or data is None:
|
||||
continue
|
||||
pbar.reset("#", "=", total=1000, status_txt="Rendering ")
|
||||
processor = HTMLArchiver(path, callback=pbar.disp)
|
||||
processor.process(
|
||||
[{'video_id': None,
|
||||
'timeout': 1,
|
||||
'chatdata': (action["replayChatItemAction"]["actions"][0] for action in data)}]
|
||||
)
|
||||
processor.finalize()
|
||||
pbar.reset('#', '#', status_txt='Completed ')
|
||||
pbar.close()
|
||||
print()
|
||||
if pbar.is_cancelled():
|
||||
print("\nThe extraction process has been discontinued.\n")
|
||||
except InvalidVideoIdException:
|
||||
print("Invalid Video ID or URL:", video_id)
|
||||
except NoContents as e:
|
||||
print(f"Abort:{str(e)}:[{video_id}]")
|
||||
except (JSONDecodeError, PatternUnmatchError) as e:
|
||||
print("{}:{}".format(e.msg, video_id))
|
||||
if Arguments().debug:
|
||||
filename = util.save(e.doc, "ERR_", ".dat")
|
||||
traceback.print_exc()
|
||||
print(f"Saved error data: {filename}")
|
||||
except (UnknownConnectionError, HCNetworkError, HCReadTimeout) as e:
|
||||
if Arguments().debug:
|
||||
traceback.print_exc()
|
||||
print(f"An unknown network error occurred during the processing of [{video_id}]. : " + str(e))
|
||||
except Exception as e:
|
||||
print(f"Abort:{str(type(e))} {str(e)[:80]}")
|
||||
if Arguments().debug:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
clear_tasks()
|
||||
|
||||
return
|
||||
|
||||
def cancel(self, ex=None, pbar=None) -> None:
|
||||
'''Called when keyboard interrupted has occurred.
|
||||
'''
|
||||
print("\nKeyboard interrupted.\n")
|
||||
if ex and pbar:
|
||||
ex.cancel()
|
||||
pbar.cancel()
|
||||
|
||||
|
||||
def clear_tasks():
|
||||
'''
|
||||
Clear remained tasks.
|
||||
Called when internal exception has occurred or
|
||||
after each extraction process is completed.
|
||||
'''
|
||||
async def _shutdown():
|
||||
tasks = [t for t in asyncio.all_tasks()
|
||||
if t is not asyncio.current_task()]
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(_shutdown())
|
||||
except Exception as e:
|
||||
print(str(e))
|
||||
if Arguments().debug:
|
||||
traceback.print_exc()
|
||||
@@ -118,13 +118,10 @@ class PytchatCore:
|
||||
except exceptions.ChatParseException as e:
|
||||
self._logger.debug(f"[{self._video_id}]{str(e)}")
|
||||
self._raise_exception(e)
|
||||
except (TypeError, json.JSONDecodeError) as e:
|
||||
except Exception as e:
|
||||
self._logger.error(f"{traceback.format_exc(limit=-1)}")
|
||||
self._raise_exception(e)
|
||||
|
||||
self._logger.debug(f"[{self._video_id}]finished fetching chat.")
|
||||
self._raise_exception(exceptions.ChatDataFinished)
|
||||
|
||||
def _get_contents(self, continuation, client, headers):
|
||||
'''Get 'continuationContents' from livechat json.
|
||||
If contents is None at first fetching,
|
||||
@@ -201,7 +198,7 @@ class PytchatCore:
|
||||
raise self._exception_holder
|
||||
|
||||
def _raise_exception(self, exception: Exception = None):
|
||||
self._is_alive = False
|
||||
self.terminate()
|
||||
if self._hold_exception is False:
|
||||
raise exception
|
||||
self._exception_holder = exception
|
||||
|
||||
@@ -186,12 +186,12 @@ class LiveChatAsync:
|
||||
except exceptions.ChatParseException as e:
|
||||
self._logger.debug(f"[{self._video_id}]{str(e)}")
|
||||
raise
|
||||
except (TypeError, json.JSONDecodeError):
|
||||
except Exception:
|
||||
self._logger.error(f"{traceback.format_exc(limit = -1)}")
|
||||
raise
|
||||
|
||||
self._logger.debug(f"[{self._video_id}] finished fetching chat.")
|
||||
raise exceptions.ChatDataFinished
|
||||
|
||||
|
||||
async def _check_pause(self, continuation):
|
||||
if self._pauser.empty():
|
||||
|
||||
@@ -179,12 +179,12 @@ class LiveChat:
|
||||
except exceptions.ChatParseException as e:
|
||||
self._logger.debug(f"[{self._video_id}]{str(e)}")
|
||||
raise
|
||||
except (TypeError, json.JSONDecodeError):
|
||||
except Exception:
|
||||
self._logger.error(f"{traceback.format_exc(limit=-1)}")
|
||||
raise
|
||||
|
||||
self._logger.debug(f"[{self._video_id}] finished fetching chat.")
|
||||
raise exceptions.ChatDataFinished
|
||||
|
||||
|
||||
def _check_pause(self, continuation):
|
||||
if self._pauser.empty():
|
||||
|
||||
@@ -112,7 +112,7 @@ class Chatdata:
|
||||
await asyncio.sleep(1 - stop_interval)
|
||||
|
||||
def json(self) -> str:
|
||||
return json.dumps([vars(a) for a in self.items], ensure_ascii=False, cls=CustomEncoder)
|
||||
return ''.join(("[", ','.join((a.json() for a in self.items)), "]"))
|
||||
|
||||
|
||||
class DefaultProcessor(ChatProcessor):
|
||||
@@ -137,7 +137,7 @@ class DefaultProcessor(ChatProcessor):
|
||||
if component is None:
|
||||
continue
|
||||
timeout += component.get('timeout', 0)
|
||||
chatdata = component.get('chatdata')
|
||||
chatdata = component.get('chatdata') # if from Extractor, chatdata is generator.
|
||||
if chatdata is None:
|
||||
continue
|
||||
for action in chatdata:
|
||||
@@ -153,7 +153,7 @@ class DefaultProcessor(ChatProcessor):
|
||||
chatlist.append(chat)
|
||||
|
||||
if self.first and chatlist:
|
||||
self.abs_diff = time.time() - chatlist[0].timestamp / 1000 + 2
|
||||
self.abs_diff = time.time() - chatlist[0].timestamp / 1000
|
||||
self.first = False
|
||||
|
||||
chatdata = Chatdata(chatlist, float(timeout), self.abs_diff)
|
||||
|
||||
@@ -7,7 +7,7 @@ from concurrent.futures import ThreadPoolExecutor
|
||||
from .chat_processor import ChatProcessor
|
||||
from .default.processor import DefaultProcessor
|
||||
from ..exceptions import UnknownConnectionError
|
||||
|
||||
import tempfile
|
||||
|
||||
PATTERN = re.compile(r"(.*)\(([0-9]+)\)$")
|
||||
|
||||
@@ -52,10 +52,11 @@ class HTMLArchiver(ChatProcessor):
|
||||
self.save_path = self._checkpath(save_path)
|
||||
self.processor = DefaultProcessor()
|
||||
self.emoji_table = {} # dict for custom emojis. key: emoji_id, value: base64 encoded image binary.
|
||||
self.header = [HEADER_HTML]
|
||||
self.body = ['<body>\n', '<table class="css">\n', self._parse_table_header(fmt_headers)]
|
||||
self.callback = callback
|
||||
self.executor = ThreadPoolExecutor(max_workers=10)
|
||||
self.tmp_fp = tempfile.NamedTemporaryFile(mode="a", encoding="utf-8", delete=False)
|
||||
self.tmp_filename = self.tmp_fp.name
|
||||
self.counter = 0
|
||||
|
||||
def _checkpath(self, filepath):
|
||||
splitter = os.path.splitext(os.path.basename(filepath))
|
||||
@@ -85,9 +86,9 @@ class HTMLArchiver(ChatProcessor):
|
||||
Count of total lines written to the file.
|
||||
"""
|
||||
if chat_components is None or len(chat_components) == 0:
|
||||
return
|
||||
return self.save_path ,self.counter
|
||||
for c in self.processor.process(chat_components).items:
|
||||
self.body.extend(
|
||||
self.tmp_fp.write(
|
||||
self._parse_html_line((
|
||||
c.datetime,
|
||||
c.elapsedTime,
|
||||
@@ -100,6 +101,8 @@ class HTMLArchiver(ChatProcessor):
|
||||
)
|
||||
if self.callback:
|
||||
self.callback(None, 1)
|
||||
self.counter += 1
|
||||
return self.save_path, self.counter
|
||||
|
||||
def _parse_html_line(self, raw_line):
|
||||
return ''.join(('<tr>',
|
||||
@@ -149,9 +152,19 @@ class HTMLArchiver(ChatProcessor):
|
||||
'</style>\n'))
|
||||
|
||||
def finalize(self):
|
||||
self.executor.shutdown()
|
||||
self.header.extend([self._create_styles(), '</head>\n'])
|
||||
self.body.extend(['</table>\n</body>\n</html>'])
|
||||
with open(self.save_path, mode='a', encoding='utf-8') as f:
|
||||
f.writelines(self.header)
|
||||
f.writelines(self.body)
|
||||
if self.tmp_fp:
|
||||
self.tmp_fp.flush()
|
||||
self.tmp_fp = None
|
||||
with open(self.save_path, mode='w', encoding='utf-8') as outfile:
|
||||
# write header
|
||||
outfile.writelines((
|
||||
HEADER_HTML, self._create_styles(), '</head>\n',
|
||||
'<body>\n', '<table class="css">\n',
|
||||
self._parse_table_header(fmt_headers)))
|
||||
# write body
|
||||
fp = open(self.tmp_filename, mode="r", encoding="utf-8")
|
||||
for line in fp:
|
||||
outfile.write(line)
|
||||
outfile.write('</table>\n</body>\n</html>')
|
||||
fp.close()
|
||||
os.remove(self.tmp_filename)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from typing import Generator
|
||||
from . import asyncdl
|
||||
from . import duplcheck
|
||||
from .. videoinfo import VideoInfo
|
||||
@@ -60,11 +61,10 @@ class Extractor:
|
||||
self.blocks = duplcheck.remove_duplicate_tail(self.blocks)
|
||||
return self
|
||||
|
||||
def _combine(self):
|
||||
ret = []
|
||||
def _get_chatdata(self) -> Generator:
|
||||
for block in self.blocks:
|
||||
ret.extend(block.chat_data)
|
||||
return ret
|
||||
for chatdata in block.chat_data:
|
||||
yield chatdata
|
||||
|
||||
def _execute_extract_operations(self):
|
||||
return (
|
||||
@@ -74,7 +74,7 @@ class Extractor:
|
||||
._remove_overlap()
|
||||
._download_blocks()
|
||||
._remove_duplicate_tail()
|
||||
._combine()
|
||||
._get_chatdata()
|
||||
)
|
||||
|
||||
def extract(self):
|
||||
|
||||
@@ -16,10 +16,11 @@ def extract(url):
|
||||
json.dump(html.json(), f, ensure_ascii=False)
|
||||
|
||||
|
||||
def save(data, filename, extention):
|
||||
with open(filename + "_" + (datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')) + extention,
|
||||
mode='w', encoding='utf-8') as f:
|
||||
def save(data, filename, extention) -> str:
|
||||
save_filename = filename + "_" + (datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')) + extention
|
||||
with open(save_filename ,mode='w', encoding='utf-8') as f:
|
||||
f.writelines(data)
|
||||
return save_filename
|
||||
|
||||
|
||||
def checkpath(filepath):
|
||||
|
||||
1
tests/testdata/default/jsonified_item.json
vendored
Normal file
1
tests/testdata/default/jsonified_item.json
vendored
Normal file
@@ -0,0 +1 @@
|
||||
{"author": {"badgeUrl": "", "type": "", "isVerified": false, "isChatOwner": false, "isChatSponsor": false, "isChatModerator": false, "channelId": "author_channel_id", "channelUrl": "http://www.youtube.com/channel/author_channel_id", "name": "author_name", "imageUrl": "https://yt3.ggpht.com/------------/AAAAAAAAAAA/AAAAAAAAAAA/xxxxxxxxxxxx/s64-x-x-xx-xx-xx-c0xffffff/photo.jpg"}, "type": "superChat", "id": "dummy_id", "timestamp": 1570678496000, "elapsedTime": "", "datetime": "2019-10-10 12:34:56", "message": "dummy_message", "messageEx": ["dummy_message"], "amountValue": 800.0, "amountString": "¥800", "currency": "JPY", "bgColor": 4280150454, "colors": {"headerBackgroundColor": 4278239141, "headerTextColor": 4278190080, "bodyBackgroundColor": 4280150454, "bodyTextColor": 4278190080, "timestampColor": 2147483648, "authorNameTextColor": 2315255808}}
|
||||
1
tests/testdata/default/jsonified_list.json
vendored
Normal file
1
tests/testdata/default/jsonified_list.json
vendored
Normal file
@@ -0,0 +1 @@
|
||||
[{"author": {"badgeUrl": "", "type": "", "isVerified": false, "isChatOwner": false, "isChatSponsor": false, "isChatModerator": false, "channelId": "author_channel_id", "channelUrl": "http://www.youtube.com/channel/author_channel_id", "name": "author_name", "imageUrl": "https://yt3.ggpht.com/------------/AAAAAAAAAAA/AAAAAAAAAAA/xxxxxxxxxxxx/s64-x-x-xx-xx-xx-c0xffffff/photo.jpg"}, "type": "superChat", "id": "dummy_id", "timestamp": 1570678496000, "elapsedTime": "", "datetime": "2019-10-10 12:34:56", "message": "dummy_message", "messageEx": ["dummy_message"], "amountValue": 800.0, "amountString": "¥800", "currency": "JPY", "bgColor": 4280150454, "colors": {"headerBackgroundColor": 4278239141, "headerTextColor": 4278190080, "bodyBackgroundColor": 4280150454, "bodyTextColor": 4278190080, "timestampColor": 2147483648, "authorNameTextColor": 2315255808}}]
|
||||
Reference in New Issue
Block a user