Automated ETL Pipeline
What you'll learn
~40 min

- Build a Python CLI that extracts data from multiple source formats
- Apply SQL transformations including JOINs, aggregations, and deduplication
- Load cleaned data into SQLite with schema validation
- Generate a data quality report flagging nulls, duplicates, and type mismatches
What you’re building
In Lesson 4, you built a report generator that turns a single CSV into a polished executive report. But where does that CSV come from? In any real organization, data lives in multiple places: a CSV export from the accounting system, a JSON feed from the CRM, a spreadsheet from the regional office that uses completely different column names. Before you can analyze anything, you need to merge, clean, and standardize that data. That process has a name: ETL — Extract, Transform, Load.
ETL is core MIS curriculum. Every data warehousing course, every BI capstone, every enterprise systems class circles back to the same problem: how do you get data from Point A, clean it up, and put it in Point B so people can actually use it? Tools like Informatica, SSIS, and dbt exist because this problem is that common.
In this lesson you will build a Python CLI tool that extracts data from multiple source formats (CSV, JSON, and a stubbed REST API), applies SQL transformations (JOINs, aggregations, deduplication), loads the result into a SQLite database, and generates an HTML data quality report. One command, multiple sources in, one clean database out.
SQLite is a file-based database that requires zero configuration, zero server setup, and zero network access. You write standard SQL, and the database is a single .db file you can copy, email, or commit to Git. For learning ETL, it removes every obstacle between you and the SQL. In production, the same SQL patterns transfer directly to PostgreSQL, MySQL, or SQL Server — the only differences are dialect-specific syntax.
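The zero-setup claim is easy to verify with Python's built-in sqlite3 module: connecting creates a queryable .db file with no server involved. The table and values below are illustrative only, not part of the generated tool.

```python
import sqlite3

# Connecting creates the file if it doesn't exist: no server, no credentials.
conn = sqlite3.connect("demo.db")
conn.execute("CREATE TABLE IF NOT EXISTS orders (order_id INTEGER PRIMARY KEY, total REAL)")
conn.execute("INSERT OR REPLACE INTO orders VALUES (1001, 49.99)")
conn.commit()

# Standard SQL against an ordinary file you can copy, email, or commit to Git.
total = conn.execute("SELECT total FROM orders WHERE order_id = 1001").fetchone()[0]
conn.close()
```

The same statements run unchanged (apart from the upsert syntax) against PostgreSQL or MySQL, which is exactly why SQLite works so well as a learning environment.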
This lesson builds on Lesson 4’s Python CLI pattern. If you completed Lesson 4, you already have Python 3 and pip installed. If not, verify with python3 --version and install from python.org if needed.
The showcase
When finished, your CLI tool will:
- Accept multiple input files (CSV, JSON) and an optional REST API endpoint stub via command-line arguments.
- Read a YAML configuration file that defines column mappings, type coercions, join keys, and aggregation rules.
- Extract data from each source into a normalized in-memory representation.
- Transform using SQL logic executed in SQLite:
- JOINs across sources (e.g., orders + customers + products).
- GROUP BY aggregations (revenue by region, order counts by customer).
- Deduplication using DISTINCT and window functions (ROW_NUMBER to keep the most recent record per key).
- Column renaming, type coercion, and null handling.
- Load the transformed data into a SQLite database with schema validation, upsert support (INSERT OR REPLACE), and foreign key enforcement.
- Generate an HTML data quality report showing:
- Null counts per column (with percentage).
- Duplicate row counts (exact duplicates and near-duplicates by key columns).
- Type mismatch counts (values that do not match the expected column type).
- Row counts per source file.
- A summary pass/fail status for each quality check.
Think of it as the plumbing that connects raw data sources to the dashboards and reports you built in earlier lessons.
In data warehousing courses, you learn the difference between operational databases (where transactions happen) and analytical databases (where reporting happens). ETL is the bridge between them. Every night, ETL pipelines in companies like Amazon, Walmart, and your bank extract transaction data from dozens of operational systems, transform it into a consistent schema, and load it into a data warehouse where analysts run queries. Building one yourself — even at a small scale — gives you concrete understanding of what tools like Informatica, Talend, and dbt automate at enterprise scale.
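The window-function deduplication listed in the showcase can be sketched directly in SQLite (version 3.25+, bundled with any modern Python). The rows here are made up to show the mechanics: two records share an order_id, and ROW_NUMBER keeps only the most recent one.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_orders (order_id INTEGER, order_date TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO stg_orders VALUES (?, ?, ?)",
    [
        (1001, "2024-03-01", "pending"),
        (1001, "2024-03-05", "shipped"),    # later record for the same key
        (1002, "2024-04-10", "completed"),
    ],
)

# Number rows within each order_id, newest first, and keep only row 1.
dedup_sql = """
SELECT order_id, order_date, status FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY order_id ORDER BY order_date DESC
    ) AS rn
    FROM stg_orders
) WHERE rn = 1
"""
rows = conn.execute(dedup_sql).fetchall()
```

Running this keeps the 2024-03-05 "shipped" record for order 1001 and drops the older "pending" one, which is the behavior the pipeline's dedup config asks for.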
The prompt
Open your AI CLI tool (such as Claude Code, Gemini CLI, or your preferred tool) in an empty directory and paste this prompt:
Create a Python CLI tool called etl-pipeline that extracts data from multiple source formats, transforms it using SQL logic, and loads it into a SQLite database. Use Python 3.10+ with these packages: pandas, pyyaml, requests, jinja2. Structure the project properly.
PROJECT STRUCTURE:
etl_pipeline/
├── etl_pipeline/
│   ├── __init__.py
│   ├── __main__.py        # enables python -m etl_pipeline
│   ├── cli.py             # argparse CLI entry point
│   ├── extractor.py       # reads CSV, JSON, and API sources
│   ├── transformer.py     # SQL transformations in SQLite
│   ├── loader.py          # loads data into target SQLite database
│   ├── quality_report.py  # generates HTML data quality report
│   ├── config.py          # YAML config loader and validator
│   └── templates/
│       └── report.html    # Jinja2 template for data quality report
├── config.yaml            # default pipeline configuration
├── sample_data/
│   ├── orders.csv         # e-commerce orders (100 rows)
│   ├── customers.json     # customer records (30 records)
│   └── products.csv       # product catalog (20 rows)
├── requirements.txt
├── setup.py
└── README.md
CLI INTERFACE:
  python -m etl_pipeline --config config.yaml --output warehouse.db
  python -m etl_pipeline --config config.yaml --output warehouse.db --report quality_report.html
  python -m etl_pipeline --config config.yaml --output warehouse.db --dry-run
  python -m etl_pipeline --config config.yaml --output warehouse.db --verbose
OPTIONS:
  --config, -c   Path to YAML config file (required)
  --output, -o   Path to output SQLite database (required)
  --report, -r   Path to output HTML data quality report (optional)
  --dry-run      Validate config and sources without writing to the database
  --verbose      Enable debug logging with row counts at each stage
  --force        Overwrite existing database without confirmation
SAMPLE DATA:
1. orders.csv (100 rows): Generate a realistic e-commerce orders dataset with columns:
   - order_id (integer, unique, 1001-1100)
   - customer_id (integer, references customers, 1-30)
   - product_id (integer, references products, 1-20)
   - order_date (date, between 2024-01-01 and 2024-12-31)
   - quantity (integer, 1-10)
   - unit_price (decimal, should match the product's price)
   - discount_pct (decimal, 0-25, mostly 0)
   - status (string: "completed", "shipped", "pending", "returned")
   Include realistic patterns: some customers order frequently, some products are more popular, Q4 has more orders, and some duplicate order_ids to test deduplication (5 exact duplicates).

2. customers.json (30 records): Generate as a JSON array of objects with:
   - id (integer, 1-30)
   - first_name, last_name (realistic names)
   - email (realistic email addresses)
   - region (string: "North", "South", "East", "West")
   - signup_date (date, between 2022-01-01 and 2024-06-30)
   - tier (string: "bronze", "silver", "gold", "platinum")
   Include some records with missing email (null) and inconsistent region capitalization ("north" vs "North") to test data cleaning.

3. products.csv (20 rows):
   - product_id (integer, 1-20)
   - product_name (realistic product names: electronics, office supplies, etc.)
   - category (string: "Electronics", "Furniture", "Office Supplies", "Software")
   - base_price (decimal, 9.99-999.99)
   - supplier (string, 5 different supplier names)
   - in_stock (boolean, 1 or 0)
EXTRACTOR (extractor.py): Extract data from each source type into pandas DataFrames:
1. CSV extraction:
   - Read CSV with configurable encoding (default UTF-8), delimiter, and quoting rules
   - Auto-detect date columns and parse them
   - Handle common null representations: "", "N/A", "null", "NULL", "-"
   - Return DataFrame with source metadata (filename, row count, columns)

2. JSON extraction:
   - Read JSON arrays or JSON objects with a configurable root key
   - Flatten nested objects one level deep (e.g., address.city -> address_city)
   - Handle both .json files and .jsonl (line-delimited) formats
   - Return DataFrame with source metadata

3. API extraction (stub):
   - Accept a URL endpoint and optional headers/params from config
   - If the URL is a file:// path, read the local file instead (for testing)
   - Implement retry logic with 3 attempts and exponential backoff
   - Return DataFrame with source metadata
Each extractor logs: source path, row count, column count, detected types.
TRANSFORMER (transformer.py): Load all extracted DataFrames into a temporary SQLite database and execute SQL transformations defined in the config:
1. STAGING: Load each DataFrame into a staging table named after the source (e.g., stg_orders, stg_customers, stg_products).
2. CLEANING:
   - Standardize text columns: strip whitespace, consistent capitalization per config (upper, lower, title)
   - Coerce types: convert string dates to DATE, string numbers to NUMERIC
   - Replace configured null synonyms with actual NULL
   - Log cleaning actions: "Standardized 5 region values to title case"

3. DEDUPLICATION:
   - For each table, apply dedup rules from config
   - Support DISTINCT (exact row dedup) and window function dedup: ROW_NUMBER() OVER (PARTITION BY key_cols ORDER BY sort_col DESC), keeping only row_number = 1
   - Log: "Removed 5 duplicate rows from stg_orders by order_id"

4. JOINS:
   - Execute JOIN queries defined in config to create merged tables
   - Example: JOIN stg_orders with stg_customers ON customer_id, then JOIN the result with stg_products ON product_id
   - Support INNER JOIN and LEFT JOIN
   - Create a merged table: order_details with all columns from all 3 sources

5. AGGREGATIONS:
   - Execute GROUP BY queries defined in config
   - Example: revenue_by_region = SELECT region, SUM(quantity * unit_price) as total_revenue, COUNT(*) as order_count FROM order_details GROUP BY region
   - Example: customer_summary = SELECT customer_id, first_name, last_name, COUNT(*) as total_orders, SUM(quantity * unit_price) as lifetime_value FROM order_details GROUP BY customer_id, first_name, last_name
Each transformation step logs its SQL query and resulting row count.
LOADER (loader.py): Load transformed tables into the target SQLite database:
1. Schema validation:
   - Before loading, verify that the transformed data matches the expected column names and types defined in config
   - Log mismatches as warnings; fail on critical mismatches (missing required columns)

2. Table creation:
   - CREATE TABLE IF NOT EXISTS with proper column types and constraints
   - Add PRIMARY KEY, NOT NULL, and FOREIGN KEY constraints from config
   - Create indexes on foreign key columns and configured index columns

3. Data loading:
   - INSERT OR REPLACE (upsert) for idempotent loading
   - Batch inserts (500 rows per batch) with transaction wrapping
   - Enable PRAGMA foreign_keys = ON for FK enforcement

4. Post-load verification:
   - SELECT COUNT(*) from each loaded table
   - Verify foreign key integrity: no orphaned references
   - Log final row counts and any integrity issues
QUALITY REPORT (quality_report.py): Generate an HTML data quality report using the Jinja2 template:
1. SOURCE SUMMARY:
   - Table showing each source file: name, format, row count, column count
   - Total rows extracted vs total rows loaded (with loss percentage)

2. NULL ANALYSIS:
   - Per-column null counts and percentages for each table
   - Color-coded: green (0%), yellow (1-10%), red (>10%)
   - Highlight columns where nulls exceed a configurable threshold

3. DUPLICATE ANALYSIS:
   - Exact duplicate counts per table (before and after dedup)
   - Near-duplicate counts by configured key columns
   - Show sample duplicate rows for manual review

4. TYPE MISMATCH ANALYSIS:
   - Columns where values do not match expected types
   - Show sample mismatched values (first 5)
   - Suggest type coercions for common mismatches

5. REFERENTIAL INTEGRITY:
   - Foreign key violations: child rows with no matching parent
   - Show counts and sample orphaned records

6. PIPELINE SUMMARY:
   - Total execution time
   - Steps completed with pass/warn/fail status
   - Overall pipeline health: PASS (all green), WARNING (some yellow), FAIL (any red)

STYLING:
   - Professional report matching the corporate theme from Lesson 4: navy headers, white background, gray accents, professional fonts
   - Tables with alternating row colors and formatted numbers
   - Status badges: green PASS, yellow WARNING, red FAIL
   - Print-friendly CSS
CONFIG (config.yaml):

pipeline:
  name: "E-Commerce Data Warehouse"
  description: "Merge orders, customers, and products into a unified warehouse"

sources:
  orders:
    type: csv
    path: "sample_data/orders.csv"
    encoding: "utf-8"
    delimiter: ","
  customers:
    type: json
    path: "sample_data/customers.json"
    root_key: null  # top-level array
  products:
    type: csv
    path: "sample_data/products.csv"
    encoding: "utf-8"

cleaning:
  null_synonyms: ["N/A", "n/a", "NA", "", "null", "NULL", "-"]
  text_standardization:
    - column: "region"
      case: "title"  # North, South, East, West
    - column: "status"
      case: "lower"
    - column: "tier"
      case: "lower"

deduplication:
  orders:
    method: "window"
    partition_by: ["order_id"]
    order_by: "order_date"
    order_direction: "desc"
  customers:
    method: "distinct"
  products:
    method: "distinct"

transforms:
  - name: "order_details"
    type: "join"
    sql: |
      SELECT
        o.order_id, o.order_date, o.quantity, o.unit_price, o.discount_pct, o.status,
        c.first_name, c.last_name, c.email, c.region, c.tier,
        p.product_name, p.category, p.base_price, p.supplier,
        (o.quantity * o.unit_price * (1 - o.discount_pct / 100.0)) as line_total
      FROM stg_orders o
      LEFT JOIN stg_customers c ON o.customer_id = c.id
      LEFT JOIN stg_products p ON o.product_id = p.product_id

  - name: "revenue_by_region"
    type: "aggregation"
    sql: |
      SELECT region,
        COUNT(*) as order_count,
        SUM(line_total) as total_revenue,
        AVG(line_total) as avg_order_value,
        COUNT(DISTINCT order_id) as unique_orders
      FROM order_details
      GROUP BY region
      ORDER BY total_revenue DESC

  - name: "customer_summary"
    type: "aggregation"
    sql: |
      SELECT
        first_name || ' ' || last_name as customer_name,
        email, region, tier,
        COUNT(*) as total_orders,
        SUM(line_total) as lifetime_value,
        AVG(line_total) as avg_order_value,
        MIN(order_date) as first_order,
        MAX(order_date) as last_order
      FROM order_details
      GROUP BY first_name, last_name, email, region, tier
      ORDER BY lifetime_value DESC

  - name: "category_performance"
    type: "aggregation"
    sql: |
      SELECT category,
        COUNT(*) as total_orders,
        SUM(quantity) as total_units,
        SUM(line_total) as total_revenue,
        AVG(line_total) as avg_order_value
      FROM order_details
      GROUP BY category
      ORDER BY total_revenue DESC

load:
  tables:
    - name: "order_details"
      primary_key: "order_id"
      indexes: ["region", "category", "order_date"]
    - name: "revenue_by_region"
      primary_key: "region"
    - name: "customer_summary"
      primary_key: "customer_name"
      indexes: ["region", "tier"]
    - name: "category_performance"
      primary_key: "category"

quality:
  null_threshold: 10  # percentage — flag columns above this
  duplicate_check_keys:
    orders: ["order_id"]
    customers: ["email"]
    products: ["product_id"]
Generate all files with complete, working implementations. Include the sample data with realistic values and deliberate quality issues (duplicates, nulls, inconsistent casing). The tool should run end-to-end on the first try.

Notice how much detail is in the YAML configuration section of the prompt. In ETL work, the pipeline configuration is the specification. The code is generic plumbing; the config defines what happens. This separation means you can add new data sources, new transformations, and new quality checks by editing YAML — not by writing new code. That principle scales from this learning project to enterprise ETL platforms.
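The config-as-specification principle reduces to a small loop: generic code iterates over transform definitions and executes whatever SQL they contain. This sketch inlines a dict where the real tool would call yaml.safe_load on config.yaml; the table and transform names are hypothetical, simplified stand-ins.

```python
import sqlite3

# Stand-in for a parsed config.yaml (hypothetical, simplified shape).
config = {
    "transforms": [
        {"name": "totals",
         "sql": "SELECT status, COUNT(*) AS n FROM stg_orders GROUP BY status"},
    ]
}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_orders (order_id INTEGER, status TEXT)")
conn.executemany("INSERT INTO stg_orders VALUES (?, ?)",
                 [(1, "shipped"), (2, "shipped"), (3, "pending")])

# Generic plumbing: the code runs whatever SQL the config defines.
results = {t["name"]: conn.execute(t["sql"]).fetchall()
           for t in config["transforms"]}
```

Adding a new aggregation means adding one entry to the transforms list; the Python never changes. That is the same design that lets analysts extend dbt projects without touching the orchestration code.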
What you get
After the LLM generates the project, set it up:
cd etl-pipeline
python -m venv .venv
source .venv/bin/activate   # On Windows: .venv\Scripts\activate
pip install -e .

Then run the pipeline:
python -m etl_pipeline --config config.yaml --output warehouse.db --report quality_report.html --verbose

Expected output
[EXTRACT] orders.csv: 105 rows, 8 columns (CSV)
[EXTRACT] customers.json: 30 rows, 7 columns (JSON)
[EXTRACT] products.csv: 20 rows, 6 columns (CSV)
[CLEAN] Standardized 4 region values to title case in stg_customers
[CLEAN] Standardized 2 status values to lower case in stg_orders
[CLEAN] Replaced 3 null synonyms with NULL in stg_customers.email
[DEDUP] Removed 5 duplicate rows from stg_orders (window: order_id, order_date desc)
[DEDUP] 0 duplicates found in stg_customers (distinct)
[DEDUP] 0 duplicates found in stg_products (distinct)
[TRANSFORM] order_details: 100 rows (JOIN orders + customers + products)
[TRANSFORM] revenue_by_region: 4 rows (GROUP BY region)
[TRANSFORM] customer_summary: 30 rows (GROUP BY customer)
[TRANSFORM] category_performance: 4 rows (GROUP BY category)
[LOAD] order_details: 100 rows loaded (upsert)
[LOAD] revenue_by_region: 4 rows loaded (upsert)
[LOAD] customer_summary: 30 rows loaded (upsert)
[LOAD] category_performance: 4 rows loaded (upsert)
[VERIFY] Foreign key integrity: PASS
[REPORT] Data quality report written to quality_report.html
[DONE] Pipeline completed in 1.2s — 4 tables, 138 rows loaded

Verify the database
Open the database with the SQLite command-line tool (usually pre-installed on macOS and Linux, or install it from sqlite.org):
sqlite3 warehouse.db
.tables
-- order_details  revenue_by_region  customer_summary  category_performance

SELECT * FROM revenue_by_region;
-- North|28|12450.50|444.66|25
-- South|24|10230.75|426.28|22
-- East|26|11890.20|457.31|24
-- West|22|9870.40|448.65|20

SELECT customer_name, total_orders, lifetime_value
FROM customer_summary ORDER BY lifetime_value DESC LIMIT 5;
-- (Top 5 customers by lifetime value)

.quit

Then open quality_report.html in your browser. You should see:
- Source summary showing 3 files extracted, row counts, and formats.
- Null analysis with green/yellow/red indicators — the email column in customers should show yellow (a few nulls).
- Duplicate analysis showing 5 duplicates removed from orders.
- Pipeline summary with overall PASS or WARNING status.
The most common issue with a freshly generated ETL pipeline is incorrect JOINs. After running, verify that order_details has the expected row count (100 after deduplication, not 105). If the count is higher, the JOIN is producing a cartesian product — a sign that the join keys are wrong or that deduplication ran after the JOIN instead of before. Run SELECT COUNT(*) FROM order_details; to verify.
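The fan-out failure mode described above is easy to reproduce and detect with row-count checks. In this sketch (made-up tables, not the generated tool's code), a duplicated customer row inflates the joined count past the order count:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_orders (order_id INTEGER, customer_id INTEGER)")
conn.execute("CREATE TABLE stg_customers (id INTEGER, region TEXT)")
conn.executemany("INSERT INTO stg_orders VALUES (?, ?)", [(1, 10), (2, 10), (3, 11)])
# A duplicated customer row: the classic cause of join fan-out.
conn.executemany("INSERT INTO stg_customers VALUES (?, ?)",
                 [(10, "North"), (10, "North"), (11, "South")])

base = conn.execute("SELECT COUNT(*) FROM stg_orders").fetchone()[0]
joined = conn.execute("""
    SELECT COUNT(*) FROM stg_orders o
    LEFT JOIN stg_customers c ON o.customer_id = c.id
""").fetchone()[0]

fanout = joined > base  # True here: 3 orders produce 5 joined rows
```

A LEFT JOIN from orders should never increase the row count; if it does, deduplicate the right-hand table (or fix the join key) before joining.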
When things go wrong
ETL pipelines introduce a specific category of issues: encoding problems, schema mismatches, and data integrity failures. Here is how to diagnose the most common problems.
Use the Symptom → Evidence → Request pattern: describe what you see, paste the error, then ask for a fix.
How it works
The ETL pipeline follows a strict four-stage architecture:
1. CLI (cli.py) parses arguments with argparse, loads and validates the YAML config, and orchestrates the pipeline stages in order: extract, transform, load, report. Each stage receives the config and passes its output to the next stage.
2. Extractor (extractor.py) has one function per source type. extract_csv() uses pd.read_csv() with configurable encoding and null handling. extract_json() uses pd.read_json() or json.load() with optional flattening. extract_api() uses requests.get() with retry logic. Each function returns a DataFrame and a metadata dict.
3. Transformer (transformer.py) creates an in-memory SQLite database, loads all extracted DataFrames into staging tables using df.to_sql(), then executes the SQL transformations defined in the config. This is where the real work happens: JOINs combine data from multiple sources, GROUP BY aggregations summarize it, and window functions deduplicate it. The transformer returns a dict of DataFrames (one per output table).
4. Loader (loader.py) creates the target SQLite database, defines the schema (CREATE TABLE with constraints), and inserts the transformed data in batches. Upsert logic (INSERT OR REPLACE) makes the pipeline idempotent — you can run it twice and get the same result.
5. Quality Report (quality_report.py) analyzes the extracted and loaded data for quality issues, then renders the Jinja2 HTML template with the results. This is the audit trail that tells you whether the pipeline worked correctly.
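The transformer's core pattern reduces to two pandas calls: to_sql stages a DataFrame into SQLite, and read_sql pulls a query result back out as a DataFrame. A minimal sketch with illustrative data (assuming pandas is installed, as the project requires):

```python
import sqlite3
import pandas as pd

# Stands in for a DataFrame that extractor.py would build from CSV/JSON.
orders = pd.DataFrame({
    "order_id": [1, 2, 3],
    "quantity": [2, 1, 4],
    "unit_price": [9.99, 19.99, 5.00],
})

# Stage the DataFrame into an in-memory SQLite database...
conn = sqlite3.connect(":memory:")
orders.to_sql("stg_orders", conn, index=False)

# ...then run SQL against it and pull the result back as a DataFrame.
result = pd.read_sql(
    "SELECT SUM(quantity * unit_price) AS revenue FROM stg_orders", conn
)
revenue = round(float(result["revenue"].iloc[0]), 2)
```

Everything between those two calls is plain SQL, which is why the transformation logic can live entirely in the YAML config.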
🔍 ETL vs. ELT: The modern data warehouse pattern
The pipeline you built follows the classic ETL pattern: Extract data, Transform it in a staging area, then Load the clean result into the target database. This pattern is decades old and works well when transformation logic is complex and you want to validate data before it enters the warehouse.
Modern cloud data warehouses (Snowflake, BigQuery, Databricks) have flipped this to ELT: Extract data, Load it raw into the warehouse, then Transform it using SQL inside the warehouse. Why? Cloud warehouses have massive parallel processing power, so running transformations inside the warehouse is faster than doing it in an external tool.
The key difference:
- ETL: Transform happens outside the database (in Python, Informatica, SSIS). The database receives clean data.
- ELT: Transform happens inside the database (using dbt, SQL views, stored procedures). The database receives raw data and cleans it itself.
Tools for each approach:
- ETL: Informatica, SSIS, Talend, Apache NiFi, custom Python scripts (what you built)
- ELT: dbt (data build tool), Snowflake Tasks, BigQuery scheduled queries, Databricks notebooks
Your pipeline is actually a hybrid: it uses SQLite as the transformation engine (SQL transformations inside a database) but loads the transformed results into a separate target database. This is close to how dbt works — dbt writes SQL transformations that run inside the warehouse, and the results become new tables.
Why this matters for MIS careers: If you interview at a company using Snowflake or BigQuery, they will expect you to know the ELT pattern. If you interview at a company using SQL Server or Oracle, they will expect ETL. Knowing both patterns — and when each is appropriate — makes you versatile.
🔍 Connection to enterprise tools
The pipeline you built is a miniature version of what enterprise ETL tools do at scale:
- Apache Airflow: An open-source orchestrator that schedules and monitors ETL pipelines. Your cli.py is a single-run version of what Airflow's DAGs (Directed Acyclic Graphs) do across hundreds of tasks.
- dbt (data build tool): Transforms data using SQL inside a data warehouse. Your transformer.py SQL queries are essentially dbt models.
- Informatica / SSIS / Talend: Visual ETL tools where you drag-and-drop extraction, transformation, and loading steps. Your YAML config is the text-based equivalent of their visual pipeline definitions.
- Apache Spark: Distributed data processing for datasets too large for a single machine. Your pandas DataFrames are the single-machine equivalent of Spark DataFrames.
The concepts are identical across all of these: extract from sources, stage the raw data, apply transformations, load into a target, validate quality. The tools differ in scale, interface, and ecosystem — but if you understand the pipeline you just built, you can learn any of them quickly.
Customize it
Add incremental loading
Add incremental (delta) loading support. Instead of reloading the entire dataset every run, track a high-water mark (the maximum value of a configured column, like order_date or order_id) in a metadata table in the target database. On the next run, only extract rows where the tracked column is greater than the stored high-water mark. Update the high-water mark after a successful load. This turns the pipeline from a full refresh into an append-only incremental load. Add a --full-refresh flag that ignores the high-water mark and reloads everything.

Add data lineage tracking
Add data lineage metadata to every loaded row. For each row in the target tables, add columns: _source_file (which file the row came from), _extracted_at (timestamp of extraction), _pipeline_run_id (UUID for this run), and _transform_steps (comma-separated list of transforms applied). Store a pipeline_runs metadata table with run_id, start_time, end_time, status, config_hash, and row counts. This creates an audit trail showing exactly where every row came from and when.

Add schema migration support
Add automatic schema migration. When the config defines a new column that does not exist in the target database, automatically run ALTER TABLE ADD COLUMN instead of failing. When a column type changes, log a warning and create a migration script (saved as a .sql file) that the user can review and run manually. Store a schema_versions table that tracks all migrations with timestamps.

Add PostgreSQL adapter
Add a --target flag that accepts "sqlite" or "postgresql". When PostgreSQL is selected, use psycopg2 to connect to a PostgreSQL database specified by a connection string in the config. Adjust SQL syntax: use ON CONFLICT for upsert instead of INSERT OR REPLACE, use PostgreSQL data types (SERIAL, TIMESTAMPTZ), and use the COPY command for bulk loading instead of batch inserts. Keep SQLite as the default for local development.

If you enjoy building ETL pipelines more than building dashboards, you are discovering the data engineering career path. Data engineers build and maintain the pipelines that feed data to analysts and scientists. The role is in high demand, well-compensated, and directly aligned with MIS skills: SQL, database design, systems thinking, and process automation. The tools are different (Airflow, Spark, dbt, cloud platforms), but the fundamentals are exactly what you practiced in this lesson.
Try it yourself
- Generate the ETL pipeline with the prompt above.
- Run it on the included sample data and verify the output database.
- Open the data quality report. Are there any warnings? Investigate each one by querying the SQLite database directly.
- Open warehouse.db in SQLite and run some queries:
  - Which region has the highest revenue? (SELECT * FROM revenue_by_region ORDER BY total_revenue DESC LIMIT 1;)
  - Who are the top 5 customers by lifetime value? (SELECT * FROM customer_summary ORDER BY lifetime_value DESC LIMIT 5;)
  - Which product category has the most orders? (SELECT * FROM category_performance ORDER BY total_orders DESC LIMIT 1;)
- Edit the sample data to introduce a new quality issue: add a row to orders.csv with a customer_id that does not exist in customers.json. Re-run the pipeline. Does the quality report catch it?
- Add a new data source: create a returns.csv file with columns order_id, return_date, reason, refund_amount. Edit config.yaml to extract it, join it with order_details, and create a return_analysis aggregation table.
- Try the --dry-run flag. It should validate everything without writing to the database.
Key Takeaways
- ETL is the most common data workflow in business. Before anyone can build a dashboard, write a report, or train a model, someone has to extract data from sources, clean it up, and put it somewhere useful. That someone is often an MIS professional.
- SQL is the transformation language. The JOINs, GROUP BYs, and window functions you used in the transformer are the same SQL you learned in database courses. ETL is where that SQL knowledge becomes directly productive.
- Configuration separates pipeline logic from pipeline definition. The YAML config defines what data to extract, how to transform it, and where to load it. The Python code is generic. This separation means non-developers can modify the pipeline by editing YAML, which is a key design principle in enterprise tools.
- Data quality is not optional. The quality report is not a nice-to-have — it is the proof that your pipeline worked. In production, pipelines that load bad data cause more damage than pipelines that fail loudly. Always validate.
- Idempotency makes pipelines safe. Using INSERT OR REPLACE means you can run the pipeline multiple times without duplicating data. This is critical for production pipelines that run on a schedule — if a run fails halfway, you re-run it from the start and get the correct result.
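The idempotency point can be demonstrated directly: with INSERT OR REPLACE keyed on the primary key, running the same load twice leaves the row count unchanged. A minimal sketch with illustrative rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, total REAL)")

def load(rows):
    # INSERT OR REPLACE: rows with an existing primary key are
    # overwritten, not duplicated, so re-running the load is safe.
    conn.executemany("INSERT OR REPLACE INTO orders VALUES (?, ?)", rows)
    conn.commit()

rows = [(1001, 49.99), (1002, 15.00)]
load(rows)
load(rows)  # simulate re-running the pipeline after a partial failure

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```

With a plain INSERT, the second run would raise a UNIQUE constraint error (or, without a primary key, silently double the data); the upsert makes retries harmless.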
Quick check: Your ETL pipeline joins orders with customers and produces 150 rows in the order_details table, yet the orders table has only 100 rows after deduplication. What is the most likely cause?
What’s next
In the final lesson of this module, you will build a Scheduled Business Operations Orchestrator that chains this ETL pipeline with the report generator from Lesson 4 and adds automated scheduling, error handling, and notification. It is the capstone that connects every tool you have built into a single automated workflow.