Skip to content

Commit d567ab3

Browse files
committed
Rebased on main and added file removal flag.
1 parent 486bb61 commit d567ab3

File tree

3 files changed

+25
-17
lines changed

3 files changed

+25
-17
lines changed

src/wbase/FileChannelShared.cc

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ FileChannelShared::~FileChannelShared() {
279279
// dead it means there was a problem processing a query or sending back a response
280280
// to Czar. In either case, the file would be useless and it has to be deleted
281281
// in order to avoid leaving unclaimed result files within the results folder.
282-
if (isDead()) {
282+
if (_issueRequiresFileRemoval || isDead()) {
283283
_removeFile(lock_guard<mutex>(_tMtx));
284284
}
285285
if (_sendChannel != nullptr) {
@@ -329,6 +329,7 @@ bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared
329329

330330
bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Task> const& task,
331331
util::MultiError& multiErr, atomic<bool>& cancelled) {
332+
LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult start");
332333
// Operation stats. Note that "buffer fill time" includes the amount
333334
// of time needed to write the result set to disk.
334335
util::Timer transmitT;
@@ -348,22 +349,16 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
348349

349350
// &&& Arena may not really be needed.
350351
std::unique_ptr<google::protobuf::Arena> protobufArena = make_unique<google::protobuf::Arena>();
351-
proto::ResponseData* responseData = 0;
352+
proto::ResponseData* responseData = nullptr;
352353

353354
while (hasMoreRows && !cancelled) {
354-
// This lock is to protect the stream from having other Tasks mess with it
355-
// while data is loading.
356-
lock_guard<mutex> const tMtxLockA(_tMtx);
357-
358355
util::Timer bufferFillT;
359356
bufferFillT.start();
360357

361358
// Transfer as many rows as allowed by the limitations of
362359
// Google Protobuf into the output file.
363360
int bytes = 0;
364361
int rows = 0;
365-
//&&& hasMoreRows = _writeToFile(tMtxLockA, task, mResult, bytes, rows, multiErr);
366-
367362
hasMoreRows = _writeToFile(responseData, protobufArena, task, mResult, bytes, rows, multiErr);
368363
bytesTransmitted += bytes;
369364
rowsTransmitted += rows;
@@ -394,15 +389,16 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
394389
// the current request (note that certain classes of requests may require
395390
// more than one task for processing).
396391
if (!hasMoreRows && transmitTaskLast()) {
392+
LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult e");
397393
lock_guard<mutex> const tMtxLock(_tMtx);
394+
LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult e1");
398395

399396
// Make sure the file is synced to disk before notifying Czar.
400397
_file.flush();
401398
_file.close();
402399

403400
// Only the last ("summary") message, w/o any rows, is sent to the Czar to notify
404401
// it about the completion of the request.
405-
//&&&if (!_sendResponse(tMtxLockA, task, cancelled, multiErr)) {
406402
if (!_sendResponse(tMtxLock, protobufArena, task, cancelled, multiErr)) {
407403
LOGS(_log, LOG_LVL_ERROR, "Could not transmit the request completion message to Czar.");
408404
erred = true;
@@ -428,9 +424,14 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
428424
// successfully processing the query and writing all results into the file.
429425
// The file is not going to be used by Czar in either of these scenarios.
430426
if (cancelled || erred || isDead()) {
427+
/* &&&
431428
//&&& it may be better to set a flag and call _removeFile in the destructor.
432429
lock_guard<mutex> const tMtxLockA(_tMtx);
433430
_removeFile(tMtxLockA);
431+
*/
432+
// Set a flag to delete the file in the destructor. That should prevent any
433+
// possible race conditions with other threads expecting the file to exist.
434+
_issueRequiresFileRemoval = true;
434435
}
435436
return erred;
436437
}
@@ -445,19 +446,13 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData,
445446
shared_ptr<Task> const& task, MYSQL_RES* mResult, int& bytes, int& rows,
446447
util::MultiError& multiErr) {
447448
// Transfer rows from a result set into the response data object.
449+
LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile start");
448450
if (nullptr == responseData) {
449451
responseData = google::protobuf::Arena::CreateMessage<proto::ResponseData>(protobufArena.get());
450452
} else {
451453
responseData->clear_row();
452454
}
453455
size_t tSize = 0;
454-
/* &&&
455-
LOGS(_log, LOG_LVL_TRACE, __func__ << " _fillRows " << task->getIdStr() << " start");
456-
bool const hasMoreRows = _fillRows(tMtxLock, mResult, rows, tSize);
457-
LOGS(_log, LOG_LVL_TRACE, __func__ << " _fillRows " << task->getIdStr() << " end");
458-
_responseData->set_rowcount(rows);
459-
_responseData->set_transmitsize(tSize);
460-
*/
461456
bool const hasMoreRows = _fillRows(responseData, mResult, rows, tSize);
462457
responseData->set_rowcount(rows);
463458
responseData->set_transmitsize(tSize);
@@ -468,8 +463,10 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData,
468463
responseData->SerializeToString(&msg);
469464
bytes = msg.size();
470465

471-
//&&&LOGS(_log, LOG_LVL_TRACE, __func__ << " file write " << task->getIdStr() << " start");
466+
LOGS(_log, LOG_LVL_TRACE, __func__ << " file write " << task->getIdStr() << " start");
467+
LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile d");
472468
lock_guard<mutex> const tMtxLock(_tMtx);
469+
LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile d1");
473470
// Create the file if not open.
474471
if (!_file.is_open()) {
475472
_fileName = task->resultFilePath();
@@ -491,6 +488,7 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData,
491488
throw runtime_error("FileChannelShared::" + string(__func__) + " failed to write " +
492489
to_string(msg.size()) + " bytes into the file '" + _fileName + "'.");
493490
}
491+
LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile end");
494492
return hasMoreRows;
495493
}
496494

src/wbase/FileChannelShared.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,10 @@ class FileChannelShared {
274274

275275
uint32_t _rowcount = 0; ///< The total number of rows in all result sets of a query.
276276
uint64_t _transmitsize = 0; ///< The total amount of data (bytes) in all result sets of a query.
277+
278+
/// This should be set to true if there were any issues that invalidate the file, such as errors
279+
/// or cancellation.
280+
std::atomic<bool> _issueRequiresFileRemoval{false};
277281
};
278282

279283
} // namespace lsst::qserv::wbase

src/wdb/QueryRunner.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ size_t QueryRunner::_getDesiredLimit() {
147147
util::TimerHistogram memWaitHisto("memWait Hist", {1, 5, 10, 20, 40});
148148

149149
bool QueryRunner::runQuery() {
150+
LOGS(_log, LOG_LVL_WARN, "&&& runQuery start");
150151
util::InstanceCount ic(to_string(_task->getQueryId()) + "_rq_LDB"); // LockupDB
151152
util::HoldTrack::Mark runQueryMarkA(ERR_LOC, "runQuery " + to_string(_task->getQueryId()));
152153
QSERV_LOGCONTEXT_QUERY_JOB(_task->getQueryId(), _task->getJobId());
@@ -254,6 +255,7 @@ class ChunkResourceRequest {
254255
};
255256

256257
bool QueryRunner::_dispatchChannel() {
258+
LOGS(_log, LOG_LVL_WARN, "&&& dispatch start");
257259
bool erred = false;
258260
bool needToFreeRes = false; // set to true once there are results to be freed.
259261
// Collect the result in _transmitData. When a reasonable amount of data has been collected,
@@ -298,7 +300,9 @@ bool QueryRunner::_dispatchChannel() {
298300
if (sendChan == nullptr) {
299301
throw util::Bug(ERR_LOC, "QueryRunner::_dispatchChannel() sendChan==null");
300302
}
303+
LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmitResult a");
301304
erred = sendChan->buildAndTransmitResult(res, _task, _multiError, _cancelled);
305+
LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmitResult a1");
302306
}
303307
}
304308
} catch (sql::SqlErrorObject const& e) {
@@ -321,10 +325,12 @@ bool QueryRunner::_dispatchChannel() {
321325
erred = true;
322326
// Send results. This needs to happen after the error check.
323327
// If any errors were found, send an error back.
328+
LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmiError b");
324329
if (!_task->getSendChannel()->buildAndTransmitError(_multiError, _task, _cancelled)) {
325330
LOGS(_log, LOG_LVL_WARN,
326331
" Could not report error to czar as sendChannel not accepting msgs." << _task->getIdStr());
327332
}
333+
LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmiError b1");
328334
}
329335
return !erred;
330336
}

0 commit comments

Comments
 (0)