
swift-kafka-client consumer is very slow (was: Provide bulk messages from KafkaConsumer) #132

@blindspotbounty

Description

Currently, KafkaConsumer delivers messages one by one.
That is convenient, but it is not efficient for reading large topics, especially during service recovery.

As a small experiment, I changed the consumerMessages event to carry an array instead of a single message, so that all messages from a single poll are packed into one event.

// RDKafkaClient.swift
    /// Swift wrapper for events from `librdkafka`'s event queue.
    enum KafkaEvent {
        case deliveryReport(results: [KafkaDeliveryReport])
        case consumerMessages(results: [KafkaConsumerMessage])
        case error(result: Error)
        // ...
    }
    func eventPoll(maxEvents: Int, consumer: Bool = false) -> [KafkaEvent] {
        var events = [KafkaEvent]()
        events.reserveCapacity(maxEvents)
        // ...
        // Accumulate all messages fetched during this poll into one batch
        var msgs = [KafkaConsumerMessage]()
        if consumer {
            msgs.reserveCapacity(maxEvents)
        }
        
        for _ in 0..<maxEvents {
            // ...
            switch eventType {
            // ...
            case .fetch:
                do {
                    if let msg = try self.handleFetchEvent(event) {
                        msgs.append(msg)
                        shouldSleep = false
                    }
                } catch {
                    events.append(.error(result: error))
                }
            // ...
            case .none:
                if !msgs.isEmpty {
                    events.append(.consumerMessages(results: msgs))
                }
                // Finished reading events, return early
                return events
            default:
                break // Ignored Event
            }
        }

        // Flush any batch still buffered after reading `maxEvents` events
        if !msgs.isEmpty {
            events.append(.consumerMessages(results: msgs))
        }
        return events
    }
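For context, here is a minimal sketch of how such batched events could be bridged into an async sequence. This is not the library's actual code: runPollLoop, the maxEvents value, and the sleep-based back-off are all illustrative assumptions.

// Hypothetical bridge (not the library's real run loop): drain eventPoll and
// forward each batch to an AsyncThrowingStream continuation.
func runPollLoop(
    client: RDKafkaClient,
    continuation: AsyncThrowingStream<[KafkaConsumerMessage], Error>.Continuation
) async {
    while !Task.isCancelled {
        let events = client.eventPoll(maxEvents: 100, consumer: true)
        for event in events {
            switch event {
            case .consumerMessages(let results):
                continuation.yield(results) // one yield per poll, not per message
            case .error(let error):
                continuation.finish(throwing: error)
                return
            case .deliveryReport:
                break // producer-side event, not relevant to the consumer loop
            }
        }
        if events.isEmpty {
            try? await Task.sleep(for: .milliseconds(10)) // back off when idle
        }
    }
    continuation.finish()
}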

I also changed messages and KafkaConsumerMessages to provide messages in bulk.
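On the public side, the bulk variant could be exposed roughly like this sketch. bulkMessages matches the name used in the benchmark below, but the extension and the client property are assumptions, not the library's actual API:

// Hypothetical public surface; KafkaConsumer's real internals differ.
extension KafkaConsumer {
    /// Yields one array of messages per poll instead of single messages.
    var bulkMessages: AsyncThrowingStream<[KafkaConsumerMessage], Error> {
        AsyncThrowingStream { continuation in
            let task = Task {
                // `self.client` is assumed to be the consumer's RDKafkaClient.
                await runPollLoop(client: self.client, continuation: continuation)
            }
            continuation.onTermination = { @Sendable _ in task.cancel() }
        }
    }
}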

Then I tested both variants with simple consumer applications.
For single messages:

var i = 0
var ctr: UInt64 = 0
var tmpCtr: UInt64 = 0

let interval: UInt64 = 1_000_000

var counter = ProcessingRateLocal(interval: interval)
var startDate = Date.now
var bytes: UInt64 = 0

let totalStartDate = Date.now
var totalBytes: UInt64 = 0

for try await record in consumer.messages {
    i = record.offset.offset()
    ctr += 1
    bytes += UInt64(record.msg.value.readableBytes)
    totalBytes += UInt64(record.msg.value.readableBytes)

    tmpCtr += 1
    // Print throughput once per `interval` messages
    if tmpCtr >= interval {
        let timeInterval = -startDate.timeIntervalSinceNow
        let rate = Int64(Double(tmpCtr) / timeInterval)
        let rateMb = Double(bytes) / timeInterval / 1024 / 1024

        let timeIntervalTotal = -totalStartDate.timeIntervalSinceNow
        let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 / 1024
        
        print("read up to \(record.offset.offset()) in partition \(record.partition.idx()), ctr: \(ctr), rate: \(rate) (\(Int(rateMb))MB/s), avgRate: (\(Int(avgRateMb))MB/s), timePassed: \(Int(timeIntervalTotal))sec")

        tmpCtr = 0
        bytes = 0
        startDate = .now
    }
}

With results:

read up to 9319749 in partition 5, ctr: 1000000, rate: 115862 (25MB/s), avgRate: (25MB/s), timePassed: 8sec
read up to 9880212 in partition 5, ctr: 2000000, rate: 121114 (23MB/s), avgRate: (24MB/s), timePassed: 16sec
read up to 8904245 in partition 2, ctr: 3000000, rate: 120674 (23MB/s), avgRate: (24MB/s), timePassed: 25sec
read up to 8979310 in partition 2, ctr: 4000000, rate: 122616 (22MB/s), avgRate: (24MB/s), timePassed: 33sec
read up to 9214895 in partition 4, ctr: 5000000, rate: 120713 (23MB/s), avgRate: (23MB/s), timePassed: 41sec
read up to 9222254 in partition 1, ctr: 6000000, rate: 122643 (21MB/s), avgRate: (23MB/s), timePassed: 49sec
read up to 9257242 in partition 2, ctr: 7000000, rate: 123706 (20MB/s), avgRate: (23MB/s), timePassed: 57sec
read up to 9701009 in partition 1, ctr: 8000000, rate: 121007 (20MB/s), avgRate: (22MB/s), timePassed: 66sec
read up to 9731942 in partition 2, ctr: 9000000, rate: 121474 (20MB/s), avgRate: (22MB/s), timePassed: 74sec
read up to 10128593 in partition 0, ctr: 10000000, rate: 121398 (20MB/s), avgRate: (22MB/s), timePassed: 82sec
read up to 10478118 in partition 4, ctr: 11000000, rate: 109100 (18MB/s), avgRate: (21MB/s), timePassed: 91sec
read up to 10446066 in partition 1, ctr: 12000000, rate: 120968 (20MB/s), avgRate: (21MB/s), timePassed: 100sec
read up to 10700779 in partition 2, ctr: 13000000, rate: 120796 (20MB/s), avgRate: (21MB/s), timePassed: 108sec
read up to 10921598 in partition 2, ctr: 14000000, rate: 119480 (20MB/s), avgRate: (21MB/s), timePassed: 116sec
read up to 11358237 in partition 4, ctr: 15000000, rate: 119992 (20MB/s), avgRate: (21MB/s), timePassed: 124sec
read up to 11544553 in partition 4, ctr: 16000000, rate: 119870 (20MB/s), avgRate: (21MB/s), timePassed: 133sec
read up to 11966376 in partition 3, ctr: 17000000, rate: 119556 (20MB/s), avgRate: (21MB/s), timePassed: 141sec
read up to 11400495 in partition 1, ctr: 18000000, rate: 117463 (20MB/s), avgRate: (21MB/s), timePassed: 150sec
read up to 11834628 in partition 1, ctr: 19000000, rate: 119794 (20MB/s), avgRate: (21MB/s), timePassed: 158sec

For bulk:

var i = 0
var ctr: UInt64 = 0
var tmpCtr: UInt64 = 0

let interval: UInt64 = 1_000_000

var counter = ProcessingRateLocal(interval: interval)
var startDate = Date.now
var bytes: UInt64 = 0

let totalStartDate = Date.now
var totalBytes: UInt64 = 0

for try await bulk in consumer.bulkMessages {
    guard let last = bulk.last else { continue }
    i = last.offset.offset()
    ctr += UInt64(bulk.count)
    bulk.forEach { record in
        bytes += UInt64(record.msg.value.readableBytes)
        totalBytes += UInt64(record.msg.value.readableBytes)
    }
    tmpCtr += UInt64(bulk.count)
    // Print throughput once per `interval` messages
    if tmpCtr >= interval {
        let timeInterval = -startDate.timeIntervalSinceNow
        let rate = Int64(Double(tmpCtr) / timeInterval)
        let rateMb = Double(bytes) / timeInterval / 1024 / 1024
        
        let timeIntervalTotal = -totalStartDate.timeIntervalSinceNow
        let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 / 1024
        print("read up to \(bulk.last!.offset.offset()) in partition \(bulk.last!.partition.idx()), ctr: \(ctr), rate: \(rate) (\(Int(rateMb))MB/s), avgRate: (\(Int(avgRateMb))MB/s), timePassed: \(Int(timeIntervalTotal))sec")

        tmpCtr = 0
        bytes = 0
        startDate = .now
    }
}

Results:

read up to 9346205 in partition 5, ctr: 1000098, rate: 378051 (83MB/s), avgRate: (83MB/s), timePassed: 2sec
read up to 9937226 in partition 5, ctr: 2000115, rate: 633834 (123MB/s), avgRate: (98MB/s), timePassed: 4sec
read up to 10265507 in partition 5, ctr: 3000197, rate: 458818 (93MB/s), avgRate: (96MB/s), timePassed: 6sec
read up to 8969720 in partition 2, ctr: 4000283, rate: 750584 (140MB/s), avgRate: (104MB/s), timePassed: 7sec
read up to 9370086 in partition 0, ctr: 5000321, rate: 636718 (118MB/s), avgRate: (106MB/s), timePassed: 9sec
read up to 10288374 in partition 3, ctr: 6000369, rate: 918704 (162MB/s), avgRate: (112MB/s), timePassed: 10sec
read up to 9291663 in partition 1, ctr: 7000412, rate: 1017207 (175MB/s), avgRate: (117MB/s), timePassed: 11sec
read up to 10165390 in partition 0, ctr: 8000416, rate: 1289984 (218MB/s), avgRate: (124MB/s), timePassed: 12sec
read up to 9612901 in partition 2, ctr: 9000486, rate: 1028644 (172MB/s), avgRate: (127MB/s), timePassed: 13sec
read up to 11031623 in partition 5, ctr: 10000548, rate: 1000979 (167MB/s), avgRate: (130MB/s), timePassed: 14sec
read up to 10267946 in partition 1, ctr: 11000590, rate: 647163 (109MB/s), avgRate: (128MB/s), timePassed: 15sec
read up to 10599079 in partition 0, ctr: 12000591, rate: 595727 (100MB/s), avgRate: (125MB/s), timePassed: 17sec
read up to 10614768 in partition 1, ctr: 13000606, rate: 552293 (93MB/s), avgRate: (122MB/s), timePassed: 19sec
read up to 10867621 in partition 0, ctr: 14000681, rate: 538282 (91MB/s), avgRate: (119MB/s), timePassed: 21sec
read up to 11014089 in partition 0, ctr: 15000754, rate: 497392 (84MB/s), avgRate: (116MB/s), timePassed: 23sec
read up to 11211448 in partition 2, ctr: 16000776, rate: 520717 (89MB/s), avgRate: (114MB/s), timePassed: 24sec
read up to 11950906 in partition 5, ctr: 17000866, rate: 483280 (82MB/s), avgRate: (112MB/s), timePassed: 27sec
read up to 11601443 in partition 1, ctr: 18000878, rate: 395840 (67MB/s), avgRate: (108MB/s), timePassed: 29sec
read up to 11959260 in partition 4, ctr: 19000952, rate: 375111 (64MB/s), avgRate: (104MB/s), timePassed: 32sec

The latter shows throughput near the 1 Gbit/s network limit: an average of roughly 104-130 MB/s corresponds to about 0.83-1.04 Gbit/s on the wire.

This is interesting because the batching is mostly done inside the library already and falls out naturally from the current poll implementation (as in some librdkafka examples), but it is not exposed to the end user.
From our perspective, this is especially useful when an application has to recover from one or more huge topics and needs to cache the data, e.g. in a database, so it can receive and write the data in bulk.
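As an illustration of that use case, a recovery path could write each bulk to a store in a single round trip. BatchStore and insertBatch are placeholders, not real APIs; the message accessors mirror the benchmark code above:

import NIOCore

// Placeholder for any store that accepts multi-row inserts
// (e.g. a single multi-value INSERT statement); not a real API.
protocol BatchStore {
    func insertBatch(_ rows: [(offset: Int64, value: ByteBuffer)]) async throws
}

func recover(consumer: KafkaConsumer, store: some BatchStore) async throws {
    for try await bulk in consumer.bulkMessages {
        // One database round trip per poll instead of one per message.
        let rows = bulk.map { (offset: Int64($0.offset.offset()), value: $0.msg.value) }
        try await store.insertBatch(rows)
    }
}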
