live-forum/server/crawler/commerce/services/daren_account_service.py
2026-03-24 11:27:37 +08:00

344 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
达人账号数据抓取服务
负责抓取MCN机构绑定的达人账号数据
"""
import json
import time
import requests
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime
from core.api_client import ByteDanceAPIClient
from config.settings import DATA_DIR, REPORT_URL, REPORT_TIMEOUT, REPORT_MAX_RETRIES
from utils.logger import logger
class DarenAccountService:
"""达人账号数据抓取服务"""
def __init__(self, api_client: Optional[ByteDanceAPIClient] = None):
"""
初始化抓取服务
Args:
api_client: API客户端实例如果为None则创建新实例
"""
self.api_client = api_client or ByteDanceAPIClient()
self.output_dir = DATA_DIR / 'all_account'
self.output_dir.mkdir(parents=True, exist_ok=True)
def extract_daren_data(self, raw_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
从原始响应数据中提取达人信息并重组为目标格式
Args:
raw_data: API返回的原始数据
Returns:
重组后的达人数据列表(符合目标格式)
"""
darens = []
try:
if 'data' not in raw_data:
logger.warning("响应中缺少data字段")
return darens
data_obj = raw_data['data']
daren_list = data_obj.get('daren_list', [])
if not daren_list:
logger.warning("daren_list为空")
return darens
# 提取每个达人的信息并重组
for index, item in enumerate(daren_list):
# 提取抖音号short_id
anchor_id = ''
short_id_list = item.get('short_id_list', [])
for short_id_item in short_id_list:
if short_id_item.get('app_name') == '抖音号':
anchor_id = short_id_item.get('short_id', '')
break
# 如果没有抖音号使用aweme_id作为后备
if not anchor_id:
anchor_id = item.get('aweme_id', '')
# 转换liveStatusis_live!=0 -> "2", 否则 -> "1"
is_live = item.get('is_live', 0)
live_status = "2" if is_live != 0 else "1"
# 重组为目标格式
daren = {
'rank': index + 1, # 使用序号作为rank
'anchorID': anchor_id,
'anchorName': item.get('user_name', ''),
'anchorAvatar': item.get('avatar', ''),
'income': '0', # 固定为"0"(该接口不提供收入数据)
'fans': str(item.get('fans_count', 0)), # 转换为字符串
'liveStatus': live_status,
'linkMicStatus': '0', # 固定为"0"
'userID': item.get('user_id', ''),
}
darens.append(daren)
logger.debug(f"提取并重组 {len(darens)} 条达人数据")
except Exception as e:
logger.error(f"提取达人数据失败: {e}", exc_info=True)
return darens
def fetch_all_pages(self, page_size: int = 20, status: int = 1) -> List[Dict[str, Any]]:
"""
抓取所有页的达人数据
Args:
page_size: 每页数量
status: 状态1=全部)
Returns:
所有页的达人数据列表
"""
all_darens = []
page = 1
total = None
print(f"\n{'='*60}")
print(f"🚀 开始抓取达人账号数据")
print(f"{'='*60}")
while True:
logger.info(f"正在获取第{page}页数据...")
print(f"\n📄 正在获取第{page}页数据...")
# 获取当前页数据
page_data = self.api_client.get_daren_list(
page=page,
page_size=page_size,
status=status
)
if not page_data:
logger.error(f"{page}页获取失败")
print(f"❌ 第{page}页获取失败")
break
# 检查响应状态
st = page_data.get('st', -1)
code = page_data.get('code', -1)
if st != 0 or code != 0:
msg = page_data.get('msg', '')
logger.error(f"{page}页返回错误: st={st}, code={code}, msg={msg}")
print(f"❌ 第{page}页返回错误: {msg}")
break
# 提取达人数据
page_darens = self.extract_daren_data(page_data)
if not page_darens:
logger.info(f"{page}页没有数据,抓取完成")
print(f"✅ 第{page}页没有数据,抓取完成")
break
all_darens.extend(page_darens)
# 获取总数(第一页时)
if total is None and 'data' in page_data:
total = page_data['data'].get('total', 0)
print(f"📊 总数: {total} 个达人")
logger.info(f"总记录数: {total}")
print(f"✅ 第{page}页: 获取到 {len(page_darens)} 条数据(累计: {len(all_darens)}")
# 如果当前页数据少于page_size说明是最后一页
if len(page_darens) < page_size:
logger.info("已是最后一页,抓取完成")
print(f"\n✅ 已是最后一页,抓取完成")
break
# 如果已经达到总数,停止
if total and len(all_darens) >= total:
logger.info("已达到总数,抓取完成")
print(f"\n✅ 已达到总数,抓取完成")
break
page += 1
# 请求间隔,避免频率过高
time.sleep(1)
print(f"\n{'='*60}")
print(f"✅ 抓取完成,共获取 {len(all_darens)} 条数据")
print(f"{'='*60}")
logger.info(f"抓取完成,共获取 {len(all_darens)} 条数据")
return all_darens
def report_data(self, data: Dict[str, Any], filename: Optional[str] = None) -> bool:
"""
上报数据到指定地址
Args:
data: 要上报的数据完整的JSON对象
filename: 保存的文件名用于替换URL中的占位符
Returns:
上报是否成功
"""
if not REPORT_URL:
logger.debug("未配置上报地址,跳过数据上报")
return False
# 替换URL中的占位符
report_url = REPORT_URL
if filename:
# 提取文件名不含扩展名作为logId
log_id = Path(filename).stem # 获取不含扩展名的文件名
report_url = report_url.replace('{logId}', log_id)
logger.info(f"开始上报数据到: {report_url}")
for attempt in range(REPORT_MAX_RETRIES):
try:
response = requests.post(
report_url,
json=data, # 使用json参数自动设置Content-Type为application/json
timeout=REPORT_TIMEOUT,
headers={
'Content-Type': 'application/json',
'User-Agent': 'ByteDance-Crawler/1.0'
}
)
response.raise_for_status()
logger.info(f"数据上报成功,状态码: {response.status_code}")
return True
except requests.exceptions.Timeout:
logger.warning(f"上报请求超时 (尝试 {attempt + 1}/{REPORT_MAX_RETRIES})")
if attempt < REPORT_MAX_RETRIES - 1:
time.sleep(2 * (attempt + 1)) # 指数退避
else:
logger.error("上报请求超时,已达到最大重试次数")
except requests.exceptions.HTTPError as e:
logger.error(f"上报请求HTTP错误: {e.response.status_code} - {e.response.text}")
if attempt < REPORT_MAX_RETRIES - 1:
time.sleep(2 * (attempt + 1))
else:
logger.error("上报请求失败,已达到最大重试次数")
except requests.exceptions.RequestException as e:
logger.error(f"上报请求异常: {e}")
if attempt < REPORT_MAX_RETRIES - 1:
time.sleep(2 * (attempt + 1))
else:
logger.error("上报请求失败,已达到最大重试次数")
except Exception as e:
logger.error(f"上报数据时发生未知错误: {e}", exc_info=True)
return False
return False
def save_to_json(self, data: List[Dict[str, Any]], filename: Optional[str] = None) -> Path:
"""
保存数据到JSON文件
Args:
data: 要保存的数据列表
filename: 文件名如果为None则使用时间戳
Returns:
保存的文件路径
"""
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"daren_account_{timestamp}.json"
filepath = self.output_dir / filename
# 构建保存的数据结构
save_data = {
"metadata": {
"total": len(data),
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"source": "巨量百应 - MCN机构达人账号管理"
},
"data": data
}
try:
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(save_data, f, ensure_ascii=False, indent=2)
logger.info(f"数据已保存到: {filepath}")
print(f"\n💾 数据已保存到: {filepath}")
# 尝试上报数据(上报失败不影响保存)
if REPORT_URL:
print(f"\n📤 正在上报数据到服务器...")
report_success = self.report_data(save_data, filename=filename)
if report_success:
print(f"✅ 数据上报成功")
else:
print(f"⚠️ 数据上报失败,但数据已保存到本地")
return filepath
except Exception as e:
logger.error(f"保存数据失败: {e}", exc_info=True)
print(f"\n❌ 保存数据失败: {e}")
raise
def crawl(self, page_size: int = 20, status: int = 1) -> Optional[Path]:
"""
执行完整的抓取流程
Args:
page_size: 每页数量
status: 状态1=全部)
Returns:
保存的文件路径如果失败则返回None
"""
try:
# 抓取所有页数据
all_data = self.fetch_all_pages(page_size=page_size, status=status)
if not all_data:
print("\n❌ 未获取到任何数据")
return None
# 保存数据
filepath = self.save_to_json(all_data)
# 显示统计信息
print(f"\n📊 数据统计:")
print(f" - 总达人数: {len(all_data)}")
# 统计在线直播的达人liveStatus == "2"
live_count = sum(1 for d in all_data if d.get('liveStatus') == '2')
if live_count > 0:
print(f" - 正在直播: {live_count}")
# 统计粉丝数最多的前3名
sorted_by_fans = sorted(all_data, key=lambda x: int(x.get('fans', 0)), reverse=True)
if len(sorted_by_fans) >= 3:
print(f"\n🏆 粉丝数Top 3:")
for i, daren in enumerate(sorted_by_fans[:3], 1):
fans_num = int(daren.get('fans', 0))
print(f" {i}. {daren['anchorName']}: {fans_num:,} 粉丝")
return filepath
except Exception as e:
logger.error(f"抓取过程发生错误: {e}", exc_info=True)
print(f"\n❌ 抓取失败: {e}")
return None