Core Module
PyRbs Main Class
- class pyrbs.PyRbs(config=None, log_all=False, run_on_start=True, exit_on_stop=True, cantimedb=True, dashboard=False)
Bases: object
- Parameters:
config (str | BytesIO | None)
log_all (bool)
run_on_start (bool)
dashboard (bool)
- on_pre_start()
Callback event that is executed right before the actual start of the Restbus-Simulation
    import rtoml

    @rbs.on_pre_start()
    def on_pre_start_event(self):
        with open("large_config.toml") as f:
            config = rtoml.load(f)
- Parameters:
self (PyRbs)
- on_start()
Callback event that is executed right at the start of the Restbus-Simulation
    @rbs.on_start()
    def on_start_event(self):
        default_can_trace.start()
- Parameters:
self (PyRbs)
- on_stop()
Callback event that is executed right at the stop of the Restbus-Simulation
    @rbs.on_stop()
    def on_stop_event(self):
        default_can_trace.stop()
- Parameters:
self (PyRbs)
- on_post_stop()
Callback event that is executed right after the stop of the Restbus-Simulation and before the actual process shutdown
    import rtoml

    @rbs.on_post_stop()
    def on_post_stop_event(self):
        # rtoml.dump takes the object first, then the file opened for writing
        with open("large_config.toml", "w") as f:
            rtoml.dump(some_result_or_state, f)
- Parameters:
self (PyRbs)
- stop()
Stops the Restbus-Simulation
    @rbs.timer.on("test_runtime")
    def test_runtime_end(self):
        rbs.stop()
- Return type:
None
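The four lifecycle callbacks above fire in a fixed order around the simulation run. The following is a minimal, self-contained sketch of that ordering. `MiniLifecycle` and its `on()` decorator are hypothetical stand-ins written for illustration; they are not part of PyRbs and do not reproduce its internals.

```python
from typing import Callable, Dict, List


class MiniLifecycle:
    """Hypothetical stand-in for illustration only; not the PyRbs implementation."""

    PHASES = ("pre_start", "start", "stop", "post_stop")

    def __init__(self) -> None:
        self._callbacks: Dict[str, List[Callable]] = {p: [] for p in self.PHASES}

    def on(self, phase: str) -> Callable:
        """Return a decorator that registers a callback for one phase."""
        def register(func: Callable) -> Callable:
            self._callbacks[phase].append(func)
            return func
        return register

    def _fire(self, phase: str) -> None:
        for func in self._callbacks[phase]:
            func(self)

    def run(self) -> None:
        """Fire the four phases in the documented order."""
        self._fire("pre_start")
        self._fire("start")
        # ... the simulation itself would run here ...
        self._fire("stop")
        self._fire("post_stop")


sim = MiniLifecycle()
events: List[str] = []

# Registration order is deliberately shuffled; only the phase decides the order.
@sim.on("post_stop")
def save_state(self):
    events.append("post_stop")

@sim.on("start")
def start_trace(self):
    events.append("start")

@sim.on("pre_start")
def load_config(self):
    events.append("pre_start")

@sim.on("stop")
def stop_trace(self):
    events.append("stop")

sim.run()
print(events)
```

Slow setup work such as loading a large config file belongs in the pre-start phase, and persisting results belongs in the post-stop phase, so neither delays the simulation itself.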
Handlers
Bus Logger Handler
- class pyrbs.buslog.buslog_handler.BuslogHandler(_rbs)
Bases: object
- Parameters:
_rbs (PyRbs)
- add(name, logpath, channels=[], sysvar=[], mode='complete')
Adds a new BusLogger to the simulation
    from datetime import datetime as dt
    from pathlib import Path

    from pyrbs import PyRbs, BusLogger

    rbs = PyRbs()
    new_trace: BusLogger = rbs.buslog.add(
        "new_trace",
        Path(f"/default/{dt.now().strftime('%Y-%m-%d_%H-%M-%S')}.mf4"),
    )
- Parameters:
name (str) – Identifier for this BusLogger
logpath (Union[str, Path]) – Base path for the log file. The root for this logger is /basepath_traces/{project_name}@{profile_name}/
channels (List, optional) – List of channels to consider (empty uses all available channels). Defaults to [].
sysvar (List[str], optional) – List of sysvars to consider in trace file (empty uses all available sysvars). Defaults to [].
mode (Literal["complete", "toggle"], optional) – Obsolete since pause was introduced. Defaults to "complete".
- Return type:
BusLogger
- get(name)
Gets a predefined BusLogger
- Parameters:
name (str) – Identifier for BusLogger
- Return type:
    @rbs.sysvar.on_change("testbench_logging_trigger")
    def change_mode(self: SysVar):
        if self.value == "pause":
            tb_log = rbs.buslog.get("testbench_log")
            tb_log.pause()
- snapshot(name, logpath, pre_trigger, post_trigger, channels=[], sysvar=[])
Create a snapshot logger that captures data around trigger events.
A snapshot is created the moment this function is called, so the call must be embedded in the trigger condition. The file is created at the end of the post_trigger time.
For the snapshot to work, the cantimedb parameter of PyRbs must be True.
- Parameters:
name (str) – Identifier for this snapshot
logpath (str) – Base path for exported trace files (relative to /opt/pycando/store/traces)
pre_trigger (float) – Seconds of data to retain before trigger
post_trigger (float) – Seconds of data to capture after trigger
channels ([int]) – CAN channels to capture. Defaults to [].
sysvar ([str]) – System variables to capture. Defaults to [].
- Return type:
None
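The pre_trigger and post_trigger windows can be pictured as a rolling history that is frozen at the trigger and then extended until the post-trigger time has passed. The sketch below illustrates only that idea. `SnapshotBuffer`, `feed()` and `trigger()` are hypothetical names invented for this example; how PyRbs stores and exports the data internally is not described in the source.

```python
from collections import deque
from typing import Deque, List, Optional, Tuple

Sample = Tuple[float, str]  # (timestamp in seconds, payload)


class SnapshotBuffer:
    """Hypothetical sketch of pre/post-trigger capture; not PyRbs code."""

    def __init__(self, pre_trigger: float, post_trigger: float) -> None:
        self.pre_trigger = pre_trigger
        self.post_trigger = post_trigger
        self._history: Deque[Sample] = deque()
        self._trigger_time: Optional[float] = None
        self._captured: List[Sample] = []
        self.result: Optional[List[Sample]] = None

    def feed(self, timestamp: float, payload: str) -> None:
        """Record one sample; finalize once the post-trigger window has passed."""
        if self._trigger_time is None:
            self._history.append((timestamp, payload))
            # Drop samples older than the pre-trigger window.
            while self._history and self._history[0][0] < timestamp - self.pre_trigger:
                self._history.popleft()
        elif self.result is None:
            if timestamp <= self._trigger_time + self.post_trigger:
                self._captured.append((timestamp, payload))
            else:
                # The post-trigger window is over: the snapshot is complete.
                self.result = self._captured

    def trigger(self, timestamp: float) -> None:
        """Freeze the pre-trigger window and start the post-trigger capture."""
        self._trigger_time = timestamp
        self._captured = [s for s in self._history if s[0] >= timestamp - self.pre_trigger]


snap = SnapshotBuffer(pre_trigger=2.0, post_trigger=3.0)
for second in range(10):
    snap.feed(float(second), f"frame-{second}")
    if second == 5:
        snap.trigger(5.0)  # trigger condition met at t = 5 s

captured_times = [t for t, _ in (snap.result or [])]
print(captured_times)
```

As in the description above, the result only becomes available once the post-trigger time has elapsed, which is why the trace file appears with a delay after the trigger.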
- stop_all()
Stops all bus loggers
CAN Channel Handler
- class pyrbs.canbus.channel.can_channel_handler.CanChannelHandler(_rbs)
Bases: object
- Parameters:
_rbs (PyRbs)
- add(config)
Adds a can channel to the rbs. The use of this method is automated in the init of the PyRbs class. All the channel specifications in the /config folder will be applied. If a canmatrix is found in the config, it will automatically add all messages and signals to the rbs.
- Parameters:
config (CanChannelConfig) – config will be generated from the files in /config folder
- Return type:
None
CAN Message Handler
- class pyrbs.canbus.message.can_message_handler.CanMessageHandler(_rbs)
Bases: object
- Parameters:
_rbs (PyRbs)
- add(ch_id, arb_id, frame=None)
Adds a can message to a can channel. The use of this method is automated in the init of the PyRbs class. All frames found in a dbc / arxml file connected to a channel config will be added.
- Parameters:
ch_id (int) – The can channel id
arb_id (int) – The Arbitration id of the can message
frame (canmatrix.Frame | None) – The Frame object from the canmatrix Python package
- Return type:
None
- get(identifier)
Gets a can message by its identifier. Examples for identifier usage:
"CAN1.Control_01"
"ALIAS_NAME.256"
"1.256"
"1.0x100"
["CAN1", "Control_01"]
[1, 256]
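The identifier forms listed above all name a channel and a message, either by name or by number, and "1.256" and "1.0x100" refer to the same arbitration id because 0x100 equals 256. The sketch below shows one way such identifiers could be normalized. `split_message_identifier` is a hypothetical helper written for this illustration; it is not a PyRbs function, and the actual lookup (including alias resolution) is not described in the source.

```python
from typing import List, Sequence, Tuple, Union

Part = Union[int, str]


def _to_number(part: Part) -> Part:
    """Convert numeric strings to int; leave names such as "CAN1" untouched."""
    if isinstance(part, int):
        return part
    try:
        return int(part, 0)  # base 0 accepts both "256" and "0x100"
    except ValueError:
        return part


def split_message_identifier(identifier: Union[str, Sequence[Part]]) -> Tuple[Part, Part]:
    """Split an identifier into (channel, message). Hypothetical helper, not PyRbs code."""
    parts: List[Part] = identifier.split(".") if isinstance(identifier, str) else list(identifier)
    if len(parts) != 2:
        raise ValueError(f"expected a channel and a message, got {identifier!r}")
    return _to_number(parts[0]), _to_number(parts[1])


for example in ["CAN1.Control_01", "ALIAS_NAME.256", "1.256", "1.0x100",
                ["CAN1", "Control_01"], [1, 256]]:
    print(example, "->", split_message_identifier(example))
```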
- Parameters:
identifier (str | list[str])
- Return type:
- on(identifier, callback_name=None, priority=100, **kwargs)
Callback for a can message event
    @rbs.can_message.on("CAN1.Control_01")
    def on_control_01_msg(self):
        print(f"new msg: {self}")
- Parameters:
identifier (str)
callback_name (str)
priority (int)
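Every `on...` decorator in this reference accepts a `priority` argument with a default of 100. The source does not state whether lower or higher values run first. The sketch below assumes that lower values run first and that equal priorities keep their registration order; both are assumptions made for illustration. `CallbackRegistry` is a hypothetical class, not part of PyRbs.

```python
from typing import Callable, List, Tuple


class CallbackRegistry:
    """Hypothetical priority-ordered registry; not the PyRbs implementation."""

    def __init__(self) -> None:
        # Each entry is (priority, registration index, callback).
        self._entries: List[Tuple[int, int, Callable[[str], None]]] = []

    def on(self, priority: int = 100) -> Callable:
        """Return a decorator that registers a callback with the given priority."""
        def register(func: Callable[[str], None]) -> Callable[[str], None]:
            self._entries.append((priority, len(self._entries), func))
            return func
        return register

    def dispatch(self, message: str) -> None:
        # ASSUMPTION: lower priority values run first; ties keep registration order.
        for _, _, func in sorted(self._entries, key=lambda entry: (entry[0], entry[1])):
            func(message)


registry = CallbackRegistry()
calls: List[str] = []

@registry.on()  # default priority 100
def default_handler(message: str) -> None:
    calls.append("default")

@registry.on(priority=200)
def late_handler(message: str) -> None:
    calls.append("late")

@registry.on(priority=10)
def early_handler(message: str) -> None:
    calls.append("early")

registry.dispatch("CAN1.Control_01")
print(calls)
```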
CAN Signal Handler
- class pyrbs.canbus.signal.can_signal_handler.CanSignalHandler(_rbs)
Bases: object
- Parameters:
_rbs (PyRbs)
- get(identifier)
Gets a can signal by its identifier. Examples for identifier usage:
"CAN1.Control_01.D_Value"
"ALIAS_NAME.256.D_Value"
"1.256.D_Value"
"1.0x100.D_Value"
["CAN1", "Control_01", "D_Value"]
[1, 256, "D_Value"]
    from pyrbs import PyRbs, CanMessage, CanSignal, Timer

    rbs = PyRbs(dashboard=True)
    d_val_sig: CanSignal = rbs.can_signal.get("CAN1.Control_01.D_Value")
- Return type:
CanSignal
- on_update(identifier, callback_name=None, priority=100, **kwargs)
Callback for a can signal update event
    @rbs.can_signal.on_update("CAN1.Control_01.D_Value")
    def on_update_dval(self):
        print(f"new D_Value raw: {self.raw}, phys: {self.phys}")
- Parameters:
identifier (str)
callback_name (str | None)
priority (int)
- on_change(identifier, callback_name=None, priority=100, **kwargs)
Callback for a can signal change event. Triggers only if the value changed compared to the previous signal value.
    @rbs.can_signal.on_change("CAN1.Control_01.D_Value")
    def on_change_dval(self):
        print(f"new D_Value raw: {self.raw}, phys: {self.phys}")
- Parameters:
identifier (str)
callback_name (str | None)
priority (int)
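The difference between on_update and on_change is that an update fires on every write, while a change fires only when the new value differs from the previous one. The sketch below illustrates that distinction. `WatchedValue` is a hypothetical class for this example, and treating the very first write as a change is an assumption; the source does not say how PyRbs handles the first value.

```python
from typing import Callable, List, Optional


class WatchedValue:
    """Hypothetical sketch of update vs. change events; not PyRbs code."""

    def __init__(self) -> None:
        self._value: Optional[int] = None
        self.update_callbacks: List[Callable[[int], None]] = []
        self.change_callbacks: List[Callable[[int], None]] = []

    def write(self, value: int) -> None:
        previous, self._value = self._value, value
        # An update event fires on every write.
        for callback in self.update_callbacks:
            callback(value)
        # A change event fires only if the value differs from the previous one.
        # ASSUMPTION: the first write (previous is None) counts as a change.
        if value != previous:
            for callback in self.change_callbacks:
                callback(value)


signal = WatchedValue()
updates: List[int] = []
changes: List[int] = []
signal.update_callbacks.append(updates.append)
signal.change_callbacks.append(changes.append)

for raw in [1, 1, 2, 2, 2, 3]:
    signal.write(raw)

print("updates:", updates)
print("changes:", changes)
```

For cyclic CAN signals that repeat the same value many times per second, a change callback avoids redundant work that an update callback would perform on every frame.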
- property decode_information
System Variables Handler
- class pyrbs.variables.sysvar_handler.SysVarHandler(_rbs)
Bases: object
- Parameters:
_rbs (PyRbs)
- add(identifier, init_value)
Adds a sysvar to the restbus simulation. A "." is used as the namespace separator. Sysvars that were defined in a .toml or .json file in the /config folder follow that pattern:
{"pid_control": {"p_val": 0.5, "i_val": 0.1, "d_val": 0.2}} will result in the identifiers:
"pid_control.p_val"
"pid_control.i_val"
"pid_control.d_val"
The datatype of the init_value determines the type of the sysvar and cannot be changed after the sysvar has been initialized.
    from pyrbs import PyRbs, SysVar

    rbs = PyRbs(dashboard=True)
    p_val_sys: SysVar = rbs.sysvar.add("P_Value", 0.1)
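The two rules described for sysvars, dotted identifiers for nested config entries and a type fixed by the initial value, can be sketched in plain Python. `flatten_sysvars` and `TypedVar` are hypothetical names for this illustration and are not part of PyRbs. Raising a `TypeError` on a mismatched assignment is an assumption; the source only states that the type cannot be changed.

```python
from typing import Any, Dict, Union

Scalar = Union[int, float, str]


def flatten_sysvars(tree: Dict[str, Any], prefix: str = "") -> Dict[str, Scalar]:
    """Turn a nested config dict into dotted identifiers. Hypothetical helper."""
    flat: Dict[str, Scalar] = {}
    for key, value in tree.items():
        identifier = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_sysvars(value, identifier))
        else:
            flat[identifier] = value
    return flat


class TypedVar:
    """Hypothetical variable whose type is fixed by its initial value."""

    def __init__(self, init_value: Scalar) -> None:
        self._type = type(init_value)
        self._value = init_value

    @property
    def value(self) -> Scalar:
        return self._value

    @value.setter
    def value(self, new_value: Scalar) -> None:
        # ASSUMPTION: a mismatched type is rejected with TypeError.
        if type(new_value) is not self._type:
            raise TypeError(
                f"expected {self._type.__name__}, got {type(new_value).__name__}"
            )
        self._value = new_value


config = {"pid_control": {"p_val": 0.5, "i_val": 0.1, "d_val": 0.2}}
identifiers = flatten_sysvars(config)
print(identifiers)

p_val = TypedVar(identifiers["pid_control.p_val"])
p_val.value = 0.75  # same type (float): accepted
try:
    p_val.value = "fast"  # different type (str): rejected in this sketch
    rejected = False
except TypeError:
    rejected = True
print(p_val.value, rejected)
```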
- Parameters:
identifier (str)
init_value (int | float | str)
- Return type:
SysVar
- get(identifier)
Gets a sysvar by its identifier
    from pyrbs import PyRbs, SysVar

    rbs = PyRbs(dashboard=True)
    d_val_sys: SysVar = rbs.sysvar.get("D_Value")
- Parameters:
identifier (str)
- Return type:
SysVar
- on_update(identifier, callback_name=None, priority=100, **kwargs)
Callback for a sysvar update event
    @rbs.sysvar.on_update("alive_counter")
    def on_update_alive_counter(self):
        print(f"alive count: {self.value}")
- Parameters:
identifier (str)
callback_name (str | None)
priority (int)
- on_change(identifier, callback_name=None, priority=100, **kwargs)
Callback for a sysvar change event
    @rbs.sysvar.on_change("alive_counter")
    def on_change_alive_counter(self):
        print(f"alive count: {self.value}")
- Parameters:
identifier (str)
callback_name (str | None)
priority (int)
Timer Handler
- class pyrbs.timers.timer_handler.TimerHandler(_rbs)
Bases: object
- Parameters:
_rbs (PyRbs)
- property monotonic: float
- add(name, time_to_elapse, execution_count=0, active_on_start=False)
Add a timer
    from pyrbs import PyRbs, Timer

    rbs = PyRbs()
    timer_100ms: Timer = rbs.timer.add("100ms_timer", 0.1, 0, True)
- Parameters:
name (str) – name of the timer
time_to_elapse (float) – duration of the timer in seconds (the example above uses 0.1 for a 100 ms timer)
execution_count (int, optional) – 0 = endless repetition, 1 = one execution, 2 = two executions, and so on. Defaults to 0.
active_on_start (bool, optional) – timer starts running at “on_start” event. Defaults to False.
- Return type:
Timer
- check()
- Return type:
float
- on(timer_name, callback_name=None, priority=100, **kwargs)
Callback for a due timer event
    @rbs.timer.on("100ms_timer")
    def on_100ms_timer(self):
        print("ping from timer")
- Parameters:
timer_name (str)
callback_name (str)
priority (int)
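The execution_count rule (0 repeats endlessly, n stops after n executions) can be sketched with a timer driven by an explicit clock value instead of real time. `SimTimer` and `tick()` are hypothetical names for this illustration; they are not part of PyRbs, and the real scheduling mechanism is not described in the source.

```python
from typing import Callable, List


class SimTimer:
    """Hypothetical timer driven by an explicit clock; not PyRbs code."""

    def __init__(self, time_to_elapse: float, execution_count: int = 0) -> None:
        self.time_to_elapse = time_to_elapse
        self.execution_count = execution_count  # 0 = repeat endlessly
        self.fired = 0
        self._next_due = time_to_elapse
        self.callbacks: List[Callable[[float], None]] = []

    def _active(self) -> bool:
        return self.execution_count == 0 or self.fired < self.execution_count

    def tick(self, now: float) -> None:
        """Fire callbacks for every period that has elapsed up to `now`."""
        while now >= self._next_due and self._active():
            for callback in self.callbacks:
                callback(self._next_due)
            self.fired += 1
            self._next_due += self.time_to_elapse


two_shot = SimTimer(0.5, execution_count=2)
endless = SimTimer(0.5)  # execution_count defaults to 0

two_shot_times: List[float] = []
two_shot.callbacks.append(two_shot_times.append)

# Advance a simulated clock from 0.0 s to 3.0 s in 0.25 s steps.
for step in range(13):
    now = step * 0.25
    two_shot.tick(now)
    endless.tick(now)

print(two_shot_times, two_shot.fired, endless.fired)
```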
- get_timer_by_name(name)
Get a timer by name
- Parameters:
name (str) – name of timer
- Return type:
- delete(name)
Delete a timer by name
- Parameters:
name (str)
- Return type:
None
State Machine Handler
- class pyrbs.statemachine.statemachine_handler.StatemachineHandler(_rbs)
Bases: object
PyRbs uses the python-statemachine package. Its documentation can be found here: https://python-statemachine.readthedocs.io/en/latest/
- Parameters:
_rbs (PyRbs)
- add_statemachine(name, statemachine)
Adds a statemachine to the restbus simulation
    # /rbs/control_states.py
    from statemachine import StateMachine, State, exceptions

    class ControllerTargetStates(StateMachine):
        sleep = State("sleep", initial=True)
        standby = State("standby")
        speed = State("speed")
        torque = State("torque")

        to_standby = sleep.to(standby) | speed.to(standby) | torque.to(standby)
        to_speed = standby.to(speed)
        to_torque = standby.to(torque)
        to_sleep = standby.to(sleep)
        to_save_state = sleep.to(standby) | speed.to(standby) | torque.to(standby)

    # /rbs/main.py
    from pyrbs import PyRbs
    from control_states import ControllerTargetStates

    rbs = PyRbs()
    ctrl_state: ControllerTargetStates = rbs.state.add_statemachine(
        "ctrl_state", ControllerTargetStates
    )
- Parameters:
name (str)
statemachine (object)
- Return type:
object
- on_enter(name, state, priority=100)
Callback for an on_enter state event
    @rbs.state.on_enter("ctrl_state", "sleep")
    def on_enter_sleep(self: ControllerTargetStates):
        select_mode.value = 0
        timer_100ms.stop()
- Parameters:
priority (int)
- on_exit(name, state, priority=100)
Callback for an on_exit state event
    @rbs.state.on_exit("ctrl_state", "sleep")
    def on_exit_sleep(self: ControllerTargetStates):
        timer_100ms.start()
- Parameters:
priority (int)
- before_transition(name, state, priority=100)
Callback for a before_transition state event
- Parameters:
priority (int)
- on_transition(name, state, priority=100)
Callback for an on_transition state event
- Parameters:
priority (int)
- after_transition(name, state, priority=100)
Callback for an after_transition state event
- Parameters:
priority (int)
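The five state callbacks above correspond to the phases of a single transition. The sketch below traces one transition in the order that python-statemachine documents for its generic callbacks (before, exit, on, enter, after). It is a plain-Python illustration that does not import python-statemachine or PyRbs; the `fire` function is hypothetical, and whether PyRbs dispatches its own handlers in exactly this order is an assumption.

```python
from typing import Dict, List, Tuple

# Transition table taken from the ControllerTargetStates example:
# (event, source state) -> target state
TRANSITIONS: Dict[Tuple[str, str], str] = {
    ("to_standby", "sleep"): "standby",
    ("to_standby", "speed"): "standby",
    ("to_standby", "torque"): "standby",
    ("to_speed", "standby"): "speed",
    ("to_torque", "standby"): "torque",
    ("to_sleep", "standby"): "sleep",
}


def fire(state: str, event: str, log: List[str]) -> str:
    """Run one transition and record the callback phases in order. Hypothetical helper."""
    target = TRANSITIONS[(event, state)]  # KeyError if the event is not allowed here
    log.append(f"before_transition:{event}")
    log.append(f"on_exit:{state}")
    log.append(f"on_transition:{event}")
    log.append(f"on_enter:{target}")
    log.append(f"after_transition:{event}")
    return target


log: List[str] = []
state = fire("sleep", "to_standby", log)
state = fire(state, "to_speed", log)
print(state)
for entry in log:
    print(entry)
```

An event that is not defined for the current state, such as to_speed while in sleep, has no entry in the table and fails, which mirrors how a state machine rejects transitions that are not allowed.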