2 changes: 1 addition & 1 deletion RecommenderSystems/deepfm/deepfm_train_eval.py
@@ -191,7 +191,7 @@ def __init__(
self.shard_count = shard_count
self.cur_shard = cur_shard

fields = ["Label"]
fields = ["label"]
fields += [f"I{i+1}" for i in range(num_dense_fields)]
fields += [f"C{i+1}" for i in range(num_sparse_fields)]
self.fields = fields
99 changes: 99 additions & 0 deletions RecommenderSystems/deepfm/tools/make_parquet.py
@@ -0,0 +1,99 @@
"""
Copyright 2020 The OneFlow Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import json
import argparse
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import rand, udf, lit, xxhash64
from pyspark.sql.types import FloatType


def make_dataframe(spark, input_files):
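    # Criteo Kaggle train.txt is tab-separated: a label column, 13 dense features (I1..I13) and 26 categorical features (C1..C26).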
sparse_names = [f"C{i}" for i in range(1, 27)]
dense_names = [f"I{i}" for i in range(1, 14)]
column_names = ["label"] + dense_names + sparse_names

make_label = udf(lambda s: float(s), FloatType())
label_col = make_label("label").alias("label")

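    # Hash every value together with a per-column constant so identical raw values in different columns map to different ids.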
dense_cols = [xxhash64(Ii, lit(i - 1)).alias(Ii) for i, Ii in enumerate(dense_names)]
sparse_cols = [xxhash64(Ci, lit(i - 1)).alias(Ci) for i, Ci in enumerate(sparse_names, start=len(dense_names))]

cols = [label_col] + dense_cols + sparse_cols

df = (
spark.read.options(delimiter="\t")
.csv(input_files)
.toDF(*column_names)
.select(cols)
)
return df


def make_kaggle_parquet(input_dir, output_dir, spark_driver_memory_gb=32):
train_txt = os.path.join(input_dir, "train.txt")
    assert os.path.isfile(train_txt), f"Cannot find train.txt in {input_dir}."

meta = {}

# start spark session
conf = SparkConf()
conf.set("spark.driver.memory", f"{spark_driver_memory_gb}g")
spark = SparkSession.builder.config(conf=conf).master("local[*]").getOrCreate()

print("reading raw train.txt ...")
df = make_dataframe(spark, train_txt)
meta["field_dtypes"] = df.dtypes

train_df, test_df, val_df = df.randomSplit([0.8, 0.1, 0.1])

print("saving dataset ...")
train_df.orderBy(rand()).write.mode("overwrite").parquet(os.path.join(output_dir, "train"))
test_df.write.mode("overwrite").parquet(os.path.join(output_dir, "test"))
val_df.write.mode("overwrite").parquet(os.path.join(output_dir, "val"))

print("calculating number of samples ...")
meta["num_train_samples"] = train_df.count()
meta["num_test_samples"] = test_df.count()
meta["num_val_samples"] = val_df.count()

print("calculating table size array ...")
col_names = [f"I{i}" for i in range(1, 14)] + [f"C{i}" for i in range(1, 27)]
table_size_array = [df.select(c).distinct().count() for c in col_names]
meta["table_size_array"] = table_size_array

print("save meta.json")
meta_json = os.path.join(output_dir, "meta.json")
with open(meta_json, "w") as fp:
json.dump(meta, fp)
print(meta)
return meta_json


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_dir",
type=str,
default="/data",
help="Path to downloaded and unzipd criteo kaggle datasets: train.txt test.txt",
)
parser.add_argument("--output_dir", type=str, required=True)
parser.add_argument("--spark_driver_memory_gb", type=int, default=32)
args = parser.parse_args()
make_kaggle_parquet(args.input_dir, args.output_dir, args.spark_driver_memory_gb)

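A standalone invocation of the conversion script above looks roughly like this (paths are illustrative and mirror launch.sh below, where the raw Criteo data is mounted at /data inside the container):

python3 RecommenderSystems/deepfm/tools/make_parquet.py \
    --input_dir=/data \
    --output_dir=/workspace/kaggle_parquet \
    --spark_driver_memory_gb=32
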
7 changes: 7 additions & 0 deletions RecommenderSystems/test/install_oneflow.sh
@@ -0,0 +1,7 @@
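# Install the test requirements and a nightly OneFlow build (CUDA 11.2) inside the container.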
apt-get update
apt-get install -y default-jdk
python3 -m pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
python3 -m pip install -r /workspace/test/requirements.txt
python3 -m pip uninstall -y oneflow
python3 -m pip install --pre oneflow -f https://staging.oneflow.info/branch/master/cu112
python3 -m oneflow --doctor
19 changes: 19 additions & 0 deletions RecommenderSystems/test/launch.sh
@@ -0,0 +1,19 @@
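# End-to-end DeepFM test: start the OneFlow nightly container, install OneFlow, convert the raw Criteo data to parquet, then run test_deepfm.py.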
image_tag=oneflowinc/oneflow:nightly-cuda11.2
raw_data_dir=/data
parquet_data_dir=/workspace/kaggle_parquet
script_path=/workspace/deepfm/deepfm_train_eval.py

cmd_install_oneflow="bash /workspace/test/install_oneflow.sh | tee install.log"
cmd_make_dataset="python3 /workspace/deepfm/tools/make_parquet.py --input_dir=$raw_data_dir --output_dir=${parquet_data_dir} | tee make_dataset.log"
cmd_test_deepfm="python3 /workspace/test/test_deepfm.py --data_dir=$parquet_data_dir --script_path=$script_path"

# Run the container in the foreground so the cleanup command at the end only executes after the test has finished.
docker run --privileged --network=host --ipc=host --gpus=all \
-v $PWD:/workspace \
-v /data/criteo_kaggle/dac:$raw_data_dir \
-w /workspace \
$image_tag \
/bin/sh \
-c "$cmd_install_oneflow ; $cmd_make_dataset ; $cmd_test_deepfm"

docker rm $(docker stop $(docker ps -a -q --filter ancestor=$image_tag --format="{{.ID}}"))
2 changes: 2 additions & 0 deletions RecommenderSystems/test/requirements.txt
@@ -0,0 +1,2 @@
petastorm
psutil
50 changes: 50 additions & 0 deletions RecommenderSystems/test/test_deepfm.py
@@ -0,0 +1,50 @@
import os
import sys
import json
import argparse


def prepare_args(kwargs):
str_args = ""
for k, v in kwargs.items():
str_args += f"--{k}={v} "
return str_args


if __name__ == "__main__":
persistent_path = "persistent"
parser = argparse.ArgumentParser()
parser.add_argument("--data_dir", type=str, required=True)
parser.add_argument("--script_path", type=str, required=True)
args = parser.parse_args()

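    # meta.json is produced by tools/make_parquet.py and carries the sample counts and table_size_array of the generated dataset.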
meta_file = os.path.join(args.data_dir, "meta.json")
assert os.path.isfile(meta_file)
with open(meta_file, "r") as fp:
kwargs = json.load(fp)
del kwargs["field_dtypes"]
kwargs["data_dir"] = args.data_dir
kwargs["table_size_array"] = ",".join([str(s) for s in kwargs["table_size_array"]])
kwargs["persistent_path"] = persistent_path
kwargs["store_type"] = "device_mem"
kwargs["embedding_vec_size"] = 10

str_args = prepare_args(kwargs)

    # Launch the training script through oneflow.distributed.launch: 4 processes on one local node, one per GPU.
    dl = sys.executable + " -m oneflow.distributed.launch "
dl += "--nproc_per_node 4 "
dl += "--nnodes 1 "
dl += "--node_rank 0 "
dl += "--master_addr 127.0.0.1 "
dl += f"{args.script_path} "

cmd = dl + str_args
print(cmd)
os.system(f'rm -rf {persistent_path}/*')
    os.system(cmd + " | tee t.log")