# Copyright (c) 2018 Acroname Inc. - All Rights Reserved
#
# This file is part of the BrainStem (tm) package which is released under MIT.
# See file LICENSE or go to https://acroname.com for full license details.
"""
A module that provides a Spec class for specifying a connection to a BrainStem module.
A Spec instance fully describes a connection to a brainstem module. In the case of
USB based stems this is simply the serial number of the module. For TCPIP based stems
this is an IP address and TCP port.
For more information about links and the Brainstem network
see the `Acroname BrainStem Reference`_
.. _Acroname BrainStem Reference:
https://acroname.com/reference
"""
import socket
import struct
from ._bs_c_cffi import ffi
from . import _BS_C
from .result import Result
from .defs import (
MODEL_MTM_IOSERIAL
)
[docs]class Status(object):
""" Status variables represent the link status possibilities for Brainstem Links.
Status States:
* STOPPED (0)
* INITIALIZING (1)
* RUNNING (2)
* STOPPING (3)
* SYNCING (4)
* INVALID_LINK_STREAM (5)
* IO_ERROR (6)
* UNKNOWN_ERROR (7)
"""
STOPPED = 0
INITIALIZING = 1
RUNNING = 2
STOPPING = 3
SYNCING = 4
INVALID_LINK_STREAM = 5
IO_ERROR = 6
RESETTING = 7
UNKNOWN_ERROR = 8
[docs]class aEtherConfig(object):
""" aEther configuration class for configuring AETHER connection types.
Note: If localOnly == false AND networkInterface is default (0 or LOCALHOST_IP_ADDRESS)
it will be populated with the auto-selected interface upon successful connection.
Attributes:
enabled: True: Client-Server model is used; False: Direct module control is used.
fallback: True: If connections fails it will automatically search for network connections.
localOnly: True: Restricts access to localhost; False: Expose device to external network.
assignedPort: Server assigned port after successful connection.
networkInterface: Network interface to use for connections.
"""
def __init__(self):
self.enabled = True
self.fallback = True
self.localOnly = True
self.assignedPort = 0
self.networkInterface = _BS_C.LOCALHOST_IP_ADDRESS
def __str__(self):
return "aEther Config: Enabled: %d - Fallback: %d - LocalOnly: %d - AssignedPort: %d - NetworkInterface: %d" % \
(self.enabled, self.fallback, self.localOnly, self.assignedPort, self.networkInterface)
@staticmethod
def cca_config_to_python_config(cca_config):
config = aEtherConfig()
config.enabled = cca_config.enabled
config.fallback = cca_config.fallback
config.localOnly = cca_config.localOnly
config.assignedPort = cca_config.assignedPort
config.networkInterface = cca_config.networkInterface
return config
@staticmethod
def python_config_to_cca_config(config, cca_config):
cca_config.enabled = config.enabled
cca_config.fallback = config.fallback
cca_config.localOnly = config.localOnly
cca_config.assignedPort = config.assignedPort
cca_config.networkInterface = config.networkInterface
[docs]class Spec(object):
""" Spec class for specifying connection details
Instances of Spec represent the connection details for a brainstem link.
The Spec class also contains constants representing the possible transport
types for BrainStem modules.
args:
transport (int): One of USB, TCPIP, SERIAL or AETHER.
serial_number (int): The module serial number.
module: The module address on the Brainstem network.
model: The device model number of the Brainstem module.
**keywords: For TCPIP, SERIAL and AETHER connections. The possibilities are,
* ip_address: (int/str) The IPV4 address for a TCPIP/AETHER connection type.
* ip_port: (int/str) The port for a TCPIP/AETHER connection type.
* port: (str) The serial port for a SERIAL connection type.
* baudrate: (int/str) The baudrate for a SERIAL connection type.
"""
INVALID = 0 #: INVALID Undefined transport type.
USB = 1 #: USB transport type.
TCPIP = 2 #: TCPIP transport type.
SERIAL = 3 #: SERIAL transport type.
AETHER = 4 #: AETHER transport type.
def __init__(self, transport, serial_number, module, model, **keywords):
self.transport = transport
self.serial_number = serial_number
self.module = module
# Model was added in 2.2.0 This adds legacy support
if model == 2:
self.model = MODEL_MTM_IOSERIAL
else:
self.model = model
for key in keywords.keys():
if key == 'ip_address':
if isinstance(keywords[key], int):
self.ip_address = keywords[key]
else:
try:
self.ip_address = socket.inet_aton(keywords[key])
except socket.error:
raise ValueError("Failed to convert ip_address key ", keywords[key])
elif key == 'ip_port':
if isinstance(keywords[key], int):
self.ip_port = keywords[key]
else:
try:
self.ip_port = int(keywords[key])
except ValueError:
raise ValueError("Failed to convert ip_port key ", keywords[key])
elif key == 'port':
if isinstance(keywords[key], str):
self.port = keywords[key]
else:
try:
#This is probably a bad choice as it will ALWAYS succeed.
self.port = str(keywords[key])
except ValueError:
#This should never happen because every type in python can be converted to a string
raise ValueError("Failed to convert port key ", keywords[key])
elif key == 'baudrate':
if isinstance(keywords[key], int):
self.baudrate = keywords[key]
else:
try:
self.baudrate = int(keywords[key])
except ValueError:
raise ValueError("Failed to convert baudrate key ", keywords[key])
else:
raise KeyError("Unknown keyword in Spec ", key)
[docs] @staticmethod
def cca_spec_to_python_spec(cca_spec):
""" Internal: Translate cffi spec into python Spec"""
if cca_spec.type == Spec.USB:
return Spec(Spec.USB, cca_spec.serial_num, cca_spec.module, cca_spec.model)
elif cca_spec.type == Spec.TCPIP:
return Spec(Spec.TCPIP, cca_spec.serial_num, cca_spec.module, cca_spec.model,
ip_address=cca_spec.ip_address, ip_port=cca_spec.ip_port)
elif cca_spec.type == Spec.SERIAL:
return Spec(Spec.SERIAL, cca_spec.serial_num, cca_spec.module, cca_spec.model,
port=cca_spec.port, baudrate=cca_spec.baudrate)
elif cca_spec.type == Spec.AETHER:
return Spec(Spec.AETHER, cca_spec.serial_num, cca_spec.module, cca_spec.model,
ip_address=cca_spec.ip_address, ip_port=cca_spec.ip_port)
return None
@staticmethod
#NOTE: cca_spec is passed in because of how ffi lifetime works.
def python_spec_to_cca_spec(spec, cca_spec):
cca_spec.serial_num = spec.serial_number
cca_spec.module = spec.module
cca_spec.type = spec.transport
cca_spec.model = spec.model
if spec.transport == Spec.USB:
pass
elif spec.transport == Spec.TCPIP:
if hasattr(spec, "ip_address"):
cca_spec.ip_address = spec.ip_address
if hasattr(spec, "ip_port"):
cca_spec.ip_port = spec.ip_port
elif spec.transport == Spec.SERIAL:
if hasattr(spec, "baudrate"):
cca_spec.baudrate = spec.baudrate
if hasattr(spec, "port"):
CCA_SPEC_PORT_SIZE = 100
ffi.memmove(cca_spec.port, spec.port.encode('utf-8'), CCA_SPEC_PORT_SIZE)
cca_spec.port[CCA_SPEC_PORT_SIZE-1] = b'\0' #ensure null termination.
elif spec.transport == Spec.AETHER:
if hasattr(spec, "ip_address"):
cca_spec.ip_address = spec.ip_address
if hasattr(spec, "ip_port"):
cca_spec.ip_port = spec.ip_port
def __str__(self):
type_string = "USB"
if self.transport == Spec.TCPIP:
type_string = "TCPIP"
elif self.transport == Spec.SERIAL:
type_string = "SERIAL"
elif self.transport == Spec.AETHER:
type_string = "AETHER"
addr, port = ('', '')
if hasattr(self, 'ip_address'):
addr = ", IP Address: %s" % socket.inet_ntoa(struct.pack('!I', socket.htonl(self.ip_address)))
if hasattr(self, 'ip_port'):
port = ", IP Port: %d" % self.ip_port
if hasattr(self, 'port'):
port += ", Serial Port: %s" % self.port
if hasattr(self, 'baudrate'):
port += ", Baudrate: %d" % self.baudrate
return 'Model: %s LinkType: %s(serial: %08X%s%s)' % (self.model, type_string, self.serial_number, addr, port)
# Simple class which provide key and value properties for a tuple
[docs]class StreamStatusEntry(tuple):
STREAM_KEY_MODULE_ADDRESS = 0
STREAM_KEY_CMD = 1
STREAM_KEY_OPTION = 2
STREAM_KEY_INDEX = 3
STREAM_KEY_SUBINDEX = 4
def __new__(cls, key, value):
return super(StreamStatusEntry, cls).__new__(cls, (key, value))
def __str__(self):
return "StreamStatusEntry - Key: %d : Value: %d" % (self.key, self.value)
@property
def key(self):
"""unsigned long long (64bit): A unique key made up of module, cmd, option, index, subindex"""
return self[0]
@property
def value(self):
"""unsigned int (32bit): The Value associated with the key"""
return self[1]
@staticmethod
def getStreamKeyElement(key, element):
result = ffi.new("struct Result*")
_BS_C.link_getStreamKeyElement(result, key, element)
return Result(result.error, result.value)
# The link class manages the link to the stem. We REALLY only want one
# link to be created. Users should never directly instantiate a Link object.
class Link(object):
def __init__(self, id):
self.__id = id
@property
def id(self):
"""unsigned int: A unique identifier of the associated module"""
return self.__id[0]
@property
def _id_pointer(self):
"""unsigned int*: pointer to the unique identifier."""
return self.__id
def enableStream(self, module_address, cmd, option, index, enable):
"""
Enables streaming for the supplied criteria.
:param module_address: Address of module on link (stem.address is yourself)
:type module_address: unsigned byte
:param cmd: The command code.
:type cmd: unsigned byte
:param option: The option code.
:type option: unsigned byte
:param index: The entity index.
:type index: unsigned byte
:param enable: Enable (True) or disable (False) streaming.
:param enable: bool
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
_BS_C.link_enableStream(self._id_pointer, result, module_address, cmd, option, index, enable)
return result.error
def getLinkSpecifier(self):
"""
Retrieves the current connection specification
: return: :return: Result object containing the requested value when the results error is set to NO_ERROR(0)
:rtype: Result containing a Spec
"""
result = ffi.new("struct Result*")
cspec = ffi.new("struct linkSpec_CCA*")
_BS_C.link_getLinkSpecifier(self._id_pointer, result, cspec)
return Result(result.error, Spec.cca_spec_to_python_spec(cspec))
def registerStreamCallback(self, module_address, cmd, option, index, enable, cb, pRef):
"""
Registers a callback function based on a specific module, cmd, option, and index.
:param module_address: Address of module on link (stem.address is yourself)
:type module_address: unsigned byte
:param cmd: The command code.
:type cmd: unsigned byte
:param option: The option code.
:type option: unsigned byte
:param index: The entity index.
:type index: unsigned byte
:param enable: Enable (True) or disable (False) streaming.
:param enable: bool
:param cb Callback to be executed on the provided criteria.
:type cb: @ffi.callback("unsigned char(aPacket*, void*)")
:param pRef Handle to be passed to the provided callback. This handle must be kept alive by the caller.
:type pRef: ffi handle
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
_BS_C.link_registerStreamCallback(self._id_pointer, result, module_address, cmd, option, index, enable, cb, pRef)
return result.error
def getStreamStatus(self, module_address, cmd, option, index, sub_index, buffer_length=1024):
"""
Gets all available stream values based on the search criteria. 0xFF can be used as a wildcard for all possible values
:param module_address: Address of module on link (stem.address is yourself)
:type module_address: unsigned byte
:param cmd: The command code.
:type cmd: unsigned byte
:param option: The option code.
:type option: unsigned byte
:param index: The entity index.
:type index: unsigned byte
:param sub_index: The sub index.
:type sub_index: unsigned byte
:param buffer_length: Size of the buffer to allocate
:type buffer_length: subIndex int
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
data = ffi.new("struct StreamStatusEntry_CCA[]", buffer_length)
_BS_C.link_getStreamStatus(self._id_pointer, result, module_address, cmd, option, index, sub_index, data, buffer_length)
if result.error:
return Result(result.error, tuple(list()))
status_list = []
for x in range(0, result.value):
status_list.append(StreamStatusEntry(data[x].key, data[x].value))
return Result(result.error, tuple(status_list))