
Using Dagster with DuckDB

16.5.2025 | 4 minutes reading time

DuckDB has rapidly emerged as a popular in-process analytics database. Dagster, on the other hand, is a modern data orchestration framework that makes it easy to build and manage data pipelines. Combining Dagster with DuckDB allows data engineers to build pipelines where data is stored in DuckDB tables or queried via DuckDB's SQL engine, all within a Dagster workflow. This guide walks you through setting up a new Dagster project from scratch with DuckDB integration. It demonstrates two practical Dagster assets: one that inserts a pandas DataFrame into a DuckDB database, and another that reads data from a CSV file using DuckDB's SQL capabilities, followed by a downstream asset that builds on the first one. The goal is to provide a clear, working example that you can use as a foundation for your own projects.

Basic Setup and Dependencies

Create a new Dagster project

I am using Poetry to manage my dependencies. Install the following dependencies to follow the next steps: `poetry add dagster dagster-duckdb dagster-webserver pandas tabulate`. The tabulate package is required by pandas' `DataFrame.to_markdown`, which is used later for the preview metadata. After setting up the Dagster project with `dagster project scaffold --name dagster_duckdb_demo`, we end up with the project structure below. I also added a folder called data, which will be needed in the next section.

```
.
└── dagster_duckdb_demo
    ├── dagster_duckdb_demo
    │   ├── __init__.py
    │   ├── assets.py
    │   └── definitions.py
    ├── dagster_duckdb_demo_tests
    │   ├── __init__.py
    │   └── test_assets.py
    ├── data
    ├── poetry.lock
    ├── pyproject.toml
    ├── README.md
    ├── setup.cfg
    └── setup.py
```

You should also have the DuckDB CLI installed on your machine if you want to inspect the database file directly.
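To confirm that the Python packages ended up in the Poetry environment, a short check with the standard library's `importlib.metadata` can help. This is a minimal sketch: the helper name `installed_versions` is hypothetical, tabulate is listed because `DataFrame.to_markdown` depends on it, and the script should be run with `poetry run python` so it sees the project's virtual environment.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, List, Optional

# Distribution names to check (tabulate is needed by pandas' DataFrame.to_markdown)
REQUIRED = ["dagster", "dagster-duckdb", "dagster-webserver", "pandas", "tabulate"]


def installed_versions(packages: List[str]) -> Dict[str, Optional[str]]:
    """Hypothetical helper: map each distribution name to its version, or None if missing."""
    result: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


if __name__ == "__main__":
    for name, ver in installed_versions(REQUIRED).items():
        print(f"{name}: {ver or 'NOT INSTALLED'}")
```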

Define a DuckDB resource

To make DuckDB available in Dagster, a resource must be added. This is done in the definitions.py file: add a DuckDBResource from the dagster_duckdb package and point it to the database file. I will be using a database file in the data folder that was created in the previous step. DuckDB creates the database file automatically if it does not exist.

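```python
# dagster_duckdb_demo/definitions.py

from dagster import Definitions, load_assets_from_modules
from dagster_duckdb import DuckDBResource

from . import assets  # relative import: assets.py lives in the same package

all_assets = load_assets_from_modules([assets])

defs = Definitions(
    assets=all_assets,
    resources={
        # The key "duckdb" must match the parameter name used in the asset functions.
        # The relative path is resolved from the directory Dagster is started in.
        "duckdb": DuckDBResource(database="data/data.duckdb")
    },
)
```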

Creating Assets

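Dagster models pipelines as assets: persistent data objects, such as tables, each defined by the function that produces it. The following examples show simple assets that write data to DuckDB and read it back.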

Writing a table from a pandas DataFrame

This example loads a pandas DataFrame into DuckDB for later processing. It shows how a DataFrame held in a Python variable can be referenced by name directly inside a DuckDB SQL query, which is used here to create and fill the events table.

```python
# dagster_duckdb_demo/assets/pandas_import.py

import dagster as dg
import pandas as pd
from dagster_duckdb import DuckDBResource


def source_data() -> pd.DataFrame:
    # Small in-memory example dataset standing in for a real source
    return pd.DataFrame({
        "user_id": [1, 2, 3],
        "event": ["login", "purchase", "logout"],
        "timestamp": pd.to_datetime(["2025-05-01", "2025-05-02", "2025-05-03"]),
    })


@dg.asset(
    kinds={"python", "pandas", "duckdb"}
)
def pandas_import(context: dg.AssetExecutionContext, duckdb: DuckDBResource) -> dg.MaterializeResult:
    example_df = source_data()

    with duckdb.get_connection() as conn:
        # DuckDB resolves "example_df" to the local pandas DataFrame of the same name
        conn.execute("CREATE OR REPLACE TABLE events AS SELECT * FROM example_df")
        preview_df = conn.execute("SELECT * FROM events LIMIT 5").fetch_df()
        count = conn.execute("SELECT COUNT(1) FROM events").fetchone()[0]

    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(count),
            # DataFrame.to_markdown requires the optional "tabulate" package
            "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
        }
    )
```

Importing Data from CSV

As an alternative way of importing data, the following asset loads an example CSV file (data/example.csv) into the same DuckDB database. DuckDB reads the file directly from its path, so it does not need to be opened or loaded in Python beforehand. It would also be possible to use a URL that serves a CSV file instead of a local path (this relies on DuckDB's httpfs extension).

```python
# dagster_duckdb_demo/assets/csv_import.py

import dagster as dg
from dagster_duckdb import DuckDBResource


@dg.asset(
    kinds={"python", "duckdb"}
)
def csv_import(duckdb: DuckDBResource) -> dg.MaterializeResult:
    with duckdb.get_connection() as conn:
        # DuckDB reads the CSV file directly; the path is relative to the working directory
        conn.execute("""
            CREATE OR REPLACE TABLE example_data AS
            SELECT *
            FROM 'data/example.csv'
        """)

        row_count = conn.execute("""
            SELECT COUNT(1)
            FROM example_data
        """).fetchone()[0]

    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(row_count)
        }
    )
```

Downstream Assets

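To show how downstream assets work in Dagster with DuckDB, an additional asset is added in its own module. Note the `deps=["pandas_import"]` argument: it declares that login_events depends on the pandas_import asset, which creates the events table that login_events reads from. Dagster shows this dependency in the asset graph and runs pandas_import first when both assets are materialized together.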

```python
# dagster_duckdb_demo/assets/login_events.py

import dagster as dg
from dagster_duckdb import DuckDBResource


@dg.asset(
    kinds={"python", "duckdb"},
    description="Select login events from events table for further processing",
    # Upstream asset that creates the "events" table read below
    deps=["pandas_import"],
)
def login_events(duckdb: DuckDBResource) -> dg.MaterializeResult:
    with duckdb.get_connection() as conn:
        conn.execute("""
            CREATE OR REPLACE TABLE login_events AS
            SELECT *
            FROM events
            WHERE event = 'login'
        """)
        login_event_count = conn.execute("""
            SELECT COUNT(1)
            FROM login_events
        """).fetchone()[0]

    return dg.MaterializeResult(
        metadata={
            "login_event_count": dg.MetadataValue.int(login_event_count)
        }
    )
```

Updating the Definitions and Materializing the Assets

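The assets now live in separate modules inside a dagster_duckdb_demo/assets package with its own __init__.py (replacing the scaffolded assets.py), so the updated definitions.py loads all three modules from the previous steps: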

```python
# dagster_duckdb_demo/definitions.py

import dagster as dg
from dagster_duckdb import DuckDBResource

from .assets import csv_import, login_events, pandas_import

# Each name refers to a module inside the assets package
all_assets = dg.load_assets_from_modules([csv_import, login_events, pandas_import])

defs = dg.Definitions(
    assets=all_assets,
    resources={
        "duckdb": DuckDBResource(database="data/data.duckdb")
    },
)
```

Launching the Dagster webserver with `poetry run dagster dev` from the project root (so that the relative path data/data.duckdb resolves correctly) now shows the following asset dependency graph:

[Figure: Dagster asset dependency graph]

After a successful run, each materialized asset shows its configured metadata in the Dagster UI. Inspecting the DuckDB file with the CLI (for example `duckdb data/data.duckdb`) gives the following overview:

[Figure: tables in data/data.duckdb, listed via the DuckDB CLI]

Conclusion

As we have seen, integrating DuckDB into a Dagster workflow is straightforward. Once the resource is defined, any asset can request it as a parameter, and DuckDB's SQL engine can load pandas DataFrames or CSV files directly into tables for further processing. As a next step, dbt could be added for data transformations, or the local DuckDB file could be swapped for a connection to MotherDuck to leverage its cloud data warehouse features.
