-
Notifications
You must be signed in to change notification settings - Fork 0
Adding a new task
Dardo edited this page Apr 2, 2025
·
4 revisions
To add a new task to the system, you need to define a function that implements the measurement logic and register it in the list of available tasks.
A task follows this structure:
-
Defining the Main Function
- The function takes two parameters:
-
data
: a list ofChartData
objects used to store measurement results. -
exit_flag
: anEvent
object to safely terminate the task.
-
- A
Connections
instance is initialized to access the required instruments. - If an instrument is unavailable, the task exits with an error.
- A thread is spawned to process data in the background.
- The
while
loop runs measurements untilexit_flag
is set.
- The function takes two parameters:
-
Registering the Task
- A function
init_<task_name>()
is defined to create and register aTask
object. - This function is added to the task list in
__init__.py
usingtask_obj.add_init_task(init_<task_name>)
.
- A function
from tasks import Task, Tasks, ChartData
import random
from typing import Optional, List
from instruments import Instrument_Entry
from connections import Connections
from threading import Thread, Event
import time
import logging
from ..utilities import spawn_data_processor, generic_processor
from ...instruments import RaspberrySIM
def example_math_formula(x: float) -> float:
"""Computes the square root of a given value."""
return x ** 0.5
def mytask_1(data: List[ChartData], exit_flag: Event) -> None:
"""Performs a measurement using RaspberrySIM."""
cur_chart_data_1 = ChartData(
name="Square root of values", math_formula_y=example_math_formula
)
data.append(cur_chart_data_1)
conn_object = Connections()
logger = logging.getLogger(__name__)
instr_entry: Optional[Instrument_Entry] = conn_object.get_instrument("raspberry")
if instr_entry is None:
logger.error("Instrument not found.")
exit_flag.set()
return
myInstrument: RaspberrySIM = instr_entry.scpi_instrument
newThreadProcessor = spawn_data_processor(data, exit_flag, generic_processor)
try:
while not exit_flag.is_set():
curV = myInstrument.voltp
cur_chart_data_1.x.append(curV)
cur_chart_data_1.raw_y.append(curV)
myInstrument.voltp = random.uniform(0, 5) # Simulating data
time.sleep(1)
except Exception as e:
logger.error(f"Error in task: {e}")
finally:
exit_flag.set()
newThreadProcessor.join()
def init_mytask_1() -> None:
"""Registers My Task 1."""
newTask = Task(
name="My Task 1",
description="Performs a measurement using RaspberrySIM.",
instrs_aliases=["raspberry"],
function=mytask_1,
)
tasks_obj = Tasks()
tasks_obj.add_task(newTask)
In the __init__.py
file, add the following line to register the task:
task_obj.add_init_task(init_mytask_1)
Now, the task is available in the system and can be selected from the interface.