pyproject.toml (3 changes: 2 additions & 1 deletion)
@@ -27,7 +27,8 @@ dependencies = [
"newscale@git+https://github.com/AllenNeuralDynamics/python-newscale@axes-on-target",
"aind-auto-train@git+https://github.com/AllenNeuralDynamics/aind-foraging-behavior-bonsai-automatic-training.git@main",
"aind-slims-api@git+https://github.com/AllenNeuralDynamics/aind-slims-api@main",
"aind-dynamic-foraging-models@git+https://github.com/AllenNeuralDynamics/aind-dynamic-foraging-models@main",
"aind-dynamic-foraging-models >= 0.12.2",
"aind-dynamic-foraging-basic-analysis >= 0.3.34",
"aind-behavior-services >=0.8, <0.9",
"pynwb >=2, <3",
"requests >=2, <3",
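This change swaps the git-pinned aind-dynamic-foraging-models dependency for a PyPI release range and pulls in aind-dynamic-foraging-basic-analysis, which provides the compute_foraging_efficiency function used in MyFunctions.py below. A minimal sketch for checking that an environment picked up compatible versions, using only the standard library (the package names and lower bounds are copied from the diff above; the naive tuple comparison assumes plain X.Y.Z version strings):

# Sanity-check installed versions against the new lower bounds.
from importlib.metadata import PackageNotFoundError, version

for pkg, minimum in [
    ("aind-dynamic-foraging-models", "0.12.2"),
    ("aind-dynamic-foraging-basic-analysis", "0.3.34"),
]:
    try:
        installed = version(pkg)
    except PackageNotFoundError:
        print(f"{pkg}: not installed")
        continue
    # Naive numeric compare; assumes no dev/rc suffixes in the version string.
    ok = tuple(map(int, installed.split("."))) >= tuple(map(int, minimum.split(".")))
    print(f"{pkg} {installed}: " + ("ok" if ok else f"needs >= {minimum}"))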
src/foraging_gui/MyFunctions.py (159 changes: 12 additions & 147 deletions)
@@ -15,6 +15,7 @@
 from serial.tools.list_ports import comports as list_comports

 from foraging_gui.reward_schedules.uncoupled_block import UncoupledBlocks
+from aind_dynamic_foraging_basic_analysis import compute_foraging_efficiency

 if PLATFORM == "win32":
     from newscale.usbxpress import USBXpressDevice, USBXpressLib
@@ -928,42 +929,7 @@ def _GetBasic(self):
         # foraging efficiency
         Len = np.shape(self.B_RewardedHistory)[1]
         if Len > 0:
-            reward_rate = np.sum(self.B_RewardedHistory) / Len
-            p_Ls = self.B_RewardProHistory[0][:Len]
-            p_Rs = self.B_RewardProHistory[1][:Len]
-            random_number_L = np.concatenate(
-                self.B_CurrentRewardProbRandomNumber, axis=0
-            )[0::2][:Len]
-            random_number_R = np.concatenate(
-                self.B_CurrentRewardProbRandomNumber, axis=0
-            )[1::2][:Len]
-            if self.TP_Task in ["Coupled Baiting", "Uncoupled Baiting"]:
-                self.B_for_eff_optimal, self.B_for_eff_optimal_random_seed = (
-                    self.foraging_eff(
-                        reward_rate,
-                        p_Ls,
-                        p_Rs,
-                        random_number_L,
-                        random_number_R,
-                    )
-                )
-            elif self.TP_Task in [
-                "Coupled Without Baiting",
-                "Uncoupled Without Baiting",
-            ]:
-                self.B_for_eff_optimal, self.B_for_eff_optimal_random_seed = (
-                    self.foraging_eff_no_baiting(
-                        reward_rate,
-                        p_Ls,
-                        p_Rs,
-                        random_number_L,
-                        random_number_R,
-                    )
-                )
-            else:
-                self.B_for_eff_optimal = np.nan
-                self.B_for_eff_optimal_random_seed = np.nan
-        """Some complex calculations can be separated from _GenerateATrial using different threads"""
+            self.B_for_eff_optimal, self.B_for_eff_optimal_random_seed = self.foraging_eff()

     def _process_values(
         self, values, auto_water_trial, multiplier_values, rewarded_history
@@ -984,118 +950,17 @@
             logging.error(str(e))
         return BS_AutoWater, BS_EarnedReward, BS_AutoWater_N, BS_EarnedReward_N

-    def foraging_eff_no_baiting(
-        self,
-        reward_rate,
-        p_Ls,
-        p_Rs,
-        random_number_L=None,
-        random_number_R=None,
-    ): # Calculate foraging efficiency (only for 2lp)
-        """Calculating the foraging efficiency of no baiting tasks (Code is from Han)"""
-        # --- Optimal-aver (use optimal expectation as 100% efficiency) ---
-        for_eff_optimal = float(
-            reward_rate / np.nanmean(np.max([p_Ls, p_Rs], axis=0))
-        )
-
-        if random_number_L is None:
-            return for_eff_optimal, np.nan
-
-        # --- Optimal-actual (uses the actual random numbers by simulation)
-        reward_refills = np.vstack(
-            [p_Ls >= random_number_L, p_Rs >= random_number_R]
+    def foraging_eff(self):
+        """Calculating the foraging efficiency"""
+        return compute_foraging_efficiency(
+            baited=self.TP_Task in ["Coupled Baiting", "Uncoupled Baiting"],
+            choice_history=np.where(self.B_AnimalResponseHistory == 2, np.nan, self.B_AnimalResponseHistory),
+            reward_history=self.B_RewardedHistory[0] | self.B_RewardedHistory[1],
+            p_reward=self.B_RewardProHistory[:, :-1],
+            random_number=[[arr[0] for arr in self.B_CurrentRewardProbRandomNumber],
+                           [arr[1] for arr in self.B_CurrentRewardProbRandomNumber]],
+            autowater_offered=(self.B_AutoWaterTrial[0, :] == 1) | (self.B_AutoWaterTrial[1, :] == 1),
         )
-        optimal_choices = np.argmax(
-            [p_Ls, p_Rs], axis=0
-        ) # Greedy choice, assuming the agent knows the groundtruth
-        optimal_rewards = (
-            reward_refills[0][optimal_choices == 0].sum()
-            + reward_refills[1][optimal_choices == 1].sum()
-        )
-        for_eff_optimal_random_seed = float(
-            reward_rate / (optimal_rewards / len(optimal_choices))
-        )
-
-        return for_eff_optimal, for_eff_optimal_random_seed
-
-    def foraging_eff(
-        self,
-        reward_rate,
-        p_Ls,
-        p_Rs,
-        random_number_L=None,
-        random_number_R=None,
-    ): # Calculate foraging efficiency (only for 2lp)
-        """Calculating the foraging efficiency of baiting tasks (Code is from Han)"""
-        # --- Optimal-aver (use optimal expectation as 100% efficiency) ---
-        p_stars = np.zeros_like(p_Ls)
-        for i, (p_L, p_R) in enumerate(zip(p_Ls, p_Rs)): # Sum over all ps
-            p_max = np.max([p_L, p_R])
-            p_min = np.min([p_L, p_R])
-            if p_min == 0 or p_max >= 1:
-                p_stars[i] = p_max
-            else:
-                m_star = np.floor(np.log(1 - p_max) / np.log(1 - p_min))
-                p_stars[i] = p_max + (
-                    1 - (1 - p_min) ** (m_star + 1) - p_max**2
-                ) / (m_star + 1)
-
-        for_eff_optimal = float(reward_rate / np.nanmean(p_stars))
-
-        if random_number_L is None:
-            return for_eff_optimal, np.nan
-
-        # --- Optimal-actual (uses the actual random numbers by simulation)
-        block_trans = np.where(np.diff(np.hstack([np.inf, p_Ls, np.inf])))[
-            0
-        ].tolist()
-        reward_refills = [p_Ls >= random_number_L, p_Rs >= random_number_R]
-        reward_optimal_random_seed = 0
-
-        # Generate optimal choice pattern
-        for b_start, b_end in zip(block_trans[:-1], block_trans[1:]):
-            p_max = np.max([p_Ls[b_start], p_Rs[b_start]])
-            p_min = np.min([p_Ls[b_start], p_Rs[b_start]])
-            side_max = np.argmax([p_Ls[b_start], p_Rs[b_start]])
-
-            # Get optimal choice pattern and expected optimal rate
-            if p_min == 0 or p_max >= 1:
-                this_choice = np.array(
-                    [1] * (b_end - b_start)
-                ) # Greedy is obviously optimal
-            else:
-                m_star = np.floor(np.log(1 - p_max) / np.log(1 - p_min))
-                this_choice = np.array(
-                    (
-                        ([1] * int(m_star) + [0])
-                        * (1 + int((b_end - b_start) / (m_star + 1)))
-                    )[: b_end - b_start]
-                )
-
-            # Do simulation, using optimal choice pattern and actual random numbers
-            reward_refill = np.vstack(
-                [
-                    reward_refills[1 - side_max][b_start:b_end],
-                    reward_refills[side_max][b_start:b_end],
-                ]
-            ).astype(
-                int
-            ) # Max = 1, Min = 0
-            reward_remain = [0, 0]
-            for t in range(b_end - b_start):
-                reward_available = reward_remain | reward_refill[:, t]
-                reward_optimal_random_seed += reward_available[this_choice[t]]
-                reward_remain = reward_available.copy()
-                reward_remain[this_choice[t]] = 0
-
-        if reward_optimal_random_seed:
-            for_eff_optimal_random_seed = float(
-                reward_rate / (reward_optimal_random_seed / len(p_Ls))
-            )
-        else:
-            for_eff_optimal_random_seed = np.nan
-
-        return for_eff_optimal, for_eff_optimal_random_seed

     def _LickSta(self, Trials=None):
         """Perform lick stats for the input trials"""
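The two hand-rolled efficiency calculations are now delegated to the library. For trying the new dependency outside the GUI, here is a minimal standalone sketch: the import path, keyword names, and two-value return are taken from the diff above, while the toy arrays and their shapes are assumptions inferred from how foraging_eff builds its arguments (row 0 = left port, row 1 = right port, one column per trial).

import numpy as np
from aind_dynamic_foraging_basic_analysis import compute_foraging_efficiency

rng = np.random.default_rng(seed=0)
n_trials = 100
# Per-trial reward probabilities for the two lick ports (toy values).
p_reward = np.vstack([np.full(n_trials, 0.6), np.full(n_trials, 0.1)])
random_number = rng.random((2, n_trials))  # per-port random draws, as the GUI stores them
choice_history = rng.integers(0, 2, n_trials).astype(float)  # 0 = left, 1 = right; np.nan = ignored
chosen = choice_history.astype(int)
reward_history = p_reward[chosen, np.arange(n_trials)] >= random_number[chosen, np.arange(n_trials)]

eff_optimal, eff_optimal_random_seed = compute_foraging_efficiency(
    baited=False,  # True for the "Coupled/Uncoupled Baiting" tasks in the GUI
    choice_history=choice_history,
    reward_history=reward_history,
    p_reward=p_reward,
    random_number=random_number,
    autowater_offered=np.zeros(n_trials, dtype=bool),
)
print(eff_optimal, eff_optimal_random_seed)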