MCP是手腳,Skill是靈魂· 第三篇:樂高式Agent
整理版優先睇
鏈式調用係Agent設計嘅靈魂:用樂高積木比喻教你把Skill拆細再組合
呢篇文章係「MCP係手腳,Skill係靈魂」系列嘅第三篇,作者用樂高積木比喻嚟解釋點解Agent Skill要行鏈式調用。佢認為,單體Skill就好似預先黐實曬嘅固定玩具,一壞就冇得救,而鏈式調用就似樂高積木——每塊積木(Skill)只負責一件具體事,透過標準化接口自由組合。
作者先拆解單體Skill嘅三大致命弱點:上下文窗口壓力、調試維護地獄、複用率接近零。然後用一個完整案例——「每日市場簡報Agent」——由三個Skill組成:數據採集器、分析引擎、報告生成器,逐個拆解設計細節、接口契約(用Pydantic定義)、邊界條件,仲有失敗處理嘅斷路器模式。
最後佢展示咗完整嘅Python實現,證明呢種模塊化設計唔單止令Agent更可靠,仲方便獨立開發同測試。整體結論係:Agent設計嘅靈魂唔係單一強大嘅Skill,而係一套清晰嘅接口協定同故障隔離機制,令唔同Skill可以好似樂高積木咁靈活拼砌。
- 鏈式調用通過職責單一化,解決單體Skill上下文壓力大、難調試、難複用嘅問題。
- 用Pydantic定義接口契約,確保上下游Skill數據格式一致,好似樂高積木嘅標準化卡口。
- 鏈式調用與單體Skill嘅最大差異:每個Skill只做一件事,失敗時唔會級聯崩潰。
- 借鑑軟件工程嘅單一職責原則同斷路器模式,令Agent設計具備工業級可靠性。
- 設計新Agent時,優先拆分工責、定義清楚輸入輸出格式,並為每個Skill獨立配置斷路器。
點解要鏈式調用?
好多人一開始會想整一個「大而全」嘅Super Skill,一次過做曬數據採集、分析同報告。作者話呢種做法係固定玩具,唔係樂高。單體Skill有三個致命弱點:第一,上下文窗口壓力——一個Skill塞太多任務,好快爆context,模型會遺忘早期資訊;第二,調試同維護地獄——輸出唔啱時,好難知邊個環節出錯;第三,複用率接近零——邏輯強耦合,帶唔走。
鏈式調用嘅「卡口」就係接口契約——上游輸出乜格式,下游就預期接收乜格式。呢個格式一旦約定好,兩個Skill可以獨立開發、獨立測試、獨立維護。
案例拆解:每日市場簡報Agent
作者用一個真實案例示範:一個每日朝早8點自動運行嘅Agent,做三件事——市場數據採集器(Skill A)、數據分析引擎(Skill B)、簡報生成器(Skill C)。每個Skill嘅輸出係下一個嘅輸入,傳遞嘅係結構嚴謹嘅數據對象,唔係模糊嘅自然語言。
Skill A係「原料供應商」,只負責採集同清洗,唔做分析。佢有清晰嘅輸入(日期、數據源列表、config)同輸出(狀態、時間戳、結構化嘅股票、新聞、競爭對手數據)。邊界條件包括:數據源超時就記錄失敗,繼續處理其他;所有數據源失敗就標記partial_failure,觸發降級。
Skill B係「思考者」,接收原始數據,分析出市場信號、趨勢方向同風險提示。佢嘅輸出用MarketInsight、TrendAnalysis等模型定義,仲有數據質量分數同分析置信度。如果數據質量低過0.6,會自動降低信心並加風險提示。
Skill C係「作家」,將分析結果生成Markdown格式嘅每日簡報,包含市場總覽、風險提示、關鍵洞察同機會信號。最終輸出可以直接發送畀管理層。
接口設計同斷路器:成敗關鍵
接口設計係鏈式調用成敗嘅核心。作者用樂高比喻:標準樂高同得寶樂高(DUPLO)嘅卡口尺寸唔同,勉強拼埋會好唔穩。接口契約(Interface Contract)明確規定上游保證輸出某種格式,下游假設接收該格式,中間有驗證層確保遵守。
鏈式調用最大風險係級聯失敗——Skill A壞咗,Skill B用垃圾數據分析,Skill C生成精美錯誤報告。解決方案係斷路器模式,有三種狀態:CLOSED(正常)、OPEN(斷開)、HALF_OPEN(探測)。當連續失敗次數超過閾值,斷路器就會打開,拒絕後續請求;一段時間後轉半開狀態,畀少量請求通過測試是否恢復。
作者為每個Skill配置獨立斷路器,例如Skill A失敗閾值3次,Skill B係2次,Skill C係5次。喺編排器入面,用CircuitBreaker.call()包住每個Skill嘅執行函數,如果斷路器打開就拋出CircuitOpenError,編排器可以根據錯誤決定係咪用降級方案。
完整Python實現同失敗處理
作者展示咗三個Skill嘅核心實現。Skill A用asyncio.gather併發採集多個數據源,使用return_exceptions=True確保一個源失敗唔影響其他,並記錄失敗來源。Skill B根據數據質量分數動態調整分析置信度,當新聞為空時跳過情感分析,設定為中性。Skill C將分析結果轉成Markdown,包含表情符號同清晰結構。
class MarketDataCollector:
async def run(self, input_data: dict) -> SkillAOutput:
stocks, stock_err = await self.collect_stocks(...)
news, news_err = await self.collect_news(...)
# 併發處理,唔讓一個失敗影響其他
...
編排器(Orchestrator)負責串聯三個Skill,每個Skill都經過斷路器保護。佢會檢查Skill A嘅輸出狀態,如果係partial_failure,數據質量分數會下降,但依然繼續後續步驟。整個流程有清晰嘅日誌記錄同錯誤處理。
呢種設計令Agent具備工業級可靠性:即使部分數據源失敗,依然可以產出有價值嘅簡報,而唔係全盤崩潰。
你有冇諗過,點解樂高積木可以風靡全球幾十年?
唔係因為某一塊積木有幾神奇。一塊2×4嘅基礎磚,單獨睇平平無奇——佢就係一塊膠。但當你將幾百塊、幾千塊積木組合埋一齊,卻可以砌出埃菲爾鐵塔、千年隼號飛船、甚至成個城市嘅微縮模型。
樂高嘅魔力唔在於積木本身,而在於標準化接口帶嚟嘅無限組合可能。
今日我哋講嘅Agent Skill設計,同樂高積木嘅哲學一模一樣。
單一個Skill就好似一塊樂高積木——佢處理一件具體嘅事,做好咗就交俾下一個。Skill A負責數據採集,輸出結構化數據;Skill B負責數據分析,將原始數據變成洞察;Skill C負責報告生成,將洞察變成可讀嘅文字。三個Skill頭尾相連,形成一條流水線,呢個就係「鏈式調用」。
今日我用一個完整嘅真實案例——「每日市場簡報Agent」——帶你拆解鏈式調用嘅每一個細節:Skill點樣設計、接口點樣對齊、失敗咗點處理,以及完整嘅Python實現。

一、點解單一個Skill唔夠用?
1.1 單Skill嘅能力邊界
喺我哋開始砌積木之前,先諗清楚一個問題:點解唔將所有邏輯塞曬入一個Skill?
好多人啱開始設計Agent嗰時,會有一種「大而全」嘅衝動——寫一個Super Skill,等佢又可以捉數據、又可以分析、又可以寫報告。聽落好方便,實際上係一場災難。
幻想嚇,你買咗一套樂高,但廠商話「我哋將所有零件都預先黐埋一齊,做咗一個完整嘅模型」——咁就唔係樂高了,嗰啲叫固定玩具。一旦某個部件壞咗,成個模型報廢;想換個造型?對唔住,做唔到。
單體Skill有三個致命弱點:
① 上下文窗口壓力LLM嘅上下文窗口係有限嘅。如果你俾一個Skill同時處理「抓取10個數據源」+「多維度分析」+「生成完整報告」,好易就將128K嘅上下文塞滿。模型會開始「遺忘」早期信息,輸出質量急劇下降。
② 調試同維護地獄當輸出唔符合預期嗰時,你點知係數據採集環節出咗問題,定係分析邏輯有偏差,定係生成格式唔啱?一個500行嘅Super Skill,排查起上嚟如同大海撈針。
③ 複用率為零你為呢個任務寫嘅數據分析邏輯,下次換個場景,一行代碼都帶唔走。因為佢同其他邏輯強耦合埋一齊,根本冇辦法單獨提取。
1.2 鏈式調用嘅核心價值
鏈式調用解決嘅核心問題只有一個:職責單一化。
每個Skill只做一件事,將呢件事做到極致,然後將結果交俾下一個Skill。呢個背後有一個深刻嘅設計哲學,喺軟件工程裏面叫「單一職責原則(SRP)」,喺樂高世界裏面叫「標準化卡口」。
正正因為每塊樂高積木嘅卡口尺寸完全一致,你先可以自由組合。Skill鏈式調用嘅「卡口」就係接口契約——上游Skill輸出乜嘢格式,下游Skill就期望接收乜嘢格式。呢個格式一旦約定好,兩個Skill可以獨立開發、獨立測試、獨立維護。
二、案例解構:每日市場簡報Agent
2.1 整體架構一覽
「每日市場簡報Agent」係一個每日早上8點自動運行嘅Agent,佢完成三件事:
抓取數據:從多個數據源採集前一日嘅市場數據(股市、行業新聞、競品動態) 分析洞察:對原始數據進行多維度分析,提煉關鍵信號同趨勢 生成簡報:將分析結果寫成一份可以直接發送俾管理層嘅日報
對應嘅三個Skill:
[Skill A: 市場數據採集器]
↓ JSON數據包
[Skill B: 數據分析引擎]
↓ 分析報告結構體
[Skill C: 簡報生成器]
↓ Markdown簡報
[發送/存儲]
成條流水線嘅關鍵在於:每個Skill嘅輸出,係下一個Skill嘅輸入。佢哋之間傳遞嘅唔係模糊嘅自然語言,而係結構嚴謹嘅數據對象——就好似樂高積木透過精確尺寸嘅卡口連接,而唔係用膠水隨意黐埋一齊。
2.2 Skill A:市場數據採集器
職責定義:呢個Skill係成條鏈路嘅「原料供應商」。佢嘅唯一工作係從外部世界採集數據,清洗成標準格式,然後交出去。佢唔做任何分析,唔發表任何觀點。
一個好嘅數據採集Skill,就好似樂高積木工廠裏面嘅原料車間——佢只管將膠粒壓製成符合規格嘅積木塊,唔關心呢啲積木最終會砌成乜嘢造型。
輸入格式:
{
"date": "2026-05-21", # 採集日期
"sources": ["stock", "news", "competitor"], # 數據源列表
"config": {
"stock_symbols": ["AAPL", "GOOGL", "MSFT"],
"news_keywords": ["AI", "大模型", "Agent"],
"competitor_list": ["competitor_a", "competitor_b"]
}
}
輸出格式:
{
"status": "success",
"timestamp": "2026-05-21T23:59:00Z",
"data": {
"stocks": [
{"symbol": "AAPL", "close": 189.5, "change_pct": 1.2, "volume": 82000000},
{"symbol": "GOOGL", "close": 175.3, "change_pct": -0.8, "volume": 25000000}
],
"news": [
{
"title": "OpenAI發佈GPT-5技術報告",
"source": "Reuters",
"published_at": "2026-05-21T14:30:00Z",
"sentiment": "positive",
"relevance_score": 0.95
}
],
"competitors": [
{
"name": "competitor_a",
"recent_actions": ["發佈新產品線", "CEO接受採訪"],
"social_mentions": 1250
}
]
},
"metadata": {
"total_sources": 3,
"success_count": 3,
"failed_sources": [],
"execution_time_ms": 3200
}
}
邊界條件設計:
如果某個數據源超時(>10秒),記錄喺 failed_sources中,繼續處理其他源,唔拋出異常如果所有數據源都失敗, status設為"partial_failure",觸發下游嘅降級處理股票數據缺失嗰時,用前一日數據填充,並喺metadata中標記 "filled_with_previous": true
三、接口設計:積木嘅卡口尺寸
喺深入Skill B同Skill C之前,我哋需要重點講一個成日俾人忽視嘅關鍵——接口設計。
呢個係成個鏈式調用成敗嘅核心。
3.1 點解接口設計咁關鍵?
返去樂高嘅比喻。你有冇試過將樂高積木同得寶積木(DUPLO)拼埋一齊?得寶係樂高嘅大號版,卡口尺寸係標準樂高嘅兩倍。理論上佢哋嚟自同一間公司,應該兼容啩?
答案係:只能單向兼容,而且好勉強。
呢個就係接口唔對齊嘅代價。喺Skill鏈式調用中,如果Skill A輸出嘅數據結構同Skill B期望嘅輸入結構唔一致,成條鏈路就會喺連接位斷裂。更差嘅係,呢種斷裂往往唔會即刻報錯,而係產生「垃圾進,垃圾出(GIGO)」嘅效果——Skill B接收咗錯誤格式嘅數據,勉強運行,輸出一堆冇意義嘅結果,然後Skill C將呢啲垃圾包裝成靚靚嘅報告發俾你老細。
接口契約(Interface Contract)係解決方案。佢明確規定:
上游Skill保證輸出某種格式 下游Skill假設接收嘅係呢種格式 兩者之間有驗證層確保契約被遵守
3.2 用Pydantic定義接口契約
喺Python生態裏面,Pydantic係定義接口契約嘅最佳工具。佢令數據結構從「約定俗成嘅口頭承諾」變成「編譯時可驗證嘅代碼契約」。
from pydantic import BaseModel, Field, validator
from typing import List, Optional, Literal
from datetime import datetime
# ===== 接口契約定義 =====
class StockData(BaseModel):
symbol: str
close: float
change_pct: float
volume: int
class NewsItem(BaseModel):
title: str
source: str
published_at: datetime
sentiment: Literal["positive", "negative", "neutral"]
relevance_score: float = Field(ge=0.0, le=1.0)
class CompetitorData(BaseModel):
name: str
recent_actions: List[str]
social_mentions: int
class MarketRawData(BaseModel):
stocks: List[StockData]
news: List[NewsItem]
competitors: List[CompetitorData]
class CollectionMetadata(BaseModel):
total_sources: int
success_count: int
failed_sources: List[str] = []
execution_time_ms: int
filled_with_previous: bool = False
# Skill A -> Skill B 的接口契約
class SkillAOutput(BaseModel):
status: Literal["success", "partial_failure", "failure"]
timestamp: datetime
data: MarketRawData
metadata: CollectionMetadata
@validator('data')
def data_must_have_stocks(cls, v, values):
if values.get('status') == 'success'andnot v.stocks:
raise ValueError('成功狀態下股票數據不能為空')
return v
呢段代碼做咗幾件關鍵嘅事:
每個字段都有明確嘅類型約束 sentiment使用Literal限定只能係三個值之一,唔會出現「Positive」「POSITIVE」呢類大細寫混亂relevance_score用Field(ge=0.0, le=1.0)確保分數喺合法範圍內自定義validator確保業務邏輯層面嘅契約
3.3 Skill B:數據分析引擎
職責定義:Skill B係成條鏈路嘅「思考者」。佢接收Skill A清洗好嘅原始數據,運用分析邏輯提煉出洞察——邊啲信號值得關注?市場喺往邊個方向走?競爭對手有乜嘢異動?
呢個Skill就好似樂高套裝裏面嘅核心部件——佢將各種小積木組合成有意義嘅結構單元,例如一個車輪組合、一個門窗套件。單獨嘅車輪冇意義,但當佢同車身、車軸組合埋一齊,先構成咗「汽車」呢個概念。
輸入:接收SkillAOutput,亦即係Skill A嘅完整輸出
核心處理邏輯:
class MarketInsight(BaseModel):
category: str
signal: str
confidence: float = Field(ge=0.0, le=1.0)
supporting_data: List[str]
action_suggestion: Optional[str] = None
class TrendAnalysis(BaseModel):
direction: Literal["bullish", "bearish", "neutral"]
strength: Literal["strong", "moderate", "weak"]
key_drivers: List[str]
# Skill B -> Skill C 的接口契約
class SkillBOutput(BaseModel):
analysis_date: str
market_sentiment: Literal["positive", "negative", "neutral"]
sentiment_score: float = Field(ge=-1.0, le=1.0)
top_insights: List[MarketInsight]
trend_analysis: TrendAnalysis
risk_alerts: List[str]
opportunities: List[str]
data_quality_score: float # 基於Skill A的metadata計算
analysis_confidence: float # 整體分析置信度
邊界條件設計:
如果輸入數據嘅 data_quality_score低過0.6(超過40%嘅數據源失敗),降低analysis_confidence並在risk_alerts中標註數據唔完整新聞數據為空嗰時,跳過情感分析, market_sentiment設為"neutral",並註明原因分析引擎調用LLM超時嗰時,返回基於規則引擎嘅降級分析結果
四、完整嘅Python實現
而家我哋將所有積木拼埋一齊,展示完整嘅鏈式調用實現,包含編排邏輯同失敗處理。
4.1 三個Skill嘅核心實現
import asyncio
import logging
from datetime import datetime, date
from typing import Optional, Dict, Any
from pydantic import BaseModel
logger = logging.getLogger(__name__)
# ===== Skill A:市場數據採集器 =====
class MarketDataCollector:
"""Skill A: 市場數據採集器
職責:從多數據源採集原始市場數據,輸出標準化JSON
"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.timeout = 10# 單數據源超時時間(秒)
asyncdef collect_stocks(self, symbols: list) -> tuple:
"""採集股票數據,失敗時返回空列表"""
try:
# 實際場景這裏調用股票API
await asyncio.sleep(0.1) # 模擬API調用
return [
StockData(symbol=s, close=150.0, change_pct=0.5, volume=1000000)
for s in symbols
], None
except Exception as e:
logger.warning(f"股票數據採集失敗: {e}")
return [], str(e)
asyncdef collect_news(self, keywords: list) -> tuple:
"""採集新聞數據,失敗時返回空列表"""
try:
await asyncio.sleep(0.1)
return [
NewsItem(
title=f"關於{kw}的最新動態",
source="Reuters",
published_at=datetime.now(),
sentiment="positive",
relevance_score=0.8
)
for kw in keywords[:3] # 限制條數
], None
except Exception as e:
logger.warning(f"新聞數據採集失敗: {e}")
return [], str(e)
asyncdef run(self, input_data: dict) -> SkillAOutput:
"""主執行方法,併發採集所有數據源"""
target_date = input_data.get("date", str(date.today()))
cfg = input_data.get("config", {})
# 併發執行所有數據源採集
stocks_task = self.collect_stocks(cfg.get("stock_symbols", []))
news_task = self.collect_news(cfg.get("news_keywords", []))
results = await asyncio.gather(
stocks_task, news_task,
return_exceptions=True# 關鍵:不讓一個失敗影響其他
)
stocks, stock_err = results[0] ifnot isinstance(results[0], Exception) else ([], str(results[0]))
news, news_err = results[1] ifnot isinstance(results[1], Exception) else ([], str(results[1]))
failed_sources = []
if stock_err: failed_sources.append(f"stocks: {stock_err}")
if news_err: failed_sources.append(f"news: {news_err}")
success_count = 2 - len(failed_sources)
status = "success"ifnot failed_sources else (
"failure"if success_count == 0else"partial_failure"
)
return SkillAOutput(
status=status,
timestamp=datetime.now(),
data=MarketRawData(
stocks=stocks,
news=news,
competitors=[]
),
metadata=CollectionMetadata(
total_sources=2,
success_count=success_count,
failed_sources=failed_sources,
execution_time_ms=350
)
)
# ===== Skill B:數據分析引擎 =====
class MarketAnalysisEngine:
"""Skill B: 數據分析引擎
職責:接收原始數據,輸出結構化市場洞察
"""
def __init__(self, llm_client=None):
self.llm = llm_client
def _calculate_data_quality(self, metadata: CollectionMetadata) -> float:
"""基於採集元數據計算數據質量分"""
if metadata.total_sources == 0:
return0.0
base_score = metadata.success_count / metadata.total_sources
# 有填充數據時輕微降分
if metadata.filled_with_previous:
base_score *= 0.9
return round(base_score, 2)
def _analyze_stock_trend(self, stocks: list) -> TrendAnalysis:
"""基於股票數據分析市場趨勢"""
ifnot stocks:
return TrendAnalysis(direction="neutral", strength="weak",
key_drivers=["數據不足"])
positive_count = sum(1for s in stocks if s.change_pct > 0)
ratio = positive_count / len(stocks)
direction = "bullish"if ratio > 0.6else ("bearish"if ratio < 0.4else"neutral")
avg_change = sum(abs(s.change_pct) for s in stocks) / len(stocks)
strength = "strong"if avg_change > 2.0else ("moderate"if avg_change > 0.5else"weak")
return TrendAnalysis(
direction=direction,
strength=strength,
key_drivers=[f"{s.symbol}: {s.change_pct:+.1f}%"for s in stocks[:3]]
)
asyncdef run(self, skill_a_output: SkillAOutput) -> SkillBOutput:
"""主執行方法"""
data = skill_a_output.data
metadata = skill_a_output.metadata
data_quality = self._calculate_data_quality(metadata)
trend = self._analyze_stock_trend(data.stocks)
# 數據質量過低時,降級處理
analysis_confidence = data_quality * 0.9
risk_alerts = []
if data_quality < 0.6:
risk_alerts.append(f"數據質量較低({data_quality:.0%}),分析結論僅供參考")
# 從新聞中提取洞察
insights = []
for news in data.news[:5]:
if news.relevance_score > 0.7:
insights.append(MarketInsight(
category="新聞信號",
signal=news.title,
confidence=news.relevance_score,
supporting_data=[f"來源: {news.source}"],
action_suggestion="持續關注"if news.sentiment == "positive"else"注意風險"
))
# 計算整體市場情緒
sentiment_scores = {"positive": 1.0, "neutral": 0.0, "negative": -1.0}
if data.news:
avg_sentiment = sum(
sentiment_scores.get(n.sentiment, 0) for n in data.news
) / len(data.news)
else:
avg_sentiment = 0.0
market_sentiment = (
"positive"if avg_sentiment > 0.3else
("negative"if avg_sentiment < -0.3else"neutral")
)
return SkillBOutput(
analysis_date=str(date.today()),
market_sentiment=market_sentiment,
sentiment_score=avg_sentiment,
top_insights=insights,
trend_analysis=trend,
risk_alerts=risk_alerts,
opportunities=["AI賽道持續關注"if trend.direction == "bullish"else""],
data_quality_score=data_quality,
analysis_confidence=analysis_confidence
)
# ===== Skill C:簡報生成器 =====
class ReportGenerator:
"""Skill C: 簡報生成器
職責:接收分析結果,生成可讀性強的Markdown簡報
"""
TREND_EMOJI = {"bullish": "📈", "bearish": "📉", "neutral": "➡️"}
SENTIMENT_LABEL = {"positive": "樂觀", "negative": "悲觀", "neutral": "中性"}
asyncdef run(self, skill_b_output: SkillBOutput) -> str:
"""主執行方法,生成Markdown格式簡報"""
b = skill_b_output
trend_emoji = self.TREND_EMOJI.get(b.trend_analysis.direction, "➡️")
sentiment_label = self.SENTIMENT_LABEL.get(b.market_sentiment, "中性")
report_sections = [
f"# 每日市場簡報 | {b.analysis_date}",
f"> 數據質量:{b.data_quality_score:.0%} | 分析置信度:{b.analysis_confidence:.0%}",
"",
"## 市場總覽",
f"- **整體趨勢**:{trend_emoji} {b.trend_analysis.direction.upper()} ({b.trend_analysis.strength})",
f"- **市場情緒**:{sentiment_label}(得分:{b.sentiment_score:+.2f})",
f"- **主要驅動**:{', '.join(b.trend_analysis.key_drivers)}",
]
if b.risk_alerts:
report_sections += [
"",
"## ⚠️ 風險提示",
*[f"- {alert}"for alert in b.risk_alerts]
]
if b.top_insights:
report_sections += [
"",
"## 關鍵洞察",
*[f"- **{i.signal}**(置信度:{i.confidence:.0%}){' → ' + i.action_suggestion if i.action_suggestion else ''}"
for i in b.top_insights]
]
if any(b.opportunities):
report_sections += [
"",
"## 機會信號",
*[f"- {opp}"for opp in b.opportunities if opp]
]
report_sections += [
"",
"---",
f"*報告生成時間:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | 由市場簡報Agent自動生成*"
]
return"\n".join(report_sections)五、失敗處理:斷路器模式
鏈式調用最大嘅風險係乜嘢?
級聯失敗(Cascading Failure)——Skill A冧咗,Skill B拎到垃圾數據,Skill C用垃圾數據生成咗一份措辭嚴謹嘅錯誤簡報,然後呢份簡報被發咗俾50個高管。
呢個比冇簡報更加衰。
解決方案係借鑑電路工程嘅「斷路器(Circuit Breaker)」模式。當電路過載時,保險絲斷開,保護成個電路系統。喺鏈式調用中,當某個Skill失敗時,斷路器阻止後續Skill繼續執行錯誤嘅流程。
5.1 斷路器嘅三種狀態
CLOSED(閉合):正常工作狀態,請求正常通過 OPEN(斷開):失敗次數超過閾值,拒絕所有請求,直接返回錯誤 HALF_OPEN(半開):OPEN狀態持續一段時間後,允許少量請求通過測試係咪恢復
import time
from enum import Enum
from functools import wraps
class CircuitState(Enum):
CLOSED = "closed" # 正常
OPEN = "open" # 斷開
HALF_OPEN = "half_open"# 探測中
class CircuitBreaker:
"""Skill級別的斷路器"""
def __init__(
self,
skill_name: str,
failure_threshold: int = 3, # 連續失敗N次後斷開
recovery_timeout: int = 60, # 斷開後N秒嘗試恢復
success_threshold: int = 2 # 半開狀態下連續成功N次才恢復
):
self.skill_name = skill_name
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
def _should_attempt(self) -> bool:
if self.state == CircuitState.CLOSED:
returnTrue
elif self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
logger.info(f"[{self.skill_name}] 斷路器進入半開狀態,開始探測")
returnTrue
returnFalse
else: # HALF_OPEN
returnTrue
def _on_success(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.success_count = 0
logger.info(f"[{self.skill_name}] 斷路器恢復正常(CLOSED)")
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
self.success_count = 0
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
logger.error(
f"[{self.skill_name}] 斷路器斷開(OPEN)!"
f"連續失敗{self.failure_count}次,{self.recovery_timeout}秒後嘗試恢復"
)
asyncdef call(self, skill_func, *args, **kwargs):
"""通過斷路器調用Skill"""
ifnot self._should_attempt():
raise CircuitOpenError(
f"[{self.skill_name}] 斷路器斷開中,拒絕執行。"
f"預計{self.recovery_timeout}秒後嘗試恢復"
)
try:
result = await skill_func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
class CircuitOpenError(Exception):
pass
5.2 鏈式調用嘅編排器(帶斷路器)
呢個係最核心嘅部分——編排器(Orchestrator),負責將三個Skill串聯起來,同時處理各種失敗場景:
class MarketBriefingOrchestrator:
"""每日市場簡報Agent的編排器
負責鏈式調用Skill A -> B -> C,並處理故障
"""
def __init__(self, config: dict):
# 初始化三個Skill
self.skill_a = MarketDataCollector(config)
self.skill_b = MarketAnalysisEngine()
self.skill_c = ReportGenerator()
# 為每個Skill配置獨立的斷路器
self.cb_a = CircuitBreaker("SkillA-Collector", failure_threshold=3)
self.cb_b = CircuitBreaker("SkillB-Analyzer", failure_threshold=2)
self.cb_c = CircuitBreaker("SkillC-Generator", failure_threshold=5)
asyncdef run(self, date: str) -> dict:
"""執行完整的鏈式調用流程"""
result = {
"success": False,
"report": None,
"error": None,
"stage": None,
"fallback_used": False
}
# === Step 1: Skill A 數據採集 ===
logger.info("[鏈路] 啓動 Skill A: 市場數據採集")
try:
skill_a_output = await self.cb_a.call(
self.skill_a.run,
{"date": date, "config": self._get_config()}
)
# 檢查數據質量,決定是否繼續
if skill_a_output.status == "failure":
logger.error("[鏈路] Skill A 完全失敗,啓動降級方案")
returnawait self._fallback_report(date, "數據採集失敗")
except CircuitOpenError as e:
logger.error(f"[鏈路] Skill A 斷路器斷開: {e}")
returnawait self._fallback_report(date, "採集服務不可用")
except Exception as e:
logger.error(f"[鏈路] Skill A 執行異常: {e}")
result["stage"] = "skill_a"
result["error"] = str(e)
return result
# === Step 2: Skill B 數據分析 ===
logger.info("[鏈路] Skill A 完成,傳遞數據到 Skill B: 數據分析")
try:
skill_b_output = await self.cb_b.call(
self.skill_b.run,
skill_a_output # 直接傳遞上游輸出
)
except CircuitOpenError as e:
logger.warning(f"[鏈路] Skill B 斷路器斷開,嘗試降級分析: {e}")
# Skill B 失敗時,用規則引擎生成降級的分析結果
skill_b_output = self._rule_based_analysis(skill_a_output)
result["fallback_used"] = True
except Exception as e:
logger.error(f"[鏈路] Skill B 執行異常: {e}")
result["stage"] = "skill_b"
result["error"] = str(e)
return result
# === Step 3: Skill C 報告生成 ===
logger.info("[鏈路] Skill B 完成,傳遞分析到 Skill C: 報告生成")
try:
report = await self.cb_c.call(
self.skill_c.run,
skill_b_output # 直接傳遞上游輸出
)
result["success"] = True
result["report"] = report
if result["fallback_used"]:
result["report"] += "\n\n> ⚠️ 注:本報告部分分析使用了降級規則引擎,可能不如AI分析精準"
except Exception as e:
logger.error(f"[鏈路] Skill C 執行異常: {e}")
result["stage"] = "skill_c"
result["error"] = str(e)
return result
def _rule_based_analysis(self, skill_a_output: SkillAOutput) -> SkillBOutput:
"""Skill B的降級方案:基於規則的簡化分析"""
stocks = skill_a_output.data.stocks
positive = sum(1for s in stocks if s.change_pct > 0)
direction = "bullish"if positive > len(stocks)/2else"bearish"
return SkillBOutput(
analysis_date=str(date.today()),
market_sentiment="neutral",
sentiment_score=0.0,
top_insights=[],
trend_analysis=TrendAnalysis(
direction=direction,
strength="weak",
key_drivers=["規則引擎降級分析"]
),
risk_alerts=["⚠️ AI分析引擎不可用,使用規則引擎降級輸出"],
opportunities=[],
data_quality_score=skill_a_output.metadata.success_count / max(skill_a_output.metadata.total_sources, 1),
analysis_confidence=0.4# 降級時置信度明確標低
)
asyncdef _fallback_report(self, date: str, reason: str) -> dict:
"""最後的降級兜底:返回一份說明無法生成的通知"""
return {
"success": False,
"report": f"# 每日市場簡報 | {date}\n\n> ❌ 今日簡報生成失敗\n\n原因:{reason}\n\n請檢查數據採集服務狀態。",
"error": reason,
"stage": "skill_a",
"fallback_used": True
}
def _get_config(self) -> dict:
return {
"stock_symbols": ["AAPL", "GOOGL", "MSFT", "NVDA"],
"news_keywords": ["AI", "大模型", "Agent", "LLM"],
"competitor_list": ["competitor_a", "competitor_b"]
}
# ===== 主程序入口 =====
asyncdef main():
config = {"env": "production"}
orchestrator = MarketBriefingOrchestrator(config)
target_date = "2026-05-21"
logger.info(f"開始生成 {target_date} 的市場簡報...")
result = await orchestrator.run(target_date)
if result["success"]:
print("✅ 簡報生成成功!")
if result["fallback_used"]:
print("⚠️ 注意:部分環節使用了降級方案")
print("\n" + "="*50)
print(result["report"])
else:
print(f"❌ 簡報生成失敗於階段: {result['stage']}")
print(f"錯誤原因: {result['error']}")
if result.get("report"):
print("降級通知:")
print(result["report"])
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
六、鏈式調用嘅進階設計原則
經過上面嘅完整實現,我哋總結幾條喺實際生產環境中至關重要嘅設計原則。
6.1 接口版本化
當你嘅Skill鏈投入生產並開始迭代時,你會遇到一個棘手嘅問題:升級Skill B嘅分析邏輯時,輸出格式改變咗,但Skill C仲用緊舊版格式——成條鏈路靜默崩潰。
解決方案係俾接口增加版本字段:
class SkillBOutput(BaseModel):
_schema_version: str = "v2.1.0" # 接口版本
# ... 其他字段
下游Skill喺接收數據時,先檢查版本兼容性,唔兼容時明確報錯,而唔係靜默處理出錯誤結果。
6.2 冪等性設計
鏈式調用裏面嘅每個Skill都應該係冪等的——相同嘅輸入,多次調用,產生相同嘅輸出,唔產生副作用。
點解呢個咁重要?因為喺斷路器重試、網絡波動等場景下,同一個Skill可能會俾人調用多次。如果Skill唔係冪等嘅,重複調用會導致數據重複寫入、重複發送通知等問題。
實現冪等性嘅簡單方法:俾每次調用生成一個唯一嘅request_id,Skill喺執行前檢查呢個ID係咪已經被處理過。
6.3 可觀測性:令鏈路透明
鏈式調用嘅鏈路越長,調試越困難。喺生產環境中,你需要能夠回答:「今朝早8:03嘅簡報生成,係邊個Skill花咗最多時間?」
每個Skill都應該輸出結構化日誌,包含:
{
"trace_id": "abc-123", # 整條鏈路的唯一ID
"skill_name": "SkillB", # 當前Skill名稱
"execution_time_ms": 1250, # 執行耗時
"input_hash": "sha256...", # 輸入數據的哈希(用於調試)
"output_size_bytes": 4096, # 輸出大小
"model_used": "gpt-4o", # 使用的模型
"fallback_triggered": false # 是否觸發了降級
}七、從3個Skill到N個Skill:動態鏈路
我哋今日講嘅係固定嘅三段式鏈路:A → B → C。但現實中,鏈路往往需要動態調整。
例如,當Skill A採集到嘅新聞數量超過100條時,需要插入一個「Skill A.5:新聞過濾器」,先將唔相關嘅噪音過濾走,再傳俾Skill B。當市場出現極端行情(大漲或大跌超過5%)時,需要在Skill B之後插入一個「Skill B.5:預警分析器」,專門生成風險預警。
實現動態鏈路嘅關鍵係策略模式(Strategy Pattern):
class DynamicOrchestrator:
"""動態鏈路編排器:根據運行時條件插入或跳過Skill"""
def __init__(self):
self.pipeline = [] # Skill列表
def add_skill(self, skill, condition=None, name=""):
"""condition是一個函數,接收前一個Skill的輸出,返回bool決定是否執行此Skill"""
self.pipeline.append({
"skill": skill,
"condition": condition, # None表示無條件執行
"name": name
})
return self # 支持鏈式調用:orchestrator.add_skill(a).add_skill(b)
asyncdef run(self, initial_input):
current_data = initial_input
for step in self.pipeline:
# 檢查條件
if step["condition"] andnot step["condition"](current_data):
logger.info(f"[動態鏈路] 跳過 {step['name']}(條件不滿足)")
continue
logger.info(f"[動態鏈路] 執行 {step['name']}")
current_data = await step["skill"].run(current_data)
return current_data
# 使用示例
orchestrator = DynamicOrchestrator()
orchestrator.add_skill(skill_a, name="數據採集") \
.add_skill(
news_filter,
condition=lambda x: len(x.data.news) > 100, # 新聞超100條才執行
name="新聞過濾"
) \
.add_skill(skill_b, name="數據分析") \
.add_skill(
alert_analyzer,
condition=lambda x: abs(x.sentiment_score) > 0.8, # 極端情緒才執行
name="預警分析"
) \
.add_skill(skill_c, name="報告生成")
呢種設計令你嘅鏈路好似真正嘅樂高一樣——每次需要新功能,唔係重寫成條鏈路,而係拼上一塊新積木。
八、總結:標準化接口係一切嘅基礎
我哋今日行完咗一個完整嘅鏈式調用系統嘅設計與實現。返去樂高嘅比喻,等我哋做一個最終嘅對照:
鏈式調用嘅核心心法只有一句話:每個Skill只做一件事,做完交俾下一個,接口係一切嘅基礎。
當你下次面對一個複雜嘅Agent任務時,先問自己三個問題:
呢個任務可以拆成邊幾個獨立嘅職責? 每個職責嘅輸入同輸出係乜嘢? 佢哋之間嘅接口點樣定義先最穩定?
諗清楚呢三個問題,你已經完成咗80%嘅設計工作。淨低嘅20%,就係本文裏面嗰啲防禦性編程——斷路器、降級方案、版本管理——佢哋係護城河,令你嘅積木城堡唔會因為一塊積木損壞就轟然倒塌。
如果呢篇文章對你有幫助,歡迎轉發俾正在設計Agent系統嘅朋友。關注公眾號,每週持續更新Agent開發實戰內容。
你有沒有想過,為什麼樂高積木能風靡全球幾十年?
不是因為某一塊積木有多神奇。一塊2×4的基礎磚,單獨看平平無奇——它就是一塊塑料。但當你把幾百塊、幾千塊積木組合在一起,卻能搭出埃菲爾鐵塔、千年隼號飛船、甚至整座城市的微縮模型。
樂高的魔力不在積木本身,而在於標準化接口帶來的無限組合可能。
今天我們聊的Agent Skill設計,和樂高積木的哲學如出一轍。
單個Skill就像一塊樂高積木——它處理一件具體的事,做好了就交給下一個。Skill A負責數據採集,輸出結構化數據;Skill B負責數據分析,把原始數據變成洞察;Skill C負責報告生成,把洞察變成可讀的文字。三個Skill首尾相連,形成一條流水線,這就是「鏈式調用」。
今天我用一個完整的真實案例——「每日市場簡報Agent」——帶你拆解鏈式調用的每一個細節:Skill如何設計、接口如何對齊、失敗了怎麼處理,以及完整的Python實現。

一、為什麼單個Skill不夠用?
1.1 單Skill的能力邊界
在我們開始搭積木之前,先想清楚一個問題:為什麼不把所有邏輯塞進一個Skill?
很多人剛開始設計Agent時,會有一種「大而全」的衝動——寫一個Super Skill,讓它又能抓數據、又能分析、又能寫報告。聽起來很方便,實際上是一場災難。
想象一下,你買了一套樂高,但廠商說「我們把所有零件都預先粘在一起,做成了一個完整的模型」——那就不是樂高了,那叫固定玩具。一旦某個部件壞了,整個模型報廢;想換個造型?對不起,做不到。
單體Skill有三個致命弱點:
① 上下文窗口壓力LLM的上下文窗口是有限的。如果你讓一個Skill同時處理「抓取10個數據源」+「多維度分析」+「生成完整報告」,輕鬆就能把128K的上下文塞滿。模型會開始「遺忘」早期信息,輸出質量急劇下降。
② 調試和維護地獄當輸出不符合預期時,你怎麼知道是數據採集環節出了問題,還是分析邏輯有偏差,還是生成格式不對?一個500行的Super Skill,排查起來如同大海撈針。
③ 複用率為零你為這個任務寫的數據分析邏輯,下次換個場景,一行代碼都帶不走。因為它和其他邏輯強耦合在一起,根本無法單獨提取。
1.2 鏈式調用的核心價值
鏈式調用解決的核心問題只有一個:職責單一化。
每個Skill只做一件事,把這件事做到極致,然後把結果交給下一個Skill。這背後有一個深刻的設計哲學,在軟件工程裏叫「單一職責原則(SRP)」,在樂高世界裏叫「標準化卡口」。
正是因為每塊樂高積木的卡口尺寸完全一致,你才能自由組合。Skill鏈式調用的「卡口」就是接口契約——上游Skill輸出什麼格式,下游Skill就期望接收什麼格式。這個格式一旦約定好,兩個Skill可以獨立開發、獨立測試、獨立維護。
二、案例解構:每日市場簡報Agent
2.1 整體架構一覽
「每日市場簡報Agent」是一個每天早上8點自動運行的Agent,它完成三件事:
抓取數據:從多個數據源採集前一天的市場數據(股市、行業新聞、競品動態) 分析洞察:對原始數據進行多維度分析,提煉關鍵信號和趨勢 生成簡報:把分析結果寫成一份可直接發送給管理層的日報
對應的三個Skill:
[Skill A: 市場數據採集器]
↓ JSON數據包
[Skill B: 數據分析引擎]
↓ 分析報告結構體
[Skill C: 簡報生成器]
↓ Markdown簡報
[發送/存儲]
整個流水線的關鍵在於:每個Skill的輸出,是下一個Skill的輸入。它們之間傳遞的不是模糊的自然語言,而是結構嚴謹的數據對象——就像樂高積木通過精確尺寸的卡口連接,而不是用膠水隨意粘在一起。
2.2 Skill A:市場數據採集器
職責定義:這個Skill是整條鏈路的「原料供應商」。它的唯一工作是從外部世界採集數據,清洗成標準格式,然後交出去。它不做任何分析,不發表任何觀點。
一個好的數據採集Skill,就像樂高積木工廠裏的原料車間——它只管把塑料顆粒壓制成符合規格的積木塊,不關心這些積木最終會搭成什麼造型。
輸入格式:
{
"date": "2026-05-21", # 採集日期
"sources": ["stock", "news", "competitor"], # 數據源列表
"config": {
"stock_symbols": ["AAPL", "GOOGL", "MSFT"],
"news_keywords": ["AI", "大模型", "Agent"],
"competitor_list": ["competitor_a", "competitor_b"]
}
}
輸出格式:
{
"status": "success",
"timestamp": "2026-05-21T23:59:00Z",
"data": {
"stocks": [
{"symbol": "AAPL", "close": 189.5, "change_pct": 1.2, "volume": 82000000},
{"symbol": "GOOGL", "close": 175.3, "change_pct": -0.8, "volume": 25000000}
],
"news": [
{
"title": "OpenAI發佈GPT-5技術報告",
"source": "Reuters",
"published_at": "2026-05-21T14:30:00Z",
"sentiment": "positive",
"relevance_score": 0.95
}
],
"competitors": [
{
"name": "competitor_a",
"recent_actions": ["發佈新產品線", "CEO接受採訪"],
"social_mentions": 1250
}
]
},
"metadata": {
"total_sources": 3,
"success_count": 3,
"failed_sources": [],
"execution_time_ms": 3200
}
}
邊界條件設計:
如果某個數據源超時(>10秒),記錄在 failed_sources中,繼續處理其他源,不拋出異常如果所有數據源都失敗, status設為"partial_failure",觸發下游的降級處理股票數據缺失時,用前一天數據填充,並在metadata中標記 "filled_with_previous": true
三、接口設計:積木的卡口尺寸
在深入Skill B和Skill C之前,我們需要重點講一個往往被忽視的關鍵——接口設計。
這是整個鏈式調用成敗的核心。
3.1 為什麼接口設計如此關鍵?
回到樂高的比喻。你有沒有試過把樂高積木和得寶積木(DUPLO)拼在一起?得寶是樂高的大號版,卡口尺寸是標準樂高的兩倍。理論上它們來自同一家公司,應該能兼容吧?
答案是:只能單向兼容,而且非常勉強。
這就是接口不對齊的代價。在Skill鏈式調用中,如果Skill A輸出的數據結構和Skill B期望的輸入結構不一致,整條鏈路就會在連接處斷裂。更糟糕的是,這種斷裂往往不會立即報錯,而是產生「垃圾進,垃圾出(GIGO)」的效果——Skill B接收了錯誤格式的數據,勉強運行,輸出一堆無意義的結果,然後Skill C把這些垃圾包裝成漂亮的報告發給你的老闆。
接口契約(Interface Contract)是解決方案。它明確規定:
上游Skill保證輸出某種格式 下游Skill假設接收的是這種格式 兩者之間有驗證層確保契約被遵守
3.2 用Pydantic定義接口契約
在Python生態裏,Pydantic是定義接口契約的最佳工具。它讓數據結構從「約定俗成的口頭承諾」變成「編譯時可驗證的代碼契約」。
from pydantic import BaseModel, Field, validator
from typing import List, Optional, Literal
from datetime import datetime
# ===== 接口契約定義 =====
class StockData(BaseModel):
symbol: str
close: float
change_pct: float
volume: int
class NewsItem(BaseModel):
title: str
source: str
published_at: datetime
sentiment: Literal["positive", "negative", "neutral"]
relevance_score: float = Field(ge=0.0, le=1.0)
class CompetitorData(BaseModel):
name: str
recent_actions: List[str]
social_mentions: int
class MarketRawData(BaseModel):
stocks: List[StockData]
news: List[NewsItem]
competitors: List[CompetitorData]
class CollectionMetadata(BaseModel):
total_sources: int
success_count: int
failed_sources: List[str] = []
execution_time_ms: int
filled_with_previous: bool = False
# Skill A -> Skill B 的接口契約
class SkillAOutput(BaseModel):
status: Literal["success", "partial_failure", "failure"]
timestamp: datetime
data: MarketRawData
metadata: CollectionMetadata
@validator('data')
def data_must_have_stocks(cls, v, values):
if values.get('status') == 'success'andnot v.stocks:
raise ValueError('成功狀態下股票數據不能為空')
return v
這段代碼做了幾件關鍵的事:
每個字段都有明確的類型約束 sentiment使用Literal限定只能是三個值之一,不會出現「Positive」「POSITIVE」這類大小寫混亂relevance_score用Field(ge=0.0, le=1.0)確保分數在合法範圍內自定義validator確保業務邏輯層面的契約
3.3 Skill B:數據分析引擎
職責定義:Skill B是整條鏈路的「思考者」。它接收Skill A清洗好的原始數據,運用分析邏輯提煉出洞察——哪些信號值得關注?市場在往哪個方向走?競爭對手有什麼異動?
這個Skill就像樂高套裝裏的核心部件——它把各種小積木組合成有意義的結構單元,比如一個車輪組合、一個門窗套件。單獨的車輪沒有意義,但當它和車身、車軸組合在一起,才構成了「汽車」這個概念。
輸入:接收SkillAOutput,也就是Skill A的完整輸出
核心處理邏輯:
class MarketInsight(BaseModel):
category: str
signal: str
confidence: float = Field(ge=0.0, le=1.0)
supporting_data: List[str]
action_suggestion: Optional[str] = None
class TrendAnalysis(BaseModel):
direction: Literal["bullish", "bearish", "neutral"]
strength: Literal["strong", "moderate", "weak"]
key_drivers: List[str]
# Skill B -> Skill C 的接口契約
class SkillBOutput(BaseModel):
analysis_date: str
market_sentiment: Literal["positive", "negative", "neutral"]
sentiment_score: float = Field(ge=-1.0, le=1.0)
top_insights: List[MarketInsight]
trend_analysis: TrendAnalysis
risk_alerts: List[str]
opportunities: List[str]
data_quality_score: float # 基於Skill A的metadata計算
analysis_confidence: float # 整體分析置信度
邊界條件設計:
如果輸入數據的 data_quality_score低於0.6(超過40%的數據源失敗),降低analysis_confidence並在risk_alerts中標註數據不完整新聞數據為空時,跳過情感分析, market_sentiment設為"neutral",並註明原因分析引擎調用LLM超時時,返回基於規則引擎的降級分析結果
四、完整的Python實現
現在我們把所有積木拼在一起,展示完整的鏈式調用實現,包含編排邏輯和失敗處理。
4.1 三個Skill的核心實現
import asyncio
import logging
from datetime import datetime, date
from typing import Optional, Dict, Any
from pydantic import BaseModel
logger = logging.getLogger(__name__)
# ===== Skill A:市場數據採集器 =====
class MarketDataCollector:
"""Skill A: 市場數據採集器
職責:從多數據源採集原始市場數據,輸出標準化JSON
"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.timeout = 10# 單數據源超時時間(秒)
asyncdef collect_stocks(self, symbols: list) -> tuple:
"""採集股票數據,失敗時返回空列表"""
try:
# 實際場景這裏調用股票API
await asyncio.sleep(0.1) # 模擬API調用
return [
StockData(symbol=s, close=150.0, change_pct=0.5, volume=1000000)
for s in symbols
], None
except Exception as e:
logger.warning(f"股票數據採集失敗: {e}")
return [], str(e)
asyncdef collect_news(self, keywords: list) -> tuple:
"""採集新聞數據,失敗時返回空列表"""
try:
await asyncio.sleep(0.1)
return [
NewsItem(
title=f"關於{kw}的最新動態",
source="Reuters",
published_at=datetime.now(),
sentiment="positive",
relevance_score=0.8
)
for kw in keywords[:3] # 限制條數
], None
except Exception as e:
logger.warning(f"新聞數據採集失敗: {e}")
return [], str(e)
asyncdef run(self, input_data: dict) -> SkillAOutput:
"""主執行方法,併發採集所有數據源"""
target_date = input_data.get("date", str(date.today()))
cfg = input_data.get("config", {})
# 併發執行所有數據源採集
stocks_task = self.collect_stocks(cfg.get("stock_symbols", []))
news_task = self.collect_news(cfg.get("news_keywords", []))
results = await asyncio.gather(
stocks_task, news_task,
return_exceptions=True# 關鍵:不讓一個失敗影響其他
)
stocks, stock_err = results[0] ifnot isinstance(results[0], Exception) else ([], str(results[0]))
news, news_err = results[1] ifnot isinstance(results[1], Exception) else ([], str(results[1]))
failed_sources = []
if stock_err: failed_sources.append(f"stocks: {stock_err}")
if news_err: failed_sources.append(f"news: {news_err}")
success_count = 2 - len(failed_sources)
status = "success"ifnot failed_sources else (
"failure"if success_count == 0else"partial_failure"
)
return SkillAOutput(
status=status,
timestamp=datetime.now(),
data=MarketRawData(
stocks=stocks,
news=news,
competitors=[]
),
metadata=CollectionMetadata(
total_sources=2,
success_count=success_count,
failed_sources=failed_sources,
execution_time_ms=350
)
)
# ===== Skill B:數據分析引擎 =====
class MarketAnalysisEngine:
"""Skill B: 數據分析引擎
職責:接收原始數據,輸出結構化市場洞察
"""
def __init__(self, llm_client=None):
self.llm = llm_client
def _calculate_data_quality(self, metadata: CollectionMetadata) -> float:
"""基於採集元數據計算數據質量分"""
if metadata.total_sources == 0:
return0.0
base_score = metadata.success_count / metadata.total_sources
# 有填充數據時輕微降分
if metadata.filled_with_previous:
base_score *= 0.9
return round(base_score, 2)
def _analyze_stock_trend(self, stocks: list) -> TrendAnalysis:
"""基於股票數據分析市場趨勢"""
ifnot stocks:
return TrendAnalysis(direction="neutral", strength="weak",
key_drivers=["數據不足"])
positive_count = sum(1for s in stocks if s.change_pct > 0)
ratio = positive_count / len(stocks)
direction = "bullish"if ratio > 0.6else ("bearish"if ratio < 0.4else"neutral")
avg_change = sum(abs(s.change_pct) for s in stocks) / len(stocks)
strength = "strong"if avg_change > 2.0else ("moderate"if avg_change > 0.5else"weak")
return TrendAnalysis(
direction=direction,
strength=strength,
key_drivers=[f"{s.symbol}: {s.change_pct:+.1f}%"for s in stocks[:3]]
)
asyncdef run(self, skill_a_output: SkillAOutput) -> SkillBOutput:
"""主執行方法"""
data = skill_a_output.data
metadata = skill_a_output.metadata
data_quality = self._calculate_data_quality(metadata)
trend = self._analyze_stock_trend(data.stocks)
# 數據質量過低時,降級處理
analysis_confidence = data_quality * 0.9
risk_alerts = []
if data_quality < 0.6:
risk_alerts.append(f"數據質量較低({data_quality:.0%}),分析結論僅供參考")
# 從新聞中提取洞察
insights = []
for news in data.news[:5]:
if news.relevance_score > 0.7:
insights.append(MarketInsight(
category="新聞信號",
signal=news.title,
confidence=news.relevance_score,
supporting_data=[f"來源: {news.source}"],
action_suggestion="持續關注"if news.sentiment == "positive"else"注意風險"
))
# 計算整體市場情緒
sentiment_scores = {"positive": 1.0, "neutral": 0.0, "negative": -1.0}
if data.news:
avg_sentiment = sum(
sentiment_scores.get(n.sentiment, 0) for n in data.news
) / len(data.news)
else:
avg_sentiment = 0.0
market_sentiment = (
"positive"if avg_sentiment > 0.3else
("negative"if avg_sentiment < -0.3else"neutral")
)
return SkillBOutput(
analysis_date=str(date.today()),
market_sentiment=market_sentiment,
sentiment_score=avg_sentiment,
top_insights=insights,
trend_analysis=trend,
risk_alerts=risk_alerts,
opportunities=["AI賽道持續關注"if trend.direction == "bullish"else""],
data_quality_score=data_quality,
analysis_confidence=analysis_confidence
)
# ===== Skill C:簡報生成器 =====
class ReportGenerator:
"""Skill C: 簡報生成器
職責:接收分析結果,生成可讀性強的Markdown簡報
"""
TREND_EMOJI = {"bullish": "📈", "bearish": "📉", "neutral": "➡️"}
SENTIMENT_LABEL = {"positive": "樂觀", "negative": "悲觀", "neutral": "中性"}
asyncdef run(self, skill_b_output: SkillBOutput) -> str:
"""主執行方法,生成Markdown格式簡報"""
b = skill_b_output
trend_emoji = self.TREND_EMOJI.get(b.trend_analysis.direction, "➡️")
sentiment_label = self.SENTIMENT_LABEL.get(b.market_sentiment, "中性")
report_sections = [
f"# 每日市場簡報 | {b.analysis_date}",
f"> 數據質量:{b.data_quality_score:.0%} | 分析置信度:{b.analysis_confidence:.0%}",
"",
"## 市場總覽",
f"- **整體趨勢**:{trend_emoji} {b.trend_analysis.direction.upper()} ({b.trend_analysis.strength})",
f"- **市場情緒**:{sentiment_label}(得分:{b.sentiment_score:+.2f})",
f"- **主要驅動**:{', '.join(b.trend_analysis.key_drivers)}",
]
if b.risk_alerts:
report_sections += [
"",
"## ⚠️ 風險提示",
*[f"- {alert}"for alert in b.risk_alerts]
]
if b.top_insights:
report_sections += [
"",
"## 關鍵洞察",
*[f"- **{i.signal}**(置信度:{i.confidence:.0%}){' → ' + i.action_suggestion if i.action_suggestion else ''}"
for i in b.top_insights]
]
if any(b.opportunities):
report_sections += [
"",
"## 機會信號",
*[f"- {opp}"for opp in b.opportunities if opp]
]
report_sections += [
"",
"---",
f"*報告生成時間:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | 由市場簡報Agent自動生成*"
]
return"\n".join(report_sections)五、失敗處理:斷路器模式
鏈式調用最大的風險是什麼?
級聯失敗(Cascading Failure)——Skill A崩了,Skill B拿到了垃圾數據,Skill C用垃圾數據生成了一份措辭嚴謹的錯誤簡報,然後這份簡報被髮給了50個高管。
這比沒有簡報還要糟糕。
解決方案是借鑑電路工程的「斷路器(Circuit Breaker)」模式。當電路過載時,保險絲斷開,保護整個電路系統。在鏈式調用中,當某個Skill失敗時,斷路器阻止後續Skill繼續執行錯誤的流程。
5.1 斷路器的三種狀態
CLOSED(閉合):正常工作狀態,請求正常通過 OPEN(斷開):失敗次數超過閾值,拒絕所有請求,直接返回錯誤 HALF_OPEN(半開):OPEN狀態持續一段時間後,允許少量請求通過測試是否恢復
import time
from enum import Enum
from functools import wraps
class CircuitState(Enum):
CLOSED = "closed" # 正常
OPEN = "open" # 斷開
HALF_OPEN = "half_open"# 探測中
class CircuitBreaker:
"""Skill級別的斷路器"""
def __init__(
self,
skill_name: str,
failure_threshold: int = 3, # 連續失敗N次後斷開
recovery_timeout: int = 60, # 斷開後N秒嘗試恢復
success_threshold: int = 2 # 半開狀態下連續成功N次才恢復
):
self.skill_name = skill_name
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
def _should_attempt(self) -> bool:
if self.state == CircuitState.CLOSED:
returnTrue
elif self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
logger.info(f"[{self.skill_name}] 斷路器進入半開狀態,開始探測")
returnTrue
returnFalse
else: # HALF_OPEN
returnTrue
def _on_success(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.success_count = 0
logger.info(f"[{self.skill_name}] 斷路器恢復正常(CLOSED)")
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
self.success_count = 0
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
logger.error(
f"[{self.skill_name}] 斷路器斷開(OPEN)!"
f"連續失敗{self.failure_count}次,{self.recovery_timeout}秒後嘗試恢復"
)
asyncdef call(self, skill_func, *args, **kwargs):
"""通過斷路器調用Skill"""
ifnot self._should_attempt():
raise CircuitOpenError(
f"[{self.skill_name}] 斷路器斷開中,拒絕執行。"
f"預計{self.recovery_timeout}秒後嘗試恢復"
)
try:
result = await skill_func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
class CircuitOpenError(Exception):
pass
5.2 鏈式調用的編排器(帶斷路器)
這是最核心的部分——編排器(Orchestrator),負責把三個Skill串聯起來,同時處理各種失敗場景:
class MarketBriefingOrchestrator:
"""每日市場簡報Agent的編排器
負責鏈式調用Skill A -> B -> C,並處理故障
"""
def __init__(self, config: dict):
# 初始化三個Skill
self.skill_a = MarketDataCollector(config)
self.skill_b = MarketAnalysisEngine()
self.skill_c = ReportGenerator()
# 為每個Skill配置獨立的斷路器
self.cb_a = CircuitBreaker("SkillA-Collector", failure_threshold=3)
self.cb_b = CircuitBreaker("SkillB-Analyzer", failure_threshold=2)
self.cb_c = CircuitBreaker("SkillC-Generator", failure_threshold=5)
asyncdef run(self, date: str) -> dict:
"""執行完整的鏈式調用流程"""
result = {
"success": False,
"report": None,
"error": None,
"stage": None,
"fallback_used": False
}
# === Step 1: Skill A 數據採集 ===
logger.info("[鏈路] 啓動 Skill A: 市場數據採集")
try:
skill_a_output = await self.cb_a.call(
self.skill_a.run,
{"date": date, "config": self._get_config()}
)
# 檢查數據質量,決定是否繼續
if skill_a_output.status == "failure":
logger.error("[鏈路] Skill A 完全失敗,啓動降級方案")
returnawait self._fallback_report(date, "數據採集失敗")
except CircuitOpenError as e:
logger.error(f"[鏈路] Skill A 斷路器斷開: {e}")
returnawait self._fallback_report(date, "採集服務不可用")
except Exception as e:
logger.error(f"[鏈路] Skill A 執行異常: {e}")
result["stage"] = "skill_a"
result["error"] = str(e)
return result
# === Step 2: Skill B 數據分析 ===
logger.info("[鏈路] Skill A 完成,傳遞數據到 Skill B: 數據分析")
try:
skill_b_output = await self.cb_b.call(
self.skill_b.run,
skill_a_output # 直接傳遞上游輸出
)
except CircuitOpenError as e:
logger.warning(f"[鏈路] Skill B 斷路器斷開,嘗試降級分析: {e}")
# Skill B 失敗時,用規則引擎生成降級的分析結果
skill_b_output = self._rule_based_analysis(skill_a_output)
result["fallback_used"] = True
except Exception as e:
logger.error(f"[鏈路] Skill B 執行異常: {e}")
result["stage"] = "skill_b"
result["error"] = str(e)
return result
# === Step 3: Skill C 報告生成 ===
logger.info("[鏈路] Skill B 完成,傳遞分析到 Skill C: 報告生成")
try:
report = await self.cb_c.call(
self.skill_c.run,
skill_b_output # 直接傳遞上游輸出
)
result["success"] = True
result["report"] = report
if result["fallback_used"]:
result["report"] += "\n\n> ⚠️ 注:本報告部分分析使用了降級規則引擎,可能不如AI分析精準"
except Exception as e:
logger.error(f"[鏈路] Skill C 執行異常: {e}")
result["stage"] = "skill_c"
result["error"] = str(e)
return result
def _rule_based_analysis(self, skill_a_output: SkillAOutput) -> SkillBOutput:
"""Skill B的降級方案:基於規則的簡化分析"""
stocks = skill_a_output.data.stocks
positive = sum(1for s in stocks if s.change_pct > 0)
direction = "bullish"if positive > len(stocks)/2else"bearish"
return SkillBOutput(
analysis_date=str(date.today()),
market_sentiment="neutral",
sentiment_score=0.0,
top_insights=[],
trend_analysis=TrendAnalysis(
direction=direction,
strength="weak",
key_drivers=["規則引擎降級分析"]
),
risk_alerts=["⚠️ AI分析引擎不可用,使用規則引擎降級輸出"],
opportunities=[],
data_quality_score=skill_a_output.metadata.success_count / max(skill_a_output.metadata.total_sources, 1),
analysis_confidence=0.4# 降級時置信度明確標低
)
asyncdef _fallback_report(self, date: str, reason: str) -> dict:
"""最後的降級兜底:返回一份說明無法生成的通知"""
return {
"success": False,
"report": f"# 每日市場簡報 | {date}\n\n> ❌ 今日簡報生成失敗\n\n原因:{reason}\n\n請檢查數據採集服務狀態。",
"error": reason,
"stage": "skill_a",
"fallback_used": True
}
def _get_config(self) -> dict:
return {
"stock_symbols": ["AAPL", "GOOGL", "MSFT", "NVDA"],
"news_keywords": ["AI", "大模型", "Agent", "LLM"],
"competitor_list": ["competitor_a", "competitor_b"]
}
# ===== 主程序入口 =====
asyncdef main():
config = {"env": "production"}
orchestrator = MarketBriefingOrchestrator(config)
target_date = "2026-05-21"
logger.info(f"開始生成 {target_date} 的市場簡報...")
result = await orchestrator.run(target_date)
if result["success"]:
print("✅ 簡報生成成功!")
if result["fallback_used"]:
print("⚠️ 注意:部分環節使用了降級方案")
print("\n" + "="*50)
print(result["report"])
else:
print(f"❌ 簡報生成失敗於階段: {result['stage']}")
print(f"錯誤原因: {result['error']}")
if result.get("report"):
print("降級通知:")
print(result["report"])
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
六、鏈式調用的進階設計原則
經過上面的完整實現,我們總結幾條在實際生產環境中至關重要的設計原則。
6.1 接口版本化
當你的Skill鏈投入生產並開始迭代時,你會遇到一個棘手的問題:升級Skill B的分析邏輯時,輸出格式改變了,但Skill C還用的是舊版格式——整條鏈路靜默崩潰。
解決方案是給接口增加版本字段:
class SkillBOutput(BaseModel):
_schema_version: str = "v2.1.0" # 接口版本
# ... 其他字段
下游Skill在接收數據時,先檢查版本兼容性,不兼容時明確報錯,而不是靜默處理出錯誤結果。
6.2 冪等性設計
鏈式調用中的每個Skill都應該是冪等的——相同的輸入,多次調用,產生相同的輸出,不產生副作用。
為什麼這很重要?因為在斷路器重試、網絡波動等場景下,同一個Skill可能被調用多次。如果Skill不是冪等的,重複調用會導致數據重複寫入、重複發送通知等問題。
實現冪等性的簡單方法:給每次調用生成一個唯一的request_id,Skill在執行前檢查這個ID是否已經被處理過。
6.3 可觀測性:讓鏈路透明
鏈式調用的鏈路越長,調試越困難。在生產環境中,你需要能夠回答:「今天早上8:03的簡報生成,是哪個Skill花了最多時間?」
每個Skill都應該輸出結構化日誌,包含:
{
"trace_id": "abc-123", # 整條鏈路的唯一ID
"skill_name": "SkillB", # 當前Skill名稱
"execution_time_ms": 1250, # 執行耗時
"input_hash": "sha256...", # 輸入數據的哈希(用於調試)
"output_size_bytes": 4096, # 輸出大小
"model_used": "gpt-4o", # 使用的模型
"fallback_triggered": false # 是否觸發了降級
}七、從3個Skill到N個Skill:動態鏈路
我們今天講的是固定的三段式鏈路:A → B → C。但現實中,鏈路往往需要動態調整。
比如,當Skill A採集到的新聞數量超過100條時,需要插入一個「Skill A.5:新聞過濾器」,先把不相關的噪音過濾掉,再傳給Skill B。當市場出現極端行情(大漲或大跌超過5%)時,需要在Skill B之後插入一個「Skill B.5:預警分析器」,專門生成風險預警。
實現動態鏈路的關鍵是策略模式(Strategy Pattern):
class DynamicOrchestrator:
"""動態鏈路編排器:根據運行時條件插入或跳過Skill"""
def __init__(self):
self.pipeline = [] # Skill列表
def add_skill(self, skill, condition=None, name=""):
"""condition是一個函數,接收前一個Skill的輸出,返回bool決定是否執行此Skill"""
self.pipeline.append({
"skill": skill,
"condition": condition, # None表示無條件執行
"name": name
})
return self # 支持鏈式調用:orchestrator.add_skill(a).add_skill(b)
asyncdef run(self, initial_input):
current_data = initial_input
for step in self.pipeline:
# 檢查條件
if step["condition"] andnot step["condition"](current_data):
logger.info(f"[動態鏈路] 跳過 {step['name']}(條件不滿足)")
continue
logger.info(f"[動態鏈路] 執行 {step['name']}")
current_data = await step["skill"].run(current_data)
return current_data
# 使用示例
orchestrator = DynamicOrchestrator()
orchestrator.add_skill(skill_a, name="數據採集") \
.add_skill(
news_filter,
condition=lambda x: len(x.data.news) > 100, # 新聞超100條才執行
name="新聞過濾"
) \
.add_skill(skill_b, name="數據分析") \
.add_skill(
alert_analyzer,
condition=lambda x: abs(x.sentiment_score) > 0.8, # 極端情緒才執行
name="預警分析"
) \
.add_skill(skill_c, name="報告生成")
這種設計讓你的鏈路像真正的樂高一樣——每次需要新功能,不是重寫整條鏈路,而是拼上一塊新積木。
八、總結:標準化接口是一切的基礎
我們今天走完了一個完整的鏈式調用系統的設計與實現。回到樂高的比喻,讓我們做一個最終的對照:
鏈式調用的核心心法只有一句話:每個Skill只做一件事,做完交給下一個,接口是一切的基礎。
當你下次面對一個複雜的Agent任務時,先問自己三個問題:
這個任務能拆成哪幾個獨立的職責? 每個職責的輸入和輸出是什麼? 它們之間的接口如何定義才能最穩定?
想清楚這三個問題,你已經完成了80%的設計工作。剩下的20%,就是本文裏那些防禦性編程——斷路器、降級方案、版本管理——它們是護城河,讓你的積木城堡不會因為一塊積木損壞就轟然倒塌。
如果這篇文章對你有幫助,歡迎轉發給正在設計Agent系統的朋友。關注公眾號,每週持續更新Agent開發實戰內容。