这篇文章主要针对PHP生态的kafka组件 php-rdkafka 进行内核源码分析,方便大家掌握该组件的使用,目前文章主要针对kafka生产者部分。
一. 样例PHP代码
1public function __construct($config)
2 {
3 $conf = new \RdKafka\Conf();
4 $conf->set('metadata.broker.list', $config['brokerList']);
5 $conf->set('message.max.bytes', $config['messageMaxBytes']);
6 $conf->set('metadata.request.timeout.ms', $config['requestTimeout']);
7 $conf->set('session.timeout.ms', $config['sessionTimeout']);
8 $this->producer = new \RdKafka\Producer($conf);
9 $this->producer->addBrokers($config['brokerList']);
10 }
11
12 public function sendMessage($data){
13 $result = 1;
14 $topic = $this->producer->newTopic($data[0]['topic']);
15 $topic->produce(RD_KAFKA_PARTITION_UA, 0, $data[0]['value']);
16 $this->producer->poll(0);
17 for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
18 $result = $this->producer->flush(10000);
19 if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
20 break;
21 }
22 }
23 if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
24 throw new \RuntimeException('Was unable to flush, messages might be lost!');
25 }
26 }
二. RdKafka\Conf类内核实现
2.1 Conf钩子函数
Conf类在Conf.c源码文件的kafka_conf_minit函数中注册:
1 INIT_NS_CLASS_ENTRY(tmpce, "RdKafka", "Conf", kafka_conf_fe);
2 ce_kafka_conf = zend_register_internal_class(&tmpce TSRMLS_CC);
3 ce_kafka_conf->create_object = kafka_conf_new; //注册对象创建钩子函数
4
5 INIT_NS_CLASS_ENTRY(tmpce, "RdKafka", "TopicConf", kafka_topic_conf_fe);
6 ce_kafka_topic_conf = zend_register_internal_class(&tmpce TSRMLS_CC);
7 ce_kafka_topic_conf->create_object = kafka_conf_new;
2.2 Conf对象内核创建阶段
PHP内核接入层暴露如下:
1PHP_METHOD(RdKafka__Conf, __construct)
2{
3 kafka_conf_object *intern;
4 zend_error_handling error_handling;
5
6 zend_replace_error_handling(EH_THROW, spl_ce_InvalidArgumentException, &error_handling TSRMLS_CC);
7
8 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "") == FAILURE) {
9 zend_restore_error_handling(&error_handling TSRMLS_CC);
10 return;
11 }
12
13 intern = get_custom_object_zval(kafka_conf_object, getThis());
14 intern->type = KAFKA_CONF;
15 intern->u.conf = rd_kafka_conf_new(); //关键函数:rd_kafka_conf_new
16
17 zend_restore_error_handling(&error_handling TSRMLS_CC);
18}
19/* }}} */
RdKafka Conf的对象创建函数为 kafka_conf_new,其中的关键是kafka_conf_object数据结构:
1typedef struct _kafka_conf_object {
2#if PHP_MAJOR_VERSION < 7
3 zend_object std;
4#endif
5 kafka_conf_type type;
6 union {
7 rd_kafka_conf_t *conf; //此处关键,这个数据结构定义属于 librdkafka库
8 rd_kafka_topic_conf_t *topic_conf; //此处关键,这个数据结构定义属于 librdkafka库
9 } u;
10 kafka_conf_callbacks cbs;
11#if PHP_MAJOR_VERSION >= 7
12 zend_object std;
13#endif
14} kafka_conf_object;
1static zend_object_value kafka_conf_new(zend_class_entry *class_type TSRMLS_DC) /* {{{ */
2{
3 zend_object_value retval;
4 kafka_conf_object *intern;
5
6 intern = alloc_object(intern, class_type); //kafka conf对象内存分配
7 zend_object_std_init(&intern->std, class_type TSRMLS_CC);
8 object_properties_init(&intern->std, class_type);
9
10 STORE_OBJECT(retval, intern, (zend_objects_store_dtor_t) zend_objects_destroy_object, kafka_conf_free, NULL); //把创建好的object返回php调用层。
11 SET_OBJECT_HANDLERS(retval, &handlers);
12
13 return retval;
14}
15/* }}}
2.3 Conf对象Set函数
1PHP_METHOD(RdKafka__Conf, set)
2{
3 char *name;
4 arglen_t name_len;
5 char *value;
6 arglen_t value_len;
7 kafka_conf_object *intern;
8 rd_kafka_conf_res_t ret = 0;
9 char errstr[512];
10
11 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss", &name, &name_len, &value, &value_len) == FAILURE) {
12 return;
13 }
14
15 intern = get_kafka_conf_object(getThis() TSRMLS_CC); //获取kafka_conf_new创建的kafka_conf_object对象
16 if (!intern) {
17 return;
18 }
19
20 errstr[0] = '\0';
21
22 switch (intern->type) { //获取librdkafka 的kafka config类型
23 case KAFKA_CONF: //进行librdkafka KAFKA_CONF参数设置
24 ret = rd_kafka_conf_set(intern->u.conf, name, value, errstr, sizeof(errstr));
25 break;
26 case KAFKA_TOPIC_CONF: //进行librdkafka KAFKA_TOPIC_CONF参数设置
27 ret = rd_kafka_topic_conf_set(intern->u.topic_conf, name, value, errstr, sizeof(errstr));
28 break;
29 }
30
31 switch (ret) { //此处的ret为librdkafka库返回的执行状态,如果异常则进行异常上报
32 case RD_KAFKA_CONF_UNKNOWN: // -2
33 zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_CONF_UNKNOWN TSRMLS_CC);
34 return;
35 case RD_KAFKA_CONF_INVALID: // -1
36 zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_CONF_INVALID TSRMLS_CC);
37 return;
38 case RD_KAFKA_CONF_OK: // 0
39 break;
40 }
41}
42/* }}} */
这个set阶段,就是调用librdkafka提供的set函数族(rd_kafka_conf_set、rd_kafka_topic_conf_set)把参数传递给librdkafka。
三. RdKafka\Producer类 内核实现
3.1 Producer 类内核注册
1INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Producer", kafka_producer_fe);
2ce_kafka_producer = rdkafka_register_internal_class_ex(&ce, ce_kafka TSRMLS_CC);
3//扩展模块初始化阶段、注册类内核结构
1static const zend_function_entry kafka_producer_fe[] = {
2 PHP_ME(RdKafka__Producer, __construct, arginfo_kafka_producer___construct, ZEND_ACC_PUBLIC)
3 PHP_FE_END
4};
接下来,看一下\RdKafka\Producer 的构造函数阶段,这个阶段关键是kafka_init函数
1PHP_METHOD(RdKafka__Producer, __construct)
2{
3 zval *zconf = NULL;
4 zend_error_handling error_handling;
5
6 zend_replace_error_handling(EH_THROW, spl_ce_InvalidArgumentException, &error_handling TSRMLS_CC);
7
8 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|O!", &zconf, ce_kafka_conf) == FAILURE) {
9 zend_restore_error_handling(&error_handling TSRMLS_CC);
10 return;
11 }
12
13 kafka_init(getThis(), RD_KAFKA_PRODUCER, zconf TSRMLS_CC); //调用kafka_init创建kafka库内存对象,类型参数为RD_KAFKA_PRODUCER,即创建的是生产者对象。
14
15 zend_restore_error_handling(&error_handling TSRMLS_CC);
16}
17/* }}} */
四. 重要公共内核函数分析
4.1 kafka_init 内核函数(生产者、消费者)
这个是创建kafka生产者、消费者的内核函数,具体函数实现如下:
1static void kafka_init(zval *this_ptr, rd_kafka_type_t type, zval *zconf TSRMLS_DC) /* {{{ */
2{
3 char errstr[512];
4 rd_kafka_t *rk;
5 kafka_object *intern; //这是关键的kafka内核对象
6 kafka_conf_object *conf_intern;
7 rd_kafka_conf_t *conf = NULL;
8
9 intern = get_custom_object_zval(kafka_object, this_ptr); //获取当前PHP对象(this_ptr)对应的内部kafka_object结构
10 intern->type = type;
11
12 if (zconf) { //如果php内核有conf参数则使用获取conf的内部结构conf_intern
13 conf_intern = get_kafka_conf_object(zconf TSRMLS_CC);
14 if (conf_intern) {
15 //如果conf_intern结构存在
16 //调用rd_kafka_conf_dup这个librdkafka库函数进行创建配置对象conf的副本
17 conf = rd_kafka_conf_dup(conf_intern->u.conf);
18 //这个执行相关回调拷贝过程,大多数是拷贝函数执行过程的一些信息。
19 kafka_conf_callbacks_copy(&intern->cbs, &conf_intern->cbs TSRMLS_CC);
20 intern->cbs.zrk = *this_ptr;
21 //这部分也是librdkafka库内核函数,设置应用程序的不透明指针,该指针将传递给回调
22 rd_kafka_conf_set_opaque(conf, &intern->cbs);
23 }
24 }
25
26 //关键之处:
27 //此处根据type进行kafka相关对象元素的创建,主要是生产者、消费者对象
28 rk = rd_kafka_new(type, conf, errstr, sizeof(errstr));
29
30 if (rk == NULL) { //如果创建失败则上报异常
31 zend_throw_exception(ce_kafka_exception, errstr, 0 TSRMLS_CC);
32 return;
33 }
34
35 if (intern->cbs.log) {
36 rd_kafka_set_log_queue(rk, NULL);
37 }
38
39 intern->rk = rk; //这是关键,把librdkafka库的内存结构指针存储在php扩展层相关内存结构。
40
41 //如果是消费者对象,则初始化intern的consuming和queues哈希表空间,并设置相关的内存回收钩子函数
42 if (type == RD_KAFKA_CONSUMER) {
43 zend_hash_init(&intern->consuming, 0, NULL, (dtor_func_t)toppar_pp_dtor, 0);
44 zend_hash_init(&intern->queues, 0, NULL, (dtor_func_t)kafka_queue_object_pre_free, 0);
45 }
46
47 //生产者、消费者均需要初始化intern对象的topics哈希表空间。
48 zend_hash_init(&intern->topics, 0, NULL, (dtor_func_t)kafka_topic_object_pre_free, 0);
49}
50/* }}} */
4.2 addBrokers函数(生产者、消费者)
1PHP_METHOD(RdKafka__Kafka, addBrokers)
2{
3 char *broker_list;
4 arglen_t broker_list_len;
5 kafka_object *intern;
6
7 //把php语言层传入的broker列表字符串解析到broker_list字符串指针中。
8 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &broker_list, &broker_list_len) == FAILURE) {
9 return;
10 }
11
12 intern = get_kafka_object(getThis() TSRMLS_CC); //获取当前intern对象 当前可能是消费者、或者生产者
13 if (!intern) {
14 return;
15 }
16
17 //利用librdkafka库进行集群节点的注入操作
18 RETURN_LONG(rd_kafka_brokers_add(intern->rk, broker_list));
19}
20/* }}} */
4.3 newTopic函数(生产者、消费者)
1PHP_METHOD(RdKafka__Kafka, newTopic)
2{
3 char *topic;
4 arglen_t topic_len;
5 rd_kafka_topic_t *rkt;
6 kafka_object *intern;
7 kafka_topic_object *topic_intern;
8 zend_class_entry *topic_type;
9 zval *zconf = NULL;
10 rd_kafka_topic_conf_t *conf = NULL;
11 kafka_conf_object *conf_intern;
12
13 //解析PHP层参数,参数解析进入topic参数
14 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|O!", &topic, &topic_len, &zconf, ce_kafka_topic_conf) == FAILURE) {
15 return;
16 }
17
18 //获取当下的kafka对象
19 intern = get_kafka_object(getThis() TSRMLS_CC);
20 if (!intern) {
21 return;
22 }
23
24 //获取kafka配置对象
25 if (zconf) {
26 conf_intern = get_kafka_conf_object(zconf TSRMLS_CC);
27 if (conf_intern) {
28 conf = rd_kafka_topic_conf_dup(conf_intern->u.topic_conf);
29 }
30 }
31
32 //调用librdkafka库rd_kafka_topic_new函数创建新的kafka topic内存对象
33 rkt = rd_kafka_topic_new(intern->rk, topic, conf);
34
35 if (!rkt) {
36 return;
37 }
38
39 //根据消费者或者生产者的类别进行topic类型设置
40 switch (intern->type) {
41 case RD_KAFKA_CONSUMER:
42 topic_type = ce_kafka_consumer_topic;
43 break;
44 case RD_KAFKA_PRODUCER:
45 topic_type = ce_kafka_producer_topic;
46 break;
47 default:
48 return;
49 }
50
51 //创建PHP层topic对象作为返回值
52 if (object_init_ex(return_value, topic_type) != SUCCESS) {
53 return;
54 }
55
56 topic_intern = get_custom_object_zval(kafka_topic_object, return_value);
57 if (!topic_intern) {
58 return;
59 }
60
61 topic_intern->rkt = rkt;
62#if PHP_MAJOR_VERSION >= 7
63 topic_intern->zrk = *getThis();
64#else
65 topic_intern->zrk = getThis();
66#endif
67 Z_ADDREF_P(P_ZEVAL(topic_intern->zrk));
68
69 zend_hash_index_add_ptr(&intern->topics, (zend_ulong)topic_intern, topic_intern);
70}
71/* }}} */
4.4 poll函数
poll用于轮询给定kafka句柄上的事件,事件将导致应用程序提供的回调被调用。timeout_ms参数指定该调用阻塞等待事件的最长时间(以毫秒为单位),对于非阻塞调用,设置参数为0,如果需要无限等待则设置参数为-1。
1PHP_METHOD(RdKafka__Kafka, poll)
2{
3 kafka_object *intern;
4 zend_long timeout;
5
6 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &timeout) == FAILURE) {
7 return;
8 }
9
10 //获取php内核当前环境的kafka对象
11 intern = get_kafka_object(getThis() TSRMLS_CC);
12 if (!intern) {
13 return;
14 }
15
16 //调用librdkafka库rd_kafka_poll函数等待kafka事件。
17 RETURN_LONG(rd_kafka_poll(intern->rk, timeout));
18}
4.5 flush函数
等待所有的生产请求结束,通常应在销毁生产者实例之前完成,确保所有排队的生产请求均已完成,该函数将调用rd_kafka_poll()并由此触发回调。
如果已启用RD_KAFKA_EVENT_DR(通过rd_kafka_conf_set_events函数),该函数不会调用rd_kafka_poll(),而是等待由librdkafka处理的消息计数降为零。这需要应用程序在单独的线程中处理(serve)事件队列。在此模式下,仅统计消息(messages),不统计其他类型的排队事件。
返回值:如果在所有未完成的请求完成之前就已达到timeout_ms,则返回RD_KAFKA_RESP_ERR__TIMED_OUT;否则返回RD_KAFKA_RESP_ERR_NO_ERROR。
1/* {{{ proto int RdKafka\Kafka::flush(int $timeout_ms)
2 Wait until all outstanding produce requests, et.al, are completed. */
3ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_flush, 0, 0, 1)
4 ZEND_ARG_INFO(0, timeout_ms)
5ZEND_END_ARG_INFO()
6
7PHP_METHOD(RdKafka__Kafka, flush)
8{
9 kafka_object *intern;
10 zend_long timeout;
11
12 if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &timeout) == FAILURE) {
13 return;
14 }
15 //获取当前kafka内存对象
16 intern = get_kafka_object(getThis() TSRMLS_CC);
17 if (!intern) {
18 return;
19 }
20
21 //针对kafka内存对象的rk,这个对象可以是消费者、也可以是生产者。
22 //调用librdkafka库的rd_kafka_flush函数进行刷新操作,内部会控制调用rd_kafka_poll函数。
23 RETURN_LONG(rd_kafka_flush(intern->rk, timeout));
24}
25/* }}} */