Source code for brainstem.discover

# Copyright (c) 2018 Acroname Inc. - All Rights Reserved
#
# This file is part of the BrainStem (tm) package which is released under MIT.
# See file LICENSE or go to https://acroname.com for full license details.

"""
A module that provides methods for discovering brainstem modules over USB and TPCIP.

The discovery module provides an interface for locating BrainStem modules accross
multiple transports. It provides a way to find all modules for a give transport
as well as specific modules by serial number, or first found. The result of a call
to one of the discovery functions is either a list of brainstem.link.Spec objects,
or a single brainstem.link.Spec.

The Discovery module allows users to find specific brainstem devices via their
serial number, or a list of all devices connected to the host via usb or on the
same subnet via TCP/IP. In all cases a :doc:`Spec <link>` object is returned with
connection details for the device. In addition do connection details, the BrainStem
model is returned. This model is one of a list of BrainStem device model numbers
which are accessible via the :doc:`defs <defs>` module.

A typical interactive python session finding all connected USB modules might look
like the following.

    >> import brainstem
    >> module_list = brainstem.discover.findAllModules(brainstem.link.Spec.USB)
    >> print [str(s) for s in module_list]
    ['Model: 4 LinkType: USB(serial: 0xCB4A3B25, module: 0)', 'Model: 13 LinkType: USB(serial: 0x40F5849A, module: 0)']

For an overview of links, discovery and the Brainstem network
see the `Acroname BrainStem Reference`_

.. _Acroname BrainStem Reference:
    https://acroname.com/reference
"""

from . import _BS_C, ffi
from .link import Spec, aEtherConfig
from .result import Result

[docs]def findModule(transports, serial_number, aether_config=aEtherConfig()): """ Return the Spec for the module with the given serial number. Transports can be presented as a list. TCPIP modules take a little longer to find due to the Multicast and gather necessary for finding modules on the local network segment. :param transports: A list of transports or a single transport. :type transports: int or list(int) :param serial_number: The module serial_number to look for. :type serial_number: unsigned int :return: The connection spec for the module whose serial number is given in the args. """ if not hasattr(transports, '__iter__'): transports = [transports] for trans in transports: cspec = ffi.new("struct linkSpec_CCA*") result = ffi.new("struct Result*") fake_id = ffi.new("unsigned int*") fake_id[0] = 0 _BS_C.module_findModule(fake_id, result, cspec, serial_number, aether_config.networkInterface, trans) if result.error == Result.NO_ERROR: return Spec.cca_spec_to_python_spec(cspec) return None
[docs]def findFirstModule(transports, aether_config=aEtherConfig()): """ Return the Spec for the first module found on the given transport. :param transports: A list of transports or a single transport. :type transports: int or list(int) :param aether_config: Allows configuration of aEther other than the default. :type aether_config: aEtherConfig :return: The connection spec of the first module found on the given transport. :rtype: Spec """ if not hasattr(transports, '__iter__'): transports = [transports] for trans in transports: cspec = ffi.new("struct linkSpec_CCA*") result = ffi.new("struct Result*") fake_id = ffi.new("unsigned int*") #signature required, but not used. fake_id[0] = 0 _BS_C.module_findFirstModule(fake_id, result, cspec, aether_config.networkInterface, trans) if result.error == Result.NO_ERROR: return Spec.cca_spec_to_python_spec(cspec) return None
[docs]def findAllModules(transports, aether_config=aEtherConfig(), buffer_length=128): """ Return a list of Specs for all modules found on the transports given. Transports can be presented as a list, and the results would be a list of all modules found for those transports. TCPIP modules take a little longer to find due to the Multicast and gather necessary for finding modules on the local network segment. :param transports: A list of transports or a single transport. :type transports: int or list(int) :return: A list of the Spec objects for all modules found. :rtype: list(Spec) """ if not hasattr(transports, '__iter__'): transports = [transports] return_list = [] for trans in transports: ffi_buffer = ffi.new("struct linkSpec_CCA[]", buffer_length) result = ffi.new("struct Result*") fake_id = ffi.new("unsigned int*") fake_id[0] = 0 _BS_C.module_sDiscover(fake_id, result, ffi_buffer, buffer_length, trans) return_list += [Spec.cca_spec_to_python_spec(ffi_buffer[i]) for i in range(result.value)] return return_list
[docs]def getIPv4Interfaces(list_length=30): """ Populates a list with all of the available IPv4 Interfaces. :param list_length: Size of list to allocate for. :type list_length: unsigned int :return: A tuple of IPv4 interfaces. :rtype: tuple(unsigned int) """ result = ffi.new("struct Result*") data = ffi.new("unsigned int[]", list_length) _BS_C.module_GetIPv4Interfaces(result, data, list_length) device_list = [] if result.error == Result.NO_ERROR: for x in range(0, result.value): device_list.append(data[x]) return tuple(device_list)
[docs]class DeviceNode(object): """ Python representation of DeviceNode_t (C structure) - hub_serial_number (uint32_t): Serial number of the Acroname hub where the device was found. - hub_port (uint8_t): Port of the Acroname hub where the device was found. - id_vendor (uint16_t): Manufactures Vendor ID of the downstream device. - id_product (uint16_t): Manufactures Product ID of the downstream device. - speed (enumeration): The devices downstream device speed. - Unknown (0) - Low Speed (1) - Full Speed (2) - High Speed (3) - Super Speed (4) - Super Speed Plus (5) - product_name (string): USB string descriptor. - manufacture (string): USB string descriptor. - serial_number (string): USB string descriptor. """ def __init__(self): self.hub_serial_number = 0 self.hub_port = 0 self.id_vendor = 0 self.id_product = 0 self.speed = 0 self.product_name = "" self.manufacture = "" self.serial_number = "" def __str__(self): ret = "\n" ret = ret + "SN: 0x%08X\n" % self.hub_serial_number ret = ret + "Port: %d\n" % self.hub_port ret = ret + "\tVendor ID: 0x%04X\n" % self.id_vendor ret = ret + "\tProduct ID: 0x%04X\n" % self.id_product ret = ret + "\tSpeed: %d\n" % self.speed ret = ret + "\tProduct Name: %s\n" % self.product_name ret = ret + "\tManufacture: %s\n" % self.manufacture ret = ret + "\tSerial Number: %s\n" % self.serial_number return ret def __repr__(self): return self.__str__()
[docs]def getDownstreamDevices(list_length=128): """ Gets downstream device USB information for all Acroname hubs. :param list_length: The amount of memory to provide for the lower level C call. :return: Result object containing NO_ERROR and a tuple of DeviceNode's containing the detected downstream devices:: - **aErrParam**: Passed in values are not valid (NULL, size, etc). - **aErrMemory**: No more room in the list. - **aErrNotFound**: No Acroname devices were found. :rtype: Result """ data = ffi.new("struct DeviceNode_CCA[]", list_length) result = ffi.new("struct Result*") _BS_C.portMapping_getDownstreamDevices(result, data, list_length) if result.error: return Result(result.error, tuple(list())) device_list = [] for x in range(0, result.value): node = DeviceNode() node.hub_serial_number = data[x].hubSerialNumber node.hub_port = data[x].hubPort node.id_vendor = data[x].idVendor node.id_product = data[x].idProduct node.speed = data[x].speed node.product_name = ffi.string(data[x].productName) node.manufacture = ffi.string(data[x].manufacturer) node.serial_number = ffi.string(data[x].serialNumber) device_list.append(node) return Result(result.error, tuple(device_list))