Visualizing how S3 deletes 1 billion objects with Athena and Rust
A few weeks ago I had the chance to delete 1 petabyte of data spread across 1 billion objects from S3. Well, actually 940 million, but close enough to the click-baitable 1 billion. I thought it would be an interesting challenge to try and visualize the execution of these deletions and possibly gain some insights into how S3 Lifecycle Policies work under the hood.
The post below details how I generated the gif shown on this page using Athena and a custom Rust tool, including an interesting bug I encountered with Athena along the way.
Lifecycle policy deletions in a versioned S3 bucket go through several phases. At first each key gets a “Delete Marker” added to it. Delete markers are a special kind of object that hides the key from standard ListObjects calls, making keys appear like they are no longer present.

After a configurable amount of time all “noncurrent versions” of objects are expired, which means all data that has a delete marker as its “current version” will be permanently erased. After this, S3 cleans up all “expired delete markers” - that is, delete markers where all the data for the key has been deleted. The visualization represents this flow with yellow, red and black.
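For context, a lifecycle configuration that triggers this flow on a versioned bucket looks roughly like the following. The actual rule applied to the bucket isn't shown in this post, so the day counts here are placeholders:

```json
{
  "Rules": [
    {
      "ID": "expire-everything",
      "Status": "Enabled",
      "Filter": {},
      "Expiration": { "Days": 1 },
      "NoncurrentVersionExpiration": { "NoncurrentDays": 7 }
    }
  ]
}
```

On a versioned bucket the `Expiration` action only adds delete markers; it's the `NoncurrentVersionExpiration` action that permanently erases the noncurrent data.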
We want to represent each file as a pixel in a GIF image and change its colour as its state changes. To do this we need two sets of data: the set of all keys in the bucket and the set of all state changes on the keys.
Once we have this data in an Athena table, we need to assign a pixel to every file in S3. Naively this could be assumed to be as simple as:
```sql
select key, row_number() over (order by key) as unique_pixel
from set_of_all_keys
order by key
```
However, this kind of query is very slow to process when using Athena because it effectively means it cannot parallelize the query across multiple workers. A simple approach to this is breaking the image down into segments, where the (segment, index) value uniquely identifies a pixel within the image. This allows Athena to process each segment in parallel.
The diagram shows how this works: the image is broken up into possibly uneven segments from left to right. Each segment is then broken down into individual pixels.
Once we have this, a given key can be identified by a (segment, index) tuple which we can then easily convert to a stable pixel x/y coordinate.
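As a sketch of that conversion (this helper isn't from the original tooling, but mirrors the Rust state we build later; it assumes `offsets` holds the cumulative start position of each segment and that both segment and index are 1-based, as in the Athena output):

```rust
/// Map a 1-based (segment, index) tuple to a stable (x, y) pixel
/// coordinate in a square image of `image_size` pixels per side.
fn pixel_coordinates(offsets: &[usize], segment: usize, index: usize, image_size: usize) -> (usize, usize) {
    // Flat position of this key across the whole image.
    let flat = offsets[segment - 1] + index - 1;
    (flat % image_size, flat / image_size)
}
```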
To start with, we want to find a suitable “prefix” that we will use to group our rows together. The prefix should ideally be evenly distributed. You could split the key and take the first N characters of a specific part of the key, as depending on how your keys are generated this may result in a more evenly distributed set of prefixes. In the example below we’re taking the first 2 characters of the 4th part of the bucket key:
```sql
-- Split /foo/bar/baz into [/foo, /bar, /b], then join it
-- back into a string to form the prefix
select array_join(
    slice(split(key, '/'), 1, 2) || ARRAY[SUBSTR(split(key, '/')[3], 1, 1)],
    '/'
  )
from ...
```
However, below we just use the first 5 characters of the key for simplicity:
```sql
CREATE TABLE keys_with_prefix AS
SELECT substr(key, 1, 5) as prefix, key
FROM s3_inventories o
WHERE is_delete_marker = false
-- Make sure this is a snapshot taken before the lifecycle
-- rules are applied
AND dt = '2022-09-02-00-00'
AND bucket = 'target-bucket';
```
| prefix | key     |
|--------|---------|
| foo/b  | foo/bar |
| foo/b  | foo/baz |
| foo/q  | foo/qux |
Now that we have our set of keys and their associated prefixes, we need to associate numeric (segment, index) values with each of them. Because the set of unique prefixes is much smaller than the set of keys, we are able to use a simple row_number() window function to order them. We then end up with (key, segment, index):
```sql
CREATE TABLE key_prefix_segment AS
-- Select the set of distinct prefixes
WITH distinct_prefixes AS (
  select distinct prefix FROM keys_with_prefix
),
-- Sort the smaller number of prefixes and add the row_number to them.
-- This produces an ordered set of (prefix, incrementing counter)
prefix_count AS (
  select distinct_prefixes.prefix as prefix,
         row_number() OVER (ORDER BY distinct_prefixes.prefix ASC) as segment
  FROM distinct_prefixes
)
-- Select each key, the segment it belongs to and the row number within
-- the segment.
SELECT o.key,
       prefix_count.segment,
       row_number() OVER ( -- Use rank here in case of duplicate keys
         PARTITION BY o.prefix ORDER BY o.prefix ASC, o.key ASC
       ) as index
FROM keys_with_prefix o
INNER JOIN prefix_count ON (o.prefix = prefix_count.prefix);
```
| key     | segment | index |
|---------|---------|-------|
| foo/bar | 1       | 1     |
| foo/baz | 1       | 2     |
| foo/qux | 2       | 1     |
Now that we have this, we can reduce it down into a much smaller set of (segment, max_index) rows, which form basically a bounding box within the image. This will come in handy later when we start generating the image:
```sql
CREATE TABLE ordered_segment_intervals WITH (format = 'JSON') AS
select segment, max(index) as index
from key_prefix_segment
group by 1
order by 1;
```
| segment | index |
|---------|-------|
| 1       | 2     |
| 2       | 1     |
Now that we have each key in the bucket mapped to a (segment, index) tuple, we can parse our access logs to produce a stream of events. S3 access logs are delivered as uncompressed text files, so they can be a bit expensive to query. S3 will log a S3.CREATE.DELETEMARKER and a S3.EXPIRE.OBJECT event for lifecycle executions, so we filter for those requests and produce a simple table of requests against keys:
```sql
CREATE TABLE parsed_access_logs AS
-- Access logs datetime fields need to be parsed
SELECT parse_datetime(requestdatetime, 'dd/MMM/yyyy:HH:mm:ss Z') as datetime,
       (case
          WHEN operation = 'S3.EXPIRE.OBJECT' then 'expire'
          ELSE 'delete'
        END) as operation,
       url_decode(key) as key -- And the keys need to be decoded
FROM "s3_access_logs"
-- Only select the operations we care about
WHERE operation IN ('S3.CREATE.DELETEMARKER', 'S3.EXPIRE.OBJECT')
AND bucket = 'target-bucket'
```
| datetime         | operation | key     |
|------------------|-----------|---------|
| 2022-09-10 01:00 | delete    | foo/bar |
| 2022-09-10 01:00 | delete    | foo/baz |
| 2022-09-10 02:00 | delete    | foo/qux |
| 2022-09-11 00:00 | expire    | foo/bar |
| 2022-09-12 01:00 | expire    | foo/baz |
| 2022-09-12 01:00 | expire    | foo/qux |
And now that we have our request logs and our (key, segment, index) mapping, we can combine them all together to produce a table of events, segments and the indexes that those events operated on. We can increase or reduce the granularity of these events (and thus the image) by adjusting the size of the time bucket that all events are grouped into:
```sql
CREATE TABLE bucket_s3_events_grouped WITH (format = 'JSON') AS
-- Truncate the event time into a group. Can be hours, minutes, etc.
select date_trunc('hour', datetime) as bucket,
       operation,
       i.segment,
       -- Create an array of all *numbered* items within the segment
       -- that the given operation was applied to, within the "bucket".
       array_agg(i.index order by i.index) as indexes
from parsed_access_logs l
-- Join each access-log event to its (segment, index) mapping.
INNER JOIN key_prefix_segment i ON (l.key = i.key)
group by 1, 2, 3
order by 1, 2, 3
```
| bucket           | operation | segment | indexes |
|------------------|-----------|---------|---------|
| 2022-09-10 01:00 | delete    | 1       | [1, 2]  |
| 2022-09-10 02:00 | delete    | 2       | [1]     |
| 2022-09-11 00:00 | expire    | 1       | [1]     |
| 2022-09-12 01:00 | expire    | 1       | [2]     |
| 2022-09-12 01:00 | expire    | 2       | [1]     |
Hurray! With this we have all the data we need to build the visualization!
This is where I ran into the Athena bug mentioned earlier: despite the ORDER BY, it seems like Athena outputted the end of the 22:00:00 group, and the start of the 23:00:00 group, before it had finished writing the 22:00:00 group. Very annoying, but it’s easy enough to sort locally with jq.
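Something along these lines does the job; the exact invocation is an assumption (the output files are gzipped JSON lines, and `bucket` is the group field from the table above):

```bash
zcat events.json.gz | jq -c -s 'sort_by(.bucket) | .[]' | gzip > events-sorted.json.gz
```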
We’re reading JSON files that Athena produces, so the obvious way to do this is to use Serde. We just need to define a few structures that derive from Deserialize and we’re good to go:
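The struct itself isn't shown here, but something like this matches the columns of the `bucket_s3_events_grouped` output, so treat it as a sketch:

```rust
use serde::Deserialize;

// One row of the bucket_s3_events_grouped JSON output.
#[derive(Debug, Deserialize)]
struct Event {
    // The truncated time period the events were grouped into.
    bucket: String,
    // Either "delete" or "expire".
    operation: String,
    // The segment the affected keys belong to.
    segment: usize,
    // All indexes within the segment that the operation touched.
    indexes: Vec<usize>,
}
```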
Once we’ve done that we can define our State, where we hold information about all the events. I chose to store the state of all files in a single contiguous vector, which worked out to use only a gigabyte of memory in total. We then store the smaller (segment, max(index)) bounding boxes in offsets, which means that given a (segment, index) tuple we can compute the pixel by indexing into offsets and adding index to the returned value. This gets us an index into the files vec:
```rust
#[derive(Clone, Debug, Eq, PartialEq)]
enum FileState {
    // File exists
    Present,
    // Key is deleted
    DeleteMarker,
    // File completely deleted
    Expired,
    // The delete marker is gone
    DeleteMarkerDeleted,
}

struct State {
    offsets: Vec<usize>,
    files: Vec<FileState>,
}

impl State {
    fn set_item(&mut self, segment: usize, index: usize, state: FileState) {
        let offset = self.offsets[segment - 1];
        let idx = offset + index - 1;
        self.files[idx] = state;
    }
}
```
And then we can add some logic to turn our State into an image using the image crate. This was the slowest part of the whole process:
```rust
use image::{Rgb, RgbImage};

impl State {
    fn get_frame(&self, image_size: u32) -> RgbImage {
        // The slowest part of the whole shebang.
        image::ImageBuffer::from_fn(image_size, image_size, |x, y| {
            let row_idx = y * image_size;
            let idx = row_idx + x;
            match self.files.get(idx as usize) {
                None => Rgb([0, 0, 0]),
                Some(v) => turn_state_into_rgb(v),
            }
        })
    }
}
```
And then we can start loading our events! As Athena outputs multiple (sorted 😅) files, we can create a serde iterator from each of the files and use kmerge_by from the awesome itertools crate to yield them in a sorted fashion! We then group the events by the bucket (the time period when the events happened). Being able to do something like this feels very ‘python-y’ and it’s one of the reasons I love working with Rust.
```rust
use std::fs::{self, File};
use std::io::BufReader;

use flate2::read::GzDecoder;
use itertools::Itertools;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut state = State::new();
    let mut event_iterators = vec![];
    for entry in fs::read_dir("some-event-dir")? {
        let file = File::open(entry?.path())?;
        let reader = BufReader::new(GzDecoder::new(file));
        // Each file is a gzipped stream of JSON events.
        let event_lines = serde_json::Deserializer::from_reader(reader)
            .into_iter::<Event>()
            .map(|e| e.expect("invalid event"));
        event_iterators.push(event_lines);
    }
    // kmerge_by merges the per-file sorted iterators into one globally
    // sorted stream; group_by then chunks it by time bucket.
    let items = event_iterators
        .into_iter()
        .kmerge_by(|a, b| a.bucket < b.bucket)
        .group_by(|e| e.bucket.clone());
    for (group, events) in &items {
        println!("Group: {}", group);
        for event in events {
            state.update(event);
        }
        let image = state.get_frame(1_000);
        image.save(format!("images/{}.png", group))?;
    }
    Ok(())
}
```
Once we have our stream of sorted events being fed to us as simple Rust structs, we can manipulate our state:
```rust
impl State {
    fn update(&mut self, event: Event) {
        for &index in &event.indexes {
            let current_state = self.get_state(event.segment, index);
            // Do some logic here to transition objects, i.e. if the object is
            // present and the event is 'delete', it becomes FileState::DeleteMarker.
            let new_state = compute_new_state(&event, current_state);
            self.set_item(event.segment, index, new_state);
        }
    }
}
```
And that is it! Now we have a directory full of PNG images that we can create a gif from using imagemagick:
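The exact command isn’t shown here; something like this (the frame delay and output name are arbitrary) works:

```bash
convert -delay 20 -loop 0 images/*.png deletions.gif
```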
This was a great excuse to brush up on my SQL skills, learn more about Athena and get the chance to play with some Rust imaging libraries.
As for S3? The gif clearly shows that each of the underlying storage partitions executes the lifecycle policies independently, and that these partitions are a function of the object key.
In total it took about 6 days to delete all the objects.