|
| 1 | +import argparse |
| 2 | +import logging |
| 3 | +import os |
| 4 | + |
| 5 | +import pandas as pd |
| 6 | +import numpy as np |
| 7 | +from itertools import combinations |
| 8 | + |
| 9 | + |
def parse_args():
    """Build the CLI parser for the processing job and return the parsed args."""
    p = argparse.ArgumentParser()
    # I/O locations (SageMaker processing container defaults)
    p.add_argument('--data-dir', type=str, default='/opt/ml/processing/input')
    p.add_argument('--output-dir', type=str, default='/opt/ml/processing/output')
    # Input file names
    p.add_argument('--transactions', type=str, default='transaction.csv', help='name of file with transactions')
    p.add_argument('--identity', type=str, default='identity.csv', help='name of file with identity info')
    # Column configuration (comma-separated lists)
    p.add_argument('--id-cols', type=str, default='', help='comma separated id cols in transactions table')
    p.add_argument('--cat-cols', type=str, default='', help='comma separated categorical cols in transactions')
    p.add_argument('--cat-cols-xgboost', type=str, default='', help='comma separated categorical cols that can be used as features for xgboost in transactions')
    # Split ratios; the remainder (1 - train - valid) becomes the test set
    p.add_argument('--train-data-ratio', type=float, default=0.7, help='fraction of data to use in training set')
    p.add_argument('--valid-data-ratio', type=float, default=0.2, help='fraction of data to use in validation set')
    p.add_argument('--construct-homogeneous', action="store_true", default=False,
                   help='use bipartite graphs edgelists to construct homogenous graph edgelist')
    return p.parse_args()
| 24 | + |
| 25 | + |
def get_logger(name):
    """Return a logger named *name*, configured at INFO level with a timestamped format."""
    # basicConfig is a no-op if the root logger already has handlers, so calling
    # this more than once is harmless.
    logging.basicConfig(format='%(asctime)s %(levelname)s %(name)s: %(message)s',
                        level=logging.INFO)
    named_logger = logging.getLogger(name)
    named_logger.setLevel(logging.INFO)
    return named_logger
| 32 | + |
| 33 | + |
def load_data(data_dir, transaction_data, identity_data, train_data_ratio, valid_data_ratio, output_dir):
    """Load the transaction and identity tables and write the validation/test ID splits.

    The data is split positionally (not shuffled): the first ``train_data_ratio``
    fraction of rows is train, the next ``valid_data_ratio`` fraction is
    validation, and the remainder is test. Validation and test TransactionIDs
    are written to ``validation.csv`` and ``test.csv`` under *output_dir*.

    Returns (transaction_df, identity_df, valid_ids, test_ids).
    """
    transaction_df = pd.read_csv(os.path.join(data_dir, transaction_data))
    logging.info("Shape of transaction data is {}".format(transaction_df.shape))
    logging.info("# Tagged transactions: {}".format(len(transaction_df) - transaction_df.isFraud.isnull().sum()))

    identity_df = pd.read_csv(os.path.join(data_dir, identity_data))
    logging.info("Shape of identity data is {}".format(identity_df.shape))

    # Cut points for the positional train/validation/test split.
    test_data_ratio = 1 - train_data_ratio - valid_data_ratio
    logging.info("Training, validation, and test data fraction are {}, {}, and {}, respectively".format(train_data_ratio, valid_data_ratio, test_data_ratio))
    assert train_data_ratio + valid_data_ratio < 1, "The sum of training and validation ratio is found more than or equal to 1."
    n_rows = transaction_df.shape[0]
    n_train = int(n_rows * train_data_ratio)
    n_valid = int(n_rows * (train_data_ratio + valid_data_ratio))
    all_ids = transaction_df.TransactionID.values
    valid_ids = all_ids[n_train:n_valid]
    test_ids = all_ids[n_valid:]

    def fraud_pct(series):
        # Percentage of fraudulent rows in a 0/1 label series.
        return 100 * sum(series) / len(series)

    logging.info("Percentage of fraud transactions for train data: {}".format(fraud_pct(transaction_df.isFraud[:n_train])))
    logging.info("Percentage of fraud transactions for validation data: {}".format(fraud_pct(transaction_df.isFraud[n_train:n_valid])))
    logging.info("Percentage of fraud transactions for test data: {}".format(fraud_pct(transaction_df.isFraud[n_valid:])))
    logging.info("Percentage of fraud transactions for all data: {}".format(fraud_pct(transaction_df.isFraud)))

    valid_path = os.path.join(output_dir, 'validation.csv')
    with open(valid_path, 'w') as f:
        f.writelines("{}\n".format(tid) for tid in valid_ids)
    logging.info("Wrote validaton data to file: {}".format(valid_path))

    test_path = os.path.join(output_dir, 'test.csv')
    with open(test_path, 'w') as f:
        f.writelines("{}\n".format(tid) for tid in test_ids)
    logging.info("Wrote test data to file: {}".format(test_path))

    return transaction_df, identity_df, valid_ids, test_ids
| 65 | + |
| 66 | + |
def get_features_and_labels(transactions_df, transactions_id_cols, transactions_cat_cols, transactions_cat_cols_xgboost, output_dir):
    """Write feature matrices (GNN and XGBoost variants) and the label file.

    Produces, under *output_dir*:
      - features.csv          one-hot-encoded features (no header, no index)
      - features_xgboost.csv  TransactionID + one-hot categorical cols + log-amount
      - tags.csv              TransactionID,isFraud labels (with header)
    """
    # --- Features for the GNN: everything except label, timestamp and id columns ---
    excluded = ['isFraud', 'TransactionDT'] + transactions_id_cols.split(",")
    feature_cols = [c for c in transactions_df.columns if c not in excluded]
    cat_cols = transactions_cat_cols.split(",")
    logging.info("Categorical columns: {}".format(cat_cols))
    features = pd.get_dummies(transactions_df[feature_cols], columns=cat_cols).fillna(0)
    # Log-scale the transaction amount to tame its heavy tail.
    features['TransactionAmt'] = features['TransactionAmt'].apply(np.log10)
    logging.info("Transformed feature columns: {}".format(list(features.columns)))
    logging.info("Shape of features: {}".format(features.shape))
    features_path = os.path.join(output_dir, 'features.csv')
    features.to_csv(features_path, index=False, header=False)
    logging.info("Wrote features to file: {}".format(features_path))

    # --- Features for XGBoost: id column, one-hot categoricals, log-amount ---
    logging.info("Processing feature columns for XGBoost.")
    cat_cols_xgb = transactions_cat_cols_xgboost.split(",")
    logging.info("Categorical feature columns for XGBoost: {}".format(cat_cols_xgb))
    logging.info("Numerical feature column for XGBoost: 'TransactionAmt'")
    features_xgb = pd.get_dummies(transactions_df[['TransactionID'] + cat_cols_xgb], columns=cat_cols_xgb).fillna(0)
    # Reuse the already log-transformed amount; rows align on the shared index.
    features_xgb['TransactionAmt'] = features['TransactionAmt']
    xgb_path = os.path.join(output_dir, 'features_xgboost.csv')
    features_xgb.to_csv(xgb_path, index=False, header=False)
    logging.info("Wrote features to file: {}".format(xgb_path))

    # --- Labels ---
    tags_path = os.path.join(output_dir, 'tags.csv')
    transactions_df[['TransactionID', 'isFraud']].to_csv(tags_path, index=False)
    logging.info("Wrote labels to file: {}".format(tags_path))
| 92 | + |
| 93 | + |
def get_relations_and_edgelist(transactions_df, identity_df, transactions_id_cols, output_dir):
    """Write one bipartite edgelist per relation type and return them as a dict.

    A relation type is each transaction id column plus each identity column
    (note: this includes 'TransactionID' itself, matching the identity table's
    columns). Each edgelist pairs TransactionID with the relation column,
    rows with a missing relation value dropped.
    """
    edge_types = transactions_id_cols.split(",") + list(identity_df.columns)
    logging.info("Found the following distinct relation types: {}".format(edge_types))
    id_cols = ['TransactionID'] + transactions_id_cols.split(",")
    # Left-join so every transaction is kept even without identity info.
    full_identity_df = transactions_df[id_cols].merge(identity_df, on='TransactionID', how='left')
    logging.info("Shape of identity columns: {}".format(full_identity_df.shape))

    edges = {}
    for relation in edge_types:
        pairs = full_identity_df[['TransactionID', relation]].dropna()
        out_path = os.path.join(output_dir, 'relation_{}_edgelist.csv'.format(relation))
        pairs.to_csv(out_path, index=False, header=True)
        logging.info("Wrote edgelist to: {}".format(out_path))
        edges[relation] = pairs
    return edges
| 110 | + |
| 111 | + |
def create_homogeneous_edgelist(edges, output_dir):
    """Collapse the bipartite relation edgelists into one homogeneous edgelist.

    Two transactions are connected if they share a value for any relation type;
    each undirected pair is emitted once, to 'homogeneous_edgelist.csv' under
    *output_dir* as "a, b" lines.

    Fix: the original tested membership with ``(a, b) not in homogeneous_edges``
    on a growing *list*, an O(len) scan per candidate pair that made the whole
    pass quadratic in the number of edges. A parallel ``set`` gives O(1)
    membership while the list preserves the original output order. The set is
    updated only after each group's pairs are collected, matching the original
    per-group dedup semantics exactly.
    """
    homogeneous_edges = []
    seen = set()  # mirrors homogeneous_edges for O(1) duplicate checks
    for etype, relations in edges.items():
        # Transactions sharing a value of `etype` form a clique.
        for _, frame in relations.groupby(etype):
            new_edges = [(a, b) for (a, b) in combinations(frame.TransactionID.values, 2)
                         if (a, b) not in seen and (b, a) not in seen]
            homogeneous_edges.extend(new_edges)
            seen.update(new_edges)

    out_path = os.path.join(output_dir, 'homogeneous_edgelist.csv')
    with open(out_path, 'w') as f:
        f.writelines(map(lambda x: "{}, {}\n".format(x[0], x[1]), homogeneous_edges))
    logging.info("Wrote homogeneous edgelist to file: {}".format(out_path))
| 123 | + |
| 124 | + |
if __name__ == '__main__':
    # Fix: the original bound the logger to the name ``logging``, shadowing the
    # stdlib module at module scope. Using a distinct name keeps the module
    # importable/readable; the helpers' module-level ``logging.info`` calls
    # still emit because get_logger() configures the root logger via
    # basicConfig.
    logger = get_logger(__name__)

    args = parse_args()

    # Load both tables and persist the validation/test TransactionID splits.
    transactions, identity, _, _ = load_data(args.data_dir,
                                             args.transactions,
                                             args.identity,
                                             args.train_data_ratio,
                                             args.valid_data_ratio,
                                             args.output_dir)

    # Emit feature matrices, labels, and per-relation bipartite edgelists.
    get_features_and_labels(transactions, args.id_cols, args.cat_cols, args.cat_cols_xgboost, args.output_dir)
    relational_edges = get_relations_and_edgelist(transactions, identity, args.id_cols, args.output_dir)

    # Optionally collapse the bipartite edgelists into a homogeneous graph.
    if args.construct_homogeneous:
        create_homogeneous_edgelist(relational_edges, args.output_dir)
0 commit comments