diff --git a/specs/tri/sharded-precomputed-test/copy.cue b/specs/tri/sharded-precomputed-test/copy.cue
new file mode 100644
index 000000000..8b31c6d32
--- /dev/null
+++ b/specs/tri/sharded-precomputed-test/copy.cue
@@ -0,0 +1,69 @@
+#SRC_PATH: "https://storage.googleapis.com/fafb_v15_aligned/v0/img/img_norm"
+#DST_PATH: "gs://tmp_2w/tmp_data_x0-sharded"
+#BBOX: {
+    "@type": "BBox3D.from_coords"
+    start_coord: [29696, 16384, 2000]
+    end_coord: [29696 + 2048, 16384 + 2048, 2000 + 16]
+    resolution: [16, 16, 40]
+}
+
+"@type": "mazepa.execute_on_gcp_with_sqs"
+worker_cluster_region: "us-east1"
+worker_cluster_project: "zetta-research"
+worker_cluster_name: "zutils-x3"
+worker_image: "us-east1-docker.pkg.dev/zetta-research/containers-test/zetta_utils:tri-test-231107a"
+worker_resources: {memory: "18560Mi"}
+worker_replicas: 1
+local_test: true
+
+target: {
+    "@type": "build_subchunkable_apply_flow"
+    bbox: #BBOX
+    dst_resolution: [16, 16, 40]
+    processing_chunk_sizes: [[1024, 1024, 16]] // create 16M shards
+    // We don't need intermediaries when https://github.com/ZettaAI/zetta_utils/issues/644 is fixed
+    // skip_intermediaries: true
+    level_intermediaries_dirs: ["file://~/.zetta_utils/tmp/"]
+    fn: {
+        "@type": "lambda"
+        lambda_str: "lambda src: src"
+    }
+    op_kwargs: {
+        src: {
+            "@type": "build_cv_layer"
+            path: #SRC_PATH
+        }
+    }
+    dst: {
+        // "@type": "build_cv_layer"
+        // cv_kwargs: {compress: false} // need this if using CV
+        "@type": "build_ts_layer"
+        path: #DST_PATH
+        info_reference_path: #SRC_PATH
+        on_info_exists: "overwrite"
+        info_chunk_size: [256, 256, 4]
+        info_sharding: {
+            // "@type": "neuroglancer_uint64_sharded_v1" // not needed - zutils will add this if missing
+            hash: "identity"
+            minishard_index_encoding: "gzip"
+            data_encoding: "gzip"
+
+            // Best practice:
+            // `processing_chunk_sizes` needs to contain whole shards
+            //   -> processing_chunk_sizes is multiples of (chunk_size << preshift_bits)
+            // Minishard index should be < 32KB, which means at most ~1365 chunks (24B each) per minishard
+            //   -> preshift_bits <= 10
+            // Shard index should be < 8KB, so up to 512 minishards (each 16B)
+            //   -> minishard_bits <= 9
+            // Uncompressed size at different preshift_bits+minishard_bits when chunks are 256K:
+            //   3b = 2M, 6b = 16M, 9b = 128M shards
+
+            preshift_bits:  6  // [256, 256, 4] << preshift_bits = [1024, 1024, 16]
+            minishard_bits: 0  // No need to have more than 1 minishard since preshift_bits is < 10
+            shard_bits:     21 // Just need preshift_bits+minishard_bits+shard_bits <= 64
+        }
+        // remove unneeded scales
+        info_add_scales: [dst_resolution]
+        info_add_scales_mode: "replace"
+    }
+}
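For reference, the shard-geometry arithmetic in the comments above can be sanity-checked with a short back-of-envelope script. This is an illustrative sketch, not part of the diff: the 24 B-per-chunk minishard-index and 16 B-per-minishard shard-index entry sizes come from the neuroglancer_uint64_sharded_v1 format, and the chunk byte size assumes uint8 data at the [256, 256, 4] chunk size used here.

def shard_geometry(preshift_bits: int, minishard_bits: int, chunk_bytes: int) -> dict:
    # With hash: "identity" and densely packed chunk IDs, a minishard holds
    # ~2**preshift_bits chunks and a shard holds 2**minishard_bits minishards.
    chunks_per_minishard = 2 ** preshift_bits
    minishards_per_shard = 2 ** minishard_bits
    return {
        "minishard_index_bytes": 24 * chunks_per_minishard,  # keep under 32 KiB
        "shard_index_bytes": 16 * minishards_per_shard,      # keep under 8 KiB
        "uncompressed_shard_bytes": chunks_per_minishard * minishards_per_shard * chunk_bytes,
    }


chunk_bytes = 256 * 256 * 4  # one [256, 256, 4] uint8 chunk = 256 KiB
print(shard_geometry(preshift_bits=6, minishard_bits=0, chunk_bytes=chunk_bytes))
# {'minishard_index_bytes': 1536, 'shard_index_bytes': 16, 'uncompressed_shard_bytes': 16777216}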
diff --git a/zetta_utils/layer/volumetric/cloudvol/backend.py b/zetta_utils/layer/volumetric/cloudvol/backend.py
index ad08aa815..a076a6971 100644
--- a/zetta_utils/layer/volumetric/cloudvol/backend.py
+++ b/zetta_utils/layer/volumetric/cloudvol/backend.py
@@ -241,6 +241,7 @@ def with_changes(self, **kwargs) -> CVBackend:
             "chunk_size_res" = (chunk_size, resolution): Tuple[Vec3D[int], Vec3D]
             "dataset_size_res" = (dataset_size, resolution): Tuple[Vec3D[int], Vec3D]
             "allow_cache" = value: Union[bool, str]
+            "no_sharding" = value: bool
         """

         assert self.info_spec is not None
@@ -255,12 +256,14 @@ def with_changes(self, **kwargs) -> CVBackend:
             "voxel_offset_res",
             "chunk_size_res",
             "dataset_size_res",
+            "no_sharding",
         ]
         keys_to_kwargs = {"name": "path"}
         keys_to_infospec_fn = {
             "voxel_offset_res": info_spec.set_voxel_offset,
             "chunk_size_res": info_spec.set_chunk_size,
             "dataset_size_res": info_spec.set_dataset_size,
+            "no_sharding": info_spec.set_no_sharding,
         }
         keys_to_cv_kwargs = {
             "use_compression": "compress",
@@ -278,7 +281,7 @@ def with_changes(self, **kwargs) -> CVBackend:
             if k in keys_to_kwargs:
                 evolve_kwargs[keys_to_kwargs[k]] = v
             if k in keys_to_infospec_fn:
-                keys_to_infospec_fn[k](v)
+                keys_to_infospec_fn[k](v)  # type: ignore

         if "name" in kwargs:
             _clear_cv_cache(kwargs["name"])
diff --git a/zetta_utils/layer/volumetric/cloudvol/build.py b/zetta_utils/layer/volumetric/cloudvol/build.py
index 0e970cc8f..ebadcf449 100644
--- a/zetta_utils/layer/volumetric/cloudvol/build.py
+++ b/zetta_utils/layer/volumetric/cloudvol/build.py
@@ -34,6 +34,8 @@ def build_cv_layer(  # pylint: disable=too-many-locals
     info_dataset_size_map: dict[str, Sequence[int]] | None = None,
     info_voxel_offset: Sequence[int] | None = None,
     info_voxel_offset_map: dict[str, Sequence[int]] | None = None,
+    info_sharding: dict[str, Any] | None = None,
+    info_sharding_map: dict[str, dict[str, Any]] | None = None,
     info_add_scales: Sequence[Sequence[int] | dict[str, Any]] | None = None,
     info_add_scales_ref: str | dict[str, Any] | None = None,
     info_add_scales_mode: Literal["merge", "replace"] = "merge",
@@ -74,6 +76,8 @@ def build_cv_layer(  # pylint: disable=too-many-locals
     :param info_dataset_size_map: Precomputed dataset size for each resolution.
     :param info_voxel_offset: Precomputed voxel offset for all scales.
     :param info_voxel_offset_map: Precomputed voxel offset for each resolution.
+    :param info_sharding: Precomputed sharding spec for all scales.
+    :param info_sharding_map: Precomputed sharding spec for each resolution.
     :param info_add_scales: List of scales to be added based on ``info_add_scales_ref``
         Each entry can be either a resolution (e.g., [4, 4, 40]) or a partially filled
         Precomputed scale. By default, ``size`` and ``voxel_offset`` will be scaled
@@ -115,6 +119,8 @@ def build_cv_layer(  # pylint: disable=too-many-locals
             dataset_size_map=info_dataset_size_map,
             default_voxel_offset=info_voxel_offset,
             voxel_offset_map=info_voxel_offset_map,
+            default_sharding=info_sharding,
+            sharding_map=info_sharding_map,
             add_scales=info_add_scales,
             add_scales_ref=info_add_scales_ref,
             add_scales_mode=info_add_scales_mode,
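The new info_sharding / info_sharding_map arguments follow the same default-plus-per-resolution pattern as the existing info overrides. A hypothetical call (placeholder paths; per-scale keys use the usual Precomputed key format such as "16_16_40") could look like:

from zetta_utils.layer.volumetric.cloudvol.build import build_cv_layer

layer = build_cv_layer(
    path="gs://my-bucket/sharded_img",             # placeholder destination
    info_reference_path="gs://my-bucket/ref_img",  # placeholder reference layer
    on_info_exists="overwrite",
    # One spec applied to every scale; "@type" may be omitted (see precomputed.py below).
    info_sharding={"hash": "identity", "preshift_bits": 6, "minishard_bits": 0, "shard_bits": 21},
    # Per-scale overrides, keyed by Precomputed scale key, win over the default:
    info_sharding_map={
        "32_32_40": {"hash": "identity", "preshift_bits": 3, "minishard_bits": 0, "shard_bits": 24},
    },
)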
diff --git a/zetta_utils/layer/volumetric/precomputed/precomputed.py b/zetta_utils/layer/volumetric/precomputed/precomputed.py
index d0618cb03..1de01a3b2 100644
--- a/zetta_utils/layer/volumetric/precomputed/precomputed.py
+++ b/zetta_utils/layer/volumetric/precomputed/precomputed.py
@@ -19,6 +19,7 @@
 _info_cache: cachetools.LRUCache = cachetools.LRUCache(maxsize=500)
 _info_hash_key = hashkey

+
 # wrapper to cache using absolute paths with '/info'.
 # invalidates the cached infofile if the infofile is local and has since been deleted.
 def get_info(path: str) -> Dict[str, Any]:
@@ -109,6 +110,8 @@ def _make_scale(ref: dict[str, Any], target: Sequence[int] | dict[str, Any]) ->
         ret["chunk_sizes"] = ref["chunk_sizes"]
     if "encoding" not in ret:
         ret["encoding"] = ref["encoding"]
+    if "sharding" in ref:
+        ret["sharding"] = ref["sharding"]

     # check and convert values to int
     errored = _check_seq_is_int(ret["size"])
@@ -140,13 +143,16 @@ class PrecomputedInfoSpec:
     default_chunk_size: Sequence[int] | None = None
     default_voxel_offset: Sequence[int] | None = None
     default_dataset_size: Sequence[int] | None = None
+    default_sharding: dict[str, Any] | None = None
     chunk_size_map: dict[str, Sequence[int]] | None = None
     voxel_offset_map: dict[str, Sequence[int]] | None = None
     dataset_size_map: dict[str, Sequence[int]] | None = None
+    sharding_map: dict[str, dict[str, Any]] | None = None
     data_type: str | None = None
     add_scales: Sequence[Sequence[int] | dict[str, Any]] | None = None
     add_scales_ref: str | dict[str, Any] | None = None
     add_scales_mode: str = "merge"
+    no_sharding: bool = False
     # ensure_scales: Optional[Iterable[int]] = None

     def set_voxel_offset(self, voxel_offset_and_res: Tuple[Vec3D[int], Vec3D]) -> None:
@@ -171,6 +177,9 @@ def set_dataset_size(self, dataset_size_and_res: Tuple[Vec3D[int], Vec3D]) -> None:
             self.dataset_size_map = {}
         self.dataset_size_map[key] = dataset_size

+    def set_no_sharding(self, val: bool) -> None:
+        self.no_sharding = val
+
     def make_info(  # pylint: disable=too-many-branches, consider-iterating-dictionary
         self,
     ) -> Optional[Dict[str, Any]]:
@@ -227,9 +236,23 @@ def make_info(  # pylint: disable=too-many-branches, consider-iterating-dictionary
                 for e in result["scales"]:
                     if e["key"] in self.dataset_size_map.keys():
                         e["size"] = [*self.dataset_size_map[e["key"]]]
+            if self.default_sharding is not None:
+                for e in result["scales"]:
+                    e["sharding"] = self.default_sharding
+            if self.sharding_map is not None:
+                for e in result["scales"]:
+                    if e["key"] in self.sharding_map.keys():
+                        e["sharding"] = self.sharding_map[e["key"]]
             if self.data_type is not None:
                 result["data_type"] = self.data_type
-
+            if self.no_sharding:
+                for e in result["scales"]:
+                    e.pop("sharding", None)
+            else:
+                # make sure that "@type" is present
+                for e in result["scales"]:
+                    if "sharding" in e and "@type" not in e["sharding"]:
+                        e["sharding"] = {"@type": "neuroglancer_uint64_sharded_v1"} | e["sharding"]

         # if self.ensure_scales is not None:  # pragma: no cover
         #     raise NotImplementedError()
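As the commented-out "@type" line in copy.cue above relies on, make_info only backfills a missing "@type": it uses dict union, so keys supplied by the user always take precedence. A minimal illustration of that idiom (plain Python, not repository code):

user_sharding = {"hash": "identity", "preshift_bits": 6, "minishard_bits": 0, "shard_bits": 21}
full = {"@type": "neuroglancer_uint64_sharded_v1"} | user_sharding
# The right-hand side of "|" wins on key conflicts, so an explicit "@type"
# in user_sharding would pass through unchanged.
print(full["@type"])  # neuroglancer_uint64_sharded_v1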
index 11305cbd4..d255ba547 100644
--- a/zetta_utils/layer/volumetric/tensorstore/backend.py
+++ b/zetta_utils/layer/volumetric/tensorstore/backend.py
@@ -242,6 +242,7 @@ def with_changes(self, **kwargs) -> TSBackend:
             "voxel_offset_res" = (voxel_offset, resolution): Tuple[Vec3D[int], Vec3D]
             "chunk_size_res" = (chunk_size, resolution): Tuple[Vec3D[int], Vec3D]
             "dataset_size_res" = (dataset_size, resolution): Tuple[Vec3D[int], Vec3D]
+            "no_sharding" = value: bool
         """
         # TODO: implement proper allow_cache logic
         assert self.info_spec is not None
@@ -256,12 +257,14 @@ def with_changes(self, **kwargs) -> TSBackend:
             "chunk_size_res",
             "dataset_size_res",
             "use_compression",
+            "no_sharding",
         ]
         keys_to_kwargs = {"name": "path"}
         keys_to_infospec_fn = {
             "voxel_offset_res": info_spec.set_voxel_offset,
             "chunk_size_res": info_spec.set_chunk_size,
             "dataset_size_res": info_spec.set_dataset_size,
+            "no_sharding": info_spec.set_no_sharding,
         }
         keys_to_assert = {"enforce_chunk_aligned_writes": False}
         evolve_kwargs = {}
@@ -271,7 +274,7 @@ def with_changes(self, **kwargs) -> TSBackend:
             if k in keys_to_kwargs:
                 evolve_kwargs[keys_to_kwargs[k]] = v
             if k in keys_to_infospec_fn:
-                keys_to_infospec_fn[k](v)
+                keys_to_infospec_fn[k](v)  # type: ignore
             if k in keys_to_assert:
                 if v != keys_to_assert[k]:
                     raise ValueError(
diff --git a/zetta_utils/layer/volumetric/tensorstore/build.py b/zetta_utils/layer/volumetric/tensorstore/build.py
index 42b64e423..b11188666 100644
--- a/zetta_utils/layer/volumetric/tensorstore/build.py
+++ b/zetta_utils/layer/volumetric/tensorstore/build.py
@@ -1,7 +1,7 @@
 # pylint: disable=missing-docstring
 from __future__ import annotations

-from typing import Any, Iterable, Sequence, Union
+from typing import Any, Iterable, Literal, Sequence, Union

 import torch

@@ -32,6 +32,11 @@ def build_ts_layer(  # pylint: disable=too-many-locals
     info_dataset_size_map: dict[str, Sequence[int]] | None = None,
     info_voxel_offset: Sequence[int] | None = None,
     info_voxel_offset_map: dict[str, Sequence[int]] | None = None,
+    info_sharding: dict[str, Any] | None = None,
+    info_sharding_map: dict[str, dict[str, Any]] | None = None,
+    info_add_scales: Sequence[Sequence[int] | dict[str, Any]] | None = None,
+    info_add_scales_ref: str | dict[str, Any] | None = None,
+    info_add_scales_mode: Literal["merge", "replace"] = "merge",
     on_info_exists: InfoExistsModes = "expect_same",
     cache_bytes_limit: int | None = None,
     allow_slice_rounding: bool = False,
@@ -69,6 +74,20 @@ def build_ts_layer(  # pylint: disable=too-many-locals
     :param info_dataset_size_map: Precomputed dataset size for each resolution.
     :param info_voxel_offset: Precomputed voxel offset for all scales.
     :param info_voxel_offset_map: Precomputed voxel offset for each resolution.
+    :param info_sharding: Precomputed sharding spec for all scales.
+    :param info_sharding_map: Precomputed sharding spec for each resolution.
+    :param info_add_scales: List of scales to be added based on ``info_add_scales_ref``.
+        Each entry can be either a resolution (e.g., [4, 4, 40]) or a partially filled
+        Precomputed scale. By default, ``size`` and ``voxel_offset`` will be scaled
+        according to the reference scale, while keeping ``chunk_sizes`` the same.
+        Note that using ``info_[chunk_size,dataset_size,voxel_offset][_map]`` will
+        override these values. Using this will also sort the added and existing scales
+        by their resolutions.
+    :param info_add_scales_ref: Reference scale to be used. If `None`, use
+        the highest available resolution scale.
+    :param info_add_scales_mode: Either "merge" or "replace". "merge" will
+        merge added scales into existing scales if ``info_reference_path`` is
+        used, while "replace" will keep only the added scales.
     :param on_info_exists: Behavior mode for when both new info specs are
         given and layer info already exists.
     :param allow_slice_rounding: Whether layer allows IO operations where the specified index
@@ -95,6 +114,11 @@ def build_ts_layer(  # pylint: disable=too-many-locals
             dataset_size_map=info_dataset_size_map,
             default_voxel_offset=info_voxel_offset,
             voxel_offset_map=info_voxel_offset_map,
+            default_sharding=info_sharding,
+            sharding_map=info_sharding_map,
+            add_scales=info_add_scales,
+            add_scales_ref=info_add_scales_ref,
+            add_scales_mode=info_add_scales_mode,
         ),
         cache_bytes_limit=cache_bytes_limit,
     )
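For comparison with the CUE spec at the top of this diff, the dst layer in copy.cue corresponds roughly to the following Python call (illustrative only; paths and values are the ones used in the spec):

from zetta_utils.layer.volumetric.tensorstore.build import build_ts_layer

dst = build_ts_layer(
    path="gs://tmp_2w/tmp_data_x0-sharded",
    info_reference_path="https://storage.googleapis.com/fafb_v15_aligned/v0/img/img_norm",
    on_info_exists="overwrite",
    info_chunk_size=[256, 256, 4],
    info_sharding={
        "hash": "identity",
        "minishard_index_encoding": "gzip",
        "data_encoding": "gzip",
        "preshift_bits": 6,
        "minishard_bits": 0,
        "shard_bits": 21,
    },
    # keep only the 16 nm scale; other scales from the reference are dropped
    info_add_scales=[[16, 16, 40]],
    info_add_scales_mode="replace",
)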
diff --git a/zetta_utils/mazepa_layer_processing/common/volumetric_apply_flow.py b/zetta_utils/mazepa_layer_processing/common/volumetric_apply_flow.py
index 27ad42b59..666c8c2de 100644
--- a/zetta_utils/mazepa_layer_processing/common/volumetric_apply_flow.py
+++ b/zetta_utils/mazepa_layer_processing/common/volumetric_apply_flow.py
@@ -376,6 +376,7 @@ def _get_temp_dst(
         enforce_chunk_aligned_writes=False,
         allow_cache=allow_cache,
         use_compression=False,
+        no_sharding=True,
     )

     return dst.with_procs(read_procs=()).with_changes(backend=backend_temp)
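A quick way to confirm that the written layer actually ended up sharded is to read the info back and inspect each scale. This is a sketch using the destination path from the spec above and the get_info helper shown in precomputed.py:

from zetta_utils.layer.volumetric.precomputed.precomputed import get_info

info = get_info("gs://tmp_2w/tmp_data_x0-sharded")
for scale in info["scales"]:
    # Unsharded scales simply have no "sharding" entry.
    print(scale["key"], scale.get("sharding", {}).get("@type"))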