# live-forum/server/crawler/commerce/services/crawler_service.py
"""
数据抓取服务
负责排行榜数据的抓取和存储
"""
import json
import math
import time
import random
import requests
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from core.api_client import ByteDanceAPIClient
from core.cookie_manager import CookieManager
from config.settings import (
DATA_DIR, FETCH_FANS_ENABLED,
FANS_FETCH_INTERVAL_MIN, FANS_FETCH_INTERVAL_MAX,
FANS_FETCH_CONCURRENCY,
REPORT_URL, REPORT_TIMEOUT, REPORT_MAX_RETRIES
)
from utils.logger import logger
class CrawlerService:
    """Service that crawls leaderboard data and persists it to disk."""

    def __init__(self, api_client: Optional[ByteDanceAPIClient] = None):
        """Initialise the crawler service.

        Args:
            api_client: API client instance; a fresh one is created when None.
        """
        if not api_client:
            api_client = ByteDanceAPIClient()
        self.api_client = api_client
        # All snapshots are written below DATA_DIR/anchor_income_rank.
        self.output_dir = DATA_DIR / 'anchor_income_rank'
        self.output_dir.mkdir(parents=True, exist_ok=True)
def extract_anchor_data(self, raw_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
从原始响应数据中提取主播信息
Args:
raw_data: API返回的原始数据
Returns:
提取后的主播数据列表
"""
anchors = []
try:
series = []
# 解析嵌套的JSON结构
if 'data' in raw_data:
if 'data_string' in raw_data['data']:
# 如果数据在data_string中字符串格式的JSON
data_string = raw_data['data']['data_string']
parsed_data = json.loads(data_string)
if 'data' in parsed_data and 'series' in parsed_data['data']:
series = parsed_data['data']['series']
elif 'series' in raw_data['data']:
# 如果数据直接在data中
series = raw_data['data']['series']
if not series:
logger.warning("无法找到数据系列")
return anchors
# 提取每个主播的信息
for item in series:
anchor = {
'rank': int(item.get('rank', 0)),
'anchorID': item.get('anchorID', ''),
'anchorName': item.get('anchorName', ''),
'anchorAvatar': item.get('anchorAvatar', ''),
'income': item.get('income', '0'),
'liveStatus': item.get('liveStatus', '0'),
'linkMicStatus': item.get('linkMicStatus', '0'),
'userID': item.get('userID', '')
}
anchors.append(anchor)
except Exception as e:
logger.error(f"提取主播数据失败: {e}", exc_info=True)
return anchors
    def fetch_all_pages(self, rank_type: int = 0, size: int = 10, filter_type: str = 'anchor', fetch_fans: bool = False) -> List[Dict[str, Any]]:
        """Fetch every page of the leaderboard.

        Args:
            rank_type: Ranking type; 0 = overall ranking.
            size: Page size (records per request).
            filter_type: Filter type passed through to the API.
            fetch_fans: Whether to also fetch follower counts page by page.

        Returns:
            Anchor dicts from all pages; when ``fetch_fans`` is True each dict
            already contains a ``fans`` field.
        """
        all_anchors: List[Dict[str, Any]] = []
        # Fetch page 1 first so the total record count can be read from it.
        logger.info("正在获取第1页数据...")
        first_page_data = self.api_client.get_leaderboard(
            rank_type=rank_type,
            page=1,
            size=size,
            filter_type=filter_type
        )
        if not first_page_data:
            logger.error("获取第一页数据失败")
            return all_anchors
        # Extract the records of page 1.
        first_page_anchors = self.extract_anchor_data(first_page_data)
        # When follower fetching is enabled, enrich this page immediately
        # instead of doing a second pass at the end.
        if fetch_fans and first_page_anchors:
            logger.info(f"正在获取第1页的粉丝数据{len(first_page_anchors)}个主播,并发数: {FANS_FETCH_CONCURRENCY}...")
            first_page_anchors = self._fetch_fans_for_page(first_page_anchors, page_num=1, concurrency=FANS_FETCH_CONCURRENCY)
        all_anchors.extend(first_page_anchors)
        logger.info(f"第1页: 获取到 {len(first_page_anchors)} 条数据")
        # Determine the total record count from the first-page payload; it may
        # live inside the JSON-string field or directly under 'data'.
        try:
            if 'data' in first_page_data:
                if 'data_string' in first_page_data['data']:
                    # The payload is a JSON string that must be decoded first.
                    data_string = first_page_data['data']['data_string']
                    parsed_data = json.loads(data_string)
                    total = parsed_data.get('data', {}).get('total', 0)
                elif 'total' in first_page_data['data']:
                    # 'total' sits directly under 'data'.
                    total = first_page_data['data'].get('total', 0)
                else:
                    # No total available: fall back to this page's record count.
                    total = len(first_page_anchors)
                    logger.warning("无法获取总数,使用当前页数据量")
            else:
                total = len(first_page_anchors)
                logger.warning("无法获取总数,使用当前页数据量")
        except Exception as e:
            logger.warning(f"解析总数失败: {e},使用当前页数据量")
            total = len(first_page_anchors)
        logger.info(f"总记录数: {total}")
        # Page count implied by the total; at least one page.
        total_pages = math.ceil(total / size) if total > 0 else 1
        logger.info(f"总页数: {total_pages}")
        # Single page: nothing more to fetch.
        if total_pages <= 1:
            return all_anchors
        # Fetch the remaining pages (2..total_pages); a failed page is skipped.
        for page in range(2, total_pages + 1):
            logger.info(f"正在获取第{page}页数据...")
            page_data = self.api_client.get_leaderboard(
                rank_type=rank_type,
                page=page,
                size=size,
                filter_type=filter_type
            )
            if not page_data:
                logger.warning(f"第{page}页获取失败,跳过")
                continue
            page_anchors = self.extract_anchor_data(page_data)
            # Enrich this page with follower counts right away when enabled.
            if fetch_fans and page_anchors:
                logger.info(f"正在获取第{page}页的粉丝数据({len(page_anchors)}个主播,并发数: {FANS_FETCH_CONCURRENCY}...")
                page_anchors = self._fetch_fans_for_page(page_anchors, page_num=page, concurrency=FANS_FETCH_CONCURRENCY)
            all_anchors.extend(page_anchors)
            logger.info(f"第{page}页: 获取到 {len(page_anchors)} 条数据")
            # Pause between requests to avoid hammering the API.
            time.sleep(1)
        logger.info(f"抓取完成,共获取 {len(all_anchors)} 条数据")
        return all_anchors
def _fetch_fans_for_page(self, anchors: List[Dict[str, Any]], page_num: int, concurrency: int = FANS_FETCH_CONCURRENCY) -> List[Dict[str, Any]]:
"""
为单页的主播数据获取粉丝数(并发执行)
Args:
anchors: 单页的主播数据列表
page_num: 页码(用于显示)
concurrency: 并发数
Returns:
添加了fans字段的主播数据列表
"""
total = len(anchors)
if total == 0:
return []
# 创建结果字典使用索引作为key保持顺序
results = {}
def fetch_single_fans(anchor: Dict[str, Any], index: int) -> tuple:
"""获取单个主播的粉丝数"""
anchor_id = anchor.get('anchorID', '')
anchor_name = anchor.get('anchorName', anchor_id)
if not anchor_id:
return (index, anchor, '0', '无anchorID')
try:
# 随机请求间隔(避免同时发起太多请求)
interval = random.uniform(FANS_FETCH_INTERVAL_MIN, FANS_FETCH_INTERVAL_MAX)
time.sleep(interval)
# 获取粉丝数
fans = self.api_client.get_anchor_detail(anchor_id)
if fans is None:
logger.warning(f"获取粉丝数失败anchor_id: {anchor_id}设置为0")
return (index, anchor, '0', '请求失败')
return (index, anchor, fans, '成功')
except Exception as e:
logger.error(f"获取粉丝数异常anchor_id: {anchor_id}, 错误: {e}")
return (index, anchor, '0', f'异常: {str(e)[:20]}')
# 使用线程池并发执行
completed = 0
with ThreadPoolExecutor(max_workers=concurrency) as executor:
# 提交所有任务
future_to_index = {
executor.submit(fetch_single_fans, anchor, index): index
for index, anchor in enumerate(anchors)
}
# 处理完成的任务
for future in as_completed(future_to_index):
try:
index, anchor, fans, status = future.result()
anchor['fans'] = fans
results[index] = anchor
completed += 1
# 显示进度
anchor_name = anchor.get('anchorName', anchor.get('anchorID', 'Unknown'))
print(f"{page_num}页进度: {completed}/{total} - {anchor_name}: {fans} 粉丝 ({status})", end='\r')
except Exception as e:
index = future_to_index[future]
logger.error(f"处理任务异常index: {index}, 错误: {e}")
anchor = anchors[index]
anchor['fans'] = '0'
results[index] = anchor
completed += 1
print(f"{page_num}页进度: {completed}/{total} - 处理异常", end='\r')
print() # 换行
# 按照原始顺序返回结果
updated_anchors = [results[i] for i in range(total)]
return updated_anchors
def report_data(self, data: Dict[str, Any], filename: Optional[str] = None) -> bool:
"""
上报数据到指定地址
Args:
data: 要上报的数据完整的JSON对象
filename: 保存的文件名用于替换URL中的占位符
Returns:
上报是否成功
"""
if not REPORT_URL:
logger.debug("未配置上报地址,跳过数据上报")
return False
# 替换URL中的占位符
report_url = REPORT_URL
if filename:
# 提取文件名不含扩展名作为logId
log_id = Path(filename).stem # 获取不含扩展名的文件名
report_url = report_url.replace('{logId}', log_id)
logger.info(f"开始上报数据到: {report_url}")
for attempt in range(REPORT_MAX_RETRIES):
try:
response = requests.post(
report_url,
json=data, # 使用json参数自动设置Content-Type为application/json
timeout=REPORT_TIMEOUT,
headers={
'Content-Type': 'application/json',
'User-Agent': 'ByteDance-Crawler/1.0'
}
)
response.raise_for_status()
logger.info(f"数据上报成功,状态码: {response.status_code}")
return True
except requests.exceptions.Timeout:
logger.warning(f"上报请求超时 (尝试 {attempt + 1}/{REPORT_MAX_RETRIES})")
if attempt < REPORT_MAX_RETRIES - 1:
time.sleep(2 * (attempt + 1)) # 指数退避
else:
logger.error("上报请求超时,已达到最大重试次数")
except requests.exceptions.HTTPError as e:
logger.error(f"上报请求HTTP错误: {e.response.status_code} - {e.response.text}")
if attempt < REPORT_MAX_RETRIES - 1:
time.sleep(2 * (attempt + 1))
else:
logger.error("上报请求失败,已达到最大重试次数")
except requests.exceptions.RequestException as e:
logger.error(f"上报请求异常: {e}")
if attempt < REPORT_MAX_RETRIES - 1:
time.sleep(2 * (attempt + 1))
else:
logger.error("上报请求失败,已达到最大重试次数")
except Exception as e:
logger.error(f"上报数据时发生未知错误: {e}", exc_info=True)
return False
return False
def save_to_json(self, data: List[Dict[str, Any]], filename: Optional[str] = None) -> Path:
"""
保存数据到JSON文件
Args:
data: 要保存的数据列表
filename: 文件名如果为None则使用时间戳
Returns:
保存的文件路径
"""
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"anchor_income_rank_{timestamp}.json"
filepath = self.output_dir / filename
# 构建保存的数据结构
save_data = {
"metadata": {
"total": len(data),
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"source": "字节跳动直播服务平台"
},
"data": data
}
try:
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(save_data, f, ensure_ascii=False, indent=2)
logger.info(f"数据已保存到: {filepath}")
# 尝试上报数据(上报失败不影响保存)
if REPORT_URL:
print(f"\n📤 正在上报数据...")
report_success = self.report_data(save_data, filename=filename)
if report_success:
print(f"✅ 数据上报成功")
else:
print(f"⚠️ 数据上报失败,但数据已保存到本地")
return filepath
except Exception as e:
logger.error(f"保存数据失败: {e}", exc_info=True)
raise
def crawl(self, rank_type: int = 0, size: int = 10, filter_type: str = 'anchor', fetch_fans: bool = FETCH_FANS_ENABLED) -> Optional[Path]:
"""
执行完整的抓取流程
Args:
rank_type: 排行类型0=总榜
size: 每页数量
filter_type: 过滤类型
fetch_fans: 是否抓取粉丝数据
Returns:
保存的文件路径如果失败则返回None
"""
try:
print("\n" + "="*60)
print("🚀 开始抓取排行榜数据")
if fetch_fans:
print("📊 粉丝数据抓取: 已启用")
else:
print("📊 粉丝数据抓取: 已禁用")
print("="*60)
# 抓取所有页数据(如果启用粉丝数,会在每页获取后立即获取粉丝数)
all_data = self.fetch_all_pages(
rank_type=rank_type,
size=size,
filter_type=filter_type,
fetch_fans=fetch_fans
)
if not all_data:
print("\n❌ 未获取到任何数据")
return None
# 保存数据
print(f"\n💾 正在保存数据(共 {len(all_data)} 条)...")
# 根据是否包含粉丝数,使用不同的文件名
if fetch_fans:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"anchor_income_rank_with_fans_{timestamp}.json"
else:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"anchor_income_rank_{timestamp}.json"
filepath = self.save_to_json(all_data, filename)
print(f"\n✅ 抓取完成!")
print(f"📁 数据已保存到: {filepath}")
print("="*60 + "\n")
return filepath
except Exception as e:
logger.error(f"抓取过程发生错误: {e}", exc_info=True)
print(f"\n❌ 抓取失败: {e}")
return None