DuckDB has rapidly emerged as a popular in-process analytics database. Dagster is a modern data orchestration framework that makes it easy to build and manage data pipelines. Combining the two allows data engineers to build pipelines where data is stored in DuckDB tables or queried via DuckDB's SQL engine, all within a Dagster workflow. This guide walks you through setting up a new Dagster project from scratch with DuckDB integration. It demonstrates practical Dagster assets: one that inserts a pandas DataFrame into a DuckDB database, one that reads data from a CSV file using DuckDB's SQL capabilities, and a downstream asset that builds on the first. The goal is to provide a clear, working example that you can use as a foundation for your own projects.
Basic Setup and Dependencies
Create a new Dagster project
I am using Poetry to manage my dependencies. Install the following packages to be able to follow the next steps: poetry add dagster dagster-duckdb dagster-webserver pandas
This leaves us with the project structure below after scaffolding the project with dagster project scaffold --name dagster_duckdb_demo. I also added a folder called data; it will become necessary in the next section.
```
└── dagster_duckdb_demo
    ├── dagster_duckdb_demo
    │   ├── __init__.py
    │   ├── assets.py
    │   └── definitions.py
    ├── dagster_duckdb_demo_tests
    │   ├── __init__.py
    │   └── test_assets.py
    ├── data
    ├── poetry.lock
    ├── pyproject.toml
    ├── README.md
    ├── setup.cfg
    └── setup.py
```
You should also have DuckDB installed on your machine in order to inspect the database with the CLI.
Define a DuckDB resource
To use DuckDB in Dagster, a resource must be added. This is done in the definitions.py file, where you simply add a DuckDBResource from the dagster_duckdb package and point it at the database. I will be using a database file in the data folder that was created in the previous step; the database file is created automatically if it does not exist.
```python
# definitions.py

from dagster import Definitions, load_assets_from_modules
from dagster_duckdb import DuckDBResource

from dagster_duckdb_demo import assets

all_assets = load_assets_from_modules([assets])

defs = Definitions(
    assets=all_assets,
    resources={
        "duckdb": DuckDBResource(database="data/data.duckdb")
    }
)
```
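If you prefer not to hard-code the path, the same field can be populated from an environment variable with Dagster's EnvVar. A minimal sketch, assuming an environment variable named DUCKDB_DATABASE (the name is arbitrary and chosen for this example):

```python
# Sketch: read the database path from the environment instead of
# hard-coding it. DUCKDB_DATABASE is an arbitrary variable name.
import dagster as dg
from dagster_duckdb import DuckDBResource

duckdb_resource = DuckDBResource(database=dg.EnvVar("DUCKDB_DATABASE"))
```

This keeps the local file for development while allowing a different path per deployment.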
Creating Assets
Dagster encourages modeling pipelines as assets. Here is how you can write simple read and write assets on top of the DuckDB resource.
Writing a table from a pandas DataFrame
This example loads a pandas DataFrame into DuckDB for later processing. It is mainly meant to show how a Python DataFrame can be referenced directly inside SQL, which is what fills the table in DuckDB.
```python
# dagster_duckdb_demo/assets/pandas_import.py

import dagster as dg
import pandas as pd
from dagster_duckdb import DuckDBResource


def source_data() -> pd.DataFrame:
    return pd.DataFrame({
        "user_id": [1, 2, 3],
        "event": ["login", "purchase", "logout"],
        "timestamp": pd.to_datetime(["2025-05-01", "2025-05-02", "2025-05-03"])
    })


@dg.asset(
    kinds={"python", "pandas", "duckdb"}
)
def pandas_import(context: dg.AssetExecutionContext, duckdb: DuckDBResource) -> dg.MaterializeResult:
    example_df = source_data()

    with duckdb.get_connection() as conn:
        # example_df is resolved from the local Python scope by DuckDB
        conn.execute("CREATE OR REPLACE TABLE events AS SELECT * FROM example_df")
        preview_df = conn.execute("SELECT * FROM events LIMIT 5").fetch_df()
        count = conn.execute("SELECT COUNT(1) FROM events").fetchone()[0]

    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(count),
            "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))
        }
    )
```
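The CREATE OR REPLACE TABLE statement works because DuckDB's Python client performs a replacement scan: if example_df is not a known table, it looks for a DataFrame with that name in the surrounding Python scope. A minimal standalone sketch of this mechanism, independent of Dagster:

```python
# Standalone demonstration of DuckDB's replacement scan: a local pandas
# DataFrame can be referenced by its variable name directly in SQL.
import duckdb
import pandas as pd

df = pd.DataFrame({"user_id": [1, 2, 3], "event": ["login", "purchase", "login"]})
print(duckdb.sql("SELECT event, COUNT(*) AS n FROM df GROUP BY event").df())
```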
Importing Data from CSV
To show an alternative way of importing data, an example CSV file is loaded into the same DuckDB database with the following script. The file is referenced directly by its path, so it does not need to be opened or parsed beforehand. A URL pointing to a CSV file would work as well.
```python
# dagster_duckdb_demo/assets/csv_import.py

import dagster as dg
from dagster_duckdb import DuckDBResource


@dg.asset(
    kinds={"python", "duckdb"}
)
def csv_import(duckdb: DuckDBResource) -> dg.MaterializeResult:
    with duckdb.get_connection() as conn:
        # DuckDB infers the CSV format and schema from the file itself
        conn.execute("""
            CREATE OR REPLACE TABLE example_data AS
            SELECT *
            FROM 'data/example.csv'
        """)

        row_count = conn.execute("""
            SELECT COUNT(1)
            FROM example_data
        """).fetchone()[0]

    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(row_count)
        }
    )
```
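Here DuckDB infers delimiter, header, and column types from the file itself. If the inference guesses wrong, the same import can be written with an explicit read_csv call and options; a sketch, assuming a comma-separated file with a header row:

```python
# Sketch: explicit read_csv with options instead of relying on inference.
# header and delim are standard read_csv parameters; adjust to your file.
import duckdb

con = duckdb.connect("data/data.duckdb")
con.execute("""
    CREATE OR REPLACE TABLE example_data AS
    SELECT *
    FROM read_csv('data/example.csv', header = true, delim = ',')
""")
```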
Downstream Assets
To show the usage of downstream assets in Dagster with DuckDB, an additional script is added. Pay attention to the dependency between the login_events and the pandas_import asset, declared via the deps argument.
```python
# dagster_duckdb_demo/assets/login_events.py

import dagster as dg
from dagster_duckdb import DuckDBResource


@dg.asset(
    kinds={"python", "duckdb"},
    description="Select login events from events table for further processing",
    deps=["pandas_import"]
)
def login_events(duckdb: DuckDBResource) -> dg.MaterializeResult:
    with duckdb.get_connection() as conn:
        conn.execute("""
            CREATE OR REPLACE TABLE login_events AS
            SELECT *
            FROM events
            WHERE event = 'login'
        """)
        login_event_count = conn.execute("""
            SELECT COUNT(1)
            FROM login_events
        """).fetchone()[0]

    return dg.MaterializeResult(
        metadata={
            "login_event_count": dg.MetadataValue.int(login_event_count)
        }
    )
```
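Since this asset relies on the events table actually containing data, it is also a natural place for a Dagster asset check. A minimal sketch of such a check, not part of the original project, which would additionally need to be registered via the asset_checks argument of Definitions:

```python
# Sketch of an asset check that verifies the login_events table is not
# empty after materialization. Hypothetical addition, not in the project.
import dagster as dg
from dagster_duckdb import DuckDBResource


@dg.asset_check(asset="login_events")
def login_events_not_empty(duckdb: DuckDBResource) -> dg.AssetCheckResult:
    with duckdb.get_connection() as conn:
        count = conn.execute("SELECT COUNT(1) FROM login_events").fetchone()[0]
    return dg.AssetCheckResult(passed=count > 0, metadata={"row_count": count})
```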
Updating the Definitions and Materializing
The updated definitions.py file now contains the assets from the previous steps.
```python
# dagster_duckdb_demo/definitions.py

import dagster as dg
from dagster_duckdb import DuckDBResource

from .assets import csv_import, login_events, pandas_import

all_assets = dg.load_assets_from_modules([csv_import, login_events, pandas_import])

defs = dg.Definitions(
    assets=all_assets,
    resources={
        "duckdb": DuckDBResource(database="data/data.duckdb")
    }
)
```
Launching the Dagster webserver via poetry run dagster dev will now show the dependency graph, with login_events downstream of pandas_import.
Materializing these assets should show the configured metadata after a successful run of each asset. Inspecting the DuckDB file via the CLI confirms that the events, example_data, and login_events tables were created.
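If you do not have the DuckDB CLI at hand, the same inspection works from Python; a short sketch:

```python
# Inspect the materialized tables with the duckdb Python client.
import duckdb

con = duckdb.connect("data/data.duckdb", read_only=True)
print(con.execute("SHOW TABLES").fetchall())
print(con.execute("SELECT * FROM login_events").df())
```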
Conclusion
As we have now seen, integrating DuckDB into the Dagster workflow is quite simple. Once the resource is defined, it can easily be used by every asset and offers a capable API to persist DataFrames or CSV files directly for further processing. As a next step, dbt could be added for data transformations, or the local DuckDB instance could be swapped for a connection to MotherDuck to leverage its cloud data warehouse features.
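Swapping in MotherDuck is, in principle, a one-line change to the resource definition. A sketch, assuming a MOTHERDUCK_TOKEN environment variable is set and a database named my_db exists in your MotherDuck account:

```python
# Sketch: point the resource at MotherDuck instead of a local file.
# Assumes MOTHERDUCK_TOKEN is set and the my_db database exists.
from dagster_duckdb import DuckDBResource

duckdb_resource = DuckDBResource(database="md:my_db")
```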