Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 176 additions & 2 deletions capa/capabilities/static.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@
import logging
import itertools
import collections
from typing import Optional
from dataclasses import dataclass

import intervaltree

import capa.perf
import capa.helpers
import capa.features.freeze as frz
import capa.render.result_document as rdoc
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.features.common import Result
from capa.features.address import Address, SuperblockAddress
from capa.capabilities.common import Capabilities, find_file_capabilities
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor

Expand Down Expand Up @@ -110,11 +115,159 @@ def find_basic_block_capabilities(
@dataclass
class CodeCapabilities:
    # Matches collected for a single function, grouped by the scope at which the rule matched.
    # NOTE: the field order is part of the interface; instances are built positionally.
    # rules matched at the function scope.
    function_matches: MatchResults
    # rules matched at the superblock scope (chains of connected basic blocks).
    superblock_matches: MatchResults
    # rules matched at the basic block scope.
    basic_block_matches: MatchResults
    # rules matched at the instruction scope.
    instruction_matches: MatchResults
    # number of distinct features collected for the function.
    feature_count: int


@dataclass
class FlowGraphNode:
    # This dataclass is used in the construction of the function's basic block flow graph.
    # Some analysis backends provide native support for flow graphs, but we construct it here regardless
    # to decrease the amount of code required on the analysis backend's side (feature extractors).
    # address of the basic block that this node represents.
    bva: Address
    # address of the first successor basic block, or None when there is none.
    left: Optional[Address]
    # address of the second successor basic block, or None when there is none.
    right: Optional[Address]


class SuperblockMatcher:
    """
    Collect the basic blocks of a function and match superblock-scoped rules against them.

    Intended use: register every basic block of the function via `add_basic_block()`,
    then call `match()` once. Afterwards `self.matches` holds only the matches whose
    features are spread over a superblock, i.e. a chain of two or more basic blocks
    in which each block branches directly to the next one.
    """

    def __init__(self, ruleset: RuleSet, extractor: StaticFeatureExtractor):
        self.ruleset: RuleSet = ruleset
        self.extractor: StaticFeatureExtractor = extractor
        # union of the features of all registered basic blocks (feature -> locations).
        self.features: FeatureSet = collections.defaultdict(set)
        # rule name -> list of (address, result); filled in by `match()`.
        self.matches: MatchResults = collections.defaultdict(list)
        # basic block address -> node describing its (at most two) successors.
        self.flow_graph: dict[Address, FlowGraphNode] = {}
        # maps the range [bb start, bb start + bb size) to the basic block's address.
        self.addr_to_bb = intervaltree.IntervalTree()

    def add_basic_block(self, bb: BBHandle, features: FeatureSet):
        """
        Register a basic block and its features into the superblock matcher.

        The basic block is added to the flow graph maintained by the matcher object,
        and its features are added to the global feature set maintained by the matcher object.

        Capabilities will later be extracted from all the registered features (as if they were extracted at the
        same level), and will be pruned after that while keeping only capabilities matched on superblocks
        (i.e., relevant basic blocks in series with no interruptions between).
        """
        # Addresses of the basic blocks that follow the current one.
        branches = list(self.extractor.get_next_basic_blocks(bb))
        bb_size = self.extractor.get_basic_block_size(bb)

        # Only the first two successors are tracked, and a branch from the block
        # to itself is dropped so that a block is never its own successor.
        left = branches[0] if branches and branches[0] != bb.address else None
        right = branches[1] if len(branches) > 1 and branches[1] != bb.address else None
        self.flow_graph[bb.address] = FlowGraphNode(bva=bb.address, left=left, right=right)

        # Register the bb's address range in the interval tree.
        # This is later used to determine the bb that a feature was extracted from.
        # Empty (or invalid) ranges cannot be stored in an interval tree, so they are skipped.
        if bb_size > 0:
            start = int(bb.address)
            self.addr_to_bb[start : start + bb_size] = bb.address

        # Add features extracted from this bb into the matcher's overall collection of features.
        for feature, locations in features.items():
            self.features[feature].update(locations)

    def _get_locations(self, result: Result) -> set[Address]:
        """Collect the locations of all features that contributed to a successful result."""
        if not result.success:
            # Failed sub-results (e.g., the child of a "not" statement) contribute no locations.
            return set()
        if result.children:
            # Statements carry their evidence in their children rather than in their own locations.
            locations: set[Address] = set()
            for child in result.children:
                locations |= self._get_locations(child)
            return locations
        # A leaf (feature): copy its locations into a fresh, mutable set.
        return set(result.locations or ())

    def _get_bbs_from_locations(self, locations: set[Address]) -> set[Address]:
        """
        Map feature locations to the addresses of the registered basic blocks that contain them.

        Locations that cannot be attributed to a registered basic block are ignored
        rather than aborting the whole pruning pass.
        """
        bbs: set[Address] = set()
        for location in locations:
            try:
                va = int(location)
            except (TypeError, ValueError):
                # The location is not an integer address, so it is not tied to any basic block.
                continue
            containing = self.addr_to_bb[va]
            if not containing:
                # No registered basic block covers this address (e.g., its size was reported as 0).
                continue
            # Should several registered ranges overlap, pick one deterministically.
            chosen = min(containing, key=lambda interval: (interval.begin, interval.end))
            # `data` is the block's Address exactly as it was registered in the flow graph.
            bbs.add(chosen.data)
        return bbs

    def _form_superblock_from_bbs(self, bb_locations: set[Address]) -> list[Address]:
        """
        Try to arrange the given basic blocks into one single chain of directly connected blocks.

        Returns the chain (length >= 2) on success, or an empty list when the blocks fork,
        form several disjoint chains, or consist of fewer than two connected blocks.
        """
        # Work on a copy so that the caller's set is left untouched.
        remaining = set(bb_locations)

        # If one of the basic blocks has both its left and its right branch among the given blocks,
        # then the blocks fork and cannot form a superblock.
        for location in remaining:
            node = self.flow_graph[location]
            if node.left in remaining and node.right in remaining:
                return []

        # head address -> chain of basic block addresses starting at that head.
        chains: dict[Address, list[Address]] = {}
        while remaining:
            # Pick an arbitrary basic block and follow its successors for as long as they are relevant.
            head = self.flow_graph[remaining.pop()]
            chain = [head.bva]
            node = head
            while True:
                if node.left in remaining:
                    successor = node.left
                elif node.right in remaining:
                    successor = node.right
                else:
                    break
                remaining.remove(successor)
                node = self.flow_graph[successor]
                chain.append(node.bva)

            # The tail may branch to the head of a chain built earlier; if so, connect the two.
            # The chain under construction is not in `chains` yet, so a tail that loops back
            # to its own head is (correctly) not spliced onto itself.
            for successor in (node.left, node.right):
                if successor in chains:
                    chain += chains.pop(successor)
                    break
            chains[head.bva] = chain

        if len(chains) == 1:
            (chain,) = chains.values()
            if len(chain) > 1:
                # The given basic blocks form a single superblock of length > 1.
                return chain
        # Multiple disjoint chains, or a chain of length <= 1: not a superblock.
        return []

    def _prune(self):
        """
        Keep only the matches whose feature locations span a superblock.

        Every surviving match is re-addressed with the SuperblockAddress of the chain it spans;
        rules that are left without any match are dropped from `self.matches`.
        """
        pruned_matches: MatchResults = collections.defaultdict(list)
        for rule_name, matches in self.matches.items():
            for _, result in matches:
                features_bbs = self._get_bbs_from_locations(self._get_locations(result))
                superblock = self._form_superblock_from_bbs(features_bbs)
                if superblock:
                    # The match spans multiple basic blocks that form a superblock. Therefore, we keep it.
                    pruned_matches[rule_name].append((SuperblockAddress(superblock), result))

        # update the list of valid matches.
        self.matches = pruned_matches

    def match(self, f_address: Address):
        """Match superblock rules against all registered features, then prune non-superblock matches."""
        _, self.matches = self.ruleset.match(Scope.SUPERBLOCK, self.features, f_address)
        self._prune()


def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle) -> CodeCapabilities:
"""
find matches for the given rules within the given function.
Expand All @@ -123,6 +276,9 @@ def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor,
# includes features found within basic blocks (and instructions).
function_features: FeatureSet = collections.defaultdict(set)

# matches found at the constituent superblocks of this function.
superblock_matches: MatchResults = collections.defaultdict(list)

# matches found at the basic block scope.
# might be found at different basic blocks, that's ok.
bb_matches: MatchResults = collections.defaultdict(list)
Expand All @@ -131,6 +287,8 @@ def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor,
# might be found at different instructions, that's ok.
insn_matches: MatchResults = collections.defaultdict(list)

superblock_matcher = SuperblockMatcher(ruleset, extractor)

for bb in extractor.get_basic_blocks(fh):
basic_block_capabilities = find_basic_block_capabilities(ruleset, extractor, fh, bb)
for feature, vas in basic_block_capabilities.features.items():
Expand All @@ -142,17 +300,29 @@ def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor,
for rule_name, res in basic_block_capabilities.instruction_matches.items():
insn_matches[rule_name].extend(res)

# add basic block and its features and capabilities to the superblock matcher.
superblock_matcher.add_basic_block(bb, basic_block_capabilities.features)

# match capabilities at the superblock scope once all basic blocks have been added.
superblock_matcher.match(fh.address)
for rule_name, res in superblock_matcher.matches.items():
superblock_matches[rule_name].extend(res)
rule = ruleset[rule_name]
for va, _ in res:
capa.engine.index_rule_matches(function_features, rule, [va])

for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()):
function_features[feature].add(va)

_, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address)
return CodeCapabilities(function_matches, bb_matches, insn_matches, len(function_features))
return CodeCapabilities(function_matches, superblock_matches, bb_matches, insn_matches, len(function_features))


def find_static_capabilities(
ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None
) -> Capabilities:
all_function_matches: MatchResults = collections.defaultdict(list)
all_superblock_matches: MatchResults = collections.defaultdict(list)
all_bb_matches: MatchResults = collections.defaultdict(list)
all_insn_matches: MatchResults = collections.defaultdict(list)

Expand Down Expand Up @@ -196,6 +366,7 @@ def find_static_capabilities(
match_count = 0
for name, matches_ in itertools.chain(
code_capabilities.function_matches.items(),
code_capabilities.superblock_matches.items(),
code_capabilities.basic_block_matches.items(),
code_capabilities.instruction_matches.items(),
):
Expand All @@ -212,6 +383,8 @@ def find_static_capabilities(

for rule_name, res in code_capabilities.function_matches.items():
all_function_matches[rule_name].extend(res)
for rule_name, res in code_capabilities.superblock_matches.items():
all_superblock_matches[rule_name].extend(res)
for rule_name, res in code_capabilities.basic_block_matches.items():
all_bb_matches[rule_name].extend(res)
for rule_name, res in code_capabilities.instruction_matches.items():
Expand All @@ -223,7 +396,7 @@ def find_static_capabilities(
# mapping from feature (matched rule) to set of addresses at which it matched.
function_and_lower_features: FeatureSet = collections.defaultdict(set)
for rule_name, results in itertools.chain(
all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items()
all_function_matches.items(), all_superblock_matches.items(), all_bb_matches.items(), all_insn_matches.items()
):
locations = {p[0] for p in results}
rule = ruleset[rule_name]
Expand All @@ -239,6 +412,7 @@ def find_static_capabilities(
# and we can merge the dictionaries naively.
all_insn_matches.items(),
all_bb_matches.items(),
all_superblock_matches.items(),
all_function_matches.items(),
all_file_capabilities.matches.items(),
)
Expand Down
33 changes: 33 additions & 0 deletions capa/features/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,39 @@ def __hash__(self):
return int.__hash__(self)


class SuperblockAddress(Address):
    """an address of a superblock: an ordered chain of basic block addresses"""

    def __init__(self, addresses: list[Address]):
        for address in addresses:
            assert isinstance(address, AbsoluteVirtualAddress)
            assert address >= 0
        # Keep a private copy: the hash is derived from the contents,
        # so later mutation of the caller's list must not affect this address.
        self.addresses: list[Address] = list(addresses)

    def __repr__(self):
        return "superblock(" + " -> ".join([f"0x{address:x}" for address in self.addresses]) + ")"

    def __hash__(self):
        return hash(tuple(self.addresses))

    def __eq__(self, other):
        # Addresses of different kinds can share a set or dict; on a hash collision
        # Python calls __eq__, which must answer "not equal" instead of raising.
        if not isinstance(other, SuperblockAddress):
            return False
        return self.addresses == other.addresses

    def _sort_key(self):
        # Order by first block, then by chain length, then by the full chain, so that
        # two different superblocks are never considered equivalent when sorting.
        return (self.addresses[:1], len(self.addresses), self.addresses)

    def __lt__(self, other):
        assert isinstance(other, SuperblockAddress)
        return self._sort_key() < other._sort_key()

    def __contains__(self, address: Address):
        return address in self.addresses


class ProcessAddress(Address):
"""an address of a process in a dynamic execution trace"""

Expand Down
14 changes: 14 additions & 0 deletions capa/features/extractors/base_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,20 @@ def extract_basic_block_features(self, f: FunctionHandle, bb: BBHandle) -> Itera
"""
raise NotImplementedError()

@abc.abstractmethod
def get_next_basic_blocks(self, bb: BBHandle) -> Iterator[Address]:
    """
    for a given basic block, retrieve the basic blocks that follow it (if any).

    yields the address of each successor basic block; yields nothing when there are none.
    """
    raise NotImplementedError()

@abc.abstractmethod
def get_basic_block_size(self, bb: BBHandle) -> int:
    """
    get the size of the given basic block.

    NOTE(review): presumably measured in bytes, since callers add it to the block's
    start address to obtain the end of its address range - confirm per backend.
    """
    raise NotImplementedError()

@abc.abstractmethod
def get_instructions(self, f: FunctionHandle, bb: BBHandle) -> Iterator[InsnHandle]:
"""
Expand Down
8 changes: 8 additions & 0 deletions capa/features/extractors/binexport2/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
inner=BasicBlockContext(basic_block_index),
)

def get_next_basic_blocks(self, bb: BBHandle):
    """
    placeholder: successor lookup is not implemented for this extractor, so no successors are reported.
    """
    # not implemented yet
    return []

def get_basic_block_size(self, bb: BBHandle):
    """
    placeholder: size lookup is not implemented for this extractor, so every block reports a size of 0.
    """
    # not implemented yet
    return 0

def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
yield from capa.features.extractors.binexport2.basicblock.extract_features(fh, bbh)

Expand Down
7 changes: 7 additions & 0 deletions capa/features/extractors/binja/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
for bb in f.basic_blocks:
yield BBHandle(address=AbsoluteVirtualAddress(bb.start), inner=bb)

def get_next_basic_blocks(self, bb: BBHandle) -> Iterator[AbsoluteVirtualAddress]:
    """
    yield the start address of every basic block that the given basic block branches to.
    """
    # The Binary Ninja basic block is stored in the handle's `inner`;
    # the handle itself does not expose the outgoing edges.
    for edge in bb.inner.outgoing_edges:
        yield AbsoluteVirtualAddress(edge.target.start)

def get_basic_block_size(self, bb: BBHandle) -> int:
    """
    get the size of the given basic block, as reported by the `length` of the wrapped basic block.
    """
    return bb.inner.length

def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
yield from capa.features.extractors.binja.basicblock.extract_features(fh, bbh)

Expand Down
6 changes: 6 additions & 0 deletions capa/features/extractors/dnfile/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ def get_basic_blocks(self, f) -> Iterator[BBHandle]:
inner=f.inner,
)

def get_next_basic_blocks(self, bb):
    """
    yield nothing: this extractor reports no successor basic blocks.
    """
    yield from []

def get_basic_block_size(self, bb: BBHandle) -> int:
    """
    get the size of the given basic block, taken from the `code_size` of the handle's inner object.

    NOTE(review): presumably the size of the whole method body, since the basic block
    handle wraps the function's inner object - confirm.
    """
    return bb.inner.code_size

def extract_basic_block_features(self, fh, bbh):
# we don't support basic block features
yield from []
Expand Down
6 changes: 6 additions & 0 deletions capa/features/extractors/dotnetfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ def extract_function_features(self, f):
def get_basic_blocks(self, f):
raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")

def get_next_basic_blocks(self, bb):
    """
    unsupported: this extractor only extracts file features, so this always raises NotImplementedError.
    """
    raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")

def get_basic_block_size(self, bb):
    """
    unsupported: this extractor only extracts file features, so this always raises NotImplementedError.
    """
    raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")

def extract_basic_block_features(self, f, bb):
raise NotImplementedError("DotnetFileFeatureExtractor can only be used to extract file features")

Expand Down
6 changes: 6 additions & 0 deletions capa/features/extractors/elffile.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ def extract_function_features(self, f):
def get_basic_blocks(self, f):
raise NotImplementedError("ElfFeatureExtractor can only be used to extract file features")

def get_next_basic_blocks(self, bb):
    """
    unsupported: this extractor only extracts file features, so this always raises NotImplementedError.
    """
    raise NotImplementedError("ElfFeatureExtractor can only be used to extract file features")

def get_basic_block_size(self, bb):
    """
    unsupported: this extractor only extracts file features, so this always raises NotImplementedError.
    """
    raise NotImplementedError("ElfFeatureExtractor can only be used to extract file features")

def extract_basic_block_features(self, f, bb):
raise NotImplementedError("ElfFeatureExtractor can only be used to extract file features")

Expand Down
Loading
Loading