1 | //! Join functionality. |
2 | |
3 | use super::{Relation, Variable}; |
4 | use std::cell::Ref; |
5 | use std::ops::Deref; |
6 | |
7 | /// Implements `join`. Note that `input1` must be a variable, but |
8 | /// `input2` can be either a variable or a relation. This is necessary |
9 | /// because relations have no "recent" tuples, so the fn would be a |
10 | /// guaranteed no-op if both arguments were relations. See also |
11 | /// `join_into_relation`. |
12 | pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>( |
13 | input1: &Variable<(Key, Val1)>, |
14 | input2: impl JoinInput<'me, (Key, Val2)>, |
15 | output: &Variable<Result>, |
16 | mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result, |
17 | ) { |
18 | let mut results: Vec = Vec::new(); |
19 | |
20 | let recent1: Ref<'_, [(Key, Val1)]> = input1.recent(); |
21 | let recent2: as JoinInput<…>>::RecentTuples = input2.recent(); |
22 | |
23 | { |
24 | // scoped to let `closure` drop borrow of `results`. |
25 | |
26 | let mut closure: impl FnMut(&Key, &Val1, &…) = |k: &Key, v1: &Val1, v2: &Val2| results.push(logic(k, v1, v2)); |
27 | |
28 | for batch2: &Relation<(Key, Val2)> in input2.stable().iter() { |
29 | join_helper(&recent1, &batch2, &mut closure); |
30 | } |
31 | |
32 | for batch1: &Relation<(Key, Val1)> in input1.stable().iter() { |
33 | join_helper(&batch1, &recent2, &mut closure); |
34 | } |
35 | |
36 | join_helper(&recent1, &recent2, &mut closure); |
37 | } |
38 | |
39 | output.insert(Relation::from_vec(elements:results)); |
40 | } |
41 | |
42 | /// Join, but for two relations. |
43 | pub(crate) fn join_into_relation<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>( |
44 | input1: &Relation<(Key, Val1)>, |
45 | input2: &Relation<(Key, Val2)>, |
46 | mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result, |
47 | ) -> Relation<Result> { |
48 | let mut results: Vec = Vec::new(); |
49 | |
50 | join_helper(&input1.elements, &input2.elements, |k: &Key, v1: &Val1, v2: &Val2| { |
51 | results.push(logic(k, v1, v2)); |
52 | }); |
53 | |
54 | Relation::from_vec(elements:results) |
55 | } |
56 | |
57 | /// Moves all recent tuples from `input1` that are not present in `input2` into `output`. |
58 | pub(crate) fn antijoin<'me, Key: Ord, Val: Ord, Result: Ord>( |
59 | input1: impl JoinInput<'me, (Key, Val)>, |
60 | input2: &Relation<Key>, |
61 | mut logic: impl FnMut(&Key, &Val) -> Result, |
62 | ) -> Relation<Result> { |
63 | let mut tuples2: &[Key] = &input2[..]; |
64 | |
65 | let results: Vec = input1impl Iterator |
66 | .recent() |
67 | .iter() |
68 | .filter(|(ref key: &Key, _)| { |
69 | tuples2 = gallop(slice:tuples2, |k: &Key| k < key); |
70 | tuples2.first() != Some(key) |
71 | }) |
72 | .map(|(ref key: &Key, ref val: &Val)| logic(key, val)) |
73 | .collect::<Vec<_>>(); |
74 | |
75 | Relation::from_vec(elements:results) |
76 | } |
77 | |
78 | fn join_helper<K: Ord, V1, V2>( |
79 | mut slice1: &[(K, V1)], |
80 | mut slice2: &[(K, V2)], |
81 | mut result: impl FnMut(&K, &V1, &V2), |
82 | ) { |
83 | while !slice1.is_empty() && !slice2.is_empty() { |
84 | use std::cmp::Ordering; |
85 | |
86 | // If the keys match produce tuples, else advance the smaller key until they might. |
87 | match slice1[0].0.cmp(&slice2[0].0) { |
88 | Ordering::Less => { |
89 | slice1 = gallop(slice1, |x| x.0 < slice2[0].0); |
90 | } |
91 | Ordering::Equal => { |
92 | // Determine the number of matching keys in each slice. |
93 | let count1 = slice1.iter().take_while(|x| x.0 == slice1[0].0).count(); |
94 | let count2 = slice2.iter().take_while(|x| x.0 == slice2[0].0).count(); |
95 | |
96 | // Produce results from the cross-product of matches. |
97 | for index1 in 0..count1 { |
98 | for s2 in slice2[..count2].iter() { |
99 | result(&slice1[0].0, &slice1[index1].1, &s2.1); |
100 | } |
101 | } |
102 | |
103 | // Advance slices past this key. |
104 | slice1 = &slice1[count1..]; |
105 | slice2 = &slice2[count2..]; |
106 | } |
107 | Ordering::Greater => { |
108 | slice2 = gallop(slice2, |x| x.0 < slice1[0].0); |
109 | } |
110 | } |
111 | } |
112 | } |
113 | |
/// Returns the suffix of `slice` that remains after skipping leading
/// elements for which `cmp` returns `true`.
///
/// The search first doubles its stride while the probed element still
/// satisfies `cmp`, then halves the stride back down to zero, and finally
/// steps over the last element known to satisfy `cmp`. If `slice` is empty
/// or its first element already fails `cmp`, `slice` is returned unchanged.
///
/// Elements are probed, not scanned one by one, so the result is only
/// meaningful when `cmp` is `true` for a prefix of `slice` and `false` for
/// everything after it.
pub(crate) fn gallop<T>(mut slice: &[T], mut cmp: impl FnMut(&T) -> bool) -> &[T] {
    // Guard: nothing to skip when the slice is empty or its head fails `cmp`.
    match slice.first() {
        Some(head) if cmp(head) => {}
        _ => return slice,
    }

    // Expanding phase: leap forward by an ever-doubling stride for as long
    // as the probed element still satisfies `cmp`.
    let mut stride: usize = 1;
    while slice.get(stride).map_or(false, |probe| cmp(probe)) {
        slice = &slice[stride..];
        stride <<= 1;
    }

    // Contracting phase: retry with successively halved strides.
    stride >>= 1;
    while stride != 0 {
        if let Some(probe) = slice.get(stride) {
            if cmp(probe) {
                slice = &slice[stride..];
            }
        }
        stride >>= 1;
    }

    // `slice[0]` is the last element known to satisfy `cmp`; step past it.
    &slice[1..]
}
136 | |
137 | /// An input that can be used with `from_join`; either a `Variable` or a `Relation`. |
138 | pub trait JoinInput<'me, Tuple: Ord>: Copy { |
139 | /// If we are on iteration N of the loop, these are the tuples |
140 | /// added on iteration N-1. (For a `Relation`, this is always an |
141 | /// empty slice.) |
142 | type RecentTuples: Deref<Target = [Tuple]>; |
143 | |
144 | /// If we are on iteration N of the loop, these are the tuples |
145 | /// added on iteration N - 2 or before. (For a `Relation`, this is |
146 | /// just `self`.) |
147 | type StableTuples: Deref<Target = [Relation<Tuple>]>; |
148 | |
149 | /// Get the set of recent tuples. |
150 | fn recent(self) -> Self::RecentTuples; |
151 | |
152 | /// Get the set of stable tuples. |
153 | fn stable(self) -> Self::StableTuples; |
154 | } |
155 | |
156 | impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Variable<Tuple> { |
157 | type RecentTuples = Ref<'me, [Tuple]>; |
158 | type StableTuples = Ref<'me, [Relation<Tuple>]>; |
159 | |
160 | fn recent(self) -> Self::RecentTuples { |
161 | Ref::map(self.recent.borrow(), |r: &Relation| &r.elements[..]) |
162 | } |
163 | |
164 | fn stable(self) -> Self::StableTuples { |
165 | Ref::map(self.stable.borrow(), |v: &Vec>| &v[..]) |
166 | } |
167 | } |
168 | |
169 | impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Relation<Tuple> { |
170 | type RecentTuples = &'me [Tuple]; |
171 | type StableTuples = &'me [Relation<Tuple>]; |
172 | |
173 | fn recent(self) -> Self::RecentTuples { |
174 | &[] |
175 | } |
176 | |
177 | fn stable(self) -> Self::StableTuples { |
178 | std::slice::from_ref(self) |
179 | } |
180 | } |
181 | |