A proxy-powered data pipeline collects information from the web at scale, transforms it into structured datasets, and delivers actionable insights. This guide walks through designing each stage — from collection through proxies to storage and delivery — with real implementation examples.
Pipeline Architecture
A production data pipeline has five stages:
- Source Definition: URLs, APIs, and data sources to collect from
- Collection: Fetching data through rotating proxies at scale
- Extraction: Parsing raw HTML/JSON into structured data
- Storage: Persisting data in databases with deduplication
- Delivery: APIs, dashboards, exports, or webhook notifications
Collection Layer
import asyncioimport aiohttpfrom dataclasses import dataclassfrom datetime import datetimeimport hashlib
@dataclass
class CollectionJob:
    """A single URL to fetch, together with the proxy and retry settings for it."""

    # Page to download.
    url: str
    # Country code that collect() interpolates into the proxy username.
    country: str = "us"
    # Total number of attempts collect() makes (not the number of extra retries).
    max_retries: int = 3
    # Declared for JavaScript rendering; collect() does not currently read it.
    render_js: bool = False
@dataclass
class CollectionResult:
    """The outcome of a successful fetch, as built by collect()."""

    # URL that was requested.
    url: str
    # Decoded response body.
    html: str
    # HTTP status of the response (collect() only builds a result for 200).
    status_code: int
    # Proxy exit IP; collect() currently stores the placeholder "rotated".
    proxy_ip: str
    # Receive time; collect() fills this with naive UTC from datetime.utcnow().
    collected_at: datetime
    # Hex SHA-256 of the body; stored alongside the URL for deduplication.
    content_hash: str
async def collect(job: CollectionJob) -> CollectionResult:
    """Fetch ``job.url`` through the ZentisLabs proxy gateway, with retries.

    Every attempt opens a fresh ``aiohttp.ClientSession`` and routes the
    request through the country-targeted gateway. Network errors, timeouts
    and non-200 responses are all treated as failed attempts. After a failed
    attempt the function sleeps ``2 ** attempt`` seconds (1s, 2s, 4s, ...).
    It gives up once ``job.max_retries`` attempts have been made.

    Args:
        job: URL, proxy country and attempt budget for this fetch.

    Returns:
        A ``CollectionResult`` containing the body and its SHA-256 hex digest.

    Raises:
        ValueError: if ``job.max_retries`` is less than 1.
        aiohttp.ClientResponseError: if the final attempt gets a non-200 status.
        aiohttp.ClientError / asyncio.TimeoutError: if the final attempt fails
            at the connection or timeout level.
        Other exceptions (e.g. a body that cannot be decoded) propagate
        immediately, because retrying would not fix them.
    """
    if job.max_retries < 1:
        # Without this guard the loop would not run and the function would return None.
        raise ValueError(f"max_retries must be >= 1, got {job.max_retries}")

    proxy = f"http://USER:PASS_country-{job.country}@gate.zentislabs.com:7777"
    for attempt in range(job.max_retries):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    job.url,
                    proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=20),
                    headers={"User-Agent": "Mozilla/5.0 Chrome/122.0.0.0"},
                ) as resp:
                    if resp.status != 200:
                        # Raise instead of falling through. That way the failure
                        # gets backoff, and the final attempt surfaces a real error
                        # rather than an implicit None return.
                        raise aiohttp.ClientResponseError(
                            resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=resp.reason or "",
                            headers=resp.headers,
                        )
                    html = await resp.text()
                    return CollectionResult(
                        url=job.url,
                        html=html,
                        status_code=resp.status,
                        proxy_ip="rotated",
                        collected_at=datetime.utcnow(),
                        content_hash=hashlib.sha256(html.encode()).hexdigest(),
                    )
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if attempt == job.max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

# Extraction Layer
from bs4 import BeautifulSoupimport jsonimport re
def _iter_jsonld_nodes(data):
    """Yield each JSON-LD object in *data*, descending into arrays and ``@graph``."""
    if isinstance(data, list):
        for item in data:
            yield from _iter_jsonld_nodes(item)
    elif isinstance(data, dict):
        yield data
        yield from _iter_jsonld_nodes(data.get("@graph"))


def _is_product(node):
    """Return True if the node's ``@type`` is ``Product`` or a list containing it."""
    node_type = node.get("@type")
    if isinstance(node_type, list):
        return "Product" in node_type
    return node_type == "Product"


def _first_dict(value):
    """Return *value* if it is a dict, else the first dict in a list, else ``{}``."""
    if isinstance(value, dict):
        return value
    if isinstance(value, list):
        return next((item for item in value if isinstance(item, dict)), {})
    return {}


def extract_product(html: str) -> dict:
    """Extract structured product data from an HTML page.

    The function first looks for a schema.org ``Product`` in any
    ``<script type="application/ld+json">`` block. It tolerates top-level
    arrays, ``@graph`` containers, ``offers`` given as a list, and ``brand``
    given as a plain string. If no Product is found, it falls back to the
    first ``<h1>`` and the first ``[itemprop=price]`` / ``.price`` element.

    Args:
        html: raw page markup.

    Returns:
        A dict whose ``"source"`` key is ``"json-ld"`` or ``"html-parser"``.
        JSON-LD results carry name/price/currency/availability/brand/
        description/image. Fallback results carry only name and price, as
        stripped text. Missing values are ``None``.
    """
    soup = BeautifulSoup(html, "html.parser")

    # Try JSON-LD first (most reliable).
    for script in soup.select('script[type="application/ld+json"]'):
        raw = script.string
        if not raw:
            # .string is None when the tag has no single text child.
            continue
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            continue
        for node in _iter_jsonld_nodes(data):
            if not _is_product(node):
                continue
            offer = _first_dict(node.get("offers"))
            brand = node.get("brand")
            return {
                "name": node.get("name"),
                "price": offer.get("price"),
                "currency": offer.get("priceCurrency"),
                "availability": offer.get("availability"),
                "brand": brand if isinstance(brand, str) else _first_dict(brand).get("name"),
                "description": node.get("description"),
                "image": node.get("image"),
                "source": "json-ld",
            }

    # Fallback to HTML parsing; look each element up only once.
    heading = soup.select_one("h1")
    price_tag = soup.select_one("[itemprop=price], .price")
    return {
        "name": heading.get_text(strip=True) if heading is not None else None,
        "price": price_tag.get_text(strip=True) if price_tag is not None else None,
        "source": "html-parser",
    }

# Storage Layer
import sqlite3from datetime import datetime
def init_db(db_path="pipeline.db"):
    """Open the SQLite database at *db_path* and make sure ``products`` exists.

    The table carries ``UNIQUE(url, content_hash)``, so the same page content
    can be stored at most once per URL. The open connection is returned, and
    closing it is the caller's job.
    """
    schema = """
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT NOT NULL,
            name TEXT,
            price REAL,
            currency TEXT,
            content_hash TEXT,
            collected_at TEXT,
            UNIQUE(url, content_hash)
        )
    """
    connection = sqlite3.connect(db_path)
    connection.execute(schema)
    connection.commit()
    return connection
def store_product(conn, url, product, content_hash):
    """Insert one extracted product row, skipping duplicates.

    Deduplication relies on the table's ``UNIQUE(url, content_hash)``
    constraint (as created by ``init_db``) together with ``INSERT OR IGNORE``.
    The row is committed immediately.

    Args:
        conn: open ``sqlite3`` connection containing a ``products`` table.
        url: page the product was extracted from.
        product: mapping from which only ``"name"``, ``"price"`` and
            ``"currency"`` are read; missing keys are stored as NULL.
        content_hash: hash of the raw page, the second half of the dedup key.

    Returns:
        True if a new row was inserted. False if the row was ignored as a
        duplicate or rejected by an integrity constraint.
    """
    try:
        cursor = conn.execute(
            "INSERT OR IGNORE INTO products (url, name, price, currency, content_hash, collected_at) VALUES (?, ?, ?, ?, ?, ?)",
            (
                url,
                product.get("name"),
                product.get("price"),
                product.get("currency"),
                content_hash,
                datetime.utcnow().isoformat(),
            ),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        return False
    # INSERT OR IGNORE does not raise on a UNIQUE conflict; it silently skips
    # the row. rowcount (1 = inserted, 0 = ignored) is therefore the only
    # duplicate signal.
    return cursor.rowcount > 0

# Orchestrating the Pipeline
async def run_pipeline(urls, country="us", max_concurrent=10, db_path="pipeline.db"):
    """Run collection -> extraction -> storage for every URL in *urls*.

    At most *max_concurrent* URLs are processed at once. An exception raised
    while handling one URL is counted and printed rather than propagated, so
    a single bad page does not abort the run.

    Args:
        urls: iterable of page URLs to process.
        country: proxy country code passed to each ``CollectionJob``.
        max_concurrent: maximum number of URLs in flight simultaneously.
        db_path: SQLite file that results are written to (passed to ``init_db``).

    Returns:
        Counter dict with keys ``"collected"``, ``"extracted"``, ``"stored"``
        and ``"errors"``.
    """
    conn = init_db(db_path)
    semaphore = asyncio.Semaphore(max_concurrent)
    stats = {"collected": 0, "extracted": 0, "stored": 0, "errors": 0}

    async def process(url):
        async with semaphore:
            try:
                result = await collect(CollectionJob(url=url, country=country))
                stats["collected"] += 1
                product = extract_product(result.html)
                stats["extracted"] += 1
                if store_product(conn, url, product, result.content_hash):
                    stats["stored"] += 1
            except Exception as e:
                # Per-URL boundary: record the failure and keep going.
                stats["errors"] += 1
                print(f"Error processing {url}: {e}")

    try:
        await asyncio.gather(*[process(url) for url in urls])
    finally:
        # Close the connection even if the run is cancelled or gather raises.
        conn.close()
    return stats
# Run it (only when executed as a script, so importing this module does not
# fire 1000 proxied requests).
if __name__ == "__main__":
    urls = [f"https://store.com/product/{i}" for i in range(1, 1001)]
    stats = asyncio.run(run_pipeline(urls))
    print(f"Collected: {stats['collected']}, Stored: {stats['stored']}, Errors: {stats['errors']}")

# Pipeline Monitoring
- Success rate: Track the share of URLs that are collected successfully versus those that fail
- New data rate: Monitor how many new (non-duplicate) records are stored per run
- Bandwidth usage: Use the ZentisLabs dashboard to track proxy bandwidth consumption
- Latency: Monitor average response times to detect when targets start throttling
- Schema changes: Alert when extraction starts returning null for fields that should have data — usually a sign that the target site changed its markup
📊 ZentisLabs non-expiring bandwidth is ideal for data pipelines — buy bandwidth once and use it whenever your pipeline runs. No monthly resets, no wasted credits.
