diff --git a/capa/capabilities/static.py b/capa/capabilities/static.py
index d485aa48c7..5ac5b5d7f5 100644
--- a/capa/capabilities/static.py
+++ b/capa/capabilities/static.py
@@ -17,14 +17,19 @@ import logging
 import itertools
 import collections
+from typing import Optional
 from dataclasses import dataclass
 
+import intervaltree
+
 import capa.perf
 import capa.helpers
 import capa.features.freeze as frz
 import capa.render.result_document as rdoc
 from capa.rules import Scope, RuleSet
 from capa.engine import FeatureSet, MatchResults
+from capa.features.common import Result
+from capa.features.address import Address, SuperblockAddress
 from capa.capabilities.common import Capabilities, find_file_capabilities
 from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor
@@ -110,11 +115,159 @@ def find_basic_block_capabilities(
 @dataclass
 class CodeCapabilities:
     function_matches: MatchResults
+    superblock_matches: MatchResults
     basic_block_matches: MatchResults
     instruction_matches: MatchResults
     feature_count: int
 
 
+@dataclass
+class FlowGraphNode:
+    # This dataclass is used to construct the function's basic block flow graph.
+    # Some analysis backends provide native support for flow graphs, but we construct it here regardless
+    # to decrease the amount of code required on the analysis backend's side (feature extractors).
+    bva: Address
+    left: Optional[Address]
+    right: Optional[Address]
+
+
+class SuperblockMatcher:
+    def __init__(self, ruleset: RuleSet, extractor: StaticFeatureExtractor):
+        self.ruleset: RuleSet = ruleset
+        self.extractor: StaticFeatureExtractor = extractor
+        self.features: FeatureSet = collections.defaultdict(set)
+        self.matches: MatchResults = collections.defaultdict(list)
+        self.flow_graph: dict[Address, FlowGraphNode] = {}
+        self.addr_to_bb = intervaltree.IntervalTree()
+
+    def add_basic_block(self, bb: BBHandle, features: FeatureSet):
+        """
+        Register a basic block and its features into the superblock matcher.
+
+        The basic block is added to the flow graph maintained by the matcher object,
+        and its features are added to the global feature set maintained by the matcher object.
+
+        Capabilities will later be extracted from all the registered features (as if they were extracted at the same level)
+        and then pruned, keeping only capabilities that match on superblocks (i.e., relevant basic blocks
+        executed in sequence with no interruptions between them).
+        """
+        # Get the basic blocks that follow the current one.
+        branches = list(self.extractor.get_next_basic_blocks(bb))
+        # Get the current bb's size.
+        bb_size = self.extractor.get_basic_block_size(bb)
+
+        # Add the basic block to the flow graph.
+        self.flow_graph[bb.address] = FlowGraphNode(
+            bva=bb.address,
+            left=branches[0] if branches and branches[0] != bb.address else None,
+            right=branches[1] if len(branches) > 1 and branches[1] != bb.address else None,
+        )
+
+        # Register bb's address space in the interval tree.
+        # This will later be used to determine the bb that a feature was extracted from.
+        if bb_size != 0:
+            self.addr_to_bb[int(bb.address) : int(bb.address) + bb_size] = bb.address
+
+        # Add features extracted from this bb into the matcher's overall collection of features.
+        for feature, va in features.items():
+            self.features[feature].update(va)
+
+    def _prune(self):
+        # go through each rule in self.matches, and each match in self.matches[rule_name],
+        # and then check if the self.matches[rule_name].locations or locations in each child of self.matches[rule_name].children
+        # have basic block gaps in them. If so, remove that specific match from self.matches[rule_name].
+        # if self.matches[rule_name] then becomes empty, remove it from self.matches.
+        def form_superblock_from_bbs(bb_locations: set[Address]) -> list[Address]:
+            cycle_heads: dict[Address, list] = collections.defaultdict(list)
+
+            # If one of the basic blocks has both a left and a right branch in the list of basic blocks,
+            # then we cannot form a superblock and we return an empty list.
+            for location in bb_locations:
+                if self.flow_graph[location].left in bb_locations and self.flow_graph[location].right in bb_locations:
+                    return []
+
+            # Go through the list of provided basic blocks and form superblocks from it.
+            # If we find that only one cycle exists, and not multiple disjoint cycles, we return that cycle.
+            # Otherwise, we return an empty list.
+            while bb_locations:
+                # We pick an arbitrary basic block and try to form a cycle from it.
+                # The resulting cycle (of length greater than or equal to 1) is stored in cycle_heads, keyed by its head.
+                head: FlowGraphNode = self.flow_graph[bb_locations.pop()]
+                node = head
+                while node:
+                    cycle_heads[head.bva].append(node.bva)
+                    # Check if a branch is in the list of basic blocks. If so, add it to the current cycle.
+                    if node.left in bb_locations:
+                        bb_locations.remove(node.left)
+                        node = self.flow_graph[node.left]
+                    elif node.right in bb_locations:
+                        bb_locations.remove(node.right)
+                        node = self.flow_graph[node.right]
+                    # Check if a branch is the start of an already encountered cycle. If so, connect the two cycles.
+                    elif node.left in cycle_heads:
+                        cycle_heads[head.bva] += cycle_heads.pop(node.left)
+                        break
+                    elif node.right in cycle_heads:
+                        cycle_heads[head.bva] += cycle_heads.pop(node.right)
+                        break
+                    # The current basic block either branches to a basic block that holds no relevant features,
+                    # or loops back to a basic block in the cycle, or the basic block is at the end of the function.
+                    else:
+                        break
+
+            if len(cycle_heads) == 1 and len(list(cycle_heads.values())[0]) > 1:
+                # The input basic blocks form a single cycle (i.e., superblock) of length > 1.
+                # Return that cycle (superblock).
+                return cycle_heads.popitem()[1]
+            else:
+                # The input basic blocks form either multiple disjoint cycles, or a cycle of length <= 1.
+                # Return an empty list (i.e., boolean False).
+                return []
+
+        def get_bbs_from_locations(locations: set[Address]) -> set[Address]:
+            bbs_addresses = set()
+            for location in locations:
+                # get the bb address from the location
+                # and add it to the set of bb addresses.
+                bbs_addresses.add(list(self.addr_to_bb[int(location)])[0].begin)
+            return bbs_addresses
+
+        def get_locations(result: Result) -> set[Address]:
+            # get all locations of found features in the result.
+            if not result.success:
+                # "not" statements are an edge case, but the locations of their children are not set anyway.
+                # Logically this is still valid because "not" is usually used to make sure features do not exist.
+                return set()
+            if result.children:
+                # Statements are usually what return children, and they usually do not have locations.
+                locations: set[Address] = set()
+                for child in result.children:
+                    locations.update(get_locations(child))
+                return locations
+            if result.locations:
+                # We are dealing with a feature. Convert its locations from a frozenset to a set, then return them.
+                return set(result.locations)
+            return set()
+
+        pruned_matches: MatchResults = collections.defaultdict(list)
+        for rule_name, matches in self.matches.items():
+            for _, result in matches:
+                locations = get_locations(result)
+                features_bbs = get_bbs_from_locations(locations)
+                superblock = form_superblock_from_bbs(features_bbs)
+                if superblock:
+                    # The match spans multiple basic blocks that form a superblock. Therefore, we keep it.
+                    pruned_matches[rule_name].append((SuperblockAddress(superblock), result))
+
+        # update the list of valid matches.
+        self.matches = pruned_matches
+
+    def match(self, f_address: Address):
+        # match superblock rules against the constructed flow graph.
+        _, self.matches = self.ruleset.match(Scope.SUPERBLOCK, self.features, f_address)
+        self._prune()
+
+
 def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle) -> CodeCapabilities:
     """
     find matches for the given rules within the given function.
@@ -123,6 +276,9 @@ def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor,
     # includes features found within basic blocks (and instructions).
     function_features: FeatureSet = collections.defaultdict(set)
 
+    # matches found at the constituent superblocks of this function.
+    superblock_matches: MatchResults = collections.defaultdict(list)
+
     # matches found at the basic block scope.
     # might be found at different basic blocks, that's ok.
     bb_matches: MatchResults = collections.defaultdict(list)
@@ -131,6 +287,8 @@ def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor,
     # might be found at different instructions, that's ok.
     insn_matches: MatchResults = collections.defaultdict(list)
 
+    superblock_matcher = SuperblockMatcher(ruleset, extractor)
+
     for bb in extractor.get_basic_blocks(fh):
         basic_block_capabilities = find_basic_block_capabilities(ruleset, extractor, fh, bb)
         for feature, vas in basic_block_capabilities.features.items():
@@ -142,17 +300,29 @@ def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor,
         for rule_name, res in basic_block_capabilities.instruction_matches.items():
             insn_matches[rule_name].extend(res)
 
+        # add the basic block and its features to the superblock matcher.
+        superblock_matcher.add_basic_block(bb, basic_block_capabilities.features)
+
+    # match capabilities at the superblock scope once all basic blocks have been added.
+    superblock_matcher.match(fh.address)
+    for rule_name, res in superblock_matcher.matches.items():
+        superblock_matches[rule_name].extend(res)
+        rule = ruleset[rule_name]
+        for va, _ in res:
+            capa.engine.index_rule_matches(function_features, rule, [va])
+
     for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()):
         function_features[feature].add(va)
 
     _, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address)
-    return CodeCapabilities(function_matches, bb_matches, insn_matches, len(function_features))
+    return CodeCapabilities(function_matches, superblock_matches, bb_matches, insn_matches, len(function_features))
 
 
 def find_static_capabilities(
     ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None
 ) -> Capabilities:
     all_function_matches: MatchResults = collections.defaultdict(list)
+    all_superblock_matches: MatchResults = collections.defaultdict(list)
     all_bb_matches: MatchResults = collections.defaultdict(list)
     all_insn_matches: MatchResults = collections.defaultdict(list)
 
@@ -196,6 +366,7 @@ def find_static_capabilities(
             match_count = 0
             for name, matches_ in itertools.chain(
                 code_capabilities.function_matches.items(),
+                code_capabilities.superblock_matches.items(),
                 code_capabilities.basic_block_matches.items(),
                 code_capabilities.instruction_matches.items(),
             ):
@@ -212,6 +383,8 @@ def find_static_capabilities(
             for rule_name, res in code_capabilities.function_matches.items():
                 all_function_matches[rule_name].extend(res)
+            for rule_name, res in code_capabilities.superblock_matches.items():
+                all_superblock_matches[rule_name].extend(res)
             for rule_name, res in code_capabilities.basic_block_matches.items():
                 all_bb_matches[rule_name].extend(res)
             for rule_name, res in code_capabilities.instruction_matches.items():
@@ -223,7 +396,7 @@ def find_static_capabilities(
     # mapping from feature (matched rule) to set of addresses at which it matched.
     function_and_lower_features: FeatureSet = collections.defaultdict(set)
     for rule_name, results in itertools.chain(
-        all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items()
+        all_function_matches.items(), all_superblock_matches.items(), all_bb_matches.items(), all_insn_matches.items()
     ):
         locations = {p[0] for p in results}
         rule = ruleset[rule_name]
@@ -239,6 +412,7 @@ def find_static_capabilities(
             # and we can merge the dictionaries naively.
             all_insn_matches.items(),
             all_bb_matches.items(),
+            all_superblock_matches.items(),
             all_function_matches.items(),
             all_file_capabilities.matches.items(),
         )
diff --git a/capa/features/address.py b/capa/features/address.py
index eb708a3dcd..f8a67f7071 100644
--- a/capa/features/address.py
+++ b/capa/features/address.py
@@ -49,6 +49,39 @@ def __hash__(self):
         return int.__hash__(self)
 
 
+class SuperblockAddress(Address):
+    """an address of a superblock: an ordered sequence of basic block addresses within a function"""
+
+    def __init__(self, addresses: list[Address]):
+        for address in addresses:
+            assert isinstance(address, AbsoluteVirtualAddress)
+            assert address >= 0
+        self.addresses: list[Address] = addresses
+
+    def __repr__(self):
+        return "superblock(" + " -> ".join([f"0x{address:x}" for address in self.addresses]) + ")"
+
+    def __hash__(self):
+        return hash(tuple(self.addresses))
+
+    def __eq__(self, other):
+        assert isinstance(other, SuperblockAddress)
+        return self.addresses == other.addresses
+
+    def __lt__(self, other):
+        assert isinstance(other, SuperblockAddress)
+        if not other.addresses or not self.addresses:
+            return False
+
+        if self.addresses[0] != other.addresses[0]:
+            return self.addresses[0] < other.addresses[0]
+        else:
+            return len(self.addresses) < len(other.addresses)
+
+    def __contains__(self, address: Address):
+        return address in self.addresses
+
+
 class ProcessAddress(Address):
     """an address of a process in a dynamic execution trace"""
 
diff --git a/capa/features/extractors/base_extractor.py b/capa/features/extractors/base_extractor.py
index 1be52d06b0..5a189f53c7 100644
--- a/capa/features/extractors/base_extractor.py
+++ b/capa/features/extractors/base_extractor.py
@@ -265,6 +265,20 @@ def extract_basic_block_features(self, f: FunctionHandle, bb: BBHandle) -> Itera
         """
         raise NotImplementedError()
 
+    @abc.abstractmethod
+    def get_next_basic_blocks(self, bb: BBHandle) -> Iterator[Address]:
+        """
+        for a given basic block, retrieve the basic blocks that follow it (if any).
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def get_basic_block_size(self, bb: BBHandle) -> int:
+        """
+        get the size of the given basic block.
+        """
+        raise NotImplementedError()
+
     @abc.abstractmethod
     def get_instructions(self, f: FunctionHandle, bb: BBHandle) -> Iterator[InsnHandle]:
         """
diff --git a/capa/features/extractors/binexport2/extractor.py b/capa/features/extractors/binexport2/extractor.py
index 49b8fad70e..0e8c38bc4b 100644
--- a/capa/features/extractors/binexport2/extractor.py
+++ b/capa/features/extractors/binexport2/extractor.py
@@ -119,6 +119,14 @@ def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
                 inner=BasicBlockContext(basic_block_index),
             )
 
+    def get_next_basic_blocks(self, bb: BBHandle):
+        # not implemented yet
+        return []
+
+    def get_basic_block_size(self, bb: BBHandle):
+        # not implemented yet
+        return 0
+
     def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
         yield from capa.features.extractors.binexport2.basicblock.extract_features(fh, bbh)
 
diff --git a/capa/features/extractors/binja/extractor.py b/capa/features/extractors/binja/extractor.py
index 100526f119..0758420042 100644
--- a/capa/features/extractors/binja/extractor.py
+++ b/capa/features/extractors/binja/extractor.py
@@ -63,6 +63,13 @@ def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
         for bb in f.basic_blocks:
             yield BBHandle(address=AbsoluteVirtualAddress(bb.start), inner=bb)
 
+    def get_next_basic_blocks(self, bb) -> Iterator[AbsoluteVirtualAddress]:
+        for edge in bb.inner.outgoing_edges:
+            yield AbsoluteVirtualAddress(edge.target.start)
+
+    def get_basic_block_size(self, bb: BBHandle) -> int:
+        return bb.inner.length
+
     def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
         yield from capa.features.extractors.binja.basicblock.extract_features(fh, bbh)
 
diff --git a/capa/features/extractors/dnfile/extractor.py b/capa/features/extractors/dnfile/extractor.py
index 4b6694f57d..b057408518 100644
--- a/capa/features/extractors/dnfile/extractor.py
+++ b/capa/features/extractors/dnfile/extractor.py
@@ -153,6 +153,12 @@ def get_basic_blocks(self, f) -> Iterator[BBHandle]:
             inner=f.inner,
         )
 
+    def get_next_basic_blocks(self, bb):
+        yield from []
+
+    def get_basic_block_size(self, bb: BBHandle) -> int:
+        return bb.inner.code_size
+
     def extract_basic_block_features(self, fh, bbh):
         # we don't support basic block features
         yield from []
 
diff --git a/capa/features/extractors/dotnetfile.py b/capa/features/extractors/dotnetfile.py
index dcba2c2f2d..11d79c2ccc 100644
--- a/capa/features/extractors/dotnetfile.py
+++ b/capa/features/extractors/dotnetfile.py
@@ -239,6 +239,12 @@ def extract_function_features(self, f):
     def get_basic_blocks(self, f):
         raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")
 
+    def get_next_basic_blocks(self, bb):
+        raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")
+
+    def get_basic_block_size(self, bb):
+        raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")
+
     def extract_basic_block_features(self, f, bb):
         raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")
 
diff --git a/capa/features/extractors/elffile.py b/capa/features/extractors/elffile.py
index 3f4eea7522..a1fa230bf8 100644
--- a/capa/features/extractors/elffile.py
+++ b/capa/features/extractors/elffile.py
@@ -240,6 +240,12 @@ def extract_function_features(self, f):
     def get_basic_blocks(self, f):
         raise NotImplementedError("ElfFeatureExtractor can only be used to extract file features")
 
+    def get_next_basic_blocks(self, bb):
+        raise NotImplementedError("ElfFeatureExtractor can only be used to extract file features")
+
+    def get_basic_block_size(self, bb):
+        raise NotImplementedError("ElfFeatureExtractor can only be used to extract file features")
+
     def extract_basic_block_features(self, f, bb):
         raise NotImplementedError("ElfFeatureExtractor can only be used to extract file features")
 
diff --git a/capa/features/extractors/ghidra/extractor.py b/capa/features/extractors/ghidra/extractor.py
index d5fc3230df..5dc857507c 100644
--- a/capa/features/extractors/ghidra/extractor.py
+++ b/capa/features/extractors/ghidra/extractor.py
@@ -88,6 +88,14 @@ def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
 
         yield from ghidra_helpers.get_function_blocks(fh)
 
+    def get_next_basic_blocks(self, bb: BBHandle):
+        # not implemented yet
+        return []
+
+    def get_basic_block_size(self, bb: BBHandle):
+        # not implemented yet
+        return 0
+
     def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
         yield from capa.features.extractors.ghidra.basicblock.extract_features(fh, bbh)
 
diff --git a/capa/features/extractors/ida/extractor.py b/capa/features/extractors/ida/extractor.py
index b139f2f38f..b58c2970b0 100644
--- a/capa/features/extractors/ida/extractor.py
+++ b/capa/features/extractors/ida/extractor.py
@@ -77,6 +77,14 @@ def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
         for bb in ida_helpers.get_function_blocks(fh.inner):
             yield BBHandle(address=AbsoluteVirtualAddress(bb.start_ea), inner=bb)
 
+    def get_next_basic_blocks(self, bb: BBHandle):
+        # not implemented yet
+        return []
+
+    def get_basic_block_size(self, bb: BBHandle):
+        # not implemented yet
+        return 0
+
     def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
         yield from capa.features.extractors.ida.basicblock.extract_features(fh, bbh)
 
diff --git a/capa/features/extractors/null.py b/capa/features/extractors/null.py
index 8d32e63dea..8b9af9d3b6 100644
--- a/capa/features/extractors/null.py
+++ b/capa/features/extractors/null.py
@@ -16,7 +16,13 @@ from dataclasses import dataclass
 
 from capa.features.common import Feature
-from capa.features.address import NO_ADDRESS, Address, ThreadAddress, ProcessAddress, DynamicCallAddress
+from capa.features.address import (
+    NO_ADDRESS,
+    Address,
+    ThreadAddress,
+    ProcessAddress,
+    DynamicCallAddress,
+)
 from capa.features.extractors.base_extractor import (
     BBHandle,
     CallHandle,
@@ -87,6 +93,14 @@ def get_basic_blocks(self, f):
         for address in sorted(self.functions[f.address].basic_blocks.keys()):
             yield BBHandle(address, None)
 
+    def get_next_basic_blocks(self, bb):
+        # not implemented yet
+        return []
+
+    def get_basic_block_size(self, bb):
+        # not implemented yet
+        return 0
+
     def extract_basic_block_features(self, f, bb):
         for address, feature in self.functions[f.address].basic_blocks[bb.address].features:
             yield feature, address
 
diff --git a/capa/features/extractors/pefile.py b/capa/features/extractors/pefile.py
index 8b76e1d8ab..3787cf67e8 100644
--- a/capa/features/extractors/pefile.py
+++ b/capa/features/extractors/pefile.py
@@ -226,6 +226,12 @@ def get_basic_blocks(self, f):
     def extract_basic_block_features(self, f, bb):
         raise NotImplementedError("PefileFeatureExtract can only be used to extract file features")
 
+    def get_next_basic_blocks(self, bb):
+        raise NotImplementedError("PefileFeatureExtract can only be used to extract file features")
+
+    def get_basic_block_size(self, bb):
+        raise NotImplementedError("PefileFeatureExtract can only be used to extract file features")
+
     def get_instructions(self, f, bb):
         raise NotImplementedError("PefileFeatureExtract can only be used to extract file features")
 
diff --git a/capa/features/extractors/viv/extractor.py b/capa/features/extractors/viv/extractor.py
index 99d60e4a80..8063b7376f 100644
--- a/capa/features/extractors/viv/extractor.py
+++ b/capa/features/extractors/viv/extractor.py
@@ -16,6 +16,7 @@ from typing import Any, Iterator
 from pathlib import Path
 
+import envi
 import viv_utils
 import viv_utils.flirt
 
@@ -76,6 +77,14 @@ def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
         for bb in f.basic_blocks:
             yield BBHandle(address=AbsoluteVirtualAddress(bb.va), inner=bb)
 
+    def get_next_basic_blocks(self, bb: BBHandle) -> Iterator[AbsoluteVirtualAddress]:
+        for bva, bflags in bb.inner.instructions[-1].getBranches():
+            if bflags & envi.BR_COND:
+                yield AbsoluteVirtualAddress(bva)
+
+    def get_basic_block_size(self, bb: BBHandle) -> int:
+        return bb.inner.size
+
     def extract_basic_block_features(self, fh: FunctionHandle, bbh) -> Iterator[tuple[Feature, Address]]:
         yield from capa.features.extractors.viv.basicblock.extract_features(fh, bbh)
 
diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py
index 2e12d2ffd7..cdaac20fa1 100644
--- a/capa/features/freeze/__init__.py
+++ b/capa/features/freeze/__init__.py
@@ -53,6 +53,7 @@ class HashableModel(BaseModel):
 class AddressType(str, Enum):
     ABSOLUTE = "absolute"
     RELATIVE = "relative"
+    SUPERBLOCK = "superblock"
     FILE = "file"
     DN_TOKEN = "dn token"
     DN_TOKEN_OFFSET = "dn token offset"
@@ -67,7 +68,7 @@ class Address(HashableModel):
     value: Union[
         # for absolute, relative, file
         int,
-        # for DNToken, Process, Thread, Call
+        # for DNToken, Process, Thread, Call, Superblock
        tuple[int, ...],
         # for NO_ADDRESS,
         None,
@@ -81,6 +82,9 @@ def from_capa(cls, a: capa.features.address.Address) -> "Address":
         elif isinstance(a, capa.features.address.RelativeVirtualAddress):
             return cls(type=AddressType.RELATIVE, value=int(a))
 
+        elif isinstance(a, capa.features.address.SuperblockAddress):
+            return cls(type=AddressType.SUPERBLOCK, value=(*a.addresses,))
+
         elif isinstance(a, capa.features.address.FileOffsetAddress):
             return cls(type=AddressType.FILE, value=int(a))
 
@@ -120,6 +124,10 @@ def to_capa(self) -> capa.features.address.Address:
             assert isinstance(self.value, int)
             return capa.features.address.RelativeVirtualAddress(self.value)
 
+        elif self.type is AddressType.SUPERBLOCK:
+            assert isinstance(self.value, tuple)
+            return capa.features.address.SuperblockAddress(list(self.value))
+
         elif self.type is AddressType.FILE:
             assert isinstance(self.value, int)
             return capa.features.address.FileOffsetAddress(self.value)
 
diff --git a/capa/loader.py b/capa/loader.py
index ec0295ac8f..4fc8aa32de 100644
--- a/capa/loader.py
+++ b/capa/loader.py
@@ -643,9 +643,13 @@ def compute_static_layout(rules: RuleSet, extractor: StaticFeatureExtractor, cap
             functions_by_bb[bb.address] = f.address
             bbs_by_function[f.address].append(bb.address)
 
+    matched_sbs = set()
     matched_bbs = set()
     for rule_name, matches in capabilities.items():
         rule = rules[rule_name]
+        if capa.rules.Scope.SUPERBLOCK in rule.scopes:
+            for addr, _ in matches:
+                matched_sbs.add(addr)
         if capa.rules.Scope.BASIC_BLOCK in rule.scopes:
             for addr, _ in matches:
                 assert addr in functions_by_bb
@@ -655,10 +659,20 @@ def compute_static_layout(rules: RuleSet, extractor: StaticFeatureExtractor, cap
         functions=tuple(
             rdoc.FunctionLayout(
                 address=frz.Address.from_capa(f),
+                matched_superblocks=tuple(
+                    rdoc.SuperblockLayout(
+                        address=frz.Address.from_capa(sb),
+                        matched_basic_blocks=tuple(
+                            rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in bbs if bb in sb
+                        ),
+                    )
+                    for sb in matched_sbs
+                    if sb.addresses[0] in bbs  # this object is open to extension in the future,
+                    # such as with the function name, etc.
+                ),
                 matched_basic_blocks=tuple(
                     rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in bbs if bb in matched_bbs
-                ),  # this object is open to extension in the future,
-                # such as with the function name, etc.
+                ),
             )
             for f, bbs in bbs_by_function.items()
             if len([bb for bb in bbs if bb in matched_bbs]) > 0
diff --git a/capa/render/result_document.py b/capa/render/result_document.py
index 3ad71752dc..301511c88f 100644
--- a/capa/render/result_document.py
+++ b/capa/render/result_document.py
@@ -54,9 +54,15 @@ class BasicBlockLayout(Model):
     address: frz.Address
 
 
+class SuperblockLayout(Model):
+    address: frz.Address
+    matched_basic_blocks: tuple[BasicBlockLayout, ...]
+
+
 class FunctionLayout(Model):
     address: frz.Address
     matched_basic_blocks: tuple[BasicBlockLayout, ...]
+    matched_superblocks: tuple[SuperblockLayout, ...]
 
 
 class CallLayout(Model):
diff --git a/capa/render/verbose.py b/capa/render/verbose.py
index 11f2442372..e669c92113 100644
--- a/capa/render/verbose.py
+++ b/capa/render/verbose.py
@@ -51,6 +51,9 @@ def format_address(address: frz.Address) -> str:
     elif address.type == frz.AddressType.RELATIVE:
         assert isinstance(address.value, int)
         return f"base address+{capa.helpers.hex(address.value)}"
+    elif address.type == frz.AddressType.SUPERBLOCK:
+        assert isinstance(address.value, tuple)
+        return "Superblock(" + " -> ".join(["BB:" + capa.helpers.hex(a) for a in address.value]) + ")"
     elif address.type == frz.AddressType.FILE:
         assert isinstance(address.value, int)
         return f"file+{capa.helpers.hex(address.value)}"
diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py
index e905f8c0f1..f29a7d4a43 100644
--- a/capa/render/vverbose.py
+++ b/capa/render/vverbose.py
@@ -117,7 +117,9 @@ def render_locations(
     raise RuntimeError("unreachable")
 
 
-def render_statement(console: Console, layout: rd.Layout, match: rd.Match, statement: rd.Statement, indent: int):
+def render_statement(
+    console: Console, layout: rd.Layout, match: rd.Match, statement: rd.Statement, indent: int, is_superblock=False
+):
     console.write("  " * indent)
 
     if isinstance(statement, rd.SubscopeStatement):
@@ -128,6 +130,9 @@ def render_statement(console: Console, layout: rd.Layout, match: rd.Match, state
         console.write(":")
         if statement.description:
             console.write(f" = {statement.description}")
+        logger.debug("is_superblock: %s", is_superblock)
+        if statement.scope == capa.rules.Scope.BASIC_BLOCK and is_superblock:
+            render_locations(console, layout, match.locations, indent)
         console.writeln()
 
     elif isinstance(statement, (rd.CompoundStatement)):
@@ -275,9 +280,17 @@ def render_feature(
     console.writeln()
 
 
-def render_node(console: Console, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, node: rd.Node, indent: int):
+def render_node(
+    console: Console,
+    layout: rd.Layout,
+    rule: rd.RuleMatches,
+    match: rd.Match,
+    node: rd.Node,
+    indent: int,
+    is_superblock=False,
+):
     if isinstance(node, rd.StatementNode):
-        render_statement(console, layout, match, node.statement, indent=indent)
+        render_statement(console, layout, match, node.statement, indent=indent, is_superblock=is_superblock)
     elif isinstance(node, rd.FeatureNode):
         render_feature(console, layout, rule, match, node.feature, indent=indent)
     else:
@@ -293,7 +306,13 @@ def render_node(console: Console, layout: rd.Layout, rule: rd.RuleMatches, match
 
 
 def render_match(
-    console: Console, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, indent=0, mode=MODE_SUCCESS
+    console: Console,
+    layout: rd.Layout,
+    rule: rd.RuleMatches,
+    match: rd.Match,
+    indent=0,
+    mode=MODE_SUCCESS,
+    is_superblock=False,
 ):
     child_mode = mode
     if mode == MODE_SUCCESS:
@@ -326,10 +345,18 @@ def render_match(
     else:
         raise RuntimeError("unexpected mode: " + mode)
 
-    render_node(console, layout, rule, match, match.node, indent=indent)
+    render_node(console, layout, rule, match, match.node, indent=indent, is_superblock=is_superblock)
 
     for child in match.children:
-        render_match(console, layout, rule, child, indent=indent + 1, mode=child_mode)
+        render_match(
+            console,
+            layout,
+            rule,
+            child,
+            indent=indent + 1,
+            mode=child_mode,
+            is_superblock=bool(is_superblock or rule.meta.scopes.static == capa.rules.Scope.SUPERBLOCK),
+        )
 
 
 def collect_span_of_calls_locations(
@@ -477,6 +504,11 @@ def render_rules(console: Console, doc: rd.ResultDocument):
                 # because we do the file-scope evaluation a single time.
                 # but i'm not 100% sure if this is/will always be true.
                 # so, lets be explicit about our assumptions and raise an exception if they fail.
+                if len(matches) > 1:
+                    # multiple matches for a file-scope rule are not expected;
+                    # log them here to help debug the failure before raising below.
+                    logger.debug("%s: %s", rule, matches[0])
+                    logger.debug("unexpected file scope match count: %s", matches[1])
                 raise RuntimeError(f"unexpected file scope match count: {len(matches)}")
             _, first_match = matches[0]
             render_match(console, doc.meta.analysis.layout, rule, first_match, indent=0)
diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py
index 9fa80a29eb..b2c858bb8b 100644
--- a/capa/rules/__init__.py
+++ b/capa/rules/__init__.py
@@ -89,6 +89,7 @@ class Scope(str, Enum):
     SPAN_OF_CALLS = "span of calls"
     CALL = "call"
     FUNCTION = "function"
+    SUPERBLOCK = "superblock"
     BASIC_BLOCK = "basic block"
     INSTRUCTION = "instruction"
 
@@ -107,6 +108,7 @@ def to_yaml(cls, representer, node):
     Scope.FILE,
     Scope.GLOBAL,
     Scope.FUNCTION,
+    Scope.SUPERBLOCK,
     Scope.BASIC_BLOCK,
     Scope.INSTRUCTION,
 }
@@ -219,6 +221,7 @@ def from_dict(self, scopes: dict[str, str]) -> "Scopes":
         capa.features.common.Characteristic("recursive call"),
         # plus basic block scope features, see below
     },
+    Scope.SUPERBLOCK: set(),
    Scope.BASIC_BLOCK: {
         capa.features.common.MatchedRule,
         capa.features.common.Characteristic("tight loop"),
@@ -252,6 +255,7 @@ def from_dict(self, scopes: dict[str, str]) -> "Scopes":
 # global scope features are available in all other scopes
 SUPPORTED_FEATURES[Scope.INSTRUCTION].update(SUPPORTED_FEATURES[Scope.GLOBAL])
 SUPPORTED_FEATURES[Scope.BASIC_BLOCK].update(SUPPORTED_FEATURES[Scope.GLOBAL])
+SUPPORTED_FEATURES[Scope.SUPERBLOCK].update(SUPPORTED_FEATURES[Scope.GLOBAL])
 SUPPORTED_FEATURES[Scope.FUNCTION].update(SUPPORTED_FEATURES[Scope.GLOBAL])
 SUPPORTED_FEATURES[Scope.FILE].update(SUPPORTED_FEATURES[Scope.GLOBAL])
 SUPPORTED_FEATURES[Scope.PROCESS].update(SUPPORTED_FEATURES[Scope.GLOBAL])
@@ -269,8 +273,10 @@ def from_dict(self, scopes: dict[str, str]) -> "Scopes":
 
 # all instruction scope features are also basic block features
 SUPPORTED_FEATURES[Scope.BASIC_BLOCK].update(SUPPORTED_FEATURES[Scope.INSTRUCTION])
+# all basic block scope features are also superblock scope features
+SUPPORTED_FEATURES[Scope.SUPERBLOCK].update(SUPPORTED_FEATURES[Scope.BASIC_BLOCK])
 # all basic block scope features are also function scope features
-SUPPORTED_FEATURES[Scope.FUNCTION].update(SUPPORTED_FEATURES[Scope.BASIC_BLOCK])
+SUPPORTED_FEATURES[Scope.FUNCTION].update(SUPPORTED_FEATURES[Scope.SUPERBLOCK])
 
 
 class InvalidRule(ValueError):
@@ -600,6 +606,7 @@ def unique(sequence):
 STATIC_SCOPE_ORDER = [
     Scope.FILE,
     Scope.FUNCTION,
+    Scope.SUPERBLOCK,
     Scope.BASIC_BLOCK,
     Scope.INSTRUCTION,
 ]
@@ -714,6 +721,17 @@ def build_statements(d, scopes: Scopes):
             Scope.FUNCTION, build_statements(d[key][0], Scopes(static=Scope.FUNCTION)), description=description
         )
 
+    elif key == "superblock":
+        if not is_subscope_compatible(scopes.static, Scope.SUPERBLOCK):
+            raise InvalidRule("`superblock` subscope supported only for `function` scope")
+
+        if len(d[key]) != 1:
+            raise InvalidRule("subscope must have exactly one child statement")
+
+        return ceng.Subscope(
+            Scope.SUPERBLOCK, build_statements(d[key][0], Scopes(static=Scope.SUPERBLOCK)), description=description
+        )
+
     elif key == "basic block":
         if not is_subscope_compatible(scopes.static, Scope.BASIC_BLOCK):
             raise InvalidRule("`basic block` subscope supported only for `function` scope")
@@ -727,7 +745,9 @@ def build_statements(d, scopes: Scopes):
 
     elif key == "instruction":
         if not is_subscope_compatible(scopes.static, Scope.INSTRUCTION):
-            raise InvalidRule("`instruction` subscope supported only for `function` and `basic block` scope")
+            raise InvalidRule(
+                "`instruction` subscope supported only for `function`, `superblock`, and `basic block` scope"
+            )
 
         if len(d[key]) == 1:
             statements = build_statements(d[key][0], Scopes(static=Scope.INSTRUCTION))
@@ -1442,6 +1462,7 @@ def __init__(
             Scope.PROCESS,
             Scope.INSTRUCTION,
             Scope.BASIC_BLOCK,
+            Scope.SUPERBLOCK,
             Scope.FUNCTION,
             Scope.FILE,
         )
@@ -1480,6 +1501,10 @@ def call_rules(self):
     def function_rules(self):
         return self.rules_by_scope[Scope.FUNCTION]
 
+    @property
+    def superblock_rules(self):
+        return self.rules_by_scope[Scope.SUPERBLOCK]
+
     @property
     def basic_block_rules(self):
         return self.rules_by_scope[Scope.BASIC_BLOCK]
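For reference, a rule exercising the new scope might look like the following sketch. The rule name and feature choices are hypothetical and are not part of this patch, but the shape follows the grammar accepted by build_statements() above: a `superblock` subscope is valid only under a `function` scope and must contain exactly one child statement, and superblock scope inherits the basic block (and therefore instruction) features per the SUPPORTED_FEATURES updates.

rule:
  meta:
    name: contain an XOR decode superblock (hypothetical example)
    authors:
      - analyst@example.com
    scopes:
      static: function
      dynamic: unsupported
  features:
    - and:
      - superblock:
        - and:
          - characteristic: nzxor
          - characteristic: tight loop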