
S3 Operations

The S3FileSystem class provides S3 operations optimized for working with Parquet files.

Overview

from eftoolkit import S3FileSystem

s3 = S3FileSystem(
    access_key_id='...',
    secret_access_key='...',
    region='us-east-1',
)

Configuration

Explicit Credentials

s3 = S3FileSystem(
    access_key_id='AKIAIOSFODNN7EXAMPLE',
    secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
    region='us-east-1',
)

Environment Variables

If no explicit credentials are passed, they are read from environment variables; each S3_-prefixed variable falls back to its AWS_-prefixed counterpart:

Variable               Fallback                 Description
S3_ACCESS_KEY_ID       AWS_ACCESS_KEY_ID        Access key
S3_SECRET_ACCESS_KEY   AWS_SECRET_ACCESS_KEY    Secret key
S3_REGION              AWS_REGION               AWS region
S3_ENDPOINT            -                        Custom endpoint

# Uses environment variables
s3 = S3FileSystem()

Custom Endpoints

For S3-compatible services:

s3 = S3FileSystem(
    access_key_id='...',
    secret_access_key='...',
    region='nyc3',
    endpoint='nyc3.digitaloceanspaces.com',
)

Reading Parquet

Single File

df = s3.read_df_from_parquet('s3://my-bucket/data.parquet')

Directory (Multiple Files)

# Reads all .parquet files in the directory and concatenates
df = s3.read_df_from_parquet('s3://my-bucket/data/')

Writing Parquet

import pandas as pd

df = pd.DataFrame({'id': [1, 2], 'value': ['a', 'b']})
s3.write_df_to_parquet(df, 's3://my-bucket/output.parquet')

File Extension Required

write_df_to_parquet requires the destination S3 URI to end with .parquet.
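The rule can be illustrated with a small guard. The helper below is a hypothetical sketch of the documented requirement, not eftoolkit's actual implementation:

```python
def require_parquet_suffix(uri: str) -> str:
    # Mirror the documented rule: writes must target a .parquet URI.
    if not uri.endswith('.parquet'):
        raise ValueError(f"S3 URI must end with .parquet, got {uri!r}")
    return uri

require_parquet_suffix('s3://my-bucket/output.parquet')   # OK
# require_parquet_suffix('s3://my-bucket/output.csv')     # would raise ValueError
```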

File Operations

Check Existence

if s3.file_exists('s3://my-bucket/data.parquet'):
    print("File exists!")

Copy

s3.cp('s3://bucket/source.parquet', 's3://bucket/dest.parquet')

# Cross-bucket copy
s3.cp('s3://bucket-a/data.parquet', 's3://bucket-b/data.parquet')

Delete

# Idempotent - no error if the object doesn't exist
s3.delete_object('s3://my-bucket/old-file.parquet')

Raw Object Operations

# Put raw bytes
s3.put_object('s3://bucket/file.json', b'{"key": "value"}', content_type='application/json')

# Get raw bytes
data = s3.get_object('s3://bucket/file.json')
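Because put_object and get_object work on raw bytes, the caller handles encoding and decoding. A minimal JSON round-trip sketch follows; the S3 calls are left as comments so the snippet runs without credentials:

```python
import json

payload = {'key': 'value'}
body = json.dumps(payload).encode('utf-8')

# s3.put_object('s3://bucket/file.json', body, content_type='application/json')
# body = s3.get_object('s3://bucket/file.json')

# Decoding the same bytes recovers the original structure
restored = json.loads(body.decode('utf-8'))   # → {'key': 'value'}
```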

Listing Objects

Basic Listing

# List all objects (recursive by default)
for obj in s3.ls('s3://my-bucket/'):
    print(f"{obj.key}: {obj.metadata.size} bytes")

With Prefix Filter

# List objects under a prefix
for obj in s3.ls('s3://my-bucket/data/2024/'):
    print(obj.uri)

Non-Recursive (Single Level)

# List only immediate files, not nested
for obj in s3.ls('s3://my-bucket/data/', recursive=False):
    print(obj.key)

Include Prefixes (Directories)

# Include "directory" entries
for obj in s3.ls('s3://my-bucket/data/', recursive=False, include_prefixes=True):
    if obj.metadata.is_prefix:
        print(f"Directory: {obj.key}")
    else:
        print(f"File: {obj.key} ({obj.metadata.size} bytes)")

S3Object Metadata

Objects returned by ls() include metadata:

obj = next(s3.ls('s3://my-bucket/'))

# Location
print(obj.key)      # 'path/to/file.parquet'
print(obj.bucket)   # 'my-bucket'
print(obj.uri)      # 's3://my-bucket/path/to/file.parquet'

# Metadata
print(obj.metadata.size)                      # Size in bytes
print(obj.metadata.last_modified_timestamp_utc)  # datetime
print(obj.metadata.etag)                      # ETag hash
print(obj.metadata.storage_class)             # 'STANDARD', 'GLACIER', etc.
print(obj.metadata.is_prefix)                 # True for directories

# Convert metadata to dict
meta_dict = dict(obj.metadata)

Error Handling

from eftoolkit import S3FileSystem

s3 = S3FileSystem(...)

try:
    df = s3.read_df_from_parquet('s3://bucket/missing.parquet')
except FileNotFoundError:
    print("File does not exist")

try:
    s3.cp('s3://bucket/missing.parquet', 's3://bucket/dest.parquet')
except FileNotFoundError:
    print("Source file does not exist")

See Also