Newer
Older
barebox / scripts / remote / messages.py
@Aleksander Morgado Aleksander Morgado on 17 Sep 2018 15 KB bbremote: implement support for GPIO operations
#!/usr/bin/env python2
# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function

import struct
import binascii


class BBType(object):
    command = 1
    command_return = 2
    consolemsg = 3
    ping = 4
    pong = 5
    getenv = 6
    getenv_return = 7
    fs = 8
    fs_return = 9
    md = 10
    md_return = 11
    mw = 12
    mw_return = 13
    reset = 14
    i2c_read = 15
    i2c_read_return = 16
    i2c_write = 17
    i2c_write_return = 18
    gpio_get_value = 19
    gpio_get_value_return = 20
    gpio_set_value = 21
    gpio_set_value_return = 22
    gpio_set_direction = 23
    gpio_set_direction_return = 24


class BBPacket(object):
    def __init__(self, p_type=0, p_flags=0, payload="", raw=None):
        self.p_type = p_type
        self.p_flags = p_flags
        if raw is not None:
            self.unpack(raw)
        else:
            self.payload = payload

    def __repr__(self):
        return "BBPacket(%i, %i)" % (self.p_type, self.p_flags)

    def _unpack_payload(self, data):
        self.payload = data

    def _pack_payload(self):
        return self.payload

    def unpack(self, data):
        self.p_type, self.p_flags = struct.unpack("!HH", data[:4])
        self._unpack_payload(data[4:])

    def pack(self):
        return struct.pack("!HH", self.p_type, self.p_flags) + \
            self._pack_payload()


class BBPacketCommand(BBPacket):
    def __init__(self, raw=None, cmd=None):
        self.cmd = cmd
        super(BBPacketCommand, self).__init__(BBType.command, raw=raw)

    def __repr__(self):
        return "BBPacketCommand(cmd=%r)" % self.cmd

    def _unpack_payload(self, payload):
        self.cmd = payload

    def _pack_payload(self):
        return self.cmd


class BBPacketCommandReturn(BBPacket):
    def __init__(self, raw=None, exit_code=None):
        self.exit_code = exit_code
        super(BBPacketCommandReturn, self).__init__(BBType.command_return,
                                                    raw=raw)

    def __repr__(self):
        return "BBPacketCommandReturn(exit_code=%i)" % self.exit_code

    def _unpack_payload(self, data):
        self.exit_code, = struct.unpack("!L", data[:4])

    def _pack_payload(self):
        return struct.pack("!L", self.exit_code)


class BBPacketConsoleMsg(BBPacket):
    def __init__(self, raw=None, text=None):
        self.text = text
        super(BBPacketConsoleMsg, self).__init__(BBType.consolemsg, raw=raw)

    def __repr__(self):
        return "BBPacketConsoleMsg(text=%r)" % self.text

    def _unpack_payload(self, payload):
        self.text = payload

    def _pack_payload(self):
        return self.text


class BBPacketPing(BBPacket):
    def __init__(self, raw=None):
        super(BBPacketPing, self).__init__(BBType.ping, raw=raw)

    def __repr__(self):
        return "BBPacketPing()"


class BBPacketPong(BBPacket):
    def __init__(self, raw=None):
        super(BBPacketPong, self).__init__(BBType.pong, raw=raw)

    def __repr__(self):
        return "BBPacketPong()"


class BBPacketGetenv(BBPacket):
    def __init__(self, raw=None, varname=None):
        self.varname = varname
        super(BBPacketGetenv, self).__init__(BBType.getenv, raw=raw)

    def __repr__(self):
        return "BBPacketGetenv(varname=%r)" % self.varname

    def _unpack_payload(self, payload):
        self.varname = payload

    def _pack_payload(self):
        return self.varname


class BBPacketGetenvReturn(BBPacket):
    def __init__(self, raw=None, text=None):
        self.text = text
        super(BBPacketGetenvReturn, self).__init__(BBType.getenv_return,
                                                   raw=raw)

    def __repr__(self):
        return "BBPacketGetenvReturn(varvalue=%s)" % self.text

    def _unpack_payload(self, payload):
        self.text = payload

    def _pack_payload(self):
        return self.text


class BBPacketFS(BBPacket):
    def __init__(self, raw=None, payload=None):
        super(BBPacketFS, self).__init__(BBType.fs, payload=payload, raw=raw)

    def __repr__(self):
        return "BBPacketFS(payload=%r)" % self.payload


class BBPacketFSReturn(BBPacket):
    def __init__(self, raw=None, payload=None):
        super(BBPacketFSReturn, self).__init__(BBType.fs_return, payload=payload, raw=raw)

    def __repr__(self):
        return "BBPacketFSReturn(payload=%r)" % self.payload


class BBPacketMd(BBPacket):
    def __init__(self, raw=None, path=None, addr=None, size=None):
        self.path = path
        self.addr = addr
        self.size = size
        super(BBPacketMd, self).__init__(BBType.md, raw=raw)

    def __repr__(self):
        return "BBPacketMd(path=%r,addr=0x%x,size=%u)" % (self.path, self.addr, self.size)

    def _unpack_payload(self, payload):
        buffer_offset, self.addr, self.size, path_size, path_offset = struct.unpack("!HHHHH", payload[:10])
        # header size is always 4 bytes (HH), so adjust the absolute data offset without the header size
        absolute_path_offset = buffer_offset + path_offset - 4
        self.path = payload[absolute_path_offset:absolute_path_offset+path_size]

    def _pack_payload(self):
        # header size is always 4 bytes (HH) and we have 10 bytes of fixed data (HHHHH), so buffer offset is 14
        return struct.pack("!HHHHH%ds" % len(self.path), 14, self.addr, self.size, len(self.path), 0, self.path)


class BBPacketMdReturn(BBPacket):
    def __init__(self, raw=None, exit_code=None, data=None):
        self.exit_code = exit_code
        self.data = data
        super(BBPacketMdReturn, self).__init__(BBType.md_return, raw=raw)

    def __repr__(self):
        return "BBPacketMdReturn(exit_code=%i, data=%s)" % (self.exit_code, binascii.hexlify(self.data))

    def _unpack_payload(self, payload):
        buffer_offset, self.exit_code, data_size, data_offset = struct.unpack("!HLHH", payload[:10])
        # header size is always 4 bytes (HH), so adjust the absolute data offset without the header size
        absolute_data_offset = buffer_offset + data_offset - 4
        self.data = payload[absolute_data_offset:absolute_data_offset + data_size]

    def _pack_payload(self):
        # header size is always 4 bytes (HH) and we have 10 bytes of fixed data (HLHH), so buffer offset is 14
        return struct.pack("!HLHH%ds" % len(self.data), 14, self.exit_code, len(self.data), 0, self.data)
        return self.text


class BBPacketMw(BBPacket):
    def __init__(self, raw=None, path=None, addr=None, data=None):
        self.path = path
        self.addr = addr
        self.data = data
        super(BBPacketMw, self).__init__(BBType.mw, raw=raw)

    def __repr__(self):
        return "BBPacketMw(path=%r,addr=0x%x,data=%r)" % (self.path, self.addr, self.data)

    def _unpack_payload(self, payload):
        buffer_offset, self.addr, path_size, path_offset, data_size, data_offset = struct.unpack("!HHHHHH", payload[:12])
        # header size is always 4 bytes (HH), so adjust the absolute data offset without the header size
        absolute_path_offset = buffer_offset + path_offset - 4
        self.path = payload[absolute_path_offset:absolute_path_offset+path_size]
        absolute_data_offset = buffer_offset + data_offset - 4
        self.data = payload[absolute_data_offset:absolute_data_offset+data_size]

    def _pack_payload(self):
        # header size is always 4 bytes (HH) and we have 12 bytes of fixed data (HHHHHH), so buffer offset is 16
        path_size = len(self.path)
        data_size = len(self.data)
        return struct.pack("!HHHHHH%ds%ds" % (path_size, path_size), 16, self.addr, path_size, 0, data_size, path_size, self.path, self.data)


class BBPacketMwReturn(BBPacket):
    def __init__(self, raw=None, exit_code=None, written=None):
        self.exit_code = exit_code
        self.written = written
        super(BBPacketMwReturn, self).__init__(BBType.mw_return, raw=raw)

    def __repr__(self):
        return "BBPacketMwReturn(exit_code=%i, written=%i)" % (self.exit_code, self.written)

    def _unpack_payload(self, payload):
        buffer_offset, self.exit_code, self.written = struct.unpack("!HLH", payload[:8])

    def _pack_payload(self):
        # header size is always 4 bytes (HH) and we have 8 bytes of fixed data (HLH), so buffer offset is 14
        return struct.pack("!HLH", 12, self.exit_code, self.written)


class BBPacketReset(BBPacket):
    def __init__(self, raw=None, force=None):
        self.force = force
        super(BBPacketReset, self).__init__(BBType.reset, raw=raw)

    def __repr__(self):
        return "BBPacketReset(force=%c)" % (self.force)

    def _unpack_payload(self, payload):
        self.force = struct.unpack("?", payload[:1])

    def _pack_payload(self):
        return struct.pack("?", self.force)


class BBPacketI2cRead(BBPacket):
    def __init__(self, raw=None, bus=None, addr=None, reg=None, flags=None, size=None):
        self.bus = bus
        self.addr = addr
        self.reg = reg
        self.flags = flags
        self.size = size
        super(BBPacketI2cRead, self).__init__(BBType.i2c_read, raw=raw)

    def __repr__(self):
        return "BBPacketI2cRead(bus=0x%x,addr=0x%x,reg=0x%x,flags=0x%x,size=%u)" % (self.bus, self.addr, self.reg, self.flags, self.size)

    def _unpack_payload(self, payload):
        buffer_offset, self.bus, self.addr, self.reg, self.flags, self.size = struct.unpack("!HBBHBH", payload[:9])

    def _pack_payload(self):
        # header size is always 4 bytes (HH) and we have 9 bytes of fixed data (HBBHBH), so buffer offset is 13
        return struct.pack("!HBBHBH", 13, self.bus, self.addr, self.reg, self.flags, self.size)


class BBPacketI2cReadReturn(BBPacket):
    def __init__(self, raw=None, exit_code=None, data=None):
        self.exit_code = exit_code
        self.data = data
        super(BBPacketI2cReadReturn, self).__init__(BBType.i2c_read_return, raw=raw)

    def __repr__(self):
        return "BBPacketI2cReadReturn(exit_code=%i, data=%s)" % (self.exit_code, binascii.hexlify(self.data))

    def _unpack_payload(self, payload):
        buffer_offset, self.exit_code, data_size, data_offset = struct.unpack("!HLHH", payload[:10])
        # header size is always 4 bytes (HH), so adjust the absolute data offset without the header size
        absolute_data_offset = buffer_offset + data_offset - 4
        self.data = payload[absolute_data_offset:absolute_data_offset + data_size]

    def _pack_payload(self):
        # header size is always 4 bytes (HH) and we have 10 bytes of fixed data (HLHH), so buffer offset is 14
        return struct.pack("!HLHH%ds" % len(self.data), 14, self.exit_code, len(self.data), 0, self.data)
        return self.text


class BBPacketI2cWrite(BBPacket):
    def __init__(self, raw=None, bus=None, addr=None, reg=None, flags=None, data=None):
        self.bus = bus
        self.addr = addr
        self.reg = reg
        self.flags = flags
        self.data = data
        super(BBPacketI2cWrite, self).__init__(BBType.i2c_write, raw=raw)

    def __repr__(self):
        return "BBPacketI2cWrite(bus=0x%x,addr=0x%x,reg=0x%x,flags=0x%x,data=%r)" % (self.bus, self.addr, self.reg, self.flags, self.data)

    def _unpack_payload(self, payload):
        buffer_offset, self.bus, self.addr, self.reg, self.flags, data_size, data_offset = struct.unpack("!HBBHBHH", payload[:11])
        # header size is always 4 bytes (HH), so adjust the absolute data offset without the header size
        absolute_data_offset = buffer_offset + data_offset - 4
        self.data = payload[absolute_data_offset:absolute_data_offset+data_size]

    def _pack_payload(self):
        # header size is always 4 bytes (HH) and we have 11 bytes of fixed data (HBBHBHH), so buffer offset is 15
        data_size = len(self.data)
        return struct.pack("!HBBHBHH%ds" % data_size, 15, self.bus, self.addr, self.reg, self.flags, data_size, 0, self.data)


class BBPacketI2cWriteReturn(BBPacket):
    def __init__(self, raw=None, exit_code=None, written=None):
        self.exit_code = exit_code
        self.written = written
        super(BBPacketI2cWriteReturn, self).__init__(BBType.i2c_write_return, raw=raw)

    def __repr__(self):
        return "BBPacketI2cWriteReturn(exit_code=%i, written=%i)" % (self.exit_code, self.written)

    def _unpack_payload(self, payload):
        buffer_offset, self.exit_code, self.written = struct.unpack("!HLH", payload[:8])

    def _pack_payload(self):
        # header size is always 4 bytes (HH) and we have 8 bytes of fixed data (HLH), so buffer offset is 14
        return struct.pack("!HLH", 12, self.exit_code, self.written)


class BBPacketGpioGetValue(BBPacket):
    def __init__(self, raw=None, gpio=None):
        self.gpio = gpio
        super(BBPacketGpioGetValue, self).__init__(BBType.gpio_get_value, raw=raw)

    def __repr__(self):
        return "BBPacketGpioGetValue(gpio=%u)" % (self.gpio)

    def _unpack_payload(self, payload):
        self.gpio = struct.unpack("!L", payload[:4])

    def _pack_payload(self):
        return struct.pack("!L", self.gpio)


class BBPacketGpioGetValueReturn(BBPacket):
    def __init__(self, raw=None, value=None):
        self.value = value
        super(BBPacketGpioGetValueReturn, self).__init__(BBType.gpio_get_value_return, raw=raw)

    def __repr__(self):
        return "BBPacketGpioGetValueReturn(value=%u)" % (self.value)

    def _unpack_payload(self, payload):
        self.value = struct.unpack("!B", payload[:1])

    def _pack_payload(self):
        return struct.pack("!B", self.value)


class BBPacketGpioSetValue(BBPacket):
    def __init__(self, raw=None, gpio=None, value=None):
        self.gpio = gpio
        self.value = value
        super(BBPacketGpioSetValue, self).__init__(BBType.gpio_set_value, raw=raw)

    def __repr__(self):
        return "BBPacketGpioSetValue(gpio=%u,value=%u)" % (self.gpio, self.value)

    def _unpack_payload(self, payload):
        self.gpio = struct.unpack("!LB", payload[:5])

    def _pack_payload(self):
        return struct.pack("!LB", self.gpio, self.value)


class BBPacketGpioSetValueReturn(BBPacket):
    def __init__(self, raw=None, value=None):
        self.value = value
        super(BBPacketGpioSetValueReturn, self).__init__(BBType.gpio_set_value_return, raw=raw)

    def __repr__(self):
        return "BBPacketGpioSetValueReturn()"


class BBPacketGpioSetDirection(BBPacket):
    def __init__(self, raw=None, gpio=None, direction=None, value=None):
        self.gpio = gpio
        self.direction = direction
        self.value = value
        super(BBPacketGpioSetDirection, self).__init__(BBType.gpio_set_direction, raw=raw)

    def __repr__(self):
        return "BBPacketGpioSetDirection(gpio=%u,direction=%u,value=%u)" % (self.gpio, self.direction, self.value)

    def _unpack_payload(self, payload):
        self.gpio = struct.unpack("!LBB", payload[:6])

    def _pack_payload(self):
        return struct.pack("!LBB", self.gpio, self.direction, self.value)


class BBPacketGpioSetDirectionReturn(BBPacket):
    def __init__(self, raw=None, exit_code=None):
        self.exit_code = exit_code
        super(BBPacketGpioSetDirectionReturn, self).__init__(BBType.gpio_set_direction_return, raw=raw)

    def __repr__(self):
        return "BBPacketGpioSetDirectionReturn(exit_code=%u)" % (self.exit_code)

    def _unpack_payload(self, payload):
        self.exit_code = struct.unpack("!L", payload[:4])

    def _pack_payload(self):
        return struct.pack("!L", self.exit_code)