Horizontal Scaling via Spatial Chunking¶
Overview¶
OCR uses a spatial chunking system that enables embarrassingly parallel processing of large-scale CONUS fire risk datasets at 30m resolution. Similar to the serverless approach described in "A Serverless Approach to Building Planetary-Scale EO Datacubes", OCR breaks down the complete CONUS dataset into manageable, independently processable regions.
This approach provides several key advantages:
- Parallelization: Process hundreds of regions simultaneously across distributed compute resources
- Fault tolerance: Failed regions can be reprocessed independently without affecting others
- Resource efficiency: Match chunk sizes to available memory and compute resources
- Incremental processing: Process and commit regions as they complete, enabling progress tracking
Core Concepts¶
The spatial chunking system is built around the ChunkingConfig class, which provides:
- Chunk definition and layout: Defines the spatial grid dividing CONUS into processable regions
- Spatial indexing: Rapidly identifies chunks intersecting with regions of interest
- Coordinate transformations: Converts between pixel indices, geographic coordinates, and chunk IDs
- Visualization capabilities: Displays chunk layouts and spatial relationships
Architecture¶
```mermaid
graph TB
    A[CONUS 30m Dataset] --> B[ChunkingConfig]
    B --> C[Spatial Grid Definition]
    B --> D[Coordinate System]
    B --> E[Region ID Mapping]
    C --> F[595 Processing Regions]
    D --> G[EPSG:4326 Transform]
    E --> H[Region ID ↔ Chunk ID ↔ Lat/Lon Slices]
    F --> I[Parallel Processing]
    G --> I
    H --> I
    I --> J[Region 0: y0_x0]
    I --> K[Region 1: y0_x1]
    I --> L[Region N: yM_xN]
    J --> M[Icechunk Store]
    K --> M
    L --> M
    style A fill:#e1f5ff
    style M fill:#ffe1e1
    style I fill:#fff4e1
```
Chunking Configuration¶
Let's explore the chunking configuration using the ChunkingConfig class.
```python
from ocr.config import ChunkingConfig

config = ChunkingConfig(debug=True)
config
```
```
<POLYGON ((-64.054 22.428, -64.054 52.482, -128.387 52.482, -128.387 22.428,...>
```
Template Dataset¶
The chunking system uses the scott-et-al-2024-30m-4326 dataset as a template to establish the spatial reference framework:
| Property | Value |
|---|---|
| Resolution | 30m (~0.0002778°) |
| Projection | EPSG:4326 (WGS84) |
| Total Chunks | 799 (595 valid processing regions) |
| Chunk Size | 6000 × 4500 pixels |
| Dimensions | latitude × longitude |
ChunkingConfig uses this dataset as a template to compute properties (e.g. CONUS bounds, transform, chunk information) that are used to convert between chunk IDs and spatial bounds.
```python
config.ds.odc.geobox
```
```
GeoBox
WKT
ENSEMBLE["World Geodetic System 1984 ensemble",
    MEMBER["World Geodetic System 1984 (Transit)"],
    MEMBER["World Geodetic System 1984 (G730)"],
    MEMBER["World Geodetic System 1984 (G873)"],
    MEMBER["World Geodetic System 1984 (G1150)"],
    MEMBER["World Geodetic System 1984 (G1674)"],
    MEMBER["World Geodetic System 1984 (G1762)"],
    MEMBER["World Geodetic System 1984 (G2139)"],
    MEMBER["World Geodetic System 1984 (G2296)"],
    ELLIPSOID["WGS 84",6378137,298.257223563,
        LENGTHUNIT["metre",1]],
    ENSEMBLEACCURACY[2.0]],
PRIMEM["Greenwich",0,
    ANGLEUNIT["degree",0.0174532925199433]],
CS[ellipsoidal,2],
    AXIS["geodetic latitude (Lat)",north,
        ORDER[1],
        ANGLEUNIT["degree",0.0174532925199433]],
    AXIS["geodetic longitude (Lon)",east,
        ORDER[2],
        ANGLEUNIT["degree",0.0174532925199433]],
USAGE[
    SCOPE["Horizontal component of 3D system."],
    AREA["World."],
    BBOX[-90,-180,90,180]],
ID["EPSG",4326]]
```
Coordinate System¶
The system maintains an affine transformation matrix that enables bidirectional conversion between:
- Pixel indices (array coordinates)
- Geographic coordinates (EPSG:4326 longitude/latitude)
- Chunk IDs (region identifiers)
This transformation is crucial for:
- Converting bounding boxes to chunk selections
- Extracting geographic extents from pixel regions
- Validating spatial queries and data alignment
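As a concrete sketch of this round trip, the snippet below uses the `affine` package with transform values reconstructed from the CONUS bounds and grid shape shown later on this page (lower-left origin at (-128.387, 22.428), ~0.000308° per pixel, latitude ascending). These values are illustrative assumptions, not OCR's internal implementation:

```python
from affine import Affine

# Grid step implied by the CONUS bounds and array shape shown below
# (~0.000308 deg per pixel); lower-left origin, latitude ascending
step = 0.000308
transform = Affine(step, 0.0, -128.387,  # lon = -128.387 + col * step
                   0.0, step, 22.428)    # lat =   22.428 + row * step

# Pixel indices -> geographic coordinates: (col, row) -> (lon, lat)
lon, lat = transform * (63000, 30000)    # ~(-108.98, 31.67)

# Geographic coordinates -> pixel indices via the inverse transform
col, row = ~transform * (lon, lat)

# Pixel indices -> chunk ID, using the (6000, 4500) chunk shape
iy, ix = round(row) // 6000, round(col) // 4500
print(f'y{iy}_x{ix}')  # -> y5_x14
```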
```python
config.plot_all_chunks()
```
The plot above shows the chunk layout of the USFS-wildfire-risk-communities dataset: a regular grid of chunks, each of size (6000, 4500) pixels and covering a specific geographic area. Of the 799 chunks in the grid, 595 contain valid CONUS data and serve as processing regions.
Let's examine the dataset structure and verify the number of chunks:
```python
config.ds
```
```
<xarray.Dataset> Size: 82GB
Dimensions:      (latitude: 97579, longitude: 208881)
Coordinates:
  * latitude     (latitude) float64 781kB 22.43 22.43 22.43 ... 52.48 52.48
  * longitude    (longitude) float64 2MB -128.4 -128.4 -128.4 ... -64.05 -64.05
    spatial_ref  int32 4B 4326
Data variables:
    CRPS         (latitude, longitude) float32 82GB dask.array<chunksize=(6000, 4500), meta=np.ndarray>
Attributes:
    title:        RDS-2020-0016-02
    version:      2024-V2
    data_source:  https://www.fs.usda.gov/rds/archive/catalog/RDS-2020-0016-2
    description:  Wildfire Risk to Communities: Spatial datasets of landscape...
    EPSG:         4326
    resolution:   30m
```
```python
print(f'Number of dask/zarr chunks: {config.ds.CRPS.data.npartitions}')
```
```
Number of dask/zarr chunks: 799
```
```python
config.region_id_to_latlon_slices('y5_x14')
```
```
(slice(np.float64(31.667930580724892), np.float64(33.515893851345204), None), slice(np.float64(-108.98394187791843), np.float64(-107.59796942495319), None))
```
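Because these are coordinate (label) slices and the dataset's latitude is ascending, they can be passed directly to xarray's label-based selection to lazily extract that region:

```python
lat_slice, lon_slice = config.region_id_to_latlon_slices('y5_x14')

# Lazily select the (6000, 4500)-pixel region for this chunk
subset = config.ds.sel(latitude=lat_slice, longitude=lon_slice)
```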
Region Identification System¶
Region ID Format¶
Each processing region is identified by a unique string ID:
```
y{iy}_x{ix}
```
Where:
- `iy`: Index along the y-dimension (latitude), ranging from 0 to number of y-chunks - 1
- `ix`: Index along the x-dimension (longitude), ranging from 0 to number of x-chunks - 1
Example: y5_x14 refers to the chunk at row 5, column 14 of the spatial grid.
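A minimal sketch of the ID-to-pixel round trip (the helper below is hypothetical; `ChunkingConfig` provides equivalents such as `region_id_to_latlon_slices`):

```python
CHUNK_SHAPE = (6000, 4500)  # (latitude, longitude) pixels per chunk

def region_id_to_pixel_slices(region_id: str) -> tuple[slice, slice]:
    """Parse 'y{iy}_x{ix}' into pixel slices along (latitude, longitude)."""
    y_part, x_part = region_id.split('_')
    iy, ix = int(y_part[1:]), int(x_part[1:])
    ny, nx = CHUNK_SHAPE
    return slice(iy * ny, (iy + 1) * ny), slice(ix * nx, (ix + 1) * nx)

region_id_to_pixel_slices('y5_x14')
# (slice(30000, 36000, None), slice(63000, 67500, None))
```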
Region ID Lookup Process¶
```mermaid
flowchart LR
    A[Region ID<br/>y5_x14] --> B[Chunk Index<br/>iy=5, ix=14]
    B --> C[Pixel Slices<br/>y: 30000-36000<br/>x: 63000-67500]
    C --> D[Geographic Bounds<br/>lat: 31.6° - 33.5°<br/>lon: -108.9° - -107.5°]
    style A fill:#e1f5ff
    style D fill:#ffe1e1
```
Valid Region Filtering¶
Not all regions in the grid contain valid data; many fall entirely over ocean or outside CONUS. The system maintains a precomputed list of valid region IDs so that empty regions are never processed, and this list is checked during pipeline execution to skip invalid regions.
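In driver code this amounts to a membership check against the precomputed list (the attribute name on the chunking config is assumed here; the pipeline-level config exposes it as `config.chunking.valid_region_ids`):

```python
# Skip regions that are entirely ocean or outside CONUS
for region_id in ['y5_x14', 'y0_x0']:
    if region_id not in config.valid_region_ids:
        print(f'{region_id}: no valid data, skipping')
        continue
    print(f'{region_id}: queued for processing')
```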
Spatial Indexing¶
Bounding Box Queries¶
The chunking system can efficiently identify which chunks intersect with arbitrary geographic bounding boxes. Let's demonstrate this with several U.S. states:
We've defined bounding boxes for three different regions:
- Colorado (co_bbox): A rectangular region covering the state of Colorado
- California (ca_bbox): A rectangular region covering the state of California
- Arkansas (ar_bbox): A rectangular region covering the state of Arkansas
```python
co_bbox = config.bbox_from_wgs84(-109.059196, 36.992751, -102.042126, 41.001982)
ca_bbox = config.bbox_from_wgs84(
    -124.41060660766607, 32.5342307609976, -114.13445790587905, 42.00965914828148
)
ar_bbox = config.bbox_from_wgs84(
    -94.61946646626465, 33.00413641175411, -89.65547287402873, 36.49965029279292
)

# Let's display the bounding boxes to see their coordinates
print('Colorado bbox:', co_bbox)
print('California bbox:', ca_bbox)
print('Arkansas bbox:', ar_bbox)
```
```
Colorado bbox: POLYGON ((-102.042126 36.992751, -102.042126 41.001982, -109.059196 41.001982, -109.059196 36.992751, -102.042126 36.992751))
California bbox: POLYGON ((-114.13445790587905 32.5342307609976, -114.13445790587905 42.00965914828148, -124.41060660766607 42.00965914828148, -124.41060660766607 32.5342307609976, -114.13445790587905 32.5342307609976))
Arkansas bbox: POLYGON ((-89.65547287402873 33.00413641175411, -89.65547287402873 36.49965029279292, -94.61946646626465 36.49965029279292, -94.61946646626465 33.00413641175411, -89.65547287402873 33.00413641175411))
```
Spatial Query Workflow¶
```mermaid
sequenceDiagram
    participant User
    participant Config as ChunkingConfig
    participant Index as Spatial Index
    participant Store as Data Store
    User->>Config: bbox_from_wgs84(lon, lat bounds)
    Config->>Index: Compute intersecting chunks
    Index->>Index: Transform bounds to pixel space
    Index->>Index: Identify overlapping chunks
    Index-->>Config: Return chunk IDs
    Config-->>User: ['y7_x12', 'y8_x13', ...]
    User->>Store: Process only selected chunks
    Store-->>User: Targeted results
```
The next cell will:
- Identify all chunks that intersect with each bounding box using `get_chunks_for_bbox()`
- Visualize these chunks on the CONUS map to demonstrate how the chunking system can efficiently select only the data chunks needed for specific geographic regions

This illustrates a key advantage of the spatial chunking system: rather than processing the entire CONUS dataset, we can quickly identify and process only the chunks that contain our regions of interest. This is particularly useful for:
- Targeted processing: Only compute regions of interest
- Debugging workflows: Re-run specific geographic areas
- Incremental updates: Process new regions without reprocessing everything
```python
co_chunks = config.get_chunks_for_bbox(co_bbox)
ca_chunks = config.get_chunks_for_bbox(ca_bbox)
ar_chunks = config.get_chunks_for_bbox(ar_bbox)

config.visualize_chunks_on_conus(chunks=ca_chunks + co_chunks + ar_chunks)
```
Fine-Grained Spatial Queries¶
In the cell below, we identify the chunks that intersect with the Eaton fire in California, demonstrating the system's ability to work at very fine spatial scales.
```python
eaton_fire_bbox = config.bbox_from_wgs84(
    -118.16359507363843, 34.1609756217009, -118.01833468855428, 34.23216929604394
)
eaton_fire_chunks = config.get_chunks_for_bbox(eaton_fire_bbox)

config.visualize_chunks_on_conus(
    eaton_fire_chunks, highlight_chunks=eaton_fire_chunks, include_all_chunks=True
)
```
Parallel Processing Framework¶
Processing Pipeline¶
Like the serverless approach described in the blog post, our system enables embarrassingly parallel processing of chunks.
```mermaid
graph TB
    subgraph "Input"
        A[Valid Region IDs]
    end
    subgraph "Orchestration"
        B[Task Scheduler<br/>Coiled/Dask]
    end
    subgraph "Parallel Workers"
        C1[Worker 1<br/>Region y0_x5]
        C2[Worker 2<br/>Region y1_x8]
        C3[Worker 3<br/>Region y2_x12]
        CN[Worker N<br/>Region yM_xN]
    end
    subgraph "Processing"
        D1[Calculate Risk<br/>Region y0_x5]
        D2[Calculate Risk<br/>Region y1_x8]
        D3[Calculate Risk<br/>Region y2_x12]
        DN[Calculate Risk<br/>Region yM_xN]
    end
    subgraph "Storage"
        E1[Commit to Icechunk<br/>Region y0_x5]
        E2[Commit to Icechunk<br/>Region y1_x8]
        E3[Commit to Icechunk<br/>Region y2_x12]
        EN[Commit to Icechunk<br/>Region yM_xN]
    end
    subgraph "Output"
        G[Icechunk Store<br/>Raster Data]
        H[GeoParquet Files<br/>Vector Data]
    end
    A --> B
    B --> C1 & C2 & C3 & CN
    C1 --> D1 --> E1
    C2 --> D2 --> E2
    C3 --> D3 --> E3
    CN --> DN --> EN
    E1 & E2 & E3 & EN --> G
    E1 & E2 & E3 & EN --> H
    style A fill:#e1f5ff
    style B fill:#fff4e1
    style G fill:#ffe1e1
    style H fill:#ffe1e1
```
This means that we can:
- Create a template zarr store for the entire CONUS dataset
- Each worker processes its assigned region independently and writes its own data directly to Icechunk
- Because our chunking system ensures that chunks are aligned with the CONUS geography, we can use xarray's `to_zarr(region='auto')` method to automatically determine the region for each chunk (sketched below)
- Each worker creates its own commit for the region it processed, enabling parallel writes without conflicts
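A minimal sketch of that write pattern in plain xarray (OCR wraps this in `insert_region_uncooperative`, shown in the implementation pattern below; the store path is illustrative, and `ds_region` stands for one worker's computed output with coordinates aligned to the template grid):

```python
import xarray as xr

# 1. Write a lazy, CONUS-shaped template once: metadata only, no data
template = xr.zeros_like(config.ds).chunk({'latitude': 6000, 'longitude': 4500})
template.to_zarr('conus_template.zarr', mode='w', compute=False)

# 2. On each worker, write only that worker's region; region='auto'
#    infers the target slices from ds_region's coordinates, which line
#    up with the template because all regions share the same chunk grid
ds_region.to_zarr('conus_template.zarr', region='auto')
```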
Implementation Pattern¶
Below is an example of how this might look in practice:
```python
def process_region(region_id: str, config: OCRConfig):
    """Process a single region independently."""
    # 1. Get spatial bounds for this region
    y_slice, x_slice = config.chunking.region_id_to_latlon_slices(region_id)

    # 2. Calculate wind-adjusted risk for this region
    ds = calculate_wind_adjusted_risk(y_slice=y_slice, x_slice=x_slice)

    # 3. Write this region's raster data to Icechunk with automatic region detection
    config.icechunk.insert_region_uncooperative(ds, region_id=region_id)

    # 4. Sample buildings and write vector data
    buildings_gdf = sample_risk_to_buildings(ds=ds)
    buildings_gdf.to_parquet(f'{region_id}.parquet')

    return region_id


# Iterate over all valid regions (shown sequentially here; in production
# each call runs as an independent task on a distributed worker)
results = []
for region_id in config.chunking.valid_region_ids:
    result = process_region(region_id, config)
    results.append(result)
```
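On a real cluster, that loop becomes a scatter of independent tasks. A minimal sketch with `dask.distributed` (cluster setup elided; a Coiled cluster exposes an equivalent `Client`):

```python
from distributed import Client

client = Client()  # connect to the cluster

# One task per region; kwargs are forwarded to process_region, and
# retries give per-region fault tolerance for transient failures
futures = client.map(
    process_region,
    config.chunking.valid_region_ids,
    config=config,
    retries=2,
)
results = client.gather(futures)
```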
Key Features¶
| Feature | Description |
|---|---|
| Independence | Each region processes without dependencies on others |
| Parallel Writes | Each worker writes its own region data directly to Icechunk |
| Idempotency | Regions can be reprocessed safely; commits track completion |
| Scalability | Add more workers to process more regions simultaneously |
| Progress Tracking | Each worker's commit logs the completed region for restart capability |
| Error Isolation | Failed regions don't affect successful ones |
Fault Tolerance¶
Restart Capability¶
The system tracks processed regions via Icechunk commit messages:
```python
# Check which regions already have commits in the store
processed = config.icechunk.processed_regions()

# Filter to unprocessed regions only
status = config.select_region_ids(
    region_ids=config.chunking.valid_region_ids,
    all_region_ids=True,
)
to_process = status.unprocessed_valid_region_ids

# Resume processing from where it left off
for region_id in to_process:
    process_region(region_id, config)
```
Error Handling¶
```mermaid
flowchart TD
    A[Start Processing Region] --> B{Region Valid?}
    B -->|No| C[Skip]
    B -->|Yes| D{Already Processed?}
    D -->|Yes| E[Skip]
    D -->|No| F[Process Region]
    F --> G{Success?}
    G -->|Yes| H[Commit to Store]
    G -->|No| I[Log Error]
    I --> J[Continue to Next Region]
    H --> K[Update Progress]
    style C fill:#ffd4d4
    style E fill:#fff4d4
    style H fill:#d4ffd4
    style I fill:#ffd4d4
```
Benefits:
- Partial completion: Successfully processed regions persist even if others fail
- Retry logic: Failed regions can be identified and reprocessed
- No cascading failures: One region's error doesn't affect others
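A sketch of that control flow in driver code (accessors follow the usage shown elsewhere on this page; the logging and retry details are illustrative):

```python
failed = []
processed = set(config.icechunk.processed_regions())

for region_id in config.chunking.valid_region_ids:  # invalid regions never enter
    if region_id in processed:                      # skip completed regions
        continue
    try:
        process_region(region_id, config)           # commits to the store on success
    except Exception as exc:                        # isolate the failure
        print(f'{region_id} failed: {exc}')
        failed.append(region_id)

# Reprocess only the failures on a second pass
for region_id in failed:
    process_region(region_id, config)
```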
Best Practices¶
Region Selection¶
- Validate region IDs: Always check against `valid_region_ids` before processing
- Check processed status: Skip already-completed regions to avoid duplicate work
- Use spatial queries: When debugging, target specific geographic areas
Processing Configuration¶
- Match chunk size to memory: Ensure workers have sufficient RAM for the chunk size (see the sizing check below)
- Monitor progress: Track commit messages to verify completion
- Handle edge cases: Smaller chunks near CONUS boundaries may need special handling
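As a rough sizing check: one float32 variable at the (6000, 4500) chunk size works out to roughly 108 MB, so workers need headroom for several such chunks (inputs, intermediates, and outputs) per task.

```python
# Bytes for one float32 chunk at the (6000, 4500) chunk size
nbytes = 6000 * 4500 * 4  # 4 bytes per float32 pixel
print(f'{nbytes / 1e6:.0f} MB per variable per chunk')  # -> 108 MB
```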
Debugging Workflow¶
- Visualize target area: Use `visualize_chunks_on_conus()` to identify relevant chunks
- Process subset: Test on 2-3 chunks before a full CONUS run
- Verify outputs: Check both Icechunk commits and GeoParquet files
Related Documentation¶
- Data Schema: Structure of output datasets
- Deployment: Infrastructure for distributed processing