Use httpx

This commit is contained in:
taizan-hokuto
2020-08-30 22:16:58 +09:00
parent 8012e1d191
commit 95f975c93d
20 changed files with 259 additions and 354 deletions

View File

@@ -1,9 +1,10 @@
import logging
from . import mylogger
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36'}
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36',
}
def logger(module_name: str, loglevel=None):
def logger(module_name: str, loglevel=logging.DEBUG):
module_logger = mylogger.get_logger(module_name, loglevel=loglevel)
return module_logger

View File

@@ -1,13 +1,13 @@
import aiohttp
import asyncio
import httpx
import json
import signal
import time
import traceback
import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError
from asyncio import Queue
from concurrent.futures import CancelledError
from .buffer import Buffer
from ..parser.live import Parser
from .. import config
@@ -22,7 +22,7 @@ MAX_RETRY = 10
class LiveChatAsync:
'''asyncio(aiohttp)を利用してYouTubeのライブ配信のチャットデータを取得する。
'''asyncioを利用してYouTubeのライブ配信のチャットデータを取得する。
Parameter
---------
@@ -161,11 +161,11 @@ class LiveChatAsync:
parameter for next chat data
'''
try:
async with aiohttp.ClientSession() as session:
async with httpx.AsyncClient(http2=True) as client:
while(continuation and self._is_alive):
continuation = await self._check_pause(continuation)
contents = await self._get_contents(
continuation, session, headers)
continuation, client, headers)
metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs'] / 1000
@@ -210,7 +210,7 @@ class LiveChatAsync:
self._video_id, 3, self._topchat_only)
return continuation
async def _get_contents(self, continuation, session, headers):
async def _get_contents(self, continuation, client, headers):
'''Get 'continuationContents' from livechat json.
If contents is None at first fetching,
try to fetch archive chat data.
@@ -219,7 +219,7 @@ class LiveChatAsync:
-------
'continuationContents' which includes metadata & chatdata.
'''
livechat_json = await self._get_livechat_json(continuation, session, headers)
livechat_json = await self._get_livechat_json(continuation, client, headers)
contents = self._parser.get_contents(livechat_json)
if self._first_fetch:
if contents is None or self._is_replay:
@@ -229,18 +229,18 @@ class LiveChatAsync:
continuation = arcparam.getparam(
self._video_id, self.seektime, self._topchat_only)
livechat_json = (await self._get_livechat_json(
continuation, session, headers))
continuation, client, headers))
reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json))
if reload_continuation:
livechat_json = (await self._get_livechat_json(
reload_continuation, session, headers))
reload_continuation, client, headers))
contents = self._parser.get_contents(livechat_json)
self._is_replay = True
self._first_fetch = False
return contents
async def _get_livechat_json(self, continuation, session, headers):
async def _get_livechat_json(self, continuation, client, headers):
'''
Get json which includes chat data.
'''
@@ -249,14 +249,13 @@ class LiveChatAsync:
status_code = 0
url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
for _ in range(MAX_RETRY + 1):
async with session.get(url, headers=headers) as resp:
try:
text = await resp.text()
livechat_json = json.loads(text)
break
except (ClientConnectorError, json.JSONDecodeError):
await asyncio.sleep(1)
continue
try:
resp = await client.get(url, headers=headers)
livechat_json = resp.json()
break
except (httpx.HTTPError, json.JSONDecodeError):
await asyncio.sleep(1)
continue
else:
self._logger.error(f"[{self._video_id}]"
f"Exceeded retry count. status_code={status_code}")

View File

@@ -1,4 +1,4 @@
import requests
import httpx
import json
import signal
import time
@@ -153,10 +153,10 @@ class LiveChat:
parameter for next chat data
'''
try:
with requests.Session() as session:
with httpx.Client(http2=True) as client:
while(continuation and self._is_alive):
continuation = self._check_pause(continuation)
contents = self._get_contents(continuation, session, headers)
contents = self._get_contents(continuation, client, headers)
metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs'] / 1000
chat_component = {
@@ -199,7 +199,7 @@ class LiveChat:
continuation = liveparam.getparam(self._video_id, 3)
return continuation
def _get_contents(self, continuation, session, headers):
def _get_contents(self, continuation, client, headers):
'''Get 'continuationContents' from livechat json.
If contents is None at first fetching,
try to fetch archive chat data.
@@ -209,7 +209,7 @@ class LiveChat:
'continuationContents' which includes metadata & chat data.
'''
livechat_json = (
self._get_livechat_json(continuation, session, headers)
self._get_livechat_json(continuation, client, headers)
)
contents = self._parser.get_contents(livechat_json)
if self._first_fetch:
@@ -219,18 +219,18 @@ class LiveChat:
self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation="
continuation = arcparam.getparam(
self._video_id, self.seektime, self._topchat_only)
livechat_json = (self._get_livechat_json(continuation, session, headers))
livechat_json = (self._get_livechat_json(continuation, client, headers))
reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json))
if reload_continuation:
livechat_json = (self._get_livechat_json(
reload_continuation, session, headers))
reload_continuation, client, headers))
contents = self._parser.get_contents(livechat_json)
self._is_replay = True
self._first_fetch = False
return contents
def _get_livechat_json(self, continuation, session, headers):
def _get_livechat_json(self, continuation, client, headers):
'''
Get json which includes chat data.
'''
@@ -239,10 +239,9 @@ class LiveChat:
status_code = 0
url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
for _ in range(MAX_RETRY + 1):
with session.get(url, headers=headers) as resp:
with client:
try:
text = resp.text
livechat_json = json.loads(text)
livechat_json = client.get(url, headers=headers).json()
break
except json.JSONDecodeError:
time.sleep(1)

View File

@@ -1,6 +1,6 @@
import os
import re
import requests
import httpx
from base64 import standard_b64encode
from .chat_processor import ChatProcessor
from .default.processor import DefaultProcessor
@@ -108,7 +108,7 @@ class HTMLArchiver(ChatProcessor):
for item in message_items)
def _encode_img(self, url):
resp = requests.get(url)
resp = httpx.get(url)
return standard_b64encode(resp.content).decode()
def _set_emoji_table(self, item: dict):

View File

@@ -1,6 +1,5 @@
import aiohttp
import httpx
import asyncio
import json
from . import parser
from . block import Block
from . worker import ExtractWorker
@@ -55,7 +54,7 @@ def ready_blocks(video_id, duration, div, callback):
raise ValueError
async def _get_blocks(video_id, duration, div, callback):
async with aiohttp.ClientSession() as session:
async with httpx.AsyncClient(http2=True) as session:
tasks = [_create_block(session, video_id, seektime, callback)
for seektime in _split(-1, duration, div)]
return await asyncio.gather(*tasks)
@@ -65,9 +64,8 @@ def ready_blocks(video_id, duration, div, callback):
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
for _ in range(MAX_RETRY_COUNT):
try:
async with session.get(url, headers=headers) as resp:
text = await resp.text()
next_continuation, actions = parser.parse(json.loads(text))
resp = await session.get(url, headers=headers)
next_continuation, actions = parser.parse(resp.json())
break
except JSONDecodeError:
await asyncio.sleep(3)
@@ -106,7 +104,7 @@ def fetch_patch(callback, blocks, video_id):
)
for block in blocks
]
async with aiohttp.ClientSession() as session:
async with httpx.AsyncClient() as session:
tasks = [worker.run(session) for worker in workers]
return await asyncio.gather(*tasks)
@@ -114,9 +112,8 @@ def fetch_patch(callback, blocks, video_id):
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
for _ in range(MAX_RETRY_COUNT):
try:
async with session.get(url, headers=config.headers) as resp:
chat_json = await resp.text()
continuation, actions = parser.parse(json.loads(chat_json))
resp = await session.get(url, headers=config.headers)
continuation, actions = parser.parse(resp.json())
break
except JSONDecodeError:
await asyncio.sleep(3)

View File

@@ -1,6 +1,7 @@
from . block import Block
from . patch import fill, split
from ... paramgen import arcparam
from typing import Tuple
class ExtractWorker:
@@ -76,7 +77,7 @@ def _search_new_block(worker) -> Block:
return new_block
def _get_undone_block(blocks) -> (int, Block):
def _get_undone_block(blocks) -> Tuple[int, Block]:
min_interval_ms = 120000
max_remaining = 0
undone_block = None

View File

@@ -1,12 +1,12 @@
import aiohttp
import httpx
import asyncio
import json
from . import parser
from . block import Block
from . worker import ExtractWorker
from . patch import Patch
from ... import config
from ... import config
from ... paramgen import arcparam_mining as arcparam
from concurrent.futures import CancelledError
from urllib.parse import quote
@@ -14,10 +14,12 @@ from urllib.parse import quote
headers = config.headers
REPLAY_URL = "https://www.youtube.com/live_chat_replay?continuation="
INTERVAL = 1
def _split(start, end, count, min_interval_sec = 120):
def _split(start, end, count, min_interval_sec=120):
"""
Split section from `start` to `end` into `count` pieces,
and returns the beginning of each piece.
and returns the beginning of each piece.
The `count` is adjusted so that the length of each piece
is no smaller than `min_interval`.
@@ -25,42 +27,43 @@ def _split(start, end, count, min_interval_sec = 120):
--------
List of the offset of each block's first chat data.
"""
if not (isinstance(start,int) or isinstance(start,float)) or \
not (isinstance(end,int) or isinstance(end,float)):
if not (isinstance(start, int) or isinstance(start, float)) or \
not (isinstance(end, int) or isinstance(end, float)):
raise ValueError("start/end must be int or float")
if not isinstance(count,int):
if not isinstance(count, int):
raise ValueError("count must be int")
if start>end:
if start > end:
raise ValueError("end must be equal to or greater than start.")
if count<1:
if count < 1:
raise ValueError("count must be equal to or greater than 1.")
if (end-start)/count < min_interval_sec:
count = int((end-start)/min_interval_sec)
if count == 0 : count = 1
interval= (end-start)/count
if (end - start) / count < min_interval_sec:
count = int((end - start) / min_interval_sec)
if count == 0:
count = 1
interval = (end - start) / count
if count == 1:
return [start]
return sorted( list(set( [int(start + interval*j)
for j in range(count) ])))
return sorted(list(set([int(start + interval * j)
for j in range(count)])))
def ready_blocks(video_id, duration, div, callback):
if div <= 0: raise ValueError
if div <= 0:
raise ValueError
async def _get_blocks( video_id, duration, div, callback):
async with aiohttp.ClientSession() as session:
tasks = [_create_block(session, video_id, seektime, callback)
for seektime in _split(0, duration, div)]
async def _get_blocks(video_id, duration, div, callback):
async with httpx.ClientSession() as session:
tasks = [_create_block(session, video_id, seektime, callback)
for seektime in _split(0, duration, div)]
return await asyncio.gather(*tasks)
async def _create_block(session, video_id, seektime, callback):
continuation = arcparam.getparam(video_id, seektime = seektime)
url=(f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
f"{int(seektime*1000)}&hidden=false&pbj=1")
async with session.get(url, headers = headers) as resp:
continuation = arcparam.getparam(video_id, seektime=seektime)
url = (f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
f"{int(seektime*1000)}&hidden=false&pbj=1")
async with session.get(url, headers=headers) as resp:
chat_json = await resp.text()
if chat_json is None:
return
@@ -70,39 +73,40 @@ def ready_blocks(video_id, duration, div, callback):
if callback:
callback(actions, INTERVAL)
return Block(
continuation = continuation,
chat_data = actions,
first = first,
last = seektime,
seektime = seektime
continuation=continuation,
chat_data=actions,
first=first,
last=seektime,
seektime=seektime
)
"""
fetch initial blocks.
"""
"""
loop = asyncio.get_event_loop()
blocks = loop.run_until_complete(
_get_blocks(video_id, duration, div, callback))
return blocks
def fetch_patch(callback, blocks, video_id):
async def _allocate_workers():
workers = [
ExtractWorker(
fetch = _fetch, block = block,
blocks = blocks, video_id = video_id
fetch=_fetch, block=block,
blocks=blocks, video_id=video_id
)
for block in blocks
]
async with aiohttp.ClientSession() as session:
async with httpx.ClientSession() as session:
tasks = [worker.run(session) for worker in workers]
return await asyncio.gather(*tasks)
return await asyncio.gather(*tasks)
async def _fetch(seektime,session) -> Patch:
continuation = arcparam.getparam(video_id, seektime = seektime)
url=(f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
f"{int(seektime*1000)}&hidden=false&pbj=1")
async with session.get(url,headers = config.headers) as resp:
async def _fetch(seektime, session) -> Patch:
continuation = arcparam.getparam(video_id, seektime=seektime)
url = (f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
f"{int(seektime*1000)}&hidden=false&pbj=1")
async with session.get(url, headers=config.headers) as resp:
chat_json = await resp.text()
actions = []
try:
@@ -113,21 +117,22 @@ def fetch_patch(callback, blocks, video_id):
pass
if callback:
callback(actions, INTERVAL)
return Patch(chats = actions, continuation = continuation,
seektime = seektime, last = seektime)
return Patch(chats=actions, continuation=continuation,
seektime=seektime, last=seektime)
"""
allocate workers and assign blocks.
"""
"""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_allocate_workers())
except CancelledError:
pass
async def _shutdown():
print("\nshutdown...")
tasks = [t for t in asyncio.all_tasks()
if t is not asyncio.current_task()]
if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
try:
@@ -135,7 +140,7 @@ async def _shutdown():
except asyncio.CancelledError:
pass
def cancel():
loop = asyncio.get_event_loop()
loop.create_task(_shutdown())

View File

@@ -1,6 +1,6 @@
import json
import re
import requests
import httpx
from .. import config
from ..exceptions import InvalidVideoIdException
from ..util.extract_video_id import extract_video_id
@@ -85,7 +85,7 @@ class VideoInfo:
def _get_page_text(self, video_id):
url = f"https://www.youtube.com/embed/{video_id}"
resp = requests.get(url, headers=headers)
resp = httpx.get(url, headers=headers)
resp.raise_for_status()
return resp.text

View File

@@ -1,11 +1,11 @@
import requests
import httpx
import json
import datetime
from .. import config
def extract(url):
_session = requests.Session()
_session = httpx.Client(http2=True)
html = _session.get(url, headers=config.headers)
with open(str(datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
) + 'test.json', mode='w', encoding='utf-8') as f: