deimos.rdkafka

@file rdkafka.h @brief Apache Kafka C/C++ consumer and producer client library.

rdkafka.h contains the public API for librdkafka. The API is documented in this file as comments prefixing the function, type, enum, define, etc.

@sa For the C++ interface see rdkafkacpp.h

@tableofcontents

Members

Aliases

consume_callback_callback
alias consume_callback_callback = void function(rd_kafka_message_t* rkmessage, void* opaque) nothrow @(nogc)

@brief Consumes messages from topic \p rkt and \p partition, calling the provided callback for each consumed message.

consume_callback_queue_callback
alias consume_callback_queue_callback = void function(rd_kafka_message_t* rkmessage, void* opaque) nothrow @(nogc)

@brief Consume multiple messages from queue with callback

dr_cb_callback
deprecated alias dr_cb_callback = void function(rd_kafka_t* rk, void* payload, size_t len, rd_kafka_resp_err_t err, void* opaque, void* msg_opaque) nothrow @(nogc)

@deprecated See rd_kafka_conf_set_dr_msg_cb()

dr_msg_cb_callback
alias dr_msg_cb_callback = void function(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) nothrow @(nogc)

@brief \b Producer: Set delivery report callback in provided \p conf object.

error_cb_callback
alias error_cb_callback = void function(rd_kafka_t* rk, int err, const(char)* reason, void* opaque) nothrow @(nogc)

@brief Set error callback in provided conf object.

func_callback
deprecated alias func_callback = void function(const rd_kafka_t* rk, int level, const(char)* fac, const(char)* buf) nothrow @(nogc)

@brief Set logger function.

log_cb_callback
alias log_cb_callback = void function(const rd_kafka_t* rk, int level, const(char)* fac, const(char)* buf) nothrow @(nogc)

@brief Set logger callback.

offset_commit_cb_call_back
alias offset_commit_cb_call_back = void function(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* offsets, void* opaque)

@brief \b Consumer: Set offset commit callback for use with consumer groups.

open_cb_callback
alias open_cb_callback = int function(const(char)* pathname, int flags, mode_t mode, void* opaque) nothrow @(nogc)

@brief Set open callback.

partitioner_callback
alias partitioner_callback = int32_t function(const rd_kafka_topic_t* rkt, const(void)* keydata, size_t keylen, int32_t partition_cnt, void* rkt_opaque, void* msg_opaque) nothrow @(nogc)

@brief \b Producer: Set partitioner callback in provided topic conf object.

rd_kafka_conf_t
alias rd_kafka_conf_t = rd_kafka_conf_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_event_t
alias rd_kafka_event_t = rd_kafka_op_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_event_type_t
alias rd_kafka_event_type_t = int

@brief Event types

rd_kafka_message_t
alias rd_kafka_message_t = rd_kafka_message_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_metadata_broker_t
alias rd_kafka_metadata_broker_t = rd_kafka_metadata_broker
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_metadata_partition_t
alias rd_kafka_metadata_partition_t = rd_kafka_metadata_partition
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_metadata_topic_t
alias rd_kafka_metadata_topic_t = rd_kafka_metadata_topic
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_queue_t
alias rd_kafka_queue_t = rd_kafka_queue_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_t
alias rd_kafka_t = rd_kafka_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_topic_conf_t
alias rd_kafka_topic_conf_t = rd_kafka_topic_conf_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_topic_partition_list_t
alias rd_kafka_topic_partition_list_t = rd_kafka_topic_partition_list_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_topic_partition_t
alias rd_kafka_topic_partition_t = rd_kafka_topic_partition_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_topic_t
alias rd_kafka_topic_t = rd_kafka_topic_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rebalance_cb_callback
alias rebalance_cb_callback = void function(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque) nothrow @(nogc)

@brief \b Consumer: Set rebalance callback for use with coordinated consumer group balancing.

socket_cb_callback
alias socket_cb_callback = int function(int domain, int type, int protocol, void* opaque) nothrow @(nogc)

@brief Set socket callback.

stats_cb_callback
alias stats_cb_callback = int function(rd_kafka_t* rk, char* json, size_t json_len, void* opaque) nothrow @(nogc)

@brief Set statistics callback in provided conf object.

throttle_cb_callback
alias throttle_cb_callback = void function(rd_kafka_t* rk, const(char)* broker_name, int32_t broker_id, int throttle_time_ms, void* opaque) nothrow @(nogc)

@brief Set throttle callback.

Enums

rd_kafka_conf_res_t
enum rd_kafka_conf_res_t

@enum rd_kafka_conf_res_t @brief Configuration result type

rd_kafka_resp_err_t
enum rd_kafka_resp_err_t

@enum rd_kafka_resp_err_t @brief Error codes.

rd_kafka_timestamp_type_t
enum rd_kafka_timestamp_type_t

@enum rd_kafka_timestamp_type_t @brief Timestamp types

rd_kafka_type_t
enum rd_kafka_type_t

@enum rd_kafka_type_t

rd_kafka_vtype_t
enum rd_kafka_vtype_t

@enum rd_kafka_vtype_t

Functions

RD_KAFKA_OFFSET_TAIL
auto RD_KAFKA_OFFSET_TAIL(T CNT)

@brief Start consuming \p CNT messages from topic's current end offset. That is, if current end offset is 12345 and \p CNT is 200, it will start consuming from offset \c 12345-200 = \c 12145.

rd_kafka_assign
rd_kafka_resp_err_t rd_kafka_assign(rd_kafka_t* rk, rd_kafka_topic_partition_list_t* partitions)

@brief Atomic assignment of partitions to consume.

rd_kafka_assignment
rd_kafka_resp_err_t rd_kafka_assignment(rd_kafka_t* rk, rd_kafka_topic_partition_list_t** partitions)

@brief Returns the current partition assignment

rd_kafka_brokers_add
int rd_kafka_brokers_add(rd_kafka_t* rk, const(char)* brokerlist)

@brief Adds one or more brokers to the kafka handle's list of initial bootstrap brokers.

rd_kafka_commit
rd_kafka_resp_err_t rd_kafka_commit(rd_kafka_t* rk, rd_kafka_topic_partition_list_t* offsets, int async)

@brief Commit offsets on broker for the provided list of partitions.

rd_kafka_commit_message
rd_kafka_resp_err_t rd_kafka_commit_message(rd_kafka_t* rk, rd_kafka_message_t* rkmessage, int async)

@brief Commit message's offset on broker for the message's partition.

rd_kafka_commit_queue
rd_kafka_resp_err_t rd_kafka_commit_queue(rd_kafka_t* rk, rd_kafka_topic_partition_list_t* offsets, rd_kafka_queue_t* rkqu, void function(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* offsets, void* opaque) nothrow @(nogc) cb, void* opaque)

@brief Commit offsets on broker for the provided list of partitions.

rd_kafka_committed
rd_kafka_resp_err_t rd_kafka_committed(rd_kafka_t* rk, rd_kafka_topic_partition_list_t* partitions, int timeout_ms)

@brief Retrieve committed offsets for topics+partitions.

rd_kafka_conf_destroy
void rd_kafka_conf_destroy(rd_kafka_conf_t* conf)

@brief Destroys a conf object.

rd_kafka_conf_dump
const(char)** rd_kafka_conf_dump(rd_kafka_conf_t* conf, size_t* cntp)

@brief Dump the configuration properties and values of \p conf to an array with "key", "value" pairs.

rd_kafka_conf_dump_free
void rd_kafka_conf_dump_free(const(char)** arr, size_t cnt)

@brief Frees a configuration dump returned from rd_kafka_conf_dump() or rd_kafka_topic_conf_dump().

rd_kafka_conf_dup
rd_kafka_conf_t* rd_kafka_conf_dup(rd_kafka_conf_t* conf)

@brief Creates a copy/duplicate of configuration object \p conf

rd_kafka_conf_get
rd_kafka_conf_res_t rd_kafka_conf_get(rd_kafka_conf_t* conf, const(char)* name, char* dest, size_t* dest_size)

@brief Retrieve configuration value for property \p name.

rd_kafka_conf_new
rd_kafka_conf_t* rd_kafka_conf_new()

@brief Create configuration object.

rd_kafka_conf_properties_show
void rd_kafka_conf_properties_show(FILE* fp)

@brief Prints a table to \p fp of all supported configuration properties, their default values as well as a description.

rd_kafka_conf_set
rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t* conf, const(char)* name, const(char)* value, char* errstr, size_t errstr_size)

@brief Sets a configuration property.

rd_kafka_conf_set_closesocket_cb
void rd_kafka_conf_set_closesocket_cb(rd_kafka_conf_t* conf, int function(int sockfd, void* opaque) closesocket_cb)

@brief Set close socket callback.

rd_kafka_conf_set_connect_cb
void rd_kafka_conf_set_connect_cb(rd_kafka_conf_t* conf, int function(int sockfd, const sockaddr* addr, int addrlen, const char* id, void* opaque) connect_cb)

@brief Set connect callback.

rd_kafka_conf_set_consume_cb
void rd_kafka_conf_set_consume_cb(rd_kafka_conf_t* conf, void function(rd_kafka_message_t* rkmessage, void* opaque) nothrow @(nogc) consume_cb)

@brief \b Consumer: Set consume callback for use with rd_kafka_consumer_poll()

rd_kafka_conf_set_default_topic_conf
void rd_kafka_conf_set_default_topic_conf(rd_kafka_conf_t* conf, rd_kafka_topic_conf_t* tconf)

Sets the default topic configuration to use for automatically subscribed topics (e.g., through pattern-matched topics). The topic config object is not usable after this call.

rd_kafka_conf_set_dr_cb
deprecated void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t* conf, dr_cb_callback dr_cb)
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_conf_set_dr_msg_cb
void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t* conf, dr_msg_cb_callback dr_msg_cb)
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_conf_set_error_cb
void rd_kafka_conf_set_error_cb(rd_kafka_conf_t* conf, error_cb_callback error_cb)
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_conf_set_events
void rd_kafka_conf_set_events(rd_kafka_conf_t* conf, int events)

@brief Enable event sourcing. \p events is a bitmask of \c RD_KAFKA_EVENT_* of events to enable for consumption by rd_kafka_queue_poll().

rd_kafka_conf_set_log_cb
void rd_kafka_conf_set_log_cb(rd_kafka_conf_t* conf, log_cb_callback log_cb)
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_conf_set_offset_commit_cb
void rd_kafka_conf_set_offset_commit_cb(rd_kafka_conf_t* conf, offset_commit_cb_call_back offset_commit_cb)
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_conf_set_opaque
void rd_kafka_conf_set_opaque(rd_kafka_conf_t* conf, void* opaque)

@brief Sets the application's opaque pointer that will be passed to callbacks

rd_kafka_conf_set_open_cb
void rd_kafka_conf_set_open_cb(rd_kafka_conf_t* conf, open_cb_callback open_cb)
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_conf_set_rebalance_cb
void rd_kafka_conf_set_rebalance_cb(rd_kafka_conf_t* conf, rebalance_cb_callback rebalance_cb)
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_conf_set_socket_cb
void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t* conf, socket_cb_callback socket_cb)
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_conf_set_stats_cb
void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t* conf, stats_cb_callback stats_cb)
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_conf_set_throttle_cb
void rd_kafka_conf_set_throttle_cb(rd_kafka_conf_t* conf, throttle_cb_callback throttle_cb)
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_consume
rd_kafka_message_t* rd_kafka_consume(rd_kafka_topic_t* rkt, int32_t partition, int timeout_ms)

@brief Consume a single message from topic \p rkt and \p partition

rd_kafka_consume_batch
ssize_t rd_kafka_consume_batch(rd_kafka_topic_t* rkt, int32_t partition, int timeout_ms, rd_kafka_message_t** rkmessages, size_t rkmessages_size)

@brief Consume up to \p rkmessages_size from topic \p rkt and \p partition putting a pointer to each message in the application provided array \p rkmessages (of size \p rkmessages_size entries).

rd_kafka_consume_batch_queue
ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t* rkqu, int timeout_ms, rd_kafka_message_t** rkmessages, size_t rkmessages_size)

@brief Consume batch of messages from queue

rd_kafka_consume_callback
int rd_kafka_consume_callback(rd_kafka_topic_t* rkt, int32_t partition, int timeout_ms, consume_callback_callback consume_cb, void* opaque)

@brief Consumes messages from topic \p rkt and \p partition, calling the provided callback for each consumed message.

rd_kafka_consume_callback_queue
int rd_kafka_consume_callback_queue(rd_kafka_queue_t* rkqu, int timeout_ms, consume_callback_queue_callback consume_cb, void* opaque)

@brief Consume multiple messages from queue with callback

rd_kafka_consume_queue
rd_kafka_message_t* rd_kafka_consume_queue(rd_kafka_queue_t* rkqu, int timeout_ms)

@brief Consume from queue

rd_kafka_consume_start
int rd_kafka_consume_start(rd_kafka_topic_t* rkt, int32_t partition, int64_t offset)

@brief Start consuming messages for topic \p rkt and \p partition at offset \p offset which may either be an absolute \c (0..N) or one of the logical offsets: - RD_KAFKA_OFFSET_BEGINNING - RD_KAFKA_OFFSET_END - RD_KAFKA_OFFSET_STORED - RD_KAFKA_OFFSET_TAIL

rd_kafka_consume_start_queue
int rd_kafka_consume_start_queue(rd_kafka_topic_t* rkt, int32_t partition, int64_t offset, rd_kafka_queue_t* rkqu)

@brief Same as rd_kafka_consume_start() but re-routes incoming messages to the provided queue \p rkqu (which must have been previously allocated with rd_kafka_queue_new()).

rd_kafka_consume_stop
int rd_kafka_consume_stop(rd_kafka_topic_t* rkt, int32_t partition)

@brief Stop consuming messages for topic \p rkt and \p partition, purging all messages currently in the local queue.

rd_kafka_consumer_close
rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t* rk)

@brief Close down the KafkaConsumer.

rd_kafka_consumer_poll
rd_kafka_message_t* rd_kafka_consumer_poll(rd_kafka_t* rk, int timeout_ms)

@brief Poll the consumer for messages or events.

rd_kafka_destroy
void rd_kafka_destroy(rd_kafka_t* rk)

@brief Destroy Kafka handle.

rd_kafka_dump
void rd_kafka_dump(FILE* fp, rd_kafka_t* rk)

@brief Dumps rdkafka's internal state for handle \p rk to stream \p fp

rd_kafka_err2name
const(char)* rd_kafka_err2name(rd_kafka_resp_err_t err)

@brief Returns the error code name (enum name).

rd_kafka_err2str
const(char)* rd_kafka_err2str(rd_kafka_resp_err_t err)

@brief Returns a human readable representation of a kafka error.

rd_kafka_errno
int rd_kafka_errno()

@brief Returns the thread-local system errno

rd_kafka_errno2err
rd_kafka_resp_err_t rd_kafka_errno2err(int errnox)

@brief Converts the system errno value \p errnox to a rd_kafka_resp_err_t error code upon failure from the following functions: - rd_kafka_topic_new() - rd_kafka_consume_start() - rd_kafka_consume_stop() - rd_kafka_consume() - rd_kafka_consume_batch() - rd_kafka_consume_callback() - rd_kafka_consume_queue() - rd_kafka_produce()

rd_kafka_event_destroy
void rd_kafka_event_destroy(rd_kafka_event_t* rkev)

@brief Destroy an event.

rd_kafka_event_error
rd_kafka_resp_err_t rd_kafka_event_error(rd_kafka_event_t* rkev)

@returns the error code for the event.

rd_kafka_event_error_string
const(char)* rd_kafka_event_error_string(rd_kafka_event_t* rkev)

@returns the error string (if any). An application should check that rd_kafka_event_error() returns non-zero before calling this function.

rd_kafka_event_log
int rd_kafka_event_log(rd_kafka_event_t* rkev, const(char)** fac, const(char)** str, int* level)

@brief Extract log message from the event.

rd_kafka_event_message_array
size_t rd_kafka_event_message_array(rd_kafka_event_t* rkev, rd_kafka_message_t** rkmessages, size_t size)

@brief Extracts \p size message(s) from the event into the pre-allocated array \p rkmessages.

rd_kafka_event_message_count
size_t rd_kafka_event_message_count(rd_kafka_event_t* rkev)

@returns the number of remaining messages in the event.

rd_kafka_event_message_next
const(rd_kafka_message_t)* rd_kafka_event_message_next(rd_kafka_event_t* rkev)

@returns the next message from an event.

rd_kafka_event_name
const(char)* rd_kafka_event_name(rd_kafka_event_t* rkev)

@returns the event type's name for the given event.

rd_kafka_event_opaque
void* rd_kafka_event_opaque(rd_kafka_event_t* rkev)

@returns the user opaque (if any)

rd_kafka_event_topic_partition
rd_kafka_topic_partition_t* rd_kafka_event_topic_partition(rd_kafka_event_t* rkev)

@returns a newly allocated topic_partition container, if applicable for the event type, else NULL.

rd_kafka_event_topic_partition_list
rd_kafka_topic_partition_list_t* rd_kafka_event_topic_partition_list(rd_kafka_event_t* rkev)

@returns the topic partition list from the event.

rd_kafka_event_type
rd_kafka_event_type_t rd_kafka_event_type(rd_kafka_event_t* rkev)

@returns the event type for the given event.

rd_kafka_flush
rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t* rk, int timeout_ms)

@brief Wait until all outstanding produce requests, et al., are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating.

rd_kafka_get_debug_contexts
const(char)* rd_kafka_get_debug_contexts()

@brief Retrieve supported debug contexts for use with the \c "debug" configuration property. (runtime)

rd_kafka_get_err_descs
void rd_kafka_get_err_descs(rd_kafka_err_desc** errdescs, size_t* cntp)

@brief Returns the full list of error codes.

rd_kafka_get_watermark_offsets
rd_kafka_resp_err_t rd_kafka_get_watermark_offsets(rd_kafka_t* rk, const(char)* topic, int32_t partition, int64_t* low, int64_t* high)

@brief Get last known low (oldest/beginning) and high (newest/end) offsets for partition.

rd_kafka_group_list_destroy
void rd_kafka_group_list_destroy(rd_kafka_group_list* grplist)

@brief Release list memory

rd_kafka_last_error
rd_kafka_resp_err_t rd_kafka_last_error()

@brief Returns the last error code generated by a legacy API call in the current thread.

rd_kafka_list_groups
rd_kafka_resp_err_t rd_kafka_list_groups(rd_kafka_t* rk, const(char)* group, rd_kafka_group_list** grplistp, int timeout_ms)

@brief List and describe client groups in cluster.

rd_kafka_log_print
void rd_kafka_log_print(rd_kafka_t* rk, int level, const(char)* fac, const(char)* buf)

@brief Builtin (default) log sink: print to stderr

rd_kafka_log_syslog
void rd_kafka_log_syslog(rd_kafka_t* rk, int level, const(char)* fac, const(char)* buf)

@brief Builtin log sink: print to syslog.

rd_kafka_mem_free
void rd_kafka_mem_free(rd_kafka_t* rk, void* ptr)

@brief Free pointer returned by librdkafka

rd_kafka_memberid
char* rd_kafka_memberid(rd_kafka_t* rk)

@brief Returns this client's broker-assigned group member id

rd_kafka_message_destroy
void rd_kafka_message_destroy(rd_kafka_message_t* rkmessage)

@brief Frees resources for \p rkmessage and hands ownership back to rdkafka.

rd_kafka_message_errstr
const(char)* rd_kafka_message_errstr(rd_kafka_message_t* rkmessage)

@brief Returns the error string for an errored rd_kafka_message_t or NULL if there was no error.

rd_kafka_message_timestamp
int64_t rd_kafka_message_timestamp(rd_kafka_message_t* rkmessage, rd_kafka_timestamp_type_t* tstype)

@brief Returns the message timestamp for a consumed message.

rd_kafka_metadata
rd_kafka_resp_err_t rd_kafka_metadata(rd_kafka_t* rk, int all_topics, rd_kafka_topic_t* only_rkt, rd_kafka_metadata_t** metadatap, int timeout_ms)

@brief Request Metadata from broker.

rd_kafka_metadata_destroy
void rd_kafka_metadata_destroy(rd_kafka_metadata_t* metadata)

@brief Release metadata memory.

rd_kafka_msg_partitioner_consistent
int32_t rd_kafka_msg_partitioner_consistent(rd_kafka_topic_t* rkt, const(void)* key, size_t keylen, int32_t partition_cnt, void* opaque, void* msg_opaque)

@brief Consistent partitioner.

rd_kafka_msg_partitioner_consistent_random
int32_t rd_kafka_msg_partitioner_consistent_random(rd_kafka_topic_t* rkt, const(void)* key, size_t keylen, int32_t partition_cnt, void* opaque, void* msg_opaque)

@brief Consistent-Random partitioner.

rd_kafka_msg_partitioner_random
int32_t rd_kafka_msg_partitioner_random(rd_kafka_topic_t* rkt, const(void)* key, size_t keylen, int32_t partition_cnt, void* opaque, void* msg_opaque)

@brief Random partitioner.

rd_kafka_name
const(char)* rd_kafka_name(rd_kafka_t* rk)

@brief Returns Kafka handle name.

rd_kafka_new
rd_kafka_t* rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t* conf, char* errstr, size_t errstr_size)

@brief Creates a new Kafka handle and starts its operation according to the specified \p type (\p RD_KAFKA_CONSUMER or \p RD_KAFKA_PRODUCER).

rd_kafka_offset_store
rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t* rkt, int32_t partition, int64_t offset)

@brief Store offset \p offset for topic \p rkt partition \p partition.

rd_kafka_offsets_for_times
rd_kafka_resp_err_t rd_kafka_offsets_for_times(rd_kafka_t* rk, rd_kafka_topic_partition_list_t* offsets, int timeout_ms)

@brief Look up the offsets for the given partitions by timestamp.

rd_kafka_opaque
void* rd_kafka_opaque(rd_kafka_t* rk)

@brief Retrieves the opaque pointer previously set with rd_kafka_conf_set_opaque()

rd_kafka_outq_len
int rd_kafka_outq_len(rd_kafka_t* rk)

@brief Returns the current out queue length.

rd_kafka_pause_partitions
rd_kafka_resp_err_t rd_kafka_pause_partitions(rd_kafka_t* rk, rd_kafka_topic_partition_list_t* partitions)

@brief Pause producing or consumption for the provided list of partitions.

rd_kafka_poll
int rd_kafka_poll(rd_kafka_t* rk, int timeout_ms)

@brief Polls the provided kafka handle for events.

rd_kafka_poll_set_consumer
rd_kafka_resp_err_t rd_kafka_poll_set_consumer(rd_kafka_t* rk)

@brief Redirect the main (rd_kafka_poll()) queue to the KafkaConsumer's queue (rd_kafka_consumer_poll()).

rd_kafka_position
rd_kafka_resp_err_t rd_kafka_position(rd_kafka_t* rk, rd_kafka_topic_partition_list_t* partitions)

@brief Retrieve current positions (offsets) for topics+partitions.

rd_kafka_produce
int rd_kafka_produce(rd_kafka_topic_t* rkt, int32_t partition, int msgflags, void* payload, size_t len, const(void)* key, size_t keylen, void* msg_opaque)

@brief Produce and send a single message to broker.

rd_kafka_produce_batch
int rd_kafka_produce_batch(rd_kafka_topic_t* rkt, int32_t partition, int msgflags, rd_kafka_message_t* rkmessages, int message_cnt)

@brief Produce multiple messages.

rd_kafka_producev
rd_kafka_resp_err_t rd_kafka_producev(rd_kafka_t* rk, ...)

@brief Produce and send a single message to broker.

rd_kafka_query_watermark_offsets
rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t* rk, const(char)* topic, int32_t partition, int64_t* low, int64_t* high, int timeout_ms)

@brief Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.

rd_kafka_queue_destroy
void rd_kafka_queue_destroy(rd_kafka_queue_t* rkqu)

Destroy a queue, purging all of its enqueued messages.

rd_kafka_queue_forward
void rd_kafka_queue_forward(rd_kafka_queue_t* src, rd_kafka_queue_t* dst)

@brief Forward/re-route queue \p src to \p dst. If \p dst is \c NULL the forwarding is removed.

rd_kafka_queue_get_consumer
rd_kafka_queue_t* rd_kafka_queue_get_consumer(rd_kafka_t* rk)

@returns a reference to the librdkafka consumer queue. This is the queue served by rd_kafka_consumer_poll().

rd_kafka_queue_get_main
rd_kafka_queue_t* rd_kafka_queue_get_main(rd_kafka_t* rk)

@returns a reference to the main librdkafka event queue. This is the queue served by rd_kafka_poll().

rd_kafka_queue_get_partition
rd_kafka_queue_t* rd_kafka_queue_get_partition(rd_kafka_t* rk, char* topic, int32_t partition)

@returns a reference to the partition's queue, or NULL if partition is invalid.

rd_kafka_queue_io_event_enable
void rd_kafka_queue_io_event_enable(rd_kafka_queue_t* rkqu, int fd, const(void)* payload, size_t size)

@brief Enable IO event triggering for queue.

rd_kafka_queue_length
size_t rd_kafka_queue_length(rd_kafka_queue_t* rkqu)

@returns the current number of elements in queue.

rd_kafka_queue_new
rd_kafka_queue_t* rd_kafka_queue_new(rd_kafka_t* rk)

@brief Create a new message queue.

rd_kafka_queue_poll
rd_kafka_event_t* rd_kafka_queue_poll(rd_kafka_queue_t* rkqu, int timeout_ms)

@brief Poll a queue for an event for max \p timeout_ms.

rd_kafka_queue_poll_callback
int rd_kafka_queue_poll_callback(rd_kafka_queue_t* rkqu, int timeout_ms)

@brief Poll a queue for events served through callbacks for max \p timeout_ms.

rd_kafka_resume_partitions
rd_kafka_resp_err_t rd_kafka_resume_partitions(rd_kafka_t* rk, rd_kafka_topic_partition_list_t* partitions)

@brief Resume producing or consumption for the provided list of partitions.

rd_kafka_seek
rd_kafka_resp_err_t rd_kafka_seek(rd_kafka_topic_t* rkt, int32_t partition, int64_t offset, int timeout_ms)

@brief Seek consumer for topic+partition to \p offset which is either an absolute or logical offset.

rd_kafka_set_log_level
void rd_kafka_set_log_level(rd_kafka_t* rk, int level)

@brief Specifies the maximum logging level produced by internal kafka logging and debugging.

rd_kafka_set_log_queue
rd_kafka_resp_err_t rd_kafka_set_log_queue(rd_kafka_t* rk, rd_kafka_queue_t* rkqu)

@brief Forward librdkafka logs (and debug) to the specified queue for serving with one of the ..poll() calls.

rd_kafka_set_logger
deprecated void rd_kafka_set_logger(rd_kafka_t* rk, func_callback func)
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_subscribe
rd_kafka_resp_err_t rd_kafka_subscribe(rd_kafka_t* rk, rd_kafka_topic_partition_list_t* topics)

@brief Subscribe to topic set using balanced consumer groups.

rd_kafka_subscription
rd_kafka_resp_err_t rd_kafka_subscription(rd_kafka_t* rk, rd_kafka_topic_partition_list_t** topics)

@brief Returns the current topic subscription

rd_kafka_thread_cnt
int rd_kafka_thread_cnt()

@brief Retrieve the current number of threads in use by librdkafka.

rd_kafka_topic_conf_destroy
void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t* topic_conf)

@brief Destroys a topic conf object.

rd_kafka_topic_conf_dump
const(char)** rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t* conf, size_t* cntp)

@brief Dump the topic configuration properties and values of \p conf to an array with "key", "value" pairs.

rd_kafka_topic_conf_dup
rd_kafka_topic_conf_t* rd_kafka_topic_conf_dup(rd_kafka_topic_conf_t* conf)

@brief Creates a copy/duplicate of topic configuration object \p conf.

rd_kafka_topic_conf_get
rd_kafka_conf_res_t rd_kafka_topic_conf_get(rd_kafka_topic_conf_t* conf, const(char)* name, char* dest, size_t* dest_size)

@brief Retrieve topic configuration value for property \p name.

rd_kafka_topic_conf_new
rd_kafka_topic_conf_t* rd_kafka_topic_conf_new()

@brief Create topic configuration object

rd_kafka_topic_conf_set
rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t* conf, const(char)* name, const(char)* value, char* errstr, size_t errstr_size)

@brief Sets a single rd_kafka_topic_conf_t value by property name.

rd_kafka_topic_conf_set_opaque
void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t* conf, void* opaque)

@brief Sets the application's opaque pointer that will be passed to all topic callbacks as the \c rkt_opaque argument.

rd_kafka_topic_conf_set_partitioner_cb
void rd_kafka_topic_conf_set_partitioner_cb(rd_kafka_topic_conf_t* topic_conf, partitioner_callback partitioner)
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_topic_destroy
void rd_kafka_topic_destroy(rd_kafka_topic_t* rkt)

@brief Lose application's topic handle refcount as previously created with rd_kafka_topic_new().

rd_kafka_topic_name
const(char)* rd_kafka_topic_name(rd_kafka_topic_t* rkt)

@brief Returns the topic name.

rd_kafka_topic_new
rd_kafka_topic_t* rd_kafka_topic_new(rd_kafka_t* rk, const(char)* topic, rd_kafka_topic_conf_t* conf)

@brief Creates a new topic handle for topic named \p topic.

rd_kafka_topic_opaque
void* rd_kafka_topic_opaque(rd_kafka_topic_t* rkt)

@brief Get the \p rkt_opaque pointer that was set in the topic configuration.

rd_kafka_topic_partition_available
int rd_kafka_topic_partition_available(rd_kafka_topic_t* rkt, int32_t partition)

@brief Check if partition is available (has a leader broker).

rd_kafka_topic_partition_destroy
void rd_kafka_topic_partition_destroy(rd_kafka_topic_partition_t* rktpar)

@brief Destroy a rd_kafka_topic_partition_t. @remark This must not be called for elements in a topic partition list.

rd_kafka_topic_partition_list_add
rd_kafka_topic_partition_t* rd_kafka_topic_partition_list_add(rd_kafka_topic_partition_list_t* rktparlist, const(char)* topic, int32_t partition)

@brief Add topic+partition to list

rd_kafka_topic_partition_list_add_range
void rd_kafka_topic_partition_list_add_range(rd_kafka_topic_partition_list_t* rktparlist, const(char)* topic, int32_t start, int32_t stop)

@brief Add range of partitions from \p start to \p stop inclusive.

rd_kafka_topic_partition_list_copy
rd_kafka_topic_partition_list_t* rd_kafka_topic_partition_list_copy(rd_kafka_topic_partition_list_t* src)

@brief Make a copy of an existing list.

rd_kafka_topic_partition_list_del
int rd_kafka_topic_partition_list_del(rd_kafka_topic_partition_list_t* rktparlist, const(char)* topic, int32_t partition)

@brief Delete partition from list.

rd_kafka_topic_partition_list_del_by_idx
int rd_kafka_topic_partition_list_del_by_idx(rd_kafka_topic_partition_list_t* rktparlist, int idx)

@brief Delete partition from list by elems[] index.

rd_kafka_topic_partition_list_destroy
void rd_kafka_topic_partition_list_destroy(rd_kafka_topic_partition_list_t* rkparlist)

@brief Free all resources used by the list and the list itself.

rd_kafka_topic_partition_list_find
rd_kafka_topic_partition_t* rd_kafka_topic_partition_list_find(rd_kafka_topic_partition_list_t* rktparlist, const(char)* topic, int32_t partition)

@brief Find element by \p topic and \p partition.

rd_kafka_topic_partition_list_new
rd_kafka_topic_partition_list_t* rd_kafka_topic_partition_list_new(int size)

@brief Create a new list/vector Topic+Partition container.

rd_kafka_topic_partition_list_set_offset
rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset(rd_kafka_topic_partition_list_t* rktparlist, const(char)* topic, int32_t partition, int64_t offset)

@brief Set offset to \p offset for \p topic and \p partition

rd_kafka_topic_partition_list_sort
void rd_kafka_topic_partition_list_sort(rd_kafka_topic_partition_list_t* rktparlist, int function(const void* a, const void* b, void* opaque) cmp, void* opaque)

@brief Sort list using comparator \p cmp.

rd_kafka_unsubscribe
rd_kafka_resp_err_t rd_kafka_unsubscribe(rd_kafka_t* rk)

@brief Unsubscribe from the current subscription set.

rd_kafka_version
int rd_kafka_version()

@brief Returns the librdkafka version as integer.

rd_kafka_version_str
const(char)* rd_kafka_version_str()

@brief Returns the librdkafka version as string.

rd_kafka_wait_destroyed
int rd_kafka_wait_destroyed(int timeout_ms)

@brief Wait for all rd_kafka_t objects to be destroyed.

rd_kafka_yield
void rd_kafka_yield(rd_kafka_t* rk)

@brief Cancels the current callback dispatcher (rd_kafka_poll(), rd_kafka_consume_callback(), etc).

Manifest constants

RD_KAFKA_DEBUG_CONTEXTS
deprecated enum RD_KAFKA_DEBUG_CONTEXTS;

@brief Supported debug contexts. (compile time)

RD_KAFKA_EVENT_DR
enum RD_KAFKA_EVENT_DR;

Producer Delivery report batch

RD_KAFKA_EVENT_ERROR
enum RD_KAFKA_EVENT_ERROR;

Error

RD_KAFKA_EVENT_FETCH
enum RD_KAFKA_EVENT_FETCH;

Fetched message (consumer)

RD_KAFKA_EVENT_LOG
enum RD_KAFKA_EVENT_LOG;

Log message

RD_KAFKA_EVENT_NONE
enum RD_KAFKA_EVENT_NONE;
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
RD_KAFKA_EVENT_OFFSET_COMMIT
enum RD_KAFKA_EVENT_OFFSET_COMMIT;

Offset commit result

RD_KAFKA_EVENT_REBALANCE
enum RD_KAFKA_EVENT_REBALANCE;

Group rebalance (consumer)

RD_KAFKA_MSG_F_COPY
enum RD_KAFKA_MSG_F_COPY;

rdkafka will make a copy of the payload.

RD_KAFKA_MSG_F_FREE
enum RD_KAFKA_MSG_F_FREE;

@brief Producer message flags. Delegate freeing of payload to rdkafka.

RD_KAFKA_OFFSET_BEGINNING
enum RD_KAFKA_OFFSET_BEGINNING;

@name Simple Consumer API (legacy) — Start consuming from beginning of kafka partition queue: oldest msg

RD_KAFKA_OFFSET_END
enum RD_KAFKA_OFFSET_END;

Start consuming from end of kafka partition queue: next msg

RD_KAFKA_OFFSET_INVALID
enum RD_KAFKA_OFFSET_INVALID;

Invalid offset

RD_KAFKA_OFFSET_STORED
enum RD_KAFKA_OFFSET_STORED;

Start consuming from offset retrieved from offset store

RD_KAFKA_OFFSET_TAIL_BASE
enum RD_KAFKA_OFFSET_TAIL_BASE;

@cond NO_DOC

Structs

rd_kafka_conf_s
struct rd_kafka_conf_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_err_desc
struct rd_kafka_err_desc

@brief Error code value, name and description. Typically for use with language bindings to automatically expose the full set of librdkafka error codes.

rd_kafka_group_info
struct rd_kafka_group_info

@brief Group information

rd_kafka_group_list
struct rd_kafka_group_list

@brief List of groups

rd_kafka_group_member_info
struct rd_kafka_group_member_info

@brief Group member information

rd_kafka_message_s
struct rd_kafka_message_s

@brief A Kafka message as returned by the \c rd_kafka_consume*() family of functions as well as provided to the Producer \c dr_msg_cb().

rd_kafka_metadata_broker
struct rd_kafka_metadata_broker

@brief Broker information

rd_kafka_metadata_partition
struct rd_kafka_metadata_partition

@brief Partition information

rd_kafka_metadata_t
struct rd_kafka_metadata_t

@brief Metadata container

rd_kafka_metadata_topic
struct rd_kafka_metadata_topic

@brief Topic information

rd_kafka_op_s
struct rd_kafka_op_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_queue_s
struct rd_kafka_queue_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_s
struct rd_kafka_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_topic_conf_s
struct rd_kafka_topic_conf_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.
rd_kafka_topic_partition_list_s
struct rd_kafka_topic_partition_list_s

@brief A growable list of Topic+Partitions.

rd_kafka_topic_partition_s
struct rd_kafka_topic_partition_s

@brief Generic placeholder for a specific Topic+Partition.

rd_kafka_topic_s
struct rd_kafka_topic_s
Undocumented in source but is binding to C. You might be able to learn more by searching the web for its name.

Variables

RD_KAFKA_PARTITION_UA
enum int RD_KAFKA_PARTITION_UA;

@brief Unassigned partition.

Meta