
[Enhancement] Set default value to MessageId.Latest in subscription creating API #23661

Labels: type/enhancement

Contributor

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

We hit a corner case when calling the create-subscription REST API. If the client sends an empty JSON object as the request body, the broker creates the subscription at the earliest position.
The request body is deserialized into a ResetCursorData instance. Because the body contains neither ledgerId nor entryId, both fields keep Java's default long value of 0:

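import java.util.Map;

public class ResetCursorData {
    protected long ledgerId;   // no initializer: stays 0 when the JSON body omits it
    protected long entryId;    // no initializer: stays 0 when the JSON body omits it
    protected int partitionIndex = -1;
    protected boolean isExcluded = false;
    protected int batchIndex = -1;
    protected Map<String, String> properties;
    // ... (constructors and accessors omitted)
}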
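Why an empty body ends up at 0:0 can be shown without Pulsar at all: Java initializes unassigned `long` fields to 0, and a JSON mapper only overwrites fields that are present in the body. The sketch below copies the fields above into a hypothetical `ResetCursorDataSketch` and replaces the real JSON mapper with a hypothetical `bind` helper that assigns only the keys it finds; the broker's actual deserialization path is an assumption here, not reproduced.

```java
import java.util.Map;

public class EmptyBodyDefaults {
    // Hypothetical copy of the ResetCursorData fields, for illustration only.
    static class ResetCursorDataSketch {
        long ledgerId;                 // Java default: 0
        long entryId;                  // Java default: 0
        int partitionIndex = -1;
        boolean isExcluded = false;
        int batchIndex = -1;
        Map<String, String> properties;
    }

    // Hypothetical stand-in for a JSON mapper: only keys present in the body are assigned,
    // everything else keeps its field initializer (or the Java default).
    @SuppressWarnings("unchecked")
    static ResetCursorDataSketch bind(Map<String, Object> body) {
        ResetCursorDataSketch data = new ResetCursorDataSketch();
        if (body.containsKey("ledgerId")) {
            data.ledgerId = ((Number) body.get("ledgerId")).longValue();
        }
        if (body.containsKey("entryId")) {
            data.entryId = ((Number) body.get("entryId")).longValue();
        }
        if (body.containsKey("properties")) {
            data.properties = (Map<String, String>) body.get("properties");
        }
        return data;
    }

    public static void main(String[] args) {
        // Same shape as a request body that carries only "properties".
        ResetCursorDataSketch data = bind(Map.of("properties", Map.of("key2", "value2")));
        System.out.println(data.ledgerId + ":" + data.entryId); // prints 0:0
    }
}
```

From the broker's point of view, an omitted position and an explicit `{"ledgerId": 0, "entryId": 0}` are therefore indistinguishable.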

In Pulsar, when the client does not specify a position, the expected default is Latest. In the situation above, however, the subscription was created at the earliest position.

Solution

In the create-subscription API, initialize ledgerId and entryId in the ResetCursorData request body to Long.MAX_VALUE instead of relying on the long default of 0.
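The proposal works because, in the Pulsar Java client, `MessageId.latest` is defined with both ledgerId and entryId equal to `Long.MAX_VALUE`. A minimal sketch of the proposed defaults, using a hypothetical `ProposedResetCursorData` class and a hypothetical `isLatest()` check (neither is Pulsar API):

```java
public class LatestByDefault {
    // Assumption: mirrors the ledgerId/entryId components of MessageId.latest.
    static final long LATEST_LEDGER_ID = Long.MAX_VALUE;
    static final long LATEST_ENTRY_ID = Long.MAX_VALUE;

    // Hypothetical ResetCursorData with the proposed field initializers.
    static class ProposedResetCursorData {
        long ledgerId = LATEST_LEDGER_ID;
        long entryId = LATEST_ENTRY_ID;
        int partitionIndex = -1;

        boolean isLatest() {
            return ledgerId == LATEST_LEDGER_ID && entryId == LATEST_ENTRY_ID;
        }
    }

    public static void main(String[] args) {
        // A body without ledgerId/entryId leaves the initializers untouched.
        ProposedResetCursorData omitted = new ProposedResetCursorData();
        System.out.println(omitted.isLatest()); // prints true

        // A client that explicitly asks for 0:0 still gets exactly that.
        ProposedResetCursorData explicit = new ProposedResetCursorData();
        explicit.ledgerId = 0;
        explicit.entryId = 0;
        System.out.println(explicit.isLatest()); // prints false
    }
}
```

Callers that send an explicit position are unaffected; only callers that rely on an empty body meaning earliest would see different behavior, which is the compatibility concern raised in the replies.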

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
Added the type/enhancement label on Nov 29, 2024.
Contributor Author

Hi @lhotari, what's your suggestion? Any comments are welcome.

lhotari (Member) commented Nov 29, 2024

> In Pulsar, when the client does not specify a position, the expected default is Latest. In the situation above, however, the subscription was created at the earliest position.

Since this is existing behavior in the API that you are using, changing it would be a breaking change. It's better to handle this type of case by documenting the behavior than by changing the default.

The value is set to latest in the Java Admin client code:

ResetCursorData resetCursorData = messageId != null
? new ResetCursorData(messageId) : new ResetCursorData(MessageId.latest);
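This is where the CLI and the raw REST API diverge: the admin client fills in the default before anything is sent, so the broker always receives an explicit position from pulsar-admin, while a REST caller that omits the fields gets the broker-side 0:0. A stdlib-only sketch of the same ternary, with a hypothetical `Position` record standing in for `MessageId` and assuming `MessageId.latest` carries `Long.MAX_VALUE` for both ledgerId and entryId:

```java
public class ClientSideDefault {
    // Hypothetical stand-in for MessageId: just a (ledgerId, entryId) pair.
    record Position(long ledgerId, long entryId) {}

    // Assumption: same components as MessageId.latest.
    static final Position LATEST = new Position(Long.MAX_VALUE, Long.MAX_VALUE);

    // Same shape as the admin client code above: a missing position becomes LATEST
    // on the client side, before the request reaches the broker.
    static Position resolveRequested(Position requested) {
        return requested != null ? requested : LATEST;
    }

    public static void main(String[] args) {
        System.out.println(resolveRequested(null).equals(LATEST)); // prints true
    }
}
```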

It's not impossible to change the default behavior in the broker side code, but that would require a Pulsar Improvement Proposal (PIP). Is it really worth it?

lhotari (Member) commented Nov 29, 2024

The first step would be to document the existing behavior to https://pulsar.apache.org/docs/next/admin-api-topics/#create-subscription and to OpenAPI spec (generated from Swagger annotations).


Contributor Author

> Since this is existing behavior in the API that you are using, changing it would be a breaking change. It's better to handle this type of case by documenting the behavior than by changing the default.
>
> The value is set to latest in the Java Admin client code:
>
> ResetCursorData resetCursorData = messageId != null
> ? new ResetCursorData(messageId) : new ResetCursorData(MessageId.latest);
>
> It's not impossible to change the default behavior in the broker side code, but that would require a Pulsar Improvement Proposal (PIP). Is it really worth it?

Okay, I understand. It seems the better approach is for the client to use the API correctly.

lhotari (Member) commented Nov 29, 2024

> Okay, I understand. It seems the better approach is for the client to use the API correctly.

Yes, and only by documenting the API will it be possible to know how to use it. That's why contributing to documentation is important and useful.

Contributor Author

Hi @lhotari, I still consider this a bug. If a user creates a subscription with only properties set, the REST API and the admin CLI behave differently. For example:

Steps to reproduce

  1. Create a topic and produce some messages.
  2. Create one subscription with the admin CLI and one with the REST API, setting only properties. Do NOT specify ledgerId, entryId, or Latest/Earliest.
  3. Check the topic's internal stats with stats-internal.

Expected behavior

By default, both subscriptions should start consuming from the latest position.

Actual behavior

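The stats-internal output shows different readPosition values: sub-x-1 (created with the admin CLI) is at 63:6, right after lastConfirmedEntry 63:5, i.e. latest; sub-x-2 (created with the REST API) is at 63:0, the first entry of the ledger, i.e. earliest.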
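Until the default changes, a REST caller can probably get the same result as the CLI by sending the latest position explicitly. The sketch below builds (but does not send) the PUT request shown in the execution record, with ledgerId and entryId set to Long.MAX_VALUE. It assumes the broker treats that pair the same way it treats MessageId.latest coming from the admin client; this has not been verified against a broker here, and the address is simply the one used in the record.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class ExplicitLatestRequest {
    // Assumption: same broker address, topic, and subscription as in the execution record.
    static final String URL =
        "http://127.0.0.1:18080/admin/v2/persistent/public/default/topic-x/subscription/sub-x-2";

    // Spells out the latest position instead of relying on the broker default
    // for missing fields (which is 0:0, i.e. effectively earliest).
    static String body() {
        return "{\"ledgerId\": " + Long.MAX_VALUE
            + ", \"entryId\": " + Long.MAX_VALUE
            + ", \"properties\": {\"key2\": \"value2\"}}";
    }

    static HttpRequest request() {
        return HttpRequest.newBuilder(URI.create(URL))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(body()))
            .build();
    }

    public static void main(String[] args) {
        HttpRequest req = request();
        System.out.println(req.method() + " " + req.uri());
        System.out.println(body());
    }
}
```

To actually send it, pass the request to `HttpClient.newHttpClient().send(request(), HttpResponse.BodyHandlers.ofString())` against a running broker.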

Suggestion

Set the default value of ledgerId and entryId to Long.MAX_VALUE:

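public class ResetCursorData {
    protected long ledgerId = Long.MAX_VALUE;   // proposed default, same as MessageId.latest
    protected long entryId = Long.MAX_VALUE;    // proposed default, same as MessageId.latest
    protected int partitionIndex = -1;
    protected boolean isExcluded = false;
    protected int batchIndex = -1;
    protected Map<String, String> properties;
    // ... (constructors and accessors omitted)
}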

Execution record

visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-admin topics create public/default/topic-x
visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-client produce public/default/topic-x --messages "Hello, Pulsar1"
visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-client produce public/default/topic-x --messages "Hello, Pulsar2"
visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-client produce public/default/topic-x --messages "Hello, Pulsar3"
visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-admin topics create-subscription --subscription sub-x-1 --property key1=value1 public/default/topic-x

visxu➜~/pulsar-cluster-3.0.6» curl --location --request PUT 'http://127.0.0.1:18080/admin/v2/persistent/public/default/topic-x/subscription/sub-x-2' \
--header 'Content-Type: application/json' \
--data '{
    "properties": {
        "key2": "value2"
    }
}'
visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-admin topics stats-internal public/default/topic-x
{
  "entriesAddedCounter" : 6,
  "numberOfEntries" : 6,
  "totalSize" : 327,
  "currentLedgerEntries" : 6,
  "currentLedgerSize" : 327,
  "lastLedgerCreatedTimestamp" : "2024-12-11T08:39:03.068+08:00",
  "waitingCursorsCount" : 0,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "63:5",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 63,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false,
    "underReplicated" : false
  } ],
  "cursors" : {
    "sub-x-1" : {
      "markDeletePosition" : "63:5",
      "readPosition" : "63:6",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 6,
      "cursorLedger" : 64,
      "cursorLedgerLastEntry" : 1,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2024-12-11T08:40:17.804+08:00",
      "state" : "Open",
      "active" : false,
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "subscriptionHavePendingRead" : false,
      "subscriptionHavePendingReplayRead" : false,
      "properties" : { }
    },
    "sub-x-2" : {
      "markDeletePosition" : "63:-1",
      "readPosition" : "63:0",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 0,
      "cursorLedger" : 65,
      "cursorLedgerLastEntry" : 1,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2024-12-11T08:40:34.635+08:00",
      "state" : "Open",
      "active" : false,
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "subscriptionHavePendingRead" : false,
      "subscriptionHavePendingReplayRead" : false,
      "properties" : { }
    }
  },
  "schemaLedgers" : [ ],
  "compactedLedger" : {
    "ledgerId" : -1,
    "entries" : -1,
    "size" : -1,
    "offloaded" : false,
    "underReplicated" : false
  }
}
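The two readPosition values above (sub-x-1 at 63:6, sub-x-2 at 63:0) are consistent with a simple clamping rule: a requested position before the first available entry snaps to that entry, and a position past the last entry snaps to the next write position. The sketch below is a hypothetical, simplified single-ledger model that reproduces those two numbers from the requested positions 0:0 and Long.MAX_VALUE:Long.MAX_VALUE; it is not the broker's actual cursor-reset code.

```java
public class ReadPositionModel {
    // Hypothetical position type; the real broker uses its own classes.
    record Position(long ledgerId, long entryId) {
        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // Simplified model: one ledger holding entries 0..lastEntryId.
    static Position initialReadPosition(Position requested, long ledgerId, long lastEntryId) {
        Position first = new Position(ledgerId, 0);
        Position next = new Position(ledgerId, lastEntryId + 1); // where the next message will land
        if (compare(requested, first) <= 0) {
            return first; // before the data: behaves like earliest
        }
        if (compare(requested, next) >= 0) {
            return next;  // past the data: behaves like latest
        }
        return requested;
    }

    // Orders positions by ledgerId, then entryId.
    static int compare(Position a, Position b) {
        int c = Long.compare(a.ledgerId(), b.ledgerId());
        return c != 0 ? c : Long.compare(a.entryId(), b.entryId());
    }

    public static void main(String[] args) {
        long ledger = 63;
        long lastEntry = 5; // from "lastConfirmedEntry" : "63:5"
        // Empty REST body (0:0), like sub-x-2:
        System.out.println(initialReadPosition(new Position(0, 0), ledger, lastEntry)); // prints 63:0
        // MessageId.latest components, like sub-x-1 created by the CLI:
        System.out.println(initialReadPosition(
            new Position(Long.MAX_VALUE, Long.MAX_VALUE), ledger, lastEntry)); // prints 63:6
    }
}
```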