feat(stitched): Fix bugs in stitcher and correct two testcases

This commit is contained in:
Ginger
2026-01-22 12:13:13 -05:00
parent b242bad6d2
commit 70c62158d0
9 changed files with 93 additions and 62 deletions
@@ -1,8 +1,9 @@
use std::{
cmp::Ordering,
collections::{BTreeSet, HashSet},
collections::{BTreeSet, HashMap, HashSet},
};
use indexmap::IndexSet;
use itertools::Itertools;
use super::{Batch, Gap, OrderKey, StitchedItem, StitcherBackend};
@@ -40,8 +41,9 @@ impl<B: StitcherBackend> Stitcher<'_, B> {
pub(super) fn stitch<'id>(&self, batch: Batch<'id>) -> OrderUpdates<'id, B::Key> {
let mut gap_updates = Vec::new();
let mut all_new_events: HashSet<&'id str> = HashSet::new();
let mut remaining_events: BTreeSet<_> = batch.events().collect();
let mut remaining_events: IndexSet<_> = batch.events().collect();
// 1: Find existing gaps which include IDs of events in `batch`
let matching_gaps = self.backend.find_matching_gaps(batch.events());
@@ -59,12 +61,15 @@ impl<B: StitcherBackend> Stitcher<'_, B> {
.copied()
.collect();
all_new_events.extend(events_to_insert.iter());
// 4. Remove the events in the to-insert list from `remaining_events` so they
// aren't processed again
remaining_events.retain(|event| !events_to_insert.contains(event));
// 5 and 6
let inserted_items = self.sort_events_and_create_gaps(&batch, events_to_insert);
let inserted_items =
self.sort_events_and_create_gaps(&batch, &all_new_events, events_to_insert);
// 8. Update gap
gap.retain(|id| !batch.contains(id));
@@ -75,7 +80,10 @@ impl<B: StitcherBackend> Stitcher<'_, B> {
}
// 10. Append remaining events and gaps
let new_items = self.sort_events_and_create_gaps(&batch, remaining_events);
all_new_events.extend(remaining_events.iter());
let new_items =
self.sort_events_and_create_gaps(&batch, &all_new_events, remaining_events);
OrderUpdates { gap_updates, new_items }
}
@@ -83,12 +91,13 @@ impl<B: StitcherBackend> Stitcher<'_, B> {
fn sort_events_and_create_gaps<'id>(
&self,
batch: &Batch<'id>,
all_new_events: &HashSet<&'id str>,
events_to_insert: impl IntoIterator<Item = &'id str>,
) -> Vec<StitchedItem<'id>> {
// 5. Sort the to-insert list with DAG;received order
let events_to_insert = events_to_insert
.into_iter()
.sorted_by(Self::compare_by_dag_received(batch))
.sorted_by(batch.compare_by_dag_received())
.collect_vec();
let mut items = Vec::with_capacity(
@@ -102,7 +111,9 @@ impl<B: StitcherBackend> Stitcher<'_, B> {
.prev_events
.iter()
.filter(|prev_event| {
!(batch.contains(prev_event) || self.backend.event_exists(prev_event))
!(batch.contains(prev_event)
|| all_new_events.contains(*prev_event)
|| self.backend.event_exists(prev_event))
})
.map(|id| String::from(*id))
.collect();
@@ -116,36 +127,4 @@ impl<B: StitcherBackend> Stitcher<'_, B> {
items
}
/// Compare two events by DAG;received order.
///
/// Builds a comparator suitable for `sorted_by`/`sort_by` that orders `a`
/// relative to `b` as follows:
///
/// 1. An event compared with itself is [`Ordering::Equal`].
/// 2. If `b` is in `a`'s predecessor set, `a` sorts after `b`
///    ([`Ordering::Greater`]); if `a` is in `b`'s predecessor set, `a` sorts
///    before `b` ([`Ordering::Less`]).
/// 3. Otherwise the events are ordered by which one appears first in
///    `batch.events()`: the earlier one sorts first.
///
/// # Panics
///
/// The comparator panics if the two events are distinct, neither is in the
/// other's predecessor set, and neither appears in `batch.events()`.
fn compare_by_dag_received<'id>(
    batch: &Batch<'id>,
) -> impl FnMut(&&'id str, &&'id str) -> Ordering {
    |a, b| {
        // A comparator must be reflexive; without this guard an event compared
        // with itself would be reported as strictly greater than itself.
        if a == b {
            return Ordering::Equal;
        }

        if batch
            .predecessors(a)
            .is_some_and(|it| it.predecessor_set.contains(b))
        {
            // `b` is a predecessor of `a`, so `a` must come after it.
            Ordering::Greater
        } else if batch
            .predecessors(b)
            .is_some_and(|it| it.predecessor_set.contains(a))
        {
            // `a` is a predecessor of `b`, so `a` must come before it.
            Ordering::Less
        } else {
            // No DAG relationship: fall back to received order. Whichever
            // event is encountered first in the batch sorts first.
            for event in batch.events() {
                if event == *a {
                    return Ordering::Less;
                } else if event == *b {
                    return Ordering::Greater;
                }
            }

            panic!("neither {a} nor {b} in batch");
        }
    }
}
}