GT06E GPS Tracker Part 2: Establishing connection

This is a continuation from Part 1.

As of writing this I already had complete solution (as far as getting tracking data is concerned), a multi-process TCP server with a mysql database back-end.
The server code itself is not pretty thus I am cautious about sharing it at this stage (I will “open-source” it once I cleaned it up).

One caveat: my solution does not support batch mode (where the payload contain multiple concatenated location and other packets).

The assumption is that the reader is somewhat familiar with python and sockets.

In this part I will focus on the initial and subsequent replies to the packets from trackers.
Due to nature of the “protocol” if the replies are not properly formed the tracker will drop the connection. Thus it is crucial to get it right.

For convenience I setup the following “constants”:

START=0x7878
ADV_START=0x7979
STOP=0x0D0A
PROTOCOL= {
    'login':0x01,
    'location':0x12,
    'status':0x13,
    'string':0x15,
    'alarm':0x16,
    'gps':0x1A,
    'command':0x80,
    'adv_string':0x94
}

Unfortunately due to closeness of the manufacturer the only way I could glimpse at the detail of the protocol is by googling the following phrases:
“GT06 protocol” and “GT02 protocol”. This site was very helpful.
I suggest to read the protocol description before trying to understand WTF is going on with my code. BTW: the GT06 is not the same as GT06E, and protocol slightly differs (thus I also included GT02).

When a tracker connects it sends a login packet. I process the login packet in the following way:

def rx_reply(data):
    payload = None
    serial = None
    dstart = None
    prot = None
    if data:
        dstart, = struct.unpack_from('>H',data)
    else:
        print("Skipping empty packet! @rx_reply() : '%s'" % binascii.hexlify(data))
        return False
    if dstart == START:
        elength = len(data)-5
        length, = struct.unpack_from('>B',data,2)
        if (elength) != length:
            print("Skipping packet with incorrect length; expected: 0x%02x (%d), received: 0x%02x (%d)!" % (elength,elength,length,length))
            return False
        dstop, = struct.unpack_from('>H',data,(length+3))
        if dstop != STOP:
            print("Skipping packet with incorrect stop sequence; expected: 0x%04x, received: 0x%04x!" % (STOP,dstop))
            return False
        prot, = struct.unpack_from('>B',data,3)
        serial,dcheck, = struct.unpack_from('>HH',data,(length-1))
        check = get_fcs(data[2:(length+1)],fcs_table)
        if check != dcheck:
            print("Skipping packet with incorrect check sum; expected: 0x%04x, received: 0x%04x!" % (check,dcheck))
            return False
        if prot == PROTOCOL['status']:
            payload = process_status(data[4:(length-1)])
        elif prot == PROTOCOL['string']:
            clength,serverid, = struct.unpack_from('>BI',data,4)
            eclength=length-8
            if clength != eclength:
                print("Skipping packet with incorrect command length; expected: 0x%02x (%d), received: 0x%02x (%d)!" % (eclength,eclength,clength,clength))
            payload, = struct.unpack_from(">%ds" % (clength-4),data,9)
        elif prot == PROTOCOL['location']:
            payload = process_location(data[4:(length-1)])
        else:
            payload = {'raw':binascii.hexlify(data[4:(length-1)])}
    elif dstart == ADV_START:
        elength = len(data)-6
        length, = struct.unpack_from('>H',data,2)
        if (elength) != length:
            print("Skipping packet with incorrect length; expected: 0x%02x (%d), received: 0x%02x (%d)!" % (elength,elength,length,length))
            return False
        dstop, = struct.unpack_from('>H',data,(length+4))
        if dstop != STOP:
            print("Skipping packet with incorrect stop sequence; expected: 0x%04x, received: 0x%04x!" % (STOP,dstop))
            return False
        prot, = struct.unpack_from('>B',data,4)
        serial,dcheck, = struct.unpack_from('>HH',data,(length))
        check = get_fcs(data[2:(length+2)],fcs_table)
        if check != dcheck:
            print("Skipping packet with incorrect check sum; expected: 0x%04x, received: 0x%04x!" % (check,dcheck))
        payload = data[5:(length-1)]
    else:
        print("Skipping packet with unknown start sequence; received: 0x%04x!" % dstart)
    if not serial:
        print("Received garbage hex: '%s' ascii %s" % (binascii.hexlify(data),repr(data)))
    return (serial,payload,prot,dstart)

Note: the length check will fail the batch packet and disconnect. Disable the batch mode or write a splitter function and batch packet detection logic. The get_fcs function and fcs_table are described in part 1.

Once I have serial, IMEI and protocol type I reply the following way:

def tx_login(serial,prot,start):
    pre=struct.pack('>H',start)
    if start == ADV_START:
        payload=struct.pack('>BHH',0x05,prot,serial)
    else:
        payload=struct.pack('>BBH',0x05,prot,serial)
    check= get_fcs(payload,fcs_table)
    post=struct.pack('>HH',check,STOP)
    return pre+payload+post

The simplest way to make this work is following:

serial,payload,prot,start = rx_reply(sock.recv(256))
imei=payload['raw']
sock.send(tx_login(serial,prot,start))

In next part I will describe decoding the location data and misc data (as much as I could find documentation for it).

Leave a Reply

Your email address will not be published. Required fields are marked *