Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18601: Assume a baseline of 3.3 for server protocol versions #18845

Merged
merged 33 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e654cdb
KAFKA-18601: Assume a baseline of 3.3 for server protocol versions
ijuma Feb 1, 2025
059866e
Fix a few tests and tweak error messages
ijuma Feb 10, 2025
86d1053
Minor improvements, address review comments and fix tests
ijuma Feb 10, 2025
97e2c43
Formatting fixes
ijuma Feb 10, 2025
b60fbe4
Minor tweak
ijuma Feb 10, 2025
4338a21
Merge remote-tracking branch 'apache-github/trunk' into kafka-18601-a…
ijuma Feb 11, 2025
9fb5b3d
Remove `ibp` parameter from `BootstrapDirectory`
ijuma Feb 11, 2025
3203398
Address a couple of minor review comments
ijuma Feb 11, 2025
b31bc04
Merge remote-tracking branch 'apache-github/trunk' into kafka-18601-a…
ijuma Feb 11, 2025
8531c36
Address review comments
ijuma Feb 11, 2025
8a81124
Replace IBP_3_3_IV3 with MINIMUM_VERSION where applicable
ijuma Feb 11, 2025
cdd7c9a
Set `topicId` a bit more consistently in `PartitionTest`
ijuma Feb 11, 2025
942bd2a
Improve comments and remove code that is no longer needed
ijuma Feb 11, 2025
2f32f35
Address review comments
ijuma Feb 11, 2025
b93ba62
Don't set metadata version to the `MINIMUM_VERSION` and rely only on …
ijuma Feb 12, 2025
d8f5346
Add fallback in case we read a feature level we don't understand (for…
ijuma Feb 12, 2025
27e33c4
Simplify code based on code review
ijuma Feb 12, 2025
2d40e5b
Remove metadata version default
ijuma Feb 12, 2025
32a4a94
More IBP_3_3_IV3 -> MINIMUM_VERSION
ijuma Feb 12, 2025
4b331af
Reject broker registration without metadata.version and fix tests
ijuma Feb 13, 2025
70667a4
Update MetadataBatchLoader.resetToImage to set `hasSeenRecord` based …
ijuma Feb 14, 2025
d66ef98
Merge remote-tracking branch 'apache-github/trunk' into kafka-18601-a…
ijuma Feb 14, 2025
efaf362
Merge remote-tracking branch 'apache-github/trunk' into kafka-18601-a…
ijuma Feb 14, 2025
a551f46
Fix test failures
ijuma Feb 15, 2025
5834410
Make FeaturesImage.metadataVersion optional and fail-fast instead of …
ijuma Feb 15, 2025
afb9d97
Minor tweak
ijuma Feb 15, 2025
d5860c4
More 3_3_IV3 -> MINIMUM_VERSION
ijuma Feb 15, 2025
4505d0a
Fix several tests
ijuma Feb 15, 2025
7fcb280
Fix ReconfigurableQuorumIntegrationTest and QuorumControllerTest by a…
ijuma Feb 18, 2025
d505508
Minor clean-ups in `ReconfigurableQuorumIntegrationTest`
ijuma Feb 18, 2025
07a728b
"3-3-IV3" -> `MINIMUM_VERSION.toString`
ijuma Feb 18, 2025
52fd223
Remove metadata version fallback from `FeaturesDelta` - snapshots hap…
ijuma Feb 18, 2025
6c5a0f9
Address review feedback.
ijuma Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
"type": "request",
"listeners": ["controller"],
"name": "AlterPartitionRequest",
// Versions 0-1 were removed in Apache Kafka 4.0, version 2 is the new baseline.

// Version 1 adds LeaderRecoveryState field (KIP-704).
//
// Version 2 adds TopicId field to replace TopicName field (KIP-841).
//
// Version 3 adds the NewIsrEpochs field and deprecates the NewIsr field (KIP-903).
"validVersions": "0-3",
"validVersions": "2-3",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
Expand All @@ -32,8 +33,6 @@
"about": "The epoch of the requesting broker." },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+",
"about": "The topics to alter ISRs for.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0-1", "ignorable": true, "entityType": "topicName",
"about": "The name of the topic to alter ISRs for." },
{ "name": "TopicId", "type": "uuid", "versions": "2+", "ignorable": true,
"about": "The ID of the topic to alter ISRs for." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
"apiKey": 56,
"type": "response",
"name": "AlterPartitionResponse",
// Versions 0-1 were removed in Apache Kafka 4.0, version 2 is the new baseline.

// Version 1 adds LeaderRecoveryState field (KIP-704).
//
// Version 2 adds TopicId field to replace TopicName field, can return the following new errors:
// INELIGIBLE_REPLICA, NEW_LEADER_ELECTED and UNKNOWN_TOPIC_ID (KIP-841).
//
// Version 3 is the same as version 2 (KIP-903).
"validVersions": "0-3",
"validVersions": "2-3",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
Expand All @@ -32,8 +33,6 @@
"about": "The top level response error code." },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+",
"about": "The responses for each topic.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0-1", "ignorable": true, "entityType": "topicName",
"about": "The name of the topic." },
{ "name": "TopicId", "type": "uuid", "versions": "2+", "ignorable": true,
"about": "The ID of the topic." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

class AlterPartitionRequestTest {
String topic = "test-topic";
Uuid topicId = Uuid.randomUuid();

@ParameterizedTest
Expand All @@ -44,9 +43,7 @@ public void testBuildAlterPartitionRequest(short version) {
.setBrokerId(1)
.setBrokerEpoch(1);

TopicData topicData = new TopicData()
.setTopicId(topicId)
.setTopicName(topic);
TopicData topicData = new TopicData().setTopicId(topicId);

List<BrokerState> newIsrWithBrokerEpoch = new LinkedList<>();
newIsrWithBrokerEpoch.add(new BrokerState().setBrokerId(1).setBrokerEpoch(1001));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,7 @@ private AbstractResponse getResponse(ApiKeys apikey, short version) {
case BEGIN_QUORUM_EPOCH: return createBeginQuorumEpochResponse();
case END_QUORUM_EPOCH: return createEndQuorumEpochResponse();
case DESCRIBE_QUORUM: return createDescribeQuorumResponse();
case ALTER_PARTITION: return createAlterPartitionResponse(version);
case ALTER_PARTITION: return createAlterPartitionResponse();
case UPDATE_FEATURES: return createUpdateFeaturesResponse();
case ENVELOPE: return createEnvelopeResponse();
case FETCH_SNAPSHOT: return createFetchSnapshotResponse();
Expand Down Expand Up @@ -1695,41 +1695,31 @@ private AlterPartitionRequest createAlterPartitionRequest(short version) {
.setPartitionIndex(1)
.setPartitionEpoch(2)
.setLeaderEpoch(3)
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(asList(1, 2)));

if (version >= 1) {
// Use the none default value; 1 - RECOVERING
partitionData.setLeaderRecoveryState((byte) 1);
}
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(asList(1, 2)))
.setLeaderRecoveryState((byte) 1); // non-default value

AlterPartitionRequestData data = new AlterPartitionRequestData()
.setBrokerEpoch(123L)
.setBrokerId(1)
.setTopics(singletonList(new AlterPartitionRequestData.TopicData()
.setTopicName("topic1")
.setTopicId(Uuid.randomUuid())
.setPartitions(singletonList(partitionData))));
return new AlterPartitionRequest.Builder(data).build(version);
}

private AlterPartitionResponse createAlterPartitionResponse(int version) {
private AlterPartitionResponse createAlterPartitionResponse() {
AlterPartitionResponseData.PartitionData partitionData = new AlterPartitionResponseData.PartitionData()
.setPartitionEpoch(1)
.setIsr(asList(0, 1, 2))
.setErrorCode(Errors.NONE.code())
.setLeaderEpoch(2)
.setLeaderId(3);

if (version >= 1) {
// Use the none default value; 1 - RECOVERING
partitionData.setLeaderRecoveryState((byte) 1);
}
.setLeaderId(3)
.setLeaderRecoveryState((byte) 1); // non-default value

AlterPartitionResponseData data = new AlterPartitionResponseData()
.setErrorCode(Errors.NONE.code())
.setThrottleTimeMs(123)
.setTopics(singletonList(new AlterPartitionResponseData.TopicData()
.setTopicName("topic1")
.setTopicId(Uuid.randomUuid())
.setPartitions(singletonList(partitionData))));
return new AlterPartitionResponse(data);
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ object Partition {
time: Time,
replicaManager: ReplicaManager): Partition = {
Partition(
topicPartition = topicIdPartition.topicPartition(),
topicId = Option(topicIdPartition.topicId()),
topicPartition = topicIdPartition.topicPartition,
topicId = Some(topicIdPartition.topicId),
time = time,
replicaManager = replicaManager)
}
Expand Down Expand Up @@ -1813,7 +1813,7 @@ class Partition(val topicPartition: TopicPartition,
private def submitAlterPartition(proposedIsrState: PendingPartitionChange): CompletableFuture[LeaderAndIsr] = {
debug(s"Submitting ISR state change $proposedIsrState")
val future = alterIsrManager.submit(
new TopicIdPartition(topicId.getOrElse(Uuid.ZERO_UUID), topicPartition),
new org.apache.kafka.server.common.TopicIdPartition(topicId.getOrElse(throw new IllegalStateException("Topic id not set for " + topicPartition)), topicPartition.partition),
proposedIsrState.sentLeaderAndIsr
)
future.whenComplete { (leaderAndIsr, e) =>
Expand Down
106 changes: 36 additions & 70 deletions core/src/main/scala/kafka/server/AlterPartitionManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,18 @@
*/
package kafka.server

import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.OperationNotAttemptedException
import org.apache.kafka.common.message.AlterPartitionRequestData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.requests.{AlterPartitionRequest, AlterPartitionResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, MetadataVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.util.Scheduler

import scala.collection.mutable
Expand Down Expand Up @@ -92,8 +87,7 @@ object AlterPartitionManager {
scheduler = scheduler,
time = time,
brokerId = config.brokerId,
brokerEpochSupplier = brokerEpochSupplier,
metadataVersionSupplier = () => metadataCache.metadataVersion()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can remove metadataCache from the apply method

brokerEpochSupplier = brokerEpochSupplier
)
}
}
Expand All @@ -104,7 +98,6 @@ class DefaultAlterPartitionManager(
val time: Time,
val brokerId: Int,
val brokerEpochSupplier: () => Long,
val metadataVersionSupplier: () => MetadataVersion
) extends AlterPartitionManager with Logging {

// Used to allow only one pending ISR update per partition (visible for testing).
Expand All @@ -114,7 +107,7 @@ class DefaultAlterPartitionManager(
// and re-created, we cannot have two entries in this Map especially if we cannot
// use an AlterPartition request version which supports topic ids in the end because
// the two updates with the same name would be merged together.
private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterPartitionItem] = new ConcurrentHashMap[TopicPartition, AlterPartitionItem]()
private[server] val unsentIsrUpdates = new ConcurrentHashMap[TopicIdPartition, AlterPartitionItem]()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please update the docs according to the change?


// Used to allow only one in-flight request at a time
private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
Expand All @@ -133,7 +126,7 @@ class DefaultAlterPartitionManager(
): CompletableFuture[LeaderAndIsr] = {
val future = new CompletableFuture[LeaderAndIsr]()
val alterPartitionItem = AlterPartitionItem(topicIdPartition, leaderAndIsr, future)
val enqueued = unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition.topicPartition, alterPartitionItem) == null
val enqueued = unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition, alterPartitionItem) == null
if (enqueued) {
maybePropagateIsrChanges()
} else {
Expand Down Expand Up @@ -161,7 +154,7 @@ class DefaultAlterPartitionManager(

private def sendRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): Unit = {
val brokerEpoch = brokerEpochSupplier()
val (request, topicNamesByIds) = buildRequest(inflightAlterPartitionItems, brokerEpoch)
val request = buildRequest(inflightAlterPartitionItems, brokerEpoch)
debug(s"Sending AlterPartition to controller $request")

// We will not timeout AlterPartition request, instead letting it retry indefinitely
Expand All @@ -182,11 +175,9 @@ class DefaultAlterPartitionManager(
Errors.UNSUPPORTED_VERSION
} else {
handleAlterPartitionResponse(
response.requestHeader,
response.responseBody.asInstanceOf[AlterPartitionResponse],
brokerEpoch,
inflightAlterPartitionItems,
topicNamesByIds
inflightAlterPartitionItems
)
}
} finally {
Expand Down Expand Up @@ -218,59 +209,40 @@ class DefaultAlterPartitionManager(
* supported by the controller. The final decision is taken when the AlterPartitionRequest
* is built in the network client based on the advertised api versions of the controller.
*
* @return A tuple containing the AlterPartitionRequest.Builder and a mapping from
* topic id to topic name. This mapping is used in the response handling.
* @return an AlterPartitionRequest.Builder with the provided parameters.
*/
private def buildRequest(
inflightAlterPartitionItems: Seq[AlterPartitionItem],
brokerEpoch: Long
): (AlterPartitionRequest.Builder, mutable.Map[Uuid, String]) = {
val metadataVersion = metadataVersionSupplier()
// We build this mapping in order to map topic id back to their name when we
// receive the response. We cannot rely on the metadata cache for this because
// the metadata cache is updated after the partition state so it might not know
// yet about a topic id already used here.
val topicNamesByIds = mutable.HashMap[Uuid, String]()

): AlterPartitionRequest.Builder = {
val message = new AlterPartitionRequestData()
.setBrokerId(brokerId)
.setBrokerEpoch(brokerEpoch)

inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { case (topicName, items) =>
val topicId = items.head.topicIdPartition.topicId
topicNamesByIds(topicId) = topicName

// Both the topic name and the topic id are set here because at this stage
// we don't know which version of the request will be used.
val topicData = new AlterPartitionRequestData.TopicData()
.setTopicName(topicName)
.setTopicId(topicId)
inflightAlterPartitionItems.groupBy(_.topicIdPartition.topicId).foreach { case (topicId, items) =>
val topicData = new AlterPartitionRequestData.TopicData().setTopicId(topicId)
message.topics.add(topicData)

items.foreach { item =>
val partitionData = new AlterPartitionRequestData.PartitionData()
.setPartitionIndex(item.topicIdPartition.partition)
.setPartitionIndex(item.topicIdPartition.partitionId)
.setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
.setNewIsrWithEpochs(item.leaderAndIsr.isrWithBrokerEpoch)
.setPartitionEpoch(item.leaderAndIsr.partitionEpoch)

if (metadataVersion.isLeaderRecoverySupported) {
partitionData.setLeaderRecoveryState(item.leaderAndIsr.leaderRecoveryState.value)
}
partitionData.setLeaderRecoveryState(item.leaderAndIsr.leaderRecoveryState.value)

topicData.partitions.add(partitionData)
}
}

(new AlterPartitionRequest.Builder(message), topicNamesByIds)
new AlterPartitionRequest.Builder(message)
}

private def handleAlterPartitionResponse(
requestHeader: RequestHeader,
alterPartitionResp: AlterPartitionResponse,
sentBrokerEpoch: Long,
inflightAlterPartitionItems: Seq[AlterPartitionItem],
topicNamesByIds: mutable.Map[Uuid, String]
): Errors = {
val data = alterPartitionResp.data

Expand All @@ -284,37 +256,31 @@ class DefaultAlterPartitionManager(

case Errors.NONE =>
// Collect partition-level responses to pass to the callbacks
val partitionResponses = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
val partitionResponses = new mutable.HashMap[TopicIdPartition, Either[Errors, LeaderAndIsr]]()
data.topics.forEach { topic =>
// Topic IDs are used since version 2 of the AlterPartition API.
val topicName = if (requestHeader.apiVersion > 1) topicNamesByIds.get(topic.topicId).orNull else topic.topicName
if (topicName == null || topicName.isEmpty) {
error(s"Received an unexpected topic $topic in the alter partition response, ignoring it.")
} else {
topic.partitions.forEach { partition =>
val tp = new TopicPartition(topicName, partition.partitionIndex)
val apiError = Errors.forCode(partition.errorCode)
debug(s"Controller successfully handled AlterPartition request for $tp: $partition")
if (apiError == Errors.NONE) {
LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).toScala match {
case Some(leaderRecoveryState) =>
partitionResponses(tp) = Right(
new LeaderAndIsr(
partition.leaderId,
partition.leaderEpoch,
partition.isr,
leaderRecoveryState,
partition.partitionEpoch
)
topic.partitions.forEach { partition =>
val tp = new TopicIdPartition(topic.topicId, partition.partitionIndex)
val apiError = Errors.forCode(partition.errorCode)
debug(s"Controller successfully handled AlterPartition request for $tp: $partition")
if (apiError == Errors.NONE) {
LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).toScala match {
case Some(leaderRecoveryState) =>
partitionResponses(tp) = Right(
new LeaderAndIsr(
partition.leaderId,
partition.leaderEpoch,
partition.isr,
leaderRecoveryState,
partition.partitionEpoch
)
)

case None =>
error(s"Controller returned an invalid leader recovery state (${partition.leaderRecoveryState}) for $tp: $partition")
partitionResponses(tp) = Left(Errors.UNKNOWN_SERVER_ERROR)
}
} else {
partitionResponses(tp) = Left(apiError)
case None =>
error(s"Controller returned an invalid leader recovery state (${partition.leaderRecoveryState}) for $tp: $partition")
partitionResponses(tp) = Left(Errors.UNKNOWN_SERVER_ERROR)
}
} else {
partitionResponses(tp) = Left(apiError)
}
}
}
Expand All @@ -323,11 +289,11 @@ class DefaultAlterPartitionManager(
// partition was somehow erroneously excluded from the response. Note that these callbacks are run from
// the leaderIsrUpdateLock write lock in Partition#sendAlterPartitionRequest
inflightAlterPartitionItems.foreach { inflightAlterPartition =>
partitionResponses.get(inflightAlterPartition.topicIdPartition.topicPartition) match {
partitionResponses.get(inflightAlterPartition.topicIdPartition) match {
case Some(leaderAndIsrOrError) =>
// Regardless of callback outcome, we need to clear from the unsent updates map to unblock further
// updates. We clear it now to allow the callback to submit a new update if needed.
unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition.topicPartition)
unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition)
leaderAndIsrOrError match {
case Left(error) => inflightAlterPartition.future.completeExceptionally(error.exception)
case Right(leaderAndIsr) => inflightAlterPartition.future.complete(leaderAndIsr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class ControllerRegistrationManager(
/**
* The current metadata version that is in effect. Only read or written from the event queue thread.
*/
private var metadataVersion: MetadataVersion = MetadataVersion.MINIMUM_KRAFT_VERSION
private var metadataVersion: MetadataVersion = MetadataVersion.MINIMUM_VERSION

/**
* True if we're registered. Only read or written from the event queue thread.
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ object KafkaRaftServer {
}

// Load the BootstrapMetadata.
val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir, Optional.empty())
val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir)
val bootstrapMetadata = bootstrapDirectory.read()
(metaPropsEnsemble, bootstrapMetadata)
}
Expand Down
Loading
Loading