Skip to content

Advanced Usage

💡 What is Advanced Usage?

Advanced usage refers to improving tool efficiency or solving specific issues using developer interfaces beyond the basic CLI mode.

🔍 Have Questions?

If you encounter any issues during development, please describe them in detail and ask in F2 Discussions, or contact [email protected] via email with your inquiry. I will respond as soon as possible.

Douyin

Batch Collection of Published Videos Beta

batch-posts

Important ❗❗❗

  1. Rate Limits: The Douyin platform enforces rate limits on frequent requests. High-concurrency requests may result in errors such as 429 or 444. Control the request rate to avoid IP bans.
  2. Network Stability: Ensure a stable network environment to prevent collection task interruptions, especially when processing a large number of videos.
  3. Device Performance: Ensure sufficient device performance to prevent lag caused by large-scale collection tasks.
  4. Concurrency Settings: When collecting multiple videos, increase the values of max_connections and max_tasks appropriately to support asynchronous concurrency. Otherwise, tasks may become blocked.
  5. Polling for New Videos: To poll for newly published videos and collect them, see the `while True` loop inside `limited_download` in the example code below; it re-runs the download for each user once per hour.
🔗 Example Code
py
import asyncio
import traceback

from f2.apps.douyin.handler import DouyinHandler
from f2.apps.douyin.db import AsyncUserDB
from f2.apps.douyin.dl import DouyinDownloader
from f2.utils.conf_manager import ConfigManager
from f2.cli.cli_console import RichConsoleManager
from f2.log.logger import logger

# Global configuration. The inline defaults are merged with the "douyin"
# section of conf/app.yaml, which keeps sensitive values out of this script.
# NOTE(review): with `|`, keys from the right-hand operand win, so a "mode" or
# "headers" entry in conf/app.yaml would replace the inline one — confirm that
# get_config() returns a plain dict and that this precedence is intended.
kwargs = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Referer": "https://www.douyin.com/",
    },
    "proxies": {"http://": None, "https://": None},
    # Run mode
    "mode": "post",
} | ConfigManager("conf/app.yaml").get_config("douyin")

# Instantiate the downloader and the handler; both are shared by every task
# started from main().
dydownloader = DouyinDownloader(kwargs)
dyhandler = DouyinHandler(kwargs)

# sec_user_id values of the users to collect in batch.
# The trailing comment on each entry is the account's display name.
sec_user_ids = [
    "MS4wLjABAAAAMn__d0rqdcuqb1lVJKapsl-ssFNQnayKwd136gpbScI",  # 呆瓜小匪🍉
    "MS4wLjABAAAABsG6uyCohhTUpE4DmmD-c2EsdLeFIvJic8yxbXEze9g",  # 攒钱隆地雷
    "MS4wLjABAAAA070w5X9l5I82jsuGY6ntBMGlOYp8yzp4-rH8X1qCEPw",  # 小贝
    "MS4wLjABAAAAXAw5z6oNfNF1VCjmYRz1nwicQ0lLoTcOPuALhpPLKK8",  # 林语惊
    "MS4wLjABAAAAfQnGjmLfe2oJazbA_nO9EpA9zpieuegM5wxVMqXF6SE",  # 朱之琳
    "MS4wLjABAAAAPLFrUMv2S-AFNXRP2JMzvmS9_Ow39fVweFGKNxXHPys",  # 深海蜜柚
    "MS4wLjABAAAAOQ9BYHDT-BJr2yHwwNNvdNszXteeSzjuH5nifQOFvglpxMY3nP_qrzsIsXtEymCu",  # 聪明羊羊
    "MS4wLjABAAAAWaeKn3y5ZGRXElUi0iP0VcIbDH8WeZ5RmPeA9FnBZG-DYx5VTRIt-x7fXUsirIHf",  # 热锅铲女
    "MS4wLjABAAAAaa8Lsk2sIhdvQBXbnn_HT2FDGATjE0vHEDF5QjKsgYl5A30WE5ZDsMRemAObStYR",  # 蓝羊羊不懒
    "MS4wLjABAAAAEg6xF6p_5K4zBdvR0LgjMXYmY6XoOR0kIWr-EiV51Mv3ui8_d1JJhdHwSScBNO2J",  # bb猪
    "MS4wLjABAAAAejNXYKfKBp_9q4Hy9SHS1BndE_Jw50LbVs7zolIiVaFqzpl1EOunD4FApGocolKP",  # 闪光波克尔
    "MS4wLjABAAAA3CrLwX6x5aHKOdnRrEwRssgnFnmQRGf6CX3RWXc9HYEjysZ2vcy7Px0MngbLBLfc",  # 糖心蛋
    "MS4wLjABAAAAogz57t45g20LdsrkxEfvcoR7c701ow9FE7rBbFbYxUZETSzJBdgK__vIWmTHRLL4",  # 金铁兽
    "MS4wLjABAAAAj8_YMsUZglM9qYJXuZwrbT3gEpQqiW7aF6d4jpdFE1xGyDind6FkrRoUd2OjkOkF",  # 谁吃了我的火龙果
]


async def download_post(sec_user_id: str):
    """
    Download all published posts of a single user.

    Any exception raised while downloading is logged and swallowed, so a
    failure for one user does not propagate to the caller.

    Args:
        sec_user_id (str): sec_user_id of the user whose posts are downloaded.
    """

    try:
        logger.debug(
            f"[bold green]开始下载用户ID:{sec_user_id} 的作品...[/bold green]"
        )

        # The database connection is only needed to look up (or register) the
        # user; it is closed again before any download starts.
        async with AsyncUserDB("douyin_users.db", **kwargs) as user_db:
            target_path = await dyhandler.get_or_add_user_data(
                kwargs, sec_user_id, user_db
            )

        post_pages = dyhandler.fetch_user_post_videos(sec_user_id=sec_user_id)
        async for page in post_pages:
            if page:
                # Hand the current page of posts to the downloader.
                await dydownloader.create_download_tasks(
                    kwargs, page._to_list(), target_path
                )
                continue

            # A falsy page ends the run for this user.
            logger.info(
                f"[bold yellow]无法获取用户作品信息:[/bold yellow] {sec_user_id}"
            )
            return

        logger.info(f"[bold green]用户ID:{sec_user_id} 作品下载完成。[/bold green]")
    except Exception as exc:
        logger.error(f"[bold red]用户ID:{sec_user_id} 下载失败:{exc}[/bold red]")


async def main(poll_interval: float = 60 * 60):
    """
    Start one polling download task per user in ``sec_user_ids``.

    Every task repeatedly downloads the user's posts and then sleeps for
    ``poll_interval`` seconds before checking again, so this coroutine does not
    return on its own. At most ``kwargs["max_tasks"]`` (5 when the key is
    missing) downloads run at the same time.

    Args:
        poll_interval (float): Seconds to wait between two checks of the same
            user. Defaults to one hour.
    """
    logger.info("[bold blue]开始批量下载多个用户的作品[/bold blue]")

    semaphore = asyncio.Semaphore(kwargs.get("max_tasks", 5))

    async def limited_download(sec_user_id):
        # For a one-shot run (no polling), replace the loop with a single
        # `async with semaphore: await download_post(sec_user_id)`.
        while True:
            # Hold a permit only while actually downloading. Keeping it during
            # the sleep would let the first `max_tasks` users occupy every
            # permit forever and starve all remaining users.
            async with semaphore:
                await download_post(sec_user_id)
            await asyncio.sleep(poll_interval)

    # Keep the RichConsoleManager progress display active while tasks run
    with RichConsoleManager().progress:
        tasks = [
            asyncio.create_task(limited_download(sec_user_id))
            for sec_user_id in sec_user_ids
        ]
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    # Script entry point: run the batch job until it fails or is interrupted.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is reported as a manual stop rather than as an error.
        logger.info("[bold yellow]程序已手动停止[/bold yellow]")
    except Exception as exc:
        # Log a short summary first, then the full traceback.
        summary = f"[bold red]程序运行时出现异常: {exc}[/bold red]"
        logger.error(summary)
        logger.error(traceback.format_exc())

Batch Collection of Live Streams Beta

batch-lives

Important ❗❗❗

  1. Account Restrictions: Douyin restricts the same account from entering the same livestream room simultaneously. When collecting livestreams with a logged-in account, you can only watch the stream after starting the collection task.
  2. Bypassing Restrictions with Guest Accounts: You can bypass the above restriction using a guest account. Refer to methods for generating mstoken and ttwid, or see the relevant code snippet in Livestream Danmaku Forwarding.
  3. Network Stability: Ensure a stable network environment to prevent task interruptions.
  4. Device Performance: Ensure sufficient device performance to prevent lag from large-scale collection tasks.
  5. Concurrency Settings: When collecting multiple livestreams, increase max_connections and max_tasks appropriately to support asynchronous concurrency. Otherwise, tasks may become blocked.
  6. Polling for Live Streams: To poll for livestream status and start collecting once a room goes live, see the `while True` loop inside `limited_download` in the example code below; it re-checks each room once per minute.
🔗 Example Code
py
import asyncio
import traceback

from f2.apps.douyin.handler import DouyinHandler
from f2.apps.douyin.db import AsyncUserDB
from f2.apps.douyin.dl import DouyinDownloader
from f2.utils.conf_manager import ConfigManager
from f2.cli.cli_console import RichConsoleManager
from f2.log.logger import logger

# Global configuration. The inline defaults are merged with the "douyin"
# section of conf/app.yaml, which keeps sensitive values out of this script.
# NOTE(review): with `|`, keys from the right-hand operand win, so a "mode" or
# "headers" entry in conf/app.yaml would replace the inline one — confirm that
# get_config() returns a plain dict and that this precedence is intended.
kwargs = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Referer": "https://www.douyin.com/",
    },
    "proxies": {"http://": None, "https://": None},
    # Run mode
    "mode": "live",
} | ConfigManager("conf/app.yaml").get_config("douyin")

# Instantiate the downloader and the handler; both are shared by every task
# started from main().
dydownloader = DouyinDownloader(kwargs)
dyhandler = DouyinHandler(kwargs)

# Livestream (webcast) IDs to collect in batch. To work with room IDs instead,
# use the fetch_user_live_videos_by_room_id method.
# The trailing comment on each entry is the streamer's display name.
webcast_ids = [
    "10359270066",  # 清崽
    "205048140143",  # 偷星九月天
    "13819501559",  # QQ清
    "422057730070",  # 丫丫br
]


async def download_live_stream(webcast_id: str):
    """
    Download the stream of a single livestream room.

    Returns early when the room information cannot be fetched or the room is
    not currently live. Any exception is logged and swallowed, so a failure
    for one room does not propagate to the caller.

    Args:
        webcast_id (str): Livestream (webcast) ID of the room.
    """
    # live_status value that this example treats as "currently live".
    on_air_status = 2

    try:
        room_info = await dyhandler.fetch_user_live_videos(webcast_id=webcast_id)

        if not room_info:
            logger.info(f"[bold yellow]无法获取直播间信息:[/bold yellow] {webcast_id}")
            return

        if room_info.live_status != on_air_status:
            # The room is not live right now; skip the download.
            logger.info(
                f"[bold cyan]直播间ID:{webcast_id} 当前未开播,跳过...[/bold cyan]"
            )
            return

        # The database connection is only needed to look up (or register) the
        # streamer; it is closed again before the stream download starts.
        async with AsyncUserDB("douyin_users.db") as user_db:
            target_path = await dyhandler.get_or_add_user_data(
                kwargs, room_info.sec_user_id, user_db
            )

        logger.debug(
            f"[bold green]开始下载直播间ID:{webcast_id} 的直播流...[/bold green]"
        )
        stream_payload = room_info._to_dict()
        await dydownloader.create_stream_tasks(kwargs, stream_payload, target_path)
        logger.info(
            f"[bold green]直播间ID:{webcast_id} 直播流已结束,下载完成。[/bold green]"
        )

    except Exception as exc:
        logger.error(f"[bold red]直播间ID:{webcast_id} 下载失败:{exc}[/bold red]")


async def main(poll_interval: float = 60):
    """
    Start one polling download task per room in ``webcast_ids``.

    Every task repeatedly checks its room (downloading the stream when the
    room is live) and then sleeps for ``poll_interval`` seconds, so this
    coroutine does not return on its own. At most ``kwargs["max_tasks"]``
    (5 when the key is missing) checks/downloads run at the same time.

    Args:
        poll_interval (float): Seconds to wait between two status checks of
            the same room. Defaults to one minute.
    """
    logger.info("[bold blue]开始批量下载多个直播间的直播流[/bold blue]")

    semaphore = asyncio.Semaphore(kwargs.get("max_tasks", 5))

    async def limited_download(webcast_id):
        # For a one-shot run (no polling), replace the loop with a single
        # `async with semaphore: await download_live_stream(webcast_id)`.
        while True:
            # Hold a permit only while checking/downloading. Keeping it during
            # the sleep would let the first `max_tasks` rooms occupy every
            # permit forever, so the remaining rooms would never be checked.
            async with semaphore:
                await download_live_stream(webcast_id)
            await asyncio.sleep(poll_interval)

    # Keep the RichConsoleManager progress display active while tasks run
    with RichConsoleManager().progress:
        tasks = [
            asyncio.create_task(limited_download(webcast_id))
            for webcast_id in webcast_ids
        ]
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    # Script entry point: run the batch job until it fails or is interrupted.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is reported as a manual stop rather than as an error.
        logger.info("[bold yellow]程序已手动停止[/bold yellow]")
    except Exception as exc:
        # Log a short summary first, then the full traceback.
        summary = f"[bold red]程序运行时出现异常: {exc}[/bold red]"
        logger.error(summary)
        logger.error(traceback.format_exc())

Livestream Danmaku Forwarding

wss-connect

Important ❗❗❗

  1. Account Restrictions: Douyin restricts the same account from entering the same livestream room simultaneously. To bypass this, generate a guest account using ttwid.
  2. Network Stability: Ensure a stable network environment to prevent task interruptions.
  3. Device Performance: Ensure sufficient device performance to prevent lag from large-scale collection tasks.
  4. Concurrency Settings: When forwarding danmaku (chat messages) from multiple livestreams, use separate WSS connections to prevent message mix-ups and blockages.
🔗 Example Code
py
import asyncio

from f2.apps.douyin.crawler import DouyinWebSocketCrawler
from f2.apps.douyin.handler import DouyinHandler
from f2.apps.douyin.utils import TokenManager  
from f2.log.logger import logger


# Settings for the HTTP requests made with a guest cookie (user lookup, room
# lookup and the live IM request in main()).
kwargs = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Referer": "https://www.douyin.com/",
        "Content-Type": "application/protobuffer;",
    },
    "proxies": {"http://": None, "https://": None},
    "timeout": 10,
    # A guest cookie is sufficient. Note that each generated ttwid acts as a
    # user identifier and may be used in only one livestream room; it cannot
    # be used in several rooms at the same time.
    # TokenManager.gen_ttwid() generates a new guest ttwid.
    # Alternative with a fixed cookie: "cookie": "GUEST_COOKIE_HERE",
    "cookie": f"ttwid={TokenManager.gen_ttwid()}; __live_version__=%221.1.2.6631%22; live_use_vvc=%22false%22;",
}


# Settings for the WebSocket connection used by fetch_live_danmaku in main().
kwargs2 = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Upgrade": "websocket",
        "Connection": "Upgrade",
    },
    "proxies": {"http://": None, "https://": None},
    "timeout": 10,
    # Whether to show danmaku messages in the terminal
    "show_message": True,
    # No cookie needs to be supplied here
    "cookie": "",
}

# Maps message-type names to the DouyinWebSocketCrawler attribute of the same
# name; passed to fetch_live_danmaku() as `wss_callbacks` in main().
# "LinkMicMethod" is an extra key that points at the same handler as
# "WebcastLinkMicMethod".
wss_callbacks = {
    "WebcastRoomMessage": DouyinWebSocketCrawler.WebcastRoomMessage,
    "WebcastLikeMessage": DouyinWebSocketCrawler.WebcastLikeMessage,
    "WebcastMemberMessage": DouyinWebSocketCrawler.WebcastMemberMessage,
    "WebcastChatMessage": DouyinWebSocketCrawler.WebcastChatMessage,
    "WebcastGiftMessage": DouyinWebSocketCrawler.WebcastGiftMessage,
    "WebcastSocialMessage": DouyinWebSocketCrawler.WebcastSocialMessage,
    "WebcastRoomUserSeqMessage": DouyinWebSocketCrawler.WebcastRoomUserSeqMessage,
    "WebcastUpdateFanTicketMessage": DouyinWebSocketCrawler.WebcastUpdateFanTicketMessage,
    "WebcastCommonTextMessage": DouyinWebSocketCrawler.WebcastCommonTextMessage,
    "WebcastMatchAgainstScoreMessage": DouyinWebSocketCrawler.WebcastMatchAgainstScoreMessage,
    "WebcastEcomFansClubMessage": DouyinWebSocketCrawler.WebcastEcomFansClubMessage,
    "WebcastRanklistHourEntranceMessage": DouyinWebSocketCrawler.WebcastRanklistHourEntranceMessage,
    "WebcastRoomStatsMessage": DouyinWebSocketCrawler.WebcastRoomStatsMessage,
    "WebcastLiveShoppingMessage": DouyinWebSocketCrawler.WebcastLiveShoppingMessage,
    "WebcastLiveEcomGeneralMessage": DouyinWebSocketCrawler.WebcastLiveEcomGeneralMessage,
    "WebcastProductChangeMessage": DouyinWebSocketCrawler.WebcastProductChangeMessage,
    "WebcastRoomStreamAdaptationMessage": DouyinWebSocketCrawler.WebcastRoomStreamAdaptationMessage,
    "WebcastNotifyEffectMessage": DouyinWebSocketCrawler.WebcastNotifyEffectMessage,
    "WebcastLightGiftMessage": DouyinWebSocketCrawler.WebcastLightGiftMessage,
    "WebcastProfitInteractionScoreMessage": DouyinWebSocketCrawler.WebcastProfitInteractionScoreMessage,
    "WebcastRoomRankMessage": DouyinWebSocketCrawler.WebcastRoomRankMessage,
    "WebcastFansclubMessage": DouyinWebSocketCrawler.WebcastFansclubMessage,
    "WebcastHotRoomMessage": DouyinWebSocketCrawler.WebcastHotRoomMessage,
    "WebcastLinkMicMethod": DouyinWebSocketCrawler.WebcastLinkMicMethod,
    "LinkMicMethod": DouyinWebSocketCrawler.WebcastLinkMicMethod,
    "WebcastLinkerContributeMessage": DouyinWebSocketCrawler.WebcastLinkerContributeMessage,
    "WebcastEmojiChatMessage": DouyinWebSocketCrawler.WebcastEmojiChatMessage,
    "WebcastScreenChatMessage": DouyinWebSocketCrawler.WebcastScreenChatMessage,
    "WebcastRoomDataSyncMessage": DouyinWebSocketCrawler.WebcastRoomDataSyncMessage,
    "WebcastInRoomBannerMessage": DouyinWebSocketCrawler.WebcastInRoomBannerMessage,
    "WebcastLinkMessage": DouyinWebSocketCrawler.WebcastLinkMessage,
    "WebcastBattleTeamTaskMessage": DouyinWebSocketCrawler.WebcastBattleTeamTaskMessage,
    "WebcastHotChatMessage": DouyinWebSocketCrawler.WebcastHotChatMessage,
    # TODO: the following message types are not implemented yet
    # WebcastLinkMicArmiesMethod
    # WebcastLinkmicPlayModeUpdateScoreMessage
    # WebcastSandwichBorderMessage
    # WebcastLuckyBoxTempStatusMessage
    # WebcastLotteryEventMessage
    # WebcastLotteryEventNewMessage
    # WebcastDecorationUpdateMessage
    # WebcastDecorationModifyMethod
    # WebcastLinkSettingNotifyMessage
    # WebcastLinkMicBattleMethod
    # WebcastExhibitionChatMessage
}


async def main(webcast_id: str = "277303127629"):
    """
    Connect to one livestream room and forward its danmaku to ``wss_callbacks``.

    Steps: look up the guest user, resolve the room from its live ID, fetch the
    IM cursor/internal_ext needed for the WebSocket, then open the danmaku
    stream. Returns early when the room cannot be fetched or is not live.

    Args:
        webcast_id (str): Live ID of the room to connect to. Defaults to the
            room used in the original example.
    """
    # One handler serves all guest-cookie HTTP requests below.
    guest_handler = DouyinHandler(kwargs)

    # Fetch the user_unique_id that belongs to the guest ttwid in `kwargs`
    # (a new guest ttwid can be generated with TokenManager.gen_ttwid()).
    user = await guest_handler.fetch_query_user()
    # Uncomment to inspect:
    # logger.info(f"guest user_unique_id: {user.user_unique_id}")

    # Resolve the room (and its room_id) from the live ID.
    room = await guest_handler.fetch_user_live_videos(webcast_id)
    # Uncomment to inspect:
    # logger.info(f"room_id: {room.room_id}")

    if not room:
        # Same guard and message as download_live_stream in the batch example.
        logger.info(f"[bold yellow]无法获取直播间信息:[/bold yellow] {webcast_id}")
        return

    if room.live_status != 2:
        # Any status other than 2 is treated as "not live"; nothing to forward.
        logger.info("直播已结束")
        return

    # Fetch the cursor and internal_ext required by the WebSocket connection.
    live_im = await guest_handler.fetch_live_im(
        room_id=room.room_id, unique_id=user.user_unique_id
    )
    # Uncomment to inspect:
    # logger.info(f"IM cursor: {live_im.cursor}, internal_ext: {live_im.internal_ext}")

    # Open the danmaku stream; it uses the WebSocket settings in `kwargs2`.
    await DouyinHandler(kwargs2).fetch_live_danmaku(
        room_id=room.room_id,
        user_unique_id=user.user_unique_id,
        internal_ext=live_im.internal_ext,
        cursor=live_im.cursor,
        wss_callbacks=wss_callbacks,
    )


# Script entry point: run main() on a new asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())

Released under the Apache-2.0 license.