2 changes: 2 additions & 0 deletions art/attacks/poisoning/__init__.py
@@ -19,3 +19,5 @@
from art.attacks.poisoning.hidden_trigger_backdoor.hidden_trigger_backdoor_pytorch import HiddenTriggerBackdoorPyTorch
from art.attacks.poisoning.hidden_trigger_backdoor.hidden_trigger_backdoor_keras import HiddenTriggerBackdoorKeras
from art.attacks.poisoning.sleeper_agent_attack import SleeperAgentAttack
from art.attacks.poisoning.dynamic_backdoor_gan import DynamicBackdoorGAN

73 changes: 73 additions & 0 deletions art/attacks/poisoning/dynamic_backdoor_gan.py
@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
"""dynamic_backdoor_gan

Automatically generated by Colab.

Original file is located at
https://colab.research.google.com/drive/19W9gZ2gUxkgu6rr5qAT1Arf7iauCj2QT
"""

#Trigger Generator:A small CNN that learns to generate input-specific triggers
class TriggerGenerator(nn.Module):
def __init__(self, input_channels=3):
super().__init__()
self.net = nn.Sequential(
nn.Conv2d(input_channels, 32, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(32, 32, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(32, input_channels, kernel_size=3, padding=1),
nn.Tanh()
)

def forward(self, x):
return self.net(x)


# Custom poisoning attack: DynamicBackdoorGAN. This class defines how to poison
# training data using the GAN trigger generator defined above.
class DynamicBackdoorGAN(PoisoningAttackBackdoor):
def __init__(self, generator, target_label, backdoor_rate, classifier, epsilon=0.5):
super().__init__(perturbation=lambda x: x)
self.classifier = classifier
self.generator = generator.to(classifier.device)
self.target_label = target_label
self.backdoor_rate = backdoor_rate
self.epsilon = epsilon
# Add trigger to a given image batch
def apply_trigger(self, images):
self.generator.eval()
with torch.no_grad():
images = nn.functional.interpolate(images, size=(32, 32), mode='bilinear') # Resize images to ensure uniform dimension
            triggers = self.generator(images.to(self.classifier.device))  # Generate dynamic, input-specific triggers with the TriggerGenerator
poisoned = (images.to(self.classifier.device) + self.epsilon * triggers).clamp(0, 1) # Clamp the pixel values to ensure they stay in the valid [0, 1] range.
return poisoned
# Poison the training data by injecting dynamic triggers and changing labels
def poison(self, x, y):
# Convert raw image data (x) to torch tensors (float), and convert one-hot labels (y) to class indices-required by ART
x_tensor = torch.tensor(x).float()
y_tensor = torch.tensor(np.argmax(y, axis=1))
        # Calculate the total number of samples and how many should be poisoned (poison ratio = backdoor_rate)
batch_size = x_tensor.shape[0]
n_poison = int(self.backdoor_rate * batch_size)
# Apply the learned trigger to the first 'n_poison' samples
poisoned = self.apply_trigger(x_tensor[:n_poison])
# The remaining samples remain clean
clean = x_tensor[n_poison:].to(self.classifier.device)
# Combine poisoned and clean samples into a single batch
poisoned_images = torch.cat([poisoned, clean], dim=0).cpu().numpy()
# Modify the labels of poisoned samples to the attacker's target class
new_labels = y_tensor.clone()
new_labels[:n_poison] = self.target_label # Set the poisoned labels to the desired misclassification
# Convert all labels back to one-hot encoding (required by ART classifiers)
new_labels = to_categorical(new_labels.numpy(), nb_classes=self.classifier.nb_classes)
return poisoned_images.astype(np.float32), new_labels.astype(np.float32)
    # Evaluate the attack's success on test data.
    def evaluate(self, x_clean, y_clean):
        x_tensor = torch.tensor(x_clean).float()
        # Apply the trigger to every test image to create a poisoned test set.
        poisoned_test = self.apply_trigger(x_tensor).cpu().numpy().astype(np.float32)
        # Note: the ASR below is computed over all test images, including those whose
        # true class already equals the target label.

preds = self.classifier.predict(poisoned_test)
true_target = np.full((len(preds),), self.target_label)
pred_labels = np.argmax(preds, axis=1)

success = np.sum(pred_labels == true_target)
asr = 100.0 * success / len(pred_labels)
return asr
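

# Illustrative usage (a minimal sketch, not executed on import; the names below are
# assumptions): it assumes an already compiled ART PyTorchClassifier `classifier`
# and float32 arrays `x_train`/`x_test` with one-hot labels `y_train`/`y_test` and
# pixels in [0, 1]. See examples/dynamicbackdoorgan_demo.py in this PR for a
# complete end-to-end run.
#
#     generator = TriggerGenerator(input_channels=3)
#     attack = DynamicBackdoorGAN(generator, target_label=0, backdoor_rate=0.1,
#                                 classifier=classifier, epsilon=0.5)
#     x_poison, y_poison = attack.poison(x_train, y_train)
#     classifier.fit(x_poison, y_poison, nb_epochs=30, batch_size=128)
#     asr = attack.evaluate(x_test, y_test)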
215 changes: 215 additions & 0 deletions examples/dynamicbackdoorgan_demo.py
@@ -0,0 +1,215 @@
# -*- coding: utf-8 -*-
"""DynamicBackdoorGAN_Demo.ipynb

Automatically generated by Colab.

Original file is located at
https://colab.research.google.com/drive/1Uxw5hHxnvtDh2-dC5cHgSfBMNMl05lpD
"""

# ✅ Imports
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Subset
from torchvision import datasets, transforms, models
from art.estimators.classification import PyTorchClassifier
from art.utils import to_categorical
from art.attacks.poisoning import PoisoningAttackBackdoor


# ✅ User Config
config = {
"dataset": "MNIST", # CIFAR10, CIFAR100, MNIST
"model_name": "densenet121", # resnet18, resnet50, mobilenetv2, densenet121
"poison_ratio": 0.1,
"target_label": 0, # Target label to which poisoned samples are mapped
"epochs": 30,
"batch_size": 128,
"epsilon": 0.5,
"train_subset": None,
"test_subset": None
}
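# Illustrative alternative (a sketch only; this run keeps the MNIST config above):
# switching to CIFAR-10 with a lighter backbone would only require, e.g.
#   config.update({"dataset": "CIFAR10", "model_name": "resnet18"})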


# ✅ Trigger Generator
class TriggerGenerator(nn.Module):
def __init__(self, input_channels=3):
super().__init__()
self.net = nn.Sequential(
nn.Conv2d(input_channels, 32, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(32, 32, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(32, input_channels, kernel_size=3, padding=1),
nn.Tanh()
)

def forward(self, x):
return self.net(x)


# ✅ ART-Compatible Poisoning Attack
class DynamicBackdoorGAN(PoisoningAttackBackdoor):
def __init__(self, generator, target_label, backdoor_rate, classifier, epsilon=0.5):
super().__init__(perturbation=lambda x: x)
self.classifier = classifier
self.generator = generator.to(classifier.device)
self.target_label = target_label
self.backdoor_rate = backdoor_rate
self.epsilon = epsilon

def apply_trigger(self, images):
self.generator.eval()
with torch.no_grad():
images = nn.functional.interpolate(images, size=(32, 32), mode='bilinear')
triggers = self.generator(images.to(self.classifier.device))
poisoned = (images.to(self.classifier.device) + self.epsilon * triggers).clamp(0, 1)
return poisoned

def poison(self, x, y):
x_tensor = torch.tensor(x).float()
y_tensor = torch.tensor(np.argmax(y, axis=1))

batch_size = x_tensor.shape[0]
n_poison = int(self.backdoor_rate * batch_size)

poisoned = self.apply_trigger(x_tensor[:n_poison])
clean = x_tensor[n_poison:].to(self.classifier.device)

poisoned_images = torch.cat([poisoned, clean], dim=0).cpu().numpy()

new_labels = y_tensor.clone()
new_labels[:n_poison] = self.target_label

new_labels = to_categorical(new_labels.numpy(), nb_classes=self.classifier.nb_classes)
return poisoned_images.astype(np.float32), new_labels.astype(np.float32)

def evaluate(self, x_clean, y_clean):
x_tensor = torch.tensor(x_clean).float()
poisoned_test = self.apply_trigger(x_tensor).cpu().numpy().astype(np.float32)

preds = self.classifier.predict(poisoned_test)
true_target = np.full((len(preds),), self.target_label)
pred_labels = np.argmax(preds, axis=1)

success = np.sum(pred_labels == true_target)
asr = 100.0 * success / len(pred_labels)
return asr


# ✅ Utility: Load Data

def get_data(dataset="CIFAR10", train_subset=None, test_subset=None):
dataset_cls, num_classes = None, None
if dataset in ["CIFAR10", "CIFAR100"]:
transform = transforms.Compose([transforms.Resize((32, 32)), transforms.ToTensor()])
elif dataset == "MNIST":
transform = transforms.Compose([
transforms.Grayscale(num_output_channels=3),
transforms.Resize((32, 32)),
transforms.ToTensor()
])
else:
raise ValueError("Unsupported dataset")

if dataset == "CIFAR10":
dataset_cls = datasets.CIFAR10
num_classes = 10
elif dataset == "CIFAR100":
dataset_cls = datasets.CIFAR100
num_classes = 100
elif dataset == "MNIST":
dataset_cls = datasets.MNIST
num_classes = 10
if dataset_cls is None or num_classes is None:
raise ValueError(f"Dataset {dataset} not handled correctly.")

train_set = dataset_cls(root="./data", train=True, download=True, transform=transform)
test_set = dataset_cls(root="./data", train=False, download=True, transform=transform)

if train_subset is not None:
train_set = Subset(train_set, range(train_subset))
if test_subset is not None:
test_set = Subset(test_set, range(test_subset))

x_train = torch.stack([x for x, _ in train_set]).numpy()
y_train = to_categorical([y for _, y in train_set], nb_classes=num_classes)

x_test = torch.stack([x for x, _ in test_set]).numpy()
y_test = to_categorical([y for _, y in test_set], nb_classes=num_classes)

return x_train, y_train, x_test, y_test, num_classes
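
# Note on get_data's returned layout (as produced by the transforms above): x_train and
# x_test are float32 arrays of shape (N, 3, 32, 32) with values in [0, 1] (MNIST is
# replicated to 3 channels), and y_train / y_test are one-hot arrays of shape
# (N, num_classes), which is the layout the ART PyTorchClassifier below expects.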


# ✅ Utility: Get ART Classifier
def get_classifier(config):
model_name = config["model_name"]
nb_classes = config["nb_classes"]
input_shape = config["input_shape"]
lr = config.get("learning_rate", 0.001)

if model_name == "resnet18":
model = models.resnet18(num_classes=nb_classes)
elif model_name == "resnet50":
model = models.resnet50(num_classes=nb_classes)
elif model_name == "mobilenetv2":
model = models.mobilenet_v2(num_classes=nb_classes)
elif model_name == "densenet121":
model = models.densenet121(num_classes=nb_classes)
else:
raise ValueError(f"Unsupported model: {model_name}")

loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

classifier = PyTorchClassifier(
model=model,
loss=loss,
optimizer=optimizer,
input_shape=input_shape,
nb_classes=nb_classes,
clip_values=(0.0, 1.0),
device_type="gpu" if torch.cuda.is_available() else "cpu"
)
return classifier
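
# Note: the torchvision models above are created with randomly initialized weights (no
# pretrained checkpoint) and a num_classes-sized output head; clip_values=(0.0, 1.0)
# matches the [0, 1] pixel range produced by get_data.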


# ✅ Full Experiment
def run_dynamic_backdoor_experiment(config):
x_train, y_train, x_test, y_test, num_classes = get_data(
dataset=config["dataset"],
train_subset=config.get("train_subset"),
test_subset=config.get("test_subset")
)
config["nb_classes"] = num_classes
config["input_shape"] = x_train.shape[1:]

classifier = get_classifier(config)

# Clean training
classifier.fit(x_train, y_train, nb_epochs=config["epochs"], batch_size=config["batch_size"])
clean_acc = np.mean(np.argmax(classifier.predict(x_test), axis=1) == np.argmax(y_test, axis=1))
print(f"✅ Clean Accuracy: {clean_acc * 100:.2f}%")

# Poison training
generator = TriggerGenerator()
attack = DynamicBackdoorGAN(
generator,
config["target_label"],
config["poison_ratio"],
classifier,
epsilon=config["epsilon"]
)
x_poison, y_poison = attack.poison(x_train, y_train)

classifier.fit(x_poison, y_poison, nb_epochs=config["epochs"], batch_size=config["batch_size"])
poisoned_acc = np.mean(np.argmax(classifier.predict(x_test), axis=1) == np.argmax(y_test, axis=1))
print(f"🎯 Poisoned Accuracy: {poisoned_acc * 100:.2f}%")

asr = attack.evaluate(x_test, y_test)
print(f"💥 Attack Success Rate (ASR): {asr:.2f}%")


# ✅ Run
if __name__ == "__main__":
    run_dynamic_backdoor_experiment(config)