You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Pulsar comes with an inbuilt schema registry and a pluggable schema storage. Schema storage's default implementation is bookkeeper-based but a new implementation can be provided by the user. Schema registry on the other hand interacts with the configured schema storage and offers ability to do schema CRUD as well as performs compatibility checks.
Since schema registry is not pluggable, it is not possible as of today to enhance/extend what pulsar offers out of the box. This is usually true in organizations where an existing schema registry is already present and used in other systems that producers and consumers interact with. Having a pluggable storage is not sufficient, some of the reasons are:
The user may want to cache (schema-version,schema-data) in-memory rather than doing a fetch from schema storage for scaling purposes.
The user has a compatibility evaluation logic of their own. For example, the user wants to implement semver-based versioning of schemas with minor versions being forwards-compatible and major versions indicating breaking changes.
Additional policies need to be applied to compatibility or schema fetch patterns depending on business requirements and the schema in question.
For above reasons, we are proposing support for a pluggable schema registry in pulsar.
Goal
The scope of this is to make schema registry service pluggable in order to support different forms of versioning and compatibility checks, depending on the use case.
All of the changes are on the broker-side and backwards compatible. Existing schema registry service implementation will continue to remain the default one. Users will be able to provide a different registry by specifying the class name in the broker config and loading the relevant jar on the broker's class path. The experience for pluggable schema registry is going to be the same as the one for pluggable schema storage.
User-facing addition of flag in broker.conf
# Override the schema registry used by pulsar with a custom implementation. If this config is not provided,
# the default schema registry (SchemaRegistryServiceImpl) will be used.
schemaRegistryClassName=
Example of how a custom schema registry can be implemented by the user where they only want to change a certain set of behaviors and reuse the rest.
publicclassMySchemaRegistryextendsSchemaRegistryServiceImpl {
@Overridepublicvoidinitialize(ServiceConfigurationconfiguration, SchemaStorageschemaStorage) throwsPulsarServerException {
super.initialize(configuration, schemaStorage);
//read config and do some other op
}
@OverridepublicCompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> getAllSchemas(StringschemaId) {
CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemas = super.getAllSchemas(schemaId);
//apply some filtering logic and return
}
@OverridepublicCompletableFuture<Boolean> isCompatible(StringschemaId, SchemaDataschema, SchemaCompatibilityStrategystrategy) {
//custom operation
}
}
API Changes
Add a configuration parameter to set the schema registry to be used. This is illustrated in the "Goals" section above.
Add an initialize(...) method to SchemaRegistryService interface, similar in lines to the AuthorizationProvider interface already existing in Pulsar. This will ensure initialization of the plugged in schema registry with the required dependencies.
Make the schema registry configurable via broker config. The changes are:
Add a new config called schemaRegistryClassName in ServiceConfiguration with a default value of org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl
Use initialize(...) method instead of constructor to set dependencies like schema storage and compatibility checkers. This is being done to ensure that the contract becomes explicit to the implementer of the interface rather than having an assumption in the codebase around what the constructor params should be. The changes are:
Add an initialize(...) method in SchemaRegistryService without an implementation
Update create(...) method in SchemaRegistryService to create the instance of schema registry based on the ServiceConfiguration and initialize(...) method, instead of using a particular constructor overload.
Renaming a few methods in the SchemaRegistryService interface to reflect their behavior. The changes are:
Rename deleteSchema to putEmptySchema in SchemaRegistryService
Rename deleteSchemaStorage to deleteSchemaFromStorage in SchemaRegistryService
Remove checkCompatible from SchemaRegistryService and make it a private method in SchemaRegistryServiceImpl because it is not used anywhere else
Make compatibilityChecks, schemaStorage and clock non-final in the default implementation as they are now being set in initialize(...) method
Move getCheckers(...) from SchemaRegistryService to SchemaRegistryServiceImpl since this behavior is tied to how default schema registry implementation works.
Move the null check on schemaStorage from SchemaRegistryService to PulsarService. The null check will now be used to decide the name of the schema registry which needs to be instantiated.
This change has been done and a PR has been raised here: #14102
Reject Alternatives
We had a requirement to override/change the way compatibility logic is performed in the default schema registry. To that end, it was considered separating out schema validation/compatibility logic into a different interface and impl, similar to how schema storage is modeled as of today. The user will be able to use the default schema registry but influence the behavior of compatibility checks by providing their own implementation.
We did not end up taking this route because in most of the cases, the default compatibility checks should make sense as-is. And in scenarios where they require change, users can always plug in a new schema registry and use the default schema registry for behavior they want to retain using composition. This is demonstrated in the example given in the "Goals" section.
We had a requirement to influence the list of schemas returned by the schema registry. To that end, we considered changing the SchemaStorage implementation itself (which is already pluggable and user-facing as of today). But the filtering needed to be done on the properties in SchemaData/SchemaInfo, and doing the deserialization in the schema storage layer did not match its purpose.
Motivation
Pulsar comes with an inbuilt schema registry and a pluggable schema storage. Schema storage's default implementation is bookkeeper-based but a new implementation can be provided by the user. Schema registry on the other hand interacts with the configured schema storage and offers ability to do schema CRUD as well as performs compatibility checks.
Since schema registry is not pluggable, it is not possible as of today to enhance/extend what pulsar offers out of the box. This is usually true in organizations where an existing schema registry is already present and used in other systems that producers and consumers interact with. Having a pluggable storage is not sufficient, some of the reasons are:
For above reasons, we are proposing support for a pluggable schema registry in pulsar.
Goal
The scope of this is to make schema registry service pluggable in order to support different forms of versioning and compatibility checks, depending on the use case.
All of the changes are on the broker-side and backwards compatible. Existing schema registry service implementation will continue to remain the default one. Users will be able to provide a different registry by specifying the class name in the broker config and loading the relevant jar on the broker's class path. The experience for pluggable schema registry is going to be the same as the one for pluggable schema storage.
User-facing addition of flag in broker.conf
Example of how a custom schema registry can be implemented by the user where they only want to change a certain set of behaviors and reuse the rest.
API Changes
Add a configuration parameter to set the schema registry to be used. This is illustrated in the "Goals" section above.
Add an
initialize(...)
method toSchemaRegistryService
interface, similar in lines to theAuthorizationProvider
interface already existing in Pulsar. This will ensure initialization of the plugged in schema registry with the required dependencies.void initialize(ServiceConfiguration configuration, SchemaStorage schemaStorage) throws PulsarServerException;
Implementation
These are the changes covered in the PR:
schemaRegistryClassName
inServiceConfiguration
with a default value oforg.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl
initialize(...)
method instead of constructor to set dependencies like schema storage and compatibility checkers. This is being done to ensure that the contract becomes explicit to the implementer of the interface rather than having an assumption in the codebase around what the constructor params should be. The changes are:initialize(...)
method inSchemaRegistryService
without an implementationcreate(...)
method inSchemaRegistryService
to create the instance of schema registry based on theServiceConfiguration
andinitialize(...)
method, instead of using a particular constructor overload.SchemaRegistryService
interface to reflect their behavior. The changes are:deleteSchema
toputEmptySchema
inSchemaRegistryService
deleteSchemaStorage
todeleteSchemaFromStorage
inSchemaRegistryService
checkCompatible
fromSchemaRegistryService
and make it a private method inSchemaRegistryServiceImpl
because it is not used anywhere elsecompatibilityChecks
,schemaStorage
andclock
non-final in the default implementation as they are now being set ininitialize(...)
methodgetCheckers(...)
fromSchemaRegistryService
toSchemaRegistryServiceImpl
since this behavior is tied to how default schema registry implementation works.SchemaRegistryService
toPulsarService
. The null check will now be used to decide the name of the schema registry which needs to be instantiated.This change has been done and a PR has been raised here: #14102
Reject Alternatives
We did not end up taking this route because in most of the cases, the default compatibility checks should make sense as-is. And in scenarios where they require change, users can always plug in a new schema registry and use the default schema registry for behavior they want to retain using composition. This is demonstrated in the example given in the "Goals" section.
SchemaStorage
implementation itself (which is already pluggable and user-facing as of today). But the filtering needed to be done on the properties inSchemaData
/SchemaInfo
, and doing the deserialization in the schema storage layer did not match its purpose.Authors
Aparajita Singh (https://github.com/aparajita89)
Ankur Jain (https://github.com/anvinjain)
The text was updated successfully, but these errors were encountered: