简介
本文主要介绍如何使用 ESP8266 连接 MQTT 服务器,利用 PubSubClient 库实现客户端与 MQTT 服务器的连接、订阅、收发消息等功能。服务器以 EMQX 为例,部分代码使用 PubSubClient 库自带的示例。
连接到MQTT服务器分为TCP方式连接和TLS/SSL方式连接:使用EMQX的公共服务器时为TCP方式连接,使用EMQX的私有服务器时为TLS/SSL方式连接。TCP和TLS/SSL连接的区别在于:定义espClient的方式不同,连接服务器的端口不同;此外,TLS/SSL连接还需要填写服务器指纹,并使用 espClient.setFingerprint(fingerprint); 设置指纹。
EMQX公共服务器链接 ,使用说明 。
EMQX私有服务器链接 ,使用说明 ,私有服务器有一定的免费额度,每月1百万连接分钟数(大约23个设备持续在线连接1个月)和1G流量。
首先在Arduino IDE的 管理库 中添加 PubSubClient 库。
常用函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 WiFiClient espClient; WiFiClientSecure espClient; PubSubClient client (espClient) ; espClient.setFingerprint (fingerprint); client.setServer (mqtt_server, mqtt_port); client.setCallback (callback); client.connect (clientId.c_str ()); client.connect (clientId.c_str () , mqtt_name , mqtt_password); client.connected (); client.state (); client.publish (topic, "connected" ); client.subscribe (topic); client.subscribe (topic,qos); client.unsubscribe (topic); client.loop ();
头文件
1 2 #include <ESP8266WiFi.h> #include <PubSubClient.h>
定义基本信息
// Wi-Fi credentials of the access point to join.
const char *ssid = "";
const char *password = "";

// MQTT broker endpoint.
const char *mqtt_server = "";
const int mqtt_port = 1883;

// Broker credentials.
const char *mqtt_name = "";
const char *mqtt_password = "";

// Topic used for both publishing and subscribing.
const char *topic = "";
MQTT连接用户名和密码都是可选的,具体要根据MQTT服务器。一般来说,使用TCP连接服务器,端口为1883;使用TLS/SSL连接服务器,端口为8883。
如果使用 EMQX 的公共服务器,使用TCP连接,服务器地址为 broker.emqx.io,端口为1883,连接用户名为 emqx,密码为 public,用户名和密码也可以不填。
如果使用私有服务器,使用TLS/SSL连接,服务器地址根据控制台具体信息,端口为8883,用户名和密码自己在控制台里定义。
定义 espClient 和 client。如果使用TCP连接,则按以下定义:
1 2 WiFiClient espClient; PubSubClient client (espClient) ;
TLS/SSL连接
如果使用TLS/SSL连接,则按以下定义:
设置服务器和证书:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 BearSSL::WiFiClientSecure espClient; PubSubClient mqtt_client (espClient) ;const char *ntp_server = "pool.ntp.org" ; const long gmt_offset_sec = 0 ; const int daylight_offset_sec = 0 ; 的DigiCert Global Root G2 static const char ca_cert[]PROGMEM = R"EOF( -----BEGIN CERTIFICATE----- MIIDjjCCAnagAwIBAgIQAzrx5qcRqaC7KGSxHQn65TANBgkqhkiG9w0BAQsFADBh MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3 d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBH MjAeFw0xMzA4MDExMjAwMDBaFw0zODAxMTUxMjAwMDBaMGExCzAJBgNVBAYTAlVT MRUwEwYDVQQKEwxEaWdpQ2VydCBJbmMxGTAXBgNVBAsTEHd3dy5kaWdpY2VydC5j b20xIDAeBgNVBAMTF0RpZ2lDZXJ0IEdsb2JhbCBSb290IEcyMIIBIjANBgkqhkiG 9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuzfNNNx7a8myaJCtSnX/RrohCgiN9RlUyfuI 2/Ou8jqJkTx65qsGGmvPrC3oXgkkRLpimn7Wo6h+4FR1IAWsULecYxpsMNzaHxmx 1x7e/dfgy5SDN67sH0NO3Xss0r0upS/kqbitOtSZpLYl6ZtrAGCSYP9PIUkY92eQ q2EGnI/yuum06ZIya7XzV+hdG82MHauVBJVJ8zUtluNJbd134/tJS7SsVQepj5Wz tCO7TG1F8PapspUwtP1MVYwnSlcUfIKdzXOS0xZKBgyMUNGPHgm+F6HmIcr9g+UQ vIOlCsRnKPZzFBQ9RnbDhxSJITRNrw9FDKZJobq7nMWxM4MphQIDAQABo0IwQDAP BgNVHRMBAf8EBTADAQH/MA4GA1UdDwEB/wQEAwIBhjAdBgNVHQ4EFgQUTiJUIBiV 5uNu5g/6+rkS7QYXjzkwDQYJKoZIhvcNAQELBQADggEBAGBnKJRvDkhj6zHd6mcY 1Yl9PMWLSn/pvtsrF9+wX3N3KjITOYFnQoQj8kVnNeyIv/iPsGEMNKSuIEyExtv4 NeF22d+mQrvHRAiGfzZ0JFrabA0UWTW98kndth/Jsw1HKj2ZL7tcu7XUIOGZX1NG Fdtom/DzMNU+MeKNhJ7jitralj41E6Vf8PlwUHBHQRFXGU7Aj64GxJUTFy8bJZ91 8rGOmaFvE7FBcf6IKshPECBV1/MUReXgRPTqh5Uykw7+U0b6LJ3/iyK5S9kJRaTe pLiaWN0bfVKfjllDiIGknibVb63dDcY3fe0Dkhvld1927jyNxF1WW6LZZm6zNTfl MrY= -----END CERTIFICATE----- )EOF" ;
setup
函数
如果使用TLS/SSL连接,则需要使用 espClient.setFingerprint(fingerprint); 设置指纹。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void setup () { Serial.begin (115200 , SERIAL_8N1, SERIAL_TX_ONLY); pinMode (BUILTIN_LED, OUTPUT); Serial.begin (115200 ); setup_wifi (); client.setServer (mqtt_server, mqtt_port); client.setCallback (callback); }
建立WiFi连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void setup_wifi () { digitalWrite (BUILTIN_LED, LOW); delay (10 ); Serial.println (); Serial.print ("Connecting to " ); Serial.println (ssid); WiFi.mode (WIFI_STA); WiFi.begin (ssid, password); while (WiFi.status () != WL_CONNECTED) { delay (500 ); Serial.print ("." ); } randomSeed (micros ()); Serial.println ("" ); Serial.println ("WiFi connected" ); Serial.println ("IP address: " ); Serial.println (WiFi.localIP ()); digitalWrite (BUILTIN_LED, HIGH); }
建立MQTT连接
建立MQTT连接以及断开后的重连,这里需要定义设备名。设备名可以自己定义,也可以使用XX-随机数、XX-IP地址等,例如ESP8266-1898、ESP8266-192.168.1.100。
如果存在相同的设备ID,那么所有拥有相同设备ID的设备将会不断的断开重连。所以建议在设备ID后添加上IP地址。
前面提过,连接用户名和密码可填可不填:如果使用用户名和密码,则使用 client.connect(clientId.c_str(), mqtt_name, mqtt_password) 来连接MQTT服务器;如果不使用用户名和密码,则使用 client.connect(clientId.c_str()) 来连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void reconnect () { while (!client.connected ()) { Serial.print ("Attempting MQTT connection..." ); String clientId = "ESP8266-" ; clientId += WiFi.localIP ().toString ().c_str (); if (client.connect (clientId.c_str () , mqtt_name , mqtt_password)) { Serial.println ("connected" ); client.publish (topic, "connected" ); client.subscribe (topic); } else { char err_buf[128 ]; espClient.getLastSSLError (err_buf, sizeof (err_buf)); Serial.print ("failed, rc=" ); Serial.print (client.state ()); Serial.print (" SSL error: " ); Serial.println (err_buf); Serial.println (" try again in 5 seconds" ); delay (5000 ); } } }
PubSubClient 库中 PubSubClient.h 对于状态码的定义
// Values returned by client.state(), as defined in PubSubClient.h.
// Negative values describe the local connection; positive values are the
// broker's reason for refusing the CONNECT request.
#define MQTT_CONNECTION_TIMEOUT      -4  // server did not respond in time
#define MQTT_CONNECTION_LOST         -3  // network connection was broken
#define MQTT_CONNECT_FAILED          -2  // network connection could not be made
#define MQTT_DISCONNECTED            -1  // client is disconnected
#define MQTT_CONNECTED                0  // client is connected
#define MQTT_CONNECT_BAD_PROTOCOL     1  // requested MQTT version not supported
#define MQTT_CONNECT_BAD_CLIENT_ID    2  // client identifier rejected
#define MQTT_CONNECT_UNAVAILABLE      3  // server unable to accept the connection
#define MQTT_CONNECT_BAD_CREDENTIALS  4  // username/password rejected
#define MQTT_CONNECT_UNAUTHORIZED     5  // client not authorized to connect
接收订阅的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void callback (char * topic, byte* payload, unsigned int length) { Serial.print ("Message arrived [" ); Serial.print (topic); Serial.print ("] " ); char msg[length]; for (int i = 0 ; i < length; i++) msg[i]=(char )payload[i]; snprintf (msg, length+1 , msg); Serial.println (msg); if (!strcmp (msg, "on" )) digitalWrite (BUILTIN_LED, LOW); if (!strcmp (msg, "off" )) digitalWrite (BUILTIN_LED, HIGH); }
loop函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void loop () { if (!client.connected ()) { reconnect (); } client.loop (); unsigned long now = millis (); if (now - lastMsg > 2000 ) { lastMsg = now; ++value; snprintf (msg, MSG_BUFFER_SIZE, "hello world #%ld" , value); Serial.print ("Publish message: " ); Serial.println (msg); client.publish (topic, msg); } }
完整代码示例
include <ESP8266WiFi.h> #include <PubSubClient.h> const char * ssid = "" ; const char * password = "" ; const char * mqtt_server = "broker.emqx.io" ; const int mqtt_port = 1883 ; const char * mqtt_name = "emqx" ; const char * mqtt_password = "public" ; const char * topic="testtopic" ; WiFiClient espClient; PubSubClient client (espClient) ;unsigned long lastMsg = 0 ;#define MSG_BUFFER_SIZE (50) char msg[MSG_BUFFER_SIZE];int value = 0 ;void setup () { Serial.begin (115200 , SERIAL_8N1, SERIAL_TX_ONLY); pinMode (BUILTIN_LED, OUTPUT); Serial.begin (115200 ); setup_wifi (); client.setServer (mqtt_server, mqtt_port); client.setCallback (callback); } void setup_wifi () { digitalWrite (BUILTIN_LED, LOW); delay (10 ); Serial.println (); Serial.print ("Connecting to " ); Serial.println (ssid); WiFi.mode (WIFI_STA); WiFi.begin (ssid, password); while (WiFi.status () != WL_CONNECTED) { delay (500 ); Serial.print ("." ); } randomSeed (micros ()); Serial.println ("" ); Serial.println ("WiFi connected" ); Serial.println ("IP address: " ); Serial.println (WiFi.localIP ()); digitalWrite (BUILTIN_LED, HIGH); } void reconnect () { while (!client.connected ()) { Serial.print ("Attempting MQTT connection..." 
); String clientId = "ESP8266-" ; clientId += WiFi.localIP ().toString ().c_str (); if (client.connect (clientId.c_str () , mqtt_name , mqtt_password)) { Serial.println ("connected" ); client.publish (topic, "connected" ); client.subscribe (topic); } else { char err_buf[128 ]; espClient.getLastSSLError (err_buf, sizeof (err_buf)); Serial.print ("failed, rc=" ); Serial.print (client.state ()); Serial.print (" SSL error: " ); Serial.println (err_buf); Serial.println (" try again in 5 seconds" ); delay (5000 ); } } } void callback (char * topic, byte* payload, unsigned int length) { Serial.print ("Message arrived [" ); Serial.print (topic); Serial.print ("] " ); char msg[length]; for (int i = 0 ; i < length; i++) msg[i]=(char )payload[i]; snprintf (msg, length+1 , msg); Serial.println (msg); if (!strcmp (msg, "on" )) digitalWrite (BUILTIN_LED, LOW); if (!strcmp (msg, "off" )) digitalWrite (BUILTIN_LED, HIGH); } void loop () { if (!client.connected ()) { reconnect (); } client.loop (); unsigned long now = millis (); if (now - lastMsg > 2000 ) { lastMsg = now; ++value; snprintf (msg, MSG_BUFFER_SIZE, "hello world #%ld" , value); Serial.print ("Publish message: " ); Serial.println (msg); client.publish (topic, msg); } }