Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gcp pubsub source: at_least_once #3767

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix existing test by sending suggest truncate msg
  • Loading branch information
AyWa committed Aug 23, 2023
commit 8e89963d90a251ed07e6f8deab129c9b80e215bb
41 changes: 25 additions & 16 deletions quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use google_cloud_gax::retry::RetrySetting;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::subscription::Subscription;
use quickwit_actors::{ActorContext, ActorExitStatus, Mailbox};
use quickwit_common::rand::append_random_suffix;
use quickwit_config::GcpPubSubSourceParams;
use quickwit_metastore::checkpoint::{PartitionId, Position, SourceCheckpoint};
use serde_json::{json, Value as JsonValue};
Expand Down Expand Up @@ -154,9 +153,8 @@ impl GcpPubSubSource {
.max_messages_per_pull
.unwrap_or(DEFAULT_MAX_MESSAGES_PER_PULL);

// TODO: replace with "<node_id>/<index_id>/<source_id>/<pipeline_ord>"
let partition_id = append_random_suffix(&format!("gpc-pubsub-{subscription_name}"));
let partition_id = PartitionId::from(partition_id);
// TODO: replace with "<node_id>/<index_id>/<source_id>/<pipeline_ord>" !
let partition_id = PartitionId::from(format!("gpc-pubsub-{subscription_name}"));
info!(
index_id=%ctx.index_uid.index_id(),
source_id=%ctx.source_config.source_id,
Expand Down Expand Up @@ -291,22 +289,16 @@ impl GcpPubSubSource {
.await
.context("Failed to pull messages from subscription.")?;

let Some(last_message) = messages.last() else {
if messages.is_empty() {
return Ok(());
};
let _message_id = last_message.message.message_id.clone();
let _publish_timestamp_millis = last_message
.message
.publish_time
.as_ref()
.map(|timestamp| timestamp.seconds * 1_000 + (timestamp.nanos as i64 / 1_000_000))
.unwrap_or(0); // TODO: Replace with now UTC millis.

// TODO: Should we log something based on the publish time (e.g. the "lag" behind),
// or store it in the position?
let mut message_ids: MessageIDs = Vec::with_capacity(messages.len());
for message in messages {
self.state.num_messages_processed += 1;
self.state.num_bytes_processed += message.message.data.len() as u64;
message_ids.push(message.message.message_id);
message_ids.push(message.ack_id().to_string());
let doc: Bytes = Bytes::from(message.message.data);
if doc.is_empty() {
self.state.num_invalid_messages += 1;
Expand Down Expand Up @@ -341,14 +333,15 @@ mod gcp_pubsub_emulator_tests {
use google_cloud_pubsub::publisher::Publisher;
use google_cloud_pubsub::subscription::SubscriptionConfig;
use quickwit_actors::Universe;
use quickwit_common::rand::append_random_suffix;
use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams};
use quickwit_metastore::metastore_for_test;
use quickwit_proto::IndexUid;
use serde_json::json;

use super::*;
use crate::models::RawDocBatch;
use crate::source::quickwit_supported_sources;
use crate::source::{quickwit_supported_sources, SuggestTruncate};

static GCP_TEST_PROJECT: &str = "quickwit-emulator";

Expand Down Expand Up @@ -463,8 +456,24 @@ mod gcp_pubsub_emulator_tests {
source,
doc_processor_mailbox: doc_processor_mailbox.clone(),
};
let (_source_mailbox, source_handle) = universe.spawn_builder().spawn(source_actor);
let (source_mailbox, source_handle) = universe.spawn_builder().spawn(source_actor);
let partition = format!("gpc-pubsub-{subscription}");
let trigger_suggest_truncate = tokio::spawn(async move {
loop {
let to_position = Position::from(format!("{}", Ulid::new()));
AyWa marked this conversation as resolved.
Show resolved Hide resolved
let checkpoint: SourceCheckpoint =
vec![(PartitionId::from(partition.clone()), to_position)]
.into_iter()
.collect();

let suggest_truncate_req = SuggestTruncate(checkpoint);
tokio::time::sleep(Duration::from_millis(50)).await;
source_mailbox.ask(suggest_truncate_req).await.unwrap();
}
});

let (exit_status, exit_state) = source_handle.join().await;
trigger_suggest_truncate.abort();
assert!(exit_status.is_success());

let messages: Vec<RawDocBatch> = doc_processor_inbox.drain_for_test_typed();
Expand Down
Loading