Skip to content

进阶用法

💡 什么是进阶用法?

进阶用法是指在 CLI 模式的基础上,通过进阶开发者接口的方法,提高工具的使用效率,或者解决一些特殊问题。

🔍 欢迎提问

如果您在开发过程中遇到任何疑问,请详细描述并在 F2 Discussions 提问,或通过邮件联系 [email protected],描述您的来意,我将尽快为您解答。

Douyin

批量采集发布视频 Beta

batch-posts

重要 ❗❗❗

  1. 速率限制:抖音平台对于频繁请求有一定的速率限制,高并发的请求可能会导致请求失败,出现 429、444 等错误码。请自行控制请求速率,避免被封禁 IP。
  2. 网络稳定性:请确保网络环境稳定,否则可能导致采集任务中断,特别是在作品数量较多的情况下。
  3. 设备性能:请确保设备性能足够,避免因大量采集任务导致设备卡顿。
  4. 并发设置:如需采集多个作品,请适当增加 max_connections 和 max_tasks 参数值,以满足异步并发需求,否则可能出现任务阻塞的情况。
  5. 轮询更新作品:若需轮询检测作品状态并进行采集,请参考代码片段中的变化部分。
🔗 示例代码
py
import asyncio
import traceback

from f2.apps.douyin.handler import DouyinHandler
from f2.apps.douyin.db import AsyncUserDB
from f2.apps.douyin.dl import DouyinDownloader
from f2.utils.conf_manager import ConfigManager
from f2.cli.cli_console import RichConsoleManager
from f2.log.logger import logger

# Global configuration parameters (kept in one place to protect sensitive
# information). The literal defaults below are merged with the "douyin"
# section loaded from conf/app.yaml.
# NOTE(review): with the `|` operator the right-hand operand wins on duplicate
# keys, so values from conf/app.yaml presumably override these defaults --
# this assumes get_config() returns a plain dict; confirm.
kwargs = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Referer": "https://www.douyin.com/",
    },
    "proxies": {"http://": None, "https://": None},
    # Selected mode
    "mode": "post",
} | ConfigManager("conf/app.yaml").get_config("douyin")

# Instantiate the downloader and the handler; both share the same configuration.
dydownloader = DouyinDownloader(kwargs)
dyhandler = DouyinHandler(kwargs)

# User IDs (sec_user_id) to collect in this batch.
# The trailing comment on each entry is the account's display name.
sec_user_ids = [
    "MS4wLjABAAAAMn__d0rqdcuqb1lVJKapsl-ssFNQnayKwd136gpbScI",  # 呆瓜小匪🍉
    "MS4wLjABAAAABsG6uyCohhTUpE4DmmD-c2EsdLeFIvJic8yxbXEze9g",  # 攒钱隆地雷
    "MS4wLjABAAAA070w5X9l5I82jsuGY6ntBMGlOYp8yzp4-rH8X1qCEPw",  # 小贝
    "MS4wLjABAAAAXAw5z6oNfNF1VCjmYRz1nwicQ0lLoTcOPuALhpPLKK8",  # 林语惊
    "MS4wLjABAAAAfQnGjmLfe2oJazbA_nO9EpA9zpieuegM5wxVMqXF6SE",  # 朱之琳
    "MS4wLjABAAAAPLFrUMv2S-AFNXRP2JMzvmS9_Ow39fVweFGKNxXHPys",  # 深海蜜柚
    "MS4wLjABAAAAOQ9BYHDT-BJr2yHwwNNvdNszXteeSzjuH5nifQOFvglpxMY3nP_qrzsIsXtEymCu",  # 聪明羊羊
    "MS4wLjABAAAAWaeKn3y5ZGRXElUi0iP0VcIbDH8WeZ5RmPeA9FnBZG-DYx5VTRIt-x7fXUsirIHf",  # 热锅铲女
    "MS4wLjABAAAAaa8Lsk2sIhdvQBXbnn_HT2FDGATjE0vHEDF5QjKsgYl5A30WE5ZDsMRemAObStYR",  # 蓝羊羊不懒
    "MS4wLjABAAAAEg6xF6p_5K4zBdvR0LgjMXYmY6XoOR0kIWr-EiV51Mv3ui8_d1JJhdHwSScBNO2J",  # bb猪
    "MS4wLjABAAAAejNXYKfKBp_9q4Hy9SHS1BndE_Jw50LbVs7zolIiVaFqzpl1EOunD4FApGocolKP",  # 闪光波克尔
    "MS4wLjABAAAA3CrLwX6x5aHKOdnRrEwRssgnFnmQRGf6CX3RWXc9HYEjysZ2vcy7Px0MngbLBLfc",  # 糖心蛋
    "MS4wLjABAAAAogz57t45g20LdsrkxEfvcoR7c701ow9FE7rBbFbYxUZETSzJBdgK__vIWmTHRLL4",  # 金铁兽
    "MS4wLjABAAAAj8_YMsUZglM9qYJXuZwrbT3gEpQqiW7aF6d4jpdFE1xGyDind6FkrRoUd2OjkOkF",  # 谁吃了我的火龙果
]


async def download_post(sec_user_id: str):
    """
    Download every post published by a single user.

    The user's record is looked up (or created) in ``douyin_users.db`` to
    obtain the save path, then each page yielded by
    ``dyhandler.fetch_user_post_videos`` is handed to the downloader.
    Any exception is logged and swallowed so that one failing user does not
    stop the rest of the batch.

    Args:
        sec_user_id (str): ID of the user whose posts are downloaded.
    """

    try:
        logger.debug(
            f"[bold green]开始下载用户ID:{sec_user_id} 的作品...[/bold green]"
        )

        # The DB connection is only needed to resolve the user's save path.
        async with AsyncUserDB("douyin_users.db", **kwargs) as user_db:
            user_path = await dyhandler.get_or_add_user_data(
                kwargs, sec_user_id, user_db
            )

        post_pages = dyhandler.fetch_user_post_videos(sec_user_id=sec_user_id)
        async for page in post_pages:
            if not page:
                # An empty page aborts the run for this user; leaving the loop
                # via ``break`` skips the completion message below.
                logger.info(
                    f"[bold yellow]无法获取用户作品信息:[/bold yellow] {sec_user_id}"
                )
                break

            posts = page._to_list()
            await dydownloader.create_download_tasks(kwargs, posts, user_path)
        else:
            # Reached only when every page was processed without a break.
            logger.info(
                f"[bold green]用户ID:{sec_user_id} 作品下载完成。[/bold green]"
            )
    except Exception as exc:
        logger.error(f"[bold red]用户ID:{sec_user_id} 下载失败:{exc}[/bold red]")


async def main():
    """
    Entry point: start one polling download task per user in ``sec_user_ids``.

    Every task repeatedly calls ``download_post`` for its user and then waits
    one hour before checking that user again, so this coroutine only ends when
    it is cancelled or interrupted.

    The number of downloads running at the same time is capped by a semaphore
    sized from ``kwargs["max_tasks"]`` (default 5). A slot is held only while
    a download is in progress and is released before the task goes to sleep;
    otherwise the first ``max_tasks`` users would keep every slot forever and
    the remaining users would never be downloaded.
    """
    logger.info("[bold blue]开始批量下载多个用户的作品[/bold blue]")

    semaphore = asyncio.Semaphore(kwargs.get("max_tasks", 5))
    # Delay between two update checks for the same user, in seconds (1 hour).
    poll_interval = 1 * 60 * 60

    async def limited_download(sec_user_id):
        # For a one-shot run (no polling), replace the loop with a single
        # ``async with semaphore: await download_post(sec_user_id)``.
        while True:
            async with semaphore:
                await download_post(sec_user_id)
            # Sleep outside the semaphore so waiting users can take the slot.
            await asyncio.sleep(poll_interval)

    # RichConsoleManager's progress context manages the progress bars.
    with RichConsoleManager().progress:
        tasks = [
            asyncio.create_task(limited_download(sec_user_id))
            for sec_user_id in sec_user_ids
        ]
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    # Script entry point: run the batch until it fails or the user stops it.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the polling loop.
        logger.info("[bold yellow]程序已手动停止[/bold yellow]")
    except Exception as exc:
        # Capture the stack while the exception is still being handled,
        # then log the summary followed by the full traceback.
        stack_trace = traceback.format_exc()
        logger.error(f"[bold red]程序运行时出现异常: {exc}[/bold red]")
        logger.error(stack_trace)

批量采集直播流 Beta

batch-lives

重要 ❗❗❗

  1. 账号限制:抖音平台限制同一账号无法同时进入同一直播间。因此,使用登录账号采集直播流时,仅可在采集任务启动后继续观看该直播。
  2. 游客账号绕过:可通过游客账号绕过上述限制。有关生成游客账号的方法,请参考 mstoken、ttwid 以及「直播弹幕转发」的相关代码片段。
  3. 网络稳定性:请确保网络环境稳定,否则可能导致采集任务中断。
  4. 设备性能:请确保设备性能足够,避免因大量采集任务导致设备卡顿。
  5. 并发设置:如需采集多个直播,请适当增加 max_connections 和 max_tasks 参数值,以满足异步并发需求,否则可能出现任务阻塞的情况。
  6. 轮询开播采集:若需轮询检测开播状态并进行采集,请参考代码片段中的变化部分。
🔗 示例代码
py
import asyncio
import traceback

from f2.apps.douyin.handler import DouyinHandler
from f2.apps.douyin.db import AsyncUserDB
from f2.apps.douyin.dl import DouyinDownloader
from f2.utils.conf_manager import ConfigManager
from f2.cli.cli_console import RichConsoleManager
from f2.log.logger import logger

# Global configuration parameters (kept in one place to protect sensitive
# information). The literal defaults below are merged with the "douyin"
# section loaded from conf/app.yaml.
# NOTE(review): with the `|` operator the right-hand operand wins on duplicate
# keys, so values from conf/app.yaml presumably override these defaults --
# this assumes get_config() returns a plain dict; confirm.
kwargs = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Referer": "https://www.douyin.com/",
    },
    "proxies": {"http://": None, "https://": None},
    # Selected mode
    "mode": "live",
} | ConfigManager("conf/app.yaml").get_config("douyin")

# Instantiate the downloader and the handler; both share the same configuration.
dydownloader = DouyinDownloader(kwargs)
dyhandler = DouyinHandler(kwargs)

# Live IDs (webcast_id) to collect in this batch. To supply room IDs instead,
# use the fetch_user_live_videos_by_room_id method.
# The trailing comment on each entry is the streamer's display name.
webcast_ids = [
    "10359270066",  # 清崽
    "205048140143",  # 偷星九月天
    "13819501559",  # QQ清
    "422057730070",  # 丫丫br
]


async def download_live_stream(
    webcast_id: str,
):
    """
    Download the stream of a single live room until the stream disconnects.

    Nothing is downloaded when the room information cannot be fetched or when
    ``live_status`` is not 2 (treated here as "not broadcasting"). Any
    exception is logged and swallowed so that one failing room does not stop
    the rest of the batch.

    Args:
        webcast_id (str): ID of the live room.
    """
    try:
        room_info = await dyhandler.fetch_user_live_videos(webcast_id=webcast_id)

        if not room_info:
            logger.info(f"[bold yellow]无法获取直播间信息:[/bold yellow] {webcast_id}")
        elif room_info.live_status != 2:
            # The room is not broadcasting right now, so skip the download.
            logger.info(
                f"[bold cyan]直播间ID:{webcast_id} 当前未开播,跳过...[/bold cyan]"
            )
        else:
            # The DB connection is only needed to resolve the save path.
            async with AsyncUserDB("douyin_users.db") as user_db:
                user_path = await dyhandler.get_or_add_user_data(
                    kwargs, room_info.sec_user_id, user_db
                )

            logger.debug(
                f"[bold green]开始下载直播间ID:{webcast_id} 的直播流...[/bold green]"
            )
            stream_data = room_info._to_dict()
            await dydownloader.create_stream_tasks(kwargs, stream_data, user_path)
            logger.info(
                f"[bold green]直播间ID:{webcast_id} 直播流已结束,下载完成。[/bold green]"
            )

    except Exception as exc:
        logger.error(f"[bold red]直播间ID:{webcast_id} 下载失败:{exc}[/bold red]")


async def main():
    """
    Entry point: start one polling task per live room in ``webcast_ids``.

    Every task repeatedly calls ``download_live_stream`` for its room and then
    waits one minute before checking the room again, so this coroutine only
    ends when it is cancelled or interrupted.

    The number of rooms checked or recorded at the same time is capped by a
    semaphore sized from ``kwargs["max_tasks"]`` (default 5). A slot is held
    only while ``download_live_stream`` is running and is released before the
    task goes to sleep; otherwise the first ``max_tasks`` rooms would keep
    every slot forever and the remaining rooms would never be checked.
    """
    logger.info("[bold blue]开始批量下载多个直播间的直播流[/bold blue]")

    semaphore = asyncio.Semaphore(kwargs.get("max_tasks", 5))
    # Delay between two status checks for the same room, in seconds (1 minute).
    poll_interval = 1 * 60

    async def limited_download(webcast_id):
        # For a one-shot run (no polling), replace the loop with a single
        # ``async with semaphore: await download_live_stream(webcast_id)``.
        while True:
            async with semaphore:
                await download_live_stream(webcast_id)
            # Sleep outside the semaphore so waiting rooms can take the slot.
            await asyncio.sleep(poll_interval)

    # RichConsoleManager's progress context manages the progress bars.
    with RichConsoleManager().progress:
        tasks = [
            asyncio.create_task(limited_download(webcast_id))
            for webcast_id in webcast_ids
        ]
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    # Script entry point: run the batch until it fails or the user stops it.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the polling loop.
        logger.info("[bold yellow]程序已手动停止[/bold yellow]")
    except Exception as exc:
        # Capture the stack while the exception is still being handled,
        # then log the summary followed by the full traceback.
        stack_trace = traceback.format_exc()
        logger.error(f"[bold red]程序运行时出现异常: {exc}[/bold red]")
        logger.error(stack_trace)

直播弹幕转发

wss-connect

重要 ❗❗❗

  1. 账号限制:抖音平台限制同一账号无法同时进入同一直播间。因此,需要使用生成 ttwid 的方法生成游客账号,绕过上述限制。
  2. 网络稳定性:请确保网络环境稳定,否则可能导致采集任务中断。
  3. 设备性能:请确保设备性能足够,避免因大量采集任务导致设备卡顿。
  4. 并发设置:多个直播间弹幕转发时,请使用不同的 WSS 配置连接,以避免弹幕混乱和阻塞。
🔗 示例代码
py
import asyncio

from f2.apps.douyin.crawler import DouyinWebSocketCrawler
from f2.apps.douyin.handler import DouyinHandler
from f2.apps.douyin.utils import TokenManager  
from f2.log.logger import logger


# Settings passed to DouyinHandler for the plain HTTP calls made in main()
# (guest user lookup, live room info, IM bootstrap).
kwargs = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Referer": "https://www.douyin.com/",
        "Content-Type": "application/protobuffer;",
    },
    "proxies": {"http://": None, "https://": None},
    "timeout": 10,
    # A guest cookie is sufficient. Note that each generated ttwid acts as a
    # user identity and may only be used in one live room at a time; it cannot
    # be used in several live rooms simultaneously.
    # TokenManager.gen_ttwid() generates a fresh guest ttwid; it is called here
    # when the module is loaded.
    # Alternative: "cookie": "GUEST_COOKIE_HERE",
    "cookie": f"ttwid={TokenManager.gen_ttwid()}; __live_version__=%221.1.2.6631%22; live_use_vvc=%22false%22;",
}


# Settings passed to DouyinHandler for the danmaku call in main()
# (fetch_live_danmaku); the headers request a WebSocket upgrade.
kwargs2 = {
    "headers": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
        "Upgrade": "websocket",
        "Connection": "Upgrade",
    },
    "proxies": {"http://": None, "https://": None},
    "timeout": 10,
    # Whether to show danmaku messages in the terminal
    "show_message": True,
    # No cookie needs to be filled in here
    "cookie": "",
}

# Maps each WSS message type name to the DouyinWebSocketCrawler attribute that
# handles it. Every type is served by the attribute of the same name, except
# "LinkMicMethod", which is an alias for the "WebcastLinkMicMethod" handler.
wss_callbacks = {
    message_type: getattr(
        DouyinWebSocketCrawler,
        "WebcastLinkMicMethod" if message_type == "LinkMicMethod" else message_type,
    )
    for message_type in (
        "WebcastRoomMessage",
        "WebcastLikeMessage",
        "WebcastMemberMessage",
        "WebcastChatMessage",
        "WebcastGiftMessage",
        "WebcastSocialMessage",
        "WebcastRoomUserSeqMessage",
        "WebcastUpdateFanTicketMessage",
        "WebcastCommonTextMessage",
        "WebcastMatchAgainstScoreMessage",
        "WebcastEcomFansClubMessage",
        "WebcastRanklistHourEntranceMessage",
        "WebcastRoomStatsMessage",
        "WebcastLiveShoppingMessage",
        "WebcastLiveEcomGeneralMessage",
        "WebcastProductChangeMessage",
        "WebcastRoomStreamAdaptationMessage",
        "WebcastNotifyEffectMessage",
        "WebcastLightGiftMessage",
        "WebcastProfitInteractionScoreMessage",
        "WebcastRoomRankMessage",
        "WebcastFansclubMessage",
        "WebcastHotRoomMessage",
        "WebcastLinkMicMethod",
        "LinkMicMethod",
        "WebcastLinkerContributeMessage",
        "WebcastEmojiChatMessage",
        "WebcastScreenChatMessage",
        "WebcastRoomDataSyncMessage",
        "WebcastInRoomBannerMessage",
        "WebcastLinkMessage",
        "WebcastBattleTeamTaskMessage",
        "WebcastHotChatMessage",
        # TODO: the following message types are not implemented yet
        # WebcastLinkMicArmiesMethod
        # WebcastLinkmicPlayModeUpdateScoreMessage
        # WebcastSandwichBorderMessage
        # WebcastLuckyBoxTempStatusMessage
        # WebcastLotteryEventMessage
        # WebcastLotteryEventNewMessage
        # WebcastDecorationUpdateMessage
        # WebcastDecorationModifyMethod
        # WebcastLinkSettingNotifyMessage
        # WebcastLinkMicBattleMethod
        # WebcastExhibitionChatMessage
    )
}


async def main():
    """
    Fetch the danmaku of one live room using a guest identity.

    Steps: resolve the guest user behind the ttwid cookie, look up the live
    room, stop early unless ``live_status`` is 2, fetch the IM parameters the
    WSS connection needs, then start the danmaku fetch with ``wss_callbacks``.
    """
    # Resolve the user_unique_id of the guest ttwid; a new guest ttwid can be
    # generated with TokenManager.gen_ttwid().
    guest = await DouyinHandler(kwargs).fetch_query_user()
    # logger.info(f"guest user_unique_id: {guest.user_unique_id}")

    # This endpoint returns the room_id; its argument is the live_id.
    live_id = "277303127629"
    live_room = await DouyinHandler(kwargs).fetch_user_live_videos(live_id)
    # logger.info(f"room id: {live_room.room_id}")

    if live_room.live_status != 2:
        logger.info("直播已结束")
        return

    room_id = live_room.room_id
    unique_id = guest.user_unique_id

    # This endpoint returns the cursor and internal_ext required by the WSS
    # connection.
    im_info = await DouyinHandler(kwargs).fetch_live_im(
        room_id=room_id, unique_id=unique_id
    )
    # logger.info(f"IM cursor: {im_info.cursor}, IM ext: {im_info.internal_ext}")

    # Fetch the live danmaku; this handler uses the WebSocket settings.
    danmaku_handler = DouyinHandler(kwargs2)
    await danmaku_handler.fetch_live_danmaku(
        room_id=room_id,
        user_unique_id=unique_id,
        internal_ext=im_info.internal_ext,
        cursor=im_info.cursor,
        wss_callbacks=wss_callbacks,
    )


# Script entry point: run main() on a new asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())

Released under the Apache-2.0 license.