Idempotency Keys in Spatial ETL
Spatial extract-transform-load (ETL) pipelines operate in inherently unstable environments: network partitions during large GeoJSON downloads, intermittent PostGIS connection drops, and partial raster tile writes are routine. Without deterministic safeguards, retrying a failed spatial task often results in duplicate features, corrupted topology, or orphaned geometry columns. Idempotency keys solve this by guaranteeing that repeated executions of the same operation leave the system in the same state as a single execution, regardless of how many times the orchestrator triggers the run.
Implementing Idempotency Keys in Spatial ETL requires a shift from stateless task execution to state-aware, transactionally bounded workflows. As part of a broader strategy for Resilience & Failure Handling for GIS Pipelines, this guide outlines a production-grade pattern for orchestrating idempotent geospatial pipelines using Prefect or Dagster, complete with prerequisites, step-by-step implementation, reference code, and error remediation strategies.
Prerequisites
Before deploying idempotent spatial workflows, ensure your platform meets these baseline requirements:
- Orchestrator Environment: Prefect 2.x or Dagster 1.x with persistent state storage (e.g., Prefect Cloud, or a self-hosted Prefect server or Dagster instance backed by PostgreSQL).
- Spatial Processing Stack: `geopandas>=0.13`, `shapely>=2.0`, `psycopg2` or `asyncpg` for PostGIS, and `rasterio` if handling gridded data.
- Key-Value or Relational Store: Redis, PostgreSQL, or DynamoDB to persist idempotency keys with TTL and atomic upsert capabilities.
- Deterministic Input Fingerprinting: Ability to generate stable hashes from source metadata (file paths, checksums, CRS, bounding boxes, or API query parameters).
- Transaction-Aware Database: PostGIS or SpatiaLite configured with explicit transaction boundaries and constraint enforcement.
The Five-Phase Idempotent Workflow
The idempotent spatial ETL workflow follows a strict five-phase pattern designed to survive retries, network blips, and orchestrator restarts.
Phase 1: Deterministic Key Generation
Extract immutable attributes from the input payload and compute a cryptographic hash. For vector data, combine the source URI, file modification timestamp, SHA-256 checksum, and target CRS. For API-driven spatial queries, hash the endpoint, query parameters, and pagination token. The resulting string becomes your idempotency key.
When working with legacy formats or complex multipart archives, attribute normalization becomes critical. See Generating idempotent keys for shapefile uploads for strategies that handle .shp/.shx/.dbf triplet synchronization and metadata drift. Python’s standard library provides reliable hashing primitives; refer to the official hashlib documentation for secure digest algorithms and hex encoding patterns suitable for pipeline state tracking.
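The following is a minimal sketch of Phase 1 that uses only the standard library. The helper names (`file_sha256`, `normalize_crs`, `vector_file_key`, `api_query_key`) are hypothetical. `normalize_crs` is deliberately naive: it assumes CRS strings arrive as authority codes such as `EPSG:4326`. A real pipeline would resolve CRS definitions properly (for example through pyproj) before hashing.

```python
import hashlib
import json
import os
from typing import Any, Dict, Optional


def file_sha256(path: str, chunk_size: int = 1 << 20) -> str:
    """Stream a file through SHA-256 so large GeoJSON or GeoPackage files never sit fully in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def normalize_crs(crs: str) -> str:
    """Hypothetical, deliberately naive normalization: 'epsg:4326 ' and 'EPSG:4326' hash the same."""
    return crs.strip().upper()


def _canonical_sha256(payload: Dict[str, Any]) -> str:
    # sort_keys applies recursively, so dict insertion order never changes the digest;
    # compact separators keep the canonical form independent of whitespace choices.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def vector_file_key(source_uri: str, local_path: str, target_crs: str) -> str:
    """Key for a downloaded vector file: URI + modification time + content checksum + target CRS."""
    return _canonical_sha256({
        "uri": source_uri,
        "mtime_ns": os.stat(local_path).st_mtime_ns,
        "sha256": file_sha256(local_path),
        "target_crs": normalize_crs(target_crs),
    })


def api_query_key(endpoint: str, params: Dict[str, Any], page_token: Optional[str]) -> str:
    """Key for one page of an API-driven spatial query."""
    return _canonical_sha256({"endpoint": endpoint, "params": params, "page_token": page_token})
```

The modification time is part of the fingerprint, so re-touching an unchanged file yields a new key. Drop `mtime_ns` if the checksum alone should decide whether a file counts as new input.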
Phase 2: State Verification & Concurrency Control
Before any spatial transformation begins, query the key store. If the key exists and maps to a COMPLETED state, skip execution and return the cached result. If the key maps to IN_PROGRESS, implement a short polling loop or distributed lock to prevent concurrent duplicate runs. If absent, proceed.
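The Phase 2 decision table can be sketched as an in-memory store. The sketch assumes a single process and an injectable clock, so lease expiry is easy to test. The class and method names are hypothetical. In production, the same check-and-claim must run atomically inside the key store itself (a conditional upsert in PostgreSQL, or SET with NX in Redis), not behind a Python lock.

```python
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict


class KeyState(str, Enum):
    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


@dataclass
class KeyRecord:
    state: KeyState
    expires_at: float  # lease deadline, in the same units as the store's clock


class InMemoryKeyStore:
    """Hypothetical single-process stand-in for the key table; production would use PostgreSQL or Redis."""

    def __init__(self, lease_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._records: Dict[str, KeyRecord] = {}
        self._lock = threading.Lock()
        self._lease = lease_seconds
        self._clock = clock

    def check_and_acquire(self, key: str) -> str:
        """Return 'acquired', 'skip' (COMPLETED), 'wait' (live IN_PROGRESS) or 'blocked' (live FAILED)."""
        with self._lock:  # read and write under one lock, so two workers cannot both acquire
            now = self._clock()
            rec = self._records.get(key)
            if rec is None or (rec.state is not KeyState.COMPLETED and rec.expires_at <= now):
                # New key, or a stale claim left behind by a crashed worker: take it over.
                self._records[key] = KeyRecord(KeyState.IN_PROGRESS, now + self._lease)
                return "acquired"
            if rec.state is KeyState.COMPLETED:
                return "skip"  # return the cached result instead of re-running
            if rec.state is KeyState.IN_PROGRESS:
                return "wait"  # another worker owns the key; poll or back off
            return "blocked"  # FAILED within its TTL: inspect before reprocessing

    def mark(self, key: str, state: KeyState) -> None:
        with self._lock:
            self._records[key].state = state
```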
Concurrency control must account for orchestrator retries. When a worker crashes mid-execution, the key remains IN_PROGRESS until a TTL expires or a watchdog process resets it. Pair this state machine with Exponential Backoff for API Rate Limits to avoid thundering herd scenarios when multiple workers simultaneously query the same spatial endpoint or attempt to acquire the same distributed lock.
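For the IN_PROGRESS branch, a polling loop with capped exponential backoff and full jitter keeps waiting workers from re-querying in lockstep. This is a sketch that assumes `get_status` is a hypothetical callable reading the key's current state. `sleep` and `rng` are injectable only so the behavior can be tested without real delays.

```python
import random
import time
from typing import Callable, Optional


def wait_for_peer(
    get_status: Callable[[], str],
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    max_attempts: int = 8,
    sleep: Callable[[float], None] = time.sleep,
    rng: Optional[random.Random] = None,
) -> str:
    """Poll a key that another worker holds IN_PROGRESS until it resolves or attempts run out."""
    rng = rng or random.Random()
    for attempt in range(max_attempts):
        status = get_status()
        if status != "IN_PROGRESS":
            return status  # COMPLETED: reuse the result; FAILED: route for remediation
        if attempt < max_attempts - 1:
            # Full jitter: a random delay in [0, min(max_delay, base_delay * 2**attempt)]
            # spreads workers out so they do not hit the key store in lockstep.
            sleep(rng.uniform(0.0, min(max_delay, base_delay * 2 ** attempt)))
    raise TimeoutError("key still IN_PROGRESS; leave recovery to the TTL or a watchdog")
```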
Phase 3: Atomic Spatial Execution
Wrap the ETL operation in a database transaction. Perform geometry validation, CRS transformation, and feature insertion. Do not commit the idempotency key state until the spatial transaction succeeds. Spatial loads are particularly prone to partial writes: if rows are committed in batches or in autocommit mode, a network timeout during a bulk COPY or INSERT can leave a table holding only part of a feature set, with follow-up steps such as spatial index builds never reached.
Choose the isolation level deliberately. PostgreSQL's default, READ COMMITTED, is usually sufficient for loads that only insert. SERIALIZABLE protects concurrent writers that read and modify overlapping rows, at the cost of serialization failures that the caller must retry. Rely on database-level atomicity rather than application-level state flags. PostgreSQL's transaction model guarantees that either all spatial mutations commit or none do, so a failed load never leaves a half-written feature set behind. Atomicity does not validate geometries, however, which is why validation belongs inside the transaction. Review PostgreSQL transaction documentation to understand how BEGIN, COMMIT, and ROLLBACK interact with spatial extensions and connection pooling.
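The all-or-nothing behavior Phase 3 depends on can be demonstrated with Python's built-in sqlite3 module as a stand-in for PostGIS. In this sketch, geometries are plain WKT text and the `features` table is a hypothetical schema. The connection's context manager commits when the block succeeds and rolls back when any statement raises, so a bad row halfway through a batch leaves no partial load behind.

```python
import sqlite3
from typing import Iterable, Optional, Tuple

Row = Tuple[Optional[str], str]  # (wkt, name); wkt may be None to simulate a broken record


def create_features_table(conn: sqlite3.Connection) -> None:
    # Hypothetical schema: NOT NULL on wkt stands in for a geometry constraint.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS features (wkt TEXT NOT NULL, name TEXT, source_key TEXT NOT NULL)"
    )


def load_features_atomically(conn: sqlite3.Connection, source_key: str, rows: Iterable[Row]) -> int:
    """Insert every row for one source_key, or none of them."""
    with conn:  # commits on success, rolls back if any statement inside raises
        cur = conn.executemany(
            "INSERT INTO features (wkt, name, source_key) VALUES (?, ?, ?)",
            [(wkt, name, source_key) for wkt, name in rows],
        )
        return cur.rowcount  # executemany sums the per-row modification counts
```

With PostGIS, the equivalent is a single psycopg2 transaction ended by one `conn.commit()` or `conn.rollback()`, as in the controller later in this guide.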
Phase 4: Transactional State Commit
Once the spatial transaction commits successfully, atomically upsert the idempotency key to COMPLETED alongside a result pointer (e.g., affected row count, output table name, or checksum). Use INSERT ... ON CONFLICT DO UPDATE in PostgreSQL, or Redis SET with the NX and EX options (SETNX itself cannot attach a TTL), so that race conditions between parallel workers do not overwrite successful states. The key store and spatial database should ideally share a transactional boundary or use a two-phase commit pattern if cross-system consistency is required.
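When the key table lives in the same database as the spatial tables, the "shared transactional boundary" option can be as simple as writing the features and the COMPLETED key in one transaction. The sketch below uses SQLite as a stand-in for PostgreSQL and assumes SQLite 3.24 or later for the ON CONFLICT upsert syntax. Table and column names are hypothetical, and `result_rows` plays the role of the result pointer. In PostgreSQL, the initial status read would normally be a SELECT ... FOR UPDATE so that concurrent workers serialize on the key row.

```python
import sqlite3
from typing import List, Tuple

SCHEMA = """
CREATE TABLE IF NOT EXISTS features (wkt TEXT NOT NULL, name TEXT, source_key TEXT NOT NULL);
CREATE TABLE IF NOT EXISTS idempotency_keys (
    key TEXT PRIMARY KEY,
    status TEXT NOT NULL,
    result_rows INTEGER
);
"""


def load_and_commit_key(conn: sqlite3.Connection, key: str, rows: List[Tuple[str, str]]) -> int:
    """Write features and the COMPLETED key in ONE transaction; returns rows inserted (0 if skipped)."""
    with conn:  # commit both writes together, or roll both back
        found = conn.execute("SELECT status FROM idempotency_keys WHERE key = ?", (key,)).fetchone()
        if found is not None and found[0] == "COMPLETED":
            return 0  # an earlier run already loaded this key; write nothing
        cur = conn.executemany(
            "INSERT INTO features (wkt, name, source_key) VALUES (?, ?, ?)",
            [(wkt, name, key) for wkt, name in rows],
        )
        inserted = cur.rowcount
        # Upsert the key together with its result pointer (here, the affected row count).
        conn.execute(
            """
            INSERT INTO idempotency_keys (key, status, result_rows)
            VALUES (?, 'COMPLETED', ?)
            ON CONFLICT (key) DO UPDATE SET
                status = excluded.status,
                result_rows = excluded.result_rows
            """,
            (key, inserted),
        )
    return inserted
```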
Phase 5: Failure Routing & Remediation
If the spatial transaction fails due to constraint violations, invalid geometries, or resource exhaustion, roll back the database changes and transition the idempotency key to FAILED. Do not silently retry indefinitely. Hard failures—such as topology errors from self-intersecting polygons, missing CRS definitions, or permission denials—require human intervention or automated repair workflows. Route these payloads to Dead-Letter Queues for Failed Geotasks where they can be inspected, repaired with ST_MakeValid(), and reprocessed under a new idempotency key derived from the corrected payload.
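Phase 5 routing can be sketched as follows. The sketch assumes a hypothetical `HardSpatialError` raised by the load step for non-transient problems, and an in-memory list standing in for the dead-letter queue. The main point it illustrates is that the repaired payload hashes to a different key. For example, a payload whose geometries were fixed with ST_MakeValid() is reprocessed as a fresh run instead of colliding with the FAILED one.

```python
import hashlib
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


class HardSpatialError(Exception):
    """Hypothetical marker for non-transient failures: invalid geometry, missing CRS, permission denied."""


def payload_key(payload: Dict[str, Any]) -> str:
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


@dataclass
class DeadLetterQueue:
    """In-memory stand-in; a real DLQ might be a table, a message queue, or a storage prefix."""
    entries: List[Dict[str, Any]] = field(default_factory=list)

    def put(self, key: str, payload: Dict[str, Any], error: Exception) -> None:
        self.entries.append({"key": key, "payload": payload, "error": repr(error)})


def run_with_routing(
    payload: Dict[str, Any],
    task: Callable[[Dict[str, Any]], None],
    statuses: Dict[str, str],
    dlq: DeadLetterQueue,
) -> str:
    key = payload_key(payload)
    try:
        task(payload)
    except HardSpatialError as exc:
        statuses[key] = "FAILED"
        dlq.put(key, payload, exc)  # park it for inspection; never retry it automatically
        return "dead_lettered"
    except Exception:
        statuses[key] = "FAILED"
        raise  # other errors go back to the orchestrator's bounded retry policy
    statuses[key] = "COMPLETED"
    return "completed"
```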
Production Implementation Pattern
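The following Python implementation demonstrates a framework-agnostic, transaction-safe idempotency controller for PostGIS-backed spatial ETL. It uses explicit transaction management, atomic key upserts, and deterministic hashing. It assumes two existing tables. The first is etl_idempotency_keys, with a unique key column plus status, created_at, and expires_at. The second is spatial_features, with geom, name, crs, and source_key columns.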
```python
import hashlib
import json
import logging
from contextlib import contextmanager
from typing import Any, Dict, List

import psycopg2
from psycopg2.extras import execute_values

logger = logging.getLogger(__name__)


class SpatialIdempotencyController:
    def __init__(self, dsn: str, key_ttl_hours: int = 24):
        self.dsn = dsn
        self.key_ttl_hours = key_ttl_hours

    @staticmethod
    def generate_key(source_uri: str, metadata: Dict[str, Any]) -> str:
        """Deterministic SHA-256 key from URI + normalized metadata."""
        # sort_keys=True canonicalizes nested dicts too, so key order never changes the hash
        payload = json.dumps({"uri": source_uri, "meta": metadata}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    @contextmanager
    def _db_connection(self):
        conn = psycopg2.connect(self.dsn)
        try:
            yield conn
        finally:
            conn.close()

    def check_and_acquire(self, key: str) -> bool:
        """Return True if this worker now owns the key (new, or a non-COMPLETED key whose TTL expired).

        Return False if the key is COMPLETED, or still IN_PROGRESS/FAILED within its TTL.
        """
        with self._db_connection() as conn:
            with conn.cursor() as cur:
                # make_interval() avoids interpolating a parameter inside a quoted INTERVAL literal.
                # The conflict WHERE clause only lets an expired, non-COMPLETED row be reclaimed.
                cur.execute(
                    """
                    INSERT INTO etl_idempotency_keys (key, status, created_at, expires_at)
                    VALUES (%s, 'IN_PROGRESS', NOW(), NOW() + make_interval(hours => %s))
                    ON CONFLICT (key) DO UPDATE SET
                        status = 'IN_PROGRESS',
                        created_at = EXCLUDED.created_at,
                        expires_at = EXCLUDED.expires_at
                    WHERE etl_idempotency_keys.status <> 'COMPLETED'
                      AND etl_idempotency_keys.expires_at < NOW()
                    RETURNING status;
                    """,
                    (key, self.key_ttl_hours),
                )
                # RETURNING yields no row when the conflict WHERE clause rejects the update
                acquired = cur.fetchone() is not None
            conn.commit()
            return acquired

    def commit_success(self, key: str):
        with self._db_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE etl_idempotency_keys SET status = 'COMPLETED' WHERE key = %s",
                    (key,),
                )
            conn.commit()

    def mark_failed(self, key: str):
        with self._db_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE etl_idempotency_keys SET status = 'FAILED' WHERE key = %s",
                    (key,),
                )
            conn.commit()

    def execute_idempotent_spatial_load(
        self,
        source_uri: str,
        metadata: Dict[str, Any],
        spatial_records: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        key = self.generate_key(source_uri, metadata)
        if not self.check_and_acquire(key):
            logger.info("Idempotency key %s is completed or held by another worker. Skipping.", key)
            return {"status": "skipped", "key": key}
        try:
            with self._db_connection() as conn:
                try:
                    with conn.cursor() as cur:
                        # psycopg2 opens a transaction implicitly on the first statement,
                        # so no explicit BEGIN is needed (issuing one only raises a warning).
                        # Remove rows an earlier attempt committed before crashing ahead of
                        # commit_success(); this keeps replays from duplicating features.
                        cur.execute("DELETE FROM spatial_features WHERE source_key = %s", (key,))
                        # Each record is a dict with 'geom' (WKT/EWKT or hex EWKB text), 'name', 'crs'
                        values = [
                            (r["geom"], r.get("name"), r.get("crs"), key) for r in spatial_records
                        ]
                        execute_values(
                            cur,
                            """
                            INSERT INTO spatial_features (geom, name, crs, source_key)
                            VALUES %s
                            """,
                            values,
                            template="(%s::geometry, %s, %s, %s)",
                        )
                    # Commit spatial transaction
                    conn.commit()
                except Exception:
                    # Roll back the spatial transaction while the connection is still open
                    conn.rollback()
                    raise
            # Only mark key as COMPLETED after spatial commit succeeds
            self.commit_success(key)
            return {"status": "completed", "key": key, "records": len(spatial_records)}
        except Exception as e:
            self.mark_failed(key)
            logger.error("Spatial ETL failed for key %s: %s", key, e)
            raise
```
Key Reliability Considerations
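- Atomic Upserts: The `ON CONFLICT` clause ensures that concurrent workers cannot duplicate state transitions. Keys whose TTL has expired are reclaimed by the next worker that tries to acquire them, which recovers work abandoned by crashed workers.
- Transaction Isolation: Spatial inserts and key state updates are decoupled but sequenced. The key only transitions to `COMPLETED` after `conn.commit()` succeeds.
- Geometry Validation: In production, reject or repair invalid rows before they are inserted. Use `ST_IsValid()` checks in SQL, or check with `shapely.is_valid()` and repair with `shapely.validation.make_valid()`, to prevent silent topology corruption.
- Orchestrator Integration: Prefect and Dagster both support retry policies. Configure three retries with jittered delays, for example `retries=3` with `retry_jitter_factor` on a Prefect task, or `RetryPolicy(max_retries=3, jitter=Jitter.PLUS_MINUS)` in Dagster. Each retry checks the idempotency key first, so work that already completed is skipped instead of being loaded into the database twice.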
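The atomic-upsert guarantee in the first bullet above can be checked empirically. This sketch races several threads against one key, using SQLite in a temporary file as a stand-in for the PostgreSQL key table (SQLite 3.24+ is assumed for ON CONFLICT). The function names are hypothetical. Only the worker whose INSERT actually wrote a row reports success.

```python
import sqlite3
import threading
from typing import List


def create_key_table(db_path: str) -> None:
    conn = sqlite3.connect(db_path)
    try:
        with conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS idempotency_keys (key TEXT PRIMARY KEY, status TEXT NOT NULL)"
            )
    finally:
        conn.close()


def try_acquire(db_path: str, key: str) -> bool:
    """One worker's claim attempt: True only if its INSERT actually wrote a row."""
    conn = sqlite3.connect(db_path, timeout=10)  # wait on SQLite's write lock instead of failing
    try:
        with conn:
            cur = conn.execute(
                "INSERT INTO idempotency_keys (key, status) VALUES (?, 'IN_PROGRESS') "
                "ON CONFLICT (key) DO NOTHING",
                (key,),
            )
            return cur.rowcount == 1  # 0 when the conflict clause skipped the insert
    finally:
        conn.close()


def race(db_path: str, key: str, workers: int = 8) -> List[bool]:
    """Start `workers` threads at the same instant and record which ones won the claim."""
    results = [False] * workers
    barrier = threading.Barrier(workers)

    def worker(i: int) -> None:
        barrier.wait()  # release every thread together to maximize contention
        results[i] = try_acquire(db_path, key)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```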
Operational Best Practices
- Key Granularity: Avoid overly broad keys (e.g., hashing only a directory path). Include query parameters, CRS, and timestamp windows to prevent stale cache hits when source data updates.
- Storage Backend Choice: PostgreSQL is ideal for spatial ETL because it natively supports both the idempotency table and the spatial workload. Redis offers lower latency for high-throughput pipelines but requires careful TTL management to avoid orphaned `IN_PROGRESS` states.
- Monitoring & Alerting: Track `FAILED` key ratios and transaction rollback rates. Sudden spikes often indicate upstream schema changes, CRS mismatches, or storage quota exhaustion.
- Testing Idempotency: Run synthetic failure injections (e.g., `pg_terminate_backend()` mid-insert, network latency simulation, or orchestrator kill signals) to verify that retries produce zero duplicate geometries and consistent row counts.
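A failure-injection test in the spirit of the last bullet can be sketched without a live PostGIS instance. The sketch makes three assumptions: SQLite serves as a stand-in database, a hypothetical `SimulatedCrash` exception replaces `pg_terminate_backend()` or a kill signal, and lease expiry is omitted by treating any stale IN_PROGRESS claim as reclaimable. The crash is injected in the most dangerous window: after the spatial commit, but before the key is marked COMPLETED. Scoping a DELETE by `source_key` inside the load transaction is what keeps the retry from duplicating rows.

```python
import sqlite3
from typing import List, Tuple

Row = Tuple[str, str]  # (wkt, name)


class SimulatedCrash(Exception):
    """Hypothetical stand-in for a kill signal or pg_terminate_backend() landing between two commits."""


def create_schema(conn: sqlite3.Connection) -> None:
    conn.executescript(
        """
        CREATE TABLE features (wkt TEXT NOT NULL, name TEXT, source_key TEXT NOT NULL);
        CREATE TABLE idempotency_keys (key TEXT PRIMARY KEY, status TEXT NOT NULL);
        """
    )


def run_load(conn: sqlite3.Connection, key: str, rows: List[Row], crash_before_mark: bool = False) -> str:
    # Step 1: claim the key (lease/TTL handling omitted; any stale claim is treated as expired).
    with conn:
        found = conn.execute("SELECT status FROM idempotency_keys WHERE key = ?", (key,)).fetchone()
        if found is not None and found[0] == "COMPLETED":
            return "skipped"
        conn.execute(
            "INSERT INTO idempotency_keys (key, status) VALUES (?, 'IN_PROGRESS') "
            "ON CONFLICT (key) DO UPDATE SET status = 'IN_PROGRESS'",
            (key,),
        )
    # Step 2: replay-safe spatial load in its own transaction.
    with conn:
        conn.execute("DELETE FROM features WHERE source_key = ?", (key,))
        conn.executemany(
            "INSERT INTO features (wkt, name, source_key) VALUES (?, ?, ?)",
            [(wkt, name, key) for wkt, name in rows],
        )
    # Fault injection point: features are committed but the key is still IN_PROGRESS.
    if crash_before_mark:
        raise SimulatedCrash("worker killed after the spatial commit")
    # Step 3: mark the key COMPLETED in a separate transaction.
    with conn:
        conn.execute("UPDATE idempotency_keys SET status = 'COMPLETED' WHERE key = ?", (key,))
    return "completed"


def duplicate_count(conn: sqlite3.Connection) -> int:
    """Number of feature rows beyond the first for each (source_key, wkt, name) combination."""
    total = conn.execute("SELECT COUNT(*) FROM features").fetchone()[0]
    distinct = conn.execute(
        "SELECT COUNT(*) FROM (SELECT DISTINCT source_key, wkt, name FROM features)"
    ).fetchone()[0]
    return total - distinct
```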
By anchoring spatial transformations to deterministic keys and strict transaction boundaries, GIS data engineers can prevent duplicate features, keep half-written loads out of production tables, and build pipelines that recover gracefully from partial failures. Idempotency is more than a retry safeguard: it is the foundation of reliable, production-grade geospatial infrastructure.