简介
本文主要介绍如何使用 ESP8266 连接 MQTT 服务器,利用 PubSubClient 库实现客户端与 MQTT 服务器的连接、订阅、收发消息等功能。服务器以 EMQX 为例,部分代码来自 PubSubClient 库自带的示例。
连接到MQTT服务器分为TCP方式连接和TLS/SSL方式连接:使用EMQX的公共服务器时为TCP方式连接,使用EMQX的私有服务器时为TLS/SSL方式连接。TCP和TLS/SSL连接的区别在于定义 espClient 的方式不同、连接服务器的端口不同,以及TLS/SSL需要填写服务器指纹,并使用 espClient.setFingerprint(fingerprint); 设置指纹。
EMQX公共服务器链接 ,使用说明 。
EMQX私有服务器链接 ,使用说明 ,私有服务器有一定的免费额度,每月1百万连接分钟数(大约23个设备持续在线连接1个月)和1G流量。
首先在 Arduino IDE 的「管理库」中添加 PubSubClient 库。
常用函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 WiFiClient espClient; WiFiClientSecure espClient; PubSubClient client (espClient) ; espClient.setFingerprint (fingerprint); client.setServer (mqtt_server, mqtt_port); client.setCallback (callback); client.connect (clientId.c_str ()); client.connect (clientId.c_str () , mqtt_name , mqtt_password); client.connected (); client.state (); client.publish (topic, "connected" ); client.subscribe (topic); client.subscribe (topic,qos); client.unsubscribe (topic); client.loop ();
头文件
1 2 #include <ESP8266WiFi.h> #include <PubSubClient.h>
定义基本信息
// Wi-Fi credentials (left empty here; fill in for your network).
const char* ssid     = "";
const char* password = "";

// MQTT broker address and port.
const char* mqtt_server = "";
const int   mqtt_port   = 1883;

// MQTT user name and password passed to client.connect().
const char* mqtt_name     = "";
const char* mqtt_password = "";

// Topic used for both publishing and subscribing in this article.
const char* topic = "";
MQTT连接用户名和密码都是可选的,具体要根据MQTT服务器。一般来说,使用TCP连接服务器,端口为1883;使用TLS/SSL连接服务器,端口为8883。
如果使用 EMQX 的公共服务器,使用TCP连接,服务器地址为 broker.emqx.io,端口为1883,连接用户名为 emqx,密码为 public,也可以不填。
如果使用私有服务器,使用TLS/SSL连接,服务器地址根据控制台具体信息,端口为8883,用户名和密码自己在控制台里定义。
定义 espClient 和 client。如果使用TCP连接,则按以下定义:
1 2 WiFiClient espClient; PubSubClient client (espClient) ;
TLS/SSL连接
如果使用TLS/SSL连接,则按以下定义:
设置服务器和证书:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 BearSSL::WiFiClientSecure espClient; PubSubClient mqtt_client (espClient) ;const char *ntp_server = "pool.ntp.org" ; const long gmt_offset_sec = 0 ; const int daylight_offset_sec = 0 ; 的DigiCert Global Root G2 static const char ca_cert[]PROGMEM = R"EOF( -----BEGIN CERTIFICATE----- MIIDjjCCAnagAwIBAgIQAzrx5qcRqaC7KGSxHQn65TANBgkqhkiG9w0BAQsFADBh MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3 d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBH MjAeFw0xMzA4MDExMjAwMDBaFw0zODAxMTUxMjAwMDBaMGExCzAJBgNVBAYTAlVT MRUwEwYDVQQKEwxEaWdpQ2VydCBJbmMxGTAXBgNVBAsTEHd3dy5kaWdpY2VydC5j b20xIDAeBgNVBAMTF0RpZ2lDZXJ0IEdsb2JhbCBSb290IEcyMIIBIjANBgkqhkiG 9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuzfNNNx7a8myaJCtSnX/RrohCgiN9RlUyfuI 2/Ou8jqJkTx65qsGGmvPrC3oXgkkRLpimn7Wo6h+4FR1IAWsULecYxpsMNzaHxmx 1x7e/dfgy5SDN67sH0NO3Xss0r0upS/kqbitOtSZpLYl6ZtrAGCSYP9PIUkY92eQ q2EGnI/yuum06ZIya7XzV+hdG82MHauVBJVJ8zUtluNJbd134/tJS7SsVQepj5Wz tCO7TG1F8PapspUwtP1MVYwnSlcUfIKdzXOS0xZKBgyMUNGPHgm+F6HmIcr9g+UQ vIOlCsRnKPZzFBQ9RnbDhxSJITRNrw9FDKZJobq7nMWxM4MphQIDAQABo0IwQDAP BgNVHRMBAf8EBTADAQH/MA4GA1UdDwEB/wQEAwIBhjAdBgNVHQ4EFgQUTiJUIBiV 5uNu5g/6+rkS7QYXjzkwDQYJKoZIhvcNAQELBQADggEBAGBnKJRvDkhj6zHd6mcY 1Yl9PMWLSn/pvtsrF9+wX3N3KjITOYFnQoQj8kVnNeyIv/iPsGEMNKSuIEyExtv4 NeF22d+mQrvHRAiGfzZ0JFrabA0UWTW98kndth/Jsw1HKj2ZL7tcu7XUIOGZX1NG Fdtom/DzMNU+MeKNhJ7jitralj41E6Vf8PlwUHBHQRFXGU7Aj64GxJUTFy8bJZ91 8rGOmaFvE7FBcf6IKshPECBV1/MUReXgRPTqh5Uykw7+U0b6LJ3/iyK5S9kJRaTe pLiaWN0bfVKfjllDiIGknibVb63dDcY3fe0Dkhvld1927jyNxF1WW6LZZm6zNTfl MrY= -----END CERTIFICATE----- )EOF" ;
setup
函数
如果使用TLS/SSL连接,则需要使用 espClient.setFingerprint(fingerprint); 设置指纹。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void setup () { Serial.begin (115200 , SERIAL_8N1, SERIAL_TX_ONLY); pinMode (BUILTIN_LED, OUTPUT); Serial.begin (115200 ); setup_wifi (); client.setServer (mqtt_server, mqtt_port); client.setCallback (callback); }
建立WiFi连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void setup_wifi () { digitalWrite (BUILTIN_LED, LOW); delay (10 ); Serial.println (); Serial.print ("Connecting to " ); Serial.println (ssid); WiFi.mode (WIFI_STA); WiFi.begin (ssid, password); while (WiFi.status () != WL_CONNECTED) { delay (500 ); Serial.print ("." ); } randomSeed (micros ()); Serial.println ("" ); Serial.println ("WiFi connected" ); Serial.println ("IP address: " ); Serial.println (WiFi.localIP ()); digitalWrite (BUILTIN_LED, HIGH); }
建立MQTT连接
建立MQTT连接以及断开后的重连,这里需要定义设备名。设备名可以自己定义,也可以使用“XX-随机数”、“XX-IP地址”等形式,例如 ESP8266-1898、ESP8266-192.168.1.100。
如果存在相同的设备ID,那么所有拥有相同设备ID的设备将会不断地断开重连。所以建议在设备ID后添加上IP地址。
前面提过,连接用户名和密码可填可不填。如果使用用户名和密码,则使用 client.connect(clientId.c_str() , mqtt_name , mqtt_password) 来连接MQTT服务器;如果不使用用户名和密码,则使用 client.connect(clientId.c_str()) 来连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void reconnect () { while (!client.connected ()) { Serial.print ("Attempting MQTT connection..." ); String clientId = "ESP8266-" ; clientId += WiFi.localIP ().toString ().c_str (); if (client.connect (clientId.c_str () , mqtt_name , mqtt_password)) { Serial.println ("connected" ); client.publish (topic, "connected" ); client.subscribe (topic); } else { char err_buf[128 ]; espClient.getLastSSLError (err_buf, sizeof (err_buf)); Serial.print ("failed, rc=" ); Serial.print (client.state ()); Serial.print (" SSL error: " ); Serial.println (err_buf); Serial.println (" try again in 5 seconds" ); delay (5000 ); } } }
PubSubClient 库中 PubSubClient.h 对于状态码的定义
// Status codes as defined in PubSubClient.h; client.state() reports one of these.

// Negative values and zero.
#define MQTT_CONNECTION_TIMEOUT      -4
#define MQTT_CONNECTION_LOST         -3
#define MQTT_CONNECT_FAILED          -2
#define MQTT_DISCONNECTED            -1
#define MQTT_CONNECTED                0

// Positive values.
#define MQTT_CONNECT_BAD_PROTOCOL     1
#define MQTT_CONNECT_BAD_CLIENT_ID    2
#define MQTT_CONNECT_UNAVAILABLE      3
#define MQTT_CONNECT_BAD_CREDENTIALS  4
#define MQTT_CONNECT_UNAUTHORIZED     5
接收订阅的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void callback (char * topic, byte* payload, unsigned int length) { Serial.print ("Message arrived [" ); Serial.print (topic); Serial.print ("] " ); char msg[length]; for (int i = 0 ; i < length; i++) msg[i]=(char )payload[i]; snprintf (msg, length+1 , msg); Serial.println (msg); if (!strcmp (msg, "on" )) digitalWrite (BUILTIN_LED, LOW); if (!strcmp (msg, "off" )) digitalWrite (BUILTIN_LED, HIGH); }
loop函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void loop () { if (!client.connected ()) { reconnect (); } client.loop (); unsigned long now = millis (); if (now - lastMsg > 2000 ) { lastMsg = now; ++value; snprintf (msg, MSG_BUFFER_SIZE, "hello world #%ld" , value); Serial.print ("Publish message: " ); Serial.println (msg); client.publish (topic, msg); } }
完整代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 #include <ESP8266WiFi.h> #include <PubSubClient.h> const char * ssid = "" ; const char * password = "" ; const char * mqtt_server = "broker.emqx.io" ; const int mqtt_port = 1883 ; const char * mqtt_name = "emqx" ; const char * mqtt_password = "public" ; const char * topic="testtopic" ; WiFiClient espClient; PubSubClient client (espClient) ;unsigned long lastMsg = 0 ;#define MSG_BUFFER_SIZE (50) char msg[MSG_BUFFER_SIZE];int value = 0 ;void setup () { Serial.begin (115200 , SERIAL_8N1, SERIAL_TX_ONLY); pinMode (BUILTIN_LED, OUTPUT); Serial.begin (115200 ); setup_wifi (); client.setServer (mqtt_server, mqtt_port); client.setCallback (callback); } void setup_wifi () { digitalWrite (BUILTIN_LED, LOW); delay (10 ); Serial.println (); Serial.print ("Connecting to " ); Serial.println (ssid); WiFi.mode (WIFI_STA); WiFi.begin (ssid, password); while (WiFi.status () != WL_CONNECTED) { delay (500 ); Serial.print ("." ); } randomSeed (micros ()); Serial.println ("" ); Serial.println ("WiFi connected" ); Serial.println ("IP address: " ); Serial.println (WiFi.localIP ()); digitalWrite (BUILTIN_LED, HIGH); } void reconnect () { while (!client.connected ()) { Serial.print ("Attempting MQTT connection..." 
); String clientId = "ESP8266-" ; clientId += WiFi.localIP ().toString ().c_str (); if (client.connect (clientId.c_str () , mqtt_name , mqtt_password)) { Serial.println ("connected" ); client.publish (topic, "connected" ); client.subscribe (topic); } else { char err_buf[128 ]; espClient.getLastSSLError (err_buf, sizeof (err_buf)); Serial.print ("failed, rc=" ); Serial.print (client.state ()); Serial.print (" SSL error: " ); Serial.println (err_buf); Serial.println (" try again in 5 seconds" ); delay (5000 ); } } } void callback (char * topic, byte* payload, unsigned int length) { Serial.print ("Message arrived [" ); Serial.print (topic); Serial.print ("] " ); char msg[length]; for (int i = 0 ; i < length; i++) msg[i]=(char )payload[i]; snprintf (msg, length+1 , msg); Serial.println (msg); if (!strcmp (msg, "on" )) digitalWrite (BUILTIN_LED, LOW); if (!strcmp (msg, "off" )) digitalWrite (BUILTIN_LED, HIGH); } void loop () { if (!client.connected ()) { reconnect (); } client.loop (); unsigned long now = millis (); if (now - lastMsg > 2000 ) { lastMsg = now; ++value; snprintf (msg, MSG_BUFFER_SIZE, "hello world #%ld" , value); Serial.print ("Publish message: " ); Serial.println (msg); client.publish (topic, msg); } }