from ._bs_c_cffi import ffi
from . import _BS_C #imported from __init__
from .ffi_utils import data_to_bytearray
from .result import Result
from .link import StreamStatusEntry
[docs]class Entity(object):
"""
Base class for BrainStem Entity.
Provides the default implementation for a functional entity within
the BrainStem. This can include IO like GPIOs, Analogs etc. For a
more detailed description of Entities see the `Terminology`_ section
of the brainstem reference for more information.
.. _Terminology:
https://acroname.com/reference/brainstem/terms.html
"""
def __init__(self, module, cmd, index):
"""
Initialize an Entity object.
:param module: The Module this entity belongs to.
:type module: Module
:param command The BrainStem command for the entity.
:type command: int
:param index The entity index for this entity instance.
:type index: int
"""
self._cmd = cmd
self._index = index
self._module = module
@property
def command(self):
"""int: Return the entity command."""
return self._cmd
@property
def index(self):
"""int: Return the entity index"""
return self._index
@property
def module(self):
""" Module: returns the associated module object. """
return self._module
[docs] def call_UEI(self, option):
"""
Call a set UEI on this entity.
:param option: The command option.
:type option: byte
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
_BS_C.entity_callUEI(self._module._id_pointer, result, self._cmd, self._index, option)
return result.error
[docs] def set_UEI8(self, option, value):
"""
Call a set UEI with byte value on this entity.
:param option: The command option.
:type option: byte
:param value: The byte parameter to send.
:type value: byte
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
_BS_C.entity_setUEI8(self._module._id_pointer, result, self._cmd, self._index, option, value)
return result.error
[docs] def set_UEI8_with_subindex(self, option, subIndex, value):
"""
Call a set UEI byte value with a subIndex.
:param option: The command option.
:type option: byte
:param subIndex: The subIndex of the entity.
:type subIndex: byte
:param value: The byte parameter to send.
:type value: byte
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
_BS_C.entity_setUEI8SubIndex(self._module._id_pointer, result, self._cmd, self._index, option, subIndex, value)
return result.error
[docs] def get_UEI8(self, option):
"""
Get a UEI byte value.
:param option: The command option.
:type option: byte
:return: Result object containing the requested value when the results error is set to NO_ERROR(0)
:rtype: Result
"""
result = ffi.new("struct Result*")
_BS_C.entity_getUEI8(self._module._id_pointer, result, self._cmd, self._index, option)
return Result(result.error, result.value)
[docs] def get_UEI8_with_subindex(self, option, subIndex):
"""
Call a get UEI byte value with a subIndex.
:param option: The command option.
:type option: byte
:param subIndex: The subIndex of the entity.
:type subIndex: byte
:return: Result object containing the requested value when the results error is set to NO_ERROR(0)
:rtype: Result
"""
result = ffi.new("struct Result*")
_BS_C.entity_getUEI8SubIndex(self._module._id_pointer, result, self._cmd, self._index, option, subIndex)
return Result(result.error, result.value)
[docs] def set_UEI16(self, option, value):
"""
Call a set UEI with short value on this entity.
:param option: The command option.
:type option: byte
:param value: The short parameter to send.
:type value: short
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
_BS_C.entity_setUEI16(self._module._id_pointer, result, self._cmd, self._index, option, value)
return result.error
[docs] def set_UEI16_with_subindex(self, option, subIndex, value):
"""
Call a set UEI short value with a subIndex.
:param option: The command option.
:type option: byte
:param subIndex: The subIndex of the entity.
:type subIndex: byte
:param value: The short parameter to send.
:type value: short
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
_BS_C.entity_setUEI16SubIndex(self._module._id_pointer, result, self._cmd, self._index, option, subIndex, value)
return result.error
[docs] def get_UEI16(self, option):
"""
Get a UEI short value.
:param option: The command option.
:type option: byte
:return: Result object containing the requested value when the results error is set to NO_ERROR(0)
:rtype: Result
"""
result = ffi.new("struct Result*")
_BS_C.entity_getUEI16(self._module._id_pointer, result, self._cmd, self._index, option)
return Result(result.error, result.value)
[docs] def get_UEI16_with_subindex(self, option, subIndex):
"""
Call a get UEI short value with a subIndex.
:param option: The command option.
:type option: byte
:param subIndex: The subIndex of the entity.
:type subIndex: byte
:return: Result object containing the requested value when the results error is set to NO_ERROR(0)
:rtype: Result
"""
result = ffi.new("struct Result*")
_BS_C.entity_getUEI16SubIndex(self._module._id_pointer, result, self._cmd, self._index, option, subIndex)
return Result(result.error, result.value)
[docs] def set_UEI32(self, option, value):
"""
Call a set UEI with int value on this entity.
:param option: The command option.
:type option: byte
:param value: The int parameter to send.
:type value: int
"""
result = ffi.new("struct Result*")
_BS_C.entity_setUEI32(self._module._id_pointer, result, self._cmd, self._index, option, value)
return result.error
[docs] def set_UEI32_with_subindex(self, option, subIndex, value):
"""
Call a set UEI int value with a subIndex.
:param option: The command option.
:type option: byte
:param subIndex: The subIndex of the entity.
:type subIndex: byte
:param value: The int parameter to send.
:type value: int
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
_BS_C.entity_setUEI32SubIndex(self._module._id_pointer, result, self._cmd, self._index, option, subIndex, value)
return result.error
[docs] def get_UEI32(self, option):
"""
Get a UEI int value.
:param option: The command option.
:type option: byte
:return: Result object containing the requested value when the results error is set to NO_ERROR(0)
:rtype: Result
"""
result = ffi.new("struct Result*")
_BS_C.entity_getUEI32(self._module._id_pointer, result, self._cmd, self._index, option)
return Result(result.error, result.value)
[docs] def get_UEI32_with_subindex(self, option, subIndex):
"""
Call a get UEI int value with a subIndex.
:param option: The command option.
:type option: byte
:param subIndex: The subIndex of the entity.
:type subIndex: byte
:return: Result object containing the requested value when the results error is set to NO_ERROR(0)
:rtype: Result
"""
result = ffi.new("struct Result*")
_BS_C.entity_getUEI32SubIndex(self._module._id_pointer, result, self._cmd, self._index, option, subIndex)
return Result(result.error, result.value)
[docs] def set_UEIBytes(self, option, buffer):
"""
Call a set UEI with buffer and length of buffer on this entity.
:param option: The command option.
:type option: byte
:param buffer: The buffer to be sent
:type buffer: bytearray()
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
byte_array = data_to_bytearray(buffer)
buffer_length = len(byte_array)
ffi_buffer = ffi.new("unsigned char[]", buffer_length)
for x in range(buffer_length):
ffi_buffer[x] = byte_array[x]
_BS_C.entity_setUEIBytes(self._module._id_pointer, result, self._cmd, self._index, option, ffi_buffer, buffer_length)
return result.error
[docs] def get_UEIBytes(self, option, buffer_length=65536):
"""
Get a UEI Bytes buffer on this entity.
:param option: The command option.
:type option: byte
:param buffer_length: The subIndex of the entity.
:type buffer_length: unsigned int
:return: Result object containing the requested value when the results error is set to NO_ERROR(0)
:rtype: Result
"""
result = ffi.new("struct Result*")
ffi_buffer = ffi.new("unsigned char[]", buffer_length)
_BS_C.entity_getUEIBytes(self._module._id_pointer, result, self._cmd, self._index, option, ffi_buffer, buffer_length)
return_list = [ffi_buffer[i] for i in range(result.value)]
return Result(result.error, return_list)
[docs] def drain_UEI(self, option):
"""
Drain UEI packets matching option.
:param option: The command option.
:type option: byte
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
_BS_C.entity_drainUEI(self._module._id_pointer, result, self._cmd, self._index, option)
return result.error
[docs] def setStreamEnabled(self, enable):
"""
Enables streaming for all possible option codes within the cmd and index the entity was created for.
:param enable: Enable (True) or disable (False) streaming.
:type enable: bool
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
_BS_C.entity_setStreamEnabled(self._module._id_pointer, result, self._cmd, self._index, enable)
return result.error
[docs] def registerOptionCallback(self, option, enable, cb, pRef):
"""
Registers a callback function based on a specific option code. Option code applies to the cmd and index of the called API.
:param option The option code for the entities command and index.
:type option: byte
:param enable Enable (True) or disable (False) streaming.
:type enable: bool
:param cb Callback to be executed on the provided criteria.
:type cb: @ffi.callback("unsigned char(aPacket*, void*)")
:param pRef Handle to be passed to the provided callback. This handle must be kept alive by the caller.
:type pRef: ffi handle
:return: An error result from the list of defined error codes in brainstem.result
"""
return self._module.link.registerStreamCallback(self._module.address, self._cmd, option, self._index, enable, cb, pRef)
[docs] def getStreamStatus(self, buffer_length=1024):
"""
Gets all available stream values associated with the cmd and index of the called API.
:param buffer_length: Size of the buffer to allocate
:type buffer_length: unsigned int
:return: An error result from the list of defined error codes in brainstem.result
"""
result = ffi.new("struct Result*")
data = ffi.new("struct StreamStatusEntry_CCA[]", buffer_length)
_BS_C.entity_getStreamStatus(self._module._id_pointer, result, self._cmd, self._index, data, buffer_length)
if result.error:
return Result(result.error, tuple(list()))
status_list = []
for x in range(0, result.value):
status_list.append(StreamStatusEntry(data[x].key, data[x].value))
return Result(result.error, tuple(status_list))