Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8c7ca45
Add executor support
jterapin Oct 7, 2025
c21969a
Add changelog entry
jterapin Oct 7, 2025
39ecf0a
Update TM with executor changes
jterapin Oct 7, 2025
a3f2b9f
Remove thread count support from MPU
jterapin Oct 7, 2025
3156f7c
Update Object usage of executor
jterapin Oct 7, 2025
84c9966
Add documentation/remove unused methods from DefaultExecutor
jterapin Oct 8, 2025
8e16a3b
Add Default Executor specs
jterapin Oct 8, 2025
db1cb62
Update TM docs and impl
jterapin Oct 8, 2025
f907c3b
Update streaming MPU to use executor
jterapin Oct 9, 2025
7cb940a
More MP Stream updates
jterapin Oct 9, 2025
4003536
Update specs
jterapin Oct 9, 2025
7dddda9
Update interfaces
jterapin Oct 9, 2025
481f198
Update specs
jterapin Oct 9, 2025
88bf44a
Update changelog
jterapin Oct 9, 2025
c1a25cd
Minor updates
jterapin Oct 9, 2025
7522a16
Fix failing specs
jterapin Oct 9, 2025
89cffe7
Merge branch 'version-3' into s3-executor-support
jterapin Oct 10, 2025
9eea233
Feedback - address sleep in specs
jterapin Oct 10, 2025
75b0d96
Feedback - update method name for cleanup_team_file
jterapin Oct 10, 2025
ad943ee
Feedback - wrap checksum callback
jterapin Oct 10, 2025
f1fc86a
Feedback - update method name in MPU
jterapin Oct 10, 2025
09eae68
Feedback - streamline handling of progress callbacks
jterapin Oct 10, 2025
e824de0
Feedback - streamline docs
jterapin Oct 10, 2025
c073349
Merge branch 'version-3' into s3-executor-support
jterapin Oct 13, 2025
cd91eb7
Feedback - streamline opts
jterapin Oct 13, 2025
abf78d6
Feedback - remove sleep from specs when possible
jterapin Oct 13, 2025
04a287f
Feedback - update to use 10 threads only
jterapin Oct 13, 2025
54b9add
Add directory features
jterapin Oct 13, 2025
ca6c2ae
Add temp changelog entry
jterapin Oct 13, 2025
c9bf8ed
Minor updates
jterapin Oct 13, 2025
5c6caa7
Improve directory uploader
jterapin Oct 13, 2025
7eafc7c
Update uploader
jterapin Oct 13, 2025
d7d5738
Remove keyword args
jterapin Oct 14, 2025
845aa13
Add documentation
jterapin Oct 14, 2025
5201f35
Merge version-3
jterapin Dec 31, 2025
f9e0758
Refactor upload directory
jterapin Dec 31, 2025
bacc68d
Minor refactors to uploader
jterapin Dec 31, 2025
51b76dc
Add upload test holders
jterapin Dec 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gems/aws-sdk-s3/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Unreleased Changes
------------------

* Feature - Add directory upload and download support.

1.209.0 (2025-12-23)
------------------

Expand Down
10 changes: 9 additions & 1 deletion gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,23 @@ module S3
# Each constant is registered exactly once; every path is relative to the
# load path and must start with 'aws-sdk-s3/'.
autoload :Encryption, 'aws-sdk-s3/encryption'
autoload :EncryptionV2, 'aws-sdk-s3/encryption_v2'
autoload :EncryptionV3, 'aws-sdk-s3/encryption_v3'
autoload :LegacySigner, 'aws-sdk-s3/legacy_signer'

# transfer manager + multipart upload/download utilities
autoload :DefaultExecutor, 'aws-sdk-s3/default_executor'
autoload :FilePart, 'aws-sdk-s3/file_part'
autoload :FileUploader, 'aws-sdk-s3/file_uploader'
autoload :FileDownloader, 'aws-sdk-s3/file_downloader'
autoload :MultipartDownloadError, 'aws-sdk-s3/multipart_download_error'
autoload :MultipartFileUploader, 'aws-sdk-s3/multipart_file_uploader'
autoload :MultipartStreamUploader, 'aws-sdk-s3/multipart_stream_uploader'
autoload :MultipartUploadError, 'aws-sdk-s3/multipart_upload_error'
autoload :DirectoryProgress, 'aws-sdk-s3/directory_progress'
autoload :DirectoryDownloadError, 'aws-sdk-s3/directory_download_error'
autoload :DirectoryDownloader, 'aws-sdk-s3/directory_downloader'
autoload :DirectoryUploadError, 'aws-sdk-s3/directory_upload_error'
autoload :DirectoryUploader, 'aws-sdk-s3/directory_uploader'
autoload :ObjectCopier, 'aws-sdk-s3/object_copier'
autoload :ObjectMultipartCopier, 'aws-sdk-s3/object_multipart_copier'
autoload :PresignedPost, 'aws-sdk-s3/presigned_post'
Expand Down
16 changes: 16 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

module Aws
  module S3
    # Error raised by DirectoryDownloader when downloading objects from an
    # S3 bucket fails.
    class DirectoryDownloadError < StandardError
      # @return [Array<StandardError>] the individual errors collected while downloading objects
      attr_reader :errors

      # @param message [String] summary of the failure
      # @param errors [Array<StandardError>] the underlying errors (empty by default)
      def initialize(message, errors = [])
        super(message)
        @errors = errors
      end
    end
  end
end
162 changes: 162 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# frozen_string_literal: true

module Aws
  module S3
    # Downloads the objects of a bucket (optionally restricted to a key prefix)
    # into a local directory. Objects are listed by an {ObjectProducer} and each
    # one is fetched with {FileDownloader#download} from a task posted to a
    # {DefaultExecutor}.
    #
    # @api private
    class DirectoryDownloader
      # @param [Hash] options
      # @option options [Object] :client client used to list objects; also handed to {FileDownloader}
      # @option options [Object] :executor executor handed to {FileDownloader}
      def initialize(options = {})
        @client = options[:client]
        @executor = options[:executor]
        @abort_requested = false
        @mutex = Mutex.new
      end

      # @return [Boolean] true once a failure has requested that the running download stop
      attr_reader :abort_requested

      # Downloads every listed object below +destination+.
      #
      # @param [String] destination local directory; created when it does not exist
      # @param [String] bucket name of the bucket to list
      # @param [Hash] options
      # @option options [String] :s3_prefix only keys with this prefix are listed; the prefix is
      #   removed from the local path
      # @option options [Boolean] :ignore_failure (false) when falsy, the first error aborts the
      #   download and {DirectoryDownloadError} is raised
      # @option options [#call] :filter_callback called with each key; the object is skipped
      #   when it returns a falsy value
      # @option options [#call] :progress_callback wrapped in a {DirectoryProgress}
      # @return [Hash] +:completed_downloads+, +:failed_downloads+ and, only when there were
      #   failures, +:errors+
      # @raise [ArgumentError] when +destination+ exists but is not a directory
      # @raise [DirectoryDownloadError] when the download was aborted because of a failure
      def download(destination, bucket:, **options)
        if File.exist?(destination)
          raise ArgumentError, 'invalid destination, expected a directory' unless File.directory?(destination)
        else
          FileUtils.mkdir_p(destination)
        end

        download_opts = build_download_opts(destination, bucket, options.dup)
        downloader = FileDownloader.new(client: @client, executor: @executor)
        producer = ObjectProducer.new(download_opts.merge(client: @client, directory_downloader: self))
        downloads, errors = process_download_queue(producer, downloader, download_opts)
        build_result(downloads, errors)
      ensure
        # Reset so the same instance can be used for another download.
        @abort_requested = false
      end

      private

      # Flags the running download as aborted; the flag is polled by the
      # consumer loop and by {ObjectProducer}.
      def request_abort
        @mutex.synchronize { @abort_requested = true }
      end

      # Appends +error+ to +errors+ under the instance mutex, because errors are
      # recorded from tasks run by the queue executor as well as from the
      # calling thread.
      def record_error(errors, error)
        @mutex.synchronize { errors << error }
      end

      # Extracts the options this class understands from +opts+ (mutating it).
      def build_download_opts(destination, bucket, opts)
        {
          destination: destination,
          bucket: bucket,
          s3_prefix: opts.delete(:s3_prefix),
          ignore_failure: opts.delete(:ignore_failure) || false,
          filter_callback: opts.delete(:filter_callback),
          progress_callback: opts.delete(:progress_callback)
        }
      end

      # Raises when the download was aborted, otherwise summarizes the outcome.
      def build_result(download_count, errors)
        if @abort_requested
          msg = "directory download failed: #{errors.map(&:message).join('; ')}"
          raise DirectoryDownloadError.new(msg, errors)
        end

        {
          completed_downloads: [download_count - errors.count, 0].max,
          failed_downloads: errors.count,
          errors: errors.any? ? errors : nil
        }.compact
      end

      # Unless failures are ignored, requests an abort and kills the executor.
      def handle_error(executor, opts)
        return if opts[:ignore_failure]

        request_abort
        executor.kill
      end

      # Posts one download task per produced object.
      #
      # @return [Array(Integer, Array<StandardError>)] number of attempted downloads and the
      #   errors that were collected
      def process_download_queue(producer, downloader, opts)
        # Separate executor for lightweight queuing tasks, avoiding interference with main @executor lifecycle
        queue_executor = DefaultExecutor.new
        progress = DirectoryProgress.new(opts[:progress_callback]) if opts[:progress_callback]
        download_attempts = 0
        errors = []
        begin
          producer.each do |object|
            break if @abort_requested

            download_attempts += 1
            queue_executor.post(object) do |o|
              dir_path = File.dirname(o[:path])
              FileUtils.mkdir_p(dir_path) unless dir_path == opts[:destination] || Dir.exist?(dir_path)

              downloader.download(o[:path], bucket: opts[:bucket], key: o[:key])
              progress&.call(File.size(o[:path]))
            rescue StandardError => e
              record_error(errors, e)
              handle_error(queue_executor, opts)
            end
          end
        rescue StandardError => e
          # Errors raised while producing objects (for example by the listing call).
          record_error(errors, e)
          handle_error(queue_executor, opts)
        end
        queue_executor.shutdown
        [download_attempts, errors]
      end

      # Lists the objects of a bucket on a background thread and yields one
      # +{ path:, key: }+ hash per object to download, handing entries over
      # through a bounded queue.
      #
      # @api private
      class ObjectProducer
        include Enumerable

        # Capacity of the queue between the listing thread and the consumer.
        DEFAULT_QUEUE_SIZE = 100

        # @param [Hash] options
        # @option options [String] :destination local directory the paths are built under
        # @option options [Object] :client client responding to +list_objects_v2+
        # @option options [String] :bucket
        # @option options [String] :s3_prefix
        # @option options [#call] :filter_callback
        # @option options [#abort_requested] :directory_downloader polled to stop early
        def initialize(options = {})
          @destination_dir = options[:destination]
          @client = options[:client]
          @bucket = options[:bucket]
          @s3_prefix = options[:s3_prefix]
          @filter_callback = options[:filter_callback]
          @directory_downloader = options[:directory_downloader]
          @object_queue = SizedQueue.new(DEFAULT_QUEUE_SIZE)
        end

        # Yields each object entry until the listing is exhausted or an abort
        # has been requested. A listing error is re-raised when the producer
        # thread is joined.
        def each
          # The queue is closed at the end of every enumeration, so a repeated
          # enumeration needs a fresh one.
          @object_queue = SizedQueue.new(DEFAULT_QUEUE_SIZE) if @object_queue.closed?

          producer_thread = Thread.new do
            begin
              stream_objects
            ensure
              @object_queue << :done
            end
          rescue ClosedQueueError
            # The consumer stopped early and closed the queue; nothing is left to hand over.
            nil
          end

          # Yield objects from internal queue
          while (object = @object_queue.shift) != :done
            break if @directory_downloader.abort_requested

            yield object
          end
        ensure
          # Closing wakes a producer that is blocked pushing onto a full queue;
          # without it the join below would wait forever when the consumer stops early.
          @object_queue.close
          producer_thread&.join
        end

        private

        # NOTE(review): keys are not checked for '..' segments, so a key could
        # resolve to a path outside the destination directory - confirm whether
        # such keys should be rejected or skipped.
        def build_object_entry(key)
          { path: File.join(@destination_dir, normalize_key(key)), key: key }
        end

        # Lists the bucket page by page and queues every object that should be
        # downloaded. No further pages are requested once an abort is requested.
        def stream_objects(continuation_token: nil)
          until @directory_downloader.abort_requested
            resp = @client.list_objects_v2(bucket: @bucket, prefix: @s3_prefix, continuation_token: continuation_token)
            resp.contents.each do |o|
              return if @directory_downloader.abort_requested
              # Skip zero-byte keys ending in '/': they have no file name to write to.
              next if o.key.end_with?('/') && o.size.zero?
              next unless include_object?(o.key)

              @object_queue << build_object_entry(o.key)
            end
            continuation_token = resp.next_continuation_token
            break unless continuation_token
          end
        end

        # Every key is included when no filter callback was given.
        def include_object?(key)
          return true unless @filter_callback

          @filter_callback.call(key)
        end

        # Strips the listing prefix and converts '/' to the platform separator.
        def normalize_key(key)
          key = key.delete_prefix(@s3_prefix) if @s3_prefix
          File::SEPARATOR == '/' ? key : key.tr('/', File::SEPARATOR)
        end
      end
    end
  end
end
24 changes: 24 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/directory_progress.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

module Aws
  module S3
    # Keeps running totals of transferred bytes and files and reports them to
    # a progress callback after every completed transfer.
    #
    # @api private
    class DirectoryProgress
      # @param progress_callback [#call] invoked with the total bytes and total files transferred
      def initialize(progress_callback)
        @progress_callback = progress_callback
        @mutex = Mutex.new
        @transferred_bytes = 0
        @transferred_files = 0
      end

      # Records one finished file and notifies the callback. The update and the
      # callback both run while the mutex is held.
      #
      # @param bytes_transferred [Integer] size of the file that just finished
      # @return [Object] whatever the callback returns
      def call(bytes_transferred)
        @mutex.synchronize { report(bytes_transferred) }
      end

      private

      # Adds to the totals and forwards them to the callback.
      def report(bytes)
        @transferred_bytes += bytes
        @transferred_files += 1
        @progress_callback.call(@transferred_bytes, @transferred_files)
      end
    end
  end
end
16 changes: 16 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/directory_upload_error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

module Aws
  module S3
    # Error raised by DirectoryUploader when uploading files to an S3 bucket
    # fails.
    class DirectoryUploadError < StandardError
      # @return [Array<StandardError>] the individual errors collected while uploading files
      attr_reader :errors

      # @param message [String] summary of the failure
      # @param errors [Array<StandardError>] the underlying errors (empty by default)
      def initialize(message, errors = [])
        super(message)
        @errors = errors
      end
    end
  end
end
Loading
Loading