//! Join functionality.

use super::{Relation, Variable};
use std::cell::Ref;
use std::ops::Deref;

| 7 | /// Implements `join`. Note that `input1` must be a variable, but |
| 8 | /// `input2` can be either a variable or a relation. This is necessary |
| 9 | /// because relations have no "recent" tuples, so the fn would be a |
| 10 | /// guaranteed no-op if both arguments were relations. See also |
| 11 | /// `join_into_relation`. |
| 12 | pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>( |
| 13 | input1: &Variable<(Key, Val1)>, |
| 14 | input2: impl JoinInput<'me, (Key, Val2)>, |
| 15 | output: &Variable<Result>, |
| 16 | mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result, |
| 17 | ) { |
| 18 | let mut results: Vec = Vec::new(); |
| 19 | |
| 20 | let recent1: Ref<'_, [(Key, Val1)]> = input1.recent(); |
| 21 | let recent2: as JoinInput<'me, …>>::RecentTuples = input2.recent(); |
| 22 | |
| 23 | { |
| 24 | // scoped to let `closure` drop borrow of `results`. |
| 25 | |
| 26 | let mut closure: impl FnMut(&Key, &Val1, &Val2) = |k: &Key, v1: &Val1, v2: &Val2| results.push(logic(k, v1, v2)); |
| 27 | |
| 28 | for batch2: &Relation<(Key, Val2)> in input2.stable().iter() { |
| 29 | join_helper(&recent1, &batch2, &mut closure); |
| 30 | } |
| 31 | |
| 32 | for batch1: &Relation<(Key, Val1)> in input1.stable().iter() { |
| 33 | join_helper(&batch1, &recent2, &mut closure); |
| 34 | } |
| 35 | |
| 36 | join_helper(&recent1, &recent2, &mut closure); |
| 37 | } |
| 38 | |
| 39 | output.insert(Relation::from_vec(elements:results)); |
| 40 | } |
| 41 | |
| 42 | /// Join, but for two relations. |
| 43 | pub(crate) fn join_into_relation<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>( |
| 44 | input1: &Relation<(Key, Val1)>, |
| 45 | input2: &Relation<(Key, Val2)>, |
| 46 | mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result, |
| 47 | ) -> Relation<Result> { |
| 48 | let mut results: Vec = Vec::new(); |
| 49 | |
| 50 | join_helper(&input1.elements, &input2.elements, |k: &Key, v1: &Val1, v2: &Val2| { |
| 51 | results.push(logic(k, v1, v2)); |
| 52 | }); |
| 53 | |
| 54 | Relation::from_vec(elements:results) |
| 55 | } |
| 56 | |
| 57 | /// Moves all recent tuples from `input1` that are not present in `input2` into `output`. |
| 58 | pub(crate) fn antijoin<'me, Key: Ord, Val: Ord, Result: Ord>( |
| 59 | input1: impl JoinInput<'me, (Key, Val)>, |
| 60 | input2: &Relation<Key>, |
| 61 | mut logic: impl FnMut(&Key, &Val) -> Result, |
| 62 | ) -> Relation<Result> { |
| 63 | let mut tuples2: &[Key] = &input2[..]; |
| 64 | |
| 65 | let results: Vec = input1impl Iterator |
| 66 | .recent() |
| 67 | .iter() |
| 68 | .filter(|(ref key: &Key, _)| { |
| 69 | tuples2 = gallop(slice:tuples2, |k: &Key| k < key); |
| 70 | tuples2.first() != Some(key) |
| 71 | }) |
| 72 | .map(|(ref key: &Key, ref val: &Val)| logic(key, val)) |
| 73 | .collect::<Vec<_>>(); |
| 74 | |
| 75 | Relation::from_vec(elements:results) |
| 76 | } |
| 77 | |
| 78 | fn join_helper<K: Ord, V1, V2>( |
| 79 | mut slice1: &[(K, V1)], |
| 80 | mut slice2: &[(K, V2)], |
| 81 | mut result: impl FnMut(&K, &V1, &V2), |
| 82 | ) { |
| 83 | while !slice1.is_empty() && !slice2.is_empty() { |
| 84 | use std::cmp::Ordering; |
| 85 | |
| 86 | // If the keys match produce tuples, else advance the smaller key until they might. |
| 87 | match slice1[0].0.cmp(&slice2[0].0) { |
| 88 | Ordering::Less => { |
| 89 | slice1 = gallop(slice1, |x| x.0 < slice2[0].0); |
| 90 | } |
| 91 | Ordering::Equal => { |
| 92 | // Determine the number of matching keys in each slice. |
| 93 | let count1 = slice1.iter().take_while(|x| x.0 == slice1[0].0).count(); |
| 94 | let count2 = slice2.iter().take_while(|x| x.0 == slice2[0].0).count(); |
| 95 | |
| 96 | // Produce results from the cross-product of matches. |
| 97 | for index1 in 0..count1 { |
| 98 | for s2 in slice2[..count2].iter() { |
| 99 | result(&slice1[0].0, &slice1[index1].1, &s2.1); |
| 100 | } |
| 101 | } |
| 102 | |
| 103 | // Advance slices past this key. |
| 104 | slice1 = &slice1[count1..]; |
| 105 | slice2 = &slice2[count2..]; |
| 106 | } |
| 107 | Ordering::Greater => { |
| 108 | slice2 = gallop(slice2, |x| x.0 < slice1[0].0); |
| 109 | } |
| 110 | } |
| 111 | } |
| 112 | } |
| 113 | |
/// Skips the leading run of `slice` for which `cmp` returns `true`.
///
/// Intended for slices where `cmp` holds for a (possibly empty) prefix and
/// fails for everything after it. In that case the returned sub-slice starts
/// at the first element for which `cmp` is `false`, and is empty if `cmp`
/// holds for every element. An empty input is returned unchanged.
///
/// The search first advances in doubling steps while `cmp` still holds, then
/// halves the step back down to narrow in on the boundary.
pub(crate) fn gallop<T>(mut slice: &[T], mut cmp: impl FnMut(&T) -> bool) -> &[T] {
    // if empty slice, or already >= element, return
    if !slice.is_empty() && cmp(&slice[0]) {
        // Expansion phase: jump ahead by 1, 2, 4, ... while the probed
        // element still satisfies `cmp`.
        let mut step: usize = 1;
        while step < slice.len() && cmp(&slice[step]) {
            slice = &slice[step..];
            step <<= 1;
        }

        // Contraction phase: retry with successively smaller steps. Throughout,
        // `slice[0]` is an element for which `cmp` returned `true`.
        step >>= 1;
        while step > 0 {
            if step < slice.len() && cmp(&slice[step]) {
                slice = &slice[step..];
            }
            step >>= 1;
        }

        slice = &slice[1..]; // advance one, as we always stayed < value
    }

    slice
}

| 137 | /// An input that can be used with `from_join`; either a `Variable` or a `Relation`. |
| 138 | pub trait JoinInput<'me, Tuple: Ord>: Copy { |
| 139 | /// If we are on iteration N of the loop, these are the tuples |
| 140 | /// added on iteration N-1. (For a `Relation`, this is always an |
| 141 | /// empty slice.) |
| 142 | type RecentTuples: Deref<Target = [Tuple]>; |
| 143 | |
| 144 | /// If we are on iteration N of the loop, these are the tuples |
| 145 | /// added on iteration N - 2 or before. (For a `Relation`, this is |
| 146 | /// just `self`.) |
| 147 | type StableTuples: Deref<Target = [Relation<Tuple>]>; |
| 148 | |
| 149 | /// Get the set of recent tuples. |
| 150 | fn recent(self) -> Self::RecentTuples; |
| 151 | |
| 152 | /// Get the set of stable tuples. |
| 153 | fn stable(self) -> Self::StableTuples; |
| 154 | } |
| 155 | |
| 156 | impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Variable<Tuple> { |
| 157 | type RecentTuples = Ref<'me, [Tuple]>; |
| 158 | type StableTuples = Ref<'me, [Relation<Tuple>]>; |
| 159 | |
| 160 | fn recent(self) -> Self::RecentTuples { |
| 161 | Ref::map(self.recent.borrow(), |r: &Relation| &r.elements[..]) |
| 162 | } |
| 163 | |
| 164 | fn stable(self) -> Self::StableTuples { |
| 165 | Ref::map(self.stable.borrow(), |v: &Vec>| &v[..]) |
| 166 | } |
| 167 | } |
| 168 | |
| 169 | impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Relation<Tuple> { |
| 170 | type RecentTuples = &'me [Tuple]; |
| 171 | type StableTuples = &'me [Relation<Tuple>]; |
| 172 | |
| 173 | fn recent(self) -> Self::RecentTuples { |
| 174 | &[] |
| 175 | } |
| 176 | |
| 177 | fn stable(self) -> Self::StableTuples { |
| 178 | std::slice::from_ref(self) |
| 179 | } |
| 180 | } |
| 181 | |