[Document sharing]-Message Queue Development


Currently, the supported queue and message bus are composed by sys_bus, Queue and event-manage. In following chapter, we will introduce the differences and show examples among them.

Confirming function support is necessary. If it is doubtful that whether the current version supports before application, you’d better try following method.

As above screen-shot implies, this FW supports all queues and message buses.


The sys_bus is mainly used in publication and subscription; in addition, the server is not necessary. Here shows codes:

import sys_bus
import utime 
def cb_callback(topic, msg):
    print(topic, msg)
# Support registering multiple subscription functions by one topic. 
sys_bus.subscribe("topic1", cb_callback)
sys_bus.subscribe("topic2", cb_callback)
sys_bus.subscribe("topic3", cb_callback)

" The subscriber will receive message after publishing"
sys_bus.publish("topic1", " What printed here is the message published by topic 1")
sys_bus.publish("topic2", " What printed here is the message published by topic 2")
sys_bus.publish("topic3", " What printed here is the message published by topic 3")

# Return {"topic1": set(cb_callback...)}
# Return set(cb_callback...)

sys_bus.unsubscribe("topic1", cb_callback)
# As suscribing callback has been cancelled above, the following publication won' t slide into the former callback. That means: the subscribed message can still be received, however, there is no callback.  
sys_bus.publish("topic1", "What printed here is the message published by topic 1")

# As the subscription of topic 1 has been cancelled, the following publication won't receive any message. 
sys_bus.publish("topic1", " What printed here is the message published by topic 1")

Here is the result.


Under the circumstance that thread A subscribes topic C, then if another thread B sends message to topic C, the thread A will receive the message of topic C correspondingly.


The Queue is widely used in the communication among threads. Here shows the codes.

import _thread
from queue import Queue
import utime
# Initialize queue
q = Queue(maxsize=100)
def get():
    while True:
# The q.get will wait until get the message. Whenever the q.put is executed, the q.get will delete block and carry on if influenced by relevant signal. 
        item = q.get()
# Start the thread and wait for message in which place. 
# put the message to queue

Here shows the result.


When the thread A builds a queue, it will wait to get the data until put by another thread B.


The event-manage is mainly used to dispatch message among layers as well as the implementation of observer, which supports dispatch service of message queue, synchronous/ asynchronous dispatch mechanism and observer mode. Here show codes:

from event_message import Event
import utime
1. Initialize event
2. Initialize event manager
3. Register event to event manager
4. Start event manager
5. Add event manager
6. Dispatch data
from event_message import Event, EventManager
# Initialize event and event manager
event = Event("test")
event_manager = EventManager()
# Register event
# Start event manager
# Add event manager, you can comment different event types by patches. 
def handler(**kwargs):
    em = kwargs["event_message"]
    The first way to get data
    Four attributes are invloved in EventMessageObject
        It provides the method to get by itself and by combination. As for getting by combination, following methods are available, the model_to_dict()will get the dictionary of combination. 
            'name': 'test',
            'event': event_object,
            'msg': '1111',
            'callback': None
    print("handler1 {}".format(kwargs))
    1. Method to get the first attribute
        # Get the event name
            event_name = em.event_name
        # Get the event message
            msg = em.msg
        # Get the original event
            ev = em.event
        # None Get the transmitted callback, it is None by default if there is no transmission. 
            cb = em.callback
    2. The way to get the second attribute value(Recommended)
        data_map = em.model_to_dict()
                'name': 'test',
                'event': event_object,
                'msg': '1111',
                'callback': None
# Asynchronous dispatch data instead of block 
while True:

The running result is shown as following figure.


The program is in a mode of cycle asynchronous dispatch data.