Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe #18989

Open
wants to merge 37 commits into
base: trunk
Choose a base branch
from

Conversation

dongnuo123
Copy link
Collaborator

@dongnuo123 dongnuo123 commented Feb 21, 2025

This patch filters out the topic describe unauthorized topics from the ConsumerGroupHeartbeat and ConsumerGroupDescribe response.

In ConsumerGroupHeartbeat,

  • if the request has subscribedTopicNames set, we directly check the authz in KafkaApis and return a topic auth failure in the response if any of the topics is denied.
  • Otherwise, we check the authz only if a regex refresh is triggered and we do it based on the acl of the consumer that triggered the refresh. If any of the topic is denied, we filter it out from the resolved subscription.

In ConsumerGroupDescribe, we check the authz of the coordinator response. If any of the topic in the group is denied, we remove the described info and add a topic auth failure to the described group. (similar to the group auth failure)

Reviewers: David Jacot [email protected], Lianet Magrans [email protected], Rajini Sivaram [email protected], Chia-Ping Tsai [email protected], TaiJuWu [email protected], TengYao Chi [email protected]

@github-actions github-actions bot added triage PRs from the community core Kafka Broker clients labels Feb 21, 2025
@github-actions github-actions bot removed the triage PRs from the community label Feb 21, 2025
@dajac dajac added KIP-848 The Next Generation of the Consumer Rebalance Protocol Blocker This pull request is identified as solving a blocker for a release. ci-approved labels Feb 21, 2025
Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongnuo123 Thanks for the patch. I left some high level comments.

@github-actions github-actions bot added the KIP-932 Queues for Kafka label Feb 21, 2025
@dajac
Copy link
Member

dajac commented Feb 22, 2025

@dongnuo123 Thanks for the update. The patch looks pretty good to me. There are some test failures which are related. Could you please check them with @lianetm?

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongnuo123 thanks for this patch.

@@ -2529,9 +2529,24 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does handleShareGroupHeartbeat have similar issue?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but it will be treated separately. This is a critical blocker for 4.0 for the new consumer protocol whereas it is not for share group.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that there is already one open but I cannot find it from my phone.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks! will close KAFKA-18851 as duplicate

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to complete the story, we need this for KS too, they also have their jira already https://issues.apache.org/jira/browse/KAFKA-18819

}
}
})
val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems handleDescribeGroupsRequest does not require topic authorization. Does it mean users need to update the ACLs when migrating to use AsyncConsumer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s right. The thing is that we cannot apply it on there because the broker is not aware of the topic partitions. It just sees bytes by design.

We could consider parsing the bytes to check the partitions too but as it has been like this since the beginning of the classic protocol. We would need a KIP to discuss it for sure. Personally, I would not change it.

Regarding your second question, the Consumer is not impacted by this as both implementations require topic describe but in different ways. Only the admin client will require extra permissions if not given yet.

@dongnuo123 Could you add a sentence about it in the doc? We could mention this in the ops.html, consumer group section.

@@ -645,6 +645,7 @@ class BrokerServer(
.withGroupCoordinatorMetrics(new GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
.withGroupConfigManager(groupConfigManager)
.withPersister(persister)
.withAuthorizer(authorizer.toJava)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we consider splitting the processing of heartbeat request in the group coordinator so that topic authorization can continue to be in KafkaApis? Moving some of the authorization out of KafkaApis and into the group coordinator doesn't seem ideal. If we could instead ask the coordinator first for list of topics to filter out for regex and then invoke GroupCoordinatorService.consumerGroupHeartbeat() passing the filtered topics as well, we could avoid splitting out authorization between KafkaApis and group coordinator. And it would avoid knowledge of logIfDenied etc. mentioned in https://github.com/apache/kafka/pull/18989/files#r1966700208 outside of KafkaApis.

Copy link
Collaborator Author

@dongnuo123 dongnuo123 Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we could instead ask the coordinator first for list of topics to filter out for regex

With the current implementation, the challenge with this is that the resolution and filtering of the regex are done in an async fashion. consumerGroupHeartbeat will return before we finish resolving the list of topics to filter which is done in a separate thread in the coordinator.

But it does make sense that it's a bit strange carrying the authorizer to the coordinator. I think we can work on it as a followup improvement.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, do we have a jira for it @dongnuo123 ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rajinisivaram any concerns with this as follow-up? It will be handled to improve right after this and included in 4.1 as described in KAFKA-18857

@lianetm
Copy link
Member

lianetm commented Feb 24, 2025

I'm taking a closer look at the changes, but regarding the test failures, they show the consumer getting more than one TopicAuth after the ACLs are added (before this change it expected at most 1 more TopicAuth here).

I wonder if that expectation is too strict now, that we send the error via HBs continuously on the interval, even if it fails with error (vs before we would only get the auth error from the Metadata path). The consumer should recover, but I don't think we can guarantee that it will only get 1 TopicAuth after the ACLs are added.

if (authorizedTopics.size < consumerGroupHeartbeatRequest.data.subscribedTopicNames.size) {
val responseData = new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage("The client is not authorized to describe the provided subscribed topics.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case we had a request with a list of topics, and we're already sending a response with errorCode TOPIC_AUTH_FAILED, so I wonder if adding this message is bringing much value?

Also, it's not consistent with how other APIs handle the same situation. Ex. on the metadata path, we get a request with topic list and if auth fails the response will only include the error code, no custom message. This inconsistency is relevant because it will translate into the consumer btw (whatever topic auth message we get from metadata or HB will bubble up to the consumer.poll). We can always make it consistent on the client side of course, but seems to me that making it consistent here makes more sense given that the message seems redundant. Thoughts?

@dongnuo123 dongnuo123 changed the title KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe Feb 24, 2025
@chia7712
Copy link
Member

@dongnuo123 could you please sync trunk for #18770?

new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(group.groupId)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage("The group has described topic(s) that the client is not authorized to describe.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this align #18989 (comment) ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one I find it valuable, not the same case as my other comment. In this case we don't have topics in the request, so getting a topic_auth in response may not be clear enough, and this generic msg may be helpful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks.

Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch
I have some comments. PTAL

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @dongnuo123 ! Some minor comments

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @dongnuo123, LGTM.

Just to recap, we have KAFKA-18857 as follow-up, and related jiras to address this same gap for share (KAFKA-18817) and streams (KAFKA-18819) groups (not 4.0 blockers)

I will wait for the build, and confirmation on this comment https://github.com/apache/kafka/pull/18989/files#r1969914513 in case @rajinisivaram or @chia7712 have any other comments.

Thanks!

.setMemberId(memberId2)
.setMemberEpoch(10)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for the record, this makes me notice that we require a HB with regex in this case (and makes sense, because IIRC we only refresh it if it changes or if there is a new topic metadata image, which is not the case here).

I remember there was already a jira/intention to consider time-based refresh, which I expect would help improve the experience in this case. I will find the jira (or file) and update here just for reference.

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall LGTM!

try {
consumeRecords(consumer, numRecords, startingOffset, topic)
} catch {
case _: TopicAuthorizationException => consumeRecordsIgnoreAuthorizationException(consumer, numRecords, startingOffset, topic)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this recursive call necessary? TestUtils.waitUntilTrue already contains a loop, so perhaps we can streamline the check as follows:

      TestUtils.waitUntilTrue(() => {
        try {
          consumeRecords(consumer)
          true
        } catch {
          case _: TopicAuthorizationException => false
        }
      }, "Consumer didn't manage to consume the records within timeout.")

@@ -2592,6 +2607,34 @@ class KafkaApis(val requestChannel: RequestChannel,
response.groups.addAll(results)
}

// Clients are not allowed to see topics that are not authorized for Describe.
val topicsToCheck = response.groups.stream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one small optimization - could we do this check only if authorizer is defined?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Blocker This pull request is identified as solving a blocker for a release. ci-approved clients core Kafka Broker KIP-848 The Next Generation of the Consumer Rebalance Protocol KIP-932 Queues for Kafka performance
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants