- 1. The API-First Mindset
- 2. REST vs GraphQL vs WebSockets
- 3. Authentication Patterns
- 4. Rate Limiting and Backoff Strategies
- 5. Error Handling Best Practices
- 6. Webhook Patterns
- 7. Case Study: LeafLink Integration
- 8. Case Study: Google OAuth Flow
- 9. Case Study: Real-Time Blockchain Data
- 10. Building Robust Integrations
- 11. Testing and Monitoring
- 12. Security Considerations
In 2024, I watched a developer spend three weeks building an integration that broke on the first day of production. The API documentation said one thing; the actual API did something else. The rate limits weren't documented. The error messages were useless. The authentication flow had undocumented edge cases.
In 2026, that same integration would take three days—not because APIs got better (they didn't), but because we've accumulated hard-won knowledge about what actually works. This guide is that knowledge: the patterns that survive contact with production, the failures that teach you what documentation won't, and the code that handles the edge cases that don't exist in tutorials.
We're not talking theory. Every pattern in this guide comes from real integrations we've built and operated:
- LeafLink API — B2B wholesale marketplace, 9,000+ orders processed
- Distru GraphQL API — Manufacturing ERP with 986 products
- Google Calendar/Gmail APIs — Full workspace integration via OAuth
- Twilio APIs — Voice and SMS for customer outreach
- Etherscan API — Real-time blockchain data
- QuickBooks API — Financial system integration
This is what we learned.
1. The API-First Mindset
Before we discuss protocols and patterns, let's establish the philosophy that makes integration work tractable. The API-first mindset isn't about technology—it's about how you think about systems.
Everything is a Service
Modern business runs on services. Your CRM is a service. Your accounting system is a service. Your wholesale marketplace is a service. Your calendar is a service. Even internal tools should be thought of as services with APIs.
When you internalize this, integration becomes natural. The question isn't "can we connect these systems?"—it's "what's the API, and what does it let us do?"
Before building anything, map your service landscape: for each system you depend on, identify its API paradigm, its authentication scheme, and its rate limits.
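That map can start as something as simple as a data structure in code. A minimal sketch (the service names, limits, and helper below are illustrative placeholders, not a description of any real stack):

```python
# Illustrative service-landscape map. Substitute your own systems;
# none of these entries are authoritative for any real API.
SERVICE_MAP = {
    "crm":        {"api": "REST",    "auth": "OAuth 2.0", "rate_limit": "500/min"},
    "accounting": {"api": "REST",    "auth": "OAuth 2.0", "rate_limit": "500/min"},
    "wholesale":  {"api": "REST",    "auth": "API key",   "rate_limit": "~60/min"},
    "erp":        {"api": "GraphQL", "auth": "API key",   "rate_limit": "undocumented"},
}

def integration_questions(service: str) -> list[str]:
    """The three questions to answer for each service before writing code."""
    info = SERVICE_MAP[service]
    return [
        f"What paradigm does it use? ({info['api']})",
        f"How do we authenticate? ({info['auth']})",
        f"What are the limits? ({info['rate_limit']})",
    ]
```

Writing the map down forces the conversation about limits and auth to happen before the first line of integration code, not after the first 429.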
The Contract Mindset
An API is a contract. The documentation says: "Send me this, and I'll give you that." Like any contract, the devil is in the details—and sometimes the counterparty doesn't honor the terms exactly.
Treat APIs defensively:
- Never trust input: Validate everything coming from an API, even if it should be valid
- Expect the unexpected: Fields that "always exist" will sometimes be missing
- Version everything: APIs change. Your code should handle old and new responses
- Log everything: You'll need it when debugging production issues
Composition Over Custom
The API-first mindset prefers composing existing services over building custom solutions. Need email? Use an email API (SendGrid, Mailgun, SES). Need payments? Use a payment API (Stripe, Square). Need voice? Use a voice API (Twilio).
This isn't laziness—it's leverage. A well-integrated system using mature APIs will be more reliable, more feature-rich, and cheaper to maintain than a custom-built alternative.
Build when: You need complete control, the problem is core to your business, or no adequate API exists.
Integrate when: The problem is solved, APIs are mature, and your differentiation isn't in the plumbing.
For most businesses, the answer is integrate. Your competitive advantage probably isn't in how you send emails.
2. REST vs GraphQL vs WebSockets
The three dominant paradigms for API communication each have their place. Understanding when to use each—and how they differ in practice—saves enormous debugging time.
- REST: resource-based, HTTP verbs, stateless
  - Most common
  - Simple to understand
  - Cacheable
  - May over-fetch
- GraphQL: query language, single endpoint, typed schema
  - Precise data fetching
  - Self-documenting
  - Complex queries
  - Learning curve
- WebSockets: bidirectional, persistent connection, real-time
  - Real-time updates
  - Low latency
  - Connection state
  - Complex to manage
REST: The Workhorse
REST (Representational State Transfer) is what you'll encounter most often. It maps HTTP methods to CRUD operations on resources:
| Method | Operation | Example |
|---|---|---|
| GET | Read | GET /api/v2/orders/12345 |
| POST | Create | POST /api/v2/orders |
| PUT | Replace | PUT /api/v2/orders/12345 |
| PATCH | Update | PATCH /api/v2/orders/12345 |
| DELETE | Delete | DELETE /api/v2/orders/12345 |
LeafLink uses REST. Here's what a typical interaction looks like:
# List recent orders
GET https://www.leaflink.com/api/v2/orders/?limit=50&ordering=-modified
Authorization: App [API_KEY]
# Response (simplified)
{
"count": 9131,
"next": "https://www.leaflink.com/api/v2/orders/?limit=50&offset=50",
"results": [
{
"id": 4821934,
"number": "SO-2026-0892",
"status": "Submitted",
"customer": 28451,
"total": "4250.00",
"created_on": "2026-02-03T14:22:18Z",
"line_items": [...]
}
]
}
REST Pagination Patterns
Real APIs return paginated results. You'll encounter three patterns:
- Offset-based (?limit=50&offset=100): Simple but inefficient for deep pages. Skip 100 items, return 50. LeafLink and QuickBooks use this.
- Cursor-based (?cursor=eyJpZCI6MTIzNH0=): Opaque token for the next page. More efficient, and handles inserts/deletes during pagination. Stripe uses this.
- ID-based (?after_id=12345): Use the last item's ID to fetch the next page. Efficient, but requires a stable sort. Etherscan uses this.
Production pattern: Always implement pagination handling as a generator or iterator. Fetch pages on demand rather than loading everything into memory:
import asyncio

async def fetch_all_orders(session, base_url, api_key):
"""Generator that yields all orders across pages."""
url = f"{base_url}/orders/?limit=100"
while url:
async with session.get(url, headers={"Authorization": f"App {api_key}"}) as resp:
if resp.status == 429: # Rate limited
retry_after = int(resp.headers.get("Retry-After", 60))
await asyncio.sleep(retry_after)
continue
resp.raise_for_status()
data = await resp.json()
for order in data["results"]:
yield order
url = data.get("next") # None when no more pages
if url:
await asyncio.sleep(0.1) # Respect rate limits
GraphQL: Precision Queries
GraphQL solves REST's over-fetching problem. Instead of getting a fixed response shape, you request exactly the fields you need. Distru uses GraphQL for their manufacturing ERP API.
# Fetch products with only the fields we need
query GetProducts($first: Int!, $after: String) {
products(first: $first, after: $after) {
edges {
node {
id
name
sku
category {
name
}
currentInventory {
quantity
locationName
}
unitPrice
metrcTagIds
}
cursor
}
pageInfo {
hasNextPage
endCursor
}
}
}
# Variables
{
"first": 50,
"after": "cursor_abc123"
}
Key GraphQL concepts:
- Single endpoint: All queries go to /graphql. No more hunting for endpoints.
- Schema as documentation: The schema defines all types, queries, and mutations. Self-documenting.
- Connections pattern: Pagination uses edges/nodes/pageInfo. More complex but more powerful.
- Mutations: Create/update operations are explicit, named operations.
mutation UpdateProductInventory($input: UpdateInventoryInput!) {
updateInventory(input: $input) {
product {
id
currentInventory {
quantity
locationName
}
}
errors {
field
messages
}
}
}
# Variables
{
"input": {
"productId": "prod_123",
"locationId": "loc_456",
"adjustment": 100,
"reason": "Received shipment"
}
}
GraphQL gotchas:
- N+1 queries: Requesting nested data can cause performance issues server-side
- Caching: Harder to cache than REST (POST requests, custom queries)
- Error handling: HTTP 200 with errors in the response body. Check both!
- Tooling required: You really want a GraphQL client library, not raw HTTP
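That last point about error handling deserves emphasis: a GraphQL server will happily return HTTP 200 with an errors array in the body. Even without a client library, a thin wrapper can guard against this. A minimal standard-library sketch (the endpoint URL is a placeholder; real auth headers and schemas vary by provider):

```python
import json
import urllib.request

GRAPHQL_URL = "https://example.com/graphql"  # hypothetical endpoint

def check_graphql_errors(body: dict) -> dict:
    """GraphQL returns HTTP 200 even on failure -- always check the body."""
    if body.get("errors"):
        messages = "; ".join(e.get("message", "?") for e in body["errors"])
        raise RuntimeError(f"GraphQL errors: {messages}")
    return body["data"]

def graphql_request(query: str, variables: dict, api_key: str) -> dict:
    """POST a GraphQL query over plain HTTP, no client library."""
    req = urllib.request.Request(
        GRAPHQL_URL,
        data=json.dumps({"query": query, "variables": variables}).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
    )
    with urllib.request.urlopen(req) as resp:
        return check_graphql_errors(json.loads(resp.read()))
```

In practice a real client library adds schema validation and query caching on top of this, but the check-both-channels rule is the part that prevents silent failures.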
WebSockets: Real-Time Updates
When you need live data—chat messages, price feeds, blockchain events—WebSockets provide bidirectional, persistent connections. They're essential for real-time applications but more complex to manage.
import websockets
import asyncio
import json
class BlockchainPriceStream:
def __init__(self, url: str):
self.url = url
self.ws = None
self.reconnect_delay = 1
self.max_reconnect_delay = 60
async def connect(self):
while True:
try:
self.ws = await websockets.connect(self.url)
self.reconnect_delay = 1 # Reset on successful connection
# Subscribe to channels
await self.ws.send(json.dumps({
"method": "subscribe",
"params": ["eth_price", "gas_price"]
}))
async for message in self.ws:
await self.handle_message(json.loads(message))
except websockets.ConnectionClosed:
print(f"Connection closed, reconnecting in {self.reconnect_delay}s...")
await asyncio.sleep(self.reconnect_delay)
self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
except Exception as e:
print(f"Error: {e}, reconnecting...")
await asyncio.sleep(self.reconnect_delay)
async def handle_message(self, data: dict):
if data.get("type") == "price_update":
# Process real-time price update
print(f"ETH: ${data['price']}")
elif data.get("type") == "ping":
await self.ws.send(json.dumps({"type": "pong"}))
WebSocket challenges:
- Connection management: Handle disconnects, reconnects, and state sync
- Heartbeats: Most servers require ping/pong to keep connections alive
- Ordering: Messages may arrive out of order or be duplicated
- Buffering: If you process slowly, messages queue up and memory grows
- Authentication: Often done via initial message or URL parameter, not HTTP headers
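The buffering challenge is worth handling explicitly rather than discovering in production. One common approach is a bounded queue between the socket reader and the (possibly slow) processor; the drop-oldest policy sketched here is one choice among several, and some feeds require you to block or resync instead:

```python
import asyncio

def enqueue_drop_oldest(queue: asyncio.Queue, msg) -> None:
    """If the consumer falls behind, drop the oldest buffered message
    rather than letting memory grow without bound."""
    if queue.full():
        queue.get_nowait()  # discard oldest
    queue.put_nowait(msg)

async def bounded_consumer(queue: asyncio.Queue, handle) -> None:
    """Drain messages from the bounded queue; a None sentinel stops it."""
    while True:
        msg = await queue.get()
        if msg is None:
            break
        await handle(msg)
        queue.task_done()
```

The socket loop calls enqueue_drop_oldest for every frame; the consumer runs as its own task. The key property is that slow processing degrades data freshness, not process stability.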
Choosing the Right Paradigm
| Use Case | Best Choice | Why |
|---|---|---|
| CRUD operations | REST | Simple, well-understood, cacheable |
| Complex data fetching | GraphQL | Avoid over-fetching, single request |
| Real-time updates | WebSocket | Push-based, low latency |
| Batch operations | REST | Bulk endpoints, parallel requests |
| Mobile/bandwidth constrained | GraphQL | Minimize payload size |
| Simple public data | REST | CDN caching, link sharing |
3. Authentication Patterns
Authentication is where integrations often fail first. Different APIs use different patterns, and getting it wrong means 401 errors forever. Here's what you'll encounter.
API Keys: Simple but Limited
API keys are the simplest authentication: a secret string you include with each request. LeafLink, Etherscan, and Distru use API keys.
# LeafLink: Custom Authorization header
GET /api/v2/orders/
Authorization: App sk_live_abc123xyz
# Etherscan: Query parameter
GET /api?module=account&action=balance&address=0x...&apikey=YOURKEY
# Distru: Bearer token (though it's an API key)
POST /graphql
Authorization: Bearer dist_abc123xyz
API key best practices:
- Never hardcode: Use environment variables or secrets managers
- Rotate regularly: Set calendar reminders to rotate keys quarterly
- Principle of least privilege: Use read-only keys when you don't need write access
- Monitor usage: Unexpected spikes may indicate a leaked key
- Separate per environment: Different keys for dev, staging, production
import os
from pathlib import Path
def load_api_key(service: str) -> str:
"""Load API key from file or environment."""
# Try environment variable first
env_key = f"{service.upper()}_API_KEY"
if key := os.environ.get(env_key):
return key
# Fall back to config file
key_file = Path.home() / ".config" / service / "api_key"
if key_file.exists():
return key_file.read_text().strip()
raise ValueError(f"No API key found for {service}. "
f"Set {env_key} or create {key_file}")
# Usage
leaflink_key = load_api_key("leaflink")
etherscan_key = load_api_key("etherscan")
OAuth 2.0: The Full Dance
OAuth 2.0 is complex but necessary when you need to access user data on their behalf. Google, QuickBooks, and many other enterprise APIs require OAuth.
The OAuth dance step by step:
- Build authorization URL: Include client_id, redirect_uri, scopes, and state
- User authenticates: They log in to the service and grant permissions
- Receive authorization code: Service redirects to your URL with a code
- Exchange code for tokens: POST the code to get access and refresh tokens
- Use access token: Include it in API requests
- Refresh when expired: Use refresh token to get new access token
import os
import json
import webbrowser
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import urlencode, parse_qs, urlparse
from http.server import HTTPServer, BaseHTTPRequestHandler
import requests
class GoogleOAuth:
"""Complete OAuth 2.0 implementation for Google APIs."""
AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
TOKEN_URL = "https://oauth2.googleapis.com/token"
def __init__(self, client_id: str, client_secret: str,
scopes: list[str], redirect_port: int = 8080):
self.client_id = client_id
self.client_secret = client_secret
self.scopes = scopes
self.redirect_uri = f"http://localhost:{redirect_port}/callback"
self.redirect_port = redirect_port
self.tokens_file = Path.home() / ".config" / "google-calendar" / "tokens.json"
def get_authorization_url(self, state: str = None) -> str:
"""Build the URL to start OAuth flow."""
params = {
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"response_type": "code",
"scope": " ".join(self.scopes),
"access_type": "offline", # Get refresh token
"prompt": "consent", # Force consent to get refresh token
}
if state:
params["state"] = state
return f"{self.AUTH_URL}?{urlencode(params)}"
def exchange_code(self, code: str) -> dict:
"""Exchange authorization code for tokens."""
response = requests.post(self.TOKEN_URL, data={
"client_id": self.client_id,
"client_secret": self.client_secret,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": self.redirect_uri,
})
response.raise_for_status()
tokens = response.json()
# Add expiration timestamp
tokens["expires_at"] = (
datetime.now() + timedelta(seconds=tokens["expires_in"])
).isoformat()
self._save_tokens(tokens)
return tokens
def refresh_access_token(self) -> str:
"""Use refresh token to get new access token."""
tokens = self._load_tokens()
if not tokens or "refresh_token" not in tokens:
raise ValueError("No refresh token available. Re-authorize.")
response = requests.post(self.TOKEN_URL, data={
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": tokens["refresh_token"],
"grant_type": "refresh_token",
})
response.raise_for_status()
new_tokens = response.json()
# Preserve refresh token (not always returned)
new_tokens["refresh_token"] = tokens["refresh_token"]
new_tokens["expires_at"] = (
datetime.now() + timedelta(seconds=new_tokens["expires_in"])
).isoformat()
self._save_tokens(new_tokens)
return new_tokens["access_token"]
def get_valid_token(self) -> str:
"""Get a valid access token, refreshing if necessary."""
tokens = self._load_tokens()
if not tokens:
raise ValueError("No tokens found. Run authorization flow first.")
expires_at = datetime.fromisoformat(tokens["expires_at"])
if datetime.now() >= expires_at - timedelta(minutes=5):
# Token expired or expiring soon, refresh
return self.refresh_access_token()
return tokens["access_token"]
def _save_tokens(self, tokens: dict):
self.tokens_file.parent.mkdir(parents=True, exist_ok=True)
self.tokens_file.write_text(json.dumps(tokens, indent=2))
self.tokens_file.chmod(0o600) # Secure permissions
def _load_tokens(self) -> dict | None:
if self.tokens_file.exists():
return json.loads(self.tokens_file.read_text())
return None
# Usage example
oauth = GoogleOAuth(
client_id=os.environ["GOOGLE_CLIENT_ID"],
client_secret=os.environ["GOOGLE_CLIENT_SECRET"],
scopes=[
"https://www.googleapis.com/auth/calendar",
"https://www.googleapis.com/auth/gmail.modify",
"https://www.googleapis.com/auth/drive",
]
)
# Get access token for API calls
token = oauth.get_valid_token()
headers = {"Authorization": f"Bearer {token}"}
Common OAuth pitfalls:
- Missing access_type=offline: You won't get a refresh token without it
- Forgetting to refresh: Access tokens expire (usually 1 hour). Handle it!
- Lost refresh token: If you don't persist it, user must re-authorize
- Scope creep: Request only the scopes you need. Users notice.
- State parameter: Always use it to prevent CSRF attacks
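For the state parameter, the pattern is: generate an unguessable value before redirecting, stash it (for example in the user's session), then compare it against what comes back on the callback in constant time. A minimal standard-library sketch:

```python
import hmac
import secrets

def new_oauth_state() -> str:
    """Generate an unguessable state value before redirecting the user."""
    return secrets.token_urlsafe(32)

def verify_oauth_state(sent: str, received: str) -> bool:
    """Constant-time comparison when the callback arrives; a mismatch
    means the callback didn't originate from our redirect (possible CSRF)."""
    return hmac.compare_digest(sent, received)
```

This would slot into the GoogleOAuth class above as the value passed to get_authorization_url(state=...), with the sent value persisted server-side between the redirect and the callback.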
JWT Tokens: Self-Contained Claims
JSON Web Tokens (JWTs) are often used as access tokens. They're special because they contain embedded claims that can be verified without a database lookup.
# A JWT has three parts, base64-encoded and separated by dots:
eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0IiwiZXhwIjoxNzA3MDAwMDAwfQ.signature
# Decoded header:
{"alg": "HS256", "typ": "JWT"}
# Decoded payload (claims):
{
"sub": "1234", # Subject (user ID)
"exp": 1707000000, # Expiration timestamp
"iat": 1706999000, # Issued at
"scope": "read write", # Permissions
"iss": "api.example.com" # Issuer
}
# Signature: HMAC-SHA256 of header + payload using secret key
Working with JWTs:
import jwt
from datetime import datetime
def validate_jwt(token: str, secret: str, required_scopes: list[str] = None) -> dict:
"""Validate JWT and return claims."""
try:
# Decode and verify signature
claims = jwt.decode(token, secret, algorithms=["HS256"])
# Check expiration (jwt library does this, but be explicit)
if claims.get("exp") and datetime.fromtimestamp(claims["exp"]) < datetime.now():
raise ValueError("Token expired")
# Check required scopes
if required_scopes:
token_scopes = claims.get("scope", "").split()
missing = set(required_scopes) - set(token_scopes)
if missing:
raise ValueError(f"Missing scopes: {missing}")
return claims
except jwt.InvalidTokenError as e:
raise ValueError(f"Invalid token: {e}")
# For APIs that use JWTs, you often just pass them through
# The API validates; you just need to refresh when expired
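One practical trick follows from the pass-through model: you can read a JWT's exp claim client-side, without verifying the signature, purely to decide when to refresh. A standard-library sketch (never use an unverified decode to trust a token; only the server's verified decode grants access):

```python
import base64
import json
import time

def jwt_expires_soon(token: str, leeway_seconds: int = 300) -> bool:
    """Peek at the exp claim WITHOUT signature verification, solely to
    decide client-side whether it's time to refresh the token."""
    payload_b64 = token.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)  # restore base64 padding
    claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    exp = claims.get("exp")
    return exp is not None and exp < time.time() + leeway_seconds
```

Refreshing when jwt_expires_soon returns True (rather than waiting for a 401) avoids a failed request plus retry on every token rollover.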
4. Rate Limiting and Backoff Strategies
Every API has limits. Hit them, and you'll get blocked. Understanding rate limiting patterns and implementing proper backoff is essential for production integrations.
Types of Rate Limits
| API | Limit Type | Limit | Window |
|---|---|---|---|
| LeafLink | Per-user | ~60 req | per minute |
| Etherscan | Per-key | 5 req | per second |
| Google APIs | Per-project | Varies by API | per day/minute |
| QuickBooks | Per-app | 500 req | per minute |
| Twilio | Per-account | Varies by endpoint | concurrency |
| Distru | Not documented | ~100 req | per minute (observed) |
Detecting Rate Limits
Rate limit responses vary by API, but common patterns include:
# Standard: HTTP 429 Too Many Requests
HTTP/1.1 429 Too Many Requests
Retry-After: 60
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1707001200
# Etherscan: Returns 200 with error in body
{
"status": "0",
"message": "NOTOK",
"result": "Max rate limit reached, please use API Key for higher rate limit"
}
# Some APIs: HTTP 503 Service Unavailable (overload protection)
Exponential Backoff with Jitter
When rate limited, don't retry immediately—you'll just get blocked again. Exponential backoff with jitter is the standard pattern:
import asyncio
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
import aiohttp
@dataclass
class RateLimitConfig:
requests_per_window: int
window_seconds: int
max_retries: int = 5
base_delay: float = 1.0
max_delay: float = 60.0
class RateLimitedClient:
"""HTTP client with rate limiting and exponential backoff."""
def __init__(self, config: RateLimitConfig):
self.config = config
self.request_times: list[datetime] = []
self.session: aiohttp.ClientSession = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def request(self, method: str, url: str, **kwargs) -> dict:
"""Make rate-limited request with automatic retry."""
for attempt in range(self.config.max_retries):
# Pre-emptive rate limiting
await self._wait_for_slot()
try:
async with self.session.request(method, url, **kwargs) as resp:
# Track request time
self._record_request()
# Success
if resp.status == 200:
return await resp.json()
# Rate limited
if resp.status == 429:
retry_after = self._parse_retry_after(resp)
delay = retry_after or self._backoff_delay(attempt)
print(f"Rate limited. Waiting {delay:.1f}s...")
await asyncio.sleep(delay)
continue
# Server error (may be transient)
if resp.status >= 500:
delay = self._backoff_delay(attempt)
print(f"Server error {resp.status}. Retrying in {delay:.1f}s...")
await asyncio.sleep(delay)
continue
# Client error (don't retry)
resp.raise_for_status()
            except aiohttp.ClientResponseError:
                raise  # 4xx from raise_for_status() won't succeed on retry
            except aiohttp.ClientError as e:
if attempt < self.config.max_retries - 1:
delay = self._backoff_delay(attempt)
print(f"Network error: {e}. Retrying in {delay:.1f}s...")
await asyncio.sleep(delay)
else:
raise
raise Exception(f"Max retries ({self.config.max_retries}) exceeded")
async def _wait_for_slot(self):
"""Wait until we have capacity within rate limit window."""
now = datetime.now()
window_start = now - timedelta(seconds=self.config.window_seconds)
# Remove old requests outside window
self.request_times = [t for t in self.request_times if t > window_start]
# If at capacity, wait for oldest request to exit window
if len(self.request_times) >= self.config.requests_per_window:
oldest = self.request_times[0]
wait_time = (oldest + timedelta(seconds=self.config.window_seconds) - now).total_seconds()
if wait_time > 0:
await asyncio.sleep(wait_time + 0.1) # Small buffer
def _record_request(self):
"""Record timestamp of request."""
self.request_times.append(datetime.now())
def _backoff_delay(self, attempt: int) -> float:
"""Calculate exponential backoff with jitter."""
delay = self.config.base_delay * (2 ** attempt)
delay = min(delay, self.config.max_delay)
# Add jitter: ±25% randomization
jitter = delay * 0.25 * (2 * random.random() - 1)
return delay + jitter
def _parse_retry_after(self, response) -> float | None:
"""Parse Retry-After header if present."""
if retry_after := response.headers.get("Retry-After"):
try:
return float(retry_after)
except ValueError:
pass
return None
# Usage for Etherscan (5 req/sec)
etherscan_config = RateLimitConfig(
requests_per_window=5,
window_seconds=1,
max_retries=5
)
async def fetch_balance(address: str) -> dict:
async with RateLimitedClient(etherscan_config) as client:
return await client.request(
"GET",
"https://api.etherscan.io/v2/api",
params={
"chainid": 1,
"module": "account",
"action": "balance",
"address": address,
"tag": "latest",
"apikey": ETHERSCAN_API_KEY
}
)
- Pre-emptive limiting: Track your own requests; don't wait for 429s
- Jitter is critical: Without it, retries from multiple clients sync up and cause thundering herd
- Respect Retry-After: If the API tells you when to retry, use it
- Batch when possible: One request for 100 items beats 100 requests for 1 item
- Cache aggressively: Don't re-fetch data that hasn't changed
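On the caching point: even a tiny in-process TTL cache cuts request volume dramatically for slow-changing data like product catalogs. A minimal sketch (in production you would likely reach for Redis or a functools-based cache instead):

```python
import time

class TTLCache:
    """Tiny time-based cache for API responses that rarely change."""

    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self._store: dict = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if time.monotonic() - stored_at > self.ttl:
            del self._store[key]  # expired; force a fresh fetch
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, time.monotonic())
```

Wrapping fetches in a get-or-fetch-and-set pattern with a TTL matched to how often the data actually changes (hours for catalogs, seconds for prices) is often the single cheapest rate-limit fix available.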
5. Error Handling Best Practices
APIs fail. Networks fail. Services go down. Your integration needs to handle errors gracefully, provide useful debugging information, and recover automatically when possible.
Error Categories
| Category | HTTP Codes | Retry? | Action |
|---|---|---|---|
| Client Error | 400, 401, 403, 404, 422 | No | Fix your request |
| Rate Limit | 429 | Yes (with backoff) | Slow down |
| Server Error | 500, 502, 503, 504 | Yes (limited) | Wait and retry |
| Network Error | N/A (connection failed) | Yes (limited) | Check connectivity |
| Timeout | N/A | Maybe | Increase timeout or retry |
Structured Error Handling
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import traceback
class ErrorSeverity(Enum):
TRANSIENT = "transient" # Retry will likely succeed
PERMANENT = "permanent" # Won't succeed without code change
UNKNOWN = "unknown" # Unclear, may need investigation
@dataclass
class APIError(Exception):
"""Structured API error with context."""
message: str
status_code: Optional[int] = None
response_body: Optional[str] = None
endpoint: Optional[str] = None
severity: ErrorSeverity = ErrorSeverity.UNKNOWN
retry_after: Optional[float] = None
request_id: Optional[str] = None
def __str__(self):
parts = [self.message]
if self.status_code:
parts.append(f"[HTTP {self.status_code}]")
if self.endpoint:
parts.append(f"at {self.endpoint}")
if self.request_id:
parts.append(f"(request_id: {self.request_id})")
return " ".join(parts)
@property
def should_retry(self) -> bool:
return self.severity == ErrorSeverity.TRANSIENT
def to_dict(self) -> dict:
"""Serialize for logging."""
return {
"message": self.message,
"status_code": self.status_code,
"response_body": self.response_body,
"endpoint": self.endpoint,
"severity": self.severity.value,
"retry_after": self.retry_after,
"request_id": self.request_id,
}
def classify_error(status_code: int, response_body: str = "") -> ErrorSeverity:
"""Classify error severity based on status code and response."""
if status_code == 429:
return ErrorSeverity.TRANSIENT
elif status_code >= 500:
return ErrorSeverity.TRANSIENT
elif status_code in (401, 403):
return ErrorSeverity.PERMANENT # Auth issues need manual fix
elif status_code == 404:
return ErrorSeverity.PERMANENT # Resource doesn't exist
elif status_code == 400:
# Could be transient (data validation) or permanent (bad request format)
if "temporarily" in response_body.lower():
return ErrorSeverity.TRANSIENT
return ErrorSeverity.PERMANENT
return ErrorSeverity.UNKNOWN
async def handle_response(response, endpoint: str) -> dict:
"""Process API response with structured error handling."""
body = await response.text()
request_id = response.headers.get("X-Request-ID")
if response.status == 200:
try:
data = await response.json()
# Some APIs return errors in 200 responses (looking at you, Etherscan)
if isinstance(data, dict) and data.get("status") == "0":
raise APIError(
message=data.get("message", "API returned error status"),
status_code=200,
response_body=body,
endpoint=endpoint,
severity=ErrorSeverity.PERMANENT,
request_id=request_id,
)
return data
except json.JSONDecodeError as e:
raise APIError(
message=f"Invalid JSON response: {e}",
status_code=response.status,
response_body=body[:500],
endpoint=endpoint,
severity=ErrorSeverity.TRANSIENT, # Might be temporary glitch
request_id=request_id,
)
# Error response
severity = classify_error(response.status, body)
retry_after = None
if response.status == 429:
retry_after = float(response.headers.get("Retry-After", 60))
raise APIError(
message="API request failed",
status_code=response.status,
response_body=body[:1000], # Truncate large error bodies
endpoint=endpoint,
severity=severity,
retry_after=retry_after,
request_id=request_id,
)
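With errors classified this way, the retry decision can live in one place. A minimal sketch of a retry wrapper that duck-types against the should_retry and retry_after attributes defined on APIError above (the names and defaults here are illustrative, not a fixed interface):

```python
import time

def call_with_retry(fn, max_attempts: int = 3, base_delay: float = 1.0,
                    sleep=time.sleep):
    """Retry a callable, honoring should_retry / retry_after on the
    raised exception (duck-typed to match the APIError pattern)."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as e:
            retryable = getattr(e, "should_retry", False)
            if not retryable or attempt == max_attempts - 1:
                raise  # permanent errors and exhausted budgets surface
            # Prefer the server-supplied delay; fall back to exponential
            delay = getattr(e, "retry_after", None) or base_delay * (2 ** attempt)
            sleep(delay)
```

Injecting sleep as a parameter keeps the wrapper testable; production code passes nothing and gets real delays.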
Logging for Debugging
When an integration fails in production, you need enough context to diagnose the issue. Log strategically:
import logging
import json
from datetime import datetime
class APILogger:
"""Structured logging for API operations."""
def __init__(self, service_name: str):
self.service = service_name
self.logger = logging.getLogger(f"api.{service_name}")
def request(self, method: str, url: str, **kwargs):
"""Log outgoing request (sanitize sensitive data)."""
# Remove auth headers from logs
safe_headers = {k: v for k, v in kwargs.get("headers", {}).items()
if k.lower() not in ("authorization", "x-api-key")}
self.logger.info(json.dumps({
"event": "api_request",
"service": self.service,
"method": method,
"url": url,
"headers": safe_headers,
"timestamp": datetime.now().isoformat(),
}))
def response(self, status: int, duration_ms: float, **kwargs):
"""Log response details."""
self.logger.info(json.dumps({
"event": "api_response",
"service": self.service,
"status": status,
"duration_ms": duration_ms,
"timestamp": datetime.now().isoformat(),
**kwargs,
}))
def error(self, error: APIError):
"""Log structured error."""
self.logger.error(json.dumps({
"event": "api_error",
"service": self.service,
**error.to_dict(),
"timestamp": datetime.now().isoformat(),
}))
Never log:
- API keys, tokens, or credentials
- Customer personal information (PII)
- Payment details (card numbers, bank accounts)
- Full request bodies with sensitive data
- Internal system paths that reveal architecture
6. Webhook Patterns
Webhooks flip the model: instead of polling an API for changes, the API pushes changes to you. They're essential for real-time integrations but introduce new challenges.
- Polling: Your App → (every 5 min) → API: "Any changes?" (delayed and wasteful, but simple)
- Webhooks: API → (on change) → Your App: "Here's what changed" (real-time and efficient, but complex)
Webhook Security
Anyone can send HTTP requests to your webhook endpoint. You must verify that requests actually came from the expected service.
import os
import hmac
import hashlib
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
WEBHOOK_SECRET = os.environ["LEAFLINK_WEBHOOK_SECRET"]
def verify_webhook_signature(payload: bytes, signature: str, secret: str) -> bool:
"""Verify HMAC-SHA256 webhook signature."""
expected = hmac.new(
secret.encode(),
payload,
hashlib.sha256
).hexdigest()
# Constant-time comparison to prevent timing attacks
return hmac.compare_digest(expected, signature)
@app.post("/webhooks/leaflink")
async def handle_leaflink_webhook(request: Request):
# Get raw body for signature verification
body = await request.body()
# Get signature from header (format varies by provider)
signature = request.headers.get("X-LeafLink-Signature", "")
if not verify_webhook_signature(body, signature, WEBHOOK_SECRET):
raise HTTPException(status_code=401, detail="Invalid signature")
# Parse and process
event = await request.json()
await process_webhook_event(event)
# Always return 200 quickly (process async if needed)
return {"status": "received"}
Idempotency: Handling Duplicate Webhooks
Webhooks can be delivered multiple times. Network issues, retries, and bugs all cause duplicates. Your handler must be idempotent—processing the same event twice should have no additional effect.
import hashlib
import json
from datetime import datetime, timedelta
# Simple in-memory store (use Redis in production)
processed_events: dict[str, datetime] = {}
def get_event_id(event: dict) -> str:
"""Extract or generate unique event identifier."""
# Many providers include an event ID
if event_id := event.get("event_id") or event.get("id"):
return str(event_id)
# Fall back to content hash
content = json.dumps(event, sort_keys=True).encode()
return hashlib.sha256(content).hexdigest()[:16]
def is_duplicate(event_id: str, window_hours: int = 24) -> bool:
"""Check if event was already processed."""
# Clean old entries
cutoff = datetime.now() - timedelta(hours=window_hours)
expired = [k for k, v in processed_events.items() if v < cutoff]
for k in expired:
del processed_events[k]
return event_id in processed_events
def mark_processed(event_id: str):
"""Record that event was processed."""
processed_events[event_id] = datetime.now()
async def process_webhook_event(event: dict):
"""Process webhook with idempotency."""
event_id = get_event_id(event)
if is_duplicate(event_id):
print(f"Skipping duplicate event: {event_id}")
return
try:
# Actual processing
event_type = event.get("type")
if event_type == "order.created":
await handle_new_order(event["data"])
elif event_type == "order.updated":
await handle_order_update(event["data"])
elif event_type == "order.shipped":
await handle_order_shipped(event["data"])
else:
print(f"Unknown event type: {event_type}")
# Only mark processed after successful handling
mark_processed(event_id)
except Exception as e:
# Don't mark as processed - allow retry
print(f"Error processing {event_id}: {e}")
raise
Webhook Best Practices
- Respond quickly: Return 200 within 5 seconds. Process async if needed.
- Queue for processing: Don't block the response on slow operations.
- Implement retries: If you return non-2xx, expect retries.
- Log everything: Webhooks are hard to replay; good logs are essential.
- Have a fallback: Poll periodically to catch missed webhooks.
- Use a public URL: Localhost won't work. Use ngrok for development.
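The first two practices (respond quickly, queue for processing) can be sketched with a plain asyncio.Queue. In a real FastAPI app the worker would be started as a background task on app startup and receive_webhook would be the endpoint body; the names here are illustrative:

```python
import asyncio

async def receive_webhook(queue: asyncio.Queue, event: dict) -> dict:
    """Endpoint body: enqueue and acknowledge immediately, so the 200
    goes back within milliseconds regardless of processing time."""
    await queue.put(event)
    return {"status": "received"}

async def webhook_worker(queue: asyncio.Queue, process) -> None:
    """Background task that does the slow work; None is a shutdown sentinel."""
    while True:
        event = await queue.get()
        if event is None:
            break
        try:
            await process(event)
        except Exception as e:
            # Don't crash the worker; the provider's retries or our
            # polling fallback will catch anything dropped here.
            print(f"webhook processing failed: {e}")
        finally:
            queue.task_done()
```

Separating acknowledgment from processing also means a slow downstream system (a database, another API) can't cause the provider to mark your endpoint as failing and suspend deliveries.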
7. Case Study: LeafLink Integration
LeafLink is the dominant B2B wholesale marketplace in the cannabis industry. Our integration connects to 285 customers, 1,468 products, and has processed over 9,000 orders. Here's how we built it.
API Overview
LeafLink provides a REST API with comprehensive access to orders, products, customers, and inventory. Authentication is via API key in a custom header format.
from dataclasses import dataclass
from pathlib import Path
@dataclass
class LeafLinkConfig:
"""LeafLink API configuration."""
base_url: str = "https://www.leaflink.com/api/v2"
api_key: str = None
seller_id: int = 6808
brand_id: int = 1518
def __post_init__(self):
if not self.api_key:
# Load from secure file
key_file = Path.home() / ".config" / "leaflink" / "api_key"
self.api_key = key_file.read_text().strip()
@property
def headers(self) -> dict:
return {
"Authorization": f"App {self.api_key}",
"Content-Type": "application/json",
"Accept": "application/json",
}
Fetching Orders with Full Context
The real challenge with LeafLink isn't basic API calls—it's efficiently fetching orders with all related data (customer details, product info, line items) without hammering the API.
import asyncio
import aiohttp
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import AsyncIterator, Optional
import json
@dataclass
class LeafLinkOrder:
"""Structured order data."""
id: int
number: str
status: str
customer_id: int
customer_name: Optional[str] = None
total: float = 0.0
created_on: Optional[datetime] = None
modified_on: Optional[datetime] = None
line_items: list = field(default_factory=list)
raw_data: dict = field(default_factory=dict)
class LeafLinkClient:
"""Production LeafLink API client."""
def __init__(self, config: LeafLinkConfig):
self.config = config
self.session: aiohttp.ClientSession = None
self._customer_cache: dict[int, dict] = {}
self._product_cache: dict[int, dict] = {}
async def __aenter__(self):
self.session = aiohttp.ClientSession(
headers=self.config.headers,
timeout=aiohttp.ClientTimeout(total=30)
)
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def get_orders(
self,
since: datetime = None,
status: str = None,
limit: int = 100
) -> AsyncIterator[LeafLinkOrder]:
"""Fetch orders with pagination."""
params = {
"limit": min(limit, 100),
"ordering": "-modified",
"seller": self.config.seller_id,
}
if since:
params["modified__gte"] = since.isoformat()
if status:
params["status"] = status
url = f"{self.config.base_url}/orders/"
while url:
async with self.session.get(url, params=params if "?" not in url else None) as resp:
if resp.status == 429:
retry_after = int(resp.headers.get("Retry-After", 60))
print(f"Rate limited, waiting {retry_after}s...")
await asyncio.sleep(retry_after)
continue
resp.raise_for_status()
data = await resp.json()
for order_data in data.get("results", []):
yield await self._parse_order(order_data)
url = data.get("next")
params = {} # Next URL includes params
if url:
await asyncio.sleep(0.1) # Rate limit courtesy
async def _parse_order(self, data: dict) -> LeafLinkOrder:
"""Parse order with customer lookup."""
customer_id = data.get("customer")
customer_name = None
if customer_id:
customer = await self._get_customer(customer_id)
customer_name = customer.get("name") if customer else None
return LeafLinkOrder(
id=data["id"],
number=data.get("number", ""),
status=data.get("status", "Unknown"),
customer_id=customer_id,
customer_name=customer_name,
total=float(data.get("total", 0)),
created_on=datetime.fromisoformat(data["created_on"].replace("Z", "+00:00")) if data.get("created_on") else None,
modified_on=datetime.fromisoformat(data["modified_on"].replace("Z", "+00:00")) if data.get("modified_on") else None,
line_items=data.get("line_items", []),
raw_data=data,
)
async def _get_customer(self, customer_id: int) -> dict:
"""Fetch customer with caching."""
if customer_id in self._customer_cache:
return self._customer_cache[customer_id]
url = f"{self.config.base_url}/customers/{customer_id}/"
try:
async with self.session.get(url) as resp:
if resp.status == 200:
customer = await resp.json()
self._customer_cache[customer_id] = customer
return customer
elif resp.status == 404:
self._customer_cache[customer_id] = {}
return {}
except Exception as e:
print(f"Error fetching customer {customer_id}: {e}")
return {}
async def get_products(self, active_only: bool = True) -> AsyncIterator[dict]:
"""Fetch all products."""
params = {
"limit": 100,
"seller": self.config.seller_id,
}
if active_only:
params["archived"] = False
url = f"{self.config.base_url}/products/"
while url:
async with self.session.get(url, params=params if "?" not in url else None) as resp:
resp.raise_for_status()
data = await resp.json()
for product in data.get("results", []):
yield product
url = data.get("next")
params = {}
if url:
await asyncio.sleep(0.1)
async def get_order_transitions(self, order_id: int) -> list[dict]:
"""Get status change history for an order."""
url = f"{self.config.base_url}/orders-status-transitions/"
params = {"order": order_id}
async with self.session.get(url, params=params) as resp:
resp.raise_for_status()
data = await resp.json()
return data.get("results", [])
# Usage example
async def sync_recent_orders():
"""Sync orders from the last 24 hours."""
config = LeafLinkConfig()
since = datetime.now() - timedelta(hours=24)
async with LeafLinkClient(config) as client:
orders = []
async for order in client.get_orders(since=since):
orders.append(order)
print(f"Order {order.number}: {order.status} - ${order.total:.2f} ({order.customer_name})")
print(f"\nSynced {len(orders)} orders")
return orders
# Run it
# asyncio.run(sync_recent_orders())
Lessons Learned from LeafLink
- Customer IDs are separate: Orders reference customer IDs; you need a separate call to get names
- Line items are nested: Full product details require following the product ID
- Status transitions are gold: The transitions endpoint shows order history
- Modified ordering catches updates: Sort by -modified to see recently changed orders
- Archived products still exist: Filter by archived=false for active inventory
8. Case Study: Google OAuth Flow
Google's APIs provide access to Calendar, Gmail, Drive, Docs, Sheets, and more. The authentication is OAuth 2.0, which is powerful but requires careful implementation.
Setting Up Google OAuth
Before any code, you need to configure a Google Cloud project:
- Create a project in Google Cloud Console
- Enable the APIs you need (Calendar, Gmail, Drive, etc.)
- Configure the OAuth consent screen (app name, scopes, test users)
- Create OAuth 2.0 credentials (client ID and secret)
- Set authorized redirect URIs (localhost for development)
Complete OAuth + Calendar Implementation
import os
import json
import webbrowser
from datetime import datetime, timedelta
from pathlib import Path
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlencode, parse_qs, urlparse
import threading
import requests
class GoogleCalendarClient:
"""Complete Google Calendar integration with OAuth."""
AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
TOKEN_URL = "https://oauth2.googleapis.com/token"
CALENDAR_API = "https://www.googleapis.com/calendar/v3"
SCOPES = [
"https://www.googleapis.com/auth/calendar", # Full calendar access
"https://www.googleapis.com/auth/calendar.events", # Events only
]
def __init__(self):
self.config_dir = Path.home() / ".config" / "google-calendar"
self.config_dir.mkdir(parents=True, exist_ok=True)
# Load credentials from env file
self._load_credentials()
self.tokens_file = self.config_dir / "tokens.json"
self.tokens = self._load_tokens()
def _load_credentials(self):
"""Load OAuth credentials from secrets file."""
secrets_file = self.config_dir / "secrets.env"
if secrets_file.exists():
for line in secrets_file.read_text().splitlines():
if "=" in line and not line.startswith("#"):
key, value = line.split("=", 1)
os.environ[key.strip()] = value.strip()
self.client_id = os.environ.get("GOOGLE_CLIENT_ID")
self.client_secret = os.environ.get("GOOGLE_CLIENT_SECRET")
if not self.client_id or not self.client_secret:
raise ValueError("Missing GOOGLE_CLIENT_ID or GOOGLE_CLIENT_SECRET")
def _load_tokens(self) -> dict:
if self.tokens_file.exists():
return json.loads(self.tokens_file.read_text())
return {}
def _save_tokens(self, tokens: dict):
self.tokens_file.write_text(json.dumps(tokens, indent=2))
self.tokens_file.chmod(0o600)
self.tokens = tokens
def authorize(self, port: int = 8080):
"""Run OAuth authorization flow."""
redirect_uri = f"http://localhost:{port}/callback"
# Build authorization URL
params = {
"client_id": self.client_id,
"redirect_uri": redirect_uri,
"response_type": "code",
"scope": " ".join(self.SCOPES),
"access_type": "offline",
"prompt": "consent",
}
auth_url = f"{self.AUTH_URL}?{urlencode(params)}"
# Local server to receive callback
authorization_code = None
class CallbackHandler(BaseHTTPRequestHandler):
def do_GET(self):
nonlocal authorization_code
query = parse_qs(urlparse(self.path).query)
authorization_code = query.get("code", [None])[0]
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.end_headers()
if authorization_code:
self.wfile.write(b"<html><body><h2>Authorization successful!</h2><p>You can close this window.</p></body></html>")
else:
self.wfile.write(b"<html><body><h2>Authorization failed</h2></body></html>")
def log_message(self, *args):
pass # Suppress logging
# Start server
server = HTTPServer(("localhost", port), CallbackHandler)
server_thread = threading.Thread(target=server.handle_request)
server_thread.start()
# Open browser
print(f"Opening browser for authorization...")
webbrowser.open(auth_url)
# Wait for callback
server_thread.join(timeout=120)
server.server_close()
if not authorization_code:
raise ValueError("Authorization failed - no code received")
# Exchange code for tokens
response = requests.post(self.TOKEN_URL, data={
"client_id": self.client_id,
"client_secret": self.client_secret,
"code": authorization_code,
"grant_type": "authorization_code",
"redirect_uri": redirect_uri,
})
response.raise_for_status()
tokens = response.json()
tokens["expires_at"] = (
datetime.now() + timedelta(seconds=tokens["expires_in"])
).isoformat()
self._save_tokens(tokens)
print("Authorization complete! Tokens saved.")
def get_access_token(self) -> str:
"""Get valid access token, refreshing if needed."""
if not self.tokens:
raise ValueError("Not authorized. Run authorize() first.")
expires_at = datetime.fromisoformat(self.tokens["expires_at"])
if datetime.now() >= expires_at - timedelta(minutes=5):
# Refresh the token
response = requests.post(self.TOKEN_URL, data={
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.tokens["refresh_token"],
"grant_type": "refresh_token",
})
response.raise_for_status()
new_tokens = response.json()
new_tokens["refresh_token"] = self.tokens.get("refresh_token")
new_tokens["expires_at"] = (
datetime.now() + timedelta(seconds=new_tokens["expires_in"])
).isoformat()
self._save_tokens(new_tokens)
return self.tokens["access_token"]
@property
def headers(self) -> dict:
return {"Authorization": f"Bearer {self.get_access_token()}"}
def list_calendars(self) -> list[dict]:
"""List all calendars for the user."""
response = requests.get(
f"{self.CALENDAR_API}/users/me/calendarList",
headers=self.headers
)
response.raise_for_status()
return response.json().get("items", [])
def get_upcoming_events(
self,
calendar_id: str = "primary",
days: int = 7,
max_results: int = 50
) -> list[dict]:
"""Get upcoming events from a calendar."""
time_min = datetime.utcnow().isoformat() + "Z"
time_max = (datetime.utcnow() + timedelta(days=days)).isoformat() + "Z"
response = requests.get(
f"{self.CALENDAR_API}/calendars/{calendar_id}/events",
headers=self.headers,
params={
"timeMin": time_min,
"timeMax": time_max,
"maxResults": max_results,
"singleEvents": True,
"orderBy": "startTime",
}
)
response.raise_for_status()
return response.json().get("items", [])
def create_event(
self,
summary: str,
start: datetime,
end: datetime,
description: str = "",
calendar_id: str = "primary",
attendees: list[str] = None,
) -> dict:
"""Create a calendar event."""
event = {
"summary": summary,
"description": description,
"start": {
"dateTime": start.isoformat(),
"timeZone": "America/Anchorage",
},
"end": {
"dateTime": end.isoformat(),
"timeZone": "America/Anchorage",
},
}
if attendees:
event["attendees"] = [{"email": email} for email in attendees]
response = requests.post(
f"{self.CALENDAR_API}/calendars/{calendar_id}/events",
headers=self.headers,
json=event
)
response.raise_for_status()
return response.json()
# Usage
def demo_calendar():
client = GoogleCalendarClient()
# First time: authorize
# client.authorize()
# List calendars
calendars = client.list_calendars()
for cal in calendars:
print(f"📅 {cal['summary']} ({cal['id']})")
# Get upcoming events
events = client.get_upcoming_events(days=7)
for event in events:
start = event["start"].get("dateTime", event["start"].get("date"))
print(f" • {start}: {event['summary']}")
# Create an event
# new_event = client.create_event(
# summary="Team Standup",
# start=datetime.now() + timedelta(days=1, hours=9),
# end=datetime.now() + timedelta(days=1, hours=9, minutes=30),
# description="Daily sync meeting",
# )
Gmail Integration
With the same OAuth flow (different scopes), you can access Gmail:
GMAIL_API = "https://www.googleapis.com/gmail/v1"
def get_unread_emails(headers: dict, max_results: int = 10) -> list[dict]:
"""Fetch unread emails from inbox."""
response = requests.get(
f"{GMAIL_API}/users/me/messages",
headers=headers,
params={
"labelIds": ["INBOX", "UNREAD"],
"maxResults": max_results,
}
)
response.raise_for_status()
messages = []
for msg in response.json().get("messages", []):
# Fetch full message
detail = requests.get(
f"{GMAIL_API}/users/me/messages/{msg['id']}",
headers=headers,
params={"format": "metadata", "metadataHeaders": ["Subject", "From", "Date"]}
)
detail.raise_for_status()
msg_data = detail.json()
headers_list = msg_data.get("payload", {}).get("headers", [])
headers_dict = {h["name"]: h["value"] for h in headers_list}
messages.append({
"id": msg["id"],
"subject": headers_dict.get("Subject", "(no subject)"),
"from": headers_dict.get("From", ""),
"date": headers_dict.get("Date", ""),
"snippet": msg_data.get("snippet", ""),
})
return messages
Lessons Learned from Google OAuth
- Refresh tokens disappear: If you don't request access_type=offline and prompt=consent, you won't get a refresh token
- Scopes matter: Users see exactly what permissions you request. Request only what you need.
- Verification required: For sensitive scopes (Gmail modify, Drive), Google requires app verification
- Test users: Before verification, only test users in your consent screen can authorize
- Quota limits: Free tier has per-day limits. Check the APIs & Services dashboard.
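One gotcha the metadata-only Gmail example sidesteps: when you fetch messages with `format=full`, body content arrives base64url-encoded in `payload.body.data` (or nested under `payload.parts` for multipart messages), with the `=` padding omitted. A small stdlib-only decoding helper:

```python
import base64

def decode_gmail_body(data: str) -> str:
    """Decode Gmail's base64url-encoded body data.

    Gmail omits '=' padding, so restore it before decoding."""
    padded = data + "=" * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")
```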
9. Case Study: Real-Time Blockchain Data
Etherscan provides blockchain data via a REST API. Unlike typical web APIs, blockchain data has unique characteristics: immutable history, real-time mempool, and cryptographic verification.
Etherscan API Patterns
Etherscan's API is unusual: it returns HTTP 200 for errors, with error details in the response body. This requires careful error handling.
import asyncio
import aiohttp
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional
from pathlib import Path
@dataclass
class EtherscanConfig:
"""Etherscan API configuration."""
api_key: str = None
base_url: str = "https://api.etherscan.io/v2/api"
chain_id: int = 1 # Ethereum mainnet
rate_limit: float = 0.2 # 5 requests per second
def __post_init__(self):
if not self.api_key:
key_file = Path.home() / ".config" / "etherscan" / "api_key"
if key_file.exists():
self.api_key = key_file.read_text().strip()
else:
raise ValueError(f"No API key found. Create {key_file}")
class EtherscanError(Exception):
"""Etherscan API error."""
pass
class EtherscanClient:
"""Production Etherscan API client."""
def __init__(self, config: EtherscanConfig = None):
self.config = config or EtherscanConfig()
self.session: aiohttp.ClientSession = None
self._last_request = 0
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def _request(self, module: str, action: str, **params) -> dict:
"""Make rate-limited request to Etherscan."""
# Enforce rate limit
now = asyncio.get_event_loop().time()
wait_time = self.config.rate_limit - (now - self._last_request)
if wait_time > 0:
await asyncio.sleep(wait_time)
all_params = {
"chainid": self.config.chain_id,
"module": module,
"action": action,
"apikey": self.config.api_key,
**params
}
async with self.session.get(self.config.base_url, params=all_params) as resp:
self._last_request = asyncio.get_event_loop().time()
data = await resp.json()
# Etherscan returns 200 with errors in body
if data.get("status") == "0":
message = data.get("message", "Unknown error")
result = data.get("result", "")
# Rate limit error
if "rate limit" in result.lower():
await asyncio.sleep(1)
return await self._request(module, action, **params) # Retry
# No transactions is not an error
if "No transactions found" in str(result):
return {"result": []}
raise EtherscanError(f"{message}: {result}")
return data
async def get_balance(self, address: str) -> Decimal:
"""Get ETH balance for address."""
data = await self._request("account", "balance", address=address, tag="latest")
# Balance is in wei, convert to ETH
wei = int(data["result"])
return Decimal(wei) / Decimal(10**18)
async def get_token_balance(self, address: str, contract: str) -> Decimal:
"""Get ERC-20 token balance."""
data = await self._request(
"account", "tokenbalance",
address=address,
contractaddress=contract,
tag="latest"
)
# Need to know token decimals for proper conversion
return Decimal(data["result"])
async def get_transactions(
self,
address: str,
start_block: int = 0,
end_block: int = 99999999,
page: int = 1,
offset: int = 100,
sort: str = "desc"
) -> list[dict]:
"""Get normal transactions for address."""
data = await self._request(
"account", "txlist",
address=address,
startblock=start_block,
endblock=end_block,
page=page,
offset=offset,
sort=sort
)
return data.get("result", [])
async def get_token_transfers(
self,
address: str,
contract: str = None,
start_block: int = 0,
end_block: int = 99999999,
) -> list[dict]:
"""Get ERC-20 token transfers."""
params = {
"address": address,
"startblock": start_block,
"endblock": end_block,
"sort": "desc"
}
if contract:
params["contractaddress"] = contract
data = await self._request("account", "tokentx", **params)
return data.get("result", [])
async def get_gas_price(self) -> dict:
"""Get current gas prices."""
data = await self._request("gastracker", "gasoracle")
result = data.get("result", {})
return {
"safe": int(result.get("SafeGasPrice", 0)),
"standard": int(result.get("ProposeGasPrice", 0)),
"fast": int(result.get("FastGasPrice", 0)),
"base_fee": float(result.get("suggestBaseFee", 0)),
}
async def get_contract_abi(self, contract: str) -> list:
"""Get verified contract ABI."""
data = await self._request("contract", "getabi", address=contract)
import json
return json.loads(data["result"])
# Usage examples
async def demo_etherscan():
async with EtherscanClient() as client:
# Check an address balance
address = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045" # vitalik.eth
balance = await client.get_balance(address)
print(f"Balance: {balance:.4f} ETH")
# Get recent transactions
txs = await client.get_transactions(address, offset=5)
for tx in txs:
value_eth = Decimal(tx["value"]) / Decimal(10**18)
direction = "←" if tx["to"].lower() == address.lower() else "→"
print(f" {direction} {value_eth:.4f} ETH ({tx['hash'][:16]}...)")
# Current gas prices
gas = await client.get_gas_price()
print(f"\nGas: {gas['safe']}/{gas['standard']}/{gas['fast']} gwei (safe/standard/fast)")
# asyncio.run(demo_etherscan())
Monitoring Wallets for Activity
A common use case is monitoring wallets for incoming transactions or token transfers:
class WalletMonitor:
"""Monitor wallets for new transactions."""
def __init__(self, client: EtherscanClient, addresses: list[str]):
self.client = client
self.addresses = [a.lower() for a in addresses]
self.last_seen: dict[str, str] = {} # address -> last tx hash
async def check_for_new_transactions(self) -> list[dict]:
"""Check all addresses for new transactions."""
new_txs = []
for address in self.addresses:
txs = await self.client.get_transactions(address, offset=10)
if not txs:
continue
last_hash = self.last_seen.get(address)
for tx in txs:
if tx["hash"] == last_hash:
break # Reached already-seen transactions
new_txs.append({
"type": "transaction",
"address": address,
"tx": tx,
"direction": "in" if tx["to"].lower() == address else "out",
"value_eth": Decimal(tx["value"]) / Decimal(10**18),
})
if txs:
self.last_seen[address] = txs[0]["hash"]
return new_txs
async def run(self, interval_seconds: int = 30):
"""Continuously monitor for new transactions."""
print(f"Monitoring {len(self.addresses)} addresses...")
while True:
try:
new_txs = await self.check_for_new_transactions()
for tx in new_txs:
direction = "Received" if tx["direction"] == "in" else "Sent"
print(f"🔔 {direction} {tx['value_eth']:.4f} ETH to {tx['address'][:10]}...")
await asyncio.sleep(interval_seconds)
except EtherscanError as e:
print(f"API error: {e}")
await asyncio.sleep(60) # Back off on errors
- Data is immutable: Once confirmed, blockchain data never changes. Cache aggressively.
- Block confirmations: Wait for 12+ confirmations before considering transactions final
- Wei vs ETH: All values are in wei (10^18 wei = 1 ETH). Don't lose precision.
- Checksummed addresses: Ethereum addresses should be checksummed (mixed case)
- Multiple chains: Etherscan v2 API supports multiple chains via chainid parameter
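The confirmation rule above is just arithmetic on block numbers: a transaction in the latest block has 1 confirmation, and each new block adds one. A sketch (fetching the current block number is left to the client; the threshold of 12 follows the list above):

```python
def confirmations(current_block: int, tx_block: int) -> int:
    """Number of confirmations a mined transaction has."""
    return current_block - tx_block + 1

def is_final(current_block: int, tx_block: int, required: int = 12) -> bool:
    """Treat a transaction as final once it has enough confirmations."""
    return confirmations(current_block, tx_block) >= required
```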
10. Building Robust Integrations That Don't Break
Building an integration is easy. Building one that survives months of production without constant firefighting is hard. Here are the patterns that make integrations resilient.
The Circuit Breaker Pattern
When an API is down, hammering it with retries makes things worse. A circuit breaker "trips" after repeated failures, fast-failing subsequent requests until the service recovers.
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable
import asyncio
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if recovered
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5 # Failures before opening
reset_timeout: float = 60.0 # Seconds before trying again
half_open_requests: int = 3 # Test requests in half-open
class CircuitBreaker:
"""Circuit breaker for API calls."""
def __init__(self, name: str, config: CircuitBreakerConfig = None):
self.name = name
self.config = config or CircuitBreakerConfig()
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time: datetime = None
async def call(self, func: Callable, *args, **kwargs):
"""Execute function with circuit breaker protection."""
if self.state == CircuitState.OPEN:
if self._should_try_reset():
self.state = CircuitState.HALF_OPEN
self.success_count = 0
else:
raise CircuitOpenError(f"Circuit {self.name} is open")
try:
result = await func(*args, **kwargs)
self._record_success()
return result
except Exception as e:
self._record_failure()
raise
def _should_try_reset(self) -> bool:
"""Check if enough time passed to try recovery."""
if not self.last_failure_time:
return True
return datetime.now() > self.last_failure_time + timedelta(seconds=self.config.reset_timeout)
def _record_success(self):
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.config.half_open_requests:
self.state = CircuitState.CLOSED
self.failure_count = 0
print(f"Circuit {self.name} closed (recovered)")
self.failure_count = 0
def _record_failure(self):
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
print(f"Circuit {self.name} reopened (still failing)")
elif self.failure_count >= self.config.failure_threshold:
self.state = CircuitState.OPEN
print(f"Circuit {self.name} opened (threshold reached)")
class CircuitOpenError(Exception):
pass
# Usage
leaflink_circuit = CircuitBreaker("leaflink")
async def fetch_orders_safe():
return await leaflink_circuit.call(fetch_orders_from_api)
Graceful Degradation
When an integration fails, your application should continue functioning with reduced capability rather than crashing entirely.
import json
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
@dataclass
class CachedData:
"""Wrapper for data with freshness tracking."""
data: Any
fetched_at: datetime
source: str # "api" or "cache"
@property
def age_seconds(self) -> float:
return (datetime.now() - self.fetched_at).total_seconds()
@property
def is_stale(self) -> bool:
return self.age_seconds > 300 # 5 minutes
class ResilientDataFetcher:
"""Fetch data with fallback to cache."""
def __init__(self, cache_dir: Path):
self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True)
async def fetch_with_fallback(
self,
key: str,
fetch_func,
max_cache_age: int = 3600
) -> CachedData:
"""Fetch fresh data, fall back to cache on failure."""
cache_file = self.cache_dir / f"{key}.json"
# Try fresh fetch
try:
data = await fetch_func()
# Cache the successful result
self._save_cache(cache_file, data)
return CachedData(
data=data,
fetched_at=datetime.now(),
source="api"
)
except Exception as e:
print(f"API fetch failed: {e}")
# Fall back to cache
cached = self._load_cache(cache_file)
if cached:
age = (datetime.now() - cached["fetched_at"]).total_seconds()
if age < max_cache_age:
print(f"Using cached data ({age:.0f}s old)")
return CachedData(
data=cached["data"],
fetched_at=cached["fetched_at"],
source="cache"
)
else:
print(f"Cache too old ({age:.0f}s), but using anyway")
return CachedData(
data=cached["data"],
fetched_at=cached["fetched_at"],
source="stale_cache"
)
# No cache available
raise
def _save_cache(self, path: Path, data: any):
cache_data = {
"data": data,
"fetched_at": datetime.now().isoformat(),
}
path.write_text(json.dumps(cache_data, default=str))
def _load_cache(self, path: Path) -> Optional[dict]:
if not path.exists():
return None
try:
cached = json.loads(path.read_text())
cached["fetched_at"] = datetime.fromisoformat(cached["fetched_at"])
return cached
except Exception:
return None
Health Checks and Dependency Monitoring
@dataclass
class HealthStatus:
service: str
healthy: bool
latency_ms: float
last_check: datetime
error: Optional[str] = None
class APIHealthMonitor:
"""Monitor health of API dependencies."""
def __init__(self):
self.status: dict[str, HealthStatus] = {}
async def check_leaflink(self) -> HealthStatus:
"""Check LeafLink API health."""
start = datetime.now()
try:
async with aiohttp.ClientSession() as session:
async with session.get(
"https://www.leaflink.com/api/v2/orders/?limit=1",
headers={"Authorization": f"App {LEAFLINK_KEY}"},
timeout=aiohttp.ClientTimeout(total=10)
) as resp:
latency = (datetime.now() - start).total_seconds() * 1000
return HealthStatus(
service="leaflink",
healthy=resp.status == 200,
latency_ms=latency,
last_check=datetime.now(),
error=None if resp.status == 200 else f"HTTP {resp.status}"
)
except Exception as e:
return HealthStatus(
service="leaflink",
healthy=False,
latency_ms=-1,
last_check=datetime.now(),
error=str(e)
)
async def check_all(self) -> dict[str, HealthStatus]:
"""Check all configured APIs."""
checks = [
self.check_leaflink(),
# self.check_distru(),
# self.check_quickbooks(),
# ...
]
results = await asyncio.gather(*checks, return_exceptions=True)
for result in results:
if isinstance(result, HealthStatus):
self.status[result.service] = result
return self.status
def is_healthy(self, service: str) -> bool:
"""Check if a specific service is healthy."""
status = self.status.get(service)
if not status:
return True # Unknown = assume healthy
# Consider unhealthy if last check failed or is too old
if not status.healthy:
return False
if (datetime.now() - status.last_check).total_seconds() > 300:
return False # Stale health check
return True
11. Testing and Monitoring
Integration code is notoriously hard to test. External APIs aren't available in CI, responses change, and rate limits make exhaustive testing impractical. Here's how to approach it pragmatically.
Testing Strategy
| Test Type | What It Tests | How to Do It |
|---|---|---|
| Unit Tests | Your parsing, transformation logic | Mock API responses with saved fixtures |
| Contract Tests | Response schema matches expectations | Validate against JSON Schema or Pydantic models |
| Integration Tests | End-to-end flows work | Use sandbox/test accounts, run sparingly |
| Smoke Tests | Production API is reachable | Lightweight health checks in prod |
Mocking API Responses
import pytest
from unittest.mock import AsyncMock, patch
from pathlib import Path
import json
# Save real API responses as fixtures
FIXTURES_DIR = Path(__file__).parent / "fixtures"
def load_fixture(name: str) -> dict:
"""Load saved API response."""
return json.loads((FIXTURES_DIR / f"{name}.json").read_text())
class TestLeafLinkClient:
"""Tests for LeafLink integration."""
@pytest.fixture
def mock_session(self):
"""Create mock aiohttp session."""
session = AsyncMock()
return session
@pytest.mark.asyncio
async def test_parse_order(self, mock_session):
"""Test order parsing logic."""
# Load fixture
order_data = load_fixture("leaflink_order")
# Test parsing
client = LeafLinkClient(LeafLinkConfig(api_key="test"))
client.session = mock_session
client._customer_cache = {28451: {"name": "Test Dispensary"}}
order = await client._parse_order(order_data)
assert order.number == "SO-2026-0892"
assert order.status == "Submitted"
assert order.customer_name == "Test Dispensary"
assert order.total == 4250.00
@pytest.mark.asyncio
async def test_handles_rate_limit(self, mock_session):
"""Test rate limit handling."""
# First call returns 429, second succeeds
mock_response_429 = AsyncMock()
mock_response_429.status = 429
mock_response_429.headers = {"Retry-After": "1"}
mock_response_200 = AsyncMock()
mock_response_200.status = 200
mock_response_200.json = AsyncMock(return_value={"results": []})
mock_session.get.return_value.__aenter__.side_effect = [
mock_response_429,
mock_response_200
]
# Should retry and succeed
# ... test implementation
@pytest.mark.asyncio
async def test_handles_missing_customer(self, mock_session):
"""Test graceful handling when customer lookup fails."""
order_data = load_fixture("leaflink_order")
# Mock 404 on customer lookup
mock_response = AsyncMock()
mock_response.status = 404
mock_session.get.return_value.__aenter__.return_value = mock_response
client = LeafLinkClient(LeafLinkConfig(api_key="test"))
client.session = mock_session
order = await client._parse_order(order_data)
# Should still parse, just without customer name
assert order.customer_name is None
assert order.customer_id == 28451
# Recording fixtures from real API (run manually)
async def record_fixtures():
"""Save real API responses for testing."""
config = LeafLinkConfig()
async with LeafLinkClient(config) as client:
async for order in client.get_orders(limit=1):
FIXTURES_DIR.mkdir(exist_ok=True)
(FIXTURES_DIR / "leaflink_order.json").write_text(
json.dumps(order.raw_data, indent=2)
)
break
Schema Validation
APIs change without warning. Schema validation catches breaking changes before they cause silent failures:
import json
import logging
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, validator
logger = logging.getLogger(__name__)
class LeafLinkOrderSchema(BaseModel):
"""Expected schema for LeafLink order response."""
id: int
number: str
status: str
customer: Optional[int]
total: str # LeafLink returns as string
created_on: datetime
modified_on: Optional[datetime]
line_items: list
@validator("total", pre=True)
def parse_total(cls, v):
return str(v) # Ensure string
class Config:
extra = "allow" # Allow extra fields (API may add new ones)
def validate_order_response(data: dict) -> LeafLinkOrderSchema:
"""Validate API response matches expected schema."""
try:
return LeafLinkOrderSchema(**data)
except Exception as e:
# Log schema violation for investigation
logger.error(f"Schema violation in order {data.get('id')}: {e}")
logger.error(f"Data: {json.dumps(data)[:500]}")
raise
# In your API client:
async def get_order(self, order_id: int) -> LeafLinkOrderSchema:
data = await self._request(f"orders/{order_id}/")
return validate_order_response(data) # Fails fast on schema change
Production Monitoring
Testing catches issues before deployment. Monitoring catches issues in production. Both are necessary.
What to monitor, and when to alert:
- Availability — target: 99.9%+ for critical integrations
- Latency — alert on P95 > 2x normal
- Error rate — alert on error rate > 1%
- Rate limit hits — should be near zero if properly managed
- Data freshness — alert if data > 1 hour old
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import statistics


@dataclass
class APIMetrics:
    """Collect metrics for API calls."""
    service: str
    requests: int = 0
    errors: int = 0
    rate_limits: int = 0
    latencies: list = field(default_factory=list)
    last_success: Optional[datetime] = None
    last_error: Optional[datetime] = None
    last_error_msg: Optional[str] = None

    def record_request(self, success: bool, latency_ms: float, error: Optional[str] = None):
        self.requests += 1
        self.latencies.append(latency_ms)
        # Keep only last 1000 latencies
        if len(self.latencies) > 1000:
            self.latencies = self.latencies[-1000:]
        if success:
            self.last_success = datetime.now()
        else:
            self.errors += 1
            self.last_error = datetime.now()
            self.last_error_msg = error

    def record_rate_limit(self):
        self.rate_limits += 1

    @property
    def error_rate(self) -> float:
        if self.requests == 0:
            return 0.0
        return self.errors / self.requests

    @property
    def p50_latency(self) -> float:
        if not self.latencies:
            return 0.0
        return statistics.median(self.latencies)

    @property
    def p95_latency(self) -> float:
        if len(self.latencies) < 2:
            # statistics.quantiles() needs at least two data points
            return self.latencies[0] if self.latencies else 0.0
        return statistics.quantiles(self.latencies, n=20)[18]  # 95th percentile

    def to_dict(self) -> dict:
        return {
            "service": self.service,
            "requests": self.requests,
            "errors": self.errors,
            "error_rate": f"{self.error_rate:.2%}",
            "rate_limits": self.rate_limits,
            "p50_latency_ms": round(self.p50_latency, 2),
            "p95_latency_ms": round(self.p95_latency, 2),
            "last_success": self.last_success.isoformat() if self.last_success else None,
            "last_error": self.last_error.isoformat() if self.last_error else None,
            "last_error_msg": self.last_error_msg,
        }


# Global metrics registry
_metrics: dict[str, APIMetrics] = {}


def get_metrics(service: str) -> APIMetrics:
    if service not in _metrics:
        _metrics[service] = APIMetrics(service=service)
    return _metrics[service]


def get_all_metrics() -> dict:
    return {name: m.to_dict() for name, m in _metrics.items()}
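The `quantiles(..., n=20)[18]` indexing in `p95_latency` is easy to misread. A quick stdlib-only check confirms that cut point 18 — the 19th of the 19 cut points — sits at the 95th percentile:

```python
import statistics

# 100 evenly spaced latencies: the 95th percentile should land near 95
latencies = [float(i) for i in range(1, 101)]

cut_points = statistics.quantiles(latencies, n=20)  # 19 cut points
p95 = cut_points[18]  # 19/20 of the way through = 95th percentile
print(p95)  # → 95.95 (interpolated under the default 'exclusive' method)
```

Splitting into `n=20` buckets gives exactly the 5% granularity needed for P95; `n=100` with index 94 works the same way if you also want P99.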
12. Security Considerations
API integrations are attack surfaces. Every credential, every endpoint, every piece of data flowing through your integrations needs protection. Here's what matters.
Secure Credential Storage
import os
from pathlib import Path
from dataclasses import dataclass
from typing import Optional


@dataclass
class SecureCredentials:
    """Manage API credentials securely."""
    service: str

    def __post_init__(self):
        self.config_dir = Path.home() / ".config" / self.service

    def get_api_key(self) -> str:
        """Load API key with fallback chain."""
        # 1. Environment variable (highest priority, for CI/containers)
        env_key = f"{self.service.upper()}_API_KEY"
        if key := os.environ.get(env_key):
            return key

        # 2. File (for local development)
        key_file = self.config_dir / "api_key"
        if key_file.exists():
            key_file.chmod(0o600)  # Ensure secure permissions
            return key_file.read_text().strip()

        # 3. Secrets manager (for production)
        if key := self._get_from_secrets_manager():
            return key

        raise ValueError(f"No API key found for {self.service}")

    def _get_from_secrets_manager(self) -> Optional[str]:
        """Load from AWS Secrets Manager, HashiCorp Vault, etc."""
        # Example with AWS Secrets Manager
        try:
            import boto3
            client = boto3.client("secretsmanager")
            response = client.get_secret_value(SecretId=f"api/{self.service}")
            return response["SecretString"]
        except Exception:  # Avoid bare except; it would swallow KeyboardInterrupt too
            return None

    def save_api_key(self, key: str):
        """Securely save API key to file."""
        self.config_dir.mkdir(parents=True, exist_ok=True)
        key_file = self.config_dir / "api_key"
        key_file.write_text(key)
        key_file.chmod(0o600)  # Owner read/write only


# Credential format for each service
CREDENTIAL_PATTERNS = {
    "leaflink": "App {api_key}",            # Custom header format
    "etherscan": "apikey={api_key}",        # Query parameter
    "distru": "Bearer {api_key}",           # Bearer token
    "google": "Bearer {access_token}",      # OAuth access token
    "quickbooks": "Bearer {access_token}",  # OAuth access token
    "twilio": "{account_sid}:{auth_token}", # Basic auth
}
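A small helper — hypothetical, not part of any SDK — can turn the header-based entries in that pattern table into an `Authorization` header; query-parameter styles like Etherscan's need separate handling:

```python
# Subset of CREDENTIAL_PATTERNS above that map to Authorization headers
CREDENTIAL_PATTERNS = {
    "leaflink": "App {api_key}",
    "distru": "Bearer {api_key}",
    "google": "Bearer {access_token}",
}


def auth_header(service: str, **creds) -> dict:
    """Format the Authorization header for a header-based service (sketch)."""
    return {"Authorization": CREDENTIAL_PATTERNS[service].format(**creds)}


print(auth_header("leaflink", api_key="abc123"))
# → {'Authorization': 'App abc123'}
```

Centralizing the format strings this way keeps service-specific quirks (LeafLink's `App` prefix, for instance) out of request-building code.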
Webhook Security
import hashlib
import hmac
import os
import time
from typing import Optional

from fastapi import FastAPI, Header, HTTPException, Request

app = FastAPI()

# Store webhook secrets securely
WEBHOOK_SECRETS = {
    "leaflink": os.environ.get("LEAFLINK_WEBHOOK_SECRET"),
    "stripe": os.environ.get("STRIPE_WEBHOOK_SECRET"),
}


def verify_signature(
    payload: bytes,
    signature: str,
    secret: str,
    algorithm: str = "sha256",
) -> bool:
    """Verify HMAC signature."""
    if algorithm == "sha256":
        expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    elif algorithm == "sha1":
        expected = hmac.new(secret.encode(), payload, hashlib.sha1).hexdigest()
    else:
        raise ValueError(f"Unknown algorithm: {algorithm}")
    return hmac.compare_digest(expected, signature)


def verify_timestamp(timestamp: int, tolerance_seconds: int = 300) -> bool:
    """Verify request timestamp is recent (prevent replay attacks)."""
    now = int(time.time())
    return abs(now - timestamp) <= tolerance_seconds


@app.post("/webhooks/{provider}")
async def handle_webhook(
    provider: str,
    request: Request,
    x_signature: Optional[str] = Header(None, alias="X-Signature"),
    x_timestamp: Optional[str] = Header(None, alias="X-Timestamp"),
):
    """Generic webhook handler with security checks."""
    # Get provider secret
    secret = WEBHOOK_SECRETS.get(provider)
    if not secret:
        raise HTTPException(status_code=404, detail="Unknown provider")

    # Get raw body
    body = await request.body()

    # Verify signature
    if not x_signature:
        raise HTTPException(status_code=401, detail="Missing signature")

    # Some providers include timestamp in signature
    if x_timestamp:
        try:
            timestamp = int(x_timestamp)
        except ValueError:
            raise HTTPException(status_code=400, detail="Invalid timestamp")
        if not verify_timestamp(timestamp):
            raise HTTPException(status_code=401, detail="Stale timestamp")
        # Include timestamp in signed payload
        signed_payload = f"{timestamp}.{body.decode()}"
        body_for_verify = signed_payload.encode()
    else:
        body_for_verify = body

    if not verify_signature(body_for_verify, x_signature, secret):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Process webhook (process_webhook is your application-level dispatcher)
    data = await request.json()
    await process_webhook(provider, data)

    return {"status": "received"}
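To exercise the handler locally, you can sign a test payload the same way a provider would under the timestamp-prefixed sha256 scheme above. The secret and payload here are made up for illustration:

```python
import hashlib
import hmac
import json
import time

secret = "test-webhook-secret"  # would match WEBHOOK_SECRETS[provider]
body = json.dumps({"event": "order.created", "id": 123}).encode()
timestamp = int(time.time())

# Same construction the handler verifies: "{timestamp}.{body}"
signed_payload = f"{timestamp}.{body.decode()}".encode()
signature = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()

headers = {"X-Signature": signature, "X-Timestamp": str(timestamp)}
# POST `body` with these headers to /webhooks/<provider>;
# both the timestamp and signature checks should pass
```

This doubles as a test harness: tamper with one byte of `body` after signing and the handler should return 401.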
Data Protection
- Never log credentials: Scrub auth headers from logs
- Never log PII: Customer names, emails, addresses should not appear in logs
- Never log full requests: Request bodies may contain sensitive data
- Encrypt at rest: Cached API data should be encrypted
- Encrypt in transit: Always use HTTPS (verify SSL)
- Timeout sessions: Don't keep OAuth tokens longer than needed
- Audit access: Know who has access to credentials
import json
import re
from typing import Any

# Patterns to redact
SENSITIVE_PATTERNS = [
    (r"Authorization:\s*\S+", "Authorization: [REDACTED]"),
    (r"api[_-]?key[\"']?\s*[:=]\s*[\"']?\S+", "api_key: [REDACTED]"),
    (r"Bearer\s+\S+", "Bearer [REDACTED]"),
    (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", "[EMAIL]"),
    (r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]"),  # SSN pattern
    (r"\b\d{16}\b", "[CARD]"),  # Credit card
]


def redact_sensitive(text: str) -> str:
    """Redact sensitive information from text."""
    for pattern, replacement in SENSITIVE_PATTERNS:
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
    return text


def safe_log_request(method: str, url: str, headers: dict, body: Any = None):
    """Log request with sensitive data redacted."""
    # Redact headers
    safe_headers = {}
    for key, value in headers.items():
        if key.lower() in ("authorization", "x-api-key", "api-key"):
            safe_headers[key] = "[REDACTED]"
        else:
            safe_headers[key] = value

    # Redact URL params
    safe_url = redact_sensitive(url)

    # Log safely
    log_entry = {
        "method": method,
        "url": safe_url,
        "headers": safe_headers,
    }
    if body:
        # Don't log body for sensitive endpoints
        if any(x in url.lower() for x in ["auth", "token", "password", "secret"]):
            log_entry["body"] = "[REDACTED]"
        else:
            log_entry["body"] = redact_sensitive(str(body)[:500])

    print(json.dumps(log_entry))
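A quick sanity check of the redaction behavior, using just the Bearer pattern from the table above:

```python
import re

# The Bearer pattern from SENSITIVE_PATTERNS
pattern, replacement = (r"Bearer\s+\S+", "Bearer [REDACTED]")

line = "Authorization: Bearer sk_live_abc123 accepted"
print(re.sub(pattern, replacement, line, flags=re.IGNORECASE))
# → Authorization: Bearer [REDACTED] accepted
```

Note the patterns are order-sensitive: running the full list, the `Authorization:` pattern fires before the `Bearer` one ever sees the line, so both paths need tests.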
Conclusion: The Integration Mindset
API integration isn't a one-time project—it's an ongoing practice. The services you integrate with will change. Their APIs will evolve. New services will emerge that you need to connect. The patterns in this guide prepare you for that evolution.
The key insights:
- Think in systems: APIs are contracts between services. Understand the contract before writing code.
- Plan for failure: Networks fail. Services go down. Rate limits hit. Build resilience in from the start.
- Test what matters: You can't test everything against live APIs. Focus on your logic, mock the rest.
- Monitor in production: What you don't measure, you can't improve. Track availability, latency, and errors.
- Security is not optional: Every integration is an attack surface. Treat credentials like secrets. Validate all input.
The integrations we've covered—LeafLink for B2B wholesale, Distru for manufacturing, Google for productivity, Twilio for communication, Etherscan for blockchain, QuickBooks for finance—represent the real complexity of modern systems. Each has its quirks. Each taught us something.
Your integrations will teach you too. Document what you learn. Share the patterns that work. Build systems that survive contact with production.
That's API integration mastery.
Before deploying any new integration:
- ☐ Credentials stored securely (not in code)
- ☐ Rate limiting implemented
- ☐ Exponential backoff for retries
- ☐ Error handling for all failure modes
- ☐ Schema validation on responses
- ☐ Health check endpoint
- ☐ Metrics collection
- ☐ Graceful degradation (cache fallback)
- ☐ Webhook signature verification (if applicable)
- ☐ Tests with mocked responses
- ☐ Documented in runbook