@@ -492,6 +492,27 @@ public final class KafkaConsumer: Sendable, Service {
492492 /// - message: Last received message that shall be marked as read.
493493 /// - Throws: A ``KafkaError`` if committing failed.
494494 public func scheduleCommit( _ message: KafkaConsumerMessage ) throws {
495+ try scheduleCommit ( topic: message. topic,
496+ partition: message. partition,
497+ offset: message. offset)
498+ }
499+
500+ /// Mark all messages up to the passed message in the topic as read.
501+ /// Schedules a commit and returns immediately.
502+ /// Any errors encountered after scheduling the commit will be discarded.
503+ ///
504+ /// This method is only used for manual offset management.
505+ ///
506+ /// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default).
507+ ///
508+ /// - Parameters:
509+ /// - topic: Topic where the message that should be marked as read resides.
510+ /// - partition: Partition where the message that should be marked as read resides.
511+ /// - offset: Offset of the message that shall be marked as read.
512+ /// - Throws: A ``KafkaError`` if committing failed.
513+ public func scheduleCommit( topic: String ,
514+ partition: KafkaPartition ,
515+ offset: KafkaOffset ) throws {
495516 let action = self . stateMachine. withLockedValue { $0. commit ( ) }
496517 switch action {
497518 case . throwClosedError:
@@ -501,7 +522,9 @@ public final class KafkaConsumer: Sendable, Service {
501522 throw KafkaError . config ( reason: " Committing manually only works if isAutoCommitEnabled set to false " )
502523 }
503524
504- try client. scheduleCommit ( message)
525+ try client. scheduleCommit ( topic: topic,
526+ partition: partition,
527+ offset: offset)
505528 }
506529 }
507530
@@ -521,6 +544,26 @@ public final class KafkaConsumer: Sendable, Service {
521544 /// - message: Last received message that shall be marked as read.
522545 /// - Throws: A ``KafkaError`` if committing failed.
523546 public func commit( _ message: KafkaConsumerMessage ) async throws {
547+ try await commit ( topic: message. topic,
548+ partition: message. partition,
549+ offset: message. offset)
550+ }
551+
552+ /// Mark all messages up to the passed message in the topic as read.
553+ /// Awaits until the commit succeeds or an error is encountered.
554+ ///
555+ /// This method is only used for manual offset management.
556+ ///
557+ /// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default).
558+ ///
559+ /// - Parameters:
560+ /// - topic: Topic where the message that should be marked as read resides.
561+ /// - partition: Partition where the message that should be marked as read resides.
562+ /// - offset: Offset of the message that shall be marked as read.
563+ /// - Throws: A ``KafkaError`` if committing failed.
564+ public func commit( topic: String ,
565+ partition: KafkaPartition ,
566+ offset: KafkaOffset ) async throws {
524567 let action = self . stateMachine. withLockedValue { $0. commit ( ) }
525568 switch action {
526569 case . throwClosedError:
@@ -530,10 +573,12 @@ public final class KafkaConsumer: Sendable, Service {
530573 throw KafkaError . config ( reason: " Committing manually only works if isAutoCommitEnabled set to false " )
531574 }
532575
533- try await client. commit ( message)
576+ try await client. commit ( topic: topic,
577+ partition: partition,
578+ offset: offset)
534579 }
535580 }
536-
581+
537582 /// This function is used to gracefully shut down a Kafka consumer client.
538583 ///
539584 /// - Note: Invoking this function is not always needed as the ``KafkaConsumer``
0 commit comments