当前位置: 首页 > news >正文

HomeAssistant自定义组件学习-【一】

#环境准备#

按官方的步骤准备就可以,我是在Windows下使用VS Code开发的,安装了WSL(使用模板创建组件需要在WSL环境下完成)

官方链接:https://developers.home-assistant.io/docs/development_environment

环境准备好后,在项目根目录下使用命令:

python3 -m script.scaffold inetgration

创建组件后会自动生成几个文件:__init__.py,const.py,config_flow.py和mainfest.json、string.json。

研究中盛官方给的工具软件测试,可以确定是可以自动发现设备的,所以组件整体思路就是,选择组件后,就自动发现网络上的设备,然后自动生成对应IO数量的灯和开关实体,不需要通过界面让用户输入其他数据,很简单的一个逻辑吧,但是实现硬是搞了我两个月(期间有一个月被搞得头大,没心思搞,玩了一个月吃鸡!),主要原因是不知道组件的执行流程,从哪个函数方法调用,配置的数据存储在哪里,怎么保存自己需要的数据,后面才发现,根本不需要考虑这些,直接干就完了。下面进入正题:

mainfest.json、string.json这两个Json文件暂时不需要去碰,因为不需要去做界面(估计做窗帘组件的时候需要去弄)。看__init__.py和config_flow.py两个文件:


from __future__ import annotationsfrom homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistantfrom .const import PLATFORMS# TODO Create ConfigEntry type alias with API object
# TODO Rename type alias and update all entry annotations
# type NovoConfigEntry = ConfigEntry[0xF843]  # noqa: F821# TODO Update entry annotation
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:"""Set up Novo from a config entry."""# TODO 1. Create API instance# TODO 2. Validate the API connection (and authentication)# TODO 3. Store an API object for your platforms to access# entry.runtime_data = MyAPI(...)await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)return True# TODO Update entry annotation
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:"""Unload a config entry."""return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

以上是自动生成的__init__.py的文件内容,查看文档,说的是加载组件的时候,就会调用async_setup_entry方法,如果创建组件的时候选择了自动发现设备,就会调用config_flow.py文件中的:async def _async_has_devices(hass: HomeAssistant) -> bool:,那发现设备的代码放在这两个地方都是可以的。

PS:后续测试发现,代码调用是有顺序的,顺序是:

Config_flow.py->__init__.py->PLATFORMS中顺序调用async_setup_entry

决定把发现代码放在__init__.py中。设备发现代码是通过Wireshark抓网络包分析自动发现设备过程,使用UDP协议,向网络中的5002端口发送约定的字符串:

{"cmd":"search","vendor":"www.coltsmart.com","product":"MJ-ExSx","keyword":"FF010102"}

设备就会回复自己的信息,通过回复的信息中的IP地址和端口(返回的信息是json数据,直接使用json进行分析就可以了),我们就可以建立通讯了。完整代码:


# 扫描发现设备
def discover(ip_address=None) -> dict[int, ZhongshengConfig]:"""扫描发现设备."""sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)sock.settimeout(5)found_devices: dict[int, ZhongshengConfig] = {}if ip_address is None:addrs = enum_all_broadcast()else:addrs = [ip_address]for addr in addrs:try:sock.sendto(BROADCAST_MSG, (addr, BROADCAST_PORT))except Exception:  # noqa: BLE001_LOGGER.warning(f"Can't access network {addr}")  # noqa: G004while True:try:data, addr = sock.recvfrom(512)_LOGGER.debug(f"Received response from {addr}: {data.hex(' ')}")  # noqa: G004json_dic = json.loads(data)if json_dic["id"] != "":# 查询返回dev: ZhongshengConfigd_id: int = int(json_dic["id"])if d_id in found_devices:dev = found_devices[d_id]else:dev = ZhongshengConfig()dev.load_data(data)found_devices[d_id] = devexcept TimeoutError:breakexcept OSError as e:_LOGGER.error(f"Socket error: {e!r}")  # noqa: G004return found_devices

因为设备会返回好几组数据,所以这里需要对数据进行合并,即这段代码:

                # 查询返回dev: ZhongshengConfigd_id: int = int(json_dic["id"])# dev = found_devices.get(key= d_id, default= ZhongshengConfig())if d_id in found_devices:dev = found_devices[d_id]else:dev = ZhongshengConfig()dev.load_data(data)found_devices[d_id] = dev

这里代码只是获取到设备的配置信息(ZhongshengConfig),并不是设备。设备是需要完成与硬件通讯,发送命令控制灯开关动作。所以需要构建设备类,实现与硬件通讯,设备类代码如下:

"""Description:中盛多路IO控制系统设备类,负责与设备进行通讯,完成设备I状态的读取和O端口输出
version: 1.0.0.0
Author: Cubar
Date: 2024-09-14 12:50:44
LastEditors: hht
LastEditTime: 2024-09-14 12:51:39.
"""  # noqa: D205from datetime import datetime
import logging
import socketfrom homeassistant.helpers.device_registry import DeviceInfofrom ..const import DOMAIN
from .device import ZhongshengConfig_LOGGER = logging.getLogger(__name__)LIGHT_COUNT_MAX = 64
DATA_RECIVE_MAX = 1024
QUERY_SWITCH_STATUS = bytearray([0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x04, 0x00, 0x00, 0x00, 0x10]
)
QUERY_LIGHT_STATUS = bytearray([0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x03, 0x00, 0x00, 0x00, 0x10]
)
CONTROL_LIGHT_CMD = bytearray([0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x06, 0x00, 0x09, 0x00, 0x00]
)
CONTROL_LIGHT_CMD_IDX_ID = 9
CONTROL_LIGHT_CMD_IDX_VALUE = 11
DEVICE_MANUFACTRUE = "Zhongsheng Tech"class ZhongshengDevice:"""中盛设备类."""_config: ZhongshengConfig_lightcount: int_switchcount: int_last_switch_query_time: datetime_last_light_query_time: datetime"""中盛设备类"""def __init__(self, config: ZhongshengConfig) -> None:"""初始化."""assert configself._lightcount = LIGHT_COUNT_MAXself._lightstatus = {}self._switchstatus = {}self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self._ip = config.ipself._port = config.portself._config = configself._mac = config._device_mac  # noqa: SLF001# 确保第一次发送命令不受限制self._last_switch_query_time = datetime.minself._last_light_query_time = datetime.minself._switchcount = 0self._lightcount = 0for i in range(LIGHT_COUNT_MAX):self._lightstatus[i] = Falseself._switchstatus[i] = Falsetry:self._socket.connect((self._ip, self._port))self.query_light_status()self.query_switch_status()except Exception as ex:  # noqa: BLE001_LOGGER.error(f"Exception: {ex}")  # noqa: G004self._socket.close()@propertydef name(self):"""获取设备名称."""return "ZhongshengDevice"@propertydef mac(self) -> str:"""获取设备MAC地址."""return self._mac@propertydef getLightCount(self):"""获取灯光数量."""if self._lightcount == LIGHT_COUNT_MAX:self.query_light_status()return self._lightcount@propertydef getSwitchCount(self):"""获取开关数量."""if self._switchcount == LIGHT_COUNT_MAX:self.query_switch_status()return self._switchcount@propertydef device_info(self) -> DeviceInfo:"""Return the device info."""return DeviceInfo(identifiers={# Serial numbers are unique identifiers within a specific domain(DOMAIN, self._config._device_id)  # noqa: SLF001},name=self.name,manufacturer=DEVICE_MANUFACTRUE,model=DOMAIN,sw_version=self._config._soft_version,  # noqa: SLF001via_device=(DOMAIN, self._config._device_id),  # noqa: SLF001)def setLightCount(self, count):"""设置灯光数量."""if count > 0 and count < LIGHT_COUNT_MAX:self._lightcount = countdef create_light_cmd(self, light_id: int = -1, turnon: bool = False) -> bytearray:"""构建灯光控制命令."""res: bytearray = CONTROL_LIGHT_CMD.copy()res[CONTROL_LIGHT_CMD_IDX_ID] = light_idres[CONTROL_LIGHT_CMD_IDX_VALUE] = 0x01 if turnon else 0x00return resdef send_ctrl_cmd(self, light_id: int = -1, turnon: bool = False) -> bool:"""发送控制指令."""#### 关、开9号灯,即id=8# 10:31:48.422→发 00 01 00 00 00 06 01 06 00 08 00 00 (关灯)# 10:31:48.479←收 00 01 00 00 00 06 01 06 00 08 00 00# 10:32:08.088→发 00 01 00 00 00 06 01 06 00 08 00 01 (开灯)# 10:32:08.128←收 00 01 00 00 00 06 01 06 00 08 00 01###if light_id is None:return False# _LOGGER.error(f'contrl light id={id} turnon:{turnon}')# 构建控制命令cmd = self.create_light_cmd(light_id, turnon)if self._socket.send(cmd) == len(cmd):# 发送成功recv = self._socket.recv(DATA_RECIVE_MAX)# 读取状态信息if len(recv) > 10:if turnon:if recv[CONTROL_LIGHT_CMD_IDX_VALUE] > 0x00:return Trueelif recv[CONTROL_LIGHT_CMD_IDX_VALUE] == 0x00:return Truereturn Falsedef query_light_status(self):"""查询灯状态."""# 比较最近一次发送查询指令时间,如果小于500ms,则不发送timed = datetime.now() - self._last_light_query_timeif timed.seconds < 1 and timed.microseconds < 500000:returnself._last_light_query_time = datetime.now()slen = self._socket.send(QUERY_LIGHT_STATUS)if slen == len(QUERY_LIGHT_STATUS):# 发送成功recv = self._socket.recv(DATA_RECIVE_MAX)# 00 01 00 00 00 23 01 03 20# 00 00 00 00 00 00 00 00 00 00# 00 00 00 00 00 00 00 00 00 00# 00 00 00 00 00 00 00 00 00 00# 00 00# 读取状态信息if len(recv) > 10:cnt = -1for i in range(LIGHT_COUNT_MAX):if (i * 2 + 10) < len(recv):if recv[i * 2 + 10] > 0:self._lightstatus[i] = Trueelse:self._lightstatus[i] = Falseelse:breakcnt = iself._lightcount = cnt + 1else:_LOGGER.error(f'query light status recv :[{len(recv)}] {recv.hex(" ")}')  # noqa: G004else:_LOGGER.error(f"light status query, send recv [{slen}/{len(QUERY_LIGHT_STATUS)}]"  # noqa: G004)def query_switch_status(self):"""发送查询状态指令."""# 比较最近一次发送查询指令时间,如果小于500ms,则不发送timed = datetime.now() - self._last_switch_query_timeif timed.seconds < 1 and timed.microseconds < 500000:returnself._last_switch_query_time = datetime.now()slen = self._socket.send(QUERY_SWITCH_STATUS)if slen == len(QUERY_SWITCH_STATUS):# 发送成功recv = self._socket.recv(DATA_RECIVE_MAX)# [30011]recv [41]:# 00 01 00 00 00 23 01 04 20# 00 00 00 00 00 00 00 00 00 00# 00 00 00 00 00 00 00 00 00 00# 00 00 00 00 00 00 00 00 00 00# 00 00# 读取状态信息if len(recv) > 10:cnt = -1for i in range(LIGHT_COUNT_MAX):if (i * 2 + 10) < len(recv):if recv[i * 2 + 10] > 0:self._switchstatus[i] = Trueelse:self._switchstatus[i] = Falseelse:break# _LOGGER.error(f'{i * 2 + 8}, {i} , [{self._switchstatus[i]}]')cnt = iself._switchcount = cnt + 1else:_LOGGER.error(f'query switch status recv:[{len(recv)}] {recv.hex(" ")}')  # noqa: G004else:_LOGGER.error(f"switch status query, recv [{slen}/{len(QUERY_SWITCH_STATUS)}]"  # noqa: G004)def turn_on(self, light_id: int) -> bool:"""打开指定ID的灯光."""if light_id >= 0 and light_id < self._lightcount:self._lightstatus[light_id] = self.send_ctrl_cmd(light_id, True)return self._lightstatus[light_id]return Falsedef turn_off(self, light_id: int) -> bool:"""关闭指定ID的灯光."""if light_id >= 0 and light_id < self._lightcount:self._lightstatus[light_id] = self.send_ctrl_cmd(light_id, False)return self._lightstatus[light_id]return Falsedef get_light_status(self, light_id: int) -> bool:"""获取指定ID的灯光状态."""if light_id >= 0 and light_id < self._lightcount:return self._lightstatus[light_id]return Falsedef get_switch_status(self, switch_id: int) -> bool:"""获取指定ID的开关状态."""if switch_id >= 0 and switch_id < self._switchcount:return self._switchstatus[switch_id]return False

设备实例化后,需要注册到HA中,注册代码为:

    for idx, dev in devices.items():deviceentry = device_registry.async_get_or_create(config_entry_id=entry.entry_id,configuration_url=f"http://{dev.ip}:{dev.port}/",connections={("mac", mac), ("ip", ip), ("port", port)},manufacturer="Zhongsheng Tech",model=DOMAIN,name=DEVICE_NAME,sw_version=dev._soft_version,  # noqa: SLF001translation_key="Zhongsheng Switch",)

这里使用for循环是兼容多设备的情况。

设备搞定了,后面就是把灯和开关的实体类实现。在开始生成代码时,有这么一段代码:

# TODO List the platforms that you want to support.
# For your initial PR, limit it to 1 platform.
PLATFORMS: list[Platform] = [Platform.LIGHT]

这里就是灯、开关之类的,本设备支持的实体。(很奇怪这样的命名,Platform直译过来是平台???不知道是我英语不好还是百度翻译不靠谱,我无法理解这里用平台来把灯、开关之类的归类),因为我要实现灯和开关,所以我的PLATFORMS是这么定义的:

#  List the platforms that you want to support.
# For your initial PR, limit it to 1 platform.
PLATFORMS: list[Platform] = [Platform.LIGHT,# Platform.SWITCH,Platform.SENSOR,
]

好吧,我这里其实是使用sensor,并没有使用Switch,因为Switch在HA里是可以操作的,但是实际场景中,Switch是实体开关,HA里面操作的话,并不反映到实体开关上来,即:在我的应用场景中,HA里面的开关只是反映实体开关的状态,只读的。所以最终版本就是使用了Sensor来做。(只是现在想来,这个Sensor也没多大意义)

这里有个约定,Platforms里的实体必须存在对应的python文件,即:在组件根目录里要存在light.py和sensor.py。通过下面的代码把这些实体“注册”到HA中(我觉得使用注册这个词更加好理解,函数翻译应该是发送):

await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

这段代码放在注册设备后就可以了,这里就完成了设备的注册和把配置发送至实体,至此,HA就知道你的组件里有这些设备及设备支持的实体了。

后续完成light.py实体代码(觉得Sensor没必要,就不在这放了):

"""中盛IO控制器中的灯光类."""from typing import Any  # noqa: D100from homeassistant.components.light import LightEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallbackfrom .const import DATA_DISCOVER_COMPONENT, ZHONGSHENG_DISCOVERY_ENTITY_NEW
from .core.ZhongshengDevice import ZhongshengDeviceasync def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:"""Set up Zhongshen switches based on a config entry."""@callbackdef async_discover(device: ZhongshengDevice, count: int, config_entry: ConfigEntry) -> None:"""Discover and add a Zhongsheng light."""for i in range(count):async_add_entities([ZhongshengLightEntity(device, i, config_entry)])hass.data[DATA_DISCOVER_COMPONENT.format("light")] = async_dispatcher_connect(hass,ZHONGSHENG_DISCOVERY_ENTITY_NEW.format("light"),async_discover,)class ZhongshengLightEntity(LightEntity):"""中盛-灯实体类."""def __init__(self,device: ZhongshengDevice,id: int = 0,entry: ConfigEntry | None = None,name: str = "",) -> None:"""初始化灯实体."""self._attr_unique_id = f"{device.mac}_light_{id}"self._id = idself._name = nameself._entry = entryself._device = deviceif self._name is None:self._name = f"{device.name}_light"self._attr_name = self._nameself._attr_is_on = self._device.get_light_status(id)super().__init__()@propertydef light_name(self) -> str:"""获取灯名称."""return self._namedef turn_on(self, **kwargs: Any) -> None:"""打开灯光."""self._attr_is_on = bool(self._device.turn_on(self._id))async def async_turn_on(self, **kwargs: Any) -> None:"""异步打开灯光."""self._attr_is_on = bool(self._device.turn_on(self._id))def turn_off(self, **kwargs: Any) -> None:"""关闭灯光."""self._attr_is_on = not bool(self._device.turn_off(self._id))async def async_turn_off(self, **kwargs: Any) -> None:"""异步关闭灯光."""self._attr_is_on = not bool(self._device.turn_off(self._id))def get_id(self) -> int:"""获取灯ID."""return self._idasync def async_update(self) -> None:"""异步更新灯状态数据."""self._device.query_light_status()self._attr_is_on = self._device.get_light_status(self._id)

灯的实体类比较好理解,也就是继承LightEntity,实现turn_on和turn_off,也包括异步的方法。具体就是通过设备去发送控制命令。(最开始我想的是通过一个单例全局变量,把设备传到灯实体对象中,然后单例又需要配置信息,这里转来转去,然后我就去吃鸡了),看前面的代码:

async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:"""Set up Zhongshen switches based on a config entry."""@callbackdef async_discover(device: ZhongshengDevice, count: int, config_entry: ConfigEntry) -> None:"""Discover and add a Zhongsheng light."""for i in range(count):async_add_entities([ZhongshengLightEntity(device, i, config_entry)])hass.data[DATA_DISCOVER_COMPONENT.format("light")] = async_dispatcher_connect(hass,ZHONGSHENG_DISCOVERY_ENTITY_NEW.format("light"),async_discover,)

async_setup_entry方法是HA系统规则调用的方法,不用去多想。应该是前面执行这段代码的时候,自动调用了。

await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

async_update这个方法是给HA周期调用的,每个实体都会调用到,所以在设备对应的方法里我做了调用频率限制。测试调用频率大概是30秒调用一次(实测时,调用周期到了1分钟,分析是16次调用时间太快,500ms的限制导致在一次调用周期内满足不了,得在第二个周期内才能触发,可以优化成:把灯id传进去,只处理id=0的请求,这样就能保证每个周期都能执行。但是这个还是不理想,最终是要优化成0.5秒执行一次,查询灯状态,这样在开关动作时,及时把状态反馈到HA中,但是这个就是另外的内容了)。

这个@callback比较好玩,里面的参数全是自己定义的,意思我要实例化实体的时候,需要用到的参数,都可以在这里提出来。我这里实例化的时候,需要用到设备、灯数量和配置信息(这个最终没用上)。使用async_add_entities方法就能把实体添加到HA中了。

async_dispatcher_connect这个方法比较关键了,类似Windows里的钩子、消息,或者ROS里的话题:与hass约定一个字符串消息,触发callback。这里实体就告一段落了。

设备对象哪里有,在__init__.py里,在那里注册了设备对象,所以在那里添加这个消息的代码:

await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)for idx, dev in devices.items():  # noqa: B007zs_dev = ZhongshengDevice(dev)dcount = zs_dev.getLightCountscount = zs_dev.getSwitchCountasync_dispatcher_send(hass,ZHONGSHENG_DISCOVERY_ENTITY_NEW.format("light"),zs_dev,dcount,entry,)# async_dispatcher_send(#     hass,#     ZHONGSHENG_DISCOVERY_ENTITY_NEW.format("switch"),#     zs_dev,#     scount,#     entry,# )async_dispatcher_send(hass,ZHONGSHENG_DISCOVERY_ENTITY_NEW.format("sensor"),zs_dev,scount,entry,)

通过async_dispatcher_send方法把设备对象,数量和配置传回去。整个就完成了。最后附上结果:

设备页面

实体页面

可以把这些灯放到面板里:


http://www.mrgr.cn/news/56330.html

相关文章:

  • Flux.using 使用说明书
  • C#运动控制
  • C++数据结构-图的存储及邻接矩阵的代码实现
  • 未来人工智能:技术、趋势与挑战
  • bug的定义和测试
  • Environment类(提供了一种统一的方式来访问应用程序的配置属性)
  • 个税自然人扣缴客户端数据的备份与恢复(在那个文件夹)
  • 当小程序学会‘读心术’:表单处理的神秘法则
  • 【西电电路实验】示波器没波形的解决方法
  • hiveserver与beeline
  • eIQ笔记(UI介绍+Loss曲线+OpenART例程)
  • 『 Linux 』HTTPS
  • 在vue项目中如何使用mixins实现代码复用
  • 迪子开了个劝退价。。。
  • 【数据结构与算法】走进数据结构的“时间胶囊”——栈
  • 极氪MIX:一台只有你想不到,没有它做不到的“家用神车”
  • 移情别恋c++ ദ്ദി˶ー̀֊ー́ ) ——8.stackqueuepriority_queue(无习题)
  • Shopify到底为什么被封店
  • 即时通讯 离线消息处理初版
  • 【MYSQL】数据库基本操作----DQL(Data Query Language)---基本查询
  • 前端学习笔记(2.0)
  • Java方法重载
  • 进入Neptoon:第二周游戏指南
  • Molmo模型实战
  • Node Checking - Checkboxes and Radio Buttons 节点检查 - 复选框和单选按钮
  • 重生之“我打数据结构,真的假的?”--1.顺序表(无习题)