轻松的数据存储:MongoDB数据库和Raspberry Pi pico W演练 - 第2部分(CRUD)
#raspberrypi #mongodb #micropython


了解如何使用MongoDB和Raspberry Pi pico W使用其数据API使用基本的CRUD操作。在开始之前,请确保您观看第1部分。在本教程中,我们使用BME280传感器来获取温度,传感器和湿度数据以在我们的数据库中工作。实际上,您可以使用自己喜欢的任何传感器!

另外,如果没有,请务必订阅并支持该频道!

订阅:

Youtube

支持:

https://www.buymeacoffee.com/mmshilleh

步骤1-)代码

from machine import Pin, I2C, RTC
import json
import urequests as requests
import network
import ntptime
import time
import utime

import bme280

import constants


i2c = I2C(0,sda=Pin(0), scl=Pin(1), freq=400000)
URL = "<url>"
API_KEY = "<api key>"

def connect_to_wifi(ssid, psk):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(ssid, psk)

    while not wlan.isconnected() and wlan.status() >= 0:
        print("Waiting to Connect")
        time.sleep(10)
    if not wlan.isconnected():
        raise Exception("Wifi not available")
    print("Connected to WiFi")


def findOne(filter_dictionary):
    try:
        headers = { "api-key": API_KEY }
        searchPayload = {
            "dataSource": "Cluster0",
            "database": "BME280",
            "collection": "Readings",
            "filter": filter_dictionary,
        }
        response = requests.post(URL + "findOne", headers=headers, json=searchPayload)
        print("Response: (" + str(response.status_code) + "), msg = " + str(response.text))
        if response.status_code >= 200 and response.status_code < 300:
            print("Success Response")
        else:
            print(response.status_code)
            print("Error")
        response.close()
    except Exception as e:
        print(e)


def find(filter_dictionary):
    try:
        headers = { "api-key": API_KEY }
        searchPayload = {
            "dataSource": "Cluster0",
            "database": "BME280",
            "collection": "Readings",
            "filter": filter_dictionary,
        }
        response = requests.post(URL + "find", headers=headers, json=searchPayload)
        print("Response: (" + str(response.status_code) + "), msg = " + str(response.text))
        if response.status_code >= 200 and response.status_code < 300:
            print("Success Response")
        else:
            print(response.status_code)
            print("Error")
        response.close()
    except Exception as e:
        print(e)


def insertOne(temp, pressure, humidity, time):
    try:
        headers = { "api-key": API_KEY }
        documentToAdd = {"Device": "BME280",
                         "Temperature (C)": temp,
                         "Pressure": pressure,
                         "Humidity": humidity,
                         "Time": time}
        insertPayload = {
            "dataSource": "Cluster0",
            "database": "BME280",
            "collection": "Readings",
            "document": documentToAdd,
        }
        response = requests.post(URL + "insertOne", headers=headers, json=insertPayload)
        print(response)
        print("Response: (" + str(response.status_code) + "), msg = " + str(response.text))
        if response.status_code >= 200 and response.status_code < 300:
            print("Success Response")
        else:
            print(response.status_code)
            print("Error")
        response.close()
    except Exception as e:
        print(e)


def insertMany(document_list):
    try:
        headers = { "api-key": API_KEY }
        insertPayload = {
            "dataSource": "Cluster0",
            "database": "BME280",
            "collection": "Readings",
            "documents": document_list,
        }
        response = requests.post(URL + "insertMany", headers=headers, json=insertPayload)
        print("Response: (" + str(response.status_code) + "), msg = " + str(response.text))
        if response.status_code >= 200 and response.status_code < 300:
            print("Success Response")
        else:
            print(response.status_code)
            print("Error")
        response.close()
    except Exception as e:
        print(e)


def updateOne(filter_dictionary, update_dict):
    try:
        headers = { "api-key": API_KEY }
        update = {"set": update_dict}
        searchPayload = {
            "dataSource": "Cluster0",
            "database": "BME280",
            "collection": "Readings",
            "filter": filter_dictionary,
            "update": update_dict,
        }
        response = requests.post(URL + "updateOne", headers=headers, json=searchPayload)
        print("Response: (" + str(response.status_code) + "), msg = " + str(response.text))
        if response.status_code >= 200 and response.status_code < 300:
            print("Success Response")
        else:
            print(response.status_code)
            print("Error")
        response.close()
    except Exception as e:
        print(e)


def deleteOne(filter_dictionary):
    try:
        headers = { "api-key": API_KEY }
        searchPayload = {
            "dataSource": "Cluster0",
            "database": "BME280",
            "collection": "Readings",
            "filter": filter_dictionary,
        }
        response = requests.post(URL + "delete", headers=headers, json=searchPayload)
        print("Response: (" + str(response.status_code) + "), msg = " + str(response.text))
        if response.status_code >= 200 and response.status_code < 300:
            print("Success Response")
        else:
            print(response.status_code)
            print("Error")
        response.close()
    except Exception as e:
        print(e)


def main():
    connect_to_wifi(constants.INTERNET_NAME, constants.INTERNET_PASSWORD)
    #document_list = []
    while True:
        bme = bme280.BME280(i2c=i2c)
        temp, pressure, humidity = bme.values
        print(temp, pressure, humidity)
        rtc_time_tuple = RTC().datetime()
        formatted_time = "{:04}-{:02}-{:02} {:02}:{:02}:{:02}".format(
            rtc_time_tuple[0], rtc_time_tuple[1], rtc_time_tuple[2], 
            rtc_time_tuple[4], rtc_time_tuple[5], rtc_time_tuple[6]
        )
        print(formatted_time)

        #insertOne(temp, pressure, humidity, formatted_time)

        #document_list.append({"Device": "BME280",
            #"Temperature (C)": temp,
            #"Pressure": pressure,
            #"Humidity": humidity,
            #"Time": formatted_time}
        #)
        #if len(document_list) == 10:
           #print(json.dumps(document_list))
           #insertMany(document_list)
           #document_list = []
        #findOne({"Temperature (C)": "23.26C", "Humidity": "53.69%"})
        #find({"Temperature (C)": "24.65C"})
        #updateOne({"Temperature (C)": "23.26C"}, {"Temperature (C)": "24.26C"})
        deleteOne({"Temperature (C)": "24.26C"})

main()

这是我们在本教程中使用的代码,我们会评论主要功能的部分和不注重部分,以激活不同的功能;这在YouTube视频中更为明显。另外,请注意,可以大量清理代码,重复很多,主要用于演示目的。

我们定义的功能如下:

insertone(params):在此功能中,我们从传感器传递值。我们还从RTC()中传递了DateTime,这是一个实时时钟对象,可以知道何时记录数据点。尽管我们在本教程中没有使用它,但是最好在BME280数据上使用时间戳。我们利用附加到URL上的操作插入机,并通过函数中定义的标题和有效载荷传递。

insertmany(params):在此功能中,我们传递了一个值列表。在这种情况下,这是字典列表。这使我们能够将数据点插入批处理,这比一次在第一个功能中执行的数据要快得多。我们将限制设置为代码中批处理大小的限制,但是您可以通过更改IF-Statement中的数字来扩展数据点的大小。

findOne(params):在此功能中,我们传递到包含我们正在寻找的数据点的属性的字典中。在这种情况下,我们正在搜索一个数据点,我们添加的数据点的温度为23.26C,湿度为53.69%,如果有一个以上的数据点具有此类值,则它将返回它找到的第一个值!您可以根据需要定义搜索参数。我们可以看到,我们在有效载荷中使用了FindOne扩展程序以及传递过滤器词典。

findmany(params):在此功能中,我们传递到包含我们再次寻找数据点的属性的字典中。在这种情况下,我们可以获取与相应值相比而不是单点的数据点列表。唯一的区别是动作,而不是发现的。这对于查找一系列数据点很有用。

updateOne(params):在此功能中,除了更新字典外,我们还传递了过滤词典。 MongoDB基于过滤器搜索数据点,并根据我们在更新中定义的内容进行相应替换。有关此信息的更多信息,请参见上面的视频。当然,像其他功能一样,我们也可以进行基于批处理的操作。

deleteone(params):在此功能中,我们像其他功能一样传递过滤器。 MongoDB搜索数据点并根据过滤器永久删除它。它只会删除一个点,如果您想删除更多,请使用删除。在操作中要小心,因为您无法逆转操作。

总体而言,使用API​​的操作非常相似,只是URL扩展的差异以及有效负载中的值略有不同。他们在这里的文档中有更多信息。但是,这应该为您提供在与数据API一起使用MongoDB时要做的大多数事情的基础。

结论
希望您喜欢迄今为止的教程和该系列。 MongoDB使免费使用其云存储非常容易!对于要存储和访问缓慢移动数据的情况,这可能非常有用。不要忘记订阅上面的频道,并继续关注更多编码和与微控制器相关的内容。