In 2024, I watched a developer spend three weeks building an integration that broke on the first day of production. The API documentation said one thing; the actual API did something else. The rate limits weren't documented. The error messages were useless. The authentication flow had undocumented edge cases.

In 2026, that same integration would take three days—not because APIs got better (they didn't), but because we've accumulated hard-won knowledge about what actually works. This guide is that knowledge: the patterns that survive contact with production, the failures that teach you what documentation won't, and the code that handles the edge cases that don't exist in tutorials.

We're not talking theory. Every pattern in this guide comes from real integrations we've built and operated.

This is what we learned.

1. The API-First Mindset

Before we discuss protocols and patterns, let's establish the philosophy that makes integration work tractable. The API-first mindset isn't about technology—it's about how you think about systems.

Everything is a Service

Modern business runs on services. Your CRM is a service. Your accounting system is a service. Your wholesale marketplace is a service. Your calendar is a service. Even internal tools should be thought of as services with APIs.

When you internalize this, integration becomes natural. The question isn't "can we connect these systems?"—it's "what's the API, and what does it let us do?"

🔌 The Integration Inventory

Before building anything, map your service landscape:

1. List
What services does your business use? CRM, accounting, inventory, communication, analytics, payments...
2. API Check
Does each service have an API? What type? REST, GraphQL, SOAP? Is it documented? Is there a sandbox?
3. Auth Model
How do you authenticate? API key? OAuth? What permissions/scopes are available?
4. Limits
Rate limits? Data limits? Webhook support? What operations are read-only vs. read-write?
5. Data Model
What objects exist? Orders, customers, products? How do they relate? What's the identifier strategy?
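The five steps above can be captured as data, which makes gaps obvious at a glance. A minimal sketch (the service entries and limits here are illustrative, not a real inventory):

```python
from dataclasses import dataclass

@dataclass
class ServiceEntry:
    """One row of the integration inventory."""
    name: str
    api_style: str      # "REST", "GraphQL", "SOAP", ...
    auth: str           # "api_key", "oauth2", ...
    rate_limit: str     # from docs, or "unknown" until observed
    webhooks: bool
    sandbox: bool

inventory = [
    ServiceEntry("crm", "REST", "oauth2", "100 req/min", webhooks=True, sandbox=True),
    ServiceEntry("accounting", "REST", "oauth2", "500 req/min", webhooks=False, sandbox=True),
    ServiceEntry("analytics", "GraphQL", "api_key", "unknown", webhooks=False, sandbox=False),
]

# Surface the risky integrations first: no sandbox means you test in production
risky = [s.name for s in inventory if not s.sandbox]
```

Sorting by what's missing (no sandbox, no documented limits, no webhooks) tells you where the integration pain will concentrate.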

The Contract Mindset

An API is a contract. The documentation says: "Send me this, and I'll give you that." Like any contract, the devil is in the details—and sometimes the counterparty doesn't honor the terms exactly.

Treat APIs defensively: verify responses match the documented shape, expect undocumented edge cases, log enough context to reconstruct failures, and design so one service's outage doesn't cascade through your system.
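One concrete defensive habit: validate the shape of every response before trusting it, and fail loudly with context when the contract is violated. A minimal sketch, using a hypothetical orders payload:

```python
def parse_order(payload: dict) -> dict:
    """Validate an order payload against the fields we actually depend on.

    Raises ValueError with context instead of letting a KeyError surface
    three layers deeper in business logic.
    """
    required = ("id", "status", "total")
    missing = [f for f in required if f not in payload]
    if missing:
        raise ValueError(f"Order payload missing fields {missing}: {payload!r}")

    # Documented as a string decimal; tolerate numbers, reject garbage
    try:
        total = float(payload["total"])
    except (TypeError, ValueError):
        raise ValueError(f"Unparseable total {payload['total']!r} on order {payload['id']}")

    return {"id": payload["id"], "status": payload["status"], "total": total}

order = parse_order({"id": 4821934, "status": "Submitted", "total": "4250.00"})
```

The point isn't the specific fields; it's that contract violations surface at the boundary, with the offending payload attached, rather than as mystery failures downstream.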

Composition Over Custom

The API-first mindset prefers composing existing services over building custom solutions. Need email? Use an email API (SendGrid, Mailgun, SES). Need payments? Use a payment API (Stripe, Square). Need voice? Use a voice API (Twilio).

This isn't laziness—it's leverage. A well-integrated system using mature APIs will be more reliable, more feature-rich, and cheaper to maintain than a custom-built alternative.

💡 The Build vs. Integrate Decision

Build when: You need complete control, the problem is core to your business, or no adequate API exists.

Integrate when: The problem is solved, APIs are mature, and your differentiation isn't in the plumbing.

For most businesses, the answer is integrate. Your competitive advantage probably isn't in how you send emails.

2. REST vs GraphQL vs WebSockets

The three dominant paradigms for API communication each have their place. Understanding when to use each—and how they differ in practice—saves enormous debugging time.

REST

Resource-based, HTTP verbs, stateless

  • Most common
  • Simple to understand
  • Cacheable
  • May over-fetch

GraphQL

Query language, single endpoint, typed schema

  • Precise data fetching
  • Self-documenting
  • Complex queries
  • Learning curve

WebSocket

Bidirectional, persistent connection, real-time

  • Real-time updates
  • Low latency
  • Connection state
  • Complex to manage

REST: The Workhorse

REST (Representational State Transfer) is what you'll encounter most often. It maps HTTP methods to CRUD operations on resources:

Method   Operation   Example
GET      Read        GET /api/v2/orders/12345
POST     Create      POST /api/v2/orders
PUT      Replace     PUT /api/v2/orders/12345
PATCH    Update      PATCH /api/v2/orders/12345
DELETE   Delete      DELETE /api/v2/orders/12345

LeafLink uses REST. Here's what a typical interaction looks like:

LeafLink Orders API HTTP
# List recent orders
GET https://www.leaflink.com/api/v2/orders/?limit=50&ordering=-modified
Authorization: App [API_KEY]

# Response (simplified)
{
  "count": 9131,
  "next": "https://www.leaflink.com/api/v2/orders/?limit=50&offset=50",
  "results": [
    {
      "id": 4821934,
      "number": "SO-2026-0892",
      "status": "Submitted",
      "customer": 28451,
      "total": "4250.00",
      "created_on": "2026-02-03T14:22:18Z",
      "line_items": [...]
    }
  ]
}

REST Pagination Patterns

Real APIs return paginated results. You'll encounter three patterns:

Offset
?limit=50&offset=100
Simple but inefficient for deep pages: skip 100 items, return 50. LeafLink and QuickBooks use this.
Cursor
?cursor=eyJpZCI6MTIzNH0=
Opaque token for next page. More efficient, handles inserts/deletes during pagination. Stripe uses this.
Keyset
?after_id=12345
Use last item's ID to fetch next page. Efficient, but requires stable sort. Etherscan uses this.

Production pattern: Always implement pagination handling as a generator or iterator. Fetch pages on demand rather than loading everything into memory:

Pagination Generator Python
import asyncio

async def fetch_all_orders(session, base_url, api_key):
    """Async generator that yields all orders across pages."""
    url = f"{base_url}/orders/?limit=100"
    
    while url:
        async with session.get(url, headers={"Authorization": f"App {api_key}"}) as resp:
            if resp.status == 429:  # Rate limited
                retry_after = int(resp.headers.get("Retry-After", 60))
                await asyncio.sleep(retry_after)
                continue
                
            resp.raise_for_status()
            data = await resp.json()
            
            for order in data["results"]:
                yield order
            
            url = data.get("next")  # None when no more pages
            
            if url:
                await asyncio.sleep(0.1)  # Respect rate limits

GraphQL: Precision Queries

GraphQL solves REST's over-fetching problem. Instead of getting a fixed response shape, you request exactly the fields you need. Distru uses GraphQL for their manufacturing ERP API.

Distru GraphQL Query GraphQL
# Fetch products with only the fields we need
query GetProducts($first: Int!, $after: String) {
  products(first: $first, after: $after) {
    edges {
      node {
        id
        name
        sku
        category {
          name
        }
        currentInventory {
          quantity
          locationName
        }
        unitPrice
        metrcTagIds
      }
      cursor
    }
    pageInfo {
      hasNextPage
      endCursor
    }
  }
}

# Variables
{
  "first": 50,
  "after": "cursor_abc123"
}

Key GraphQL concepts: queries read data, mutations write it, and both return exactly the fields you request. Mutations conventionally return the updated object alongside a structured errors list, so you can confirm the write and handle validation failures in one round trip:

GraphQL Mutation Example GraphQL
mutation UpdateProductInventory($input: UpdateInventoryInput!) {
  updateInventory(input: $input) {
    product {
      id
      currentInventory {
        quantity
        locationName
      }
    }
    errors {
      field
      messages
    }
  }
}

# Variables
{
  "input": {
    "productId": "prod_123",
    "locationId": "loc_456",
    "adjustment": 100,
    "reason": "Received shipment"
  }
}
⚠️ GraphQL Gotchas
  • N+1 queries: Requesting nested data can cause performance issues server-side
  • Caching: Harder to cache than REST (POST requests, custom queries)
  • Error handling: HTTP 200 with errors in the response body. Check both!
  • Tooling required: You really want a GraphQL client library, not raw HTTP
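That said, the cursor pagination from the query above reduces to a small loop over HTTP POSTs even without a client library. A sketch where `post` is any callable with the `requests.post` signature (in production you'd pass `requests.post` itself; the endpoint and field names are illustrative):

```python
PRODUCTS_QUERY = """
query GetProducts($first: Int!, $after: String) {
  products(first: $first, after: $after) {
    edges { node { id name } cursor }
    pageInfo { hasNextPage endCursor }
  }
}
"""

def fetch_all_products(url: str, api_key: str, post):
    """Yield every product node, following Relay-style cursor pagination.

    `post` is injected (e.g. requests.post) so the loop is testable offline.
    """
    cursor = None
    while True:
        resp = post(
            url,
            json={"query": PRODUCTS_QUERY, "variables": {"first": 50, "after": cursor}},
            headers={"Authorization": f"Bearer {api_key}"},
        )
        resp.raise_for_status()
        body = resp.json()
        if body.get("errors"):  # gotcha: HTTP 200 can still carry errors
            raise RuntimeError(f"GraphQL errors: {body['errors']}")
        connection = body["data"]["products"]
        for edge in connection["edges"]:
            yield edge["node"]
        if not connection["pageInfo"]["hasNextPage"]:
            return
        cursor = connection["pageInfo"]["endCursor"]
```

Note the errors check before touching `data`: it's the GraphQL equivalent of `raise_for_status()`, and skipping it is the single most common GraphQL integration bug.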

WebSockets: Real-Time Updates

When you need live data—chat messages, price feeds, blockchain events—WebSockets provide bidirectional, persistent connections. They're essential for real-time applications but more complex to manage.

WebSocket Client Pattern Python
import websockets
import asyncio
import json

class BlockchainPriceStream:
    def __init__(self, url: str):
        self.url = url
        self.ws = None
        self.reconnect_delay = 1
        self.max_reconnect_delay = 60
        
    async def connect(self):
        while True:
            try:
                self.ws = await websockets.connect(self.url)
                self.reconnect_delay = 1  # Reset on successful connection
                
                # Subscribe to channels
                await self.ws.send(json.dumps({
                    "method": "subscribe",
                    "params": ["eth_price", "gas_price"]
                }))
                
                async for message in self.ws:
                    await self.handle_message(json.loads(message))
                    
            except websockets.ConnectionClosed:
                print(f"Connection closed, reconnecting in {self.reconnect_delay}s...")
                await asyncio.sleep(self.reconnect_delay)
                self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
                
            except Exception as e:
                print(f"Error: {e}, reconnecting...")
                await asyncio.sleep(self.reconnect_delay)
                
    async def handle_message(self, data: dict):
        if data.get("type") == "price_update":
            # Process real-time price update
            print(f"ETH: ${data['price']}")
        elif data.get("type") == "ping":
            await self.ws.send(json.dumps({"type": "pong"}))

WebSocket challenges: you own reconnection (with backoff), heartbeat/ping handling, re-subscribing after every reconnect, and detecting silently dead connections. Libraries help with framing, but these failure modes are yours to manage.

Choosing the Right Paradigm

Use Case                      Best Choice   Why
CRUD operations               REST          Simple, well-understood, cacheable
Complex data fetching         GraphQL       Avoid over-fetching, single request
Real-time updates             WebSocket     Push-based, low latency
Batch operations              REST          Bulk endpoints, parallel requests
Mobile/bandwidth constrained  GraphQL       Minimize payload size
Simple public data            REST          CDN caching, link sharing

3. Authentication Patterns

Authentication is where integrations often fail first. Different APIs use different patterns, and getting it wrong means 401 errors forever. Here's what you'll encounter.

API Keys: Simple but Limited

API keys are the simplest authentication: a secret string you include with each request. LeafLink, Etherscan, and Distru use API keys.

API Key Authentication Examples HTTP
# LeafLink: Custom Authorization header
GET /api/v2/orders/
Authorization: App sk_live_abc123xyz

# Etherscan: Query parameter
GET /api?module=account&action=balance&address=0x...&apikey=YOURKEY

# Distru: Bearer token (though it's an API key)
POST /graphql
Authorization: Bearer dist_abc123xyz

API key best practices: never hardcode keys in source, load them from the environment or a permission-restricted file, keep separate keys per environment, and rotate them on a schedule. A loading pattern that supports both sources:

Secure Key Loading Python
import os
from pathlib import Path

def load_api_key(service: str) -> str:
    """Load API key from file or environment."""
    # Try environment variable first
    env_key = f"{service.upper()}_API_KEY"
    if key := os.environ.get(env_key):
        return key
    
    # Fall back to config file
    key_file = Path.home() / ".config" / service / "api_key"
    if key_file.exists():
        return key_file.read_text().strip()
    
    raise ValueError(f"No API key found for {service}. "
                     f"Set {env_key} or create {key_file}")

# Usage
leaflink_key = load_api_key("leaflink")
etherscan_key = load_api_key("etherscan")

OAuth 2.0: The Full Dance

OAuth 2.0 is complex but necessary when you need to access user data on their behalf. Google, QuickBooks, and many other enterprise APIs require OAuth.

OAuth 2.0 Authorization Code Flow
Your App
→ 1. Redirect
Auth Server
→ 2. Login
User
↓ 3. Authorization Code
Your App
→ 4. Exchange Code
Auth Server
→ 5. Tokens
Access + Refresh

The OAuth dance step by step:

  1. Build authorization URL: Include client_id, redirect_uri, scopes, and state
  2. User authenticates: They log in to the service and grant permissions
  3. Receive authorization code: Service redirects to your URL with a code
  4. Exchange code for tokens: POST the code to get access and refresh tokens
  5. Use access token: Include it in API requests
  6. Refresh when expired: Use refresh token to get new access token
Google OAuth Implementation Python
import os
import json
import webbrowser
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import urlencode, parse_qs, urlparse
from http.server import HTTPServer, BaseHTTPRequestHandler
import requests

class GoogleOAuth:
    """Complete OAuth 2.0 implementation for Google APIs."""
    
    AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
    TOKEN_URL = "https://oauth2.googleapis.com/token"
    
    def __init__(self, client_id: str, client_secret: str, 
                 scopes: list[str], redirect_port: int = 8080):
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.redirect_uri = f"http://localhost:{redirect_port}/callback"
        self.redirect_port = redirect_port
        self.tokens_file = Path.home() / ".config" / "google-calendar" / "tokens.json"
        
    def get_authorization_url(self, state: str = None) -> str:
        """Build the URL to start OAuth flow."""
        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "response_type": "code",
            "scope": " ".join(self.scopes),
            "access_type": "offline",  # Get refresh token
            "prompt": "consent",  # Force consent to get refresh token
        }
        if state:
            params["state"] = state
        return f"{self.AUTH_URL}?{urlencode(params)}"
    
    def exchange_code(self, code: str) -> dict:
        """Exchange authorization code for tokens."""
        response = requests.post(self.TOKEN_URL, data={
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "code": code,
            "grant_type": "authorization_code",
            "redirect_uri": self.redirect_uri,
        })
        response.raise_for_status()
        tokens = response.json()
        
        # Add expiration timestamp
        tokens["expires_at"] = (
            datetime.now() + timedelta(seconds=tokens["expires_in"])
        ).isoformat()
        
        self._save_tokens(tokens)
        return tokens
    
    def refresh_access_token(self) -> str:
        """Use refresh token to get new access token."""
        tokens = self._load_tokens()
        if not tokens or "refresh_token" not in tokens:
            raise ValueError("No refresh token available. Re-authorize.")
        
        response = requests.post(self.TOKEN_URL, data={
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "refresh_token": tokens["refresh_token"],
            "grant_type": "refresh_token",
        })
        response.raise_for_status()
        new_tokens = response.json()
        
        # Preserve refresh token (not always returned)
        new_tokens["refresh_token"] = tokens["refresh_token"]
        new_tokens["expires_at"] = (
            datetime.now() + timedelta(seconds=new_tokens["expires_in"])
        ).isoformat()
        
        self._save_tokens(new_tokens)
        return new_tokens["access_token"]
    
    def get_valid_token(self) -> str:
        """Get a valid access token, refreshing if necessary."""
        tokens = self._load_tokens()
        if not tokens:
            raise ValueError("No tokens found. Run authorization flow first.")
        
        expires_at = datetime.fromisoformat(tokens["expires_at"])
        if datetime.now() >= expires_at - timedelta(minutes=5):
            # Token expired or expiring soon, refresh
            return self.refresh_access_token()
        
        return tokens["access_token"]
    
    def _save_tokens(self, tokens: dict):
        self.tokens_file.parent.mkdir(parents=True, exist_ok=True)
        self.tokens_file.write_text(json.dumps(tokens, indent=2))
        self.tokens_file.chmod(0o600)  # Secure permissions
        
    def _load_tokens(self) -> dict | None:
        if self.tokens_file.exists():
            return json.loads(self.tokens_file.read_text())
        return None

# Usage example
oauth = GoogleOAuth(
    client_id=os.environ["GOOGLE_CLIENT_ID"],
    client_secret=os.environ["GOOGLE_CLIENT_SECRET"],
    scopes=[
        "https://www.googleapis.com/auth/calendar",
        "https://www.googleapis.com/auth/gmail.modify",
        "https://www.googleapis.com/auth/drive",
    ]
)

# Get access token for API calls
token = oauth.get_valid_token()
headers = {"Authorization": f"Bearer {token}"}
⚠️ OAuth Common Mistakes
  • Missing access_type=offline: You won't get a refresh token without it
  • Forgetting to refresh: Access tokens expire (usually 1 hour). Handle it!
  • Lost refresh token: If you don't persist it, user must re-authorize
  • Scope creep: Request only the scopes you need. Users notice.
  • State parameter: Always use it to prevent CSRF attacks

JWT Tokens: Self-Contained Claims

JSON Web Tokens (JWTs) are often used as access tokens. They're special because they contain embedded claims that can be verified without a database lookup.

JWT Structure Text
# A JWT has three parts, base64-encoded and separated by dots:
eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0IiwiZXhwIjoxNzA3MDAwMDAwfQ.signature

# Decoded header:
{"alg": "HS256", "typ": "JWT"}

# Decoded payload (claims):
{
  "sub": "1234",           # Subject (user ID)
  "exp": 1707000000,       # Expiration timestamp
  "iat": 1706999000,       # Issued at
  "scope": "read write",   # Permissions
  "iss": "api.example.com" # Issuer
}

# Signature: HMAC-SHA256 of header + payload using secret key

Working with JWTs:

JWT Validation Python
import jwt
from datetime import datetime

def validate_jwt(token: str, secret: str, required_scopes: list[str] = None) -> dict:
    """Validate JWT and return claims."""
    try:
        # Decode and verify signature
        claims = jwt.decode(token, secret, algorithms=["HS256"])
        
        # Check expiration (jwt library does this, but be explicit)
        if claims.get("exp") and datetime.fromtimestamp(claims["exp"]) < datetime.now():
            raise ValueError("Token expired")
        
        # Check required scopes
        if required_scopes:
            token_scopes = claims.get("scope", "").split()
            missing = set(required_scopes) - set(token_scopes)
            if missing:
                raise ValueError(f"Missing scopes: {missing}")
        
        return claims
        
    except jwt.InvalidTokenError as e:
        raise ValueError(f"Invalid token: {e}")

# For APIs that use JWTs, you often just pass them through
# The API validates; you just need to refresh when expired
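When debugging, it's often handy to inspect a token's claims without verifying it — useful for checking expiry and scopes locally, but never a substitute for signature verification. A stdlib-only sketch:

```python
import base64
import json

def peek_jwt(token: str) -> dict:
    """Decode a JWT's payload WITHOUT verifying the signature (debugging only)."""
    payload_b64 = token.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)  # restore stripped base64 padding
    return json.loads(base64.urlsafe_b64decode(payload_b64))
```

`peek_jwt(token)["exp"]` tells you when a token expires; only verified decoding with the secret tells you it's genuine.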

4. Rate Limiting and Backoff Strategies

Every API has limits. Hit them, and you'll get blocked. Understanding rate limiting patterns and implementing proper backoff is essential for production integrations.

Types of Rate Limits

API          Limit Type      Limit                Window
LeafLink     Per-user        ~60 req              per minute
Etherscan    Per-key         5 req                per second
Google APIs  Per-project     Varies by API        per day/minute
QuickBooks   Per-app         500 req              per minute
Twilio       Per-account     Varies by endpoint   concurrency
Distru       Not documented  ~100 req (observed)  per minute

Detecting Rate Limits

Rate limit responses vary by API, but common patterns include:

Rate Limit Response Examples HTTP
# Standard: HTTP 429 Too Many Requests
HTTP/1.1 429 Too Many Requests
Retry-After: 60
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1707001200

# Etherscan: Returns 200 with error in body
{
  "status": "0",
  "message": "NOTOK",
  "result": "Max rate limit reached, please use API Key for higher rate limit"
}

# Some APIs: HTTP 503 Service Unavailable (overload protection)

Exponential Backoff with Jitter

When rate limited, don't retry immediately—you'll just get blocked again. Exponential backoff with jitter is the standard pattern:

Production-Ready Rate Limit Handler Python
import asyncio
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
import aiohttp

@dataclass
class RateLimitConfig:
    requests_per_window: int
    window_seconds: int
    max_retries: int = 5
    base_delay: float = 1.0
    max_delay: float = 60.0

class RateLimitedClient:
    """HTTP client with rate limiting and exponential backoff."""
    
    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.request_times: list[datetime] = []
        self.session: aiohttp.ClientSession | None = None
        
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
        
    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()
    
    async def request(self, method: str, url: str, **kwargs) -> dict:
        """Make rate-limited request with automatic retry."""
        
        for attempt in range(self.config.max_retries):
            # Pre-emptive rate limiting
            await self._wait_for_slot()
            
            try:
                async with self.session.request(method, url, **kwargs) as resp:
                    # Track request time
                    self._record_request()
                    
                    # Success
                    if resp.status == 200:
                        return await resp.json()
                    
                    # Rate limited
                    if resp.status == 429:
                        retry_after = self._parse_retry_after(resp)
                        delay = retry_after or self._backoff_delay(attempt)
                        print(f"Rate limited. Waiting {delay:.1f}s...")
                        await asyncio.sleep(delay)
                        continue
                    
                    # Server error (may be transient)
                    if resp.status >= 500:
                        delay = self._backoff_delay(attempt)
                        print(f"Server error {resp.status}. Retrying in {delay:.1f}s...")
                        await asyncio.sleep(delay)
                        continue
                    
                    # Client error (don't retry)
                    resp.raise_for_status()
                    
            except aiohttp.ClientError as e:
                if attempt < self.config.max_retries - 1:
                    delay = self._backoff_delay(attempt)
                    print(f"Network error: {e}. Retrying in {delay:.1f}s...")
                    await asyncio.sleep(delay)
                else:
                    raise
        
        raise Exception(f"Max retries ({self.config.max_retries}) exceeded")
    
    async def _wait_for_slot(self):
        """Wait until we have capacity within rate limit window."""
        now = datetime.now()
        window_start = now - timedelta(seconds=self.config.window_seconds)
        
        # Remove old requests outside window
        self.request_times = [t for t in self.request_times if t > window_start]
        
        # If at capacity, wait for oldest request to exit window
        if len(self.request_times) >= self.config.requests_per_window:
            oldest = self.request_times[0]
            wait_time = (oldest + timedelta(seconds=self.config.window_seconds) - now).total_seconds()
            if wait_time > 0:
                await asyncio.sleep(wait_time + 0.1)  # Small buffer
    
    def _record_request(self):
        """Record timestamp of request."""
        self.request_times.append(datetime.now())
    
    def _backoff_delay(self, attempt: int) -> float:
        """Calculate exponential backoff with jitter."""
        delay = self.config.base_delay * (2 ** attempt)
        delay = min(delay, self.config.max_delay)
        # Add jitter: ±25% randomization
        jitter = delay * 0.25 * (2 * random.random() - 1)
        return delay + jitter
    
    def _parse_retry_after(self, response) -> float | None:
        """Parse Retry-After header if present."""
        if retry_after := response.headers.get("Retry-After"):
            try:
                return float(retry_after)
            except ValueError:
                pass
        return None

# Usage for Etherscan (5 req/sec)
etherscan_config = RateLimitConfig(
    requests_per_window=5,
    window_seconds=1,
    max_retries=5
)

async def fetch_balance(address: str) -> dict:
    async with RateLimitedClient(etherscan_config) as client:
        return await client.request(
            "GET",
            "https://api.etherscan.io/v2/api",
            params={
                "chainid": 1,
                "module": "account",
                "action": "balance",
                "address": address,
                "tag": "latest",
                "apikey": ETHERSCAN_API_KEY
            }
        )
✅ Rate Limit Pro Tips
  • Pre-emptive limiting: Track your own requests; don't wait for 429s
  • Jitter is critical: Without it, retries from multiple clients sync up and cause thundering herd
  • Respect Retry-After: If the API tells you when to retry, use it
  • Batch when possible: One request for 100 items beats 100 requests for 1 item
  • Cache aggressively: Don't re-fetch data that hasn't changed

5. Error Handling Best Practices

APIs fail. Networks fail. Services go down. Your integration needs to handle errors gracefully, provide useful debugging information, and recover automatically when possible.

Error Categories

Category       HTTP Codes               Retry?              Action
Client Error   400, 401, 403, 404, 422  No                  Fix your request
Rate Limit     429                      Yes (with backoff)  Slow down
Server Error   500, 502, 503, 504       Yes (limited)       Wait and retry
Network Error  N/A (connection failed)  Yes (limited)       Check connectivity
Timeout        N/A                      Maybe               Increase timeout or retry

Structured Error Handling

Error Handling Framework Python
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import json

class ErrorSeverity(Enum):
    TRANSIENT = "transient"    # Retry will likely succeed
    PERMANENT = "permanent"    # Won't succeed without code change
    UNKNOWN = "unknown"        # Unclear, may need investigation

@dataclass
class APIError(Exception):
    """Structured API error with context."""
    message: str
    status_code: Optional[int] = None
    response_body: Optional[str] = None
    endpoint: Optional[str] = None
    severity: ErrorSeverity = ErrorSeverity.UNKNOWN
    retry_after: Optional[float] = None
    request_id: Optional[str] = None
    
    def __str__(self):
        parts = [self.message]
        if self.status_code:
            parts.append(f"[HTTP {self.status_code}]")
        if self.endpoint:
            parts.append(f"at {self.endpoint}")
        if self.request_id:
            parts.append(f"(request_id: {self.request_id})")
        return " ".join(parts)
    
    @property
    def should_retry(self) -> bool:
        return self.severity == ErrorSeverity.TRANSIENT
    
    def to_dict(self) -> dict:
        """Serialize for logging."""
        return {
            "message": self.message,
            "status_code": self.status_code,
            "response_body": self.response_body,
            "endpoint": self.endpoint,
            "severity": self.severity.value,
            "retry_after": self.retry_after,
            "request_id": self.request_id,
        }

def classify_error(status_code: int, response_body: str = "") -> ErrorSeverity:
    """Classify error severity based on status code and response."""
    if status_code == 429:
        return ErrorSeverity.TRANSIENT
    elif status_code >= 500:
        return ErrorSeverity.TRANSIENT
    elif status_code in (401, 403):
        return ErrorSeverity.PERMANENT  # Auth issues need manual fix
    elif status_code == 404:
        return ErrorSeverity.PERMANENT  # Resource doesn't exist
    elif status_code == 400:
        # Could be transient (data validation) or permanent (bad request format)
        if "temporarily" in response_body.lower():
            return ErrorSeverity.TRANSIENT
        return ErrorSeverity.PERMANENT
    return ErrorSeverity.UNKNOWN

async def handle_response(response, endpoint: str) -> dict:
    """Process API response with structured error handling."""
    body = await response.text()
    request_id = response.headers.get("X-Request-ID")
    
    if response.status == 200:
        try:
            data = await response.json()
            # Some APIs return errors in 200 responses (looking at you, Etherscan)
            if isinstance(data, dict) and data.get("status") == "0":
                raise APIError(
                    message=data.get("message", "API returned error status"),
                    status_code=200,
                    response_body=body,
                    endpoint=endpoint,
                    severity=ErrorSeverity.PERMANENT,
                    request_id=request_id,
                )
            return data
        except json.JSONDecodeError as e:
            raise APIError(
                message=f"Invalid JSON response: {e}",
                status_code=response.status,
                response_body=body[:500],
                endpoint=endpoint,
                severity=ErrorSeverity.TRANSIENT,  # Might be temporary glitch
                request_id=request_id,
            )
    
    # Error response
    severity = classify_error(response.status, body)
    retry_after = None
    if response.status == 429:
        retry_after = float(response.headers.get("Retry-After", 60))
    
    raise APIError(
        message="API request failed",
        status_code=response.status,
        response_body=body[:1000],  # Truncate large error bodies
        endpoint=endpoint,
        severity=severity,
        retry_after=retry_after,
        request_id=request_id,
    )

Logging for Debugging

When an integration fails in production, you need enough context to diagnose the issue. Log strategically:

Structured Logging Python
import logging
import json
from datetime import datetime

class APILogger:
    """Structured logging for API operations."""
    
    def __init__(self, service_name: str):
        self.service = service_name
        self.logger = logging.getLogger(f"api.{service_name}")
        
    def request(self, method: str, url: str, **kwargs):
        """Log outgoing request (sanitize sensitive data)."""
        # Remove auth headers from logs
        safe_headers = {k: v for k, v in kwargs.get("headers", {}).items()
                       if k.lower() not in ("authorization", "x-api-key")}
        
        self.logger.info(json.dumps({
            "event": "api_request",
            "service": self.service,
            "method": method,
            "url": url,
            "headers": safe_headers,
            "timestamp": datetime.now().isoformat(),
        }))
    
    def response(self, status: int, duration_ms: float, **kwargs):
        """Log response details."""
        self.logger.info(json.dumps({
            "event": "api_response",
            "service": self.service,
            "status": status,
            "duration_ms": duration_ms,
            "timestamp": datetime.now().isoformat(),
            **kwargs,
        }))
    
    def error(self, error: APIError):
        """Log structured error."""
        self.logger.error(json.dumps({
            "event": "api_error",
            "service": self.service,
            **error.to_dict(),
            "timestamp": datetime.now().isoformat(),
        }))
🚫 Never Log
  • API keys, tokens, or credentials
  • Customer personal information (PII)
  • Payment details (card numbers, bank accounts)
  • Full request bodies with sensitive data
  • Internal system paths that reveal architecture

6. Webhook Patterns

Webhooks flip the model: instead of polling an API for changes, the API pushes changes to you. They're essential for real-time integrations but introduce new challenges.

Webhook vs Polling
Polling:
Your App → (every 5 min) → API: "Any changes?"
Delayed, wasteful, but simple
Webhook:
API → (on change) → Your App: "Here's what changed"
Real-time, efficient, but complex

Webhook Security

Anyone can send HTTP requests to your webhook endpoint. You must verify that requests actually came from the expected service.

Webhook Signature Verification Python
import hmac
import hashlib
import os
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()

WEBHOOK_SECRET = os.environ["LEAFLINK_WEBHOOK_SECRET"]

def verify_webhook_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Verify HMAC-SHA256 webhook signature."""
    expected = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    
    # Constant-time comparison to prevent timing attacks
    return hmac.compare_digest(expected, signature)

@app.post("/webhooks/leaflink")
async def handle_leaflink_webhook(request: Request):
    # Get raw body for signature verification
    body = await request.body()
    
    # Get signature from header (format varies by provider)
    signature = request.headers.get("X-LeafLink-Signature", "")
    
    if not verify_webhook_signature(body, signature, WEBHOOK_SECRET):
        raise HTTPException(status_code=401, detail="Invalid signature")
    
    # Parse and process
    event = await request.json()
    await process_webhook_event(event)
    
    # Always return 200 quickly (process async if needed)
    return {"status": "received"}

Idempotency: Handling Duplicate Webhooks

Webhooks can be delivered multiple times. Network issues, retries, and bugs all cause duplicates. Your handler must be idempotent—processing the same event twice should have no additional effect.

Idempotent Webhook Handler Python
import hashlib
import json
from datetime import datetime, timedelta

# Simple in-memory store (use Redis in production)
processed_events: dict[str, datetime] = {}

def get_event_id(event: dict) -> str:
    """Extract or generate unique event identifier."""
    # Many providers include an event ID
    if event_id := event.get("event_id") or event.get("id"):
        return str(event_id)
    
    # Fall back to content hash
    content = json.dumps(event, sort_keys=True).encode()
    return hashlib.sha256(content).hexdigest()[:16]

def is_duplicate(event_id: str, window_hours: int = 24) -> bool:
    """Check if event was already processed."""
    # Clean old entries
    cutoff = datetime.now() - timedelta(hours=window_hours)
    expired = [k for k, v in processed_events.items() if v < cutoff]
    for k in expired:
        del processed_events[k]
    
    return event_id in processed_events

def mark_processed(event_id: str):
    """Record that event was processed."""
    processed_events[event_id] = datetime.now()

async def process_webhook_event(event: dict):
    """Process webhook with idempotency."""
    event_id = get_event_id(event)
    
    if is_duplicate(event_id):
        print(f"Skipping duplicate event: {event_id}")
        return
    
    try:
        # Actual processing
        event_type = event.get("type")
        
        if event_type == "order.created":
            await handle_new_order(event["data"])
        elif event_type == "order.updated":
            await handle_order_update(event["data"])
        elif event_type == "order.shipped":
            await handle_order_shipped(event["data"])
        else:
            print(f"Unknown event type: {event_type}")
        
        # Only mark processed after successful handling
        mark_processed(event_id)
        
    except Exception as e:
        # Don't mark as processed - allow retry
        print(f"Error processing {event_id}: {e}")
        raise

7. Case Study: LeafLink B2B Marketplace

LeafLink is the dominant B2B wholesale marketplace in the cannabis industry. Our integration syncs 285 customers and 1,468 products and has processed more than 9,000 orders. Here's how we built it.

🌿 LeafLink B2B Integration
REST API v2
9,131
Orders Processed
285
Customers
1,468
Products
6808
Seller ID

API Overview

LeafLink provides a REST API with comprehensive access to orders, products, customers, and inventory. Authentication is via API key in a custom header format.

LeafLink API Configuration Python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

@dataclass
class LeafLinkConfig:
    """LeafLink API configuration."""
    base_url: str = "https://www.leaflink.com/api/v2"
    api_key: Optional[str] = None
    seller_id: int = 6808
    brand_id: int = 1518
    
    def __post_init__(self):
        if not self.api_key:
            # Load from secure file
            key_file = Path.home() / ".config" / "leaflink" / "api_key"
            self.api_key = key_file.read_text().strip()
    
    @property
    def headers(self) -> dict:
        return {
            "Authorization": f"App {self.api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

Fetching Orders with Full Context

The real challenge with LeafLink isn't basic API calls—it's efficiently fetching orders with all related data (customer details, product info, line items) without hammering the API.

Complete LeafLink Client Python
import asyncio
import aiohttp
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import AsyncIterator, Optional
import json

@dataclass
class LeafLinkOrder:
    """Structured order data."""
    id: int
    number: str
    status: str
    customer_id: int
    customer_name: Optional[str] = None
    total: float = 0.0
    created_on: Optional[datetime] = None
    modified_on: Optional[datetime] = None
    line_items: list = field(default_factory=list)
    raw_data: dict = field(default_factory=dict)

class LeafLinkClient:
    """Production LeafLink API client."""
    
    def __init__(self, config: LeafLinkConfig):
        self.config = config
        self.session: Optional[aiohttp.ClientSession] = None
        self._customer_cache: dict[int, dict] = {}
        self._product_cache: dict[int, dict] = {}
        
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            headers=self.config.headers,
            timeout=aiohttp.ClientTimeout(total=30)
        )
        return self
    
    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()
    
    async def get_orders(
        self,
        since: Optional[datetime] = None,
        status: Optional[str] = None,
        limit: int = 100
    ) -> AsyncIterator[LeafLinkOrder]:
        """Fetch orders with pagination."""
        params = {
            "limit": min(limit, 100),
            "ordering": "-modified",
            "seller": self.config.seller_id,
        }
        
        if since:
            params["modified__gte"] = since.isoformat()
        if status:
            params["status"] = status
        
        url = f"{self.config.base_url}/orders/"
        
        while url:
            async with self.session.get(url, params=params if "?" not in url else None) as resp:
                if resp.status == 429:
                    retry_after = int(resp.headers.get("Retry-After", 60))
                    print(f"Rate limited, waiting {retry_after}s...")
                    await asyncio.sleep(retry_after)
                    continue
                
                resp.raise_for_status()
                data = await resp.json()
            
            for order_data in data.get("results", []):
                yield await self._parse_order(order_data)
            
            url = data.get("next")
            params = {}  # Next URL includes params
            
            if url:
                await asyncio.sleep(0.1)  # Rate limit courtesy
    
    async def _parse_order(self, data: dict) -> LeafLinkOrder:
        """Parse order with customer lookup."""
        customer_id = data.get("customer")
        customer_name = None
        
        if customer_id:
            customer = await self._get_customer(customer_id)
            customer_name = customer.get("name") if customer else None
        
        return LeafLinkOrder(
            id=data["id"],
            number=data.get("number", ""),
            status=data.get("status", "Unknown"),
            customer_id=customer_id,
            customer_name=customer_name,
            total=float(data.get("total", 0)),
            created_on=datetime.fromisoformat(data["created_on"].replace("Z", "+00:00")) if data.get("created_on") else None,
            modified_on=datetime.fromisoformat(data["modified_on"].replace("Z", "+00:00")) if data.get("modified_on") else None,
            line_items=data.get("line_items", []),
            raw_data=data,
        )
    
    async def _get_customer(self, customer_id: int) -> dict:
        """Fetch customer with caching."""
        if customer_id in self._customer_cache:
            return self._customer_cache[customer_id]
        
        url = f"{self.config.base_url}/customers/{customer_id}/"
        
        try:
            async with self.session.get(url) as resp:
                if resp.status == 200:
                    customer = await resp.json()
                    self._customer_cache[customer_id] = customer
                    return customer
                elif resp.status == 404:
                    self._customer_cache[customer_id] = {}
                    return {}
        except Exception as e:
            print(f"Error fetching customer {customer_id}: {e}")
        
        return {}
    
    async def get_products(self, active_only: bool = True) -> AsyncIterator[dict]:
        """Fetch all products."""
        params = {
            "limit": 100,
            "seller": self.config.seller_id,
        }
        if active_only:
            params["archived"] = False
        
        url = f"{self.config.base_url}/products/"
        
        while url:
            async with self.session.get(url, params=params if "?" not in url else None) as resp:
                resp.raise_for_status()
                data = await resp.json()
            
            for product in data.get("results", []):
                yield product
            
            url = data.get("next")
            params = {}
            
            if url:
                await asyncio.sleep(0.1)
    
    async def get_order_transitions(self, order_id: int) -> list[dict]:
        """Get status change history for an order."""
        url = f"{self.config.base_url}/orders-status-transitions/"
        params = {"order": order_id}
        
        async with self.session.get(url, params=params) as resp:
            resp.raise_for_status()
            data = await resp.json()
        
        return data.get("results", [])

# Usage example
async def sync_recent_orders():
    """Sync orders from the last 24 hours."""
    config = LeafLinkConfig()
    since = datetime.now() - timedelta(hours=24)
    
    async with LeafLinkClient(config) as client:
        orders = []
        async for order in client.get_orders(since=since):
            orders.append(order)
            print(f"Order {order.number}: {order.status} - ${order.total:.2f} ({order.customer_name})")
        
        print(f"\nSynced {len(orders)} orders")
        return orders

# Run it
# asyncio.run(sync_recent_orders())

Lessons Learned from LeafLink

💡 LeafLink Integration Insights
  • Customer IDs are separate: Orders reference customer IDs; you need a separate call to get names
  • Line items are nested: Full product details require following the product ID
  • Status transitions are gold: The transitions endpoint shows order history
  • Modified ordering catches updates: Sort by -modified to see recently changed orders
  • Archived products still exist: Filter by archived=false for active inventory

8. Case Study: Google OAuth Flow

Google's APIs provide access to Calendar, Gmail, Drive, Docs, Sheets, and more. The authentication is OAuth 2.0, which is powerful but requires careful implementation.

📅 Google Workspace Integration
OAuth 2.0 + REST
5
APIs Integrated
OAuth
Auth Method
Full
Access Level
1hr
Token Lifetime

Setting Up Google OAuth

Before any code, you need to configure a Google Cloud project:

  1. Create a project in Google Cloud Console
  2. Enable the APIs you need (Calendar, Gmail, Drive, etc.)
  3. Configure the OAuth consent screen (app name, scopes, test users)
  4. Create OAuth 2.0 credentials (client ID and secret)
  5. Set authorized redirect URIs (localhost for development)
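The client below reads its credentials from a small env-style file at `~/.config/google-calendar/secrets.env` (one KEY=VALUE per line, `#` for comments). The values here are placeholders — use the client ID and secret from step 4:

```shell
# ~/.config/google-calendar/secrets.env
GOOGLE_CLIENT_ID=YOUR_CLIENT_ID.apps.googleusercontent.com
GOOGLE_CLIENT_SECRET=YOUR_CLIENT_SECRET
```

Keep this file out of version control and readable only by your user (`chmod 600`).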

Complete OAuth + Calendar Implementation

Google Calendar Integration Python
import os
import json
import webbrowser
from datetime import datetime, timedelta
from pathlib import Path
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlencode, parse_qs, urlparse
import threading
import requests

class GoogleCalendarClient:
    """Complete Google Calendar integration with OAuth."""
    
    AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
    TOKEN_URL = "https://oauth2.googleapis.com/token"
    CALENDAR_API = "https://www.googleapis.com/calendar/v3"
    
    SCOPES = [
        "https://www.googleapis.com/auth/calendar",  # Full calendar access
        "https://www.googleapis.com/auth/calendar.events",  # Events only
    ]
    
    def __init__(self):
        self.config_dir = Path.home() / ".config" / "google-calendar"
        self.config_dir.mkdir(parents=True, exist_ok=True)
        
        # Load credentials from env file
        self._load_credentials()
        
        self.tokens_file = self.config_dir / "tokens.json"
        self.tokens = self._load_tokens()
    
    def _load_credentials(self):
        """Load OAuth credentials from secrets file."""
        secrets_file = self.config_dir / "secrets.env"
        if secrets_file.exists():
            for line in secrets_file.read_text().splitlines():
                if "=" in line and not line.startswith("#"):
                    key, value = line.split("=", 1)
                    os.environ[key.strip()] = value.strip()
        
        self.client_id = os.environ.get("GOOGLE_CLIENT_ID")
        self.client_secret = os.environ.get("GOOGLE_CLIENT_SECRET")
        
        if not self.client_id or not self.client_secret:
            raise ValueError("Missing GOOGLE_CLIENT_ID or GOOGLE_CLIENT_SECRET")
    
    def _load_tokens(self) -> dict:
        if self.tokens_file.exists():
            return json.loads(self.tokens_file.read_text())
        return {}
    
    def _save_tokens(self, tokens: dict):
        self.tokens_file.write_text(json.dumps(tokens, indent=2))
        self.tokens_file.chmod(0o600)
        self.tokens = tokens
    
    def authorize(self, port: int = 8080):
        """Run OAuth authorization flow."""
        redirect_uri = f"http://localhost:{port}/callback"
        
        # Build authorization URL
        params = {
            "client_id": self.client_id,
            "redirect_uri": redirect_uri,
            "response_type": "code",
            "scope": " ".join(self.SCOPES),
            "access_type": "offline",
            "prompt": "consent",
        }
        auth_url = f"{self.AUTH_URL}?{urlencode(params)}"
        
        # Local server to receive callback
        authorization_code = None
        
        class CallbackHandler(BaseHTTPRequestHandler):
            def do_GET(self):
                nonlocal authorization_code
                query = parse_qs(urlparse(self.path).query)
                authorization_code = query.get("code", [None])[0]
                
                self.send_response(200)
                self.send_header("Content-Type", "text/html")
                self.end_headers()
                
                if authorization_code:
                    self.wfile.write(
                        b"<html><body><h1>Authorization successful!</h1>"
                        b"<p>You can close this window.</p></body></html>"
                    )
                else:
                    self.wfile.write(b"<html><body><h1>Authorization failed</h1></body></html>")
            
            def log_message(self, *args):
                pass  # Suppress request logging
        
        # Start server
        server = HTTPServer(("localhost", port), CallbackHandler)
        server_thread = threading.Thread(target=server.handle_request)
        server_thread.start()
        
        # Open browser
        print(f"Opening browser for authorization...")
        webbrowser.open(auth_url)
        
        # Wait for callback
        server_thread.join(timeout=120)
        server.server_close()
        
        if not authorization_code:
            raise ValueError("Authorization failed - no code received")
        
        # Exchange code for tokens
        response = requests.post(self.TOKEN_URL, data={
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "code": authorization_code,
            "grant_type": "authorization_code",
            "redirect_uri": redirect_uri,
        })
        response.raise_for_status()
        
        tokens = response.json()
        tokens["expires_at"] = (
            datetime.now() + timedelta(seconds=tokens["expires_in"])
        ).isoformat()
        self._save_tokens(tokens)
        print("Authorization complete! Tokens saved.")
    
    def get_access_token(self) -> str:
        """Get valid access token, refreshing if needed."""
        if not self.tokens:
            raise ValueError("Not authorized. Run authorize() first.")
        
        expires_at = datetime.fromisoformat(self.tokens["expires_at"])
        if datetime.now() >= expires_at - timedelta(minutes=5):
            # Refresh the token
            response = requests.post(self.TOKEN_URL, data={
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "refresh_token": self.tokens["refresh_token"],
                "grant_type": "refresh_token",
            })
            response.raise_for_status()
            
            new_tokens = response.json()
            new_tokens["refresh_token"] = self.tokens.get("refresh_token")
            new_tokens["expires_at"] = (
                datetime.now() + timedelta(seconds=new_tokens["expires_in"])
            ).isoformat()
            self._save_tokens(new_tokens)
        
        return self.tokens["access_token"]
    
    @property
    def headers(self) -> dict:
        return {"Authorization": f"Bearer {self.get_access_token()}"}
    
    def list_calendars(self) -> list[dict]:
        """List all calendars for the user."""
        response = requests.get(
            f"{self.CALENDAR_API}/users/me/calendarList",
            headers=self.headers
        )
        response.raise_for_status()
        return response.json().get("items", [])
    
    def get_upcoming_events(
        self,
        calendar_id: str = "primary",
        days: int = 7,
        max_results: int = 50
    ) -> list[dict]:
        """Get upcoming events from a calendar."""
        time_min = datetime.utcnow().isoformat() + "Z"
        time_max = (datetime.utcnow() + timedelta(days=days)).isoformat() + "Z"
        
        response = requests.get(
            f"{self.CALENDAR_API}/calendars/{calendar_id}/events",
            headers=self.headers,
            params={
                "timeMin": time_min,
                "timeMax": time_max,
                "maxResults": max_results,
                "singleEvents": True,
                "orderBy": "startTime",
            }
        )
        response.raise_for_status()
        return response.json().get("items", [])
    
    def create_event(
        self,
        summary: str,
        start: datetime,
        end: datetime,
        description: str = "",
        calendar_id: str = "primary",
        attendees: list[str] = None,
    ) -> dict:
        """Create a calendar event."""
        event = {
            "summary": summary,
            "description": description,
            "start": {
                "dateTime": start.isoformat(),
                "timeZone": "America/Anchorage",
            },
            "end": {
                "dateTime": end.isoformat(),
                "timeZone": "America/Anchorage",
            },
        }
        if attendees:
            event["attendees"] = [{"email": email} for email in attendees]
        
        response = requests.post(
            f"{self.CALENDAR_API}/calendars/{calendar_id}/events",
            headers=self.headers,
            json=event
        )
        response.raise_for_status()
        return response.json()

# Usage
def demo_calendar():
    client = GoogleCalendarClient()
    
    # First time: authorize
    # client.authorize()
    
    # List calendars
    calendars = client.list_calendars()
    for cal in calendars:
        print(f"📅 {cal['summary']} ({cal['id']})")
    
    # Get upcoming events
    events = client.get_upcoming_events(days=7)
    for event in events:
        start = event["start"].get("dateTime", event["start"].get("date"))
        print(f"  • {start}: {event['summary']}")
    
    # Create an event
    # new_event = client.create_event(
    #     summary="Team Standup",
    #     start=datetime.now() + timedelta(days=1, hours=9),
    #     end=datetime.now() + timedelta(days=1, hours=9, minutes=30),
    #     description="Daily sync meeting",
    # )

Gmail Integration

With the same OAuth flow (different scopes), you can access Gmail:

Gmail API Example Python
import requests

GMAIL_API = "https://www.googleapis.com/gmail/v1"

def get_unread_emails(headers: dict, max_results: int = 10) -> list[dict]:
    """Fetch unread emails from inbox."""
    response = requests.get(
        f"{GMAIL_API}/users/me/messages",
        headers=headers,
        params={
            "labelIds": ["INBOX", "UNREAD"],
            "maxResults": max_results,
        }
    )
    response.raise_for_status()
    
    messages = []
    for msg in response.json().get("messages", []):
        # Fetch full message
        detail = requests.get(
            f"{GMAIL_API}/users/me/messages/{msg['id']}",
            headers=headers,
            params={"format": "metadata", "metadataHeaders": ["Subject", "From", "Date"]}
        )
        detail.raise_for_status()
        
        msg_data = detail.json()
        headers_list = msg_data.get("payload", {}).get("headers", [])
        headers_dict = {h["name"]: h["value"] for h in headers_list}
        
        messages.append({
            "id": msg["id"],
            "subject": headers_dict.get("Subject", "(no subject)"),
            "from": headers_dict.get("From", ""),
            "date": headers_dict.get("Date", ""),
            "snippet": msg_data.get("snippet", ""),
        })
    
    return messages

⚠️ Google OAuth Gotchas
  • Refresh tokens disappear: If you don't request access_type=offline and prompt=consent, you won't get a refresh token
  • Scopes matter: Users see exactly what permissions you request. Request only what you need.
  • Verification required: For sensitive scopes (Gmail modify, Drive), Google requires app verification
  • Test users: Before verification, only test users in your consent screen can authorize
  • Quota limits: Free tier has per-day limits. Check the APIs & Services dashboard.

9. Case Study: Real-Time Blockchain Data

Etherscan provides blockchain data via a REST API. Unlike typical web APIs, blockchain data has unique characteristics: immutable history, real-time mempool, and cryptographic verification.

⛓️ Etherscan Blockchain Integration
REST API v2
5/sec
Rate Limit
Ethereum
Primary Chain
Free
Tier
Chainid
Multi-chain

Etherscan API Patterns

Etherscan's API is unusual: it returns HTTP 200 for errors, with error details in the response body. This requires careful error handling.

Etherscan Client Python
import asyncio
import aiohttp
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional
from pathlib import Path

@dataclass
class EtherscanConfig:
    """Etherscan API configuration."""
    api_key: Optional[str] = None
    base_url: str = "https://api.etherscan.io/v2/api"
    chain_id: int = 1  # Ethereum mainnet
    rate_limit: float = 0.2  # 5 requests per second
    
    def __post_init__(self):
        if not self.api_key:
            key_file = Path.home() / ".config" / "etherscan" / "api_key"
            if key_file.exists():
                self.api_key = key_file.read_text().strip()
            else:
                raise ValueError(f"No API key found. Create {key_file}")

class EtherscanError(Exception):
    """Etherscan API error."""
    pass

class EtherscanClient:
    """Production Etherscan API client."""
    
    def __init__(self, config: Optional[EtherscanConfig] = None):
        self.config = config or EtherscanConfig()
        self.session: Optional[aiohttp.ClientSession] = None
        self._last_request = 0
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()
    
    async def _request(self, module: str, action: str, **params) -> dict:
        """Make rate-limited request to Etherscan."""
        # Enforce rate limit
        now = asyncio.get_event_loop().time()
        wait_time = self.config.rate_limit - (now - self._last_request)
        if wait_time > 0:
            await asyncio.sleep(wait_time)
        
        all_params = {
            "chainid": self.config.chain_id,
            "module": module,
            "action": action,
            "apikey": self.config.api_key,
            **params
        }
        
        async with self.session.get(self.config.base_url, params=all_params) as resp:
            self._last_request = asyncio.get_event_loop().time()
            data = await resp.json()
        
        # Etherscan returns 200 with errors in body
        if data.get("status") == "0":
            message = data.get("message", "Unknown error")
            result = data.get("result", "")
            
            # Rate limit error
            if "rate limit" in result.lower():
                await asyncio.sleep(1)
                return await self._request(module, action, **params)  # Retry
            
            # No transactions is not an error
            if "No transactions found" in str(result):
                return {"result": []}
            
            raise EtherscanError(f"{message}: {result}")
        
        return data
    
    async def get_balance(self, address: str) -> Decimal:
        """Get ETH balance for address."""
        data = await self._request("account", "balance", address=address, tag="latest")
        # Balance is in wei, convert to ETH
        wei = int(data["result"])
        return Decimal(wei) / Decimal(10**18)
    
    async def get_token_balance(self, address: str, contract: str) -> Decimal:
        """Get ERC-20 token balance."""
        data = await self._request(
            "account", "tokenbalance",
            address=address,
            contractaddress=contract,
            tag="latest"
        )
        # Need to know token decimals for proper conversion
        return Decimal(data["result"])
    
    async def get_transactions(
        self,
        address: str,
        start_block: int = 0,
        end_block: int = 99999999,
        page: int = 1,
        offset: int = 100,
        sort: str = "desc"
    ) -> list[dict]:
        """Get normal transactions for address."""
        data = await self._request(
            "account", "txlist",
            address=address,
            startblock=start_block,
            endblock=end_block,
            page=page,
            offset=offset,
            sort=sort
        )
        return data.get("result", [])
    
    async def get_token_transfers(
        self,
        address: str,
        contract: str = None,
        start_block: int = 0,
        end_block: int = 99999999,
    ) -> list[dict]:
        """Get ERC-20 token transfers."""
        params = {
            "address": address,
            "startblock": start_block,
            "endblock": end_block,
            "sort": "desc"
        }
        if contract:
            params["contractaddress"] = contract
        
        data = await self._request("account", "tokentx", **params)
        return data.get("result", [])
    
    async def get_gas_price(self) -> dict:
        """Get current gas prices."""
        data = await self._request("gastracker", "gasoracle")
        result = data.get("result", {})
        return {
            "safe": int(result.get("SafeGasPrice", 0)),
            "standard": int(result.get("ProposeGasPrice", 0)),
            "fast": int(result.get("FastGasPrice", 0)),
            "base_fee": float(result.get("suggestBaseFee", 0)),
        }
    
    async def get_contract_abi(self, contract: str) -> list:
        """Get verified contract ABI."""
        data = await self._request("contract", "getabi", address=contract)
        import json
        return json.loads(data["result"])

# Usage examples
async def demo_etherscan():
    async with EtherscanClient() as client:
        # Check an address balance
        address = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"  # vitalik.eth
        balance = await client.get_balance(address)
        print(f"Balance: {balance:.4f} ETH")
        
        # Get recent transactions
        txs = await client.get_transactions(address, offset=5)
        for tx in txs:
            value_eth = Decimal(tx["value"]) / Decimal(10**18)
            direction = "←" if tx["to"].lower() == address.lower() else "→"
            print(f"  {direction} {value_eth:.4f} ETH ({tx['hash'][:16]}...)")
        
        # Current gas prices
        gas = await client.get_gas_price()
        print(f"\nGas: {gas['safe']}/{gas['standard']}/{gas['fast']} gwei (safe/standard/fast)")

# asyncio.run(demo_etherscan())

Monitoring Wallets for Activity

A common use case is monitoring wallets for incoming transactions or token transfers:

Wallet Monitor Python
from decimal import Decimal

class WalletMonitor:
    """Monitor wallets for new transactions."""
    
    def __init__(self, client: EtherscanClient, addresses: list[str]):
        self.client = client
        self.addresses = [a.lower() for a in addresses]
        self.last_seen: dict[str, str] = {}  # address -> last tx hash
    
    async def check_for_new_transactions(self) -> list[dict]:
        """Check all addresses for new transactions."""
        new_txs = []
        
        for address in self.addresses:
            txs = await self.client.get_transactions(address, offset=10)
            
            if not txs:
                continue
            
            last_hash = self.last_seen.get(address)
            
            for tx in txs:
                if tx["hash"] == last_hash:
                    break  # Reached already-seen transactions
                
                new_txs.append({
                    "type": "transaction",
                    "address": address,
                    "tx": tx,
                    "direction": "in" if tx["to"].lower() == address else "out",
                    "value_eth": Decimal(tx["value"]) / Decimal(10**18),
                })
            
            if txs:
                self.last_seen[address] = txs[0]["hash"]
        
        return new_txs
    
    async def run(self, interval_seconds: int = 30):
        """Continuously monitor for new transactions."""
        print(f"Monitoring {len(self.addresses)} addresses...")
        
        while True:
            try:
                new_txs = await self.check_for_new_transactions()
                
                for tx in new_txs:
                    direction = "Received" if tx["direction"] == "in" else "Sent"
                    print(f"🔔 {direction} {tx['value_eth']:.4f} ETH to {tx['address'][:10]}...")
                
                await asyncio.sleep(interval_seconds)
                
            except EtherscanError as e:
                print(f"API error: {e}")
                await asyncio.sleep(60)  # Back off on errors

💡 Blockchain API Insights
  • Data is immutable: Once confirmed, blockchain data never changes. Cache aggressively.
  • Block confirmations: Wait for 12+ confirmations before considering transactions final
  • Wei vs ETH: All values are in wei (10^18 wei = 1 ETH). Don't lose precision.
  • Checksummed addresses: Ethereum addresses should be checksummed (mixed case)
  • Multiple chains: Etherscan v2 API supports multiple chains via chainid parameter
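To make the precision point concrete, a couple of `Decimal` helpers. The USDC example relies on its well-known 6 decimals; for transfers fetched via `tokentx`, read the decimals from the response rather than assuming 18:

```python
from decimal import Decimal

WEI_PER_ETH = Decimal(10) ** 18

def wei_to_eth(wei) -> Decimal:
    """Convert a wei amount (int or numeric string) to ETH exactly."""
    return Decimal(int(wei)) / WEI_PER_ETH

def token_amount(raw, decimals: int) -> Decimal:
    """Convert a raw ERC-20 amount using the token's decimals
    (18 for most tokens; USDC uses 6)."""
    return Decimal(int(raw)) / (Decimal(10) ** int(decimals))

print(wei_to_eth("1500000000000000000"))  # → 1.5
print(token_amount(2500000, 6))           # → 2.5
```

Running raw values through `float` instead would quietly round amounts above ~2^53 wei — exactly the kind of error that only shows up on large transfers.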

10. Building Robust Integrations That Don't Break

Building an integration is easy. Building one that survives months of production without constant firefighting is hard. Here are the patterns that make integrations resilient.

The Circuit Breaker Pattern

When an API is down, hammering it with retries makes things worse. A circuit breaker "trips" after repeated failures, fast-failing subsequent requests until the service recovers.

Circuit Breaker Implementation Python
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable
import asyncio

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitOpenError(Exception):
    """Raised when the breaker is open and calls are rejected immediately."""
    pass

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5       # Failures before opening
    reset_timeout: float = 60.0      # Seconds before trying again
    half_open_requests: int = 3      # Test requests in half-open

class CircuitBreaker:
    """Circuit breaker for API calls."""
    
    def __init__(self, name: str, config: CircuitBreakerConfig = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: datetime = None
    
    async def call(self, func: Callable, *args, **kwargs):
        """Execute function with circuit breaker protection."""
        if self.state == CircuitState.OPEN:
            if self._should_try_reset():
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise CircuitOpenError(f"Circuit {self.name} is open")
        
        try:
            result = await func(*args, **kwargs)
            self._record_success()
            return result
        except Exception:
            self._record_failure()
            raise
    
    def _should_try_reset(self) -> bool:
        """Check if enough time passed to try recovery."""
        if not self.last_failure_time:
            return True
        return datetime.now() > self.last_failure_time + timedelta(seconds=self.config.reset_timeout)
    
    def _record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.config.half_open_requests:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                print(f"Circuit {self.name} closed (recovered)")
        self.failure_count = 0
    
    def _record_failure(self):
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            print(f"Circuit {self.name} reopened (still failing)")
        elif self.failure_count >= self.config.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit {self.name} opened (threshold reached)")

class CircuitOpenError(Exception):
    pass

# Usage
leaflink_circuit = CircuitBreaker("leaflink")

async def fetch_orders_safe():
    return await leaflink_circuit.call(fetch_orders_from_api)
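
The circuit breaker pairs with exponential backoff for the retries that happen while the circuit is still closed. A standalone sketch of backoff with full jitter (the base and cap values are illustrative defaults, not recommendations for any particular API):

```python
import random

def backoff_delays(max_retries: int = 5, base: float = 1.0, cap: float = 60.0) -> list[float]:
    """Exponential backoff with full jitter: attempt n sleeps uniform(0, min(cap, base * 2**n))."""
    return [random.uniform(0, min(cap, base * 2 ** attempt)) for attempt in range(max_retries)]

for attempt, delay in enumerate(backoff_delays()):
    print(f"retry {attempt}: sleeping {delay:.2f}s (ceiling {min(60.0, 2.0 ** attempt):.0f}s)")
```

Full jitter (randomizing over the whole window rather than adding a small random offset) spreads retries from many clients apart, which avoids the thundering-herd effect when a service comes back up.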

Graceful Degradation

When an integration fails, your application should continue functioning with reduced capability rather than crashing entirely.

Graceful Degradation Pattern Python
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
import json

@dataclass
class CachedData:
    """Wrapper for data with freshness tracking."""
    data: Any
    fetched_at: datetime
    source: str  # "api", "cache", or "stale_cache"
    
    @property
    def age_seconds(self) -> float:
        return (datetime.now() - self.fetched_at).total_seconds()
    
    @property
    def is_stale(self) -> bool:
        return self.age_seconds > 300  # 5 minutes

class ResilientDataFetcher:
    """Fetch data with fallback to cache."""
    
    def __init__(self, cache_dir: Path):
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
    
    async def fetch_with_fallback(
        self,
        key: str,
        fetch_func,
        max_cache_age: int = 3600
    ) -> CachedData:
        """Fetch fresh data, fall back to cache on failure."""
        cache_file = self.cache_dir / f"{key}.json"
        
        # Try fresh fetch
        try:
            data = await fetch_func()
            
            # Cache the successful result
            self._save_cache(cache_file, data)
            
            return CachedData(
                data=data,
                fetched_at=datetime.now(),
                source="api"
            )
            
        except Exception as e:
            print(f"API fetch failed: {e}")
            
            # Fall back to cache
            cached = self._load_cache(cache_file)
            if cached:
                age = (datetime.now() - cached["fetched_at"]).total_seconds()
                
                if age < max_cache_age:
                    print(f"Using cached data ({age:.0f}s old)")
                    return CachedData(
                        data=cached["data"],
                        fetched_at=cached["fetched_at"],
                        source="cache"
                    )
                else:
                    print(f"Cache too old ({age:.0f}s), but using anyway")
                    return CachedData(
                        data=cached["data"],
                        fetched_at=cached["fetched_at"],
                        source="stale_cache"
                    )
            
            # No cache available
            raise
    
    def _save_cache(self, path: Path, data):
        cache_data = {
            "data": data,
            "fetched_at": datetime.now().isoformat(),
        }
        path.write_text(json.dumps(cache_data, default=str))
    
    def _load_cache(self, path: Path) -> Optional[dict]:
        if not path.exists():
            return None
        try:
            cached = json.loads(path.read_text())
            cached["fetched_at"] = datetime.fromisoformat(cached["fetched_at"])
            return cached
        except (OSError, KeyError, ValueError):
            return None  # Corrupt or unreadable cache entry: treat as a miss

Health Checks and Dependency Monitoring

API Health Monitor Python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import asyncio

import aiohttp

@dataclass
class HealthStatus:
    service: str
    healthy: bool
    latency_ms: float
    last_check: datetime
    error: Optional[str] = None

class APIHealthMonitor:
    """Monitor health of API dependencies."""
    
    def __init__(self):
        self.status: dict[str, HealthStatus] = {}
    
    async def check_leaflink(self) -> HealthStatus:
        """Check LeafLink API health."""
        start = datetime.now()
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "https://www.leaflink.com/api/v2/orders/?limit=1",
                    headers={"Authorization": f"App {LEAFLINK_KEY}"},
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    latency = (datetime.now() - start).total_seconds() * 1000
                    return HealthStatus(
                        service="leaflink",
                        healthy=resp.status == 200,
                        latency_ms=latency,
                        last_check=datetime.now(),
                        error=None if resp.status == 200 else f"HTTP {resp.status}"
                    )
        except Exception as e:
            return HealthStatus(
                service="leaflink",
                healthy=False,
                latency_ms=-1,
                last_check=datetime.now(),
                error=str(e)
            )
    
    async def check_all(self) -> dict[str, HealthStatus]:
        """Check all configured APIs."""
        checks = [
            self.check_leaflink(),
            # self.check_distru(),
            # self.check_quickbooks(),
            # ...
        ]
        
        results = await asyncio.gather(*checks, return_exceptions=True)
        
        for result in results:
            if isinstance(result, HealthStatus):
                self.status[result.service] = result
        
        return self.status
    
    def is_healthy(self, service: str) -> bool:
        """Check if a specific service is healthy."""
        status = self.status.get(service)
        if not status:
            return True  # Unknown = assume healthy
        
        # Consider unhealthy if last check failed or is too old
        if not status.healthy:
            return False
        if (datetime.now() - status.last_check).total_seconds() > 300:
            return False  # Stale health check
        
        return True

11. Testing and Monitoring

Integration code is notoriously hard to test. External APIs aren't available in CI, responses change, and rate limits make exhaustive testing impractical. Here's how to approach it pragmatically.

Testing Strategy

  • Unit tests: verify your parsing and transformation logic. Mock API responses with saved fixtures.
  • Contract tests: verify the response schema matches expectations. Validate against JSON Schema or Pydantic models.
  • Integration tests: verify end-to-end flows work. Use sandbox/test accounts and run sparingly.
  • Smoke tests: verify the production API is reachable. Run lightweight health checks in prod.

Mocking API Responses

Test Fixtures and Mocking Python
import pytest
from unittest.mock import AsyncMock, patch
from pathlib import Path
import json

# Save real API responses as fixtures
FIXTURES_DIR = Path(__file__).parent / "fixtures"

def load_fixture(name: str) -> dict:
    """Load saved API response."""
    return json.loads((FIXTURES_DIR / f"{name}.json").read_text())

class TestLeafLinkClient:
    """Tests for LeafLink integration."""
    
    @pytest.fixture
    def mock_session(self):
        """Create mock aiohttp session."""
        session = AsyncMock()
        return session
    
    @pytest.mark.asyncio
    async def test_parse_order(self, mock_session):
        """Test order parsing logic."""
        # Load fixture
        order_data = load_fixture("leaflink_order")
        
        # Test parsing
        client = LeafLinkClient(LeafLinkConfig(api_key="test"))
        client.session = mock_session
        client._customer_cache = {28451: {"name": "Test Dispensary"}}
        
        order = await client._parse_order(order_data)
        
        assert order.number == "SO-2026-0892"
        assert order.status == "Submitted"
        assert order.customer_name == "Test Dispensary"
        assert order.total == 4250.00
    
    @pytest.mark.asyncio
    async def test_handles_rate_limit(self, mock_session):
        """Test rate limit handling."""
        # First call returns 429, second succeeds
        mock_response_429 = AsyncMock()
        mock_response_429.status = 429
        mock_response_429.headers = {"Retry-After": "1"}
        
        mock_response_200 = AsyncMock()
        mock_response_200.status = 200
        mock_response_200.json = AsyncMock(return_value={"results": []})
        
        mock_session.get.return_value.__aenter__.side_effect = [
            mock_response_429,
            mock_response_200
        ]
        
        # Should retry and succeed
        # ... test implementation
    
    @pytest.mark.asyncio  
    async def test_handles_missing_customer(self, mock_session):
        """Test graceful handling when customer lookup fails."""
        order_data = load_fixture("leaflink_order")
        
        # Mock 404 on customer lookup
        mock_response = AsyncMock()
        mock_response.status = 404
        mock_session.get.return_value.__aenter__.return_value = mock_response
        
        client = LeafLinkClient(LeafLinkConfig(api_key="test"))
        client.session = mock_session
        
        order = await client._parse_order(order_data)
        
        # Should still parse, just without customer name
        assert order.customer_name is None
        assert order.customer_id == 28451

# Recording fixtures from real API (run manually)
async def record_fixtures():
    """Save real API responses for testing."""
    config = LeafLinkConfig()
    async with LeafLinkClient(config) as client:
        async for order in client.get_orders(limit=1):
            FIXTURES_DIR.mkdir(exist_ok=True)
            (FIXTURES_DIR / "leaflink_order.json").write_text(
                json.dumps(order.raw_data, indent=2)
            )
            break

Schema Validation

APIs change without warning. Schema validation catches breaking changes before they cause silent failures:

Pydantic Schema Validation Python
from pydantic import BaseModel, validator
from typing import Optional
from datetime import datetime
import json
import logging

logger = logging.getLogger(__name__)

class LeafLinkOrderSchema(BaseModel):
    """Expected schema for LeafLink order response."""
    id: int
    number: str
    status: str
    customer: Optional[int]
    total: str  # LeafLink returns as string
    created_on: datetime
    modified_on: Optional[datetime]
    line_items: list
    
    @validator("total", pre=True)
    def parse_total(cls, v):
        return str(v)  # Ensure string
    
    class Config:
        extra = "allow"  # Allow extra fields (API may add new ones)

def validate_order_response(data: dict) -> LeafLinkOrderSchema:
    """Validate API response matches expected schema."""
    try:
        return LeafLinkOrderSchema(**data)
    except Exception as e:
        # Log schema violation for investigation
        logger.error(f"Schema violation in order {data.get('id')}: {e}")
        logger.error(f"Data: {json.dumps(data)[:500]}")
        raise

# In your API client:
async def get_order(self, order_id: int) -> LeafLinkOrderSchema:
    data = await self._request(f"orders/{order_id}/")
    return validate_order_response(data)  # Fails fast on schema change
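
If adding a Pydantic dependency isn't an option, even a manual check of required fields and types catches most breaking changes. A minimal sketch (the field list mirrors the schema above; the helper name is illustrative):

```python
REQUIRED_FIELDS = {"id": int, "number": str, "status": str, "total": str}

def check_order_schema(data: dict) -> list[str]:
    """Return human-readable schema violations; an empty list means the response looks sane."""
    problems = []
    for field, expected in REQUIRED_FIELDS.items():
        if field not in data:
            problems.append(f"missing field: {field}")
        elif not isinstance(data[field], expected):
            problems.append(
                f"{field}: expected {expected.__name__}, got {type(data[field]).__name__}"
            )
    return problems

print(check_order_schema({"id": 1, "number": "SO-1", "status": "Submitted", "total": "4250.00"}))  # []
print(check_order_schema({"id": "1", "number": "SO-1", "status": "Submitted"}))
# ['id: expected int, got str', 'missing field: total']
```

The same pattern extends to nested objects; the point is to fail loudly at the boundary rather than let a type change propagate into your database.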

Production Monitoring

Testing catches issues before deployment. Monitoring catches issues in production. Both are necessary.

📊 Key Metrics to Monitor
  • Availability: API uptime. Track the success rate of health checks; target 99.9%+ for critical integrations.
  • Latency: response times at P50, P95, and P99. Alert when P95 exceeds 2x normal.
  • Errors: rate of 4xx and 5xx responses. Alert on an error rate above 1%.
  • Rate limits: 429 responses. These should be near zero if limits are properly managed.
  • Data: sync freshness, the age of the most recent synced data. Alert if data is more than 1 hour old.
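
These thresholds translate directly into a small alert rule. A sketch (the baseline latency is something you'd measure over time, not hardcode; function and message formats are illustrative):

```python
def check_alerts(p95_ms: float, baseline_p95_ms: float, error_rate: float, data_age_s: float) -> list[str]:
    """Apply the alert thresholds above; return the alerts that should fire."""
    alerts = []
    if p95_ms > 2 * baseline_p95_ms:
        alerts.append(f"latency: p95 {p95_ms:.0f}ms exceeds 2x baseline ({baseline_p95_ms:.0f}ms)")
    if error_rate > 0.01:
        alerts.append(f"errors: rate {error_rate:.1%} exceeds 1%")
    if data_age_s > 3600:
        alerts.append(f"freshness: synced data is {data_age_s / 3600:.1f}h old")
    return alerts

print(check_alerts(p95_ms=900, baseline_p95_ms=300, error_rate=0.02, data_age_s=120))
```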
Simple Metrics Collection Python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import statistics

@dataclass
class APIMetrics:
    """Collect metrics for API calls."""
    service: str
    requests: int = 0
    errors: int = 0
    rate_limits: int = 0
    latencies: list[float] = field(default_factory=list)
    last_success: Optional[datetime] = None
    last_error: Optional[datetime] = None
    last_error_msg: Optional[str] = None
    
    def record_request(self, success: bool, latency_ms: float, error: Optional[str] = None):
        self.requests += 1
        self.latencies.append(latency_ms)
        
        # Keep only last 1000 latencies
        if len(self.latencies) > 1000:
            self.latencies = self.latencies[-1000:]
        
        if success:
            self.last_success = datetime.now()
        else:
            self.errors += 1
            self.last_error = datetime.now()
            self.last_error_msg = error
    
    def record_rate_limit(self):
        self.rate_limits += 1
    
    @property
    def error_rate(self) -> float:
        if self.requests == 0:
            return 0.0
        return self.errors / self.requests
    
    @property
    def p50_latency(self) -> float:
        if not self.latencies:
            return 0.0
        return statistics.median(self.latencies)
    
    @property
    def p95_latency(self) -> float:
        if not self.latencies:
            return 0.0
        return statistics.quantiles(self.latencies, n=20)[18]  # 95th percentile
    
    def to_dict(self) -> dict:
        return {
            "service": self.service,
            "requests": self.requests,
            "errors": self.errors,
            "error_rate": f"{self.error_rate:.2%}",
            "rate_limits": self.rate_limits,
            "p50_latency_ms": round(self.p50_latency, 2),
            "p95_latency_ms": round(self.p95_latency, 2),
            "last_success": self.last_success.isoformat() if self.last_success else None,
            "last_error": self.last_error.isoformat() if self.last_error else None,
            "last_error_msg": self.last_error_msg,
        }

# Global metrics registry
_metrics: dict[str, APIMetrics] = {}

def get_metrics(service: str) -> APIMetrics:
    if service not in _metrics:
        _metrics[service] = APIMetrics(service=service)
    return _metrics[service]

def get_all_metrics() -> dict:
    return {name: m.to_dict() for name, m in _metrics.items()}
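
Wiring the percentile math into a client is mostly a matter of timing each call. A standalone sketch where `fake_request` stands in for a real API call (the 1-3ms sleep is synthetic):

```python
import random
import statistics
import time

def fake_request() -> None:
    """Stand-in for a network call; sleeps 1-3ms."""
    time.sleep(random.uniform(0.001, 0.003))

def timed_ms(func) -> float:
    """Run func and return its wall-clock duration in milliseconds."""
    start = time.perf_counter()
    func()
    return (time.perf_counter() - start) * 1000

latencies = [timed_ms(fake_request) for _ in range(100)]
p50 = statistics.median(latencies)
p95 = statistics.quantiles(latencies, n=20)[18]  # 95th percentile, as in APIMetrics above
print(f"p50={p50:.2f}ms p95={p95:.2f}ms")
```

In a real client you would call `timed_ms` around each request and feed the result into `record_request`, so the percentiles reflect production traffic rather than a synthetic loop.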

12. Security Considerations

API integrations are attack surfaces. Every credential, every endpoint, every piece of data flowing through your integrations needs protection. Here's what matters.

🔐 Credential Management: Never hardcode secrets. Use environment variables, secrets managers, or encrypted config files.
🔄 Key Rotation: Rotate API keys quarterly. Automate where possible. Have a rotation playbook.
🛡️ Least Privilege: Request only necessary scopes. Use read-only keys when you don't need write access.
📝 Audit Logging: Log all API operations (without sensitive data). Know who did what, when.
🔍 Input Validation: Validate all data from APIs. Don't trust external data, even from "trusted" services.
🚫 Data Minimization: Don't fetch or store more data than needed. Less data = less risk.

Secure Credential Storage

Credential Management Patterns Python
import os
from pathlib import Path
from dataclasses import dataclass
from typing import Optional

@dataclass
class SecureCredentials:
    """Manage API credentials securely."""
    service: str
    
    def __post_init__(self):
        self.config_dir = Path.home() / ".config" / self.service
    
    def get_api_key(self) -> str:
        """Load API key with fallback chain."""
        # 1. Environment variable (highest priority, for CI/containers)
        env_key = f"{self.service.upper()}_API_KEY"
        if key := os.environ.get(env_key):
            return key
        
        # 2. File (for local development)
        key_file = self.config_dir / "api_key"
        if key_file.exists():
            key_file.chmod(0o600)  # Ensure secure permissions
            return key_file.read_text().strip()
        
        # 3. Secrets manager (for production)
        if key := self._get_from_secrets_manager():
            return key
        
        raise ValueError(f"No API key found for {self.service}")
    
    def _get_from_secrets_manager(self) -> Optional[str]:
        """Load from AWS Secrets Manager, HashiCorp Vault, etc."""
        # Example with AWS Secrets Manager
        try:
            import boto3
            client = boto3.client("secretsmanager")
            response = client.get_secret_value(SecretId=f"api/{self.service}")
            return response["SecretString"]
        except Exception:
            return None  # boto3 missing, no AWS credentials, or secret not found
    
    def save_api_key(self, key: str):
        """Securely save API key to file."""
        self.config_dir.mkdir(parents=True, exist_ok=True)
        key_file = self.config_dir / "api_key"
        key_file.write_text(key)
        key_file.chmod(0o600)  # Owner read/write only

# Best practices for each service
CREDENTIAL_PATTERNS = {
    "leaflink": "App {api_key}",           # Custom header format
    "etherscan": "apikey={api_key}",       # Query parameter
    "distru": "Bearer {api_key}",          # Bearer token
    "google": "Bearer {access_token}",     # OAuth access token
    "quickbooks": "Bearer {access_token}", # OAuth access token
    "twilio": "{account_sid}:{auth_token}", # Basic auth
}
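
The pattern table can drive a tiny helper that formats the right `Authorization` value per service. A sketch (the patterns shown repeat three entries from the table above; the helper name is illustrative):

```python
CREDENTIAL_PATTERNS = {
    "leaflink": "App {api_key}",
    "distru": "Bearer {api_key}",
    "google": "Bearer {access_token}",
}

def build_auth_header(service: str, **creds: str) -> dict:
    """Format the Authorization header from the per-service pattern."""
    pattern = CREDENTIAL_PATTERNS.get(service)
    if pattern is None:
        raise ValueError(f"no credential pattern for {service!r}")
    return {"Authorization": pattern.format(**creds)}

print(build_auth_header("leaflink", api_key="abc123"))  # {'Authorization': 'App abc123'}
```

Centralizing the formats this way means a scheme change (say, a provider moving from custom headers to Bearer tokens) is a one-line edit rather than a hunt through every client.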

Webhook Security

Secure Webhook Handling Python
import hmac
import hashlib
import os
import time
from fastapi import FastAPI, Request, HTTPException, Header
from typing import Optional
app = FastAPI()

# Store webhook secrets securely
WEBHOOK_SECRETS = {
    "leaflink": os.environ.get("LEAFLINK_WEBHOOK_SECRET"),
    "stripe": os.environ.get("STRIPE_WEBHOOK_SECRET"),
}

def verify_signature(
    payload: bytes,
    signature: str,
    secret: str,
    algorithm: str = "sha256"
) -> bool:
    """Verify HMAC signature."""
    if algorithm == "sha256":
        expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    elif algorithm == "sha1":
        expected = hmac.new(secret.encode(), payload, hashlib.sha1).hexdigest()
    else:
        raise ValueError(f"Unknown algorithm: {algorithm}")
    
    return hmac.compare_digest(expected, signature)

def verify_timestamp(timestamp: int, tolerance_seconds: int = 300) -> bool:
    """Verify request timestamp is recent (prevent replay attacks)."""
    now = int(time.time())
    return abs(now - timestamp) <= tolerance_seconds

@app.post("/webhooks/{provider}")
async def handle_webhook(
    provider: str,
    request: Request,
    x_signature: Optional[str] = Header(None, alias="X-Signature"),
    x_timestamp: Optional[str] = Header(None, alias="X-Timestamp"),
):
    """Generic webhook handler with security checks."""
    
    # Get provider secret
    secret = WEBHOOK_SECRETS.get(provider)
    if not secret:
        raise HTTPException(status_code=404, detail="Unknown provider")
    
    # Get raw body
    body = await request.body()
    
    # Verify signature
    if not x_signature:
        raise HTTPException(status_code=401, detail="Missing signature")
    
    # Some providers include timestamp in signature
    if x_timestamp:
        try:
            timestamp = int(x_timestamp)
            if not verify_timestamp(timestamp):
                raise HTTPException(status_code=401, detail="Stale timestamp")
            # Include timestamp in signed payload
            signed_payload = f"{timestamp}.{body.decode()}"
            body_for_verify = signed_payload.encode()
        except ValueError:
            raise HTTPException(status_code=400, detail="Invalid timestamp")
    else:
        body_for_verify = body
    
    if not verify_signature(body_for_verify, x_signature, secret):
        raise HTTPException(status_code=401, detail="Invalid signature")
    
    # Process webhook
    data = await request.json()
    await process_webhook(provider, data)
    
    return {"status": "received"}
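
To sanity-check the handler, sign a payload the way a timestamp-scheme provider would and run it through the same verification math. The secret and payload below are made up:

```python
import hashlib
import hmac
import time

secret = "whsec_test_only"
timestamp = int(time.time())
body = b'{"event": "order.created"}'

# Provider side: sign "<timestamp>.<body>" with the shared secret
signed_payload = f"{timestamp}.{body.decode()}".encode()
signature = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()

# Receiver side: recompute and compare in constant time
expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()
print(hmac.compare_digest(expected, signature))  # True

# A tampered body must fail verification
tampered = f"{timestamp}." + '{"event": "order.refunded"}'
forged = hmac.new(secret.encode(), tampered.encode(), hashlib.sha256).hexdigest()
print(hmac.compare_digest(forged, signature))  # False
```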

Data Protection

🚫 Data Security Checklist
  • Never log credentials: Scrub auth headers from logs
  • Never log PII: Customer names, emails, addresses should not appear in logs
  • Never log full requests: Request bodies may contain sensitive data
  • Encrypt at rest: Cached API data should be encrypted
  • Encrypt in transit: Always use HTTPS (verify SSL)
  • Timeout sessions: Don't keep OAuth tokens longer than needed
  • Audit access: Know who has access to credentials
Safe Logging Python
import json
import re
from typing import Any

# Patterns to redact
SENSITIVE_PATTERNS = [
    (r"Authorization:\s*\S+", "Authorization: [REDACTED]"),
    (r"api[_-]?key[\"']?\s*[:=]\s*[\"']?\S+", "api_key: [REDACTED]"),
    (r"Bearer\s+\S+", "Bearer [REDACTED]"),
    (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", "[EMAIL]"),
    (r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]"),  # US SSN pattern
    (r"\b\d{16}\b", "[CARD]"),  # Credit card (16 consecutive digits)
]

def redact_sensitive(text: str) -> str:
    """Redact sensitive information from text."""
    for pattern, replacement in SENSITIVE_PATTERNS:
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
    return text

def safe_log_request(method: str, url: str, headers: dict, body: Any = None):
    """Log request with sensitive data redacted."""
    # Redact headers
    safe_headers = {}
    for key, value in headers.items():
        if key.lower() in ("authorization", "x-api-key", "api-key"):
            safe_headers[key] = "[REDACTED]"
        else:
            safe_headers[key] = value
    
    # Redact URL params
    safe_url = redact_sensitive(url)
    
    # Log safely
    log_entry = {
        "method": method,
        "url": safe_url,
        "headers": safe_headers,
    }
    
    if body:
        # Don't log body for sensitive endpoints
        if any(x in url.lower() for x in ["auth", "token", "password", "secret"]):
            log_entry["body"] = "[REDACTED]"
        else:
            log_entry["body"] = redact_sensitive(str(body)[:500])
    
    print(json.dumps(log_entry))

Conclusion: The Integration Mindset

API integration isn't a one-time project—it's an ongoing practice. The services you integrate with will change. Their APIs will evolve. New services will emerge that you need to connect. The patterns in this guide prepare you for that evolution.

The key insights:

  • Treat every external API as unreliable: rate limit, back off, and circuit-break.
  • Degrade gracefully: stale cached data beats a crashed application.
  • Validate response schemas: APIs change without warning, so fail fast and loudly.
  • Monitor in production: health checks, latency percentiles, error rates, and sync freshness.
  • Treat integrations as attack surfaces: secure credentials, verify webhooks, redact logs.

The integrations we've covered—LeafLink for B2B wholesale, Distru for manufacturing, Google for productivity, Twilio for communication, Etherscan for blockchain, QuickBooks for finance—represent the real complexity of modern systems. Each has its quirks. Each taught us something.

Your integrations will teach you too. Document what you learn. Share the patterns that work. Build systems that survive contact with production.

That's API integration mastery.

✅ Integration Checklist

Before deploying any new integration:

  • ☐ Credentials stored securely (not in code)
  • ☐ Rate limiting implemented
  • ☐ Exponential backoff for retries
  • ☐ Error handling for all failure modes
  • ☐ Schema validation on responses
  • ☐ Health check endpoint
  • ☐ Metrics collection
  • ☐ Graceful degradation (cache fallback)
  • ☐ Webhook signature verification (if applicable)
  • ☐ Tests with mocked responses
  • ☐ Documented in runbook
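
Parts of this checklist can be automated in CI. For example, a preflight check that required credentials are actually configured before deploy (the variable names are illustrative):

```python
import os

REQUIRED_ENV = ["LEAFLINK_API_KEY", "LEAFLINK_WEBHOOK_SECRET"]

def preflight(env=os.environ) -> list[str]:
    """Return the names of required variables that are missing or empty."""
    return [name for name in REQUIRED_ENV if not env.get(name)]

print(preflight({"LEAFLINK_API_KEY": "abc"}))  # ['LEAFLINK_WEBHOOK_SECRET']
```

Failing the deploy when this list is non-empty is cheaper than discovering a missing secret from the first production error.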

Share this article