eftoolkit.s3

S3 filesystem operations for parquet files.

Classes

S3FileSystem

S3FileSystem

S3FileSystem(
    *,
    access_key_id: str | None = None,
    secret_access_key: str | None = None,
    region: str | None = None,
    endpoint: str | None = None,
)

S3 filesystem client for reading/writing parquet files.

Falls back to environment variables if credentials are not provided:
  • S3_ACCESS_KEY_ID / AWS_ACCESS_KEY_ID
  • S3_SECRET_ACCESS_KEY / AWS_SECRET_ACCESS_KEY
  • S3_REGION / AWS_REGION
  • S3_ENDPOINT

Initialize S3 filesystem.

Parameters:

    access_key_id (str | None): AWS access key ID. Defaults to None.
    secret_access_key (str | None): AWS secret access key. Defaults to None.
    region (str | None): AWS region. Defaults to None.
    endpoint (str | None): Custom S3 endpoint (e.g., 'nyc3.digitaloceanspaces.com'). Defaults to None.
Source code in eftoolkit/s3/filesystem.py
def __init__(
    self,
    *,
    access_key_id: str | None = None,
    secret_access_key: str | None = None,
    region: str | None = None,
    endpoint: str | None = None,
) -> None:
    """Initialize S3 filesystem.

    Args:
        access_key_id: AWS access key ID
        secret_access_key: AWS secret access key
        region: AWS region
        endpoint: Custom S3 endpoint (e.g., 'nyc3.digitaloceanspaces.com')
    """
    self.access_key_id = access_key_id or os.getenv(
        'S3_ACCESS_KEY_ID', os.getenv('AWS_ACCESS_KEY_ID')
    )
    self.secret_access_key = secret_access_key or os.getenv(
        'S3_SECRET_ACCESS_KEY', os.getenv('AWS_SECRET_ACCESS_KEY')
    )
    self.region = region or os.getenv('S3_REGION', os.getenv('AWS_REGION'))
    self.endpoint = endpoint or os.getenv('S3_ENDPOINT')

    if not self.access_key_id or not self.secret_access_key:
        raise ValueError(
            'S3 credentials required. Pass access_key_id/secret_access_key '
            'or set S3_ACCESS_KEY_ID/S3_SECRET_ACCESS_KEY environment variables.'
        )
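
A minimal construction sketch. It assumes S3FileSystem is importable from eftoolkit.s3, as the page title suggests; the credential values and the DigitalOcean Spaces endpoint are placeholders, not real settings.

from eftoolkit.s3 import S3FileSystem

# Explicit credentials (placeholder values).
fs = S3FileSystem(
    access_key_id='EXAMPLE_KEY_ID',
    secret_access_key='EXAMPLE_SECRET',
    region='us-east-1',
)

# Or rely on the environment-variable fallback, optionally pointing
# at an S3-compatible provider such as DigitalOcean Spaces.
spaces = S3FileSystem(endpoint='nyc3.digitaloceanspaces.com')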

read_df_from_parquet

read_df_from_parquet(s3_uri: str) -> DataFrame

Read parquet file(s) from S3.

Supports both single files and directories containing parquet files.

Parameters:

    s3_uri (str): S3 URI. Required. Can be:
        - A URI ending in .parquet (reads that exact file)
        - A prefix/directory URI (reads all .parquet files under it and concatenates them)

Returns:

    DataFrame: DataFrame with the parquet contents.

Source code in eftoolkit/s3/filesystem.py
def read_df_from_parquet(self, s3_uri: str) -> pd.DataFrame:
    """Read parquet file(s) from S3.

    Supports both single files and directories containing parquet files.

    Args:
        s3_uri: S3 URI. Can be:
            - A URI ending in .parquet (reads that exact file)
            - A prefix/directory URI (reads all .parquet files and concatenates)

    Returns:
        DataFrame with parquet contents
    """
    bucket, key = _parse_s3_uri(s3_uri)

    if key.endswith('.parquet'):
        # Single file read - use get_object
        data = self.get_object(s3_uri)
        return pd.read_parquet(io.BytesIO(data))

    # Key is a prefix - list all parquet files under it
    client = self._get_client()
    prefix = key.rstrip('/') + '/'
    paginator = client.get_paginator('list_objects_v2')
    parquet_keys = []

    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            if obj['Key'].endswith('.parquet'):
                parquet_keys.append(obj['Key'])

    if not parquet_keys:
        # Check if the prefix exists at all
        response = client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
        if not response.get('Contents'):
            raise FileNotFoundError(
                f'{s3_uri} does not exist. '
                f'For single files, use a URI ending in .parquet'
            )
        raise FileNotFoundError(f'{s3_uri} exists but contains no .parquet files')

    dfs = []
    for pq_key in parquet_keys:
        data = self.get_object(f's3://{bucket}/{pq_key}')
        dfs.append(pd.read_parquet(io.BytesIO(data)))

    return pd.concat(dfs, ignore_index=True)
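
A usage sketch, reusing the fs client from the construction example above; the bucket and paths are hypothetical.

# Single-file read: the URI ends in .parquet.
df = fs.read_df_from_parquet('s3://my-bucket/data/2024/part-000.parquet')

# Prefix read: every .parquet object under the prefix is read and
# concatenated into one DataFrame with the index reset.
combined = fs.read_df_from_parquet('s3://my-bucket/data/2024')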

write_df_to_parquet

write_df_to_parquet(df: DataFrame, s3_uri: str) -> None

Write DataFrame as parquet to S3.

Parameters:

    df (DataFrame): DataFrame to write. Required.
    s3_uri (str): S3 URI (e.g., 's3://bucket/path/file.parquet'). Required.

Raises:

    ValueError: If the URI does not end with .parquet.

Source code in eftoolkit/s3/filesystem.py
def write_df_to_parquet(self, df: pd.DataFrame, s3_uri: str) -> None:
    """Write DataFrame as parquet to S3.

    Args:
        df: DataFrame to write
        s3_uri: S3 URI (e.g., 's3://bucket/path/file.parquet')

    Raises:
        ValueError: If URI does not end with .parquet
    """
    _, key = _parse_s3_uri(s3_uri)
    if not key.endswith('.parquet'):
        raise ValueError(f"S3 URI must end with .parquet, got: '{s3_uri}'")

    buffer = io.BytesIO()
    df.to_parquet(buffer, engine='pyarrow', index=False)
    buffer.seek(0)

    self.put_object(
        s3_uri,
        buffer.getvalue(),
        content_type='application/octet-stream',
    )
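
A short write sketch with the same hypothetical bucket; note that the URI must end in .parquet.

import pandas as pd

df = pd.DataFrame({'id': [1, 2], 'value': ['a', 'b']})
fs.write_df_to_parquet(df, 's3://my-bucket/out/result.parquet')

# A URI without the .parquet suffix raises ValueError:
# fs.write_df_to_parquet(df, 's3://my-bucket/out/result')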

file_exists

file_exists(s3_uri: str) -> bool

Check if object exists.

Parameters:

    s3_uri (str): S3 URI (e.g., 's3://bucket/key'). Required.

Returns:

    bool: True if the object exists.

Source code in eftoolkit/s3/filesystem.py
def file_exists(self, s3_uri: str) -> bool:
    """Check if object exists.

    Args:
        s3_uri: S3 URI (e.g., 's3://bucket/key')

    Returns:
        True if object exists
    """
    bucket, key = _parse_s3_uri(s3_uri)
    client = self._get_client()
    try:
        client.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            return False
        raise
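
A sketch of the existence check guarding a read; the path is hypothetical.

uri = 's3://my-bucket/out/result.parquet'
if fs.file_exists(uri):
    df = fs.read_df_from_parquet(uri)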

ls

ls(
    s3_uri: str,
    *,
    recursive: bool = True,
    include_prefixes: bool = False,
) -> Iterator[S3Object]

List objects at an S3 URI.

Parameters:

    s3_uri (str): S3 URI (e.g., 's3://bucket' or 's3://bucket/prefix'). Required.
    recursive (bool): If True, list all objects under the prefix recursively.
        If False, list only files at the immediate level. Defaults to True.
    include_prefixes (bool): If True and recursive=False, also yield prefix
        (directory) entries with is_prefix=True in metadata. Ignored when
        recursive=True. Defaults to False.

Yields:

    S3Object: S3Object instances with metadata for each file (and each
        prefix, if requested).

Source code in eftoolkit/s3/filesystem.py
def ls(
    self,
    s3_uri: str,
    *,
    recursive: bool = True,
    include_prefixes: bool = False,
) -> Iterator[S3Object]:
    """List objects at an S3 URI.

    Args:
        s3_uri: S3 URI (e.g., 's3://bucket' or 's3://bucket/prefix')
        recursive: If True, list all objects under prefix recursively.
            If False, list only files at the immediate level.
        include_prefixes: If True and recursive=False, also yield prefix
            (directory) entries with is_prefix=True in metadata.
            Ignored when recursive=True.

    Yields:
        S3Object instances with metadata for each file (and prefix if requested)
    """
    bucket, prefix = _parse_s3_uri(s3_uri)
    client = self._get_client()

    if recursive:
        paginator = client.get_paginator('list_objects_v2')
        paginate_params = {'Bucket': bucket}
        if prefix:
            paginate_params['Prefix'] = prefix

        for page in paginator.paginate(**paginate_params):
            for obj in page.get('Contents', []):
                yield S3Object.from_boto_response(obj, bucket=bucket)
    else:
        # Non-recursive: use delimiter to get only immediate files/prefixes
        normalized_prefix = prefix.rstrip('/') + '/' if prefix else ''
        paginator = client.get_paginator('list_objects_v2')
        paginate_params = {
            'Bucket': bucket,
            'Prefix': normalized_prefix,
            'Delimiter': '/',
        }

        for page in paginator.paginate(**paginate_params):
            # Yield files at this level
            for obj in page.get('Contents', []):
                yield S3Object.from_boto_response(obj, bucket=bucket)

            # Optionally yield prefixes (directories)
            if include_prefixes:
                for prefix_entry in page.get('CommonPrefixes', []):
                    yield S3Object(
                        key=prefix_entry['Prefix'],
                        bucket=bucket,
                        metadata=S3ObjectMetadata(is_prefix=True),
                    )
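
A listing sketch over a hypothetical bucket, showing both modes.

# Recursive (default): every object under the prefix.
for obj in fs.ls('s3://my-bucket/data'):
    print(obj.uri, obj.metadata.size)

# Immediate level only, with "directory" entries included.
# Prefix entries already carry a trailing slash in their key.
for obj in fs.ls('s3://my-bucket/data', recursive=False, include_prefixes=True):
    kind = 'dir' if obj.metadata.is_prefix else 'file'
    print(kind, obj.key)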

cp

cp(src_uri: str, dst_uri: str) -> None

Copy an object within or across buckets.

Parameters:

    src_uri (str): Source S3 URI (e.g., 's3://bucket/key'). Required.
    dst_uri (str): Destination S3 URI (e.g., 's3://bucket/key'). Required.

Raises:

    FileNotFoundError: If the source object does not exist.

Source code in eftoolkit/s3/filesystem.py
def cp(self, src_uri: str, dst_uri: str) -> None:
    """Copy an object within or across buckets.

    Args:
        src_uri: Source S3 URI (e.g., 's3://bucket/key')
        dst_uri: Destination S3 URI (e.g., 's3://bucket/key')

    Raises:
        FileNotFoundError: If the source object does not exist
    """
    src_bucket, src_key = _parse_s3_uri(src_uri)
    dst_bucket, dst_key = _parse_s3_uri(dst_uri)
    client = self._get_client()
    try:
        client.copy_object(
            CopySource={'Bucket': src_bucket, 'Key': src_key},
            Bucket=dst_bucket,
            Key=dst_key,
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchKey':
            raise FileNotFoundError(f'{src_uri} does not exist') from e
        raise
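
A copy sketch; both bucket names are hypothetical. Because this uses copy_object, the copy happens server-side and the object bytes never pass through the client.

fs.cp(
    's3://my-bucket/out/result.parquet',
    's3://archive-bucket/2024/result.parquet',
)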

delete_object

delete_object(s3_uri: str) -> None

Delete an object from S3.

Parameters:

    s3_uri (str): S3 URI (e.g., 's3://bucket/key'). Required.

Note

This is idempotent: deleting a non-existent object does not error.

Source code in eftoolkit/s3/filesystem.py
def delete_object(self, s3_uri: str) -> None:
    """Delete an object from S3.

    Args:
        s3_uri: S3 URI (e.g., 's3://bucket/key')

    Note:
        This is idempotent - deleting a non-existent object does not error.
    """
    bucket, key = _parse_s3_uri(s3_uri)
    client = self._get_client()
    client.delete_object(Bucket=bucket, Key=key)
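
A deletion sketch; because the call is idempotent, no existence check is needed first.

fs.delete_object('s3://my-bucket/out/result.parquet')
fs.delete_object('s3://my-bucket/out/result.parquet')  # no error the second time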

put_object

put_object(
    s3_uri: str,
    body: bytes,
    *,
    content_type: str | None = None,
) -> None

Upload raw bytes to S3.

Parameters:

    s3_uri (str): S3 URI (e.g., 's3://bucket/key'). Required.
    body (bytes): Raw bytes to upload. Required.
    content_type (str | None): Optional content type (e.g., 'application/octet-stream'). Defaults to None.
Source code in eftoolkit/s3/filesystem.py
def put_object(
    self,
    s3_uri: str,
    body: bytes,
    *,
    content_type: str | None = None,
) -> None:
    """Upload raw bytes to S3.

    Args:
        s3_uri: S3 URI (e.g., 's3://bucket/key')
        body: Raw bytes to upload
        content_type: Optional content type (e.g., 'application/octet-stream')
    """
    bucket, key = _parse_s3_uri(s3_uri)
    client = self._get_client()
    params = {'Bucket': bucket, 'Key': key, 'Body': body}
    if content_type:
        params['ContentType'] = content_type
    client.put_object(**params)
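
A raw-upload sketch with hypothetical content; setting content_type is optional but helps downstream consumers interpret the object.

fs.put_object(
    's3://my-bucket/reports/summary.json',
    b'{"rows": 2}',
    content_type='application/json',
)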

get_object

get_object(s3_uri: str) -> bytes

Download raw bytes from S3.

Parameters:

    s3_uri (str): S3 URI (e.g., 's3://bucket/key'). Required.

Returns:

    bytes: Raw bytes of the object.

Raises:

    FileNotFoundError: If the object does not exist.

Source code in eftoolkit/s3/filesystem.py
def get_object(self, s3_uri: str) -> bytes:
    """Download raw bytes from S3.

    Args:
        s3_uri: S3 URI (e.g., 's3://bucket/key')

    Returns:
        Raw bytes of the object

    Raises:
        FileNotFoundError: If the object does not exist
    """
    bucket, key = _parse_s3_uri(s3_uri)
    client = self._get_client()
    try:
        response = client.get_object(Bucket=bucket, Key=key)
        return response['Body'].read()
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchKey':
            raise FileNotFoundError(f'{s3_uri} does not exist') from e
        raise
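
A raw-download sketch mirroring the upload example above.

raw = fs.get_object('s3://my-bucket/reports/summary.json')
payload = raw.decode('utf-8')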

S3Object

S3Object dataclass

S3Object(key: str, bucket: str, metadata: S3ObjectMetadata)

Represents an S3 object with its location and metadata.

Attributes:

    key (str): Object key (path within bucket).
    bucket (str): Bucket name.
    uri (str): Full S3 URI (s3://bucket/key).
    metadata (S3ObjectMetadata): Object metadata (size, last_modified, etc.).

key: str (instance attribute)

bucket: str (instance attribute)

uri: str (property): returns the full S3 URI.

metadata: S3ObjectMetadata (instance attribute)

S3ObjectMetadata

S3ObjectMetadata dataclass

S3ObjectMetadata(
    is_prefix: bool = False,
    last_modified_timestamp_utc: datetime | None = None,
    size: int | None = None,
    etag: str | None = None,
    storage_class: str | None = None,
)

Metadata for an S3 object from boto3 response.

Attributes:

    is_prefix (bool): True if this represents a prefix/directory, not an actual object.
    last_modified_timestamp_utc (datetime | None): When the object was last modified (UTC).
    size (int | None): Object size in bytes.
    etag (str | None): Object ETag hash.
    storage_class (str | None): S3 storage class (STANDARD, GLACIER, etc.).

Field defaults (class-level defaults, set per instance):

    is_prefix: bool = False
    last_modified_timestamp_utc: datetime | None = None
    size: int | None = None
    etag: str | None = None
    storage_class: str | None = None

items

items()

Yield key-value pairs of metadata fields.

Enables dict(metadata.items()) and for k, v in metadata.items().

Source code in eftoolkit/s3/filesystem.py
def items(self):
    """Yield key-value pairs of metadata fields.

    Enables dict(metadata.items()) and `for k, v in metadata.items()`.
    """
    yield ('is_prefix', self.is_prefix)
    yield ('last_modified_timestamp_utc', self.last_modified_timestamp_utc)
    yield ('size', self.size)
    yield ('etag', self.etag)
    yield ('storage_class', self.storage_class)
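
A small sketch of the metadata helper. It assumes S3ObjectMetadata is importable from eftoolkit.s3.filesystem (the source path shown above); the field values are illustrative.

from eftoolkit.s3.filesystem import S3ObjectMetadata

meta = S3ObjectMetadata(size=1024, storage_class='STANDARD')
as_dict = dict(meta.items())
# {'is_prefix': False, 'last_modified_timestamp_utc': None,
#  'size': 1024, 'etag': None, 'storage_class': 'STANDARD'}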