Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,39 @@ repos:
rev: v1.16.0 # Last checked 2025/06
hooks:
- id: mypy
language: system
exclude: >-
^(assembler_tools/hec-assembler-tools/assembler/common/run_config\.py|
*assembler_tools/hec-assembler-tools/assembler/instructions/|
*assembler_tools/hec-assembler-tools/assembler/memory_model/|
*assembler_tools/hec-assembler-tools/assembler/stages/asm_scheduler\.py|
*assembler_tools/hec-assembler-tools/assembler/stages/scheduler\.py|
*assembler_tools/hec-assembler-tools/debug_tools/main\.py|
*assembler_tools/hec-assembler-tools/debug_tools/xinst_timing_check/|
*assembler_tools/hec-assembler-tools/he_as\.py|
*assembler_tools/hec-assembler-tools/linker/__init__\.py|
*assembler_tools/hec-assembler-tools/linker/instructions/__init__\.py|
*assembler_tools/hec-assembler-tools/assembler/spec_config/isa_spec.py)
args: ["--follow-imports=skip", "--install-types", "--non-interactive"]
- repo: local
hooks:
- id: pylint
name: pylint
entry: pylint
language: system
types: [python]
exclude: >-
^(assembler_tools/hec-assembler-tools/assembler/common/run_config\.py|
*assembler_tools/hec-assembler-tools/assembler/instructions/|
*assembler_tools/hec-assembler-tools/assembler/memory_model/|
*assembler_tools/hec-assembler-tools/assembler/stages/asm_scheduler\.py|
*assembler_tools/hec-assembler-tools/assembler/stages/scheduler\.py|
*assembler_tools/hec-assembler-tools/debug_tools/main\.py|
*assembler_tools/hec-assembler-tools/debug_tools/xinst_timing_check/|
*assembler_tools/hec-assembler-tools/he_as\.py|
*assembler_tools/hec-assembler-tools/linker/__init__\.py|
*assembler_tools/hec-assembler-tools/linker/instructions/__init__\.py|
*assembler_tools/hec-assembler-tools/assembler/spec_config/isa_spec.py)
args:
- -rn # Only display messages
- -sn # Don't display the score
Expand Down
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
[default.extend-words]
# variation of params
parms = "parms"
bload = "bload"
2 changes: 2 additions & 0 deletions assembler_tools/hec-assembler-tools/assembler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
28 changes: 21 additions & 7 deletions assembler_tools/hec-assembler-tools/assembler/common/counter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""
Counter module providing sequential value generation with reset capability.

This module implements a Counter class that allows for the creation and management of
counters that generate sequential values. These counters can be individually reset
or collectively reset through the class interface, making them useful for various
numbering and sequence generation tasks in the assembler.
"""

import itertools
from typing import Set, Optional


class Counter:
"""
Expand All @@ -18,7 +31,8 @@ class CounterIter:
This iterator starts at a specified value and increments by a specified step.
It can be reset to start over from its initial start value.
"""
def __init__(self, start = 0, step = 1):

def __init__(self, start=0, step=1):
"""
Initializes a new CounterIter object.

Expand All @@ -28,7 +42,7 @@ def __init__(self, start = 0, step = 1):
"""
self.__start = start
self.__step = step
self.__counter = None # itertools.counter
self.__counter = None # itertools.counter
self.reset()

def __next__(self):
Expand Down Expand Up @@ -66,10 +80,10 @@ def reset(self):
"""
self.__counter = itertools.count(self.start, self.step)

__counters = set()
__counters: Set["Counter.CounterIter"] = set()

@classmethod
def count(cls, start = 0, step = 1) -> CounterIter:
def count(cls, start=0, step=1) -> "Counter.CounterIter":
"""
Creates a new counter iterator that returns evenly spaced values.

Expand All @@ -85,17 +99,17 @@ def count(cls, start = 0, step = 1) -> CounterIter:
return retval

@classmethod
def reset(cls, counter: CounterIter = None):
def reset(cls, counter: Optional["Counter.CounterIter"] = None):
"""
Reset the specified counter, or all counters if none is specified.

This method resets the specified counter, or all counters, to start
over from their respective `start` values.

Args:
counter (CounterIter, optional): The counter to reset.
counter (Optional[CounterIter], optional): The counter to reset.
If None, all counters are reset.
"""
counters_to_reset = cls.__counters if counter is None else { counter }
counters_to_reset = cls.__counters if counter is None else {counter}
for c in counters_to_reset:
c.reset()
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
import heapq
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""
Priority queue implementation with support for task prioritization, ordering, and updating.

This module provides a priority queue that allows tasks to be added with priorities,
and supports operations to update, remove, and retrieve tasks based on their priorities.
"""

import heapq
import bisect
import itertools
from typing import List, Dict, Optional, Tuple, Any


class PriorityQueue:
# pylint: disable=invalid-name
"""
A priority queue implementation that supports task prioritization and ordering.

Expand All @@ -17,6 +30,7 @@ class __PriorityQueueIter:
This iterator allows for iterating over the tasks in the priority queue
while ensuring that the queue's size does not change during iteration.
"""

def __init__(self, pq, removed):
"""
Initializes the iterator with the priority queue and removed marker.
Expand Down Expand Up @@ -45,29 +59,35 @@ def __next__(self):
raise RuntimeError("PriorityQueue changed size during iteration.")

# Skip all removed tasks
while self.__current < len(self.__pq) \
and self.__pq[self.__current][-1] is self.__removed:
while (
self.__current < len(self.__pq)
and self.__pq[self.__current][-1] is self.__removed
):
self.__current += 1
if self.__current >= len(self.__pq):
raise StopIteration
priority, _, task = self.__pq[self.__current]
self.__current += 1 # point to nex element
self.__current += 1 # point to nex element
return (priority, task)


class __PriorityTracker:
"""
A helper class to track tasks by their priority.

This class maintains a mapping of priorities to tasks and supports
operations to add, find, and remove tasks based on their priority.
"""

def __init__(self):
"""
Initializes the priority tracker with empty mappings.
"""
self.__priority_dict = {} # dict(int, SortedList(task)): maps priority to unordered set of tasks with same priority
self.__priority_dict_set = {} # dict(int, set(task)): maps priority to unordered set of tasks with same priority
self.__priority_dict = (
{}
) # dict(int, SortedList(task)): maps priority to unordered set of tasks with same priority
self.__priority_dict_set = (
{}
) # dict(int, set(task)): maps priority to unordered set of tasks with same priority

def find(self, priority: int) -> object:
"""
Expand All @@ -79,7 +99,11 @@ def find(self, priority: int) -> object:
Returns:
object: A task with the specified priority, or None if not found.
"""
return next(iter(self.__priority_dict[priority]))[1] if priority in self.__priority_dict else None
return (
next(iter(self.__priority_dict[priority]))[1]
if priority in self.__priority_dict
else None
)

def push(self, priority: int, tie_breaker: tuple, task: object):
"""
Expand All @@ -94,7 +118,7 @@ def push(self, priority: int, tie_breaker: tuple, task: object):
ValueError: If the task is None.
"""
if task is None:
raise ValueError('`task` cannot be `None`.')
raise ValueError("`task` cannot be `None`.")

if priority not in self.__priority_dict:
self.__priority_dict[priority] = []
Expand All @@ -104,7 +128,7 @@ def push(self, priority: int, tie_breaker: tuple, task: object):
bisect.insort_right(self.__priority_dict[priority], (tie_breaker, task))
self.__priority_dict_set[priority].add(task)

def pop(self, priority: int, task = None) -> object:
def pop(self, priority: int, task=None) -> object:
"""
Removes a task with the specified priority.

Expand All @@ -126,12 +150,20 @@ def pop(self, priority: int, task = None) -> object:
assert priority in self.__priority_dict_set
if task:
# Find index for task
idx = next((i for i, (_, contained_task) in enumerate(self.__priority_dict[priority]) if contained_task == task),
len(self.__priority_dict[priority]))
idx = next(
(
i
for i, (_, contained_task) in enumerate(
self.__priority_dict[priority]
)
if contained_task == task
),
len(self.__priority_dict[priority]),
)
if idx >= len(self.__priority_dict[priority]):
raise ValueError('`task` not found in priority.')
raise ValueError("`task` not found in priority.")
_, retval = self.__priority_dict[priority].pop(idx)
assert(retval == task)
assert retval == task
else:
# Remove first task
_, retval = self.__priority_dict[priority].pop(0)
Expand All @@ -144,38 +176,44 @@ def pop(self, priority: int, task = None) -> object:
self.__priority_dict_set.pop(priority)
return retval

__REMOVED = object() # Placeholder for a removed task
__REMOVED = object() # Placeholder for a removed task

def __init__(self, queue: list = None):
def __init__(self, queue: Optional[List[Tuple[int, Any]]] = None):
"""
Creates a new PriorityQueue object.

Args:
queue (list, optional): A list of (priority, task) tuples to initialize the queue.
queue (Optional[List[Tuple[int, Any]]], optional): A list of (priority, task) tuples to initialize the queue.
This is an O(len(queue)) operation.

Raises:
ValueError: If any task in the queue is None.
"""
# entry: [priority: int, nonce: int, task: hashable_object]
self.__pq = [] # list(entry) - List of entries arranged in a heap
self.__entry_finder = {} # dictionary(task: Hashable_object, entry) - mapping of tasks to entries
self.__priority_tracker = PriorityQueue.__PriorityTracker() # Tracks tasks by priority
self.__counter: int = itertools.count(1) # Unique sequence count
self.__pq: List[List[Any]] = (
[]
) # list(entry) - List of entries arranged in a heap
self.__entry_finder: Dict[Any, List[Any]] = (
{}
) # dictionary(task: Hashable_object, entry) - mapping of tasks to entries
self.__priority_tracker = (
PriorityQueue.__PriorityTracker()
) # Tracks tasks by priority
self.__counter = itertools.count(1) # Unique sequence count

if queue:
for priority, task in queue:
if task is None:
raise ValueError('`queue`: tasks cannot be `None`.')
raise ValueError("`queue`: tasks cannot be `None`.")
count = next(self.__counter)
entry = [priority, ((0, ), count), task]
entry = [priority, ((0,), count), task]
self.__entry_finder[task] = entry
self.__priority_tracker.push(*entry)#priority, task)
self.__pq.append()
self.__priority_tracker.push(priority, ((0,), count), task)
heapq.heappush(self.__pq, entry)
heapq.heapify(self.__pq)

def __bool__(self):
"""
"""
Returns True if the priority queue is not empty, False otherwise.

Returns:
Expand Down Expand Up @@ -220,37 +258,34 @@ def __repr__(self):
Returns:
str: A string representation of the queue.
"""
return '<{} object at {}>(len={}, pq={})'.format(type(self).__name__,
hex(id(self)),
len(self),
self.__pq)
return f"<{type(self).__name__} object at {hex(id(self))}>(len={len(self)}, pq={self.__pq})"

def push(self, priority: int, task: object, tie_breaker: tuple = None): #ahead: bool = None):
def push(
self, priority: int, task: object, tie_breaker: Optional[Tuple[int, ...]] = None
):
"""
Adds a new task or update the priority of an existing task.

Args:
priority (int): The priority of the task.
task (object): The task to add or update.
tie_breaker (tuple, optional): A tuple of ints to use as a tie breaker for tasks
tie_breaker (Optional[Tuple[int, ...]], optional): A tuple of ints to use as a tie breaker for tasks
of the same priority. Defaults to (0,) if None.

Raises:
ValueError: If the task is None.
TypeError: If the tie_breaker is not a tuple of ints or None.
"""
if task is None:
raise ValueError('`task` cannot be `None`.')
if tie_breaker is not None \
and not all(isinstance(x, int) for x in tie_breaker):
raise TypeError('`tie_breaker` expected tuple of `int`s, or `None`.')
raise ValueError("`task` cannot be `None`.")
if tie_breaker is not None and not all(isinstance(x, int) for x in tie_breaker):
raise TypeError("`tie_breaker` expected tuple of `int`s, or `None`.")
b_add_needed = True
if task in self.__entry_finder:
old_priority, (old_tie_breaker, _), _ = self.__entry_finder[task]
if tie_breaker is None:
tie_breaker = old_tie_breaker
if old_priority != priority \
or tie_breaker != old_tie_breaker:
if old_priority != priority or tie_breaker != old_tie_breaker:
self.remove(task)
else:
# same task without priority change detected: no need to add
Expand All @@ -261,11 +296,13 @@ def push(self, priority: int, task: object, tie_breaker: tuple = None): #ahead:

if b_add_needed:
if len(self.__pq) == 0:
self.__counter: int = itertools.count(1) # restart sequence count when queue is empty
self.__counter = itertools.count(
1
) # restart sequence count when queue is empty
count = next(self.__counter)
entry = [priority, (tie_breaker, count), task]
self.__entry_finder[task] = entry
self.__priority_tracker.push(*entry)#priority, task)
self.__priority_tracker.push(priority, (tie_breaker, count), task)
heapq.heappush(self.__pq, entry)

def remove(self, task: object):
Expand All @@ -281,15 +318,17 @@ def remove(self, task: object):
# mark an existing task as PriorityQueue.__REMOVED.
entry = self.__entry_finder.pop(task)
priority, *_ = entry
self.__priority_tracker.pop(priority, task) # remove it from the priority tracker
self.__priority_tracker.pop(
priority, task
) # remove it from the priority tracker
entry[-1] = PriorityQueue.__REMOVED

def peek(self) -> tuple:
def peek(self) -> Optional[Tuple[int, Any]]:
"""
Returns the task with the lowest priority without removing it from the queue.

Returns:
tuple: The (priority, task) pair of the task with the lowest priority,
Optional[Tuple[int, Any]]: The (priority, task) pair of the task with the lowest priority,
or None if the queue is empty.
"""
# make sure head is not a removed task
Expand Down Expand Up @@ -326,7 +365,7 @@ def pop(self) -> tuple:
IndexError: If the queue is empty.
"""
task = PriorityQueue.__REMOVED
while task is PriorityQueue.__REMOVED: # make sure head is not a removed task
while task is PriorityQueue.__REMOVED: # make sure head is not a removed task
priority, _, task = heapq.heappop(self.__pq)
self.__entry_finder.pop(task)
self.__priority_tracker.pop(priority, task)
Expand Down
Loading