"""Connection to the ISY."""
from __future__ import annotations
import asyncio
import ssl
from urllib.parse import quote, urlencode
import aiohttp
from .constants import (
METHOD_GET,
URL_CLOCK,
URL_CONFIG,
URL_DEFINITIONS,
URL_MEMBERS,
URL_NETWORK,
URL_NODES,
URL_PING,
URL_PROGRAMS,
URL_RESOURCES,
URL_STATUS,
URL_SUBFOLDERS,
URL_VARIABLES,
VAR_INTEGER,
VAR_STATE,
XML_FALSE,
XML_TRUE,
)
from .exceptions import ISYConnectionError, ISYInvalidAuthError
from .logging import _LOGGER, enable_logging
MAX_HTTPS_CONNECTIONS_ISY = 2
MAX_HTTP_CONNECTIONS_ISY = 5
MAX_HTTPS_CONNECTIONS_IOX = 20
MAX_HTTP_CONNECTIONS_IOX = 50
MAX_RETRIES = 5
RETRY_BACKOFF = [0.01, 0.10, 0.25, 1, 2] # Seconds
HTTP_OK = 200 # Valid request received, will run it
HTTP_UNAUTHORIZED = 401 # User authentication failed
HTTP_NOT_FOUND = 404 # Unrecognized request received and ignored
HTTP_SERVICE_UNAVAILABLE = 503 # Valid request received, system too busy to run it
HTTP_TIMEOUT = 30
HTTP_HEADERS = {
"Connection": "keep-alive",
"Keep-Alive": "5000",
"Accept-Encoding": "gzip, deflate",
}
EMPTY_XML_RESPONSE = '<?xml version="1.0" encoding="UTF-8"?>'
[docs]
class Connection:
"""Connection object to manage connection to and interaction with ISY."""
[docs]
def __init__(
self,
address: str,
port: int,
username: str,
password: str,
use_https: bool = False,
tls_ver: float = 1.1,
webroot: str = "",
websession: aiohttp.ClientSession | None = None,
) -> None:
"""Initialize the Connection object."""
if len(_LOGGER.handlers) == 0:
enable_logging(add_null_handler=True)
self._address = address
self._port = port
self._username = username
self._password = password
self._auth = aiohttp.BasicAuth(self._username, self._password)
self._webroot = webroot.rstrip("/")
self.req_session = websession
self._tls_ver = tls_ver
self.use_https = use_https
self._url = f"http{'s' if self.use_https else ''}://{self._address}:{self._port}{self._webroot}"
self.semaphore = asyncio.Semaphore(
MAX_HTTPS_CONNECTIONS_ISY if use_https else MAX_HTTP_CONNECTIONS_ISY
)
if websession is None:
websession = get_new_client_session(use_https, tls_ver)
self.req_session = websession
self.sslcontext = get_sslcontext(use_https, tls_ver)
[docs]
async def test_connection(self) -> str | None:
"""Test the connection and get the config for the ISY."""
config = await self.get_config(retries=None)
if not config:
_LOGGER.error("Could not connect to the ISY with the parameters provided.")
raise ISYConnectionError
return config
[docs]
def increase_available_connections(self) -> None:
"""Increase the number of allowed connections for newer hardware."""
_LOGGER.debug("Increasing available simultaneous connections")
self.semaphore = asyncio.Semaphore(
MAX_HTTPS_CONNECTIONS_IOX if self.use_https else MAX_HTTP_CONNECTIONS_IOX
)
[docs]
async def close(self) -> None:
"""Cleanup connections and prepare for exit."""
await self.req_session.close()
@property
def connection_info(self) -> dict[str, str | int | bytes | None]:
"""Return the connection info required to connect to the ISY."""
connection_info = {}
connection_info["auth"] = self._auth.encode()
connection_info["addr"] = self._address
connection_info["port"] = int(self._port)
connection_info["passwd"] = self._password
connection_info["webroot"] = self._webroot
if self.use_https and self._tls_ver:
connection_info["tls"] = self._tls_ver
return connection_info
@property
def url(self) -> str:
"""Return the full connection url."""
return self._url
# COMMON UTILITIES
[docs]
def compile_url(self, path: list[str], query: str | None = None) -> str:
"""Compile the URL to fetch from the ISY."""
url = self.url
if path is not None:
url += "/rest/" + "/".join([quote(item) for item in path])
if query is not None:
url += "?" + urlencode(query)
return url
[docs]
async def request(self, url: str, retries: int = 0, ok404: bool = False, delay: int = 0) -> str | None:
"""Execute request to ISY REST interface."""
_LOGGER.debug("ISY Request: %s", url)
if delay:
await asyncio.sleep(delay)
try:
async with (
self.semaphore,
self.req_session.get(
url,
auth=self._auth,
headers=HTTP_HEADERS,
timeout=HTTP_TIMEOUT,
ssl=self.sslcontext,
) as res,
):
endpoint = url.split("rest", 1)[1]
if res.status == HTTP_OK:
_LOGGER.debug("ISY Response Received: %s", endpoint)
results = await res.text(encoding="utf-8", errors="ignore")
if results != EMPTY_XML_RESPONSE:
return results
_LOGGER.debug("Invalid empty XML returned: %s", endpoint)
res.release()
if res.status == HTTP_NOT_FOUND:
if ok404:
_LOGGER.debug("ISY Response Received %s", endpoint)
res.release()
return ""
_LOGGER.error("ISY Reported an Invalid Command Received %s", endpoint)
res.release()
return None
if res.status == HTTP_UNAUTHORIZED:
_LOGGER.error("Invalid credentials provided for ISY connection.")
res.release()
raise ISYInvalidAuthError("Invalid credentials provided for ISY connection.")
if res.status == HTTP_SERVICE_UNAVAILABLE:
_LOGGER.warning("ISY too busy to process request %s", endpoint)
res.release()
except asyncio.TimeoutError:
_LOGGER.warning("Timeout while trying to connect to the ISY.")
except (
aiohttp.ClientOSError,
aiohttp.ServerDisconnectedError,
):
_LOGGER.debug("ISY not ready or closed connection.")
except aiohttp.ClientResponseError as err:
_LOGGER.error("Client Response Error from ISY: %s %s.", err.status, err.message)
except aiohttp.ClientError as err:
_LOGGER.error(
"ISY Could not receive response from device because of a network issue: %s",
type(err),
)
if retries is None:
raise ISYConnectionError
if retries < MAX_RETRIES:
_LOGGER.debug(
"Retrying ISY Request in %ss, retry %s.",
RETRY_BACKOFF[retries],
retries + 1,
)
# sleep to allow the ISY to catch up
await asyncio.sleep(RETRY_BACKOFF[retries])
# recurse to try again
return await self.request(url, retries + 1, ok404=ok404)
# fail for good
_LOGGER.error(
"Bad ISY Request: (%s) Failed after %s retries.",
url,
retries,
)
return None
[docs]
async def ping(self) -> bool:
"""Test connection to the ISY and return True if alive."""
req_url = self.compile_url([URL_PING])
result = await self.request(req_url, ok404=True)
return result is not None
[docs]
async def get_description(self) -> str | None:
"""Fetch the services description from the ISY."""
url = "https://" if self.use_https else "http://"
url += f"{self._address}:{self._port}{self._webroot}/desc"
return await self.request(url)
[docs]
async def get_config(self, retries: int = 0) -> str | None:
"""Fetch the configuration from the ISY."""
req_url = self.compile_url([URL_CONFIG])
return await self.request(req_url, retries=retries)
[docs]
async def get_programs(self, address: int | str | None = None) -> str | None:
"""Fetch the list of programs from the ISY."""
addr = [URL_PROGRAMS]
if address is not None:
addr.append(str(address))
req_url = self.compile_url(addr, {URL_SUBFOLDERS: XML_TRUE})
return await self.request(req_url)
[docs]
async def get_nodes(self) -> str | None:
"""Fetch the list of nodes/groups/scenes from the ISY."""
req_url = self.compile_url([URL_NODES], {URL_MEMBERS: XML_FALSE})
return await self.request(req_url)
[docs]
async def get_status(self) -> str | None:
"""Fetch the status of nodes/groups/scenes from the ISY."""
req_url = self.compile_url([URL_STATUS])
return await self.request(req_url)
[docs]
async def get_variable_defs(self) -> list[str | BaseException] | None:
"""Fetch the list of variables from the ISY."""
req_list = [
[URL_VARIABLES, URL_DEFINITIONS, VAR_INTEGER],
[URL_VARIABLES, URL_DEFINITIONS, VAR_STATE],
]
req_urls = [self.compile_url(req) for req in req_list]
return await asyncio.gather(*[self.request(req_url) for req_url in req_urls], return_exceptions=True)
[docs]
async def get_variables(self) -> str | None:
"""Fetch the variable details from the ISY to update local copy."""
req_list = [
[URL_VARIABLES, METHOD_GET, VAR_INTEGER],
[URL_VARIABLES, METHOD_GET, VAR_STATE],
]
req_urls = [self.compile_url(req) for req in req_list]
results = await asyncio.gather(
*[self.request(req_url) for req_url in req_urls], return_exceptions=True
)
results = [r for r in results if r is not None] # Strip any bad requests.
result = "".join(results)
return result.replace('</vars><?xml version="1.0" encoding="UTF-8"?><vars>', "")
[docs]
async def get_network(self) -> str | None:
"""Fetch the list of network resources from the ISY."""
req_url = self.compile_url([URL_NETWORK, URL_RESOURCES])
return await self.request(req_url)
[docs]
async def get_time(self) -> str | None:
"""Fetch the system time info from the ISY."""
req_url = self.compile_url([URL_CLOCK])
return await self.request(req_url)
def get_new_client_session(use_https: bool, tls_ver: float = 1.1) -> aiohttp.ClientSession:
"""Create a new Client Session for Connecting."""
if use_https:
if not can_https(tls_ver):
raise (ValueError("PyISY could not connect to the ISY. Check log for SSL/TLS error."))
return aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True))
return aiohttp.ClientSession()
def get_sslcontext(use_https: bool, tls_ver: float = 1.1) -> ssl.SSLContext | None:
"""Create an SSLContext object to use for the connections."""
if not use_https:
return None
if tls_ver == 1.1:
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
elif tls_ver == 1.2:
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
# Allow older ciphers for older ISYs
context.set_ciphers("DEFAULT:!aNULL:!eNULL:!MD5:!3DES:!DES:!RC4:!IDEA:!SEED:!aDSS:!SRP:!PSK")
return context
def can_https(tls_ver: float) -> bool:
"""
Verify minimum requirements to use an HTTPS connection.
Returns boolean indicating whether HTTPS is available.
"""
output = True
# check that Python was compiled against correct OpenSSL lib
if "PROTOCOL_TLSv1_1" not in dir(ssl):
_LOGGER.error("PyISY cannot use HTTPS: Compiled against old OpenSSL library. See docs.")
output = False
# check the requested TLS version
if tls_ver not in [1.1, 1.2]:
_LOGGER.error("PyISY cannot use HTTPS: Only TLS 1.1 and 1.2 are supported by the ISY controller.")
output = False
return output