Database Sharding Strategies: A Practical Guide with PostgreSQL and Cloud Spanner

25 min read
By Atul Shukla

As your application grows from thousands to millions of users, you'll eventually hit a wall where vertical scaling (adding more CPU, RAM, disk) becomes prohibitively expensive or simply impossible. This is where database sharding comes in—the practice of horizontally partitioning your data across multiple database instances.

In this guide, I'll share practical insights from implementing sharding strategies in production systems using PostgreSQL and Google Cloud Spanner, including the pitfalls to avoid and when each approach makes sense.

What is Database Sharding?

Sharding is a database architecture pattern where you split your data across multiple database instances (shards) based on a sharding key. Each shard contains a subset of the total data, allowing you to distribute both the storage and query load.

Think of it like this: instead of one massive library containing all books, you create multiple smaller libraries, each responsible for books from certain authors (alphabetically split).

  • Shard: An individual database instance holding a subset of data
  • Sharding Key: The column(s) used to determine which shard holds a particular row
  • Shard Map: A lookup table or algorithm that maps sharding keys to specific shards

When Do You Actually Need Sharding?

Before diving into implementation, let's be honest: sharding adds significant complexity. You should only consider it when:

  • Your dataset exceeds single-server capacity (typically 1TB+ for PostgreSQL)
  • Query throughput exceeds what read replicas can handle (100K+ QPS)
  • You need geographic data distribution for latency requirements
  • Vertical scaling costs become unreasonable (diminishing returns on larger instances)

What to Try First

Before sharding, exhaust these options:

  • Vertical scaling: Upgrade to larger instances
  • Read replicas: Offload read queries
  • Caching: Redis/Memcached for hot data
  • Query optimization: Proper indexing, query tuning
  • Table partitioning: PostgreSQL native partitioning (within a single instance)

Sharding Strategies Explained

1. Range-Based Sharding

Data is distributed based on ranges of the sharding key:

Shard 1: user_id 1 - 1,000,000
Shard 2: user_id 1,000,001 - 2,000,000
Shard 3: user_id 2,000,001 - 3,000,000

Pros: Simple to understand and implement, range queries are efficient, easy to add new shards for new ranges

Cons: Hotspots if data isn't evenly distributed, requires careful monitoring of shard sizes

Best for: Time-series data, monotonically increasing IDs
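The range boundaries above translate to a small routing helper. A minimal sketch (the shard names and bounds are illustrative, not from any particular system):

```python
import bisect

# Inclusive upper bound of each range, in ascending order.
RANGE_BOUNDS = [1_000_000, 2_000_000, 3_000_000]
SHARDS = ["shard1", "shard2", "shard3"]

def route_by_range(user_id):
    """Return the shard whose range contains user_id."""
    idx = bisect.bisect_left(RANGE_BOUNDS, user_id)
    if idx == len(SHARDS):
        raise ValueError(f"user_id {user_id} is beyond the last range")
    return SHARDS[idx]
```

Adding a shard for a new range is just appending a bound and a name, which is why range sharding pairs well with monotonically increasing IDs.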

2. Hash-Based Sharding

A hash function determines which shard stores the data:

shard_id = hash(user_id) % number_of_shards

Pros: Even distribution of data, no hotspot issues, predictable shard assignment

Cons: Range queries require hitting multiple shards, resharding is painful

Best for: User data, randomly distributed keys
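The "resharding is painful" point is easy to demonstrate: with `hash(key) % n`, changing `n` reassigns most keys. A sketch using CRC32 as a stand-in stable hash:

```python
import zlib

def shard_for(key, num_shards):
    """Stable hash-based placement: hash(key) % num_shards."""
    return zlib.crc32(str(key).encode()) % num_shards

keys = range(100_000)
moved = sum(1 for k in keys if shard_for(k, 4) != shard_for(k, 5))
print(f"{moved / 100_000:.0%} of keys move when going from 4 to 5 shards")
```

Roughly four out of five keys change shards, which means a near-total data migration. Consistent hashing (covered later in this guide) exists to avoid exactly this.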

3. Geographic/Directory-Based Sharding

Data is sharded based on geographic location:

US-East shard: users on the East Coast
US-West shard: users on the West Coast
EU shard: users in Europe

Best for: Multi-tenant SaaS, global applications with data residency needs
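Directory-based sharding is often nothing more than an explicit lookup table maintained by the application. A minimal sketch (region codes and hostnames are made up):

```python
# Directory-based routing: an explicit map from a tenant's region
# to the shard that owns it. Unlike hash or range sharding, the
# mapping is arbitrary and can be edited to move tenants.
SHARD_DIRECTORY = {
    "us-east": "us-east.db.internal",
    "us-west": "us-west.db.internal",
    "eu": "eu.db.internal",
}

def shard_for_region(region):
    try:
        return SHARD_DIRECTORY[region]
    except KeyError:
        raise ValueError(f"no shard registered for region {region!r}")
```

The flexibility cuts both ways: the directory itself becomes a critical dependency that must be cached and kept highly available.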

PostgreSQL Sharding Implementation

Application-Level Sharding Example

The application logic determines which database to query:

import zlib

import psycopg2

class ShardedDatabase:
    def __init__(self):
        self.shards = {
            0: psycopg2.connect("host=shard0.db user=app"),
            1: psycopg2.connect("host=shard1.db user=app"),
            2: psycopg2.connect("host=shard2.db user=app"),
            3: psycopg2.connect("host=shard3.db user=app"),
        }
        self.num_shards = len(self.shards)
    
    def get_shard(self, user_id):
        """Hash-based sharding with a stable hash.

        Python's built-in hash() is salted per process
        (PYTHONHASHSEED), so it must never be used for routing.
        """
        shard_id = zlib.crc32(str(user_id).encode()) % self.num_shards
        return self.shards[shard_id]
    
    def get_user(self, user_id):
        conn = self.get_shard(user_id)
        cursor = conn.cursor()
        cursor.execute(
            "SELECT * FROM users WHERE user_id = %s", 
            (user_id,)
        )
        return cursor.fetchone()
    
    def get_users_by_email(self, prefix):
        """Cross-shard query - hits all shards"""
        results = []
        for shard_conn in self.shards.values():
            cursor = shard_conn.cursor()
            cursor.execute(
                "SELECT * FROM users WHERE email LIKE %s",
                (f"{prefix}%",)
            )
            results.extend(cursor.fetchall())
        return results

Foreign Data Wrappers (FDW)

PostgreSQL can query remote databases:

-- Create foreign server connections
CREATE SERVER shard1
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'shard1.db', port '5432', dbname 'users');

CREATE SERVER shard2
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'shard2.db', port '5432', dbname 'users');

-- Create foreign tables (one per shard)
CREATE FOREIGN TABLE users_shard1 (
    user_id BIGINT,
    email VARCHAR(255),
    created_at TIMESTAMP
) SERVER shard1 OPTIONS (table_name 'users');

CREATE FOREIGN TABLE users_shard2 (
    user_id BIGINT,
    email VARCHAR(255),
    created_at TIMESTAMP
) SERVER shard2 OPTIONS (table_name 'users');

-- Create a view that unions all shards
CREATE VIEW users_all AS
    SELECT * FROM users_shard1
    UNION ALL
    SELECT * FROM users_shard2;

Citus Extension

Citus transforms PostgreSQL into a distributed database:

-- Install Citus extension
CREATE EXTENSION citus;

-- Add worker nodes
SELECT * FROM citus_add_node('worker1.db', 5432);
SELECT * FROM citus_add_node('worker2.db', 5432);

-- Create and distribute a table
CREATE TABLE users (
    user_id BIGINT PRIMARY KEY,
    email VARCHAR(255),
    created_at TIMESTAMP
);

-- Distribute by user_id
SELECT create_distributed_table('users', 'user_id');

-- Queries work transparently
SELECT * FROM users WHERE user_id = 12345;

Cloud Spanner's Approach

Google Cloud Spanner handles sharding automatically. Table creation with interleaved tables:

-- Parent table
CREATE TABLE Users (
    UserId INT64 NOT NULL,
    Email STRING(255),
    Username STRING(50),
    CreatedAt TIMESTAMP OPTIONS (allow_commit_timestamps = true),
) PRIMARY KEY (UserId);

-- Child table with co-location
CREATE TABLE UserOrders (
    UserId INT64 NOT NULL,
    OrderId INT64 NOT NULL,
    OrderData JSON,
    CreatedAt TIMESTAMP,
) PRIMARY KEY (UserId, OrderId),
  INTERLEAVE IN PARENT Users ON DELETE CASCADE;

Primary Key Design in Spanner

Bad design (causes hotspots):

PRIMARY KEY (UserId)  -- Sequential, monotonically increasing ID
-- All writes go to the last split, creating a hotspot!

Good design (distributes load):

PRIMARY KEY (UserUuid)  -- UUID v4

Best design (optimizes both):

PRIMARY KEY (TenantId, UserId)
-- Even distribution + efficient queries
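Two common ways to get a non-monotonic key are sketched below: generating a random UUIDv4, or reversing the bits of a sequential ID so that consecutive values land far apart in key space (the idea behind Spanner's bit-reversed sequences). The `bit_reverse` helper is illustrative, not a Spanner API:

```python
import uuid

def uuid_key():
    """Random UUIDv4 string: writes spread evenly across splits."""
    return str(uuid.uuid4())

def bit_reverse(n, bits=64):
    """Reverse the bit order of a 64-bit sequential ID.

    Consecutive inputs (1, 2, 3, ...) map to values scattered
    across the full key range, avoiding a write hotspot while
    keeping IDs compact and reversible.
    """
    result = 0
    for _ in range(bits):
        result = (result << 1) | (n & 1)
        n >>= 1
    return result
```

The bit-reversal trick preserves uniqueness and lets you recover the original ID, which UUIDs do not.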

Spanner Python Example

from google.cloud import spanner

spanner_client = spanner.Client(project='your-project')
instance = spanner_client.instance('your-instance')
database = instance.database('your-database')

def insert_user(user_id, email, username):
    def insert_txn(transaction):
        transaction.insert(
            table='Users',
            columns=['UserId', 'Email', 'Username', 'CreatedAt'],
            values=[
                (user_id, email, username, 
                 spanner.COMMIT_TIMESTAMP)
            ]
        )
    database.run_in_transaction(insert_txn)

def get_user_with_orders(user_id):
    """Leverages interleaved tables"""
    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(
            """
            SELECT u.UserId, u.Email, u.Username,
                   ARRAY_AGG(o.OrderId IGNORE NULLS) AS Orders
            FROM Users u
            LEFT JOIN UserOrders o ON u.UserId = o.UserId
            WHERE u.UserId = @user_id
            GROUP BY u.UserId, u.Email, u.Username
            """,
            params={'user_id': user_id},
            param_types={'user_id': spanner.param_types.INT64}
        )
        return list(results)

Performance Comparison

Based on production experience with 50M users:

PostgreSQL (4 shards, hash-based)

  • Single-key read: 2-5ms latency, 50K QPS
  • Cross-shard query: 50-200ms latency, 2K QPS
  • Single-shard write: 3-8ms latency, 20K QPS
  • Distributed transactions: Not supported

Cloud Spanner (5 nodes, US multi-region)

  • Single-key read: 3-7ms latency, 40K QPS per node
  • Secondary index query: 10-30ms latency, 15K QPS per node
  • Write with index: 8-15ms latency, 10K QPS per node
  • Distributed transaction: 20-50ms latency, 5K QPS per node

Consistent Hashing for Resharding

Minimize data movement when adding shards:

import bisect
import hashlib

class ConsistentHashing:
    def __init__(self, num_shards, virtual_nodes=150):
        self.ring = {}
        
        for shard_id in range(num_shards):
            for vnode in range(virtual_nodes):
                key = f"shard{shard_id}-vnode{vnode}"
                hash_val = int(
                    hashlib.md5(key.encode()).hexdigest(), 
                    16
                )
                self.ring[hash_val] = shard_id
        
        self.sorted_keys = sorted(self.ring.keys())
    
    def get_shard(self, key):
        hash_val = int(
            hashlib.md5(str(key).encode()).hexdigest(),
            16
        )
        
        # Binary search for the first ring position >= hash_val
        idx = bisect.bisect_left(self.sorted_keys, hash_val)
        if idx == len(self.sorted_keys):
            idx = 0  # Wrap around to the start of the ring
        return self.ring[self.sorted_keys[idx]]
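To quantify the benefit, here is a self-contained sketch that builds the same style of MD5 ring for 4 and then 5 shards and counts how many of 10,000 keys relocate. With plain modulo roughly 80% would move; the ring moves only roughly the one-fifth of keys that belongs on the new shard:

```python
import bisect
import hashlib

def build_ring(num_shards, virtual_nodes=150):
    """Map MD5 ring positions to shard IDs, as in the class above."""
    ring = {}
    for shard_id in range(num_shards):
        for vnode in range(virtual_nodes):
            h = int(hashlib.md5(
                f"shard{shard_id}-vnode{vnode}".encode()
            ).hexdigest(), 16)
            ring[h] = shard_id
    return ring, sorted(ring)

def lookup(ring, keys_sorted, key):
    h = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
    idx = bisect.bisect_left(keys_sorted, h)
    return ring[keys_sorted[idx % len(keys_sorted)]]  # modulo wraps around

old_ring, old_keys = build_ring(4)
new_ring, new_keys = build_ring(5)
moved = sum(
    1 for k in range(10_000)
    if lookup(old_ring, old_keys, k) != lookup(new_ring, new_keys, k)
)
print(f"{moved / 10_000:.0%} of keys moved going from 4 to 5 shards")
```

Every key that moves lands on the new shard; no data shuffles between existing shards, which is what makes incremental capacity additions tractable.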

Monitoring Shard Health

from prometheus_client import Gauge, Histogram

shard_size = Gauge(
    'db_shard_size_bytes',
    'Shard size in bytes',
    ['shard_id']
)

shard_query_latency = Histogram(
    'db_shard_query_duration_seconds',
    'Query latency per shard',
    ['shard_id', 'query_type']
)

def check_shard_balance():
    # get_shard_size() and alert() are application-specific helpers
    sizes = [get_shard_size(i) for i in range(num_shards)]
    max_size = max(sizes)
    min_size = min(sizes)
    
    ratio = max_size / min_size if min_size > 0 else float('inf')
    
    if ratio > 1.5:  # 50% imbalance
        alert("Shard imbalance detected", {
            'max_size': max_size,
            'min_size': min_size,
            'ratio': ratio
        })

Decision Framework

Choose PostgreSQL sharding when:

  • Budget is constrained
  • You have strong PostgreSQL expertise in-house
  • Your query patterns are predictable and single-shard-friendly
  • You can tolerate eventual consistency
  • You need maximum control over infrastructure

Choose Cloud Spanner when:

  • You need global distribution with strong consistency
  • Your team is small and operational overhead is a concern
  • Budget allows for managed service premium
  • You need distributed transactions
  • Your data model requires complex relationships across regions

Conclusion

Database sharding is a powerful tool for scaling, but it's not a silver bullet. Both PostgreSQL and Cloud Spanner can handle massive scale, but they require different approaches and trade-offs.

Key Takeaways

  • Delay sharding as long as possible—exhaust simpler options first
  • Choose your sharding key carefully—it's nearly impossible to change later
  • Design for single-shard queries—cross-shard operations are expensive
  • Monitor shard balance—hotspots will kill your performance
  • Plan for resharding—you'll eventually need to add capacity
  • Test failure scenarios—shards will fail, your application must handle it

The "best" approach depends entirely on your specific requirements, team expertise, and budget constraints. Start simple, measure everything, and scale when the data tells you it's time.

Tags: Database, Sharding, PostgreSQL, Cloud Spanner, Distributed Systems, Architecture, Performance, Scalability
