Source code for pyisy.connection

"""Connection to the ISY."""

from __future__ import annotations

import asyncio
import ssl
from urllib.parse import quote, urlencode

import aiohttp

from .constants import (
    METHOD_GET,
    URL_CLOCK,
    URL_CONFIG,
    URL_DEFINITIONS,
    URL_MEMBERS,
    URL_NETWORK,
    URL_NODES,
    URL_PING,
    URL_PROGRAMS,
    URL_RESOURCES,
    URL_STATUS,
    URL_SUBFOLDERS,
    URL_VARIABLES,
    VAR_INTEGER,
    VAR_STATE,
    XML_FALSE,
    XML_TRUE,
)
from .exceptions import ISYConnectionError, ISYInvalidAuthError
from .logging import _LOGGER, enable_logging

# Maximum number of simultaneous requests, enforced through the semaphore held
# by Connection. The *_ISY limits are applied when a Connection is created;
# the larger *_IOX limits are applied by
# Connection.increase_available_connections() ("newer hardware").
MAX_HTTPS_CONNECTIONS_ISY = 2
MAX_HTTP_CONNECTIONS_ISY = 5
MAX_HTTPS_CONNECTIONS_IOX = 20
MAX_HTTP_CONNECTIONS_IOX = 50

# Connection.request() retries a failed request while its retry counter is
# below MAX_RETRIES, sleeping RETRY_BACKOFF[retry_number] before each new
# attempt. RETRY_BACKOFF therefore needs at least MAX_RETRIES entries.
MAX_RETRIES = 5
RETRY_BACKOFF = [0.01, 0.10, 0.25, 1, 2]  # Seconds

# HTTP status codes that Connection.request() handles explicitly.
HTTP_OK = 200  # Valid request received, will run it
HTTP_UNAUTHORIZED = 401  # User authentication failed
HTTP_NOT_FOUND = 404  # Unrecognized request received and ignored
HTTP_SERVICE_UNAVAILABLE = 503  # Valid request received, system too busy to run it

# Passed as the ``timeout`` argument of every GET request
# (presumably seconds, per aiohttp's convention -- confirm).
HTTP_TIMEOUT = 30

# Headers sent with every request.
HTTP_HEADERS = {
    "Connection": "keep-alive",
    "Keep-Alive": "5000",
    "Accept-Encoding": "gzip, deflate",
}

# A 200 response whose body is exactly this bare XML declaration is treated as
# an invalid (empty) answer by Connection.request() and handled like a failure.
EMPTY_XML_RESPONSE = '<?xml version="1.0" encoding="UTF-8"?>'


class Connection:
    """Connection object to manage connection to and interaction with ISY.

    Holds the base URL, the basic-auth credentials, the aiohttp client
    session and a semaphore that caps the number of simultaneous requests.
    All ``get_*`` helpers build a REST URL and delegate to :meth:`request`.
    """

    def __init__(
        self,
        address: str,
        port: int,
        username: str,
        password: str,
        use_https: bool = False,
        tls_ver: float = 1.1,
        webroot: str = "",
        websession: aiohttp.ClientSession | None = None,
    ) -> None:
        """Initialize the Connection object.

        Args:
            address: Host name or IP address of the ISY.
            port: TCP port of the ISY web server.
            username: User name for HTTP basic authentication.
            password: Password for HTTP basic authentication.
            use_https: Use ``https`` instead of ``http``.
            tls_ver: TLS version used to build the SSL context when
                ``use_https`` is set.
            webroot: Optional path prefix; trailing slashes are removed.
            websession: Existing aiohttp session to use. A new one is
                created when omitted.
        """
        if not _LOGGER.handlers:
            enable_logging(add_null_handler=True)

        self._address = address
        self._port = port
        self._username = username
        self._password = password
        self._auth = aiohttp.BasicAuth(self._username, self._password)
        self._webroot = webroot.rstrip("/")
        self._tls_ver = tls_ver
        self.use_https = use_https
        self._url = f"http{'s' if self.use_https else ''}://{self._address}:{self._port}{self._webroot}"

        # Start with the conservative limits; see increase_available_connections().
        self.semaphore = asyncio.Semaphore(
            MAX_HTTPS_CONNECTIONS_ISY if use_https else MAX_HTTP_CONNECTIONS_ISY
        )

        if websession is None:
            websession = get_new_client_session(use_https, tls_ver)
        self.req_session = websession
        self.sslcontext = get_sslcontext(use_https, tls_ver)

    async def test_connection(self) -> str | None:
        """Test the connection and get the config for the ISY.

        The config is requested with ``retries=None``, so a failed request
        raises instead of being retried.

        Raises:
            ISYConnectionError: If the request failed or returned nothing.
            ISYInvalidAuthError: If the ISY rejected the credentials.
        """
        config = await self.get_config(retries=None)
        if not config:
            _LOGGER.error("Could not connect to the ISY with the parameters provided.")
            raise ISYConnectionError
        return config

    def increase_available_connections(self) -> None:
        """Increase the number of allowed connections for newer hardware."""
        _LOGGER.debug("Increasing available simultaneous connections")
        self.semaphore = asyncio.Semaphore(
            MAX_HTTPS_CONNECTIONS_IOX if self.use_https else MAX_HTTP_CONNECTIONS_IOX
        )

    async def close(self) -> None:
        """Cleanup connections and prepare for exit."""
        await self.req_session.close()

    @property
    def connection_info(self) -> dict[str, str | int | float | bytes | None]:
        """Return the connection info required to connect to the ISY.

        The ``tls`` key is only present when HTTPS is used and a TLS version
        is set.
        """
        connection_info: dict[str, str | int | float | bytes | None] = {
            "auth": self._auth.encode(),
            "addr": self._address,
            "port": int(self._port),
            "passwd": self._password,
            "webroot": self._webroot,
        }
        if self.use_https and self._tls_ver:
            connection_info["tls"] = self._tls_ver
        return connection_info

    @property
    def url(self) -> str:
        """Return the full connection url."""
        return self._url

    # COMMON UTILITIES

    @staticmethod
    def _endpoint_from_url(url: str) -> str:
        """Return the part of *url* after the first "rest", for log messages.

        Falls back to the whole URL when it contains no "rest" (for example
        the ``/desc`` URL) instead of raising ``IndexError``.
        """
        _, found, tail = url.partition("rest")
        return tail if found else url

    @staticmethod
    def _merge_variable_responses(results: list[str | BaseException | None]) -> str:
        """Join the string responses in *results* into a single document.

        ``None`` entries and exception objects are skipped. The XML
        declaration and ``<vars>`` tags between two consecutive responses are
        removed so the result has a single ``<vars>`` element.
        """
        merged = "".join(item for item in results if isinstance(item, str))
        return merged.replace('</vars><?xml version="1.0" encoding="UTF-8"?><vars>', "")

    def compile_url(self, path: list[str] | None, query: dict[str, str] | None = None) -> str:
        """Compile the URL to fetch from the ISY.

        Args:
            path: Path segments appended after ``/rest/``; each segment is
                percent-quoted. ``None`` leaves the base URL unchanged.
            query: Optional mapping encoded as the query string.
        """
        url = self.url
        if path is not None:
            url += "/rest/" + "/".join(quote(item) for item in path)
        if query is not None:
            url += "?" + urlencode(query)
        return url

    async def request(
        self, url: str, retries: int | None = 0, ok404: bool = False, delay: int = 0
    ) -> str | None:
        """Execute request to ISY REST interface.

        Args:
            url: Full URL to GET.
            retries: Number of attempts already made. ``None`` disables
                retrying and raises on failure instead.
            ok404: Treat a 404 response as success and return ``""``.
            delay: Seconds to sleep before the first attempt.

        Returns:
            The response body, ``""`` for an accepted 404, or ``None`` when
            the request was rejected with a 404 or failed after all retries.

        Raises:
            ISYInvalidAuthError: On a 401 response.
            ISYConnectionError: When the request failed and ``retries`` is
                ``None``.
        """
        _LOGGER.debug("ISY Request: %s", url)
        if delay:
            await asyncio.sleep(delay)

        # Only used for logging; must never fail for URLs outside /rest/.
        endpoint = self._endpoint_from_url(url)

        try:
            async with (
                self.semaphore,
                self.req_session.get(
                    url,
                    auth=self._auth,
                    headers=HTTP_HEADERS,
                    timeout=HTTP_TIMEOUT,
                    ssl=self.sslcontext,
                ) as res,
            ):
                if res.status == HTTP_OK:
                    _LOGGER.debug("ISY Response Received: %s", endpoint)
                    results = await res.text(encoding="utf-8", errors="ignore")
                    if results != EMPTY_XML_RESPONSE:
                        return results
                    # Fall through to the retry logic below.
                    _LOGGER.debug("Invalid empty XML returned: %s", endpoint)
                    res.release()
                if res.status == HTTP_NOT_FOUND:
                    if ok404:
                        _LOGGER.debug("ISY Response Received %s", endpoint)
                        res.release()
                        return ""
                    _LOGGER.error("ISY Reported an Invalid Command Received %s", endpoint)
                    res.release()
                    return None
                if res.status == HTTP_UNAUTHORIZED:
                    _LOGGER.error("Invalid credentials provided for ISY connection.")
                    res.release()
                    raise ISYInvalidAuthError("Invalid credentials provided for ISY connection.")
                if res.status == HTTP_SERVICE_UNAVAILABLE:
                    _LOGGER.warning("ISY too busy to process request %s", endpoint)
                    res.release()

        except asyncio.TimeoutError:
            _LOGGER.warning("Timeout while trying to connect to the ISY.")
        except (
            aiohttp.ClientOSError,
            aiohttp.ServerDisconnectedError,
        ):
            _LOGGER.debug("ISY not ready or closed connection.")
        except aiohttp.ClientResponseError as err:
            _LOGGER.error("Client Response Error from ISY: %s %s.", err.status, err.message)
        except aiohttp.ClientError as err:
            _LOGGER.error(
                "ISY Could not receive response from device because of a network issue: %s",
                type(err),
            )

        if retries is None:
            raise ISYConnectionError
        if retries < MAX_RETRIES:
            _LOGGER.debug(
                "Retrying ISY Request in %ss, retry %s.",
                RETRY_BACKOFF[retries],
                retries + 1,
            )
            # sleep to allow the ISY to catch up
            await asyncio.sleep(RETRY_BACKOFF[retries])
            # recurse to try again
            return await self.request(url, retries + 1, ok404=ok404)
        # fail for good
        _LOGGER.error(
            "Bad ISY Request: (%s) Failed after %s retries.",
            url,
            retries,
        )
        return None

    async def ping(self) -> bool:
        """Test connection to the ISY and return True if alive."""
        req_url = self.compile_url([URL_PING])
        result = await self.request(req_url, ok404=True)
        return result is not None

    async def get_description(self) -> str | None:
        """Fetch the services description from the ISY."""
        # The description lives at <base url>/desc, outside the /rest/ tree.
        return await self.request(f"{self.url}/desc")

    async def get_config(self, retries: int | None = 0) -> str | None:
        """Fetch the configuration from the ISY.

        ``retries`` is forwarded to :meth:`request`.
        """
        req_url = self.compile_url([URL_CONFIG])
        return await self.request(req_url, retries=retries)

    async def get_programs(self, address: int | str | None = None) -> str | None:
        """Fetch the list of programs from the ISY.

        When ``address`` is given it is appended to the programs path.
        """
        addr = [URL_PROGRAMS]
        if address is not None:
            addr.append(str(address))
        req_url = self.compile_url(addr, {URL_SUBFOLDERS: XML_TRUE})
        return await self.request(req_url)

    async def get_nodes(self) -> str | None:
        """Fetch the list of nodes/groups/scenes from the ISY."""
        req_url = self.compile_url([URL_NODES], {URL_MEMBERS: XML_FALSE})
        return await self.request(req_url)

    async def get_status(self) -> str | None:
        """Fetch the status of nodes/groups/scenes from the ISY."""
        req_url = self.compile_url([URL_STATUS])
        return await self.request(req_url)

    async def get_variable_defs(self) -> list[str | BaseException | None] | None:
        """Fetch the list of variables from the ISY.

        Returns one entry for the integer and one for the state variable
        definitions, in that order. An entry is the response body, ``None``
        for a failed request, or the exception the request raised.
        """
        req_list = [
            [URL_VARIABLES, URL_DEFINITIONS, VAR_INTEGER],
            [URL_VARIABLES, URL_DEFINITIONS, VAR_STATE],
        ]
        req_urls = [self.compile_url(req) for req in req_list]
        return await asyncio.gather(
            *(self.request(req_url) for req_url in req_urls), return_exceptions=True
        )

    async def get_variables(self) -> str | None:
        """Fetch the variable details from the ISY to update local copy.

        The integer and state variable responses are merged into a single
        document. Requests that failed or raised are logged and left out.
        """
        req_list = [
            [URL_VARIABLES, METHOD_GET, VAR_INTEGER],
            [URL_VARIABLES, METHOD_GET, VAR_STATE],
        ]
        req_urls = [self.compile_url(req) for req in req_list]
        results = await asyncio.gather(
            *(self.request(req_url) for req_url in req_urls), return_exceptions=True
        )
        # gather() hands exceptions back as results; they cannot be joined
        # into the response text, so report and drop them.
        for req_url, result in zip(req_urls, results):
            if isinstance(result, BaseException):
                _LOGGER.error("ISY variable request failed: (%s) %r", req_url, result)
        return self._merge_variable_responses(results)

    async def get_network(self) -> str | None:
        """Fetch the list of network resources from the ISY."""
        req_url = self.compile_url([URL_NETWORK, URL_RESOURCES])
        return await self.request(req_url)

    async def get_time(self) -> str | None:
        """Fetch the system time info from the ISY."""
        req_url = self.compile_url([URL_CLOCK])
        return await self.request(req_url)
def get_new_client_session(use_https: bool, tls_ver: float = 1.1) -> aiohttp.ClientSession:
    """Create a new Client Session for Connecting.

    Args:
        use_https: Whether the session will be used for HTTPS requests.
        tls_ver: Requested TLS version; only checked when ``use_https`` is set.

    Raises:
        ValueError: If HTTPS is requested but ``can_https`` rejects the setup.
    """
    if not use_https:
        return aiohttp.ClientSession()
    if not can_https(tls_ver):
        raise ValueError("PyISY could not connect to the ISY. Check log for SSL/TLS error.")
    # NOTE(review): unsafe=True presumably lets the jar keep cookies for hosts
    # addressed by IP -- confirm against the aiohttp CookieJar documentation.
    return aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True))


def get_sslcontext(use_https: bool, tls_ver: float = 1.1) -> ssl.SSLContext | None:
    """Create an SSLContext object to use for the connections.

    Args:
        use_https: When false no context is needed and ``None`` is returned.
        tls_ver: TLS version to pin the context to; must be 1.1 or 1.2.

    Raises:
        ValueError: If ``use_https`` is set and ``tls_ver`` is neither 1.1
            nor 1.2.
    """
    if not use_https:
        return None
    if tls_ver == 1.1:
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
    elif tls_ver == 1.2:
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    else:
        # Without this branch ``context`` would be unbound below.
        raise ValueError(
            "PyISY cannot use HTTPS: Only TLS 1.1 and 1.2 are supported by the ISY controller."
        )

    # Allow older ciphers for older ISYs
    context.set_ciphers("DEFAULT:!aNULL:!eNULL:!MD5:!3DES:!DES:!RC4:!IDEA:!SEED:!aDSS:!SRP:!PSK")
    return context


def can_https(tls_ver: float) -> bool:
    """Verify minimum requirements to use an HTTPS connection.

    Every failed requirement is logged, so several errors may be reported
    for one call.

    Returns:
        Boolean indicating whether HTTPS is available.
    """
    output = True

    # check that Python was compiled against correct OpenSSL lib
    if not hasattr(ssl, "PROTOCOL_TLSv1_1"):
        _LOGGER.error("PyISY cannot use HTTPS: Compiled against old OpenSSL library. See docs.")
        output = False

    # check the requested TLS version
    if tls_ver not in (1.1, 1.2):
        _LOGGER.error("PyISY cannot use HTTPS: Only TLS 1.1 and 1.2 are supported by the ISY controller.")
        output = False

    return output