Hard-won lessons from building and maintaining 40+ production integrations including Guesty, Airbnb, Booking.com, CloudBeds, and more. This is a deep dive into the architectural patterns, failure modes, and battle-tested strategies for building integration systems that actually work at scale.
Table of Contents
- Introduction
- Architecture Overview
- The Driver Pattern
- Distributed Task Processing
- Rate Limiting Strategies
- Queue Management & Backpressure
- Error Handling & Retries
- Authentication Patterns
- API Changes & Versioning
- Monitoring & Observability
- Cascading Requests
- Memory Management & Batching
- Data Consistency
- Testing Strategies
- Lessons Learned
Introduction
After building 40+ integrations with third-party APIs (property management systems, booking platforms, smart locks, etc.), I’ve learned that integration engineering is fundamentally different from traditional backend development. You’re operating in a hostile environment where:
- You don’t control the API - rate limits, outages, and breaking changes happen without warning
- Each partner is unique - OAuth flows, pagination, error formats, and data models vary wildly
- Failure is the norm - networks fail, APIs timeout, data is inconsistent
- Scale amplifies problems - what works for 5 partners breaks at 40+
This post documents the patterns, strategies, and hard lessons learned from building a production integration system that processes millions of API calls daily.
Architecture Overview
System Topology
┌─────────────────────────────────────────────────────────────────┐
│ APPLICATION LAYER │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Web API │ │ Admin UI │ │ Webhooks │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼──────────────────┼──────────────────┼─────────────────┘
│ │ │
└──────────────────┴──────────────────┘
│
┌────────────────────────────┼─────────────────────────────────────┐
│ INTEGRATION ORCHESTRATION │
│ │ │
│ ┌────────────────────────▼────────────────────────┐ │
│ │ Task Scheduler (Celery Beat) │ │
│ │ • Periodic imports (properties, reservations) │ │
│ │ • Health checks │ │
│ └────────────────────┬────────────────────────────┘ │
│ │ │
│ ┌────────────────────▼────────────────────────────┐ │
│ │ Message Queue (RabbitMQ/Redis) │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌──────────┐ │ │
│ │ │ properties │ │ reservations│ │ photos │ │ │
│ │ │ queue │ │ queue │ │ queue │ │ │
│ │ └─────────────┘ └─────────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
│
┌───────────────┴───────────────┬───────────────┐
│ │ │
┌─────────▼──────────┐ ┌──────────────▼────┐ ┌──────▼──────┐
│ Worker Pool 1 │ │ Worker Pool 2 │ │ Worker N │
│ ┌──────────────┐ │ │ ┌──────────────┐ │ │ ┌─────────┐ │
│ │ Driver │ │ │ │ Driver │ │ │ │ Driver │ │
│ │ Factory │ │ │ │ Factory │ │ │ │ Factory │ │
│ └──────┬───────┘ │ │ └──────┬───────┘ │ │ └────┬────┘ │
│ │ │ │ │ │ │ │ │
│ ┌──────▼───────┐ │ │ ┌──────▼───────┐ │ │ ┌────▼────┐ │
│ │ API Client │ │ │ │ API Client │ │ │ │ API │ │
│ │ + Adapter │ │ │ │ + Adapter │ │ │ │ Client │ │
│ └──────────────┘ │ │ └──────────────┘ │ │ └─────────┘ │
└─────────┬──────────┘ └──────────┬────────┘ └──────┬──────┘
│ │ │
└───────────────┬───────────┴───────────────────┘
│
┌───────────────┴─────────────┬──────────────┐
│ │ │
┌─────────▼────────┐ ┌────────────────▼──┐ ┌──────▼──────┐
│ Guesty API │ │ Airbnb API │ │ Partner N │
│ Rate Limit: │ │ Rate Limit: │ │ Rate Limit:│
│ 600 req/min │ │ 200 req/10min │ │ Varies │
└──────────────────┘ └───────────────────┘ └─────────────┘
Key Design Principles
- Isolation - Each integration is a separate driver with its own error handling
- Async by default - All I/O operations are non-blocking via Celery tasks
- Idempotency - Tasks can be safely retried without side effects (a minimal sketch follows this list)
- Observable - Comprehensive logging and metrics at every layer
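A minimal sketch of what idempotency means in practice, assuming hypothetical upsert_property and get_driver helpers: the task is keyed on the partner's external ID and writes via upsert, so a retry produces the same row instead of a duplicate.

@celery_task(bind=True, max_retries=3)
def sync_property(self, integration_id, external_id):
    """Safe to retry: reads are keyed on external_id, writes are upserts."""
    driver = get_driver(integration_id)
    raw = driver.api.get(f'/properties/{external_id}')
    normalized = driver.adapter.normalize(raw)
    # Upsert keyed on (integration_id, external_id): running this task
    # twice produces the same row, not a duplicate
    upsert_property(integration_id, external_id, normalized)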
The Driver Pattern
The most important architectural decision was implementing a driver pattern that abstracts partner-specific logic while providing a consistent interface.
Driver Architecture
┌─────────────────────────────────────────────────────────────┐
│ Abstract Driver │
│ │
│ + get_homes() : PropertyImportIds │
│ + get_reservations(): List[Reservation] │
│ + push_status() : bool │
│ + authenticate() : Token │
└────────────────────┬────────────────────────────────────────┘
│
┌────────────┴────────────┬─────────────┐
│ │ │
┌───────▼─────────┐ ┌──────────▼─────┐ ┌───▼──────────┐
│ GuestyDriver │ │ AirbnbDriver │ │ CustomDriver │
│ │ │ │ │ │
│ - api: GuestyAPI│ │ - api: ... │ │ - api: ... │
│ - adapter │ │ - adapter │ │ - adapter │
│ - processor │ │ - processor │ │ - processor │
└───────┬─────────┘ └──────┬─────────┘ └───┬──────────┘
│ │ │
│ Uses │ │
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌──────────────┐
│ GuestyAdapter │ │ AirbnbAdapter │ │ ... Adapter │
│ │ │ │ │ │
│ normalize() │ │ normalize() │ │ normalize() │
│ validate() │ │ validate() │ │ validate() │
└────────────────┘ └────────────────┘ └──────────────┘
Why This Pattern Works
Separation of Concerns:
- Driver - Orchestration and business logic
- API Client - HTTP communication, rate limiting, auth
- Adapter - Data transformation and normalization
- Processor - Domain-specific operations
Example: Fetching Properties
# High-level task
@celery_task
def import_properties(integration_id: int):
driver = DriverFactory.get_driver(integration_id)
property_ids = driver.get_homes()
# Schedule dependent tasks
for home_id in property_ids.homes_added_ids:
import_photos.delay(integration_id, home_id)
The driver handles all partner-specific complexity internally:
User calls: driver.get_homes()
│
├─> driver.api.authenticate() # Partner-specific auth
│
├─> driver.api.fetch_all('listings') # Paginated API calls
│ │
│ ├─> check_rate_limit() # Partner-specific limits
│ ├─> retry_with_backoff() # Handle transient failures
│ └─> parse_response()
│
├─> driver.adapter.normalize(raw) # Transform to standard format
│
├─> driver.processor.validate() # Business logic validation
│
└─> driver.processor.save() # Persist to database
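The tasks above call DriverFactory.get_driver() without showing it. A minimal sketch, assuming drivers register themselves in a dict keyed by partner name and that CompanyIntegration.find() (used again in the auth section) loads the integration record:

from abc import ABC, abstractmethod

class AbstractDriver(ABC):
    def __init__(self, integration):
        self.integration = integration

    @abstractmethod
    def get_homes(self):
        """Return PropertyImportIds for this integration."""

    @abstractmethod
    def get_reservations(self):
        """Return a list of normalized reservations."""

class DriverFactory:
    _registry = {}  # partner name -> driver class

    @classmethod
    def register(cls, partner, driver_cls):
        cls._registry[partner] = driver_cls

    @classmethod
    def get_driver(cls, integration_id):
        integration = CompanyIntegration.find(integration_id)
        return cls._registry[integration.partner](integration)

DriverFactory.register('guesty', GuestyDriver)

Adding a new partner then means writing one driver class and one register() call; nothing in the orchestration layer changes.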
Distributed Task Processing
Celery Task Architecture
We use Celery with RabbitMQ for distributed task processing. The key insight: task granularity matters.
Task Hierarchy
┌────────────────────────────────────────────────────────────┐
│ import_all_properties() - Master Task (runs every 4h) │
│ │
│ Scans all active integrations │
└────────────────────┬───────────────────────────────────────┘
│
┌───────────┴───────────────────┬──────────┐
│ │ │
┌────────▼────────────┐ ┌─────────────▼──┐ ┌───▼───────┐
│import_properties_ │ │import_properties│ │ ... │
│for_company(123) │ │_for_company(456)│ │ │
│ │ │ │ │ │
│ • Company-level │ │ │ │ │
│ • Can fail │ │ │ │ │
│ independently │ │ │ │ │
└────────┬────────────┘ └─────────┬───────┘ └───────────┘
│ │
┌────┴────┬─────────┐ ┌────┴─────┬──────┐
│ │ │ │ │ │
┌───▼───┐ ┌──▼───┐ ┌───▼──┐ ┌─▼───┐ ┌────▼─┐ ┌──▼──┐
│Guesty │ │Airbnb│ │Track │ │... │ │... │ │ ... │
│import │ │import│ │import│ │ │ │ │ │ │
└───┬───┘ └──┬───┘ └───┬──┘ └─────┘ └──────┘ └─────┘
│ │ │
└────────┴─────┬───┘
│
┌─────────┴──────────┬──────────┐
│ │ │
┌────────▼────────┐ ┌────────▼──────┐ ┌▼──────────┐
│import_photos │ │import_reserv. │ │update_... │
│(integration, │ │(integration, │ │ │
│ property_123) │ │ property_123) │ │ │
└─────────────────┘ └───────────────┘ └───────────┘
Task Design Principles
1. Fine-grained tasks for parallelism
# BAD: Single monolithic task
@task
def import_everything(company_id):
for integration in get_integrations(company_id):
properties = api.get_properties()
for prop in properties:
photos = api.get_photos(prop.id)
reservations = api.get_reservations(prop.id)
# ... more work
# Takes 30+ minutes, blocks worker
# GOOD: Decomposed tasks
@task
def import_company_properties(company_id):
for integration in get_integrations(company_id):
import_integration_properties.delay(integration.id)
@task
def import_integration_properties(integration_id):
driver = get_driver(integration_id)
props = driver.get_homes()
for prop_id in props.homes_added_ids:
import_property_details.delay(integration_id, prop_id)
2. Independent task failure
Company A fails ──┐
│
Company B succeeds├─> Other companies unaffected
│
Company C succeeds┘
3. Task timeouts and retries
@celery_task(
    bind=True,  # Bind so self.retry() and self.request are available
    max_retries=3,
    default_retry_delay=120,  # 2 minutes
    soft_time_limit=300,  # 5 minutes
    time_limit=360,  # 6 minutes (hard limit)
)
def import_properties(self, integration_id):
    try:
        ...  # work
    except RateLimitError as e:
        # Exponential backoff, capped at 30 minutes
        raise self.retry(exc=e, countdown=min(2 ** self.request.retries * 60, 1800))
Rate Limiting Strategies
This is where theory meets brutal reality. Every API has different rate limits, and violating them can get you temporarily banned.
Common Rate Limit Patterns
┌────────────────────────────────────────────────────────────────┐
│ Rate Limit Patterns │
├────────────────────────────────────────────────────────────────┤
│ │
│ 1. Fixed Window │
│ ┌────────┬────────┬────────┬────────┐ │
│ │ 100req │ 100req │ 100req │ 100req │ │
│ └────────┴────────┴────────┴────────┘ │
│ 0s 60s 120s 180s │
│ Example: "100 requests per minute" │
│ Issue: Burst at window boundary │
│ │
│ 2. Sliding Window │
│ ┌──────────────────────────────────┐ │
│ │ Last 60 seconds = 100 req │ │
│ └──────────────────────────────────┘ │
│ ▲ moves continuously │
│ Better: No boundary burst │
│ │
│ 3. Token Bucket │
│ Bucket: [●●●○○] (3 tokens available) │
│ Refill: +1 token every 600ms │
│ Burst: Up to bucket size │
│ │
│ 4. Multi-tier Limits │
│ Per second: 10 requests ─┐ │
│ Per minute: 100 requests ├─ All must be satisfied │
│ Per hour: 5000 requests ┘ │
└────────────────────────────────────────────────────────────────┘
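Pattern 3 (token bucket) is worth implementing explicitly because it gives you controlled bursts. A minimal in-process sketch for a single worker; the distributed Redis limiter below covers the multi-worker case:

import time

class TokenBucket:
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity        # Maximum burst size
        self.refill_rate = refill_rate  # Tokens added per second
        self.tokens = capacity
        self.last_refill = time.monotonic()

    def acquire(self):
        """Block until a token is available, then consume it."""
        while True:
            now = time.monotonic()
            # Refill proportionally to elapsed time, capped at capacity
            self.tokens = min(
                self.capacity,
                self.tokens + (now - self.last_refill) * self.refill_rate,
            )
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Sleep just long enough for the next token to arrive
            time.sleep((1 - self.tokens) / self.refill_rate)

# 10 req/s steady state, bursts of up to 20
bucket = TokenBucket(capacity=20, refill_rate=10)
bucket.acquire()
response = api.call()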
Strategy 1: Header-based Rate Limiting
Many APIs return rate limit info in response headers:
def check_rate_limit(self, headers):
"""
Headers example:
X-RateLimit-Remaining-Second: 8
X-RateLimit-Remaining-Minute: 95
"""
remaining_per_second = int(headers.get('X-RateLimit-Remaining-Second', 999))
remaining_per_minute = int(headers.get('X-RateLimit-Remaining-Minute', 999))
# Proactive throttling - don't wait until we hit 0
if remaining_per_second < 5:
time.sleep(1) # Wait for window to reset
if remaining_per_minute < 100:
time.sleep(1) # Slow down
Lesson learned: Be conservative. If the limit says “600/minute”, treat it as 500/minute to account for clock skew and concurrent workers.
Strategy 2: Celery Rate Limiting
# Limit task execution rate
@celery_task(rate_limit='10/m') # 10 tasks per minute
def call_partner_api(integration_id):
# This task won't execute more than 10 times/min
pass
Problem: This limits task execution, not API calls. One task might make 10 API calls.
Better approach: Separate queue per partner with concurrency control:
# celeryconfig.py
task_routes = {
'integrations.guesty.*': {'queue': 'guesty'},
'integrations.airbnb.*': {'queue': 'airbnb'},
}
# Start workers with concurrency limits
# celery -A app worker -Q guesty -c 2 # Max 2 concurrent guesty tasks
# celery -A app worker -Q airbnb -c 1 # Max 1 concurrent airbnb task
Strategy 3: Distributed Rate Limiting (Redis)
For APIs with strict limits across all servers:
import redis
import time
class DistributedRateLimiter:
def __init__(self, key, max_requests, window_seconds):
self.redis = redis.Redis()
self.key = f"ratelimit:{key}"
self.max_requests = max_requests
self.window = window_seconds
def allow_request(self):
now = time.time()
window_start = now - self.window
pipe = self.redis.pipeline()
# Remove old entries
pipe.zremrangebyscore(self.key, 0, window_start)
# Count requests in window
pipe.zcard(self.key)
# Add current request
pipe.zadd(self.key, {str(now): now})
# Set expiry
pipe.expire(self.key, self.window)
results = pipe.execute()
request_count = results[1]
return request_count < self.max_requests
# Usage
limiter = DistributedRateLimiter('guesty_api', max_requests=600, window_seconds=60)
if limiter.allow_request():
response = api.call()
else:
time.sleep(1) # Wait and retry
Strategy 4: Adaptive Rate Limiting
Learn from 429 responses:
class AdaptiveRateLimiter:
def __init__(self):
self.request_interval = 0.1 # Start optimistic
self.consecutive_429s = 0
def before_request(self):
time.sleep(self.request_interval)
def after_request(self, response):
if response.status_code == 429:
self.consecutive_429s += 1
# Exponential backoff
self.request_interval *= 1.5
# Check Retry-After header
retry_after = response.headers.get('Retry-After')
if retry_after:
time.sleep(int(retry_after))
else:
self.consecutive_429s = 0
# Slowly decrease interval (additive decrease)
self.request_interval = max(0.05, self.request_interval - 0.01)
Queue Management & Backpressure
The Queue Buildup Problem
Normal Operation:
┌───────┐ 100/s ┌───────┐ 100/s ┌─────────┐
│ Tasks │────────>│ Queue │────────>│ Workers │
└───────┘ └───────┘ └─────────┘
Size: ~10
Problem: API Slowdown (partner outage, rate limit)
┌───────┐ 100/s ┌──────────────┐ 10/s ┌─────────┐
│ Tasks │────────>│ Queue │───────>│ Workers │
└───────┘ │ │ └─────────┘
│ Size: 10,000 │ ← Buildup!
└──────────────┘
Result:
• Queue grows unbounded
• Memory exhaustion
• Old tasks process stale data
• Cascading failures
Solution 1: Task Expiration
@celery_task(expires=300) # Task expires after 5 minutes
def import_reservations(integration_id, property_id):
# If this task waits in queue > 5 minutes, discard it
# Fresh data will be imported in next scheduled run
pass
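The expiry can also be set per call instead of on the decorator, which helps when the same task has different freshness requirements in different contexts:

# Decorator default still applies elsewhere; this call expires sooner
import_reservations.apply_async(
    args=[integration_id, property_id],
    expires=60,  # Discard if still queued after 60 seconds
)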
Solution 2: Queue Length Monitoring
def should_enqueue_task(queue_name):
    """Check queue depth before adding more tasks"""
    # Queue depth lives on the broker; get_queue_length() is a thin
    # wrapper around the broker's management API (e.g. RabbitMQ's) -
    # worker-side inspect() can't see how many messages are queued
    queue_length = get_queue_length(queue_name)
    if queue_length > 10000:
        logger.warning(f"Queue {queue_name} backed up: {queue_length} messages")
        return False
    return True
# Usage
if should_enqueue_task('reservations'):
import_reservations.delay(integration_id, property_id)
else:
logger.info(f"Skipping task - queue backed up")
Solution 3: Circuit Breaker Pattern
Stop sending tasks when partner API is down:
┌─────────────────────────────────────────────────────────┐
│ Circuit Breaker States │
├─────────────────────────────────────────────────────────┤
│ │
│ CLOSED (Normal) │
│ ┌────────┐ │
│ │ Success│ ─────────> Allow requests │
│ └────────┘ │
│ │ │
│ │ Failure threshold exceeded (5 failures) │
│ ▼ │
│ ┌────────┐ │
│ │ OPEN │ ─────────> Reject requests (fail fast) │
│ └────────┘ Return cached/default data │
│ │ │
│ │ After timeout (60s) │
│ ▼ │
│ ┌──────────┐ │
│ │ HALF-OPEN│ ───────> Allow limited requests │
│ └──────────┘ (test if API recovered) │
│ │ │
│ ├─ Success ────> Go to CLOSED │
│ └─ Failure ────> Go to OPEN │
└─────────────────────────────────────────────────────────┘
Implementation:
import requests
from pybreaker import CircuitBreaker, CircuitBreakerError

# Create breaker per partner
guesty_breaker = CircuitBreaker(
    fail_max=5,  # Open after 5 failures
    reset_timeout=60,  # Try again after 60s
    exclude=[requests.HTTPError],  # Don't trip on expected errors
)
@guesty_breaker
def call_guesty_api():
response = requests.get('https://api.guesty.com/...')
response.raise_for_status()
return response.json()
# Usage
try:
data = call_guesty_api()
except CircuitBreakerError:
logger.warning("Guesty API circuit breaker open - skipping import")
return # Don't queue more tasks
Solution 4: Priority Queues
Not all tasks are equal:
# High priority: User-initiated actions
@celery_task(queue='high_priority')
def sync_single_property(property_id):
# User clicked "sync now" - should happen immediately
pass
# Normal priority: Scheduled imports
@celery_task(queue='normal_priority')
def import_properties(integration_id):
pass
# Low priority: Non-critical updates
@celery_task(queue='low_priority')
def import_photos(property_id):
# Nice to have but not critical
pass
Worker configuration:
# Worker processes queues in priority order
celery -A app worker -Q high_priority,normal_priority,low_priority
Solution 5: Worker Downtime & Queue Explosions
The nightmare scenario: workers crash at 2 AM, scheduled tasks keep queuing.
┌────────────────────────────────────────────────────────────────┐
│ Worker Downtime Disaster Timeline │
├────────────────────────────────────────────────────────────────┤
│ │
│ 00:00 Normal Operation │
│ ┌──────────┐ │
│ │ Worker 1 │───┐ │
│ └──────────┘ │ │
│ ┌──────────┐ ├─> [ Queue: 50 tasks ] ──> Processing OK │
│ │ Worker 2 │───┤ │
│ └──────────┘ │ │
│ ┌──────────┐ │ │
│ │ Worker 3 │───┘ │
│ └──────────┘ │
│ │
│ 02:15 ⚠ Disaster Strikes │
│ ┌──────────┐ │
│ │ Worker 1 │ ✗ CRASHED (OOM) │
│ └──────────┘ │
│ ┌──────────┐ │
│ │ Worker 2 │ ✗ CRASHED (OOM) │
│ └──────────┘ │
│ ┌──────────┐ │
│ │ Worker 3 │ ✗ CRASHED (OOM) │
│ └──────────┘ │
│ │
│ [ Queue: 50 tasks ] ──> Nobody processing! │
│ │
│ 02:20 Celery Beat keeps scheduling │
│ ┌────────────────────┐ │
│ │ New tasks added! │ │
│ └─────────┬──────────┘ │
│ ▼ │
│ [ Queue: 250 tasks ] (+200) │
│ │
│ 02:30 More scheduled tasks arrive │
│ ▼ │
│ [ Queue: 850 tasks ] (+600) │
│ │
│ 03:00 Queue explosion │
│ ▼ │
│ [ Queue: 2,450 tasks ] (+1,600) │
│ │
│ 06:00 Engineer wakes up │
│ ┌─────────────────────────────────────┐ │
│ │ ALERT: 10,000 tasks in queue! │ │
│ │ Oldest task age: 3h 45m │ │
│ │ No workers active since 02:15 │ │
│ └─────────────────────────────────────┘ │
│ ▼ │
│ [ Queue: 10,000+ tasks ] (+7,550) │
│ │
│ Problems: │
│ • Most tasks are hours old (stale data) │
│ • Will take 10+ hours to drain at normal rate │
│ • Duplicate/conflicting updates │
│ • Waste API quota on stale imports │
│ • Customer data not synced for hours │
└────────────────────────────────────────────────────────────────┘
Defense 1: Worker Health Monitoring
# Celery worker events
import time

from celery.events import EventReceiver
from kombu import Connection

def monitor_workers():
    """Alert when no workers are processing tasks"""
    # Track active workers (define state and handlers before wiring
    # them into the receiver, so the names exist when it's built)
    active_workers = {}
    last_heartbeat = {}

    def on_heartbeat(event):
        active_workers[event['hostname']] = True
        last_heartbeat[event['hostname']] = time.time()

    def on_worker_offline(event):
        active_workers.pop(event['hostname'], None)
        alert_oncall(f"Worker {event['hostname']} went offline")

    with Connection(BROKER_URL) as conn:
        recv = EventReceiver(conn, handlers={
            'worker-heartbeat': on_heartbeat,
            'worker-offline': on_worker_offline,
        })
        # Periodic health check
        while True:
            recv.capture(limit=None, timeout=1, wakeup=True)
            # Check for stale heartbeats
            now = time.time()
            for hostname, last_beat in last_heartbeat.items():
                if now - last_beat > 60:  # No heartbeat for 1 min
                    alert_oncall(f"Worker {hostname} heartbeat stale")
            # Check if any workers are alive
            if not active_workers:
                critical_alert("NO WORKERS ACTIVE - QUEUE BUILDING UP")
            time.sleep(10)
Defense 2: Smart Queue Draining
@celery_task
def import_properties(integration_id, enqueued_at=None):
"""Task with staleness check"""
if enqueued_at is None:
enqueued_at = datetime.utcnow()
# Check task staleness
age_seconds = (datetime.utcnow() - enqueued_at).total_seconds()
if age_seconds > 3600: # Task sat in queue for 1+ hour
logger.warning(
f"Skipping stale task for integration {integration_id}, "
f"age: {age_seconds}s"
)
return # Don't process stale data
# Check if already processed
last_run = get_last_successful_run(integration_id)
if last_run and last_run > enqueued_at:
logger.info(f"Already processed newer data, skipping")
return
# Process the task
driver = get_driver(integration_id)
driver.get_homes()
Defense 3: Queue Purging on Recovery
def recover_from_worker_outage():
    """Called when workers come back online after downtime"""
    from celery import current_app

    # Check queue depths via the broker (get_queue_stats() wraps the
    # broker's management API; worker-side inspect() can't see depth)
    for queue_name, stats in get_queue_stats().items():
        queue_length = stats.get('messages', 0)
        if queue_length > 5000:
            logger.warning(f"Queue {queue_name} has {queue_length} tasks")
            # Strategy 1: Purge and re-enqueue fresh
            # (control.purge() discards messages from every queue the
            # workers consume - acceptable here since we re-import anyway)
            if queue_name in ['properties', 'reservations']:
                logger.info(f"Purging stale {queue_name} queue")
                current_app.control.purge()
                # Re-trigger fresh imports
                trigger_fresh_import_for_all_integrations()
# Strategy 2: Let it drain with staleness checks
elif queue_name in ['photos', 'low_priority']:
logger.info(f"Letting {queue_name} drain naturally")
# Tasks will self-skip if stale
# Strategy 3: Pause scheduling until drained
else:
logger.info(f"Pausing new tasks for {queue_name}")
pause_scheduled_tasks(queue_name)
Defense 4: Deployment Strategy
# Pre-deployment hook
def before_deploy():
"""Prepare for deployment to avoid queue buildup"""
# 1. Stop scheduling new tasks
logger.info("Pausing Celery Beat scheduler")
os.system("supervisorctl stop celerybeat")
# 2. Let current tasks drain
logger.info("Waiting for tasks to complete...")
wait_for_queue_drain(max_wait=300) # Wait up to 5 min
# 3. Graceful worker shutdown
logger.info("Shutting down workers gracefully")
os.system("celery -A app control shutdown")
# Post-deployment hook
def after_deploy():
"""Resume operations after deployment"""
# 1. Start workers
logger.info("Starting workers")
os.system("supervisorctl start celery-workers")
# 2. Verify workers are up
if not verify_workers_active(timeout=60):
critical_alert("Workers failed to start after deployment!")
return
# 3. Resume scheduling
logger.info("Resuming Celery Beat scheduler")
os.system("supervisorctl start celerybeat")
logger.info("Deployment complete, system operational")
Defense 5: Backup Queue Processing
# Cron job that runs every 5 minutes
def emergency_queue_processor():
"""Fallback processor in case Celery workers are down"""
queue_stats = get_queue_stats()
for queue_name, stats in queue_stats.items():
messages = stats['messages']
oldest_age = stats['oldest_age_seconds']
# If queue building up AND old tasks (workers likely down)
if messages > 1000 and oldest_age > 600: # 10 min old
logger.warning(
f"Emergency processing for {queue_name}: "
f"{messages} messages, oldest: {oldest_age}s"
)
# Process directly (bypass Celery)
process_queue_directly(queue_name, max_tasks=100)
# Alert
alert_oncall(
f"Queue {queue_name} processed via emergency fallback. "
f"Check worker health!"
)
Error Handling & Retries
Error Classification
Not all errors are equal. Classification determines retry strategy:
┌─────────────────────────────────────────────────────────────┐
│ Error Categories │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. TRANSIENT (Retry immediately) │
│ • 500 Internal Server Error │
│ • 502 Bad Gateway │
│ • 503 Service Unavailable │
│ • Network timeout │
│ • Connection refused │
│ Action: Retry with exponential backoff │
│ │
│ 2. RATE LIMIT (Retry with delay) │
│ • 429 Too Many Requests │
│ Action: Wait for Retry-After header, then retry │
│ │
│ 3. CLIENT ERROR (Don't retry) │
│ • 400 Bad Request │
│ • 401 Unauthorized │
│ • 403 Forbidden │
│ • 404 Not Found │
│ Action: Log and skip, possible integration issue │
│ │
│ 4. AUTH ERROR (Refresh token) │
│ • 401 with expired token message │
│ Action: Refresh OAuth token, then retry │
│ │
│ 5. DATA ERROR (Skip record) │
│ • Invalid/malformed response │
│ • Missing required fields │
│ Action: Log for investigation, continue with next │
└─────────────────────────────────────────────────────────────┘
Retry Strategy Implementation
from requests.exceptions import RequestException, Timeout
from requests import HTTPError
@celery_task(
bind=True,
max_retries=3,
default_retry_delay=60,
)
def import_properties(self, integration_id):
try:
driver = get_driver(integration_id)
properties = driver.get_homes()
return properties
except HTTPError as e:
status_code = e.response.status_code
# Client errors - don't retry
if 400 <= status_code < 500:
if status_code == 429: # Rate limit
retry_after = int(e.response.headers.get('Retry-After', 60))
logger.warning(f"Rate limited, retry after {retry_after}s")
raise self.retry(exc=e, countdown=retry_after)
elif status_code == 401: # Auth error
logger.error(f"Auth failed for integration {integration_id}")
# Try to refresh token
if refresh_oauth_token(integration_id):
raise self.retry(exc=e, countdown=5)
else:
# Can't refresh - alert humans
alert_auth_failure(integration_id)
return
else:
# Other 4xx - don't retry
logger.error(f"Client error {status_code}: {e.response.text}")
return
# Server errors - retry with backoff
elif 500 <= status_code < 600:
retry_count = self.request.retries
backoff = min(2 ** retry_count * 60, 1800) # Max 30 min
logger.warning(f"Server error {status_code}, retry {retry_count}, backoff {backoff}s")
raise self.retry(exc=e, countdown=backoff)
except (Timeout, RequestException) as e:
# Network errors - retry with backoff
retry_count = self.request.retries
backoff = min(2 ** retry_count * 30, 900) # Max 15 min
logger.warning(f"Network error: {e}, retry {retry_count}")
raise self.retry(exc=e, countdown=backoff)
except Exception as e:
# Unexpected error - log and alert
logger.exception(f"Unexpected error in import_properties: {e}")
alert_on_call_engineer(integration_id, e)
raise
Dead Letter Queue
Tasks that fail after all retries go to a dead letter queue for investigation:
@celery_task(bind=True)
def import_properties(self, integration_id):
    try:
        ...  # work
    except Exception as e:
        if self.request.retries >= self.max_retries:
            # Final failure - send to DLQ
            dead_letter_queue.send({
                'task': self.name,
                'args': [integration_id],
                'error': str(e),
                'traceback': traceback.format_exc(),
                'failed_at': datetime.utcnow(),
            })
        raise
Monitor DLQ for patterns:
-- Find common failure patterns
SELECT error, COUNT(*) as count
FROM dead_letter_queue
WHERE failed_at > NOW() - INTERVAL '24 hours'
GROUP BY error
ORDER BY count DESC;
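Once a failure pattern is identified and fixed, the dead-lettered work still has to be replayed. A minimal sketch, assuming the DLQ is readable in batches (fetch_batch and ack are hypothetical helpers); Celery's send_task re-enqueues by task name without importing the task module:

from celery import current_app

def replay_dead_letters(task_name, limit=100):
    """Re-enqueue DLQ entries after the underlying bug is fixed."""
    for entry in dead_letter_queue.fetch_batch(task_name, limit=limit):
        current_app.send_task(entry['task'], args=entry['args'])
        dead_letter_queue.ack(entry)
        logger.info(f"Replayed {entry['task']} args={entry['args']}")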
Authentication Patterns
Every integration handles auth differently. Here are the common patterns:
Pattern 1: API Key (Simplest)
Request:
┌─────────────────────────────────────────────┐
│ GET /api/properties │
│ Headers: │
│ Authorization: Bearer abc123xyz │
└─────────────────────────────────────────────┘
Implementation:
class ApiKeyAuth:
def __init__(self, api_key):
self.api_key = api_key
def __call__(self, request):
request.headers['Authorization'] = f'Bearer {self.api_key}'
return request
session.auth = ApiKeyAuth(api_key)
Pattern 2: OAuth 2.0 (Common, Complex)
┌──────────────────────────────────────────────────────────────┐
│ OAuth 2.0 Flow │
├──────────────────────────────────────────────────────────────┤
│ │
│ Initial Setup (One-time): │
│ 1. User → Auth URL │
│ 2. User grants permission │
│ 3. Redirect with auth code │
│ 4. Exchange code for access + refresh tokens │
│ │
│ Ongoing API Calls: │
│ ┌──────────────────────────────────────┐ │
│ │ Use access token ────> API Call │ │
│ └────────┬─────────────────────────────┘ │
│ │ │
│ ├─ Success ────> Continue │
│ │ │
│ └─ 401 Unauthorized │
│ │ │
│ ▼ │
│ Refresh token flow: │
│ POST /oauth/token │
│ { │
│ grant_type: "refresh_token", │
│ refresh_token: "xyz...", │
│ client_id: "...", │
│ client_secret: "..." │
│ } │
│ │ │
│ ▼ │
│ New access token ─────> Retry API call │
│ │
│ Token Lifecycle: │
│ • Access token: Expires in 1-24 hours │
│ • Refresh token: Expires in 30-90 days │
│ • Refresh proactively before expiration │
└──────────────────────────────────────────────────────────────┘
Implementation:
from datetime import datetime, timedelta

import requests

class OAuthSession:
    def __init__(self, integration_id):
        self.integration_id = integration_id
        self.load_tokens()
def load_tokens(self):
integration = CompanyIntegration.find(self.integration_id)
self.access_token = integration.access_token
self.refresh_token = integration.refresh_token
self.expires_at = integration.token_expires_at
    def is_token_expired(self):
        # Refresh 5 minutes before actual expiry
        buffer = timedelta(minutes=5)
        return datetime.utcnow() + buffer >= self.expires_at
def refresh_access_token(self):
response = requests.post(
'https://api.partner.com/oauth/token',
data={
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token,
'client_id': settings.OAUTH_CLIENT_ID,
'client_secret': settings.OAUTH_CLIENT_SECRET,
}
)
if response.status_code != 200:
raise OAuthRefreshError(f"Failed to refresh token: {response.text}")
data = response.json()
self.access_token = data['access_token']
self.refresh_token = data.get('refresh_token', self.refresh_token)
        self.expires_at = datetime.utcnow() + timedelta(
            seconds=data['expires_in']
        )
# Persist to database
self.save_tokens()
    def make_request(self, method, url, **kwargs):
        # Proactive token refresh
        if self.is_token_expired():
            self.refresh_access_token()
        # Make request
        headers = kwargs.pop('headers', {})
        headers['Authorization'] = f'Bearer {self.access_token}'
        response = requests.request(method, url, headers=headers, **kwargs)
        # Reactive token refresh (if proactive failed)
        if response.status_code == 401:
            self.refresh_access_token()
            headers['Authorization'] = f'Bearer {self.access_token}'
            response = requests.request(method, url, headers=headers, **kwargs)
        return response
Pattern 3: Rotating Credentials
Some partners (looking at you, Airbnb) rotate credentials periodically via webhooks:
# Webhook endpoint
@app.post('/webhooks/partner/credentials')
def update_credentials(request):
data = request.json()
integration_id = data['integration_id']
new_secret = data['secret']
integration = CompanyIntegration.find(integration_id)
integration.update(api_secret=new_secret)
return {'status': 'ok'}
Critical lesson: Always update credentials atomically and test before committing:
def rotate_credentials(integration_id, new_secret):
integration = CompanyIntegration.find(integration_id)
old_secret = integration.api_secret
# Test new credentials
try:
test_api_call(integration.api_key, new_secret)
except AuthError:
logger.error(f"New credentials invalid for {integration_id}")
return False
# Update
integration.update(api_secret=new_secret)
# Verify
try:
test_api_call(integration.api_key, new_secret)
except AuthError:
# Rollback
integration.update(api_secret=old_secret)
raise
return True
API Changes & Versioning
Partner APIs change without warning. Here’s how to survive:
Strategy 1: Version Pinning
class PartnerAPI:
BASE_URL = 'https://api.partner.com'
API_VERSION = 'v2' # Pin to specific version
def get_url(self, endpoint):
return f'{self.BASE_URL}/{self.API_VERSION}/{endpoint}'
Strategy 2: Response Validation
Catch breaking changes early:
from marshmallow import EXCLUDE, Schema, fields, ValidationError

class PropertySchema(Schema):
    class Meta:
        # Tolerate additive changes (new fields we don't model yet);
        # missing or invalid required fields still raise
        unknown = EXCLUDE

    id = fields.Str(required=True)
    name = fields.Str(required=True)
    address = fields.Dict(required=True)
    # ... other fields
def fetch_properties(self):
response = self.api.get('properties')
data = response.json()
schema = PropertySchema(many=True)
try:
validated = schema.load(data)
except ValidationError as e:
logger.error(f"API response validation failed: {e.messages}")
# Alert engineers
alert_api_schema_change(self.partner, e.messages)
raise
return validated
Strategy 3: Adapter Versioning
Support multiple API versions simultaneously:
class GuestyAdapterV1:
def normalize_property(self, raw):
return {
'external_id': raw['_id'],
'name': raw['title'],
'address': raw['address']['full'],
}
class GuestyAdapterV2:
def normalize_property(self, raw):
return {
'external_id': raw['id'], # Changed from '_id'
'name': raw['nickname'], # Changed from 'title'
'address': raw['location']['formatted'], # New structure
}
def get_adapter(integration):
api_version = integration.metadata.get('api_version', 'v1')
if api_version == 'v2':
return GuestyAdapterV2()
return GuestyAdapterV1()
Strategy 4: Gradual Rollout
When partner announces breaking changes:
# Feature flag for new API version
def get_api_version(integration_id):
if is_feature_flag_enabled('guesty_api_v2', integration_id=integration_id):
return 'v2'
return 'v1'
# Rollout plan:
# Week 1: Enable for 1 test customer
# Week 2: Enable for 10% of customers
# Week 3: Enable for 50% of customers
# Week 4: Enable for 100% of customers
# Week 5: Remove v1 code
Monitoring & Observability
You can’t fix what you can’t see. Comprehensive monitoring is non-negotiable.
Metrics to Track
┌──────────────────────────────────────────────────────────────┐
│ Monitoring Dashboard │
├──────────────────────────────────────────────────────────────┤
│ │
│ API Call Metrics (per partner): │
│ • Total calls/min [████░░░░] 450/600 │
│ • Success rate [█████████] 99.2% │
│ • Average response time [███░░░░░░] 250ms │
│ • P95 response time [█████░░░░] 800ms │
│ • P99 response time [███████░░] 1.2s │
│ │
│ Error Breakdown: │
│ • 4xx errors 12/hour │
│ └─ 401 Unauthorized 8 │
│ └─ 404 Not Found 4 │
│ • 5xx errors 3/hour │
│ • Timeouts 1/hour │
│ • Rate limits (429) 0/hour ✓ │
│ │
│ Queue Metrics: │
│ • properties queue [██░░░░░░░] 234 │
│ • reservations queue [█░░░░░░░░] 45 │
│ • photos queue [████░░░░░] 890 │
│ • Age of oldest task 2m 34s │
│ │
│ Task Metrics: │
│ • Tasks succeeded 1,234/hour │
│ • Tasks failed 12/hour │
│ • Tasks retried 45/hour │
│ • Average task duration [███░░░░░░] 8.3s │
│ │
│ Integration Health: │
│ • Guesty ✓ Healthy Last run: 2m ago │
│ • Airbnb ⚠ Degraded Rate limited │
│ • Booking.com ✗ Down Auth failure │
│ • CloudBeds ✓ Healthy Last run: 5m ago │
└──────────────────────────────────────────────────────────────┘
Implementation
from prometheus_client import Counter, Histogram, Gauge
# Define metrics
api_calls_total = Counter(
'integration_api_calls_total',
'Total API calls',
['partner', 'endpoint', 'status_code']
)
api_call_duration = Histogram(
'integration_api_call_duration_seconds',
'API call duration',
['partner', 'endpoint']
)
queue_length = Gauge(
'integration_queue_length',
'Number of tasks in queue',
['queue_name']
)
# Instrument API calls
class LoggedApiSession(requests.Session):
def __init__(self, partner):
super().__init__()
self.partner = partner
def request(self, method, url, **kwargs):
endpoint = url.split('/')[-1] # Simple endpoint extraction
with api_call_duration.labels(self.partner, endpoint).time():
response = super().request(method, url, **kwargs)
api_calls_total.labels(
self.partner,
endpoint,
response.status_code
).inc()
return response
Alerting Rules
# Prometheus alert rules
groups:
- name: integrations
rules:
# High error rate
- alert: HighAPIErrorRate
expr: |
rate(integration_api_calls_total{status_code=~"5.."}[5m]) > 0.05
for: 10m
annotations:
        summary: "High API error rate for {{ $labels.partner }}"
# Auth failures
- alert: AuthenticationFailures
expr: |
rate(integration_api_calls_total{status_code="401"}[5m]) > 0
for: 5m
annotations:
        summary: "Auth failures for {{ $labels.partner }}"
# Queue backup
- alert: QueueBackingUp
expr: integration_queue_length > 5000
for: 15m
annotations:
        summary: "Queue {{ $labels.queue_name }} has {{ $value }} tasks"
# Slow API calls
- alert: SlowAPIResponses
expr: |
histogram_quantile(0.95,
rate(integration_api_call_duration_seconds_bucket[5m])
) > 5
for: 10m
annotations:
        summary: "P95 latency for {{ $labels.partner }} is {{ $value }}s"
Structured Logging
import structlog
logger = structlog.get_logger()
# Every log includes context
logger.info(
"api_call_completed",
partner="guesty",
endpoint="properties",
integration_id=integration_id,
duration_ms=response_time,
status_code=response.status_code,
properties_returned=len(properties),
)
# Enables powerful queries:
# "Show me all Guesty calls that took >2s"
# "Which integrations had auth failures today?"
# "What's the average properties per API call by partner?"
Cascading Requests
Some operations trigger chains of API calls. This is where things get interesting.
The Problem
Import Properties Request
│
├─> Fetch property list (1 API call)
│
├─> For each property (100 properties):
│ ├─> Fetch property details (100 API calls)
│ ├─> Fetch property photos (100 API calls)
│ └─> Fetch reservations (100 API calls)
│
└─> Total: 301 API calls for one import!
With 50 integrations running every hour:
• 15,050 API calls/hour
• 250 API calls/minute
• 4+ API calls/second
Each partner has different rate limits!
Strategy 1: Batching
Fetch multiple items in one request:
# BAD: N+1 queries
for property_id in property_ids:
details = api.get(f'/properties/{property_id}') # 100 API calls
# GOOD: Batch fetch
property_ids_str = ','.join(property_ids)
details = api.get(f'/properties?ids={property_ids_str}') # 1 API call
Strategy 2: Parallel with Concurrency Control
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_property_details(property_id):
return api.get(f'/properties/{property_id}')
property_ids = [...] # 100 IDs
# Limit concurrency to respect rate limits
MAX_CONCURRENT = 5
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
futures = {
executor.submit(fetch_property_details, pid): pid
for pid in property_ids
}
results = []
for future in as_completed(futures):
property_id = futures[future]
try:
result = future.result()
results.append(result)
except Exception as e:
logger.error(f"Failed to fetch property {property_id}: {e}")
# Result: still 100 API calls, but issued in ~20 rounds of 5 concurrent requests
# Instead of 100 sequential calls taking ~100s, takes ~20s
Strategy 3: Task Chaining
Break cascading requests into separate tasks:
@celery_task
def import_properties(integration_id):
driver = get_driver(integration_id)
property_ids = driver.get_property_ids() # Fast, just IDs
# Chain: fetch details for each property
for prop_id in property_ids:
fetch_property_details.delay(integration_id, prop_id)
@celery_task
def fetch_property_details(integration_id, property_id):
driver = get_driver(integration_id)
details = driver.get_property_details(property_id)
# Chain: fetch related data
fetch_property_photos.delay(integration_id, property_id)
fetch_reservations.delay(integration_id, property_id)
@celery_task
def fetch_property_photos(integration_id, property_id):
# Fetches photos
pass
@celery_task
def fetch_reservations(integration_id, property_id):
# Fetches reservations
pass
Benefits:
- Each task is independently retryable
- Failures don’t cascade
- Easy to monitor progress
- Natural rate limiting via queue
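Celery's canvas primitives can express the same fan-out declaratively. A sketch using group, plus a chord when you need a callback after the whole batch completes (notify_import_complete is a hypothetical task; chord requires a configured result backend):

from celery import chord, group

# Fan out one task per property; .s() builds a signature without executing it
header = group(
    fetch_property_details.s(integration_id, prop_id)
    for prop_id in property_ids
)

# Fire and forget:
header.apply_async()

# Or run a callback once every property has been fetched:
chord(header)(notify_import_complete.s(integration_id))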
Strategy 4: Smart Prioritization
Don’t fetch everything for every property:
@celery_task
def import_properties(integration_id):
    driver = get_driver(integration_id)
    properties = driver.get_properties()
    for prop in properties:
        # Always import critical data
        import_property_basic.delay(integration_id, prop.id)
# Only import photos if changed
if prop.photos_updated_at > last_import:
import_photos.delay(integration_id, prop.id)
# Only import future reservations
if has_upcoming_reservations(prop.id):
import_reservations.delay(integration_id, prop.id)
Memory Management & Batching
One of the sneakiest failure modes: your integration works perfectly for 10 properties, then crashes with OOM (Out Of Memory) when processing 10,000.
The Memory Bloat Problem
Scenario: Import 5,000 properties for a large customer
┌────────────────────────────────────────────────────────────┐
│ Memory Usage Timeline │
├────────────────────────────────────────────────────────────┤
│ │
│ 8GB │ ┌─CRASH │
│ │ ┌────┘ │
│ 6GB │ ┌────┘ │
│ │ ┌────┘ │
│ 4GB │ ┌────┘ │
│ │ ┌────┘ │
│ 2GB │ ┌────┘ │
│ │ ┌────┘ │
│ 0GB └──────────┘──────────────────────────────────────────┤
│ 0 1000 2000 3000 4000 5000 │
│ Properties Processed │
│ │
│ Problem: Loading all data into memory at once │
│ • Each property: ~50KB │
│ • 5,000 properties: 250MB base data │
│ • Python object overhead: ~3x │
│ • List/dict allocations: ~2x │
│ • Total: ~1.5GB just for raw data │
│ • Peak with processing: 3-8GB! │
└────────────────────────────────────────────────────────────┘
Root Causes:
- Loading entire dataset into memory
- Building large lists/dicts without bounds
- Not releasing references (Python GC can’t collect)
- SQLAlchemy session holding all objects
- Pandas DataFrames growing unbounded
Strategy 1: Chunking / Pagination
Process data in fixed-size chunks:
# BAD: Load everything into memory
@celery_task
def import_all_reservations(integration_id):
driver = get_driver(integration_id)
# This loads ALL reservations into memory!
all_reservations = driver.get_reservations() # Could be 50,000+ items
for reservation in all_reservations: # Memory keeps growing
process_reservation(reservation)
save_reservation(reservation)
# Memory not released until task completes
# GOOD: Process in chunks
@celery_task
def import_all_reservations(integration_id):
driver = get_driver(integration_id)
CHUNK_SIZE = 100 # Process 100 at a time
offset = 0
while True:
# Fetch chunk
chunk = driver.get_reservations(limit=CHUNK_SIZE, offset=offset)
if not chunk:
break # No more data
# Process chunk
for reservation in chunk:
process_reservation(reservation)
save_reservation(reservation)
# Clear chunk from memory
del chunk
# Move to next chunk
offset += CHUNK_SIZE
# Optional: Force garbage collection
import gc
gc.collect()
logger.info(f"Processed {offset} reservations")
Memory comparison:
Without chunking: 5,000 items × 50KB = 250MB minimum
With chunking: 100 items × 50KB = 5MB at a time (50x reduction!)
Strategy 2: Streaming / Generator Pattern
Use generators to process data lazily:
# BAD: Return entire list
def fetch_all_properties(api):
properties = []
page = 1
while True:
response = api.get(f'/properties?page={page}')
properties.extend(response['data']) # List keeps growing!
if not response['has_more']:
break
page += 1
return properties # Entire dataset in memory
# GOOD: Yield items one at a time
def stream_properties(api):
"""Generator that yields properties without loading all into memory"""
page = 1
while True:
response = api.get(f'/properties?page={page}')
for prop in response['data']:
yield prop # Yield one item at a time
if not response['has_more']:
break
page += 1
# Usage
@celery_task
def import_properties(integration_id):
api = get_api(integration_id)
# Memory efficient: only one property in memory at a time
for prop in stream_properties(api):
process_property(prop)
save_property(prop)
# prop is garbage collected after this iteration
Strategy 3: Database Batching
Batch database operations to avoid holding objects in session:
# BAD: Session accumulates all objects
@celery_task
def import_properties(integration_id):
session = get_session()
properties = fetch_all_properties() # 5,000 items
for prop_data in properties:
# SQLAlchemy session keeps reference to every object!
property = Property(**prop_data)
session.add(property)
session.commit() # Huge commit, all objects in memory
# Memory only released after commit
# GOOD: Batch commits with session cleanup
@celery_task
def import_properties(integration_id):
BATCH_SIZE = 100
for chunk in chunked(stream_properties(api), BATCH_SIZE):
session = get_session()
for prop_data in chunk:
property = Property(**prop_data)
session.add(property)
# Commit batch
session.commit()
# Critical: Close session to release memory
session.close()
# Alternative: Expunge objects
# session.expunge_all()
def chunked(iterable, size):
"""Yield successive chunks from iterable"""
chunk = []
for item in iterable:
chunk.append(item)
if len(chunk) == size:
yield chunk
chunk = []
if chunk:
yield chunk
Strategy 4: Bulk Operations
Use bulk operations instead of individual inserts:
# BAD: Individual inserts
for prop_data in properties:
Property.create(**prop_data) # Separate query each time
# GOOD: Bulk insert
Property.bulk_insert([
{'name': prop['name'], 'address': prop['address'], ...}
for prop in properties
])
# BETTER: Bulk insert in chunks
for chunk in chunked(properties, 1000):
Property.bulk_insert([
{'name': p['name'], 'address': p['address']}
for p in chunk
])
# Each chunk is separate transaction
Strategy 5: Memory Profiling
Identify memory leaks before they hit production:
from memory_profiler import profile
import tracemalloc
# Method 1: Decorator-based profiling
@profile
@celery_task
def import_properties(integration_id):
# Function body
pass
# Outputs:
# Line # Mem usage Increment
# ======================================
# 12 50.2 MiB 0.0 MiB def import_properties():
# 13 75.4 MiB 25.2 MiB data = fetch_all()
# 14 75.4 MiB 0.0 MiB for item in data:
# 15 145.8 MiB 70.4 MiB process(item) # Memory leak!
# Method 2: Tracemalloc for production monitoring
@celery_task
def import_properties(integration_id):
tracemalloc.start()
# Take snapshot before
snapshot_before = tracemalloc.take_snapshot()
# Do work
properties = fetch_all_properties()
for prop in properties:
process_property(prop)
# Take snapshot after
snapshot_after = tracemalloc.take_snapshot()
# Compare
top_stats = snapshot_after.compare_to(snapshot_before, 'lineno')
# Log top memory consumers
logger.info("Top 10 memory allocations:")
for stat in top_stats[:10]:
logger.info(f"{stat}")
# Alert if memory usage too high
current, peak = tracemalloc.get_traced_memory()
if peak > 1024 * 1024 * 1024: # 1GB
alert_oncall(f"High memory usage: {peak / 1024 / 1024:.1f}MB")
tracemalloc.stop()
Strategy 6: Explicit Garbage Collection
Force Python to release memory:
import gc

@celery_task
def import_large_dataset(integration_id):
    # Disable automatic GC during the hot loop to avoid collection
    # pauses mid-chunk; collect explicitly at chunk boundaries instead
    gc.disable()
    try:
        for i, chunk in enumerate(fetch_in_chunks()):
            process_chunk(chunk)
            # Clear local references
            del chunk
            # Force GC every 10 chunks
            if i % 10 == 0:
                collected = gc.collect()
                logger.debug(f"GC collected {collected} objects")
                # Log memory stats
                memory_info = get_memory_usage()
                logger.info(f"Memory: {memory_info['rss']}MB RSS, {memory_info['vms']}MB VMS")
    finally:
        gc.enable()
Strategy 7: Circular Reference Detection
Find and break circular references that prevent GC:
import gc
import sys
def find_circular_references():
"""Debug helper to find circular references"""
gc.collect() # Force collection first
# Find all objects
for obj in gc.get_objects():
if isinstance(obj, dict):
# Check for circular references in dicts
for key, value in obj.items():
if value is obj:
logger.warning(f"Circular reference found: {obj}")
# Common circular reference patterns
# Pattern 1: Parent-child relationship
class Property:
def __init__(self):
self.reservations = []
class Reservation:
def __init__(self, property):
self.property = property # Reference to parent
property.reservations.append(self) # Parent references child
# Circular reference!
# Solution: Use weak references
import weakref
class Reservation:
def __init__(self, property):
self.property = weakref.ref(property) # Weak reference
property.reservations.append(self)
# Pattern 2: Cached data with references
cache = {}
def get_property(property_id):
if property_id in cache:
return cache[property_id]
prop = Property.find(property_id)
cache[property_id] = prop # Keeps reference forever!
return prop
# Solution: Use LRU cache with size limit
from functools import lru_cache
@lru_cache(maxsize=1000) # Limit cache size
def get_property(property_id):
return Property.find(property_id)
Strategy 8: Memory Limits per Task
Prevent runaway tasks from consuming all memory:
import resource
@celery_task
def import_properties(integration_id):
# Set memory limit: 512MB
soft, hard = resource.getrlimit(resource.RLIMIT_AS)
resource.setrlimit(resource.RLIMIT_AS, (512 * 1024 * 1024, hard))
try:
# Do work
process_data()
except MemoryError:
logger.error(f"Task exceeded memory limit")
# Graceful degradation
process_data_in_smaller_chunks()
# Celery task time and memory limits
@celery_task(
soft_time_limit=300, # 5 min soft limit
time_limit=360, # 6 min hard limit
)
def import_properties(integration_id):
pass
Real-World Example: Processing 10,000 Properties
@celery_task(bind=True)
def import_all_properties_chunked(self, integration_id):
"""Memory-efficient property import"""
driver = get_driver(integration_id)
stats = {
'processed': 0,
'failed': 0,
'peak_memory_mb': 0,
}
CHUNK_SIZE = 100
tracemalloc.start()
try:
# Stream properties (no memory buildup)
for chunk_idx, property_chunk in enumerate(
chunked(driver.stream_properties(), CHUNK_SIZE)
):
# Track memory
current, peak = tracemalloc.get_traced_memory()
peak_mb = peak / 1024 / 1024
stats['peak_memory_mb'] = max(stats['peak_memory_mb'], peak_mb)
# Alert if memory growing
if peak_mb > 500: # 500MB threshold
logger.warning(f"High memory usage: {peak_mb:.1f}MB")
gc.collect()
# Process chunk with separate session
session = get_session()
try:
bulk_data = []
for prop_data in property_chunk:
try:
# Transform data
normalized = normalize_property(prop_data)
bulk_data.append(normalized)
stats['processed'] += 1
except Exception as e:
logger.error(f"Failed to process property: {e}")
stats['failed'] += 1
# Bulk insert chunk
if bulk_data:
Property.bulk_insert(bulk_data)
session.commit()
except Exception as e:
session.rollback()
logger.error(f"Failed to save chunk {chunk_idx}: {e}")
finally:
session.close()
# Clear chunk from memory
del property_chunk
del bulk_data
# Force GC every 10 chunks
if chunk_idx % 10 == 0:
collected = gc.collect()
logger.info(
f"Chunk {chunk_idx}: {stats['processed']} processed, "
f"GC collected {collected} objects, "
f"Peak memory: {peak_mb:.1f}MB"
)
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'processed': stats['processed'],
'failed': stats['failed'],
}
)
finally:
tracemalloc.stop()
logger.info(
f"Import complete: {stats['processed']} processed, "
f"{stats['failed']} failed, "
f"Peak memory: {stats['peak_memory_mb']:.1f}MB"
)
return stats
Memory Management Checklist
┌────────────────────────────────────────────────────────────┐
│ Memory Optimization Checklist │
├────────────────────────────────────────────────────────────┤
│ │
│ ✓ Use generators/streaming instead of loading all data │
│ ✓ Process data in chunks (100-1000 items) │
│ ✓ Use bulk operations for database writes │
│ ✓ Close database sessions after each batch │
│ ✓ Delete large objects after use (del variable) │
│ ✓ Force garbage collection in long-running tasks │
│ ✓ Avoid circular references (use weakref) │
│ ✓ Profile memory usage in development │
│ ✓ Set memory limits per task │
│ ✓ Monitor memory in production │
│ ✓ Clear caches periodically │
│ ✓ Use LRU cache with size limits │
│ ✓ Avoid global state accumulation │
│ │
└────────────────────────────────────────────────────────────┘
Key Insight: Memory issues are like compound interest - small leaks accumulate over time. A 1MB leak per task becomes 1GB after 1,000 tasks. Design for bounded memory usage from day one.
Data Consistency
Integration data is inherently eventually consistent. Embrace it.
Challenge: Concurrent Updates
Scenario: Two workers processing same property
Worker A Worker B
│ │
├─ Fetch property @ 10:00:00 │
│ name: "Beach House" │
│ ├─ Fetch property @ 10:00:01
│ │ name: "Beach House"
│ │
├─ Update name to "Ocean Villa" │
│ (from API) │
│ Save @ 10:00:05 │
│ │
│ ├─ Update name to "Beach House"
│ │ (from stale cached data)
│ │ Save @ 10:00:06
│ │
└─ RESULT: Old data wins! ✗ ─┘
Solution 1: Last-Write-Wins with Timestamps
def update_property(property_id, new_data, fetched_at):
property = Property.find(property_id)
# Only update if data is newer
if fetched_at > property.last_synced_at:
property.update(
name=new_data['name'],
address=new_data['address'],
last_synced_at=fetched_at,
)
return True
else:
logger.info(f"Skipping stale update for property {property_id}")
return False
Solution 2: Optimistic Locking
from sqlalchemy.orm import version_id_column
class Property(Base):
__tablename__ = 'properties'
id = Column(Integer, primary_key=True)
name = Column(String)
version = version_id_column() # Auto-incremented on each update
def update_property(property_id, new_data):
from sqlalchemy.orm.exc import StaleDataError
session = get_session()
try:
property = session.query(Property).filter_by(id=property_id).one()
property.name = new_data['name']
session.commit() # Fails if version changed
except StaleDataError:
session.rollback()
logger.warning(f"Concurrent update detected for property {property_id}")
# Retry or skip
Solution 3: Idempotency Keys
Make operations idempotent:
@celery_task
def import_reservations(integration_id, property_id, idempotency_key):
# Check if already processed
cache_key = f'import_res:{idempotency_key}'
if redis.exists(cache_key):
logger.info(f"Skipping duplicate import: {idempotency_key}")
return
# Do the import
reservations = driver.get_reservations(property_id)
for res in reservations:
upsert_reservation(res)
# Mark as processed (24 hour expiry)
redis.setex(cache_key, 86400, '1')
# Usage: generate stable idempotency key
idempotency_key = f"{integration_id}:{property_id}:{date.today().isoformat()}"
import_reservations.delay(integration_id, property_id, idempotency_key)
Solution 4: Event Sourcing
Store all changes as events:
# Instead of updating property directly:
property.name = "New Name" # Lost history
# Store events:
events.append({
'type': 'property.name_changed',
'property_id': property_id,
'old_value': 'Beach House',
'new_value': 'Ocean Villa',
'source': 'guesty_api',
'timestamp': datetime.utcnow(),
})
# Rebuild current state from events
def get_property_state(property_id):
events = Event.query.filter_by(property_id=property_id).order_by('timestamp')
state = {}
for event in events:
apply_event(state, event)
return state
Testing Strategies
Testing integrations is hard because you depend on external APIs.
Strategy 1: Record & Replay (VCR)
import vcr
# First run: records real API responses
# Subsequent runs: replays from cassette
@vcr.use_cassette('fixtures/vcr_cassettes/guesty_properties.yaml')
def test_fetch_properties():
api = GuestyAPI(client_id='test', client_secret='test')
properties = api.get_homes()
assert len(properties) > 0
assert properties[0]['_id']
assert properties[0]['nickname']
# Cassette file (auto-generated):
# interactions:
# - request:
# method: GET
# uri: https://api.guesty.com/api/v2/listings
# response:
# status: {code: 200}
# body: {string: '{"results": [...]}'}
Benefits:
- Tests don’t hit real API (fast, free)
- Deterministic results
- Works offline
Drawbacks:
- Cassettes get stale
- Need to refresh periodically (see the record-mode sketch below)
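vcrpy's record modes make refreshing mechanical rather than manual. A sketch, assuming real test credentials are available when re-recording:

import os
import vcr

# 'none' only replays (safe for CI); 'all' re-records against the live API
record_mode = 'all' if os.environ.get('REFRESH_CASSETTES') else 'none'

@vcr.use_cassette(
    'fixtures/vcr_cassettes/guesty_properties.yaml',
    record_mode=record_mode,
    filter_headers=['Authorization'],  # Keep tokens out of committed cassettes
)
def test_fetch_properties():
    api = GuestyAPI(client_id='test', client_secret='test')
    assert api.get_homes()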
Strategy 2: Contract Testing
Validate our assumptions about partner APIs:
import pytest
def test_guesty_api_contract():
"""Validates Guesty API still matches our expectations"""
api = GuestyAPI(client_id=TEST_CREDS['id'], client_secret=TEST_CREDS['secret'])
# Test 1: Auth works
assert api.is_authenticated
# Test 2: Properties endpoint exists
properties = api.get_homes()
assert isinstance(properties, list)
# Test 3: Property structure matches schema
if properties:
prop = properties[0]
assert '_id' in prop
assert 'nickname' in prop
assert 'address' in prop
assert 'accommodates' in prop
# Test 4: Rate limit headers present
response = api._last_response
assert 'X-RateLimit-Remaining-Minute' in response.headers
# Run these tests daily against production APIs
# Alert if contracts break
Strategy 3: Integration Test Environment
Some partners provide sandbox/test environments:
# config.py
if ENV == 'production':
GUESTY_BASE_URL = 'https://api.guesty.com'
elif ENV == 'test':
GUESTY_BASE_URL = 'https://api.sandbox.guesty.com'
Reality check: Most partners don’t have good test environments. When they exist:
- Often missing features
- Stale data
- Different behavior than production
- Extra cost
Strategy 4: Chaos Testing
Simulate failures to ensure resilience:
import random
import time
from unittest.mock import Mock

import requests
from requests.exceptions import ConnectionError, Timeout

class ChaosAPISession(requests.Session):
    def request(self, *args, **kwargs):
        # Randomly inject failures
        if random.random() < 0.1:  # 10% failure rate
            failure_type = random.choice([
                'timeout',
                '500_error',
                '429_rate_limit',
                'network_error',
            ])
            if failure_type == 'timeout':
                time.sleep(30)
                raise Timeout("Simulated timeout")
            elif failure_type == '500_error':
                return Mock(status_code=500, text="Internal Server Error")
            elif failure_type == '429_rate_limit':
                return Mock(status_code=429, headers={'Retry-After': '60'})
            elif failure_type == 'network_error':
                raise ConnectionError("Simulated network failure")
        return super().request(*args, **kwargs)
# Use in staging environment
if ENV == 'staging':
session_class = ChaosAPISession
else:
session_class = requests.Session
Lessons Learned
After 40+ integrations, here are the hard-won lessons:
1. Expect Failure
❌ Don’t: Assume APIs are reliable
properties = api.get_properties() # What if this fails?
for prop in properties:
save(prop)
✅ Do: Design for failure
try:
properties = api.get_properties()
except APIError as e:
logger.error(f"Failed to fetch properties: {e}")
alert_on_call()
return # Graceful degradation
for prop in properties:
try:
save(prop)
except Exception as e:
logger.error(f"Failed to save property {prop['id']}: {e}")
# Continue with next property
2. Rate Limits Are Real
Mistake: Treating rate limits as suggestions
Reality: You will get banned. Recovery is painful (support tickets, manual approval).
Defense in depth:
- Read the docs (they’re usually wrong)
- Implement conservative limits (50-70% of stated limit)
- Use multiple layers (per-worker, per-queue, distributed)
- Monitor actual limits via response headers
- Implement backoff on 429s
- Have circuit breakers
3. Authentication Will Break
Plan for:
- Token expiration (even “long-lived” tokens)
- Token refresh failures
- Credential rotation
- OAuth provider outages
- Webhook auth updates
Solution: Monitoring + Alerts + Runbooks
# Alert on auth failures
if auth_failure_count_last_hour(partner) > 5:
page_oncall(f"{partner} auth failing - check credentials")
4. APIs Change Without Warning
Examples I’ve seen:
- Field renamed (property_name → name)
- Pagination method changed (offset → cursor)
- Rate limits tightened (1000/min → 100/min)
- Endpoints deprecated with no notice
Defense:
- Version pinning
- Response validation (schemas)
- Contract tests (daily)
- Adapter pattern (easy to swap implementations)
- Feature flags (gradual rollouts)
5. One Integration, Many Edge Cases
Common issues:
- Missing required fields (e.g., no address for some properties)
- Inconsistent data types (string vs int for IDs)
- Timezone confusion (UTC? Local? Ambiguous?)
- Encoding issues (UTF-8? Latin-1? Emoji?)
- Null vs empty string vs missing field
Solution: Defensive parsing + validation
def parse_property(raw):
return {
'id': str(raw['id']), # Always string, even if sometimes int
'name': raw.get('name', '').strip() or 'Untitled Property',
'address': raw.get('address', {}).get('formatted', 'Unknown'),
'capacity': int(raw.get('accommodates') or 0),
'created_at': parse_datetime(raw.get('createdAt')), # Handles many formats
}
6. Logs Are Your Best Friend
When something goes wrong (and it will), comprehensive logs make the difference between 15 minute debugging and 4 hour investigation.
Log:
- Every API call (URL, method, status, duration)
- Every error (with context: integration_id, property_id, etc.)
- State changes (property created, updated, deleted)
- Rate limit info (headers, delays)
- Auth events (token refresh, failures)
Don’t log:
- API keys, tokens, passwords (obviously)
- PII without careful consideration
- High-cardinality data in metric labels
7. Monitoring Must Be Actionable
Bad alert:
"API error rate increased"
Good alert:
"Guesty API 401 errors for integration #1234 (Company: ACME Inc)
Last successful auth: 2 hours ago
Action: Check if customer revoked permissions
Runbook: https://wiki.company.com/integrations/guesty-auth-failures"
8. Start Simple, Add Complexity When Needed
Don’t prematurely optimize:
- First integration: Simple cron job might be fine
- 5 integrations: Add basic queue
- 10+ integrations: Distributed tasks
- 40+ integrations: Need all the patterns in this post
Over-engineering early is worse than under-engineering.
9. Memory Leaks Kill Silently
The problem: Works perfectly for 100 items, crashes mysteriously at 10,000.
Common causes:
- Loading entire dataset into memory
- SQLAlchemy session holding all objects
- Circular references preventing GC
- Cached data growing unbounded
- Large lists/dicts built incrementally
Solutions:
# BAD: Load everything
all_items = fetch_all() # 10,000 items in memory!
for item in all_items:
process(item)
# GOOD: Stream/chunk
for chunk in fetch_in_chunks(size=100):
process_chunk(chunk)
del chunk # Release memory
gc.collect() # Force cleanup
Key insight: Profile memory in dev, monitor in prod. Memory issues compound over time.
10. Worker Downtime = Queue Disaster
Scenario: Workers crash at 2 AM, scheduler keeps queuing tasks.
Result: 10,000+ stale tasks by morning, 10+ hours to drain.
Defense:
- Monitor worker heartbeats (alert within 1 minute)
- Task staleness checks (skip tasks older than 1 hour)
- Queue depth alerts (page when > 1000)
- Graceful deployment (drain before restart)
- Emergency fallback processor
Critical: Most “queue backup” problems are actually “worker down” problems.
11. Partner APIs Are Not Created Equal
Tier 1 (Great):
- Comprehensive docs
- Webhook support
- Reasonable rate limits
- Stable APIs
- Responsive support
- Example: Stripe, Twilio
Tier 2 (Okay):
- Basic docs
- Some rate limits
- Breaking changes occasionally
- Slow support
- Example: Most PMS systems
Tier 3 (Painful):
- Outdated docs (or no docs)
- Aggressive rate limits
- Frequent breaking changes
- No support
- Example: You know who you are
Adjust complexity accordingly. Don’t build the same architecture for all partners.
12. Documentation Debt Is Real
Document:
- Partner-specific quirks
- Rate limits (real ones, not official docs)
- Auth renewal process
- Common failures + solutions
- Deployment considerations
Example quirks doc:
## Guesty Integration
### Rate Limits
Official: 600 req/min
Actual: ~500 req/min (aggressive throttling around 550)
Per-second limit: 10 req/s (undocumented!)
### Auth
- Token expires after 24 hours
- Refresh can take up to 30s
- Refresh endpoint rate limited to 10/hour
### Known Issues
- Property IDs sometimes change (merge/split operations)
- Address field can be null even for published properties
- Photos API occasionally returns 502 (retry works)
### Support
- Response time: 3-5 business days
- Slack channel: #vendor-guesty
Conclusion
Building scalable integrations is a marathon, not a sprint. The patterns in this post took years to develop through trial, error, and production incidents.
Key Takeaways:
- Isolation - Use driver pattern to contain partner-specific logic
- Async - Celery/queues for distributed processing
- Rate Limiting - Multiple layers, conservative limits, adaptive backoff
- Error Handling - Classify errors, retry intelligently, dead letter queue
- Circuit Breakers - Fail fast when partner is down
- Memory Management - Chunk data, stream processing, explicit GC
- Worker Monitoring - Heartbeat checks, queue depth alerts, graceful deploys
- Monitoring - Comprehensive metrics, actionable alerts
- Testing - Contract tests, chaos engineering, record/replay
- Data Consistency - Timestamps, optimistic locking, idempotency
- Documentation - Runbooks, quirks, postmortems
Remember: Integration engineering is fundamentally different from building your own APIs. You’re operating in a hostile, unpredictable environment where Murphy’s Law is the only constant.
The teams that succeed are those that:
- Expect failure and design for resilience
- Monitor relentlessly
- Iterate based on production feedback
- Document everything (especially the weird stuff)
Final Advice: Start simple. Add complexity only when you feel the pain. Every pattern in this post was born from a production incident.
Good luck, and may your rate limits be generous and your tokens forever fresh.
Questions? Want to discuss integration architecture? Find me on GitHub or LinkedIn.