diff --git a/src/qana/RelationGraph.cc b/src/qana/RelationGraph.cc index 773a190b78..968a9aebf1 100644 --- a/src/qana/RelationGraph.cc +++ b/src/qana/RelationGraph.cc @@ -33,6 +33,7 @@ #include #include #include +#include // LSST headers #include "lsst/log/Log.h" @@ -79,10 +80,12 @@ using query::QueryTemplate; using query::SelectStmt; using query::TableRef; using query::TableRefList; +using query::ValueExpr; using query::ValueExprPtr; using query::ValueExprPtrVector; using query::ValueFactor; using query::ValueFactorPtr; +using query::WhereClause; std::ostream& operator<<(std::ostream& out, Vertex const& vertex) { out << "Vertex(" @@ -155,6 +158,13 @@ void Vertex::insert(Edge const& e) { } } +std::shared_ptr Vertex::makeSubChunkCompPredicate() const { + auto const table = tr.hasAlias() ? tr.getAlias() : info->table; + return std::make_shared(ValueExpr::newColumnExpr("", table, "", SUB_CHUNK_COLUMN), + CompPredicate::EQUALS_OP, + ValueExpr::newSimple(ValueFactor::newConstFactor(SUBCHUNK_TAG))); +} + // ---------------------------------------------------------------- // RelationGraph implementation @@ -1029,12 +1039,14 @@ void RelationGraph::rewrite(SelectStmtPtrVector& outputs, QueryMapping& mapping) // Find directors for which overlap is required. At the same time, rewrite // all table references as their corresponding chunk templates. std::vector overlapRefs; + LOGS(_log, LOG_LVL_INFO, "1: rewrite _vertices.size()=" << _vertices.size()); for (ListIter i = _vertices.begin(), e = _vertices.end(); i != e; ++i) { i->rewriteAsChunkTemplate(); if (i->info->kind == TableInfo::DIRECTOR && i->overlap > 0.0) { overlapRefs.push_back(&(*i)); } } + LOGS(_log, LOG_LVL_INFO, "2: rewrite overlapRefs.size()=" << overlapRefs.size()); if (overlapRefs.empty()) { // There is no need for sub-chunking, so leave it off for now. // @@ -1058,31 +1070,80 @@ void RelationGraph::rewrite(SelectStmtPtrVector& outputs, QueryMapping& mapping) "references that require overlap"); } + // Add sub-chunk constraint for all director & match tables + // Collect full table names (or aliases) in a set to avoid duplicates in setting sub-chunk constraints + std::unordered_set tableNames; + for (ListIter i = _vertices.begin(), e = _vertices.end(); i != e; ++i) { + std::string kindName; + switch (i->info->kind) { + case TableInfo::DIRECTOR: + kindName = "DIRECTOR"; + break; + case TableInfo::CHILD: + kindName = "CHILD"; + break; + case TableInfo::MATCH: + kindName = "MATCH"; + break; + default: + kindName = "UNKNOWN"; + break; + } + LOGS(_log, LOG_LVL_INFO, + "3: rewrite i->tr.getAlias()=" << i->tr.getAlias() << " i->info->table=" << i->info->table + << " overlap=" << i->overlap << " kind=" << kindName); + if (i->info->kind == TableInfo::DIRECTOR || i->info->kind == TableInfo::MATCH) { + std::string tableName; + if (i->tr.hasAlias()) { + tableName = i->tr.getAlias(); + } else { + tableName = i->info->database + "." + i->info->table; + } + if (tableNames.count(tableName) == 0) { + tableNames.insert(tableName); + auto predicate = i->makeSubChunkCompPredicate(); + BoolFactor::Ptr bfactor = std::make_shared(); + bfactor->_terms.push_back(predicate); + _query->getWhereClause(true).prependAndTerm(bfactor); + } + } + } + // Rewrite director table references not requiring overlap as their // corresponding sub-chunk templates, and record the names of all // sub-chunked tables. for (ListIter i = _vertices.begin(), e = _vertices.end(); i != e; ++i) { if (i->info->kind == TableInfo::DIRECTOR) { if (i->overlap == 0.0) { - i->rewriteAsSubChunkTemplate(); + i->rewriteAsChunkTemplate(); } DbTable dbTable(i->info->database, i->info->table); - LOGS(_log, LOG_LVL_TRACE, "rewrite db=" << dbTable.db << " table=" << dbTable.table); + LOGS(_log, LOG_LVL_INFO, + "4: rewrite db=" << i->info->database << " table=" << i->info->table + << " overlap=" << i->overlap); mapping.insertSubChunkTable(dbTable); } } unsigned n = static_cast(overlapRefs.size()); unsigned numPermutations = 1 << n; - // Each director requiring overlap must be rewritten as both a sub-chunk - // template and an overlap sub-chunk template. There are 2ⁿ different + // Each director requiring overlap must be rewritten as both a chunk + // template and chunk overlap template. There are 2ⁿ different // template permutations for n directors requiring overlap; generate them // all. for (unsigned p = 0; p < numPermutations; ++p) { for (unsigned i = 0; i < n; ++i) { if ((p & (1 << i)) != 0) { overlapRefs[i]->rewriteAsOverlapTemplate(); + LOGS(_log, LOG_LVL_INFO, + "5.o: rewrite db=" << overlapRefs[i]->info->database + << " table=" << overlapRefs[i]->info->table + << " overlap=" << overlapRefs[i]->overlap); } else { - overlapRefs[i]->rewriteAsSubChunkTemplate(); + overlapRefs[i]->rewriteAsChunkTemplate(); + LOGS(_log, LOG_LVL_INFO, + "5.c: rewrite db=" << overlapRefs[i]->info->database + << " table=" << overlapRefs[i]->info->table + << " overlap=" << overlapRefs[i]->overlap); } } // Given the use of shared_ptr by the IR classes, we could shallow diff --git a/src/qana/RelationGraph.h b/src/qana/RelationGraph.h index 8731d7395b..3e7fb0df6a 100644 --- a/src/qana/RelationGraph.h +++ b/src/qana/RelationGraph.h @@ -494,6 +494,7 @@ namespace lsst::qserv { namespace query { class ColumnRef; +class CompPredicate; class QueryContext; class SelectStmt; } // namespace query @@ -577,19 +578,16 @@ struct Vertex { tr.setTable(info->getChunkTemplate()); } - /// `rewriteAsSubChunkTemplate` rewrites `tr` to contain a sub-chunk - /// specific name pattern. - void rewriteAsSubChunkTemplate() { - tr.setDb(info->getSubChunkDb()); - tr.setTable(info->getSubChunkTemplate()); - } - /// `rewriteAsOverlapTemplate` rewrites `tr` to contain an overlap /// sub-chunk specific name pattern. void rewriteAsOverlapTemplate() { - tr.setDb(info->getSubChunkDb()); + tr.setDb(info->database); tr.setTable(info->getOverlapTemplate()); } + + /// `makeSubChunkCompPredicate` creates a shared pointer to a new + /// comparison predicate for the sub-chunk column. + std::shared_ptr makeSubChunkCompPredicate() const; }; std::ostream& operator<<(std::ostream& out, Vertex const& vertex); diff --git a/src/qana/TableInfo.h b/src/qana/TableInfo.h index b54e1663ec..e46f347a77 100644 --- a/src/qana/TableInfo.h +++ b/src/qana/TableInfo.h @@ -69,7 +69,7 @@ #include // Qserv headers -#include "global/constants.h" // for SUBCHUNKDB_PREFIX +#include "global/constants.h" // Forward declarations namespace lsst::qserv::query { @@ -134,12 +134,8 @@ struct TableInfo { return false; } - std::string const getSubChunkDb() const { return SUBCHUNKDB_PREFIX + database + "_" + CHUNK_TAG; } std::string const getChunkTemplate() const { return table + "_" + CHUNK_TAG; } - std::string const getSubChunkTemplate() const { return table + "_" + CHUNK_TAG + "_" + SUBCHUNK_TAG; } - std::string const getOverlapTemplate() const { - return table + "FullOverlap_" + CHUNK_TAG + "_" + SUBCHUNK_TAG; - } + std::string const getOverlapTemplate() const { return table + "FullOverlap_" + CHUNK_TAG; } virtual void dump(std::ostream& os) const; }; diff --git a/src/qproc/testQueryAnaGeneral.cc b/src/qproc/testQueryAnaGeneral.cc index cced49a07e..274029959a 100644 --- a/src/qproc/testQueryAnaGeneral.cc +++ b/src/qproc/testQueryAnaGeneral.cc @@ -199,16 +199,20 @@ BOOST_AUTO_TEST_CASE(RestrictorNeighborCount) { "where qserv_areaspec_box(6,6,7,7) AND rFlux_PS<0.005 AND " "scisql_angSep(o1.ra_Test,o1.decl_Test,o2.ra_Test,o2.decl_Test) < 0.001;"; std::string expected_100_subchunk_core = - "SELECT count(*) AS `QS1_COUNT` FROM `Subchunks_LSST_100`.`Object_100_%S\007S%` AS " - "`o1`,`Subchunks_LSST_100`.`Object_100_%S\007S%` AS `o2` " - "WHERE scisql_s2PtInBox(`o1`.`ra_Test`,`o1`.`decl_Test`,6,6,7,7)=1 AND " + "SELECT count(*) AS `QS1_COUNT` FROM `LSST`.`Object_100` AS " + "`o1`,`LSST`.`Object_100` AS `o2` " + "WHERE `o2`.`subChunkId`=%S\007S% " + "AND `o1`.`subChunkId`=%S\007S% " + "AND scisql_s2PtInBox(`o1`.`ra_Test`,`o1`.`decl_Test`,6,6,7,7)=1 AND " "scisql_s2PtInBox(`o2`.`ra_Test`,`o2`.`decl_Test`,6,6,7,7)=1 AND " "`o1`.`rFlux_PS`<0.005 AND " "scisql_angSep(`o1`.`ra_Test`,`o1`.`decl_Test`,`o2`.`ra_Test`,`o2`.`decl_Test`)<0.001"; std::string expected_100_subchunk_overlap = - "SELECT count(*) AS `QS1_COUNT` FROM `Subchunks_LSST_100`.`Object_100_%S\007S%` AS " - "`o1`,`Subchunks_LSST_100`.`ObjectFullOverlap_100_%S\007S%` AS `o2` " - "WHERE scisql_s2PtInBox(`o1`.`ra_Test`,`o1`.`decl_Test`,6,6,7,7)=1 " + "SELECT count(*) AS `QS1_COUNT` FROM `LSST`.`Object_100` AS " + "`o1`,`LSST`.`ObjectFullOverlap_100` AS `o2` " + "WHERE `o2`.`subChunkId`=%S\007S% " + "AND `o1`.`subChunkId`=%S\007S% " + "AND scisql_s2PtInBox(`o1`.`ra_Test`,`o1`.`decl_Test`,6,6,7,7)=1 " "AND scisql_s2PtInBox(`o2`.`ra_Test`,`o2`.`decl_Test`,6,6,7,7)=1 " "AND `o1`.`rFlux_PS`<0.005 AND " "scisql_angSep(`o1`.`ra_Test`,`o1`.`decl_Test`,`o2`.`ra_Test`,`o2`.`decl_Test`)<0.001"; @@ -253,9 +257,11 @@ BOOST_AUTO_TEST_CASE(Triple) { "0.024 > scisql_angSep(o1.ra_Test,o1.decl_Test,o2.ra_Test,o2.decl_Test) and " "Source.objectIdSourceTest=o2.objectIdObjTest;"; std::string expected = - "SELECT * FROM `Subchunks_LSST_100`.`Object_100_%S\007S%` AS " - "`o1`,`Subchunks_LSST_100`.`Object_100_%S\007S%` AS `o2`,`LSST`.`Source_100` AS `LSST.Source` " - "WHERE `o1`.`id`!=`o2`.`id` AND " + "SELECT * FROM `LSST`.`Object_100` AS " + "`o1`,`LSST`.`Object_100` AS `o2`,`LSST`.`Source_100` AS `LSST.Source` " + "WHERE `o2`.`subChunkId`=%S\007S% " + "AND `o1`.`subChunkId`=%S\007S% " + "AND `o1`.`id`!=`o2`.`id` AND " "0.024>scisql_angSep(`o1`.`ra_Test`,`o1`.`decl_Test`,`o2`.`ra_Test`,`o2`.`decl_Test`) AND " "`LSST.Source`.`objectIdSourceTest`=`o2`.`objectIdObjTest`"; @@ -509,9 +515,11 @@ BOOST_AUTO_TEST_CASE(ObjectSelfJoinDistance) { "scisql_angSep(o1.ra_Test,o1.decl_Test,o2.ra_Test,o2.decl_Test) < 0.02"; std::string expected = "SELECT count(*) AS `QS1_COUNT` " - "FROM `Subchunks_LSST_100`.`Object_100_%S\007S%` AS `o1`," - "`Subchunks_LSST_100`.`Object_100_%S\007S%` AS `o2` " - "WHERE scisql_s2PtInBox(`o1`.`ra_Test`,`o1`.`decl_Test`,5.5,5.5,6.1,6.1)=1 " + "FROM `LSST`.`Object_100` AS `o1`," + "`LSST`.`Object_100` AS `o2` " + "WHERE `o2`.`subChunkId`=%S\007S% " + "AND `o1`.`subChunkId`=%S\007S% " + "AND scisql_s2PtInBox(`o1`.`ra_Test`,`o1`.`decl_Test`,5.5,5.5,6.1,6.1)=1 " "AND scisql_s2PtInBox(`o2`.`ra_Test`,`o2`.`decl_Test`,5.5,5.5,6.1,6.1)=1 " "AND scisql_angSep(`o1`.`ra_Test`,`o1`.`decl_Test`,`o2`.`ra_Test`,`o2`.`decl_Test`)<0.02"; qsTest.sqlConfig = @@ -951,9 +959,11 @@ BOOST_AUTO_TEST_CASE(FuncExprPred) { "(scisql_fluxToAbMag(o2.gFlux_PS)-scisql_fluxToAbMag(o2.rFlux_PS)) ) < 1;"; expected = "SELECT `o1`.`objectId` AS `o1.objectId`,`o2`.`objectId` AS `objectId2` " - "FROM `Subchunks_LSST_100`.`Object_100_%S\007S%` AS " - "`o1`,`Subchunks_LSST_100`.`Object_100_%S\007S%` AS `o2` " - "WHERE scisql_angSep(`o1`.`ra_Test`,`o1`.`decl_Test`,`o2`.`ra_Test`,`o2`.`decl_Test`)<0.00001 " + "FROM `LSST`.`Object_100` AS " + "`o1`,`LSST`.`Object_100` AS `o2` " + "WHERE `o2`.`subChunkId`=%S\007S% " + "AND `o1`.`subChunkId`=%S\007S% " + "AND scisql_angSep(`o1`.`ra_Test`,`o1`.`decl_Test`,`o2`.`ra_Test`,`o2`.`decl_Test`)<0.00001 " "AND `o1`.`objectId`<>`o2`.`objectId` AND " "ABS((scisql_fluxToAbMag(`o1`.`gFlux_PS`)-scisql_fluxToAbMag(`o1`.`rFlux_PS`))-(scisql_" "fluxToAbMag(`o2`.`gFlux_PS`)-scisql_fluxToAbMag(`o2`.`rFlux_PS`)))<1"; @@ -1294,18 +1304,24 @@ BOOST_AUTO_TEST_CASE(Case01_1081) { "WHERE closestToObj = 1 OR closestToObj is NULL;"; std::string expected_100_subchunk_core = "SELECT count(*) AS `QS1_COUNT` " - "FROM `Subchunks_LSST_100`.`Object_100_%S\007S%` AS `o` " + "FROM `LSST`.`Object_100` AS `o` " "INNER JOIN `LSST`.`RefObjMatch_100` AS `o2t` ON `o`.`objectIdObjTest`=`o2t`.`objectId` " - "INNER JOIN `Subchunks_LSST_100`.`SimRefObject_100_%S\007S%` AS `t` ON " + "INNER JOIN `LSST`.`SimRefObject_100` AS `t` ON " "`o2t`.`refObjectId`=`t`.`refObjectId` " - "WHERE `o`.`closestToObj`=1 OR `o`.`closestToObj` IS NULL"; + "WHERE `t`.`subChunkId`=%S\007S% " + "AND `o2t`.`subChunkId`=%S\007S% " + "AND `o`.`subChunkId`=%S\007S% " + "AND (`o`.`closestToObj`=1 OR `o`.`closestToObj` IS NULL)"; std::string expected_100_subchunk_overlap = "SELECT count(*) AS `QS1_COUNT` " - "FROM `Subchunks_LSST_100`.`Object_100_%S\007S%` AS `o` " + "FROM `LSST`.`Object_100` AS `o` " "INNER JOIN `LSST`.`RefObjMatch_100` AS `o2t` ON `o`.`objectIdObjTest`=`o2t`.`objectId` " - "INNER JOIN `Subchunks_LSST_100`.`SimRefObjectFullOverlap_100_%S\007S%` AS `t` ON " + "INNER JOIN `LSST`.`SimRefObjectFullOverlap_100` AS `t` ON " "`o2t`.`refObjectId`=`t`.`refObjectId` " - "WHERE `o`.`closestToObj`=1 OR `o`.`closestToObj` IS NULL"; + "WHERE `t`.`subChunkId`=%S\007S% " + "AND `o2t`.`subChunkId`=%S\007S% " + "AND `o`.`subChunkId`=%S\007S% " + "AND (`o`.`closestToObj`=1 OR `o`.`closestToObj` IS NULL)"; qsTest.sqlConfig = SqlConfig(SqlConfig::MockDbTableColumns({{"LSST", {{"Object", {"objectIdObjTest", "closestToObj"}}, diff --git a/src/query/SelectStmt.cc b/src/query/SelectStmt.cc index a17cbae7db..8154815ebb 100644 --- a/src/query/SelectStmt.cc +++ b/src/query/SelectStmt.cc @@ -169,6 +169,11 @@ void SelectStmt::setFromListAsTable(std::string const& t) { _fromList = std::make_shared(tr); } +WhereClause& SelectStmt::getWhereClause(bool createIfMissing) { + if (createIfMissing && !_whereClause) _whereClause = std::make_shared(); + return *_whereClause; +} + bool SelectStmt::operator==(const SelectStmt& rhs) const { return (util::ptrCompare(_fromList, rhs._fromList) && util::ptrCompare(_selectList, rhs._selectList) && diff --git a/src/query/SelectStmt.h b/src/query/SelectStmt.h index 5ddb90b7a7..c9799f0106 100644 --- a/src/query/SelectStmt.h +++ b/src/query/SelectStmt.h @@ -104,7 +104,7 @@ class SelectStmt { bool hasWhereClause() const { return static_cast(_whereClause); } WhereClause const& getWhereClause() const { return *_whereClause; } - WhereClause& getWhereClause() { return *_whereClause; } + WhereClause& getWhereClause(bool createIfMissing = false); void setWhereClause(std::shared_ptr w) { _whereClause = w; } /** diff --git a/src/query/WhereClause.cc b/src/query/WhereClause.cc index 59687d426f..0ac4bc095c 100644 --- a/src/query/WhereClause.cc +++ b/src/query/WhereClause.cc @@ -141,7 +141,26 @@ void WhereClause::prependAndTerm(std::shared_ptr t) { throw std::logic_error("Term of first OR term is not an AND term; there is no global AND term"); } } else { - throw std::logic_error("There is more than term in the root OR term; can't pick a global AND term"); + // We've hit the following case: + // + // WHERE ... OR .. OR ... + // + // A solution is to construct the new root and the global AndTerm. + // Push down the current root as the only child of the AndTerm. + // In essence, we're replacing the following constraints: + // + // WHERE ... OR .. OR ... + // + // With: + // + // WHERE AND (... OR .. OR ...) + // + auto newRootOrTerm = std::make_shared(); + andTerm = std::make_shared(); + andTerm->addBoolTerm(_rootOrTerm); + newRootOrTerm->addBoolTerm(andTerm); + _rootOrTerm = newRootOrTerm; + andTerm = std::dynamic_pointer_cast(_rootOrTerm->_terms[0]); } if (!andTerm->merge(*t, AndTerm::PREPEND)) { diff --git a/src/wbase/Task.cc b/src/wbase/Task.cc index b6f86633a1..201529cfd0 100644 --- a/src/wbase/Task.cc +++ b/src/wbase/Task.cc @@ -164,7 +164,7 @@ Task::Task(TaskMsgPtr const& t, int fragmentNumber, shared_ptr co _scanInfo.sortTablesSlowestFirst(); _scanInteractive = t->scaninteractive(); _maxTableSize = t->maxtablesize_mb() * ::MB_SIZE_BYTES; - +#if 0 // Create sets and vectors for 'aquiring' subchunk temporary tables. proto::TaskMsg_Fragment const& fragment(t->fragment(_queryFragmentNum)); DbTableSet dbTbls_; @@ -203,6 +203,7 @@ Task::Task(TaskMsgPtr const& t, int fragmentNumber, shared_ptr co << " subChunks=" << util::printable(subchunksVect_)); } _dbTblsAndSubchunks = make_unique(dbTbls_, subchunksVect_); +#endif if (_sendChannel == nullptr) { throw util::Bug(ERR_LOC, "Task::Task _sendChannel==null " + getIdStr()); } @@ -221,7 +222,6 @@ Task::~Task() { vector Task::createTasks(shared_ptr const& taskMsg, shared_ptr const& sendChannel, - shared_ptr const& chunkResourceMgr, mysql::MySqlConfig const& mySqlConfig, shared_ptr const& sqlConnMgr, shared_ptr const& queriesAndChunks, @@ -259,8 +259,8 @@ vector Task::createTasks(shared_ptr const& taskMsg, } for (auto task : vect) { // newQueryRunner sets the `_taskQueryRunner` pointer in `task`. - task->setTaskQueryRunner(wdb::QueryRunner::newQueryRunner(task, chunkResourceMgr, mySqlConfig, - sqlConnMgr, queriesAndChunks)); + task->setTaskQueryRunner( + wdb::QueryRunner::newQueryRunner(task, mySqlConfig, sqlConnMgr, queriesAndChunks)); } sendChannel->setTaskCount(vect.size()); diff --git a/src/wbase/Task.h b/src/wbase/Task.h index 22663ba5aa..7c2f67752c 100644 --- a/src/wbase/Task.h +++ b/src/wbase/Task.h @@ -60,9 +60,6 @@ class FileChannelShared; namespace lsst::qserv::wcontrol { class SqlConnMgr; } -namespace lsst::qserv::wdb { -class ChunkResourceMgr; -} namespace lsst::qserv::wpublish { class QueriesAndChunks; class QueryStatistics; @@ -129,7 +126,7 @@ class Task : public util::CommandForThreadPool { static std::string const defaultUser; using Ptr = std::shared_ptr; using TaskMsgPtr = std::shared_ptr; - +#if 0 /// Class to store constant sets and vectors. class DbTblsAndSubchunks { public: @@ -147,7 +144,7 @@ class Task : public util::CommandForThreadPool { /// Vector of subchunkIds. Set in constructor and should never change. const IntVector subchunksVect; }; - +#endif struct ChunkEqual { bool operator()(Ptr const& x, Ptr const& y); }; @@ -165,7 +162,6 @@ class Task : public util::CommandForThreadPool { /// Read 'taskMsg' to generate a vector of one or more task objects all using the same 'sendChannel' static std::vector createTasks(std::shared_ptr const& taskMsg, std::shared_ptr const& sendChannel, - std::shared_ptr const& chunkResourceMgr, mysql::MySqlConfig const& mySqlConfig, std::shared_ptr const& sqlConnMgr, std::shared_ptr const& queriesAndChunks, @@ -276,10 +272,7 @@ class Task : public util::CommandForThreadPool { int getSubchunkId() const { return _subchunkId; } /// Returns a reference to dbTbls. - const DbTableSet& getDbTbls() const { return _dbTblsAndSubchunks->dbTbls; } - - /// Return a reference to the list of subchunk ids. - const IntVector& getSubchunksVect() const { return _dbTblsAndSubchunks->subchunksVect; } + // const DbTableSet& getDbTbls() const { return _dbTblsAndSubchunks->dbTbls; } /// Return an identifier of the corresponding MySQL query (if any was set). unsigned long getMySqlThreadId() const { return _mysqlThreadId.load(); } @@ -321,7 +314,7 @@ class Task : public util::CommandForThreadPool { int const _czarId; ///< czar Id from the task message. /// Set of tables and vector of subchunk ids used by ChunkResourceRequest. Do not change/reset. - std::unique_ptr _dbTblsAndSubchunks; + // std::unique_ptr _dbTblsAndSubchunks; /// The path to the result file. std::string _resultFileAbsPath; diff --git a/src/wcontrol/Foreman.cc b/src/wcontrol/Foreman.cc index 84b9a44ef4..f46313eff0 100644 --- a/src/wcontrol/Foreman.cc +++ b/src/wcontrol/Foreman.cc @@ -44,8 +44,6 @@ #include "wcontrol/ResourceMonitor.h" #include "wcontrol/SqlConnMgr.h" #include "wcontrol/WorkerStats.h" -#include "wdb/ChunkResource.h" -#include "wdb/SQLBackend.h" #include "wpublish/QueriesAndChunks.h" using namespace std; @@ -90,13 +88,6 @@ Foreman::Foreman(Scheduler::Ptr const& scheduler, unsigned int poolSize, unsigne _resourceMonitor(make_shared()), _io_service(), _httpServer(qhttp::Server::create(_io_service, 0 /* grab the first available port */)) { - // Make the chunk resource mgr - // Creating backend makes a connection to the database for making temporary tables. - // It will delete temporary tables that it can identify as being created by a worker. - // Previous instances of the worker will terminate when they try to use or create temporary tables. - // Previous instances of the worker should be terminated before a new worker is started. - _chunkResourceMgr = wdb::ChunkResourceMgr::newMgr(make_shared(_mySqlConfig)); - assert(_scheduler); // Cannot operate without scheduler. LOGS(_log, LOG_LVL_DEBUG, "poolSize=" << poolSize << " maxPoolThreads=" << maxPoolThreads); diff --git a/src/wcontrol/Foreman.h b/src/wcontrol/Foreman.h index 17fd0f14f6..96afd99dce 100644 --- a/src/wcontrol/Foreman.h +++ b/src/wcontrol/Foreman.h @@ -59,7 +59,6 @@ class Server; } // namespace lsst::qserv::qhttp namespace lsst::qserv::wdb { -class ChunkResourceMgr; class QueryRunner; } // namespace lsst::qserv::wdb @@ -93,7 +92,6 @@ class Scheduler : public wbase::TaskScheduler, public util::CommandQueue { }; /// Foreman is used to maintain a thread pool and schedule Tasks for the thread pool. -/// It also manages sub-chunk tables with the ChunkResourceMgr. /// The schedulers may limit the number of threads they will use from the thread pool. class Foreman : public wbase::MsgProcessor { public: @@ -117,7 +115,6 @@ class Foreman : public wbase::MsgProcessor { Foreman(Foreman const&) = delete; Foreman& operator=(Foreman const&) = delete; - std::shared_ptr const& chunkResourceMgr() const { return _chunkResourceMgr; } mysql::MySqlConfig const& mySqlConfig() const { return _mySqlConfig; } std::shared_ptr const& queriesAndChunks() const { return _queries; } std::shared_ptr const& chunkInventory() const { return _chunkInventory; } @@ -139,8 +136,6 @@ class Foreman : public wbase::MsgProcessor { virtual nlohmann::json statusToJson(wbase::TaskSelector const& taskSelector) override; private: - std::shared_ptr _chunkResourceMgr; - util::ThreadPool::Ptr _pool; Scheduler::Ptr _scheduler; diff --git a/src/wdb/CMakeLists.txt b/src/wdb/CMakeLists.txt index ba92138265..631877d3ee 100644 --- a/src/wdb/CMakeLists.txt +++ b/src/wdb/CMakeLists.txt @@ -2,10 +2,8 @@ add_library(wdb SHARED) add_dependencies(wdb proto) target_sources(wdb PRIVATE - ChunkResource.cc QueryRunner.cc QuerySql.cc - SQLBackend.cc ) target_include_directories(wdb PRIVATE @@ -38,7 +36,6 @@ FUNCTION(wdb_tests) ENDFUNCTION() wdb_tests( - testChunkResource testQueryRunner testQuerySql ) diff --git a/src/wdb/ChunkResource.cc b/src/wdb/ChunkResource.cc deleted file mode 100644 index b131552e29..0000000000 --- a/src/wdb/ChunkResource.cc +++ /dev/null @@ -1,358 +0,0 @@ -// -*- LSST-C++ -*- -/* - * LSST Data Management System - * Copyright 2014-2016 AURA/LSST. - * - * This product includes software developed by the - * LSST Project (http://www.lsst.org/). - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the LSST License Statement and - * the GNU General Public License along with this program. If not, - * see . - */ - -/** - * @file - * - * @brief ChunkResource implementation - * - * @author Daniel L. Wang, SLAC - */ - -// Class header -#include "wdb/ChunkResource.h" - -// System headers -#include -#include - -// Third-party headers -#include "boost/format.hpp" - -// LSST headers -#include "lsst/log/Log.h" - -// Qserv headers -#include "global/constants.h" -#include "sql/SqlResults.h" -#include "util/Bug.h" -#include "util/IterableFormatter.h" -#include "wbase/Base.h" -#include "wdb/QuerySql.h" - -namespace { - -LOG_LOGGER _log = LOG_GET("lsst.qserv.wdb.ChunkResource"); - -template -class ScScriptBuilder { -public: - ScScriptBuilder(lsst::qserv::wdb::QuerySql& qSql_, std::string const& db, std::string const& table, - std::string const& scColumn, int chunkId) - : qSql(qSql_) { - buildT = (boost::format(lsst::qserv::wbase::CREATE_SUBCHUNK_SCRIPT) % db % table % scColumn % - chunkId % "%1%") - .str(); - cleanT = (boost::format(lsst::qserv::wbase::CLEANUP_SUBCHUNK_SCRIPT) % db % table % chunkId % "%1%") - .str(); - } - void operator()(T const& subc) { - qSql.buildList.push_back((boost::format(buildT) % subc).str()); - qSql.cleanupList.push_back((boost::format(cleanT) % subc).str()); - } - std::string buildT; - std::string cleanT; - lsst::qserv::wdb::QuerySql& qSql; -}; -} // anonymous namespace - -namespace lsst::qserv::wdb { -//////////////////////////////////////////////////////////////////////// -// ChunkResource -//////////////////////////////////////////////////////////////////////// -class ChunkResource::Info { -public: - Info(std::string const& db_, int chunkId_, DbTableSet const& tables_, IntVector const& subChunkIds_) - : db{db_}, chunkId{chunkId_}, tables{tables_}, subChunkIds{subChunkIds_} {} - - Info(std::string const& db_, int chunkId_, DbTableSet const& tables_) - : db{db_}, chunkId{chunkId_}, tables{tables_} {} - - std::string db; - int chunkId; - DbTableSet tables; - IntVector subChunkIds; -}; -std::ostream& operator<<(std::ostream& os, ChunkResource::Info const& i) { - os << "CrInfo(" << i.chunkId << "; "; - std::copy(i.subChunkIds.begin(), i.subChunkIds.end(), std::ostream_iterator(os, ",")); - os << ")"; - return os; -} -//////////////////////////////////////////////////////////////////////// -// ChunkResource -//////////////////////////////////////////////////////////////////////// -ChunkResource::ChunkResource(ChunkResourceMgr* mgr) : _mgr{mgr} {} - -ChunkResource::ChunkResource(ChunkResourceMgr* mgr, ChunkResource::Info* info) : _mgr{mgr}, _info{info} { - LOGS(_log, LOG_LVL_DEBUG, "ChunkResource info=" << *info); - _mgr->acquireUnit(*_info); -} -ChunkResource::ChunkResource(ChunkResource const& cr) : _mgr{cr._mgr}, _info{new Info(*cr._info)} { - _mgr->acquireUnit(*_info); -} - -ChunkResource& ChunkResource::operator=(ChunkResource const& cr) { - _mgr = cr._mgr; - _info.reset(new Info(*cr._info)); - _mgr->acquireUnit(*_info); - return *this; -} - -ChunkResource::~ChunkResource() { - if (_info.get()) { - _mgr->release(*_info); - } -} - -std::string const& ChunkResource::getDb() const { return _info->db; } - -int ChunkResource::getChunkId() const { return _info->chunkId; } - -DbTableSet const& ChunkResource::getTables() const { return _info->tables; } - -IntVector const& ChunkResource::getSubChunkIds() const { return _info->subChunkIds; } - -std::ostream& operator<<(std::ostream& os, const SQLBackend::LockStatus& ls) { - switch (ls) { - case SQLBackend::UNLOCKED: - os << "UNLOCKED"; - break; - case SQLBackend::LOCKED_OTHER: - os << "LOCKED_OTHER"; - break; - case SQLBackend::LOCKED_OURS: - os << "LOCKED_OURS"; - break; - } - os << "unknown"; - return os; -} - -/// ChunkEntry is an entry that represents table subchunks for a given -/// database and chunkid. -class ChunkEntry { -public: - typedef std::map SubChunkMap; // subchunkid -> count - typedef std::map TableMap; // tablename -> subchunk map - - typedef std::shared_ptr Ptr; - - ChunkEntry(int chunkId) : _chunkId(chunkId), _refCount(0) {} - - int getRefCount() const { - std::lock_guard lock(_mutex); - return _refCount; - } - - /// @return a copy of _tableMap - TableMap getTableMapCopy() const { - std::lock_guard lock(_mutex); - return _tableMap; - } - - /// Acquire a resource, loading if needed - void acquire(std::string const& db, DbTableSet const& dbTableSet, IntVector const& sc, - SQLBackend::Ptr backend) { - ScTableVector needed; - std::lock_guard lock(_mutex); - backend->memLockRequireOwnership(); - ++_refCount; // Increase usage count - LOGS(_log, LOG_LVL_DEBUG, - "SubChunk acquire refC=" << _refCount << " db=" << db << " tables[" - << util::printable(dbTableSet) << "]" - << " sc[" << util::printable(sc) << "]"); - for (auto const& dbTbl : dbTableSet) { - SubChunkMap& scm = _tableMap[dbTbl]; // implicit creation OK. - IntVector::const_iterator i, e; - for (i = sc.begin(), e = sc.end(); i != e; ++i) { - SubChunkMap::iterator it = scm.find(*i); - int last = 0; - if (it == scm.end()) { - needed.push_back(ScTable(_chunkId, dbTbl, *i)); - } else { - last = it->second; - } - scm[*i] = last + 1; // write new value - } // All subchunks - } // All tables - // For now, every other user of this chunk must wait while - // we fetch the resource. - if (needed.size() > 0) { - sql::SqlErrorObject err; - bool loadOk = backend->load(needed, err); - if (!loadOk) { - // Release - _release(needed); - throw err; - } - } - } - - /// Release a resource, flushing if no more users need it. - void release(std::string const& db, DbTableSet const& dbTableSet, IntVector const& sc, - SQLBackend::Ptr backend) { - std::lock_guard lock(_mutex); - backend->memLockRequireOwnership(); - StringVector::const_iterator ti, te; - LOGS(_log, LOG_LVL_DEBUG, - "SubChunk release refC=" << _refCount << " db=" << db << " dbTableSet[" - << util::printable(dbTableSet) << "]" - << " sc[" << util::printable(sc) << "]"); - for (auto const& dbTbl : dbTableSet) { - SubChunkMap& scm = _tableMap[dbTbl]; // Should be in there. - IntVector::const_iterator i, e; - for (i = sc.begin(), e = sc.end(); i != e; ++i) { - SubChunkMap::iterator it = scm.find(*i); // Should be there - if (it == scm.end()) { - throw util::Bug( - ERR_LOC, - "ChunkResource ChunkEntry::release: Error releasing un-acquired resource"); - } - scm[*i] = it->second - 1; // write new value - } // All subchunks - } // All tables - --_refCount; - _flush(db, backend); // Discard resources no longer needed by anyone. - // flush could be detached from the release function, to be called at a - // high-water mark and/or on periodic intervals - } - -private: - /// Flush resources no longer needed by anybody - void _flush(std::string const& db, SQLBackend::Ptr backend) { - ScTableVector discardable; - for (auto& elem : _tableMap) { - IntVector mapDiscardable; - SubChunkMap& scm = elem.second; - SubChunkMap::iterator si, se; - for (si = scm.begin(), se = scm.end(); si != se; ++si) { - if (si->second == 0) { - discardable.push_back(ScTable(_chunkId, elem.first, si->first)); - mapDiscardable.push_back(si->first); - } else if (si->second < 0) { - throw util::Bug(ERR_LOC, - "ChunkResource ChunkEntry::flush: Invalid negative use count when " - "flushing subchunks"); - } - } // All subchunks - // Prune zero elements for this db+table+chunk - // (invalidates iterators) - IntVector::iterator di, de; - for (di = mapDiscardable.begin(), de = mapDiscardable.end(); di != de; ++di) { - scm.erase(*di); - } - } // All tables - // Delegate actual table dropping to the backend. - if (discardable.size() > 0) { - backend->discard(discardable); - } - } - - void _release(ScTableVector const& needed) { - // _mutex should be held. - // Release subChunkId for the right table - for (auto const& elem : needed) { - SubChunkMap& scm = _tableMap[elem.dbTable]; - --scm[elem.subChunkId]; - } - } - - std::shared_ptr _backend; ///< Delegate stage/unstage - int _chunkId; - int _refCount; ///< Number of known users - TableMap _tableMap; ///< tables in use - mutable std::mutex _mutex; -}; - -//////////////////////////////////////////////////////////////////////// -// ChunkResourceMgr -//////////////////////////////////////////////////////////////////////// - -ChunkResourceMgr::Ptr ChunkResourceMgr::newMgr(SQLBackend::Ptr const& backend) { - // return std::shared_ptr(new Impl(backend)); - return std::make_shared(backend); -} - -ChunkResource ChunkResourceMgr::acquire(std::string const& db, int chunkId, DbTableSet const& tables) { - // Make sure that the chunk is ready. (NOP right now.) - LOGS(_log, LOG_LVL_DEBUG, - "acquire db=" << db << " chunkId=" << chunkId << " tables=" << util::printable(tables)); - ChunkResource cr(this, new ChunkResource::Info(db, chunkId, tables)); - return cr; -} - -ChunkResource ChunkResourceMgr::acquire(std::string const& db, int chunkId, DbTableSet const& dbTableSet, - IntVector const& subChunks) { - ChunkResource cr(this, new ChunkResource::Info(db, chunkId, dbTableSet, subChunks)); - return cr; -} - -void ChunkResourceMgr::release(ChunkResource::Info const& i) { - std::lock_guard lock(_mapMutex); - Map& map = _getMap(i.db); - ChunkEntry& ce = _getChunkEntry(map, i.chunkId); - ce.release(i.db, i.tables, i.subChunkIds, _backend); -} - -void ChunkResourceMgr::acquireUnit(ChunkResource::Info const& i) { - std::lock_guard lock(_mapMutex); - Map& map = _getMap(i.db); // Select db - ChunkEntry& ce = _getChunkEntry(map, i.chunkId); - // Actually acquire - LOGS(_log, LOG_LVL_DEBUG, "acquireUnit info=" << i); - ce.acquire(i.db, i.tables, i.subChunkIds, _backend); -} - -int ChunkResourceMgr::getRefCount(std::string const& db, int chunkId) { - std::lock_guard lock(_mapMutex); - Map& map = _getMap(db); // Select db - Map::iterator it = map.find(chunkId); // Select chunkId - if (it == map.end()) { - return 0; - } - ChunkEntry& ce = *(it->second.get()); - return ce.getRefCount(); -} - -ChunkResourceMgr::Map& ChunkResourceMgr::_getMap(std::string const& db) { - DbMap::iterator it = _dbMap.find(db); - if (it == _dbMap.end()) { - DbMap::value_type v(db, Map()); - _dbMap.insert(v); - it = _dbMap.find(db); - } - return it->second; -} - -ChunkEntry& ChunkResourceMgr::_getChunkEntry(Map& m, int chunkId) { - Map::iterator it = m.find(chunkId); // Select chunkId - if (it == m.end()) { // Insert if not exist - Map::value_type v(chunkId, std::make_shared(chunkId)); - m.insert(v); - return *(v.second.get()); - } - return *(it->second.get()); -} - -} // namespace lsst::qserv::wdb diff --git a/src/wdb/ChunkResource.h b/src/wdb/ChunkResource.h deleted file mode 100644 index f030c23f37..0000000000 --- a/src/wdb/ChunkResource.h +++ /dev/null @@ -1,151 +0,0 @@ -// -*- LSST-C++ -*- -/* - * LSST Data Management System - * Copyright 2015-2016 LSST Corporation. - * - * This product includes software developed by the - * LSST Project (http://www.lsst.org/). - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the LSST License Statement and - * the GNU General Public License along with this program. If not, - * see . - */ - -#ifndef LSST_QSERV_WDB_CHUNKRESOURCE_H -#define LSST_QSERV_WDB_CHUNKRESOURCE_H -/** - * @file - * - * @brief ChunkResource tracks which chunks are needed. Right now, it is used - * to manage subchunk creation. - * - * @author Daniel L. Wang, SLAC - */ - -// System headers -#include -#include -#include -#include -#include -#include - -// Third-party headers -#include "boost/utility.hpp" - -// Qserv headers -#include "global/DbTable.h" -#include "global/intTypes.h" -#include "global/stringTypes.h" -#include "wdb/SQLBackend.h" - -// Forward declarations -namespace lsst::qserv { -namespace proto { -class TaskMsg_Fragment; -} -namespace wdb { -class Task; -} -} // namespace lsst::qserv - -namespace lsst::qserv::wdb { - -class ChunkEntry; -class ChunkResourceMgr; - -/// ChunkResources are reservations on data resources. Releases its resource -/// when it dies. If you make a copy, the copy holds its own reservation on the -/// same resource. -class ChunkResource { -public: - class Info; // Internal metadata for the resource. - using Ptr = std::shared_ptr; - - ~ChunkResource(); - - std::string const& getDb() const; - int getChunkId() const; - DbTableSet const& getTables() const; - IntVector const& getSubChunkIds() const; - - friend class ChunkResourceMgr; - ChunkResource(ChunkResource const& cr); - ChunkResource& operator=(ChunkResource const& cr); - -private: - ChunkResource(ChunkResourceMgr* mgr); - ChunkResource(ChunkResourceMgr* mgr, Info* info); - - ChunkResourceMgr* _mgr; ///< Do not delete, not owner. - std::unique_ptr _info; -}; - -/// ChunkResourceMgr is a lightweight manager for holding reservations on subchunks. -class ChunkResourceMgr { -public: - using Ptr = std::shared_ptr; - typedef std::map> Map; - typedef std::map DbMap; - - /// Factory - static Ptr newMgr(SQLBackend::Ptr const& backend); - ChunkResourceMgr(SQLBackend::Ptr const& backend) : _backend(backend) {} - virtual ~ChunkResourceMgr() {} - - /// Reserve a chunk. Currently, this does not result in any explicit chunk - /// loading. - /// @return a ChunkResource which should be used for releasing the - /// reservation. - ChunkResource acquire(std::string const& db, int chunkId, DbTableSet const& tables); - - /// Reserve a list of subchunks for a chunk. If they are not yet available, - /// block until they are. - /// @return a ChunkResource which should be used for releasing the - /// reservation. - ChunkResource acquire(std::string const& db, int chunkId, DbTableSet const& DbTableSet, - IntVector const& subChunks); - - /// Release a reservation. Currently, block until the resource has been - /// released if the resource is no longer needed by anyone. - /// Clients should not need to call this explicitly-- ChunkResource - /// instances are implicit references and will release upon their - /// destruction. - void release(ChunkResource::Info const& i); - - /// Acquire a reservation. Block until it is available if it is not - /// already. Clients should not need to call this explicitly. - void acquireUnit(ChunkResource::Info const& i); - - /// @return the reference count for the database and chunkId. - int getRefCount(std::string const& db, int chunkId); - -private: - /// precondition: _mapMutex is held (locked by the caller) - /// Get the ChunkEntry map for a db, creating if necessary - Map& _getMap(std::string const& db); - - /// precondition: _mapMutex is held (locked by the caller) - /// Get the ChunkEntry for a chunkId, creating if necessary - ChunkEntry& _getChunkEntry(Map& m, int chunkId); - - DbMap _dbMap; - // Consider having separate mutexes for each db's map if contention becomes - // a problem. - std::shared_ptr _backend; - std::mutex _mapMutex; // Do not alter map without this mutex -}; - -} // namespace lsst::qserv::wdb - -#endif // LSST_QSERV_WDB_CHUNKRESOURCE_H diff --git a/src/wdb/QueryRunner.cc b/src/wdb/QueryRunner.cc index 06a2498e8e..355a457c32 100644 --- a/src/wdb/QueryRunner.cc +++ b/src/wdb/QueryRunner.cc @@ -67,7 +67,6 @@ #include "wbase/FileChannelShared.h" #include "wconfig/WorkerConfig.h" #include "wcontrol/SqlConnMgr.h" -#include "wdb/ChunkResource.h" #include "wpublish/QueriesAndChunks.h" namespace { @@ -79,23 +78,20 @@ using namespace std; namespace lsst::qserv::wdb { QueryRunner::Ptr QueryRunner::newQueryRunner(wbase::Task::Ptr const& task, - ChunkResourceMgr::Ptr const& chunkResourceMgr, mysql::MySqlConfig const& mySqlConfig, shared_ptr const& sqlConnMgr, shared_ptr const& queriesAndChunks) { - Ptr qr(new QueryRunner(task, chunkResourceMgr, mySqlConfig, sqlConnMgr, + Ptr qr(new QueryRunner(task, mySqlConfig, sqlConnMgr, queriesAndChunks)); // Private constructor. return qr; } /// New instances need to be made with QueryRunner to ensure registration with the task /// and correct setup of enable_shared_from_this. -QueryRunner::QueryRunner(wbase::Task::Ptr const& task, ChunkResourceMgr::Ptr const& chunkResourceMgr, - mysql::MySqlConfig const& mySqlConfig, +QueryRunner::QueryRunner(wbase::Task::Ptr const& task, mysql::MySqlConfig const& mySqlConfig, shared_ptr const& sqlConnMgr, shared_ptr const& queriesAndChunks) : _task(task), - _chunkResourceMgr(chunkResourceMgr), _mySqlConfig(mySqlConfig), _sqlConnMgr(sqlConnMgr), _queriesAndChunks(queriesAndChunks) { @@ -194,45 +190,15 @@ MYSQL_RES* QueryRunner::_primeResult(string const& query) { return _mysqlConn->getResult(); } -class ChunkResourceRequest { -public: - using Ptr = std::shared_ptr; - - ChunkResourceRequest(shared_ptr const& mgr, wbase::Task& task) - : _mgr(mgr), _task(task) {} - - // Since each Task has only one subchunk, fragment number isn't needed. - ChunkResource getResourceFragment() { - if (!_task.getFragmentHasSubchunks()) { - /// Why acquire anything if there are no subchunks in the fragment? - /// Future: Need to be certain this never happens before removing. - return _mgr->acquire(_task.getDb(), _task.getChunkId(), _task.getDbTbls()); - } - - return _mgr->acquire(_task.getDb(), _task.getChunkId(), _task.getDbTbls(), _task.getSubchunksVect()); - } - -private: - shared_ptr const _mgr; - wbase::Task& _task; -}; - bool QueryRunner::_dispatchChannel() { bool erred = false; bool needToFreeRes = false; // set to true once there are results to be freed. // Collect the result in _transmitData. When a reasonable amount of data has been collected, // or there are no more rows to collect, pass _transmitData to _sendChannel. - ChunkResourceRequest::Ptr req; - ChunkResource::Ptr cr; try { util::Timer subChunkT; subChunkT.start(); - req.reset(new ChunkResourceRequest(_chunkResourceMgr, *_task)); - cr.reset(new ChunkResource(req->getResourceFragment())); subChunkT.stop(); - // TODO: Hold onto this for longer period of time as the odds of reuse are pretty low at this scale - // Ideally, hold it until moving on to the next chunk. Try to clean up ChunkResource code. - auto taskSched = _task->getTaskScheduler(); if (!_cancelled && !_task->getSendChannel()->isDead()) { string const& query = _task->getQueryString(); diff --git a/src/wdb/QueryRunner.h b/src/wdb/QueryRunner.h index 91e83db11a..54f7290c2d 100644 --- a/src/wdb/QueryRunner.h +++ b/src/wdb/QueryRunner.h @@ -43,7 +43,6 @@ #include "qmeta/types.h" #include "util/MultiError.h" #include "wbase/Task.h" -#include "wdb/ChunkResource.h" namespace lsst::qserv::wcontrol { class SqlConnMgr; @@ -61,8 +60,8 @@ class QueryRunner : public wbase::TaskQueryRunner, public std::enable_shared_fro public: using Ptr = std::shared_ptr; static QueryRunner::Ptr newQueryRunner( - wbase::Task::Ptr const& task, ChunkResourceMgr::Ptr const& chunkResourceMgr, - mysql::MySqlConfig const& mySqlConfig, std::shared_ptr const& sqlConnMgr, + wbase::Task::Ptr const& task, mysql::MySqlConfig const& mySqlConfig, + std::shared_ptr const& sqlConnMgr, std::shared_ptr const& queriesAndChunks); // Having more than one copy of this would making tracking its progress difficult. QueryRunner(QueryRunner const&) = delete; @@ -80,8 +79,7 @@ class QueryRunner : public wbase::TaskQueryRunner, public std::enable_shared_fro void cancel() override; protected: - QueryRunner(wbase::Task::Ptr const& task, ChunkResourceMgr::Ptr const& chunkResourceMgr, - mysql::MySqlConfig const& mySqlConfig, + QueryRunner(wbase::Task::Ptr const& task, mysql::MySqlConfig const& mySqlConfig, std::shared_ptr const& sqlConnMgr, std::shared_ptr const& queriesAndChunks); @@ -95,7 +93,6 @@ class QueryRunner : public wbase::TaskQueryRunner, public std::enable_shared_fro wbase::Task::Ptr const _task; ///< Actual task /// Resource reservation - ChunkResourceMgr::Ptr _chunkResourceMgr; std::atomic _cancelled{false}; mysql::MySqlConfig const _mySqlConfig; std::unique_ptr _mysqlConn; diff --git a/src/wdb/QuerySql.cc b/src/wdb/QuerySql.cc index 7dd1279578..886301d6ac 100644 --- a/src/wdb/QuerySql.cc +++ b/src/wdb/QuerySql.cc @@ -51,6 +51,7 @@ namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.wdb.QuerySql"); +#if 0 template class ScScriptBuilder { public: @@ -71,6 +72,7 @@ class ScScriptBuilder { std::string cleanT; lsst::qserv::wdb::QuerySql& qSql; }; +#endif } // anonymous namespace namespace lsst::qserv::wdb { @@ -116,7 +118,7 @@ QuerySql::QuerySql(std::string const& db, int chunkId, proto::TaskMsg_Fragment c executeList.push_back(ss.str()); ss.str(""); } - +#if 0 if (f.has_subchunks()) { proto::TaskMsg_Subchunk const& sc = f.subchunks(); for (int i = 0; i < sc.dbtbl_size(); ++i) { @@ -128,6 +130,7 @@ QuerySql::QuerySql(std::string const& db, int chunkId, proto::TaskMsg_Fragment c } } } +#endif } } // namespace lsst::qserv::wdb diff --git a/src/wdb/SQLBackend.cc b/src/wdb/SQLBackend.cc deleted file mode 100644 index 53c0670132..0000000000 --- a/src/wdb/SQLBackend.cc +++ /dev/null @@ -1,279 +0,0 @@ -// -*- LSST-C++ -*- -/* - * LSST Data Management System - * Copyright 2014-2016 AURA/LSST. - * - * This product includes software developed by the - * LSST Project (http://www.lsst.org/). - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the LSST License Statement and - * the GNU General Public License along with this program. If not, - * see . - */ - -// Class header -#include "wdb/SQLBackend.h" - -// System headers -#include - -// Third-party headers -#include "boost/uuid/uuid.hpp" -#include "boost/uuid/uuid_generators.hpp" -#include "boost/uuid/uuid_io.hpp" - -// LSST headers -#include "lsst/log/Log.h" - -// Qserv headers -#include "global/constants.h" -#include "sql/SqlResults.h" -#include "wbase/Base.h" - -namespace { - -LOG_LOGGER _log = LOG_GET("lsst.qserv.wdb.ChunkResource"); - -std::string makeUuid() { - boost::uuids::uuid id = boost::uuids::random_generator()(); - return boost::uuids::to_string(id); -} - -} // anonymous namespace - -namespace lsst::qserv::wdb { - -SQLBackend::SQLBackend() : _uid(makeUuid()) {} - -SQLBackend::SQLBackend(mysql::MySqlConfig const& mc) - : _sqlConn(sql::SqlConnectionFactory::make(sql::SqlConfig(mc))), _uid(makeUuid()) { - std::lock_guard lock(_mtx); - _memLockAcquire(); -} - -SQLBackend::~SQLBackend() { - std::lock_guard lock(_mtx); - _memLockRelease(); -} - -std::ostream& operator<<(std::ostream& os, ScTable const& st) { - return os << SUBCHUNKDB_PREFIX << st.dbTable.db << "_" << st.chunkId << "." << st.dbTable.table << "_" - << st.subChunkId; -} - -bool SQLBackend::load(ScTableVector const& v, sql::SqlErrorObject& err) { - using namespace lsst::qserv::wbase; - std::lock_guard lock(_mtx); - _memLockRequireOwnership(); - for (ScTableVector::const_iterator i = v.begin(), e = v.end(); i != e; ++i) { - std::string const* createScript = nullptr; - if (i->chunkId == DUMMY_CHUNK) { - createScript = &CREATE_DUMMY_SUBCHUNK_SCRIPT; - } else { - createScript = &CREATE_SUBCHUNK_SCRIPT; - } - std::string create = (boost::format(*createScript) % i->dbTable.db % i->dbTable.table % - SUB_CHUNK_COLUMN % i->chunkId % i->subChunkId) - .str(); - - if (!_sqlConn->runQuery(create, err)) { - LOGS(_log, LOG_LVL_ERROR, "sql query err=" << err.errMsg() << " with '" << create << "'"); - _discard(v.begin(), i); - return false; - } - } - return true; -} - -void SQLBackend::discard(ScTableVector const& v) { - std::lock_guard lock(_mtx); - _discard(v.begin(), v.end()); -} - -void SQLBackend::memLockRequireOwnership() { - std::lock_guard lock(_mtx); - _memLockRequireOwnership(); -} - -void SQLBackend::_memLockRequireOwnership() { - // Must hold _mtx - if (_memLockStatus() != LOCKED_OURS) { - _exitDueToConflict( - "memLockRequireOwnership could not verify this program owned the memory table lock, " - "Exiting."); - } -} - -void SQLBackend::_discard(ScTableVector::const_iterator begin, ScTableVector::const_iterator end) { - // Must hold _mtx - _memLockRequireOwnership(); - for (ScTableVector::const_iterator i = begin, e = end; i != e; ++i) { - std::string discard = (boost::format(lsst::qserv::wbase::CLEANUP_SUBCHUNK_SCRIPT) % i->dbTable.db % - i->dbTable.table % i->chunkId % i->subChunkId) - .str(); - sql::SqlErrorObject err; - if (!_sqlConn->runQuery(discard, err)) { - throw err; - } - } -} - -/// Run the 'query'. If it fails, terminate the program. -void SQLBackend::_execLockSql(std::string const& query) { - // Must hold _mtx - LOGS(_log, LOG_LVL_DEBUG, "execLockSql " << query); - sql::SqlErrorObject err; - if (!_sqlConn->runQuery(query, err)) { - _exitDueToConflict("Lock failed, exiting. query=" + query + " err=" + err.printErrMsg()); - } -} - -/// Return the status of the lock on the in memory tables. -SQLBackend::LockStatus SQLBackend::_memLockStatus() { - // Must hold _mtx - std::string sql = "SELECT uid FROM " + _lockDbTbl + " WHERE keyId = 1"; - sql::SqlResults results; - sql::SqlErrorObject err; - if (!_sqlConn->runQuery(sql, results, err)) { - // Assuming UNLOCKED should be safe as either it must be LOCKED_OURS to continue - // or we are about to try to lock. Failure to lock will cause the program to exit. - LOGS(_log, LOG_LVL_WARN, - "memLockStatus query failed, assuming UNLOCKED. " << sql << " err=" << err.printErrMsg()); - return UNLOCKED; - } - std::string uidStr; - if (!results.extractFirstValue(uidStr, err)) { - LOGS(_log, LOG_LVL_WARN, - "memLockStatus unexpected results, assuming LOCKED_OTHER. err=" << err.printErrMsg()); - return LOCKED_OTHER; - } - if (uidStr != _uid) { - LOGS(_log, LOG_LVL_WARN, - "memLockStatus LOCKED_OTHER wrong uid. Expected " << _uid << " got " << uidStr - << " err=" << err.printErrMsg()); - return LOCKED_OTHER; - } - return LOCKED_OURS; -} - -/// Attempt to acquire the memory table lock, terminate this program if the lock is not acquired. -// This must be run before any other operations on in memory tables. -void SQLBackend::_memLockAcquire() { - // Must hold _mtx - _lockDb = MEMLOCKDB; - _lockTbl = MEMLOCKTBL; - _lockDbTbl = _lockDb + "." + _lockTbl; - LockStatus mls = _memLockStatus(); - if (mls != UNLOCKED) { - LOGS(_log, LOG_LVL_WARN, "Memory tables were not released cleanly! LockStatus=" << mls); - // Drop the database to clear the table. - std::string sql = "DROP DATABASE " + _lockDb + ";"; - sql::SqlErrorObject err; - if (!_sqlConn->runQuery(sql, err)) { - LOGS(_log, LOG_LVL_WARN, "Could not drop memLockDB " << _lockDb << " " << err.printErrMsg()); - } - } - - // Lock the memory tables. - std::string sql = "CREATE DATABASE IF NOT EXISTS " + _lockDb + ";"; - sql += "CREATE TABLE IF NOT EXISTS " + _lockDbTbl + - " ( keyId INT UNIQUE, uid VARCHAR(255) ) ENGINE = MEMORY;"; - _execLockSql(sql); - // The following 2 lines will cause the new worker to always take the lock. - sql = "TRUNCATE TABLE " + _lockDbTbl; - _execLockSql(sql); - std::ostringstream insert; - insert << "INSERT INTO " << _lockDbTbl << " (keyId, uid) VALUES(1, '" << _uid << "' )"; - _execLockSql(insert.str()); - _lockAcquired = true; - - // Delete any old in memory databases. They could be empty or otherwise wrong. - // Empty tables would prevent new tables from being created. - std::string subChunkPrefix = SUBCHUNKDB_PREFIX; - sql = "SHOW DATABASES LIKE '" + subChunkPrefix + "%'"; - sql::SqlResults results; - sql::SqlErrorObject err; - if (!_sqlConn->runQuery(sql, results, err)) { - _exitDueToConflict("SQLBackend query failed, exiting. " + sql + " err=" + err.printErrMsg()); - } - std::vector databases; - results.extractFirstColumn(databases, err); - for (auto iter = databases.begin(), end = databases.end(); iter != end;) { - // Delete in blocks of 50 to save time. - std::string dropDb = ""; - int count = 0; - while (iter != end && count < 50) { - std::string db = *iter; - ++iter; - // Check that the name is actually a match to subChunkPrefix and not a wild card match. - if (db.compare(0, subChunkPrefix.length(), subChunkPrefix) == 0) { - dropDb += "DROP DATABASE " + db + ";"; - ++count; - } - } - if (count > 0) { - _execLockSql(dropDb); - } - } -} - -/// Delete the memory lock database and everything in it. -void SQLBackend::_memLockRelease() { - // Must hold _mtx - LOGS(_log, LOG_LVL_DEBUG, "memLockRelease"); - if (_lockAcquired && !_lockConflict) { - // Only attempt to release tables if the lock on the db was acquired. - LOGS(_log, LOG_LVL_DEBUG, "memLockRelease releasing lock."); - std::string sql = "DROP DATABASE " + _lockDb + ";"; - _execLockSql(sql); - } -} - -/// Exit the program immediately to reduce minimize possible problems. -void SQLBackend::_exitDueToConflict(const std::string& msg) { - // This will likely not be clean exit, but clean exit is impossible - // with xrootd anyway. - _lockConflict = true; - LOGS(_log, LOG_LVL_ERROR, msg); - exit(EXIT_FAILURE); -} - -bool FakeBackend::load(ScTableVector const& v, sql::SqlErrorObject& err) { - using namespace lsst::qserv::wbase; - std::ostringstream os; - os << "Pretending to load:"; - std::copy(v.begin(), v.end(), std::ostream_iterator(os, ",")); - os << std::endl; - LOGS(_log, LOG_LVL_DEBUG, os.str()); - for (auto& scTbl : v) { - std::string key = makeFakeKey(scTbl); - fakeSet.insert(key); - } - return true; -} - -void FakeBackend::discard(ScTableVector const& v) { - for (auto const& scTbl : v) { - fakeSet.erase(makeFakeKey(scTbl)); - } - _discard(v.begin(), v.end()); -} - -void FakeBackend::_discard(ScTableVector::const_iterator begin, ScTableVector::const_iterator end) { - std::ostringstream os; - os << "Pretending to discard:"; - std::copy(begin, end, std::ostream_iterator(os, ",")); - LOGS(_log, LOG_LVL_DEBUG, os.str()); -} - -} // namespace lsst::qserv::wdb diff --git a/src/wdb/SQLBackend.h b/src/wdb/SQLBackend.h deleted file mode 100644 index a8cd993e2d..0000000000 --- a/src/wdb/SQLBackend.h +++ /dev/null @@ -1,147 +0,0 @@ -// -*- LSST-C++ -*- -/* - * LSST Data Management System - * Copyright 2016 LSST Corporation. - * - * This product includes software developed by the - * LSST Project (http://www.lsst.org/). - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the LSST License Statement and - * the GNU General Public License along with this program. If not, - * see . - */ - -#ifndef LSST_QSERV_WDB_SQLBACKEND_H -#define LSST_QSERV_WDB_SQLBACKEND_H - -// System headers -#include -#include -#include -#include -#include -#include - -// Qserv headers -#include "global/DbTable.h" -#include "sql/SqlConnection.h" -#include "sql/SqlConfig.h" -#include "sql/SqlConnectionFactory.h" -#include "sql/SqlErrorObject.h" - -// Forward declarations -namespace lsst::qserv::mysql { -class MySqlConfig; -} // namespace lsst::qserv::mysql - -namespace lsst::qserv::wdb { - -struct ScTable { - ScTable(int chunkId_, DbTable const& dbTable_, int subChunkId_) - : chunkId(chunkId_), dbTable(dbTable_), subChunkId(subChunkId_) {} - - int chunkId; - DbTable dbTable; - int subChunkId; -}; - -typedef std::vector ScTableVector; - -/// This class maintains a connection to the database for making temporary in-memory tables -/// for subchunks. -/// It is important at startup that any tables from a previous run are deleted. This happens -/// in the SQLBackend constructor call to SQLBackend::_memLockAcquire(). The reason it is so important -/// is that the in-memory tables have their schema written to disk but no data, so they are -/// just a bunch of empty tables when the program starts up. -class SQLBackend { -public: - using Ptr = std::shared_ptr; - - SQLBackend(mysql::MySqlConfig const& mc); - - virtual ~SQLBackend(); - - virtual bool load(ScTableVector const& v, sql::SqlErrorObject& err); - - virtual void discard(ScTableVector const& v); - - enum LockStatus { UNLOCKED, LOCKED_OTHER, LOCKED_OURS }; - - virtual void memLockRequireOwnership(); - -protected: - /// Construct a fake instance - SQLBackend(); - - virtual void _discard(ScTableVector::const_iterator begin, ScTableVector::const_iterator end); - - /// Run the 'query'. If it fails, terminate the program. Must hold _mtx - void _execLockSql(std::string const& query); - - /// Return the status of the lock on the in memory tables. Must hold _mtx - LockStatus _memLockStatus(); - - /// Checks that this program is the owner of the database. Must hold _mtx - void _memLockRequireOwnership(); - - /// Attempt to acquire the memory table lock, terminate this program if the lock is not acquired. - // This must be run before any other operations on in memory tables. Must hold _mtx - void _memLockAcquire(); - - /// Delete the memory lock database and everything in it. Must hold _mtx - void _memLockRelease(); - /// Exit the program immediately to reduce minimize possible problems. - void _exitDueToConflict(const std::string& msg); - - std::shared_ptr _sqlConn; - - // Memory lock table members. - std::atomic _lockConflict{false}; - std::atomic _lockAcquired{false}; - std::string _lockDb; - std::string _lockTbl; - std::string _lockDbTbl; - std::string _uid; // uuid - std::mutex _mtx; // protects the sql connection. -}; - -/// Mock for unit testing other classes. -class FakeBackend : public SQLBackend { -public: - using Ptr = std::shared_ptr; - - FakeBackend() {} - - virtual ~FakeBackend() {} - - bool load(ScTableVector const& v, sql::SqlErrorObject& err) override; - - void discard(ScTableVector const& v) override; - - void memLockRequireOwnership() override {}; ///< Do nothing for fake version. - - /// For unit tests only. - static std::string makeFakeKey(ScTable const& sctbl) { - std::string str = sctbl.dbTable.db + ":" + std::to_string(sctbl.chunkId) + ":" + sctbl.dbTable.table + - ":" + std::to_string(sctbl.subChunkId); - return str; - } - std::set fakeSet; // set of strings for tracking unique tables. - -private: - void _discard(ScTableVector::const_iterator begin, ScTableVector::const_iterator end) override; -}; - -} // namespace lsst::qserv::wdb - -#endif // LSST_QSERV_WDB_SQLBACKEND_H diff --git a/src/wdb/testChunkResource.cc b/src/wdb/testChunkResource.cc deleted file mode 100644 index 13780d0313..0000000000 --- a/src/wdb/testChunkResource.cc +++ /dev/null @@ -1,142 +0,0 @@ -// -*- LSST-C++ -*- -/* - * LSST Data Management System - * Copyright 2014-2015 AURA/LSST. - * - * This product includes software developed by the - * LSST Project (http://www.lsst.org/). - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the LSST License Statement and - * the GNU General Public License along with this program. If not, - * see . - */ -/** - * @brief Simple testing for class ChunkResource - * - * @author Daniel L. Wang, SLAC - */ - -// System headers -#include -#include - -// Qserv headers -#include "wdb/ChunkResource.h" - -// Boost unit test header -#define BOOST_TEST_MODULE ChunkResource_1 -#include - -namespace test = boost::test_tools; - -using lsst::qserv::wdb::ChunkResource; -using lsst::qserv::wdb::ChunkResourceMgr; -using lsst::qserv::wdb::FakeBackend; - -struct Fixture { - Fixture() { - for (int i = 11; i < 16; ++i) { - subchunks.push_back(i); - } - thedb = "Snowden"; - tables.emplace(lsst::qserv::DbTable(thedb, "hello")); - tables.emplace(lsst::qserv::DbTable(thedb, "goodbye")); - } - ~Fixture() {} - - std::vector subchunks; - std::string thedb; - lsst::qserv::DbTableSet tables; -}; - -BOOST_FIXTURE_TEST_SUITE(All, Fixture) - -BOOST_AUTO_TEST_CASE(Basic) { - auto backend = std::make_shared(); - std::shared_ptr crm = ChunkResourceMgr::newMgr(backend); - BOOST_CHECK(backend->fakeSet.empty()); - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 0); - { - ChunkResource cr12345(crm->acquire(thedb, 12345, tables)); - std::cout << "backend->fakeSet.size() ==" << backend->fakeSet.size() << std::endl; - BOOST_CHECK(backend->fakeSet.size() == 0); - - subchunks = {28, 33}; - ChunkResource cr12345sub(crm->acquire(thedb, 12345, tables, subchunks)); - BOOST_CHECK(backend->fakeSet.size() == 4); // 2 tables * 2 subchunks - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 2); - { - ChunkResource foo = cr12345; - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 3); - - ChunkResource bar(cr12345sub); - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 4); - BOOST_CHECK(backend->fakeSet.size() == 4); - } - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 2); - { - ChunkResource foo = cr12345sub; - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 3); - - ChunkResource bar(cr12345); - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 4); - BOOST_CHECK(backend->fakeSet.size() == 4); - } - // now, these resources should be in acquired - BOOST_CHECK(backend->fakeSet.size() == 4); - } - // Now, these resources should be freed. - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 0); - BOOST_CHECK(backend->fakeSet.size() == 0); -} - -BOOST_AUTO_TEST_CASE(TwoChunk) { - auto backend = std::make_shared(); - std::shared_ptr crm = ChunkResourceMgr::newMgr(backend); - int scarray[] = {11, 12, 13, 14, 15}; - std::vector subchunks(scarray, scarray + 5); - std::string thedb("Snowden"); - lsst::qserv::DbTableSet tables; - tables.emplace(lsst::qserv::DbTable(thedb, "hello")); - tables.emplace(lsst::qserv::DbTable(thedb, "goodbye")); - { - BOOST_CHECK(backend->fakeSet.size() == 0); - BOOST_CHECK(crm->getRefCount(thedb, 1) == 0); - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 0); - - ChunkResource cr12345(crm->acquire(thedb, 12345, tables)); - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 1); - BOOST_CHECK(backend->fakeSet.size() == 0); - - ChunkResource cr12345sub(crm->acquire(thedb, 1, tables, subchunks)); - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 1); - BOOST_CHECK(crm->getRefCount(thedb, 1) == 1); - BOOST_CHECK(backend->fakeSet.size() == 10); // 2 tables * 5 subchunks - - ChunkResource foo = cr12345sub; - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 1); - BOOST_CHECK(crm->getRefCount(thedb, 1) == 2); - BOOST_CHECK(backend->fakeSet.size() == 10); - - ChunkResource bar(cr12345sub); - // now, these resources should be in acquired - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 1); - BOOST_CHECK(crm->getRefCount(thedb, 1) == 3); - BOOST_CHECK(backend->fakeSet.size() == 10); - } - // Now, these resources should be freed. - BOOST_CHECK(crm->getRefCount(thedb, 12345) == 0); - BOOST_CHECK(backend->fakeSet.size() == 0); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/src/wdb/testQueryRunner.cc b/src/wdb/testQueryRunner.cc index 319d4252b2..63b2506d6a 100644 --- a/src/wdb/testQueryRunner.cc +++ b/src/wdb/testQueryRunner.cc @@ -34,7 +34,6 @@ #include "wbase/Task.h" #include "wconfig/WorkerConfig.h" #include "wcontrol/SqlConnMgr.h" -#include "wdb/ChunkResource.h" #include "wdb/QueryRunner.h" #include "wpublish/QueriesAndChunks.h" @@ -60,9 +59,6 @@ using lsst::qserv::wbase::SendChannel; using lsst::qserv::wbase::Task; using lsst::qserv::wconfig::WorkerConfig; using lsst::qserv::wcontrol::SqlConnMgr; -using lsst::qserv::wdb::ChunkResource; -using lsst::qserv::wdb::ChunkResourceMgr; -using lsst::qserv::wdb::FakeBackend; using lsst::qserv::wdb::QueryRunner; using lsst::qserv::wpublish::QueriesAndChunks; @@ -107,13 +103,11 @@ BOOST_AUTO_TEST_CASE(Simple) { shared_ptr msg(newTaskMsg()); shared_ptr sendC(SendChannel::newNopChannel()); auto sc = FileChannelShared::create(sendC, msg->czarid()); - FakeBackend::Ptr backend = make_shared(); - shared_ptr crm = ChunkResourceMgr::newMgr(backend); SqlConnMgr::Ptr sqlConnMgr = make_shared(20, 15); auto const queries = queriesAndChunks(); - auto taskVect = Task::createTasks(msg, sc, crm, newMySqlConfig(), sqlConnMgr, queries); + auto taskVect = Task::createTasks(msg, sc, newMySqlConfig(), sqlConnMgr, queries); Task::Ptr task = taskVect[0]; - QueryRunner::Ptr a(QueryRunner::newQueryRunner(task, crm, newMySqlConfig(), sqlConnMgr, queries)); + QueryRunner::Ptr a(QueryRunner::newQueryRunner(task, newMySqlConfig(), sqlConnMgr, queries)); BOOST_CHECK(a->runQuery()); } @@ -123,13 +117,11 @@ BOOST_AUTO_TEST_CASE(Output) { shared_ptr msg(newTaskMsg()); shared_ptr sendC(SendChannel::newStringChannel(out)); auto sc = FileChannelShared::create(sendC, msg->czarid()); - FakeBackend::Ptr backend = make_shared(); - shared_ptr crm = ChunkResourceMgr::newMgr(backend); SqlConnMgr::Ptr sqlConnMgr = make_shared(20, 15); auto const queries = queriesAndChunks(); - auto taskVect = Task::createTasks(msg, sc, crm, newMySqlConfig(), sqlConnMgr, queries); + auto taskVect = Task::createTasks(msg, sc, newMySqlConfig(), sqlConnMgr, queries); Task::Ptr task = taskVect[0]; - QueryRunner::Ptr a(QueryRunner::newQueryRunner(task, crm, newMySqlConfig(), sqlConnMgr, queries)); + QueryRunner::Ptr a(QueryRunner::newQueryRunner(task, newMySqlConfig(), sqlConnMgr, queries)); BOOST_CHECK(a->runQuery()); } diff --git a/src/wsched/testSchedulers.cc b/src/wsched/testSchedulers.cc index f27b7269c9..8c1e785432 100644 --- a/src/wsched/testSchedulers.cc +++ b/src/wsched/testSchedulers.cc @@ -66,7 +66,6 @@ using lsst::qserv::wbase::SendChannel; using lsst::qserv::wbase::Task; using lsst::qserv::wconfig::WorkerConfig; using lsst::qserv::wcontrol::SqlConnMgr; -using lsst::qserv::wdb::ChunkResourceMgr; using lsst::qserv::wpublish::QueriesAndChunks; double const oneHr = 60.0; @@ -75,9 +74,8 @@ bool const resetForTestingC = true; int const maxBootedC = 5; int const maxDarkTasksC = 25; -shared_ptr crm; // not used in this test, required by Task::createTasks -MySqlConfig mySqlConfig; // not used in this test, required by Task::createTasks -SqlConnMgr::Ptr sqlConnMgr; // not used in this test, required by Task::createTasks +MySqlConfig mySqlConfig; // not used in this test, required by Task::createTasks +SqlConnMgr::Ptr sqlConnMgr; // not used in this test, required by Task::createTasks auto workerCfg = lsst::qserv::wconfig::WorkerConfig::create(); @@ -88,7 +86,7 @@ Task::Ptr makeTask(std::shared_ptr tm, shared_ptr con auto sendC = std::make_shared(); auto sc = FileChannelShared::create(sendC, tm->czarid()); locSendSharedPtrs.push_back(sc); - auto taskVect = Task::createTasks(tm, sc, crm, mySqlConfig, sqlConnMgr, queries); + auto taskVect = Task::createTasks(tm, sc, mySqlConfig, sqlConnMgr, queries); Task::Ptr task = taskVect[0]; return task; } diff --git a/src/xrdsvc/SsiRequest.cc b/src/xrdsvc/SsiRequest.cc index c1c9da2341..02a169bf2f 100644 --- a/src/xrdsvc/SsiRequest.cc +++ b/src/xrdsvc/SsiRequest.cc @@ -157,9 +157,9 @@ void SsiRequest::execute(XrdSsiRequest& req) { } _channelShared = wbase::FileChannelShared::create(sendChannel, taskMsg->czarid(), _foreman->chunkInventory()->id()); - auto const tasks = wbase::Task::createTasks(taskMsg, _channelShared, _foreman->chunkResourceMgr(), - _foreman->mySqlConfig(), _foreman->sqlConnMgr(), - _foreman->queriesAndChunks(), _foreman->httpPort()); + auto const tasks = wbase::Task::createTasks(taskMsg, _channelShared, _foreman->mySqlConfig(), + _foreman->sqlConnMgr(), _foreman->queriesAndChunks(), + _foreman->httpPort()); for (auto const& task : tasks) { _tasks.push_back(task); }