234 changes: 131 additions & 103 deletions engine/pose_estimation/video2motion.py
@@ -44,7 +44,7 @@

def load_video(video_path, pad_ratio, max_resolution):
frames = []
for i in range(2):
for i in range(1):
cap = cv2.VideoCapture(video_path)
assert cap.isOpened(), f"fail to load video file {video_path}"
fps = cap.get(cv2.CAP_PROP_FPS)
@@ -81,40 +81,40 @@ def load_video(video_path, pad_ratio, max_resolution):

def images_crop(images, bboxes, target_size, device=torch.device("cuda")):
# bboxes: cx, cy, w, h
crop_img_list = []
crop_annotations = []
i = 0
raw_img_size = max(images[0].shape[:2])
for img, bbox in zip(images, bboxes):

left = max(0, int(bbox[0] - bbox[2] // 2))
right = min(img.shape[1] - 1, int(bbox[0] + bbox[2] // 2))
top = max(0, int(bbox[1] - bbox[3] // 2))
bottom = min(img.shape[0] - 1, int(bbox[1] + bbox[3] // 2))
crop_img = img[top:bottom, left:right]
crop_img = torch.Tensor(crop_img).to(device).unsqueeze(0).permute(0, 3, 1, 2)

_, _, h, w = crop_img.shape
scale_factor = min(target_size / w, target_size / h)
crop_img = F.interpolate(crop_img, scale_factor=scale_factor, mode="bilinear")

_, _, h, w = crop_img.shape
pad_left = (target_size - w) // 2
pad_top = (target_size - h) // 2
pad_right = target_size - w - pad_left
pad_bottom = target_size - h - pad_top
crop_img = F.pad(
crop_img,
(pad_left, pad_right, pad_top, pad_bottom),
mode="constant",
value=0,
)

resize_img = normalize_rgb_tensor(crop_img)
def generator():
for img, bbox in zip(images, bboxes):
# Calculate crop region coordinates
left = max(0, int(bbox[0] - bbox[2] // 2))
right = min(img.shape[1] - 1, int(bbox[0] + bbox[2] // 2))
top = max(0, int(bbox[1] - bbox[3] // 2))
bottom = min(img.shape[0] - 1, int(bbox[1] + bbox[3] // 2))
crop_img = img[top:bottom, left:right]
crop_img = torch.Tensor(crop_img).to(device).unsqueeze(0).permute(0, 3, 1, 2)

# Calculate scaling factor and resize
_, _, h, w = crop_img.shape
scale_factor = min(target_size / w, target_size / h)
crop_img = F.interpolate(crop_img, scale_factor=scale_factor, mode="bilinear")

# Calculate padding for center alignment
_, _, h, w = crop_img.shape
pad_left = (target_size - w) // 2
pad_top = (target_size - h) // 2
pad_right = target_size - w - pad_left
pad_bottom = target_size - h - pad_top
crop_img = F.pad(
crop_img,
(pad_left, pad_right, pad_top, pad_bottom),
mode="constant",
value=0,
)

resize_img = normalize_rgb_tensor(crop_img)

crop_img_list.append(resize_img)
crop_annotations.append(
(
# Yield processed image and corresponding transformation metadata
yield resize_img, (
left,
top,
pad_left,
@@ -123,9 +123,11 @@ def images_crop(images, bboxes, target_size, device=torch.device("cuda")):
target_size / scale_factor,
raw_img_size,
)
)

return crop_img_list, crop_annotations
# Create and return the generator
gen = generator()

return gen
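
A minimal consumption sketch for the new generator-based images_crop (illustrative only, not part of this diff; the target_size value, 3-channel frames, and run_pose_model are assumptions):

    crops = images_crop(frames, bboxes, target_size=256, device=torch.device("cuda"))
    for crop_img, annotation in crops:
        # crop_img: (1, 3, 256, 256) normalized tensor ready for the pose model
        # annotation: crop offsets, padding and scale info later used to map results
        # back to the original image
        run_pose_model(crop_img, annotation)  # hypothetical downstream call

Because each frame is cropped, resized and normalized only when the generator is consumed, the preprocessed crops are never all resident in memory at once, unlike the previous list-based return value.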


def generate_pseudo_idx(keypoints, patch_size, n_patch, crop_annotation):
@@ -201,74 +203,99 @@ def parse_chunks(
k2d,
bboxes,
min_len=10,
max_len=50
):
"""If a track disappear in the middle,
we separate it to different segments
we separate it to different segments with overlapping chunks
"""
data_chunks = []
if isinstance(frame_ids, list):
frame_ids = np.array(frame_ids)

# Find all discontinuous points
step = frame_ids[1:] - frame_ids[:-1]
step = np.concatenate([[0], step])
breaks = np.where(step != 1)[0]
start = 0
for bk in breaks[1:]:
f_chunk = frame_ids[start:bk]

if len(f_chunk) >= min_len:
data_chunk = {
"frame_id": f_chunk,
"keypoints_2d": k2d[start:bk],
"bbox": bboxes[start:bk],
"rotvec": [],
"beta": [],
"loc": [],
"dist": [],
}
padded_pose_results = empty_frame_pad(pose_results[start:bk])

for pose_result in padded_pose_results:
data_chunk["rotvec"].append(pose_result["rotvec"])
data_chunk["beta"].append(pose_result["shape"])
data_chunk["loc"].append(pose_result["loc"])
data_chunk["dist"].append(pose_result["dist"])
if len(padded_pose_results) > 0:
data_chunks.append(data_chunk)
start = bk

start = breaks[-1] # last chunk
bk = len(frame_ids)
f_chunk = frame_ids[start:bk]

if len(f_chunk) >= min_len:
data_chunk = {
"frame_id": f_chunk,
"keypoints_2d": k2d[start:bk].clone().detach(),
"bbox": bboxes[start:bk].clone().detach(),
"rotvec": [],
"beta": [],
"loc": [],
"dist": [],
}
padded_pose_results = empty_frame_pad(pose_results[start:bk])
for pose_result in padded_pose_results:
data_chunk["rotvec"].append(pose_result["rotvec"])
data_chunk["beta"].append(pose_result["shape"])
data_chunk["loc"].append(pose_result["loc"])
data_chunk["dist"].append(pose_result["dist"])

if len(padded_pose_results) > 0:

data_chunks.append(data_chunk)

for data_chunk in data_chunks:

# Get all continuous segments
segments = []
start_idx = 0
for break_point in breaks[1:]:
segments.append((start_idx, break_point))
start_idx = break_point
segments.append((breaks[-1], len(frame_ids)))

# Process each continuous segment
for seg_start, seg_end in segments:
seg_length = seg_end - seg_start

if seg_length < min_len:
continue # Skip segments that are too short

# Calculate how many chunks are needed (rounding up)
num_chunks = (seg_length + max_len - 1) // max_len

if num_chunks <= 1:
# If only 1 chunk is needed, process directly
_create_and_add_chunk(
data_chunks, frame_ids, pose_results, k2d, bboxes,
seg_start, seg_end
)
else:
# Calculate overlap step size
total_length = seg_end - seg_start
overlap = max_len - (total_length - max_len) / (num_chunks - 1)
step_size = max_len - overlap

# Create overlapping chunks
for i in range(num_chunks):
chunk_start = min(seg_start + int(i * step_size), seg_end - max_len)
chunk_end = chunk_start + max_len

if chunk_end - chunk_start < min_len:
continue # Skip chunks that are too short

_create_and_add_chunk(
data_chunks, frame_ids, pose_results, k2d, bboxes,
chunk_start, chunk_end
)

return data_chunks
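
A self-contained worked example of the break detection and overlap arithmetic above (the frame ids, segment length, and max_len are hypothetical):

    import numpy as np

    frame_ids = np.array([3, 4, 5, 6, 10, 11, 12])
    step = np.concatenate([[0], frame_ids[1:] - frame_ids[:-1]])
    breaks = np.where(step != 1)[0]        # array([0, 4]) -> segments (0, 4) and (4, 7)

    # Chunking a 120-frame continuous segment with max_len = 50:
    seg_length, max_len = 120, 50
    num_chunks = (seg_length + max_len - 1) // max_len                # 3
    overlap = max_len - (seg_length - max_len) / (num_chunks - 1)     # 15.0
    step_size = max_len - overlap                                     # 35.0
    starts = [min(int(i * step_size), seg_length - max_len) for i in range(num_chunks)]
    # starts == [0, 35, 70]: chunks [0, 50), [35, 85), [70, 120) cover the whole
    # segment with a 15-frame overlap between neighbouring chunks.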


def _create_and_add_chunk(data_chunks, frame_ids, pose_results, k2d, bboxes, start, end):
"""Create a single chunk and add it to the list"""
f_chunk = frame_ids[start:end]

data_chunk = {
"frame_id": f_chunk,
"keypoints_2d": k2d[start:end],
"bbox": bboxes[start:end],
"rotvec": [],
"beta": [],
"loc": [],
"dist": [],
}

# Process pose_results for the current chunk
chunk_pose_results = pose_results[start:end]
padded_pose_results = empty_frame_pad(chunk_pose_results)

for pose_result in padded_pose_results:
data_chunk["rotvec"].append(pose_result["rotvec"])
data_chunk["beta"].append(pose_result["shape"])
data_chunk["loc"].append(pose_result["loc"])
data_chunk["dist"].append(pose_result["dist"])

if len(padded_pose_results) > 0:
# Convert to tensor
for key in ["rotvec", "beta", "loc", "dist"]:
try:
if data_chunk[key]:
data_chunk[key] = torch.stack(data_chunk[key])
except:
print(key)
else:
data_chunk[key] = torch.tensor([])

return data_chunks
data_chunks.append(data_chunk)
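
A small, self-contained illustration of the stack-or-empty tensor conversion intended above (the per-frame shape of 24 rotation vectors is an assumption, SMPL-style):

    import torch

    per_frame_rotvec = [torch.zeros(24, 3) for _ in range(5)]   # 5 padded frames
    stacked = torch.stack(per_frame_rotvec) if per_frame_rotvec else torch.tensor([])
    print(stacked.shape)   # torch.Size([5, 24, 3])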


def load_models(model_path, device):
@@ -294,14 +321,14 @@ def load_models(model_path, device):

class Video2MotionPipeline:
def __init__(
self,
model_path,
fitting_steps,
device,
kp_mode="vitpose",
visualize=True,
pad_ratio=0.2,
fov=60,
self,
model_path,
fitting_steps,
device,
kp_mode="vitpose",
visualize=True,
pad_ratio=0.2,
fov=60,
):
self.MAX_RESOLUTION = 1280 * 720
self.device = device
@@ -357,30 +384,31 @@ def estimate_pose(self, frame_ids, frames, keypoints, bboxes, raw_K, video_lengt
bboxes = torch.tensor(bboxes, device=self.device)
bboxes = bbox_xyxy_to_cxcywh(bboxes, scale=1.5)

crop_images, crop_annotations = images_crop(
crop_annotation_gen = images_crop(
frames, bboxes, target_size=target_img_size, device=self.device
)

all_frame_results = []
i = 0
# model inference
for i, image in enumerate(crop_images):

for image, annotation in crop_annotation_gen:
# Calculate the possible search area for the primary joint (head) based on 2D keypoints
# pseudo_idx: The index of the search area center after patching
# max_dist: The maximum radius of the search area
pseudo_idx, max_dist = generate_pseudo_idx(
keypoints[i],
patch_size,
int(target_img_size / patch_size),
crop_annotations[i],
annotation,
)
humans = forward_model(
self.pose_model, image, K, pseudo_idx=pseudo_idx, max_dist=max_dist
)
target_human = track_by_area(humans, target_img_size)
target_human = project2origin_img(target_human, crop_annotations[i])
target_human = project2origin_img(target_human, annotation)

all_frame_results.append(target_human)
i += 1
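
An equivalent way to keep the streamed crops aligned with keypoints without the manual counter, shown only as a sketch of the design choice (not what the diff does):

    for i, (image, annotation) in enumerate(crop_annotation_gen):
        pseudo_idx, max_dist = generate_pseudo_idx(
            keypoints[i], patch_size, int(target_img_size / patch_size), annotation
        )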

# parse chunk & missed frame padding
data_chunks = parse_chunks(