@brief \b Consumer: Set rebalance callback for use with
coordinated consumer group balancing.
The \p err field is set to either RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
or RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS and \p partitions
contains the full partition set that was either assigned or revoked.
Registering a \p rebalance_cb turns off librdkafka's automatic
partition assignment/revocation and instead delegates that responsibility
to the application's \p rebalance_cb.
The rebalance callback is responsible for updating librdkafka's
assignment set based on the two events: RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
and RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, but should also be able to handle
arbitrary rebalancing failures where \p err is neither of those.
@remark In this latter case (arbitrary error), the application must
call rd_kafka_assign(rk, NULL) to synchronize state.
Without a rebalance callback this is done automatically by librdkafka,
but registering a rebalance callback gives the application flexibility
in performing other operations along with the assignment/revocation,
such as fetching offsets from an alternate location (on assign)
or manually committing offsets (on revoke).
@remark The \p partitions list is destroyed by librdkafka on
return from the rebalance_cb and must not be freed or
saved by the application.
The following example shows the application's responsibilities:
@code
static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *partitions,
                          void *opaque) {
    switch (err)
    {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            // application may load offsets from arbitrary external
            // storage here and update \p partitions
            rd_kafka_assign(rk, partitions);
            break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            if (manual_commits) // Optional explicit manual commit
                rd_kafka_commit(rk, partitions, 0); // sync commit
            rd_kafka_assign(rk, NULL);
            break;
        default:
            handle_unlikely_error(err);
            rd_kafka_assign(rk, NULL); // sync state
            break;
    }
}
@endcode