//! This module implements the "container" file format that `measureme` uses for
//! storing things on disk. The format supports storing three independent
//! streams of data: one for events, one for string data, and one for string
//! index data (in theory it could support an arbitrary number of separate
//! streams but three is all we need). The data of each stream is split into
//! "pages", where each page has a small header designating what kind of
//! data it is (i.e. event, string data, or string index), and the length of
//! the page.
//!
//! Pages of different kinds can be arbitrarily interleaved. The headers allow
//! for reconstructing each of the streams later on. An example file might thus
//! look like this:
//!
//! ```ignore
//! | file header | page (events) | page (string data) | page (events) | page (string index) |
//! ```
//!
//! The exact encoding of a page is:
//!
//! | byte slice              | contents                                |
//! |-------------------------|-----------------------------------------|
//! | &[0 .. 1]               | page tag                                |
//! | &[1 .. 5]               | page size as little endian u32          |
//! | &[5 .. (5 + page_size)] | page contents (exactly page_size bytes) |
//!
//! A page is immediately followed by the next page, without any padding.
| 27 | use parking_lot::Mutex; |
| 28 | use rustc_hash::FxHashMap; |
| 29 | use std::cmp::min; |
| 30 | use std::convert::TryInto; |
| 31 | use std::error::Error; |
| 32 | use std::fmt::Debug; |
| 33 | use std::fs; |
| 34 | use std::io::Write; |
| 35 | use std::sync::Arc; |
| 36 | |
/// The maximum number of content bytes a single page may hold. A page's size
/// field is a u32, so this also bounds what fits in a header.
const MAX_PAGE_SIZE: usize = 256 * 1024;

/// The number of bytes we consider enough to warrant their own page when
/// deciding whether to flush a partially full buffer. Actual pages may need
/// to be smaller, e.g. when writing the tail of the data stream.
const MIN_PAGE_SIZE: usize = MAX_PAGE_SIZE / 2;
| 43 | |
/// Identifies which data stream a page belongs to. The tag is stored as the
/// first byte of every page header, so the discriminant values below are part
/// of the on-disk format and must not be changed.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum PageTag {
    Events = 0,
    StringData = 1,
    StringIndex = 2,
}

impl std::convert::TryFrom<u8> for PageTag {
    type Error = String;

    /// Decodes the tag byte of a page header back into a `PageTag`.
    ///
    /// # Errors
    ///
    /// Returns a descriptive error message for any byte value that does not
    /// correspond to one of the three known page tags.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(PageTag::Events),
            1 => Ok(PageTag::StringData),
            2 => Ok(PageTag::StringIndex),
            _ => Err(format!("Could not convert byte `{}` to PageTag.", value)),
        }
    }
}
| 64 | |
/// An address within a data stream. Each data stream has its own address space,
/// i.e. the first piece of data written to the events stream will have
/// `Addr(0)` and the first piece of data written to the string data stream
/// will *also* have `Addr(0)`.
//
// TODO: Evaluate if it makes sense to add a type tag to `Addr` in order to
// prevent accidental use of `Addr` values with the wrong address space.
#[derive (Clone, Copy, Eq, PartialEq, Debug)]
pub struct Addr(pub u64);

impl Addr {
    /// Returns the raw address as a `usize`, e.g. for indexing into a byte
    /// slice holding the stream's data.
    pub fn as_usize(self) -> usize {
        let Addr(raw) = self;
        raw as usize
    }
}
| 80 | |
#[derive (Debug)]
pub struct SerializationSink {
    /// Handle to the backing storage that is shared with all other sinks
    /// writing to the same file; pages from this sink get interleaved with
    /// theirs.
    shared_state: SharedState,
    /// The sink-local write buffer and stream address, behind a mutex so that
    /// multiple threads can write through the same sink.
    data: Mutex<SerializationSinkInner>,
    /// The tag written into the header of every page this sink emits.
    page_tag: PageTag,
}
| 87 | |
| 88 | pub struct SerializationSinkBuilder(SharedState); |
| 89 | |
| 90 | impl SerializationSinkBuilder { |
| 91 | pub fn new_from_file(file: fs::File) -> Result<Self, Box<dyn Error + Send + Sync>> { |
| 92 | Ok(Self(SharedState(Arc::new(data:Mutex::new( |
| 93 | val:BackingStorage::File(file), |
| 94 | ))))) |
| 95 | } |
| 96 | |
| 97 | pub fn new_in_memory() -> SerializationSinkBuilder { |
| 98 | Self(SharedState(Arc::new(data:Mutex::new(val:BackingStorage::Memory( |
| 99 | Vec::new(), |
| 100 | ))))) |
| 101 | } |
| 102 | |
| 103 | pub fn new_sink(&self, page_tag: PageTag) -> SerializationSink { |
| 104 | SerializationSink { |
| 105 | data: Mutex::new(val:SerializationSinkInner { |
| 106 | buffer: Vec::with_capacity(MAX_PAGE_SIZE), |
| 107 | addr: 0, |
| 108 | }), |
| 109 | shared_state: self.0.clone(), |
| 110 | page_tag, |
| 111 | } |
| 112 | } |
| 113 | } |
| 114 | |
/// The `BackingStorage` is what the data gets written to. Usually that is a
/// file but for testing purposes it can also be an in-memory vec of bytes.
#[derive(Debug)]
enum BackingStorage {
    File(fs::File),
    Memory(Vec<u8>),
}

impl Write for BackingStorage {
    /// Forwards the write to the underlying file or in-memory buffer.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        match *self {
            BackingStorage::File(ref mut file) => file.write(buf),
            BackingStorage::Memory(ref mut vec) => vec.write(buf),
        }
    }

    fn flush(&mut self) -> std::io::Result<()> {
        match *self {
            BackingStorage::File(ref mut file) => file.flush(),
            BackingStorage::Memory(_) => {
                // Writes to a `Vec` land directly in memory, so there is
                // nothing to flush.
                Ok(())
            }
        }
    }
}
| 142 | |
| 143 | /// This struct allows to treat `SerializationSink` as `std::io::Write`. |
| 144 | pub struct StdWriteAdapter<'a>(&'a SerializationSink); |
| 145 | |
| 146 | impl<'a> Write for StdWriteAdapter<'a> { |
| 147 | fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { |
| 148 | self.0.write_bytes_atomic(bytes:buf); |
| 149 | Ok(buf.len()) |
| 150 | } |
| 151 | |
| 152 | fn flush(&mut self) -> std::io::Result<()> { |
| 153 | let mut data: MutexGuard<'_, RawMutex, …> = self.0.data.lock(); |
| 154 | let SerializationSinkInner { |
| 155 | ref mut buffer: &mut Vec, |
| 156 | addr: _, |
| 157 | } = *data; |
| 158 | |
| 159 | // First flush the local buffer. |
| 160 | self.0.flush(buffer); |
| 161 | |
| 162 | // Then flush the backing store. |
| 163 | self.0.shared_state.0.lock().flush()?; |
| 164 | |
| 165 | Ok(()) |
| 166 | } |
| 167 | } |
| 168 | |
#[derive (Debug)]
struct SerializationSinkInner {
    /// Data that has been written to this sink but not yet emitted to the
    /// backing storage as a page.
    buffer: Vec<u8>,
    /// The address within this sink's stream that the next write will get,
    /// i.e. the total number of bytes written to the sink so far.
    addr: u64,
}
| 174 | |
| 175 | /// This state is shared between all `SerializationSink`s writing to the same |
| 176 | /// backing storage (e.g. the same file). |
| 177 | #[derive (Clone, Debug)] |
| 178 | struct SharedState(Arc<Mutex<BackingStorage>>); |
| 179 | |
| 180 | impl SharedState { |
| 181 | /// Copies out the contents of all pages with the given tag and |
| 182 | /// concatenates them into a single byte vec. This method is only meant to |
| 183 | /// be used for testing and will panic if the underlying backing storage is |
| 184 | /// a file instead of in memory. |
| 185 | fn copy_bytes_with_page_tag(&self, page_tag: PageTag) -> Vec<u8> { |
| 186 | let data: MutexGuard<'_, RawMutex, …> = self.0.lock(); |
| 187 | let data: &Vec = match *data { |
| 188 | BackingStorage::File(_) => panic!(), |
| 189 | BackingStorage::Memory(ref data: &Vec) => data, |
| 190 | }; |
| 191 | |
| 192 | split_streams(data).remove(&page_tag).unwrap_or(default:Vec::new()) |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | /// This function reconstructs the individual data streams from their paged |
| 197 | /// version. |
| 198 | /// |
| 199 | /// For example, if `E` denotes the page header of an events page, `S` denotes |
| 200 | /// the header of a string data page, and lower case letters denote page |
| 201 | /// contents then a paged stream could look like: |
| 202 | /// |
| 203 | /// ```ignore |
| 204 | /// s = Eabcd_Sopq_Eef_Eghi_Srst |
| 205 | /// ``` |
| 206 | /// |
| 207 | /// and `split_streams` would result in the following set of streams: |
| 208 | /// |
| 209 | /// ```ignore |
| 210 | /// split_streams(s) = { |
| 211 | /// events: [abcdefghi], |
| 212 | /// string_data: [opqrst], |
| 213 | /// } |
| 214 | /// ``` |
| 215 | pub fn split_streams(paged_data: &[u8]) -> FxHashMap<PageTag, Vec<u8>> { |
| 216 | let mut result: FxHashMap<PageTag, Vec<u8>> = FxHashMap::default(); |
| 217 | |
| 218 | let mut pos: usize = 0; |
| 219 | while pos < paged_data.len() { |
| 220 | let tag: PageTag = TryInto::try_into(self:paged_data[pos]).unwrap(); |
| 221 | let page_size: usize = |
| 222 | u32::from_le_bytes(paged_data[pos + 1..pos + 5].try_into().unwrap()) as usize; |
| 223 | |
| 224 | assert!(page_size > 0); |
| 225 | |
| 226 | result&mut Vec |
| 227 | .entry(key:tag) |
| 228 | .or_default() |
| 229 | .extend_from_slice(&paged_data[pos + 5..pos + 5 + page_size]); |
| 230 | |
| 231 | pos += page_size + 5; |
| 232 | } |
| 233 | |
| 234 | result |
| 235 | } |
| 236 | |
| 237 | impl SerializationSink { |
| 238 | /// Writes `bytes` as a single page to the shared backing storage. The |
| 239 | /// method will first write the page header (consisting of the page tag and |
| 240 | /// the number of bytes in the page) and then the page contents |
| 241 | /// (i.e. `bytes`). |
| 242 | fn write_page(&self, bytes: &[u8]) { |
| 243 | if bytes.len() > 0 { |
| 244 | // We explicitly don't assert `bytes.len() >= MIN_PAGE_SIZE` because |
| 245 | // `MIN_PAGE_SIZE` is just a recommendation and the last page will |
| 246 | // often be smaller than that. |
| 247 | assert!(bytes.len() <= MAX_PAGE_SIZE); |
| 248 | |
| 249 | let mut file = self.shared_state.0.lock(); |
| 250 | |
| 251 | file.write_all(&[self.page_tag as u8]).unwrap(); |
| 252 | |
| 253 | let page_size: [u8; 4] = (bytes.len() as u32).to_le_bytes(); |
| 254 | file.write_all(&page_size).unwrap(); |
| 255 | file.write_all(&bytes[..]).unwrap(); |
| 256 | } |
| 257 | } |
| 258 | |
| 259 | /// Flushes `buffer` by writing its contents as a new page to the backing |
| 260 | /// storage and then clearing it. |
| 261 | fn flush(&self, buffer: &mut Vec<u8>) { |
| 262 | self.write_page(&buffer[..]); |
| 263 | buffer.clear(); |
| 264 | } |
| 265 | |
| 266 | /// Creates a copy of all data written so far. This method is meant to be |
| 267 | /// used for writing unit tests. It will panic if the underlying |
| 268 | /// `BackingStorage` is a file. |
| 269 | pub fn into_bytes(mut self) -> Vec<u8> { |
| 270 | // Swap out the contains of `self` with something that can safely be |
| 271 | // dropped without side effects. |
| 272 | let mut data = Mutex::new(SerializationSinkInner { |
| 273 | buffer: Vec::new(), |
| 274 | addr: 0, |
| 275 | }); |
| 276 | std::mem::swap(&mut self.data, &mut data); |
| 277 | |
| 278 | // Extract the data from the mutex. |
| 279 | let SerializationSinkInner { |
| 280 | ref mut buffer, |
| 281 | addr: _, |
| 282 | } = data.into_inner(); |
| 283 | |
| 284 | // Make sure we write the current contents of the buffer to the |
| 285 | // backing storage before proceeding. |
| 286 | self.flush(buffer); |
| 287 | |
| 288 | self.shared_state.copy_bytes_with_page_tag(self.page_tag) |
| 289 | } |
| 290 | |
| 291 | /// Atomically writes `num_bytes` of data to this `SerializationSink`. |
| 292 | /// Atomic means the data is guaranteed to be written as a contiguous range |
| 293 | /// of bytes. |
| 294 | /// |
| 295 | /// The buffer provided to the `write` callback is guaranteed to be of size |
| 296 | /// `num_bytes` and `write` is supposed to completely fill it with the data |
| 297 | /// to be written. |
| 298 | /// |
| 299 | /// The return value is the address of the data written and can be used to |
| 300 | /// refer to the data later on. |
| 301 | pub fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr |
| 302 | where |
| 303 | W: FnOnce(&mut [u8]), |
| 304 | { |
| 305 | if num_bytes > MAX_PAGE_SIZE { |
| 306 | let mut bytes = vec![0u8; num_bytes]; |
| 307 | write(&mut bytes[..]); |
| 308 | return self.write_bytes_atomic(&bytes[..]); |
| 309 | } |
| 310 | |
| 311 | let mut data = self.data.lock(); |
| 312 | let SerializationSinkInner { |
| 313 | ref mut buffer, |
| 314 | ref mut addr, |
| 315 | } = *data; |
| 316 | |
| 317 | if buffer.len() + num_bytes > MAX_PAGE_SIZE { |
| 318 | self.flush(buffer); |
| 319 | assert!(buffer.is_empty()); |
| 320 | } |
| 321 | |
| 322 | let curr_addr = *addr; |
| 323 | |
| 324 | let buf_start = buffer.len(); |
| 325 | let buf_end = buf_start + num_bytes; |
| 326 | buffer.resize(buf_end, 0u8); |
| 327 | write(&mut buffer[buf_start..buf_end]); |
| 328 | |
| 329 | *addr += num_bytes as u64; |
| 330 | |
| 331 | Addr(curr_addr) |
| 332 | } |
| 333 | |
| 334 | /// Atomically writes the data in `bytes` to this `SerializationSink`. |
| 335 | /// Atomic means the data is guaranteed to be written as a contiguous range |
| 336 | /// of bytes. |
| 337 | /// |
| 338 | /// This method may perform better than `write_atomic` because it may be |
| 339 | /// able to skip the sink's internal buffer. Use this method if the data to |
| 340 | /// be written is already available as a `&[u8]`. |
| 341 | /// |
| 342 | /// The return value is the address of the data written and can be used to |
| 343 | /// refer to the data later on. |
| 344 | pub fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr { |
| 345 | // For "small" data we go to the buffered version immediately. |
| 346 | if bytes.len() <= 128 { |
| 347 | return self.write_atomic(bytes.len(), |sink| { |
| 348 | sink.copy_from_slice(bytes); |
| 349 | }); |
| 350 | } |
| 351 | |
| 352 | let mut data = self.data.lock(); |
| 353 | let SerializationSinkInner { |
| 354 | ref mut buffer, |
| 355 | ref mut addr, |
| 356 | } = *data; |
| 357 | |
| 358 | let curr_addr = Addr(*addr); |
| 359 | *addr += bytes.len() as u64; |
| 360 | |
| 361 | let mut bytes_left = bytes; |
| 362 | |
| 363 | // Do we have too little data in the buffer? If so, fill up the buffer |
| 364 | // to the minimum page size. |
| 365 | if buffer.len() < MIN_PAGE_SIZE { |
| 366 | let num_bytes_to_take = min(MIN_PAGE_SIZE - buffer.len(), bytes_left.len()); |
| 367 | buffer.extend_from_slice(&bytes_left[..num_bytes_to_take]); |
| 368 | bytes_left = &bytes_left[num_bytes_to_take..]; |
| 369 | } |
| 370 | |
| 371 | if bytes_left.is_empty() { |
| 372 | return curr_addr; |
| 373 | } |
| 374 | |
| 375 | // Make sure we flush the buffer before writing out any other pages. |
| 376 | self.flush(buffer); |
| 377 | |
| 378 | for chunk in bytes_left.chunks(MAX_PAGE_SIZE) { |
| 379 | if chunk.len() == MAX_PAGE_SIZE { |
| 380 | // This chunk has the maximum size. It might or might not be the |
| 381 | // last one. In either case we want to write it to disk |
| 382 | // immediately because there is no reason to copy it to the |
| 383 | // buffer first. |
| 384 | self.write_page(chunk); |
| 385 | } else { |
| 386 | // This chunk is less than the chunk size that we requested, so |
| 387 | // it must be the last one. If it is big enough to warrant its |
| 388 | // own page, we write it to disk immediately. Otherwise, we copy |
| 389 | // it to the buffer. |
| 390 | if chunk.len() >= MIN_PAGE_SIZE { |
| 391 | self.write_page(chunk); |
| 392 | } else { |
| 393 | debug_assert!(buffer.is_empty()); |
| 394 | buffer.extend_from_slice(chunk); |
| 395 | } |
| 396 | } |
| 397 | } |
| 398 | |
| 399 | curr_addr |
| 400 | } |
| 401 | |
| 402 | pub fn as_std_write<'a>(&'a self) -> impl Write + 'a { |
| 403 | StdWriteAdapter(self) |
| 404 | } |
| 405 | } |
| 406 | |
| 407 | impl Drop for SerializationSink { |
| 408 | fn drop(&mut self) { |
| 409 | let mut data: MutexGuard<'_, RawMutex, …> = self.data.lock(); |
| 410 | let SerializationSinkInner { |
| 411 | ref mut buffer: &mut Vec, |
| 412 | addr: _, |
| 413 | } = *data; |
| 414 | |
| 415 | self.flush(buffer); |
| 416 | } |
| 417 | } |
| 418 | |
#[cfg (test)]
mod tests {
    use super::*;

    // This function writes `chunk_count` byte-slices of size `chunk_size` to
    // three `SerializationSinks` that all map to the same underlying stream,
    // so we get interleaved pages with different tags.
    // It then extracts the data out again and asserts that it is the same as
    // has been written.
    fn test_roundtrip<W>(chunk_size: usize, chunk_count: usize, write: W)
    where
        W: Fn(&SerializationSink, &[u8]) -> Addr,
    {
        let sink_builder = SerializationSinkBuilder::new_in_memory();
        let tags = [PageTag::Events, PageTag::StringData, PageTag::StringIndex];
        // 239 is not a divisor of any of the page-size constants, so chunk
        // contents don't line up with page boundaries — presumably chosen to
        // catch off-by-one errors; the exact value is otherwise arbitrary.
        let expected_chunk: Vec<u8> = (0..chunk_size).map(|x| (x % 239) as u8).collect();

        {
            let sinks: Vec<SerializationSink> =
                tags.iter().map(|&tag| sink_builder.new_sink(tag)).collect();

            for chunk_index in 0..chunk_count {
                // Each stream has its own address space, so all three sinks
                // must report the same address for the same chunk index.
                let expected_addr = Addr((chunk_index * chunk_size) as u64);
                for sink in sinks.iter() {
                    assert_eq!(write(sink, &expected_chunk[..]), expected_addr);
                }
            }
        }
        // The sinks are dropped here, which flushes any remaining buffered
        // data to the shared backing storage.

        let streams: Vec<Vec<u8>> = tags
            .iter()
            .map(|&tag| sink_builder.0.copy_bytes_with_page_tag(tag))
            .collect();

        for stream in streams {
            for chunk in stream.chunks(chunk_size) {
                assert_eq!(chunk, expected_chunk);
            }
        }
    }

    // Writes via the closure-based `write_atomic` API.
    fn write_closure(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_atomic(bytes.len(), |dest| dest.copy_from_slice(bytes))
    }

    // Writes via the slice-based `write_bytes_atomic` API.
    fn write_slice(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_bytes_atomic(bytes)
    }

    // Creates two roundtrip tests, one using `SerializationSink::write_atomic`
    // and one using `SerializationSink::write_bytes_atomic`.
    macro_rules! mk_roundtrip_test {
        ($name:ident, $chunk_size:expr, $chunk_count:expr) => {
            mod $name {
                use super::*;

                #[test]
                fn write_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_closure);
                }

                #[test]
                fn write_bytes_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_slice);
                }
            }
        };
    }

    // Exercise both the buffered path (small chunks) and the page-splitting
    // path (chunks bigger than a page), plus the boundary cases around the
    // MIN/MAX page-size constants.
    mk_roundtrip_test!(small_data, 10, (90 * MAX_PAGE_SIZE) / 100);
    mk_roundtrip_test!(huge_data, MAX_PAGE_SIZE * 10, 5);

    mk_roundtrip_test!(exactly_max_page_size, MAX_PAGE_SIZE, 10);
    mk_roundtrip_test!(max_page_size_plus_one, MAX_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(max_page_size_minus_one, MAX_PAGE_SIZE - 1, 10);

    mk_roundtrip_test!(exactly_min_page_size, MIN_PAGE_SIZE, 10);
    mk_roundtrip_test!(min_page_size_plus_one, MIN_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(min_page_size_minus_one, MIN_PAGE_SIZE - 1, 10);
}
| 499 | |