#!/usr/bin/env python
# -*- coding: utf-8 -*-
from threading import Thread
from struct import unpack, calcsize
class Timeout(Exception):
    """
    Exception raised to signal a timeout.
    """
class Service(object):
    """
    Base class representing a service.

    This base class defines neither ``PACK_FORMAT`` nor a constructor.
    ``from_bytes`` relies on the concrete class providing:

    * a ``PACK_FORMAT`` class attribute (a ``struct`` format string), and
    * a constructor that accepts the unpacked header fields positionally,
      followed by the remaining bytes of the buffer as the last argument.
    """

    @classmethod
    def from_bytes(cls, buffer):
        """
        Build a service instance from a raw data buffer.

        Parameters
        ----------
        buffer : bytes-like
            Raw data. The first ``calcsize(cls.PACK_FORMAT)`` bytes are
            unpacked according to ``cls.PACK_FORMAT``; whatever follows is
            passed through untouched as the trailing constructor argument.

        Returns
        -------
        Service
            A new instance of ``cls``.

        Raises
        ------
        struct.error
            If ``buffer`` is shorter than the packed header.
        """
        header_size = calcsize(cls.PACK_FORMAT)
        header_fields = unpack(cls.PACK_FORMAT, buffer[:header_size])
        return cls(*header_fields, buffer[header_size:])

    def __repr__(self):
        """Return the class name immediately followed by the instance ``__dict__``."""
        return "{}{}".format(type(self).__name__, vars(self))

    def handle(self, source_address, destination_address, exeption=None):
        """
        Hook intended to be overridden by subclasses; does nothing here.

        Parameters
        ----------
        source_address
            Source address passed by the caller.
        destination_address
            Destination address passed by the caller.
        exeption : optional
            Optional value passed by the caller. The misspelled name is kept
            as-is because it is part of the keyword interface.
        """
        pass
class CanNode(Thread):
    """
    Base class that lets a CAN node run in its own thread.

    ``run`` forwards every received frame to ``self.handle_rx_frame``, which
    is not defined in this part of the class; it must be supplied by a
    subclass (or elsewhere) before the thread is started.

    Attributes
    ----------
    driver : PythonCanDriver
        Low-level access to the CAN bus.
    address : int
        CAN node address.
    services : list
        Registered service handlers.
    isRunning : bool
        While True, the receive loop in ``run`` keeps polling the driver.
    """

    def __init__(self, driver, address):
        """
        Initialize the thread and the node state.

        Parameters
        ----------
        driver : PythonCanDriver
            Underlying CAN interface.
        address : int
            Node CAN address.
        """
        super().__init__()
        self.driver = driver
        self.address = address
        self.services = []
        self.isRunning = True

    def stop(self):
        """
        Set ``isRunning`` to False so the loop in ``run`` exits.

        The flag is only checked at the top of each loop iteration, i.e.
        after the current ``driver.receive()`` call has returned.
        """
        self.isRunning = False

    def add_service(self, handler):
        """
        Append a service handler to ``services``.

        Parameters
        ----------
        handler
            Service handler to register.
        """
        self.services.append(handler)

    def run(self):
        """
        Thread body: read frames from the driver while ``isRunning`` is True.

        Each value returned by ``driver.receive()`` is unpacked into
        ``(identifier, data, dlc, flag, time)``. Frames whose identifier is
        None are skipped; all others are passed to ``handle_rx_frame``.
        """
        while self.isRunning:
            identifier, data, dlc, flag, time = self.driver.receive()
            # A None identifier means there is nothing to dispatch
            # (presumably a receive timeout -- confirm against the driver).
            if identifier is None:
                continue
            self.handle_rx_frame(identifier, data, dlc, flag, time)