From 2038cbbba7ebfaa6cd14244f8454a568d9928cd9 Mon Sep 17 00:00:00 2001
From: Kao Makino
Date: Wed, 6 Oct 2021 11:29:28 -0700
Subject: [PATCH 1/7] Always wait for the result once started

---
 joshua/joshua_model.py      |  18 +++-
 tests/sanity_test.sh        |  14 ++++
 tests/sanity_test_script.sh | 143 ++++++++++++++++++++++++++++++++++++
 3 files changed, 170 insertions(+), 5 deletions(-)
 create mode 100755 tests/sanity_test.sh
 create mode 100755 tests/sanity_test_script.sh

diff --git a/joshua/joshua_model.py b/joshua/joshua_model.py
index a3d12db..04f1391 100644
--- a/joshua/joshua_model.py
+++ b/joshua/joshua_model.py
@@ -595,13 +595,21 @@ def _add(tr: fdb.Transaction, ensemble_id: str, counter: str, value: int) -> Non
     tr.add(dir_all_ensembles[ensemble_id]["count"][counter], byte_val)
 
 
-def _get_snap_counter(tr: fdb.Transaction, ensemble_id: str, counter: str) -> int:
-    value = tr.snapshot.get(dir_all_ensembles[ensemble_id]["count"][counter])
+def _get_counter_impl(tr: fdb.Transaction, ensemble_id: str, counter: str, snapshot: bool) -> int:
+    if snapshot:
+        value = tr.snapshot.get(dir_all_ensembles[ensemble_id]["count"][counter])
+    else:
+        value = tr.get(dir_all_ensembles[ensemble_id]["count"][counter])
     if value == None:
         return 0
     else:
         return struct.unpack("<q", value)[0]
 
+def _get_snap_counter(tr: fdb.Transaction, ensemble_id: str, counter: str) -> int:
+    return _get_counter_impl(tr, ensemble_id, counter, True)
+
+def _get_counter(tr: fdb.Transaction, ensemble_id: str, counter: str) -> int:
+    return _get_counter_impl(tr, ensemble_id, counter, False)
 
 def _get_seeds_and_heartbeats(
     ensemble_id: str, tr: fdb.Transaction
@@ -768,9 +776,9 @@ def _insert_results(
         results = dir_ensemble_results_pass
 
     if max_runs > 0:
-        # This is a snapshot read so that two insertions don't conflict.
-        ended = _get_snap_counter(tr, ensemble_id, "ended")
-        if ended >= max_runs:
+        started = _get_counter(tr, ensemble_id, "started")
+        ended = _get_counter(tr, ensemble_id, "ended")
+        if ended >= max(max_runs, started):
             _stop_ensemble(tr, ensemble_id, sanity)
 
     if duration:
diff --git a/tests/sanity_test.sh b/tests/sanity_test.sh
new file mode 100755
index 0000000..219620d
--- /dev/null
+++ b/tests/sanity_test.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+tag=latest
+if [ $# -eq 1 ]; then
+    tag=$1
+fi
+docker run -t --rm --name joshua-test -u root:root -v $(pwd):/joshua foundationdb/joshua-agent:${tag} /joshua/sanity_test_script.sh
+rc=$?
+if [ $rc -eq 0 ]; then
+    echo "PASSED!"
+else
+    echo "FAILED!"
+fi
+exit $rc
diff --git a/tests/sanity_test_script.sh b/tests/sanity_test_script.sh
new file mode 100755
index 0000000..f578007
--- /dev/null
+++ b/tests/sanity_test_script.sh
@@ -0,0 +1,143 @@
+#!/bin/bash
+
+one_agent() {
+    python -m joshua.joshua_agent -C ${FDB_CLUSTER_FILE} --work_dir /tmp/work --agent-idle-timeout 5
+}
+
+two_agent() {
+    python -m joshua.joshua_agent -C ${FDB_CLUSTER_FILE} --work_dir /tmp/work/1 --agent-idle-timeout 5 &
+    pid1=$!
+    python -m joshua.joshua_agent -C ${FDB_CLUSTER_FILE} --work_dir /tmp/work/2 --agent-idle-timeout 5 &
+    pid2=$!
+    wait $pid1 $pid2
+}
+
+two_agent_kill_one() {
+    python -m joshua.joshua_agent -C ${FDB_CLUSTER_FILE} --work_dir /tmp/work/1 --agent-idle-timeout 5 &
+    pid1=$!
+    python -m joshua.joshua_agent -C ${FDB_CLUSTER_FILE} --work_dir /tmp/work/2 --agent-idle-timeout 5 &
+    pid2=$!
+ sleep 5 + kill $pid1 + wait $pid2 +} + +source /opt/rh/rh-python38/enable + +# get latest fdb version +fdbver=$(curl --silent https://www.foundationdb.org/downloads/version.txt) + +# fdb_binaries +curl --silent https://www.foundationdb.org/downloads/${fdbver}/linux/fdb_${fdbver}.tar.gz -o fdb_${fdbver}.tar.gz +tar xzf fdb_${fdbver}.tar.gz +export PATH=$(pwd)/fdb_binaries:${PATH} + +# libfdb_c.so +curl --silent https://www.foundationdb.org/downloads/${fdbver}/linux/libfdb_c_${fdbver}.so -o libfdb_c.so +chmod +x libfdb_c.so +export LD_LIBRARY_PATH=$(pwd):${LD_LIBRARY_PATH} + +# python binding +curl --silent https://www.foundationdb.org/downloads/${fdbver}/bindings/python/foundationdb-${fdbver}.tar.gz -o foundationdb-${fdbver}.tar.gz +tar xzf foundationdb-${fdbver}.tar.gz +export PYTHONPATH=$(pwd)/foundationdb-${fdbver} + +# generate fdb.cluster +echo "joshua:joshua@$(hostname -I | tr -d ' '):4500" > fdb.cluster +export FDB_CLUSTER_FILE=$(pwd)/fdb.cluster + +# start fdb +mkdir data logs +fdbserver \ + --datadir $(pwd)/data/4500 \ + --listen_address public \ + --logdir $(pwd)/logs \ + --public_address auto:4500 \ + --trace_format json > fdb.log 2>&1 & +fdbpid=$! +sleep 1 +fdbcli --exec 'configure new single ssd' + + +# create joshua test package +cat > joshua_test < joshua_timeout < Date: Wed, 6 Oct 2021 15:53:54 -0700 Subject: [PATCH 2/7] Ignore cancelled tests --- joshua/joshua_model.py | 8 ++++++++ tests/sanity_test_script.sh | 16 ++++++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/joshua/joshua_model.py b/joshua/joshua_model.py index 04f1391..f5035d0 100644 --- a/joshua/joshua_model.py +++ b/joshua/joshua_model.py @@ -50,6 +50,7 @@ FDBError = fdb.FDBError ONE = b"\x01" + b"\x00" * 7 +NEGATIVE_ONE = struct.pack(' None: tr.add(dir_all_ensembles[ensemble_id]["count"][counter], ONE) +def _decrement(tr: fdb.Transaction, ensemble_id: str, counter: str) -> None: + tr.add(dir_all_ensembles[ensemble_id]["count"][counter], NEGATIVE_ONE) + + def _add(tr: fdb.Transaction, ensemble_id: str, counter: str, value: int) -> None: byte_val = struct.pack(" bool: ) ) + # ignore cancelled test + _decrement(tr, ensemble_id, "started") + # If we read at snapshot isolation then an arbitrary number of agents could steal this run/seed. # We only want one agent to succeed in taking over for the dead agent's run/seed. 
tr.add_read_conflict_key( diff --git a/tests/sanity_test_script.sh b/tests/sanity_test_script.sh index f578007..0b10291 100755 --- a/tests/sanity_test_script.sh +++ b/tests/sanity_test_script.sh @@ -81,9 +81,12 @@ tar czf test.tar.gz joshua_test joshua_timeout mkdir /tmp/work -failures=0 +total=0 +pass=0 +fail=0 for test in one_agent two_agent two_agent_kill_one; do + (( total++ )) echo "=== TEST: ${test} ===" python -m joshua.joshua start --tarball test.tar.gz --max-runs 6 python -m joshua.joshua list @@ -104,15 +107,17 @@ for test in one_agent two_agent two_agent_kill_one; do done if [ $pass -eq $ended ] && [ $ended -eq $max_runs ]; then echo "Pass: pass:$pass ended:$ended max_runs:$max_runs" + (( pass++ )) else echo "Fail: pass:$pass ended:$ended max_runs:$max_runs" - (( failures++ )) + (( fail++ )) fi python -m joshua.joshua delete -y ${ensemble} done # timeout test for test in two_agent; do + (( total++ )) echo "=== TEST: ${test} TIMEOUT ===" python -m joshua.joshua start --tarball test.tar.gz --max-runs 6 --timeout 2 python -m joshua.joshua list @@ -130,14 +135,17 @@ for test in two_agent; do done if [ $fail -eq $ended ] && [ $fail -eq 2 ]; then echo "Pass: fail:$fail == ended:$ended" + (( pass++ )) else echo "Fail: fail:$fail != ended:$ended or fail:$fail != 2" - (( failures++ )) + (( fail++ )) fi python -m joshua.joshua delete -y ${ensemble} done kill -9 ${fdbpid} -exit $failures +echo "${pass} / ${total} passed" + +exit $fail From c01defeb4a05f1adef2a277540ba62c22087f397 Mon Sep 17 00:00:00 2001 From: Kao Makino Date: Wed, 6 Oct 2021 16:16:52 -0700 Subject: [PATCH 3/7] Do not record cancelled tests --- joshua/joshua_agent.py | 39 +++++++------------------------------ tests/sanity_test_script.sh | 22 ++++++++++----------- 2 files changed, 18 insertions(+), 43 deletions(-) diff --git a/joshua/joshua_agent.py b/joshua/joshua_agent.py index 0426cc0..c9917c4 100644 --- a/joshua/joshua_agent.py +++ b/joshua/joshua_agent.py @@ -110,25 +110,6 @@ def stopAgent(): return stop_agent -def trim_jobqueue(cutoff_date, remove_jobs=True): - global job_queue - jobs_pass = 0 - jobs_fail = 0 - cutoff_string = joshua_model.format_datetime(cutoff_date) - - for job_record in list(job_queue.queue): - (result, job_date) = fdb.tuple.unpack(job_record) - if job_date <= cutoff_string: - if remove_jobs: - old_record = job_queue.get_nowait() - elif result == 0: - jobs_pass += 1 - else: - jobs_fail += 1 - - return (jobs_pass + jobs_fail, jobs_pass, jobs_fail) - - def log(outputText, newline=True): return ( print(outputText, file=getFileHandle()) @@ -320,7 +301,8 @@ def remove_old_artifacts(path, age=24 * 60 * 60): # Returns whether the artifacts should be saved based on run state. def should_save(retcode, save_on="FAILURE"): - return save_on == "ALWAYS" or save_on == "FAILURE" and retcode != 0 + # do not save when cancelled (retcode == -1) + return save_on == "ALWAYS" or save_on == "FAILURE" and retcode != 0 and retcode != -1 # Removes artifacts from the current run, saving them if necessary. @@ -411,7 +393,6 @@ def run_ensemble( :param sanity: :return: 0 for success """ - global jobs_pass, jobs_fail if not work_dir: raise JoshuaError( "Unable to run function since work_dir is not defined. Exiting. 
(CWD=" @@ -556,6 +537,11 @@ def run_ensemble( cleanup(ensemble, where, seed, retcode, save_on, work_dir=work_dir) + if retcode == -1: + # no results to record when cancelled + self._retcode = retcode + return + try: joshua_model.insert_results( ensemble, @@ -582,17 +568,6 @@ def run_ensemble( duration, ) - # Add the result to the job queue - job_queue.put(fdb.tuple.pack((retcode, done_timestamp))) - - # Update the job counts - job_mutex.acquire() - if retcode == 0: - jobs_pass += 1 - else: - jobs_fail += 1 - job_mutex.release() - self._retcode = retcode diff --git a/tests/sanity_test_script.sh b/tests/sanity_test_script.sh index 0b10291..317c798 100755 --- a/tests/sanity_test_script.sh +++ b/tests/sanity_test_script.sh @@ -81,12 +81,12 @@ tar czf test.tar.gz joshua_test joshua_timeout mkdir /tmp/work -total=0 -pass=0 -fail=0 +total_tests=0 +total_passed=0 +total_failed=0 for test in one_agent two_agent two_agent_kill_one; do - (( total++ )) + (( total_tests++ )) echo "=== TEST: ${test} ===" python -m joshua.joshua start --tarball test.tar.gz --max-runs 6 python -m joshua.joshua list @@ -107,17 +107,17 @@ for test in one_agent two_agent two_agent_kill_one; do done if [ $pass -eq $ended ] && [ $ended -eq $max_runs ]; then echo "Pass: pass:$pass ended:$ended max_runs:$max_runs" - (( pass++ )) + (( total_passed++ )) else echo "Fail: pass:$pass ended:$ended max_runs:$max_runs" - (( fail++ )) + (( total_failed++ )) fi python -m joshua.joshua delete -y ${ensemble} done # timeout test for test in two_agent; do - (( total++ )) + (( total_tests++ )) echo "=== TEST: ${test} TIMEOUT ===" python -m joshua.joshua start --tarball test.tar.gz --max-runs 6 --timeout 2 python -m joshua.joshua list @@ -135,17 +135,17 @@ for test in two_agent; do done if [ $fail -eq $ended ] && [ $fail -eq 2 ]; then echo "Pass: fail:$fail == ended:$ended" - (( pass++ )) + (( total_passed++ )) else echo "Fail: fail:$fail != ended:$ended or fail:$fail != 2" - (( fail++ )) + (( total_failed++ )) fi python -m joshua.joshua delete -y ${ensemble} done kill -9 ${fdbpid} -echo "${pass} / ${total} passed" +echo "${total_passed} / ${total_tests} passed" -exit $fail +exit $total_failed From 435b390f82df73f07e88eb14b7a73cce28df78f0 Mon Sep 17 00:00:00 2001 From: Kao Makino Date: Wed, 6 Oct 2021 16:33:51 -0700 Subject: [PATCH 4/7] Timeout agent if no ensembles to run --- joshua/joshua_agent.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/joshua/joshua_agent.py b/joshua/joshua_agent.py index c9917c4..2033993 100644 --- a/joshua/joshua_agent.py +++ b/joshua/joshua_agent.py @@ -748,14 +748,9 @@ def agent( ensembles_can_run = list( filter(joshua_model.should_run_ensemble, ensembles) ) - if not ensembles_can_run: - # All the ensembles have enough runs started for now. Don't - # time the agent out, just wait until there are no - # ensembles or the other agents might have died. - time.sleep(1) - continue - else: - # No ensembles at all. Consider timing this agent out. + + if not ensembles or (ensembles and not ensembles_can_run): + # No ensembles to run. Consider timing this agent out. 
try: watch.wait_for_any(watch, sanity_watch, TimeoutFuture(1.0)) except Exception as e: From 130b644023192c61f748c74e389b789cd4a87573 Mon Sep 17 00:00:00 2001 From: Kao Makino Date: Thu, 7 Oct 2021 19:51:41 -0700 Subject: [PATCH 5/7] Fix dead agent test --- test_joshua_model.py | 5 +- tests/sanity_test.sh | 14 ---- tests/sanity_test_script.sh | 151 ------------------------------------ 3 files changed, 4 insertions(+), 166 deletions(-) delete mode 100755 tests/sanity_test.sh delete mode 100755 tests/sanity_test_script.sh diff --git a/test_joshua_model.py b/test_joshua_model.py index 13a1bc7..12f352a 100644 --- a/test_joshua_model.py +++ b/test_joshua_model.py @@ -168,12 +168,15 @@ def test_dead_agent(tmp_path, empty_ensemble): # simulate another agent dying after starting a test assert joshua_model.try_starting_test(ensemble_id, 12345) + # agent needs to wait for >10 seconds in order to detect the dead agent. + # in the real deployment, agent should exit early + # and scaler should spin up another one if necessary. agent = threading.Thread( target=joshua_agent.agent, args=(), kwargs={ "work_dir": tmp_path, - "agent_idle_timeout": 1, + "agent_idle_timeout": 15, }, ) agent.setDaemon(True) diff --git a/tests/sanity_test.sh b/tests/sanity_test.sh deleted file mode 100755 index 219620d..0000000 --- a/tests/sanity_test.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -tag=latest -if [ $# -eq 1 ]; then - tag=$1 -fi -docker run -t --rm --name joshua-test -u root:root -v $(pwd):/joshua foundationdb/joshua-agent:${tag} /joshua/sanity_test_script.sh -rc=$? -if [ $rc -eq 0 ]; then - echo "PASSED!" -else - echo "FAILED!" -fi -exit $rc diff --git a/tests/sanity_test_script.sh b/tests/sanity_test_script.sh deleted file mode 100755 index 317c798..0000000 --- a/tests/sanity_test_script.sh +++ /dev/null @@ -1,151 +0,0 @@ -#!/bin/bash - -one_agent() { - python -m joshua.joshua_agent -C ${FDB_CLUSTER_FILE} --work_dir /tmp/work --agent-idle-timeout 5 -} - -two_agent() { - python -m joshua.joshua_agent -C ${FDB_CLUSTER_FILE} --work_dir /tmp/work/1 --agent-idle-timeout 5 & - pid1=$! - python -m joshua.joshua_agent -C ${FDB_CLUSTER_FILE} --work_dir /tmp/work/2 --agent-idle-timeout 5 & - pid2=$! - wait $pid1 $pid2 -} - -two_agent_kill_one() { - python -m joshua.joshua_agent -C ${FDB_CLUSTER_FILE} --work_dir /tmp/work/1 --agent-idle-timeout 5 & - pid1=$! - python -m joshua.joshua_agent -C ${FDB_CLUSTER_FILE} --work_dir /tmp/work/2 --agent-idle-timeout 5 & - pid2=$! 
- sleep 5 - kill $pid1 - wait $pid2 -} - -source /opt/rh/rh-python38/enable - -# get latest fdb version -fdbver=$(curl --silent https://www.foundationdb.org/downloads/version.txt) - -# fdb_binaries -curl --silent https://www.foundationdb.org/downloads/${fdbver}/linux/fdb_${fdbver}.tar.gz -o fdb_${fdbver}.tar.gz -tar xzf fdb_${fdbver}.tar.gz -export PATH=$(pwd)/fdb_binaries:${PATH} - -# libfdb_c.so -curl --silent https://www.foundationdb.org/downloads/${fdbver}/linux/libfdb_c_${fdbver}.so -o libfdb_c.so -chmod +x libfdb_c.so -export LD_LIBRARY_PATH=$(pwd):${LD_LIBRARY_PATH} - -# python binding -curl --silent https://www.foundationdb.org/downloads/${fdbver}/bindings/python/foundationdb-${fdbver}.tar.gz -o foundationdb-${fdbver}.tar.gz -tar xzf foundationdb-${fdbver}.tar.gz -export PYTHONPATH=$(pwd)/foundationdb-${fdbver} - -# generate fdb.cluster -echo "joshua:joshua@$(hostname -I | tr -d ' '):4500" > fdb.cluster -export FDB_CLUSTER_FILE=$(pwd)/fdb.cluster - -# start fdb -mkdir data logs -fdbserver \ - --datadir $(pwd)/data/4500 \ - --listen_address public \ - --logdir $(pwd)/logs \ - --public_address auto:4500 \ - --trace_format json > fdb.log 2>&1 & -fdbpid=$! -sleep 1 -fdbcli --exec 'configure new single ssd' - - -# create joshua test package -cat > joshua_test < joshua_timeout < Date: Mon, 1 Nov 2021 17:56:57 +0000 Subject: [PATCH 6/7] fixed concurrency bug where _decrement is called multiple times by agents attempting to take over a dead agent's run --- joshua/joshua_model.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/joshua/joshua_model.py b/joshua/joshua_model.py index f5035d0..d7b3fd2 100644 --- a/joshua/joshua_model.py +++ b/joshua/joshua_model.py @@ -664,16 +664,20 @@ def should_run_ensemble(tr: fdb.Transaction, ensemble_id: str) -> bool: ) ) - # ignore cancelled test - _decrement(tr, ensemble_id, "started") - - # If we read at snapshot isolation then an arbitrary number of agents could steal this run/seed. + # Without this, an arbitrary number of agents could steal this run/seed. # We only want one agent to succeed in taking over for the dead agent's run/seed. 
- tr.add_read_conflict_key( + # Added a write conflict key so that only one agent could read the key + # and do clean up operations + tr.add_write_conflict_key( dir_ensemble_incomplete[ensemble_id]["heartbeat"][max_seed] ) - del tr[dir_ensemble_incomplete[ensemble_id][max_seed].range()] - del tr[dir_ensemble_incomplete[ensemble_id]["heartbeat"][max_seed]] + + if dir_ensemble_incomplete[ensemble_id]["heartbeat"][max_seed]: + # ignore cancelled test + _decrement(tr, ensemble_id, "started") + del tr[dir_ensemble_incomplete[ensemble_id][max_seed].range()] + del tr[dir_ensemble_incomplete[ensemble_id]["heartbeat"][max_seed]] + return True return False else: From 5ed130b571f8126d6e3f4d0e518e4b09a82e1c56 Mon Sep 17 00:00:00 2001 From: QA Hoang Date: Wed, 3 Nov 2021 03:56:15 +0000 Subject: [PATCH 7/7] attempt to fix started counter --- joshua/joshua_agent.py | 1 + joshua/joshua_model.py | 26 +++++++++++++++----------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/joshua/joshua_agent.py b/joshua/joshua_agent.py index 2033993..c61e1fe 100644 --- a/joshua/joshua_agent.py +++ b/joshua/joshua_agent.py @@ -540,6 +540,7 @@ def run_ensemble( if retcode == -1: # no results to record when cancelled self._retcode = retcode + joshua_model.cancel_agent_cleanup(ensemble) return try: diff --git a/joshua/joshua_model.py b/joshua/joshua_model.py index d7b3fd2..1a0513b 100644 --- a/joshua/joshua_model.py +++ b/joshua/joshua_model.py @@ -664,20 +664,13 @@ def should_run_ensemble(tr: fdb.Transaction, ensemble_id: str) -> bool: ) ) - # Without this, an arbitrary number of agents could steal this run/seed. + # If we read at snapshot isolation then an arbitrary number of agents could steal this run/seed. # We only want one agent to succeed in taking over for the dead agent's run/seed. - # Added a write conflict key so that only one agent could read the key - # and do clean up operations - tr.add_write_conflict_key( + tr.add_read_conflict_key( dir_ensemble_incomplete[ensemble_id]["heartbeat"][max_seed] ) - - if dir_ensemble_incomplete[ensemble_id]["heartbeat"][max_seed]: - # ignore cancelled test - _decrement(tr, ensemble_id, "started") - del tr[dir_ensemble_incomplete[ensemble_id][max_seed].range()] - del tr[dir_ensemble_incomplete[ensemble_id]["heartbeat"][max_seed]] - + del tr[dir_ensemble_incomplete[ensemble_id][max_seed].range()] + del tr[dir_ensemble_incomplete[ensemble_id]["heartbeat"][max_seed]] return True return False else: @@ -998,3 +991,14 @@ def get_agent_failures(tr, time_start=None, time_end=None): failures.append((info, msg)) return failures + +@transactional +def cancel_agent_cleanup(tr, ensemble_id): + """ + Clean-up method for when an agent takes a job but gets cancelled + """ + + # TODO(qhoang) let's try this but there must be a better way + # When an agent is cancelled, it has already incremented the __started__ counter + # but will never get to increment the __ended__ counter + _decrement(tr, ensemble_id, "started") \ No newline at end of file
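
Note on the counter encoding these patches rely on: counters such as "started" and "ended" are updated with FoundationDB's atomic ADD mutation (tr.add), which interprets its operand as a little-endian integer, so _increment and _decrement never read the key and never conflict with each other. The sketch below restates that helper pattern in isolation; it is a minimal illustration assuming API version 630 and a caller-supplied counter key, not a drop-in replacement for the functions in joshua_model.py.

import struct

import fdb

fdb.api_version(630)

# 8-byte little-endian operands for the ADD mutation. -1 is stored in
# two's complement, so adding it decrements the counter.
ONE = struct.pack("<q", 1)            # same bytes as b"\x01" + b"\x00" * 7
NEGATIVE_ONE = struct.pack("<q", -1)


def increment(tr, counter_key):
    # Atomic add: no read is performed, so concurrent increments never conflict.
    tr.add(counter_key, ONE)


def decrement(tr, counter_key):
    tr.add(counter_key, NEGATIVE_ONE)


def read_counter(tr, counter_key, snapshot=False):
    # A snapshot read adds no read conflict range, at the cost of not
    # conflicting with concurrent writers of the counter.
    value = tr.snapshot.get(counter_key) if snapshot else tr.get(counter_key)
    if value == None:  # the binding returns a Value object, hence == None
        return 0
    return struct.unpack("<q", value)[0]

With counters encoded this way, the stop condition introduced in patch 1, ended >= max(max_runs, started), keeps an ensemble alive until every run that was started has also ended, which is what "always wait for the result once started" refers to.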
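
Note on the takeover path that patches 6 and 7 iterate on: agents scan other agents' heartbeats at snapshot isolation, so the transaction that adopts a dead agent's seed does not pick up a read conflict from that scan by itself. The sketch below shows the read-conflict-key pattern the final patch returns to; it is a simplified illustration with a hypothetical heartbeat_key argument, assuming API version 630, not the actual should_run_ensemble() logic.

import fdb

fdb.api_version(630)


@fdb.transactional
def take_over_dead_seed(tr, heartbeat_key):
    # The heartbeat scan in the real code happens at snapshot isolation,
    # which adds no read conflict range.
    if tr.snapshot.get(heartbeat_key) == None:
        # Another agent already claimed (and cleared) this seed.
        return False
    # Declare the read dependency explicitly so racing agents conflict.
    tr.add_read_conflict_key(heartbeat_key)
    # Clearing the heartbeat is a write; of two racing agents the loser's
    # commit fails with a conflict, @fdb.transactional retries its body,
    # and the snapshot re-read above then sees the key is gone.
    del tr[heartbeat_key]
    return True

Called as take_over_dead_seed(db, key), the decorator handles the retry loop, so only one of several concurrent callers returns True; this is the property the _decrement("started") cleanup in patches 6 and 7 depends on to avoid double-counting.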