Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/ccontrol/UserQueryFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
}
auto stmt = parser->getSelectStmt();

// Parsing doesn't need protection, but CSS in analyze query does and
// some of the uq related calls may require protection, but it
// isn't clear.
std::lock_guard<std::mutex> factoryLock(_factoryMtx);

// handle special database/table names
if (_stmtRefersToProcessListTable(stmt, defaultDb)) {
return _makeUserQueryProcessList(stmt, _userQuerySharedResources, userQueryId, resultDb, aQuery,
Expand Down Expand Up @@ -378,12 +383,14 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
}
return uq;
} else if (UserQueryType::isSelectResult(query, userJobId)) {
std::lock_guard<std::mutex> factoryLock(_factoryMtx);
auto uq = std::make_shared<UserQueryAsyncResult>(userJobId, _userQuerySharedResources->czarId,
_userQuerySharedResources->queryMetadata);
LOGS(_log, LOG_LVL_DEBUG, "make UserQueryAsyncResult: userJobId=" << userJobId);
return uq;
} else if (UserQueryType::isShowProcessList(query, full)) {
LOGS(_log, LOG_LVL_DEBUG, "make UserQueryProcessList: full=" << (full ? 'y' : 'n'));
std::lock_guard<std::mutex> factoryLock(_factoryMtx);
try {
return std::make_shared<UserQueryProcessList>(full, _userQuerySharedResources->qMetaSelect,
_userQuerySharedResources->czarId, userQueryId,
Expand All @@ -392,10 +399,12 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
return std::make_shared<UserQueryInvalid>(exc.what());
}
} else if (UserQueryType::isCall(query)) {
std::lock_guard<std::mutex> factoryLock(_factoryMtx);
auto parser = std::make_shared<ParseRunner>(
query, _userQuerySharedResources->makeUserQueryResources(userQueryId, resultDb));
return parser->getUserQuery();
} else if (UserQueryType::isSet(query)) {
std::lock_guard<std::mutex> factoryLock(_factoryMtx);
ParseRunner::Ptr parser;
try {
parser = std::make_shared<ParseRunner>(query);
Expand All @@ -414,6 +423,7 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
}
return uq;
} else {
std::lock_guard<std::mutex> factoryLock(_factoryMtx);
// something that we don't recognize
auto uq = std::make_shared<UserQueryInvalid>("Invalid or unsupported query: " + query);
return uq;
Expand Down
4 changes: 4 additions & 0 deletions src/ccontrol/UserQueryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class UserQueryFactory : private boost::noncopyable {
boost::asio::io_service _asioIoService;
std::unique_ptr<boost::asio::io_service::work> _asioWork;
std::unique_ptr<std::thread> _asioTimerThread;

/// Protects CSS in `qs->analyzeQuery(query, stmt);` and
/// protects uq calls that alter the database.
std::mutex _factoryMtx;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CSS and all of these other things should be made thread safe, but that's a much bigger job and CSS is about to be under the microscope for other reasons in the next couple of weeks.

};

} // namespace lsst::qserv::ccontrol
Expand Down
8 changes: 2 additions & 6 deletions src/czar/Czar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,8 @@ SubmitResult Czar::submitQuery(string const& query, map<string, string> const& h

// make new UserQuery
// this is atomic
ccontrol::UserQuery::Ptr uq;
{
lock_guard<mutex> lock(_mutex);
uq = _uqFactory->newUserQuery(query, defaultDb, getQdispSharedResources(), userQueryId, msgTableName,
resultDb);
}
ccontrol::UserQuery::Ptr uq = _uqFactory->newUserQuery(query, defaultDb, getQdispSharedResources(),
userQueryId, msgTableName, resultDb);

// Add logging context with query ID
QSERV_LOGCONTEXT_QUERY(uq->getQueryId());
Expand Down