# mbed-os / drivers / usb / tests / TESTS / host_tests / usb_device_serial.py
"""
mbed SDK
Copyright (c) 2018 ARM Limited
SPDX-License-Identifier: Apache-2.0

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import functools
import itertools
import time
import threading
import uuid
import sys
import serial
import serial.tools.list_ports as stlp
import six
import mbed_host_tests

# pyserial 3.5 changed the capitalization of `portNotOpenError` to
# `PortNotOpenError`. Try the old spelling first and fall back to the new one.
try:
    from serial import portNotOpenError as PortNotOpenError
except ImportError:
    from serial import PortNotOpenError as PortNotOpenError


# Keys of the key-value messages exchanged with the device under test.
# MSG_KEY_SERIAL_NUMBER is sent by the host (see cb_device_ready()); every
# other key has a host-side callback registered in USBSerialTest.setup().
MSG_KEY_DEVICE_READY = 'ready'
MSG_KEY_SERIAL_NUMBER = 'usb_dev_sn'
MSG_KEY_PORT_OPEN_WAIT = 'port_open_wait'
MSG_KEY_PORT_OPEN_CLOSE = 'port_open_close'
MSG_KEY_SEND_BYTES_SINGLE = 'send_single'
MSG_KEY_SEND_BYTES_MULTIPLE = 'send_multiple'
MSG_KEY_LOOPBACK = 'loopback'
MSG_KEY_CHANGE_LINE_CODING = 'change_lc'

# Base unit of the chunk size used by cb_send_bytes_multiple(): the chunk
# size is RX_BUFF_SIZE multiplied by the value received from the device.
# Presumably the size (in bytes) of the device RX buffer -- TODO confirm.
RX_BUFF_SIZE = 32

# This delay (in seconds) eliminates the possibility of the device detecting
# the port being closed when still waiting for data.
TERM_CLOSE_DELAY = 0.01

# Time (in seconds) the serial terminal is kept open on the host side
# during the terminal reopen test.
TERM_REOPEN_DELAY = 0.1

# Format of a line coding request read from the device serial data:
# four integers (baud rate, data bits, parity, stop bits) separated by
# LINE_CODING_SEP and terminated with LINE_CODING_DELIM.
LINE_CODING_SEP = ','
LINE_CODING_DELIM = ';'


def usb_serial_name(serial_number):
    """Return the system name of the USB serial device with a given SN.

    The serial ports currently listed by pyserial are searched for one
    whose serial number equals serial_number. The device name of the
    first match is returned, or None if there is no match.
    """
    wanted_sn = serial_number
    if sys.platform.startswith('win'):
        # USB string descriptors are defined by the USB spec as UNICODE
        # UTF-16LE, but Windows decodes the serial number string
        # descriptor as uppercase characters only. Compare against an
        # uppercase pattern to work around that.
        wanted_sn = str(serial_number).upper()
    matching_devices = (
        info.device for info in stlp.comports()
        if info.serial_number == wanted_sn)
    return next(matching_devices, None)


class RetryError(Exception):
    """Signals that retry_fun_call() used up all of its attempts."""


def retry_fun_call(fun, num_retries=3, retry_delay=0.0, verbose=False):
    """Call fun and retry if any exception was raised.

    fun is called at most num_retries times with a retry_delay in between
    consecutive calls. No delay is added after the last attempt.

    Args:
        fun: A callable taking no arguments.
        num_retries: The maximum number of calls to fun.
        retry_delay: Time (in seconds) to sleep between two attempts.
        verbose: If True, print a message for every failed attempt.

    Returns the value returned by the first successful call to fun.
    Raises RetryError if the retry limit is exhausted.
    """
    final_err = None
    for retry in range(1, num_retries + 1):
        try:
            return fun()  # pylint: disable=not-callable
        except Exception as exc:  # pylint: disable=broad-except
            # Keep the most recent error for the final report.
            final_err = exc
            if verbose:
                print('Retry {}/{} failed ({})'
                      .format(retry, num_retries, str(fun)))
        # Sleeping is only useful if another attempt follows.
        if retry < num_retries:
            time.sleep(retry_delay)
    err_msg = 'Failed with "{}". Tried {} times.'
    raise RetryError(err_msg.format(final_err, num_retries))


class USBSerialTest(mbed_host_tests.BaseHostTest):
    """Host side test for USB CDC & Serial classes.

    The device requests host side actions with key-value messages. Every
    request is handled by a callback (registered in setup()) which
    delegates the serial port handling to a background thread.
    """

    # Numeric line coding values received from the device mapped to
    # the corresponding pyserial constants.
    _BYTESIZES = {
        5: serial.FIVEBITS,
        6: serial.SIXBITS,
        7: serial.SEVENBITS,
        8: serial.EIGHTBITS}
    _PARITIES = {
        0: serial.PARITY_NONE,
        1: serial.PARITY_ODD,
        2: serial.PARITY_EVEN,
        3: serial.PARITY_MARK,
        4: serial.PARITY_SPACE}
    _STOPBITS = {
        0: serial.STOPBITS_ONE,
        1: serial.STOPBITS_ONE_POINT_FIVE,
        2: serial.STOPBITS_TWO}

    # Retry policy for looking up and opening the device serial port.
    _PORT_RETRIES = 20
    _PORT_RETRY_DELAY = 0.05  # seconds

    @staticmethod
    def get_usb_serial_name(usb_id_str):
        """Get USB serial device name as registered in the system.

        Search is based on the unique USB SN generated by the host
        during test suite setup.
        Raises RuntimeError if the device is not found.
        """
        port_name = usb_serial_name(usb_id_str)
        if port_name is None:
            err_msg = 'USB serial device (SN={}) not found.'
            raise RuntimeError(err_msg.format(usb_id_str))
        return port_name

    def __init__(self):
        super(USBSerialTest, self).__init__()
        # The most recently started background thread (None before the
        # first one is started).
        self.__bg_task = None
        self.dut_usb_dev_sn = uuid.uuid4().hex  # 32 hex digit string

    def _report_error(self, exc):
        """Log exc as a test error and report the test as failed."""
        self.log('TEST ERROR: {}'.format(exc))
        self.notify_complete(False)

    def _open_port(self, mbed_serial):
        """Find the device serial port and open it using mbed_serial.

        Both the port lookup and the open operation are retried, since
        the port may not be available immediately.
        Returns True if the port was opened. Otherwise the error is
        reported as a test failure and False is returned.
        """
        try:
            mbed_serial.port = retry_fun_call(
                fun=functools.partial(self.get_usb_serial_name, self.dut_usb_dev_sn),  # pylint: disable=not-callable
                num_retries=self._PORT_RETRIES,
                retry_delay=self._PORT_RETRY_DELAY)
            retry_fun_call(
                fun=mbed_serial.open,
                num_retries=self._PORT_RETRIES,
                retry_delay=self._PORT_RETRY_DELAY)
        except RetryError as exc:
            self._report_error(exc)
            return False
        return True

    @staticmethod
    def _wait_tx_done(mbed_serial):
        """Wait until the output buffer is empty, then TERM_CLOSE_DELAY more."""
        while mbed_serial.out_waiting > 0:
            time.sleep(0.001)
        time.sleep(TERM_CLOSE_DELAY)

    def port_open_wait(self):
        """Open the serial and wait until it's closed by the device."""

        # Note: Need to set dsrdtr on open to true to avoid exception on Linux
        # https://github.com/pyserial/pyserial/issues/67
        mbed_serial = serial.Serial(dsrdtr=True)

        mbed_serial.dtr = False
        if not self._open_port(mbed_serial):
            return
        try:
            mbed_serial.dtr = True
            try:
                mbed_serial.read()  # wait until closed
            except (PortNotOpenError, serial.SerialException):
                # Expected: this is how the port being closed shows up here.
                pass
        finally:
            # Release the port explicitly instead of relying on
            # garbage collection.
            mbed_serial.close()

    def port_open_close(self):
        """Open the serial and close it with a delay."""
        mbed_serial = serial.Serial(timeout=0.5, write_timeout=0.1, dsrdtr=True)
        mbed_serial.dtr = False
        if not self._open_port(mbed_serial):
            return
        try:
            mbed_serial.reset_output_buffer()
            mbed_serial.dtr = True
            time.sleep(TERM_REOPEN_DELAY)
        finally:
            mbed_serial.close()

    def send_data_sequence(self, chunk_size=1):
        """Open the serial and send a sequence of values.

        Every value from 0xff down to 0x00 and then from 0x00 up to 0xff
        is sent as a chunk of chunk_size identical bytes.
        chunk_size defines the size of data sent in each write operation.
        The input buffer content is discarded.
        """
        mbed_serial = serial.Serial(write_timeout=0.1, dsrdtr=True)
        if not self._open_port(mbed_serial):
            return
        try:
            mbed_serial.reset_output_buffer()
            mbed_serial.dtr = True
            try:
                for byteval in itertools.chain(reversed(range(0x100)), range(0x100)):
                    payload = bytearray(chunk_size * (byteval,))
                    mbed_serial.write(payload)
                    # Discard input buffer content. The data received from
                    # the device during the concurrent rx/tx test is
                    # irrelevant.
                    mbed_serial.reset_input_buffer()
            except serial.SerialException as exc:
                self._report_error(exc)
                return
            self._wait_tx_done(mbed_serial)
        finally:
            # Close the port on the error path as well.
            mbed_serial.close()

    def loopback(self):
        """Open the serial and send back every byte received.

        The loop ends when no byte is received within the read timeout.
        """
        mbed_serial = serial.Serial(timeout=0.5, write_timeout=0.1, dsrdtr=True)
        mbed_serial.dtr = False
        if not self._open_port(mbed_serial):
            return
        try:
            mbed_serial.reset_output_buffer()
            mbed_serial.dtr = True
            try:
                payload = mbed_serial.read(1)
                while len(payload) == 1:
                    mbed_serial.write(payload)
                    payload = mbed_serial.read(1)
            except serial.SerialException as exc:
                self._report_error(exc)
                return
            self._wait_tx_done(mbed_serial)
        finally:
            # Close the port on the error path as well.
            mbed_serial.close()

    def change_line_coding(self):
        """Open the serial and change serial params according to device request.

        New line coding params are read from the device serial data.
        Each request consists of four integers (baud rate, data bits,
        parity, stop bits) separated by LINE_CODING_SEP and terminated
        with LINE_CODING_DELIM. The loop ends when nothing is received
        within the read timeout. A malformed request is reported as
        a test failure.
        """
        mbed_serial = serial.Serial(timeout=0.5, dsrdtr=False)
        mbed_serial.dtr = False
        if not self._open_port(mbed_serial):
            return
        # read_until() compares the received bytes with the terminator, so
        # the terminator has to be binary too. A text terminator never
        # matches on Python 3 and every read would last until the timeout.
        delim = six.ensure_binary(LINE_CODING_DELIM)
        payload = ''
        try:
            mbed_serial.reset_output_buffer()
            mbed_serial.dtr = True
            try:
                payload = six.ensure_str(mbed_serial.read_until(delim))
                while len(payload) > 0:
                    baud, bits, parity, stop = (
                        int(i) for i in payload.strip(LINE_CODING_DELIM).split(LINE_CODING_SEP))
                    new_line_coding = {
                        'baudrate': baud,
                        'bytesize': self._BYTESIZES[bits],
                        'parity': self._PARITIES[parity],
                        'stopbits': self._STOPBITS[stop]}
                    mbed_serial.apply_settings(new_line_coding)
                    payload = six.ensure_str(mbed_serial.read_until(delim))
            except serial.SerialException as exc:
                self._report_error(exc)
                return
            except (ValueError, KeyError) as exc:
                # Malformed or unsupported request. Report it instead of
                # letting the background thread die silently.
                self._report_error(
                    'invalid line coding request {!r} ({!r})'.format(payload, exc))
                return
            time.sleep(TERM_CLOSE_DELAY)
        finally:
            # Close the port on the error path as well.
            mbed_serial.close()

    def setup(self):
        """Register a callback for every message key sent by the device."""
        self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
        self.register_callback(MSG_KEY_PORT_OPEN_WAIT, self.cb_port_open_wait)
        self.register_callback(MSG_KEY_PORT_OPEN_CLOSE, self.cb_port_open_close)
        self.register_callback(MSG_KEY_SEND_BYTES_SINGLE, self.cb_send_bytes_single)
        self.register_callback(MSG_KEY_SEND_BYTES_MULTIPLE, self.cb_send_bytes_multiple)
        self.register_callback(MSG_KEY_LOOPBACK, self.cb_loopback)
        self.register_callback(MSG_KEY_CHANGE_LINE_CODING, self.cb_change_line_coding)

    def cb_device_ready(self, key, value, timestamp):
        """Send a unique USB SN to the device.

        DUT uses this SN every time it connects to host as a USB device.
        """
        self.send_kv(MSG_KEY_SERIAL_NUMBER, self.dut_usb_dev_sn)

    def start_bg_task(self, **thread_kwargs):
        """Start a new daemon thread.

        The callbacks delegate serial handling to a background task to
        prevent any delays in the device side assert handling. Only one
        background task is kept running to prevent multiple access
        to serial.

        thread_kwargs are passed to the threading.Thread constructor.
        """
        if self.__bg_task is not None:
            # Wait for the previous task to finish before starting a new one.
            try:
                self.__bg_task.join()
            except RuntimeError:
                pass
        self.__bg_task = threading.Thread(**thread_kwargs)
        self.__bg_task.daemon = True
        self.__bg_task.start()

    def cb_port_open_wait(self, key, value, timestamp):
        """Open the serial and wait until it's closed by the device."""
        self.start_bg_task(target=self.port_open_wait)

    def cb_port_open_close(self, key, value, timestamp):
        """Open the serial and close it with a delay."""
        self.start_bg_task(target=self.port_open_close)

    def cb_send_bytes_single(self, key, value, timestamp):
        """Open the serial and send a sequence of one byte values."""
        self.start_bg_task(
            target=self.send_data_sequence,
            args=(1, ))

    def cb_send_bytes_multiple(self, key, value, timestamp):
        """Open the serial and send a sequence of multibyte values.

        Every value is sent as a chunk of RX_BUFF_SIZE * int(value) bytes.
        """
        chunk_size = RX_BUFF_SIZE * int(value)
        self.start_bg_task(
            target=self.send_data_sequence,
            args=(chunk_size, ))

    def cb_loopback(self, key, value, timestamp):
        """Open the serial and send back every byte received."""
        self.start_bg_task(target=self.loopback)

    def cb_change_line_coding(self, key, value, timestamp):
        """Open the serial and change the line coding."""
        self.start_bg_task(target=self.change_line_coding)