diff --git a/pyproject.toml b/pyproject.toml index 0a1309e47..5e9ff6095 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,8 @@ dependencies = [ "newscale@git+https://github.com/AllenNeuralDynamics/python-newscale@axes-on-target", "aind-auto-train@git+https://github.com/AllenNeuralDynamics/aind-foraging-behavior-bonsai-automatic-training.git@main", "aind-slims-api@git+https://github.com/AllenNeuralDynamics/aind-slims-api@main", - "aind-dynamic-foraging-models@git+https://github.com/AllenNeuralDynamics/aind-dynamic-foraging-models@main", + "aind-dynamic-foraging-models >= 0.12.2", + "aind-dynamic-foraging-basic-analysis >= 0.3.34", "aind-behavior-services >=0.8, <0.9", "pynwb >=2, <3", "requests >=2, <3", diff --git a/src/foraging_gui/MyFunctions.py b/src/foraging_gui/MyFunctions.py index 504964879..9e92883b0 100644 --- a/src/foraging_gui/MyFunctions.py +++ b/src/foraging_gui/MyFunctions.py @@ -15,6 +15,7 @@ from serial.tools.list_ports import comports as list_comports from foraging_gui.reward_schedules.uncoupled_block import UncoupledBlocks +from aind_dynamic_foraging_basic_analysis import compute_foraging_efficiency if PLATFORM == "win32": from newscale.usbxpress import USBXpressDevice, USBXpressLib @@ -928,42 +929,7 @@ def _GetBasic(self): # foraging efficiency Len = np.shape(self.B_RewardedHistory)[1] if Len > 0: - reward_rate = np.sum(self.B_RewardedHistory) / Len - p_Ls = self.B_RewardProHistory[0][:Len] - p_Rs = self.B_RewardProHistory[1][:Len] - random_number_L = np.concatenate( - self.B_CurrentRewardProbRandomNumber, axis=0 - )[0::2][:Len] - random_number_R = np.concatenate( - self.B_CurrentRewardProbRandomNumber, axis=0 - )[1::2][:Len] - if self.TP_Task in ["Coupled Baiting", "Uncoupled Baiting"]: - self.B_for_eff_optimal, self.B_for_eff_optimal_random_seed = ( - self.foraging_eff( - reward_rate, - p_Ls, - p_Rs, - random_number_L, - random_number_R, - ) - ) - elif self.TP_Task in [ - "Coupled Without Baiting", - "Uncoupled Without Baiting", - ]: - self.B_for_eff_optimal, self.B_for_eff_optimal_random_seed = ( - self.foraging_eff_no_baiting( - reward_rate, - p_Ls, - p_Rs, - random_number_L, - random_number_R, - ) - ) - else: - self.B_for_eff_optimal = np.nan - self.B_for_eff_optimal_random_seed = np.nan - """Some complex calculations can be separated from _GenerateATrial using different threads""" + self.B_for_eff_optimal, self.B_for_eff_optimal_random_seed = self.foraging_eff() def _process_values( self, values, auto_water_trial, multiplier_values, rewarded_history @@ -984,118 +950,17 @@ def _process_values( logging.error(str(e)) return BS_AutoWater, BS_EarnedReward, BS_AutoWater_N, BS_EarnedReward_N - def foraging_eff_no_baiting( - self, - reward_rate, - p_Ls, - p_Rs, - random_number_L=None, - random_number_R=None, - ): # Calculate foraging efficiency (only for 2lp) - """Calculating the foraging efficiency of no baiting tasks (Code is from Han)""" - # --- Optimal-aver (use optimal expectation as 100% efficiency) --- - for_eff_optimal = float( - reward_rate / np.nanmean(np.max([p_Ls, p_Rs], axis=0)) - ) - - if random_number_L is None: - return for_eff_optimal, np.nan - - # --- Optimal-actual (uses the actual random numbers by simulation) - reward_refills = np.vstack( - [p_Ls >= random_number_L, p_Rs >= random_number_R] + def foraging_eff(self): + """Calculating the foraging efficiency""" + return compute_foraging_efficiency( + baited=self.TP_Task in ["Coupled Baiting", "Uncoupled Baiting"], + choice_history=np.where(self.B_AnimalResponseHistory == 2, np.nan, self.B_AnimalResponseHistory), + reward_history=self.B_RewardedHistory[0] | self.B_RewardedHistory[1], + p_reward=self.B_RewardProHistory[:, :-1], + random_number=[[arr[0] for arr in self.B_CurrentRewardProbRandomNumber], + [arr[1] for arr in self.B_CurrentRewardProbRandomNumber]], + autowater_offered=(self.B_AutoWaterTrial[0, :] == 1) | (self.B_AutoWaterTrial[1, :] == 1), ) - optimal_choices = np.argmax( - [p_Ls, p_Rs], axis=0 - ) # Greedy choice, assuming the agent knows the groundtruth - optimal_rewards = ( - reward_refills[0][optimal_choices == 0].sum() - + reward_refills[1][optimal_choices == 1].sum() - ) - for_eff_optimal_random_seed = float( - reward_rate / (optimal_rewards / len(optimal_choices)) - ) - - return for_eff_optimal, for_eff_optimal_random_seed - - def foraging_eff( - self, - reward_rate, - p_Ls, - p_Rs, - random_number_L=None, - random_number_R=None, - ): # Calculate foraging efficiency (only for 2lp) - """Calculating the foraging efficiency of baiting tasks (Code is from Han)""" - # --- Optimal-aver (use optimal expectation as 100% efficiency) --- - p_stars = np.zeros_like(p_Ls) - for i, (p_L, p_R) in enumerate(zip(p_Ls, p_Rs)): # Sum over all ps - p_max = np.max([p_L, p_R]) - p_min = np.min([p_L, p_R]) - if p_min == 0 or p_max >= 1: - p_stars[i] = p_max - else: - m_star = np.floor(np.log(1 - p_max) / np.log(1 - p_min)) - p_stars[i] = p_max + ( - 1 - (1 - p_min) ** (m_star + 1) - p_max**2 - ) / (m_star + 1) - - for_eff_optimal = float(reward_rate / np.nanmean(p_stars)) - - if random_number_L is None: - return for_eff_optimal, np.nan - - # --- Optimal-actual (uses the actual random numbers by simulation) - block_trans = np.where(np.diff(np.hstack([np.inf, p_Ls, np.inf])))[ - 0 - ].tolist() - reward_refills = [p_Ls >= random_number_L, p_Rs >= random_number_R] - reward_optimal_random_seed = 0 - - # Generate optimal choice pattern - for b_start, b_end in zip(block_trans[:-1], block_trans[1:]): - p_max = np.max([p_Ls[b_start], p_Rs[b_start]]) - p_min = np.min([p_Ls[b_start], p_Rs[b_start]]) - side_max = np.argmax([p_Ls[b_start], p_Rs[b_start]]) - - # Get optimal choice pattern and expected optimal rate - if p_min == 0 or p_max >= 1: - this_choice = np.array( - [1] * (b_end - b_start) - ) # Greedy is obviously optimal - else: - m_star = np.floor(np.log(1 - p_max) / np.log(1 - p_min)) - this_choice = np.array( - ( - ([1] * int(m_star) + [0]) - * (1 + int((b_end - b_start) / (m_star + 1))) - )[: b_end - b_start] - ) - - # Do simulation, using optimal choice pattern and actual random numbers - reward_refill = np.vstack( - [ - reward_refills[1 - side_max][b_start:b_end], - reward_refills[side_max][b_start:b_end], - ] - ).astype( - int - ) # Max = 1, Min = 0 - reward_remain = [0, 0] - for t in range(b_end - b_start): - reward_available = reward_remain | reward_refill[:, t] - reward_optimal_random_seed += reward_available[this_choice[t]] - reward_remain = reward_available.copy() - reward_remain[this_choice[t]] = 0 - - if reward_optimal_random_seed: - for_eff_optimal_random_seed = float( - reward_rate / (reward_optimal_random_seed / len(p_Ls)) - ) - else: - for_eff_optimal_random_seed = np.nan - - return for_eff_optimal, for_eff_optimal_random_seed def _LickSta(self, Trials=None): """Perform lick stats for the input trials"""