Skip to content

Commit e1f80fd

Browse files
committed
Rebase fixes.
1 parent a27525c commit e1f80fd

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+766
-992
lines changed

src/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ add_subdirectory(css)
6262
add_subdirectory(czar)
6363
add_subdirectory(global)
6464
add_subdirectory(http)
65-
add_subdirectory(memman)
6665
add_subdirectory(mimic)
6766
add_subdirectory(mysql)
6867
add_subdirectory(parser)

src/admin/templates/xrootd/etc/xrdssi.cf.jinja

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,6 @@ socket = {{ db_socket }}
1111
username = qsmaster
1212
password = CHANGEME2
1313

14-
[memman]
15-
16-
# MemMan class to use for managing memory for tables
17-
# can be "MemManReal", "MemManNone" or "MemManNoneRelaxed"
18-
# class = MemManReal
19-
20-
# Memory available for locking tables, in MB
21-
# memory = 1000
22-
memory = 7900
23-
24-
# Path to database tables
25-
location = /qserv/data/mysql
26-
2714
[scheduler]
2815

2916
# Thread pool size
@@ -79,13 +66,6 @@ maxsqlconn = 980
7966
# This value must be less than maxsqlconn.
8067
reservedinteractivesqlconn = 930
8168

82-
[transmits]
83-
# Maximum number of concurrent transmits to a czar.
84-
maxtransmits = 50
85-
# If more than this number of large transmits is happening at once, wait to
86-
# start more transmits until some are done.
87-
maxalreadytransmitting = 10
88-
8969
[results]
9070

9171
# The name of a folder where query results will be stored.

src/cconfig/CzarConfig.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ class CzarConfig {
124124
/// Getters for result aggregation options.
125125
int getMaxTableSizeMB() const { return _maxTableSizeMB->getVal(); }
126126
int getMaxSqlConnectionAttempts() const { return _maxSqlConnectionAttempts->getVal(); }
127-
int getResultMaxConnections() const { return _resultMaxConnections->getVal(); }
128127

129128
/// The size of the TCP connection pool witin the client API that is used
130129
/// by the merger to pool result files from workers via the HTTP protocol.
@@ -285,8 +284,6 @@ class CzarConfig {
285284
util::ConfigValTInt::create(_configValMap, "resultdb", "maxtablesize_mb", notReq, 5001);
286285
CVTIntPtr _maxSqlConnectionAttempts =
287286
util::ConfigValTInt::create(_configValMap, "resultdb", "maxsqlconnectionattempts", notReq, 10);
288-
CVTIntPtr _resultMaxConnections =
289-
util::ConfigValTInt::create(_configValMap, "resultdb", "maxconnections", notReq, 40);
290287
CVTIntPtr _resultMaxHttpConnections =
291288
util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 2000);
292289
CVTIntPtr _oldestResultKeptDays =

src/ccontrol/CMakeLists.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ target_sources(ccontrol PRIVATE
1212
ParseRunner.cc
1313
QueryState.cc
1414
UserQueryAsyncResult.cc
15-
UserQueryDrop.cc
1615
UserQueryFactory.cc
17-
UserQueryFlushChunksCache.cc
1816
UserQueryProcessList.cc
1917
UserQueryQueries.cc
2018
UserQuerySelectCountStar.cc

src/ccontrol/MergingHandler.cc

Lines changed: 254 additions & 126 deletions
Large diffs are not rendered by default.

src/ccontrol/MergingHandler.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,12 @@ class MergingHandler : public qdisp::ResponseHandler {
7474

7575
/// @see ResponseHandler::flushHttp
7676
/// @see MerginHandler::_mergeHttp
77-
std::tuple<bool, bool> flushHttp(std::string const& fileUrl, uint64_t expectedRows,
78-
uint64_t& resultRows) override;
77+
std::tuple<bool, bool> flushHttp(
78+
std::string const& fileUrl, uint64_t fileSize, uint64_t expectedRows,
79+
/* &&&
80+
std::tuple<bool, bool> flushHttp(std::string const& fileUrl, uint64_t expectedRows,
81+
>>>>>>> a27525c04017db9a30061fa0bb4b5228c0c5d1b2 */
82+
uint64_t& resultRows) override;
7983

8084
/// @see ResponseHandler::flushHttpError
8185
void flushHttpError(int errorCode, std::string const& errorMsg, int status) override;
@@ -88,7 +92,9 @@ class MergingHandler : public qdisp::ResponseHandler {
8892

8993
private:
9094
/// Call InfileMerger to do the work of merging this data to the result.
91-
bool _mergeHttp(std::shared_ptr<qdisp::UberJob> const& uberJob, proto::ResponseData const& responseData);
95+
96+
bool _mergeHttp(std::shared_ptr<qdisp::UberJob> const& uberJob, std::string const& fileUrl,
97+
uint64_t fileSize);
9298

9399
/// Set error code and string.
94100
void _setError(int code, std::string const& msg, int errorState);

src/ccontrol/UserQueryAsyncResult.cc

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@
3131
#include "lsst/log/Log.h"
3232

3333
// Qserv headers
34+
#include "cconfig/CzarConfig.h"
3435
#include "qmeta/Exceptions.h"
3536
#include "qmeta/JobStatus.h"
3637
#include "qmeta/QMeta.h"
3738
#include "qmeta/MessageStore.h"
3839
#include "sql/SqlConnection.h"
40+
#include "sql/SqlConnectionFactory.h"
3941
#include "sql/SqlResults.h"
4042

4143
namespace {
@@ -46,13 +48,11 @@ namespace lsst::qserv::ccontrol {
4648

4749
// Constructors
4850
UserQueryAsyncResult::UserQueryAsyncResult(QueryId queryId, qmeta::CzarId qMetaCzarId,
49-
std::shared_ptr<qmeta::QMeta> const& qMeta,
50-
sql::SqlConnection* resultDbConn)
51+
std::shared_ptr<qmeta::QMeta> const& qMeta)
5152
: UserQuery(),
5253
_queryId(queryId),
5354
_qMetaCzarId(qMetaCzarId),
5455
_qMeta(qMeta),
55-
_resultDbConn(resultDbConn),
5656
_messageStore(std::make_shared<qmeta::MessageStore>()) {
5757
LOGS(_log, LOG_LVL_DEBUG, "UserQueryAsyncResult: QID=" << queryId);
5858

@@ -122,9 +122,11 @@ void UserQueryAsyncResult::submit() {
122122
std::string const resultTableName = _qInfo.resultLocation().substr(6);
123123

124124
// check that message and result tables exist
125+
auto const czarConfig = cconfig::CzarConfig::instance();
126+
auto const resultDbConn = sql::SqlConnectionFactory::make(czarConfig->getMySqlResultConfig());
125127
sql::SqlErrorObject sqlErrObj;
126-
if (!_resultDbConn->tableExists(_qInfo.msgTableName(), sqlErrObj) or
127-
!_resultDbConn->tableExists(resultTableName, sqlErrObj)) {
128+
if (!resultDbConn->tableExists(_qInfo.msgTableName(), sqlErrObj) or
129+
!resultDbConn->tableExists(resultTableName, sqlErrObj)) {
128130
std::string message = "Result or message table does not exist, result is likely expired.";
129131
LOGS(_log, LOG_LVL_DEBUG, message);
130132
_messageStore->addErrorMessage("SYSTEM", message);
@@ -135,7 +137,7 @@ void UserQueryAsyncResult::submit() {
135137
// into the message store, at this point original result table must be unlocked
136138
std::string query = "SELECT chunkId, code, message, severity, timeStamp FROM " + _qInfo.msgTableName();
137139
sql::SqlResults sqlResults;
138-
if (!_resultDbConn->runQuery(query, sqlResults, sqlErrObj)) {
140+
if (!resultDbConn->runQuery(query, sqlResults, sqlErrObj)) {
139141
LOGS(_log, LOG_LVL_ERROR, "Failed to retrieve message table data: " << sqlErrObj.errMsg());
140142
std::string message = "Failed to retrieve message table data.";
141143
_messageStore->addErrorMessage("SYSTEM_SQL", message);
@@ -170,7 +172,7 @@ void UserQueryAsyncResult::submit() {
170172
// of results I'm going to drop this table now, meaning result can be only
171173
// retrieved once.
172174
query = "DROP TABLE " + _qInfo.msgTableName();
173-
if (!_resultDbConn->runQuery(query, sqlErrObj)) {
175+
if (!resultDbConn->runQuery(query, sqlErrObj)) {
174176
LOGS(_log, LOG_LVL_ERROR, "Failed to drop message table: " << sqlErrObj.errMsg());
175177
// Users do not care about this error, so don't send it upstream.
176178
} else {

src/ccontrol/UserQueryAsyncResult.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@ namespace lsst::qserv::qmeta {
3939
class QMeta;
4040
}
4141

42-
namespace lsst::qserv::sql {
43-
class SqlConnection;
44-
}
45-
4642
namespace lsst::qserv::ccontrol {
4743

4844
/// @addtogroup ccontrol
@@ -61,10 +57,9 @@ class UserQueryAsyncResult : public UserQuery {
6157
* @param queryId: Query ID for which to return result
6258
* @param qMetaCzarId: ID for current czar
6359
* @param qMetaSelect: QMetaSelect instance
64-
* @param resultDbConn: Connection to results database
6560
*/
6661
UserQueryAsyncResult(QueryId queryId, qmeta::CzarId qMetaCzarId,
67-
std::shared_ptr<qmeta::QMeta> const& qMeta, sql::SqlConnection* resultDbConn);
62+
std::shared_ptr<qmeta::QMeta> const& qMeta);
6863

6964
// Destructor
7065
~UserQueryAsyncResult();
@@ -111,7 +106,6 @@ class UserQueryAsyncResult : public UserQuery {
111106
QueryId _queryId;
112107
qmeta::CzarId _qMetaCzarId;
113108
std::shared_ptr<qmeta::QMeta> _qMeta;
114-
sql::SqlConnection* _resultDbConn;
115109
qmeta::QInfo _qInfo;
116110
std::shared_ptr<qmeta::MessageStore> _messageStore;
117111
QueryState _qState = UNKNOWN;

src/ccontrol/UserQueryFactory.cc

Lines changed: 11 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141
#include "ccontrol/ConfigMap.h"
4242
#include "ccontrol/ParseRunner.h"
4343
#include "ccontrol/UserQueryAsyncResult.h"
44-
#include "ccontrol/UserQueryDrop.h"
45-
#include "ccontrol/UserQueryFlushChunksCache.h"
4644
#include "ccontrol/UserQueryInvalid.h"
4745
#include "ccontrol/UserQueryProcessList.h"
4846
#include "ccontrol/UserQueryQueries.h"
@@ -135,8 +133,7 @@ std::shared_ptr<UserQuery> _makeUserQueryProcessList(query::SelectStmt::Ptr& stm
135133
}
136134
LOGS(_log, LOG_LVL_DEBUG, "SELECT query is a PROCESSLIST");
137135
try {
138-
return std::make_shared<UserQueryProcessList>(stmt, sharedResources->resultDbConn.get(),
139-
sharedResources->qMetaSelect,
136+
return std::make_shared<UserQueryProcessList>(stmt, sharedResources->qMetaSelect,
140137
sharedResources->qMetaCzarId, userQueryId, resultDb);
141138
} catch (std::exception const& exc) {
142139
return std::make_shared<UserQueryInvalid>(exc.what());
@@ -164,9 +161,8 @@ std::shared_ptr<UserQuery> _makeUserQueryQueries(query::SelectStmt::Ptr& stmt,
164161
}
165162
LOGS(_log, LOG_LVL_DEBUG, "SELECT query is a QUERIES");
166163
try {
167-
return std::make_shared<UserQueryQueries>(stmt, sharedResources->resultDbConn.get(),
168-
sharedResources->qMetaSelect, sharedResources->qMetaCzarId,
169-
userQueryId, resultDb);
164+
return std::make_shared<UserQueryQueries>(stmt, sharedResources->qMetaSelect,
165+
sharedResources->qMetaCzarId, userQueryId, resultDb);
170166
} catch (std::exception const& exc) {
171167
return std::make_shared<UserQueryInvalid>(exc.what());
172168
}
@@ -219,8 +215,7 @@ std::shared_ptr<UserQuerySharedResources> makeUserQuerySharedResources(
219215
std::make_shared<qmeta::QMetaMysql>(czarConfig->getMySqlQmetaConfig(),
220216
czarConfig->getMaxMsgSourceStore()),
221217
std::make_shared<qmeta::QStatusMysql>(czarConfig->getMySqlQStatusDataConfig()),
222-
std::make_shared<qmeta::QMetaSelect>(czarConfig->getMySqlQmetaConfig()),
223-
sql::SqlConnectionFactory::make(czarConfig->getMySqlResultConfig()), dbModels, czarName,
218+
std::make_shared<qmeta::QMetaSelect>(czarConfig->getMySqlQmetaConfig()), dbModels, czarName,
224219
czarConfig->getInteractiveChunkLimit());
225220
}
226221

@@ -325,9 +320,9 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
325320
qmetaHasDataForSelectCountStarQuery(stmt, _userQuerySharedResources, defaultDb, rowsTable)) {
326321
LOGS(_log, LOG_LVL_DEBUG, "make UserQuerySelectCountStar");
327322
auto uq = std::make_shared<UserQuerySelectCountStar>(
328-
query, _userQuerySharedResources->resultDbConn, _userQuerySharedResources->qMetaSelect,
329-
_userQuerySharedResources->queryMetadata, userQueryId, rowsTable, resultDb, countSpelling,
330-
_userQuerySharedResources->qMetaCzarId, async);
323+
query, _userQuerySharedResources->qMetaSelect, _userQuerySharedResources->queryMetadata,
324+
userQueryId, rowsTable, resultDb, countSpelling, _userQuerySharedResources->qMetaCzarId,
325+
async);
331326
uq->qMetaRegister(resultLocation, msgTableName);
332327
return uq;
333328
}
@@ -372,8 +367,8 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
372367
auto uq = std::make_shared<UserQuerySelect>(
373368
qs, messageStore, executive, _userQuerySharedResources->databaseModels, infileMergerConfig,
374369
_userQuerySharedResources->secondaryIndex, _userQuerySharedResources->queryMetadata,
375-
_userQuerySharedResources->queryStatsData, _userQuerySharedResources->semaMgrConnections,
376-
_userQuerySharedResources->qMetaCzarId, errorExtra, async, resultDb, uberJobMaxChunks);
370+
_userQuerySharedResources->queryStatsData, _userQuerySharedResources->qMetaCzarId, errorExtra,
371+
async, resultDb, uberJobMaxChunks);
377372
if (sessionValid) {
378373
uq->qMetaRegister(resultLocation, msgTableName);
379374
uq->setupMerger();
@@ -383,39 +378,13 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
383378
return uq;
384379
} else if (UserQueryType::isSelectResult(query, userJobId)) {
385380
auto uq = std::make_shared<UserQueryAsyncResult>(userJobId, _userQuerySharedResources->qMetaCzarId,
386-
_userQuerySharedResources->queryMetadata,
387-
_userQuerySharedResources->resultDbConn.get());
381+
_userQuerySharedResources->queryMetadata);
388382
LOGS(_log, LOG_LVL_DEBUG, "make UserQueryAsyncResult: userJobId=" << userJobId);
389383
return uq;
390-
} else if (UserQueryType::isDropTable(query, dbName, tableName)) {
391-
// processing DROP TABLE
392-
if (dbName.empty()) {
393-
dbName = defaultDb;
394-
}
395-
auto uq = std::make_shared<UserQueryDrop>(_userQuerySharedResources->css, dbName, tableName,
396-
_userQuerySharedResources->resultDbConn.get(),
397-
_userQuerySharedResources->queryMetadata,
398-
_userQuerySharedResources->qMetaCzarId);
399-
LOGS(_log, LOG_LVL_DEBUG, "make UserQueryDrop: " << dbName << "." << tableName);
400-
return uq;
401-
} else if (UserQueryType::isDropDb(query, dbName)) {
402-
// processing DROP DATABASE
403-
auto uq = std::make_shared<UserQueryDrop>(_userQuerySharedResources->css, dbName, std::string(),
404-
_userQuerySharedResources->resultDbConn.get(),
405-
_userQuerySharedResources->queryMetadata,
406-
_userQuerySharedResources->qMetaCzarId);
407-
LOGS(_log, LOG_LVL_DEBUG, "make UserQueryDrop: db=" << dbName);
408-
return uq;
409-
} else if (UserQueryType::isFlushChunksCache(query, dbName)) {
410-
auto uq = std::make_shared<UserQueryFlushChunksCache>(_userQuerySharedResources->css, dbName,
411-
_userQuerySharedResources->resultDbConn.get());
412-
LOGS(_log, LOG_LVL_DEBUG, "make UserQueryFlushChunksCache: " << dbName);
413-
return uq;
414384
} else if (UserQueryType::isShowProcessList(query, full)) {
415385
LOGS(_log, LOG_LVL_DEBUG, "make UserQueryProcessList: full=" << (full ? 'y' : 'n'));
416386
try {
417-
return std::make_shared<UserQueryProcessList>(full, _userQuerySharedResources->resultDbConn.get(),
418-
_userQuerySharedResources->qMetaSelect,
387+
return std::make_shared<UserQueryProcessList>(full, _userQuerySharedResources->qMetaSelect,
419388
_userQuerySharedResources->qMetaCzarId, userQueryId,
420389
resultDb);
421390
} catch (std::exception const& exc) {

src/ccontrol/UserQueryProcessList.cc

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@
3535
// Qserv headers
3636
#include "css/CssAccess.h"
3737
#include "css/CssError.h"
38+
#include "cconfig/CzarConfig.h"
3839
#include "qmeta/MessageStore.h"
3940
#include "qmeta/Exceptions.h"
4041
#include "qmeta/QMetaSelect.h"
4142
#include "query/FromList.h"
4243
#include "query/SelectStmt.h"
4344
#include "sql/SqlConnection.h"
45+
#include "sql/SqlConnectionFactory.h"
4446
#include "sql/SqlErrorObject.h"
4547
#include "sql/SqlBulkInsert.h"
4648
#include "sql/statement.h"
@@ -61,12 +63,10 @@ namespace lsst::qserv::ccontrol {
6163

6264
// Constructor
6365
UserQueryProcessList::UserQueryProcessList(std::shared_ptr<query::SelectStmt> const& statement,
64-
sql::SqlConnection* resultDbConn,
6566
std::shared_ptr<qmeta::QMetaSelect> const& qMetaSelect,
6667
qmeta::CzarId qMetaCzarId, std::string const& userQueryId,
6768
std::string const& resultDb)
68-
: _resultDbConn(resultDbConn),
69-
_qMetaSelect(qMetaSelect),
69+
: _qMetaSelect(qMetaSelect),
7070
_qMetaCzarId(qMetaCzarId),
7171
_messageStore(std::make_shared<qmeta::MessageStore>()),
7272
_resultTableName(::g_nextResultTableId(userQueryId)),
@@ -90,12 +90,10 @@ UserQueryProcessList::UserQueryProcessList(std::shared_ptr<query::SelectStmt> co
9090
}
9191
}
9292

93-
UserQueryProcessList::UserQueryProcessList(bool full, sql::SqlConnection* resultDbConn,
94-
std::shared_ptr<qmeta::QMetaSelect> const& qMetaSelect,
93+
UserQueryProcessList::UserQueryProcessList(bool full, std::shared_ptr<qmeta::QMetaSelect> const& qMetaSelect,
9594
qmeta::CzarId qMetaCzarId, std::string const& userQueryId,
9695
std::string const& resultDb)
97-
: _resultDbConn(resultDbConn),
98-
_qMetaSelect(qMetaSelect),
96+
: _qMetaSelect(qMetaSelect),
9997
_qMetaCzarId(qMetaCzarId),
10098
_messageStore(std::make_shared<qmeta::MessageStore>()),
10199
_resultTableName(::g_nextResultTableId(userQueryId)),
@@ -156,8 +154,11 @@ void UserQueryProcessList::submit() {
156154
if (col.colType.sqlType == "TIMESTAMP") createTable += " NULL";
157155
}
158156
createTable += ')';
157+
159158
LOGS(_log, LOG_LVL_DEBUG, "creating result table: " << createTable);
160-
if (!_resultDbConn->runQuery(createTable, errObj)) {
159+
auto const czarConfig = cconfig::CzarConfig::instance();
160+
auto const resultDbConn = sql::SqlConnectionFactory::make(czarConfig->getMySqlResultConfig());
161+
if (!resultDbConn->runQuery(createTable, errObj)) {
161162
LOGS(_log, LOG_LVL_ERROR, "failed to create result table: " << errObj.errMsg());
162163
std::string message = "Internal failure, failed to create result table: " + errObj.errMsg();
163164
_messageStore->addMessage(-1, "PROCESSLIST", 1051, message, MessageSeverity::MSG_ERROR);
@@ -172,7 +173,7 @@ void UserQueryProcessList::submit() {
172173
}
173174

174175
// copy stuff over to result table
175-
sql::SqlBulkInsert bulkInsert(_resultDbConn, _resultTableName, resColumns);
176+
sql::SqlBulkInsert bulkInsert(resultDbConn.get(), _resultTableName, resColumns);
176177
for (auto& row : *results) {
177178
std::vector<std::string> values;
178179
for (unsigned i = 0; i != row.size(); ++i) {
@@ -191,7 +192,7 @@ void UserQueryProcessList::submit() {
191192
values.push_back(std::string(ptr, ptr + len));
192193
} else {
193194
// everything else should be quoted
194-
values.push_back("'" + _resultDbConn->escapeString(std::string(ptr, ptr + len)) + "'");
195+
values.push_back("'" + resultDbConn->escapeString(std::string(ptr, ptr + len)) + "'");
195196
}
196197
}
197198

0 commit comments

Comments
 (0)