-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18757: Create full-function SimpleAssignor to match KIP-932 description #18864
base: trunk
Are you sure you want to change the base?
Conversation
…hash for current assignment + unit test
…ulating hash for current assignment + unit test" This reverts commit 86a4c6f.
Hi @AndrewJSchofield @apoorvmittal10 , the step 3 described above is a little tricky to implement (since we can only know the current assignment, not whether it was calculated by step 1 or step 2). I have implemented a way to filter current assignment as required in step 3 in function |
…hash for current assignment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot reviewed 2 out of 3 changed files in this pull request and generated 1 comment.
Files not reviewed (1)
- core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala: Language not supported
Comments suppressed due to low confidence (1)
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:304
- The 'partition' field should be declared as 'final' to make the 'TargetPartition' class immutable.
int partition;
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot reviewed 2 out of 3 changed files in this pull request and generated no comments.
Files not reviewed (1)
- core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala: Language not supported
I have amended the implementation of the step 3 of the assignment such that we will combine new and current assignment without revoking the assignments that were assigned by step 1 in the new assignment and have members in current assignment by step 2. This has been done to avoid the complexity in both the implementation and the run time complexity because as of now we can only get the current assignment while calculating the new assignment. We do not have a way to know with which step, a particular assignment happened in the current assignment. I do have a way with which we can recreate the step wise assignment using the current assignment but that involves sorting and unnecessary computation. Hence, I am deferring with that approach. |
Marked failed test as flaky in #18925. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Only a partial review so far, but I've left some initial comments.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, took an initial look. Some comments.
...oordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments, though seems PR as good starting point and we migh improve on better partition stickiness while revoking and assigning partitions.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
...oordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
...oordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
Outdated
Show resolved
Hide resolved
...oordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
Outdated
Show resolved
Hide resolved
...oordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looks good to me, can you please share the numbers with 16 partitions and 25 share consumers, with and without the PR.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
Outdated
Show resolved
Hide resolved
...oordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
Show resolved
Hide resolved
@apoorvmittal10 , here are the numbers for 16 partitions and 25 share consumers - With PR - Without PR - As mentioned above as well, this PR reduces the sharing of topic partitions from the assignor, hence the decline in performance is expected. With the future PRs, the performance should reach an optimum number. |
Just to clarify, how was the partition allocation with the current PR code. Also if members are removed and added then there would be more sharing of partitions as per the combine logic in the PR, correct? Will it affect the performance? |
right now, most of the members had 1-2 topic partitions allocated to them excepted 1-2 members which had a good 12-14 partitions assigned to them. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, given the code of the simple assignor will change in future PRs. One comment to address.
core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Just two nit for question but they are not very important.
...oordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
Show resolved
Hide resolved
...oordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
Show resolved
Hide resolved
// the burden of certain members of the share groups. This can be achieved with the help of limiting the max | ||
// no. of partitions assignment for every member(KAFKA-18788). Hence, the potential problem of burdening | ||
// the share consumers will be addressed in a future PR. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't the following do the job a bit better?
newAssignment.forEach((targetPartition, members) -> members.forEach(member ->
finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition)));
currentAssignment.forEach((targetPartition, members) -> {
if (subscribedTopicIds.contains(targetPartition.topicId())) {}
members.forEach(member -> {
if (groupSpec.memberIds().contains(member) && !newAssignment.containsKey(targetPartition))
finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition);
});
});
The problem with the code as it currently exists is that it assigns all partitions to the first member, and then as other members join, it leaves all partitions with the first member in spite of assigning the partitions to the other members.
What the snippet above does is essentially give precedence to the new assignment, and only copies over information from the current assignment which augments the new assignment. It's still not perfect because the round-robin nature of the reassignment is not sophisticated enough, but I think it's probably better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense. This will help reduce burdening of members, though it affects the stickiness of assignments now since we are revoking the assignments from current assignment. Now, we'll need to think of a way for optimum sharing in the future PRs. I have made this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm. Needs a bit more refinement, but this is a good start.
About
The current
SimpleAssignor
in AK assigned all subscribed topic partitions to all the share group members. This does not match the description given in KIP-932. Here are the rules as mentioned in the KIP by which the assignment should happen. We have changed the step 3 implementation here due to the reasons described -Tests
The added code has been verified with unit tests and the already present integration tests.