Source code for pyisy.nodes

"""Representation of ISY Nodes."""

from __future__ import annotations

import re
from asyncio import sleep
from dataclasses import dataclass
from operator import itemgetter
from typing import TYPE_CHECKING
from xml.dom import minidom

from ..constants import (
    ATTR_ACTION,
    ATTR_CONTROL,
    ATTR_FLAG,
    ATTR_ID,
    ATTR_INSTANCE,
    ATTR_NODE_DEF_ID,
    ATTR_PRECISION,
    ATTR_UNIT_OF_MEASURE,
    DEFAULT_PRECISION,
    DEFAULT_UNIT_OF_MEASURE,
    DEV_MEMORY,
    DEV_WRITING,
    EVENT_PROPS_IGNORED,
    FAMILY_BRULTECH,
    FAMILY_NODESERVER,
    FAMILY_RCS,
    FAMILY_ZMATTER_ZWAVE,
    FAMILY_ZWAVE,
    INSTEON_RAMP_RATES,
    ISY_VALUE_UNKNOWN,
    NC_NODE_ENABLED,
    NC_NODE_ERROR,
    NODE_CHANGED_ACTIONS,
    NODE_IS_CONTROLLER,
    NODE_IS_ROOT,
    PROP_BATTERY_LEVEL,
    PROP_COMMS_ERROR,
    PROP_RAMP_RATE,
    PROP_STATUS,
    PROTO_INSTEON,
    PROTO_NODE_SERVER,
    PROTO_ZIGBEE,
    PROTO_ZWAVE,
    TAG_ADDRESS,
    TAG_DEVICE_TYPE,
    TAG_ENABLED,
    TAG_EVENT_INFO,
    TAG_FAMILY,
    TAG_FOLDER,
    TAG_FORMATTED,
    TAG_GROUP,
    TAG_LINK,
    TAG_NAME,
    TAG_NODE,
    TAG_PARENT,
    TAG_PRIMARY_NODE,
    TAG_TYPE,
    UOM_SECONDS,
    XML_TRUE,
)
from ..exceptions import XML_ERRORS, XML_PARSE_ERROR, ISYResponseParseError
from ..helpers import (
    EventEmitter,
    NodeProperty,
    ZWaveProperties,
    attr_from_element,
    attr_from_xml,
    parse_xml_properties,
    value_from_xml,
)
from ..logging import _LOGGER
from ..node_servers import NodeServers
from .group import Group
from .node import Node

if TYPE_CHECKING:
    from ..isy import ISY

# Pattern searched against the text of a progress report event by
# ``Nodes.progress_report_received``. It exposes four named groups:
# ``dbAddr``, ``value`` (two hex digits, later parsed with ``int(..., 16)``),
# ``cmd1`` and ``cmd2`` (four characters each).
MEMORY_REGEX = (
    r".*dbAddr=(?P<dbAddr>[A-F0-9x]*) \[(?P<value>[A-F0-9]{2})\] "
    r"cmd1=(?P<cmd1>[A-F0-9x]{4}) cmd2=(?P<cmd2>[A-F0-9x]{4})"
)
# Entry types that ``Nodes.get_by_index`` returns directly as the stored object
# instead of wrapping them in a new navigable ``Nodes`` instance.
SINGLE_NODE_TYPES = {TAG_GROUP, TAG_NODE}


class Nodes:
    """
    This class handles the ISY nodes.

    This class can be used as a dictionary to navigate through the
    controller's structure to objects of type
    :class:`pyisy.nodes.Node` and :class:`pyisy.nodes.Group` that represent
    objects on the controller.

    |  isy: ISY class
    |  root: [optional] String representing the current navigation level's ID
    |  addresses: [optional] list of node ids
    |  nnames: [optional] list of node names
    |  nparents: [optional] list of node parents
    |  nobjs: [optional] list of node objects
    |  ntypes: [optional] list of node types
    |  xml: [optional] String of xml data containing the configuration data

    :ivar all_lower_nodes: Return all nodes beneath current level
    :ivar children: A list of the object's children.
    :ivar has_children: Indicates if object has children
    :ivar name: The name of the current folder in navigation.
    """

    def __init__(
        self,
        isy: ISY,
        root: str | None = None,
        addresses: list[str] | None = None,
        nnames: list[str] | None = None,
        nparents: list[str] | None = None,
        nobjs: list[Node] | None = None,
        ntypes: list[str] | None = None,
        xml: str | None = None,
        _address_index: dict[str, int] | None = None,  # Internal use only
        _nnames_index: dict[str, int] | None = None,  # Internal use only
    ) -> None:
        """
        Initialize the Nodes ISY Node Manager class.

        When ``xml`` is given it is parsed and every list argument is
        ignored. Otherwise the supplied lists (and optional pre-built lookup
        indexes) are adopted by reference, which is how ``get_by_index``
        creates sub-level views that share storage with their parent.
        """
        self.isy = isy
        self.root = root

        self.addresses: list[str] = []
        self._address_index: dict[str, int] = {}
        self.nnames: list[str] = []
        self._nnames_index: dict[str, int] = {}
        self.nparents: list[str] = []
        self.nobjs: list[Node] = []
        self.ntypes: list[str] = []
        self.status_events = EventEmitter()

        if xml is not None:
            self.parse(xml)
            return

        if addresses is not None:
            self.addresses = addresses
            # Reuse the shared index when one was handed in, else build it.
            self._address_index = _address_index or {
                address: i for i, address in enumerate(addresses)
            }
        if nnames is not None:
            self.nnames = nnames
            self._nnames_index = _nnames_index or {
                name: i for i, name in enumerate(nnames)
            }
        if nparents is not None:
            self.nparents = nparents
        if nobjs is not None:
            self.nobjs = nobjs
        if ntypes is not None:
            self.ntypes = ntypes

    def __str__(self) -> str:
        """Return string representation of the nodes/folders/groups."""
        if self.root is None:
            return "Folder <root>"
        type_ = self.ntypes[self._address_index[self.root]]
        if type_ == TAG_FOLDER:
            return f"Folder ({self.root})"
        if type_ == TAG_GROUP:
            return f"Group ({self.root})"
        return f"Node ({self.root})"

    def __repr__(self) -> str:
        """Create a pretty representation of the nodes/folders/groups."""
        # Split the direct children by type; each child is (type, name, id).
        folders: list[tuple[str, str, str]] = []
        groups: list[tuple[str, str, str]] = []
        nodes: list[tuple[str, str, str]] = []
        for child in self.children:
            child_type = child[0]
            if child_type == TAG_FOLDER:
                folders.append(child)
            elif child_type == TAG_GROUP:
                groups.append(child)
            elif child_type == TAG_NODE:
                nodes.append(child)

        # Sort every bucket by display name.
        by_name = itemgetter(1)
        folders.sort(key=by_name)
        groups.sort(key=by_name)
        nodes.sort(key=by_name)

        return (
            f"{self}\n"
            f"{self.__repr_folders__(folders)}"
            f"{self.__repr_groups__(groups)}"
            f"{self.__repr_nodes__(nodes)}"
        )

    def __repr_folders__(self, folders: list[tuple[str, str, str]]) -> str:
        """Return a representation of the folder structure."""
        parts: list[str] = []
        for _, fold_name, fold_id in folders:
            parts.append(f" + {fold_name}: Folder({fold_id})\n")
            # Skip the first line of the nested repr (the folder's own title).
            parts.extend(
                f" | {line}\n" for line in repr(self[fold_id]).split("\n")[1:]
            )
            parts.append(" -\n")
        return "".join(parts)

    def __repr_groups__(self, groups: list[tuple[str, str, str]]) -> str:
        """Return a representation of the groups structure."""
        parts: list[str] = []
        for _, group_name, group_id in groups:
            parts.append(f" + {group_name}: Group({group_id})\n")
            parts.extend(
                f" | {self[member].name}: Node({member})\n"
                for member in self[group_id].members
            )
            parts.append(" |\n -\n")
        return "".join(parts)

    def __repr_nodes__(self, nodes: list[tuple[str, str, str]]) -> str:
        """Return a representation of the nodes structure."""
        parts: list[str] = []
        for _, node_name, node_id in nodes:
            has_children = node_id in self.nparents
            parts.append(
                f" {'+ ' if has_children else ''}{node_name}: Node({node_id})\n"
            )
            if has_children:
                parts.extend(
                    f" | {child[1]}: Node({child[2]})\n"
                    for child in self.get_children(node_id)
                )
                parts.append(" |\n -\n")
        return "".join(parts)

    def __iter__(self) -> NodeIterator:
        """Return an iterator for each node below the current nav level."""
        return NodeIterator(self, self.all_lower_nodes, delta=1)

    def __reversed__(self) -> NodeIterator:
        """Return the iterator in reverse order."""
        return NodeIterator(self, self.all_lower_nodes, delta=-1)

    def update_received(self, xmldoc: minidom.Element) -> None:
        """
        Update nodes from event stream message.

        |  xmldoc: parsed event element carrying the node address and action
        """
        address = value_from_xml(xmldoc, TAG_NODE)
        node = self.get_by_id(address)
        if not node:
            _LOGGER.debug(
                "Received a node update for node %s but could not find a record of this "
                "node. Please try restarting the module if the problem persists, this "
                "may be due to a new node being added to the ISY since last restart.",
                address,
            )
            return

        value = value_from_xml(xmldoc, ATTR_ACTION, "")
        # An empty action means the device did not report a value.
        value = int(value) if value != "" else ISY_VALUE_UNKNOWN
        prec = attr_from_xml(xmldoc, ATTR_ACTION, ATTR_PRECISION, DEFAULT_PRECISION)
        uom = attr_from_xml(xmldoc, ATTR_ACTION, ATTR_UNIT_OF_MEASURE, DEFAULT_UNIT_OF_MEASURE)
        formatted = value_from_xml(xmldoc, TAG_FORMATTED)

        # Process the action and value if provided in event data.
        node.update_state(NodeProperty(PROP_STATUS, value, prec, uom, formatted, address))
        _LOGGER.debug("ISY Updated Node: %s", address)

    def control_message_received(self, xmldoc: minidom.Element) -> None:
        """
        Pass Control events from an event stream message to nodes.

        Used for sending out to subscribers.
        """
        address = value_from_xml(xmldoc, TAG_NODE)
        cntrl = value_from_xml(xmldoc, ATTR_CONTROL)

        if not (address and cntrl):
            # If there is no node associated with the control message ignore it
            return

        node = self.get_by_id(address)
        if not node:
            _LOGGER.debug(
                "Received a node update for node %s but could not find a record of this "
                "node. Please try restarting the module if the problem persists, this "
                "may be due to a new node being added to the ISY since last restart.",
                address,
            )
            return

        # Process the action and value if provided in event data.
        node.update_last_update()
        value = value_from_xml(xmldoc, ATTR_ACTION, 0)
        value = int(value) if value != "" else ISY_VALUE_UNKNOWN
        prec = attr_from_xml(xmldoc, ATTR_ACTION, ATTR_PRECISION, DEFAULT_PRECISION)
        uom = attr_from_xml(xmldoc, ATTR_ACTION, ATTR_UNIT_OF_MEASURE, DEFAULT_UNIT_OF_MEASURE)
        formatted = value_from_xml(xmldoc, TAG_FORMATTED)

        if cntrl == PROP_RAMP_RATE:
            # Translate the raw ramp rate through the lookup table; values
            # missing from the table are passed through unchanged.
            value = INSTEON_RAMP_RATES.get(value, value)
            uom = UOM_SECONDS
        node_property = NodeProperty(cntrl, value, prec, uom, formatted, address)

        if cntrl == PROP_COMMS_ERROR and value == 0 and PROP_COMMS_ERROR in node.aux_properties:
            # Clear a previous comms error
            del node.aux_properties[PROP_COMMS_ERROR]

        if cntrl == PROP_BATTERY_LEVEL and node.is_battery_node:
            # Update the state if this is a battery node
            node.update_state(NodeProperty(PROP_STATUS, value, prec, uom, formatted, address))
            _LOGGER.debug("ISY Updated Node: %s", address)
        elif cntrl not in EVENT_PROPS_IGNORED:
            node.update_property(node_property)

        node.control_events.notify(node_property)
        _LOGGER.debug("ISY Node Control Event: %s", node_property)

    def node_changed_received(self, xmldoc: minidom.Element) -> None:
        """
        Handle Node Change/Update events from an event stream message.

        Events whose action is missing or not listed in
        ``NODE_CHANGED_ACTIONS`` are ignored. Every handled event is forwarded
        to ``status_events`` subscribers as a ``NodeChangedEvent``.
        """
        action = value_from_xml(xmldoc, ATTR_ACTION)
        if not action or action not in NODE_CHANGED_ACTIONS:
            return

        event_desc, e_i_keys = NODE_CHANGED_ACTIONS[action]
        node = value_from_xml(xmldoc, TAG_NODE)

        detail = {}
        if e_i_keys and xmldoc.getElementsByTagName(TAG_EVENT_INFO):
            detail = {key: value_from_xml(xmldoc, key) for key in e_i_keys}

        if action == NC_NODE_ERROR:
            _LOGGER.error("ISY Could not communicate with device: %s", node)
        elif (
            action == NC_NODE_ENABLED
            and node in self._address_index
            # Without event info there is no enabled flag to apply; skip the
            # attribute update instead of raising KeyError in the handler.
            and TAG_ENABLED in detail
        ):
            node_obj: Node = self.get_by_id(node)
            # pylint: disable=attribute-defined-outside-init
            node_obj.enabled = detail[TAG_ENABLED] == XML_TRUE

        self.status_events.notify(event=NodeChangedEvent(node, action, detail))
        _LOGGER.debug(
            "ISY received a %s event for node %s %s",
            event_desc,
            node,
            detail if detail else "",
        )
        # FUTURE: Handle additional node change actions to force updates.

    def progress_report_received(self, xmldoc: minidom.Element) -> None:
        """
        Handle Progress Report '_7' events from an event stream message.

        The event info text is split at the first ``]`` into an address
        (brackets and spaces stripped) and a message. Messages starting with
        ``Memory`` for a specific address are reported as ``DEV_MEMORY`` with
        the fields extracted by ``MEMORY_REGEX``; everything else is reported
        as ``DEV_WRITING`` with the raw message.
        """
        event_info = value_from_xml(xmldoc, TAG_EVENT_INFO)
        address, _, message = event_info.partition("]")
        address = address.strip("[ ")
        message = message.strip()

        action = DEV_WRITING
        detail = {"message": message}

        if address != "All" and message.startswith("Memory"):
            action = DEV_MEMORY
            # If the text does not match, the plain message detail is kept.
            if event := re.search(MEMORY_REGEX, event_info):
                detail = {
                    "memory": event.group("dbAddr"),
                    "cmd1": event.group("cmd1"),
                    "cmd2": event.group("cmd2"),
                    "value": int(event.group("value"), 16),
                }

        self.status_events.notify(event=NodeChangedEvent(address, action, detail))
        _LOGGER.debug(
            "ISY received a progress report %s event for node %s %s",
            action,
            address,
            detail if detail else "",
        )

    def parse(self, xml: str) -> None:
        """
        Parse the xml data.

        Folders, nodes and groups found in the document are inserted into the
        manager; nodes that are already known are updated from their element
        instead. Node server slots seen while parsing are used to create
        ``isy.node_servers`` when it has not been set yet.

        |  xml: String of the xml data

        Raises ``ISYResponseParseError`` when the document cannot be parsed.
        """
        try:
            xmldoc = minidom.parseString(xml)
        except XML_ERRORS as exc:
            _LOGGER.error("%s: Nodes", XML_PARSE_ERROR)
            raise ISYResponseParseError(XML_PARSE_ERROR) from exc

        # get nodes
        ntypes = [TAG_FOLDER, TAG_NODE, TAG_GROUP]
        node_servers = []
        # Dict lookup instead of scanning the address list for every element.
        known = self._address_index

        for ntype in ntypes:
            for feature in xmldoc.getElementsByTagName(ntype):
                # Get Node Information
                address = value_from_xml(feature, TAG_ADDRESS)
                nname = value_from_xml(feature, TAG_NAME)
                _LOGGER.debug("Parsing %s: %s [%s]", ntype, nname, address)
                nparent = value_from_xml(feature, TAG_PARENT)
                pnode = value_from_xml(feature, TAG_PRIMARY_NODE)
                family = value_from_xml(feature, TAG_FAMILY)
                device_type = value_from_xml(feature, TAG_TYPE)
                node_def_id = attr_from_element(feature, ATTR_NODE_DEF_ID)
                flag = int(attr_from_element(feature, ATTR_FLAG, 0))
                enabled = value_from_xml(feature, TAG_ENABLED) == XML_TRUE

                # Assume Insteon, update as confirmed otherwise
                protocol = PROTO_INSTEON
                zwave_props = None
                node_server = None

                if family is not None:
                    if family in (FAMILY_ZWAVE, FAMILY_ZMATTER_ZWAVE):
                        protocol = PROTO_ZWAVE
                        zwave_prop_xml = feature.getElementsByTagName(TAG_DEVICE_TYPE)
                        if zwave_prop_xml:
                            zwave_props = ZWaveProperties.from_xml(zwave_prop_xml[0])
                        else:
                            # Fall back to default properties. The original
                            # built this object but never assigned it.
                            zwave_props = ZWaveProperties()
                    elif family in (FAMILY_BRULTECH, FAMILY_RCS):
                        protocol = PROTO_ZIGBEE
                    elif family == FAMILY_NODESERVER:
                        # Node Server Slot is stored with family as text:
                        node_server = attr_from_xml(feature, TAG_FAMILY, ATTR_INSTANCE)
                        if node_server:
                            protocol = f"{PROTO_NODE_SERVER}_{node_server}"
                            node_servers.append(node_server)

                # Process the different node types
                if ntype == TAG_FOLDER and address not in known:
                    self.insert(address, nname, nparent, None, ntype)
                elif ntype == TAG_NODE:
                    if address in known:
                        # NOTE(review): `update()` in this class awaits the
                        # same call, which suggests `Node.update` is a
                        # coroutine and is never awaited here - confirm
                        # against `Node`.
                        self.get_by_id(address).update(xmldoc=feature)
                        continue
                    state, aux_props, state_set = parse_xml_properties(feature)
                    self.insert(
                        address,
                        nname,
                        nparent,
                        Node(
                            self,
                            address=address,
                            name=nname,
                            state=state,
                            aux_properties=aux_props,
                            zwave_props=zwave_props,
                            node_def_id=node_def_id,
                            pnode=pnode,
                            device_type=device_type,
                            enabled=enabled,
                            node_server=node_server,
                            protocol=protocol,
                            family_id=family,
                            state_set=state_set,
                            flag=flag,
                        ),
                        ntype,
                    )
                elif ntype == TAG_GROUP and address not in known:
                    # Ignore groups that contain 0x08 in the flag since
                    # that is a ISY scene that contains every device/
                    # scene so it will contain some scenes we have not
                    # seen yet so they are not defined and it includes
                    # the ISY MAC address in newer versions of
                    # ISY firmwares > 5.0.6+ ..
                    if flag & NODE_IS_ROOT:
                        _LOGGER.debug("Skipping root group flag=%s %s", flag, address)
                        continue
                    mems = feature.getElementsByTagName(TAG_LINK)
                    # Build list of members
                    members = [mem.firstChild.nodeValue for mem in mems]
                    # Build list of controllers
                    controllers = [
                        mem.firstChild.nodeValue
                        for mem in mems
                        if int(attr_from_element(mem, TAG_TYPE, 0)) == NODE_IS_CONTROLLER
                    ]
                    self.insert(
                        address,
                        nname,
                        nparent,
                        Group(
                            self,
                            address=address,
                            name=nname,
                            members=members,
                            controllers=controllers,
                            family_id=family,
                            pnode=pnode,
                            flag=flag,
                        ),
                        ntype,
                    )
            _LOGGER.debug("ISY Loaded %s", ntype)

        if self.isy.node_servers is None:
            self.isy.node_servers = NodeServers(self.isy, set(node_servers))

    async def update(self, wait_time: float = 0.0, xml: str | None = None) -> bool | None:
        """
        Update the status and properties of the nodes in the class.

        This calls the "/rest/status" endpoint.

        |  wait_time: [optional] Amount of seconds to wait before updating
        |  xml: [optional] Status xml to use instead of querying the ISY

        Returns ``False`` when the status xml cannot be parsed and ``None``
        in every other case.
        """
        if wait_time:
            await sleep(wait_time)

        if xml is None:
            xml = await self.isy.conn.get_status()

        if xml is None:
            _LOGGER.warning("ISY Failed to update nodes.")
            return None

        try:
            xmldoc = minidom.parseString(xml)
        except XML_ERRORS:
            _LOGGER.error("%s: Nodes", XML_PARSE_ERROR)
            return False

        for feature in xmldoc.getElementsByTagName(TAG_NODE):
            address = feature.attributes[ATTR_ID].value
            # Status entries for nodes we do not know about are skipped.
            if address in self._address_index:
                await self.get_by_id(address).update(xmldoc=feature)

        _LOGGER.info("ISY Updated Node Statuses.")
        return None

    async def update_nodes(self, wait_time: float = 0.0) -> None:
        """
        Update the contents of the class.

        This calls the "/rest/nodes" endpoint.

        |  wait_time: [optional] Amount of seconds to wait before updating
        """
        if wait_time:
            await sleep(wait_time)
        xml = await self.isy.conn.get_nodes()
        if xml is None:
            _LOGGER.warning("ISY Failed to update nodes.")
            return
        self.parse(xml)

    def insert(self, address: str, nname: str, nparent: str, nobj: Node, ntype: str) -> None:
        """
        Insert a new node into the lists.

        The address and name lookup indexes are updated alongside the lists.

        |  address: node id
        |  nname: node name
        |  nparent: node parent
        |  nobj: node object
        |  ntype: node type
        """
        # Record the position the new entry is about to occupy.
        self._address_index[address] = len(self.addresses)
        self.addresses.append(address)
        self._nnames_index[nname] = len(self.nnames)
        self.nnames.append(nname)
        self.nparents.append(nparent)
        self.ntypes.append(ntype)
        self.nobjs.append(nobj)

    def __getitem__(self, val: str) -> Node | Nodes:
        """
        Navigate through the node tree. Can take names or IDs.

        Keys are tried as an address first, then as a name, and finally as an
        integer list index. Raises ``KeyError`` for anything that does not
        resolve, including out-of-range indexes and non-numeric keys.
        """
        lookup = None
        if val in self._address_index:
            lookup = self.get_by_id
        elif val in self._nnames_index:
            lookup = self.get_by_name
        else:
            try:
                val = int(val)
            except (TypeError, ValueError):
                # Neither a known address/name nor something index-like.
                lookup = None
            else:
                lookup = self.get_by_index

        output = None
        if lookup is not None:
            try:
                output = lookup(val)
            except (IndexError, ValueError):
                # An index beyond the stored entries is an unknown key.
                output = None

        if output is not None:
            return output
        raise KeyError(f"Unrecognized Key: [{val}]")

    def __setitem__(self, item: object, value: object) -> None:
        """Set item value (item assignment is accepted but has no effect)."""
        return

    def get_by_name(self, val: str) -> Node | Nodes | None:
        """
        Get child object with the given name.

        Below the root level only direct children of the current level are
        returned; ``None`` is returned when nothing matches.

        |  val: String representing name to look for.
        """
        i = self._nnames_index.get(val)
        if i is None:
            return None
        if self.root is not None and self.nparents[i] != self.root:
            return None
        return self.get_by_index(i)

    def get_by_id(self, address: str) -> Node | Nodes | None:
        """
        Get object with the given ID.

        Returns ``None`` when the address is unknown.

        |  address: String representing node/group/folder id.
        """
        i = self._address_index.get(address)
        return None if i is None else self.get_by_index(i)

    def get_by_index(self, i: int) -> Node | Nodes:
        """
        Return the object at the given index in the list.

        Nodes and groups are returned as their stored object; any other entry
        type is returned as a new ``Nodes`` view rooted at that entry and
        sharing this instance's lists and indexes.

        |  i: Integer representing index of node/group/folder.
        """
        if self.ntypes[i] in SINGLE_NODE_TYPES:
            return self.nobjs[i]
        return Nodes(
            isy=self.isy,
            root=self.addresses[i],
            addresses=self.addresses,
            nnames=self.nnames,
            nparents=self.nparents,
            nobjs=self.nobjs,
            ntypes=self.ntypes,
            _address_index=self._address_index,
            _nnames_index=self._nnames_index,
        )

    def get_folder(self, address: str) -> str | None:
        """
        Return the folder of a given node address.

        Walks up through non-folder parents until a folder is found and
        returns its name, or ``None`` when the chain ends at the root.
        """
        parent = self.nparents[self._address_index[address]]
        if parent is None:
            # Node is in the root folder.
            return None
        parent_index = self._address_index[parent]
        if self.ntypes[parent_index] != TAG_FOLDER:
            return self.get_folder(parent)
        return self.nnames[parent_index]

    @property
    def children(self) -> list[tuple[str, str, str]]:
        """Return the children of the class."""
        return self.get_children()

    def get_children(self, ident: str | None = None) -> list[tuple[str, str, str]]:
        """
        Return the children of the class.

        Each child is a ``(type, name, address)`` tuple.

        |  ident: [optional] parent address to list; defaults to the current root
        """
        if ident is None:
            ident = self.root
        return [
            (self.ntypes[i], self.nnames[i], self.addresses[i])
            for i, parent in enumerate(self.nparents)
            if parent == ident
        ]

    @property
    def has_children(self) -> bool:
        """Return if the root has children."""
        return self.root in self.nparents

    @property
    def name(self) -> str:
        """Return the name of the root."""
        if self.root is None:
            return ""
        return self.nnames[self._address_index[self.root]]

    @property
    def all_lower_nodes(self) -> list[tuple[str, str, str]]:
        """
        Return all nodes below the current root.

        Each entry is a ``(type, path, address)`` tuple whose path is
        prefixed with the names of the levels it was reached through.
        """
        output: list[tuple[str, str, str]] = []
        myname = self.name + "/"

        for dtype, name, ident in self.children:
            if dtype in SINGLE_NODE_TYPES:
                output.append((dtype, myname + name, ident))

            if dtype == TAG_NODE and ident in self.nparents:
                # Nodes that act as a parent contribute their direct children.
                output += [
                    (child[0], f"{myname}{name}/{child[1]}", child[2])
                    for child in self.get_children(ident)
                ]

            if dtype == TAG_FOLDER:
                # Folders are expanded recursively through a sub-level view.
                output += [
                    (dtype2, myname + name2, ident2)
                    for (dtype2, name2, ident2) in self[ident].all_lower_nodes
                ]
        return output
class NodeIterator:
    """
    Iterate through a list of nodes, returning node objects.

    |  nodes: container that is indexed with each entry's address
    |  iter_data: list of ``(type, path, address)`` tuples to walk
    |  delta: step applied after each item; 1 walks forward, -1 backward
    """

    def __init__(
        self,
        nodes: Nodes,
        iter_data: list[tuple[str, str, str]],
        delta: int = 1,
    ) -> None:
        """Initialize a NodeIterator class."""
        self._nodes = nodes
        self._iterdata = iter_data
        self._len = len(iter_data)
        self._delta = delta
        # Forward iteration starts at the first entry, reverse at the last.
        self._ind = 0 if delta > 0 else self._len - 1

    def __iter__(self) -> NodeIterator:
        """
        Return the iterator itself.

        Needed so the object returned by ``reversed(nodes)`` (and any other
        bare ``NodeIterator``) can be used in a ``for`` loop or passed to
        ``list()``, both of which call ``iter()`` on it first.
        """
        return self

    def __next__(self) -> tuple[str, Node | Nodes]:
        """Get the next element in the iteration as ``(path, object)``."""
        if not 0 <= self._ind < self._len:
            raise StopIteration
        _, path, ident = self._iterdata[self._ind]
        self._ind += self._delta
        return (path, self._nodes[ident])

    def __len__(self) -> int:
        """Return the number of elements."""
        return self._len


@dataclass
class NodeChangedEvent:
    """Class representation of a node change event."""

    # Address of the node the event refers to.
    address: str
    # Action code describing what changed.
    action: str
    # Extra details extracted from the event, keyed by field name.
    event_info: dict