Source code for brainstem.module

# Copyright (c) 2018 Acroname Inc. - All Rights Reserved
#
# This file is part of the BrainStem (tm) package which is released under MIT.
# See file LICENSE or go to https://acroname.com for full license details.

"""
A module that provides base classes for BrainStem Modules and Entities.

The Module and Entity classes are designed to be extended for specific types
of BrainStem Modules and Entities. For more information about BrainStem Modules
and Entities, please see the `Terminology`_ section of the `Acroname BrainStem Reference`_.

.. _Terminology:
    https://acroname.com/reference/brainstem/terms.html

.. _Acroname BrainStem Reference:
    https://acroname.com/reference
"""
import gc
import struct
from time import sleep

from . import _BS_C
from ._link import Link, UEI
from .link import Spec, Status, aEtherConfig
from .result import Result
from . import discover


class Module(object):
    """ Base class for BrainStem Modules.

        Provides default implementations for connecting and disconnecting
        from BrainStem modules via the module's serial number, a `Spec`_
        object or through another module.

        .. _Spec:
            brainstem.spec
    """

    def __init__(self, address, enable_auto_networking=True, model=0):
        """ Initialize a Module object.

            args:
                address (int): The BrainStem module addresses should be
                    even integers.
                enable_auto_networking (bool): When True, the local address
                    is replaced by the address the link reports after a
                    successful connection (see autoNetwork).
                model (int): Model number used to filter discovery results;
                    0 matches any model.
        """
        self.__address = address
        self.__link = None
        self.__spec = None
        self.__bAutoNetworking = enable_auto_networking
        self.__model = model
        self.__aether_config = aEtherConfig()

    def __del__(self):
        # Drop the references and collect right away so the underlying link
        # is released as soon as the module object goes away.
        self.__link = None
        self.__spec = None
        gc.collect()

    @property
    def address(self):
        """ int: Return the Brainstem module address. """
        return self.__address

    @property
    def bAutoNetworking(self):
        """ bool: Return the current networking mode. """
        return self.__bAutoNetworking

    @property
    def spec(self):
        """ Spec: Return the current spec object. """
        return self.__spec

    @property
    def link(self):
        """ Link: return the current link or None. """
        return self.__link

    @property
    def model(self):
        """ Model: returns the model number of the object. """
        return self.__model

    def getConfig(self):
        """ Return the aEtherConfig object currently held by this module. """
        return self.__aether_config

    def setConfig(self, config):
        """ Replace the aEtherConfig object held by this module.

            The configuration is only accepted while the module is not
            connected.

            args:
                config (aEtherConfig): The configuration to store.

            returns:
                Result.error: CONNECTION_ERROR when the module is currently
                    connected (the configuration is left unchanged),
                    NO_ERROR otherwise.
        """
        if self.isConnected():
            return Result.CONNECTION_ERROR

        self.__aether_config = config
        return Result.NO_ERROR

    # Connect from spec should be fast. The spec fully qualifies connection
    # parameters for the link.
    def connectFromSpec(self, spec):
        """ Result.error: Connect to a BrainStem module with a Spec.

            args:
                spec (Spec): The specifier for the connection.

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result.
                    PARAMETER_ERROR when spec is None, RESOURCE_ERROR when
                    no link could be created, CONNECTION_ERROR when the
                    link reports a failed state, NOT_READY when the link
                    never reaches the running state, otherwise the result
                    of autoNetwork().
        """
        if spec is None:
            return Result.PARAMETER_ERROR

        self.__link = Link.create_link(spec, self.__aether_config)
        if self.__link is None:
            return Result.RESOURCE_ERROR

        err = Result.NO_ERROR
        failed_states = (_BS_C.INVALID_LINK_STREAM,
                         _BS_C.IO_ERROR,
                         _BS_C.UNKNOWN_ERROR)

        # Poll the link every 10 ms until it is running. The loop allows
        # 1001 polls, i.e. roughly ten seconds before giving up.
        status = self.__link.get_status()
        count = 0
        while status != _BS_C.RUNNING and count <= 1000:
            if status in failed_states:
                err = Result.CONNECTION_ERROR
                break
            sleep(0.01)
            count += 1
            status = self.__link.get_status()

        if err == Result.NO_ERROR:
            if status != _BS_C.RUNNING:
                err = Result.NOT_READY
            else:
                err = self.autoNetwork()

        if err == Result.NO_ERROR:
            self.__spec = spec
        else:
            self.__link = None
            self.__spec = None
            # Same reasoning as in disconnect(): make sure the abandoned
            # link is collected immediately.
            gc.collect()

        return err

    def autoNetwork(self):
        """ Adopt the module address reported by the link.

            Does nothing when auto networking is disabled. Otherwise the
            link is asked for the module address, and a successfully
            reported non-zero address replaces the local address.

            returns:
                Result.error: NO_ERROR when auto networking is disabled,
                    otherwise the error of the address query.
        """
        if not self.bAutoNetworking:
            return Result.NO_ERROR

        magic_address = self.__link.getModuleAddress()
        if magic_address.error == Result.NO_ERROR and magic_address.value != 0:
            self.__address = magic_address.value
        return magic_address.error

    def connectThroughLinkModule(self, module):
        """ Result.error: Connect to network module.

            Connects to a Brainstem module on a BrainStem network, through
            the module given as an argument. The module passed in must have
            an active valid connection. Auto networking is switched off for
            this module, since the shared link belongs to the other module.

            args:
                module (Module): The brainstem module to connect through.

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result
        """
        if not module.isConnected():
            return Result.CONNECTION_ERROR

        self.__link = module.link
        self.__spec = module.spec
        self.__bAutoNetworking = False
        return Result.NO_ERROR

    def connect(self, transport, serial_number):
        """ Result.error: Connect to a Module with a transport type and
            serial number.

            args:
                transport (Spec.transport): The transport to connect over.
                serial_number (int): Serial number of the module. None and
                    0 are rejected with PARAMETER_ERROR.

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result
        """
        if serial_number is None or serial_number == 0:
            return Result.PARAMETER_ERROR

        return self.discoverAndConnect(transport, serial_number)

    def isConnected(self):
        """ Returns true if the Module has an active connection or false
            otherwise.
        """
        # Without a link there is nothing that could be connected.
        if self.__link is None:
            return False

        # Only a running link counts as connected.
        return self.__link.get_status() == _BS_C.RUNNING

    def getStatus(self):
        """ Returns the status of the BrainStem connection.

            See brainstem.link.Status for the possible states.
        """
        if self.__link is None:
            return Status.INVALID_LINK_STREAM
        return self.__link.get_status()

    def disconnect(self):
        """ Disconnect from the Brainstem module."""
        self.__link = None
        # We need to make sure the link is collected immediately... or it can
        # lead to driver wedging.
        gc.collect()

    def reconnect(self):
        """ Reconnect a lost connection to a Brainstem module.

            returns:
                Result.error: NO_ERROR when already connected,
                    CONFIGURATION_ERROR when no spec from a previous
                    connection is stored, otherwise the result of
                    connectFromSpec() with the stored spec.
        """
        if self.isConnected():
            return Result.NO_ERROR

        self.__link = None
        # We need to make sure the link is collected immediately... or it can
        # lead to driver wedging.
        gc.collect()

        if self.__spec is None:
            return Result.CONFIGURATION_ERROR
        return self.connectFromSpec(self.__spec)

    def setModuleAddress(self, address):
        """ Set the address of the module object.

            This method changes the local address of the module, not of the
            device. It is possible to set the module address of the device
            via system.setModuleSoftwareOffset().

            args:
                address (int): The module address to switch to for this
                    module instance. Must be an even value from 0 to 254.

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result
        """
        if (address % 2) or (address < 0) or (address > 254):
            return Result.PARAMETER_ERROR

        self.__address = address
        return Result.NO_ERROR

    def setNetworkingMode(self, mode):
        """ Set the networking mode of the module object.

            By default the module object is configured to automatically
            adjust its address based on the devices current module address.
            So that, if the device has a software or hardware offset it
            will still be able to communicate with the device. If advanced
            networking is required the auto networking mode can be turned
            off.

            args:
                mode (bool): True or 1 = Auto networking
                             False or 0 = Manual networking

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result
        """
        self.__bAutoNetworking = mode
        return Result.NO_ERROR

    def __find_spec(self, transport, serial_number):
        """ Internal: discover a single spec on the given transport.

            Without a serial number (None or 0) the first discovered module
            whose model matches this object's model is returned; a model of
            0 matches any module. With a serial number the lookup is
            delegated to discover.findModule().

            returns:
                Spec: The matching spec, or None when nothing matched.
        """
        if serial_number is None or serial_number == 0:
            for candidate in discover.findAllModules(transport,
                                                     self.__aether_config):
                if self.model == 0 or self.model == candidate.model:
                    return candidate
            return None

        return discover.findModule(transport, serial_number,
                                   self.__aether_config)

    def discoverAndConnect(self, transport, serial_number=None):
        """ Discover and connect from the Module level.

            A discover-based connect. This member function will connect to
            the first available BrainStem found on the given transport. If
            the serial number is passed, it will only connect to the module
            with that serial number. Passing 0 or None as the serial number
            will create a link to the first link module found on the
            specified transport.

            When a discovered module cannot be connected
            (CONNECTION_ERROR or NOT_FOUND), the transport is USB or
            AETHER, and the fallback flag of the aEther configuration is
            set, discovery and connection are retried on the other of the
            two transports.

            args:
                transport (int): The transport to discover and connect
                    over.
                serial_number (int): The module serial_number to look for.

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result
        """
        no_serial = serial_number is None or serial_number == 0
        fallback_transport = (Spec.USB if transport == Spec.AETHER
                              else Spec.AETHER)

        spec = self.__find_spec(transport, serial_number)
        if spec is not None:
            result = self.connectFromSpec(spec)

            retryable = result in (Result.CONNECTION_ERROR, Result.NOT_FOUND)
            swappable = transport in (Spec.USB, Spec.AETHER)
            if retryable and swappable and self.__aether_config.fallback:
                spec = self.__find_spec(fallback_transport, serial_number)
                if spec is None:
                    return Result.NOT_FOUND
                result = self.connectFromSpec(spec)

            return result

        # Nothing was discovered on the requested transport.
        # NOTE(review): this path keeps the historical behaviour. Without a
        # serial number the other transport is searched regardless of the
        # fallback flag; with a serial number the same transport is simply
        # searched a second time. Confirm whether that asymmetry is
        # intended.
        retry_transport = fallback_transport if no_serial else transport
        spec = self.__find_spec(retry_transport, serial_number)
        if spec is None:
            return Result.NOT_FOUND
        return self.connectFromSpec(spec)
class Entity(object):
    """ Base class for BrainStem Entity.

        Provides the default implementation for a functional entity within
        the BrainStem. This can include IO like GPIOs, Analogs etc. For a
        more detailed description of Entities see the `Terminology`_
        section of the brainstem reference for more information.

        .. _Terminology:
            https://acroname.com/reference/brainstem/terms.html
    """

    def __init__(self, module, command, index):
        """ Initialize an Entity object.

            args:
                module (Module): The Module this entity belongs to.
                command (int): The BrainStem command for the entity.
                index (int): The entity index for this entity instance.
        """
        self.__module = module
        self.__command = command
        self.__index = index

    @property
    def module(self):
        """Module: Return this entity's module."""
        return self.__module

    @property
    def command(self):
        """int: Return the entity command."""
        return self.__command

    @property
    def index(self):
        """int: Return the entity index"""
        return self.__index

    def _set_UEI_checked(self, option, uei):
        """ Internal: send a set UEI, mapping struct.error to RANGE_ERROR."""
        try:
            return self._set_UEI(option, uei)
        except struct.error:
            return Result.RANGE_ERROR

    def call_UEI(self, option):
        """ Result.error: Call a set UEI on this entity.

            args:
                option (int): The command option.

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result
        """
        uei = UEI()
        uei.type = UEI.VOID
        return self._set_UEI_checked(option, uei)

    def set_UEI8(self, option, value):
        """ Result.error: Call a set UEI with byte param on this entity.

            args:
                option (int): The command option.
                value (byte): The byte parameter to send.

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result
        """
        uei = UEI()
        uei.type = UEI.BYTE
        uei.value = value
        return self._set_UEI_checked(option, uei)

    def set_UEI8_with_subindex(self, option, subindex, value):
        """ Result.error: Call a set UEI with a subindex.

            args:
                option (int): The command option.
                subindex (byte): The subindex of the entity.
                value (byte): The byte parameter to send.

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result
        """
        uei = UEI()
        uei.type = UEI.BYTE
        uei.value = value
        uei.subindex = subindex
        return self._set_UEI_checked(option, uei)

    def set_UEI16(self, option, value):
        """ Result.error: Call a set UEI with short param on this entity.

            args:
                option (int): The command option.
                value (short): The short parameter to send.

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result
        """
        uei = UEI()
        uei.type = UEI.SHORT
        uei.value = value
        return self._set_UEI_checked(option, uei)

    def set_UEI32(self, option, value):
        """ Result.error: Call a set UEI with int param on this entity.

            args:
                option (int): The command option.
                value (int): The int parameter to send.

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result
        """
        uei = UEI()
        uei.type = UEI.INT
        uei.value = value
        return self._set_UEI_checked(option, uei)

    def set_UEI32_with_subindex(self, option, subindex, value):
        """ Result.error: Call a set UEI with a subindex.

            args:
                option (int): The command option.
                subindex (byte): The subindex of the entity.
                value (int): The int parameter to send.

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result
        """
        uei = UEI()
        uei.type = UEI.INT
        uei.value = value
        uei.subindex = subindex
        return self._set_UEI_checked(option, uei)

    def set_UEIBytes(self, option, buffer):
        """ Result.error: Call a set UEI with a buffer on this entity.

            args:
                option (int): The command option.
                buffer (byte array): The buffer to be sent

            returns:
                Result.error: Returns an error result from the list of
                    defined error codes in brainstem.result
        """
        uei = UEI()
        uei.type = UEI.BYTES
        uei.value = buffer
        return self._set_UEI_checked(option, uei)

    def get_UEI(self, option):
        """ Result.error: Get a UEI value.

            args:
                option (int): The command option.

            returns:
                Result: Returns a result object, whose value is set,
                        or with the requested value when the results error
                        is set to NO_ERROR
        """
        uei = UEI()
        uei.type = UEI.VOID
        return self._get_UEI(option, uei)

    def get_UEIBytes(self, option):
        """ Get a UEI Bytes 8 bit value.

            args:
                option (int): The command option.

            returns:
                Result: Returns a result object, whose value is set,
                        or with the requested value when the results error
                        is set to NO_ERROR
        """
        link = self.module.link
        if link is None:
            return Result(_BS_C.aErrConnection, None)

        uei = UEI()
        uei.type = UEI.VOID
        self._fill_UEI(option, _BS_C.ueiOPTION_GET, uei)
        link.send_UEI(uei)
        return link.receive_UEIBytes(self.module.address,
                                     self.command,
                                     option | _BS_C.ueiOPTION_VAL,
                                     self.index)

    def prep_UEIBytes(self, buffer, valuesize):
        """ Helper function for UEIBytes set which converts base type to
            single byte tuple.

            Each value is split into valuesize bytes, least significant
            byte first.

            args:
                buffer (tuple): An array of values to be converted to
                    single bytes
                valuesize (int): The base value size of the elements in the
                    set of bytes, 1 = uint8, 2 = uint16, 4 = uint32

            returns:
                tuple: The individual bytes of all values, in order.
        """
        return tuple((value >> (8 * shift)) & 0xFF
                     for value in buffer
                     for shift in range(valuesize))

    @staticmethod
    def _combine_bytes(raw, valuesize):
        """ Internal: merge groups of valuesize bytes into integers.

            Bytes are interpreted least significant byte first. The length
            of raw must be a multiple of valuesize.
        """
        values = []
        for start in range(0, len(raw), valuesize):
            value = 0
            for offset in range(valuesize):
                value = value + (raw[start + offset] << (8 * offset))
            values.append(value)
        return tuple(values)

    def check_UEIBytes(self, result, valuesize):
        """ Helper function for UEIBytes get checks, specifically checking
            and fixing value sizes.

            args:
                result (Result): The Result object from a get_UEIBytes
                valuesize (int): The base value size of the elements in the
                    set of bytes, 1 = uint8, 2 = uint16, 4 = uint32

            returns:
                Result: The incoming result unchanged when it carries an
                        error; MEMORY_ERROR when the byte count is not a
                        multiple of valuesize; otherwise NO_ERROR with a
                        tuple holding one integer per group of valuesize
                        bytes.
        """
        if result.error != Result.NO_ERROR:
            return result

        if len(result.value) % valuesize != 0:
            return Result(Result.MEMORY_ERROR, 0)

        # Every complete group of valuesize bytes yields one value, not
        # just the first group.
        return Result(Result.NO_ERROR,
                      self._combine_bytes(result.value, valuesize))

    def bytes_to_string(self, result):
        """ Helper function for UEIBytes to convert byte array value to a
            string.

            args:
                result (Result): The Result object from a get_UEIBytes

            returns:
                Result: The incoming result unchanged when it carries an
                        error, otherwise NO_ERROR with the bytes decoded
                        as UTF-8.
        """
        if result.error != Result.NO_ERROR:
            return result

        text = bytes(result.value).decode('utf-8')
        return Result(Result.NO_ERROR, text)

    def drain_UEI(self, option):
        """ Drain UEI packets matching option.

            args:
                option (int): The command option.

            returns:
                Result: Returns a result object, whose value is the number
                        of packets drained, and the error value set to
                        NO_ERROR. When the module has no link, a result
                        with a connection error and no value is returned.
        """
        link = self.module.link
        if link is None:
            return Result(_BS_C.aErrConnection, None)

        return link.drain_UEI_packets(self.module.address,
                                      self.command,
                                      option | _BS_C.ueiOPTION_VAL,
                                      self.index)

    def await_UEI_Val(self, option, timeout):
        """ Wait for a UEI value packet matching option.

            args:
                option (int): The command option.
                timeout: Passed through to the link's receive_UEI().

            returns:
                Result: The result of the link's receive_UEI(). When the
                        module has no link, a result with a connection
                        error and no value is returned.
        """
        link = self.module.link
        if link is None:
            return Result(_BS_C.aErrConnection, None)

        return link.receive_UEI(self.module.address,
                                self.command,
                                option | _BS_C.ueiOPTION_VAL,
                                self.index,
                                timeout)

    def get_UEI_with_param(self, option, param):
        """ Result.error: Get a UEI value based on a parameter.

            args:
                option (int): The command option.
                param (byte): The command parameter

            returns:
                Result: Returns a result object, whose value is set,
                        or with the requested value when the results error
                        is set to NO_ERROR
        """
        uei = UEI()
        uei.type = UEI.BYTE
        uei.value = param
        return self._get_UEI(option, uei)

    def send_command(self, length, data, match_tuple):
        """ Send a raw command packet and wait for the matching reply.

            args:
                length: Passed through to the link's send_command_packet().
                data: Passed through to the link's send_command_packet().
                match_tuple: Passed through to the link's
                    receive_command_packet().

            returns:
                Result: A connection error result when the module has no
                        link, the send error wrapped in a Result when
                        sending fails, otherwise whatever the link's
                        receive_command_packet() returns.
        """
        link = self.module.link
        if link is None:
            return Result(_BS_C.aErrConnection, None)

        sent = link.send_command_packet(self.module.address, self.command,
                                        length, data)
        if sent != Result.NO_ERROR:
            return Result(sent, None)

        return link.receive_command_packet(self.module.address, self.command,
                                           match_tuple)

    def _fill_UEI(self, option, req_type, uei):
        """ Internal: fill a UEI object"""
        uei.module = self.module.address
        uei.command = self.command
        uei.option = req_type | option
        uei.specifier = _BS_C.ueiSPECIFIER_RETURN_HOST | self.index
        return uei

    def _get_UEI(self, option, uei):
        """ Internal: send get UEI"""
        link = self.module.link
        if link is None:
            return Result(_BS_C.aErrConnection, None)

        self._fill_UEI(option, _BS_C.ueiOPTION_GET, uei)
        link.send_UEI(uei)
        return link.receive_UEI(self.module.address,
                                self.command,
                                option | _BS_C.ueiOPTION_VAL,
                                self.index)

    def _set_UEI(self, option, uei):
        """ Internal: send set UEI"""
        if self.module.link is None:
            return _BS_C.aErrConnection

        self._fill_UEI(option, _BS_C.ueiOPTION_SET, uei)
        # Keep sending until the UEI reports that it has been transmitted
        # completely.
        while True:
            self.module.link.send_UEI(uei)
            if uei.transmit_complete():
                break

        result = self.module.link.receive_UEI(self.module.address,
                                              self.command,
                                              option | _BS_C.ueiOPTION_ACK,
                                              self.index)
        return result.error