Crafting a Basic Data Pipeline with Airflow

From Setup to Mastery: A Guide to Crafting Your Inaugural DAG


In a previous blog post, we explored the fundamental concepts of Apache Airflow—a versatile workflow management platform that empowers users to orchestrate complex data pipelines with ease. If you haven't had the chance to delve into the world of Airflow yet, I highly recommend checking out my earlier guide on setting up Airflow locally using Docker Compose.

Now that we've laid the foundation, let's take the next step in our Airflow journey. In this blog, we'll dive into the practical aspect of creating your first Directed Acyclic Graph (DAG). Think of DAGs as the blueprints for your workflows, guiding the execution of tasks and managing dependencies between them.

As we proceed, we'll build upon the groundwork established in the previous tutorial. Assuming you've successfully set up Airflow on your local machine, with the necessary directories such as dags, plugins, and config, it's time to breathe life into our workflows.

Our journey begins with understanding the key components of a DAG, initializing it, and defining tasks that make up the steps of our workflow. Let's explore each of these elements in detail, paving the way for you to harness the full power of Airflow in orchestrating your data pipelines.

A peek into DAG components

Before diving into the code, let's take a look at the main components of a DAG; a minimal skeleton tying them together follows this list.

  • DAG (Directed Acyclic Graph): Picture it as the roadmap for your workflow. It outlines a series of tasks and their relationships, ensuring a clear, organized flow of operations.

  • Tasks: These are the individual units of work within a DAG. Each task represents a specific operation or computation, forming the backbone of your workflow.

  • Operators: Operators define the logic of a task. Airflow offers a variety of built-in operators (such as PythonOperator and BashOperator) to execute different types of tasks.

  • Dependencies: The relationships between tasks. You can set tasks to depend on the completion of others, allowing for precise control over the execution sequence.
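To see how these pieces fit together before we build the real thing, here is a minimal, illustrative skeleton. The DAG and task names (toy_example, say_hello, say_goodbye) are placeholders, not part of the pipeline we build in this post.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# A minimal, illustrative DAG: two tasks wired together with a dependency.
with DAG(
    'toy_example',
    start_date=datetime(2023, 11, 11),
    schedule_interval='@daily',
) as dag:
    say_hello = BashOperator(task_id='say_hello', bash_command='echo hello')
    say_goodbye = BashOperator(task_id='say_goodbye', bash_command='echo goodbye')

    # Dependency: say_goodbye runs only after say_hello succeeds.
    say_hello >> say_goodbye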

Setting the stage

We will proceed to implement a process that involves scraping data from a website and subsequently saving it to a CSV file.

Importing Necessary Libraries

  1. from airflow import DAG: This imports the Directed Acyclic Graph (DAG) class from the Apache Airflow library. DAG is the core component in Airflow that represents a workflow.

  2. from airflow.operators.python_operator import PythonOperator: This imports the PythonOperator class, which allows you to define a task in a DAG that executes a Python callable.

  3. from bs4 import BeautifulSoup: This imports the BeautifulSoup class from the bs4 (Beautiful Soup) library. Beautiful Soup is a Python library for pulling data out of HTML and XML files.

  4. import requests: This imports the requests module, which is a popular Python library for making HTTP requests. It is commonly used for fetching web pages and interacting with APIs.

  5. import pandas as pd: This imports the pandas library and assigns it the alias pd. Pandas is a powerful data manipulation and analysis library for Python.

  6. import os: This imports the os module, which provides a way of using operating system-dependent functionality like reading or writing to the file system.

  7. from datetime import datetime, timedelta: This imports the datetime and timedelta classes from the datetime module. These classes are used to work with dates and times. In Airflow, they are often used to define the start date, schedule interval, retry delay, and other time-related parameters of a DAG.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from bs4 import BeautifulSoup
import requests
import pandas as pd
import os
from datetime import datetime, timedelta
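A quick compatibility note: on Airflow 2.x the airflow.operators.python_operator path above still works but emits a deprecation warning. If you are on a 2.x release, the newer module path is:

from airflow.operators.python import PythonOperator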

Setting our default arguments

  1. 'owner': 'nduti': Specifies the owner of the DAG. The owner is typically the person or team responsible for the DAG.

  2. 'depends_on_past': False: Indicates whether the tasks in the DAG should depend on the success or failure of the previous run's instances. In this case, it's set to False, meaning the tasks won't depend on the past instances.

  3. 'start_date': datetime(2023, 11, 11): Defines the start date of the DAG. The DAG will not run any instances before this date. In this example, the start date is set to November 11, 2023.

  4. 'retries': 1: Specifies the number of times a task should be retried in case of a failure. If a task fails, it will be retried once.

  5. 'retry_delay': timedelta(minutes=1): Determines the delay between retries for a failed task. In this case, it's set to 1 minute.

default_args = {
    'owner': 'nduti',
    'depends_on_past': False,
    'start_date': datetime(2023, 11, 11),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
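These defaults apply to every task in the DAG, and any of them can be overridden on an individual operator. A hypothetical illustration (flaky_step and fetch_flaky_source are made-up names, and the dag object is the one we create in the next step):

# Hypothetical task that overrides the DAG-level retry settings.
flaky_task = PythonOperator(
    task_id='flaky_step',
    python_callable=fetch_flaky_source,  # any Python callable (hypothetical)
    retries=3,                           # overrides default_args['retries']
    retry_delay=timedelta(minutes=5),    # overrides default_args['retry_delay']
    dag=dag,
)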

Initializing the DAG instance

  1. 'scrape_n_save_to_csv': The first argument is the dag_id, which is a unique identifier for the Directed Acyclic Graph (DAG). It's a user-defined string that must be unique among all DAGs.

  2. default_args=default_args: This sets the default configuration parameters for the DAG. It refers to the dictionary default_args that you defined earlier, providing default values for various parameters like owner, start date, retries, etc.

  3. description='Scraping_with_airflow': This provides a human-readable description of the DAG. It's optional but helps in documenting the purpose or workflow of the DAG.

  4. schedule_interval=timedelta(days=1): Specifies the interval at which the DAG should run. In this case, the DAG is scheduled to run every day (timedelta(days=1)). This means that a new DAG run will be triggered once a day.

dag = DAG(
    'scrape_n_save_to_csv',
    default_args=default_args,
    description='Scraping_with_airflow',
    schedule_interval=timedelta(days=1),
)
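As an aside, you may also come across DAGs declared with a context manager. It is equivalent to the explicit dag=dag wiring we use in this tutorial, just expressed differently:

with DAG(
    'scrape_n_save_to_csv',
    default_args=default_args,
    description='Scraping_with_airflow',
    schedule_interval=timedelta(days=1),
) as dag:
    # Any operator instantiated inside this block is attached to the DAG
    # automatically, so each task can omit the dag=dag argument.
    ...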

Next, we create the Python functions that will be invoked during task definition: one responsible for web scraping, the other for saving the data to a CSV file.

def scrape_quotes():
    url = 'http://quotes.toscrape.com'
    response = requests.get(url)

    if response.status_code == 200:
        soup = BeautifulSoup(response.text, 'html.parser')
        quotes = [quote.text.strip() for quote in soup.select('span.text')]
        return quotes
    else:
        print(f"Failed to fetch quotes. Status code: {response.status_code}")
        return []

def save_to_csv(quotes, **kwargs):
    if not quotes:
        print("No quotes to save.")
        return

    # Get the directory of the current DAG file
    current_dag_directory = os.path.dirname(os.path.abspath(__file__))

    # Specify the directory where you want to save the CSV file
    output_directory = os.path.join(current_dag_directory, 'output')

    # Create the output directory if it doesn't exist
    os.makedirs(output_directory, exist_ok=True)

    # Create a Pandas DataFrame
    df = pd.DataFrame({'Quote': quotes})

    # Save to CSV file in the specified output directory
    csv_path = os.path.join(output_directory, 'quotes.csv')
    df.to_csv(csv_path, index=False)

    print(f"Quotes saved to {csv_path}.")

Task definition using the PythonOperator

task_scrape_quotes Configuration:

  • task_id='scrape_quotes': Assigns a unique identifier to the task.

  • python_callable=scrape_quotes: Specifies the Python callable (function) that will be executed when the task runs. In this case, it's the scrape_quotes function.

  • dag=dag: Associates the task with the DAG (dag) to which it belongs.

task_scrape_quotes = PythonOperator(
    task_id='scrape_quotes',
    python_callable=scrape_quotes,
    dag=dag,
)

task_save_to_csv Configuration:

  • task_id='save_to_csv': Assigns a unique identifier to the task.

  • python_callable=save_to_csv: Specifies the Python callable (function) that will be executed when the task runs. In this case, it's the save_to_csv function.

  • op_args=[task_scrape_quotes.output]: Specifies the positional arguments to be passed to the save_to_csv function. task_scrape_quotes.output (available in Airflow 2.0+) is an XComArg that refers to the return value of the scrape_quotes task; Airflow resolves it via XCom at runtime, so the list of quotes is handed to save_to_csv as its first argument.

  • provide_context=True: Historically this flag enabled passing of task context (like task instance details) to the callable. In Airflow 2.x it is deprecated and has no effect, because context is passed automatically to any matching keyword arguments (the **kwargs in save_to_csv).

  • dag=dag: Associates the task with the DAG (dag) to which it belongs.

task_save_to_csv = PythonOperator(
    task_id='save_to_csv',
    python_callable=save_to_csv,
    op_args=[task_scrape_quotes.output],  # Pass the output of the first task as an argument
    provide_context=True,  # Deprecated in Airflow 2.x; context is passed automatically via **kwargs
    dag=dag,
)

Creating the task dependency

This defines that task_save_to_csv depends on the successful completion of task_scrape_quotes.

task_scrape_quotes >> task_save_to_csv
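For completeness: since Airflow 2.0, the TaskFlow API offers a more concise way to express the same pipeline. Return values are passed between tasks via XCom automatically, and the dependency is inferred from the function call. The sketch below is an alternative to the DAG we just built, not part of it (note the separate dag_id to avoid a clash):

from airflow.decorators import dag, task

@dag(
    dag_id='scrape_n_save_to_csv_taskflow',  # separate id so it doesn't clash with the DAG above
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)
def scrape_and_save_pipeline():

    @task
    def scrape():
        return scrape_quotes()  # reuses the scraping function defined earlier

    @task
    def save(quotes):
        save_to_csv(quotes)  # quotes arrives via XCom automatically

    save(scrape())  # calling save on scrape's result defines the dependency

scrape_and_save_pipeline()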

Ensure that you save your DAG file in your integrated development environment (IDE).

Navigate to the Airflow user interface by entering localhost:8080 (or the specific port you used in your Docker Compose file) in your web browser.

Upon accessing the Airflow web interface, follow these steps:

  • Search for your DAG with the name you assigned to it.

  • Manually trigger the DAG to run for testing purposes.

  • Observe the DAG's progress as it runs; the colors of the bars indicate its current state.

  • Use the Graph view for insights into each task's status and to visualize task dependencies.

In the Graph view, the successful completion of both tasks is shown in green.

Feel free to inspect the logs of individual tasks within the Airflow UI.

A quick check in your IDE should confirm the creation of a new 'output' folder, containing a CSV file named 'quotes.csv' that holds the scraped data.

Voila! We've successfully orchestrated a simple pipeline. But it doesn't end here: keep exploring the vast capabilities of Airflow and experiment with different operators to expand what your pipelines can do.

Happy orchestrating!!!