Python大模型行业资讯摘要流水线:自动化工程实现全攻略
```python
@dataclass
class Article:
"""资讯结构化元数据实体类"""
title: str = ""
url: str = ""
source: str = ""
published: str = ""
raw_content: str = ""
clean_content: str = ""
word_count: int = 0
uid: str = ""
seen: bool = False
class ArticleFetcher:
"""异构资讯多源采集引擎"""
def __init__(self):
# 持久化请求会话,复用连接池优化网络性能
self.session = requests.Session()
self.session.headers.update({
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36"
),
})
# 全局URL指纹集合,实现链接级粗去重
self.seen_urls: set = set()
def fetch_rss(self, feed_url: str, source: str) -> List[Article]:
"""RSS订阅源批量资讯拉取接口"""
articles = []
try:
feed = feedparser.parse(feed_url)
# 限制单次解析条目上限,避免单次流量过载
for entry in feed.entries[:20]:
raw_text = self._get_rss_entry_content(entry)
article = Article(
title=getattr(entry, "title", ""),
url=getattr(entry, "link", ""),
source=source,
published=getattr(entry, "published", ""),
raw_content=raw_text,
)
# URL哈希判重,仅新增未收录资讯
if self._is_new_article(article):
articles.append(article)
print(f"[{source}] 成功采集新增资讯 {len(articles)} 篇")
except Exception as e:
print(f"[{source}] RSS源解析异常,异常信息:{str(e)}")
return articles
def fetch_html_fulltext(self, url: str, source: str) -> Optional[Article]:
"""RSS正文缺失兜底:单页面HTML全文采集"""
try:
resp = self.session.get(url, timeout=15)
soup = BeautifulSoup(resp.text, "html.parser")
# 清除广告、导航、脚本等无效DOM节点
for invalid_tag in soup.select("script, style, na v, .ad, .advertisement"):
invalid_tag.decompose()
# 精准定位正文容器标签
content_block = soup.select_one("article, .content, #article-body, main")
pu re_text = content_block.get_text(separator="", strip=True) if content_block else ""
return Article(
title=soup.title.string or "",
url=url, source=source,
raw_content=pu re_text[:2000],
)
except Exception as e:
print(f"[HTML采集] 链接{url[:40]} 请求失败,异常:{str(e)}")
return None
def _get_rss_entry_content(self, entry) -> str:
"""解析RSS条目摘要/正文字段,兼容多平台RSS结构差异"""
for field in ("summary", "content"):
field_val = getattr(entry, field, None)
if not field_val:
continue
# 兼容数组型content结构与字符串摘要
raw_html = field_val[0].get("value", str(field_val)) if isinstance(field_val, list) else str(field_val)
return self._html_to_plaintext(raw_html)
return ""
def _html_to_plaintext(self, html: str) -> str:
"""HTML标签清洗,输出纯文本并截断超长内容"""
soup = BeautifulSoup(html, "html.parser")
return soup.get_text(separator=" ", strip=True)[:3000]
def _is_new_article(self, article: Article) -> bool:
"""基于URL MD5哈希实现链接级去重判断"""
if not article.url:
return False
url_fingerprint = hashlib.md5(article.url.encode(encoding="utf-8")).hexdigest()[:16]
if url_fingerprint in self.seen_urls:
return False
self.seen_urls.add(url_fingerprint)
return True
```
**采集层补充说明**
1. **双层采集兜底机制**:优先解析RSS内置摘要;若RSS仅提供标题无正文,自动调用`fetch_html_fulltext`抓取网页全文,解决国内资讯站点RSS内容缺失问题。
2. **网络优化**:复用`requests.Session`长连接、固定通用UA、设置15s请求超时,降低封禁风险。
3. **粗粒度去重**:通过URL生成16位MD5指纹全局缓存,拦截重复链接请求,节省网络IO。
### (二)去重与质量过滤层:SimHash 文本指纹过滤引擎
在链接粗去重基础上,引入 **SimHash局部敏感哈希算法**实现语义级去重,解决“不同URL报道同一事件”的重复资讯问题。同步增加稿件质量前置过滤,拦截短文本无价值内容、标题党煽动类资讯,进一步减少无效大模型调用。
核心原理:对资讯正文生成64位文本指纹,通过指纹间海明距离判别是否重复;默认海明距离阈值设为3,距离≤3则判定为同事件重复稿件。相比单纯URL匹配,这套方案可识别换标题、换发布渠道但核心内容一致的同质化资讯。
```python
import hashlib
from typing import List
@dataclass
class ContentDeduplicator:
"""SimHash语义去重 + 资讯质量过滤器"""
seen_text_signatures: set = set()
@staticmethod
def generate_simhash(text: str, slide_window: int = 3) -> int:
"""轻量化64位SimHash指纹生成,滑动窗口提取文本特征向量"""
# 截断超长文本,控制特征计算开销
word_list = text.split()[:200]
feature_vectors = []
# 滑动窗口提取连续词组特征
for idx in range(len(word_list) - slide_window + 1):
window_words = word_list[idx: idx + slide_window]
combined_key = "".join(window_words)
feature_hash = hash(combined_key) & 0xFFFF
feature_vectors.append(feature_hash)
# 64维向量加权聚合
weight_vec = [0] * 64
for h_val in feature_vectors:
for bit_idx in range(64):
bit_flag = (h_val >> bit_idx) & 1
weight_vec[bit_idx] += 1 if bit_flag else -1
# 生成最终64位二进制指纹
final_fingerprint = 0
for weight in weight_vec:
final_fingerprint = (final_fingerprint << 1) | (1 if weight > 0 else 0)
return final_fingerprint
def check_duplicate(self, article: Article, hamming_threshold: int = 3) -> bool:
"""海明距离计算,判断文本语义重复"""
target_text = article.clean_content if article.clean_content else article.raw_content
current_sig = self.generate_simhash(target_text)
# 比对历史指纹库
for history_sig in self.seen_text_signatures:
hamming_dist = bin(current_sig ^ history_sig).count("1")
if hamming_dist <= hamming_threshold:
return True
self.seen_text_signatures.add(current_sig)
return False
def batch_filter(self, articles: List[Article]) -> List[Article]:
"""批量执行三重过滤:短文本过滤、标题党拦截、SimHash语义去重"""
valid_articles = []
skip_count = 0
# 标题党风险关键词库
clickbait_keywords = [
"震惊", "刚刚", "内幕", "真相", "必看", "转发",
"突发", "刚刚宣布", "紧急", "惊天", "彻底爆发"
]
for article in articles:
text = article.clean_content or article.raw_content
article.word_count = len(text.split())
# 过滤100词以内无价值短资讯
if article.word_count < 100:
skip_count += 1
continue
# 拦截标题党煽动类稿件
if any(keyword in article.title for keyword in clickbait_keywords):
skip_count += 1
continue
# 语义去重校验
if self.check_duplicate(article):
skip_count += 1
continue
valid_articles.append(article)
print(f"过滤完成:共丢弃{skip_count}/{len(articles)}篇低质/重复资讯,留存有效稿件{len(valid_articles)}篇")
return valid_articles
```
**调优说明**
- **误杀规避**:短文本资讯(100-200字)特征稀疏,容易出现SimHash误判,可动态下调海明阈值至2;长深度稿件维持阈值3,平衡召回与去重效果。
- **双重去重体系**:URL哈希粗过滤 + SimHash语义细过滤,兼顾性能与去重精度。
### (三)大模型结构化深度摘要层
基于OpenAI兼容接口封装批量分析引擎,通过精细化Prompt约束大模型输出标准化JSON结构化分析报告——不再输出自由文本摘要。内置Token计数、超长文本截断、批量限流、异常捕获、接口成本统计能力,完全适配批量资讯处理场景。
输出字段覆盖:精炼标题、事件背景、核心事实清单、深度趋势分析、行业标签、资讯质量打分、预估阅读时长、核心结论、内容潜在偏见标注。这些字段足以满足情报分析、行业简报等业务需求。
```python
from openai import OpenAI
import tiktoken
import time
import json
from typing import List, Dict
class LLMDeepSummaryAnalyzer:
"""资讯深度结构化分析生成器,兼容OpenAI系列模型接口"""
def __init__(
self,
api_key: str,
base_url: str = "https://api.openai.com/v1",
model_name: str = "gpt-4o-mini",
single_max_output_tokens: int = 1200,
):
self.llm_client = OpenAI(api_key=api_key, base_url=base_url)
self.model = model_name
self.max_output_tokens = single_max_output_tokens
def single_article_analysis(self, article: Article) -> Dict:
"""单篇资讯调用LLM生成结构化深度分析"""
raw_text = (article.clean_content or article.raw_content)[:2500]
# 标准化约束Prompt,强制JSON输出规范
analysis_prompt = f"""
你是专业行业情报编辑,严谨客观,仅基于原文内容输出分析,禁止杜撰信息。
待分析资讯元信息:
资讯来源:{article.source}
资讯标题:{article.title}
发布时间:{article.published}
正文原文:
{raw_text}
严格遵循以下JSON格式输出,禁止输出任何额外解释、markdown、说明文字,仅返回纯JSON:
{{
"title": "20字以内精炼新闻标题",
"background": "50-100字事件行业背景梳理",
"core_facts": ["客观事实1", "客观事实2", "客观事实3"],
"analysis": "150-300字深度解读,包含事件成因、行业影响、短期发展预判",
"tags": ["行业标签1", "细分标签2", "热点标签3"],
"quality_score": 1-10区间资讯质量打分,信息越详实分数越高,
"read_time_minutes": 预估阅读时长(整数),
"key_conclusion": "30字以内全文核心总结",
"bias_signals": ["文章存在的立场偏向/信息片面点,无则填空数组"]
}}
"""
try:
# Token长度校验,超长自动截断上下文
token_encoder = tiktoken.encoding_for_model(self.model)
prompt_token_len = len(token_encoder.encode(analysis_prompt))
if prompt_token_len > 8000:
analysis_prompt = self._truncate_overlength_prompt(analysis_prompt, token_encoder)
# LLM接口调用,低温度保证输出稳定性
chat_resp = self.llm_client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "专业客观的行业情报分析师,严格按照指定JSON格式输出,不附加任何多余文字。"},
{"role": "user", "content": analysis_prompt}
],
temperature=0.3,
max_tokens=self.max_output_tokens,
)
response_content = chat_resp.choices[0].message.content.strip()
# 剥离代码块标记,兼容模型输出```json包裹场景
if "```json" in response_content:
response_content = response_content.split("```json")[1].split("```")[0]
elif "```" in response_content:
response_content = response_content.split("```")[1].split("```")[0]
# JSON反序列化
analysis_result = json.loads(response_content.strip())
# 补充业务元数据
analysis_result["source"] = article.source
analysis_result["original_url"] = article.url
analysis_result["word_count"] = article.word_count
analysis_result["token_used"] = chat_resp.usage.total_tokens if chat_resp.usage else 0
return analysis_result
except json.JSONDecodeError:
return {"error": "LLM输出JSON格式解析失败"}
except Exception as err:
return {"error": f"接口调用异常:{str(err)}"}
def _truncate_overlength_prompt(self, prompt: str, encoder) -> str:
"""超长上下文截断,控制输入Token上限7000"""
line_list = prompt.split("")
final_lines = []
total_token = 0
for line in line_list:
line_token = len(encoder.encode(line))
if total_token + line_token > 7000:
break
final_lines.append(line)
total_token += line_token
return "".join(final_lines)
def batch_analysis(self, article_list: List[Article], request_delay: float = 0.5) -> List[Dict]:
"""批量资讯分析,限流间隔防接口封禁,统计总Token与调用成本"""
analysis_reports = []
total_consume_tokens = 0
for idx, article in enumerate(article_list):
print(f"正在处理 {idx + 1}/{len(article_list)} | 资讯标题:{article.title[:30]}...")
report = self.single_article_analysis(article)
if "error" not in report:
analysis_reports.append(report)
total_consume_tokens += report.get("token_used", 0)
print(f"处理成功 | 消耗Token:{report['token_used']} | 资讯质量分:{report['quality_score']}")
else:
print(f"处理失败 | 异常信息:{report['error'][:40]}")
time.sleep(request_delay)
# 接口成本概算(gpt-4o-mini 百万Token单价0.1元)
estimate_cost = total_consume_tokens / 1_000_000 * 0.1
print(f"批量分析完成:有效报告{len(analysis_reports)}份,总消耗Token {total_consume_tokens},预估调用成本 ¥{estimate_cost:.2f}")
return analysis_reports
```
**Prompt设计核心价值**
- **结构化输出约束**:强制JSON标准格式,省去后续文本解析、摘要拆分工作,直接对接报表导出。
- **多层情报维度**:除基础摘要外,增加事实提取、偏见识别、质量打分,完全满足行业情报研判需求。
- **低温度参数(0.3)**:降低模型随机发散,保证同类型资讯输出结构、尺度统一。
### (四)输出层:Markdown 标准化报表导出组件
将大模型生成的结构化分析报告批量渲染为可直接查阅的Markdown日报文档,按资讯质量分降序排序,区分高/中/低价值资讯标签,自动创建输出目录,适配日常情报归档、周报交付场景。
```python
import json
from datetime import datetime
import os
from typing import List, Dict
class DailyReportExporter:
"""结构化分析报告Markdown导出工具"""
@staticmethod
def export_markdown(report_list: List[Dict], output_path: str = "output/daily_report.md"):
# 文档头部元信息
md_content = [
f"# 行业资讯自动化情报日报 {datetime.now().strftime('%Y-%m-%d')}",
"",
f"> 本次有效分析资讯总量:{len(report_list)} 篇",
"",
]
# 按资讯质量分降序排序,高价值资讯前置
sorted_reports = sorted(report_list, key=lambda x: x.get("quality_score", 0), reverse=True)
for report in sorted_reports:
score = report.get("quality_score", 0)
# 分级标签标识
level_tag = "?高价值" if score >= 7 else "?中等价值" if score >= 5 else "⚪低价值"
md_content.append(f"## {level_tag} {report.get('title', '无标题资讯')}")
md_content.append("")
# 基础元数据
md_content.append(f"- 资讯来源:{report.get('source', '未知')} | 预估阅读时长:{report.get('read_time_minutes', 0)}分钟")
md_content.append(f"- 事件背景:{report.get('background', '')}")
md_content.append("")
# 核心事实清单
fact_list = report.get("core_facts", [])
if fact_list:
md_content.append("### 核心客观事实")
for fact in fact_list[:3]:
md_content.append(f"- {fact}")
md_content.append("")
# 深度分析文本
md_content.append(f"### 深度行业分析{report.get('analysis', '')}")
md_content.append("")
# 行业标签
tag_list = report.get("tags", [])
if tag_list:
md_content.append(f"**分类标签**:{', '.join(tag_list)}")
# 内容立场提示
bias_list = report.get("bias_signals", [])
if bias_list:
md_content.append(f"**内容立场提示**:{', '.join(bias_list)}")
# 分隔线
md_content.append("---")
md_content.append("")
# 自动创建输出目录
output_dir = os.path.dirname(output_path)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir, parents=True, exist_ok=True)
# 写入本地文件,UTF-8编码兼容中文
with open(output_path, "w", encoding="utf-8") as f:
f.write("".join(md_content))
print(f"情报日报已导出至路径:{output_path}")
```
## 四、全链路流水线调度入口
封装一体化执行函数,串联采集、过滤、LLM分析、报表导出全流程,支持批量RSS源批量订阅配置,可以直接定时调度运行。
```python
def run_daily_intelligence_pipeline(
feed_configs: List[tuple[str, str]],
llm_api_key: str,
min_quality_threshold: int = 5,
) -> List[Dict]:
"""
每日资讯流水线统一调度函数
feed_configs: RSS源配置数组,格式[(rss订阅链接, 资讯来源名称)]
min_quality_threshold: 最终保留资讯最低质量分数
"""
# 1. 多源资讯采集
fetcher = ArticleFetcher()
raw_article_pool = []
for feed_url, source_name in feed_configs:
raw_article_pool.extend(fetcher.fetch_rss(feed_url, source_name))
time.sleep(1)
# 2. SimHash去重与低质资讯过滤
dedup_engine = ContentDeduplicator()
valid_articles = dedup_engine.batch_filter(raw_article_pool)
# 3. LLM批量结构化深度分析
llm_analyzer = LLMDeepSummaryAnalyzer(api_key=llm_api_key)
full_reports = llm_analyzer.batch_analysis(valid_articles, request_delay=0.5)
# 4. 过滤低于阈值的低质量分析稿件
final_reports = [r for r in full_reports if r.get("quality_score", 0) >= min_quality_threshold]
# 5. 导出Markdown日报
DailyReportExporter.export_markdown(final_reports, "output/daily_report.md")
return final_reports
if __name__ == "__main__":
# 业务RSS订阅源配置示例
FEED_SOURCE_LIST = [
("https://feeds.feedburner.com/ruanyifeng", "阮一峰技术周刊"),
("https://www.36kr.com/feed", "36氪创投资讯"),
]
# 启动全链路流水线
result_reports = run_daily_intelligence_pipeline(
feed_configs=FEED_SOURCE_LIST,
llm_api_key="填入你的LLM接口密钥",
min_quality_threshold=5,
)
```
## 五、接口调用成本量化测算
以 `gpt-4o-mini` 模型、单批次处理50篇有效资讯为基准测算:
| 指标项 | 数值 |
|--------|------|
| 单篇资讯平均输入 Token | 800 |
| 单篇资讯平均输出 Token | 600 |
| 50篇资讯总消耗 Token | 70000 |
| 接口估算成本 | ¥0.07 |
整体调用成本极低。系统主要开销来自大模型API调用,无需额外存储或算力成本。日均500篇资讯处理量级,单日Token成本仅约0.7元,规模化落地性价比突出。
## 六、工程落地常见问题与优化方案
- **RSS源仅返回摘要、无完整正文**
解决方案:配套HTML全文采集兜底接口 `fetch_html_fulltext`,当RSS文本长度不足时自动触发页面全量爬取,补全资讯正文。
- **LLM返回内容包含代码标记,JSON解析失败**
优化方案:增加字符串分割逻辑剥离 ` ```json `代码块标记,Prompt强制约束仅输出纯JSON文本,双保险规避解析异常。
- **资讯质量打分浮动、跨模型不具备可比性**
说明:质量分为LLM主观研判指标,仅用于内部稿件分级筛选,不适合作为客观量化指标;切换模型时建议同步调整过滤阈值。
- **SimHash短文本误判重复**
调优方案:区分文本长度动态调整海明距离阈值,100-200字短资讯阈值下调至2,长深度资讯维持阈值3。
## 七、方案能力边界说明
- **事实校验局限**:LLM仅基于原文文本做信息萃取与解读,无法独立完成事实真伪核查。涉及重大行业事件、数据类情报,必须溯源原始稿件,人工复核。
- **成本线性增长**:资讯处理量与Token消耗、调用成本呈线性正相关。日均千篇以上大规模场景,可切换低成本 `gpt-3.5-turbo` 或前置轻量摘要预过滤,压缩文本长度。
- **时效性约束**:资讯更新频率完全依赖RSS源站推送策略,多数资讯平台RSS仅每日更新1次。对高实时性情报需求,需补充关键词搜索引擎轮询采集链路。