1//! Join functionality.
2
3use super::{Relation, Variable};
4use std::cell::Ref;
5use std::ops::Deref;
6
7/// Implements `join`. Note that `input1` must be a variable, but
8/// `input2` can be either a variable or a relation. This is necessary
9/// because relations have no "recent" tuples, so the fn would be a
10/// guaranteed no-op if both arguments were relations. See also
11/// `join_into_relation`.
12pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
13 input1: &Variable<(Key, Val1)>,
14 input2: impl JoinInput<'me, (Key, Val2)>,
15 output: &Variable<Result>,
16 mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
17) {
18 let mut results: Vec = Vec::new();
19
20 let recent1: Ref<'_, [(Key, Val1)]> = input1.recent();
21 let recent2: as JoinInput<…>>::RecentTuples = input2.recent();
22
23 {
24 // scoped to let `closure` drop borrow of `results`.
25
26 let mut closure: impl FnMut(&Key, &Val1, &…) = |k: &Key, v1: &Val1, v2: &Val2| results.push(logic(k, v1, v2));
27
28 for batch2: &Relation<(Key, Val2)> in input2.stable().iter() {
29 join_helper(&recent1, &batch2, &mut closure);
30 }
31
32 for batch1: &Relation<(Key, Val1)> in input1.stable().iter() {
33 join_helper(&batch1, &recent2, &mut closure);
34 }
35
36 join_helper(&recent1, &recent2, &mut closure);
37 }
38
39 output.insert(Relation::from_vec(elements:results));
40}
41
42/// Join, but for two relations.
43pub(crate) fn join_into_relation<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
44 input1: &Relation<(Key, Val1)>,
45 input2: &Relation<(Key, Val2)>,
46 mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
47) -> Relation<Result> {
48 let mut results: Vec = Vec::new();
49
50 join_helper(&input1.elements, &input2.elements, |k: &Key, v1: &Val1, v2: &Val2| {
51 results.push(logic(k, v1, v2));
52 });
53
54 Relation::from_vec(elements:results)
55}
56
57/// Moves all recent tuples from `input1` that are not present in `input2` into `output`.
58pub(crate) fn antijoin<'me, Key: Ord, Val: Ord, Result: Ord>(
59 input1: impl JoinInput<'me, (Key, Val)>,
60 input2: &Relation<Key>,
61 mut logic: impl FnMut(&Key, &Val) -> Result,
62) -> Relation<Result> {
63 let mut tuples2: &[Key] = &input2[..];
64
65 let results: Vec = input1impl Iterator
66 .recent()
67 .iter()
68 .filter(|(ref key: &Key, _)| {
69 tuples2 = gallop(slice:tuples2, |k: &Key| k < key);
70 tuples2.first() != Some(key)
71 })
72 .map(|(ref key: &Key, ref val: &Val)| logic(key, val))
73 .collect::<Vec<_>>();
74
75 Relation::from_vec(elements:results)
76}
77
78fn join_helper<K: Ord, V1, V2>(
79 mut slice1: &[(K, V1)],
80 mut slice2: &[(K, V2)],
81 mut result: impl FnMut(&K, &V1, &V2),
82) {
83 while !slice1.is_empty() && !slice2.is_empty() {
84 use std::cmp::Ordering;
85
86 // If the keys match produce tuples, else advance the smaller key until they might.
87 match slice1[0].0.cmp(&slice2[0].0) {
88 Ordering::Less => {
89 slice1 = gallop(slice1, |x| x.0 < slice2[0].0);
90 }
91 Ordering::Equal => {
92 // Determine the number of matching keys in each slice.
93 let count1 = slice1.iter().take_while(|x| x.0 == slice1[0].0).count();
94 let count2 = slice2.iter().take_while(|x| x.0 == slice2[0].0).count();
95
96 // Produce results from the cross-product of matches.
97 for index1 in 0..count1 {
98 for s2 in slice2[..count2].iter() {
99 result(&slice1[0].0, &slice1[index1].1, &s2.1);
100 }
101 }
102
103 // Advance slices past this key.
104 slice1 = &slice1[count1..];
105 slice2 = &slice2[count2..];
106 }
107 Ordering::Greater => {
108 slice2 = gallop(slice2, |x| x.0 < slice1[0].0);
109 }
110 }
111 }
112}
113
/// Returns the suffix of `slice` that starts at the first element for which
/// `cmp` is false, found by exponential then binary stepping.
///
/// `cmp` is expected to be true for a (possibly empty) prefix of `slice` and
/// false for everything after it. If the slice is empty or `cmp` is already
/// false for the first element, `slice` is returned unchanged.
pub(crate) fn gallop<T>(mut slice: &[T], mut cmp: impl FnMut(&T) -> bool) -> &[T] {
    // Nothing to skip: empty input, or the head already fails the predicate.
    match slice.first() {
        Some(head) if cmp(head) => {}
        _ => return slice,
    }

    // Exponential phase: keep doubling the stride while the probed element
    // still satisfies the predicate.
    let mut stride: usize = 1;
    while slice.get(stride).map_or(false, |probe| cmp(probe)) {
        slice = &slice[stride..];
        stride <<= 1;
    }

    // Binary phase: halve the stride back down, advancing whenever the probe
    // still satisfies the predicate.
    stride >>= 1;
    while stride > 0 {
        if slice.get(stride).map_or(false, |probe| cmp(probe)) {
            slice = &slice[stride..];
        }
        stride >>= 1;
    }

    // The head has satisfied the predicate throughout, so step past it.
    &slice[1..]
}
136
137/// An input that can be used with `from_join`; either a `Variable` or a `Relation`.
138pub trait JoinInput<'me, Tuple: Ord>: Copy {
139 /// If we are on iteration N of the loop, these are the tuples
140 /// added on iteration N-1. (For a `Relation`, this is always an
141 /// empty slice.)
142 type RecentTuples: Deref<Target = [Tuple]>;
143
144 /// If we are on iteration N of the loop, these are the tuples
145 /// added on iteration N - 2 or before. (For a `Relation`, this is
146 /// just `self`.)
147 type StableTuples: Deref<Target = [Relation<Tuple>]>;
148
149 /// Get the set of recent tuples.
150 fn recent(self) -> Self::RecentTuples;
151
152 /// Get the set of stable tuples.
153 fn stable(self) -> Self::StableTuples;
154}
155
156impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Variable<Tuple> {
157 type RecentTuples = Ref<'me, [Tuple]>;
158 type StableTuples = Ref<'me, [Relation<Tuple>]>;
159
160 fn recent(self) -> Self::RecentTuples {
161 Ref::map(self.recent.borrow(), |r: &Relation| &r.elements[..])
162 }
163
164 fn stable(self) -> Self::StableTuples {
165 Ref::map(self.stable.borrow(), |v: &Vec>| &v[..])
166 }
167}
168
169impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Relation<Tuple> {
170 type RecentTuples = &'me [Tuple];
171 type StableTuples = &'me [Relation<Tuple>];
172
173 fn recent(self) -> Self::RecentTuples {
174 &[]
175 }
176
177 fn stable(self) -> Self::StableTuples {
178 std::slice::from_ref(self)
179 }
180}
181