From 85340b52a8ff57a9b6f6bb24709128dfe8d28d66 Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Fri, 12 Jul 2024 13:38:54 +0200 Subject: [PATCH 1/3] mypy: disable notes --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index e9c85993..de855934 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,3 +56,4 @@ exclude = [ "^test*", "^setup.py*", ] +disable_error_code = "annotation-unchecked" From 289ef7f893777e05946a1457789a9a4da1df02ab Mon Sep 17 00:00:00 2001 From: "Erlend E. Aasland" Date: Thu, 11 Jul 2024 14:24:26 +0200 Subject: [PATCH 2/3] Fixup NMT --- canopen/nmt.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/canopen/nmt.py b/canopen/nmt.py index 8ce737ea..76c71c89 100644 --- a/canopen/nmt.py +++ b/canopen/nmt.py @@ -2,11 +2,17 @@ import logging import struct import time -from typing import Callable, Optional +from collections.abc import Callable +from typing import Dict, Final, List, Optional, TYPE_CHECKING + + +if TYPE_CHECKING: + from canopen import Network + logger = logging.getLogger(__name__) -NMT_STATES = { +NMT_STATES: Final[Dict[int, str]] = { 0: 'INITIALISING', 4: 'STOPPED', 5: 'OPERATIONAL', @@ -15,7 +21,7 @@ 127: 'PRE-OPERATIONAL' } -NMT_COMMANDS = { +NMT_COMMANDS: Final[Dict[str, int]] = { 'OPERATIONAL': 1, 'STOPPED': 2, 'SLEEP': 80, @@ -26,7 +32,7 @@ 'RESET COMMUNICATION': 130 } -COMMAND_TO_STATE = { +COMMAND_TO_STATE: Final[Dict[int, int]] = { 1: 5, 2: 4, 80: 80, @@ -45,7 +51,7 @@ class NmtBase: def __init__(self, node_id: int): self.id = node_id - self.network = None + self.network: Optional[Network] = None self._state = 0 def on_command(self, can_id, data, timestamp): @@ -111,7 +117,7 @@ def __init__(self, node_id: int): #: Timestamp of last heartbeat message self.timestamp: Optional[float] = None self.state_update = threading.Condition() - self._callbacks = [] + self._callbacks: List[Callable[[int], None]] = [] def on_heartbeat(self, can_id, data, timestamp): with self.state_update: @@ -139,6 +145,7 @@ def send_command(self, code: int): super(NmtMaster, self).send_command(code) logger.info( "Sending NMT command 0x%X to node %d", code, self.id) + assert self.network is not None self.network.send_message(0, [code, self.id]) def wait_for_heartbeat(self, timeout: float = 10): @@ -180,7 +187,9 @@ def start_node_guarding(self, period: float): :param period: Period (in seconds) at which the node guarding should be advertised to the slave node. """ - if self._node_guarding_producer : self.stop_node_guarding() + if self._node_guarding_producer: + self.stop_node_guarding() + assert self.network is not None self._node_guarding_producer = self.network.send_periodic(0x700 + self.id, None, period, True) def stop_node_guarding(self): @@ -216,6 +225,7 @@ def send_command(self, code: int) -> None: if self._state == 0: logger.info("Sending boot-up message") + assert self.network is not None self.network.send_message(0x700 + self.id, [0]) # The heartbeat service should start on the transition @@ -246,6 +256,7 @@ def start_heartbeat(self, heartbeat_time_ms: int): self.stop_heartbeat() if heartbeat_time_ms > 0: logger.info("Start the heartbeat timer, interval is %d ms", self._heartbeat_time_ms) + assert self.network is not None self._send_task = self.network.send_periodic( 0x700 + self.id, [self._state], heartbeat_time_ms / 1000.0) From fe787f3f4ec37ceb7fd648faba43d8277a857fb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Colomb?= Date: Mon, 9 Jun 2025 21:49:28 +0200 Subject: [PATCH 3/3] Remove network is not None assertions, superseeded. --- canopen/nmt.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/canopen/nmt.py b/canopen/nmt.py index 7a968627..c13d0779 100644 --- a/canopen/nmt.py +++ b/canopen/nmt.py @@ -145,7 +145,6 @@ def send_command(self, code: int): super(NmtMaster, self).send_command(code) logger.info( "Sending NMT command 0x%X to node %d", code, self.id) - assert self.network is not None self.network.send_message(0, [code, self.id]) def wait_for_heartbeat(self, timeout: float = 10): @@ -189,7 +188,6 @@ def start_node_guarding(self, period: float): """ if self._node_guarding_producer: self.stop_node_guarding() - assert self.network is not None self._node_guarding_producer = self.network.send_periodic(0x700 + self.id, None, period, True) def stop_node_guarding(self): @@ -225,7 +223,6 @@ def send_command(self, code: int) -> None: if self._state == 0: logger.info("Sending boot-up message") - assert self.network is not None self.network.send_message(0x700 + self.id, [0]) # The heartbeat service should start on the transition @@ -256,7 +253,6 @@ def start_heartbeat(self, heartbeat_time_ms: int): self.stop_heartbeat() if heartbeat_time_ms > 0: logger.info("Start the heartbeat timer, interval is %d ms", self._heartbeat_time_ms) - assert self.network is not None self._send_task = self.network.send_periodic( 0x700 + self.id, [self._state], heartbeat_time_ms / 1000.0)