# Copyright (c) 2018 Acroname Inc. - All Rights Reserved
# This file is part of the BrainStem (tm) package which is released under MIT.
# See file LICENSE or go to https://acroname.com for full license details.
A module that provides methods for discovering brainstem modules over USB and TPCIP.
The discovery module provides an interface for locating BrainStem modules accross
multiple transports. It provides a way to find all modules for a give transport
as well as specific modules by serial number, or first found. The result of a call
to one of the discovery functions is either a list of brainstem.link.Spec objects,
or a single brainstem.link.Spec.
The Discovery module allows users to find specific brainstem devices via their
serial number, or a list of all devices connected to the host via usb or on the
same subnet via TCP/IP. In all cases a :doc:`Spec <link>` object is returned with
connection details for the device. In addition do connection details, the BrainStem
model is returned. This model is one of a list of BrainStem device model numbers
which are accessible via the :doc:`defs <defs>` module.
A typical interactive python session finding all connected USB modules might look
like the following.
>> import brainstem
>> module_list = brainstem.discover.findAllModules(brainstem.link.Spec.USB)
>> print [str(s) for s in module_list]
['Model: 4 LinkType: USB(serial: 0xCB4A3B25, module: 0)', 'Model: 13 LinkType: USB(serial: 0x40F5849A, module: 0)']
For an overview of links, discovery and the Brainstem network
see the `Acroname BrainStem Reference`_
.. _Acroname BrainStem Reference:
from . import _BS_C, ffi
from .link import Spec, aEtherConfig
from .result import Result
[docs]def findModule(transports, serial_number, aether_config=aEtherConfig()):
""" Return the Spec for the module with the given serial number.
Transports can be presented as a list. TCPIP modules
take a little longer to find due to the Multicast and gather
necessary for finding modules on the local network segment.
transports (list(int)): A list of transports or a single transport.
serial_number (int): The module serial_number to look for.
Spec: The connection spec for the module whose serial number is
given in the args.
_result = None
if not hasattr(transports, '__iter__'):
transports = [transports]
for trans in transports:
# translate python Spec to C enum type.
_trans = _get_c_transport(trans)
if _trans is None:
return _result
# Now get a linkSpec* variable if there is a module.
_cspec = _BS_C.aDiscovery_FindModule(_trans,
if _cspec != ffi.NULL:
_result = _get_python_find_result(_cspec)
# Free the memory allocated by the C Lib call. CFFI didn't allocate,
# so the _cspec doesn't "own" the memory and it won't be GC'd
linkref = ffi.new('linkSpec**')
linkref[0] = _cspec
# return translated result or None if not found.
return _result
[docs]def findFirstModule(transport, aether_config=aEtherConfig()):
""" Return the Spec for the first module found on the given transport.
TCPIP modules take a little longer to find due to the Multicast and
gather necessary for finding modules on the local network segment.
transport (int): One of USB or TCPIP.
Spec: The connection spec of the first module found on the
given transport.
_result = None
if not hasattr(transport, '__iter__'):
transport = [transport]
for trans in transport:
# translate python Spec to C enum type.
_trans = _get_c_transport(trans)
if _trans is None:
return _result
# Now get a linkSpec* variable if there is a module.
_cspec = _BS_C.aDiscovery_FindFirstModule(_trans,
if _cspec != ffi.NULL:
_result = _get_python_find_result(_cspec)
# Free the memory allocated by the C Lib call. CFFI didn't allocate,
# so the _cspec doesn't "own" the memory and it won't be GC'd
linkref = ffi.new('linkSpec**')
linkref[0] = _cspec
# return translated result or None if not found.
return _result
[docs]def findAllModules(transports, aether_config=aEtherConfig()):
""" Return a list of Specs for all modules found on the transports given.
Transports can be presented as a list, and the results would be
a list of all modules found for those transports. TCPIP modules
take a little longer to find due to the Multicast and gather
necessary for finding modules on the local network segment.
transports (list(int)): A list of transports or a single transport.
list(Spec): A list of the Specs for all modules found.
_results = list()
_cresults = ffi.new_handle(_results)
@ffi.callback("_Bool(linkSpec*, _Bool*, void*)")
def findAll(spec, success, context):
results = ffi.from_handle(context)
device = _get_python_find_result(spec)
success[0] = True
return True
if not hasattr(transports, '__iter__'):
transports = [transports]
for trans in transports:
# translate python Spec to C enum type.
_trans = _get_c_transport(trans)
if _trans is None:
return _results
return _results
def getIPv4Interfaces(list_length=30):
data = ffi.new("uint32_t[]", list_length)
interfaces_found = _BS_C.aDiscovery_GetIPv4Interfaces(data, list_length)
device_list = []
for x in range(0, interfaces_found):
return tuple(device_list)
[docs]class DeviceNode(object):
Python representation of DeviceNode_t (C structure)
- hub_serial_number (uint32_t): Serial number of the Acroname hub where the device was found.
- hub_port (uint8_t): Port of the Acroname hub where the device was found.
- id_vendor (uint16_t): Manufactures Vendor ID of the downstream device.
- id_product (uint16_t): Manufactures Product ID of the downstream device.
- speed (enumeration): The devices downstream device speed.
- Unknown (0)
- Low Speed (1)
- Full Speed (2)
- High Speed (3)
- Super Speed (4)
- Super Speed Plus (5)
- product_name (string): USB string descriptor.
- manufacture (string): USB string descriptor.
- serial_number (string): USB string descriptor.
def __init__(self):
self.hub_serial_number = 0
self.hub_port = 0
self.id_vendor = 0
self.id_product = 0
self.speed = 0
self.product_name = ""
self.manufacture = ""
self.serial_number = ""
def __str__(self):
ret = "\n"
ret = ret + "SN: 0x%08X\n" % self.hub_serial_number
ret = ret + "Port: %d\n" % self.hub_port
ret = ret + "\tVendor ID: 0x%04X\n" % self.id_vendor
ret = ret + "\tProduct ID: 0x%04X\n" % self.id_product
ret = ret + "\tSpeed: %d\n" % self.speed
ret = ret + "\tProduct Name: %s\n" % self.product_name
ret = ret + "\tManufacture: %s\n" % self.manufacture
ret = ret + "\tSerial Number: %s\n" % self.serial_number
return ret
def __repr__(self):
return self.__str__()
[docs]def getDownstreamDevices(list_length=128):
Gets downstream device USB information for all Acroname hubs.
list_length: The amount of memory to provide for the lower level C call.
Result: Result object, containing NO_ERROR and a tuple of DeviceNode's
containing the detected downstream devices.
- aErrParam: Passed in values are not valid. (NULL, size etc).
- aErrMemory: No more room in the list
- aErrNotFound: No Acroname devices were found.
data = ffi.new("DeviceNode_t[]", list_length)
num_devices_found = ffi.new("uint32_t*")
err = _BS_C.getDownstreamDevices(data, list_length, num_devices_found)
device_list = []
for x in range(0, num_devices_found[0]):
node = DeviceNode()
node.hub_serial_number = data[x].hubSerialNumber
node.hub_port = data[x].hubPort
node.id_vendor = data[x].idVendor
node.id_product = data[x].idProduct
node.speed = data[x].speed
node.product_name = ffi.string(data[x].productName)
node.manufacture = ffi.string(data[x].manufacturer)
node.serial_number = ffi.string(data[x].serialNumber)
return Result(err, tuple(device_list))
def _get_c_transport(transport):
""" Internal: Translate Spec transport to cffi transport"""
_trans = None
if transport == Spec.USB:
_trans = _BS_C.USB
elif transport == Spec.TCPIP:
_trans = _BS_C.TCPIP
elif transport == Spec.SERIAL:
_trans = _BS_C.SERIAL
elif transport == Spec.AETHER:
_trans = _BS_C.AETHER
return _trans
def _get_python_find_result(cspec):
""" Internal: Translate cffi spec into python Spec"""
if cspec.type == _BS_C.USB:
result = Spec(Spec.USB, cspec.serial_num, cspec.module, cspec.model)
elif cspec.type == _BS_C.TCPIP:
result = Spec(Spec.TCPIP, cspec.serial_num, cspec.module, cspec.model,
ip_address=cspec.t.ip.ip_address, ip_port=cspec.t.ip.ip_port)
elif cspec.type == _BS_C.SERIAL:
result = Spec(Spec.SERIAL, cspec.serial_num, cspec.module, cspec.model,
port=cspec.t.serial.port, baudrate=cspec.t.serial.baudrate)
elif cspec.type == _BS_C.AETHER:
result = Spec(Spec.AETHER, cspec.serial_num, cspec.module, cspec.model,
ip_address=cspec.t.ip.ip_address, ip_port=cspec.t.ip.ip_port)
return result