I know that M2Mqtt does not support disk buffering out of the box, but I need to implement it. The purpose is to be sure that every published message actually reaches the broker.
Right now when I publish a message, my class directly puts the message in a key-value database that is stored on disk.
In another thread, a method repeatedly peeks at the key-value database, looking for messages to publish. When it finds a new message, the thread calls the M2Mqtt method Publish. The message goes straight into M2Mqtt's internal inflight queue, and Publish returns a publish ID. The thread then waits for the M2Mqtt event MessagePublished before publishing any other data. Once the event fires, it compares the ID from the event with the one returned by Publish. If they are equal, it knows the message was successfully published.
To summarize:
- Thread 1 enqueues message to key-value database
- Thread 2 runs a peek loop on key-value database queue
- Thread 2 finds a new message
- Thread 2 calls M2Mqtt Publish and receives ID1 from the Publish method
- Thread 2 blocks until the MessagePublished event fires and checks whether the published ID equals ID1. If it does, the message is dequeued from the key-value database and is considered successfully sent.
- Thread 2 continues its loop as in step 2, looking for new messages to publish.
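To make the steps above concrete, here is a minimal, language-neutral sketch of the peek / publish / wait-for-ack / dequeue loop, written in Python so that it runs without a broker. Everything in it is a stand-in and an assumption: `DurableQueue` is an in-memory placeholder for the on-disk key-value database, and `FakeClient` only imitates a client whose publish call returns an ID and later raises a "published" callback with that ID. It is not M2Mqtt's API.

```python
import threading
from collections import deque


class DurableQueue:
    """Placeholder for the on-disk key-value queue (kept in memory here)."""

    def __init__(self):
        self._items = deque()
        self._lock = threading.Lock()

    def enqueue(self, payload):
        with self._lock:
            self._items.append(payload)

    def peek(self):
        # Return the oldest message without removing it, or None if empty.
        with self._lock:
            return self._items[0] if self._items else None

    def dequeue(self):
        with self._lock:
            return self._items.popleft()


class FakeClient:
    """Hypothetical client: publish() returns an ID, the ack arrives later."""

    def __init__(self):
        self.on_published = None  # callback taking the published ID
        self.delivered = []       # what the simulated broker received
        self._next_id = 0

    def publish(self, payload):
        self._next_id += 1
        msg_id = self._next_id
        # Simulate the broker acknowledgement arriving on another thread.
        threading.Timer(0.01, self._ack, args=(msg_id, payload)).start()
        return msg_id

    def _ack(self, msg_id, payload):
        self.delivered.append(payload)
        if self.on_published is not None:
            self.on_published(msg_id)


def drain(queue, client, timeout=5.0):
    """Publish queued messages one at a time; remove each only after its ack."""
    acked = threading.Event()
    state = {"last_acked": None}

    def handle_published(msg_id):
        state["last_acked"] = msg_id
        acked.set()

    client.on_published = handle_published

    while True:
        payload = queue.peek()
        if payload is None:
            return True  # nothing left to send
        acked.clear()
        expected_id = client.publish(payload)
        if not acked.wait(timeout) or state["last_acked"] != expected_id:
            return False  # unconfirmed: the message stays in the queue
        queue.dequeue()  # confirmed: now it is safe to remove it
```

Because the message is only peeked before publishing and dequeued after the matching ID arrives, an unconfirmed message stays in the queue and is picked up again on the next pass. The cost is that at most one message is in flight at a time.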
While this is going on, thread 1 might enqueue a lot of messages, but thread 2 can only publish them one by one, because it has to verify that each message was successfully published before dequeuing it from the key-value database and considering it sent. I need to publish the messages to the broker in the same order in which thread 1 enqueued them.
If thread 2 simply published every message coming from thread 1 via the key-value database without waiting, it might dequeue messages from the key-value database even though they never reached the broker. They might still be in the in-memory inflight queue, but a server reboot or service restart would empty that queue. The messages would then be completely lost, because thread 2 removed them from the key-value database without verifying that they reached the broker.
Can you give me guidance on how this should be implemented with M2Mqtt according to best practice? Is the pattern above best practice? And which key-value database is preferred for this type of implementation? So far I have tried SqlCeCompact.