stomp.py (not so final)

Monday, 5 December, 2005

Well I did say sort of.

Yet another version of stomp.py can be found below.

After some consideration, I thought better of the one-listener-per-subscription method, including instead, addlistener & dellistener methods in the Connection class — thus all listeners receive all subscription messages. Otherwise, the question becomes how to handle acknowledgements? Does the first listener subscribe-call set the acknowledgement? In which case, what if the initial subscriber sets ack: auto and latter subscribers want client? Too much complexity, and, at least to me, this way is cleaner (and by cleaner, I mean it’s easier to determine ‘functional responsibility’).

One problem this change has exposed, is a bug in unsubscribe. I don’t (yet) know if it’s a problem in my module, in activemq/stomp, or it’s just one of those you-shouldn’t-be-coding-so-late-in-the-evening issues that suddenly vanishes when you try it again the next day. But I’ve tried an unsubscribe in a direct telnet session and (annoyingly) still receive messages to that destination, so evidence begins to point toward other sources. I’ll investigate further…

UPDATE 06/12/05: confirmed on a separate machine (& instance of activemq) that unsubscribe doesn’t work as expected.


"""Stomp Protocol Connectivity

    This provides basic connectivity to a message broker supporting the 'stomp' protocol.
    At the moment ACK, SEND, SUBSCRIBE, UNSUBSCRIBE, BEGIN, ABORT, COMMIT and DISCONNECT operations
    are supported.  CONNECT is implicit.

    This changes the previous version which required a listener per subscription -- now a listener object
    just calls the 'addlistener' method and will receive all messages sent in response to all/any subscriptions.
    (The reason for the change is that the handling of an 'ack' becomes problematic unless the listener mechanism
    is decoupled from subscriptions).

    UNSUBSCRIBE seems to be broken.  Can't tell if it's might fault or activemq's, but when I do it manually from
    telnet, I get the same results.

    Note that you must 'start' an instance of Connection to begin receiving messages.  For example:

        conn = stomp.Connection('localhost', 62003, 'myuser', 'mypass')
        conn.start()

    Meta-Data
    ---------
    Author: Jason R Briggs
    License: http://www.apache.org/licenses/LICENSE-2.0
    Version: $Revision: 1.2 $
    Start Date: 2005/12/01
    Last Revision Date: $Date: 2005/12/05 23:21 $

    Notes/Attribution
    -----------------
    * uuid method courtesy of Carl Free Jr:

http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/213761

"""

import md5
import random
import re
import select
import socket
import sys
import threading
import time

# retrieve a description from the headers of a message
_destination_re = re.compile(r'destination:s*(.*)')

def _uuid( *args ):
    """
    uuid courtesy of Carl Free Jr:
    (http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/213761)
    """

    t = long( time.time() * 1000 )
    r = long( random.random() * 100000000000000000L )

    try:
        a = socket.gethostbyname( socket.gethostname() )
    except:
        # if we can't get a network address, just imagine one
        a = random.random() * 100000000000000000L
    data = str(t) + ' ' + str(r) + ' ' + str(a) + ' ' + str(args)
    data = md5.md5(data).hexdigest()

    return data

class Connection(threading.Thread):

    def __init__(self, host, port, user='', passcode=''):
        self.ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.ss.connect((host, port))
        self.rs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.rs.connect((host, port))
        self.rs.settimeout(1)
        self.buf = [ ]
        self.sendbuf = [ ]

        self.listeners = [ ]

        self.running = 1

        connstr = 'CONNECTnuser: %snpasscode: %snnx00n' % (user, passcode)
        self.ss.send(connstr)
        print self.__read(self.ss)
        self.rs.send(connstr)
        print self.__read(self.rs)

        threading.Thread.__init__(self)

    def __read(self, sock):
        try:
            while 1:
                c = sock.recv(1024)
                self.buf.append(c)
                if 'x00' in c:
                    break
        except Exception:
            pass
        s = ''.join(self.buf)
        pos = s.find('x00')
        s1 = s[0:pos]
        s2 = s[pos+1:]
        if len(s2) > 0:
            self.buf = [ s2 ]
        else:
            self.buf = [ ]
        return s1

    def run(self):
        while self.running or len(self.sendbuf) > 0:
            if len(self.sendbuf) > 0:
                for msg in self.sendbuf:
                    self.rs.sendall(msg)
                self.sendbuf = [ ]
                if not self.running:
                    break

            msg = self.__read(self.rs)
            if not msg or msg == '':
                continue
            mat = _destination_re.search(msg)
            if mat:
                dest = mat.group(1)
                for listener in self.listeners:
                    listener.receive(msg)
        print 'connection message loop completed'
        self.ss.close()
        self.rs.close()

    # objects listening to subscription messages
    def addlistener(self, listener):
        self.listeners.append(listener)

    def dellistener(self, listener):
        self.listeners.remove(listener)

    #
    # stomp transmissions
    #

    def abort(self, transactionid):
        self.ss.send('ABORTntransaction: %snnx00n' % transactionid)

    def ack(self, messageid, transactionid=None):
        if transactionid:
            header = 'message-id: %sntransaction: %s' % (messageid, transactionid)
        else:
            header = 'message-id: %s' % messageid
        self.sendbuf.append('ACKn%snnx00n' % header)

    def begin(self, transactionid=None):
        if not transactionid:
            transactionid = _uuid()
        self.ss.send('BEGINntransaction: %snnx00n' % transactionid)
        return transactionid

    def sendblank(self):
        self.sendbuf.append('x00n')

    def commit(self, transactionid):
        self.ss.send('COMMITntransaction: %snnx00n' % transactionid)

    def disconnect(self):
        self.running = None
        self.ss.send('DISCONNECTnnx00n')
        self.sendbuf.append('DISCONNECTnnx00n')

    def send(self, dest, msg, transactionid=None):
        if transactionid:
            transheader = 'transaction: %s' % transactionid
        else:
            transheader = ''
        self.ss.send('SENDndestination: %sn%snn%snx00n' % (dest, transheader, msg))

    def subscribe(self, dest, ack='auto'):
        self.sendbuf.append('SUBSCRIBEndestination: %snack: %snnx00n' % (dest, ack))

    def unsubscribe(self, dest):
        self.sendbuf.append('UNSUBSCRIBEndestination:%snnx00n' % dest)

#
# command line testing
#
if __name__ == '__main__':

    class StompTester(object):
        def __init__(self, host, port, user='', passcode=''):
            self.c = Connection(host, port, user, passcode)
            self.c.addlistener(self)
            self.c.start()

        def receive(self, message):
            print message
            print 'END@'

        def ack(self, args):
            if len(args) < 3:
                self.c.ack(args[1])
            else:
                self.c.ack(args[1], args[2])

        def abort(self, args):
            self.c.abort(args[1])

        def begin(self, args):
            print 'transaction id: %s' % self.c.begin()

        def blank(self, args):
            self.c.sendblank()

        def commit(self, args):
            if len(args) < 2:
                print 'expecting: commit <transid>'
            else:
                self.c.commit(args[1])

        def disconnect(self, args):
            self.c.disconnect()

        def send(self, args):
            if len(args) < 3:
                print 'expecting: send <destination> <message>'
            else:
                self.c.send(args[1], ' '.join(args[2:]))

        def sendtrans(self, args):
            if len(args) < 3:
                print 'expecting: sendtrans <destination> <transid> <message>'
            else:
                self.c.send(args[1], ' '.join(args[3:]), args[2])

        def subscribe(self, args):
            if len(args) < 2:
                print 'expecting: subscribe <destination> [ack]'
            elif len(args) > 2:
                self.c.subscribe(args[1], args[2])
            else:
                self.c.subscribe(args[1])

        def unsubscribe(self, args):
            if len(args) < 2:
                print 'expecting: unsubscribe <destination>'
            else:
                self.c.unsubscribe(args[1])

    try:
        if len(sys.argv) < 3:
            print 'USAGE: stomp-test3.py host port [user] [passcode]'
            sys.exit(1)

        host = sys.argv[1]
        port = int(sys.argv[2])

        if len(sys.argv) >= 5:
            user = sys.argv[3]
            passcode = sys.argv[4]
        else:
            user = 'test'
            passcode = 'test'

        st = StompTester(host, port, user, passcode)
        while 1:
            line = sys.stdin.readline()
            if not line or line.lstrip().rstrip() == '':
                continue
            elif 'quit' in line or 'disconnect' in line:
                st.disconnect(None)
                break

            split = line.split()
            if hasattr(st, split[0]):
                getattr(st, split[0])(split)
            else:
                print 'unrecognized command'
    except KeyboardInterrupt:
        st.disconnect(None)

Leave a Reply