Skip to content

eftoolkit.sql

DuckDB wrapper with S3 integration.

Module Contents

DuckDB

DuckDB(
    database: str = ':memory:',
    *,
    s3: Optional[S3FileSystem] = None,
    s3_region: str | None = None,
    s3_access_key_id: str | None = None,
    s3_secret_access_key: str | None = None,
    s3_endpoint: str | None = None,
    s3_url_style: str | None = None,
)

Thin wrapper around duckdb.DuckDBPyConnection with S3 integration.

Inherits all native DuckDB methods (query, execute, sql, fetchone, fetchall, etc.) via delegation to the underlying connection.

S3 operations use eftoolkit.s3.S3FileSystem internally.

Initialize DuckDB with optional S3 integration.

Parameters:

Name Type Description Default
database str

Path to the database file or ':memory:' for in-memory database

':memory:'
s3 Optional[S3FileSystem]

Existing S3FileSystem instance to use for S3 operations

None
s3_region str | None

AWS region for S3 access (creates S3FileSystem internally)

None
s3_access_key_id str | None

AWS access key ID for S3 access

None
s3_secret_access_key str | None

AWS secret access key for S3 access

None
s3_endpoint str | None

Custom S3 endpoint

None
s3_url_style str | None

S3 URL style ('path' or 'vhost')

None
Source code in eftoolkit/sql/duckdb.py
def __init__(
    self,
    database: str = ':memory:',
    *,
    s3: Optional['S3FileSystem'] = None,
    s3_region: str | None = None,
    s3_access_key_id: str | None = None,
    s3_secret_access_key: str | None = None,
    s3_endpoint: str | None = None,
    s3_url_style: str | None = None,
):
    """Initialize DuckDB with optional S3 integration.

    Args:
        database: Path to the database file or ':memory:' for in-memory database
        s3: Existing S3FileSystem instance to use for S3 operations
        s3_region: AWS region for S3 access (creates S3FileSystem internally)
        s3_access_key_id: AWS access key ID for S3 access
        s3_secret_access_key: AWS secret access key for S3 access
        s3_endpoint: Custom S3 endpoint
        s3_url_style: S3 URL style ('path' or 'vhost')
    """
    self.database = database
    self._s3 = s3
    self.s3_region = s3_region
    self.s3_access_key_id = s3_access_key_id
    self.s3_secret_access_key = s3_secret_access_key
    self.s3_endpoint = s3_endpoint
    self.s3_url_style = s3_url_style
    self._active_conn: duckdb.DuckDBPyConnection | None = None

    # Create S3FileSystem from credentials if provided and no s3 instance given
    if self._s3 is None and s3_access_key_id and s3_secret_access_key:
        from eftoolkit.s3 import S3FileSystem

        self._s3 = S3FileSystem(
            access_key_id=s3_access_key_id,
            secret_access_key=s3_secret_access_key,
            region=s3_region,
            endpoint=s3_endpoint,
        )

connection property

connection: DuckDBPyConnection

Underlying DuckDB connection (for direct access to native API).

s3 property

s3: Optional[S3FileSystem]

S3FileSystem instance used for S3 operations, or None if not configured.

query

query(sql: str) -> DataFrame

Execute SQL and return DataFrame.

Parameters:

Name Type Description Default
sql str

SQL query to execute.

required

Returns:

Type Description
DataFrame

DataFrame containing the query results.

Example

db = DuckDB() df = db.query("SELECT 1 as id, 'Alice' as name") print(df) id name 0 1 Alice

Source code in eftoolkit/sql/duckdb.py
def query(self, sql: str) -> pd.DataFrame:
    """Execute SQL and return DataFrame.

    Args:
        sql: SQL query to execute.

    Returns:
        DataFrame containing the query results.

    Example:
        >>> db = DuckDB()
        >>> df = db.query("SELECT 1 as id, 'Alice' as name")
        >>> print(df)
           id   name
        0   1  Alice
    """
    with self._get_connection() as conn:
        return conn.query(sql).fetchdf()

execute

execute(sql: str, *args: object, **kwargs: object) -> None

Execute SQL without returning results.

This method can be used for any DuckDB SQL command, including: - DDL statements (CREATE, DROP, ALTER) - DML statements (INSERT, UPDATE, DELETE) - DuckDB COPY commands for S3 writes (e.g., COPY ... TO 's3://...')

Parameters:

Name Type Description Default
sql str

SQL statement to execute.

required
*args object

Positional arguments passed to duckdb execute.

()
**kwargs object

Keyword arguments passed to duckdb execute.

{}
Source code in eftoolkit/sql/duckdb.py
def execute(self, sql: str, *args: object, **kwargs: object) -> None:
    """Execute SQL without returning results.

    This method can be used for any DuckDB SQL command, including:
    - DDL statements (CREATE, DROP, ALTER)
    - DML statements (INSERT, UPDATE, DELETE)
    - DuckDB COPY commands for S3 writes (e.g., COPY ... TO 's3://...')

    Args:
        sql: SQL statement to execute.
        *args: Positional arguments passed to duckdb execute.
        **kwargs: Keyword arguments passed to duckdb execute.
    """
    with self._get_connection() as conn:
        conn.execute(sql, *args, **kwargs)

get_table

get_table(
    table_name: str, where: str | None = None
) -> DataFrame

SELECT * FROM table with optional WHERE clause.

Automatically cleans inf/nan values to None.

Parameters:

Name Type Description Default
table_name str

Name of the table to query.

required
where str | None

Optional WHERE clause (without 'WHERE' keyword).

None

Returns:

Type Description
DataFrame

DataFrame with table contents.

Example

db = DuckDB() db.create_table('users', "SELECT 1 as id, 'Alice' as name") df = db.get_table('users') filtered = db.get_table('users', where="id = 1")

Source code in eftoolkit/sql/duckdb.py
def get_table(self, table_name: str, where: str | None = None) -> pd.DataFrame:
    """SELECT * FROM table with optional WHERE clause.

    Automatically cleans inf/nan values to None.

    Args:
        table_name: Name of the table to query.
        where: Optional WHERE clause (without 'WHERE' keyword).

    Returns:
        DataFrame with table contents.

    Example:
        >>> db = DuckDB()
        >>> db.create_table('users', "SELECT 1 as id, 'Alice' as name")
        >>> df = db.get_table('users')
        >>> filtered = db.get_table('users', where="id = 1")
    """
    where_clause = f' WHERE {where}' if where else ''
    df = self.query(f'SELECT * FROM {table_name}{where_clause}')
    return self._clean_df(df)

create_table

create_table(table_name: str, sql: str) -> None

CREATE OR REPLACE TABLE from SQL.

Parameters:

Name Type Description Default
table_name str

Name for the new table.

required
sql str

SQL SELECT statement to define table contents.

required
Example

db = DuckDB() db.create_table('active_users', "SELECT * FROM users WHERE active = true")

Source code in eftoolkit/sql/duckdb.py
def create_table(self, table_name: str, sql: str) -> None:
    """CREATE OR REPLACE TABLE from SQL.

    Args:
        table_name: Name for the new table.
        sql: SQL SELECT statement to define table contents.

    Example:
        >>> db = DuckDB()
        >>> db.create_table('active_users', "SELECT * FROM users WHERE active = true")
    """
    self.execute(f'CREATE OR REPLACE TABLE {table_name} AS ({sql})')

create_table_from_df

create_table_from_df(
    table_name: str, df: DataFrame
) -> None

CREATE OR REPLACE TABLE from DataFrame.

Parameters:

Name Type Description Default
table_name str

Name for the new table.

required
df DataFrame

DataFrame to store as a table.

required
Example

import pandas as pd db = DuckDB() df = pd.DataFrame({'id': [1, 2], 'name': ['Alice', 'Bob']}) db.create_table_from_df('users', df)

Source code in eftoolkit/sql/duckdb.py
def create_table_from_df(self, table_name: str, df: pd.DataFrame) -> None:
    """CREATE OR REPLACE TABLE from DataFrame.

    Args:
        table_name: Name for the new table.
        df: DataFrame to store as a table.

    Example:
        >>> import pandas as pd
        >>> db = DuckDB()
        >>> df = pd.DataFrame({'id': [1, 2], 'name': ['Alice', 'Bob']})
        >>> db.create_table_from_df('users', df)
    """
    with self._get_connection() as conn:
        conn.register('temp_df', df)
        conn.execute(
            f'CREATE OR REPLACE TABLE {table_name} AS (SELECT * FROM temp_df)'
        )

read_parquet_from_s3

read_parquet_from_s3(s3_uri: str) -> DataFrame

Read parquet from S3.

Parameters:

Name Type Description Default
s3_uri str

S3 URI (e.g., 's3://bucket/path/file.parquet')

required

Returns:

Type Description
DataFrame

DataFrame with parquet contents

Raises:

Type Description
ValueError

If S3 is not configured

Source code in eftoolkit/sql/duckdb.py
def read_parquet_from_s3(self, s3_uri: str) -> pd.DataFrame:
    """Read parquet from S3.

    Args:
        s3_uri: S3 URI (e.g., 's3://bucket/path/file.parquet')

    Returns:
        DataFrame with parquet contents

    Raises:
        ValueError: If S3 is not configured
    """
    if self._s3 is None:
        raise ValueError(
            'S3 not configured. Pass s3= or S3 credentials to __init__'
        )
    return self._s3.read_df_from_parquet(s3_uri)

write_df_to_s3_parquet

write_df_to_s3_parquet(df: DataFrame, s3_uri: str) -> None

Write DataFrame to S3 as parquet.

Parameters:

Name Type Description Default
df DataFrame

DataFrame to write

required
s3_uri str

S3 URI (e.g., 's3://bucket/path/file.parquet')

required

Raises:

Type Description
ValueError

If S3 is not configured

Source code in eftoolkit/sql/duckdb.py
def write_df_to_s3_parquet(self, df: pd.DataFrame, s3_uri: str) -> None:
    """Write DataFrame to S3 as parquet.

    Args:
        df: DataFrame to write
        s3_uri: S3 URI (e.g., 's3://bucket/path/file.parquet')

    Raises:
        ValueError: If S3 is not configured
    """
    if self._s3 is None:
        raise ValueError(
            'S3 not configured. Pass s3= or S3 credentials to __init__'
        )
    self._s3.write_df_to_parquet(df, s3_uri)

close

close() -> None

Close the active connection if one exists.

Source code in eftoolkit/sql/duckdb.py
def close(self) -> None:
    """Close the active connection if one exists."""
    if self._active_conn is not None:
        self._active_conn.close()
        self._active_conn = None