前言
本篇我们实现 AHT10 温湿度传感器采集周围环境数据,并设置定时采集功能,将数据上报至物联网平台。
准备工作
在上一篇文章中只是简单提了一下设备侧的硬件,下面补上实物图,以供参考。
OrangePi 3B 开发板,这里我已经装好了底座,接上了天线。
连接传感器
参照开发板引脚图,一一对应连接。这里我选择的是I2C4,其中传感器的VIN口连接开发板上3.3V接口。
注意不要接错了,以免烧坏传感器和开发板。连接完成后如图所示。
启用I2C配置
安装传感器后,上电开机。
由于系统默认关闭I2C接口,需要我们手动开启。执行以下命令(普通用户请加 sudo 权限):
sudo orangepi-config
移动方向键,选取要启用的 I2C 配置。这里我启用的是 I2C4。
保存后返回,随后重启系统。
sudo reboot
测试连接
使用 i2cdetect 命令检测设备是否连接到 i2c4。
sudo i2cdetect -y 4
编写测量模块
这里是最关键的一步,传感器测量到的数据并不是以十进制数据返回的,需要我们从设备上读取,进行解码操作。
安装 smbus2 库。
pip install smbus2
导入相关库。
import time
import smbus2
定义地址读写相关常量。
AHT10_ADDRESS = 0x38
AHT_TEMPERATURE_CONST = 200
AHT_TEMPERATURE_OFFSET = 50
CMD_INITIALIZE = [0xE1, 0x08, 0x00]
CMD_RESET = 0xBA
CMD_MEASURE = [0xAC, 0x33, 0x00]
创建i2c接口对象。
class AHT10:
def __init__(self):
self.i2c = smbus2.SMBus(4)
self.address = AHT10_ADDRESS
self.aht10_init()
self.raw = []
self.result = [0, 0]
初始化传感器。
def aht10_init(self):
self.aht10_reset()
self.i2c.write_i2c_block_data(self.address, 0x70, CMD_INITIALIZE)
复位。
def aht10_reset(self):
self.i2c.write_byte_data(self.address, 0x70, CMD_RESET)
读数据,这里需要延时,时间不宜太短。
def read_raw(self):
self.i2c.write_i2c_block_data(self.address, 0x70, CMD_MEASURE)
time.sleep(0.2)
self.raw = self.i2c.read_i2c_block_data(self.address, 0x71, 6)
self.result[0] = self.raw[1] << 12 | self.raw[2] << 4 | self.raw[3] >> 4
self.result[1] = (self.raw[3] & 0x0F) << 16 | self.raw[4] << 8 | self.raw[5]
处理湿度数据。
@property
def humidity(self):
self.read_raw()
return (self.result[0] / 0x100000) * 100
处理温度数据。
@property
def temperature(self):
self.read_raw()
return (self.result[1] / 0x100000) * AHT_TEMPERATURE_CONST - AHT_TEMPERATURE_OFFSET
完整代码如下。
import time
import smbus2
AHT10_ADDRESS = 0x38
AHT_TEMPERATURE_CONST = 200
AHT_TEMPERATURE_OFFSET = 50
CMD_INITIALIZE = [0xE1, 0x08, 0x00]
CMD_RESET = 0xBA
CMD_MEASURE = [0xAC, 0x33, 0x00]
class AHT10:
def __init__(self):
self.i2c = smbus2.SMBus(4)
self.address = AHT10_ADDRESS
self.aht10_init()
self.raw = []
self.result = [0, 0]
def aht10_init(self):
self.aht10_reset()
self.i2c.write_i2c_block_data(self.address, 0x70, CMD_INITIALIZE)
def aht10_reset(self):
self.i2c.write_byte_data(self.address, 0x70, CMD_RESET)
def read_raw(self):
self.i2c.write_i2c_block_data(self.address, 0x70, CMD_MEASURE)
time.sleep(0.2)
self.raw = self.i2c.read_i2c_block_data(self.address, 0x71, 6)
self.result[0] = self.raw[1] << 12 | self.raw[2] << 4 | self.raw[3] >> 4
self.result[1] = (self.raw[3] & 0x0F) << 16 | self.raw[4] << 8 | self.raw[5]
@property
def humidity(self):
self.read_raw()
return (self.result[0] / 0x100000) * 100
@property
def temperature(self):
self.read_raw()
return (self.result[1] / 0x100000) * AHT_TEMPERATURE_CONST - AHT_TEMPERATURE_OFFSET
测量数据
先读取数据,再获取温湿度值。
if __name__ == "__main__":
aht10 = AHT10()
aht10.read_raw()
print(f"Humidity:{aht10.humidity}")
print(f"Temperature:{aht10.temperature}")
将文件通过SSH上传至开发板。随后执行py文件,查看测量结果。
python3 aht10.py
对接物联网平台
在设备侧还需做以下修改:
定义上传至平台的函数。
def uploadAHT10DataThread(self):
payload = {
"温度": round(self.aht10.temperature, 1),
"湿度": round(self.aht10.humidity, 1)
}
self.publish(self.TOPIC_REPORT_PROPERTY, payload)
设置定时任务,使用到了 schedule 模块。
schedule.every(10).minutes.do(client.uploadAHT10DataThread)
while True:
schedule.run_pending()
time.sleep(1)
以上方式实现了每十分钟收集一次温湿度数据,并上报至平台。
查看上报数据
版权声明
本文系作者原创文章,未经允许不得转载,否则保留追究法律责任的权利。https://www.scott-sloan.cn/archives/261/