Advanced Usage
💡 What is Advanced Usage?
Advanced usage refers to improving tool efficiency or solving specific issues using developer interfaces beyond the basic CLI
mode.
🔍 Have Questions?
If you encounter any issues during development, please describe them in detail and ask in F2 Discussions, or contact [email protected]
via email with your inquiry. I will respond as soon as possible.
Douyin
Batch Collection of Published Videos Beta
Important ❗❗❗
- Rate Limits: The Douyin platform enforces rate limits on frequent requests. High-concurrency requests may result in errors such as `429` or `444`. Control the request rate to avoid IP bans.
- Network Stability: Ensure a stable network environment to prevent collection task interruptions, especially when processing a large number of videos.
- Device Performance: Ensure sufficient device performance to prevent lag caused by large-scale collection tasks.
- Concurrency Settings: When collecting multiple videos, increase the values of `max_connections` and `max_tasks` appropriately to support asynchronous concurrency. Otherwise, tasks may become blocked.
- Polling for New Videos: If you need to poll for video updates and collect them, refer to the changes in the provided code snippet.
🔗 Example Code
py
import asyncio
import traceback
from f2.apps.douyin.handler import DouyinHandler
from f2.apps.douyin.db import AsyncUserDB
from f2.apps.douyin.dl import DouyinDownloader
from f2.utils.conf_manager import ConfigManager
from f2.cli.cli_console import RichConsoleManager
from f2.log.logger import logger
# Global configuration parameters; protect sensitive information.
# The same dict is handed to both the downloader and the handler below.
kwargs = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Referer": "https://www.douyin.com/",
    },
    "proxies": {"http://": None, "https://": None},
    # Selected mode
    "mode": "post",
    # With the dict union operator `|`, keys from the right-hand operand win,
    # so whatever get_config("douyin") returns overrides the defaults above.
} | ConfigManager("conf/app.yaml").get_config("douyin")
# Instantiate the downloader and the handler
dydownloader = DouyinDownloader(kwargs)
dyhandler = DouyinHandler(kwargs)
# User IDs (sec_user_id) to collect in batch.
# The trailing comment on each entry is the account's display name.
sec_user_ids = [
    "MS4wLjABAAAAMn__d0rqdcuqb1lVJKapsl-ssFNQnayKwd136gpbScI",  # 呆瓜小匪🍉
    "MS4wLjABAAAABsG6uyCohhTUpE4DmmD-c2EsdLeFIvJic8yxbXEze9g",  # 攒钱隆地雷
    "MS4wLjABAAAA070w5X9l5I82jsuGY6ntBMGlOYp8yzp4-rH8X1qCEPw",  # 小贝
    "MS4wLjABAAAAXAw5z6oNfNF1VCjmYRz1nwicQ0lLoTcOPuALhpPLKK8",  # 林语惊
    "MS4wLjABAAAAfQnGjmLfe2oJazbA_nO9EpA9zpieuegM5wxVMqXF6SE",  # 朱之琳
    "MS4wLjABAAAAPLFrUMv2S-AFNXRP2JMzvmS9_Ow39fVweFGKNxXHPys",  # 深海蜜柚
    "MS4wLjABAAAAOQ9BYHDT-BJr2yHwwNNvdNszXteeSzjuH5nifQOFvglpxMY3nP_qrzsIsXtEymCu",  # 聪明羊羊
    "MS4wLjABAAAAWaeKn3y5ZGRXElUi0iP0VcIbDH8WeZ5RmPeA9FnBZG-DYx5VTRIt-x7fXUsirIHf",  # 热锅铲女
    "MS4wLjABAAAAaa8Lsk2sIhdvQBXbnn_HT2FDGATjE0vHEDF5QjKsgYl5A30WE5ZDsMRemAObStYR",  # 蓝羊羊不懒
    "MS4wLjABAAAAEg6xF6p_5K4zBdvR0LgjMXYmY6XoOR0kIWr-EiV51Mv3ui8_d1JJhdHwSScBNO2J",  # bb猪
    "MS4wLjABAAAAejNXYKfKBp_9q4Hy9SHS1BndE_Jw50LbVs7zolIiVaFqzpl1EOunD4FApGocolKP",  # 闪光波克尔
    "MS4wLjABAAAA3CrLwX6x5aHKOdnRrEwRssgnFnmQRGf6CX3RWXc9HYEjysZ2vcy7Px0MngbLBLfc",  # 糖心蛋
    "MS4wLjABAAAAogz57t45g20LdsrkxEfvcoR7c701ow9FE7rBbFbYxUZETSzJBdgK__vIWmTHRLL4",  # 金铁兽
    "MS4wLjABAAAAj8_YMsUZglM9qYJXuZwrbT3gEpQqiW7aF6d4jpdFE1xGyDind6FkrRoUd2OjkOkF",  # 谁吃了我的火龙果
]
async def download_post(sec_user_id: str):
    """
    Download all published posts of a single user.

    Any exception is logged and swallowed, so the caller never sees a
    failure from this coroutine.

    Args:
        sec_user_id (str): The user's ID (sec_user_id).
    """
    try:
        logger.debug(
            f"[bold green]开始下载用户ID:{sec_user_id} 的作品...[/bold green]"
        )

        # Look up (or create) the user's record; the returned value is passed
        # to the downloader as the target path (presumably the user's folder).
        async with AsyncUserDB("douyin_users.db", **kwargs) as user_db:
            user_path = await dyhandler.get_or_add_user_data(
                kwargs, sec_user_id, user_db
            )

        # The handler yields the user's posts one page at a time.
        post_pages = dyhandler.fetch_user_post_videos(sec_user_id=sec_user_id)
        async for page in post_pages:
            if not page:
                # An empty page aborts the run without the completion message.
                logger.info(
                    f"[bold yellow]无法获取用户作品信息:[/bold yellow] {sec_user_id}"
                )
                return

            await dydownloader.create_download_tasks(
                kwargs, page._to_list(), user_path
            )

        logger.info(f"[bold green]用户ID:{sec_user_id} 作品下载完成。[/bold green]")
    except Exception as exc:
        logger.error(f"[bold red]用户ID:{sec_user_id} 下载失败:{exc}[/bold red]")
async def main():
    """
    Entry coroutine: start one polling task per user in ``sec_user_ids``.

    Each task downloads the user's posts, sleeps for one hour and repeats,
    so this coroutine only returns when it is cancelled (e.g. Ctrl+C).
    At most ``kwargs["max_tasks"]`` (default 5) downloads run concurrently.
    """
    logger.info("[bold blue]开始批量下载多个用户的作品[/bold blue]")

    # Upper bound on simultaneous downloads.
    semaphore = asyncio.Semaphore(kwargs.get("max_tasks", 5))
    # Seconds between two update checks for the same user (one hour).
    poll_interval = 1 * 60 * 60

    async def limited_download(sec_user_id):
        # For a one-shot run, replace the loop with a single
        # `async with semaphore: await download_post(sec_user_id)`.
        while True:
            # Hold a slot only while downloading. Sleeping inside the
            # `async with` would keep the slot for the whole interval, so any
            # user beyond the first `max_tasks` would never be processed.
            async with semaphore:
                await download_post(sec_user_id)
            await asyncio.sleep(poll_interval)

    # Use RichConsoleManager to manage the progress bars
    with RichConsoleManager().progress:
        tasks = [
            asyncio.create_task(limited_download(sec_user_id))
            for sec_user_id in sec_user_ids
        ]
        await asyncio.gather(*tasks)
if __name__ == "__main__":
    # Script entry point: run the batch until it finishes or is interrupted.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the polling loop.
        logger.info("[bold yellow]程序已手动停止[/bold yellow]")
    except Exception as exc:
        # Last-resort handler: log the message, then the full traceback.
        logger.error(f"[bold red]程序运行时出现异常: {exc}[/bold red]")
        logger.error(traceback.format_exc())
Batch Collection of Live Streams Beta
Important ❗❗❗
- Account Restrictions: Douyin restricts the same account from entering the same livestream room simultaneously. When collecting livestreams with a logged-in account, you can only watch the stream after starting the collection task.
- Bypassing Restrictions with Guest Accounts: You can bypass the above restriction using a guest account. Refer to the methods for generating `mstoken` and `ttwid`, or see the relevant code snippet in Livestream Danmaku Forwarding.
- Network Stability: Ensure a stable network environment to prevent task interruptions.
- Device Performance: Ensure sufficient device performance to prevent lag from large-scale collection tasks.
- Concurrency Settings: When collecting multiple livestreams, increase `max_connections` and `max_tasks` appropriately to support asynchronous concurrency. Otherwise, tasks may become blocked.
- Polling for Live Streams: If you need to poll for live stream status updates and collect them, refer to the changes in the provided code snippet.
🔗 Example Code
py
import asyncio
import traceback
from f2.apps.douyin.handler import DouyinHandler
from f2.apps.douyin.db import AsyncUserDB
from f2.apps.douyin.dl import DouyinDownloader
from f2.utils.conf_manager import ConfigManager
from f2.cli.cli_console import RichConsoleManager
from f2.log.logger import logger
# Global configuration parameters; protect sensitive information.
# The same dict is handed to both the downloader and the handler below.
kwargs = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Referer": "https://www.douyin.com/",
    },
    "proxies": {"http://": None, "https://": None},
    # Selected mode
    "mode": "live",
    # With the dict union operator `|`, keys from the right-hand operand win,
    # so whatever get_config("douyin") returns overrides the defaults above.
} | ConfigManager("conf/app.yaml").get_config("douyin")
# Instantiate the downloader and the handler
dydownloader = DouyinDownloader(kwargs)
dyhandler = DouyinHandler(kwargs)
# Live room (webcast) IDs to collect in batch. If you need to supply room IDs
# instead, use the fetch_user_live_videos_by_room_id method.
# The trailing comment on each entry is the streamer's display name.
webcast_ids = [
    "10359270066",  # 清崽
    "205048140143",  # 偷星九月天
    "13819501559",  # QQ清
    "422057730070",  # 丫丫br
]
async def download_live_stream(
    webcast_id: str,
):
    """
    Record the stream of a single live room until the stream ends.

    Returns early (after logging) when the room information cannot be
    fetched or the room is not currently live. Any exception is logged and
    swallowed, so the caller never sees a failure from this coroutine.

    Args:
        webcast_id (str): The live room (webcast) ID.
    """
    try:
        live = await dyhandler.fetch_user_live_videos(webcast_id=webcast_id)

        if not live:
            logger.info(f"[bold yellow]无法获取直播间信息:[/bold yellow] {webcast_id}")
            return

        if live.live_status != 2:
            # The room is not live right now, so there is nothing to record.
            logger.info(
                f"[bold cyan]直播间ID:{webcast_id} 当前未开播,跳过...[/bold cyan]"
            )
            return

        # Look up (or create) the streamer's record; the returned value is
        # passed to the downloader as the target path.
        async with AsyncUserDB("douyin_users.db") as user_db:
            user_path = await dyhandler.get_or_add_user_data(
                kwargs, live.sec_user_id, user_db
            )

        logger.debug(
            f"[bold green]开始下载直播间ID:{webcast_id} 的直播流...[/bold green]"
        )
        await dydownloader.create_stream_tasks(kwargs, live._to_dict(), user_path)
        logger.info(
            f"[bold green]直播间ID:{webcast_id} 直播流已结束,下载完成。[/bold green]"
        )
    except Exception as exc:
        logger.error(f"[bold red]直播间ID:{webcast_id} 下载失败:{exc}[/bold red]")
async def main():
    """
    Entry coroutine: start one polling task per room in ``webcast_ids``.

    Each task checks the room, records it if it is live, sleeps for one
    minute and repeats, so this coroutine only returns when it is cancelled
    (e.g. Ctrl+C). At most ``kwargs["max_tasks"]`` (default 5) rooms are
    checked or recorded concurrently.
    """
    logger.info("[bold blue]开始批量下载多个直播间的直播流[/bold blue]")

    # Upper bound on simultaneous checks/recordings.
    semaphore = asyncio.Semaphore(kwargs.get("max_tasks", 5))
    # Seconds between two live-status checks for the same room (one minute).
    poll_interval = 1 * 60

    async def limited_download(webcast_id):
        # For a one-shot run, replace the loop with a single
        # `async with semaphore: await download_live_stream(webcast_id)`.
        while True:
            # Hold a slot only while checking/recording. Sleeping inside the
            # `async with` would keep the slot between polls, so any room
            # beyond the first `max_tasks` would never be checked.
            async with semaphore:
                await download_live_stream(webcast_id)
            await asyncio.sleep(poll_interval)

    # Use RichConsoleManager to manage the progress bars
    with RichConsoleManager().progress:
        tasks = [
            asyncio.create_task(limited_download(webcast_id))
            for webcast_id in webcast_ids
        ]
        await asyncio.gather(*tasks)
if __name__ == "__main__":
    # Script entry point: run the batch until it finishes or is interrupted.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the polling loop.
        logger.info("[bold yellow]程序已手动停止[/bold yellow]")
    except Exception as exc:
        # Last-resort handler: log the message, then the full traceback.
        logger.error(f"[bold red]程序运行时出现异常: {exc}[/bold red]")
        logger.error(traceback.format_exc())
Livestream Danmaku Forwarding
Important ❗❗❗
- Account Restrictions: Douyin restricts the same account from entering the same livestream room simultaneously. To bypass this, generate a guest account using `ttwid`.
- Network Stability: Ensure a stable network environment to prevent task interruptions.
- Device Performance: Ensure sufficient device performance to prevent lag from large-scale collection tasks.
- Concurrency Settings: When forwarding danmaku (chat messages) from multiple livestreams, use separate `WSS` connections to prevent message mix-ups and blockages.
🔗 Example Code
py
import asyncio
from f2.apps.douyin.crawler import DouyinWebSocketCrawler
from f2.apps.douyin.handler import DouyinHandler
from f2.apps.douyin.utils import TokenManager
from f2.log.logger import logger
# Configuration for the handler calls that fetch the guest user, the room
# information and the IM parameters (see main()).
kwargs = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Referer": "https://www.douyin.com/",
        "Content-Type": "application/protobuffer;",
    },
    "proxies": {"http://": None, "https://": None},
    "timeout": 10,
    # A guest cookie is sufficient. Note that each generated ttwid acts as a
    # user identifier and can only be used in one live room at a time; it
    # cannot be used in several live rooms simultaneously.
    # Use TokenManager.gen_ttwid() to generate a new guest ttwid.
    # Alternative: supply an existing guest cookie instead of generating one:
    # "cookie": "GUEST_COOKIE_HERE",
    # NOTE: gen_ttwid() is called here, while this module-level dict is built.
    "cookie": f"ttwid={TokenManager.gen_ttwid()}; __live_version__=%221.1.2.6631%22; live_use_vvc=%22false%22;",
}
# Configuration for the handler that opens the danmaku WebSocket connection
# (see the fetch_live_danmaku call in main()).
kwargs2 = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Upgrade": "websocket",
        "Connection": "Upgrade",
    },
    "proxies": {"http://": None, "https://": None},
    "timeout": 10,
    # Whether to display danmaku messages in the terminal
    "show_message": True,
    # No cookie needs to be filled in here
    "cookie": "",
}
# Maps each message type name to the DouyinWebSocketCrawler attribute of the
# same name. The only exception is "LinkMicMethod", which is an extra key for
# the WebcastLinkMicMethod callback. Insertion order matches the listing below.
wss_callbacks = {
    message_type: getattr(
        DouyinWebSocketCrawler,
        "WebcastLinkMicMethod" if message_type == "LinkMicMethod" else message_type,
    )
    for message_type in (
        "WebcastRoomMessage",
        "WebcastLikeMessage",
        "WebcastMemberMessage",
        "WebcastChatMessage",
        "WebcastGiftMessage",
        "WebcastSocialMessage",
        "WebcastRoomUserSeqMessage",
        "WebcastUpdateFanTicketMessage",
        "WebcastCommonTextMessage",
        "WebcastMatchAgainstScoreMessage",
        "WebcastEcomFansClubMessage",
        "WebcastRanklistHourEntranceMessage",
        "WebcastRoomStatsMessage",
        "WebcastLiveShoppingMessage",
        "WebcastLiveEcomGeneralMessage",
        "WebcastProductChangeMessage",
        "WebcastRoomStreamAdaptationMessage",
        "WebcastNotifyEffectMessage",
        "WebcastLightGiftMessage",
        "WebcastProfitInteractionScoreMessage",
        "WebcastRoomRankMessage",
        "WebcastFansclubMessage",
        "WebcastHotRoomMessage",
        "WebcastLinkMicMethod",
        "LinkMicMethod",
        "WebcastLinkerContributeMessage",
        "WebcastEmojiChatMessage",
        "WebcastScreenChatMessage",
        "WebcastRoomDataSyncMessage",
        "WebcastInRoomBannerMessage",
        "WebcastLinkMessage",
        "WebcastBattleTeamTaskMessage",
        "WebcastHotChatMessage",
    )
}
# TODO: the following message types are not implemented yet
# WebcastLinkMicArmiesMethod
# WebcastLinkmicPlayModeUpdateScoreMessage
# WebcastSandwichBorderMessage
# WebcastLuckyBoxTempStatusMessage
# WebcastLotteryEventMessage
# WebcastLotteryEventNewMessage
# WebcastDecorationUpdateMessage
# WebcastDecorationModifyMethod
# WebcastLinkSettingNotifyMessage
# WebcastLinkMicBattleMethod
# WebcastExhibitionChatMessage
async def main():
    """
    Fetch the danmaku (chat) stream of one live room using a guest identity.

    Steps: resolve the guest user, resolve the room from its live ID, fetch
    the IM parameters, then open the danmaku stream with ``wss_callbacks``.
    Returns early when the room's ``live_status`` is not 2.
    """
    # Get the user_unique_id that belongs to the guest ttwid in `kwargs`
    # (a new guest ttwid can be generated with TokenManager.gen_ttwid()).
    guest = await DouyinHandler(kwargs).fetch_query_user()
    # Debug aid: log guest.user_unique_id here if needed.

    # This endpoint returns the room (including room_id) for a given live_id.
    live_room = await DouyinHandler(kwargs).fetch_user_live_videos("277303127629")
    # Debug aid: log live_room.room_id here if needed.

    if live_room.live_status != 2:
        logger.info("直播已结束")
        return

    # This endpoint returns the cursor and internal_ext the WSS connection needs.
    im_info = await DouyinHandler(kwargs).fetch_live_im(
        room_id=live_room.room_id, unique_id=guest.user_unique_id
    )
    # Debug aid: log im_info.cursor and im_info.internal_ext here if needed.

    # Fetch the live danmaku; this call uses the WebSocket settings in `kwargs2`.
    danmaku_handler = DouyinHandler(kwargs2)
    await danmaku_handler.fetch_live_danmaku(
        room_id=live_room.room_id,
        user_unique_id=guest.user_unique_id,
        internal_ext=im_info.internal_ext,
        cursor=im_info.cursor,
        wss_callbacks=wss_callbacks,
    )
# Script entry point: run main() on a fresh asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())