The Unexpectedly Expected Thriller: A Tale of Coding Curiosity
#python #machinelearning #fastapi

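Do you remember the last time you dove deep into a coding session and lost track of time, purely for the joy of it? It had nothing to do with deadlines, efficiency, or even perfection. It was the lure of experimenting, breaking, and rebuilding while discovering new paths.

Today I'm taking you on an exciting coding adventure, inspired by a LinkedIn code snippet, in which I got tangled up with FastAPI, River, Watchdog, and Tenacity. Ready? Buckle up!

- Disclaimer: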

For the adventurous spirit: Don't attempt this at home `{home == production}`. For clarity, the code has been condensed into three files.

Once upon a FastAPI ...

[Image: LinkedIn post]

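It all began when my eye caught an intriguing LinkedIn post by Mantisnlp. The post showcased a FastAPI code snippet for educational purposes. What caught my interest was the swift model reloading. In the snippet for making ML predictions, the author first loaded the model and then performed the prediction.

That sparked my questions: can I modify the API's model state in response to events? And if that is achievable, could we serve a machine learning model that starts out as a shell, or half-trained, and keeps learning as time goes by?

Using the penguins classification dataset as my canvas, I envisioned something like this: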

from pydantic import BaseModel
from fastapi import FastAPI


app = FastAPI(title="😂 Pure Joy")

class Attributes(BaseModel):
    island: str
    bill_length_mm: float | None
    bill_depth_mm: float | None
    flipper_length_mm: float | None
    body_mass_g: float | None
    sex: str | None

class LearnAttributes(BaseModel):
    attributes: Attributes
    species: str

@app.on_event("startup")
def startup_event():
    # check and use if the ml model exists else create one
    ...


@app.post("/predict")
def predict(attributes: Attributes) -> dict:

    # predict
    ...

@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict:
    # learn and update the ml model
    ...

Learning to swim in the River

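But merely tweaking the API felt too easy. I wanted the API and the machine learning model to be dynamic and to evolve together. Enter River, a library built for online machine learning. As new data streams in, the model keeps refining its skills.

Here is a basic model: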

# ml.py
from river import compose
from river import preprocessing, stats
from river import naive_bayes


def penguins_model():
    island_transformation = compose.Select("island") | preprocessing.OneHotEncoder(
        drop_first=True
    )

    sex_transformation = (
        compose.Select("sex")
        | preprocessing.StatImputer(("sex", stats.Mode()))
        | preprocessing.OneHotEncoder(drop_first=True)
    )

    numeric_transformation = compose.Select(
        "bill_length_mm",
        "bill_depth_mm",
        "flipper_length_mm",
        "body_mass_g",
    ) | preprocessing.StatImputer(
        ("bill_length_mm", stats.Mean()),
        ("bill_depth_mm", stats.Mean()),
        ("flipper_length_mm", stats.Mean()),
        ("body_mass_g", stats.Mean()),
    )

    model = (
        island_transformation + sex_transformation + numeric_transformation
        | naive_bayes.MultinomialNB(alpha=1)
    )

    return model
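
To make the "learns one sample at a time" idea concrete, here is a minimal pure-Python sketch of a running-mean imputer. It is only a toy stand-in for what `preprocessing.StatImputer` paired with `stats.Mean()` does conceptually. The class `RunningMeanImputer` is a hypothetical name that mimics River's `learn_one` / `transform_one` style; it is not River's implementation.

```python
class RunningMeanImputer:
    """Toy online imputer: fills a missing value with the mean seen so far."""

    def __init__(self, feature: str) -> None:
        self.feature = feature
        self.count = 0
        self.mean = 0.0

    def learn_one(self, x: dict) -> None:
        value = x.get(self.feature)
        if value is None:
            return  # nothing to learn from a missing value
        self.count += 1
        # incremental mean update: past samples do not need to be stored
        self.mean += (value - self.mean) / self.count

    def transform_one(self, x: dict) -> dict:
        if x.get(self.feature) is None:
            return {**x, self.feature: self.mean}
        return dict(x)


imputer = RunningMeanImputer("body_mass_g")
for sample in ({"body_mass_g": 3000.0}, {"body_mass_g": 4000.0}):
    imputer.learn_one(sample)

filled = imputer.transform_one({"body_mass_g": None})
print(filled)  # the missing value is replaced by the running mean
```

The point of the sketch is that the state (`count`, `mean`) is tiny and is updated in place, which is what lets a model keep learning while it is being served.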

[Image: model flow]

Putting the Lego pieces together into one complete, testable piece of code:

from pathlib import Path
import pickle
...

MODEL_FILE = "model/naive.pickle"
app = FastAPI(title="😂 Pure Joy")

...

@app.on_event("startup")
def startup_event():
    model_file = Path(MODEL_FILE)
    if not model_file.exists():
        from ml import penguins_model

        app.state.ml = penguins_model()
    else:
        app.state.ml = pickle.loads(model_file.read_bytes())


@app.post("/predict")
def predict(attributes: Attributes) -> str | None:
    X = attributes.model_dump()
    return app.state.ml.predict_one(X)


@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict[str, str]:
    X = learn_attributes.attributes.model_dump()
    y = learn_attributes.species

    y_pred = app.state.ml.predict_one(X)
    app.state.ml.learn_one(X, y)

    Path(MODEL_FILE).write_bytes(pickle.dumps(app.state.ml))

    return {"status": f"we learned {y}. We initially predicted {y_pred}"}

Boom! That was easy. Let's run it:

uvicorn app:app --reload
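
With the server up, the two endpoints can be exercised from a small client. The sketch below uses only the standard library. The penguin values are illustrative, and `BASE_URL` assumes uvicorn's default host and port; the helper names `predict_payload`, `learn_payload`, and `post` are hypothetical.

```python
import json
from urllib import request

BASE_URL = "http://127.0.0.1:8000"  # assumption: uvicorn's default host and port


def predict_payload() -> dict:
    # illustrative values; fields typed `float | None` or `str | None` may be null
    return {
        "island": "Torgersen",
        "bill_length_mm": 39.1,
        "bill_depth_mm": 18.7,
        "flipper_length_mm": 181.0,
        "body_mass_g": 3750.0,
        "sex": None,
    }


def learn_payload(species: str) -> dict:
    # /learn expects the attributes nested under "attributes", plus the true label
    return {"attributes": predict_payload(), "species": species}


def post(path: str, payload: dict) -> object:
    """POST a JSON body to the running API and decode the JSON response."""
    req = request.Request(
        BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with request.urlopen(req) as response:
        return json.loads(response.read())


print(json.dumps(learn_payload("Adelie"), indent=2))
```

Calling `post("/learn", learn_payload("Adelie"))` followed by `post("/predict", predict_payload())` only works while the server is running, so the sketch itself just prints the request body.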

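Hurray... the code works seamlessly. That is, until multiple workers join the party and cause unexpected pandemonium. Why? The cause lies in uvicorn's workers: each one operates independently, which creates chaos, especially when running: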

uvicorn app:app --workers 4

To better illustrate the puzzle, let's introduce counters for predictions and learnings:

...

@app.on_event("startup")
def startup_event():
    model_file = Path(MODEL_FILE)
    if not model_file.exists():
        ...  # build `ml` from penguins_model() as before
        ml.meta = {"predicted": 0,
                   "learned": 0}

        app.state.ml = ml
    else:
        app.state.ml = pickle.loads(model_file.read_bytes())


@app.post("/predict")
def predict(attributes: Attributes) -> dict[str, str | int | None]:
    X = attributes.model_dump()
    y_pred = app.state.ml.predict_one(X)
    app.state.ml.meta["predicted"] += 1

    return {"predicted": y_pred,
            **app.state.ml.meta,}


@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict[str, str | int]:
    ...

    app.state.ml.meta["learned"] += 1

    Path(MODEL_FILE).write_bytes(pickle.dumps(app.state.ml))

    return {"status": f"we learned {y}. We initially predicted {y_pred}",
            **app.state.ml.meta,}

[Image: messy]

The predicted and learned counts are way off.

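My joy was short-lived. The core problem is that our ML model's knowledge is not shared uniformly among the workers. It feels like having several chefs in one kitchen, each adding a unique flavor, unaware of what the others have contributed.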
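
The drifting counters can be reproduced without a server. The following is an in-process simulation, not real multiprocessing: each "worker" is just a private copy of the state, and the round-robin dispatch is an assumption (with real workers the operating system decides which process accepts a request). The function name `simulate_workers` is hypothetical.

```python
import copy
from itertools import cycle


def simulate_workers(n_workers: int, n_requests: int) -> list[int]:
    """Return the "predicted" counter that each response would report."""
    initial_state = {"predicted": 0, "learned": 0}
    # every worker starts from its own private copy of the state
    workers = [copy.deepcopy(initial_state) for _ in range(n_workers)]
    # assumption: requests are handed out round-robin
    next_worker = cycle(range(n_workers))

    reported = []
    for _ in range(n_requests):
        state = workers[next(next_worker)]
        state["predicted"] += 1
        reported.append(state["predicted"])
    return reported


print(simulate_workers(n_workers=1, n_requests=6))  # [1, 2, 3, 4, 5, 6]
print(simulate_workers(n_workers=4, n_requests=6))  # [1, 1, 1, 1, 2, 2]
```

With one worker the counter climbs steadily; with four, the client sees it jump around because no worker knows what the others have counted.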


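Watchdog's bark and bite

That's when Watchdog barked its way onto the scene. Its event handling makes each worker reload the machine learning model whenever the model file changes. Let's add the modifications.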

...
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


MODEL_FILE = "model/naive.pickle"
app = FastAPI(title="😂 Pure Joy")


class FileHandler(FileSystemEventHandler):
    def on_modified(self, event):
        print(f"path={event.src_path} event={event.event_type}")
        app.state.ml = pickle.loads(Path(MODEL_FILE).read_bytes())

...

handler = FileHandler()
observer = Observer()


@app.on_event("startup")
def startup_event():
    model_file = Path(MODEL_FILE)
    if not model_file.exists():
        ...
    else:
        ...

    observer.schedule(handler, path=MODEL_FILE, recursive=False)
    observer.start()


@app.on_event("shutdown")
def shutdown_event():
    observer.stop()
    observer.join()


@app.post("/predict")
def predict(attributes: Attributes) -> dict[str, str | int | None]:
   ...


@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict[str, str | int]:
   ...

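[Image: unmessy]

Hurray! It works again. But Watchdog has its quirks. It bites back. Workers still collide occasionally, each clamoring to update the model, and sometimes they find the file raising an "in use" error. Chaotic, to say the least. But isn't that what coding for fun is about? Facing challenges and finding creative solutions.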


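Tenacity to the rescue!

That's where Tenacity comes in. Instead of letting workers clash and give up, Tenacity makes them... well, tenacious! It equips them with smart retries. If a worker hits an "in use" error, it patiently waits and tries again, so that every update is eventually taken into account.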
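
The retry idea can be sketched in a few lines of plain Python. This is a hand-rolled illustration, not Tenacity's implementation: the decorator name `retry_on` and its `attempts` and `wait_seconds` parameters are hypothetical. Bounding the number of attempts and sleeping between them are choices made for this sketch; to my knowledge a bare Tenacity `@retry` keeps retrying without waiting unless `stop` and `wait` are configured.

```python
import time
from functools import wraps


def retry_on(exception_type, attempts=5, wait_seconds=0.01):
    """Retry the wrapped function when `exception_type` is raised."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exception_type:
                    if attempt == attempts:
                        raise  # out of patience: surface the last error
                    time.sleep(wait_seconds)  # wait before trying again

        return wrapper

    return decorator


calls = {"count": 0}


@retry_on(OSError, attempts=5, wait_seconds=0.01)
def flaky_read() -> str:
    # simulate a file that is "in use" for the first two attempts
    calls["count"] += 1
    if calls["count"] < 3:
        raise OSError("file in use")
    return "model bytes"


result = flaky_read()
print(result, calls["count"])
```

The caller never sees the two failed attempts; it only gets the value from the third, successful one.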

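We'll update the ml file so that loading and saving become tenacious. On top of that, we can move some code around to tidy things up a bit. The final code looks like this:

Moving the schemas out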

# schemas.py
from pydantic import BaseModel


class Attributes(BaseModel):
    island: str
    bill_length_mm: float | None
    bill_depth_mm: float | None
    flipper_length_mm: float | None
    body_mass_g: float | None
    sex: str | None


class LearnAttributes(BaseModel):
    attributes: Attributes
    species: str


class Predictions(BaseModel):
    species: str | None
    predicted: int
    learned: int


class Learnings(BaseModel):
    status: str
    predicted: int
    learned: int

Adding a tenacious model IO (input and output) function

# ml.py
import pickle
from pathlib import Path
from typing import Literal

...
from tenacity import retry, retry_if_exception_type

...

@retry(retry=retry_if_exception_type(IOError))
def ml_io(
    model_file: Path,
    mode: Literal["wb", "rb"] = "rb",
    ml_object: compose.Pipeline | None = None,
):
    if mode == "rb" and not model_file.exists():
        ml = penguins_model()
        ml.meta = {
            "predicted": 0,
            "learned": 0,
        }
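        # save the first local copy (god like ...)
        # create the folder first: a missing folder raises an OSError that would be retried forever
        model_file.parent.mkdir(parents=True, exist_ok=True)
        model_file.write_bytes(pickle.dumps(ml))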

        return ml

    elif mode == "rb" and model_file.exists():
        return pickle.loads(model_file.read_bytes())

    elif mode == "wb":
        return model_file.write_bytes(pickle.dumps(ml_object))
    else:
        raise NotImplementedError("mode can only be `wb` or `rb`")

And our application:

# app.py
from pathlib import Path

from fastapi import FastAPI
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from ml import ml_io
from schemas import Attributes, LearnAttributes, Predictions, Learnings


MODEL_FILE = "model/naive.pickle"
app = FastAPI(title="😂 Pure Joy")


class FileHandler(FileSystemEventHandler):
    def on_modified(self, event):
        # print(f"path={event.src_path} event={event.event_type}")
        app.state.ml = ml_io(model_file=Path(MODEL_FILE), mode="rb")


handler = FileHandler()
observer = Observer()


@app.on_event("startup")
def startup_event():
    model_file = Path(MODEL_FILE)
    app.state.ml = ml_io(model_file=model_file, mode="rb")

    observer.schedule(handler, path=MODEL_FILE, recursive=False)
    observer.start()


@app.on_event("shutdown")
def shutdown_event():
    observer.stop()
    observer.join()


@app.post("/predict")
def predict(attributes: Attributes) -> Predictions:
    X = attributes.model_dump()
    y_pred = app.state.ml.predict_one(X)
    app.state.ml.meta["predicted"] += 1

    return {
        "species": y_pred,
        **app.state.ml.meta,
    }


@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> Learnings:
    X = learn_attributes.attributes.model_dump()
    y = learn_attributes.species

    y_pred = app.state.ml.predict_one(X)
    app.state.ml.learn_one(X, y)

    app.state.ml.meta["learned"] += 1

    ml_io(model_file=Path(MODEL_FILE), mode="wb", ml_object=app.state.ml)

    return {
        "status": f"learned {y}. Initially predicted {y_pred}",
        **app.state.ml.meta,
    }

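The sunset of our story

This journey was never about creating production code. It was an exciting adventure through challenges and solutions, a testament to the beauty of coding for fun. I experimented, learned, broke things, and eventually built something magnificent.

The biggest takeaway? Embrace chaos and curiosity. Revel in unpredictability. And remember, coding for fun is always fun. When in doubt, be tenacious!

Until next time, keep coding.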