Core Module

PyRbs Main Class

class pyrbs.PyRbs(config=None, log_all=False, run_on_start=True, exit_on_stop=True, cantimedb=True, dashboard=False)

Bases: object

Parameters:
  • config (str | BytesIO | None)

  • log_all (bool)

  • run_on_start (bool)

  • exit_on_stop (bool)

  • cantimedb (bool)

  • dashboard (bool)

on_pre_start()

Callback event that is executed right before the actual start of the Restbus-Simulation.

@rbs.on_pre_start()
def on_pre_start_event(self):
    with open("large_config.toml") as f:
        config = rtoml.load(f)
Parameters:

self (PyRbs)

on_start()

Callback event that is executed right at the start of the Restbus-Simulation.

@rbs.on_start()
def on_start_event(self):
    default_can_trace.start()
Parameters:

self (PyRbs)

on_stop()

Callback event that is executed right at the stop of the Restbus-Simulation.

@rbs.on_stop()
def on_stop_event(self):
    default_can_trace.stop()
Parameters:

self (PyRbs)

on_post_stop()

Callback event that is executed right after the stop of the Restbus-Simulation and before the actual process shutdown.

@rbs.on_post_stop()
def on_post_stop_event(self):
    # Open for writing; rtoml.dump takes the object first, then the file
    with open("large_config.toml", "w") as f:
        rtoml.dump(some_result_or_state, f)
Parameters:

self (PyRbs)
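The four lifecycle callbacks above fire in a fixed sequence: pre-start, start, stop, post-stop. The following is a minimal sketch of that ordering only. `LifecycleSketch`, its `on()` decorator and `run_once()` are hypothetical stand-ins written for this illustration and are not part of PyRbs.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class LifecycleSketch:
    """Hypothetical stand-in that only models the callback order; not PyRbs."""

    # Order taken from the callback descriptions above.
    ORDER = ("pre_start", "start", "stop", "post_stop")

    def __init__(self) -> None:
        self._callbacks: DefaultDict[str, List[Callable]] = defaultdict(list)

    def on(self, event: str) -> Callable:
        """Return a decorator that registers a callback for `event`."""
        if event not in self.ORDER:
            raise ValueError(f"unknown lifecycle event: {event!r}")

        def register(func: Callable) -> Callable:
            self._callbacks[event].append(func)
            return func

        return register

    def run_once(self) -> None:
        """Fire every registered callback, one lifecycle phase after another."""
        for event in self.ORDER:
            for func in self._callbacks[event]:
                func(self)


sim = LifecycleSketch()
calls: List[str] = []


# Registration order is deliberately shuffled; dispatch order stays fixed.
@sim.on("post_stop")
def save_results(self: LifecycleSketch) -> None:
    calls.append("post_stop")


@sim.on("start")
def start_trace(self: LifecycleSketch) -> None:
    calls.append("start")


@sim.on("pre_start")
def load_config(self: LifecycleSketch) -> None:
    calls.append("pre_start")


@sim.on("stop")
def stop_trace(self: LifecycleSketch) -> None:
    calls.append("stop")


sim.run_once()
print(calls)
```

In this sketch the order in which callbacks are registered does not matter; only the lifecycle phase decides when each one runs.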

stop()

Stops the Restbus-Simulation.

@rbs.timer.on("test_runtime")
def test_runtime_end(self):
    rbs.stop()
Return type:

None

Handlers

Bus Logger Handler

class pyrbs.buslog.buslog_handler.BuslogHandler(_rbs)

Bases: object

Parameters:

_rbs (PyRbs)

add(name, logpath, channels=[], sysvar=[], mode='complete')

Adds a new BusLogger to the simulation

from datetime import datetime as dt
from pathlib import Path

from pyrbs import PyRbs, BusLogger
rbs = PyRbs()

new_trace: BusLogger = rbs.buslog.add(
    "new_trace", Path(f"/default/{dt.now().strftime('%Y-%m-%d_%H-%M-%S')}.mf4")
)
Parameters:
  • name (str) – Identifier for this BusLogger

  • logpath (Union[str, Path]) – Base path for the log file. The root for this logger is /basepath_traces/{project_name}@{profile_name}/.

  • channels (List, optional) – List of channels to consider (empty uses all available channels). Defaults to [].

  • sysvar (List[str], optional) – List of sysvars to consider in trace file (empty uses all available sysvars). Defaults to [].

  • mode (Literal["complete", "toggle"], optional) – Obsolete since pause was introduced. Defaults to “complete”.

Return type:

BusLogger
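To make the relationship between logpath and the logger root more concrete, here is a small sketch that joins a timestamped logpath onto the documented root. The helper `resolve_logpath`, the values `demo_project` and `bench`, and the decision to treat a leading "/" as relative to the root are all assumptions made for illustration; how PyRbs resolves the path internally is not stated here.

```python
from datetime import datetime
from pathlib import PurePosixPath
from typing import Union


def resolve_logpath(
    logpath: Union[str, PurePosixPath],
    project_name: str,
    profile_name: str,
    base: str = "/basepath_traces",
) -> PurePosixPath:
    """Join a logpath onto the root /basepath_traces/{project}@{profile}/."""
    root = PurePosixPath(base) / f"{project_name}@{profile_name}"
    # Assumption: a leading "/" in logpath is meant relative to the logger root.
    relative = str(logpath).lstrip("/")
    return root / relative


# A fixed timestamp keeps the example reproducible.
stamp = datetime(2024, 1, 31, 12, 0, 5).strftime("%Y-%m-%d_%H-%M-%S")
resolved = resolve_logpath(f"/default/{stamp}.mf4", "demo_project", "bench")
print(resolved)
```

The timestamp format is the same one used in the example above, so every run of the simulation would get its own file name.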

get(name)

Gets a predefined BusLogger.

Parameters:

name (str) – Identifier for BusLogger

Return type:

BusLogger

@rbs.sysvar.on_change("testbench_logging_trigger")
def change_mode(self: SysVar):
    if self.value == "pause":
        tb_log = rbs.buslog.get("testbench_log")
        tb_log.pause()
snapshot(name, logpath, pre_trigger, post_trigger, channels=[], sysvar=[])

Create a snapshot logger that captures data around trigger events.

A snapshot is created the moment this function is called, so the call must be embedded in the trigger condition. The file is created at the end of the post_trigger time.

For the snapshot to work, the cantimedb parameter of PyRbs must be True.

Parameters:
  • name (str) – Identifier for this snapshot

  • logpath (str) – Base path for exported trace files (relative to /opt/pycando/store/traces)

  • pre_trigger (float) – Seconds of data to retain before trigger

  • post_trigger (float) – Seconds of data to capture after trigger

  • channels ([int]) – CAN channels to capture. Defaults to [].

  • sysvar ([str]) – System variables to capture. Defaults to [].

Return type:

None
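The pre_trigger and post_trigger windows can be pictured as a rolling buffer that is frozen at the trigger and exported once the post-trigger time has passed. The sketch below models only that idea with plain Python. `SnapshotSketch`, `feed()` and `trigger()` are hypothetical names, and the real implementation (which relies on cantimedb) may work quite differently.

```python
from collections import deque
from typing import Deque, List, Optional, Tuple

Frame = Tuple[float, str]  # (timestamp in seconds, payload)


class SnapshotSketch:
    """Hypothetical model of a pre/post-trigger capture window; not PyRbs."""

    def __init__(self, pre_trigger: float, post_trigger: float) -> None:
        self.pre_trigger = pre_trigger
        self.post_trigger = post_trigger
        self._frames: Deque[Frame] = deque()
        self._trigger_time: Optional[float] = None
        self.exported: Optional[List[Frame]] = None

    def _drop_older_than(self, limit: float) -> None:
        while self._frames and self._frames[0][0] < limit:
            self._frames.popleft()

    def feed(self, timestamp: float, payload: str) -> None:
        if self.exported is not None:
            return  # the snapshot has already been written
        if self._trigger_time is None:
            # Before the trigger: keep a rolling window of pre_trigger seconds.
            self._frames.append((timestamp, payload))
            self._drop_older_than(timestamp - self.pre_trigger)
        elif timestamp <= self._trigger_time + self.post_trigger:
            # After the trigger: keep collecting until the window closes.
            self._frames.append((timestamp, payload))
        else:
            # Post-trigger time is over: freeze the result.
            self.exported = list(self._frames)

    def trigger(self, timestamp: float) -> None:
        if self._trigger_time is None:
            self._trigger_time = timestamp
            self._drop_older_than(timestamp - self.pre_trigger)


snap = SnapshotSketch(pre_trigger=2.0, post_trigger=1.0)
for second in range(10):
    snap.feed(float(second), f"frame{second}")
    if second == 5:
        snap.trigger(5.0)  # trigger condition met at t = 5 s

print([timestamp for timestamp, _ in snap.exported])
```

With a trigger at 5 s, the sketch keeps the frames from 3 s to 6 s, and the export only becomes available after the post-trigger window has elapsed.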

stop_all()

Stops all bus loggers.

CAN Channel Handler

class pyrbs.canbus.channel.can_channel_handler.CanChannelHandler(_rbs)

Bases: object

Parameters:

_rbs (PyRbs)

add(config)

Adds a CAN channel to the RBS. This method is called automatically in the init of the PyRbs class: all channel specifications in the /config folder are applied. If a canmatrix is found in the config, all of its messages and signals are added to the RBS automatically.

Parameters:

config (CanChannelConfig) – generated from the files in the /config folder

Return type:

None

CAN Message Handler

class pyrbs.canbus.message.can_message_handler.CanMessageHandler(_rbs)

Bases: object

Parameters:

_rbs (PyRbs)

add(ch_id, arb_id, frame=None)

Adds a CAN message to a CAN channel. This method is called automatically in the init of the PyRbs class: all frames found in a dbc / arxml file connected to a channel config are added.

Parameters:
  • ch_id (int) – The can channel id

  • arb_id (int) – The Arbitration id of the can message

  • frame (canmatrix.Frame | None) – The Frame object from the canmatrix Python package

Return type:

None

get(identifier)

Gets a CAN message by its identifier. Examples of identifier usage:

  • “CAN1.Control_01”

  • “ALIAS_NAME.256”

  • “1.256”

  • “1.0x100”

  • [“CAN1”, “Control_01”]

  • [1, 256]

Parameters:

identifier (str | list[str])

Return type:

CanMessage
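The identifier forms listed above (dotted strings, decimal or hexadecimal ids, and lists) can all be reduced to the same sequence of parts. The following sketch shows one way to do that normalization. `split_identifier` is a hypothetical helper written for this illustration; it does not describe how PyRbs parses identifiers internally.

```python
from typing import List, Sequence, Union

Part = Union[int, str]


def _normalize_part(part: Part) -> Part:
    """Turn numeric text such as "256" or "0x100" into an int, keep names."""
    if isinstance(part, int):
        return part
    text = part.strip()
    try:
        return int(text, 0)  # base 0 accepts decimal and 0x-prefixed hex
    except ValueError:
        return text  # not a number, so it is a channel, message or signal name


def split_identifier(identifier: Union[str, Sequence[Part]]) -> List[Part]:
    """Split a dotted string or a list into normalized identifier parts."""
    if isinstance(identifier, str):
        parts: Sequence[Part] = identifier.split(".")
    else:
        parts = list(identifier)
    return [_normalize_part(part) for part in parts]


print(split_identifier("CAN1.Control_01"))
print(split_identifier("1.0x100"))
print(split_identifier([1, 256]))
```

In this sketch "1.256", "1.0x100" and [1, 256] all normalize to the same parts, which matches the way the list above treats them as interchangeable. The same splitting extends to three-part signal identifiers.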

on(identifier, callback_name=None, priority=100, **kwargs)

Callback for a can message event

@rbs.can_message.on("CAN1.Control_01")
def on_control_01_msg(self):
    print(f"new msg: {self}")
Parameters:
  • identifier (str)

  • callback_name (str)

  • priority (int)
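Several callbacks can be registered for the same event, and the priority parameter decides their order. This reference does not state whether a lower or a higher value runs first, so the sketch below assumes, purely for illustration, that lower values run first and that equal priorities keep their registration order. `CallbackRegistrySketch` is a hypothetical class, not the PyRbs dispatcher.

```python
from typing import Callable, List, Tuple


class CallbackRegistrySketch:
    """Hypothetical priority dispatcher; the ordering rule is an assumption."""

    def __init__(self) -> None:
        # Each entry: (priority, registration index, callback)
        self._entries: List[Tuple[int, int, Callable[[str], None]]] = []

    def on(self, priority: int = 100) -> Callable:
        def register(func: Callable[[str], None]) -> Callable[[str], None]:
            # The registration index keeps equal priorities in a stable order.
            self._entries.append((priority, len(self._entries), func))
            return func

        return register

    def dispatch(self, event: str) -> None:
        # Assumption: a lower priority value runs earlier.
        for _, _, func in sorted(self._entries, key=lambda entry: entry[:2]):
            func(event)


registry = CallbackRegistrySketch()
order: List[str] = []


@registry.on()  # default priority 100
def log_message(event: str) -> None:
    order.append("log")


@registry.on(priority=10)
def validate_message(event: str) -> None:
    order.append("validate")


@registry.on()  # default priority 100, registered after log_message
def count_message(event: str) -> None:
    order.append("count")


registry.dispatch("CAN1.Control_01")
print(order)
```

If the actual rule is the reverse, only the sort direction in `dispatch` would change; the registration pattern stays the same.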

CAN Signal Handler

class pyrbs.canbus.signal.can_signal_handler.CanSignalHandler(_rbs)

Bases: object

Parameters:

_rbs (PyRbs)

get(identifier)

Gets a CAN signal by its identifier. Examples of identifier usage:

  • “CAN1.Control_01.D_Value”

  • “ALIAS_NAME.256.D_Value”

  • “1.256.D_Value”

  • “1.0x100.D_Value”

  • [“CAN1”, “Control_01”, “D_Value”]

  • [1, 256, “D_Value”]

from pyrbs import PyRbs, CanMessage, CanSignal, Timer
rbs = PyRbs(dashboard=True)

d_val_sig: CanSignal = rbs.can_signal.get("CAN1.Control_01.D_Value")
Return type:

CanSignal

on_update(identifier, callback_name=None, priority=100, **kwargs)

Callback for a can signal update event

@rbs.can_signal.on_update("CAN1.Control_01.D_Value")
def on_update_dval(self):
    print(f"new D_Value raw: {self.raw}, phys: {self.phys}")
Parameters:
  • identifier (str)

  • callback_name (str | None)

  • priority (int)

on_change(identifier, callback_name=None, priority=100, **kwargs)

Callback for a CAN signal change event. It triggers only if the value changed compared to the previous signal value.

@rbs.can_signal.on_change("CAN1.Control_01.D_Value")
def on_change_dval(self):
    print(f"new D_Value raw: {self.raw}, phys: {self.phys}")
Parameters:
  • identifier (str)

  • callback_name (str | None)

  • priority (int)
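The difference between on_update and on_change is easiest to see with repeated values: an update callback runs on every write, a change callback only when the value differs from the previous one. The sketch below models that behaviour. `SignalSketch` and `write()` are hypothetical names, and treating the very first write as a change is an assumption of this sketch.

```python
from typing import Callable, List, Optional


class SignalSketch:
    """Hypothetical signal that separates update events from change events."""

    def __init__(self) -> None:
        self.raw: Optional[int] = None
        self._update_callbacks: List[Callable] = []
        self._change_callbacks: List[Callable] = []

    def on_update(self, func: Callable) -> Callable:
        self._update_callbacks.append(func)
        return func

    def on_change(self, func: Callable) -> Callable:
        self._change_callbacks.append(func)
        return func

    def write(self, raw: int) -> None:
        changed = raw != self.raw  # assumption: the first write counts as a change
        self.raw = raw
        for func in self._update_callbacks:
            func(self)  # runs on every write
        if changed:
            for func in self._change_callbacks:
                func(self)  # runs only when the value differs


signal = SignalSketch()
updates: List[int] = []
changes: List[int] = []


@signal.on_update
def record_update(self: SignalSketch) -> None:
    updates.append(self.raw)


@signal.on_change
def record_change(self: SignalSketch) -> None:
    changes.append(self.raw)


for value in (1, 1, 2, 2, 3):
    signal.write(value)

print(updates, changes)
```

Five writes produce five update events but only three change events, which is why on_change is the better fit for reacting to transitions and on_update for cyclic processing.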

property decode_information

System Variables Handler

class pyrbs.variables.sysvar_handler.SysVarHandler(_rbs)

Bases: object

Parameters:

_rbs (PyRbs)

add(identifier, init_value)

Adds a sysvar to the restbus simulation. A “.” separates namespace levels. Sysvars that were defined in a .toml or .json file in the /config folder follow that pattern.

{“pid_control”: {“p_val”: 0.5, “i_val”: 0.1, “d_val”: 0.2}} will result in the identifiers:

  • “pid_control.p_val”

  • “pid_control.i_val”

  • “pid_control.d_val”

The datatype of init_value determines the type of the sysvar and cannot be changed after the sysvar is initialized.

from pyrbs import PyRbs, SysVar
rbs = PyRbs(dashboard=True)

p_val_sys: SysVar = rbs.sysvar.add("P_Value", 0.1)
Parameters:
  • identifier (str)

  • init_value (int | float | str)

Return type:

SysVar
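The mapping from a nested configuration to dotted sysvar identifiers can be sketched as a recursive flattening step. `flatten_sysvars` is a hypothetical helper that only reproduces the naming pattern described above; it is not the PyRbs config loader.

```python
from typing import Any, Dict


def flatten_sysvars(tree: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into {"namespace.name": init_value} pairs."""
    flat: Dict[str, Any] = {}
    for key, value in tree.items():
        identifier = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # A nested dict opens a new namespace level.
            flat.update(flatten_sysvars(value, identifier))
        else:
            flat[identifier] = value
    return flat


config = {"pid_control": {"p_val": 0.5, "i_val": 0.1, "d_val": 0.2}}
for identifier, init_value in flatten_sysvars(config).items():
    print(identifier, init_value)
```

Each resulting pair corresponds to what one `add(identifier, init_value)` call would receive, with the Python type of the value fixing the sysvar type.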

get(identifier)

Gets a sysvar by its identifier

from pyrbs import PyRbs, SysVar
rbs = PyRbs(dashboard=True)

d_val_sys: SysVar = rbs.sysvar.get("D_Value")
Parameters:

identifier (str)

Return type:

SysVar

on_update(identifier, callback_name=None, priority=100, **kwargs)

Callback for a sysvar update event

@rbs.sysvar.on_update("alive_counter")
def on_update_alive_counter(self):
    print(f"alive count: {self.value}")
Parameters:
  • identifier (str)

  • callback_name (str | None)

  • priority (int)

on_change(identifier, callback_name=None, priority=100, **kwargs)

Callback for a sysvar change event

@rbs.sysvar.on_change("alive_counter")
def on_change_alive_counter(self):
    print(f"alive count: {self.value}")
Parameters:
  • identifier (str)

  • callback_name (str | None)

  • priority (int)

Timer Handler

class pyrbs.timers.timer_handler.TimerHandler(_rbs)

Bases: object

Parameters:

_rbs (PyRbs)

property monotonic: float

add(name, time_to_elapse, execution_count=0, active_on_start=False)

Add a timer

from pyrbs import PyRbs, Timer
rbs = PyRbs()

timer_100ms: Timer = rbs.timer.add("100ms_timer", 0.1, 0, True)
Parameters:
  • name (str) – name of the timer

  • time_to_elapse (float) – duration of the timer in seconds (the example above uses 0.1 for a 100 ms timer)

  • execution_count (int, optional) – 0 = endless repetition, 1 = 1x execution, 2 = 2x, … Defaults to 0.

  • active_on_start (bool, optional) – timer starts running at “on_start” event. Defaults to False.

Return type:

Timer
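The effect of execution_count can be illustrated with a timer that is polled against a simulated clock. `TimerSketch` and `poll()` are hypothetical names for this sketch. It assumes the timer starts at t = 0 and first fires after one full period, which this reference does not spell out.

```python
from typing import List


class TimerSketch:
    """Hypothetical timer driven by an externally supplied clock value."""

    def __init__(self, time_to_elapse: float, execution_count: int = 0) -> None:
        self.time_to_elapse = time_to_elapse
        self.execution_count = execution_count  # 0 means repeat endlessly
        self.fired = 0
        self._next_due = time_to_elapse  # assumption: started at t = 0

    def poll(self, now: float) -> bool:
        """Return True if the timer is due at `now` and may still fire."""
        if self.execution_count and self.fired >= self.execution_count:
            return False  # the configured number of executions is used up
        if now < self._next_due:
            return False
        self.fired += 1
        self._next_due += self.time_to_elapse
        return True


endless = TimerSketch(0.5)
limited = TimerSketch(0.5, execution_count=2)

# Simulated clock from 0.0 s to 2.0 s in 0.25 s steps.
ticks: List[float] = [step * 0.25 for step in range(9)]
for now in ticks:
    endless.poll(now)
    limited.poll(now)

print(endless.fired, limited.fired)
```

Over the same two seconds the endless timer keeps firing every period, while the limited timer stops after its second execution.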

check()
Return type:

float

on(timer_name, callback_name=None, priority=100, **kwargs)

Callback for a due timer event

@rbs.timer.on("100ms_timer")
def on_100ms_timer(self):
    print("ping from timer")
Parameters:
  • timer_name (str)

  • callback_name (str)

  • priority (int)

get_timer_by_name(name)

Get a timer by name

Parameters:

name (str) – name of timer

Return type:

Timer

delete(name)

Delete a timer by name

Parameters:

name (str)

Return type:

None

State Machine Handler

class pyrbs.statemachine.statemachine_handler.StatemachineHandler(_rbs)

Bases: object

PyRbs uses the python-statemachine package. Its documentation can be found here: https://python-statemachine.readthedocs.io/en/latest/

Parameters:

_rbs (PyRbs)

add_statemachine(name, statemachine)

Adds a statemachine to the restbus simulation

# /rbs/control_states.py
from statemachine import StateMachine, State, exceptions

class ControllerTargetStates(StateMachine):
    sleep = State("sleep", initial=True)
    standby = State("standby")
    speed = State("speed")
    torque = State("torque")

    to_standby = sleep.to(standby) | speed.to(standby) | torque.to(standby)
    to_speed = standby.to(speed)
    to_torque = standby.to(torque)
    to_sleep = standby.to(sleep)
    to_save_state = sleep.to(standby) | speed.to(standby) | torque.to(standby)
# /rbs/main.py
from pyrbs import PyRbs
from control_states import ControllerTargetStates

rbs = PyRbs()

ctrl_state: ControllerTargetStates = rbs.state.add_statemachine(
    "ctrl_state", ControllerTargetStates
)
Parameters:
  • name (str)

  • statemachine (object)

Return type:

object
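The transitions declared in ControllerTargetStates can also be read as a lookup table from event and current state to target state. The sketch below restates that table in plain Python, without the python-statemachine package, to show which event is allowed in which state. `TRANSITIONS` and `fire()` are hypothetical names; to_save_state is left out because it repeats the to_standby transitions.

```python
from typing import Dict

# event -> {source state -> target state}, copied from the class above
TRANSITIONS: Dict[str, Dict[str, str]] = {
    "to_standby": {"sleep": "standby", "speed": "standby", "torque": "standby"},
    "to_speed": {"standby": "speed"},
    "to_torque": {"standby": "torque"},
    "to_sleep": {"standby": "sleep"},
}


def fire(state: str, event: str) -> str:
    """Return the target state, or raise if the event is not allowed."""
    targets = TRANSITIONS.get(event, {})
    if state not in targets:
        raise ValueError(f"event {event!r} is not allowed in state {state!r}")
    return targets[state]


state = "sleep"  # initial state of the example state machine
for event in ("to_standby", "to_speed", "to_standby", "to_sleep"):
    state = fire(state, event)

print(state)
```

The table makes it visible that speed and torque can only be reached through standby, so a direct jump from sleep to torque is rejected.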

on_enter(name, state, priority=100)

Callback for an on_enter state event

@rbs.state.on_enter("ctrl_state", "sleep")
def on_enter_sleep(self: ControllerTargetStates):
    select_mode.value = 0
    timer_100ms.stop()
Parameters:

priority (int)

on_exit(name, state, priority=100)

Callback for an on_exit state event

@rbs.state.on_exit("ctrl_state", "sleep")
def on_exit_sleep(self: ControllerTargetStates):
    timer_100ms.start()
Parameters:

priority (int)

before_transition(name, state, priority=100)

Callback for a before_transition state event

Parameters:

priority (int)

on_transition(name, state, priority=100)

Callback for an on_transition state event

Parameters:

priority (int)

after_transition(name, state, priority=100)

Callback for an after_transition state event

Parameters:

priority (int)