This project, "Snowflake Internal Stage Data Pipeline," builds a robust data ingestion pipeline from a MySQL database to Snowflake, using Snowflake's Internal Stage for staging and Apache Airflow for orchestration. The pipeline performs incremental data loads, reducing data transfer volumes and improving efficiency. A metadata-driven approach streamlines and automates the ingestion process.
- Incremental Data Loading: Efficiently transfers only new or updated records from MySQL to Snowflake, minimizing data transfer and processing time.
- Metadata-Driven Pipeline: Leverages metadata to dynamically configure and execute data ingestion tasks, reducing the need for hard-coded configurations (a sketch of such a config follows this list).
- Airflow Orchestration: Utilizes Apache Airflow to manage workflow orchestration, scheduling, and monitoring, ensuring reliable execution of data loading processes.
- Snowflake Integration: Employs Snowflake's Internal Stage for secure and scalable data staging before ingestion into the target tables.
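As a rough illustration of the metadata-driven approach, each table's ingestion could be described by a small config record like the one below. The field and stage names (`source_table`, `primary_keys`, `watermark_column`, `target_table`, `internal_stage`) are illustrative assumptions, not the project's actual config schema; only the table names and DAG IDs come from this README.

```python
# Hypothetical metadata entries driving the pipeline; field names are assumed,
# not taken from the project's actual configuration.
TABLE_CONFIGS = {
    "amazone_books": {
        "source_table": "amazone_books",
        "primary_keys": ["book_id"],
        "watermark_column": "book_id",          # column used to find new rows
        "target_table": "AMAZONE_BOOKS",        # assumed Snowflake table name
        "internal_stage": "@BOOKS_STAGE",       # assumed internal stage name
        "dag_id": "Snowflake_InternalStage_amazone_books_Dag",
    },
    "amazonebook_reviews": {
        "source_table": "amazonebook_reviews",
        "primary_keys": ["book_id", "reviewer_name", "business_date"],
        "watermark_column": "business_date",
        "target_table": "AMAZONEBOOK_REVIEWS",
        "internal_stage": "@REVIEWS_STAGE",
        "dag_id": "Snowflake_InternalStage_amazonebook_review_Dag",
    },
}
```

A generic loader or DAG factory can then read these entries instead of hard-coding per-table logic.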
- MySQL: Source database for extracting data.
- Snowflake: Target cloud data warehouse for analytics.
- Apache Airflow: Workflow orchestration tool to manage the data pipeline.
- Python: The primary programming language for scripting and automation.

Two tables are ingested into Snowflake, with a dedicated DAG developed for each:
- amazone_books
- amazonebook_reviews

The MySQL source tables are defined as follows:
```sql
CREATE TABLE amazone_books (
    book_id       INT NOT NULL AUTO_INCREMENT,
    book_title    TEXT,
    book_amount   FLOAT,
    book_author   TEXT,
    book_rating   FLOAT,
    book_link     TEXT,
    business_date DATE DEFAULT (CURRENT_DATE),
    PRIMARY KEY (book_id)
);

CREATE TABLE amazonebook_reviews (
    book_id        INT NOT NULL,
    reviewer_name  TEXT,
    rating         FLOAT,
    review_title   TEXT,
    review_content TEXT,
    reviewed_on    DATE,
    business_date  DATE DEFAULT (CURRENT_DATE)
);
```
For the incremental load, primary keys are required in the tables (a sketch of how such a key can drive the extraction follows this list). The respective primary keys are:
- amazone_books
  - book_id
- amazonebook_reviews
  - book_id
  - reviewer_name
  - business_date
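As a minimal sketch of how a sequence-ID watermark can drive the extraction (book_id is AUTO_INCREMENT in the source), the helper below pulls only rows whose key exceeds the last value already loaded. The connection details, helper name, and watermark handling are assumptions for illustration; the actual pipeline may track its watermark differently (e.g., via business_date or a control table).

```python
import mysql.connector  # assumed driver (mysql-connector-python)

def extract_new_rows(table: str, key_column: str, last_loaded_key: int) -> list[tuple]:
    """Fetch only rows whose sequence key is greater than the last loaded value."""
    conn = mysql.connector.connect(
        host="localhost", user="etl_user", password="***", database="books_db"
    )  # placeholder connection details
    try:
        cur = conn.cursor()
        # Incremental pull: only rows past the watermark are transferred.
        cur.execute(
            f"SELECT * FROM {table} WHERE {key_column} > %s ORDER BY {key_column}",
            (last_loaded_key,),
        )
        return cur.fetchall()
    finally:
        conn.close()

# Example: pull amazone_books rows added since book_id 1200 was loaded.
new_books = extract_new_rows("amazone_books", "book_id", 1200)
```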
Note: The source data comes from another project. To learn more about how it is generated, please refer to AmazonBooks_DataPipeline.
The Airflow DAG IDs for the respective tables are listed below (a minimal DAG skeleton follows the list):
- amazone_books
  - Snowflake_InternalStage_amazone_books_Dag
- amazonebook_reviews
  - Snowflake_InternalStage_amazonebook_review_Dag
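To make the orchestration concrete, here is a minimal sketch of what one of these DAGs might look like, assuming Airflow 2.x and a simple extract → stage → load task chain. The task names, schedule, and callables are illustrative assumptions, not the repository's actual DAG code; only the DAG ID comes from this README.

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# Placeholder callables standing in for the project's actual task logic.
def extract_from_mysql(**context): ...
def put_to_internal_stage(**context): ...
def copy_into_snowflake(**context): ...

with DAG(
    dag_id="Snowflake_InternalStage_amazone_books_Dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",   # assumed schedule
    catchup=False,
) as dag:
    extract = PythonOperator(task_id="extract_from_mysql", python_callable=extract_from_mysql)
    stage = PythonOperator(task_id="put_to_internal_stage", python_callable=put_to_internal_stage)
    load = PythonOperator(task_id="copy_into_snowflake", python_callable=copy_into_snowflake)

    extract >> stage >> load
```

In a metadata-driven setup, each of these callables would read its table-specific settings from a config like the one sketched earlier rather than hard-coding them.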
- Workflow Design: A DAG (Directed Acyclic Graph) in Airflow defines the sequence of tasks for data extraction, staging, and loading (a sketch of the staging and loading step follows this list).
- Incremental Loading: The pipeline identifies new or updated records in MySQL using timestamps or sequence IDs.
- Metadata Management: Metadata definitions (i.e., config) guide the extraction and loading processes, allowing for flexibility and scalability.
- Monitoring and Logging: Airflow provides comprehensive monitoring and logging capabilities, facilitating troubleshooting and performance optimization.
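As a minimal sketch of the staging and loading step (assuming the Snowflake Python connector, a CSV export of the extracted rows, and illustrative stage/table names), the flow is: PUT the file into the internal stage, then COPY INTO the target table.

```python
import snowflake.connector  # assumed client; stage/table names below are illustrative

def load_via_internal_stage(local_csv: str, stage: str = "@BOOKS_STAGE",
                            target_table: str = "AMAZONE_BOOKS") -> None:
    conn = snowflake.connector.connect(
        account="my_account", user="etl_user", password="***",
        warehouse="ETL_WH", database="BOOKS_DB", schema="PUBLIC",
    )  # placeholder connection details
    try:
        cur = conn.cursor()
        # 1) Upload the extracted file into the internal stage.
        cur.execute(f"PUT file://{local_csv} {stage} OVERWRITE = TRUE")
        # 2) Load the staged file into the target table.
        cur.execute(
            f"COPY INTO {target_table} FROM {stage} "
            "FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)"
        )
    finally:
        conn.close()
```

By default, COPY INTO skips files it has already loaded from the stage, which complements the incremental extraction upstream.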
- Data Quality Checks: Implement additional data validation and quality checks within the pipeline.
- Advanced Scheduling: Enhance scheduling capabilities to support more complex dependencies and triggers.

- Incremental Load: Medium
- Internal Stage: Snowflake Docs
- Power of Metadata-Driven ETL Frameworks: LinkedIn