ref –
- https://emqx.medium.com/how-to-use-mqtt-in-the-react-project-1b04d80bd383
- http://www.steves-internet-guide.com/mqtt/
- http://www.steves-internet-guide.com/mqtt-hosting-brokers-and-servers/
- http://www.steves-internet-guide.com/using-javascript-mqtt-client-websockets/
- https://github.com/chkr1011/MQTTnet/issues/498
MQTT – Message Queuing Telemetry Transport.
The best way of understanding MQTT is to use it, and to do that you need two things:
- An MQTT broker
- An MQTT client
Rather than installing your own broker (mosquitto) I recommend that you first start with a public test broker like test.mosquitto.org or broker.emqx.io
its protocol may be: ws://
and port 8083
Now that we have a broker that will take in our messages…we should implement the MQTT client.
Implementing the MQTT client
Create a React Native App.
npx create-react-app react-mqtt-test –template typescript
npm install –save typescript @types/node @types/react @types/react-dom @types/jest
npm install mqtt –save
Note that you will encounter error
ERROR in ./node_modules/mqtt/lib/connect/index.js 7:12-26
Module not found: Error: Can’t resolve ‘url’ in ‘/Users/ricky_tsao/Desktop/jwt-react-rtk-query/node_modules/mqtt/lib/connect’
npm install url
(https://stackoverflow.com/questions/54392956/react-native-mqtt-module-url-does-not-exist-in-the-haste-module-map)
Next, under src, create components. Under components, create folder Hook.
Create index.js. Let’s create functional component called HookMqtt.
We create state hooks for our client, isSubscribed, payload, and connection status
HookMqtt
src/components/Hook/index.js
1 2 3 4 5 6 |
const HookMqtt = () => { const [client, setClient] = useState(null); const [isSubed, setIsSub] = useState(false); const [payload, setPayload] = useState({}); const [connectStatus, setConnectStatus] = useState('Connect'); } |
We declare a function where we use mqtt.connect for host and mqttOption. The reason why we do it here is because we need to update our client.
src/components/Hook/index.js
1 2 3 4 |
const mqttConnect = (host, mqttOption) => { setConnectStatus('Connecting'); setClient(mqtt.connect(host, mqttOption)); }; |
Notice setClient. When we connect, we update client with setClient, then the client variable can be used else where in HookMqtt functional component.
HookMqtt renders a UI like so:
1 2 3 4 5 6 7 8 9 10 |
return ( <> <Connection connect={mqttConnect} disconnect={mqttDisconnect} connectBtn={connectStatus} /> <QosOption.Provider value={qosOption}> <Subscriber sub={mqttSub} unSub={mqttUnSub} showUnsub={isSubed} /> <Publisher publish={mqttPublish} /> </QosOption.Provider> <Receiver payload={payload}/> </> ); |
Notice that it has four components:
- Connection
- Subscriber
- Publisher
- Receiver
Let’s talk about Connection first, as it is a component we need to be implemented to connect to our broker. Notice that Connect takes in prop mattConnect.
Connection
We want to pass in our previously created mqttConnect function into our connect prop parameter.
We will use this connect function to connect. However, let’s initialize url and options first.
First, in this class, we have a form, which initializes it via record.
src/components/Hook/Connection.js
1 2 3 4 5 6 7 |
const Connection = ({ connect, disconnect, connectBtn }) => { const record = { host: 'broker.emqx.io', clientId: `mqttjs_ + ${Math.random().toString(16).substr(2, 8)}`, port: 8083, }; |
We also have a record that has the basic connection properties of host, clientId, and port.
This is to connect to our free broker.
Next, we implement onFinish, which is a handler for when the user have filled out the form with user name and password.
We extract our connection information in order to form a url.
src/components/Hook/Connection.js
1 2 3 4 5 |
const onFinish = (values) => { const { host, clientId, port, username, password } = values; const url = `ws://${host}:${port}/mqtt`; ... ... |
Options
We then create options object, which tells the broker what we want as far as the connection goes. Pay attention to clean and retain.
Clean property removes pending messages.
clean property means that we want to clean the session when we go offline. When this happens, then whatever pending QoS 1 and 2 messages that was published by other clients will not be there anymore.
We want to set clean to false so that we DO NOT clean the session when we go offline. That way, it still receives published Qos 1 and 2 messages when we go offline. When we log back on, we will see the messages.
By default clean is set to true. This means that when clients disconnect, whatever message that is published when they are offline will not be received when they sign back on.
src/components/Class/Connection.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
const options = { keepalive: 30, protocolId: 'MQTT', protocolVersion: 4, // set to false to receive QoS 1 and 2 messages while offline clean: false, // default true reconnectPeriod: 1000, connectTimeout: 30 * 1000, will: { topic: 'WillMsg', payload: 'Connection Closed abnormally..!', qos: 0, retain: true // default false }, rejectUnauthorized: false }; |
Normally if someone publishes a message to a topic, and no one is subscribed to that topic the message is simply discarded by the broker.
However the publisher can tell the broker to keep the last message on that topic by setting the retained message flag to true.
This can be very useful, as for example, if you have sensor publishing its status only when changed e.g. Door sensor. What happens if a new subscriber subscribes to this status?
Without retained messages the subscriber would have to wait for the status to change before it received a message.
However with the retained flat turned on, any subscribed client(s) would be able to get the
last retained message (current state of the sensor).
What is important to understand is that only one message is retained per topic.
Also, the next message published on that topic replaces the last retained message for that topic.
moving on, we add properties for authentication to our options. This way, we have url/options all ready to connect.
1 2 3 4 5 |
options.clientId = clientId; options.username = username; options.password = password; connect(url, options); }; |
so we set our athentication user info onto options and then pass everything into connect.
Since the connect function is passed in by Mqtt, it will run this line to connect and then set the client object.
src/components/Hook/index.js
1 |
setClient(mqtt.connect(host, mqttOption)); |
Back to HookMqtt. Now that we have a valid client to work with, let’s implement useEffect for when our client has changed to a new client.
When a new client comes onboard we have to make sure to get events so we can update our state when these happen:
src/components/Hook/index.js
- connect status (connectStatus)
- error status
- reconnect status (connectStatus)
- message status (payload)
We get the event updates via callbacks:
src/components/Hook/index.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
useEffect(() => { if (client) { client.on('connect', () => { setConnectStatus('Connected'); }); client.on('error', (err) => { client.end(); }); client.on('reconnect', () => { setConnectStatus('Reconnecting'); }); client.on('message', (topic, message) => { const payload = { topic, message: message.toString() }; setPayload(payload); }); } }, [client]); // whenever client changes...we do effects |
Now, when a client signs on, we can update our state payload and connectStatus when an event happens for that particular user.
Similar to how Connection is implemented, we’ll do the same for Publisher.
So in our functional component HookMqtt:
1 2 3 4 5 6 |
return ( <> ... <Subscriber sub={mqttSub} unSub={mqttUnSub} showUnsub={isSubed} /> </> ) |
where mqttSub is:
1 2 3 4 5 6 7 8 9 10 11 12 |
const mqttSub = (subscription) => { if (client) { const { topic, qos } = subscription; client.subscribe(topic, { qos }, (error) => { if (error) { console.log('Subscribe to topics error', error) return } setIsSub(true) }); } }; |
It takes a subscription with the topic and qos data.
Now, where does topic come from?
In the Subscriber form, we have intiialValues with the record data:
1 2 3 4 5 6 |
<Form layout="vertical" name="basic" form={form} initialValues={record} onFinish={onFinish} |
where record data is:
1 2 3 4 |
const record = { topic: 'testtopic/react', qos: 0, }; |
topic and qos (in the Form) matches up to the ui controls. topic to textfield. qos to a pulldown. The options for the pulldown is dictated by qsOptioins.Provider, which gives us constant value context.
So that setups topic and qos defaults. If you use the pull down to select new qos, or type in new topic, the values parameter will be updated in onFinish.
1 2 3 4 5 6 7 |
const Subscriber = ({ sub, unSub, showUnsub }) => { ... const onFinish = (values) => { sub(values); }; ... } |
Subscriber takes in prop sub and we call sub with our values. aka calling mqttSub.
In mqttSub, we see that:
1 2 3 4 |
client.subscribe(topic, { qos }, (error) => { ... ... } |
it takes in a string topic, an options qos, and a callback for error anonymous function.
In client.d.ts, it is declared like so:
1 2 3 4 5 |
public subscribe ( topic:string | string[], opts: IClientSubscribeOptions, callback?: ClientSubscribeCallback): this ) |
where IClientSubscribeOptions is defined as:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
export interface IClientSubscribeOptions { /** * the QoS */ qos: QoS, /* * no local flag * */ nl?: boolean, /* * Retain As Published flag * */ rap?: boolean, /* * Retain Handling option * */ rh?: number } |
We need to put the qos to know what agreement is used between sender and receiver on the guarantee of delivering a message.
There are three levels.
- 0 – client publish message, no acknowledgement from broker
- 1 – sending is guaranteed. Broker will acknowledge. Client will re-send until it gets the broker’s acknowledgement
- 2 – Sequence of 4 messages between sender and receiver (a kind of handshake) to confirm that message has been sent, and acknowledgement has been received. Both sender and receiver are sure that the message was sent exactly once.
Thus, client.subscribe will simply subscribe this client to the broker. And the broker will publish messages to this client in the future.
Receiving the Messages
Now looks at functional component Receiver.
In HookMqtt, it gives the state payload as prop into Receiver component.
1 |
<Receiver payload={payload}/> |
and it updates the payload in useEffect:
1 2 3 4 5 6 7 8 9 10 11 |
useEffect(() => { if (client) { client.on('message', (topic, message) => { const payload = { topic, message: message.toString() }; setPayload(payload); }); ... ... } } |
The broker will be able to send messages to our client through the message event. Then we quickly update our payload.
In functional component Receiver, we append this new payload into our message queue.
1 2 3 4 5 6 7 8 |
const Receiver = ({ payload }) => { const [messages, setMessages] = useState([]) useEffect(() => { if (payload.topic) { setMessages(messages => [...messages, payload]) } }, [payload]) |
where we render a List using this messages array:
1 2 3 4 5 6 |
<List size="small" bordered dataSource={messages} renderItem={renderListItem} /> |
Publishing Messages
In HookMqtt,
1 |
<Publisher publish={mqttPublish} /> |
where
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
const mqttPublish = (context) => { if (client) { const { topic, qos, payload } = context; // client.d.ts: // public publish ( // topic: string, // message: string | Buffer, // opts: IClientPublishOptions, // callback?: PacketCallback // ): this client.publish( topic, payload, { qos, retain: true, }, error => { if (error) { console.log('Publish error: ', error); } } ); } } |
So following the interface defined for us, we give it a topic, a payload message, and options of type IClientPublishOptions where we set retain to true.
As mentioned earlier, retain to true means the user will get the last message. If we were to set this to false, then they will not.