Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions debian/labgrid.install
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ debian/labgrid-exporter /usr/bin
debian/labgrid-pytest /usr/bin
debian/labgrid-suggest /usr/bin
helpers/labgrid-bound-connect /usr/sbin
helpers/labgrid-can-interface /usr/sbin
helpers/labgrid-raw-interface /usr/sbin
contrib/completion/labgrid-client.bash => /usr/share/bash-completion/completions/labgrid-client
146 changes: 146 additions & 0 deletions helpers/labgrid-can-interface
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#!/usr/bin/env python3
#
# Wrapper script to be deployed on machines that should allow controlled
# manipulation of CAN network interfaces. A /etc/labgrid/helpers.yaml
# can deny access to CAN interfaces. See below.
#
# This is intended to be used via sudo. For example, add via visudo:
# %developers ALL = NOPASSWD: /usr/sbin/labgrid-can-interface

import argparse
import os
import sys

import yaml

# Linux CAN interface type
ARPHRD_CAN = 280

def get_denylist():
denylist_file = "/etc/labgrid/helpers.yaml"
try:
with open(denylist_file) as stream:
data = yaml.load(stream, Loader=yaml.SafeLoader)
except (PermissionError, FileNotFoundError, AttributeError) as e:
raise Exception(f"No configuration file ({denylist_file}), inaccessable or invalid yaml") from e

denylist = data.get("can-interface", {}).get("denied-interfaces", [])

if not isinstance(denylist, list):
raise Exception("No explicit denied-interfaces or not a list, please check your configuration")

return denylist


def is_can_interface(ifname: str) -> bool:
"""Check if the given interface is a CAN interface. It may throw exceptions if the
interface does not exist or is inaccessible.

Args:
ifname (str): name of the interface

Returns:
bool: True if the interface is a CAN, False otherwise.
"""
type_path = f"/sys/class/net/{ifname}/type"

with open(type_path, "r") as f:
if_type = int(f.read().strip())
return if_type == ARPHRD_CAN


def main(program, options):
if not options.ifname:
raise ValueError("Empty interface name.")
if any((c == "/" or c.isspace()) for c in options.ifname):
raise ValueError(f"Interface name '{options.ifname}' contains invalid characters.")
if len(options.ifname) > 15:
raise ValueError(f"Interface name '{options.ifname}' is too long.")

denylist = get_denylist()

if options.ifname in denylist:
raise ValueError(f"Interface name '{options.ifname}' is denied in denylist.")

programs = ["ip", "tc"]
if program not in programs:
raise ValueError(f"Invalid program {program} called with wrapper, valid programs are: {programs}")

if not (program == "ip" and options.action == "add"):
if not is_can_interface(options.ifname):
raise ValueError(f"Interface '{options.ifname}' is not a CAN interface.")

args = [program]

if program == "ip":
if options.action == "add":
args.extend(["link", "add", "name", options.ifname, "type", "vcan"])
elif options.action == "del":
args.extend(["link", "del", "name", options.ifname])
elif options.action in ("up", "down"):
args.extend(["link", "set", "dev", options.ifname, options.action])
elif options.action == "set-bitrate":
if options.bitrate is None:
raise ValueError("Missing bitrate for set-bitrate action")
try:
bitrate = int(options.bitrate)
except ValueError:
raise ValueError("Bitrate must be an integer")
if bitrate <= 0:
raise ValueError("Bitrate must be higher than 0")
args.extend(["link", "set", options.ifname, "type", "can", "bitrate", str(bitrate)])
else:
raise ValueError(f"Invalid ip subcommand '{options.action}'")
elif program == "tc":
if options.action == "set-bitrate":
try:
bitrate = int(options.bitrate)
except ValueError:
raise ValueError("Bitrate must be an integer")
if bitrate <= 0:
raise ValueError("Bitrate must be higher than 0")

# Only the bitrate parameter is configured, the rest is hardcoded per cannelloni example
args.extend(["qdisc", "add", "dev", options.ifname, "root", "tbf", "rate", f"{bitrate}bit", "latency", "100ms", "burst", "1000"])
else:
raise ValueError(f"Invalid tc subcommand '{options.action}'")

try:
os.execvp(args[0], args)
except FileNotFoundError as e:
raise RuntimeError(f"Missing {program} binary") from e


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--debug", action="store_true", default=False, help="enable debug mode")
subparsers = parser.add_subparsers(dest="program", help="program to run")

# ip
ip_parser = subparsers.add_parser("ip")
ip_parser.add_argument("ifname", type=str, help="interface name")
ip_parser.add_argument(
"action",
type=str,
choices=["add", "del", "up", "down", "set-bitrate"],
help="action, one of {%(choices)s}",
)
ip_parser.add_argument("bitrate", nargs="?", type=int,
help="bitrate in bits per second, mandatory for set-bitrate action")

# tc
tc_parser = subparsers.add_parser("tc")
tc_parser.add_argument("ifname", type=str, help="interface name")
tc_parser.add_argument("action", type=str, choices=["set-bitrate"], help="action, one of {%(choices)s}")
tc_parser.add_argument("bitrate", type=int, help="bitrate in bits per second for set-bitrate")

args = parser.parse_args()
try:
main(args.program, args)
except Exception as e: # pylint: disable=broad-except
if args.debug:
import traceback

traceback.print_exc(file=sys.stderr)
print(f"ERROR: {e}", file=sys.stderr)
exit(1)
1 change: 1 addition & 0 deletions labgrid/driver/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .bareboxdriver import BareboxDriver
from .candriver import CANDriver
from .ubootdriver import UBootDriver
from .smallubootdriver import SmallUBootDriver
from .serialdriver import SerialDriver
Expand Down
111 changes: 111 additions & 0 deletions labgrid/driver/candriver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import contextlib
import shutil
import subprocess
import warnings

import attr

from ..factory import target_factory
from ..resource import NetworkCANPort, RawCANPort
from ..resource.udev import USBCANPort
from ..util.helper import processwrapper
from ..util.proxy import proxymanager
from .common import Driver


@target_factory.reg_driver
@attr.s(eq=False)
class CANDriver(Driver):
bindings = {
"port": {"NetworkCANPort", "RawCANPort", "USBCANPort"},
}

def __attrs_post_init__(self):
super().__attrs_post_init__()
self.ifname = None
self.child = None
self.can_helper = shutil.which("labgrid-can-interface")
if self.can_helper is None:
self.can_helper = "/usr/sbin/labgrid-can-interface"
warnings.warn(f"labgrid-can-interface helper not found, falling back to {self.can_helper}")
self.helper_wrapper = ["sudo", self.can_helper]
self.cannelloni_bin = shutil.which("cannelloni")
if self.cannelloni_bin is None:
self.cannelloni_bin = "/usr/bin/cannelloni"
warnings.warn(f"cannelloni binary not found, falling back to {self.cannelloni_bin}")

def _wrap_command(self, cmd):
return self.helper_wrapper + cmd

def on_activate(self):
if isinstance(self.port, NetworkCANPort):
host, port = proxymanager.get_host_and_port(self.port)
# XXX The port might not be unique, use something better
self.ifname = f"lg_vcan{port}"

cmd_ip_add = self._wrap_command(["ip", self.ifname, "add"])
processwrapper.check_output(cmd_ip_add)

cmd_ip_up = self._wrap_command(["ip", self.ifname, "up"])
processwrapper.check_output(cmd_ip_up)

cmd_tc = self._wrap_command(["tc", self.ifname, "set-bitrate", str(self.port.speed)])
processwrapper.check_output(cmd_tc)
cmd_cannelloni = [
self.cannelloni_bin,
"-C", "c",
"-I", f"{self.ifname}",
"-R", f"{host}",
"-r", f"{port}",
]
self.logger.info("Running command: %s", cmd_cannelloni)
self.child = subprocess.Popen(cmd_cannelloni)
# XXX How to check the process? Ideally read output and find the "connected" string?
elif isinstance(self.port, (RawCANPort, USBCANPort)):
host = None
self.ifname = self.port.ifname

cmd_down = self._wrap_command(["ip", self.ifname, "down"])
processwrapper.check_output(cmd_down)

cmd_type_bitrate = self._wrap_command(["ip", self.ifname, "set-bitrate", str(self.port.speed)])
processwrapper.check_output(cmd_type_bitrate)

cmd_up = self._wrap_command(["ip", self.ifname, "up"])
processwrapper.check_output(cmd_up)
else:
raise NotImplementedError(f"Unsupported CAN resource: {self.port}")

def on_deactivate(self):
ifname = self.ifname
self.ifname = None
if isinstance(self.port, NetworkCANPort):
assert self.child
child = self.child
self.child = None
child.terminate()
try:
child.wait(2.0)
except subprocess.TimeoutExpired:
self.logger.warning("cannelloni on %s still running after SIGTERM", ifname)
child.kill()
child.wait(1.0)
self.logger.info("stopped cannelloni for interface %s", ifname)

cmd_ip_del = self._wrap_command(["ip", ifname, "del"])
processwrapper.check_output(cmd_ip_del)
else:
cmd_down = self._wrap_command(["ip", ifname, "down"])
processwrapper.check_output(cmd_down)

@Driver.check_bound
def get_export_vars(self):
export_vars = {
"ifname": self.ifname,
"speed": str(self.port.speed),
}
if isinstance(self.port, NetworkCANPort):
host, port = proxymanager.get_host_and_port(self.port)
export_vars["host"] = host
export_vars["port"] = str(port)
return export_vars
98 changes: 98 additions & 0 deletions labgrid/remote/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .common import ResourceEntry, queue_as_aiter
from .generated import labgrid_coordinator_pb2, labgrid_coordinator_pb2_grpc
from ..util import get_free_port, labgrid_version
from ..util.helper import processwrapper


exports: Dict[str, Type[ResourceEntry]] = {}
Expand Down Expand Up @@ -304,6 +305,103 @@ def _stop(self, start_params):
exports["RawSerialPort"] = SerialPortExport


@attr.s(eq=False)
class CANPortExport(ResourceExport):
"""ResourceExport for a USB and Raw CANPort"""

def __attrs_post_init__(self):
super().__attrs_post_init__()
if self.cls == "RawCANPort":
from ..resource.canport import RawCANPort

self.local = RawCANPort(target=None, name=None, **self.local_params)
elif self.cls == "USBCANPort":
from ..resource.udev import USBCANPort

self.local = USBCANPort(target=None, name=None, **self.local_params)
self.data["cls"] = "NetworkCANPort"
self.child = None
self.port = None
self.can_helper = shutil.which("labgrid-can-interface")
if self.can_helper is None:
self.can_helper = "/usr/sbin/labgrid-can-interface"
warnings.warn(f"labgrid-can-interface helper not found, falling back to {self.can_helper}")
self.helper_wrapper = ["sudo", self.can_helper]
self.cannelloni_bin = shutil.which("cannelloni")
if self.cannelloni_bin is None:
self.cannelloni_bin = "/usr/bin/cannelloni"
warnings.warn(f"cannelloni binary not found, falling back to {self.cannelloni_bin}")

def __del__(self):
if self.child is not None:
self.stop()

def _get_start_params(self):
return {
"ifname": self.local.ifname,
}

def _get_params(self):
"""Helper function to return parameters"""
return {
"host": self.host,
"port": self.port,
"speed": self.local.speed,
"extra": {
"ifname": self.local.ifname,
},
}

def _start(self, start_params):
"""Start ``cannelloni`` subprocess"""
assert self.local.avail
assert self.child is None
self.port = get_free_port()

processwrapper.check_output(self.helper_wrapper + ["ip", self.local.ifname, "down"])
processwrapper.check_output(self.helper_wrapper + ["ip", self.local.ifname, "set-bitrate", str(self.local.speed)])
processwrapper.check_output(self.helper_wrapper + ["ip", self.local.ifname, "up"])

cmd_cannelloni = [
self.cannelloni_bin,
"-C", "s",
# XXX Set "no peer checking" mode. Is it ok? It seems so for serial...
"-p",
"-I", f"{self.local.ifname}",
"-l", f"{self.port}",
]
self.logger.info("Starting cannelloni with: %s", " ".join(cmd_cannelloni))
self.child = subprocess.Popen(cmd_cannelloni)
try:
self.child.wait(timeout=2)
raise ExporterError(f"cannelloni for {start_params['ifname']} exited immediately")
except subprocess.TimeoutExpired:
# good, cannelloni didn't exit immediately
pass
self.logger.info("cannelloni started for %s on port %d", start_params["ifname"], self.port)

def _stop(self, start_params):
"""Stop ``cannelloni`` subprocess and disable the interface"""
assert self.child
child = self.child
self.child = None
port = self.port
self.port = None
child.terminate()
try:
child.wait(3.0)
except subprocess.TimeoutExpired:
self.logger.warning("cannelloni for %s still running after SIGTERM", start_params["ifname"])
log_subprocess_kernel_stack(self.logger, child)
child.kill()
child.wait(1.0)
self.logger.info("cannelloni stopped for %s on port %d", start_params["ifname"], port)
processwrapper.check_output(self.helper_wrapper + ["ip", start_params["ifname"], "down"])

exports["USBCANPort"] = CANPortExport
exports["RawCANPort"] = CANPortExport


@attr.s(eq=False)
class NetworkInterfaceExport(ResourceExport):
"""ResourceExport for a network interface"""
Expand Down
4 changes: 3 additions & 1 deletion labgrid/resource/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .base import SerialPort, NetworkInterface, EthernetPort, SysfsGPIO
from .base import CANPort, SerialPort, NetworkInterface, EthernetPort, SysfsGPIO
from .canport import NetworkCANPort, RawCANPort
from .ethernetport import SNMPEthernetPort
from .serialport import RawSerialPort, NetworkSerialPort
from .modbus import ModbusTCPCoil
Expand All @@ -22,6 +23,7 @@
SigrokUSBDevice,
SigrokUSBSerialDevice,
USBAudioInput,
USBCANPort,
USBDebugger,
USBFlashableDevice,
USBMassStorage,
Expand Down
Loading