Move functions

This commit is contained in:
taizan-hokouto
2020-12-05 14:39:55 +09:00
parent 4db9486853
commit bc3f16e86b
8 changed files with 80 additions and 76 deletions

View File

@@ -5,17 +5,16 @@ import json
import signal
import time
import traceback
import urllib.parse
from asyncio import Queue
from concurrent.futures import CancelledError
from .buffer import Buffer
from ..parser.live import Parser
from .. import config
from .. import exceptions
from .. import util
from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator
from ..util.extract_video_id import extract_video_id
headers = config.headers
MAX_RETRY = 10
@@ -84,7 +83,7 @@ class LiveChatAsync:
topchat_only=False,
logger=config.logger(__name__),
):
self._video_id = extract_video_id(video_id)
self._video_id = util.extract_video_id(video_id)
self.seektime = seektime
if isinstance(processor, tuple):
self.processor = Combinator(processor)
@@ -101,8 +100,10 @@ class LiveChatAsync:
self._pauser = Queue()
self._pauser.put_nowait(None)
self._first_fetch = True
self._fetch_url = "live_chat/get_live_chat?continuation="
self._fetch_url = config._sml
self._topchat_only = topchat_only
self._dat = ''
self._last_offset_ms = 0
self._logger = logger
self.exception = None
LiveChatAsync._logger = logger
@@ -160,10 +161,8 @@ class LiveChatAsync:
async with httpx.AsyncClient(http2=True) as client:
while(continuation and self._is_alive):
continuation = await self._check_pause(continuation)
contents = await self._get_contents(
continuation, client, headers)
contents = await self._get_contents(continuation, client, headers)
metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs'] / 1000
chat_component = {
"video_id": self._video_id,
@@ -183,16 +182,16 @@ class LiveChatAsync:
diff_time = timeout - (time.time() - time_mark)
await asyncio.sleep(diff_time)
continuation = metadata.get('continuation')
self._last_offset_ms = metadata.get('last_offset_ms', 0)
except exceptions.ChatParseException as e:
self._logger.debug(f"[{self._video_id}]{str(e)}")
raise
except Exception:
self._logger.error(f"{traceback.format_exc(limit = -1)}")
self._logger.error(f"{traceback.format_exc(limit=-1)}")
raise
self._logger.debug(f"[{self._video_id}] finished fetching chat.")
async def _check_pause(self, continuation):
if self._pauser.empty():
'''pause'''
@@ -215,46 +214,50 @@ class LiveChatAsync:
-------
'continuationContents' which includes metadata & chatdata.
'''
livechat_json = await self._get_livechat_json(continuation, client, headers)
contents = self._parser.get_contents(livechat_json)
livechat_json = await self._get_livechat_json(continuation, client, replay=self._is_replay, offset_ms=self._last_offset_ms)
contents, dat = self._parser.get_contents(livechat_json)
if self._dat == '' and dat:
self._dat = dat
if self._first_fetch:
if contents is None or self._is_replay:
'''Try to fetch archive chat data.'''
self._parser.is_replay = True
self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation="
self._fetch_url = config._smr
continuation = arcparam.getparam(
self._video_id, self.seektime, self._topchat_only)
livechat_json = (await self._get_livechat_json(
continuation, client, headers))
continuation, client, replay=True, offset_ms=self.seektime * 1000))
reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json))
self._parser.get_contents(livechat_json)[0])
if reload_continuation:
livechat_json = (await self._get_livechat_json(
reload_continuation, client, headers))
contents = self._parser.get_contents(livechat_json)
contents, _ = self._parser.get_contents(livechat_json)
self._is_replay = True
self._first_fetch = False
return contents
async def _get_livechat_json(self, continuation, client, headers):
async def _get_livechat_json(self, continuation, client, replay: bool, offset_ms: int = 0):
'''
Get json which includes chat data.
'''
continuation = urllib.parse.quote(continuation)
# continuation = urllib.parse.quote(continuation)
livechat_json = None
url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
if offset_ms < 0:
offset_ms = 0
param = util.get_param(continuation, dat=self._dat, replay=replay, offsetms=offset_ms)
for _ in range(MAX_RETRY + 1):
try:
resp = await client.get(url, headers=headers)
resp = await client.post(self._fetch_url, json=param)
livechat_json = resp.json()
break
except (json.JSONDecodeError, httpx.HTTPError):
await asyncio.sleep(1)
await asyncio.sleep(2)
continue
else:
self._logger.error(f"[{self._video_id}]"
f"Exceeded retry count.")
return None
raise exceptions.RetryExceedMaxCount()
return livechat_json
async def _callback_loop(self, callback):
@@ -330,9 +333,6 @@ class LiveChatAsync:
self.terminate()
def _task_finished(self):
'''
Terminate fetching chats.
'''
if self.is_alive():
self.terminate()
try: