Source code for fdi.dataset.messagequeue

# -*- coding: utf-8 -*-

from .deserialize import deserialize
from ..utils.getconfig import get_mqtt_config
from .listener import EventListener, EventSender
from .serializable import serialize

import paho.mqtt.client as mqtt
from paho.mqtt.client import error_string, MQTT_ERR_NO_CONN

from itertools import chain
import logging

# Developer switch: the condition is the constant 0, so this block never runs.
# Change it to a true value to get timestamped DEBUG output (process, thread,
# function name and line number) while working on this module.
if 0:
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(name)8s %(process)d %(threadName)s %(levelname)s %(funcName)10s() %(lineno)3d- %(message)s')

# create the module-level logger, named after this module
logger = logging.getLogger(__name__)
# logger.debug('level %d' %  (logger.getEffectiveLevel()))


class MqttRelayListener(EventListener):
    """ Generic interface for sending anything heard to an MQTT message queue.

    Every event delivered to `targetChanged` is serialized and published
    to the topic(s) given at construction time.
    """

    def __init__(self, topics=None, host=None, port=None,
                 username=None, passwd=None, callback=None,
                 clean_session=None, client_id=None, userdata=None,
                 qos=1, conn=True, subs=True, **kwds):  # MqttRelayListener
        """ Sets up an MQTT client that forwards events to the message queue.

        :topics: a topic name, a non-empty list of topic names, or a
            non-empty list of `(topic, qos)` tuples. Any other value
            (including `None` and an empty list) is logged as an error
            and the constructor returns early: the client is created
            but its network loop is not started and no connection is
            made.
        :host, port, username, passwd: connection parameters. Each one
            that is not provided is taken from the mapping returned by
            `get_mqtt_config()` (keys `mq_host`, `mq_port`, `mq_user`,
            `mq_pass`).
        :callback: accepted for interface compatibility; not used here.
        :clean_session, client_id: passed through to `mqtt.Client`.
        :userdata: passed to `mqtt.Client`; this instance is used when
            no value is given. The module-level `on_connect` callback
            reads `mq`, `topics_for_subscription`, `qos` and `topics`
            from it.
        :qos: quality of service used for subscribing and publishing.
        :conn: if true, connect to the broker right away.
        :subs: accepted for interface compatibility; not used here.
        :kwds: forwarded to the parent class constructor.
        """
        super().__init__(**kwds)

        # The configuration is only consulted to fill in missing values.
        if not (host and port and username and passwd):
            conf = get_mqtt_config()

        mq = mqtt.Client(
            client_id=client_id,
            clean_session=clean_session,
            userdata=userdata if userdata else self)
        self.username = username if username else conf['mq_user']
        self.passwd = passwd if passwd else conf['mq_pass']
        mq.username_pw_set(self.username, self.passwd)
        self.mq = mq
        self.topics = topics

        # Normalize the topics into the form used for subscription.
        # The emptiness check keeps `topics[0]` from raising IndexError.
        if isinstance(topics, list) and topics:
            if isinstance(topics[0], str):
                # a list of topic names: pair each one with the QoS
                subscription = [(topic, qos) for topic in topics]
            else:
                # assumed to already be a list of (topic, qos) tuples
                subscription = topics
        elif isinstance(topics, str):
            subscription = topics
        else:
            logger.error('Bad format for subscrib() topics ' +
                         str(self.topics))
            return
        self.topics_for_subscription = subscription

        self.host = host if host else conf['mq_host']
        self.port = port if port else int(conf['mq_port'])
        self.qos = qos
        # Keepalive is a period in seconds for the MQTT client, not a
        # flag; 60 is the same default that MqttRelaySender uses.
        self.keepalive = 60

        # mq.on_message = callback if callback else on_message
        mq.on_connect = on_connect
        self.mq.loop_start()

        # connect
        if conn:
            mq.connect(self.host, self.port, self.keepalive)
            logger.debug("Connect " + self.host + ":" + str(self.port))

    def _publish_targets(self):
        """ Returns the `(topic, qos)` pairs that events are published to.

        A single topic name yields one pair using `self.qos`; a list may
        mix topic names and `(topic, qos)` tuples.

        :raises TypeError: if `self.topics` is neither a string nor a list.
        """
        topics = self.topics
        if isinstance(topics, str):
            return [(topics, self.qos)]
        if isinstance(topics, list):
            return [(entry, self.qos) if isinstance(entry, str)
                    else (entry[0], entry[1])
                    for entry in topics]
        raise TypeError('Bad format for publish topics ' + str(topics))

    def targetChanged(self, *args, **kwargs):
        """ Informs that an event has happened in a target of any type.

        The positional arguments followed by the `(key, value)` pairs of
        the keyword arguments are put in one list, serialized, and
        published (not retained) to every configured topic.

        :raises TypeError: if the configured topics are neither a string
            nor a list.
        :raises Exception: if the client reports that it is not connected.
        """
        targets = self._publish_targets()
        payload = list(chain(args, kwargs.items()))
        json_str = serialize(payload)

        self.mq.reconnect()
        # publish() takes a single topic name, so send to each in turn
        for topic, qos in targets:
            logger.debug("send msg to [%s]", topic)
            logger.debug(json_str)
            msg_info = self.mq.publish(
                topic, payload=json_str, qos=qos, retain=False)
            rc, mid = msg_info.rc, msg_info.mid
            if rc == MQTT_ERR_NO_CONN:
                raise Exception('why not connected?')
            logger.debug("Publish status: %d mid %d" % (rc, mid))
        logger.debug("send over")
class MqttRelaySender(EventSender):
    """ Gets MQTT messages and forwards to listeners. """

    def __init__(self, topics=None, host=None, port=None,
                 username=None, passwd=None, callback=None,
                 clean_session=None, client_id=None, userdata=None,
                 keepalive=60, qos=1, **kwds):
        """ Connects to an MQTT broker and subscribes to `topics`.

        Incoming messages are handled by the module-level `on_message`
        callback.

        :topics: passed unchanged to the client's `subscribe()`.
        :host, port, username, passwd: connection parameters. Each one
            that is not provided is taken from the mapping returned by
            `get_mqtt_config()` (keys `mq_host`, `mq_port`, `mq_user`,
            `mq_pass`).
        :callback: accepted but not used; `on_message` is always installed.
        :clean_session, client_id: passed through to `mqtt.Client`.
        :userdata: passed to `mqtt.Client`; this instance is used when
            no value is given.
        :keepalive: passed to the client's `connect()`.
        :qos: quality of service requested for the subscription.
        :kwds: forwarded to the parent class constructor.
        """
        super().__init__(**kwds)

        # Read the configuration only when some parameter is missing.
        if not (host and port and username and passwd):
            conf = get_mqtt_config()

        logger.debug('starting mq listening to '+str(topics))

        client = mqtt.Client(
            client_id=client_id,
            clean_session=clean_session,
            userdata=userdata if userdata else self)

        # credentials
        username = username or conf['mq_user']
        passwd = passwd or conf['mq_pass']
        client.username_pw_set(username, passwd)

        # broker address
        host = host or conf['mq_host']
        port = port or int(conf['mq_port'])

        client.on_message = on_message
        client.connect(host, port, keepalive=keepalive)
        logger.debug("Connect " + host + ":" + str(port))

        status, mid = client.subscribe(topics, qos=qos)
        logger.debug("subscribe %s status: %s mid %s" %
                     (str(topics), error_string(status), str(mid)))

        self.mq = client
        # start the background network loop
        client.loop_start()
def on_connect(client, userdata, flags, rc):
    """ MQTT connect callback: reports the result and subscribes to topics.

    :client: the MQTT client that connected (not used directly).
    :userdata: object providing `mq` (the client to subscribe with),
        `topics_for_subscription`, `qos` and `topics`.
    :flags: mapping whose `'session present'` entry tells whether the
        broker still holds data from a previous session.
    :rc: connection result code, turned into text by
        `mqtt.connack_string`.
    """
    has_prev_data = ' some' if flags['session present'] else ' no'
    msg = mqtt.connack_string(rc)  # CONN_RESULT[rc]
    logmsg = "Connected with result: " + msg + \
        has_prev_data + ' prev session data.'
    # the message used to be built and then dropped; report it
    logger.debug(logmsg)

    # separate name so the connection result code is not overwritten
    sub_rc, mid = userdata.mq.subscribe(userdata.topics_for_subscription,
                                        qos=userdata.qos)
    logger.debug("subscribe %s status: %s mid %s" %
                 (str(userdata.topics), error_string(sub_rc), str(mid)))
def on_message(client, userdata, msg):
    """ MQTT message callback: decodes a message and fires it as an event.

    :client: the MQTT client that received the message (not used).
    :userdata: object whose `fire()` is called with the deserialized
        message and whose `last_msg` attribute is then set to it.
    :msg: received message; `msg.topic` is logged and `msg.payload` is
        decoded as UTF-8 before being deserialized.
    """
    logger.debug("Received: " + msg.topic + ' ' + str(msg.payload))

    text = msg.payload.decode(encoding='utf-8')
    received = deserialize(text)

    sender = userdata
    sender.fire(received)
    # remembered only after the listeners have been notified
    sender.last_msg = received