一、介绍
本项目采用ESP32开发板,每X秒采集一次DTH11温湿度传感器数据,通过ESP32自带WIFI,经MQTT协议传送给ActiveMQ中间件,便于后续开发和使用。
二、材料介绍
ESP32 CP2102 带WIFI 22元
DTH11 4元
透明防水塑料壳 8元
MicroUSB电源线 5元
三、ActiveMQ服务器配置
我这里使用docker一键部署ActiveMQ,并将MQTT端口映射到1883端口
四、硬件安装
将DTH11的正负连接ESP32的3.3V和GND针脚,数据连接至D13针脚
五、代码开发
1、ESP32刷写为MPython开发环境
2、新建main.py,此脚本ESP32上电即会自动运行
main.py
主程序
import random
from machine import Pin, SPI
import time,machine
import dht11,network,mrequests
from umqtt.simple import MQTTClient
import ujson
MQTT_CLIENT_ID = f'python-mqtt-{random.randint(0, 1000)}'
MQTT_BROKER = "10.0.10.129"
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_TOPIC = "mqtt001"
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
dht = dht11.DHT11(Pin(13))
print("Connecting to MQTT server... ", end="")
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, user=MQTT_USER, password=MQTT_PASSWORD)
print("Connected!")
while True:
if wlan.isconnected() == False:
print("connect WiFi")
try:
wlan.connect('WiFissid','wifipassword')
except:
print("wifi false")
#pass
else:
dht.measure()
try:
client.connect()
message = ujson.dumps({
"temp": dht.temperature(),
"humidity": dht.humidity(),
})
#print("Reporting to MQTT topic {}: {}".format(MQTT_TOPIC, message))
client.publish(MQTT_TOPIC, message)
print("mqtt push ok")
client.disconnect()
except:
print("mqtt push not ok")
#pass
time.sleep(3)
dth11.py
DTH11传感器支持库
# DHT11/DHT22 driver for MicroPython on ESP8266
# MIT license; Copyright (c) 2016 Damien P. George
try:
from esp import dht_readinto
except:
from pyb import dht_readinto
class DHTBase:
def __init__(self, pin):
self.pin = pin
self.buf = bytearray(5)
def measure(self):
buf = self.buf
dht_readinto(self.pin, buf)
if (buf[0] + buf[1] + buf[2] + buf[3]) & 0xff != buf[4]:
raise Exception("checksum error")
class DHT11(DHTBase):
def humidity(self):
return self.buf[0]
def temperature(self):
return self.buf[2]
class DHT22(DHTBase):
def humidity(self):
return (self.buf[0] << 8 | self.buf[1]) * 0.1
def temperature(self):
t = ((self.buf[2] & 0x7f) << 8 | self.buf[3]) * 0.1
if self.buf[2] & 0x80:
t = -t
return t
mrequests.py
一个比自带的requests更好用的网络库
"""A HTTP client module for MicroPython with an API similar to requests."""
import sys
try:
import socket
except ImportError:
import usocket as socket
MICROPY = sys.implementation.name == "micropython"
MAX_READ_SIZE = 4 * 1024
def encode_basic_auth(user, password):
from ubinascii import b2a_base64
auth_encoded = b2a_base64(b"%s:%s" % (user, password)).rstrip(b"\n")
return {b"Authorization": b"Basic %s" % auth_encoded}
def head(url, **kw):
return request("HEAD", url, **kw)
def get(url, **kw):
return request("GET", url, **kw)
def post(url, **kw):
return request("POST", url, **kw)
def put(url, **kw):
return request("PUT", url, **kw)
def patch(url, **kw):
return request("PATCH", url, **kw)
def delete(url, **kw):
return request("DELETE", url, **kw)
def parse_url(url):
port = None
host = None
# str.partition() would be handy here,
# but it's not supported on the esp8266 port
delim = url.find("//")
if delim >= 0:
scheme, loc = url[:delim].rstrip(':'), url[delim+2:]
else:
loc = url
scheme = ""
psep = loc.find("/")
if psep == -1:
if scheme:
host = loc
path = "/"
else:
path = loc
elif psep == 0:
path = loc
else:
path = loc[psep:]
host = loc[:psep]
if host:
hsep = host.rfind(":")
if hsep > 0:
port = int(host[hsep + 1 :])
host = host[:hsep]
return scheme or None, host, port, path
class RequestContext:
def __init__(self, url, method=None):
self.redirect = False
self.method = method or "GET"
self.scheme, self.host, self._port, self.path = parse_url(url)
if not self.scheme or not self.host:
raise ValueError("An absolute URL is required.")
@property
def port(self):
return self._port if self._port is not None else 443 if self.scheme == "https" else 80
@property
def url(self):
return "%s://%s%s" % (
self.scheme,
self.host if self._port is None else ("%s:%s" % (self.host, self.port)),
self.path,
)
def set_location(self, status, location):
if status in (301, 302, 307, 308):
self.redirect = True
elif status == 303 and self.method != "GET":
self.redirect = True
if self.redirect:
scheme, host, port, path = parse_url(location)
if scheme and self.scheme == "https" and scheme != "https":
self.redirect = False
return
if status not in (307, 308) and self.method != "HEAD":
self.method = "GET"
if scheme:
self.scheme = scheme
if host:
self.host = host
self._port = port
if path.startswith("/"):
self.path = path
else:
self.path = self.path.rsplit("/", 1)[0] + "/" + path
class Response:
def __init__(self, sock, sockfile, save_headers=False):
self.sock = sock
self.sf = sockfile
self.encoding = "utf-8"
self._cached = None
self._chunk_size = 0
self._content_size = 0
self.chunked = False
self.status_code = None
self.reason = ""
self.headers = [] if save_headers else None
def read(self, size=MAX_READ_SIZE):
if self.chunked:
if self._chunk_size == 0:
l = self.sf.readline()
# print("Chunk line:", l)
# ignore chunk extensions
l = l.split(b";", 1)[0]
self._chunk_size = int(l, 16)
# print("Chunk size:", self._chunk_size)
if self._chunk_size == 0:
# End of message
sep = sf.read(2)
if sep != b"\r\n":
raise ValueError("Expected final chunk separator, read %r instead." % sep)
return b""
data = self.sf.read(min(size, self._chunk_size))
self._chunk_size -= len(data)
if self._chunk_size == 0:
sep = self.sf.read(2)
if sep != b"\r\n":
raise ValueError("Expected chunk separator, read %r instead." % sep)
return data
else:
if size:
return self.sf.read(size)
else:
return self.sf.read(self._content_size)
def save(self, fn, chunk_size=1024):
read = 0
with open(fn, "wb") as fp:
while True:
remain = self._content_size - read
if remain <= 0:
break
chunk = self.read(min(chunk_size, remain))
read += len(chunk)
if not chunk:
break
fp.write(chunk)
self.close()
def _parse_header(self, data):
if data[:18].lower() == b"transfer-encoding:" and b"chunked" in data[18:]:
self.chunked = True
# print("Chunked response detected.")
elif data[:15].lower() == b"content-length:":
self._content_size = int(data.split(b":", 1)[1])
# print("Content length: %i" % self._content_size)
# overwrite this method, if you want to process/store headers differently
def add_header(self, data):
self._parse_header(data)
if self.headers is not None:
self.headers.append(data)
def close(self):
if not MICROPY:
self.sf.close()
self.sf = None
if self.sock:
self.sock.close()
self.sock = None
self._cached = None
@property
def content(self):
if self._cached is None:
try:
self._cached = self.read(size=None)
finally:
self.sock.close()
self.sock = None
return self._cached
@property
def text(self):
return str(self.content, self.encoding)
def json(self):
import ujson
return ujson.loads(self.content)
def request(
method,
url,
data=None,
json=None,
headers={},
auth=None,
encoding=None,
response_class=Response,
save_headers=False,
max_redirects=1,
timeout=None,
):
if auth:
headers.update(auth if callable(auth) else encode_basic_auth(auth[0], auth[1]))
if json is not None:
assert data is None
import ujson
data = ujson.dumps(json)
ctx = RequestContext(url, method)
while True:
if ctx.scheme not in ("http", "https"):
raise ValueError("Protocol scheme %s not supported." % ctx.scheme)
ctx.redirect = False
# print("Resolving host address...")
ai = socket.getaddrinfo(ctx.host, ctx.port, 0, socket.SOCK_STREAM)
ai = ai[0]
# print("Creating socket...")
sock = socket.socket(ai[0], ai[1], ai[2])
sock.settimeout(timeout)
try:
# print("Connecting to %s:%i..." % (ctx.host, ctx.port))
sock.connect(ai[-1])
if ctx.scheme == "https":
try:
import ssl
except ImportError:
import ussl as ssl
# print("Wrapping socket with SSL")
create_ctx = getattr(ssl, 'create_default_context', None)
if create_ctx:
sock = create_ctx().wrap_socket(sock, server_hostname=ctx.host)
else:
sock = ssl.wrap_socket(sock, server_hostname=ctx.host)
sf = sock if MICROPY else sock.makefile("rwb")
sf.write(b"%s %s HTTP/1.1\r\n" % (ctx.method.encode("ascii"), ctx.path.encode("ascii")))
if not b"Host" in headers:
sf.write(b"Host: %s\r\n" % ctx.host.encode())
for k, val in headers.items():
sf.write(k if isinstance(k, bytes) else k.encode('ascii'))
sf.write(b": ")
sf.write(val if isinstance(val, bytes) else val.encode('ascii'))
sf.write(b"\r\n")
if data and ctx.method not in ("GET", "HEAD"):
if json is not None:
sf.write(b"Content-Type: application/json")
if encoding:
sf.write(b"; charset=%s" % encoding.encode())
sf.write(b"\r\n")
sf.write(b"Content-Length: %d\r\n" % len(data))
sf.write(b"Connection: close\r\n\r\n")
if data and ctx.method not in ("GET", "HEAD"):
sf.write(data if isinstance(data, bytes) else data.encode(encoding or "utf-8"))
if not MICROPY:
sf.flush()
resp = response_class(sock, sf, save_headers=save_headers)
l = b""
i = 0
while True:
l += sf.read(1)
i += 1
if l.endswith(b"\r\n") or i > MAX_READ_SIZE:
break
# print("Response: %s" % l.decode("ascii"))
l = l.split(None, 2)
resp.status_code = int(l[1])
if len(l) > 2:
resp.reason = l[2].rstrip()
while True:
l = sf.readline()
if not l or l == b"\r\n":
break
if l.startswith(b"Location:"):
ctx.set_location(resp.status_code, l[9:].strip().decode("ascii"))
# print("Header: %r" % l)
resp.add_header(l)
except OSError:
sock.close()
raise
if ctx.redirect:
# print("Redirect to: %s" % ctx.url)
sock.close()
max_redirects -= 1
if max_redirects < 0:
raise ValueError("Maximum redirection count exceeded.")
else:
break
return resp
六、运行程序
上电直接运行,注意,代码可以去掉终端输出的print,防止ESP32内存溢出
七、监测程序
登录ActiveMQ查看
mqttserver.py
在任意电脑上订阅数据
# python3.6
import random
import time
from paho.mqtt import client as mqtt_client
broker = '10.0.10.129'
port = 1883
topic = "mqtt001"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic "+time.asctime())
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
八、小结
ESP32 MPython太强大了!
评论