MCP是手腳,Skill是靈魂· 第三篇:樂高式Agent

作者:努力撞蘑菇AI
日期:2026年5月27日 上午8:31
來源:WeChat 原文

整理版優先睇

速讀 5 個重點 高亮

鏈式調用係Agent設計嘅靈魂:用樂高積木比喻教你把Skill拆細再組合

整理版摘要

呢篇文章係「MCP係手腳,Skill係靈魂」系列嘅第三篇,作者用樂高積木比喻嚟解釋點解Agent Skill要行鏈式調用。佢認為,單體Skill就好似預先黐實曬嘅固定玩具,一壞就冇得救,而鏈式調用就似樂高積木——每塊積木(Skill)只負責一件具體事,透過標準化接口自由組合。

作者先拆解單體Skill嘅三大致命弱點:上下文窗口壓力、調試維護地獄、複用率接近零。然後用一個完整案例——「每日市場簡報Agent」——由三個Skill組成:數據採集器、分析引擎、報告生成器,逐個拆解設計細節、接口契約(用Pydantic定義)、邊界條件,仲有失敗處理嘅斷路器模式。

最後佢展示咗完整嘅Python實現,證明呢種模塊化設計唔單止令Agent更可靠,仲方便獨立開發同測試。整體結論係:Agent設計嘅靈魂唔係單一強大嘅Skill,而係一套清晰嘅接口協定同故障隔離機制,令唔同Skill可以好似樂高積木咁靈活拼砌。

  • 鏈式調用通過職責單一化,解決單體Skill上下文壓力大、難調試、難複用嘅問題。
  • Pydantic定義接口契約,確保上下游Skill數據格式一致,好似樂高積木嘅標準化卡口。
  • 鏈式調用與單體Skill嘅最大差異:每個Skill只做一件事,失敗時唔會級聯崩潰。
  • 借鑑軟件工程嘅單一職責原則同斷路器模式,令Agent設計具備工業級可靠性。
  • 設計新Agent時,優先拆分工責、定義清楚輸入輸出格式,並為每個Skill獨立配置斷路器。
整理重點

點解要鏈式調用?

好多人一開始會想整一個「大而全」嘅Super Skill,一次過做曬數據採集、分析同報告。作者話呢種做法係固定玩具,唔係樂高。單體Skill有三個致命弱點:第一,上下文窗口壓力——一個Skill塞太多任務,好快爆context,模型會遺忘早期資訊;第二,調試同維護地獄——輸出唔啱時,好難知邊個環節出錯;第三,複用率接近零——邏輯強耦合,帶唔走。

鏈式調用嘅「卡口」就係接口契約——上游輸出乜格式,下游就預期接收乜格式。呢個格式一旦約定好,兩個Skill可以獨立開發、獨立測試、獨立維護。

整理重點

案例拆解:每日市場簡報Agent

作者用一個真實案例示範:一個每日朝早8點自動運行嘅Agent,做三件事——市場數據採集器(Skill A)、數據分析引擎(Skill B)、簡報生成器(Skill C)。每個Skill嘅輸出係下一個嘅輸入,傳遞嘅係結構嚴謹嘅數據對象,唔係模糊嘅自然語言。

Skill A係「原料供應商」,只負責採集同清洗,唔做分析。佢有清晰嘅輸入(日期、數據源列表、config)同輸出(狀態、時間戳、結構化嘅股票、新聞、競爭對手數據)。邊界條件包括:數據源超時就記錄失敗,繼續處理其他;所有數據源失敗就標記partial_failure,觸發降級。

Skill B係「思考者」,接收原始數據,分析出市場信號、趨勢方向同風險提示。佢嘅輸出用MarketInsightTrendAnalysis等模型定義,仲有數據質量分數同分析置信度。如果數據質量低過0.6,會自動降低信心並加風險提示。

Skill C係「作家」,將分析結果生成Markdown格式嘅每日簡報,包含市場總覽、風險提示、關鍵洞察同機會信號。最終輸出可以直接發送畀管理層。

整理重點

接口設計同斷路器:成敗關鍵

接口設計係鏈式調用成敗嘅核心。作者用樂高比喻:標準樂高同得寶樂高(DUPLO)嘅卡口尺寸唔同,勉強拼埋會好唔穩。接口契約(Interface Contract)明確規定上游保證輸出某種格式,下游假設接收該格式,中間有驗證層確保遵守。

鏈式調用最大風險係級聯失敗——Skill A壞咗,Skill B用垃圾數據分析,Skill C生成精美錯誤報告。解決方案係斷路器模式,有三種狀態:CLOSED(正常)、OPEN(斷開)、HALF_OPEN(探測)。當連續失敗次數超過閾值,斷路器就會打開,拒絕後續請求;一段時間後轉半開狀態,畀少量請求通過測試是否恢復。

作者為每個Skill配置獨立斷路器,例如Skill A失敗閾值3次,Skill B係2次,Skill C係5次。喺編排器入面,用CircuitBreaker.call()包住每個Skill嘅執行函數,如果斷路器打開就拋出CircuitOpenError,編排器可以根據錯誤決定係咪用降級方案。

整理重點

完整Python實現同失敗處理

作者展示咗三個Skill嘅核心實現。Skill A用asyncio.gather併發採集多個數據源,使用return_exceptions=True確保一個源失敗唔影響其他,並記錄失敗來源。Skill B根據數據質量分數動態調整分析置信度,當新聞為空時跳過情感分析,設定為中性。Skill C將分析結果轉成Markdown,包含表情符號同清晰結構。

程式內容 python
class MarketDataCollector:
 async def run(self, input_data: dict) -> SkillAOutput:
 stocks, stock_err = await self.collect_stocks(...)
 news, news_err = await self.collect_news(...)
 # 併發處理,唔讓一個失敗影響其他
 ...

編排器(Orchestrator)負責串聯三個Skill,每個Skill都經過斷路器保護。佢會檢查Skill A嘅輸出狀態,如果係partial_failure,數據質量分數會下降,但依然繼續後續步驟。整個流程有清晰嘅日誌記錄同錯誤處理。

呢種設計令Agent具備工業級可靠性:即使部分數據源失敗,依然可以產出有價值嘅簡報,而唔係全盤崩潰。

你有冇諗過,點解樂高積木可以風靡全球幾十年?

唔係因為某一塊積木有幾神奇。一塊2×4嘅基礎磚,單獨睇平平無奇——佢就係一塊膠。但當你將幾百塊、幾千塊積木組合埋一齊,卻可以砌出埃菲爾鐵塔、千年隼號飛船、甚至成個城市嘅微縮模型。

樂高嘅魔力唔在於積木本身,而在於標準化接口帶嚟嘅無限組合可能。

今日我哋講嘅Agent Skill設計,同樂高積木嘅哲學一模一樣。

單一個Skill就好似一塊樂高積木——佢處理一件具體嘅事,做好咗就交俾下一個。Skill A負責數據採集,輸出結構化數據;Skill B負責數據分析,將原始數據變成洞察;Skill C負責報告生成,將洞察變成可讀嘅文字。三個Skill頭尾相連,形成一條流水線,呢個就係「鏈式調用」。

今日我用一個完整嘅真實案例——「每日市場簡報Agent」——帶你拆解鏈式調用嘅每一個細節:Skill點樣設計、接口點樣對齊、失敗咗點處理,以及完整嘅Python實現。

圖片
樂高積木組合成複雜結構,象徵Skill鏈式調用嘅模塊化設計

一、點解單一個Skill唔夠用?

1.1 單Skill嘅能力邊界

喺我哋開始砌積木之前,先諗清楚一個問題:點解唔將所有邏輯塞曬入一個Skill?

好多人啱開始設計Agent嗰時,會有一種「大而全」嘅衝動——寫一個Super Skill,等佢又可以捉數據、又可以分析、又可以寫報告。聽落好方便,實際上係一場災難。

幻想嚇,你買咗一套樂高,但廠商話「我哋將所有零件都預先黐埋一齊,做咗一個完整嘅模型」——咁就唔係樂高了,嗰啲叫固定玩具。一旦某個部件壞咗,成個模型報廢;想換個造型?對唔住,做唔到。

單體Skill有三個致命弱點:

① 上下文窗口壓力LLM嘅上下文窗口係有限嘅。如果你俾一個Skill同時處理「抓取10個數據源」+「多維度分析」+「生成完整報告」,好易就將128K嘅上下文塞滿。模型會開始「遺忘」早期信息,輸出質量急劇下降。

② 調試同維護地獄當輸出唔符合預期嗰時,你點知係數據採集環節出咗問題,定係分析邏輯有偏差,定係生成格式唔啱?一個500行嘅Super Skill,排查起上嚟如同大海撈針。

③ 複用率為零你為呢個任務寫嘅數據分析邏輯,下次換個場景,一行代碼都帶唔走。因為佢同其他邏輯強耦合埋一齊,根本冇辦法單獨提取。

1.2 鏈式調用嘅核心價值

鏈式調用解決嘅核心問題只有一個:職責單一化

每個Skill只做一件事,將呢件事做到極致,然後將結果交俾下一個Skill。呢個背後有一個深刻嘅設計哲學,喺軟件工程裏面叫「單一職責原則(SRP)」,喺樂高世界裏面叫「標準化卡口」。

正正因為每塊樂高積木嘅卡口尺寸完全一致,你先可以自由組合。Skill鏈式調用嘅「卡口」就係接口契約——上游Skill輸出乜嘢格式,下游Skill就期望接收乜嘢格式。呢個格式一旦約定好,兩個Skill可以獨立開發、獨立測試、獨立維護。


二、案例解構:每日市場簡報Agent

2.1 整體架構一覽

「每日市場簡報Agent」係一個每日早上8點自動運行嘅Agent,佢完成三件事:

  1. 抓取數據:從多個數據源採集前一日嘅市場數據(股市、行業新聞、競品動態)
  2. 分析洞察:對原始數據進行多維度分析,提煉關鍵信號同趨勢
  3. 生成簡報:將分析結果寫成一份可以直接發送俾管理層嘅日報

對應嘅三個Skill:

[Skill A: 市場數據採集器]
         ↓ JSON數據包
[Skill B: 數據分析引擎]
         ↓ 分析報告結構體
[Skill C: 簡報生成器]
         ↓ Markdown簡報
     [發送/存儲]

成條流水線嘅關鍵在於:每個Skill嘅輸出,係下一個Skill嘅輸入。佢哋之間傳遞嘅唔係模糊嘅自然語言,而係結構嚴謹嘅數據對象——就好似樂高積木透過精確尺寸嘅卡口連接,而唔係用膠水隨意黐埋一齊。

2.2 Skill A:市場數據採集器

職責定義:呢個Skill係成條鏈路嘅「原料供應商」。佢嘅唯一工作係從外部世界採集數據,清洗成標準格式,然後交出去。佢唔做任何分析,唔發表任何觀點。

一個好嘅數據採集Skill,就好似樂高積木工廠裏面嘅原料車間——佢只管將膠粒壓製成符合規格嘅積木塊,唔關心呢啲積木最終會砌成乜嘢造型。

輸入格式

{
  "date""2026-05-21",           # 採集日期
  "sources": ["stock""news""competitor"],  # 數據源列表
  "config": {
    "stock_symbols": ["AAPL""GOOGL""MSFT"],
    "news_keywords": ["AI""大模型""Agent"],
    "competitor_list": ["competitor_a""competitor_b"]
  }
}

輸出格式

{
  "status""success",
"timestamp""2026-05-21T23:59:00Z",
"data": {
    "stocks": [
      {"symbol""AAPL""close"189.5"change_pct"1.2"volume"82000000},
      {"symbol""GOOGL""close"175.3"change_pct"-0.8"volume"25000000}
    ],
    "news": [
      {
        "title""OpenAI發佈GPT-5技術報告",
        "source""Reuters",
        "published_at""2026-05-21T14:30:00Z",
        "sentiment""positive",
        "relevance_score"0.95
      }
    ],
    "competitors": [
      {
        "name""competitor_a",
        "recent_actions": ["發佈新產品線""CEO接受採訪"],
        "social_mentions"1250
      }
    ]
  },
"metadata": {
    "total_sources"3,
    "success_count"3,
    "failed_sources": [],
    "execution_time_ms"3200
  }
}

邊界條件設計

  • 如果某個數據源超時(>10秒),記錄喺failed_sources中,繼續處理其他源,唔拋出異常
  • 如果所有數據源都失敗,status設為"partial_failure",觸發下游嘅降級處理
  • 股票數據缺失嗰時,用前一日數據填充,並喺metadata中標記"filled_with_previous": true

三、接口設計:積木嘅卡口尺寸

喺深入Skill B同Skill C之前,我哋需要重點講一個成日俾人忽視嘅關鍵——接口設計

呢個係成個鏈式調用成敗嘅核心。

3.1 點解接口設計咁關鍵?

返去樂高嘅比喻。你有冇試過將樂高積木同得寶積木(DUPLO)拼埋一齊?得寶係樂高嘅大號版,卡口尺寸係標準樂高嘅兩倍。理論上佢哋嚟自同一間公司,應該兼容啩?

答案係:只能單向兼容,而且好勉強。

呢個就係接口唔對齊嘅代價。喺Skill鏈式調用中,如果Skill A輸出嘅數據結構同Skill B期望嘅輸入結構唔一致,成條鏈路就會喺連接位斷裂。更差嘅係,呢種斷裂往往唔會即刻報錯,而係產生「垃圾進,垃圾出(GIGO)」嘅效果——Skill B接收咗錯誤格式嘅數據,勉強運行,輸出一堆冇意義嘅結果,然後Skill C將呢啲垃圾包裝成靚靚嘅報告發俾你老細。

接口契約(Interface Contract)係解決方案。佢明確規定:

  • 上游Skill保證輸出某種格式
  • 下游Skill假設接收嘅係呢種格式
  • 兩者之間有驗證層確保契約被遵守

3.2 用Pydantic定義接口契約

喺Python生態裏面,Pydantic係定義接口契約嘅最佳工具。佢令數據結構從「約定俗成嘅口頭承諾」變成「編譯時可驗證嘅代碼契約」。

from pydantic import BaseModel, Field, validator
from typing import List, Optional, Literal
from datetime import datetime

# ===== 接口契約定義 =====

class StockData(BaseModel):
    symbol: str
    close: float
    change_pct: float
    volume: int

class NewsItem(BaseModel):
    title: str
    source: str
    published_at: datetime
    sentiment: Literal["positive""negative""neutral"]
    relevance_score: float = Field(ge=0.0, le=1.0)

class CompetitorData(BaseModel):
    name: str
    recent_actions: List[str]
    social_mentions: int

class MarketRawData(BaseModel):
    stocks: List[StockData]
    news: List[NewsItem]
    competitors: List[CompetitorData]

class CollectionMetadata(BaseModel):
    total_sources: int
    success_count: int
    failed_sources: List[str] = []
    execution_time_ms: int
    filled_with_previous: bool = False

# Skill A -> Skill B 的接口契約
class SkillAOutput(BaseModel):
    status: Literal["success""partial_failure""failure"]
    timestamp: datetime
    data: MarketRawData
    metadata: CollectionMetadata
    
    @validator('data')
    def data_must_have_stocks(cls, v, values):
        if values.get('status') == 'success'andnot v.stocks:
            raise ValueError('成功狀態下股票數據不能為空')
        return v

呢段代碼做咗幾件關鍵嘅事:

  1. 每個字段都有明確嘅類型約束
  2. sentiment使用Literal限定只能係三個值之一,唔會出現「Positive」「POSITIVE」呢類大細寫混亂
  3. relevance_scoreField(ge=0.0, le=1.0)確保分數喺合法範圍內
  4. 自定義validator確保業務邏輯層面嘅契約

3.3 Skill B:數據分析引擎

職責定義:Skill B係成條鏈路嘅「思考者」。佢接收Skill A清洗好嘅原始數據,運用分析邏輯提煉出洞察——邊啲信號值得關注?市場喺往邊個方向走?競爭對手有乜嘢異動?

呢個Skill就好似樂高套裝裏面嘅核心部件——佢將各種小積木組合成有意義嘅結構單元,例如一個車輪組合、一個門窗套件。單獨嘅車輪冇意義,但當佢同車身、車軸組合埋一齊,先構成咗「汽車」呢個概念。

輸入:接收SkillAOutput,亦即係Skill A嘅完整輸出

核心處理邏輯

class MarketInsight(BaseModel):
    category: str
    signal: str
    confidence: float = Field(ge=0.0, le=1.0)
    supporting_data: List[str]
    action_suggestion: Optional[str] = None

class TrendAnalysis(BaseModel):
    direction: Literal["bullish""bearish""neutral"]
    strength: Literal["strong""moderate""weak"]
    key_drivers: List[str]

# Skill B -> Skill C 的接口契約
class SkillBOutput(BaseModel):
    analysis_date: str
    market_sentiment: Literal["positive""negative""neutral"]
    sentiment_score: float = Field(ge=-1.0, le=1.0)
    top_insights: List[MarketInsight]
    trend_analysis: TrendAnalysis
    risk_alerts: List[str]
    opportunities: List[str]
    data_quality_score: float  # 基於Skill A的metadata計算
    analysis_confidence: float  # 整體分析置信度

邊界條件設計

  • 如果輸入數據嘅data_quality_score低過0.6(超過40%嘅數據源失敗),降低analysis_confidence並在risk_alerts中標註數據唔完整
  • 新聞數據為空嗰時,跳過情感分析,market_sentiment設為"neutral",並註明原因
  • 分析引擎調用LLM超時嗰時,返回基於規則引擎嘅降級分析結果

四、完整嘅Python實現

而家我哋將所有積木拼埋一齊,展示完整嘅鏈式調用實現,包含編排邏輯同失敗處理。

4.1 三個Skill嘅核心實現

import asyncio
import logging
from datetime import datetime, date
from typing import Optional, Dict, Any
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# ===== Skill A:市場數據採集器 =====

class MarketDataCollector:
    """Skill A: 市場數據採集器
    職責:從多數據源採集原始市場數據,輸出標準化JSON
    """

    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.timeout = 10# 單數據源超時時間(秒)
    
    asyncdef collect_stocks(self, symbols: list) -> tuple:
        """採集股票數據,失敗時返回空列表"""
        try:
            # 實際場景這裏調用股票API
            await asyncio.sleep(0.1)  # 模擬API調用
            return [
                StockData(symbol=s, close=150.0, change_pct=0.5, volume=1000000)
                for s in symbols
            ], None
        except Exception as e:
            logger.warning(f"股票數據採集失敗: {e}")
            return [], str(e)
    
    asyncdef collect_news(self, keywords: list) -> tuple:
        """採集新聞數據,失敗時返回空列表"""
        try:
            await asyncio.sleep(0.1)
            return [
                NewsItem(
                    title=f"關於{kw}的最新動態",
                    source="Reuters",
                    published_at=datetime.now(),
                    sentiment="positive",
                    relevance_score=0.8
                )
                for kw in keywords[:3]  # 限制條數
            ], None
        except Exception as e:
            logger.warning(f"新聞數據採集失敗: {e}")
            return [], str(e)
    
    asyncdef run(self, input_data: dict) -> SkillAOutput:
        """主執行方法,併發採集所有數據源"""
        target_date = input_data.get("date", str(date.today()))
        cfg = input_data.get("config", {})
        
        # 併發執行所有數據源採集
        stocks_task = self.collect_stocks(cfg.get("stock_symbols", []))
        news_task = self.collect_news(cfg.get("news_keywords", []))
        
        results = await asyncio.gather(
            stocks_task, news_task,
            return_exceptions=True# 關鍵:不讓一個失敗影響其他
        )
        
        stocks, stock_err = results[0ifnot isinstance(results[0], Exception) else ([], str(results[0]))
        news, news_err = results[1ifnot isinstance(results[1], Exception) else ([], str(results[1]))
        
        failed_sources = []
        if stock_err: failed_sources.append(f"stocks: {stock_err}")
        if news_err: failed_sources.append(f"news: {news_err}")
        
        success_count = 2 - len(failed_sources)
        status = "success"ifnot failed_sources else (
            "failure"if success_count == 0else"partial_failure"
        )
        
        return SkillAOutput(
            status=status,
            timestamp=datetime.now(),
            data=MarketRawData(
                stocks=stocks,
                news=news,
                competitors=[]
            ),
            metadata=CollectionMetadata(
                total_sources=2,
                success_count=success_count,
                failed_sources=failed_sources,
                execution_time_ms=350
            )
        )


# ===== Skill B:數據分析引擎 =====

class MarketAnalysisEngine:
    """Skill B: 數據分析引擎
    職責:接收原始數據,輸出結構化市場洞察
    """

    
    def __init__(self, llm_client=None):
        self.llm = llm_client
    
    def _calculate_data_quality(self, metadata: CollectionMetadata) -> float:
        """基於採集元數據計算數據質量分"""
        if metadata.total_sources == 0:
            return0.0
        base_score = metadata.success_count / metadata.total_sources
        # 有填充數據時輕微降分
        if metadata.filled_with_previous:
            base_score *= 0.9
        return round(base_score, 2)
    
    def _analyze_stock_trend(self, stocks: list) -> TrendAnalysis:
        """基於股票數據分析市場趨勢"""
        ifnot stocks:
            return TrendAnalysis(direction="neutral", strength="weak"
                               key_drivers=["數據不足"])
        
        positive_count = sum(1for s in stocks if s.change_pct > 0)
        ratio = positive_count / len(stocks)
        
        direction = "bullish"if ratio > 0.6else ("bearish"if ratio < 0.4else"neutral")
        avg_change = sum(abs(s.change_pct) for s in stocks) / len(stocks)
        strength = "strong"if avg_change > 2.0else ("moderate"if avg_change > 0.5else"weak")
        
        return TrendAnalysis(
            direction=direction,
            strength=strength,
            key_drivers=[f"{s.symbol}{s.change_pct:+.1f}%"for s in stocks[:3]]
        )
    
    asyncdef run(self, skill_a_output: SkillAOutput) -> SkillBOutput:
        """主執行方法"""
        data = skill_a_output.data
        metadata = skill_a_output.metadata
        
        data_quality = self._calculate_data_quality(metadata)
        trend = self._analyze_stock_trend(data.stocks)
        
        # 數據質量過低時,降級處理
        analysis_confidence = data_quality * 0.9
        risk_alerts = []
        if data_quality < 0.6:
            risk_alerts.append(f"數據質量較低({data_quality:.0%}),分析結論僅供參考")
        
        # 從新聞中提取洞察
        insights = []
        for news in data.news[:5]:
            if news.relevance_score > 0.7:
                insights.append(MarketInsight(
                    category="新聞信號",
                    signal=news.title,
                    confidence=news.relevance_score,
                    supporting_data=[f"來源: {news.source}"],
                    action_suggestion="持續關注"if news.sentiment == "positive"else"注意風險"
                ))
        
        # 計算整體市場情緒
        sentiment_scores = {"positive"1.0"neutral"0.0"negative"-1.0}
        if data.news:
            avg_sentiment = sum(
                sentiment_scores.get(n.sentiment, 0for n in data.news
            ) / len(data.news)
        else:
            avg_sentiment = 0.0
        
        market_sentiment = (
            "positive"if avg_sentiment > 0.3else
            ("negative"if avg_sentiment < -0.3else"neutral")
        )
        
        return SkillBOutput(
            analysis_date=str(date.today()),
            market_sentiment=market_sentiment,
            sentiment_score=avg_sentiment,
            top_insights=insights,
            trend_analysis=trend,
            risk_alerts=risk_alerts,
            opportunities=["AI賽道持續關注"if trend.direction == "bullish"else""],
            data_quality_score=data_quality,
            analysis_confidence=analysis_confidence
        )


# ===== Skill C:簡報生成器 =====

class ReportGenerator:
    """Skill C: 簡報生成器
    職責:接收分析結果,生成可讀性強的Markdown簡報
    """

    
    TREND_EMOJI = {"bullish""📈""bearish""📉""neutral""➡️"}
    SENTIMENT_LABEL = {"positive""樂觀""negative""悲觀""neutral""中性"}
    
    asyncdef run(self, skill_b_output: SkillBOutput) -> str:
        """主執行方法,生成Markdown格式簡報"""
        b = skill_b_output
        trend_emoji = self.TREND_EMOJI.get(b.trend_analysis.direction, "➡️")
        sentiment_label = self.SENTIMENT_LABEL.get(b.market_sentiment, "中性")
        
        report_sections = [
            f"# 每日市場簡報 | {b.analysis_date}",
            f"> 數據質量:{b.data_quality_score:.0%} | 分析置信度:{b.analysis_confidence:.0%}",
            "",
            "## 市場總覽",
            f"- **整體趨勢**:{trend_emoji} {b.trend_analysis.direction.upper()} ({b.trend_analysis.strength})",
            f"- **市場情緒**:{sentiment_label}(得分:{b.sentiment_score:+.2f})",
            f"- **主要驅動**:{', '.join(b.trend_analysis.key_drivers)}",
        ]
        
        if b.risk_alerts:
            report_sections += [
                "",
                "## ⚠️ 風險提示",
                *[f"- {alert}"for alert in b.risk_alerts]
            ]
        
        if b.top_insights:
            report_sections += [
                "",
                "## 關鍵洞察",
                *[f"- **{i.signal}**(置信度:{i.confidence:.0%}{' → ' + i.action_suggestion if i.action_suggestion else ''}"
                  for i in b.top_insights]
            ]
        
        if any(b.opportunities):
            report_sections += [
                "",
                "## 機會信號",
                *[f"- {opp}"for opp in b.opportunities if opp]
            ]
        
        report_sections += [
            "",
            "---",
            f"*報告生成時間:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | 由市場簡報Agent自動生成*"
        ]
        
        return"\n".join(report_sections)

五、失敗處理:斷路器模式

鏈式調用最大嘅風險係乜嘢?

級聯失敗(Cascading Failure)——Skill A冧咗,Skill B拎到垃圾數據,Skill C用垃圾數據生成咗一份措辭嚴謹嘅錯誤簡報,然後呢份簡報被發咗俾50個高管。

呢個比冇簡報更加衰。

解決方案係借鑑電路工程嘅「斷路器(Circuit Breaker)」模式。當電路過載時,保險絲斷開,保護成個電路系統。喺鏈式調用中,當某個Skill失敗時,斷路器阻止後續Skill繼續執行錯誤嘅流程。

5.1 斷路器嘅三種狀態

  • CLOSED(閉合):正常工作狀態,請求正常通過
  • OPEN(斷開):失敗次數超過閾值,拒絕所有請求,直接返回錯誤
  • HALF_OPEN(半開):OPEN狀態持續一段時間後,允許少量請求通過測試係咪恢復
import time
from enum import Enum
from functools import wraps

class CircuitState(Enum):
    CLOSED = "closed"      # 正常
    OPEN = "open"          # 斷開
    HALF_OPEN = "half_open"# 探測中

class CircuitBreaker:
    """Skill級別的斷路器"""
    
    def __init__(
        self, 
        skill_name: str,
        failure_threshold: int = 3,    # 連續失敗N次後斷開
        recovery_timeout: int = 60,    # 斷開後N秒嘗試恢復
        success_threshold: int = 2     # 半開狀態下連續成功N次才恢復
    )
:

        self.skill_name = skill_name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
    
    def _should_attempt(self) -> bool:
        if self.state == CircuitState.CLOSED:
            returnTrue
        elif self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                logger.info(f"[{self.skill_name}] 斷路器進入半開狀態,開始探測")
                returnTrue
            returnFalse
        else:  # HALF_OPEN
            returnTrue
    
    def _on_success(self):
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0
                logger.info(f"[{self.skill_name}] 斷路器恢復正常(CLOSED)")
    
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        self.success_count = 0
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.error(
                f"[{self.skill_name}] 斷路器斷開(OPEN)!"
                f"連續失敗{self.failure_count}次,{self.recovery_timeout}秒後嘗試恢復"
            )
    
    asyncdef call(self, skill_func, *args, **kwargs):
        """通過斷路器調用Skill"""
        ifnot self._should_attempt():
            raise CircuitOpenError(
                f"[{self.skill_name}] 斷路器斷開中,拒絕執行。"
                f"預計{self.recovery_timeout}秒後嘗試恢復"
            )
        
        try:
            result = await skill_func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

class CircuitOpenError(Exception):
    pass

5.2 鏈式調用嘅編排器(帶斷路器)

呢個係最核心嘅部分——編排器(Orchestrator),負責將三個Skill串聯起來,同時處理各種失敗場景:

class MarketBriefingOrchestrator:
    """每日市場簡報Agent的編排器
    負責鏈式調用Skill A -> B -> C,並處理故障
    """

    
    def __init__(self, config: dict):
        # 初始化三個Skill
        self.skill_a = MarketDataCollector(config)
        self.skill_b = MarketAnalysisEngine()
        self.skill_c = ReportGenerator()
        
        # 為每個Skill配置獨立的斷路器
        self.cb_a = CircuitBreaker("SkillA-Collector", failure_threshold=3)
        self.cb_b = CircuitBreaker("SkillB-Analyzer", failure_threshold=2)
        self.cb_c = CircuitBreaker("SkillC-Generator", failure_threshold=5)
        
    asyncdef run(self, date: str) -> dict:
        """執行完整的鏈式調用流程"""
        result = {
            "success"False,
            "report"None,
            "error"None,
            "stage"None,
            "fallback_used"False
        }
        
        # === Step 1: Skill A 數據採集 ===
        logger.info("[鏈路] 啓動 Skill A: 市場數據採集")
        try:
            skill_a_output = await self.cb_a.call(
                self.skill_a.run,
                {"date": date, "config": self._get_config()}
            )
            
            # 檢查數據質量,決定是否繼續
            if skill_a_output.status == "failure":
                logger.error("[鏈路] Skill A 完全失敗,啓動降級方案")
                returnawait self._fallback_report(date, "數據採集失敗")
                
        except CircuitOpenError as e:
            logger.error(f"[鏈路] Skill A 斷路器斷開: {e}")
            returnawait self._fallback_report(date, "採集服務不可用")
        except Exception as e:
            logger.error(f"[鏈路] Skill A 執行異常: {e}")
            result["stage"] = "skill_a"
            result["error"] = str(e)
            return result
        
        # === Step 2: Skill B 數據分析 ===
        logger.info("[鏈路] Skill A 完成,傳遞數據到 Skill B: 數據分析")
        try:
            skill_b_output = await self.cb_b.call(
                self.skill_b.run,
                skill_a_output  # 直接傳遞上游輸出
            )
        except CircuitOpenError as e:
            logger.warning(f"[鏈路] Skill B 斷路器斷開,嘗試降級分析: {e}")
            # Skill B 失敗時,用規則引擎生成降級的分析結果
            skill_b_output = self._rule_based_analysis(skill_a_output)
            result["fallback_used"] = True
        except Exception as e:
            logger.error(f"[鏈路] Skill B 執行異常: {e}")
            result["stage"] = "skill_b"
            result["error"] = str(e)
            return result
        
        # === Step 3: Skill C 報告生成 ===
        logger.info("[鏈路] Skill B 完成,傳遞分析到 Skill C: 報告生成")
        try:
            report = await self.cb_c.call(
                self.skill_c.run,
                skill_b_output  # 直接傳遞上游輸出
            )
            result["success"] = True
            result["report"] = report
            if result["fallback_used"]:
                result["report"] += "\n\n> ⚠️ 注:本報告部分分析使用了降級規則引擎,可能不如AI分析精準"
                
        except Exception as e:
            logger.error(f"[鏈路] Skill C 執行異常: {e}")
            result["stage"] = "skill_c"
            result["error"] = str(e)
        
        return result
    
    def _rule_based_analysis(self, skill_a_output: SkillAOutput) -> SkillBOutput:
        """Skill B的降級方案:基於規則的簡化分析"""
        stocks = skill_a_output.data.stocks
        positive = sum(1for s in stocks if s.change_pct > 0)
        direction = "bullish"if positive > len(stocks)/2else"bearish"
        
        return SkillBOutput(
            analysis_date=str(date.today()),
            market_sentiment="neutral",
            sentiment_score=0.0,
            top_insights=[],
            trend_analysis=TrendAnalysis(
                direction=direction,
                strength="weak",
                key_drivers=["規則引擎降級分析"]
            ),
            risk_alerts=["⚠️ AI分析引擎不可用,使用規則引擎降級輸出"],
            opportunities=[],
            data_quality_score=skill_a_output.metadata.success_count / max(skill_a_output.metadata.total_sources, 1),
            analysis_confidence=0.4# 降級時置信度明確標低
        )
    
    asyncdef _fallback_report(self, date: str, reason: str) -> dict:
        """最後的降級兜底:返回一份說明無法生成的通知"""
        return {
            "success"False,
            "report"f"# 每日市場簡報 | {date}\n\n> ❌ 今日簡報生成失敗\n\n原因:{reason}\n\n請檢查數據採集服務狀態。",
            "error": reason,
            "stage""skill_a",
            "fallback_used"True
        }
    
    def _get_config(self) -> dict:
        return {
            "stock_symbols": ["AAPL""GOOGL""MSFT""NVDA"],
            "news_keywords": ["AI""大模型""Agent""LLM"],
            "competitor_list": ["competitor_a""competitor_b"]
        }


# ===== 主程序入口 =====

asyncdef main():
    config = {"env""production"}
    orchestrator = MarketBriefingOrchestrator(config)
    
    target_date = "2026-05-21"
    logger.info(f"開始生成 {target_date} 的市場簡報...")
    
    result = await orchestrator.run(target_date)
    
    if result["success"]:
        print("✅ 簡報生成成功!")
        if result["fallback_used"]:
            print("⚠️  注意:部分環節使用了降級方案")
        print("\n" + "="*50)
        print(result["report"])
    else:
        print(f"❌ 簡報生成失敗於階段: {result['stage']}")
        print(f"錯誤原因: {result['error']}")
        if result.get("report"):
            print("降級通知:")
            print(result["report"])

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())

六、鏈式調用嘅進階設計原則

經過上面嘅完整實現,我哋總結幾條喺實際生產環境中至關重要嘅設計原則。

6.1 接口版本化

當你嘅Skill鏈投入生產並開始迭代時,你會遇到一個棘手嘅問題:升級Skill B嘅分析邏輯時,輸出格式改變咗,但Skill C仲用緊舊版格式——成條鏈路靜默崩潰。

解決方案係俾接口增加版本字段:

class SkillBOutput(BaseModel):
    _schema_version: str = "v2.1.0"  # 接口版本
    # ... 其他字段

下游Skill喺接收數據時,先檢查版本兼容性,唔兼容時明確報錯,而唔係靜默處理出錯誤結果。

6.2 冪等性設計

鏈式調用裏面嘅每個Skill都應該係冪等的——相同嘅輸入,多次調用,產生相同嘅輸出,唔產生副作用。

點解呢個咁重要?因為喺斷路器重試、網絡波動等場景下,同一個Skill可能會俾人調用多次。如果Skill唔係冪等嘅,重複調用會導致數據重複寫入、重複發送通知等問題。

實現冪等性嘅簡單方法:俾每次調用生成一個唯一嘅request_id,Skill喺執行前檢查呢個ID係咪已經被處理過。

6.3 可觀測性:令鏈路透明

鏈式調用嘅鏈路越長,調試越困難。喺生產環境中,你需要能夠回答:「今朝早8:03嘅簡報生成,係邊個Skill花咗最多時間?」

每個Skill都應該輸出結構化日誌,包含:

{
  "trace_id""abc-123",        # 整條鏈路的唯一ID
  "skill_name""SkillB",       # 當前Skill名稱
  "execution_time_ms"1250,    # 執行耗時
  "input_hash""sha256...",    # 輸入數據的哈希(用於調試)
  "output_size_bytes"4096,    # 輸出大小
  "model_used""gpt-4o",       # 使用的模型
  "fallback_triggered": false    # 是否觸發了降級
}

七、從3個Skill到N個Skill:動態鏈路

我哋今日講嘅係固定嘅三段式鏈路:A → B → C。但現實中,鏈路往往需要動態調整。

例如,當Skill A採集到嘅新聞數量超過100條時,需要插入一個「Skill A.5:新聞過濾器」,先將唔相關嘅噪音過濾走,再傳俾Skill B。當市場出現極端行情(大漲或大跌超過5%)時,需要在Skill B之後插入一個「Skill B.5:預警分析器」,專門生成風險預警。

實現動態鏈路嘅關鍵係策略模式(Strategy Pattern)

class DynamicOrchestrator:
    """動態鏈路編排器:根據運行時條件插入或跳過Skill"""
    
    def __init__(self):
        self.pipeline = []  # Skill列表
    
    def add_skill(self, skill, condition=None, name=""):
        """condition是一個函數,接收前一個Skill的輸出,返回bool決定是否執行此Skill"""
        self.pipeline.append({
            "skill": skill,
            "condition": condition,  # None表示無條件執行
            "name": name
        })
        return self  # 支持鏈式調用:orchestrator.add_skill(a).add_skill(b)
    
    asyncdef run(self, initial_input):
        current_data = initial_input
        for step in self.pipeline:
            # 檢查條件
            if step["condition"andnot step["condition"](current_data):
                logger.info(f"[動態鏈路] 跳過 {step['name']}(條件不滿足)")
                continue
            
            logger.info(f"[動態鏈路] 執行 {step['name']}")
            current_data = await step["skill"].run(current_data)
        
        return current_data

# 使用示例
orchestrator = DynamicOrchestrator()
orchestrator.add_skill(skill_a, name="數據採集") \
    .add_skill(
        news_filter,
        condition=lambda x: len(x.data.news) > 100,  # 新聞超100條才執行
        name="新聞過濾"
    ) \
    .add_skill(skill_b, name="數據分析") \
    .add_skill(
        alert_analyzer,
        condition=lambda x: abs(x.sentiment_score) > 0.8,  # 極端情緒才執行
        name="預警分析"
    ) \
    .add_skill(skill_c, name="報告生成")

呢種設計令你嘅鏈路好似真正嘅樂高一樣——每次需要新功能,唔係重寫成條鏈路,而係拼上一塊新積木。


八、總結:標準化接口係一切嘅基礎

我哋今日行完咗一個完整嘅鏈式調用系統嘅設計與實現。返去樂高嘅比喻,等我哋做一個最終嘅對照:

樂高世界
Skill鏈式調用
積木嘅卡口尺寸
接口契約(Pydantic模型)
每塊積木嘅形狀功能
每個Skill嘅職責邊界
樂高套裝說明書
編排器(Orchestrator)
保險絲
斷路器(Circuit Breaker)
換一塊積木升級
替換單一個Skill唔影響整體
同一個卡口標準
接口版本管理

鏈式調用嘅核心心法只有一句話:每個Skill只做一件事,做完交俾下一個,接口係一切嘅基礎。

當你下次面對一個複雜嘅Agent任務時,先問自己三個問題:

  1. 呢個任務可以拆成邊幾個獨立嘅職責?
  2. 每個職責嘅輸入同輸出係乜嘢?
  3. 佢哋之間嘅接口點樣定義先最穩定?

諗清楚呢三個問題,你已經完成咗80%嘅設計工作。淨低嘅20%,就係本文裏面嗰啲防禦性編程——斷路器、降級方案、版本管理——佢哋係護城河,令你嘅積木城堡唔會因為一塊積木損壞就轟然倒塌。


如果呢篇文章對你有幫助,歡迎轉發俾正在設計Agent系統嘅朋友。關注公眾號,每週持續更新Agent開發實戰內容。

我係專注AI Agent深度實踐嘅努力撞蘑菇AI,致力於分享真實可用嘅AI使用經驗同工作流探索,歡迎你同我一齊將AI玩明白,如果內容對你有幫助,歡迎關注、點讚、轉發。

你有沒有想過,為什麼樂高積木能風靡全球幾十年?

不是因為某一塊積木有多神奇。一塊2×4的基礎磚,單獨看平平無奇——它就是一塊塑料。但當你把幾百塊、幾千塊積木組合在一起,卻能搭出埃菲爾鐵塔、千年隼號飛船、甚至整座城市的微縮模型。

樂高的魔力不在積木本身,而在於標準化接口帶來的無限組合可能。

今天我們聊的Agent Skill設計,和樂高積木的哲學如出一轍。

單個Skill就像一塊樂高積木——它處理一件具體的事,做好了就交給下一個。Skill A負責數據採集,輸出結構化數據;Skill B負責數據分析,把原始數據變成洞察;Skill C負責報告生成,把洞察變成可讀的文字。三個Skill首尾相連,形成一條流水線,這就是「鏈式調用」。

今天我用一個完整的真實案例——「每日市場簡報Agent」——帶你拆解鏈式調用的每一個細節:Skill如何設計、接口如何對齊、失敗了怎麼處理,以及完整的Python實現。

圖片
樂高積木組合成複雜結構,象徵Skill鏈式調用的模塊化設計

一、為什麼單個Skill不夠用?

1.1 單Skill的能力邊界

在我們開始搭積木之前,先想清楚一個問題:為什麼不把所有邏輯塞進一個Skill?

很多人剛開始設計Agent時,會有一種「大而全」的衝動——寫一個Super Skill,讓它又能抓數據、又能分析、又能寫報告。聽起來很方便,實際上是一場災難。

想象一下,你買了一套樂高,但廠商說「我們把所有零件都預先粘在一起,做成了一個完整的模型」——那就不是樂高了,那叫固定玩具。一旦某個部件壞了,整個模型報廢;想換個造型?對不起,做不到。

單體Skill有三個致命弱點:

① 上下文窗口壓力LLM的上下文窗口是有限的。如果你讓一個Skill同時處理「抓取10個數據源」+「多維度分析」+「生成完整報告」,輕鬆就能把128K的上下文塞滿。模型會開始「遺忘」早期信息,輸出質量急劇下降。

② 調試和維護地獄當輸出不符合預期時,你怎麼知道是數據採集環節出了問題,還是分析邏輯有偏差,還是生成格式不對?一個500行的Super Skill,排查起來如同大海撈針。

③ 複用率為零你為這個任務寫的數據分析邏輯,下次換個場景,一行代碼都帶不走。因為它和其他邏輯強耦合在一起,根本無法單獨提取。

1.2 鏈式調用的核心價值

鏈式調用解決的核心問題只有一個:職責單一化

每個Skill只做一件事,把這件事做到極致,然後把結果交給下一個Skill。這背後有一個深刻的設計哲學,在軟件工程裏叫「單一職責原則(SRP)」,在樂高世界裏叫「標準化卡口」。

正是因為每塊樂高積木的卡口尺寸完全一致,你才能自由組合。Skill鏈式調用的「卡口」就是接口契約——上游Skill輸出什麼格式,下游Skill就期望接收什麼格式。這個格式一旦約定好,兩個Skill可以獨立開發、獨立測試、獨立維護。


二、案例解構:每日市場簡報Agent

2.1 整體架構一覽

「每日市場簡報Agent」是一個每天早上8點自動運行的Agent,它完成三件事:

  1. 抓取數據:從多個數據源採集前一天的市場數據(股市、行業新聞、競品動態)
  2. 分析洞察:對原始數據進行多維度分析,提煉關鍵信號和趨勢
  3. 生成簡報:把分析結果寫成一份可直接發送給管理層的日報

對應的三個Skill:

[Skill A: 市場數據採集器]
         ↓ JSON數據包
[Skill B: 數據分析引擎]
         ↓ 分析報告結構體
[Skill C: 簡報生成器]
         ↓ Markdown簡報
     [發送/存儲]

整個流水線的關鍵在於:每個Skill的輸出,是下一個Skill的輸入。它們之間傳遞的不是模糊的自然語言,而是結構嚴謹的數據對象——就像樂高積木通過精確尺寸的卡口連接,而不是用膠水隨意粘在一起。

2.2 Skill A:市場數據採集器

職責定義:這個Skill是整條鏈路的「原料供應商」。它的唯一工作是從外部世界採集數據,清洗成標準格式,然後交出去。它不做任何分析,不發表任何觀點。

一個好的數據採集Skill,就像樂高積木工廠裏的原料車間——它只管把塑料顆粒壓制成符合規格的積木塊,不關心這些積木最終會搭成什麼造型。

輸入格式

{
  "date""2026-05-21",           # 採集日期
  "sources": ["stock""news""competitor"],  # 數據源列表
  "config": {
    "stock_symbols": ["AAPL""GOOGL""MSFT"],
    "news_keywords": ["AI""大模型""Agent"],
    "competitor_list": ["competitor_a""competitor_b"]
  }
}

輸出格式

{
  "status""success",
"timestamp""2026-05-21T23:59:00Z",
"data": {
    "stocks": [
      {"symbol""AAPL""close"189.5"change_pct"1.2"volume"82000000},
      {"symbol""GOOGL""close"175.3"change_pct"-0.8"volume"25000000}
    ],
    "news": [
      {
        "title""OpenAI發佈GPT-5技術報告",
        "source""Reuters",
        "published_at""2026-05-21T14:30:00Z",
        "sentiment""positive",
        "relevance_score"0.95
      }
    ],
    "competitors": [
      {
        "name""competitor_a",
        "recent_actions": ["發佈新產品線""CEO接受採訪"],
        "social_mentions"1250
      }
    ]
  },
"metadata": {
    "total_sources"3,
    "success_count"3,
    "failed_sources": [],
    "execution_time_ms"3200
  }
}

邊界條件設計

  • 如果某個數據源超時(>10秒),記錄在failed_sources中,繼續處理其他源,不拋出異常
  • 如果所有數據源都失敗,status設為"partial_failure",觸發下游的降級處理
  • 股票數據缺失時,用前一天數據填充,並在metadata中標記"filled_with_previous": true

三、接口設計:積木的卡口尺寸

在深入Skill B和Skill C之前,我們需要重點講一個往往被忽視的關鍵——接口設計

這是整個鏈式調用成敗的核心。

3.1 為什麼接口設計如此關鍵?

回到樂高的比喻。你有沒有試過把樂高積木和得寶積木(DUPLO)拼在一起?得寶是樂高的大號版,卡口尺寸是標準樂高的兩倍。理論上它們來自同一家公司,應該能兼容吧?

答案是:只能單向兼容,而且非常勉強。

這就是接口不對齊的代價。在Skill鏈式調用中,如果Skill A輸出的數據結構和Skill B期望的輸入結構不一致,整條鏈路就會在連接處斷裂。更糟糕的是,這種斷裂往往不會立即報錯,而是產生「垃圾進,垃圾出(GIGO)」的效果——Skill B接收了錯誤格式的數據,勉強運行,輸出一堆無意義的結果,然後Skill C把這些垃圾包裝成漂亮的報告發給你的老闆。

接口契約(Interface Contract)是解決方案。它明確規定:

  • 上游Skill保證輸出某種格式
  • 下游Skill假設接收的是這種格式
  • 兩者之間有驗證層確保契約被遵守

3.2 用Pydantic定義接口契約

在Python生態裏,Pydantic是定義接口契約的最佳工具。它讓數據結構從「約定俗成的口頭承諾」變成「編譯時可驗證的代碼契約」。

from pydantic import BaseModel, Field, validator
from typing import List, Optional, Literal
from datetime import datetime

# ===== 接口契約定義 =====

class StockData(BaseModel):
    symbol: str
    close: float
    change_pct: float
    volume: int

class NewsItem(BaseModel):
    title: str
    source: str
    published_at: datetime
    sentiment: Literal["positive""negative""neutral"]
    relevance_score: float = Field(ge=0.0, le=1.0)

class CompetitorData(BaseModel):
    name: str
    recent_actions: List[str]
    social_mentions: int

class MarketRawData(BaseModel):
    stocks: List[StockData]
    news: List[NewsItem]
    competitors: List[CompetitorData]

class CollectionMetadata(BaseModel):
    total_sources: int
    success_count: int
    failed_sources: List[str] = []
    execution_time_ms: int
    filled_with_previous: bool = False

# Skill A -> Skill B 的接口契約
class SkillAOutput(BaseModel):
    status: Literal["success""partial_failure""failure"]
    timestamp: datetime
    data: MarketRawData
    metadata: CollectionMetadata
    
    @validator('data')
    def data_must_have_stocks(cls, v, values):
        if values.get('status') == 'success'andnot v.stocks:
            raise ValueError('成功狀態下股票數據不能為空')
        return v

這段代碼做了幾件關鍵的事:

  1. 每個字段都有明確的類型約束
  2. sentiment使用Literal限定只能是三個值之一,不會出現「Positive」「POSITIVE」這類大小寫混亂
  3. relevance_scoreField(ge=0.0, le=1.0)確保分數在合法範圍內
  4. 自定義validator確保業務邏輯層面的契約

3.3 Skill B:數據分析引擎

職責定義:Skill B是整條鏈路的「思考者」。它接收Skill A清洗好的原始數據,運用分析邏輯提煉出洞察——哪些信號值得關注?市場在往哪個方向走?競爭對手有什麼異動?

這個Skill就像樂高套裝裏的核心部件——它把各種小積木組合成有意義的結構單元,比如一個車輪組合、一個門窗套件。單獨的車輪沒有意義,但當它和車身、車軸組合在一起,才構成了「汽車」這個概念。

輸入:接收SkillAOutput,也就是Skill A的完整輸出

核心處理邏輯

class MarketInsight(BaseModel):
    category: str
    signal: str
    confidence: float = Field(ge=0.0, le=1.0)
    supporting_data: List[str]
    action_suggestion: Optional[str] = None

class TrendAnalysis(BaseModel):
    direction: Literal["bullish""bearish""neutral"]
    strength: Literal["strong""moderate""weak"]
    key_drivers: List[str]

# Skill B -> Skill C 的接口契約
class SkillBOutput(BaseModel):
    analysis_date: str
    market_sentiment: Literal["positive""negative""neutral"]
    sentiment_score: float = Field(ge=-1.0, le=1.0)
    top_insights: List[MarketInsight]
    trend_analysis: TrendAnalysis
    risk_alerts: List[str]
    opportunities: List[str]
    data_quality_score: float  # 基於Skill A的metadata計算
    analysis_confidence: float  # 整體分析置信度

邊界條件設計

  • 如果輸入數據的data_quality_score低於0.6(超過40%的數據源失敗),降低analysis_confidence並在risk_alerts中標註數據不完整
  • 新聞數據為空時,跳過情感分析,market_sentiment設為"neutral",並註明原因
  • 分析引擎調用LLM超時時,返回基於規則引擎的降級分析結果

四、完整的Python實現

現在我們把所有積木拼在一起,展示完整的鏈式調用實現,包含編排邏輯和失敗處理。

4.1 三個Skill的核心實現

import asyncio
import logging
from datetime import datetime, date
from typing import Optional, Dict, Any
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# ===== Skill A:市場數據採集器 =====

class MarketDataCollector:
    """Skill A: 市場數據採集器
    職責:從多數據源採集原始市場數據,輸出標準化JSON
    """

    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.timeout = 10# 單數據源超時時間(秒)
    
    asyncdef collect_stocks(self, symbols: list) -> tuple:
        """採集股票數據,失敗時返回空列表"""
        try:
            # 實際場景這裏調用股票API
            await asyncio.sleep(0.1)  # 模擬API調用
            return [
                StockData(symbol=s, close=150.0, change_pct=0.5, volume=1000000)
                for s in symbols
            ], None
        except Exception as e:
            logger.warning(f"股票數據採集失敗: {e}")
            return [], str(e)
    
    asyncdef collect_news(self, keywords: list) -> tuple:
        """採集新聞數據,失敗時返回空列表"""
        try:
            await asyncio.sleep(0.1)
            return [
                NewsItem(
                    title=f"關於{kw}的最新動態",
                    source="Reuters",
                    published_at=datetime.now(),
                    sentiment="positive",
                    relevance_score=0.8
                )
                for kw in keywords[:3]  # 限制條數
            ], None
        except Exception as e:
            logger.warning(f"新聞數據採集失敗: {e}")
            return [], str(e)
    
    asyncdef run(self, input_data: dict) -> SkillAOutput:
        """主執行方法,併發採集所有數據源"""
        target_date = input_data.get("date", str(date.today()))
        cfg = input_data.get("config", {})
        
        # 併發執行所有數據源採集
        stocks_task = self.collect_stocks(cfg.get("stock_symbols", []))
        news_task = self.collect_news(cfg.get("news_keywords", []))
        
        results = await asyncio.gather(
            stocks_task, news_task,
            return_exceptions=True# 關鍵:不讓一個失敗影響其他
        )
        
        stocks, stock_err = results[0ifnot isinstance(results[0], Exception) else ([], str(results[0]))
        news, news_err = results[1ifnot isinstance(results[1], Exception) else ([], str(results[1]))
        
        failed_sources = []
        if stock_err: failed_sources.append(f"stocks: {stock_err}")
        if news_err: failed_sources.append(f"news: {news_err}")
        
        success_count = 2 - len(failed_sources)
        status = "success"ifnot failed_sources else (
            "failure"if success_count == 0else"partial_failure"
        )
        
        return SkillAOutput(
            status=status,
            timestamp=datetime.now(),
            data=MarketRawData(
                stocks=stocks,
                news=news,
                competitors=[]
            ),
            metadata=CollectionMetadata(
                total_sources=2,
                success_count=success_count,
                failed_sources=failed_sources,
                execution_time_ms=350
            )
        )


# ===== Skill B:數據分析引擎 =====

class MarketAnalysisEngine:
    """Skill B: 數據分析引擎
    職責:接收原始數據,輸出結構化市場洞察
    """

    
    def __init__(self, llm_client=None):
        self.llm = llm_client
    
    def _calculate_data_quality(self, metadata: CollectionMetadata) -> float:
        """基於採集元數據計算數據質量分"""
        if metadata.total_sources == 0:
            return0.0
        base_score = metadata.success_count / metadata.total_sources
        # 有填充數據時輕微降分
        if metadata.filled_with_previous:
            base_score *= 0.9
        return round(base_score, 2)
    
    def _analyze_stock_trend(self, stocks: list) -> TrendAnalysis:
        """基於股票數據分析市場趨勢"""
        ifnot stocks:
            return TrendAnalysis(direction="neutral", strength="weak"
                               key_drivers=["數據不足"])
        
        positive_count = sum(1for s in stocks if s.change_pct > 0)
        ratio = positive_count / len(stocks)
        
        direction = "bullish"if ratio > 0.6else ("bearish"if ratio < 0.4else"neutral")
        avg_change = sum(abs(s.change_pct) for s in stocks) / len(stocks)
        strength = "strong"if avg_change > 2.0else ("moderate"if avg_change > 0.5else"weak")
        
        return TrendAnalysis(
            direction=direction,
            strength=strength,
            key_drivers=[f"{s.symbol}{s.change_pct:+.1f}%"for s in stocks[:3]]
        )
    
    asyncdef run(self, skill_a_output: SkillAOutput) -> SkillBOutput:
        """主執行方法"""
        data = skill_a_output.data
        metadata = skill_a_output.metadata
        
        data_quality = self._calculate_data_quality(metadata)
        trend = self._analyze_stock_trend(data.stocks)
        
        # 數據質量過低時,降級處理
        analysis_confidence = data_quality * 0.9
        risk_alerts = []
        if data_quality < 0.6:
            risk_alerts.append(f"數據質量較低({data_quality:.0%}),分析結論僅供參考")
        
        # 從新聞中提取洞察
        insights = []
        for news in data.news[:5]:
            if news.relevance_score > 0.7:
                insights.append(MarketInsight(
                    category="新聞信號",
                    signal=news.title,
                    confidence=news.relevance_score,
                    supporting_data=[f"來源: {news.source}"],
                    action_suggestion="持續關注"if news.sentiment == "positive"else"注意風險"
                ))
        
        # 計算整體市場情緒
        sentiment_scores = {"positive"1.0"neutral"0.0"negative"-1.0}
        if data.news:
            avg_sentiment = sum(
                sentiment_scores.get(n.sentiment, 0for n in data.news
            ) / len(data.news)
        else:
            avg_sentiment = 0.0
        
        market_sentiment = (
            "positive"if avg_sentiment > 0.3else
            ("negative"if avg_sentiment < -0.3else"neutral")
        )
        
        return SkillBOutput(
            analysis_date=str(date.today()),
            market_sentiment=market_sentiment,
            sentiment_score=avg_sentiment,
            top_insights=insights,
            trend_analysis=trend,
            risk_alerts=risk_alerts,
            opportunities=["AI賽道持續關注"if trend.direction == "bullish"else""],
            data_quality_score=data_quality,
            analysis_confidence=analysis_confidence
        )


# ===== Skill C:簡報生成器 =====

class ReportGenerator:
    """Skill C: 簡報生成器
    職責:接收分析結果,生成可讀性強的Markdown簡報
    """

    
    TREND_EMOJI = {"bullish""📈""bearish""📉""neutral""➡️"}
    SENTIMENT_LABEL = {"positive""樂觀""negative""悲觀""neutral""中性"}
    
    asyncdef run(self, skill_b_output: SkillBOutput) -> str:
        """主執行方法,生成Markdown格式簡報"""
        b = skill_b_output
        trend_emoji = self.TREND_EMOJI.get(b.trend_analysis.direction, "➡️")
        sentiment_label = self.SENTIMENT_LABEL.get(b.market_sentiment, "中性")
        
        report_sections = [
            f"# 每日市場簡報 | {b.analysis_date}",
            f"> 數據質量:{b.data_quality_score:.0%} | 分析置信度:{b.analysis_confidence:.0%}",
            "",
            "## 市場總覽",
            f"- **整體趨勢**:{trend_emoji} {b.trend_analysis.direction.upper()} ({b.trend_analysis.strength})",
            f"- **市場情緒**:{sentiment_label}(得分:{b.sentiment_score:+.2f})",
            f"- **主要驅動**:{', '.join(b.trend_analysis.key_drivers)}",
        ]
        
        if b.risk_alerts:
            report_sections += [
                "",
                "## ⚠️ 風險提示",
                *[f"- {alert}"for alert in b.risk_alerts]
            ]
        
        if b.top_insights:
            report_sections += [
                "",
                "## 關鍵洞察",
                *[f"- **{i.signal}**(置信度:{i.confidence:.0%}{' → ' + i.action_suggestion if i.action_suggestion else ''}"
                  for i in b.top_insights]
            ]
        
        if any(b.opportunities):
            report_sections += [
                "",
                "## 機會信號",
                *[f"- {opp}"for opp in b.opportunities if opp]
            ]
        
        report_sections += [
            "",
            "---",
            f"*報告生成時間:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | 由市場簡報Agent自動生成*"
        ]
        
        return"\n".join(report_sections)

五、失敗處理:斷路器模式

鏈式調用最大的風險是什麼?

級聯失敗(Cascading Failure)——Skill A崩了,Skill B拿到了垃圾數據,Skill C用垃圾數據生成了一份措辭嚴謹的錯誤簡報,然後這份簡報被髮給了50個高管。

這比沒有簡報還要糟糕。

解決方案是借鑑電路工程的「斷路器(Circuit Breaker)」模式。當電路過載時,保險絲斷開,保護整個電路系統。在鏈式調用中,當某個Skill失敗時,斷路器阻止後續Skill繼續執行錯誤的流程。

5.1 斷路器的三種狀態

  • CLOSED(閉合):正常工作狀態,請求正常通過
  • OPEN(斷開):失敗次數超過閾值,拒絕所有請求,直接返回錯誤
  • HALF_OPEN(半開):OPEN狀態持續一段時間後,允許少量請求通過測試是否恢復
import time
from enum import Enum
from functools import wraps

class CircuitState(Enum):
    CLOSED = "closed"      # 正常
    OPEN = "open"          # 斷開
    HALF_OPEN = "half_open"# 探測中

class CircuitBreaker:
    """Skill級別的斷路器"""
    
    def __init__(
        self, 
        skill_name: str,
        failure_threshold: int = 3,    # 連續失敗N次後斷開
        recovery_timeout: int = 60,    # 斷開後N秒嘗試恢復
        success_threshold: int = 2     # 半開狀態下連續成功N次才恢復
    )
:

        self.skill_name = skill_name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
    
    def _should_attempt(self) -> bool:
        if self.state == CircuitState.CLOSED:
            returnTrue
        elif self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                logger.info(f"[{self.skill_name}] 斷路器進入半開狀態,開始探測")
                returnTrue
            returnFalse
        else:  # HALF_OPEN
            returnTrue
    
    def _on_success(self):
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0
                logger.info(f"[{self.skill_name}] 斷路器恢復正常(CLOSED)")
    
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        self.success_count = 0
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.error(
                f"[{self.skill_name}] 斷路器斷開(OPEN)!"
                f"連續失敗{self.failure_count}次,{self.recovery_timeout}秒後嘗試恢復"
            )
    
    asyncdef call(self, skill_func, *args, **kwargs):
        """通過斷路器調用Skill"""
        ifnot self._should_attempt():
            raise CircuitOpenError(
                f"[{self.skill_name}] 斷路器斷開中,拒絕執行。"
                f"預計{self.recovery_timeout}秒後嘗試恢復"
            )
        
        try:
            result = await skill_func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

class CircuitOpenError(Exception):
    pass

5.2 鏈式調用的編排器(帶斷路器)

這是最核心的部分——編排器(Orchestrator),負責把三個Skill串聯起來,同時處理各種失敗場景:

class MarketBriefingOrchestrator:
    """每日市場簡報Agent的編排器
    負責鏈式調用Skill A -> B -> C,並處理故障
    """

    
    def __init__(self, config: dict):
        # 初始化三個Skill
        self.skill_a = MarketDataCollector(config)
        self.skill_b = MarketAnalysisEngine()
        self.skill_c = ReportGenerator()
        
        # 為每個Skill配置獨立的斷路器
        self.cb_a = CircuitBreaker("SkillA-Collector", failure_threshold=3)
        self.cb_b = CircuitBreaker("SkillB-Analyzer", failure_threshold=2)
        self.cb_c = CircuitBreaker("SkillC-Generator", failure_threshold=5)
        
    asyncdef run(self, date: str) -> dict:
        """執行完整的鏈式調用流程"""
        result = {
            "success"False,
            "report"None,
            "error"None,
            "stage"None,
            "fallback_used"False
        }
        
        # === Step 1: Skill A 數據採集 ===
        logger.info("[鏈路] 啓動 Skill A: 市場數據採集")
        try:
            skill_a_output = await self.cb_a.call(
                self.skill_a.run,
                {"date": date, "config": self._get_config()}
            )
            
            # 檢查數據質量,決定是否繼續
            if skill_a_output.status == "failure":
                logger.error("[鏈路] Skill A 完全失敗,啓動降級方案")
                returnawait self._fallback_report(date, "數據採集失敗")
                
        except CircuitOpenError as e:
            logger.error(f"[鏈路] Skill A 斷路器斷開: {e}")
            returnawait self._fallback_report(date, "採集服務不可用")
        except Exception as e:
            logger.error(f"[鏈路] Skill A 執行異常: {e}")
            result["stage"] = "skill_a"
            result["error"] = str(e)
            return result
        
        # === Step 2: Skill B 數據分析 ===
        logger.info("[鏈路] Skill A 完成,傳遞數據到 Skill B: 數據分析")
        try:
            skill_b_output = await self.cb_b.call(
                self.skill_b.run,
                skill_a_output  # 直接傳遞上游輸出
            )
        except CircuitOpenError as e:
            logger.warning(f"[鏈路] Skill B 斷路器斷開,嘗試降級分析: {e}")
            # Skill B 失敗時,用規則引擎生成降級的分析結果
            skill_b_output = self._rule_based_analysis(skill_a_output)
            result["fallback_used"] = True
        except Exception as e:
            logger.error(f"[鏈路] Skill B 執行異常: {e}")
            result["stage"] = "skill_b"
            result["error"] = str(e)
            return result
        
        # === Step 3: Skill C 報告生成 ===
        logger.info("[鏈路] Skill B 完成,傳遞分析到 Skill C: 報告生成")
        try:
            report = await self.cb_c.call(
                self.skill_c.run,
                skill_b_output  # 直接傳遞上游輸出
            )
            result["success"] = True
            result["report"] = report
            if result["fallback_used"]:
                result["report"] += "\n\n> ⚠️ 注:本報告部分分析使用了降級規則引擎,可能不如AI分析精準"
                
        except Exception as e:
            logger.error(f"[鏈路] Skill C 執行異常: {e}")
            result["stage"] = "skill_c"
            result["error"] = str(e)
        
        return result
    
    def _rule_based_analysis(self, skill_a_output: SkillAOutput) -> SkillBOutput:
        """Skill B的降級方案:基於規則的簡化分析"""
        stocks = skill_a_output.data.stocks
        positive = sum(1for s in stocks if s.change_pct > 0)
        direction = "bullish"if positive > len(stocks)/2else"bearish"
        
        return SkillBOutput(
            analysis_date=str(date.today()),
            market_sentiment="neutral",
            sentiment_score=0.0,
            top_insights=[],
            trend_analysis=TrendAnalysis(
                direction=direction,
                strength="weak",
                key_drivers=["規則引擎降級分析"]
            ),
            risk_alerts=["⚠️ AI分析引擎不可用,使用規則引擎降級輸出"],
            opportunities=[],
            data_quality_score=skill_a_output.metadata.success_count / max(skill_a_output.metadata.total_sources, 1),
            analysis_confidence=0.4# 降級時置信度明確標低
        )
    
    asyncdef _fallback_report(self, date: str, reason: str) -> dict:
        """最後的降級兜底:返回一份說明無法生成的通知"""
        return {
            "success"False,
            "report"f"# 每日市場簡報 | {date}\n\n> ❌ 今日簡報生成失敗\n\n原因:{reason}\n\n請檢查數據採集服務狀態。",
            "error": reason,
            "stage""skill_a",
            "fallback_used"True
        }
    
    def _get_config(self) -> dict:
        return {
            "stock_symbols": ["AAPL""GOOGL""MSFT""NVDA"],
            "news_keywords": ["AI""大模型""Agent""LLM"],
            "competitor_list": ["competitor_a""competitor_b"]
        }


# ===== 主程序入口 =====

asyncdef main():
    config = {"env""production"}
    orchestrator = MarketBriefingOrchestrator(config)
    
    target_date = "2026-05-21"
    logger.info(f"開始生成 {target_date} 的市場簡報...")
    
    result = await orchestrator.run(target_date)
    
    if result["success"]:
        print("✅ 簡報生成成功!")
        if result["fallback_used"]:
            print("⚠️  注意:部分環節使用了降級方案")
        print("\n" + "="*50)
        print(result["report"])
    else:
        print(f"❌ 簡報生成失敗於階段: {result['stage']}")
        print(f"錯誤原因: {result['error']}")
        if result.get("report"):
            print("降級通知:")
            print(result["report"])

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())

六、鏈式調用的進階設計原則

經過上面的完整實現,我們總結幾條在實際生產環境中至關重要的設計原則。

6.1 接口版本化

當你的Skill鏈投入生產並開始迭代時,你會遇到一個棘手的問題:升級Skill B的分析邏輯時,輸出格式改變了,但Skill C還用的是舊版格式——整條鏈路靜默崩潰。

解決方案是給接口增加版本字段:

class SkillBOutput(BaseModel):
    _schema_version: str = "v2.1.0"  # 接口版本
    # ... 其他字段

下游Skill在接收數據時,先檢查版本兼容性,不兼容時明確報錯,而不是靜默處理出錯誤結果。

6.2 冪等性設計

鏈式調用中的每個Skill都應該是冪等的——相同的輸入,多次調用,產生相同的輸出,不產生副作用。

為什麼這很重要?因為在斷路器重試、網絡波動等場景下,同一個Skill可能被調用多次。如果Skill不是冪等的,重複調用會導致數據重複寫入、重複發送通知等問題。

實現冪等性的簡單方法:給每次調用生成一個唯一的request_id,Skill在執行前檢查這個ID是否已經被處理過。

6.3 可觀測性:讓鏈路透明

鏈式調用的鏈路越長,調試越困難。在生產環境中,你需要能夠回答:「今天早上8:03的簡報生成,是哪個Skill花了最多時間?」

每個Skill都應該輸出結構化日誌,包含:

{
  "trace_id""abc-123",        # 整條鏈路的唯一ID
  "skill_name""SkillB",       # 當前Skill名稱
  "execution_time_ms"1250,    # 執行耗時
  "input_hash""sha256...",    # 輸入數據的哈希(用於調試)
  "output_size_bytes"4096,    # 輸出大小
  "model_used""gpt-4o",       # 使用的模型
  "fallback_triggered": false    # 是否觸發了降級
}

七、從3個Skill到N個Skill:動態鏈路

我們今天講的是固定的三段式鏈路:A → B → C。但現實中,鏈路往往需要動態調整。

比如,當Skill A採集到的新聞數量超過100條時,需要插入一個「Skill A.5:新聞過濾器」,先把不相關的噪音過濾掉,再傳給Skill B。當市場出現極端行情(大漲或大跌超過5%)時,需要在Skill B之後插入一個「Skill B.5:預警分析器」,專門生成風險預警。

實現動態鏈路的關鍵是策略模式(Strategy Pattern)

class DynamicOrchestrator:
    """動態鏈路編排器:根據運行時條件插入或跳過Skill"""
    
    def __init__(self):
        self.pipeline = []  # Skill列表
    
    def add_skill(self, skill, condition=None, name=""):
        """condition是一個函數,接收前一個Skill的輸出,返回bool決定是否執行此Skill"""
        self.pipeline.append({
            "skill": skill,
            "condition": condition,  # None表示無條件執行
            "name": name
        })
        return self  # 支持鏈式調用:orchestrator.add_skill(a).add_skill(b)
    
    asyncdef run(self, initial_input):
        current_data = initial_input
        for step in self.pipeline:
            # 檢查條件
            if step["condition"andnot step["condition"](current_data):
                logger.info(f"[動態鏈路] 跳過 {step['name']}(條件不滿足)")
                continue
            
            logger.info(f"[動態鏈路] 執行 {step['name']}")
            current_data = await step["skill"].run(current_data)
        
        return current_data

# 使用示例
orchestrator = DynamicOrchestrator()
orchestrator.add_skill(skill_a, name="數據採集") \
    .add_skill(
        news_filter,
        condition=lambda x: len(x.data.news) > 100,  # 新聞超100條才執行
        name="新聞過濾"
    ) \
    .add_skill(skill_b, name="數據分析") \
    .add_skill(
        alert_analyzer,
        condition=lambda x: abs(x.sentiment_score) > 0.8,  # 極端情緒才執行
        name="預警分析"
    ) \
    .add_skill(skill_c, name="報告生成")

這種設計讓你的鏈路像真正的樂高一樣——每次需要新功能,不是重寫整條鏈路,而是拼上一塊新積木。


八、總結:標準化接口是一切的基礎

我們今天走完了一個完整的鏈式調用系統的設計與實現。回到樂高的比喻,讓我們做一個最終的對照:

樂高世界
Skill鏈式調用
積木的卡口尺寸
接口契約(Pydantic模型)
每塊積木的形狀功能
每個Skill的職責邊界
樂高套裝說明書
編排器(Orchestrator)
保險絲
斷路器(Circuit Breaker)
換一塊積木升級
替換單個Skill不影響整體
同一個卡口標準
接口版本管理

鏈式調用的核心心法只有一句話:每個Skill只做一件事,做完交給下一個,接口是一切的基礎。

當你下次面對一個複雜的Agent任務時,先問自己三個問題:

  1. 這個任務能拆成哪幾個獨立的職責?
  2. 每個職責的輸入和輸出是什麼?
  3. 它們之間的接口如何定義才能最穩定?

想清楚這三個問題,你已經完成了80%的設計工作。剩下的20%,就是本文裏那些防禦性編程——斷路器、降級方案、版本管理——它們是護城河,讓你的積木城堡不會因為一塊積木損壞就轟然倒塌。


如果這篇文章對你有幫助,歡迎轉發給正在設計Agent系統的朋友。關注公眾號,每週持續更新Agent開發實戰內容。

我是專注 AI Agent深度實踐的努力撞蘑菇AI,致力於分享真實可用的 AI 使用經驗與工作流探索,歡迎你和我一起把 AI 玩明白,如果內容對你有幫助,歡迎關注、點贊、轉發。