EMQ X data persistence: storing messages in Redis

Redis

Brief introduction

Redis is fully open source, released under the BSD license, and is a high-performance key-value database.

Compared with other key-value caching products, Redis has the following three characteristics:

  • Redis supports data persistence: it can save in-memory data to disk and load it again after a restart.
  • Redis supports not only simple key-value data but also data structures such as list, set, zset, and hash.
  • Redis supports data backup, that is, replication in master-slave mode.

For installation, see "Installing Redis on CentOS 8.0".
For monitoring and management, see "Using redis-cli to monitor and manage Redis 5 on CentOS 8.0".
After installing Redis, start the Redis server with the redis-server command.
If Redis is not installed on the same server as EMQ X, specify the correct Redis server address and port. For EMQ X installed through RPM, the Redis-related configuration file is /etc/emqx/plugins/emqx_backend_redis.conf.

Common commands

  • View Redis status: systemctl status redis
  • Start Redis: systemctl start redis
  • Restart Redis: systemctl restart redis.service
  • Check the Redis installation directory: ps -ef | grep redis

ps -ef | grep redis shows the process ID (29895 in this example).
Then run: ls -l /proc/29895/cwd

Start the plug-in

Command: emqx_ctl plugins load emqx_backend_redis

Alternatively, open the EMQ X Dashboard management console and start the plug-in under Plug-ins -> All -> emqx_backend_redis.

Scenario 1: client online status storage

When a client goes online or offline, EMQ X updates its online status, its online and offline timestamps, and the node's client list in Redis.

Configuration item

Open the configuration file (/etc/emqx/plugins/emqx_backend_redis.conf) and configure the Backend rule:

## Client goes online
backend.redis.hook.client.connected.1    = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

## Client goes offline
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

Usage example

Open the EMQ X Dashboard management console at http://ip:18083 in a browser, create a new client connection under Tools -> WebSocket, and set the client ID to sub_client.

Run redis-cli -h 127.0.0.1 -p 6379 (or /usr/local/soft/redis5/bin/redis-cli) to open the Redis command line, then execute the command keys *. The result shows that two keys are stored in Redis.

If Redis is protected by a password, authenticate first with the command: auth password

Common commands

  • Connection list command: HGETALL mqtt:node:emqx@127.0.0.1

Field description:

## Online device information under the node
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # clientid
2) "1542272836" # Online timestamp

  • Connection details command: HGETALL mqtt:client:sub_client

Field description:

## Client online status
127.0.0.1:6379> HGETALL mqtt:client:sub_client
1) "state"
2) "0" # 0 = offline, 1 = online
3) "online_at"
4) "1542272854" # Online timestamp
5) "offline_at"
6) "undefined" # Offline timestamp
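
The reply above is a flat list of alternating field names and values. As a minimal sketch of how an application might read it, the following folds such a reply into a dictionary and summarizes the status. The helper names pairs_to_dict and describe_client are illustrative only and are not part of EMQ X or Redis; the sample data is copied from the output above.

```python
from typing import Dict, List


def pairs_to_dict(reply: List[str]) -> Dict[str, str]:
    """Fold a flat HGETALL reply [field, value, field, value, ...] into a dict."""
    if len(reply) % 2 != 0:
        raise ValueError("HGETALL reply must contain field/value pairs")
    return dict(zip(reply[0::2], reply[1::2]))


def describe_client(status: Dict[str, str]) -> str:
    """Summarize the hash stored under mqtt:client:{client_id}."""
    # "state" is "1" while the client is online and "0" when it is offline.
    state = "online" if status.get("state") == "1" else "offline"
    return (
        f"{state}, online_at={status.get('online_at')}, "
        f"offline_at={status.get('offline_at')}"
    )


# Sample reply copied from the redis-cli output shown above.
reply = ["state", "0", "online_at", "1542272854", "offline_at", "undefined"]
status = pairs_to_dict(reply)
print(describe_client(status))
```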

Client proxy subscription

Configuration item

Open the configuration file (/etc/emqx/plugins/emqx_backend_redis.conf) and configure the Backend rule:

Usage example

When the sub_client device goes online, it needs to subscribe to two QoS 1 topics, sub_client/upstream and sub_client/downlink:

## redis key is mqtt:sub:{client_id}
## HSET key {topic} {qos}
127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
(integer) 1

127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
(integer) 1
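
When many devices need proxy subscriptions, the HSET commands can be generated rather than typed by hand. The sketch below only builds the command strings in the mqtt:sub:{client_id} layout shown above; the function name build_subscription_commands is hypothetical, and sending the commands to Redis is left to whatever client you use.

```python
from typing import Dict, List


def build_subscription_commands(client_id: str, topics: Dict[str, int]) -> List[str]:
    """Return one HSET command per topic for the key mqtt:sub:{client_id}."""
    commands = []
    for topic, qos in topics.items():
        # MQTT defines only QoS levels 0, 1 and 2.
        if qos not in (0, 1, 2):
            raise ValueError(f"invalid QoS {qos} for topic {topic}")
        commands.append(f"HSET mqtt:sub:{client_id} {topic} {qos}")
    return commands


commands = build_subscription_commands(
    "sub_client",
    {"sub_client/upstream": 1, "sub_client/downlink": 1},
)
for command in commands:
    print(command)
```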


Go back to the Dashboard and check the subscription list. The current client has automatically subscribed to the two QoS 1 topics sub_client/upstream and sub_client/downlink:

Switch back to the WebSocket page of the management console and publish a message to the sub_client/downlink topic. The published message appears in the message subscription list.

Persistent Publishing

Configuration item

Open the configuration file (/etc/emqx/plugins/emqx_backend_redis.conf) and configure the Backend rule. The topic parameter can be used to filter messages; here, the # wildcard is used to store messages on every topic:

Usage example

On the WebSocket page of the EMQ X management console, connect with the client ID sub_client and publish several messages to the topic upstream_topic. For each message, EMQ X persists two records: an entry in the message list and the message details.

Common commands

  • View message list command: ZRANGE mqtt:msg:upstream_topic 0 -1

## Get all message IDs in the upstream_topic sorted set
127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
1) "Mjk2MTY1NzU0MDI1MTM4MDM4NTkwNzgzNTc2OTE4NTg5NDI"
2) "Mjk2MTY1NzUxNzExODcwNTgzODIyMzAzMDA0NTg0MTgxNzI"
3) "Mjk2MTY1NzUzMDcxOTgyNDEyOTgzNjgxNjU5NTIwMjg2NzF"

  • View message details command: HGETALL mqtt:msg:{key}

## Get the message details with message id Mjk2MTY1NzU0MDI1MTM4MDM4NTkwNzgzNTc2OTE4NTg5NDI
127.0.0.1:6379> HGETALL mqtt:msg:Mjk2MTY1NzU0MDI1MTM4MDM4NTkwNzgzNTc2OTE4NTg5NDI
 1) "id"
 2) "Mjk2MTY1NzU0MDI1MTM4MDM4NTkwNzgzNTc2OTE4NTg5NDI"
 3) "from"
 4) "sub_client"
 5) "qos"
 6) "1"
 7) "topic"
 8) "upstream_topic"
 9) "payload"
10) "{ \"msg\": \"\xe6\x88\x91\xe6\xad\xa3\xe5\x9c\xa8\xe5\xad\xa6\xe4\xb9\xa0EMQ-Redis\xe6\x8c\x81\xe4\xb9\x85\xe5\x8c\x96\xe5\x8f\x91\xe5\xb8\x83\xe6\xb6\x88\xe6\x81\xaf002\" }"
11) "ts"
12) "1605517769649"
13) "retain"
14) "false"
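
redis-cli prints the payload with its non-ASCII bytes escaped, which makes it hard to read. The following sketch decodes a message hash like the one above into Python values. It assumes that the payload is UTF-8 encoded JSON and that ts is a Unix timestamp in milliseconds (both appear to hold for this example); decode_message is an illustrative helper, not an EMQ X API.

```python
import json
from datetime import datetime, timezone
from typing import Dict


def decode_message(fields: Dict[bytes, bytes]) -> dict:
    """Turn the raw hash stored under mqtt:msg:{id} into Python values."""
    text = {key.decode("utf-8"): value.decode("utf-8") for key, value in fields.items()}
    return {
        "id": text["id"],
        "from": text["from"],
        "topic": text["topic"],
        "qos": int(text["qos"]),
        "retain": text["retain"] == "true",
        # Assumption: "ts" is a Unix timestamp in milliseconds.
        "published_at": datetime.fromtimestamp(int(text["ts"]) / 1000, tz=timezone.utc),
        # Assumption: the payload is JSON, as in this example.
        "payload": json.loads(text["payload"]),
    }


# Raw bytes matching the redis-cli output above.
raw = {
    b"id": b"Mjk2MTY1NzU0MDI1MTM4MDM4NTkwNzgzNTc2OTE4NTg5NDI",
    b"from": b"sub_client",
    b"qos": b"1",
    b"topic": b"upstream_topic",
    b"payload": b'{ "msg": "\xe6\x88\x91\xe6\xad\xa3\xe5\x9c\xa8\xe5\xad\xa6\xe4\xb9\xa0EMQ-Redis\xe6\x8c\x81\xe4\xb9\x85\xe5\x8c\x96\xe5\x8f\x91\xe5\xb8\x83\xe6\xb6\x88\xe6\x81\xaf002" }',
    b"ts": b"1605517769649",
    b"retain": b"false",
}
message = decode_message(raw)
print(message["payload"]["msg"])
print(message["published_at"].isoformat())
```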

Get offline messages

Configuration item

Open the configuration file (/etc/emqx/plugins/emqx_backend_redis.conf) and configure the Backend rule:

Usage example

For MQTT offline messages to be delivered, the following conditions must all be met:

1.   The client connects with clean_session = false
2.   The subscription QoS is greater than 0
3.   The publish QoS is greater than 0
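
The three conditions can be written as a single check. This is only a sketch of the rule as listed above, not of how EMQ X evaluates it internally; the function name qualifies_for_offline_delivery is made up for illustration.

```python
def qualifies_for_offline_delivery(
    clean_session: bool, subscribe_qos: int, publish_qos: int
) -> bool:
    """Return True only when all three conditions listed above hold."""
    return (not clean_session) and subscribe_qos > 0 and publish_qos > 0


# A persistent session with QoS 1 on both sides qualifies.
print(qualifies_for_offline_delivery(False, 1, 1))
# A clean session does not qualify, whatever the QoS levels are.
print(qualifies_for_offline_delivery(True, 1, 1))
# A QoS 0 subscription does not qualify either.
print(qualifies_for_offline_delivery(False, 0, 1))
```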

In the EMQ X management console, create a new client sub_client_another, connect it with the following configuration, and publish messages to the topic sub_client/upstream:

Then disconnect sub_client_another and reconnect sub_client. The messages are still received:

Persistent Retain message

Open the configuration file (/etc/emqx/plugins/emqx_backend_redis.conf) and configure the Backend rule:

Publish a Retain message in the Dashboard:

Common commands

  • Message list command: ZRANGE mqtt:retain:upstream_topic 0 -1 or GETRANGE mqtt:retain:upstream_topic 0 -1 (use the command that matches the data type stored in Redis)

  • Message details command: HGETALL mqtt:msg:Mjk2MTY3MzA0ODEyNzgzNDMyNzI0NjM4ODU5NTQ4NDI2MjJ

## Get the message details with message id Mjk2MTY3MzA0ODEyNzgzNDMyNzI0NjM4ODU5NTQ4NDI2MjJ
127.0.0.1:6379> HGETALL mqtt:msg:Mjk2MTY3MzA0ODEyNzgzNDMyNzI0NjM4ODU5NTQ4NDI2MjJ
 1) "id"
 2) "Mjk2MTY3MzA0ODEyNzgzNDMyNzI0NjM4ODU5NTQ4NDI2MjJ"
 3) "from"
 4) "sub_client"
 5) "qos"
 6) "2"
 7) "topic"
 8) "upstream_topic"
 9) "payload"
10) "{ \"msg\": \"now i am studing\" }"
11) "ts"
12) "1605526176485"
13) "retain"
14) "true"
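
Which command reads the message list depends on the data type stored under the key. The Redis command TYPE key reports that type. As a small sketch, the mapping below picks a full-read command from the TYPE result; the table and the function read_command_for are illustrative helpers, not something EMQ X provides.

```python
# Full-read command for each Redis data type; the key is filled in afterwards.
READ_COMMANDS = {
    "string": "GETRANGE {key} 0 -1",
    "list": "LRANGE {key} 0 -1",
    "set": "SMEMBERS {key}",
    "zset": "ZRANGE {key} 0 -1",
    "hash": "HGETALL {key}",
}


def read_command_for(redis_type: str, key: str) -> str:
    """Return the command that reads the whole value of `key` for a TYPE result."""
    try:
        template = READ_COMMANDS[redis_type]
    except KeyError:
        raise ValueError(f"unsupported Redis type: {redis_type}") from None
    return template.format(key=key)


print(read_command_for("zset", "mqtt:retain:upstream_topic"))
print(read_command_for("string", "mqtt:retain:upstream_topic"))
```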

This completes the walkthrough of persisting EMQ X messages to Redis [scenario 1].
[Reference document] EMQ official website
Next [scenario 2]: store the qualifying messages under a specified EMQ X topic in IDS

Posted by Jarl on Sat, 07 May 2022 22:13:34 +0300