进阶用法
💡 什么是进阶用法?
进阶用法是指在 CLI 模式的基础上,通过进阶开发者接口的方法,提高工具的使用效率,或者解决一些特殊问题。
🔍 欢迎提问
如果您在开发过程中遇到任何疑问,请详细描述并在 F2 Discussions 提问,或通过邮件联系 [email protected],描述您的来意,我将尽快为您解答。
Douyin
批量采集发布视频 Beta
重要 ❗❗❗
- 速率限制:抖音平台对于频繁请求有一定的速率限制,高并发的请求可能会导致请求失败,出现 `429`、`444` 等错误码。请自行控制请求速率,避免被封禁 IP。
- 网络稳定性:请确保网络环境稳定,否则可能导致采集任务中断,特别是在作品数量较多的情况下。
- 设备性能:请确保设备性能足够,避免因大量采集任务导致设备卡顿。
- 并发设置:如需采集多个作品,请适当增加 `max_connections` 和 `max_tasks` 参数值,以满足异步并发需求,否则可能出现任务阻塞的情况。
- 轮询更新作品:若需轮询检测作品状态并进行采集,请参考代码片段中的变化部分。
🔗 示例代码
py
import asyncio
import traceback
from f2.apps.douyin.handler import DouyinHandler
from f2.apps.douyin.db import AsyncUserDB
from f2.apps.douyin.dl import DouyinDownloader
from f2.utils.conf_manager import ConfigManager
from f2.cli.cli_console import RichConsoleManager
from f2.log.logger import logger
# Global configuration shared by the downloader and handler created below.
# The original note says this layout protects sensitive information; presumably
# secrets such as cookies live in conf/app.yaml instead of this script.
# NOTE(review): with the `|` merge, and assuming get_config() returns a dict,
# keys present in the config file override the inline values here (including
# "mode") - confirm that this precedence is intended.
kwargs = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Referer": "https://www.douyin.com/",
    },
    "proxies": {"http://": None, "https://": None},
    # Select the collection mode
    "mode": "post",
} | ConfigManager("conf/app.yaml").get_config("douyin")
# Instantiate the downloader and the handler from the same configuration
dydownloader = DouyinDownloader(kwargs)
dyhandler = DouyinHandler(kwargs)
# sec_user_id values of the users to collect in bulk.
# The trailing comment on each entry is that account's display name.
sec_user_ids = [
    "MS4wLjABAAAAMn__d0rqdcuqb1lVJKapsl-ssFNQnayKwd136gpbScI",  # 呆瓜小匪🍉
    "MS4wLjABAAAABsG6uyCohhTUpE4DmmD-c2EsdLeFIvJic8yxbXEze9g",  # 攒钱隆地雷
    "MS4wLjABAAAA070w5X9l5I82jsuGY6ntBMGlOYp8yzp4-rH8X1qCEPw",  # 小贝
    "MS4wLjABAAAAXAw5z6oNfNF1VCjmYRz1nwicQ0lLoTcOPuALhpPLKK8",  # 林语惊
    "MS4wLjABAAAAfQnGjmLfe2oJazbA_nO9EpA9zpieuegM5wxVMqXF6SE",  # 朱之琳
    "MS4wLjABAAAAPLFrUMv2S-AFNXRP2JMzvmS9_Ow39fVweFGKNxXHPys",  # 深海蜜柚
    "MS4wLjABAAAAOQ9BYHDT-BJr2yHwwNNvdNszXteeSzjuH5nifQOFvglpxMY3nP_qrzsIsXtEymCu",  # 聪明羊羊
    "MS4wLjABAAAAWaeKn3y5ZGRXElUi0iP0VcIbDH8WeZ5RmPeA9FnBZG-DYx5VTRIt-x7fXUsirIHf",  # 热锅铲女
    "MS4wLjABAAAAaa8Lsk2sIhdvQBXbnn_HT2FDGATjE0vHEDF5QjKsgYl5A30WE5ZDsMRemAObStYR",  # 蓝羊羊不懒
    "MS4wLjABAAAAEg6xF6p_5K4zBdvR0LgjMXYmY6XoOR0kIWr-EiV51Mv3ui8_d1JJhdHwSScBNO2J",  # bb猪
    "MS4wLjABAAAAejNXYKfKBp_9q4Hy9SHS1BndE_Jw50LbVs7zolIiVaFqzpl1EOunD4FApGocolKP",  # 闪光波克尔
    "MS4wLjABAAAA3CrLwX6x5aHKOdnRrEwRssgnFnmQRGf6CX3RWXc9HYEjysZ2vcy7Px0MngbLBLfc",  # 糖心蛋
    "MS4wLjABAAAAogz57t45g20LdsrkxEfvcoR7c701ow9FE7rBbFbYxUZETSzJBdgK__vIWmTHRLL4",  # 金铁兽
    "MS4wLjABAAAAj8_YMsUZglM9qYJXuZwrbT3gEpQqiW7aF6d4jpdFE1xGyDind6FkrRoUd2OjkOkF",  # 谁吃了我的火龙果
]
async def download_post(sec_user_id: str):
    """
    Download all posts published by a single user.

    Any exception raised while fetching or downloading is caught and logged,
    so one failing user does not abort the other tasks.

    Args:
        sec_user_id (str): ID of the user whose posts are downloaded.
    """
    try:
        logger.debug(
            f"[bold green]开始下载用户ID:{sec_user_id} 的作品...[/bold green]"
        )

        # Look the user up (or register them) through the handler; the value
        # it returns is later handed to the downloader as the user path.
        async with AsyncUserDB("douyin_users.db", **kwargs) as user_db:
            save_dir = await dyhandler.get_or_add_user_data(
                kwargs, sec_user_id, user_db
            )

        post_pages = dyhandler.fetch_user_post_videos(sec_user_id=sec_user_id)
        async for page in post_pages:
            if page:
                await dydownloader.create_download_tasks(
                    kwargs, page._to_list(), save_dir
                )
                continue

            # An empty page ends the run early with a warning.
            logger.info(
                f"[bold yellow]无法获取用户作品信息:[/bold yellow] {sec_user_id}"
            )
            return

        logger.info(f"[bold green]用户ID:{sec_user_id} 作品下载完成。[/bold green]")
    except Exception as exc:
        logger.error(f"[bold red]用户ID:{sec_user_id} 下载失败:{exc}[/bold red]")
async def main():
    """
    Entry point: start one polling download task per user in sec_user_ids.

    At most ``max_tasks`` downloads (default 5) run at the same time. Each
    task re-checks its user once per hour, forever, so this coroutine only
    returns when it is cancelled or interrupted.
    """
    logger.info("[bold blue]开始批量下载多个用户的作品[/bold blue]")

    semaphore = asyncio.Semaphore(kwargs.get("max_tasks", 5))
    poll_interval_seconds = 1 * 60 * 60

    async def limited_download(sec_user_id):
        # For a one-shot run, replace the loop with:
        #     async with semaphore:
        #         await download_post(sec_user_id)
        # Poll for new posts once per hour.
        while True:
            # Hold a slot only while actually downloading. Keeping it during
            # the sleep would permanently starve every user beyond the first
            # ``max_tasks`` entries, because this loop never exits.
            async with semaphore:
                await download_post(sec_user_id)
            await asyncio.sleep(poll_interval_seconds)

    # Let RichConsoleManager manage the progress bars while tasks run
    with RichConsoleManager().progress:
        tasks = [
            asyncio.create_task(limited_download(sec_user_id))
            for sec_user_id in sec_user_ids
        ]
        await asyncio.gather(*tasks)
# Script entry point: run the batch download until it finishes or is stopped.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the polling loop.
        logger.info("[bold yellow]程序已手动停止[/bold yellow]")
    except Exception as exc:
        # Log a short summary first, then the full stack trace.
        logger.error(f"[bold red]程序运行时出现异常: {exc}[/bold red]")
        logger.error(traceback.format_exc())
批量采集直播流 Beta
重要 ❗❗❗
- 账号限制:抖音平台限制同一账号无法同时进入同一直播间。因此,使用登录账号采集直播流时,仅可在采集任务启动后继续观看该直播。
- 游客账号绕过:可通过游客账号绕过上述限制。有关生成游客账号的方法,请参考 `mstoken` 与 `ttwid` 或「直播弹幕转发」的相关代码片段。
- 网络稳定性:请确保网络环境稳定,否则可能导致采集任务中断。
- 设备性能:请确保设备性能足够,避免因大量采集任务导致设备卡顿。
- 并发设置:如需采集多个直播,请适当增加 `max_connections` 和 `max_tasks` 参数值,以满足异步并发需求,否则可能出现任务阻塞的情况。
- 轮询开播采集:若需轮询检测开播状态并进行采集,请参考代码片段中的变化部分。
🔗 示例代码
py
import asyncio
import traceback
from f2.apps.douyin.handler import DouyinHandler
from f2.apps.douyin.db import AsyncUserDB
from f2.apps.douyin.dl import DouyinDownloader
from f2.utils.conf_manager import ConfigManager
from f2.cli.cli_console import RichConsoleManager
from f2.log.logger import logger
# Global configuration shared by the downloader and handler created below.
# The original note says this layout protects sensitive information; presumably
# secrets such as cookies live in conf/app.yaml instead of this script.
# NOTE(review): with the `|` merge, and assuming get_config() returns a dict,
# keys present in the config file override the inline values here (including
# "mode") - confirm that this precedence is intended.
kwargs = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Referer": "https://www.douyin.com/",
    },
    "proxies": {"http://": None, "https://": None},
    # Select the collection mode
    "mode": "live",
} | ConfigManager("conf/app.yaml").get_config("douyin")
# Instantiate the downloader and the handler from the same configuration
dydownloader = DouyinDownloader(kwargs)
dyhandler = DouyinHandler(kwargs)
# Live-room (webcast) IDs to collect in bulk. To supply room IDs instead, use
# the fetch_user_live_videos_by_room_id method.
# The trailing comment on each entry is that streamer's display name.
webcast_ids = [
    "10359270066",  # 清崽
    "205048140143",  # 偷星九月天
    "13819501559",  # QQ清
    "422057730070",  # 丫丫br
]
async def download_live_stream(
    webcast_id: str,
):
    """
    Download the stream of a single live room until the stream ends.

    Returns early, after logging, when no room information is available or
    the room is not currently broadcasting. Any exception is caught and
    logged so one failing room does not abort the other tasks.

    Args:
        webcast_id (str): ID of the live room.
    """
    try:
        room_info = await dyhandler.fetch_user_live_videos(webcast_id=webcast_id)
        if not room_info:
            logger.info(f"[bold yellow]无法获取直播间信息:[/bold yellow] {webcast_id}")
            return

        # Any status other than 2 is treated as "not broadcasting": skip it.
        not_broadcasting = room_info.live_status != 2
        if not_broadcasting:
            logger.info(
                f"[bold cyan]直播间ID:{webcast_id} 当前未开播,跳过...[/bold cyan]"
            )
            return

        # Look the streamer up (or register them) through the handler; the
        # value it returns is handed to the downloader as the user path.
        async with AsyncUserDB("douyin_users.db") as user_db:
            save_dir = await dyhandler.get_or_add_user_data(
                kwargs, room_info.sec_user_id, user_db
            )

        logger.debug(
            f"[bold green]开始下载直播间ID:{webcast_id} 的直播流...[/bold green]"
        )
        await dydownloader.create_stream_tasks(
            kwargs, room_info._to_dict(), save_dir
        )
        logger.info(
            f"[bold green]直播间ID:{webcast_id} 直播流已结束,下载完成。[/bold green]"
        )
    except Exception as exc:
        logger.error(f"[bold red]直播间ID:{webcast_id} 下载失败:{exc}[/bold red]")
async def main():
    """
    Entry point: start one polling download task per room in webcast_ids.

    At most ``max_tasks`` stream downloads (default 5) run at the same time.
    Each task re-checks its room once per minute, forever, so this coroutine
    only returns when it is cancelled or interrupted.
    """
    logger.info("[bold blue]开始批量下载多个直播间的直播流[/bold blue]")

    semaphore = asyncio.Semaphore(kwargs.get("max_tasks", 5))
    poll_interval_seconds = 1 * 60

    async def limited_download(webcast_id):
        # For a one-shot run, replace the loop with:
        #     async with semaphore:
        #         await download_live_stream(webcast_id)
        # Poll the live status once per minute.
        while True:
            # Hold a slot only while checking or recording. Keeping it during
            # the sleep would permanently starve every room beyond the first
            # ``max_tasks`` entries, because this loop never exits.
            async with semaphore:
                await download_live_stream(webcast_id)
            await asyncio.sleep(poll_interval_seconds)

    # Let RichConsoleManager manage the progress bars while tasks run
    with RichConsoleManager().progress:
        tasks = [
            asyncio.create_task(limited_download(webcast_id))
            for webcast_id in webcast_ids
        ]
        await asyncio.gather(*tasks)
# Script entry point: run the batch recording until it finishes or is stopped.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the polling loop.
        logger.info("[bold yellow]程序已手动停止[/bold yellow]")
    except Exception as exc:
        # Log a short summary first, then the full stack trace.
        logger.error(f"[bold red]程序运行时出现异常: {exc}[/bold red]")
        logger.error(traceback.format_exc())
直播弹幕转发
重要 ❗❗❗
- 账号限制:抖音平台限制同一账号无法同时进入同一直播间。因此,需要使用生成 `ttwid` 的方法生成游客账号,绕过上述限制。
- 网络稳定性:请确保网络环境稳定,否则可能导致采集任务中断。
- 设备性能:请确保设备性能足够,避免因大量采集任务导致设备卡顿。
- 并发设置:多个直播间弹幕转发时,请使用不同的 `WSS` 配置连接,以避免弹幕混乱和阻塞。
🔗 示例代码
py
import asyncio
from f2.apps.douyin.crawler import DouyinWebSocketCrawler
from f2.apps.douyin.handler import DouyinHandler
from f2.apps.douyin.utils import TokenManager
from f2.log.logger import logger
# Configuration for the HTTP requests made in main() (guest user lookup, room
# lookup and live IM bootstrap).
kwargs = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Referer": "https://www.douyin.com/",
        "Content-Type": "application/protobuffer;",
    },
    "proxies": {"http://": None, "https://": None},
    "timeout": 10,
    # A guest cookie is sufficient. Note that each generated ttwid serves as a
    # user identifier and may be used in only one live room; it cannot be used
    # in several live rooms at the same time.
    # TokenManager.gen_ttwid() generates a new guest ttwid.
    # "cookie": "GUEST_COOKIE_HERE",  # placeholder for a manually supplied guest cookie
    "cookie": f"ttwid={TokenManager.gen_ttwid()}; __live_version__=%221.1.2.6631%22; live_use_vvc=%22false%22;",
}
# Configuration for the WebSocket connection used by fetch_live_danmaku in main().
kwargs2 = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Upgrade": "websocket",
        "Connection": "Upgrade",
    },
    "proxies": {"http://": None, "https://": None},
    "timeout": 10,
    # Whether to display danmaku messages in the terminal
    "show_message": True,
    # No cookie needs to be filled in here
    "cookie": "",
}
# Dispatch table passed to fetch_live_danmaku: maps a message-type name to the
# DouyinWebSocketCrawler callback registered for it.
# "LinkMicMethod" and "WebcastLinkMicMethod" both map to the same callback.
wss_callbacks = {
    "WebcastRoomMessage": DouyinWebSocketCrawler.WebcastRoomMessage,
    "WebcastLikeMessage": DouyinWebSocketCrawler.WebcastLikeMessage,
    "WebcastMemberMessage": DouyinWebSocketCrawler.WebcastMemberMessage,
    "WebcastChatMessage": DouyinWebSocketCrawler.WebcastChatMessage,
    "WebcastGiftMessage": DouyinWebSocketCrawler.WebcastGiftMessage,
    "WebcastSocialMessage": DouyinWebSocketCrawler.WebcastSocialMessage,
    "WebcastRoomUserSeqMessage": DouyinWebSocketCrawler.WebcastRoomUserSeqMessage,
    "WebcastUpdateFanTicketMessage": DouyinWebSocketCrawler.WebcastUpdateFanTicketMessage,
    "WebcastCommonTextMessage": DouyinWebSocketCrawler.WebcastCommonTextMessage,
    "WebcastMatchAgainstScoreMessage": DouyinWebSocketCrawler.WebcastMatchAgainstScoreMessage,
    "WebcastEcomFansClubMessage": DouyinWebSocketCrawler.WebcastEcomFansClubMessage,
    "WebcastRanklistHourEntranceMessage": DouyinWebSocketCrawler.WebcastRanklistHourEntranceMessage,
    "WebcastRoomStatsMessage": DouyinWebSocketCrawler.WebcastRoomStatsMessage,
    "WebcastLiveShoppingMessage": DouyinWebSocketCrawler.WebcastLiveShoppingMessage,
    "WebcastLiveEcomGeneralMessage": DouyinWebSocketCrawler.WebcastLiveEcomGeneralMessage,
    "WebcastProductChangeMessage": DouyinWebSocketCrawler.WebcastProductChangeMessage,
    "WebcastRoomStreamAdaptationMessage": DouyinWebSocketCrawler.WebcastRoomStreamAdaptationMessage,
    "WebcastNotifyEffectMessage": DouyinWebSocketCrawler.WebcastNotifyEffectMessage,
    "WebcastLightGiftMessage": DouyinWebSocketCrawler.WebcastLightGiftMessage,
    "WebcastProfitInteractionScoreMessage": DouyinWebSocketCrawler.WebcastProfitInteractionScoreMessage,
    "WebcastRoomRankMessage": DouyinWebSocketCrawler.WebcastRoomRankMessage,
    "WebcastFansclubMessage": DouyinWebSocketCrawler.WebcastFansclubMessage,
    "WebcastHotRoomMessage": DouyinWebSocketCrawler.WebcastHotRoomMessage,
    "WebcastLinkMicMethod": DouyinWebSocketCrawler.WebcastLinkMicMethod,
    "LinkMicMethod": DouyinWebSocketCrawler.WebcastLinkMicMethod,
    "WebcastLinkerContributeMessage": DouyinWebSocketCrawler.WebcastLinkerContributeMessage,
    "WebcastEmojiChatMessage": DouyinWebSocketCrawler.WebcastEmojiChatMessage,
    "WebcastScreenChatMessage": DouyinWebSocketCrawler.WebcastScreenChatMessage,
    "WebcastRoomDataSyncMessage": DouyinWebSocketCrawler.WebcastRoomDataSyncMessage,
    "WebcastInRoomBannerMessage": DouyinWebSocketCrawler.WebcastInRoomBannerMessage,
    "WebcastLinkMessage": DouyinWebSocketCrawler.WebcastLinkMessage,
    "WebcastBattleTeamTaskMessage": DouyinWebSocketCrawler.WebcastBattleTeamTaskMessage,
    "WebcastHotChatMessage": DouyinWebSocketCrawler.WebcastHotChatMessage,
    # TODO: the message types below are not implemented yet
    # WebcastLinkMicArmiesMethod
    # WebcastLinkmicPlayModeUpdateScoreMessage
    # WebcastSandwichBorderMessage
    # WebcastLuckyBoxTempStatusMessage
    # WebcastLotteryEventMessage
    # WebcastLotteryEventNewMessage
    # WebcastDecorationUpdateMessage
    # WebcastDecorationModifyMethod
    # WebcastLinkSettingNotifyMessage
    # WebcastLinkMicBattleMethod
    # WebcastExhibitionChatMessage
}
async def main(webcast_id: str = "277303127629"):
    """
    Forward the danmaku (live chat) messages of one live room.

    Resolves the guest user and the room over HTTP, fetches the IM bootstrap
    values, then opens the WebSocket danmaku stream with ``wss_callbacks``.
    Returns early, after logging, when the room cannot be resolved or is not
    currently broadcasting.

    Args:
        webcast_id (str): Live ID of the room to follow. Defaults to the
            sample room used by this example.
    """
    # One handler is enough for all HTTP lookups; they share the same kwargs.
    handler = DouyinHandler(kwargs)

    # Resolve the user_unique_id that belongs to the guest ttwid in kwargs.
    # TokenManager.gen_ttwid() can be used to generate a new guest ttwid.
    user = await handler.fetch_query_user()

    # Look up the room (and its room_id) from the live ID.
    room = await handler.fetch_user_live_videos(webcast_id)
    if not room:
        # Without this guard, a failed lookup would surface as an unrelated
        # attribute error on the status check below.
        logger.info(f"[bold yellow]无法获取直播间信息:[/bold yellow] {webcast_id}")
        return

    if room.live_status != 2:
        logger.info("直播已结束")
        return

    # Fetch the cursor and internal_ext values required by the WSS connection.
    live_im = await handler.fetch_live_im(
        room_id=room.room_id, unique_id=user.user_unique_id
    )

    # Open the danmaku stream; this uses the separate WebSocket configuration.
    await DouyinHandler(kwargs2).fetch_live_danmaku(
        room_id=room.room_id,
        user_unique_id=user.user_unique_id,
        internal_ext=live_im.internal_ext,
        cursor=live_im.cursor,
        wss_callbacks=wss_callbacks,
    )
# Script entry point. The danmaku stream runs until it is interrupted, so
# treat Ctrl+C as a normal shutdown instead of printing a traceback.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("[bold yellow]程序已手动停止[/bold yellow]")