---
title: 30+ SQL databases (powered by SQLAlchemy)
description: SQLAlchemy destination
keywords: [sql, sqlalchemy, database, destination]
---

# SQLAlchemy destination

The SQLAlchemy destination allows you to use any database that has an [SQLAlchemy dialect](https://docs.sqlalchemy.org/en/20/dialects/) implemented as a destination.

Currently, MySQL and SQLite are considered to have full support and are tested as part of the `dlt` CI suite. Other dialects are not tested but should generally work.

## Install dlt with SQLAlchemy

Install dlt with the `sqlalchemy` extra dependency:

```sh
pip install "dlt[sqlalchemy]"
```

Note that database drivers are not included and need to be installed separately for the database you plan on using. For example, for MySQL:

```sh
pip install mysqlclient
```

Refer to the [SQLAlchemy documentation on dialects](https://docs.sqlalchemy.org/en/20/dialects/) for information about client libraries required for supported databases.

## Destination capabilities
The following table shows the capabilities of the SQLAlchemy destination:

| Feature | Value | More |
|---------|-------|------|
| Preferred loader file format | typed-jsonl | [File formats](../file-formats/) |
| Supported loader file formats | typed-jsonl, parquet, model | [File formats](../file-formats/) |
| Has case sensitive identifiers | True | [Naming convention](../../general-usage/naming-convention#case-sensitive-and-insensitive-destinations) |
| Supported merge strategies | delete-insert, scd2 | [Merge strategy](../../general-usage/merge-loading#merge-strategies) |
| Supported replace strategies | truncate-and-insert, insert-from-staging | [Replace strategy](../../general-usage/full-loading#choosing-the-correct-replace-strategy-for-your-full-load) |
| Supports tz aware datetime | True | [Timestamps and Timezones](../../general-usage/schema#handling-of-timestamp-and-time-zones) |
| Supports naive datetime | True | [Timestamps and Timezones](../../general-usage/schema#handling-of-timestamp-and-time-zones) |

*This table shows the supported features of the SQLAlchemy destination in dlt.*


### Create a pipeline

**1. Initialize a project with a pipeline that loads to an SQLAlchemy-supported database by running:**
```sh
dlt init chess sqlalchemy
```

**2. Install the necessary dependencies for SQLAlchemy by running:**
```sh
pip install -r requirements.txt
```
or run:
```sh
pip install "dlt[sqlalchemy]"
```

**3. Install your database client library.**

E.g., for MySQL:
```sh
pip install mysqlclient
```

**4. Enter your credentials into `.dlt/secrets.toml`.**

For example, replace with your database connection info:
```toml
[destination.sqlalchemy.credentials]
database = "dlt_data"
username = "loader"
password = "<password>"
host = "localhost"
port = 3306
driver_name = "mysql"
```

Alternatively, a valid SQLAlchemy database URL can be used, either in `secrets.toml` or as an environment variable. For example:

```toml
[destination.sqlalchemy]
credentials = "mysql://loader:<password>@localhost:3306/dlt_data"
```

or

```sh
export DESTINATION__SQLALCHEMY__CREDENTIALS="mysql://loader:<password>@localhost:3306/dlt_data"
```

An SQLAlchemy `Engine` can also be passed directly by creating an instance of the destination:

```py
import sqlalchemy as sa
import dlt

engine = sa.create_engine('sqlite:///chess_data.db')

pipeline = dlt.pipeline(
    pipeline_name='chess',
    destination=dlt.destinations.sqlalchemy(engine),
    dataset_name='main'
)
```

### Passing SQLAlchemy engine options: `engine_kwargs`

The SQLAlchemy destination accepts an optional `engine_kwargs` parameter, which is forwarded directly to `sqlalchemy.create_engine`.
The equivalent `engine_args` parameter is maintained for backward compatibility, but will be removed in a future release.

Example enabling SQLAlchemy verbose logging:
#### In `.dlt/secrets.toml`
```toml
[destination.sqlalchemy]
credentials = "sqlite:///logger.db"
```

#### In `.dlt/config.toml`
```toml
[destination.sqlalchemy.engine_kwargs]
echo = true
```

Or, directly in code:

```py
import logging
import dlt
from dlt.destinations import sqlalchemy

logging.basicConfig(level=logging.INFO)

dest = sqlalchemy(
    credentials="sqlite:///logger.db",
    engine_kwargs={"echo": True},
)

pipeline = dlt.pipeline(
    pipeline_name='logger',
    destination=dest,
    dataset_name='main'
)

pipeline.run(
    [
        {'id': 1},
        {'id': 2},
        {'id': 3},
    ],
    table_name="logger"
)
```

Here, `engine_kwargs` configures only the engine used by SQLAlchemy as a **destination**. It does not affect resource extraction; for SQL sources, use the source's own `engine_kwargs` option (see [here](../verified-sources/sql_database/configuration.md#passing-sqlalchemy-engine-options-engine_kwargs)).

## Notes on SQLite

### Dataset files
When using an SQLite database file, each dataset is stored in a separate file since SQLite does not support multiple schemas in a single database file.
Under the hood, this uses [`ATTACH DATABASE`](https://www.sqlite.org/lang_attach.html).

The file is stored in the same directory as the main database file (provided by your database URL).

E.g., if your SQLite URL is `sqlite:////home/me/data/chess_data.db` and your `dataset_name` is `games`, the data
is stored in `/home/me/data/chess_data__games.db`.

**Note**: If the dataset name is `main`, no additional file is created as this is the default SQLite database.
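
For illustration, here is a minimal sketch of this behavior; the file path and table name are assumptions chosen for the example:

```py
import dlt

# A sketch with assumed paths and names: loading into the `games` dataset
# of an SQLite file database.
pipeline = dlt.pipeline(
    pipeline_name="chess",
    destination=dlt.destinations.sqlalchemy(
        credentials="sqlite:////home/me/data/chess_data.db"
    ),
    dataset_name="games",
)

# Tables land in /home/me/data/chess_data__games.db, which dlt attaches
# to the main database via ATTACH DATABASE.
pipeline.run([{"game_id": 1, "result": "1-0"}], table_name="games_played")
```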

### In-memory databases
In-memory databases require a persistent connection as the database is destroyed when the connection is closed.
Normally, connections are opened and closed for each load job and in other stages during the pipeline run.
To ensure the database persists throughout the pipeline run, you need to pass in an SQLAlchemy `Engine` object instead of credentials.
This engine is not disposed of automatically by `dlt`.

#### Shared-cache URI mode (recommended)

The recommended approach uses SQLite's [shared-cache URI](https://www.sqlite.org/inmemorydb.html) format.
This creates a named in-memory database that can be safely accessed by multiple connections across threads
via `SingletonThreadPool` (one connection per thread):

```py
import dlt
import sqlalchemy as sa

engine = sa.create_engine(
    "sqlite:///file:shared?mode=memory&cache=shared&uri=true",
    connect_args={"check_same_thread": False},
    poolclass=sa.pool.SingletonThreadPool,
)

pipeline = dlt.pipeline(
    "my_pipeline",
    destination=dlt.destinations.sqlalchemy(engine),
    dataset_name="main",
)

pipeline.run([1, 2, 3], table_name="my_table")

with engine.connect() as conn:
    result = conn.execute(sa.text("SELECT * FROM my_table"))
    print(result.fetchall())

engine.dispose()
```

- `mode=memory&cache=shared` creates a named in-memory database shared across all connections in the process.
- `uri=true` is required for `pysqlite` to interpret the database path as a URI.
- `SingletonThreadPool` gives each thread its own connection while all threads see the same data.

#### StaticPool with single worker

Alternatively, you can use `StaticPool` (single shared connection) with `workers=1` to avoid concurrent
access on the same connection:

```py
import dlt
import sqlalchemy as sa

engine = sa.create_engine(
    "sqlite:///:memory:",
    connect_args={"check_same_thread": False},
    poolclass=sa.pool.StaticPool,
)

pipeline = dlt.pipeline(
    "my_pipeline",
    destination=dlt.destinations.sqlalchemy(engine),
    dataset_name="main",
)

pipeline.run([1, 2, 3], table_name="my_table", loader_file_format="typed-jsonl")

with engine.connect() as conn:
    result = conn.execute(sa.text("SELECT * FROM my_table"))
    print(result.fetchall())

engine.dispose()
```

Set the loader to a single worker in `.dlt/config.toml`:

```toml
[load]
workers=1
```

:::caution
With `StaticPool`, all threads share a single underlying database connection.
Using the default parallel loader (`workers > 1`) can cause race conditions where
committed data appears missing. Always set `workers=1` when using `StaticPool`.
:::

### Database locking with `ATTACH DATABASE` on Windows
When `dataset_name` is not `main`, dlt uses SQLite's `ATTACH DATABASE` to store each dataset in a separate file. On Windows, a second `ATTACH` on the same connection can lock indefinitely under concurrent access (e.g. when using the default parallel loading strategy).

To work around this issue, use one of the following approaches:

1. **Set `dataset_name` to `main`** so that no `ATTACH` is needed:
   ```py
   pipeline = dlt.pipeline(
       pipeline_name='my_pipeline',
       destination=dlt.destinations.sqlalchemy(credentials="sqlite:///my_data.db"),
       dataset_name='main'
   )
   ```

2. **Use sequential loading** to avoid concurrent `ATTACH` calls:
   ```toml
   [load]
   workers=1
   ```

## Notes on other dialects
We have tested this destination with the **mysql**, **sqlite**, **oracledb**, and **mssql** dialects. Below are a few notes that may help when enabling other dialects:
1. `dlt` must be able to recognize whether a database exception refers to a non-existing entity (such as a table or schema). We have put some work into recognizing these for most popular dialects (see `db_api_client.py`).
2. Primary keys and unique constraints are not created by default to avoid problems with particular dialects.
3. The `merge` write disposition uses only `DELETE` and `INSERT` operations to support as many dialects as possible.

Please report issues with particular dialects. We'll try to make them work.

### Trino limitations
* The Trino dialect does not case-fold identifiers; use the `snake_case` naming convention only.
* Trino does not support the `merge`/`scd2` write dispositions (unless you somehow create PRIMARY KEYs on the engine tables).
* JSON and BINARY types are cast to STRING (the dialect appears to have a conversion bug).
* Trino does not support PRIMARY/UNIQUE constraints.

### Oracle limitations
* In Oracle, regular (non-DBA, non-SYS/SYSOPS) users are assigned a single schema on user creation and usually cannot create other schemas. For features that require staging datasets, either grant the database user the right to create schemas or explicitly specify an existing schema to be used as the staging dataset. See the [staging dataset documentation](../staging.md#staging-dataset) for more details.


### Adapting destination for a dialect

#### Quick approach: pass `type_mapper` directly
You can adapt destination capabilities for a particular dialect [by passing your custom settings](../../general-usage/destination.md#pass-additional-parameters-and-change-destination-capabilities). In the example below, we pass a custom `TypeMapper` that
converts `json` data into `text` on the fly.
```py
from dlt.common import json

import dlt
import sqlalchemy as sa
from dlt.destinations.impl.sqlalchemy.type_mapper import SqlalchemyTypeMapper

class JSONString(sa.TypeDecorator):
    """
    A custom SQLAlchemy type that stores JSON data as a string in the database.
    Automatically serializes Python objects to JSON strings on write and
    deserializes JSON strings back to Python objects on read.
    """

    impl = sa.String
    cache_ok = True

    def process_bind_param(self, value, dialect):
        if value is None:
            return None

        return json.dumps(value)

    def process_result_value(self, value, dialect):
        if value is None:
            return None

        return json.loads(value)

class TrinoTypeMapper(SqlalchemyTypeMapper):
    """Example mapper that plugs custom string type that serialized to from/json

    Note that instance of TypeMapper contains dialect and destination capabilities instance
    for a deeper integration
    """

    def _db_type_from_json_type(self, column, table=None):
        return JSONString()

# Pass dest_ as the `destination` argument to dlt.pipeline
dest_ = dlt.destinations.sqlalchemy(type_mapper=TrinoTypeMapper)
```

The `SqlalchemyTypeMapper` dispatches to per-type visitor methods (`db_type_from_text_type`, `db_type_from_json_type`, `db_type_from_bool_type`, etc.), so you only need to override the type(s) you want to customize. You can also override `to_destination_type()` directly for full control.

A custom type mapper is also useful when you want to limit string lengths. Below, we add a variant
for the `mssql` dialect:
```py
import sqlalchemy as sa
from dlt.destinations.impl.sqlalchemy.type_mapper import SqlalchemyTypeMapper

class CustomMssqlTypeMapper(SqlalchemyTypeMapper):
    """This is only an illustration, `sqlalchemy` destination already handles mssql types"""

    def db_type_from_text_type(self, column, table=None):
        type_ = super().db_type_from_text_type(column, table)
        length = column.get("precision")
        if length is None:
            return type_.with_variant(sa.UnicodeText(), "mssql")  # type: ignore[no-any-return]
        else:
            return type_.with_variant(sa.Unicode(length=length), "mssql")  # type: ignore[no-any-return]
```

:::warning
When extending the type mapper for mssql, mysql, or trino, start from `MssqlVariantTypeMapper`, `MysqlVariantTypeMapper`, or
`TrinoVariantTypeMapper`, respectively.
:::

#### Full approach: register custom dialect capabilities

For a more comprehensive integration, you can register a `DialectCapabilities` class for your database backend. This allows you to customize type mapping, destination capabilities, table structure, and error handling — all in one place. Registered capabilities are automatically applied when the SQLAlchemy destination connects to a matching database.

```py
from typing import Optional, Type

import sqlalchemy as sa
from dlt.common.destination.capabilities import DataTypeMapper, DestinationCapabilitiesContext
from dlt.common.destination.typing import PreparedTableSchema
from dlt.destinations.impl.sqlalchemy.dialect import (
    DialectCapabilities,
    register_dialect_capabilities,
)
from dlt.destinations.impl.sqlalchemy.type_mapper import SqlalchemyTypeMapper


class MyTypeMapper(SqlalchemyTypeMapper):
    """Override only the types you need to customize."""

    def _db_type_from_json_type(self, column, table=None):
        # store JSON as VARCHAR instead of native JSON
        return sa.String(length=4000)


class MyDialectCapabilities(DialectCapabilities):
    def adjust_capabilities(
        self, caps: DestinationCapabilitiesContext, dialect: sa.engine.interfaces.Dialect
    ) -> None:
        caps.max_identifier_length = 128
        caps.max_column_identifier_length = 128
        caps.sqlglot_dialect = "oracle"  # type: ignore[assignment]

    def type_mapper_class(self) -> Optional[Type[DataTypeMapper]]:
        return MyTypeMapper

    def adapt_table(
        self, table: sa.Table, table_schema: PreparedTableSchema
    ) -> sa.Table:
        # Example: reorder columns so primary key columns come first.
        # Some databases (e.g. StarRocks) require this ordering.
        pk_col_names = [c.name for c in table.primary_key.columns]
        if not pk_col_names:
            return table
        pk_cols = [c for c in table.columns if c.name in pk_col_names]
        other_cols = [c for c in table.columns if c.name not in pk_col_names]
        if [c.name for c in table.columns] == [c.name for c in pk_cols + other_cols]:
            return table  # already in order
        schema = table.schema
        name = table.name
        metadata = table.metadata
        metadata.remove(table)
        return sa.Table(
            name, metadata,
            *[c.copy() for c in pk_cols + other_cols],
            sa.PrimaryKeyConstraint(*pk_col_names),
            schema=schema,
        )

    def is_undefined_relation(self, e: Exception) -> Optional[bool]:
        # return True if the exception means table/schema doesn't exist
        # return False to prevent default pattern matching
        # return None to fall through to built-in patterns
        if "MY_CUSTOM_MISSING_TABLE_CODE" in str(e):
            return True
        return None


# register for your backend name (as shown in the SQLAlchemy connection URL)
register_dialect_capabilities("my_dialect", MyDialectCapabilities)
```

After registration, any pipeline using a `my_dialect://` connection URL will automatically use the custom capabilities. No additional configuration is needed.

The `DialectCapabilities` class supports four extension points:

| Method | Description |
| --- | --- |
| `adjust_capabilities` | Modify destination capabilities (identifier lengths, timestamp precision, sqlglot dialect, etc.) |
| `type_mapper_class` | Return a custom `DataTypeMapper` subclass for the dialect |
| `adapt_table` | Modify `sa.Table` objects before they are created or used for loading (e.g. reorder columns for StarRocks) |
| `is_undefined_relation` | Classify exceptions as "table/schema not found" errors for the dialect |

:::tip
Passing `type_mapper=` directly to `dlt.destinations.sqlalchemy()` always takes precedence over the registered dialect capabilities. Use direct passing for one-off overrides and registration for reusable dialect support.
:::


## Write dispositions

The following write dispositions are supported:

- `append`
- `replace` with `truncate-and-insert` and `insert-from-staging` replace strategies. `staging-optimized` falls back to `insert-from-staging`.
- `merge` with `delete-insert` and `scd2` merge strategies (see the sketch below).
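
For example, here is a minimal sketch of a merge load using the default `delete-insert` strategy on a primary key; the pipeline and table names are assumptions:

```py
import dlt

# A sketch with assumed names: merge rows on the `id` primary key using
# the default delete-insert strategy.
pipeline = dlt.pipeline(
    pipeline_name="users_pipeline",
    destination="sqlalchemy",  # credentials configured in .dlt/secrets.toml
    dataset_name="main",
)

pipeline.run(
    [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}],
    table_name="users",
    write_disposition="merge",
    primary_key="id",
)
```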

## Data loading

### Fast loading with parquet

The [parquet](../file-formats/parquet.md) file format is supported via an [ADBC driver](https://arrow.apache.org/adbc/) for **mysql**.
The driver is provided by [Columnar](https://columnar.tech/). To install it, you need `dbc`, a tool for managing ADBC drivers:
```sh
pip install adbc-driver-manager dbc
dbc install mysql
```

With `uv`, you can run `dbc` directly:
```sh
uv tool run dbc install mysql
```

You must have the correct driver installed and `loader_file_format` set to `parquet` in order to use ADBC. If the driver is not found,
`dlt` converts the parquet data into INSERT statements.
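
For example, here is a minimal sketch of requesting the parquet format explicitly; the pipeline and table names are assumptions:

```py
import dlt

# A sketch with assumed names: with the ADBC driver installed, request
# parquet explicitly; otherwise dlt falls back to INSERT statements.
pipeline = dlt.pipeline(
    pipeline_name="mysql_parquet",
    destination="sqlalchemy",  # mysql credentials configured in .dlt/secrets.toml
    dataset_name="main",
)

pipeline.run(
    [{"id": 1}, {"id": 2}],
    table_name="items",
    loader_file_format="parquet",
)
```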

Parquet files are copied in batches of one row group each. All row groups are copied in a single transaction.

:::caution
The ADBC driver is based on go-mysql. We perform only minimal conversion of SQLAlchemy connection strings (SSL certificate settings for mysql).
:::

#### Why ADBC is not supported for SQLite

ADBC is disabled for SQLite because Python's `sqlite3` module and `adbc_driver_sqlite` bundle different SQLite library versions.
When both libraries operate on the same database file in WAL mode, they have conflicting memory-mapped views of the
WAL index file (`-shm`), causing data corruption. See [TensorBoard issue #1467](https://github.com/tensorflow/tensorboard/issues/1467)
for details on this two-library conflict.

For SQLite, parquet files are loaded using batch INSERT statements instead.

### Loading with SQLAlchemy batch INSERTs

Data is loaded in a dialect-agnostic manner with an `insert` statement generated by SQLAlchemy's core API.
Rows are inserted in batches as long as the underlying database driver supports it. By default, the batch size is 10,000 rows.

## Syncing of `dlt` state

This destination fully supports [dlt state sync](../../general-usage/state#syncing-state-with-destination).

### Data types

All `dlt` data types are supported, but how they are stored in the database depends on the SQLAlchemy dialect.
For example, SQLite does not have `DATETIME` or `TIMESTAMP` types, so `timestamp` columns are stored as `TEXT` in ISO 8601 format.
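
For instance, here is a small sketch showing that on SQLite a timestamp round-trips as an ISO 8601 string; the file, pipeline, and table names are assumptions:

```py
import datetime
import dlt
import sqlalchemy as sa

engine = sa.create_engine("sqlite:///types_demo.db")

pipeline = dlt.pipeline(
    pipeline_name="types_demo",
    destination=dlt.destinations.sqlalchemy(engine),
    dataset_name="main",
)

pipeline.run(
    [{"id": 1, "created_at": datetime.datetime(2024, 1, 1, 12, 0, tzinfo=datetime.timezone.utc)}],
    table_name="events",
)

with engine.connect() as conn:
    # The `created_at` column is stored as TEXT, so the value comes back
    # as an ISO 8601 string rather than a datetime object.
    print(conn.execute(sa.text("SELECT created_at FROM events")).fetchall())

engine.dispose()
```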

## Supported file formats

* [typed-jsonl](../file-formats/jsonl.md) is used by default. JSON-encoded data with typing information included.
* [Parquet](../file-formats/parquet.md) is supported.

## Supported column hints
By default, no indexes or constraints are created on the tables. You can enable the following via destination configuration (a sketch of declaring the corresponding hints follows the list below):
```toml
[destination.sqlalchemy]
create_unique_indexes=true
create_primary_keys=true
```
* `unique` hints are translated to `UNIQUE` constraints via SQLAlchemy.
* `primary_key` hints are translated to `PRIMARY KEY` constraints via SQLAlchemy.
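
For example, here is a short sketch of declaring such hints on a resource; the resource and column names are assumptions:

```py
import dlt

# A sketch with assumed names: `primary_key` becomes a PRIMARY KEY and the
# `unique` hint on `email` becomes a UNIQUE constraint when the options
# above are enabled.
@dlt.resource(primary_key="id", columns={"email": {"unique": True}})
def users():
    yield {"id": 1, "email": "alice@example.com"}

pipeline = dlt.pipeline(
    pipeline_name="hints_demo",
    destination="sqlalchemy",  # credentials configured in .dlt/secrets.toml
    dataset_name="main",
)
pipeline.run(users())
```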
