Description
In this article, we will create a website that aggregates the latest top stories from Hacker News.
To do this, we will use the Hacker News API to fetch today's top stories. We will also make OpenAI API requests to group the news articles by topic and store the results in JSON format. The website will be built with FastAPI and the Jinja template engine.
Step 1. Fetch the top stories from Hacker News
For the full code listing, check the worker.py file in the GitHub repo.
First, let's fetch the story IDs as a list of integers:
import requests

def get_topstories(max_stories=30):
    # Get top stories
    topstories = requests.get("https://hacker-news.firebaseio.com/v0/topstories.json")
    if (code := topstories.status_code) != 200:
        raise ValueError(f"topstories status code: {code}")
    topstories_ids = topstories.json()
    # Filter stories
    return topstories_ids[:max_stories]  # e.g. [3000, 3004, 3051]
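As a quick offline check of the slicing behavior, here is a sketch that stubs out the HTTP call with `unittest.mock` so no network access is needed (the fake IDs below are invented for the demo):

```python
# Self-contained sanity check of get_topstories, with requests.get mocked out.
from unittest import mock

import requests


def get_topstories(max_stories=30):
    topstories = requests.get("https://hacker-news.firebaseio.com/v0/topstories.json")
    if (code := topstories.status_code) != 200:
        raise ValueError(f"topstories status code: {code}")
    return topstories.json()[:max_stories]


fake_response = mock.Mock(status_code=200)
fake_response.json.return_value = list(range(100, 200))  # 100 fake story IDs

with mock.patch.object(requests, "get", return_value=fake_response):
    ids = get_topstories(max_stories=5)

print(ids)  # [100, 101, 102, 103, 104]
```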
Note that we limit the number of stories analyzed with the max_stories=30 parameter.
The tricky part is executing all 30 requests asynchronously. We will use aiohttp and create a helpers.py file with the functions below:
import aiohttp
import asyncio

BATCH_SIZE = 15

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

async def process_batch(session, urls):
    tasks = []
    for url in urls:
        task = asyncio.ensure_future(fetch_url(session, url))
        tasks.append(task)
    return await asyncio.gather(*tasks)

async def process_urls(urls, batch_size=BATCH_SIZE):
    async with aiohttp.ClientSession() as session:
        batches = [urls[i : i + batch_size] for i in range(0, len(urls), batch_size)]
        results = []
        for batch in batches:
            batch_results = await process_batch(session, batch)
            results.extend(batch_results)
        return results
Now we can pass a list of URLs to process_urls to handle all requests asynchronously. Let's prepare the URLs from list_of_items:
import asyncio
from typing import List

from helpers import process_urls  # The async helpers we defined above

def get_items(list_of_items: List[int], batch_size=12):
    # Prepare API requests to get all items
    URL_ITEM = "https://hacker-news.firebaseio.com/v0/item/{}.json"
    urls = [URL_ITEM.format(t_s) for t_s in list_of_items]
    # The generated urls look like:
    # ["https://hacker-news.firebaseio.com/v0/item/3001.json",
    #  "https://hacker-news.firebaseio.com/v0/item/4001.json",
    #  ...]
    results = asyncio.run(process_urls(urls, batch_size))  # asyncio.run replaces the deprecated get_event_loop pattern
    return results

list_of_items = get_topstories()
results = get_items(list_of_items)  # A list of item dicts ("title", "url", "score", ...)
Next, we will convert the retrieved results into a format that is easy for ChatGPT to parse. We keep the title and URL fields, since the URL can provide valuable hints when classifying items.
results_parsed = [
    f"{el['title']} URL: {el['url']}"
    for el in results if el.get("url") is not None
]
# The result will be:
# ["The Password Game URL: https://neal.fun/password-game/",
#  "FreeBSD Jails Containers URL: https://vermaden.wordpress.com/2023/06/28/freebsd-jails-containers/",
#  ...]
Step 2. Make the OpenAI API request and process the results
First, let's create a function named get_openai_prompt. It takes the newline-joined titles as a single string and returns system_message and user_message (we will use a chat-optimized model):
from typing import Tuple

def get_openai_prompt(topics: str) -> Tuple[dict, dict]:
    system_message = {
        "role": "system",
        "content": (
            "You are an assistant that can group news articles from hackernews (news.ycombinator.com) into topics"
        ),
    }
    user_message = {
        "role": "user",
        "content": (
            "Group the following news articles into topics\n\n"
            + topics
            + "\n\nUse the following format:\n"
            + "topic_name_1\n- title\turl\n- title\turl\ntopic_name_2\n\ttitle\turl"
        ),
    }
    return system_message, user_message
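As a quick sanity check of the prompt format, the sketch below re-declares the same function so it runs standalone and inspects the messages it produces (the sample title is taken from the parsed results above):

```python
# Standalone check that get_openai_prompt builds the expected message dicts.
from typing import Tuple


def get_openai_prompt(topics: str) -> Tuple[dict, dict]:
    system_message = {
        "role": "system",
        "content": (
            "You are an assistant that can group news articles from "
            "hackernews (news.ycombinator.com) into topics"
        ),
    }
    user_message = {
        "role": "user",
        "content": (
            "Group the following news articles into topics\n\n"
            + topics
            + "\n\nUse the following format:\n"
            + "topic_name_1\n- title\turl\n- title\turl\ntopic_name_2\n\ttitle\turl"
        ),
    }
    return system_message, user_message


s_m, u_m = get_openai_prompt("The Password Game URL: https://neal.fun/password-game/")
print(s_m["role"], u_m["role"])  # system user
```

The two dicts can be passed directly as the `messages` list of a chat completion request.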
The next step is to make the OpenAI API request, parse the response, and save it as a .json file:
import datetime
import json
import re

import openai

topics = "\n\n".join(results_parsed)
s_m, u_m = get_openai_prompt(topics=topics)  # system & user messages

# Get an API key here: https://platform.openai.com/account/api-keys
openai.api_key = "sk-..."  # Never hardcode a real key; read it from an environment variable instead

# Get a response from the model
response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[s_m, u_m],
    max_tokens=2200,  # You can increase this number if needed
)

# Get the body of the response
res = response["choices"][0]["message"]["content"].split("\n")

# Parse the results
# (sometimes the response may be structured in different ways)
current_topic = None
dict_ = {}
titles_returned = {}
for l in res:
    if not l.strip():  # Ignore empty strings
        continue
    if not ("http://" in l.lower() or "https://" in l.lower()):
        # If there is no link in the string, the string is a "topic"
        current_topic = l
        continue
    # Otherwise the current string is a title that contains a link as well
    if current_topic not in dict_:
        dict_[current_topic] = {}
    pattern = r"- (.+?)\s*URL:"
    pattern2 = r"- (.+?)\s*http"
    match = re.search(pattern, l)
    match2 = re.search(pattern2, l)
    if match:
        substring = str(match.group(1))
        titles_returned[substring] = current_topic
    elif match2:
        substring = str(match2.group(1))
        titles_returned[substring] = current_topic
    else:
        print(l)

data = {}
for r in results:
    if "url" not in r or "score" not in r:
        print("Skip")
        continue
    data[r["title"]] = {"url": r["url"], "score": r["score"]}

for k in data:
    if k in titles_returned:
        data[k]["topic"] = titles_returned[k]
        continue
    data[k]["topic"] = "Other"

prefix = datetime.datetime.now().strftime("%Y-%m-%d")
fname = f"data/{prefix}_articles.json"
with open(fname, "w") as f:
    json.dump(data, f)
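To make the regex-based parsing above easier to follow, here is a self-contained illustration run on a small invented model response:

```python
# Demonstrates the topic/title parsing used above on a tiny sample response.
import re

sample_response = (
    "technology\n"
    "- FreeBSD Jails Containers URL: https://vermaden.wordpress.com/2023/06/28/freebsd-jails-containers/\n"
    "lifestyle\n"
    "- The Password Game URL: https://neal.fun/password-game/\n"
)

current_topic = None
titles_returned = {}
for line in sample_response.split("\n"):
    if not line.strip():  # Skip empty lines
        continue
    if "http://" not in line.lower() and "https://" not in line.lower():
        current_topic = line  # Lines without a link are topic headers
        continue
    # Lines with a link are "- title URL: ..." entries
    match = re.search(r"- (.+?)\s*URL:", line) or re.search(r"- (.+?)\s*http", line)
    if match:
        titles_returned[match.group(1)] = current_topic

print(titles_returned)
# {'FreeBSD Jails Containers': 'technology', 'The Password Game': 'lifestyle'}
```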
The script will generate JSON similar to the one below:
{
    "A proto-pizza emerges from a fresco on a Pompeii wall": {
        "url": "http://pompeiisites.org/en/comunicati/pompeii-a-still-life-discovered-by-the-new-excavations-of-regio-ix/",
        "score": 93,
        "topic": "news"
    },
    "The hidden cost of air quality monitoring": {
        "url": "https://www.airgradient.com/blog/hidden-costs-of-air-quality-monitoring/",
        "score": 395,
        "topic": "news"
    },
    "The Password Game": {
        "url": "https://neal.fun/password-game/",
        "score": 929,
        "topic": "lifestyle"
    },
    "FreeBSD Jails Containers": {
        "url": "https://vermaden.wordpress.com/2023/06/28/freebsd-jails-containers/",
        "score": 164,
        "topic": "technology"
    },
    "What AMD Learned from Its Big Chiplet Push": {
        "url": "https://spectrum.ieee.org/chiplet",
        "score": 38,
        "topic": "technology"
    },
    "In deep space, astronomers spot precursor of carbon based life": {
        "url": "https://www.theregister.com/2023/06/27/jwst_carbon_molecule_discovery/",
        "score": 39,
        "topic": "Other"
    }
}
We are now ready to use this JSON in our website.
Step 3. The website (FastAPI + Jinja templates)
For the full code listing, check the app/app.py file in the GitHub repo.
Let's create an app.py file inside the app folder:
import glob
import json
from collections import defaultdict

import uvicorn
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

@app.get("/")
def get_articles(request: Request):
    # Use the most recent data file
    fname = sorted(glob.glob("data/*_articles.json"), reverse=True)[0]
    with open(fname, "r") as json_file:
        articles = json.load(json_file)

    # Group articles by topic
    grouped_articles = {}
    for title, article in articles.items():
        topic = article["topic"]
        if topic in grouped_articles:
            grouped_articles[topic][title] = article
        else:
            grouped_articles[topic] = {title: article}

    # Calculate the total score for each topic/group
    topic_scores = defaultdict(lambda: 0)
    for title, article in articles.items():
        topic_scores[article["topic"]] += article["score"]

    return templates.TemplateResponse(
        "index.html",
        {
            "request": request,
            "articles": grouped_articles,
            "topic_scores": topic_scores,
        },
    )

if __name__ == "__main__":
    uvicorn.run("app:app", host="127.0.0.1", port=5556, reload=True)
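The grouping and scoring logic in the route can be checked in isolation; the sketch below runs it on a small invented articles dict:

```python
# Standalone check of the grouping and per-topic scoring used in the route.
from collections import defaultdict

articles = {
    "The Password Game": {"url": "https://neal.fun/password-game/", "score": 929, "topic": "lifestyle"},
    "FreeBSD Jails Containers": {"url": "https://vermaden.wordpress.com/2023/06/28/freebsd-jails-containers/", "score": 164, "topic": "technology"},
    "What AMD Learned from Its Big Chiplet Push": {"url": "https://spectrum.ieee.org/chiplet", "score": 38, "topic": "technology"},
}

# Group articles by topic
grouped_articles = {}
for title, article in articles.items():
    grouped_articles.setdefault(article["topic"], {})[title] = article

# Sum the scores per topic
topic_scores = defaultdict(int)
for title, article in articles.items():
    topic_scores[article["topic"]] += article["score"]

print(dict(topic_scores))  # {'lifestyle': 929, 'technology': 202}
```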
Ideally, instead of reading the .json file from the filesystem on every request, we would keep it in memory and update it occasionally. However, for simplicity, we chose the most basic code that gets the job done. We expect the website's load to be minimal: less than one request per second (RPS).
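One way to sketch the in-memory caching mentioned above is to cache the parsed JSON and re-read the file only when its modification time changes. The helper name `load_articles_cached` is our own invention, not part of the original app:

```python
# Hypothetical caching helper: reload the articles JSON only when the file changes.
import json
import os

_cache = {"mtime": None, "articles": None}


def load_articles_cached(fname: str) -> dict:
    mtime = os.path.getmtime(fname)
    if _cache["mtime"] != mtime:  # First call, or the file was rewritten
        with open(fname) as f:
            _cache["articles"] = json.load(f)
        _cache["mtime"] = mtime
    return _cache["articles"]
```

The route could then call `load_articles_cached(fname)` instead of opening the file on every request.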
Now let's prepare the index.html and styles.css files.
index.html
<!DOCTYPE html>
<html>
<head>
    <title>betterhacker.news</title>
    <link rel="icon" type="image/x-icon" href="static/favicon.ico">
    <link rel="stylesheet" href="static/styles.css">
    <meta property="og:title" content="betterhacker.news">
    <meta property="og:description" content="It is like hackernews, but better">
    <meta property="og:type" content="website">
    <meta property="og:url" content="https://betterhacker.news">
</head>
<body>
    <div class="container">
        <div class="main-title">betterhacker.news<div class="main-subtitle">Hackernews Top Stories grouped using modern LLMs (ChatGPT)</div></div>
        {% for topic, data in articles.items() %}
        <div class="column">
            <h2 class="topic">{{ topic }} // {{ topic_scores[topic] }} ❤️🔥</h2>
            <ul>
                {% for title, article in data.items() %}
                <li>
                    <div class="title">
                        <a href="{{ article.url }}">{{ title }}</a>
                        <span class="score"> {{ article.score }} <span class="emoji">❤️</span></span>
                    </div>
                </li>
                {% endfor %}
            </ul>
        </div>
        {% endfor %}
    </div>
    <div class="footer">
        Created by <a href="https://olegkhomenko.me" class="footer-link">Oleg Khomenko</a>
    </div>
</body>
</html>
styles.css
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 20px;
}
.container {
display: flex; justify-content: space-between;
flex-wrap: wrap; max-width: 1200px;
margin: 0 auto; background-color: #fff;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
border-radius: 5px; padding: 30px;
}
.column {
flex-basis: 48%;
}
.topic {
font-size: 20px; font-weight: bold; margin-top: 20px;
}
ul {
list-style-type: none; padding: 0; margin: 0;
}
li {
margin-bottom: 10px;
}
.title {
display: flex;
align-items: center;
}
.emoji {
margin-right: 5px;
font-size: 0.8em;
}
.score {
font-size: 0.8em;
color: gray;
margin-left: 5px;
}
.main-title {
text-align: center; font-size: 32px;
font-weight: bold; margin-bottom: 40px;
}
.main-subtitle {
text-align: center; font-size: 18px;
color: rgba(128, 128, 128, 0.8);
}
a {
text-decoration: none;
color: #007bff;
}
a:hover {
text-decoration: underline;
}
.footer {
text-align: center; margin-top: 40px; font-size: 14px; color: rgba(0, 0, 0, 0.6);
}
.footer-link {
color: #007bff;
}
.footer-link:hover {
text-decoration: underline;
}
@media (max-width: 600px) {
.column {
flex-basis: 100%;
}
}
Step 4. Run and check the results
To run both scripts at the same time, app.py for the web server and worker.py for interacting with the external APIs, we can use tmux.
To run the server, use the following command:
uvicorn app.app:app --port 5556
To run the worker, use the command below:
while true; do python3 worker.py; ls data/*; sleep 12h; done
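A possible tmux layout for running both (the session names here are our own choice, not prescribed by the project):

```shell
# One detached session for the web server, one for the worker loop
tmux new-session -d -s hn-web 'uvicorn app.app:app --port 5556'
tmux new-session -d -s hn-worker \
  'while true; do python3 worker.py; ls data/*; sleep 12h; done'
tmux ls  # List both sessions to confirm they are running
```

Attach to either session with `tmux attach -t hn-web` (or `hn-worker`) to inspect its logs.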
Now you can open your favorite browser and check the result at http://localhost:5556 or at https://betterhacker.news.