1 | /// This module implements the "container" file format that `measureme` uses for |
2 | /// storing things on disk. The format supports storing three independent |
3 | /// streams of data: one for events, one for string data, and one for string |
4 | /// index data (in theory it could support an arbitrary number of separate |
5 | /// streams but three is all we need). The data of each stream is split into |
6 | /// "pages", where each page has a small header designating what kind of |
7 | /// data it is (i.e. event, string data, or string index), and the length of |
8 | /// the page. |
9 | /// |
10 | /// Pages of different kinds can be arbitrarily interleaved. The headers allow |
11 | /// for reconstructing each of the streams later on. An example file might thus |
12 | /// look like this: |
13 | /// |
14 | /// ```ignore |
15 | /// | file header | page (events) | page (string data) | page (events) | page (string index) | |
16 | /// ``` |
17 | /// |
18 | /// The exact encoding of a page is: |
19 | /// |
20 | /// | byte slice | contents | |
21 | /// |-------------------------|-----------------------------------------| |
22 | /// | &[0 .. 1] | page tag | |
23 | /// | &[1 .. 5] | page size as little endian u32 | |
24 | /// | &[5 .. (5 + page_size)] | page contents (exactly page_size bytes) | |
25 | /// |
26 | /// A page is immediately followed by the next page, without any padding. |
27 | use parking_lot::Mutex; |
28 | use rustc_hash::FxHashMap; |
29 | use std::cmp::min; |
30 | use std::convert::TryInto; |
31 | use std::error::Error; |
32 | use std::fmt::Debug; |
33 | use std::fs; |
34 | use std::io::Write; |
35 | use std::sync::Arc; |
36 | |
/// The maximum number of content bytes a single page may hold. Writers buffer
/// up to this much data in memory before emitting a page.
const MAX_PAGE_SIZE: usize = 256 * 1024;

/// The number of bytes we consider enough to warrant their own page when
/// deciding whether to flush a partially full buffer. Actual pages may need
/// to be smaller, e.g. when writing the tail of the data stream.
const MIN_PAGE_SIZE: usize = MAX_PAGE_SIZE / 2;
43 | |
/// Identifies which stream a page belongs to. The tag is stored as the first
/// byte of each page's on-disk header.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum PageTag {
    /// Pages holding event data.
    Events = 0,
    /// Pages holding raw string data.
    StringData = 1,
    /// Pages holding string index data.
    StringIndex = 2,
}

impl std::convert::TryFrom<u8> for PageTag {
    type Error = String;

    /// Decodes a tag byte as read from a page header.
    ///
    /// Returns an error message for any byte that does not correspond to a
    /// known tag, which indicates a corrupted or incompatible file.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(PageTag::Events),
            1 => Ok(PageTag::StringData),
            2 => Ok(PageTag::StringIndex),
            _ => Err(format!("Could not convert byte `{}` to PageTag.", value)),
        }
    }
}
64 | |
/// An address within a data stream. Each data stream has its own address space,
/// i.e. the first piece of data written to the events stream will have
/// `Addr(0)` and the first piece of data written to the string data stream
/// will *also* have `Addr(0)`.
//
// TODO: Evaluate if it makes sense to add a type tag to `Addr` in order to
// prevent accidental use of `Addr` values with the wrong address space.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct Addr(pub u64);

impl Addr {
    /// Returns the raw address as a `usize`, e.g. for indexing into a byte
    /// slice that holds the decoded stream.
    pub fn as_usize(self) -> usize {
        let Addr(raw) = self;
        raw as usize
    }
}
80 | |
/// A writer for one data stream. It buffers data locally and emits it as
/// tagged pages into the backing storage shared with the other sinks.
#[derive(Debug)]
pub struct SerializationSink {
    // Backing storage shared by all sinks writing to the same file.
    shared_state: SharedState,
    // This sink's local buffer and current stream address; mutex-protected so
    // multiple threads can write through the same sink.
    data: Mutex<SerializationSinkInner>,
    // The tag written into the header of every page this sink emits.
    page_tag: PageTag,
}
87 | |
88 | pub struct SerializationSinkBuilder(SharedState); |
89 | |
90 | impl SerializationSinkBuilder { |
91 | pub fn new_from_file(file: fs::File) -> Result<Self, Box<dyn Error + Send + Sync>> { |
92 | Ok(Self(SharedState(Arc::new(data:Mutex::new( |
93 | val:BackingStorage::File(file), |
94 | ))))) |
95 | } |
96 | |
97 | pub fn new_in_memory() -> SerializationSinkBuilder { |
98 | Self(SharedState(Arc::new(data:Mutex::new(val:BackingStorage::Memory( |
99 | Vec::new(), |
100 | ))))) |
101 | } |
102 | |
103 | pub fn new_sink(&self, page_tag: PageTag) -> SerializationSink { |
104 | SerializationSink { |
105 | data: Mutex::new(val:SerializationSinkInner { |
106 | buffer: Vec::with_capacity(MAX_PAGE_SIZE), |
107 | addr: 0, |
108 | }), |
109 | shared_state: self.0.clone(), |
110 | page_tag, |
111 | } |
112 | } |
113 | } |
114 | |
/// The `BackingStorage` is what the data gets written to. Usually that is a
/// file but for testing purposes it can also be an in-memory vec of bytes.
#[derive(Debug)]
enum BackingStorage {
    // Data is appended to this file on disk.
    File(fs::File),
    // Data is appended to this in-memory buffer (testing only).
    Memory(Vec<u8>),
}
122 | |
123 | impl Write for BackingStorage { |
124 | #[inline ] |
125 | fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { |
126 | match *self { |
127 | BackingStorage::File(ref mut file: &mut File) => file.write(buf), |
128 | BackingStorage::Memory(ref mut vec: &mut Vec) => vec.write(buf), |
129 | } |
130 | } |
131 | |
132 | fn flush(&mut self) -> std::io::Result<()> { |
133 | match *self { |
134 | BackingStorage::File(ref mut file: &mut File) => file.flush(), |
135 | BackingStorage::Memory(_) => { |
136 | // Nothing to do |
137 | Ok(()) |
138 | } |
139 | } |
140 | } |
141 | } |
142 | |
/// This struct allows to treat `SerializationSink` as `std::io::Write`.
pub struct StdWriteAdapter<'a>(&'a SerializationSink);
145 | |
146 | impl<'a> Write for StdWriteAdapter<'a> { |
147 | fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { |
148 | self.0.write_bytes_atomic(bytes:buf); |
149 | Ok(buf.len()) |
150 | } |
151 | |
152 | fn flush(&mut self) -> std::io::Result<()> { |
153 | let mut data: MutexGuard<'_, RawMutex, …> = self.0.data.lock(); |
154 | let SerializationSinkInner { |
155 | ref mut buffer: &mut Vec, |
156 | addr: _, |
157 | } = *data; |
158 | |
159 | // First flush the local buffer. |
160 | self.0.flush(buffer); |
161 | |
162 | // Then flush the backing store. |
163 | self.0.shared_state.0.lock().flush()?; |
164 | |
165 | Ok(()) |
166 | } |
167 | } |
168 | |
/// The mutex-protected, per-sink state of a `SerializationSink`.
#[derive(Debug)]
struct SerializationSinkInner {
    // Data accumulated since the last page was written out.
    buffer: Vec<u8>,
    // The next address in this sink's stream address space, i.e. the total
    // number of bytes written so far.
    addr: u64,
}
174 | |
/// This state is shared between all `SerializationSink`s writing to the same
/// backing storage (e.g. the same file).
#[derive(Clone, Debug)]
struct SharedState(Arc<Mutex<BackingStorage>>);
179 | |
180 | impl SharedState { |
181 | /// Copies out the contents of all pages with the given tag and |
182 | /// concatenates them into a single byte vec. This method is only meant to |
183 | /// be used for testing and will panic if the underlying backing storage is |
184 | /// a file instead of in memory. |
185 | fn copy_bytes_with_page_tag(&self, page_tag: PageTag) -> Vec<u8> { |
186 | let data: MutexGuard<'_, RawMutex, …> = self.0.lock(); |
187 | let data: &Vec = match *data { |
188 | BackingStorage::File(_) => panic!(), |
189 | BackingStorage::Memory(ref data: &Vec) => data, |
190 | }; |
191 | |
192 | split_streams(data).remove(&page_tag).unwrap_or(default:Vec::new()) |
193 | } |
194 | } |
195 | |
196 | /// This function reconstructs the individual data streams from their paged |
197 | /// version. |
198 | /// |
199 | /// For example, if `E` denotes the page header of an events page, `S` denotes |
200 | /// the header of a string data page, and lower case letters denote page |
201 | /// contents then a paged stream could look like: |
202 | /// |
203 | /// ```ignore |
204 | /// s = Eabcd_Sopq_Eef_Eghi_Srst |
205 | /// ``` |
206 | /// |
207 | /// and `split_streams` would result in the following set of streams: |
208 | /// |
209 | /// ```ignore |
210 | /// split_streams(s) = { |
211 | /// events: [abcdefghi], |
212 | /// string_data: [opqrst], |
213 | /// } |
214 | /// ``` |
215 | pub fn split_streams(paged_data: &[u8]) -> FxHashMap<PageTag, Vec<u8>> { |
216 | let mut result: FxHashMap<PageTag, Vec<u8>> = FxHashMap::default(); |
217 | |
218 | let mut pos: usize = 0; |
219 | while pos < paged_data.len() { |
220 | let tag: PageTag = TryInto::try_into(self:paged_data[pos]).unwrap(); |
221 | let page_size: usize = |
222 | u32::from_le_bytes(paged_data[pos + 1..pos + 5].try_into().unwrap()) as usize; |
223 | |
224 | assert!(page_size > 0); |
225 | |
226 | result&mut Vec |
227 | .entry(key:tag) |
228 | .or_default() |
229 | .extend_from_slice(&paged_data[pos + 5..pos + 5 + page_size]); |
230 | |
231 | pos += page_size + 5; |
232 | } |
233 | |
234 | result |
235 | } |
236 | |
237 | impl SerializationSink { |
238 | /// Writes `bytes` as a single page to the shared backing storage. The |
239 | /// method will first write the page header (consisting of the page tag and |
240 | /// the number of bytes in the page) and then the page contents |
241 | /// (i.e. `bytes`). |
242 | fn write_page(&self, bytes: &[u8]) { |
243 | if bytes.len() > 0 { |
244 | // We explicitly don't assert `bytes.len() >= MIN_PAGE_SIZE` because |
245 | // `MIN_PAGE_SIZE` is just a recommendation and the last page will |
246 | // often be smaller than that. |
247 | assert!(bytes.len() <= MAX_PAGE_SIZE); |
248 | |
249 | let mut file = self.shared_state.0.lock(); |
250 | |
251 | file.write_all(&[self.page_tag as u8]).unwrap(); |
252 | |
253 | let page_size: [u8; 4] = (bytes.len() as u32).to_le_bytes(); |
254 | file.write_all(&page_size).unwrap(); |
255 | file.write_all(&bytes[..]).unwrap(); |
256 | } |
257 | } |
258 | |
259 | /// Flushes `buffer` by writing its contents as a new page to the backing |
260 | /// storage and then clearing it. |
261 | fn flush(&self, buffer: &mut Vec<u8>) { |
262 | self.write_page(&buffer[..]); |
263 | buffer.clear(); |
264 | } |
265 | |
266 | /// Creates a copy of all data written so far. This method is meant to be |
267 | /// used for writing unit tests. It will panic if the underlying |
268 | /// `BackingStorage` is a file. |
269 | pub fn into_bytes(mut self) -> Vec<u8> { |
270 | // Swap out the contains of `self` with something that can safely be |
271 | // dropped without side effects. |
272 | let mut data = Mutex::new(SerializationSinkInner { |
273 | buffer: Vec::new(), |
274 | addr: 0, |
275 | }); |
276 | std::mem::swap(&mut self.data, &mut data); |
277 | |
278 | // Extract the data from the mutex. |
279 | let SerializationSinkInner { |
280 | ref mut buffer, |
281 | addr: _, |
282 | } = data.into_inner(); |
283 | |
284 | // Make sure we write the current contents of the buffer to the |
285 | // backing storage before proceeding. |
286 | self.flush(buffer); |
287 | |
288 | self.shared_state.copy_bytes_with_page_tag(self.page_tag) |
289 | } |
290 | |
291 | /// Atomically writes `num_bytes` of data to this `SerializationSink`. |
292 | /// Atomic means the data is guaranteed to be written as a contiguous range |
293 | /// of bytes. |
294 | /// |
295 | /// The buffer provided to the `write` callback is guaranteed to be of size |
296 | /// `num_bytes` and `write` is supposed to completely fill it with the data |
297 | /// to be written. |
298 | /// |
299 | /// The return value is the address of the data written and can be used to |
300 | /// refer to the data later on. |
301 | pub fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr |
302 | where |
303 | W: FnOnce(&mut [u8]), |
304 | { |
305 | if num_bytes > MAX_PAGE_SIZE { |
306 | let mut bytes = vec![0u8; num_bytes]; |
307 | write(&mut bytes[..]); |
308 | return self.write_bytes_atomic(&bytes[..]); |
309 | } |
310 | |
311 | let mut data = self.data.lock(); |
312 | let SerializationSinkInner { |
313 | ref mut buffer, |
314 | ref mut addr, |
315 | } = *data; |
316 | |
317 | if buffer.len() + num_bytes > MAX_PAGE_SIZE { |
318 | self.flush(buffer); |
319 | assert!(buffer.is_empty()); |
320 | } |
321 | |
322 | let curr_addr = *addr; |
323 | |
324 | let buf_start = buffer.len(); |
325 | let buf_end = buf_start + num_bytes; |
326 | buffer.resize(buf_end, 0u8); |
327 | write(&mut buffer[buf_start..buf_end]); |
328 | |
329 | *addr += num_bytes as u64; |
330 | |
331 | Addr(curr_addr) |
332 | } |
333 | |
334 | /// Atomically writes the data in `bytes` to this `SerializationSink`. |
335 | /// Atomic means the data is guaranteed to be written as a contiguous range |
336 | /// of bytes. |
337 | /// |
338 | /// This method may perform better than `write_atomic` because it may be |
339 | /// able to skip the sink's internal buffer. Use this method if the data to |
340 | /// be written is already available as a `&[u8]`. |
341 | /// |
342 | /// The return value is the address of the data written and can be used to |
343 | /// refer to the data later on. |
344 | pub fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr { |
345 | // For "small" data we go to the buffered version immediately. |
346 | if bytes.len() <= 128 { |
347 | return self.write_atomic(bytes.len(), |sink| { |
348 | sink.copy_from_slice(bytes); |
349 | }); |
350 | } |
351 | |
352 | let mut data = self.data.lock(); |
353 | let SerializationSinkInner { |
354 | ref mut buffer, |
355 | ref mut addr, |
356 | } = *data; |
357 | |
358 | let curr_addr = Addr(*addr); |
359 | *addr += bytes.len() as u64; |
360 | |
361 | let mut bytes_left = bytes; |
362 | |
363 | // Do we have too little data in the buffer? If so, fill up the buffer |
364 | // to the minimum page size. |
365 | if buffer.len() < MIN_PAGE_SIZE { |
366 | let num_bytes_to_take = min(MIN_PAGE_SIZE - buffer.len(), bytes_left.len()); |
367 | buffer.extend_from_slice(&bytes_left[..num_bytes_to_take]); |
368 | bytes_left = &bytes_left[num_bytes_to_take..]; |
369 | } |
370 | |
371 | if bytes_left.is_empty() { |
372 | return curr_addr; |
373 | } |
374 | |
375 | // Make sure we flush the buffer before writing out any other pages. |
376 | self.flush(buffer); |
377 | |
378 | for chunk in bytes_left.chunks(MAX_PAGE_SIZE) { |
379 | if chunk.len() == MAX_PAGE_SIZE { |
380 | // This chunk has the maximum size. It might or might not be the |
381 | // last one. In either case we want to write it to disk |
382 | // immediately because there is no reason to copy it to the |
383 | // buffer first. |
384 | self.write_page(chunk); |
385 | } else { |
386 | // This chunk is less than the chunk size that we requested, so |
387 | // it must be the last one. If it is big enough to warrant its |
388 | // own page, we write it to disk immediately. Otherwise, we copy |
389 | // it to the buffer. |
390 | if chunk.len() >= MIN_PAGE_SIZE { |
391 | self.write_page(chunk); |
392 | } else { |
393 | debug_assert!(buffer.is_empty()); |
394 | buffer.extend_from_slice(chunk); |
395 | } |
396 | } |
397 | } |
398 | |
399 | curr_addr |
400 | } |
401 | |
402 | pub fn as_std_write<'a>(&'a self) -> impl Write + 'a { |
403 | StdWriteAdapter(self) |
404 | } |
405 | } |
406 | |
407 | impl Drop for SerializationSink { |
408 | fn drop(&mut self) { |
409 | let mut data: MutexGuard<'_, RawMutex, …> = self.data.lock(); |
410 | let SerializationSinkInner { |
411 | ref mut buffer: &mut Vec, |
412 | addr: _, |
413 | } = *data; |
414 | |
415 | self.flush(buffer); |
416 | } |
417 | } |
418 | |
#[cfg(test)]
mod tests {
    use super::*;

    // Writes `chunk_count` byte-slices of size `chunk_size` through three
    // `SerializationSink`s that share one underlying stream, producing
    // interleaved pages with different tags. The individual streams are then
    // reconstructed and checked against the data that was written.
    fn test_roundtrip<W>(chunk_size: usize, chunk_count: usize, write: W)
    where
        W: Fn(&SerializationSink, &[u8]) -> Addr,
    {
        let builder = SerializationSinkBuilder::new_in_memory();
        let tags = [PageTag::Events, PageTag::StringData, PageTag::StringIndex];
        let reference_chunk: Vec<u8> = (0..chunk_size).map(|x| (x % 239) as u8).collect();

        {
            // One sink per tag, all backed by the same in-memory storage.
            let sinks: Vec<SerializationSink> =
                tags.iter().map(|&tag| builder.new_sink(tag)).collect();

            for chunk_index in 0..chunk_count {
                let expected_addr = Addr((chunk_index * chunk_size) as u64);
                for sink in &sinks {
                    assert_eq!(write(sink, &reference_chunk[..]), expected_addr);
                }
            }
            // Dropping the sinks here flushes their remaining buffers.
        }

        for &tag in &tags {
            let stream = builder.0.copy_bytes_with_page_tag(tag);
            for chunk in stream.chunks(chunk_size) {
                assert_eq!(chunk, &reference_chunk[..]);
            }
        }
    }

    // Writes via the closure-based `write_atomic` API.
    fn write_closure(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_atomic(bytes.len(), |dest| dest.copy_from_slice(bytes))
    }

    // Writes via the slice-based `write_bytes_atomic` API.
    fn write_slice(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_bytes_atomic(bytes)
    }

    // Creates two roundtrip tests, one using `SerializationSink::write_atomic`
    // and one using `SerializationSink::write_bytes_atomic`.
    macro_rules! mk_roundtrip_test {
        ($name:ident, $chunk_size:expr, $chunk_count:expr) => {
            mod $name {
                use super::*;

                #[test]
                fn write_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_closure);
                }

                #[test]
                fn write_bytes_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_slice);
                }
            }
        };
    }

    mk_roundtrip_test!(small_data, 10, (90 * MAX_PAGE_SIZE) / 100);
    mk_roundtrip_test!(huge_data, MAX_PAGE_SIZE * 10, 5);

    mk_roundtrip_test!(exactly_max_page_size, MAX_PAGE_SIZE, 10);
    mk_roundtrip_test!(max_page_size_plus_one, MAX_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(max_page_size_minus_one, MAX_PAGE_SIZE - 1, 10);

    mk_roundtrip_test!(exactly_min_page_size, MIN_PAGE_SIZE, 10);
    mk_roundtrip_test!(min_page_size_plus_one, MIN_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(min_page_size_minus_one, MIN_PAGE_SIZE - 1, 10);
}
499 | |