diff --git a/art/attacks/poisoning/__init__.py b/art/attacks/poisoning/__init__.py
index fa62ad125a..0eaa91671a 100644
--- a/art/attacks/poisoning/__init__.py
+++ b/art/attacks/poisoning/__init__.py
@@ -19,3 +19,5 @@
 from art.attacks.poisoning.hidden_trigger_backdoor.hidden_trigger_backdoor_pytorch import HiddenTriggerBackdoorPyTorch
 from art.attacks.poisoning.hidden_trigger_backdoor.hidden_trigger_backdoor_keras import HiddenTriggerBackdoorKeras
 from art.attacks.poisoning.sleeper_agent_attack import SleeperAgentAttack
+from art.attacks.poisoning.dynamic_backdoor_gan import DynamicBackdoorGAN
+
diff --git a/art/attacks/poisoning/dynamic_backdoor_gan.py b/art/attacks/poisoning/dynamic_backdoor_gan.py
new file mode 100644
index 0000000000..b2d96e894a
--- /dev/null
+++ b/art/attacks/poisoning/dynamic_backdoor_gan.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+"""
+This module implements the DynamicBackdoorGAN poisoning attack, which uses a small
+convolutional generator network to craft input-specific backdoor triggers.
+"""
+import numpy as np
+import torch
+import torch.nn as nn
+
+from art.attacks.poisoning.backdoor_attack import PoisoningAttackBackdoor
+from art.utils import to_categorical
+
+
+# Trigger generator: a small CNN that learns to generate input-specific triggers.
+class TriggerGenerator(nn.Module):
+    def __init__(self, input_channels=3):
+        super().__init__()
+        self.net = nn.Sequential(
+            nn.Conv2d(input_channels, 32, kernel_size=3, padding=1),
+            nn.ReLU(),
+            nn.Conv2d(32, 32, kernel_size=3, padding=1),
+            nn.ReLU(),
+            nn.Conv2d(32, input_channels, kernel_size=3, padding=1),
+            nn.Tanh(),
+        )
+
+    def forward(self, x):
+        return self.net(x)
+
+
+# Custom poisoning attack: DynamicBackdoorGAN. This class defines how to poison data
+# using the GAN trigger generator.
+class DynamicBackdoorGAN(PoisoningAttackBackdoor):
+    def __init__(self, generator, target_label, backdoor_rate, classifier, epsilon=0.5):
+        super().__init__(perturbation=lambda x: x)
+        self.classifier = classifier
+        self.generator = generator.to(classifier.device)
+        self.target_label = target_label
+        self.backdoor_rate = backdoor_rate
+        self.epsilon = epsilon
+
+    # Add a trigger to a batch of images.
+    def apply_trigger(self, images):
+        self.generator.eval()
+        with torch.no_grad():
+            # Resize images to a uniform 32x32 resolution.
+            images = nn.functional.interpolate(images, size=(32, 32), mode="bilinear")
+            # Generate dynamic, input-specific triggers with the trigger generator.
+            triggers = self.generator(images.to(self.classifier.device))
+            # Add the scaled trigger and clamp pixel values to the valid [0, 1] range.
+            poisoned = (images.to(self.classifier.device) + self.epsilon * triggers).clamp(0, 1)
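+            # Note: the generator's Tanh output lies in [-1, 1], so the additive
+            # perturbation is bounded elementwise by epsilon:
+            #     x_poisoned = clip(x + epsilon * G(x), 0, 1)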
+        return poisoned
+
+    # Poison the training data by injecting dynamic triggers and changing labels.
+    def poison(self, x, y):
+        # Convert raw images (x) to float tensors and one-hot labels (y) to class
+        # indices, as required below.
+        x_tensor = torch.tensor(x).float()
+        y_tensor = torch.tensor(np.argmax(y, axis=1))
+        # Compute the total number of samples and how many to poison
+        # (poison ratio = backdoor_rate).
+        batch_size = x_tensor.shape[0]
+        n_poison = int(self.backdoor_rate * batch_size)
+        # Apply the learned trigger to the first 'n_poison' samples.
+        poisoned = self.apply_trigger(x_tensor[:n_poison])
+        # The remaining samples stay clean.
+        clean = x_tensor[n_poison:].to(self.classifier.device)
+        # Combine poisoned and clean samples into a single batch.
+        poisoned_images = torch.cat([poisoned, clean], dim=0).cpu().numpy()
+        # Relabel the poisoned samples with the attacker's target class.
+        new_labels = y_tensor.clone()
+        new_labels[:n_poison] = self.target_label
+        # Convert all labels back to one-hot encoding (required by ART classifiers).
+        new_labels = to_categorical(new_labels.numpy(), nb_classes=self.classifier.nb_classes)
+        return poisoned_images.astype(np.float32), new_labels.astype(np.float32)
+
+    # Evaluate the attack's success rate on test data.
+    def evaluate(self, x_clean, y_clean):
+        x_tensor = torch.tensor(x_clean).float()
+        # Apply the trigger to every test image to create a fully poisoned test set.
+        poisoned_test = self.apply_trigger(x_tensor).cpu().numpy().astype(np.float32)
+
+        preds = self.classifier.predict(poisoned_test)
+        true_target = np.full((len(preds),), self.target_label)
+        pred_labels = np.argmax(preds, axis=1)
+
+        # Fraction of triggered inputs classified as the target label.
+        success = np.sum(pred_labels == true_target)
+        asr = 100.0 * success / len(pred_labels)
+        return asr
diff --git a/examples/dynamicbackdoorgan_demo.py b/examples/dynamicbackdoorgan_demo.py
new file mode 100644
index 0000000000..21f75f9b71
--- /dev/null
+++ b/examples/dynamicbackdoorgan_demo.py
@@ -0,0 +1,215 @@
+# -*- coding: utf-8 -*-
+"""DynamicBackdoorGAN demo.
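+
+Trains a clean baseline model, poisons a fraction of the training set with
+input-specific triggers from TriggerGenerator, retrains the model on the
+poisoned data, and reports clean accuracy, poisoned-model accuracy, and the
+attack success rate (ASR).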
+"""
+
+# ✅ Imports
+import torch
+import torch.nn as nn
+import numpy as np
+from torch.utils.data import Subset
+from torchvision import datasets, transforms, models
+from art.estimators.classification import PyTorchClassifier
+from art.utils import to_categorical
+from art.attacks.poisoning import PoisoningAttackBackdoor
+
+
+# ✅ User Config
+config = {
+    "dataset": "MNIST",           # CIFAR10, CIFAR100, MNIST
+    "model_name": "densenet121",  # resnet18, resnet50, mobilenetv2, densenet121
+    "poison_ratio": 0.1,          # Fraction of training samples that receive a trigger
+    "target_label": 0,            # Target label to which poisoned samples are mapped
+    "epochs": 30,
+    "batch_size": 128,
+    "epsilon": 0.5,               # Trigger strength
+    "train_subset": None,
+    "test_subset": None
+}
+
+
+# ✅ Trigger Generator
+class TriggerGenerator(nn.Module):
+    def __init__(self, input_channels=3):
+        super().__init__()
+        self.net = nn.Sequential(
+            nn.Conv2d(input_channels, 32, kernel_size=3, padding=1),
+            nn.ReLU(),
+            nn.Conv2d(32, 32, kernel_size=3, padding=1),
+            nn.ReLU(),
+            nn.Conv2d(32, input_channels, kernel_size=3, padding=1),
+            nn.Tanh()
+        )
+
+    def forward(self, x):
+        return self.net(x)
+
+
+# ✅ ART-Compatible Poisoning Attack
+class DynamicBackdoorGAN(PoisoningAttackBackdoor):
+    def __init__(self, generator, target_label, backdoor_rate, classifier, epsilon=0.5):
+        super().__init__(perturbation=lambda x: x)
+        self.classifier = classifier
+        self.generator = generator.to(classifier.device)
+        self.target_label = target_label
+        self.backdoor_rate = backdoor_rate
+        self.epsilon = epsilon
+
+    def apply_trigger(self, images):
+        self.generator.eval()
+        with torch.no_grad():
+            images = nn.functional.interpolate(images, size=(32, 32), mode='bilinear')
+            triggers = self.generator(images.to(self.classifier.device))
+            poisoned = (images.to(self.classifier.device) + self.epsilon * triggers).clamp(0, 1)
+        return poisoned
+
+    def poison(self, x, y):
+        x_tensor = torch.tensor(x).float()
+        y_tensor = torch.tensor(np.argmax(y, axis=1))
+
+        batch_size = x_tensor.shape[0]
+        n_poison = int(self.backdoor_rate * batch_size)
+
+        poisoned = self.apply_trigger(x_tensor[:n_poison])
+        clean = x_tensor[n_poison:].to(self.classifier.device)
+
+        poisoned_images = torch.cat([poisoned, clean], dim=0).cpu().numpy()
+
+        new_labels = y_tensor.clone()
+        new_labels[:n_poison] = self.target_label
+
+        new_labels = to_categorical(new_labels.numpy(), nb_classes=self.classifier.nb_classes)
+        return poisoned_images.astype(np.float32), new_labels.astype(np.float32)
+
+    def evaluate(self, x_clean, y_clean):
+        x_tensor = torch.tensor(x_clean).float()
+        poisoned_test = self.apply_trigger(x_tensor).cpu().numpy().astype(np.float32)
+
+        preds = self.classifier.predict(poisoned_test)
+        true_target = np.full((len(preds),), self.target_label)
+        pred_labels = np.argmax(preds, axis=1)
+
+        success = np.sum(pred_labels == true_target)
+        asr = 100.0 * success / len(pred_labels)
+        return asr
+
+
+# ✅ Utility: Load Data
+
+def get_data(dataset="CIFAR10", train_subset=None, test_subset=None):
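+    """Load the requested dataset as numpy arrays with one-hot labels.
+
+    MNIST is converted to 3 channels, and all images are resized to 32x32 so the
+    torchvision architectures used in this demo accept them.
+    """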
+    dataset_cls, num_classes = None, None
+    if dataset in ["CIFAR10", "CIFAR100"]:
+        transform = transforms.Compose([transforms.Resize((32, 32)), transforms.ToTensor()])
+    elif dataset == "MNIST":
+        transform = transforms.Compose([
+            transforms.Grayscale(num_output_channels=3),
+            transforms.Resize((32, 32)),
+            transforms.ToTensor()
+        ])
+    else:
+        raise ValueError("Unsupported dataset")
+
+    if dataset == "CIFAR10":
+        dataset_cls = datasets.CIFAR10
+        num_classes = 10
+    elif dataset == "CIFAR100":
+        dataset_cls = datasets.CIFAR100
+        num_classes = 100
+    elif dataset == "MNIST":
+        dataset_cls = datasets.MNIST
+        num_classes = 10
+    if dataset_cls is None or num_classes is None:
+        raise ValueError(f"Dataset {dataset} not handled correctly.")
+
+    train_set = dataset_cls(root="./data", train=True, download=True, transform=transform)
+    test_set = dataset_cls(root="./data", train=False, download=True, transform=transform)
+
+    if train_subset is not None:
+        train_set = Subset(train_set, range(train_subset))
+    if test_subset is not None:
+        test_set = Subset(test_set, range(test_subset))
+
+    x_train = torch.stack([x for x, _ in train_set]).numpy()
+    y_train = to_categorical([y for _, y in train_set], nb_classes=num_classes)
+
+    x_test = torch.stack([x for x, _ in test_set]).numpy()
+    y_test = to_categorical([y for _, y in test_set], nb_classes=num_classes)
+
+    return x_train, y_train, x_test, y_test, num_classes
+
+
+# ✅ Utility: Get ART Classifier
+def get_classifier(config):
+    model_name = config["model_name"]
+    nb_classes = config["nb_classes"]
+    input_shape = config["input_shape"]
+    lr = config.get("learning_rate", 0.001)
+
+    if model_name == "resnet18":
+        model = models.resnet18(num_classes=nb_classes)
+    elif model_name == "resnet50":
+        model = models.resnet50(num_classes=nb_classes)
+    elif model_name == "mobilenetv2":
+        model = models.mobilenet_v2(num_classes=nb_classes)
+    elif model_name == "densenet121":
+        model = models.densenet121(num_classes=nb_classes)
+    else:
+        raise ValueError(f"Unsupported model: {model_name}")
+
+    loss = torch.nn.CrossEntropyLoss()
+    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
+
+    classifier = PyTorchClassifier(
+        model=model,
+        loss=loss,
+        optimizer=optimizer,
+        input_shape=input_shape,
+        nb_classes=nb_classes,
+        clip_values=(0.0, 1.0),
+        device_type="gpu" if torch.cuda.is_available() else "cpu"
+    )
+    return classifier
+
+
+# ✅ Full Experiment
+def run_dynamic_backdoor_experiment(config):
+    x_train, y_train, x_test, y_test, num_classes = get_data(
+        dataset=config["dataset"],
+        train_subset=config.get("train_subset"),
+        test_subset=config.get("test_subset")
+    )
+    config["nb_classes"] = num_classes
+    config["input_shape"] = x_train.shape[1:]
+
+    classifier = get_classifier(config)
+
+    # Clean training
+    classifier.fit(x_train, y_train, nb_epochs=config["epochs"], batch_size=config["batch_size"])
+    clean_acc = np.mean(np.argmax(classifier.predict(x_test), axis=1) == np.argmax(y_test, axis=1))
+    print(f"✅ Clean Accuracy: {clean_acc * 100:.2f}%")
+
+    # Poisoned training
+    generator = TriggerGenerator()
+    attack = DynamicBackdoorGAN(
+        generator,
+        config["target_label"],
+        config["poison_ratio"],
+        classifier,
+        epsilon=config["epsilon"]
+    )
+    x_poison, y_poison = attack.poison(x_train, y_train)
+
+    classifier.fit(x_poison, y_poison, nb_epochs=config["epochs"], batch_size=config["batch_size"])
+    poisoned_acc = np.mean(np.argmax(classifier.predict(x_test), axis=1) == np.argmax(y_test, axis=1))
+    print(f"🎯 Poisoned Accuracy: {poisoned_acc * 100:.2f}%")
+
+    asr = attack.evaluate(x_test, y_test)
+    print(f"💥 Attack Success Rate (ASR): {asr:.2f}%")
+
+
+# ✅ Run
+if __name__ == "__main__":
+    run_dynamic_backdoor_experiment(config)