rebalance_cb_callback

@brief \b Consumer: Set rebalance callback for use with coordinated consumer group balancing.

The \p err field is set to either RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS or RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, and \p partitions contains the full partition set that was either assigned or revoked.

Registering a \p rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's \p rebalance_cb.

The rebalance callback is responsible for updating librdkafka's assignment set based on the two events RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS and RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, but it should also be able to handle arbitrary rebalancing failures where \p err is neither of those.

@remark In this latter case (arbitrary error), the application must call rd_kafka_assign(rk, NULL) to synchronize state.

Without a rebalance callback this is done automatically by librdkafka, but registering a rebalance callback gives the application flexibility to perform other operations along with the assignment/revocation, such as fetching offsets from an alternate location (on assign) or manually committing offsets (on revoke).

@remark The \p partitions list is destroyed by librdkafka on return from the rebalance_cb and must not be freed or saved by the application.

The following example shows the application's responsibilities:
@code
static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *partitions,
                          void *opaque) {

    switch (err)
    {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
        // application may load offsets from arbitrary external
        // storage here and update \p partitions

        rd_kafka_assign(rk, partitions);
        break;

    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
        if (manual_commits) // Optional explicit manual commit
            rd_kafka_commit(rk, partitions, 0); // sync commit

        rd_kafka_assign(rk, NULL);
        break;

    default:
        handle_unlikely_error(err);
        rd_kafka_assign(rk, NULL); // sync state
        break;
    }
}
@endcode
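The "load offsets from external storage" step on assign could look roughly like the sketch below. It is illustrative only: `partition_t` is a simplified stand-in for `rd_kafka_topic_partition_t` (only the `topic`, `partition` and `offset` fields) so that the sketch compiles without librdkafka, and `offset_store`, `lookup_offset` and `apply_stored_offsets` are hypothetical names, with an in-memory table standing in for whatever database or file the application really uses. `OFFSET_INVALID` mirrors the value of librdkafka's `RD_KAFKA_OFFSET_INVALID`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-in for rd_kafka_topic_partition_t: only the fields this sketch needs. */
typedef struct {
    const char *topic;
    int32_t     partition;
    int64_t     offset;
} partition_t;

/* Same numeric value as RD_KAFKA_OFFSET_INVALID in librdkafka. */
#define OFFSET_INVALID ((int64_t)-1001)

/* Hypothetical external offset store, modelled as a fixed in-memory table. */
typedef struct {
    const char *topic;
    int32_t     partition;
    int64_t     next_offset;
} stored_offset_t;

static const stored_offset_t offset_store[] = {
    { "orders", 0, 1500 },
    { "orders", 1, 42 },
};

/* Return the stored offset for topic/partition, or OFFSET_INVALID if absent. */
static int64_t lookup_offset(const char *topic, int32_t partition) {
    size_t n = sizeof(offset_store) / sizeof(offset_store[0]);
    for (size_t i = 0; i < n; i++) {
        if (offset_store[i].partition == partition &&
            strcmp(offset_store[i].topic, topic) == 0)
            return offset_store[i].next_offset;
    }
    return OFFSET_INVALID;
}

/* Set the start offset of every assigned partition from the store.
 * Partitions missing from the store are left at OFFSET_INVALID.
 * Returns the number of partitions that were found in the store. */
static int apply_stored_offsets(partition_t *elems, int cnt) {
    int found = 0;
    assert(elems != NULL || cnt == 0);
    for (int i = 0; i < cnt; i++) {
        elems[i].offset = lookup_offset(elems[i].topic, elems[i].partition);
        if (elems[i].offset != OFFSET_INVALID)
            found++;
    }
    return found;
}
```

In a real callback the same loop would run over `partitions->elems` for `partitions->cnt` entries in the RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS case, before the call to rd_kafka_assign(rk, partitions). How a partition with no stored offset should be handled is an application decision that this sketch does not make.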

extern (C) nothrow @nogc
alias rebalance_cb_callback = void function nothrow @nogc
