Adding a new task

Dardo edited this page Apr 2, 2025 · 4 revisions

To add a new task to the system, you need to define a function that implements the measurement logic and register it in the list of available tasks.

Task Structure

A task follows this structure:

  1. Defining the Main Function

    • The function takes two parameters:
      • data: a list of ChartData objects used to store measurement results.
      • exit_flag: an Event object to safely terminate the task.
    • A Connections instance is initialized to access the required instruments.
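    • If a required instrument is unavailable, the task logs an error, sets exit_flag, and returns.
    • A thread is spawned with spawn_data_processor to process data in the background.
    • A while loop takes measurements until exit_flag is set; on exit, the task sets exit_flag and joins the processing thread.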
  2. Registering the Task

    • A function init_<task_name>() is defined to create and register a Task object.
    • This function is added to the task list in __init__.py using task_obj.add_init_task(init_<task_name>).
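
The shutdown behaviour described in step 1 can be sketched in isolation with only the standard library. This is a minimal illustration, not the project's code: measure_loop and run_briefly are hypothetical names, and a counter stands in for an instrument reading.

```python
import threading
import time
from typing import List


def measure_loop(samples: List[float], exit_flag: threading.Event) -> None:
    """Appends a dummy reading until exit_flag is set (stand-in for a real task)."""
    reading = 0.0
    try:
        while not exit_flag.is_set():
            samples.append(reading)  # a real task would query an instrument here
            reading += 1.0
            # Event.wait() returns early once the flag is set, unlike time.sleep()
            exit_flag.wait(timeout=0.01)
    finally:
        # Always leave the flag set so any helper thread also stops
        exit_flag.set()


def run_briefly(duration: float = 0.05) -> List[float]:
    """Runs measure_loop in a thread, then requests a stop via the shared flag."""
    samples: List[float] = []
    exit_flag = threading.Event()
    worker = threading.Thread(target=measure_loop, args=(samples, exit_flag))
    worker.start()
    time.sleep(duration)
    exit_flag.set()           # cooperative stop request
    worker.join(timeout=1.0)  # wait for the loop to finish its current pass
    return samples
```

Using exit_flag.wait(timeout=...) instead of time.sleep() is a design choice of this sketch: it lets the loop react to a stop request without waiting out the full interval.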

Task Example

from tasks import Task, Tasks, ChartData
import random
from typing import Optional, List
from instruments import Instrument_Entry
from connections import Connections
from threading import Thread, Event
import time
import logging
from ..utilities import spawn_data_processor, generic_processor
from ...instruments import RaspberrySIM


def example_math_formula(x: float) -> float:
    """Computes the square root of a given value."""
    return x ** 0.5


def mytask_1(data: List[ChartData], exit_flag: Event) -> None:
    """Performs a measurement using RaspberrySIM."""
    cur_chart_data_1 = ChartData(
        name="Square root of values", math_formula_y=example_math_formula
    )
    data.append(cur_chart_data_1)

    conn_object = Connections()
    logger = logging.getLogger(__name__)
    instr_entry: Optional[Instrument_Entry] = conn_object.get_instrument("raspberry")
    
    if instr_entry is None:
        logger.error("Instrument not found.")
        exit_flag.set()
        return

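    myInstrument: RaspberrySIM = instr_entry.scpi_instrument
    # Start the background thread that processes the collected data
    newThreadProcessor = spawn_data_processor(data, exit_flag, generic_processor)

    try:
        # Take one reading per second until a stop is requested
        while not exit_flag.is_set():
            curV = myInstrument.voltp
            # The same reading is stored as both the x value and the raw y value
            cur_chart_data_1.x.append(curV)
            cur_chart_data_1.raw_y.append(curV)
            myInstrument.voltp = random.uniform(0, 5)  # Simulating data
            time.sleep(1)
    except Exception as e:
        logger.error(f"Error in task: {e}")
    finally:
        # Signal the processing thread to stop, then wait for it to finish
        exit_flag.set()
        newThreadProcessor.join()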


def init_mytask_1() -> None:
    """Registers My Task 1."""
    newTask = Task(
        name="My Task 1",
        description="Performs a measurement using RaspberrySIM.",
        instrs_aliases=["raspberry"],
        function=mytask_1,
    )
    tasks_obj = Tasks()
    tasks_obj.add_task(newTask)
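
The example passes math_formula_y to ChartData and appends readings to raw_y. The page does not show how generic_processor uses them, so the following is only a sketch under the assumption that the formula is applied to each raw value to produce the plotted y values. MiniChartData and process_new_points are hypothetical stand-ins, not the project's classes.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class MiniChartData:
    """Hypothetical stand-in for ChartData; only the fields used in this sketch."""
    name: str
    math_formula_y: Callable[[float], float]
    raw_y: List[float] = field(default_factory=list)
    y: List[float] = field(default_factory=list)


def process_new_points(chart: MiniChartData) -> int:
    """Applies math_formula_y to raw values not yet processed; returns how many."""
    start = len(chart.y)  # everything before this index was handled earlier
    new_values = [chart.math_formula_y(v) for v in chart.raw_y[start:]]
    chart.y.extend(new_values)
    return len(new_values)


chart = MiniChartData(name="Square root of values", math_formula_y=lambda v: v ** 0.5)
chart.raw_y.extend([4.0, 9.0])
process_new_points(chart)  # chart.y is now [2.0, 3.0]
```

Processing only the values past len(chart.y) keeps repeated calls cheap, which matters if a background thread calls the function periodically while the task keeps appending.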

Registering the Task

In the __init__.py file, add the following line to register the task:

task_obj.add_init_task(init_mytask_1)

Now, the task is available in the system and can be selected from the interface.
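
The two-step registration (an init function that is itself registered) can be illustrated with a small registry. This is a sketch of the general pattern only: MiniTasks and run_init_tasks are hypothetical, and the real Tasks class may behave differently.

```python
from typing import Callable, Dict, List


class MiniTasks:
    """Hypothetical registry: collects init functions, runs them later."""

    def __init__(self) -> None:
        self._init_functions: List[Callable[[], None]] = []
        self.tasks: Dict[str, Callable[[], str]] = {}

    def add_init_task(self, init_function: Callable[[], None]) -> None:
        # Only remember the function; nothing is registered yet
        self._init_functions.append(init_function)

    def add_task(self, name: str, function: Callable[[], str]) -> None:
        self.tasks[name] = function

    def run_init_tasks(self) -> None:
        # Assumed to happen at startup: each init function registers its task
        for init_function in self._init_functions:
            init_function()


registry = MiniTasks()


def my_task() -> str:
    return "measured"


def init_my_task() -> None:
    registry.add_task("My Task 1", my_task)


registry.add_init_task(init_my_task)  # deferred: the task is not listed yet
registry.run_init_tasks()             # now "My Task 1" appears in registry.tasks
```

Deferring registration this way lets the application decide when tasks are created, for example after instruments have been connected.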
