[Enhancement] Set default value to MessageId.Latest in subscription creating API #23661
Comments
Hi @lhotari, what's your suggestion? Any comments are welcome.
Since this is existing behavior in the API that you are using, changing it would be a breaking change. It's better to solve this type of case by documenting the behavior instead of changing the default. The value is set to latest in the Java Admin client code: pulsar/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java Lines 1077 to 1078 in fe42748
It's not impossible to change the default behavior in the broker-side code, but that would require a Pulsar Improvement Proposal (PIP). Is it really worth it?
The first step would be to document the existing behavior in https://pulsar.apache.org/docs/next/admin-api-topics/#create-subscription and in the OpenAPI spec (generated from Swagger annotations).
Okay, I understand. It seems the better approach is for the client to use the API correctly.
Yes, and only by documenting the API will it be possible to know how to use it. That's why contributing to documentation is important and useful.
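For REST callers, one way to "use the API correctly" is to send the position explicitly instead of relying on defaults. The sketch below builds such a request body. It assumes that the latest position corresponds to ledgerId and entryId both set to Long.MAX_VALUE (the value proposed in this issue) and that the JSON field names match the ResetCursorData fields. The class and method names are hypothetical and not part of Pulsar.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical helper, not part of Pulsar: builds a create-subscription
// request body that pins the position to latest instead of omitting it.
class CreateSubscriptionBody {

    // Assumption: ledgerId = entryId = Long.MAX_VALUE means "latest".
    static String latestWithProperties(Map<String, String> properties) {
        StringJoiner props = new StringJoiner(",", "{", "}");
        // TreeMap gives a stable key order so the output is deterministic.
        for (Map.Entry<String, String> e : new TreeMap<>(properties).entrySet()) {
            props.add(quote(e.getKey()) + ":" + quote(e.getValue()));
        }
        return "{\"ledgerId\":" + Long.MAX_VALUE
                + ",\"entryId\":" + Long.MAX_VALUE
                + ",\"properties\":" + props + "}";
    }

    // Minimal JSON string quoting: escapes only backslashes and double quotes.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        System.out.println(latestWithProperties(Map.of("key2", "value2")));
    }
}
```

The resulting string could be sent as the body of the PUT request to the subscription endpoint. Whether the broker accepts exactly this shape should be checked against the documented API before relying on it.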
Hi @lhotari, I still regard this issue as a bug. Here's a case: if a customer creates a subscription with only properties, the REST API and the admin CLI behave differently. For example:
Reproduce steps
See the execution record below.
Expectation
By default, both subscriptions should consume messages from the latest position.
Actual
Checking stats-internal, the two subscriptions got different positions.
Suggestion
Set defaults for ledgerId and entryId in ResetCursorData:
public class ResetCursorData {
    protected long ledgerId;
    protected long entryId;
    protected int partitionIndex = -1;
    protected boolean isExcluded = false;
    protected int batchIndex = -1;
    protected Map<String, String> properties;
}
Execution record
visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-admin topics create public/default/topic-x [8:38:50]
visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-client produce public/default/topic-x --messages "Hello, Pulsar1" [8:39:03]
visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-client produce public/default/topic-x --messages "Hello, Pulsar2" [8:39:24]
visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-client produce public/default/topic-x --messages "Hello, Pulsar3" [8:39:29]
visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-admin topics create-subscription --subscription sub-x-1 --property key1=value1 public/default/topic-x
visxu➜~/pulsar-cluster-3.0.6» curl --location --request PUT 'http://127.0.0.1:18080/admin/v2/persistent/public/default/topic-x/subscription/sub-x-2' \ [8:40:03]
--header 'Content-Type: application/json' \
--data '{
"properties": {
"key2": "value2"
}
}'
visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-admin topics stats-internal public/default/topic-x [8:40:37]
{
"entriesAddedCounter" : 6,
"numberOfEntries" : 6,
"totalSize" : 327,
"currentLedgerEntries" : 6,
"currentLedgerSize" : 327,
"lastLedgerCreatedTimestamp" : "2024-12-11T08:39:03.068+08:00",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "63:5",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 63,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"underReplicated" : false
} ],
"cursors" : {
"sub-x-1" : {
"markDeletePosition" : "63:5",
"readPosition" : "63:6",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 6,
"cursorLedger" : 64,
"cursorLedgerLastEntry" : 1,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2024-12-11T08:40:17.804+08:00",
"state" : "Open",
"active" : false,
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
},
"sub-x-2" : {
"markDeletePosition" : "63:-1",
"readPosition" : "63:0",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : 65,
"cursorLedgerLastEntry" : 1,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2024-12-11T08:40:34.635+08:00",
"state" : "Open",
"active" : false,
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
},
"schemaLedgers" : [ ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false,
"underReplicated" : false
}
}
Motivation
We hit a corner case when invoking the create-subscription API. If the client sends an empty JSON object, the backend service creates a subscription with the earliest initialPosition.
In this case, the request body JSON object is deserialized into a ResetCursorData.java instance (the API parameter), and ledgerId and entryId are set to 0.
In Pulsar, if the client doesn't set a specific value, the expected position is Latest. But in the situation above, the subscription was created at the earliest position.
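The effect described above can be illustrated with a small, self-contained sketch. It is not the broker's code: Body is a hypothetical stand-in for the request body type, and classify is a deliberately simplified model that treats any position before the topic's first ledger as earliest. The ledger numbers are taken from the stats-internal output in this thread (first ledger 63, lastConfirmedEntry 63:5).

```java
// Illustrative model only; none of these names exist in Pulsar.
class CursorDefaultsSketch {

    // Stand-in for the request body: primitive long fields that are
    // absent from the JSON keep Java's default value of 0.
    static class Body {
        long ledgerId;
        long entryId;
    }

    // Simplified assumption: a position before the first ledger behaves
    // like earliest, a position at or past the last entry like latest.
    static String classify(long ledgerId, long entryId,
                           long firstLedgerId, long lastLedgerId, long lastEntryId) {
        if (ledgerId < firstLedgerId) {
            return "earliest";
        }
        if (ledgerId > lastLedgerId
                || (ledgerId == lastLedgerId && entryId >= lastEntryId)) {
            return "latest";
        }
        return "middle";
    }

    public static void main(String[] args) {
        Body fromEmptyJson = new Body(); // models deserializing "{}"
        // Prints "earliest": 0:0 lies before ledger 63.
        System.out.println(classify(fromEmptyJson.ledgerId, fromEmptyJson.entryId, 63, 63, 5));
        // Prints "latest": Long.MAX_VALUE lies past the last entry.
        System.out.println(classify(Long.MAX_VALUE, Long.MAX_VALUE, 63, 63, 5));
    }
}
```

Under this model, initializing the two fields to Long.MAX_VALUE instead of leaving them at 0 is enough to flip the result for an empty request body from earliest to latest.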
Solution
In the create-subscription API, change the request body parameter class ResetCursorData.java so that ledgerId and entryId default to Long.MAX_VALUE instead of the long default value 0.
Alternatives
No response
Anything else?
No response