@@ -252,6 +252,37 @@ Status ServiceBasedActorInfoAccessor::AsyncCreateActor(
252252 return Status::OK ();
253253}
254254
255+ Status ServiceBasedActorInfoAccessor::AsyncSubscribeAll (
256+ const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
257+ const StatusCallback &done) {
258+ RAY_CHECK (subscribe != nullptr );
259+ fetch_all_data_operation_ = [this , subscribe](const StatusCallback &done) {
260+ auto callback = [subscribe, done](
261+ const Status &status,
262+ const std::vector<rpc::ActorTableData> &actor_info_list) {
263+ for (auto &actor_info : actor_info_list) {
264+ subscribe (ActorID::FromBinary (actor_info.actor_id ()), actor_info);
265+ }
266+ if (done) {
267+ done (status);
268+ }
269+ };
270+ RAY_CHECK_OK (AsyncGetAll (callback));
271+ };
272+
273+ subscribe_all_operation_ = [this , subscribe](const StatusCallback &done) {
274+ auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
275+ ActorTableData actor_data;
276+ actor_data.ParseFromString (data);
277+ subscribe (ActorID::FromBinary (actor_data.actor_id ()), actor_data);
278+ };
279+ return client_impl_->GetGcsPubSub ().SubscribeAll (ACTOR_CHANNEL, on_subscribe, done);
280+ };
281+
282+ return subscribe_all_operation_ (
283+ [this , done](const Status &status) { fetch_all_data_operation_ (done); });
284+ }
285+
255286Status ServiceBasedActorInfoAccessor::AsyncSubscribe (
256287 const ActorID &actor_id,
257288 const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
@@ -835,6 +866,189 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncGetAllResourceUsage(
835866 return Status::OK ();
836867}
837868
869+ ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor (
870+ ServiceBasedGcsClient *client_impl)
871+ : client_impl_(client_impl) {}
872+
873+ Status ServiceBasedTaskInfoAccessor::AsyncAdd (
874+ const std::shared_ptr<rpc::TaskTableData> &data_ptr, const StatusCallback &callback) {
875+ TaskID task_id = TaskID::FromBinary (data_ptr->task ().task_spec ().task_id ());
876+ JobID job_id = JobID::FromBinary (data_ptr->task ().task_spec ().job_id ());
877+ RAY_LOG (DEBUG) << " Adding task, task id = " << task_id << " , job id = " << job_id;
878+ rpc::AddTaskRequest request;
879+ request.mutable_task_data ()->CopyFrom (*data_ptr);
880+ client_impl_->GetGcsRpcClient ().AddTask (
881+ request,
882+ [task_id, job_id, callback](const Status &status, const rpc::AddTaskReply &reply) {
883+ if (callback) {
884+ callback (status);
885+ }
886+ RAY_LOG (DEBUG) << " Finished adding task, status = " << status
887+ << " , task id = " << task_id << " , job id = " << job_id;
888+ });
889+ return Status::OK ();
890+ }
891+
892+ Status ServiceBasedTaskInfoAccessor::AsyncGet (
893+ const TaskID &task_id, const OptionalItemCallback<rpc::TaskTableData> &callback) {
894+ RAY_LOG (DEBUG) << " Getting task, task id = " << task_id
895+ << " , job id = " << task_id.JobId ();
896+ rpc::GetTaskRequest request;
897+ request.set_task_id (task_id.Binary ());
898+ client_impl_->GetGcsRpcClient ().GetTask (
899+ request, [task_id, callback](const Status &status, const rpc::GetTaskReply &reply) {
900+ if (reply.has_task_data ()) {
901+ callback (status, reply.task_data ());
902+ } else {
903+ callback (status, boost::none);
904+ }
905+ RAY_LOG (DEBUG) << " Finished getting task, status = " << status
906+ << " , task id = " << task_id << " , job id = " << task_id.JobId ();
907+ });
908+ return Status::OK ();
909+ }
910+
911+ Status ServiceBasedTaskInfoAccessor::AsyncAddTaskLease (
912+ const std::shared_ptr<rpc::TaskLeaseData> &data_ptr, const StatusCallback &callback) {
913+ TaskID task_id = TaskID::FromBinary (data_ptr->task_id ());
914+ NodeID node_id = NodeID::FromBinary (data_ptr->node_manager_id ());
915+ RAY_LOG (DEBUG) << " Adding task lease, task id = " << task_id
916+ << " , node id = " << node_id << " , job id = " << task_id.JobId ();
917+ rpc::AddTaskLeaseRequest request;
918+ request.mutable_task_lease_data ()->CopyFrom (*data_ptr);
919+ client_impl_->GetGcsRpcClient ().AddTaskLease (
920+ request, [task_id, node_id, callback](const Status &status,
921+ const rpc::AddTaskLeaseReply &reply) {
922+ if (callback) {
923+ callback (status);
924+ }
925+ RAY_LOG (DEBUG) << " Finished adding task lease, status = " << status
926+ << " , task id = " << task_id << " , node id = " << node_id
927+ << " , job id = " << task_id.JobId ();
928+ });
929+ return Status::OK ();
930+ }
931+
932+ Status ServiceBasedTaskInfoAccessor::AsyncGetTaskLease (
933+ const TaskID &task_id, const OptionalItemCallback<rpc::TaskLeaseData> &callback) {
934+ RAY_LOG (DEBUG) << " Getting task lease, task id = " << task_id
935+ << " , job id = " << task_id.JobId ();
936+ rpc::GetTaskLeaseRequest request;
937+ request.set_task_id (task_id.Binary ());
938+ client_impl_->GetGcsRpcClient ().GetTaskLease (
939+ request,
940+ [task_id, callback](const Status &status, const rpc::GetTaskLeaseReply &reply) {
941+ if (reply.has_task_lease_data ()) {
942+ callback (status, reply.task_lease_data ());
943+ } else {
944+ callback (status, boost::none);
945+ }
946+ RAY_LOG (DEBUG) << " Finished getting task lease, status = " << status
947+ << " , task id = " << task_id << " , job id = " << task_id.JobId ();
948+ });
949+ return Status::OK ();
950+ }
951+
952+ Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease (
953+ const TaskID &task_id,
954+ const SubscribeCallback<TaskID, boost::optional<rpc::TaskLeaseData>> &subscribe,
955+ const StatusCallback &done) {
956+ RAY_CHECK (subscribe != nullptr )
957+ << " Failed to subscribe task lease, task id = " << task_id
958+ << " , job id = " << task_id.JobId ();
959+
960+ auto fetch_data_operation = [this , task_id,
961+ subscribe](const StatusCallback &fetch_done) {
962+ auto callback = [task_id, subscribe, fetch_done](
963+ const Status &status,
964+ const boost::optional<rpc::TaskLeaseData> &result) {
965+ subscribe (task_id, result);
966+ if (fetch_done) {
967+ fetch_done (status);
968+ }
969+ };
970+ RAY_CHECK_OK (AsyncGetTaskLease (task_id, callback));
971+ };
972+
973+ auto subscribe_operation = [this , task_id,
974+ subscribe](const StatusCallback &subscribe_done) {
975+ auto on_subscribe = [task_id, subscribe](const std::string &id,
976+ const std::string &data) {
977+ TaskLeaseData task_lease_data;
978+ task_lease_data.ParseFromString (data);
979+ subscribe (task_id, task_lease_data);
980+ };
981+ return client_impl_->GetGcsPubSub ().Subscribe (TASK_LEASE_CHANNEL, task_id.Hex (),
982+ on_subscribe, subscribe_done);
983+ };
984+
985+ subscribe_task_lease_operations_[task_id] = subscribe_operation;
986+ fetch_task_lease_data_operations_[task_id] = fetch_data_operation;
987+ return subscribe_operation (
988+ [fetch_data_operation, done](const Status &status) { fetch_data_operation (done); });
989+ }
990+
991+ Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease (const TaskID &task_id) {
992+ RAY_LOG (DEBUG) << " Unsubscribing task lease, task id = " << task_id
993+ << " , job id = " << task_id.JobId ();
994+ auto status =
995+ client_impl_->GetGcsPubSub ().Unsubscribe (TASK_LEASE_CHANNEL, task_id.Hex ());
996+ subscribe_task_lease_operations_.erase (task_id);
997+ fetch_task_lease_data_operations_.erase (task_id);
998+ RAY_LOG (DEBUG) << " Finished unsubscribing task lease, task id = " << task_id
999+ << " , job id = " << task_id.JobId ();
1000+ return status;
1001+ }
1002+
1003+ Status ServiceBasedTaskInfoAccessor::AttemptTaskReconstruction (
1004+ const std::shared_ptr<rpc::TaskReconstructionData> &data_ptr,
1005+ const StatusCallback &callback) {
1006+ auto num_reconstructions = data_ptr->num_reconstructions ();
1007+ NodeID node_id = NodeID::FromBinary (data_ptr->node_manager_id ());
1008+ TaskID task_id = TaskID::FromBinary (data_ptr->task_id ());
1009+ RAY_LOG (DEBUG) << " Reconstructing task, reconstructions num = " << num_reconstructions
1010+ << " , node id = " << node_id << " , task id = " << task_id
1011+ << " , job id = " << task_id.JobId ();
1012+ rpc::AttemptTaskReconstructionRequest request;
1013+ request.mutable_task_reconstruction ()->CopyFrom (*data_ptr);
1014+ client_impl_->GetGcsRpcClient ().AttemptTaskReconstruction (
1015+ request,
1016+ [num_reconstructions, node_id, task_id, callback](
1017+ const Status &status, const rpc::AttemptTaskReconstructionReply &reply) {
1018+ if (callback) {
1019+ callback (status);
1020+ }
1021+ RAY_LOG (DEBUG) << " Finished reconstructing task, status = " << status
1022+ << " , reconstructions num = " << num_reconstructions
1023+ << " , node id = " << node_id << " , task id = " << task_id
1024+ << " , job id = " << task_id.JobId ();
1025+ });
1026+ return Status::OK ();
1027+ }
1028+
1029+ void ServiceBasedTaskInfoAccessor::AsyncResubscribe (bool is_pubsub_server_restarted) {
1030+ RAY_LOG (DEBUG) << " Reestablishing subscription for task info." ;
1031+ // If only the GCS sever has restarted, we only need to fetch data from the GCS server.
1032+ // If the pub-sub server has also restarted, we need to resubscribe to the pub-sub
1033+ // server first, then fetch data from the GCS server.
1034+ if (is_pubsub_server_restarted) {
1035+ for (auto &item : subscribe_task_lease_operations_) {
1036+ auto &task_id = item.first ;
1037+ RAY_CHECK_OK (item.second ([this , task_id](const Status &status) {
1038+ fetch_task_lease_data_operations_[task_id](nullptr );
1039+ }));
1040+ }
1041+ } else {
1042+ for (auto &item : fetch_task_lease_data_operations_) {
1043+ item.second (nullptr );
1044+ }
1045+ }
1046+ }
1047+
1048+ bool ServiceBasedTaskInfoAccessor::IsTaskLeaseUnsubscribed (const TaskID &task_id) {
1049+ return client_impl_->GetGcsPubSub ().IsUnsubscribed (TASK_LEASE_CHANNEL, task_id.Hex ());
1050+ }
1051+
8381052ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor (
8391053 ServiceBasedGcsClient *client_impl)
8401054 : client_impl_(client_impl) {}
0 commit comments