diff --git a/src/wbase/FileChannelShared.cc b/src/wbase/FileChannelShared.cc index 42a8814822..5e943e7d25 100644 --- a/src/wbase/FileChannelShared.cc +++ b/src/wbase/FileChannelShared.cc @@ -267,11 +267,7 @@ shared_ptr FileChannelShared::create(shared_ptr const& sendChannel, qmeta::CzarId czarId, string const& workerId) - : _sendChannel(sendChannel), - _czarId(czarId), - _workerId(workerId), - _protobufArena(make_unique()), - _scsId(scsSeqId++) { + : _sendChannel(sendChannel), _czarId(czarId), _workerId(workerId), _scsId(scsSeqId++) { LOGS(_log, LOG_LVL_DEBUG, "FileChannelShared created"); if (_sendChannel == nullptr) { throw util::Bug(ERR_LOC, "FileChannelShared constructor given nullptr"); @@ -283,7 +279,7 @@ FileChannelShared::~FileChannelShared() { // dead it means there was a problem to process a query or send back a response // to Czar. In either case, the file would be useless and it has to be deleted // in order to avoid leaving unclaimed result files within the results folder. - if (isDead()) { + if (_issueRequiresFileRemoval || isDead()) { _removeFile(lock_guard(_tMtx)); } if (_sendChannel != nullptr) { @@ -321,8 +317,9 @@ string FileChannelShared::makeIdStr(int qId, int jId) { bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared_ptr const& task, bool cancelled) { + std::unique_ptr protobufArena = make_unique(); lock_guard const tMtxLock(_tMtx); - if (!_sendResponse(tMtxLock, task, cancelled, multiErr)) { + if (!_sendResponse(tMtxLock, protobufArena, task, cancelled, multiErr)) { LOGS(_log, LOG_LVL_ERROR, "Could not transmit the error message to Czar."); return false; } @@ -348,11 +345,10 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const tMtxLockA(_tMtx); + std::unique_ptr protobufArena = make_unique(); + proto::ResponseData* responseData = nullptr; + while (hasMoreRows && !cancelled) { util::Timer bufferFillT; bufferFillT.start(); @@ -360,7 +356,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const tMtxLock(_tMtx); + // Make sure the file is sync to disk before notifying Czar. _file.flush(); _file.close(); // Only the last ("summary") message, w/o any rows, is sent to the Czar to notify // it about the completion of the request. - if (!_sendResponse(tMtxLockA, task, cancelled, multiErr)) { + if (!_sendResponse(tMtxLock, protobufArena, task, cancelled, multiErr)) { LOGS(_log, LOG_LVL_ERROR, "Could not transmit the request completion message to Czar."); erred = true; break; @@ -421,8 +419,9 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr const tMtxLockA(_tMtx); - _removeFile(tMtxLockA); + // Set a flag to delete the file in the destructor. That should prevent any + // possible race conditions with other threads expecting the file to exist. + _issueRequiresFileRemoval = true; } return erred; } @@ -432,28 +431,29 @@ bool FileChannelShared::_kill(lock_guard const& streamMutexLock, string c return _sendChannel->kill(note); } -bool FileChannelShared::_writeToFile(lock_guard const& tMtxLock, shared_ptr const& task, - MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr) { +bool FileChannelShared::_writeToFile(proto::ResponseData* responseData, + unique_ptr const& protobufArena, + shared_ptr const& task, MYSQL_RES* mResult, int& bytes, int& rows, + util::MultiError& multiErr) { // Transfer rows from a result set into the response data object. - if (nullptr == _responseData) { - _responseData = google::protobuf::Arena::CreateMessage(_protobufArena.get()); + if (nullptr == responseData) { + responseData = google::protobuf::Arena::CreateMessage(protobufArena.get()); } else { - _responseData->clear_row(); + responseData->clear_row(); } size_t tSize = 0; - LOGS(_log, LOG_LVL_TRACE, __func__ << " _fillRows " << task->getIdStr() << " start"); - bool const hasMoreRows = _fillRows(tMtxLock, mResult, rows, tSize); - LOGS(_log, LOG_LVL_TRACE, __func__ << " _fillRows " << task->getIdStr() << " end"); - _responseData->set_rowcount(rows); - _responseData->set_transmitsize(tSize); + bool const hasMoreRows = _fillRows(responseData, mResult, rows, tSize); + responseData->set_rowcount(rows); + responseData->set_transmitsize(tSize); // Serialize the content of the data buffer into the Protobuf data message // that will be written into the output file. std::string msg; - _responseData->SerializeToString(&msg); + responseData->SerializeToString(&msg); bytes = msg.size(); LOGS(_log, LOG_LVL_TRACE, __func__ << " file write " << task->getIdStr() << " start"); + lock_guard const tMtxLock(_tMtx); // Create the file if not open. if (!_file.is_open()) { _fileName = task->resultFilePath(); @@ -478,7 +478,7 @@ bool FileChannelShared::_writeToFile(lock_guard const& tMtxLock, shared_p return hasMoreRows; } -bool FileChannelShared::_fillRows(lock_guard const& tMtxLock, MYSQL_RES* mResult, int& rows, +bool FileChannelShared::_fillRows(proto::ResponseData* responseData, MYSQL_RES* mResult, int& rows, size_t& tSize) { int const numFields = mysql_num_fields(mResult); unsigned int szLimit = min(proto::ProtoHeaderWrap::PROTOBUFFER_DESIRED_LIMIT, @@ -488,7 +488,7 @@ bool FileChannelShared::_fillRows(lock_guard const& tMtxLock, MYSQL_RES* MYSQL_ROW row; while ((row = mysql_fetch_row(mResult))) { auto lengths = mysql_fetch_lengths(mResult); - proto::RowBundle* rawRow = _responseData->add_row(); + proto::RowBundle* rawRow = responseData->add_row(); for (int i = 0; i < numFields; ++i) { if (row[i]) { rawRow->add_column(row[i], lengths[i]); @@ -521,8 +521,10 @@ void FileChannelShared::_removeFile(lock_guard const& tMtxLock) { } } -bool FileChannelShared::_sendResponse(lock_guard const& tMtxLock, shared_ptr const& task, - bool cancelled, util::MultiError const& multiErr) { +bool FileChannelShared::_sendResponse(lock_guard const& tMtxLock, + std::unique_ptr const& protobufArena, + shared_ptr const& task, bool cancelled, + util::MultiError const& multiErr) { auto const queryId = task->getQueryId(); auto const jobId = task->getJobId(); auto const idStr(makeIdStr(queryId, jobId)); @@ -534,10 +536,10 @@ bool FileChannelShared::_sendResponse(lock_guard const& tMtxLock, shared_ // This will deallocate any memory managed by the Google Protobuf Arena // to avoid unnecessary memory utilization by the application. LOGS(_log, LOG_LVL_DEBUG, - __func__ << ": Google Protobuf Arena, 1:SpaceUsed=" << _protobufArena->SpaceUsed()); - _protobufArena->Reset(); + __func__ << ": Google Protobuf Arena, 1:SpaceUsed=" << protobufArena->SpaceUsed()); + protobufArena->Reset(); LOGS(_log, LOG_LVL_DEBUG, - __func__ << ": Google Protobuf Arena, 2:SpaceUsed=" << _protobufArena->SpaceUsed()); + __func__ << ": Google Protobuf Arena, 2:SpaceUsed=" << protobufArena->SpaceUsed()); QSERV_LOGCONTEXT_QUERY_JOB(queryId, jobId); LOGS(_log, LOG_LVL_DEBUG, __func__); diff --git a/src/wbase/FileChannelShared.h b/src/wbase/FileChannelShared.h index 0febe6f460..a26314e82f 100644 --- a/src/wbase/FileChannelShared.h +++ b/src/wbase/FileChannelShared.h @@ -180,9 +180,10 @@ class FileChannelShared { * @note The method may not extract all rows if the amount of data found * in the result set exceeded the maximum size allowed by the Google Protobuf * implementation. Also, the iterative approach to the data extraction allows - * the driving code to be interrupted should the correponding query be cancelled + * the driving code to be interrupted should the corresponding query be cancelled * during the lengthy data processing phase. - * @param tMtxLock - a lock on the mutex tMtx + * @param responseData - proto buffer to hold the response being constructed. + * @param protobufArena - proto buffer memory management control. * @param task - a task that produced the result set * @param mResult - MySQL result to be used as a source * @param bytes - the number of bytes in the result message recorded into the file @@ -192,19 +193,23 @@ class FileChannelShared { * @throws std::runtime_error for problems encountered when attemting to create the file * or write into the file. */ - bool _writeToFile(std::lock_guard const& tMtxLock, std::shared_ptr const& task, - MYSQL_RES* mResult, int& bytes, int& rows, util::MultiError& multiErr); + bool _writeToFile(proto::ResponseData* responseData, + std::unique_ptr const& protobufArena, + std::shared_ptr const& task, MYSQL_RES* mResult, int& bytes, int& rows, + util::MultiError& multiErr); /** * Extract as many rows as allowed by the Google Protobuf implementation from * from the input result set into the output result object. - * @param tMtxLock - a lock on the mutex tMtx + * @param responseData - proto buffer to hold the response being constructed. + * @param protobufArena - proto buffer memory management control. * @param mResult - MySQL result to be used as a source * @param rows - the number of rows extracted from the result set * @param tSize - the approximate amount of data extracted from the result set * @return 'true' if there are more rows left in the result set. */ - bool _fillRows(std::lock_guard const& tMtxLock, MYSQL_RES* mResult, int& rows, size_t& tSize); + static bool _fillRows(proto::ResponseData* responseData, MYSQL_RES* mResult, int& rows, size_t& tSize); + /** * Unconditionaly close and remove (potentially - the partially written) file. * This method gets called in case of any failure detected while processing @@ -225,8 +230,9 @@ class FileChannelShared { * @param multiErr - a collector of any errors that were captured during result set processing * @return 'true' if the operation was successfull */ - bool _sendResponse(std::lock_guard const& tMtxLock, std::shared_ptr const& task, - bool cancelled, util::MultiError const& multiErr); + bool _sendResponse(std::lock_guard const& tMtxLock, + std::unique_ptr const& protobufArena, + std::shared_ptr const& task, bool cancelled, util::MultiError const& multiErr); mutable std::mutex _tMtx; ///< Protects data recording and Czar notification @@ -234,10 +240,6 @@ class FileChannelShared { qmeta::CzarId const _czarId; ///< id of the czar that requested this task(s). std::string const _workerId; ///< The unique identifier of the worker. - // Allocatons/deletion of the data messages are managed by Google Protobuf Arena. - std::unique_ptr _protobufArena; - proto::ResponseData* _responseData = 0; - uint64_t const _scsId; ///< id number for this FileChannelShared /// streamMutex is used to protect _lastCount and messages that are sent @@ -272,6 +274,10 @@ class FileChannelShared { uint32_t _rowcount = 0; ///< The total numnber of rows in all result sets of a query. uint64_t _transmitsize = 0; ///< The total amount of data (bytes) in all result sets of a query. + + /// This should be set to true if there were any issues that invalidate the file, such as errors + /// or cancellation. + std::atomic _issueRequiresFileRemoval{false}; }; } // namespace lsst::qserv::wbase