I built an automated ETL (Extract, Transform, Load) system in Python that can pull data from a variety of sources, transform it using modular components, and export it to multiple formats and destinations.
The ETL pipeline is fully modular; short, illustrative sketches of each stage follow the feature list:

- Extract data from a variety of sources:
  - Databases
  - APIs
  - Local files
  - Cloud storage
- Transform with custom pluggable modules, such as:
  - Column renaming
  - Data type casting
  - Value substitutions
  - Any other custom logic
- Filter consistently, with filter definitions stored in:
  - Source control with versioning (e.g. GitHub)
  - A database as JSON (e.g. Postgres)
- Load into common output formats:
  - CSV
  - Excel
  - JSON
- Slice data into common reporting ranges:
  - Daily
  - Weekly, with variations such as:
    - Monday through Friday
    - Sunday through Saturday
    - Business days only
  - Monthly
  - Year to Date (YTD)
  - Month to Date (MTD)
  - Custom ranges, such as biweekly
- Send to destinations such as:
  - SFTP
  - A database
- Schedule flexibly, at any interval, with whatever scheduler the platform provides:
  - Apache Airflow
  - Cron
  - Power Automate
  - Task Scheduler
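
For the database and API sources, a minimal sketch of two extract modules could look like the following. It assumes the SQLAlchemy and requests libraries are installed; the connection string and endpoint URL are placeholders, and the function names are illustrative rather than a fixed interface.

```python
import pandas as pd
import requests
from sqlalchemy import create_engine


def extract_from_database(connection_string: str, query: str) -> pd.DataFrame:
    """Run a SQL query and return the result as a DataFrame."""
    engine = create_engine(connection_string)  # e.g. "postgresql://user:pass@host/db"
    return pd.read_sql(query, engine)


def extract_from_api(url: str) -> pd.DataFrame:
    """Fetch a JSON endpoint and flatten the payload into a DataFrame."""
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return pd.json_normalize(response.json())
```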
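
The type-casting and value-substitution transforms can be equally small. A minimal sketch, assuming plain pandas; the dictionary shapes shown in the docstrings are examples, not a required schema:

```python
import pandas as pd


def transform_cast_types(df: pd.DataFrame, type_map: dict) -> pd.DataFrame:
    """Cast columns to the dtypes in type_map, e.g. {"amount": "float64"}."""
    return df.astype(type_map)


def transform_substitute_values(df: pd.DataFrame, column: str, substitutions: dict) -> pd.DataFrame:
    """Replace values in one column, e.g. {"N/A": "unknown"}."""
    out = df.copy()
    out[column] = out[column].replace(substitutions)
    return out
```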
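
Storing filters as JSON in Git or Postgres implies some agreed-on schema. The sketch below assumes a hypothetical format of `{"column", "op", "value"}` objects; the schema and operator names are illustrative, not the project's actual contract.

```python
import json

import pandas as pd

# Hypothetical filter schema: [{"column": "status", "op": "eq", "value": "active"}, ...]
_OPS = {
    "eq": lambda s, v: s == v,
    "ne": lambda s, v: s != v,
    "gt": lambda s, v: s > v,
    "lt": lambda s, v: s < v,
    "in": lambda s, v: s.isin(v),
}


def load_filters(path: str) -> list:
    """Read filter definitions from a version-controlled JSON file."""
    with open(path) as f:
        return json.load(f)


def apply_filters(df: pd.DataFrame, filters: list) -> pd.DataFrame:
    """Apply each filter as a boolean mask, in order."""
    for f in filters:
        df = df[_OPS[f["op"]](df[f["column"]], f["value"])]
    return df
```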
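
The CSV and Excel loaders are essentially thin wrappers over the pandas writers. A minimal sketch; note that writing `.xlsx` files requires an engine such as openpyxl to be installed:

```python
import pandas as pd


def load_to_csv(df: pd.DataFrame, output_path: str) -> None:
    """Write the DataFrame to CSV without the index column."""
    df.to_csv(output_path, index=False)


def load_to_excel(df: pd.DataFrame, output_path: str) -> None:
    """Write the DataFrame to an Excel workbook."""
    df.to_excel(output_path, index=False)
```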
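
For the reporting ranges, here is a minimal sketch of MTD, YTD, and business-day slicing with plain pandas; the function names and the `date_col` parameter are illustrative.

```python
from datetime import date
from typing import Optional

import pandas as pd


def slice_month_to_date(df: pd.DataFrame, date_col: str, today: Optional[date] = None) -> pd.DataFrame:
    """Keep rows from the first of the current month through today."""
    today = today or date.today()
    dates = pd.to_datetime(df[date_col]).dt.date
    return df[(dates >= today.replace(day=1)) & (dates <= today)]


def slice_year_to_date(df: pd.DataFrame, date_col: str, today: Optional[date] = None) -> pd.DataFrame:
    """Keep rows from January 1st of the current year through today."""
    today = today or date.today()
    dates = pd.to_datetime(df[date_col]).dt.date
    return df[(dates >= today.replace(month=1, day=1)) & (dates <= today)]


def slice_business_days(df: pd.DataFrame, date_col: str) -> pd.DataFrame:
    """Drop weekend rows (Monday=0 ... Sunday=6)."""
    return df[pd.to_datetime(df[date_col]).dt.dayofweek < 5]
```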
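
For SFTP delivery, a minimal sketch assuming the paramiko library; the host, credentials, and paths are placeholders, and key-based authentication would be preferable in production.

```python
import paramiko


def send_via_sftp(local_path: str, remote_path: str, host: str, username: str, password: str) -> None:
    """Upload a local file to an SFTP server."""
    transport = paramiko.Transport((host, 22))
    try:
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)
        sftp.put(local_path, remote_path)
        sftp.close()
    finally:
        transport.close()
```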
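
Scheduling is whatever the host platform provides. On Linux a crontab entry like `0 6 * * * python /path/to/scheduled_etl.py` is enough; the sketch below shows a rough Airflow 2.x equivalent, where the DAG id, schedule, and script path are placeholders.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="scheduled_etl",
    start_date=datetime(2024, 1, 1),
    schedule_interval="0 6 * * *",  # every day at 06:00
    catchup=False,
) as dag:
    run_etl = BashOperator(
        task_id="run_etl",
        bash_command="python /path/to/scheduled_etl.py",
    )
```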
Simple example
`./scheduled_etl.py`:

```python
import pandas as pd


def extract_from_csv(file_path: str) -> pd.DataFrame:
    """Extract data from a CSV file."""
    return pd.read_csv(file_path)


def transform_rename_columns(df: pd.DataFrame, rename_map: dict) -> pd.DataFrame:
    """Rename columns based on a mapping dictionary."""
    return df.rename(columns=rename_map)


def load_to_json(df: pd.DataFrame, output_path: str) -> None:
    """Load the DataFrame to a JSON Lines file."""
    df.to_json(output_path, orient="records", lines=True)


if __name__ == "__main__":
    # Extract
    data = extract_from_csv("data/source.csv")

    # Transform
    rename_mapping = {"old_col1": "new_col1", "old_col2": "new_col2"}
    transformed = transform_rename_columns(data, rename_mapping)

    # Load
    load_to_json(transformed, "data/output.json")
```
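
Since the whole point is pluggability, the pieces above can also be chained generically. The sketch below is one way to do it rather than the project's actual runner: `run_pipeline` and the `Transform` alias are illustrative names, and it assumes the three functions from `scheduled_etl.py` are importable.

```python
from functools import partial
from typing import Callable

import pandas as pd

# Illustrative imports from the example file above.
from scheduled_etl import extract_from_csv, load_to_json, transform_rename_columns

Transform = Callable[[pd.DataFrame], pd.DataFrame]


def run_pipeline(extract: Callable[[], pd.DataFrame],
                 transforms: list,
                 load: Callable[[pd.DataFrame], None]) -> None:
    """Extract once, apply each transform in order, then load the result."""
    df = extract()
    for step in transforms:
        df = step(df)
    load(df)


run_pipeline(
    extract=partial(extract_from_csv, "data/source.csv"),
    transforms=[
        partial(transform_rename_columns, rename_map={"old_col1": "new_col1", "old_col2": "new_col2"}),
    ],
    load=partial(load_to_json, output_path="data/output.json"),
)
```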