您还记得上一次深入编码会议,失去时间的踪迹,只是为了获得纯粹的乐趣吗?这与截止日期,效率甚至完美无关。在发现新的道路的同时,这是实验,破坏和重建的诱惑。
今天,我将带您进行一次激动人心的编码冒险,灵感来自LinkedIn代码片段,在那里我与FastAPI,River,Watchdog和Tenacity纠结在一起。准备好?系好!
- 免责声明:
For the adventurous spirit: Don't attempt this at home `{home == production}`. For clarity, the code has been condensed into three files.
曾经是fastapi ...
当我的眼睛抓到一个迷人的Mantisnlp的LinkedIn帖子时,这一切就开始了。该帖子展示了用于教育目的的FastAPI
代码片段。让我感兴趣的是迅速的模型重装。在进行ML预测的摘要中,作者首先加载了模型,然后执行了预测。
这激发了我的问题:我可以根据事件更改修改API的模型状态吗?如果可以实现,我们可以提供一种机器学习模型,该模型是壳或半训练的,并且会随着时间的流逝而继续学习?
使用企鹅分类数据集作为我的画布,我设想了这样的东西:
from pydantic import BaseModel
from fastapi import FastAPI
app = FastAPI(title="😂 Pure Joy")
class Attributes(BaseModel):
island:str
bill_length_mm: float | None
bill_depth_mm: float | None
flipper_length_mm: float | None
body_mass_g: float | None
sex: str | None
class LearnAttributes(BaseModel):
attributes: Attributes
species: str
@app.on_event("startup")
def startup_event():
# check and use if the ml model exists else create one
...
@app.post("/predict")
def predict(attributes: Attributes) -> dict:
# predict
...
@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict
# learn and update the ml model
...
游泳学习河
,但是只是调整API感觉太简单了。我希望API和机器学习模型具有动态性,并在同时发展。输入River
,这是一种用于在线培训的工具制作的工具。随着新数据的流入,该模型不断完善其技能。
这是一个基本模型:
# ml.py
from river import compose
from river import preprocessing, stats
from river import naive_bayes
def penguins_model():
island_transformation = compose.Select("island") | preprocessing.OneHotEncoder(
drop_first=True
)
sex_transformation = (
compose.Select("sex")
| preprocessing.StatImputer(("sex", stats.Mode()))
| preprocessing.OneHotEncoder(drop_first=True)
)
numeric_transformation = compose.Select(
"bill_length_mm",
"bill_depth_mm",
"flipper_length_mm",
"body_mass_g",
) | preprocessing.StatImputer(
("bill_length_mm", stats.Mean()),
("bill_depth_mm", stats.Mean()),
("flipper_length_mm", stats.Mean()),
("body_mass_g", stats.Mean()),
)
model = (
island_transformation + sex_transformation + numeric_transformation
| naive_bayes.MultinomialNB(alpha=1)
)
return model
将乐高零件放在一个完整的可测试代码中。
from pathlib import Path
import pickle
...
MODEL_FILE = "model/naive.pickle"
app = FastAPI(title="😂 Pure Joy")
...
@app.on_event("startup")
def startup_event():
model_file = Path(MODEL_FILE)
if not model_file.exists():
from ml import penguins_model
app.state.ml = penguins_model()
else:
app.state.ml = pickle.loads(model_file.read_bytes())
@app.post("/predict")
def predict(attributes: Attributes) -> str | None:
X = attributes.model_dump()
return app.state.ml.predict_one(X)
@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict[str, str]:
X = learn_attributes.attributes.model_dump()
y = learn_attributes.species
y_pred = app.state.ml.predict_one(X)
app.state.ml.learn_one(X, y)
Path(MODEL_FILE).write_bytes(pickle.dumps(app.state.ml)
return {"status": f"we learned {y}. We initially predicted {y_pred}"}
繁荣!那很简单。让我们运行
uvicorn app:app --reload
ðurray...代码无缝地工作。也就是说,直到多名工人进入竞争,导致意外的pandemonium。为什么?原因在于uvicorn
的工人:每个人都独立运作,造成了混乱,尤其是在执行时:
uvicorn app:app --reload --workers 4
为了更好地说明难题,让我们介绍有关预测和学习的计数器:
...
@app.on_event("startup")
def startup_event():
model_file = Path(MODEL_FILE)
if not model_file.exists():
...
ml.meta = {"predicted": 0,
"learned": 0}
app.state.ml = ml
else:
app.state.ml = pickle.loads(model_file.read_bytes())
@app.post("/predict")
def predict(attributes: Attributes) -> dict[str, str | int | None]:
X = attributes.model_dump()
y_pred = app.state.ml.predict_one(X)
app.state.ml.meta["predicted"] += 1
return {"predicted": y_pred,
**app.state.ml.meta,}
@app.post("/learn")
def learn(learn_features: LearnAttributes) -> dict[str, str|int]:
...
app.state.ml.meta["learned"] += 1
Path(MODEL_FILE).write_bytes(pickle.dumps(app.state.ml)
return {"status": f"we learned {y}. We initially predicted {y_pred}",
**app.state.ml.meta,}
预测和学习的统计是遥不可及的。
ð我的喜悦短暂。核心问题是,我们的ML模型的知识并没有统一分布在工人之间。感觉就像在厨房里有几个厨师,每个厨师都增加了独特的味道,没有意识到其他人的贡献。
看门狗的树皮和咬人
那是Watchdog
吠叫到现场的时候。它的活动处理功能确保了机器学习模型更新根据工人的文件更改而无缝地发生。让我们添加修改。
...
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
MODEL_FILE = "model/naive.pickle"
app = FastAPI(title="😂 Pure Joy")
class FileHandler(FileSystemEventHandler):
def on_modified(self, event):
print(f"path={event.src_path} event={event.event_type}")
app.state.ml = pickle.loads(Path(MODEL_FILE).read_bytes())
...
handler = FileHandler()
observer = Observer()
@app.on_event("startup")
def startup_event():
model_file = Path(MODEL_FILE)
if not model_file.exists():
...
else:
...
observer.schedule(handler, path=MODEL_FILE, recursive=False)
observer.start()
@app.on_event("shutdown")
def shutdown_event():
observer.stop()
observer.join()
@app.post("/predict")
def predict(attributes: Attributes) -> dict[str, str | int | None]:
...
@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict[str, str | int]:
...
ðhurray!再次起作用。但是watchdog
有怪癖。它咬回去。工人仍然偶尔会发生碰撞,每个人都大声疾呼以更新模型,有时会发现“使用”错误的文件。至少可以说是混乱的。但是,这不是什么娱乐编码?面临挑战并找到创造性的解决方案。
救援的坚韧!
那是Tenacity
进入的地方。与其让工人冲突并放弃,坚韧使他们……好吧,顽强!它为他们配备了智能重试。如果工人面临“使用中的”错误,则耐心地等待并重新进行了重试,以确保最终考虑每个更新。
我们将更新机器学习文件以进行加载和节省,以增加顽强性。更重要的是,我们可以将代码移动以稍微清理。最终代码看起来像:
将模式删除
# schemas.py
from pydantic import BaseModel
class Attributes(BaseModel):
island: str
bill_length_mm: float | None
bill_depth_mm: float | None
flipper_length_mm: float | None
body_mass_g: float | None
sex: str | None
class LearnAttributes(BaseModel):
attributes: Attributes
species: str
class Predictions(BaseModel):
species: str | None
predicted: int
learned: int
class Learnings(BaseModel):
status: str
predicted: int
learned: int
添加顽强的型号IO(输入和输出)功能
# ml.py
import pickle
from pathlib import Path
from typing import Literal
...
from tenacity import retry, retry_if_exception_type
...
@retry(retry=retry_if_exception_type(IOError))
def ml_io(
model_file: Path,
mode: Literal["wb", "rb"] = "rb",
ml_object: compose.Pipeline | None = None,
):
if mode == "rb" and not model_file.exists():
ml = penguins_model()
ml.meta = {
"predicted": 0,
"learned": 0,
}
# save the first local copy (god like ...)
model_file.write_bytes(pickle.dumps(ml_object))
return ml
elif mode == "rb" and model_file.exists():
return pickle.loads(model_file.read_bytes())
elif mode == "wb":
return model_file.write_bytes(pickle.dumps(ml_object))
else:
NotImplemented(f"mode can only be `wb` or `rb`")
和我们的应用程序:
# app.py
from pathlib import Path
from fastapi import FastAPI
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from ml import ml_io
from schemas import Attributes, LearnAttributes, Predictions, Learnings
MODEL_FILE = "model/naive.pickle"
app = FastAPI(title="😂 Pure Joy")
class FileHandler(FileSystemEventHandler):
def on_modified(self, event):
# print(f"path={event.src_path} event={event.event_type}")
app.state.ml = ml_io(model_file=Path(MODEL_FILE), mode="rb")
handler = FileHandler()
observer = Observer()
@app.on_event("startup")
def startup_event():
model_file = Path(MODEL_FILE)
app.state.ml = ml_io(model_file=model_file, mode="rb")
observer.schedule(handler, path=MODEL_FILE, recursive=False)
observer.start()
@app.on_event("shutdown")
def shutdown_event():
observer.stop()
observer.join()
@app.post("/predict")
def predict(attributes: Attributes) -> Predictions:
X = attributes.model_dump()
y_pred = app.state.ml.predict_one(X)
app.state.ml.meta["predicted"] += 1
return {
"species": y_pred,
**app.state.ml.meta,
}
@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> Learnings:
X = learn_attributes.attributes.model_dump()
y = learn_attributes.species
y_pred = app.state.ml.predict_one(X)
app.state.ml.learn_one(X, y)
app.state.ml.meta["learned"] += 1
ml_io(model_file=Path(MODEL_FILE), mode="wb", ml_object=app.state.ml)
return {
"status": f"learned {y}. Initially predicted {y_pred}",
**app.state.ml.meta,
}
我们故事的日落
这一旅程并不是要创建生产代码。通过挑战和解决方案,这是一次激动人心的冒险,这证明了编码乐趣的美感。我试验,学会,弄坏了东西,并最终建造了一些 Magnificent 。
最大的收获?拥抱混乱和好奇心。陶醉于不可预测性。请记住,编码乐趣始终很有趣。如有疑问,要顽强!
在此之前,请继续编码