Source code for fdi.utils.queueworks

import paho.mqtt.client as mqtt
import json
import logging
import queue
import threading
import time

# Human-readable text for the result codes 0-5 used by the connection
# callbacks below, keyed by the integer code.
CONN_RESULT = {
    0: "Connection accepted",
    1: "Connection Refused, unacceptable protocol version",
    2: "Connection Refused, identifier rejected",
    3: "Connection Refused, Server unavailable",
    4: "Connection Refused, bad user name or password",
    5: "Connection Refused, not authorized",
}


class queuework2:
    """Publish/subscribe helper wrapping a single paho-mqtt client.

    The instance stores the connection settings, creates the client lazily
    in :meth:`init_client`, keeps received messages in ``self.msgq`` (unless
    a custom message callback is supplied) and tracks the ids of published
    messages that have not been acknowledged yet in
    ``self.mids_without_ack``.
    """

    # keepalive value passed to ``client.connect()``
    keepalive = 60
    logger = None
    # legacy handle; nothing in this class assigns it, but stop_receive()
    # still honours it in case external code does
    subclient = None

    def __init__(self, topics, host=None, port=None, username=None,
                 passwd=None, client_id=None, callback=None, qos=1,
                 userdata=None, clean_session=None):
        """Store the settings; no network activity happens here.

        :param topics: a topic string, a list of topic strings, or a list
            of ``(topic, qos)`` tuples to subscribe to.
        :param host: broker host handed to ``client.connect()``.
        :param port: broker port handed to ``client.connect()``.
        :param username: user name for ``username_pw_set()``.
        :param passwd: password for ``username_pw_set()``.
        :param client_id: client id handed to ``mqtt.Client``.
        :param callback: optional replacement for :meth:`on_message`.
        :param qos: QoS used for subscribing and publishing.
        :param userdata: user data handed to ``mqtt.Client``.
        :param clean_session: clean-session flag handed to ``mqtt.Client``.
        """
        self.init_logger()
        self.init_args(topics, host=host, port=port, username=username,
                       passwd=passwd, on_msg_callback=callback,
                       client_id=client_id, qos=qos, userdata=userdata,
                       clean_session=clean_session)
        self.client = None

    def init_logger(self):
        """Configure root logging at DEBUG level and grab the class logger."""
        logging.basicConfig(
            level=logging.DEBUG,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger("queuework")

    def init_args(self, topics, host=None, port=None, username=None,
                  passwd=None, client_id=None, on_msg_callback=None, qos=1,
                  clean_session=None, userdata=None):
        """Record connection settings and create the bookkeeping state."""
        self.topics = topics
        self._h = host
        self._p = port
        self.username = username
        self.passwd = passwd
        self.qos = qos
        self.client_id = client_id
        # on_message(), get_size() and get_message() all rely on this queue,
        # so it must always exist.
        self.msgq = queue.Queue()
        self.lock = threading.Lock()
        self.userdata = userdata
        self.on_msg_callback = on_msg_callback
        self.clean_session = clean_session
        self.reset = False
        # publish msg ids
        self.last_mid = -1
        self.mids_without_ack = []
        self.mwa_lock = threading.Lock()
        self.ordered_send = True

    def process_mid(self, mid, where):
        """Register ``mid`` on first sight, clear it on second sight.

        The first call for a message id appends it to
        ``self.mids_without_ack``; the second call for the same id removes
        it again.  Both :meth:`send` and :meth:`on_publish` call this while
        holding ``self.mwa_lock``.

        :param mid: message id of a published message.
        :param where: prefix for the log lines, naming the caller.
        """
        mwa = self.mids_without_ack
        if mid and mid not in mwa:
            self.logger.debug(where + '%d not in MWA' % mid)
            self.last_mid = mid
            mwa.append(mid)
            return
        if mid not in mwa:
            # Only truthy ids are ever registered, so a falsy id (0, None)
            # cannot be matched; indexing/removing would raise.
            self.logger.warning("%sIgnoring unusable msg ID %r.", where, mid)
            return
        if self.ordered_send:
            newest = mwa[-1]
            if mid > newest:
                self.logger.error(
                    where + "Mid %d found before %d acknowledged."
                    % (mid, newest))
            elif mid < newest:
                self.logger.warning(
                    where + "Mid %d acknowledged after %d has been sent."
                    % (mid, newest))
        mwa.remove(mid)
        self.logger.debug(where + 'MID %d removed from in MWA' % mid)
        self.logger.debug(where + "Published with msg ID %d." % mid)

    def on_subscribe(self, client, userdata, mid, granted_qos):
        """Flag the client as subscribed once the awaited mid shows up."""
        if client.subscr_mid == mid:
            client.subscribed = True

    def on_connect(self, client, userdata, flags, rc):
        """Record the outcome of a connection attempt on ``client.connected``.

        ``rc`` of 0 counts as success; anything else marks the client as
        not connected and is logged as a warning.
        """
        # .get(): a result code outside 0-5 must not raise inside a callback
        msg = CONN_RESULT.get(rc, "Unknown result code %s" % rc)
        has_prev_data = ' some' if flags['session present'] else ' no'
        logmsg = "Connected with result: " + msg + \
            has_prev_data + ' prev session data.'
        if rc:
            client.connected = False
            self.logger.warning(logmsg)
        else:
            client.connected = True
            self.logger.debug(logmsg)

    def on_disconnect(self, client, userdata, rc):
        """Mark the client as disconnected and log why."""
        client.connected = False
        # rc here is a paho MQTT_ERR_* code, not a CONNACK code, so it is
        # translated with paho's own table instead of CONN_RESULT (which
        # has no entry for e.g. a lost connection and would raise KeyError).
        msg = mqtt.error_string(rc)
        if rc != 0:
            msg += " Unexpected disconnection."
        self.logger.debug("Disconnected with result: " + msg)

    def on_message(self, client, userdata, msg):
        """Default message callback: queue the message and log it."""
        with self.lock:
            self.msgq.put(msg)
            self.logger.debug(msg)

    def on_publish(self, client, userdata, mid):
        """Publish callback: run the mid through :meth:`process_mid`."""
        with self.mwa_lock:
            self.process_mid(mid, 'on_publish: ')

    def wait_for(self, client, msgType, period=0.1, timeout=10):
        """Drive ``client.loop()`` until the awaited acknowledgement arrives.

        Waiting for subscribe and publish to clear.
        http://www.steves-internet-guide.com/subscribing-topics-mqtt-client/

        :param client: the client to poll.
        :param msgType: ``"SUBACK"``, ``"CONNACK"`` or ``"PUBACK"``.
        :param period: timeout handed to each ``client.loop()`` call; also
            the amount added to the elapsed-time counter per iteration.
        :param timeout: give up once the counter exceeds this value.
        :return: 0 if successful (or if the matching client callback is not
            set, in which case nothing is awaited), 1 on timeout, 2 for an
            unknown ``msgType``.
        """
        # msgType -> (client callback that must be set, completion test)
        checks = {
            "SUBACK": ("on_subscribe", lambda: client.subscribed),
            "CONNACK": ("on_connect", lambda: client.connected),
            "PUBACK": ("on_publish", lambda: not self.mids_without_ack),
        }
        if msgType not in checks:
            self.logger.error('Bad msgType ' + str(msgType))
            return 2
        handler_name, done = checks[msgType]
        if not getattr(client, handler_name):
            return 0

        waited = 0
        while not done():
            if msgType == "PUBACK":
                self.logger.debug("waiting puback %s",
                                  str(self.mids_without_ack))
            else:
                self.logger.debug("waiting " + msgType.lower())
            client.loop(period)  # check for messages
            waited += period
            if waited > timeout:
                self.logger.warning(msgType + " timeout.")
                return 1
        return 0

    def init_client(self, force=False):
        """Create, connect and subscribe the client.

        :param force: rebuild the client even if one already exists.
        :return: the ready client, or ``None`` if connecting or subscribing
            failed or ``self.topics`` has an unsupported format.  On failure
            ``self.client`` is reset to ``None`` so that a later call
            retries instead of handing out a half-initialised client.
            When ``self.topics`` is ``None`` or empty the client is
            returned connected but without any subscription.
        """
        if self.client and not force:
            return self.client
        self.client = None

        host = self._h
        port = self._p
        qos = self.qos

        # init
        client = mqtt.Client(client_id=self.client_id,
                             clean_session=self.clean_session,
                             userdata=self.userdata)
        self.client = client
        client.max_inflight_messages_set(100)
        client.on_connect = self.on_connect
        client.on_message = self.on_msg_callback if \
            self.on_msg_callback else self.on_message
        client.on_publish = self.on_publish
        client.on_disconnect = self.on_disconnect
        # without this wait_for(..., 'SUBACK') has nothing to wait on
        client.on_subscribe = self.on_subscribe
        client.username_pw_set(self.username, self.passwd)

        # connect
        client.connect(host, port, self.keepalive)
        client.connected = False
        self.logger.debug("Connect %s:%s" % (host, port))
        # subscription msg ids
        client.subscr_mid = -1
        if self.wait_for(client, 'CONNACK'):
            self.client = None
            return None

        # topic subscription
        topics = self.topics
        if topics is None or (isinstance(topics, (list, str)) and not topics):
            self.logger.debug("No topics given; nothing to subscribe to.")
            return client
        if isinstance(topics, list):
            if isinstance(topics[0], str):
                # topics is a list of topics
                topics = [(topic, qos) for topic in topics]
        elif not isinstance(topics, str):
            self.logger.error('Bad format for subscrib() topics ' +
                              str(self.topics))
            self.client = None
            return None

        client.subscribed = False
        rc, mid = client.subscribe(topics, qos=qos)
        client.subscr_mid = mid
        # rc is an MQTT_ERR_* code, hence error_string rather than CONN_RESULT
        self.logger.debug("subscribe %s status: %s mid %s" %
                          (str(topics), mqtt.error_string(rc), str(mid)))
        if self.wait_for(client, 'SUBACK'):
            self.client = None
            return None
        return client

    def start_send(self):
        """Prepare for sending; returns what :meth:`init_client` returns."""
        return self.init_client()

    def send(self, topics, text, conn=True):
        """Publish ``text`` to ``topics``.

        :param topics: topic handed to ``client.publish()``.
        :param text: payload handed to ``client.publish()``.
        :param conn: when true, (re)connect before publishing and wait for
            the publish acknowledgement afterwards.
        :return: 0 on success, 1 if there is no client or if waiting for
            CONNACK/PUBACK timed out.
        """
        if self.client is None:
            self.logger.error("send() called without a client; "
                              "call start_send() first.")
            return 1
        if conn:
            self.client.connect(self._h, self._p, self.keepalive)
            self.client.connected = False
            self.logger.debug("Connect %s:%s" % (self._h, self._p))
            if self.wait_for(self.client, 'CONNACK'):
                return 1

        rc, mid = self.client.publish(topics, payload=text, qos=self.qos)
        with self.mwa_lock:
            self.process_mid(mid, 'send: ')
        self.logger.debug("Publish status: %d mid %d" % (rc, mid))
        if conn:
            if self.wait_for(self.client, 'PUBACK'):
                return 1
        return 0

    def stop_send(self):
        """Counterpart of :meth:`start_send`; currently leaves the client as is.

        NOTE(review): the disconnect/reset statements appear to be disabled
        in the original source -- confirm that keeping the client alive
        between sends is intended.
        """
        if self.client is not None:
            pass

    def start_receive(self, loop='no'):
        """Initialise the client and optionally start its network loop.

        :param loop: ``'forever'`` calls ``loop_forever()``, ``'start'``
            calls ``loop_start()``; any other value starts no loop.
        """
        if self.init_client():
            if loop == 'forever':
                self.client.loop_forever()
            elif loop == 'start':
                self.client.loop_start()

    def stop_receive(self):
        """Stop the network loop and disconnect the receiving client.

        Acts on ``self.subclient`` when something has set it, otherwise on
        ``self.client`` (the client :meth:`start_receive` actually uses).
        Both handles are cleared afterwards.
        """
        client = self.subclient if self.subclient is not None else self.client
        if client is not None:
            client.loop_stop()
            client.disconnect()
        self.subclient = None
        self.client = None

    def get_size(self):
        """Return the approximate number of queued messages."""
        return self.msgq.qsize()

    def get_message(self):
        """Return the next queued message, or ``None`` if there is none."""
        with self.lock:
            try:
                return self.msgq.get_nowait()
            except queue.Empty:
                return None