Skip to content

Commit 3e02f29

Browse files
author
Devdutt Shenoi
authored
fix: query staging(in-mem) when concerned with the past 5 minutes (#1194)
1 parent e1e2f2a commit 3e02f29

File tree

2 files changed

+14
-16
lines changed

2 files changed

+14
-16
lines changed

src/query/stream_schema_provider.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::{
2727
use arrow_array::RecordBatch;
2828
use arrow_schema::{Schema, SchemaRef, SortOptions};
2929
use bytes::Bytes;
30-
use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
30+
use chrono::{DateTime, NaiveDateTime, TimeDelta, Timelike, Utc};
3131
use datafusion::catalog::Session;
3232
use datafusion::common::stats::Precision;
3333
use datafusion::logical_expr::utils::conjunction;
@@ -442,7 +442,7 @@ impl TableProvider for StandardTableProvider {
442442
return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string()));
443443
}
444444

445-
if include_now(filters, &time_partition) {
445+
if is_within_staging_window(&time_filters) {
446446
if let Ok(staging) = PARSEABLE.get_stream(&self.stream) {
447447
let records = staging.recordbatches_cloned(&self.schema);
448448
let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?;
@@ -730,23 +730,21 @@ fn return_listing_time_filters(
730730
}
731731
}
732732

733-
pub fn include_now(filters: &[Expr], time_partition: &Option<String>) -> bool {
734-
let current_minute = Utc::now()
733+
/// We should consider data in staging for queries concerning a time period,
734+
/// ending within 5 minutes from now. e.g. If current time is 5
735+
pub fn is_within_staging_window(time_filters: &[PartialTimeFilter]) -> bool {
736+
let five_minutes_back = (Utc::now() - TimeDelta::minutes(5))
735737
.with_second(0)
736738
.and_then(|x| x.with_nanosecond(0))
737739
.expect("zeroed value is valid")
738740
.naive_utc();
739741

740-
let time_filters = extract_primary_filter(filters, time_partition);
741-
742-
let upper_bound_matches = time_filters.iter().any(|filter| match filter {
742+
if time_filters.iter().any(|filter| match filter {
743743
PartialTimeFilter::High(Bound::Excluded(time))
744744
| PartialTimeFilter::High(Bound::Included(time))
745-
| PartialTimeFilter::Eq(time) => time > &current_minute,
745+
| PartialTimeFilter::Eq(time) => time >= &five_minutes_back,
746746
_ => false,
747-
});
748-
749-
if upper_bound_matches {
747+
}) {
750748
return true;
751749
}
752750

@@ -828,7 +826,7 @@ pub async fn collect_manifest_files(
828826
}
829827

830828
// Extract start time and end time from filter predicate
831-
fn extract_primary_filter(
829+
pub fn extract_primary_filter(
832830
filters: &[Expr],
833831
time_partition: &Option<String>,
834832
) -> Vec<PartialTimeFilter> {

src/utils/arrow/flight.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::event::Event;
2020
use crate::handlers::http::ingest::push_logs_unchecked;
2121
use crate::handlers::http::query::Query as QueryJson;
2222
use crate::parseable::PARSEABLE;
23-
use crate::query::stream_schema_provider::include_now;
23+
use crate::query::stream_schema_provider::{extract_primary_filter, is_within_staging_window};
2424
use crate::{handlers::http::modal::IngestorMetadata, option::Mode};
2525

2626
use arrow_array::RecordBatch;
@@ -131,9 +131,9 @@ pub fn send_to_ingester(start: i64, end: i64) -> bool {
131131
datafusion::logical_expr::Operator::Lt,
132132
Box::new(filter_end),
133133
);
134-
let ex = [Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)];
135-
136-
PARSEABLE.options.mode == Mode::Query && include_now(&ex, &None)
134+
let time_filters =
135+
extract_primary_filter(&[Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)], &None);
136+
PARSEABLE.options.mode == Mode::Query && is_within_staging_window(&time_filters)
137137
}
138138

139139
fn lit_timestamp_milli(time: i64) -> Expr {

0 commit comments

Comments
 (0)