diff --git a/src/phoenix/config/config.go b/src/phoenix/config/config.go
index 29f76db..3646aa1 100644
--- a/src/phoenix/config/config.go
+++ b/src/phoenix/config/config.go
@@ -34,10 +34,11 @@ type TaskSchedulerConfig struct {
 }

 type WorkerGodConfig struct {
-    Addr string
-    WorkerGod phoenix.WorkerGod
-    Ready chan bool
+    Addr      string
+    WorkerGod phoenix.WorkerGod
+    Ready     chan bool
 }
+
 //type JobGeneratorConfig struct {
 //    Seed int
 //    TaskDuration int
@@ -90,9 +91,9 @@ func (pc *PhoenixConfig) NewExecutorConfig(i int, ec phoenix.ExecutorInterface)

 func (pc *PhoenixConfig) NewWorkerGodConfig(i int, ww phoenix.WorkerGod) *WorkerGodConfig {
     return &WorkerGodConfig{
-        Addr: pc.WorkerGods[i],
+        Addr:      pc.WorkerGods[i],
         WorkerGod: ww,
-        Ready: make(chan bool, 1),
+        Ready:     make(chan bool, 1),
     }
 }

diff --git a/src/phoenix/config/multi_node.conf b/src/phoenix/config/multi_node.conf
index 5fb27d5..7595e27 100644
--- a/src/phoenix/config/multi_node.conf
+++ b/src/phoenix/config/multi_node.conf
@@ -1,18 +1,18 @@
 {
-    "NumSlots": 4,
-    "Frontends": [
-        "172.31.28.99:31363"
-    ],
-    "Schedulers": [
-        "172.31.31.12:31359"
-    ],
-    "Monitors": [
-        "172.31.22.104:31360"
-    ],
-    "Executors": [
-        "172.31.22.104:31361"
-    ],
-    "WorkerGods": [
-        "172.31.22.104:31362"
-    ]
+  "NumSlots": 4,
+  "Frontends": [
+    "172.31.28.99:31363"
+  ],
+  "Schedulers": [
+    "172.31.31.12:31359"
+  ],
+  "Monitors": [
+    "172.31.22.104:31360"
+  ],
+  "Executors": [
+    "172.31.22.104:31361"
+  ],
+  "WorkerGods": [
+    "172.31.22.104:31362"
+  ]
 }
diff --git a/src/phoenix/frontend/manual-1/g_emulation_generator.go b/src/phoenix/frontend/manual-1/g_emulation_generator.go
new file mode 100644
index 0000000..1cad531
--- /dev/null
+++ b/src/phoenix/frontend/manual-1/g_emulation_generator.go
@@ -0,0 +1,57 @@
+package main
+
+import "math/rand"
+
+type GoogleClusterTaskGenerator struct {
+    TaskDuration float64
+    RandSeed     int64
+    TaskCount    int
+}
+
+// googleTimeFactors is an empirical CDF over task-duration multipliers:
+// entry i holds the cumulative weight of duration factor i+1.
+var googleTimeFactors = []int{
+    51182,
+    61100,
+    76970,
+    96318,
+    102699,
+    106596,
+    110659,
+    111951,
+    112349,
+    114887,
+    123163,
+    129392,
+    129573,
+    129698,
+    129844,
+    129954}
+
+const GOOGLE_TIME_MAX_RANGE = 129954
+
+func NewGoogleClusterTaskGenerator(taskDuration float64, randSeed int64, taskCount int) GoogleClusterTaskGenerator {
+    ret := GoogleClusterTaskGenerator{
+        TaskDuration: taskDuration,
+        RandSeed:     randSeed,
+        TaskCount:    taskCount,
+    }
+
+    rand.Seed(randSeed)
+    return ret
+}
+
+// GetTaskDuration samples a duration by inverse-CDF lookup: a uniform draw in
+// [0, GOOGLE_TIME_MAX_RANGE) selects the first entry whose cumulative weight
+// covers it, yielding TaskDuration scaled by that entry's factor.
+func (g GoogleClusterTaskGenerator) GetTaskDuration() float64 {
+    targetRange := rand.Int() % GOOGLE_TIME_MAX_RANGE
+
+    for i := 0; i < len(googleTimeFactors); i++ {
+        if googleTimeFactors[i] >= targetRange {
+            return g.TaskDuration * (float64(i) + 1.0)
+        }
+    }
+
+    return g.TaskDuration
+}
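The new generator draws one uniform value per task and walks googleTimeFactors until the cumulative weight covers it, so roughly 39% of tasks keep the base duration (51182/129954) and the tail stretches out to 16x the mean. Below is a minimal, self-contained sketch of the same sampling scheme: the table is copied from the patch, the names cdf and sample are illustrative only, and rand.Intn replaces the patch's rand.Int() % n to avoid modulo bias.

package main

import (
    "fmt"
    "math/rand"
)

// Empirical CDF thresholds copied from the patch; entry i covers factor i+1.
var cdf = []int{51182, 61100, 76970, 96318, 102699, 106596, 110659, 111951,
    112349, 114887, 123163, 129392, 129573, 129698, 129844, 129954}

const cdfMax = 129954

// sample maps a uniform draw through the step function: about 39% of draws
// land below the first threshold (factor 1), roughly 8% in the second band,
// and so on up to factor 16.
func sample(mean float64) float64 {
    target := rand.Intn(cdfMax)
    for i, threshold := range cdf {
        if threshold >= target {
            return mean * float64(i+1)
        }
    }
    return mean // unreachable: the last threshold equals cdfMax
}

func main() {
    rand.Seed(42)
    for i := 0; i < 5; i++ {
        fmt.Printf("%.1f\n", sample(3.0))
    }
}

A binary search (sort.SearchInts) would do the same lookup in O(log n), though with sixteen entries the linear scan is perfectly fine.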
diff --git a/src/phoenix/frontend/manual-1/main.go b/src/phoenix/frontend/manual-1/main.go
index 1f8ee71..a39c092 100644
--- a/src/phoenix/frontend/manual-1/main.go
+++ b/src/phoenix/frontend/manual-1/main.go
@@ -14,12 +14,16 @@ import (
     "time"
 )

+const DefaultRandSeed int64 = -13131313
+
 var (
     frc          = flag.String("conf", config.DefaultConfigPath, "config file")
     useRand      = flag.Bool("useRand", false, "use random seed to generate job, default to hash based on address")
     jobCount     = flag.Int("jobCount", 10, "number of job to generate")
     taskCount    = flag.Int("taskCount", 10, "number of task in a job")
     meanDuration = flag.Float64("jobDuration", 3.0, "job duration in second")
+    randSeed     = flag.Int64("randSeed", DefaultRandSeed, "task generation seed")
+    gEmulation   = flag.Bool("gEmu", false, "use google cluster workload pattern")
 )

 func noError(e error) {
@@ -54,11 +58,21 @@ func main() {
     <-feConfig.Ready

     if *useRand {
-        rand.Seed(time.Now().UnixNano())
+        // No seed was passed explicitly, but random behavior is still wanted
+        if *randSeed == DefaultRandSeed {
+            rand.Seed(time.Now().UnixNano())
+        } else {
+            rand.Seed(*randSeed)
+        }
     } else {
-        // just some random number here
-        var randSeed int64 = 1111
-        rand.Seed(randSeed)
+        // fixed seed so that workload emulation stays reproducible
+        var localRandSeed int64 = 1111
+        rand.Seed(localRandSeed)
+    }
+
+    var gGenerator GoogleClusterTaskGenerator
+    if *gEmulation {
+        gGenerator = NewGoogleClusterTaskGenerator(*meanDuration, *randSeed, *taskCount)
     }

     numTasks := *taskCount
@@ -78,6 +92,10 @@ func main() {
         currTaskDuration *= rand.ExpFloat64()
     }

+    if *gEmulation {
+        currTaskDuration = gGenerator.GetTaskDuration()
+    }
+
     for j := 0; j < numTasks; j++ {
         taskid := jobid + "-task" + strconv.Itoa(j)

@@ -121,7 +139,7 @@ func main() {

     // We can use worker-god to start or kill more jobs here

-    <- allJobsDoneSignal
+    <-allJobsDoneSignal

     slotCount := len(rc.Executors) * rc.NumSlots
     theoreticalLowerBound := sumOfTaskTimes / float64(slotCount)
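Taken together, the frontend now has three seeding modes: an explicit -randSeed combined with -useRand, wall-clock randomness when -useRand is given without a seed, and the fixed seed 1111 otherwise. One interaction to note: with -gEmu set, NewGoogleClusterTaskGenerator calls rand.Seed(*randSeed) after this block runs, so the global source ends up seeded with the sentinel DefaultRandSeed whenever -useRand was given without -randSeed. A self-contained sketch of the intended precedence (effectiveSeed is an illustrative name, not part of the patch):

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// Sentinel default mirroring the patch's DefaultRandSeed flag value.
const DefaultRandSeed int64 = -13131313

// effectiveSeed encodes the precedence: explicit seed beats wall-clock
// randomness, which beats the fixed seed used for reproducible runs.
func effectiveSeed(useRand bool, randSeed int64) int64 {
    if useRand {
        if randSeed == DefaultRandSeed {
            return time.Now().UnixNano() // no explicit seed, still random
        }
        return randSeed
    }
    return 1111 // reproducible workload emulation
}

func main() {
    rand.Seed(effectiveSeed(false, DefaultRandSeed))
    fmt.Println(rand.ExpFloat64()) // identical on every run
}

Routing all seeding through one helper like this would also keep the generator's constructor from quietly re-seeding the global source behind the caller's back.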
diff --git a/src/phoenix/init/init-config/main.go b/src/phoenix/init/init-config/main.go
index 9bfd926..05e82be 100644
--- a/src/phoenix/init/init-config/main.go
+++ b/src/phoenix/init/init-config/main.go
@@ -33,7 +33,6 @@ func main() {
     phoenixConfig.Executors = make([]string, *nMonitors)
     phoenixConfig.WorkerGods = make([]string, *nMonitors)

-
     ipAddrs := strings.Split(*ips, ",")
     if nMachine := len(ipAddrs); nMachine > 0 {
         for i := 0; i < *nSchedulers; i++ {
diff --git a/src/phoenix/scheduler/task_scheduler.go b/src/phoenix/scheduler/task_scheduler.go
index dbfd98f..64d3b45 100644
--- a/src/phoenix/scheduler/task_scheduler.go
+++ b/src/phoenix/scheduler/task_scheduler.go
@@ -18,7 +18,7 @@ type TaskScheduler struct {
     Addr string

     // map of addresses to Monitors that we are able to contact
-    MonitorClientPool map[string]*monitor.NodeMonitorClient 
+    MonitorClientPool map[string]*monitor.NodeMonitorClient

     // addresses of our workers
     workerAddresses []string
@@ -27,10 +27,10 @@ type TaskScheduler struct {
     workerAddrToTask map[string]map[string]bool

     // map of worker address to count per jobId reservations
-    workerAddrToJobReservations map[string]map[string] int
+    workerAddrToJobReservations map[string]map[string]int

     // lock around MonitorClientPool, workerAddresses, workerAddrToTask, workerIdReservations
-    workerLock sync.Mutex 
+    workerLock sync.Mutex

     // Frontend Client Pool
     FrontendClientPool map[string]phoenix.FrontendInterface
@@ -135,7 +135,6 @@ func (ts *TaskScheduler) watchWorkerNodes(zkHostPorts []string, ready chan bool)

 func (ts *TaskScheduler) rescheduleLostTasks(children []string) {

-
     // create new client pool and newWorkerIds
     newClientPool := make(map[string]*monitor.NodeMonitorClient)
     newWorkerIds := make([]string, len(children))
@@ -275,7 +274,7 @@ func (ts *TaskScheduler) GetTask(taskRequest types.TaskRequest, outputTask *type

     // pendingTask is now inflight at workerAddrToTask
     _, exists := ts.workerAddrToTask[taskRequest.WorkerAddr]
-    if ! exists {
+    if !exists {
         ts.workerAddrToTask[taskRequest.WorkerAddr] = make(map[string]bool)
     }

@@ -398,7 +397,7 @@ func (ts *TaskScheduler) enqueueJob(enqueueCount int, jobId string) error {
         targetMonitor, mExists := ts.MonitorClientPool[targetWorkerId]
         ts.workerLock.Unlock()

-        if ! mExists {
+        if !mExists {
             continue
         }

@@ -411,12 +410,12 @@ func (ts *TaskScheduler) enqueueJob(enqueueCount int, jobId string) error {

         ts.workerLock.Lock()
         _, exists := ts.workerAddrToJobReservations[targetMonitor.Addr]
-        if ! exists {
+        if !exists {
             ts.workerAddrToJobReservations[targetMonitor.Addr] = make(map[string]int)
         }

         // targetMonitor.Addr has one more jobId in it's queue
-        ts.workerAddrToJobReservations[targetMonitor.Addr][jobId] ++
+        ts.workerAddrToJobReservations[targetMonitor.Addr][jobId]++
         ts.workerLock.Unlock()

         fmt.Printf("[TaskScheduler %s: enqueueJob]: Enqueuing reservation on monitor %s for job reservation %s\n",
diff --git a/src/phoenix/types/types.go b/src/phoenix/types/types.go
index 93d1a38..8e8c0c4 100644
--- a/src/phoenix/types/types.go
+++ b/src/phoenix/types/types.go
@@ -13,8 +13,8 @@ type Task struct {
 }

 type TaskRequest struct {
-    JobId string
-    WorkerAddr string
+    JobId      string
+    WorkerAddr string
 }

 type WorkerTaskCompleteMsg struct {
diff --git a/src/phoenix/worker-god/workergod.go b/src/phoenix/worker-god/workergod.go
index 7ddaa6b..1f62027 100644
--- a/src/phoenix/worker-god/workergod.go
+++ b/src/phoenix/worker-god/workergod.go
@@ -13,22 +13,21 @@ import (
     "time"
 )

-
 type WorkerWrapper struct {
     // map where key is worker index and value is processId for that worker
-    RunningMonitors map[int] *exec.Cmd
-    RunningExecutors map[int] *exec.Cmd
+    RunningMonitors  map[int]*exec.Cmd
+    RunningExecutors map[int]*exec.Cmd

     // phoenix configuration
-    Config *config.PhoenixConfig 
+    Config *config.PhoenixConfig
 }

 func NewWorkerGod(config *config.PhoenixConfig) phoenix.WorkerGod {
     return &WorkerWrapper{
-        RunningMonitors: make(map[int]*exec.Cmd),
+        RunningMonitors:  make(map[int]*exec.Cmd),
         RunningExecutors: make(map[int]*exec.Cmd),
-        Config: config,
+        Config:           config,
     }
 }

@@ -70,7 +69,7 @@ func (ww *WorkerWrapper) Kill(workerId int, ret *bool) error {
     return nil
 }

-func (ww *WorkerWrapper) Start(workerId int, ret* bool) error {
+func (ww *WorkerWrapper) Start(workerId int, ret *bool) error {

     fmt.Println("[WorkerWrapper: Start] workerId:", workerId)
     fmt.Println("[WorkerWrapper: Start] Start Timestamp: ", time.Now().UnixNano())
@@ -99,7 +98,6 @@ func (ww *WorkerWrapper) Start(workerId int, ret *bool) error {
     mtor := exec.Command("init-monitor", "-workerId", strconv.Itoa(workerId))
     etor := exec.Command("init-executor", "-workerId", strconv.Itoa(workerId))

-
     ww.RunningMonitors[workerId] = mtor
     ww.RunningExecutors[workerId] = etor

@@ -122,9 +120,9 @@ func (ww *WorkerWrapper) Start(workerId int, ret *bool) error {
     }

     // save output to logs
-    go writeToLog(mStdout, "logs/monitor_"+strconv.Itoa(workerId)+"_" +
+    go writeToLog(mStdout, "logs/monitor_"+strconv.Itoa(workerId)+"_"+
         strconv.FormatInt(time.Now().Unix(), 36)+".log")
-    go writeToLog(eStdout, "logs/executor_"+strconv.Itoa(workerId)+"_" +
+    go writeToLog(eStdout, "logs/executor_"+strconv.Itoa(workerId)+"_"+
         strconv.FormatInt(time.Now().Unix(), 36)+".log")

     *ret = true
@@ -135,11 +133,9 @@ func writeToLog(out io.ReadCloser, logFileName string) {
     scanner := bufio.NewScanner(out)
     scanner.Split(bufio.ScanLines)

-    logFile, _ := os.OpenFile(logFileName, os.O_CREATE | os.O_RDWR, 0777)
+    logFile, _ := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR, 0777)
     for scanner.Scan() {
         m := scanner.Text()
         logFile.WriteString(m + "\n")
     }
 }
-
-
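The gofmt fix aside, writeToLog still ignores the error from os.OpenFile and never closes the handle, so a worker whose log file cannot be created silently drops every line. A defensively written variant (a sketch with the same semantics, not part of the patch; writeToLogChecked is an illustrative name and example.log a hypothetical path):

package main

import (
    "bufio"
    "io"
    "log"
    "os"
    "strings"
)

// writeToLogChecked copies each line from out into logFileName, reporting
// open failures and closing the file once the pipe is exhausted.
func writeToLogChecked(out io.ReadCloser, logFileName string) {
    logFile, err := os.OpenFile(logFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
    if err != nil {
        log.Printf("writeToLog: cannot open %s: %v", logFileName, err)
        return
    }
    defer logFile.Close()

    scanner := bufio.NewScanner(out) // ScanLines is already the default split
    for scanner.Scan() {
        logFile.WriteString(scanner.Text() + "\n")
    }
}

func main() {
    r := io.NopCloser(strings.NewReader("line one\nline two\n"))
    writeToLogChecked(r, "example.log")
}

O_WRONLY|O_APPEND and mode 0644 deliberately narrow the patch's O_RDWR and 0777; widen them only if something else really needs to read or execute these logs.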
diff --git a/src/phoenix/zk.go b/src/phoenix/zk.go
index fce10f1..3e1f239 100644
--- a/src/phoenix/zk.go
+++ b/src/phoenix/zk.go
@@ -8,4 +8,5 @@ const (
 )

 var ZkLocalServers = []string{"localhost:2181"}
+
 // var ZkLocalServers = []string{"172.31.28.99:2181", "172.31.31.12:2181", "172.31.22.104:2181"}
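ZkLocalServers is the host list that ends up in the scheduler's watchWorkerNodes(zkHostPorts, ...), and the commented-out line keeps the three-node EC2 ensemble from multi_node.conf close at hand. A minimal connection sketch, assuming the samuel/go-zookeeper client (this diff does not show which ZooKeeper client the repo actually uses, so treat the import and API as an assumption):

package main

import (
    "fmt"
    "time"

    "github.com/samuel/go-zookeeper/zk"
)

var ZkLocalServers = []string{"localhost:2181"}

func main() {
    // With a multi-server list the client fails over between ensemble
    // members, so losing a single ZooKeeper node is survivable.
    conn, _, err := zk.Connect(ZkLocalServers, 5*time.Second)
    if err != nil {
        panic(err)
    }
    defer conn.Close()
    fmt.Println("session state:", conn.State())
}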