Divide download module

This commit is contained in:
taizan-hokuto
2020-02-02 00:38:22 +09:00
parent 04aedc82e8
commit f1d8393971
4 changed files with 122 additions and 105 deletions

100
pytchat/tool/asyncdl.py Normal file
View File

@@ -0,0 +1,100 @@
import aiohttp
import asyncio
import json
from . import parser
from . block import Block
from . dlworker import DownloadWorker
from .. paramgen import arcparam
from .. import config
from urllib.parse import quote
# HTTP request headers taken from the package configuration; sent with the
# block-creation requests below.
headers = config.headers
# URL prefix of the chat-replay endpoint. Callers append a URL-quoted
# continuation token followed by "&pbj=1".
REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \
"get_live_chat_replay?continuation="
def ready_blocks(video_id, duration, div, callback):
    """Fetch the first chunk of chat replay at evenly spaced seek positions.

    The span from -1 up to ``duration`` is divided into at most ``div`` seek
    positions (kept at least 120 apart -- presumably seconds, TODO confirm),
    and one replay request per position is issued concurrently.

    Args:
        video_id: Video identifier forwarded to ``arcparam.getparam``.
        duration: End of the span to divide; must be an ``int`` >= -1.
        div: Requested number of blocks; must be an ``int`` >= 1.
        callback: Optional callable invoked as
            ``callback(actions, last - first)`` for every fetched chunk that
            contains actions. May be ``None``.

    Returns:
        A list with one entry per seek position, in position order. Each
        entry is a ``Block`` when the response contained actions, otherwise
        ``None`` (empty response, or the request/parsing raised).

    Raises:
        ValueError: If ``div`` is not positive, or if ``duration``/``div``
            are not ints, or if ``duration`` is smaller than -1.
    """
    # Function-scope import so this block does not depend on the file header.
    import logging

    if div <= 0:
        raise ValueError("div must be greater than 0.")

    def _divide(start, end, count):
        """Return sorted, de-duplicated int seek positions beginning at start."""
        min_interval = 120
        if not all(isinstance(value, int) for value in (start, end, count)):
            raise ValueError("start/end/count must be int")
        if start > end:
            raise ValueError("end must be equal to or greater than start.")
        if count < 1:
            raise ValueError("count must be equal to or greater than 1.")
        if (end - start) / count < min_interval:
            # Too many divisions for this span: use as many as fit, at least 1.
            count = max(int((end - start) / min_interval), 1)
        if count == 1:
            return [start]
        interval = (end - start) / count
        return sorted({int(start + interval * j) for j in range(count)})

    async def _create_block(session, pos, seektime):
        """Request the chunk at ``seektime``; return a Block, or None if empty."""
        continuation = arcparam.getparam(video_id, seektime=seektime)
        url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
        async with session.get(url, headers=headers) as resp:
            text = await resp.text()
        next_continuation, actions = parser.parse(json.loads(text))
        if not actions:
            return None
        first = parser.get_offset(actions[0])
        last = parser.get_offset(actions[-1])
        if callback:
            callback(actions, last - first)
        return Block(
            pos=pos,
            continuation=next_continuation,
            chat_data=actions,
            first=first,
            last=last,
        )

    async def _get_blocks(seektimes):
        """Run one ``_create_block`` per seek position over a shared session."""
        async with aiohttp.ClientSession() as session:
            pending = [
                _create_block(session, pos, seektime)
                for pos, seektime in enumerate(seektimes)
            ]
            # One failing request must not cancel the others.
            return await asyncio.gather(*pending, return_exceptions=True)

    # Validate the arguments before touching the event loop or the network.
    seektimes = _divide(-1, duration, div)

    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(_get_blocks(seektimes))

    # gather() hands back exception objects in place of results. They are
    # truthy, so a caller that filters with ``if block`` would mistake them
    # for Blocks. Report them and return None in their slot instead.
    blocks = []
    for pos, result in enumerate(results):
        if isinstance(result, BaseException):
            logging.getLogger(__name__).warning(
                "failed to create block %d: %r", pos, result)
            blocks.append(None)
        else:
            blocks.append(result)
    return blocks
def download_chunk(callback, blocks):
    """Download the remaining chat data of every block concurrently.

    A ``DownloadWorker`` is created for each block and all workers are run
    over one shared HTTP session until they finish.

    Args:
        callback: Optional callable invoked as
            ``callback(actions, last - first)`` for every fetched chunk that
            contains actions. May be ``None``.
        blocks: Iterable of ``Block`` objects; each is handed to its own
            ``DownloadWorker``.

    Returns:
        None. The gathered worker results are not returned.
    """
    async def _fetch(continuation, session):
        """Fetch one chunk; return ``(actions, next_continuation, last_offset)``."""
        url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
        async with session.get(url, headers=config.headers) as resp:
            text = await resp.text()
        continuation, actions = parser.parse(json.loads(text))
        if actions:
            last = parser.get_offset(actions[-1])
            first = parser.get_offset(actions[0])
            if callback:
                callback(actions, last - first)
            return actions, continuation, last
        # Same (data, continuation, last) order as the return above. The
        # worker unpacks the tuple positionally and extends chat_data with
        # the first element, so the continuation must not come first.
        # NOTE(review): ``last`` is None here -- confirm DownloadWorker.cut
        # copes with that value.
        return [], continuation, None

    async def _dl_distribute():
        """Create one worker per block and run them all on a shared session."""
        workers = [
            DownloadWorker(fetch=_fetch, block=block)
            for block in blocks
        ]
        async with aiohttp.ClientSession() as session:
            tasks = [worker.run(session) for worker in workers]
            # return_exceptions=True keeps one failing worker from cancelling
            # the rest.
            return await asyncio.gather(*tasks, return_exceptions=True)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(_dl_distribute())

9
pytchat/tool/block.py Normal file
View File

@@ -0,0 +1,9 @@
class Block:
    """One contiguous section of chat data together with its fetch state."""

    def __init__(self, pos=0, first=0, last=0,
                 continuation='', chat_data=None):
        """Initialise a block.

        Args:
            pos: Index of this block among its siblings.
            first: Offset of the first chat item held by the block.
            last: Offset of the last chat item held by the block.
            continuation: Continuation token for fetching the next chunk.
            chat_data: List of chat items already fetched. When omitted,
                each instance gets its own new empty list.
        """
        self.pos = pos
        self.first = first
        self.last = last
        self.temp_last = 0
        self.continuation = continuation
        # A ``[]`` default would be one list shared by every Block created
        # without chat_data, and workers extend chat_data in place.
        self.chat_data = [] if chat_data is None else chat_data

View File

@@ -1,11 +1,10 @@
from . import parser from . import parser
class DownloadWorker: class DownloadWorker:
def __init__(self, dl, block, blocklist): def __init__(self, fetch, block):
self.block = block self.block = block
self.blocklist = blocklist self.fetch = fetch
self.dl = dl
async def run(self,session): async def run(self,session):
temp_last = self.block.temp_last temp_last = self.block.temp_last
self.block.chat_data, continuation = self.cut( self.block.chat_data, continuation = self.cut(
@@ -14,7 +13,7 @@ class DownloadWorker:
self.block.last, self.block.last,
temp_last ) temp_last )
while continuation: while continuation:
data, cont, fetched_last = await self.dl(continuation, session) data, cont, fetched_last = await self.fetch(continuation, session)
data, continuation = self.cut(data, cont, fetched_last, temp_last) data, continuation = self.cut(data, cont, fetched_last, temp_last)
self.block.chat_data.extend(data) self.block.chat_data.extend(data)

View File

@@ -3,92 +3,29 @@ import aiohttp
import json import json
import traceback import traceback
from urllib.parse import quote from urllib.parse import quote
from . import asyncdl
from . import parser from . import parser
from . import videoinfo from . import videoinfo
from . dlworker import DownloadWorker from . block import Block
from . duplcheck import duplicate_head, duplicate_tail, overwrap from . duplcheck import duplicate_head, duplicate_tail, overwrap
from .. import config from .. import config
from .. import util from .. exceptions import InvalidVideoIdException
from .. paramgen import arcparam from .. paramgen import arcparam
from ..exceptions import InvalidVideoIdException
logger = config.logger(__name__) logger = config.logger(__name__)
headers=config.headers headers=config.headers
REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \
"get_live_chat_replay?continuation="
class Block:
def __init__(self, pos=0, first=0, last=0,
continuation='', chat_data=[]):
self.pos = pos
self.first = first
self.last = last
self.temp_last = 0
self.continuation = continuation
self.chat_data = chat_data
class Downloader: class Downloader:
def __init__(self, video_id, duration, div, callback=None): def __init__(self, video_id, duration, div, callback):
self.video_id = video_id self.video_id = video_id
self.duration = duration self.duration = duration
self.div = div self.div = div
self.blocks = [] self.blocks = []
self.callback = callback self.callback = callback
def ready_blocks(self): def ready_blocks(self):
if self.div <= 0: raise ValueError result = asyncdl.ready_blocks(
self.video_id, self.duration, self.div, self.callback)
def _divide(start, end, count):
min_interval = 120
if (not isinstance(start,int) or
not isinstance(end,int) or
not isinstance(count,int)):
raise ValueError("start/end/count must be int")
if start>end:
raise ValueError("end must be equal to or greater than start.")
if count<1:
raise ValueError("count must be equal to or greater than 1.")
if (end-start)/count < min_interval:
count = int((end-start)/min_interval)
if count == 0 : count = 1
interval= (end-start)/count
if count == 1:
return [start]
return sorted(list(set([int(start+interval*j)
for j in range(count) ])))
async def _get_blocks(duration,div):
async with aiohttp.ClientSession() as session:
futures = [_create_block(session, pos, seektime)
for pos, seektime in enumerate(_divide(-1, duration, div))]
return await asyncio.gather(*futures,return_exceptions=True)
async def _create_block(session, pos, seektime):
continuation = arcparam.getparam(
self.video_id, seektime = seektime)
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
async with session.get(url, headers = headers) as resp:
text = await resp.text()
next_continuation, actions = parser.parse(json.loads(text))
if actions:
first = parser.get_offset(actions[0])
last = parser.get_offset(actions[-1])
if self.callback:
self.callback(actions,last-first)
return Block(
pos = pos,
continuation = next_continuation,
chat_data = actions,
first = first,
last = last
)
loop = asyncio.get_event_loop()
result = loop.run_until_complete(
_get_blocks(self.duration, self.div))
self.blocks = [block for block in result if block] self.blocks = [block for block in result if block]
return self return self
@@ -107,36 +44,9 @@ class Downloader:
return self return self
def download_blocks(self): def download_blocks(self):
loop = asyncio.get_event_loop() asyncdl.download_chunk(self.callback, self.blocks)
loop.run_until_complete(self._dl_distribute())
return self return self
async def _dl_distribute(self):
workers = [
DownloadWorker(
dl=self.dl_func,
block = block,
blocklist= self.blocks
)
for pos,block in enumerate(self.blocks)
]
async with aiohttp.ClientSession() as session:
tasks = [worker.run(session) for worker in workers]
return await asyncio.gather(*tasks,return_exceptions=True)
async def dl_func(self,continuation,session):
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
async with session.get(url,headers = config.headers) as resp:
text = await resp.text()
continuation, actions = parser.parse(json.loads(text))
if actions:
last = parser.get_offset(actions[-1])
first = parser.get_offset(actions[0])
if self.callback:
self.callback(actions,last-first)
return actions,continuation,last
return continuation, [], None
def remove_duplicate_tail(self): def remove_duplicate_tail(self):
self.blocks = duplicate_tail(self.blocks) self.blocks = duplicate_tail(self.blocks)
return self return self
@@ -157,7 +67,6 @@ class Downloader:
.remove_duplicate_tail() .remove_duplicate_tail()
.combine() .combine()
) )
def download(video_id, div = 20, callback=None, processor = None): def download(video_id, div = 20, callback=None, processor = None):
duration = 0 duration = 0
@@ -168,4 +77,4 @@ def download(video_id, div = 20, callback=None, processor = None):
if duration == 0: if duration == 0:
print("video is live.") print("video is live.")
return return
return Downloader(video_id, duration, div, callback).download() return Downloader(video_id, duration, div, callback).download()