69 changes: 69 additions & 0 deletions specs/tri/sharded-precomputed-test/copy.cue
@@ -0,0 +1,69 @@
#SRC_PATH: "https://storage.googleapis.com/fafb_v15_aligned/v0/img/img_norm"
#DST_PATH: "gs://tmp_2w/tmp_data_x0-sharded"
#BBOX: {
"@type": "BBox3D.from_coords"
start_coord: [29696, 16384, 2000]
end_coord: [29696 + 2048, 16384 + 2048, 2000 + 16]
resolution: [16, 16, 40]
}

"@type": "mazepa.execute_on_gcp_with_sqs"
worker_cluster_region: "us-east1"
worker_cluster_project: "zetta-research"
worker_cluster_name: "zutils-x3"
worker_image: "us-east1-docker.pkg.dev/zetta-research/containers-test/zetta_utils:tri-test-231107a"
worker_resources: {memory: "18560Mi"}
worker_replicas: 1
local_test: true

target: {
"@type": "build_subchunkable_apply_flow"
bbox: #BBOX
dst_resolution: [16, 16, 40]
processing_chunk_sizes: [[1024, 1024, 16]] // create 16M shards
// We don't need intermediaries when https://github.com/ZettaAI/zetta_utils/issues/644 is fixed
// skip_intermediaries: true
level_intermediaries_dirs: ["file://~/.zetta_utils/tmp/"]
fn: {
"@type": "lambda"
lambda_str: "lambda src: src"
}
op_kwargs: {
src: {
"@type": "build_cv_layer"
path: #SRC_PATH
}
}
dst: {
// "@type": "build_cv_layer"
// cv_kwargs: {compress: false} // need this if using CV
"@type": "build_ts_layer"
path: #DST_PATH
info_reference_path: #SRC_PATH
on_info_exists: "overwrite"
info_chunk_size: [256, 256, 4]
info_sharding: {
// "@type": "neuroglancer_uint64_sharded_v1" // not needed - zutils will add this if missing
hash: "identity"
minishard_index_encoding: "gzip"
data_encoding: "gzip"

// Best practice:
// `processing_chunk_sizes` needs to contain whole shards
// -> processing_chunk_sizes must be multiples of (chunk_size << preshift_bits)
// Minishard index should be < 32KB, which means at most ~1365 chunks (24B each) per minishard
// -> preshift_bits <= 10
// Shard index should be < 8KB, so up to 512 minishards (each 16B)
// -> minishard_bits <= 9
// Uncompressed size at different preshift_bits+minishard_bits when chunks are 256K:
// 3b = 2M, 6b = 16M, 9b = 128M shards

preshift_bits: 6 // [256, 256, 4] << preshift_bits = [1024, 1024, 16]
minishard_bits: 0 // No need to have more than 1 minishard since preshift_bits is < 10
shard_bits: 21 // Just need preshift_bits+minishard_bits+shard_bits <= 64
}
// remove unneeded scales
info_add_scales: [dst_resolution]
info_add_scales_mode: "replace"
}
}
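
For reference, the shard-sizing arithmetic behind the comments in the spec above, as a minimal Python sketch; the helper name and the 1 byte/voxel (uint8) assumption are illustrative and not part of this change.

def shard_stats(chunk_size, preshift_bits, minishard_bits, bytes_per_voxel=1):
    # Each shard holds 2**(preshift_bits + minishard_bits) chunks.
    chunks_per_shard = 2 ** (preshift_bits + minishard_bits)
    chunk_bytes = chunk_size[0] * chunk_size[1] * chunk_size[2] * bytes_per_voxel
    shard_bytes = chunks_per_shard * chunk_bytes  # uncompressed upper bound
    minishard_index_bytes = 24 * 2 ** preshift_bits  # 3 uint64 entries per chunk
    shard_index_bytes = 16 * 2 ** minishard_bits  # 2 uint64 entries per minishard
    return shard_bytes, minishard_index_bytes, shard_index_bytes

# [256, 256, 4] chunks with preshift_bits=6, minishard_bits=0 (as in the spec above):
# 64 chunks per shard -> 16 MiB uncompressed shards, 1.5 KiB minishard index, 16 B shard index.
print(shard_stats((256, 256, 4), preshift_bits=6, minishard_bits=0))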
5 changes: 4 additions & 1 deletion zetta_utils/layer/volumetric/cloudvol/backend.py
@@ -241,6 +241,7 @@ def with_changes(self, **kwargs) -> CVBackend:
"chunk_size_res" = (chunk_size, resolution): Tuple[Vec3D[int], Vec3D]
"dataset_size_res" = (dataset_size, resolution): Tuple[Vec3D[int], Vec3D]
"allow_cache" = value: Union[bool, str]
"no_sharding" = value: bool
"""
assert self.info_spec is not None

@@ -255,12 +256,14 @@ def with_changes(self, **kwargs) -> CVBackend:
"voxel_offset_res",
"chunk_size_res",
"dataset_size_res",
"no_sharding",
]
keys_to_kwargs = {"name": "path"}
keys_to_infospec_fn = {
"voxel_offset_res": info_spec.set_voxel_offset,
"chunk_size_res": info_spec.set_chunk_size,
"dataset_size_res": info_spec.set_dataset_size,
"no_sharding": info_spec.set_no_sharding,
}
keys_to_cv_kwargs = {
"use_compression": "compress",
@@ -278,7 +281,7 @@ def with_changes(self, **kwargs) -> CVBackend:
if k in keys_to_kwargs:
evolve_kwargs[keys_to_kwargs[k]] = v
if k in keys_to_infospec_fn:
keys_to_infospec_fn[k](v)
keys_to_infospec_fn[k](v) # type: ignore

if "name" in kwargs:
_clear_cv_cache(kwargs["name"])
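
A minimal usage sketch of the new "no_sharding" key (the `backend` variable is assumed for illustration, not taken from this diff): passing it through with_changes makes the resulting info spec drop the "sharding" entry from every scale, e.g. for unsharded intermediaries while the final destination stays sharded.

unsharded_backend = backend.with_changes(no_sharding=True)  # strips "sharding" from all scales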
6 changes: 6 additions & 0 deletions zetta_utils/layer/volumetric/cloudvol/build.py
@@ -34,6 +34,8 @@ def build_cv_layer( # pylint: disable=too-many-locals
info_dataset_size_map: dict[str, Sequence[int]] | None = None,
info_voxel_offset: Sequence[int] | None = None,
info_voxel_offset_map: dict[str, Sequence[int]] | None = None,
info_sharding: dict[str, Any] | None = None,
info_sharding_map: dict[str, dict[str, Any]] | None = None,
info_add_scales: Sequence[Sequence[int] | dict[str, Any]] | None = None,
info_add_scales_ref: str | dict[str, Any] | None = None,
info_add_scales_mode: Literal["merge", "replace"] = "merge",
@@ -74,6 +76,8 @@ def build_cv_layer( # pylint: disable=too-many-locals
:param info_dataset_size_map: Precomputed dataset size for each resolution.
:param info_voxel_offset: Precomputed voxel offset for all scales.
:param info_voxel_offset_map: Precomputed voxel offset for each resolution.
:param info_sharding: Precomputed sharding spec for all scales.
:param info_sharding_map: Precomputed sharding spec for each resolution.
:param info_add_scales: List of scales to be added based on ``info_add_scales_ref``
Each entry can be either a resolution (e.g., [4, 4, 40]) or a partially filled
Precomputed scale. By default, ``size`` and ``voxel_offset`` will be scaled
@@ -115,6 +119,8 @@ def build_cv_layer( # pylint: disable=too-many-locals
dataset_size_map=info_dataset_size_map,
default_voxel_offset=info_voxel_offset,
voxel_offset_map=info_voxel_offset_map,
default_sharding=info_sharding,
sharding_map=info_sharding_map,
add_scales=info_add_scales,
add_scales_ref=info_add_scales_ref,
add_scales_mode=info_add_scales_mode,
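
A sketch of how the new keyword arguments might look on the Python side. The paths, the scale-key format, and the import location are assumptions for illustration; the cv_kwargs line follows the CUE comment above about CloudVolume needing compress: false when writing a sharded destination.

from zetta_utils.layer.volumetric.cloudvol import build_cv_layer  # import path assumed

layer = build_cv_layer(
    path="gs://my-bucket/sharded_dst",  # placeholder path
    info_reference_path="gs://my-bucket/reference_layer",  # placeholder path
    info_sharding_map={  # per-resolution sharding spec; key format assumed
        "16_16_40": {"hash": "identity", "preshift_bits": 6, "minishard_bits": 0, "shard_bits": 21},
    },
    cv_kwargs={"compress": False},  # per the CUE comment above: needed when using CloudVolume
)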
25 changes: 24 additions & 1 deletion zetta_utils/layer/volumetric/precomputed/precomputed.py
@@ -19,6 +19,7 @@
_info_cache: cachetools.LRUCache = cachetools.LRUCache(maxsize=500)
_info_hash_key = hashkey


# wrapper to cache using absolute paths with '/info'.
# invalidates the cached infofile if the infofile is local and has since been deleted.
def get_info(path: str) -> Dict[str, Any]:
@@ -109,6 +110,8 @@ def _make_scale(ref: dict[str, Any], target: Sequence[int] | dict[str, Any]) ->
ret["chunk_sizes"] = ref["chunk_sizes"]
if "encoding" not in ret:
ret["encoding"] = ref["encoding"]
if "sharding" in ref:
ret["sharding"] = ref["sharding"]

# check and convert values to int
errored = _check_seq_is_int(ret["size"])
@@ -140,13 +143,16 @@ class PrecomputedInfoSpec:
default_chunk_size: Sequence[int] | None = None
default_voxel_offset: Sequence[int] | None = None
default_dataset_size: Sequence[int] | None = None
default_sharding: dict[str, Any] | None = None
chunk_size_map: dict[str, Sequence[int]] | None = None
voxel_offset_map: dict[str, Sequence[int]] | None = None
dataset_size_map: dict[str, Sequence[int]] | None = None
sharding_map: dict[str, dict[str, Any]] | None = None
data_type: str | None = None
add_scales: Sequence[Sequence[int] | dict[str, Any]] | None = None
add_scales_ref: str | dict[str, Any] | None = None
add_scales_mode: str = "merge"
no_sharding: bool = False
# ensure_scales: Optional[Iterable[int]] = None

def set_voxel_offset(self, voxel_offset_and_res: Tuple[Vec3D[int], Vec3D]) -> None:
@@ -171,6 +177,9 @@ def set_dataset_size(self, dataset_size_and_res: Tuple[Vec3D[int], Vec3D]) -> None:
self.dataset_size_map = {}
self.dataset_size_map[key] = dataset_size

def set_no_sharding(self, val: bool) -> None:
self.no_sharding = val

def make_info( # pylint: disable=too-many-branches, consider-iterating-dictionary
self,
) -> Optional[Dict[str, Any]]:
@@ -227,9 +236,23 @@ def make_info( # pylint: disable=too-many-branches, consider-iterating-dictionary
for e in result["scales"]:
if e["key"] in self.dataset_size_map.keys():
e["size"] = [*self.dataset_size_map[e["key"]]]
if self.default_sharding is not None:
for e in result["scales"]:
e["sharding"] = self.default_sharding
if self.sharding_map is not None:
for e in result["scales"]:
if e["key"] in self.sharding_map.keys():
e["sharding"] = self.sharding_map[e["key"]]
if self.data_type is not None:
result["data_type"] = self.data_type

if self.no_sharding:
for e in result["scales"]:
e.pop("sharding", None)
else:
# make sure that "@type" is present
for e in result["scales"]:
if "sharding" in e and "@type" not in e["sharding"]:
e["sharding"] = {"@type": "neuroglancer_uint64_sharded_v1"} | e["sharding"]
# if self.ensure_scales is not None: # pragma: no cover
# raise NotImplementedError()

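
The precedence implemented in make_info() above, restated as a standalone plain-dict sketch (not the PrecomputedInfoSpec API): default_sharding applies to every scale, sharding_map overrides it per scale key, no_sharding strips sharding entirely, and any surviving spec gets "@type" filled in when missing.

scales = [{"key": "16_16_40"}, {"key": "32_32_40"}]
default_sharding = {"hash": "identity", "preshift_bits": 6}
sharding_map = {"32_32_40": {"hash": "identity", "preshift_bits": 3}}
no_sharding = False

for e in scales:
    e["sharding"] = dict(default_sharding)  # default applied to all scales
for e in scales:
    if e["key"] in sharding_map:
        e["sharding"] = dict(sharding_map[e["key"]])  # per-resolution override
if no_sharding:
    for e in scales:
        e.pop("sharding", None)  # sharding removed everywhere
else:
    for e in scales:
        if "sharding" in e and "@type" not in e["sharding"]:
            e["sharding"] = {"@type": "neuroglancer_uint64_sharded_v1"} | e["sharding"]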
5 changes: 4 additions & 1 deletion zetta_utils/layer/volumetric/tensorstore/backend.py
@@ -242,6 +242,7 @@ def with_changes(self, **kwargs) -> TSBackend:
"voxel_offset_res" = (voxel_offset, resolution): Tuple[Vec3D[int], Vec3D]
"chunk_size_res" = (chunk_size, resolution): Tuple[Vec3D[int], Vec3D]
"dataset_size_res" = (dataset_size, resolution): Tuple[Vec3D[int], Vec3D]
"no_sharding" = value: bool
"""
# TODO: implement proper allow_cache logic
assert self.info_spec is not None
@@ -256,12 +257,14 @@ def with_changes(self, **kwargs) -> TSBackend:
"chunk_size_res",
"dataset_size_res",
"use_compression",
"no_sharding",
]
keys_to_kwargs = {"name": "path"}
keys_to_infospec_fn = {
"voxel_offset_res": info_spec.set_voxel_offset,
"chunk_size_res": info_spec.set_chunk_size,
"dataset_size_res": info_spec.set_dataset_size,
"no_sharding": info_spec.set_no_sharding,
}
keys_to_assert = {"enforce_chunk_aligned_writes": False}
evolve_kwargs = {}
@@ -271,7 +274,7 @@ def with_changes(self, **kwargs) -> TSBackend:
if k in keys_to_kwargs:
evolve_kwargs[keys_to_kwargs[k]] = v
if k in keys_to_infospec_fn:
keys_to_infospec_fn[k](v)
keys_to_infospec_fn[k](v) # type: ignore
if k in keys_to_assert:
if v != keys_to_assert[k]:
raise ValueError(
26 changes: 25 additions & 1 deletion zetta_utils/layer/volumetric/tensorstore/build.py
@@ -1,7 +1,7 @@
# pylint: disable=missing-docstring
from __future__ import annotations

from typing import Any, Iterable, Sequence, Union
from typing import Any, Iterable, Literal, Sequence, Union

import torch

@@ -32,6 +32,11 @@ def build_ts_layer( # pylint: disable=too-many-locals
info_dataset_size_map: dict[str, Sequence[int]] | None = None,
info_voxel_offset: Sequence[int] | None = None,
info_voxel_offset_map: dict[str, Sequence[int]] | None = None,
info_sharding: dict[str, Any] | None = None,
info_sharding_map: dict[str, dict[str, Any]] | None = None,
info_add_scales: Sequence[Sequence[int] | dict[str, Any]] | None = None,
info_add_scales_ref: str | dict[str, Any] | None = None,
info_add_scales_mode: Literal["merge", "replace"] = "merge",
on_info_exists: InfoExistsModes = "expect_same",
cache_bytes_limit: int | None = None,
allow_slice_rounding: bool = False,
@@ -69,6 +74,20 @@ def build_ts_layer( # pylint: disable=too-many-locals
:param info_dataset_size_map: Precomputed dataset size for each resolution.
:param info_voxel_offset: Precomputed voxel offset for all scales.
:param info_voxel_offset_map: Precomputed voxel offset for each resolution.
:param info_sharding: Precomputed sharding spec for all scales.
:param info_sharding_map: Precomputed sharding spec for each resolution.
:param info_add_scales: List of scales to be added based on ``info_add_scales_ref``
Each entry can be either a resolution (e.g., [4, 4, 40]) or a partially filled
Precomputed scale. By default, ``size`` and ``voxel_offset`` will be scaled
accordingly to the reference scale, while keeping ``chunk_sizes`` the same.
Note that using ``info_[chunk_size,dataset_size,voxel_offset][_map]`` will
override these values. Using this will also sort the added and existing scales
by their resolutions.
:param info_add_scales_ref: Reference scale to be used. If `None`, use
the highest available resolution scale.
:param info_add_scales_mode: Either "merge" or "replace". "merge" will
merge added scales into existing scales when ``info_reference_path`` is
used, while "replace" will keep only the added scales.
:param on_info_exists: Behavior mode for when both new info specs are given
and layer info already exists.
:param allow_slice_rounding: Whether layer allows IO operations where the specified index
@@ -95,6 +114,11 @@ def build_ts_layer( # pylint: disable=too-many-locals
dataset_size_map=info_dataset_size_map,
default_voxel_offset=info_voxel_offset,
voxel_offset_map=info_voxel_offset_map,
default_sharding=info_sharding,
sharding_map=info_sharding_map,
add_scales=info_add_scales,
add_scales_ref=info_add_scales_ref,
add_scales_mode=info_add_scales_mode,
),
cache_bytes_limit=cache_bytes_limit,
)
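
A rough Python equivalent of the dst block in the CUE spec at the top of this diff; the paths come from that spec, while the import location is an assumption.

from zetta_utils.layer.volumetric.tensorstore import build_ts_layer  # import path assumed

dst = build_ts_layer(
    path="gs://tmp_2w/tmp_data_x0-sharded",
    info_reference_path="https://storage.googleapis.com/fafb_v15_aligned/v0/img/img_norm",
    on_info_exists="overwrite",
    info_chunk_size=[256, 256, 4],
    info_sharding={
        "hash": "identity",
        "minishard_index_encoding": "gzip",
        "data_encoding": "gzip",
        "preshift_bits": 6,
        "minishard_bits": 0,
        "shard_bits": 21,
    },
    info_add_scales=[[16, 16, 40]],  # keep only the working resolution
    info_add_scales_mode="replace",
)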
@@ -376,6 +376,7 @@ def _get_temp_dst(
enforce_chunk_aligned_writes=False,
allow_cache=allow_cache,
use_compression=False,
no_sharding=True,
)
return dst.with_procs(read_procs=()).with_changes(backend=backend_temp)

Expand Down