Fix process

This commit is contained in:
taizan-hokouto
2020-12-05 14:42:02 +09:00
parent bc3f16e86b
commit 02d48ceccc
8 changed files with 126 additions and 61 deletions

View File

@@ -1,6 +1,8 @@
import asyncio
import httpx
import socket
from concurrent.futures import CancelledError
from json import JSONDecodeError
from . import parser
from . block import Block
from . worker import ExtractWorker
@@ -8,18 +10,17 @@ from . patch import Patch
from ... import config
from ... paramgen import arcparam
from ... exceptions import UnknownConnectionError
from concurrent.futures import CancelledError
from json import JSONDecodeError
from urllib.parse import quote
from ... util import get_param
headers = config.headers
REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \
"get_live_chat_replay?continuation="
smr = config._smr
MAX_RETRY_COUNT = 3
# Set to avoid duplicate parameters
param_set = set()
aquired_params = set()
dat = ''
def _split(start, end, count, min_interval_sec=120):
@@ -55,28 +56,30 @@ def _split(start, end, count, min_interval_sec=120):
def ready_blocks(video_id, duration, div, callback):
param_set.clear()
aquired_params.clear()
if div <= 0:
raise ValueError
async def _get_blocks(video_id, duration, div, callback):
async with httpx.AsyncClient(http2=True) as session:
async with httpx.AsyncClient(http2=True, headers=headers) as session:
tasks = [_create_block(session, video_id, seektime, callback)
for seektime in _split(-1, duration, div)]
return await asyncio.gather(*tasks)
async def _create_block(session, video_id, seektime, callback):
continuation = arcparam.getparam(video_id, seektime=seektime)
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
err = None
last_offset = 0
global dat
for _ in range(MAX_RETRY_COUNT):
try:
if continuation in param_set:
if continuation in aquired_params:
next_continuation, actions = None, []
break
param_set.add(continuation)
resp = await session.get(url, headers=headers, timeout=10)
next_continuation, actions = parser.parse(resp.json())
aquired_params.add(continuation)
param = get_param(continuation, replay=True, offsetms=seektime * 1000, dat=dat)
resp = await session.post(smr, json=param, timeout=10)
next_continuation, actions, last_offset, dat = parser.parse(resp.json())
break
except JSONDecodeError:
await asyncio.sleep(3)
@@ -88,15 +91,14 @@ def ready_blocks(video_id, duration, div, callback):
raise UnknownConnectionError("Abort:" + str(err))
if actions:
first = parser.get_offset(actions[0])
last = parser.get_offset(actions[-1])
first_offset = parser.get_offset(actions[0])
if callback:
callback(actions, last - first)
callback(actions, last_offset - first_offset)
return Block(
continuation=next_continuation,
chat_data=actions,
first=first,
last=last
first=first_offset,
last=last_offset
)
"""
@@ -122,17 +124,19 @@ def fetch_patch(callback, blocks, video_id):
tasks = [worker.run(session) for worker in workers]
return await asyncio.gather(*tasks)
async def _fetch(continuation, session) -> Patch:
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
async def _fetch(continuation, last_offset, session=None) -> Patch:
global dat
err = None
for _ in range(MAX_RETRY_COUNT):
try:
if continuation in param_set:
if continuation in aquired_params:
continuation, actions = None, []
break
param_set.add(continuation)
resp = await session.get(url, headers=config.headers)
continuation, actions = parser.parse(resp.json())
aquired_params.add(continuation)
params = get_param(continuation, replay=True, offsetms=last_offset, dat=dat)
# util.save(json.dumps(params, ensure_ascii=False), "v:/~~/param_"+str(last_offset), ".json")
resp = await session.post(smr, json=params)
continuation, actions, last_offset, dat = parser.parse(resp.json())
break
except JSONDecodeError:
await asyncio.sleep(3)
@@ -147,7 +151,7 @@ def fetch_patch(callback, blocks, video_id):
raise UnknownConnectionError("Abort:" + str(err))
if actions:
last = parser.get_offset(actions[-1])
last = last_offset
first = parser.get_offset(actions[0])
if callback:
callback(actions, last - first)