gcp pubsub source: at_least_once #3767

Open
wants to merge 7 commits into
base: main
Conversation

AyWa
Collaborator

@AyWa AyWa commented Aug 20, 2023

Description

Following on #3720 and #3744 for #1107

  • implements the missing check_connectivity
  • the goal of this PR is to support at_least_once delivery and to add tests for it
  • add tests

How was this PR tested?

It adds tests to ensure that messages are acked in order.

Next step

In next PR we can focus on:

  • extending the message ack deadline, or ensuring that the subscription's ack deadline is at least twice as long as the flush
  • benchmarking performance
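
The second alternative in the first item above comes down to a simple comparison. A minimal sketch, assuming the flush is characterized by a single interval; the function name `ack_deadline_covers_flush` and both parameters are hypothetical and not part of this PR:

```rust
use std::time::Duration;

/// Returns true when the subscription's ack deadline is at least twice the
/// flush interval. Hypothetical helper, not part of this PR.
fn ack_deadline_covers_flush(ack_deadline: Duration, flush_interval: Duration) -> bool {
    // `checked_mul` avoids a panic if doubling the interval overflows.
    flush_interval
        .checked_mul(2)
        .map_or(false, |minimum| ack_deadline >= minimum)
}

fn main() {
    // A 60s deadline covers a 30s flush interval, but not a 45s one.
    assert!(ack_deadline_covers_flush(Duration::from_secs(60), Duration::from_secs(30)));
    assert!(!ack_deadline_covers_flush(Duration::from_secs(60), Duration::from_secs(45)));
}
```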

@AyWa AyWa self-assigned this Aug 20, 2023
@AyWa AyWa requested a review from guilload August 20, 2023 14:53
Member

@guilload guilload left a comment

Looking good so far. This PR opens the door for proper unit testing of the checkpointing logic.

@AyWa
Collaborator Author

AyWa commented Aug 21, 2023

> Looking good so far. This PR opens the door for proper unit testing of the checkpointing logic.

Actually, I am now looking at how to test suggest_truncate, because nothing calls suggest_truncate in the current test (there is no uploader, right?). Is there a test util to trigger suggest_truncate, or some other way to do it?

@guilload
Member

> > Looking good so far. This PR opens the door for proper unit testing of the checkpointing logic.
>
> Actually, I am now looking at how to test suggest_truncate, because nothing calls suggest_truncate in the current test (there is no uploader, right?). Is there a test util to trigger suggest_truncate, or some other way to do it?

You can move the ack logic outside the suggest_truncate method of the Source trait and call the new method directly on the GcpPubSubSource. Some Kafka tests do that. Or you can send SuggestTruncate messages to the actor and perform some asserts via the observed state.
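
The first suggestion can be sketched roughly as follows. This is a simplified, synchronous illustration and not the actual Quickwit code: `SketchPubSubSource`, `SketchSource`, `ack_up_to`, and the position-keyed `pending` map are all hypothetical stand-ins, and the real Source trait is more involved than the one-method trait shown here.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the Pub/Sub source (not the real Quickwit type).
struct SketchPubSubSource {
    /// Ack IDs awaiting acknowledgement, keyed by an assumed checkpoint position.
    pending: BTreeMap<u64, String>,
    /// Ack IDs that were acknowledged, recorded here instead of calling Pub/Sub.
    acked: Vec<String>,
}

impl SketchPubSubSource {
    fn new(messages: &[(u64, &str)]) -> Self {
        let pending = messages
            .iter()
            .map(|(position, ack_id)| (*position, ack_id.to_string()))
            .collect();
        Self { pending, acked: Vec::new() }
    }

    /// Inherent method holding the ack logic, so a test can call it directly.
    /// Acks every pending message up to and including `position`, in order.
    fn ack_up_to(&mut self, position: u64) -> usize {
        let positions: Vec<u64> = self
            .pending
            .range(..=position)
            .map(|(pos, _)| *pos)
            .collect();
        for pos in &positions {
            if let Some(ack_id) = self.pending.remove(pos) {
                self.acked.push(ack_id);
            }
        }
        positions.len()
    }
}

/// Simplified stand-in for the Source trait.
trait SketchSource {
    fn suggest_truncate(&mut self, position: u64);
}

impl SketchSource for SketchPubSubSource {
    fn suggest_truncate(&mut self, position: u64) {
        // The trait method only delegates to the directly testable method.
        self.ack_up_to(position);
    }
}

fn main() {
    let mut source = SketchPubSubSource::new(&[(1, "a"), (2, "b"), (3, "c")]);
    // A test can exercise the ack logic without an uploader or an actor.
    assert_eq!(source.ack_up_to(2), 2);
    assert_eq!(source.acked, vec!["a", "b"]);
    source.suggest_truncate(3);
    assert_eq!(source.acked, vec!["a", "b", "c"]);
    assert!(source.pending.is_empty());
}
```

With this split, the trait method stays a thin wrapper and the ordering assertions target the inherent method.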

@AyWa AyWa force-pushed the feat/1107/gcp-pubsub-atleastonce branch from b39649d to 4cd9949 Compare August 22, 2023 14:37
@AyWa AyWa force-pushed the feat/1107/gcp-pubsub-atleastonce branch from 351d161 to 8e89963 Compare August 23, 2023 15:09
@AyWa AyWa changed the title [DRAFT] gcp pubsub source: at_least_once gcp pubsub source: at_least_once Aug 27, 2023
@AyWa AyWa marked this pull request as ready for review August 27, 2023 05:19
@AyWa AyWa requested a review from guilload August 27, 2023 14:25
quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs Outdated Show resolved Hide resolved
"num_bytes_processed": 54,
"num_messages_processed": 6,
"num_invalid_messages": 0,
"num_consecutive_empty_batches": 4,
});
assert_eq!(exit_state, expected_exit_state);
}

#[tokio::test]
async fn test_gcp_pubsub_source_ack() {
Member

We could drop the line self.subscription.ack(message_ids).await, and this test would still pass. Does the PubSub API have some way to tell us that we acknowledged the messages correctly?

Collaborator Author

Yeah... sadly, I think there is no way to really know whether the messages were acked.
My guess is that this is because GCP doesn't guarantee that a message will not be redelivered even after you call ack.

When I was working with Go, we usually tested that with an interface mock...

Option 1:

One way would be to set a really small ack deadline and, at the end of the test, subscribe and wait to see whether we receive any message. It will not be perfect, but it should be "fine". The problem is that we might need to wait a few seconds...

Option 2:

Have some interface that we can mock, so we can ensure that the function was called.

Member

I'm leaning towards Option 2. We can only mock the call to ack for now. There are examples in the codebase of mocked traits (Metastore, code-generated clients) with mockall.
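
A minimal sketch of what Option 2 could look like, using a hand-written recording mock instead of mockall so that the example stays self-contained. The names `AckClient`, `RecordingAckClient`, `SourceUnderTest`, and `flush_acks` are hypothetical, and the trait is synchronous here, whereas the real call to ack is async.

```rust
use std::cell::RefCell;

/// Hypothetical trait wrapping only the call to ack, so it can be mocked.
trait AckClient {
    fn ack(&self, ack_ids: Vec<String>) -> Result<(), String>;
}

/// Hand-written mock that records every ack call it receives.
#[derive(Default)]
struct RecordingAckClient {
    calls: RefCell<Vec<Vec<String>>>,
}

impl AckClient for RecordingAckClient {
    fn ack(&self, ack_ids: Vec<String>) -> Result<(), String> {
        self.calls.borrow_mut().push(ack_ids);
        Ok(())
    }
}

/// Hypothetical stand-in for the source, generic over the ack client.
struct SourceUnderTest<C: AckClient> {
    client: C,
    in_flight: Vec<String>,
}

impl<C: AckClient> SourceUnderTest<C> {
    /// Acks all in-flight messages in one call and clears the buffer.
    fn flush_acks(&mut self) -> Result<(), String> {
        if self.in_flight.is_empty() {
            return Ok(());
        }
        let ack_ids = std::mem::take(&mut self.in_flight);
        self.client.ack(ack_ids)
    }
}

fn main() {
    let mut source = SourceUnderTest {
        client: RecordingAckClient::default(),
        in_flight: vec!["ack-1".to_string(), "ack-2".to_string()],
    };
    source.flush_acks().unwrap();

    // These assertions fail if the call to `ack` is dropped from `flush_acks`.
    let calls = source.client.calls.borrow();
    assert_eq!(calls.len(), 1);
    assert_eq!(calls[0], vec!["ack-1", "ack-2"]);
}
```

Unlike the current test, a test written against such a mock would fail if the ack call were removed, which is the gap raised in the review comment above.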

2 participants