Building Scalable Integrations - Lessons From 40+ Partner APIs

April 21, 2026

Hard-won lessons from building and maintaining 40+ production integrations, including Guesty, Airbnb, Booking.com, and CloudBeds. This is a deep dive into the architectural patterns, failure modes, and battle-tested strategies for building integration systems that actually work at scale.


Table of Contents

  1. Introduction
  2. Architecture Overview
  3. The Driver Pattern
  4. Distributed Task Processing
  5. Rate Limiting Strategies
  6. Queue Management & Backpressure
  7. Error Handling & Retries
  8. Authentication Patterns
  9. API Changes & Versioning
  10. Monitoring & Observability
  11. Cascading Requests
  12. Memory Management & Batching
  13. Data Consistency
  14. Testing Strategies
  15. Lessons Learned

Introduction

After building 40+ integrations with third-party APIs (property management systems, booking platforms, smart locks, etc.), I’ve learned that integration engineering is fundamentally different from traditional backend development. You’re operating in a hostile environment where:

  • You don’t control the API - rate limits, outages, and breaking changes happen without warning
  • Each partner is unique - OAuth flows, pagination, error formats, and data models vary wildly
  • Failure is the norm - networks fail, APIs time out, data is inconsistent
  • Scale amplifies problems - what works for 5 partners breaks at 40+

This post documents the patterns, strategies, and hard lessons learned from building a production integration system that processes millions of API calls daily.


Architecture Overview

System Topology

┌─────────────────────────────────────────────────────────────────┐
│                        APPLICATION LAYER                         │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐         │
│  │   Web API    │  │  Admin UI    │  │   Webhooks   │         │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘         │
└─────────┼──────────────────┼──────────────────┼─────────────────┘
          │                  │                  │
          └──────────────────┴──────────────────┘
                             │
┌────────────────────────────┼─────────────────────────────────────┐
│                   INTEGRATION ORCHESTRATION                       │
│                            │                                      │
│   ┌────────────────────────▼────────────────────────┐           │
│   │         Task Scheduler (Celery Beat)            │           │
│   │  • Periodic imports (properties, reservations)  │           │
│   │  • Health checks                                │           │
│   └────────────────────┬────────────────────────────┘           │
│                        │                                         │
│   ┌────────────────────▼────────────────────────────┐           │
│   │           Message Queue (RabbitMQ/Redis)        │           │
│   │                                                  │           │
│   │  ┌─────────────┐  ┌─────────────┐ ┌──────────┐ │           │
│   │  │ properties  │  │ reservations│ │  photos  │ │           │
│   │  │   queue     │  │   queue     │ │  queue   │ │           │
│   │  └─────────────┘  └─────────────┘ └──────────┘ │           │
│   └─────────────────────────────────────────────────┘           │
└─────────────────────────┬───────────────────────────────────────┘
                          │
          ┌───────────────┴───────────────┬───────────────┐
          │                               │               │
┌─────────▼──────────┐     ┌──────────────▼────┐  ┌──────▼──────┐
│  Worker Pool 1     │     │  Worker Pool 2    │  │ Worker N    │
│  ┌──────────────┐  │     │  ┌──────────────┐ │  │ ┌─────────┐ │
│  │   Driver     │  │     │  │   Driver     │ │  │ │ Driver  │ │
│  │   Factory    │  │     │  │   Factory    │ │  │ │ Factory │ │
│  └──────┬───────┘  │     │  └──────┬───────┘ │  │ └────┬────┘ │
│         │          │     │         │         │  │      │      │
│  ┌──────▼───────┐  │     │  ┌──────▼───────┐ │  │ ┌────▼────┐ │
│  │ API Client   │  │     │  │ API Client   │ │  │ │  API    │ │
│  │ + Adapter    │  │     │  │ + Adapter    │ │  │ │ Client  │ │
│  └──────────────┘  │     │  └──────────────┘ │  │ └─────────┘ │
└─────────┬──────────┘     └──────────┬────────┘  └──────┬──────┘
          │                           │                   │
          └───────────────┬───────────┴───────────────────┘
                          │
          ┌───────────────┴─────────────┬──────────────┐
          │                             │              │
┌─────────▼────────┐   ┌────────────────▼──┐   ┌──────▼──────┐
│   Guesty API     │   │  Airbnb API       │   │  Partner N  │
│  Rate Limit:     │   │  Rate Limit:      │   │  Rate Limit:│
│  600 req/min     │   │  200 req/10min    │   │  Varies     │
└──────────────────┘   └───────────────────┘   └─────────────┘

Key Design Principles

  1. Isolation - Each integration is a separate driver with its own error handling
  2. Async by default - All I/O operations are non-blocking via Celery tasks
  3. Idempotency - Tasks can be safely retried without side effects
  4. Observable - Comprehensive logging and metrics at every layer
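Principle 3 (idempotency) in practice means keying every write on the partner's ID, so a retried task converges to the same state instead of creating duplicates. A minimal sketch, with an in-memory dict standing in for a table with a unique constraint on `external_id`:

```python
# Idempotent import step: records are upserted by their partner-side ID,
# so re-running the task after a retry produces the same end state.
# The in-memory `db` dict stands in for a real table here.

db = {}  # external_id -> property record

def upsert_property(external_id, payload):
    """Insert or update by natural key; safe to call any number of times."""
    record = db.get(external_id, {})
    record.update(payload)
    record["external_id"] = external_id
    db[external_id] = record
    return record

def import_properties_idempotent(raw_listings):
    for listing in raw_listings:
        upsert_property(listing["id"], {"name": listing["name"]})

# Running the import twice leaves exactly one record per listing
listings = [{"id": "g-1", "name": "Sea View"}, {"id": "g-2", "name": "Loft"}]
import_properties_idempotent(listings)
import_properties_idempotent(listings)  # retry: no duplicates
```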

The Driver Pattern

The most important architectural decision was implementing a driver pattern that abstracts partner-specific logic while providing a consistent interface.

Driver Architecture

┌─────────────────────────────────────────────────────────────┐
│                      Abstract Driver                         │
│                                                              │
│  + get_homes()      : PropertyImportIds                     │
│  + get_reservations(): List[Reservation]                    │
│  + push_status()    : bool                                  │
│  + authenticate()   : Token                                 │
└────────────────────┬────────────────────────────────────────┘
                     │
        ┌────────────┴────────────┬─────────────┐
        │                         │             │
┌───────▼─────────┐    ┌──────────▼─────┐  ┌───▼──────────┐
│ GuestyDriver    │    │ AirbnbDriver   │  │ CustomDriver │
│                 │    │                │  │              │
│ - api: GuestyAPI│    │ - api: ...     │  │ - api: ...   │
│ - adapter       │    │ - adapter      │  │ - adapter    │
│ - processor     │    │ - processor    │  │ - processor  │
└───────┬─────────┘    └──────┬─────────┘  └───┬──────────┘
        │                     │                 │
        │ Uses                │                 │
        ▼                     ▼                 ▼
┌────────────────┐    ┌────────────────┐   ┌──────────────┐
│  GuestyAdapter │    │ AirbnbAdapter  │   │ ... Adapter  │
│                │    │                │   │              │
│ normalize()    │    │ normalize()    │   │ normalize()  │
│ validate()     │    │ validate()     │   │ validate()   │
└────────────────┘    └────────────────┘   └──────────────┘
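The abstract driver from the diagram can be sketched with Python's `abc` module. `ExampleDriver` and its return values are placeholders, not a real partner:

```python
# Abstract driver interface matching the diagram above. Concrete drivers
# must implement every method or Python refuses to instantiate them.
from abc import ABC, abstractmethod

class AbstractDriver(ABC):
    @abstractmethod
    def authenticate(self) -> str: ...

    @abstractmethod
    def get_homes(self) -> list: ...

    @abstractmethod
    def get_reservations(self) -> list: ...

    @abstractmethod
    def push_status(self, reservation_id: str, status: str) -> bool: ...

class ExampleDriver(AbstractDriver):
    """Illustrative stub; a real driver wires in api/adapter/processor."""
    def authenticate(self) -> str:
        return "fake-token"

    def get_homes(self) -> list:
        return ["home-1", "home-2"]

    def get_reservations(self) -> list:
        return []

    def push_status(self, reservation_id: str, status: str) -> bool:
        return True
```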

Why This Pattern Works

Separation of Concerns:

  • Driver - Orchestration and business logic
  • API Client - HTTP communication, rate limiting, auth
  • Adapter - Data transformation and normalization
  • Processor - Domain-specific operations

Example: Fetching Properties

# High-level task
@celery_task
def import_properties(integration_id: int):
    driver = DriverFactory.get_driver(integration_id)
    property_ids = driver.get_homes()

    # Schedule dependent tasks
    for home_id in property_ids.homes_added_ids:
        import_photos.delay(integration_id, home_id)

The driver handles all partner-specific complexity internally:

User calls: driver.get_homes()
     │
     ├─> driver.api.authenticate()         # Partner-specific auth
     │
     ├─> driver.api.fetch_all('listings')  # Paginated API calls
     │       │
     │       ├─> check_rate_limit()        # Partner-specific limits
     │       ├─> retry_with_backoff()      # Handle transient failures
     │       └─> parse_response()
     │
     ├─> driver.adapter.normalize(raw)     # Transform to standard format
     │
     ├─> driver.processor.validate()       # Business logic validation
     │
     └─> driver.processor.save()           # Persist to database

Distributed Task Processing

Celery Task Architecture

We use Celery with RabbitMQ for distributed task processing. The key insight: task granularity matters.

Task Hierarchy

┌────────────────────────────────────────────────────────────┐
│  import_all_properties() - Master Task (runs every 4h)    │
│                                                            │
│  Scans all active integrations                            │
└────────────────────┬───────────────────────────────────────┘
                     │
         ┌───────────┴───────────────────┬──────────┐
         │                               │          │
┌────────▼────────────┐    ┌─────────────▼──┐  ┌───▼───────┐
│import_properties_   │    │import_properties│  │  ...      │
│for_company(123)     │    │_for_company(456)│  │           │
│                     │    │                 │  │           │
│ • Company-level     │    │                 │  │           │
│ • Can fail         │    │                 │  │           │
│   independently    │    │                 │  │           │
└────────┬────────────┘    └─────────┬───────┘  └───────────┘
         │                           │
    ┌────┴────┬─────────┐      ┌────┴─────┬──────┐
    │         │         │      │          │      │
┌───▼───┐ ┌──▼───┐ ┌───▼──┐ ┌─▼───┐ ┌────▼─┐ ┌──▼──┐
│Guesty │ │Airbnb│ │Track │ │...  │ │...   │ │ ... │
│import │ │import│ │import│ │     │ │      │ │     │
└───┬───┘ └──┬───┘ └───┬──┘ └─────┘ └──────┘ └─────┘
    │        │         │
    └────────┴─────┬───┘
                   │
         ┌─────────┴──────────┬──────────┐
         │                    │          │
┌────────▼────────┐  ┌────────▼──────┐ ┌▼──────────┐
│import_photos    │  │import_reserv. │ │update_... │
│(integration,    │  │(integration,  │ │           │
│ property_123)   │  │ property_123) │ │           │
└─────────────────┘  └───────────────┘ └───────────┘

Task Design Principles

1. Fine-grained tasks for parallelism

# BAD: Single monolithic task
@task
def import_everything(company_id):
    for integration in get_integrations(company_id):
        properties = api.get_properties()
        for prop in properties:
            photos = api.get_photos(prop.id)
            reservations = api.get_reservations(prop.id)
            # ... more work
    # Takes 30+ minutes, blocks worker

# GOOD: Decomposed tasks
@task
def import_company_properties(company_id):
    for integration in get_integrations(company_id):
        import_integration_properties.delay(integration.id)

@task
def import_integration_properties(integration_id):
    driver = get_driver(integration_id)
    props = driver.get_homes()
    for prop_id in props.homes_added_ids:
        import_property_details.delay(integration_id, prop_id)

2. Independent task failure

Company A fails ──┐
                  │
Company B succeeds├─> Other companies unaffected
                  │
Company C succeeds┘
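The same isolation applies at dispatch time: a per-company try/except keeps one company's bad data from aborting the loop. A minimal sketch, where `process_company` is a hypothetical stand-in for the real dispatch call:

```python
# Fan-out with per-company failure isolation: every company is attempted,
# and failures are recorded instead of propagating.

def dispatch_all(company_ids, process_company):
    results = {}
    for company_id in company_ids:
        try:
            process_company(company_id)
            results[company_id] = "ok"
        except Exception as exc:  # isolate the failure to this company
            results[company_id] = f"failed: {exc}"
    return results

def flaky(company_id):
    """Illustrative worker: company B always fails."""
    if company_id == "B":
        raise ValueError("bad credentials")

statuses = dispatch_all(["A", "B", "C"], flaky)
```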

3. Task timeouts and retries

@celery_task(
    bind=True,                # required so self.retry() is available below
    max_retries=3,
    default_retry_delay=120,  # 2 minutes
    soft_time_limit=300,      # 5 minutes
    time_limit=360,           # 6 minutes (hard limit)
)
def import_properties(self, integration_id):
    try:
        ...  # work
    except RateLimitError as e:
        # Exponential backoff, capped at 30 minutes
        raise self.retry(exc=e, countdown=min(2 ** self.request.retries * 60, 1800))

Rate Limiting Strategies

This is where theory meets brutal reality. Every API has different rate limits, and violating them can get you temporarily banned.

Common Rate Limit Patterns

┌────────────────────────────────────────────────────────────────┐
│                    Rate Limit Patterns                         │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│ 1. Fixed Window                                               │
│    ┌────────┬────────┬────────┬────────┐                     │
│    │ 100req │ 100req │ 100req │ 100req │                     │
│    └────────┴────────┴────────┴────────┘                     │
│    0s      60s     120s     180s                              │
│    Example: "100 requests per minute"                         │
│    Issue: Burst at window boundary                            │
│                                                                │
│ 2. Sliding Window                                             │
│    ┌──────────────────────────────────┐                      │
│    │      Last 60 seconds = 100 req   │                      │
│    └──────────────────────────────────┘                      │
│         ▲ moves continuously                                  │
│    Better: No boundary burst                                  │
│                                                                │
│ 3. Token Bucket                                               │
│    Bucket: [●●●○○] (3 tokens available)                      │
│    Refill: +1 token every 600ms                              │
│    Burst: Up to bucket size                                   │
│                                                                │
│ 4. Multi-tier Limits                                          │
│    Per second:  10 requests  ─┐                              │
│    Per minute:  100 requests  ├─ All must be satisfied       │
│    Per hour:    5000 requests ┘                              │
└────────────────────────────────────────────────────────────────┘
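Pattern 3 (token bucket) is simple enough to sketch in a few lines. Capacity and refill rate below are illustrative, and `time.monotonic()` keeps the math immune to wall-clock adjustments:

```python
# Token bucket: requests spend tokens; tokens refill at a steady rate,
# allowing short bursts up to the bucket's capacity.
import time

class TokenBucket:
    def __init__(self, capacity, refill_per_second):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()

    def try_acquire(self):
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(
            self.capacity,
            self.tokens + (now - self.last_refill) * self.refill_per_second,
        )
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(capacity=3, refill_per_second=1.0)
```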

Strategy 1: Header-based Rate Limiting

Many APIs return rate limit info in response headers:

def check_rate_limit(self, headers):
    """
    Headers example:
    X-RateLimit-Remaining-Second: 8
    X-RateLimit-Remaining-Minute: 95
    """

    remaining_per_second = int(headers.get('X-RateLimit-Remaining-Second', 999))
    remaining_per_minute = int(headers.get('X-RateLimit-Remaining-Minute', 999))

    # Proactive throttling - don't wait until we hit 0
    if remaining_per_second < 5:
        time.sleep(1)  # Wait for window to reset

    if remaining_per_minute < 100:
        time.sleep(1)  # Slow down

Lesson learned: Be conservative. If the limit says “600/minute”, treat it as 500/minute to account for clock skew and concurrent workers.

Strategy 2: Celery Rate Limiting

# Limit task execution rate
@celery_task(rate_limit='10/m')  # 10 tasks per minute
def call_partner_api(integration_id):
    # This task won't execute more than 10 times/min
    pass

Problem: This limits task execution, not API calls. One task might make 10 API calls.

Better approach: Separate queue per partner with concurrency control:

# celeryconfig.py
task_routes = {
    'integrations.guesty.*': {'queue': 'guesty'},
    'integrations.airbnb.*': {'queue': 'airbnb'},
}

# Start workers with concurrency limits
# celery -A app worker -Q guesty -c 2  # Max 2 concurrent guesty tasks
# celery -A app worker -Q airbnb -c 1  # Max 1 concurrent airbnb task

Strategy 3: Distributed Rate Limiting (Redis)

For APIs with strict limits across all servers:

import redis
import time

class DistributedRateLimiter:
    def __init__(self, key, max_requests, window_seconds):
        self.redis = redis.Redis()
        self.key = f"ratelimit:{key}"
        self.max_requests = max_requests
        self.window = window_seconds

    def allow_request(self):
        now = time.time()
        window_start = now - self.window

        pipe = self.redis.pipeline()
        # Remove old entries
        pipe.zremrangebyscore(self.key, 0, window_start)
        # Count requests in window
        pipe.zcard(self.key)
        # Add current request
        pipe.zadd(self.key, {str(now): now})
        # Set expiry
        pipe.expire(self.key, self.window)

        results = pipe.execute()
        request_count = results[1]

        if request_count >= self.max_requests:
            # Over the limit: remove our entry again so denied attempts
            # don't consume the window and starve legitimate requests
            self.redis.zrem(self.key, str(now))
            return False
        return True

# Usage
limiter = DistributedRateLimiter('guesty_api', max_requests=600, window_seconds=60)

if limiter.allow_request():
    response = api.call()
else:
    time.sleep(1)  # Wait and retry

Strategy 4: Adaptive Rate Limiting

Learn from 429 responses:

class AdaptiveRateLimiter:
    def __init__(self):
        self.request_interval = 0.1  # Start optimistic
        self.consecutive_429s = 0

    def before_request(self):
        time.sleep(self.request_interval)

    def after_request(self, response):
        if response.status_code == 429:
            self.consecutive_429s += 1
            # Exponential backoff
            self.request_interval *= 1.5

            # Check Retry-After header
            retry_after = response.headers.get('Retry-After')
            if retry_after:
                time.sleep(int(retry_after))
        else:
            self.consecutive_429s = 0
            # Slowly decrease interval (additive decrease)
            self.request_interval = max(0.05, self.request_interval - 0.01)

Queue Management & Backpressure

The Queue Buildup Problem

Normal Operation:
┌───────┐  100/s   ┌───────┐  100/s   ┌─────────┐
│ Tasks │────────>│ Queue │────────>│ Workers │
└───────┘          └───────┘          └─────────┘
                   Size: ~10

Problem: API Slowdown (partner outage, rate limit)
┌───────┐  100/s   ┌──────────────┐  10/s   ┌─────────┐
│ Tasks │────────>│    Queue     │───────>│ Workers │
└───────┘          │              │         └─────────┘
                   │ Size: 10,000 │   ← Buildup!
                   └──────────────┘

Result:
• Queue grows unbounded
• Memory exhaustion
• Old tasks process stale data
• Cascading failures

Solution 1: Task Expiration

@celery_task(expires=300)  # Task expires after 5 minutes
def import_reservations(integration_id, property_id):
    # If this task waits in queue > 5 minutes, discard it
    # Fresh data will be imported in next scheduled run
    pass

Solution 2: Queue Length Monitoring

def should_enqueue_task(queue_name):
    """Check queue depth before adding more tasks.

    Note: Celery's inspect() reports worker state, not broker backlog,
    so ask the broker directly with a passive queue_declare.
    """
    with celery_app.connection_or_acquire() as conn:  # celery_app = your Celery() instance
        queue_length = conn.default_channel.queue_declare(
            queue=queue_name, passive=True).message_count

    if queue_length > 10000:
        logger.warning(f"Queue {queue_name} backed up: {queue_length} messages")
        return False
    return True

# Usage
if should_enqueue_task('reservations'):
    import_reservations.delay(integration_id, property_id)
else:
    logger.info("Skipping task - queue backed up")

Solution 3: Circuit Breaker Pattern

Stop sending tasks when partner API is down:

┌─────────────────────────────────────────────────────────┐
│                  Circuit Breaker States                  │
├─────────────────────────────────────────────────────────┤
│                                                          │
│   CLOSED (Normal)                                       │
│   ┌────────┐                                            │
│   │ Success│  ─────────> Allow requests                 │
│   └────────┘                                            │
│        │                                                 │
│        │ Failure threshold exceeded (5 failures)        │
│        ▼                                                 │
│   ┌────────┐                                            │
│   │  OPEN  │  ─────────> Reject requests (fail fast)   │
│   └────────┘             Return cached/default data     │
│        │                                                 │
│        │ After timeout (60s)                            │
│        ▼                                                 │
│   ┌──────────┐                                          │
│   │ HALF-OPEN│  ───────> Allow limited requests        │
│   └──────────┘           (test if API recovered)        │
│        │                                                 │
│        ├─ Success ────> Go to CLOSED                    │
│        └─ Failure ────> Go to OPEN                      │
└─────────────────────────────────────────────────────────┘

Implementation:

import requests
from pybreaker import CircuitBreaker, CircuitBreakerError

# Create breaker per partner
guesty_breaker = CircuitBreaker(
    fail_max=5,        # Open after 5 failures
    reset_timeout=60,  # Try again after 60s
    exclude=[requests.HTTPError],  # Don't trip on expected errors
)

@guesty_breaker
def call_guesty_api():
    response = requests.get('https://api.guesty.com/...')
    response.raise_for_status()
    return response.json()

# Usage
try:
    data = call_guesty_api()
except CircuitBreakerError:
    logger.warning("Guesty API circuit breaker open - skipping import")
    return  # Don't queue more tasks

Solution 4: Priority Queues

Not all tasks are equal:

# High priority: User-initiated actions
@celery_task(queue='high_priority')
def sync_single_property(property_id):
    # User clicked "sync now" - should happen immediately
    pass

# Normal priority: Scheduled imports
@celery_task(queue='normal_priority')
def import_properties(integration_id):
    pass

# Low priority: Non-critical updates
@celery_task(queue='low_priority')
def import_photos(property_id):
    # Nice to have but not critical
    pass

Worker configuration:

# One worker consuming all three queues.
# Note: Celery does not strictly drain queues in the listed order;
# for hard guarantees, dedicate workers per queue or use broker priorities.
celery -A app worker -Q high_priority,normal_priority,low_priority
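Where strict ordering matters, RabbitMQ message priorities can complement queue separation. The settings below are standard Celery options (they only take effect on the RabbitMQ transport); the values are illustrative:

```python
# celeryconfig.py -- broker-level priorities (RabbitMQ only)
task_queue_max_priority = 10  # queues are declared with x-max-priority=10
task_default_priority = 5     # default for tasks that don't specify one

# At call time, bump an individual task:
# sync_single_property.apply_async(args=[property_id], priority=9)
```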

Solution 5: Worker Downtime & Queue Explosions

The nightmare scenario: workers crash at 2 AM, scheduled tasks keep queuing.

┌────────────────────────────────────────────────────────────────┐
│              Worker Downtime Disaster Timeline                  │
├────────────────────────────────────────────────────────────────┤
│                                                                 │
│  00:00                    Normal Operation                     │
│  ┌──────────┐                                                  │
│  │ Worker 1 │───┐                                              │
│  └──────────┘   │                                              │
│  ┌──────────┐   ├─> [ Queue: 50 tasks ] ──> Processing OK     │
│  │ Worker 2 │───┤                                              │
│  └──────────┘   │                                              │
│  ┌──────────┐   │                                              │
│  │ Worker 3 │───┘                                              │
│  └──────────┘                                                  │
│                                                                 │
│  02:15                    ⚠ Disaster Strikes                   │
│  ┌──────────┐                                                  │
│  │ Worker 1 │ ✗ CRASHED (OOM)                                 │
│  └──────────┘                                                  │
│  ┌──────────┐                                                  │
│  │ Worker 2 │ ✗ CRASHED (OOM)                                 │
│  └──────────┘                                                  │
│  ┌──────────┐                                                  │
│  │ Worker 3 │ ✗ CRASHED (OOM)                                 │
│  └──────────┘                                                  │
│                                                                 │
│      [ Queue: 50 tasks ] ──> Nobody processing!                │
│                                                                 │
│  02:20        Celery Beat keeps scheduling                     │
│      ┌────────────────────┐                                    │
│      │  New tasks added!  │                                    │
│      └─────────┬──────────┘                                    │
│                ▼                                                │
│      [ Queue: 250 tasks ]  (+200)                              │
│                                                                 │
│  02:30        More scheduled tasks arrive                      │
│                ▼                                                │
│      [ Queue: 850 tasks ]  (+600)                              │
│                                                                 │
│  03:00        Queue explosion                                  │
│                ▼                                                │
│      [ Queue: 2,450 tasks ]  (+1,600)                          │
│                                                                 │
│  06:00        Engineer wakes up                                │
│      ┌─────────────────────────────────────┐                  │
│      │ ALERT: 10,000 tasks in queue!       │                  │
│      │ Oldest task age: 3h 45m             │                  │
│      │ No workers active since 02:15       │                  │
│      └─────────────────────────────────────┘                  │
│                ▼                                                │
│      [ Queue: 10,000+ tasks ]  (+7,550)                        │
│                                                                 │
│  Problems:                                                     │
│  • Most tasks are hours old (stale data)                       │
│  • Will take 10+ hours to drain at normal rate                │
│  • Duplicate/conflicting updates                               │
│  • Waste API quota on stale imports                            │
│  • Customer data not synced for hours                          │
└────────────────────────────────────────────────────────────────┘

Defense 1: Worker Health Monitoring

# Celery worker events
import socket
import time

from celery.events import EventReceiver
from kombu import Connection

def monitor_workers():
    """Alert when no workers are processing tasks"""
    # Track active workers (handlers must exist before the receiver is built)
    active_workers = {}
    last_heartbeat = {}

    def on_heartbeat(event):
        active_workers[event['hostname']] = True
        last_heartbeat[event['hostname']] = time.time()

    def on_worker_offline(event):
        active_workers.pop(event['hostname'], None)
        alert_oncall(f"Worker {event['hostname']} went offline")

    with Connection(BROKER_URL) as conn:
        recv = EventReceiver(conn, handlers={
            'worker-heartbeat': on_heartbeat,
            'worker-offline': on_worker_offline,
        })

        # Periodic health check
        while True:
            try:
                recv.capture(limit=None, timeout=1, wakeup=True)
            except socket.timeout:
                pass  # no events this tick; fall through to health checks

            # Check for stale heartbeats
            now = time.time()
            for hostname, last_beat in last_heartbeat.items():
                if now - last_beat > 60:  # No heartbeat for 1 min
                    alert_oncall(f"Worker {hostname} heartbeat stale")

            # Check if any workers are alive
            if not active_workers:
                critical_alert("NO WORKERS ACTIVE - QUEUE BUILDING UP")

            time.sleep(10)

Defense 2: Smart Queue Draining

@celery_task
def import_properties(integration_id, enqueued_at=None):
    """Task with staleness check"""
    if enqueued_at is None:
        enqueued_at = datetime.utcnow()

    # Check task staleness
    age_seconds = (datetime.utcnow() - enqueued_at).total_seconds()

    if age_seconds > 3600:  # Task sat in queue for 1+ hour
        logger.warning(
            f"Skipping stale task for integration {integration_id}, "
            f"age: {age_seconds}s"
        )
        return  # Don't process stale data

    # Check if already processed
    last_run = get_last_successful_run(integration_id)
    if last_run and last_run > enqueued_at:
        logger.info("Already processed newer data, skipping")
        return

    # Process the task
    driver = get_driver(integration_id)
    driver.get_homes()
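The staleness check only works if the producer stamps the enqueue time. A self-contained sketch of the check itself, with the producer-side call shown as a comment (it needs a running broker):

```python
# Staleness helper: a task older than max_age_seconds should be skipped,
# since a fresher scheduled run will re-import the same data anyway.
from datetime import datetime, timedelta

def is_stale(enqueued_at, max_age_seconds=3600, now=None):
    now = now or datetime.utcnow()
    return (now - enqueued_at).total_seconds() > max_age_seconds

# Producer side (hypothetical task signature from above):
# import_properties.apply_async(
#     kwargs={"integration_id": 42, "enqueued_at": datetime.utcnow()})

fresh = datetime.utcnow() - timedelta(minutes=5)
old = datetime.utcnow() - timedelta(hours=2)
```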

Defense 3: Queue Purging on Recovery

def recover_from_worker_outage():
    """Called when workers come back online after downtime"""

    # Check queue depths at the broker; Celery's inspect() reports worker
    # state, not broker backlog, so ask the broker directly.
    from celery import current_app

    for queue_name in ALL_QUEUE_NAMES:  # e.g. ['properties', 'reservations', 'photos', ...]
        with current_app.connection_or_acquire() as conn:
            queue_length = conn.default_channel.queue_declare(
                queue=queue_name, passive=True).message_count

        if queue_length > 5000:
            logger.warning(f"Queue {queue_name} has {queue_length} tasks")

            # Strategy 1: Purge and re-enqueue fresh
            if queue_name in ['properties', 'reservations']:
                logger.info(f"Purging stale {queue_name} queue")
                # control.purge() would empty *every* consumed queue;
                # purge only this one at the broker:
                with current_app.connection_or_acquire() as conn:
                    conn.default_channel.queue_purge(queue_name)

                # Re-trigger fresh imports
                trigger_fresh_import_for_all_integrations()

            # Strategy 2: Let it drain with staleness checks
            elif queue_name in ['photos', 'low_priority']:
                logger.info(f"Letting {queue_name} drain naturally")
                # Tasks will self-skip if stale

            # Strategy 3: Pause scheduling until drained
            else:
                logger.info(f"Pausing new tasks for {queue_name}")
                pause_scheduled_tasks(queue_name)

Defense 4: Deployment Strategy

# Pre-deployment hook
def before_deploy():
    """Prepare for deployment to avoid queue buildup"""

    # 1. Stop scheduling new tasks
    logger.info("Pausing Celery Beat scheduler")
    os.system("supervisorctl stop celerybeat")

    # 2. Let current tasks drain
    logger.info("Waiting for tasks to complete...")
    wait_for_queue_drain(max_wait=300)  # Wait up to 5 min

    # 3. Graceful worker shutdown
    logger.info("Shutting down workers gracefully")
    os.system("celery -A app control shutdown")

# Post-deployment hook
def after_deploy():
    """Resume operations after deployment"""

    # 1. Start workers
    logger.info("Starting workers")
    os.system("supervisorctl start celery-workers")

    # 2. Verify workers are up
    if not verify_workers_active(timeout=60):
        critical_alert("Workers failed to start after deployment!")
        return

    # 3. Resume scheduling
    logger.info("Resuming Celery Beat scheduler")
    os.system("supervisorctl start celerybeat")

    logger.info("Deployment complete, system operational")
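`verify_workers_active` above is a hypothetical helper. One way to sketch it is as a generic poll loop with the broker probe injected, so the logic itself is testable; with Celery, the probe could be `lambda: bool(current_app.control.ping(timeout=5))`:

```python
# Poll a probe function until it reports success or the timeout expires.
import time

def verify_workers_active(probe, timeout=60, interval=5, sleep=time.sleep):
    """Return True as soon as `probe()` is truthy, False after `timeout`s."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if probe():
            return True
        sleep(interval)
    return False

# Example: a probe that succeeds on the third poll
attempts = {"n": 0}
def fake_probe():
    attempts["n"] += 1
    return attempts["n"] >= 3

ok = verify_workers_active(fake_probe, timeout=60, interval=0, sleep=lambda s: None)
```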

Defense 5: Backup Queue Processing

# Cron job that runs every 5 minutes
def emergency_queue_processor():
    """Fallback processor in case Celery workers are down"""

    queue_stats = get_queue_stats()

    for queue_name, stats in queue_stats.items():
        messages = stats['messages']
        oldest_age = stats['oldest_age_seconds']

        # If queue building up AND old tasks (workers likely down)
        if messages > 1000 and oldest_age > 600:  # 10 min old
            logger.warning(
                f"Emergency processing for {queue_name}: "
                f"{messages} messages, oldest: {oldest_age}s"
            )

            # Process directly (bypass Celery)
            process_queue_directly(queue_name, max_tasks=100)

            # Alert
            alert_oncall(
                f"Queue {queue_name} processed via emergency fallback. "
                f"Check worker health!"
            )
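`process_queue_directly` above is also hypothetical. A sketch of its drain loop, with the broker abstracted behind a message getter so the logic runs anywhere; with kombu you would wrap `conn.SimpleQueue(queue_name)` instead:

```python
# Drain up to max_tasks messages, acknowledging each only after the
# handler succeeds; on failure, stop so the broker can redeliver.
import collections

def process_queue_directly(get_message, handler, max_tasks=100):
    processed = 0
    while processed < max_tasks:
        msg = get_message()
        if msg is None:      # queue empty
            break
        try:
            handler(msg["payload"])
            msg["ack"]()     # acknowledge only after success
            processed += 1
        except Exception:
            break            # no ack sent; a real broker would redeliver
    return processed

# In-memory stand-in for a broker queue
pending = collections.deque(
    {"payload": i, "ack": lambda: None} for i in range(5))

def fake_get():
    return pending.popleft() if pending else None

handled = []
count = process_queue_directly(fake_get, handled.append, max_tasks=3)
```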

Error Handling & Retries

Error Classification

Not all errors are equal. Classification determines retry strategy:

┌─────────────────────────────────────────────────────────────┐
│                    Error Categories                          │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│ 1. TRANSIENT (Retry immediately)                            │
│    • 500 Internal Server Error                              │
│    • 502 Bad Gateway                                        │
│    • 503 Service Unavailable                                │
│    • Network timeout                                        │
│    • Connection refused                                     │
│    Action: Retry with exponential backoff                   │
│                                                              │
│ 2. RATE LIMIT (Retry with delay)                            │
│    • 429 Too Many Requests                                  │
│    Action: Wait for Retry-After header, then retry         │
│                                                              │
│ 3. CLIENT ERROR (Don't retry)                               │
│    • 400 Bad Request                                        │
│    • 401 Unauthorized                                       │
│    • 403 Forbidden                                          │
│    • 404 Not Found                                          │
│    Action: Log and skip, possible integration issue         │
│                                                              │
│ 4. AUTH ERROR (Refresh token)                               │
│    • 401 with expired token message                         │
│    Action: Refresh OAuth token, then retry                  │
│                                                              │
│ 5. DATA ERROR (Skip record)                                 │
│    • Invalid/malformed response                             │
│    • Missing required fields                                │
│    Action: Log for investigation, continue with next        │
└─────────────────────────────────────────────────────────────┘
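This classification can live in one small helper so the retry code keys off a single decision point instead of scattering status checks. A sketch — the `ErrorCategory` enum and the expired-token body check are assumptions, not partner-defined behavior:

```python
from enum import Enum

class ErrorCategory(Enum):
    TRANSIENT = "transient"      # retry with exponential backoff
    RATE_LIMIT = "rate_limit"    # wait for Retry-After, then retry
    AUTH = "auth"                # refresh token, then retry
    CLIENT = "client"            # log and skip
    DATA = "data"                # skip record, continue

def classify_status(status_code, body=""):
    """Map an HTTP status code to a retry category (sketch)."""
    if status_code == 429:
        return ErrorCategory.RATE_LIMIT
    if status_code == 401:
        # Distinguish expired tokens from bad credentials if the partner
        # includes a hint in the response body (assumption)
        if "expired" in body.lower():
            return ErrorCategory.AUTH
        return ErrorCategory.CLIENT
    if 400 <= status_code < 500:
        return ErrorCategory.CLIENT
    if 500 <= status_code < 600:
        return ErrorCategory.TRANSIENT
    return ErrorCategory.DATA
```

Keeping the mapping in one place makes it easy to audit which statuses retry and which don't.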

Retry Strategy Implementation

from requests.exceptions import HTTPError, RequestException, Timeout

@celery_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
)
def import_properties(self, integration_id):
    try:
        driver = get_driver(integration_id)
        properties = driver.get_homes()
        return properties

    except HTTPError as e:
        status_code = e.response.status_code

        # Client errors - don't retry
        if 400 <= status_code < 500:
            if status_code == 429:  # Rate limit
                retry_after = int(e.response.headers.get('Retry-After', 60))
                logger.warning(f"Rate limited, retry after {retry_after}s")
                raise self.retry(exc=e, countdown=retry_after)

            elif status_code == 401:  # Auth error
                logger.error(f"Auth failed for integration {integration_id}")
                # Try to refresh token
                if refresh_oauth_token(integration_id):
                    raise self.retry(exc=e, countdown=5)
                else:
                    # Can't refresh - alert humans
                    alert_auth_failure(integration_id)
                    return
            else:
                # Other 4xx - don't retry
                logger.error(f"Client error {status_code}: {e.response.text}")
                return

        # Server errors - retry with backoff
        elif 500 <= status_code < 600:
            retry_count = self.request.retries
            backoff = min(2 ** retry_count * 60, 1800)  # Max 30 min
            logger.warning(f"Server error {status_code}, retry {retry_count}, backoff {backoff}s")
            raise self.retry(exc=e, countdown=backoff)

    except (Timeout, RequestException) as e:
        # Network errors - retry with backoff
        retry_count = self.request.retries
        backoff = min(2 ** retry_count * 30, 900)  # Max 15 min
        logger.warning(f"Network error: {e}, retry {retry_count}")
        raise self.retry(exc=e, countdown=backoff)

    except Exception as e:
        # Unexpected error - log and alert
        logger.exception(f"Unexpected error in import_properties: {e}")
        alert_on_call_engineer(integration_id, e)
        raise

Dead Letter Queue

Tasks that fail after all retries go to a dead letter queue for investigation:

@celery_task(bind=True, max_retries=3)
def import_properties(self, integration_id):
    try:
        # ... work
    except Exception as e:
        if self.request.retries >= self.max_retries:
            # Final failure - send to DLQ
            dead_letter_queue.send({
                'task': self.name,
                'args': [integration_id],
                'error': str(e),
                'traceback': traceback.format_exc(),
                'failed_at': datetime.utcnow(),
            })
        raise

Monitor DLQ for patterns:

-- Find common failure patterns
SELECT error, COUNT(*) as count
FROM dead_letter_queue
WHERE failed_at > NOW() - INTERVAL '24 hours'
GROUP BY error
ORDER BY count DESC;
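Monitoring the DLQ is half the job; the other half is replaying entries worth a second attempt. A sketch of a selection policy over the payload shape above — the 24-hour cutoff and the `AuthError` filter are assumptions:

```python
from datetime import datetime, timedelta

def select_replayable(dlq_entries, max_age_hours=24, skip_errors=("AuthError",)):
    """Pick DLQ entries worth replaying: recent, and not failures that
    will deterministically fail again (sketch; field names match the
    DLQ payload above)."""
    cutoff = datetime.utcnow() - timedelta(hours=max_age_hours)
    replay = []
    for entry in dlq_entries:
        if entry["failed_at"] < cutoff:
            continue  # too old - the next scheduled import covered it
        if any(err in entry["error"] for err in skip_errors):
            continue  # auth failures need a human, not a retry
        replay.append(entry)
    return replay
```

Entries that fail deterministically (bad credentials, malformed requests) should go to a human, not back into the queue.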

Authentication Patterns

Every integration handles auth differently. Here are the common patterns:

Pattern 1: API Key (Simplest)

Request:
┌─────────────────────────────────────────────┐
│ GET /api/properties                         │
│ Headers:                                    │
│   Authorization: Bearer abc123xyz           │
└─────────────────────────────────────────────┘

Implementation:

class ApiKeyAuth:
    def __init__(self, api_key):
        self.api_key = api_key

    def __call__(self, request):
        request.headers['Authorization'] = f'Bearer {self.api_key}'
        return request

session.auth = ApiKeyAuth(api_key)

Pattern 2: OAuth 2.0 (Common, Complex)

┌──────────────────────────────────────────────────────────────┐
│                   OAuth 2.0 Flow                             │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  Initial Setup (One-time):                                  │
│  1. User → Auth URL                                         │
│  2. User grants permission                                  │
│  3. Redirect with auth code                                 │
│  4. Exchange code for access + refresh tokens               │
│                                                              │
│  Ongoing API Calls:                                         │
│  ┌──────────────────────────────────────┐                  │
│  │ Use access token ────> API Call      │                  │
│  └────────┬─────────────────────────────┘                  │
│           │                                                  │
│           ├─ Success ────> Continue                         │
│           │                                                  │
│           └─ 401 Unauthorized                               │
│                    │                                         │
│                    ▼                                         │
│              Refresh token flow:                            │
│              POST /oauth/token                              │
│              {                                              │
│                grant_type: "refresh_token",                 │
│                refresh_token: "xyz...",                     │
│                client_id: "...",                            │
│                client_secret: "..."                         │
│              }                                              │
│                    │                                         │
│                    ▼                                         │
│              New access token ─────> Retry API call        │
│                                                              │
│  Token Lifecycle:                                           │
│  • Access token: Expires in 1-24 hours                     │
│  • Refresh token: Expires in 30-90 days                    │
│  • Refresh proactively before expiration                   │
└──────────────────────────────────────────────────────────────┘

Implementation:

class OAuthSession:
    def __init__(self, integration_id):
        self.integration_id = integration_id
        self.load_tokens()

    def load_tokens(self):
        integration = CompanyIntegration.find(self.integration_id)
        self.access_token = integration.access_token
        self.refresh_token = integration.refresh_token
        self.expires_at = integration.token_expires_at

    def is_token_expired(self):
        # Refresh 5 minutes before actual expiry
        buffer = timedelta(minutes=5)
        return datetime.utcnow() + buffer >= self.expires_at

    def refresh_access_token(self):
        response = requests.post(
            'https://api.partner.com/oauth/token',
            data={
                'grant_type': 'refresh_token',
                'refresh_token': self.refresh_token,
                'client_id': settings.OAUTH_CLIENT_ID,
                'client_secret': settings.OAUTH_CLIENT_SECRET,
            }
        )

        if response.status_code != 200:
            raise OAuthRefreshError(f"Failed to refresh token: {response.text}")

        data = response.json()
        self.access_token = data['access_token']
        self.refresh_token = data.get('refresh_token', self.refresh_token)
        self.expires_at = datetime.utcnow() + timedelta(
            seconds=data['expires_in']
        )

        # Persist to database
        self.save_tokens()

    def make_request(self, method, url, **kwargs):
        # Proactive token refresh
        if self.is_token_expired():
            self.refresh_access_token()

        # Make request
        headers = kwargs.pop('headers', {})
        headers['Authorization'] = f'Bearer {self.access_token}'

        response = requests.request(method, url, headers=headers, **kwargs)

        # Reactive token refresh (if proactive failed)
        if response.status_code == 401:
            self.refresh_access_token()
            headers['Authorization'] = f'Bearer {self.access_token}'
            response = requests.request(method, url, headers=headers, **kwargs)

        return response

Pattern 3: Rotating Credentials

Some partners (looking at you, Airbnb) rotate credentials periodically via webhooks:

# Webhook endpoint
@app.post('/webhooks/partner/credentials')
def update_credentials(request):
    data = request.json()
    integration_id = data['integration_id']
    new_secret = data['secret']

    integration = CompanyIntegration.find(integration_id)
    integration.update(api_secret=new_secret)

    return {'status': 'ok'}

Critical lesson: Always update credentials atomically and test before committing:

def rotate_credentials(integration_id, new_secret):
    integration = CompanyIntegration.find(integration_id)
    old_secret = integration.api_secret

    # Test new credentials
    try:
        test_api_call(integration.api_key, new_secret)
    except AuthError:
        logger.error(f"New credentials invalid for {integration_id}")
        return False

    # Update
    integration.update(api_secret=new_secret)

    # Verify
    try:
        test_api_call(integration.api_key, new_secret)
    except AuthError:
        # Rollback
        integration.update(api_secret=old_secret)
        raise

    return True

API Changes & Versioning

Partner APIs change without warning. Here’s how to survive:

Strategy 1: Version Pinning

class PartnerAPI:
    BASE_URL = 'https://api.partner.com'
    API_VERSION = 'v2'  # Pin to specific version

    def get_url(self, endpoint):
        return f'{self.BASE_URL}/{self.API_VERSION}/{endpoint}'

Strategy 2: Response Validation

Catch breaking changes early:

from marshmallow import Schema, fields, ValidationError

class PropertySchema(Schema):
    id = fields.Str(required=True)
    name = fields.Str(required=True)
    address = fields.Dict(required=True)
    # ... other fields

def fetch_properties(self):
    response = self.api.get('properties')
    data = response.json()

    schema = PropertySchema(many=True)
    try:
        validated = schema.load(data)
    except ValidationError as e:
        logger.error(f"API response validation failed: {e.messages}")
        # Alert engineers
        alert_api_schema_change(self.partner, e.messages)
        raise

    return validated

Strategy 3: Adapter Versioning

Support multiple API versions simultaneously:

class GuestyAdapterV1:
    def normalize_property(self, raw):
        return {
            'external_id': raw['_id'],
            'name': raw['title'],
            'address': raw['address']['full'],
        }

class GuestyAdapterV2:
    def normalize_property(self, raw):
        return {
            'external_id': raw['id'],  # Changed from '_id'
            'name': raw['nickname'],    # Changed from 'title'
            'address': raw['location']['formatted'],  # New structure
        }

def get_adapter(integration):
    api_version = integration.metadata.get('api_version', 'v1')
    if api_version == 'v2':
        return GuestyAdapterV2()
    return GuestyAdapterV1()

Strategy 4: Gradual Rollout

When partner announces breaking changes:

# Feature flag for new API version
def get_api_version(integration_id):
    if is_feature_flag_enabled('guesty_api_v2', integration_id=integration_id):
        return 'v2'
    return 'v1'

# Rollout plan:
# Week 1: Enable for 1 test customer
# Week 2: Enable for 10% of customers
# Week 3: Enable for 50% of customers
# Week 4: Enable for 100% of customers
# Week 5: Remove v1 code
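A minimal sketch of the percentage rollout behind a flag check like `is_feature_flag_enabled`: hash the flag and integration into a stable bucket so each customer stays in the same cohort as the percentage ramps (the `rollout_percent` parameter is an assumption about how the flag system is configured):

```python
import hashlib

def is_feature_flag_enabled(flag, integration_id, rollout_percent):
    """Deterministic bucketing: the same integration always lands in the
    same bucket (0-99), so ramping 10% -> 50% only ever adds integrations
    to the new version - nobody flaps between v1 and v2."""
    key = f"{flag}:{integration_id}".encode()
    bucket = int(hashlib.sha256(key).hexdigest(), 16) % 100
    return bucket < rollout_percent
```

Random sampling per call would flap integrations between API versions on every check; stable bucketing avoids that.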

Monitoring & Observability

You can’t fix what you can’t see. Comprehensive monitoring is non-negotiable.

Metrics to Track

┌──────────────────────────────────────────────────────────────┐
│                    Monitoring Dashboard                       │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  API Call Metrics (per partner):                            │
│  • Total calls/min                    [████░░░░] 450/600    │
│  • Success rate                       [█████████] 99.2%     │
│  • Average response time              [███░░░░░░] 250ms     │
│  • P95 response time                  [█████░░░░] 800ms     │
│  • P99 response time                  [███████░░] 1.2s      │
│                                                              │
│  Error Breakdown:                                           │
│  • 4xx errors                         12/hour               │
│    └─ 401 Unauthorized                8                     │
│    └─ 404 Not Found                   4                     │
│  • 5xx errors                         3/hour                │
│  • Timeouts                           1/hour                │
│  • Rate limits (429)                  0/hour  ✓             │
│                                                              │
│  Queue Metrics:                                             │
│  • properties queue                   [██░░░░░░░] 234       │
│  • reservations queue                 [█░░░░░░░░] 45        │
│  • photos queue                       [████░░░░░] 890       │
│  • Age of oldest task                 2m 34s                │
│                                                              │
│  Task Metrics:                                              │
│  • Tasks succeeded                    1,234/hour            │
│  • Tasks failed                       12/hour               │
│  • Tasks retried                      45/hour               │
│  • Average task duration              [███░░░░░░] 8.3s      │
│                                                              │
│  Integration Health:                                        │
│  • Guesty          ✓ Healthy        Last run: 2m ago       │
│  • Airbnb          ⚠ Degraded       Rate limited            │
│  • Booking.com     ✗ Down           Auth failure            │
│  • CloudBeds       ✓ Healthy        Last run: 5m ago       │
└──────────────────────────────────────────────────────────────┘

Implementation

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
api_calls_total = Counter(
    'integration_api_calls_total',
    'Total API calls',
    ['partner', 'endpoint', 'status_code']
)

api_call_duration = Histogram(
    'integration_api_call_duration_seconds',
    'API call duration',
    ['partner', 'endpoint']
)

queue_length = Gauge(
    'integration_queue_length',
    'Number of tasks in queue',
    ['queue_name']
)

# Instrument API calls
class LoggedApiSession(requests.Session):
    def __init__(self, partner):
        super().__init__()
        self.partner = partner

    def request(self, method, url, **kwargs):
        endpoint = url.split('/')[-1]  # Simple endpoint extraction

        with api_call_duration.labels(self.partner, endpoint).time():
            response = super().request(method, url, **kwargs)

        api_calls_total.labels(
            self.partner,
            endpoint,
            response.status_code
        ).inc()

        return response
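One caveat: the `url.split('/')[-1]` extraction above puts raw IDs into metric labels, which explodes Prometheus series cardinality as soon as URLs contain resource IDs. A sketch of a normalizer that collapses IDs into a placeholder (the ID patterns are assumptions; extend them per partner):

```python
import re
from urllib.parse import urlparse

def normalize_endpoint(url):
    """Collapse IDs in URL paths so '/properties/123' and '/properties/456'
    become one metric series instead of thousands."""
    path = urlparse(url).path
    # Hex-style IDs first (e.g. 24-char Mongo ObjectIds), then numeric IDs
    path = re.sub(r"/[0-9a-f]{24}(?=/|$)", "/{id}", path)
    path = re.sub(r"/\d+(?=/|$)", "/{id}", path)
    return path or "/"
```

Bounded label cardinality keeps Prometheus memory usage flat no matter how many properties a partner has.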

Alerting Rules

# Prometheus alert rules
groups:
  - name: integrations
    rules:
      # High error rate
      - alert: HighAPIErrorRate
        expr: |
          rate(integration_api_calls_total{status_code=~"5.."}[5m]) > 0.05
        for: 10m
        annotations:
          summary: "High API error rate for {{ $labels.partner }}"

      # Auth failures
      - alert: AuthenticationFailures
        expr: |
          rate(integration_api_calls_total{status_code="401"}[5m]) > 0
        for: 5m
        annotations:
          summary: "Auth failures for {{ $labels.partner }}"

      # Queue backup
      - alert: QueueBackingUp
        expr: integration_queue_length > 5000
        for: 15m
        annotations:
          summary: "Queue {{ $labels.queue_name }} has {{ $value }} tasks"

      # Slow API calls
      - alert: SlowAPIResponses
        expr: |
          histogram_quantile(0.95,
            rate(integration_api_call_duration_seconds_bucket[5m])
          ) > 5
        for: 10m
        annotations:
          summary: "P95 latency for {{ $labels.partner }} is {{ $value }}s"

Structured Logging

import structlog

logger = structlog.get_logger()

# Every log includes context
logger.info(
    "api_call_completed",
    partner="guesty",
    endpoint="properties",
    integration_id=integration_id,
    duration_ms=response_time,
    status_code=response.status_code,
    properties_returned=len(properties),
)

# Enables powerful queries:
# "Show me all Guesty calls that took >2s"
# "Which integrations had auth failures today?"
# "What's the average properties per API call by partner?"

Cascading Requests

Some operations trigger chains of API calls. This is where things get interesting.

The Problem

Import Properties Request
    │
    ├─> Fetch property list (1 API call)
    │
    ├─> For each property (100 properties):
    │   ├─> Fetch property details (100 API calls)
    │   ├─> Fetch property photos (100 API calls)
    │   └─> Fetch reservations (100 API calls)
    │
    └─> Total: 301 API calls for one import!

With 50 integrations running every hour:
• 15,050 API calls/hour
• 250 API calls/minute
• 4+ API calls/second

Each partner has different rate limits!

Strategy 1: Batching

Fetch multiple items in one request:

# BAD: N+1 queries
for property_id in property_ids:
    details = api.get(f'/properties/{property_id}')  # 100 API calls

# GOOD: Batch fetch
property_ids_str = ','.join(property_ids)
details = api.get(f'/properties?ids={property_ids_str}')  # 1 API call
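In practice, batch endpoints cap how many IDs one request may carry, and URLs have length limits, so batch the batch. A sketch assuming a 50-ID cap (`api_get` stands in for the session's GET; check each partner's documented limit):

```python
def batch_fetch(api_get, property_ids, batch_size=50):
    """Fetch details in capped batches; `api_get` is a callable that
    performs the GET and returns a list (the 50-ID cap is an assumption)."""
    results = []
    for i in range(0, len(property_ids), batch_size):
        batch = property_ids[i:i + batch_size]
        results.extend(api_get(f"/properties?ids={','.join(batch)}"))
    return results
```

100 properties still become 2 API calls instead of 100, while staying inside the partner's per-request limit.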

Strategy 2: Parallel with Concurrency Control

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_property_details(property_id):
    return api.get(f'/properties/{property_id}')

property_ids = [...]  # 100 IDs

# Limit concurrency to respect rate limits
MAX_CONCURRENT = 5

with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
    futures = {
        executor.submit(fetch_property_details, pid): pid
        for pid in property_ids
    }

    results = []
    for future in as_completed(futures):
        property_id = futures[future]
        try:
            result = future.result()
            results.append(result)
        except Exception as e:
            logger.error(f"Failed to fetch property {property_id}: {e}")

# Result: still 100 API calls, but 5 in flight at a time
# Instead of 100 sequential calls taking ~100s, takes ~20s

Strategy 3: Task Chaining

Break cascading requests into separate tasks:

@celery_task
def import_properties(integration_id):
    driver = get_driver(integration_id)
    property_ids = driver.get_property_ids()  # Fast, just IDs

    # Chain: fetch details for each property
    for prop_id in property_ids:
        fetch_property_details.delay(integration_id, prop_id)

@celery_task
def fetch_property_details(integration_id, property_id):
    driver = get_driver(integration_id)
    details = driver.get_property_details(property_id)

    # Chain: fetch related data
    fetch_property_photos.delay(integration_id, property_id)
    fetch_reservations.delay(integration_id, property_id)

@celery_task
def fetch_property_photos(integration_id, property_id):
    # Fetches photos
    pass

@celery_task
def fetch_reservations(integration_id, property_id):
    # Fetches reservations
    pass

Benefits:

  • Each task is independently retryable
  • Failures don’t cascade
  • Easy to monitor progress
  • Natural rate limiting via queue
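To make "easy to monitor progress" concrete: the parent task records how many children it spawned, and each child decrements a shared counter when it finishes. A sketch with a plain dict standing in for what would be Redis (or similar shared storage) in production:

```python
class ImportProgress:
    """Fan-out completion tracking: the parent records the expected child
    count, each child decrements on finish. A dict stands in for shared
    storage (Redis in production, where the decrement must be atomic)."""

    def __init__(self, store=None):
        self.store = store if store is not None else {}

    def start(self, import_id, expected_children):
        self.store[import_id] = expected_children

    def child_done(self, import_id):
        self.store[import_id] -= 1
        return self.store[import_id] == 0  # True once every child finished
```

Celery's `chord` primitive offers similar fire-a-callback-when-all-children-finish semantics if you don't need incremental progress.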

Strategy 4: Smart Prioritization

Don’t fetch everything for every property:

@celery_task
def import_properties(integration_id):
    properties = driver.get_properties()

    for prop in properties:
        # Always import critical data
        import_property_basic.delay(integration_id, prop.id)

        # Only import photos if changed
        if prop.photos_updated_at > last_import:
            import_photos.delay(integration_id, prop.id)

        # Only import future reservations
        if has_upcoming_reservations(prop.id):
            import_reservations.delay(integration_id, prop.id)

Memory Management & Batching

One of the sneakiest failure modes: your integration works perfectly for 10 properties, then crashes with OOM (Out Of Memory) when processing 10,000.

The Memory Bloat Problem

Scenario: Import 5,000 properties for a large customer

┌────────────────────────────────────────────────────────────┐
│                    Memory Usage Timeline                    │
├────────────────────────────────────────────────────────────┤
│                                                             │
│  8GB │                                             ┌─CRASH │
│      │                                        ┌────┘        │
│  6GB │                                   ┌────┘             │
│      │                              ┌────┘                  │
│  4GB │                         ┌────┘                       │
│      │                    ┌────┘                            │
│  2GB │               ┌────┘                                 │
│      │          ┌────┘                                      │
│  0GB └──────────┘──────────────────────────────────────────┤
│      0        1000      2000      3000      4000      5000  │
│                    Properties Processed                     │
│                                                             │
│  Problem: Loading all data into memory at once             │
│  • Each property: ~50KB                                    │
│  • 5,000 properties: 250MB base data                       │
│  • Python object overhead: ~3x                             │
│  • List/dict allocations: ~2x                              │
│  • Total: ~1.5GB just for raw data                         │
│  • Peak with processing: 3-8GB!                            │
└────────────────────────────────────────────────────────────┘

Root Causes:

  1. Loading entire dataset into memory
  2. Building large lists/dicts without bounds
  3. Not releasing references (Python GC can’t collect)
  4. SQLAlchemy session holding all objects
  5. Pandas DataFrames growing unbounded

Strategy 1: Chunking / Pagination

Process data in fixed-size chunks:

# BAD: Load everything into memory
@celery_task
def import_all_reservations(integration_id):
    driver = get_driver(integration_id)

    # This loads ALL reservations into memory!
    all_reservations = driver.get_reservations()  # Could be 50,000+ items

    for reservation in all_reservations:  # Memory keeps growing
        process_reservation(reservation)
        save_reservation(reservation)
    # Memory not released until task completes

# GOOD: Process in chunks
@celery_task
def import_all_reservations(integration_id):
    driver = get_driver(integration_id)

    CHUNK_SIZE = 100  # Process 100 at a time
    offset = 0

    while True:
        # Fetch chunk
        chunk = driver.get_reservations(limit=CHUNK_SIZE, offset=offset)

        if not chunk:
            break  # No more data

        # Process chunk
        for reservation in chunk:
            process_reservation(reservation)
            save_reservation(reservation)

        # Clear chunk from memory
        del chunk

        # Move to next chunk
        offset += CHUNK_SIZE

        # Optional: Force garbage collection
        import gc
        gc.collect()

        logger.info(f"Processed {offset} reservations")

Memory comparison:

Without chunking: 5,000 items × 50KB = 250MB minimum
With chunking:    100 items × 50KB = 5MB at a time (50x reduction!)

Strategy 2: Streaming / Generator Pattern

Use generators to process data lazily:

# BAD: Return entire list
def fetch_all_properties(api):
    properties = []
    page = 1

    while True:
        response = api.get(f'/properties?page={page}')
        properties.extend(response['data'])  # List keeps growing!

        if not response['has_more']:
            break
        page += 1

    return properties  # Entire dataset in memory

# GOOD: Yield items one at a time
def stream_properties(api):
    """Generator that yields properties without loading all into memory"""
    page = 1

    while True:
        response = api.get(f'/properties?page={page}')

        for prop in response['data']:
            yield prop  # Yield one item at a time

        if not response['has_more']:
            break
        page += 1

# Usage
@celery_task
def import_properties(integration_id):
    api = get_api(integration_id)

    # Memory efficient: only one property in memory at a time
    for prop in stream_properties(api):
        process_property(prop)
        save_property(prop)
        # prop is garbage collected after this iteration

Strategy 3: Database Batching

Batch database operations to avoid holding objects in session:

# BAD: Session accumulates all objects
@celery_task
def import_properties(integration_id):
    session = get_session()
    properties = fetch_all_properties()  # 5,000 items

    for prop_data in properties:
        # SQLAlchemy session keeps reference to every object!
        property = Property(**prop_data)
        session.add(property)

    session.commit()  # Huge commit, all objects in memory
    # Memory only released after commit

# GOOD: Batch commits with session cleanup
@celery_task
def import_properties(integration_id):
    api = get_api(integration_id)
    BATCH_SIZE = 100

    for chunk in chunked(stream_properties(api), BATCH_SIZE):
        session = get_session()

        for prop_data in chunk:
            property = Property(**prop_data)
            session.add(property)

        # Commit batch
        session.commit()

        # Critical: Close session to release memory
        session.close()

        # Alternative: Expunge objects
        # session.expunge_all()

def chunked(iterable, size):
    """Yield successive chunks from iterable"""
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

Strategy 4: Bulk Operations

Use bulk operations instead of individual inserts:

# BAD: Individual inserts
for prop_data in properties:
    Property.create(**prop_data)  # Separate query each time

# GOOD: Bulk insert
Property.bulk_insert([
    {'name': prop['name'], 'address': prop['address'], ...}
    for prop in properties
])

# BETTER: Bulk insert in chunks
for chunk in chunked(properties, 1000):
    Property.bulk_insert([
        {'name': p['name'], 'address': p['address']}
        for p in chunk
    ])
    # Each chunk is separate transaction

Strategy 5: Memory Profiling

Identify memory leaks before they hit production:

from memory_profiler import profile
import tracemalloc

# Method 1: Decorator-based profiling
# (@profile must be the inner decorator so it wraps the function itself,
# not the Celery task object)
@celery_task
@profile
def import_properties(integration_id):
    # Function body
    pass

# Outputs:
# Line #    Mem usage    Increment
# ======================================
#     12     50.2 MiB     0.0 MiB    def import_properties():
#     13     75.4 MiB    25.2 MiB        data = fetch_all()
#     14     75.4 MiB     0.0 MiB        for item in data:
#     15    145.8 MiB    70.4 MiB            process(item)  # Memory leak!

# Method 2: Tracemalloc for production monitoring
@celery_task
def import_properties(integration_id):
    tracemalloc.start()

    # Take snapshot before
    snapshot_before = tracemalloc.take_snapshot()

    # Do work
    properties = fetch_all_properties()
    for prop in properties:
        process_property(prop)

    # Take snapshot after
    snapshot_after = tracemalloc.take_snapshot()

    # Compare
    top_stats = snapshot_after.compare_to(snapshot_before, 'lineno')

    # Log top memory consumers
    logger.info("Top 10 memory allocations:")
    for stat in top_stats[:10]:
        logger.info(f"{stat}")

    # Alert if memory usage too high
    current, peak = tracemalloc.get_traced_memory()
    if peak > 1024 * 1024 * 1024:  # 1GB
        alert_oncall(f"High memory usage: {peak / 1024 / 1024:.1f}MB")

    tracemalloc.stop()

Strategy 6: Explicit Garbage Collection

Force Python to release memory:

import gc

@celery_task
def import_large_dataset(integration_id):
    # Optionally disable the cyclic collector during the hot loop and
    # re-enable it afterwards to cut GC overhead (explicit gc.collect()
    # still runs while automatic collection is disabled)
    gc.disable()
    try:
        for i, chunk in enumerate(fetch_in_chunks()):
            process_chunk(chunk)

            # Clear local references
            del chunk

            # Force a collection every 10 chunks
            if i % 10 == 0:
                collected = gc.collect()
                logger.debug(f"GC collected {collected} objects")

                # Log memory stats
                memory_info = get_memory_usage()
                logger.info(f"Memory: {memory_info['rss']}MB RSS, {memory_info['vms']}MB VMS")
    finally:
        gc.enable()

Strategy 7: Circular Reference Detection

Reference cycles escape CPython's reference counting and linger until the cyclic collector runs, so long-lived cycles inflate peak memory. Find and break them:

import gc
import sys

def find_circular_references():
    """Debug helper to find circular references"""
    gc.collect()  # Force collection first

    # Find all objects
    for obj in gc.get_objects():
        if isinstance(obj, dict):
            # Check for circular references in dicts
            for key, value in obj.items():
                if value is obj:
                    logger.warning(f"Circular reference found: {obj}")

# Common circular reference patterns

# Pattern 1: Parent-child relationship
class Property:
    def __init__(self):
        self.reservations = []

class Reservation:
    def __init__(self, property):
        self.property = property  # Reference to parent
        property.reservations.append(self)  # Parent references child
        # Circular reference!

# Solution: Use weak references
import weakref

class Reservation:
    def __init__(self, property):
        self.property = weakref.ref(property)  # Weak reference
        property.reservations.append(self)

# Pattern 2: Cached data with references
cache = {}

def get_property(property_id):
    if property_id in cache:
        return cache[property_id]

    prop = Property.find(property_id)
    cache[property_id] = prop  # Keeps reference forever!
    return prop

# Solution: Use LRU cache with size limit
from functools import lru_cache

@lru_cache(maxsize=1000)  # Limit cache size
def get_property(property_id):
    return Property.find(property_id)

Strategy 8: Memory Limits per Task

Prevent runaway tasks from consuming all memory:

import resource

@celery_task
def import_properties(integration_id):
    # Set memory limit: 512MB
    soft, hard = resource.getrlimit(resource.RLIMIT_AS)
    resource.setrlimit(resource.RLIMIT_AS, (512 * 1024 * 1024, hard))

    try:
        # Do work
        process_data()
    except MemoryError:
        logger.error(f"Task exceeded memory limit for integration {integration_id}")
        # Graceful degradation
        process_data_in_smaller_chunks()

# Celery task time and memory limits
@celery_task(
    soft_time_limit=300,    # 5 min soft limit
    time_limit=360,         # 6 min hard limit
)
def import_properties(integration_id):
    pass
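Per-task limits pair well with worker-level recycling: Celery can replace a pool child process after it has executed a number of tasks, or once its resident memory crosses a threshold, so slow leaks never accumulate for long. The values below are illustrative, not recommendations:

```python
from celery import Celery

app = Celery('integrations')

# Recycle each worker child after 100 tasks, regardless of memory
app.conf.worker_max_tasks_per_child = 100

# ...or as soon as its resident memory exceeds ~512MB (value is in KiB)
app.conf.worker_max_memory_per_child = 512_000
```

Recycling trades a small fork cost for a hard ceiling on how long any leak can grow.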

Real-World Example: Processing 10,000 Properties

import gc
import tracemalloc

from more_itertools import chunked  # chunking helper (or hand-roll one)

@celery_task(bind=True)
def import_all_properties_chunked(self, integration_id):
    """Memory-efficient property import"""

    driver = get_driver(integration_id)
    stats = {
        'processed': 0,
        'failed': 0,
        'peak_memory_mb': 0,
    }

    CHUNK_SIZE = 100
    tracemalloc.start()

    try:
        # Stream properties (no memory buildup)
        for chunk_idx, property_chunk in enumerate(
            chunked(driver.stream_properties(), CHUNK_SIZE)
        ):
            # Track memory
            current, peak = tracemalloc.get_traced_memory()
            peak_mb = peak / 1024 / 1024
            stats['peak_memory_mb'] = max(stats['peak_memory_mb'], peak_mb)

            # Alert if memory growing
            if peak_mb > 500:  # 500MB threshold
                logger.warning(f"High memory usage: {peak_mb:.1f}MB")
                gc.collect()

            # Process chunk with separate session
            session = get_session()

            try:
                bulk_data = []
                for prop_data in property_chunk:
                    try:
                        # Transform data
                        normalized = normalize_property(prop_data)
                        bulk_data.append(normalized)
                        stats['processed'] += 1
                    except Exception as e:
                        logger.error(f"Failed to process property: {e}")
                        stats['failed'] += 1

                # Bulk insert chunk
                if bulk_data:
                    Property.bulk_insert(bulk_data)
                    session.commit()

            except Exception as e:
                session.rollback()
                logger.error(f"Failed to save chunk {chunk_idx}: {e}")
            finally:
                session.close()

            # Clear chunk from memory
            del property_chunk
            del bulk_data

            # Force GC every 10 chunks
            if chunk_idx % 10 == 0:
                collected = gc.collect()
                logger.info(
                    f"Chunk {chunk_idx}: {stats['processed']} processed, "
                    f"GC collected {collected} objects, "
                    f"Peak memory: {peak_mb:.1f}MB"
                )

            # Update task progress
            self.update_state(
                state='PROGRESS',
                meta={
                    'processed': stats['processed'],
                    'failed': stats['failed'],
                }
            )

    finally:
        tracemalloc.stop()

    logger.info(
        f"Import complete: {stats['processed']} processed, "
        f"{stats['failed']} failed, "
        f"Peak memory: {stats['peak_memory_mb']:.1f}MB"
    )

    return stats

Memory Management Checklist

┌────────────────────────────────────────────────────────────┐
│           Memory Optimization Checklist                     │
├────────────────────────────────────────────────────────────┤
│                                                             │
│  ✓ Use generators/streaming instead of loading all data    │
│  ✓ Process data in chunks (100-1000 items)                 │
│  ✓ Use bulk operations for database writes                 │
│  ✓ Close database sessions after each batch                │
│  ✓ Delete large objects after use (del variable)           │
│  ✓ Force garbage collection in long-running tasks          │
│  ✓ Avoid circular references (use weakref)                 │
│  ✓ Profile memory usage in development                     │
│  ✓ Set memory limits per task                              │
│  ✓ Monitor memory in production                            │
│  ✓ Clear caches periodically                               │
│  ✓ Use LRU cache with size limits                          │
│  ✓ Avoid global state accumulation                         │
│                                                             │
└────────────────────────────────────────────────────────────┘

Key Insight: Memory issues are like compound interest - small leaks accumulate over time. A 1MB leak per task becomes 1GB after 1,000 tasks. Design for bounded memory usage from day one.


Data Consistency

Integration data is inherently eventually consistent. Embrace it.

Challenge: Concurrent Updates

Scenario: Two workers processing same property

Worker A                          Worker B
   │                                 │
   ├─ Fetch property @ 10:00:00     │
   │  name: "Beach House"           │
   │                                 ├─ Fetch property @ 10:00:01
   │                                 │  name: "Beach House"
   │                                 │
   ├─ Update name to "Ocean Villa"  │
   │  (from API)                     │
   │  Save @ 10:00:05                │
   │                                 │
   │                                 ├─ Update name to "Beach House"
   │                                 │  (from stale cached data)
   │                                 │  Save @ 10:00:06
   │                                 │
   └─ RESULT: Old data wins! ✗     ─┘

Solution 1: Last-Write-Wins with Timestamps

def update_property(property_id, new_data, fetched_at):
    property = Property.find(property_id)

    # Only update if data is newer
    if fetched_at > property.last_synced_at:
        property.update(
            name=new_data['name'],
            address=new_data['address'],
            last_synced_at=fetched_at,
        )
        return True
    else:
        logger.info(f"Skipping stale update for property {property_id}")
        return False

Solution 2: Optimistic Locking

from sqlalchemy import Column, Integer, String

class Property(Base):
    __tablename__ = 'properties'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    version = Column(Integer, nullable=False, default=1)

    # SQLAlchemy increments version on every UPDATE and raises
    # StaleDataError if the row changed underneath us
    __mapper_args__ = {'version_id_col': version}

def update_property(property_id, new_data):
    from sqlalchemy.orm.exc import StaleDataError

    session = get_session()
    try:
        property = session.query(Property).filter_by(id=property_id).one()
        property.name = new_data['name']
        session.commit()  # Fails if version changed
    except StaleDataError:
        session.rollback()
        logger.warning(f"Concurrent update detected for property {property_id}")
        # Retry or skip

Solution 3: Idempotency Keys

Make operations idempotent:

@celery_task
def import_reservations(integration_id, property_id, idempotency_key):
    # Check if already processed
    cache_key = f'import_res:{idempotency_key}'
    if redis.exists(cache_key):
        logger.info(f"Skipping duplicate import: {idempotency_key}")
        return

    # Do the import
    driver = get_driver(integration_id)
    reservations = driver.get_reservations(property_id)
    for res in reservations:
        upsert_reservation(res)

    # Mark as processed (24 hour expiry)
    redis.setex(cache_key, 86400, '1')

# Usage: generate stable idempotency key
idempotency_key = f"{integration_id}:{property_id}:{date.today().isoformat()}"
import_reservations.delay(integration_id, property_id, idempotency_key)

Solution 4: Event Sourcing

Store all changes as events:

# Instead of updating property directly:
property.name = "New Name"  # Lost history

# Store events:
events.append({
    'type': 'property.name_changed',
    'property_id': property_id,
    'old_value': 'Beach House',
    'new_value': 'Ocean Villa',
    'source': 'guesty_api',
    'timestamp': datetime.utcnow(),
})

# Rebuild current state from events
def get_property_state(property_id):
    events = Event.query.filter_by(property_id=property_id).order_by(Event.timestamp)
    state = {}
    for event in events:
        apply_event(state, event)
    return state

Testing Strategies

Testing integrations is hard because you depend on external APIs.

Strategy 1: Record & Replay (VCR)

import vcr

# First run: records real API responses
# Subsequent runs: replays from cassette
@vcr.use_cassette('fixtures/vcr_cassettes/guesty_properties.yaml')
def test_fetch_properties():
    api = GuestyAPI(client_id='test', client_secret='test')
    properties = api.get_homes()

    assert len(properties) > 0
    assert properties[0]['_id']
    assert properties[0]['nickname']

# Cassette file (auto-generated):
# interactions:
# - request:
#     method: GET
#     uri: https://api.guesty.com/api/v2/listings
#   response:
#     status: {code: 200}
#     body: {string: '{"results": [...]}'}

Benefits:

  • Tests don’t hit real API (fast, free)
  • Deterministic results
  • Works offline

Drawbacks:

  • Cassettes get stale
  • Need to refresh periodically
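One way to make staleness visible is a CI check that flags cassettes past a re-record deadline. A stdlib sketch (the directory layout and 30-day threshold are illustrative):

```python
import os
import time

CASSETTE_DIR = 'fixtures/vcr_cassettes'
MAX_AGE_DAYS = 30

def stale_cassettes(directory=CASSETTE_DIR, max_age_days=MAX_AGE_DAYS, now=None):
    """Return cassette files older than max_age_days (candidates for re-record)."""
    now = now or time.time()
    cutoff = now - max_age_days * 86400
    stale = []
    if not os.path.isdir(directory):
        return stale
    for name in os.listdir(directory):
        path = os.path.join(directory, name)
        if name.endswith('.yaml') and os.path.getmtime(path) < cutoff:
            stale.append(name)
    return sorted(stale)
```

Run it in CI and fail (or warn) when the list is non-empty, so cassette refreshes become a scheduled chore instead of a surprise.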

Strategy 2: Contract Testing

Validate our assumptions about partner APIs:

import pytest

def test_guesty_api_contract():
    """Validates Guesty API still matches our expectations"""
    api = GuestyAPI(client_id=TEST_CREDS['id'], client_secret=TEST_CREDS['secret'])

    # Test 1: Auth works
    assert api.is_authenticated

    # Test 2: Properties endpoint exists
    properties = api.get_homes()
    assert isinstance(properties, list)

    # Test 3: Property structure matches schema
    if properties:
        prop = properties[0]
        assert '_id' in prop
        assert 'nickname' in prop
        assert 'address' in prop
        assert 'accommodates' in prop

    # Test 4: Rate limit headers present
    response = api._last_response
    assert 'X-RateLimit-Remaining-Minute' in response.headers

# Run these tests daily against production APIs
# Alert if contracts break

Strategy 3: Integration Test Environment

Some partners provide sandbox/test environments:

# config.py
if ENV == 'production':
    GUESTY_BASE_URL = 'https://api.guesty.com'
elif ENV == 'test':
    GUESTY_BASE_URL = 'https://api.sandbox.guesty.com'

Reality check: Most partners don’t have good test environments. When they exist:

  • Often missing features
  • Stale data
  • Different behavior than production
  • Extra cost

Strategy 4: Chaos Testing

Simulate failures to ensure resilience:

import random
import time
from unittest.mock import Mock

import requests
from requests.exceptions import ConnectionError, Timeout

class ChaosAPISession(requests.Session):
    def request(self, *args, **kwargs):
        # Randomly inject failures
        if random.random() < 0.1:  # 10% failure rate
            failure_type = random.choice([
                'timeout',
                '500_error',
                '429_rate_limit',
                'network_error',
            ])

            if failure_type == 'timeout':
                time.sleep(30)
                raise Timeout("Simulated timeout")
            elif failure_type == '500_error':
                return Mock(status_code=500, text="Internal Server Error")
            elif failure_type == '429_rate_limit':
                return Mock(status_code=429, headers={'Retry-After': '60'})
            elif failure_type == 'network_error':
                raise ConnectionError("Simulated network failure")

        return super().request(*args, **kwargs)

# Use in staging environment
if ENV == 'staging':
    session_class = ChaosAPISession
else:
    session_class = requests.Session

Lessons Learned

After 40+ integrations, here are the hard-won lessons:

1. Expect Failure

❌ Don’t: Assume APIs are reliable

properties = api.get_properties()  # What if this fails?
for prop in properties:
    save(prop)

✅ Do: Design for failure

try:
    properties = api.get_properties()
except APIError as e:
    logger.error(f"Failed to fetch properties: {e}")
    alert_on_call()
    return  # Graceful degradation

for prop in properties:
    try:
        save(prop)
    except Exception as e:
        logger.error(f"Failed to save property {prop['id']}: {e}")
        # Continue with next property

2. Rate Limits Are Real

Mistake: Treating rate limits as suggestions

Reality: You will get banned. Recovery is painful (support tickets, manual approval).

Defense in depth:

  1. Read the docs (they’re usually wrong)
  2. Implement conservative limits (50-70% of stated limit)
  3. Use multiple layers (per-worker, per-queue, distributed)
  4. Monitor actual limits via response headers
  5. Implement backoff on 429s
  6. Have circuit breakers
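Layers 2 and 4 combine into a small client-side limiter. A sketch, assuming the partner exposes the same `X-RateLimit-Remaining-Minute` header as the Guesty example earlier (the 60% safety factor is a judgment call, not a standard):

```python
import time

class ConservativeLimiter:
    """Client-side limiter pinned below the partner's stated limit.

    stated_limit_per_min comes from the docs; we only use a
    safety_factor share of it, and tighten further if response
    headers report we're running hot.
    """

    def __init__(self, stated_limit_per_min, safety_factor=0.6):
        self.effective_limit = int(stated_limit_per_min * safety_factor)
        self.min_interval = 60.0 / self.effective_limit
        self._last_request = 0.0

    def wait(self):
        """Block until we're allowed to send the next request."""
        now = time.monotonic()
        sleep_for = self._last_request + self.min_interval - now
        if sleep_for > 0:
            time.sleep(sleep_for)
        self._last_request = time.monotonic()

    def observe_headers(self, headers):
        """Adapt to the real limit reported by the API (if present)."""
        remaining = headers.get('X-RateLimit-Remaining-Minute')
        if remaining is not None and int(remaining) < 5:
            # Running hot: slow down for the rest of the window
            self.min_interval *= 2
```

Call `wait()` before each request and `observe_headers()` after each response; a 429 handler with backoff and a circuit breaker sit on top of this.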

3. Authentication Will Break

Plan for:

  • Token expiration (even “long-lived” tokens)
  • Token refresh failures
  • Credential rotation
  • OAuth provider outages
  • Webhook auth updates

Solution: Monitoring + Alerts + Runbooks

# Alert on auth failures
if auth_failure_count_last_hour(partner) > 5:
    page_oncall(f"{partner} auth failing - check credentials")
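Beyond alerting, refresh proactively so expiry never surprises you. A sketch, where `refresh_fn` stands in for whatever the partner's auth flow requires (an assumed callable returning a token and its lifetime):

```python
import time

class TokenManager:
    """Refresh tokens before expiry instead of reacting to 401s."""

    REFRESH_MARGIN = 300  # refresh 5 minutes before expiry

    def __init__(self, refresh_fn):
        # refresh_fn() -> (access_token, expires_in_seconds)
        self._refresh_fn = refresh_fn
        self._token = None
        self._expires_at = 0.0

    def get_token(self):
        needs_refresh = (
            self._token is None
            or time.time() >= self._expires_at - self.REFRESH_MARGIN
        )
        if needs_refresh:
            token, expires_in = self._refresh_fn()
            self._token = token
            self._expires_at = time.time() + expires_in
        return self._token
```

Pair this with the alert above: the manager keeps the happy path working, the alert catches the cases where refresh itself starts failing.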

4. APIs Change Without Warning

Examples I’ve seen:

  • Field renamed (property_name → name)
  • Response structure changed (flat → nested)
  • Pagination method changed (offset → cursor)
  • Rate limits tightened (1000/min → 100/min)
  • Endpoints deprecated with no notice

Defense:

  • Version pinning
  • Response validation (schemas)
  • Contract tests (daily)
  • Adapter pattern (easy to swap implementations)
  • Feature flags (gradual rollouts)
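Response validation can be as simple as a hand-rolled schema check that reports drift instead of crashing mid-sync. A sketch (field names mirror the Guesty examples above; the schema itself is illustrative):

```python
# Expected shape of a property payload; extend per partner
PROPERTY_SCHEMA = {
    '_id': str,
    'nickname': str,
    'accommodates': int,
}

def validate_property_response(prop, schema=PROPERTY_SCHEMA):
    """Return a list of drift errors; empty list means the payload matches."""
    errors = []
    for field, expected_type in schema.items():
        if field not in prop:
            errors.append(f"missing field: {field}")
        elif not isinstance(prop[field], expected_type):
            errors.append(
                f"{field}: expected {expected_type.__name__}, "
                f"got {type(prop[field]).__name__}"
            )
    return errors

# Usage: warn on drift so one odd record doesn't stop the whole sync
# errors = validate_property_response(raw)
# if errors:
#     logger.warning(f"Schema drift detected: {errors}")
```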

5. One Integration, Many Edge Cases

Common issues:

  • Missing required fields (e.g., no address for some properties)
  • Inconsistent data types (string vs int for IDs)
  • Timezone confusion (UTC? Local? Ambiguous?)
  • Encoding issues (UTF-8? Latin-1? Emoji?)
  • Null vs empty string vs missing field

Solution: Defensive parsing + validation

def parse_property(raw):
    return {
        'id': str(raw['id']),  # Always string, even if sometimes int
        'name': raw.get('name', '').strip() or 'Untitled Property',
        'address': raw.get('address', {}).get('formatted', 'Unknown'),
        'capacity': int(raw.get('accommodates') or 0),
        'created_at': parse_datetime(raw.get('createdAt')),  # Handles many formats
    }

6. Logs Are Your Best Friend

When something goes wrong (and it will), comprehensive logs make the difference between a 15-minute debug and a 4-hour investigation.

Log:

  • Every API call (URL, method, status, duration)
  • Every error (with context: integration_id, property_id, etc.)
  • State changes (property created, updated, deleted)
  • Rate limit info (headers, delays)
  • Auth events (token refresh, failures)

Don’t log:

  • API keys, tokens, passwords (obviously)
  • PII without careful consideration
  • High-cardinality data in metric labels
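A cheap safety net for the "don't log" list is scrubbing obvious secrets before a log line is written. A regex-based sketch (the patterns are illustrative, not exhaustive - treat this as defense in depth, not a substitute for careful logging):

```python
import re

REDACT_PATTERNS = [
    # Authorization headers: redact everything after the header name
    (re.compile(r'(?i)(authorization:\s*).+'), r'\1[REDACTED]'),
    # Credentials in query strings
    (re.compile(r'(?i)(api[_-]?key=)[^&\s]+'), r'\1[REDACTED]'),
    (re.compile(r'(?i)(token=)[^&\s]+'), r'\1[REDACTED]'),
]

def sanitize(message):
    """Scrub obvious secrets from a log message."""
    for pattern, replacement in REDACT_PATTERNS:
        message = pattern.sub(replacement, message)
    return message
```

Wire it in as a logging filter or formatter so every handler benefits.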

7. Monitoring Must Be Actionable

Bad alert:

"API error rate increased"

Good alert:

"Guesty API 401 errors for integration #1234 (Company: ACME Inc)
Last successful auth: 2 hours ago
Action: Check if customer revoked permissions
Runbook: https://wiki.company.com/integrations/guesty-auth-failures"

8. Start Simple, Add Complexity When Needed

Don’t prematurely optimize:

  • First integration: Simple cron job might be fine
  • 5 integrations: Add basic queue
  • 10+ integrations: Distributed tasks
  • 40+ integrations: Need all the patterns in this post

Over-engineering early is worse than under-engineering.

9. Memory Leaks Kill Silently

The problem: Works perfectly for 100 items, crashes mysteriously at 10,000.

Common causes:

  • Loading entire dataset into memory
  • SQLAlchemy session holding all objects
  • Circular references preventing GC
  • Cached data growing unbounded
  • Large lists/dicts built incrementally

Solutions:

# BAD: Load everything
all_items = fetch_all()  # 10,000 items in memory!
for item in all_items:
    process(item)

# GOOD: Stream/chunk
for chunk in fetch_in_chunks(size=100):
    process_chunk(chunk)
    del chunk  # Release memory
    gc.collect()  # Force cleanup

Key insight: Profile memory in dev, monitor in prod. Memory issues compound over time.

10. Worker Downtime = Queue Disaster

Scenario: Workers crash at 2 AM, scheduler keeps queuing tasks.

Result: 10,000+ stale tasks by morning, 10+ hours to drain.

Defense:

  • Monitor worker heartbeats (alert within 1 minute)
  • Task staleness checks (skip tasks older than 1 hour)
  • Queue depth alerts (page when > 1000)
  • Graceful deployment (drain before restart)
  • Emergency fallback processor
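The staleness check is a one-liner once tasks carry their enqueue time. A sketch (the `queued_at` argument is an assumption about how your scheduler enqueues tasks):

```python
from datetime import datetime, timedelta, timezone

MAX_TASK_AGE = timedelta(hours=1)

def is_stale(queued_at, now=None, max_age=MAX_TASK_AGE):
    """True when a task has sat in the queue too long to be worth running."""
    now = now or datetime.now(timezone.utc)
    return now - queued_at > max_age

# In the task body (illustrative):
# @celery_task
# def sync_property(integration_id, property_id, queued_at):
#     if is_stale(parse_datetime(queued_at)):
#         logger.info("Skipping stale task queued at %s", queued_at)
#         return
#     ...
```

After a worker outage, stale tasks drain in seconds instead of hours, and the scheduler's next cycle re-enqueues fresh work anyway.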

Critical: Most “queue backup” problems are actually “worker down” problems.

11. Partner APIs Are Not Created Equal

Tier 1 (Great):

  • Comprehensive docs
  • Webhook support
  • Reasonable rate limits
  • Stable APIs
  • Responsive support
  • Example: Stripe, Twilio

Tier 2 (Okay):

  • Basic docs
  • Some rate limits
  • Breaking changes occasionally
  • Slow support
  • Example: Most PMS systems

Tier 3 (Painful):

  • Outdated docs (or no docs)
  • Aggressive rate limits
  • Frequent breaking changes
  • No support
  • Example: You know who you are

Adjust complexity accordingly. Don’t build the same architecture for all partners.

12. Documentation Debt Is Real

Document:

  • Partner-specific quirks
  • Rate limits (real ones, not official docs)
  • Auth renewal process
  • Common failures + solutions
  • Deployment considerations

Example quirks doc:

## Guesty Integration

### Rate Limits
Official: 600 req/min
Actual: ~500 req/min (aggressive throttling around 550)
Per-second limit: 10 req/s (undocumented!)

### Auth
- Token expires after 24 hours
- Refresh can take up to 30s
- Refresh endpoint rate limited to 10/hour

### Known Issues
- Property IDs sometimes change (merge/split operations)
- Address field can be null even for published properties
- Photos API occasionally returns 502 (retry works)

### Support
- Response time: 3-5 business days
- Slack channel: #vendor-guesty

Conclusion

Building scalable integrations is a marathon, not a sprint. The patterns in this post took years to develop through trial, error, and production incidents.

Key Takeaways:

  1. Isolation - Use driver pattern to contain partner-specific logic
  2. Async - Celery/queues for distributed processing
  3. Rate Limiting - Multiple layers, conservative limits, adaptive backoff
  4. Error Handling - Classify errors, retry intelligently, dead letter queue
  5. Circuit Breakers - Fail fast when partner is down
  6. Memory Management - Chunk data, stream processing, explicit GC
  7. Worker Monitoring - Heartbeat checks, queue depth alerts, graceful deploys
  8. Monitoring - Comprehensive metrics, actionable alerts
  9. Testing - Contract tests, chaos engineering, record/replay
  10. Data Consistency - Timestamps, optimistic locking, idempotency
  11. Documentation - Runbooks, quirks, postmortems

Remember: Integration engineering is fundamentally different from building your own APIs. You’re operating in a hostile, unpredictable environment where Murphy’s Law is the only constant.

The teams that succeed are those that:

  • Expect failure and design for resilience
  • Monitor relentlessly
  • Iterate based on production feedback
  • Document everything (especially the weird stuff)

Final Advice: Start simple. Add complexity only when you feel the pain. Every pattern in this post was born from a production incident.

Good luck, and may your rate limits be generous and your tokens forever fresh.


Questions? Want to discuss integration architecture? Find me on GitHub or LinkedIn.