PHP-rdkafka 内核扩展相关源码分析

Posted by LB on Fri, Mar 6, 2020

这篇文章主要针对PHP生态中的Kafka组件 php-rdkafka 进行内核(扩展层)源码分析,方便大家理解并正确使用该组件。目前文章主要覆盖Kafka生产者部分。

一. 样例PHP代码

 1public function __construct($config) // builds the producer from $config keys brokerList, messageMaxBytes, requestTimeout, sessionTimeout
 2    {
 3        $conf = new \RdKafka\Conf();
 4        $conf->set('metadata.broker.list', $config['brokerList']);
 5        $conf->set('message.max.bytes', $config['messageMaxBytes']);
 6        $conf->set('metadata.request.timeout.ms', $config['requestTimeout']);
 7        $conf->set('session.timeout.ms', $config['sessionTimeout']); // NOTE(review): session.timeout.ms is a consumer (group) property in librdkafka; a producer presumably ignores it — confirm
 8        $this->producer = new \RdKafka\Producer($conf);
 9        $this->producer->addBrokers($config['brokerList']); // same broker list was already set via metadata.broker.list on line 4
10    }
11
12    public function sendMessage($data){ // produces only $data[0]; any further entries of $data are not sent
13        $result = 1; // placeholder; the loop below always assigns it at least once
14        $topic = $this->producer->newTopic($data[0]['topic']);
15        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $data[0]['value']); // unassigned partition (the partitioner chooses), msgflags 0
16        $this->producer->poll(0); // non-blocking poll (timeout 0) to serve pending events/callbacks
17        for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) { // up to 10 flush attempts, 10000 ms each
18            $result = $this->producer->flush(10000);
19            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
20                break;
21            }
22        }
23        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { // no attempt returned RD_KAFKA_RESP_ERR_NO_ERROR
24            throw new \RuntimeException('Was unable to flush, messages might be lost!');
25        }
26    }

二. RdKafka\Conf类内核实现

2.1 Conf钩子函数

Conf类(以及TopicConf类)在源码文件conf.c的kafka_conf_minit函数中注册:

1    INIT_NS_CLASS_ENTRY(tmpce, "RdKafka", "Conf", kafka_conf_fe); // class entry RdKafka\Conf with method table kafka_conf_fe
2    ce_kafka_conf = zend_register_internal_class(&tmpce TSRMLS_CC); // register the class with the engine
3    ce_kafka_conf->create_object = kafka_conf_new; // object-creation hook: each new RdKafka\Conf is allocated by kafka_conf_new
4
5    INIT_NS_CLASS_ENTRY(tmpce, "RdKafka", "TopicConf", kafka_topic_conf_fe); // class entry RdKafka\TopicConf
6    ce_kafka_topic_conf = zend_register_internal_class(&tmpce TSRMLS_CC);
7    ce_kafka_topic_conf->create_object = kafka_conf_new; // TopicConf shares the same hook; objects are told apart later by intern->type (presumably set to KAFKA_TOPIC_CONF by TopicConf's constructor — confirm)

2.2 Conf对象内核创建阶段

暴露给PHP层的构造函数实现如下:

 1PHP_METHOD(RdKafka__Conf, __construct) // RdKafka\Conf::__construct(): takes no arguments, attaches a fresh librdkafka config
 2{
 3    kafka_conf_object *intern;
 4    zend_error_handling error_handling;
 5
 6    zend_replace_error_handling(EH_THROW, spl_ce_InvalidArgumentException, &error_handling TSRMLS_CC); // errors raised while parsing are thrown as InvalidArgumentException
 7
 8    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "") == FAILURE) { // "" = no parameters accepted
 9        zend_restore_error_handling(&error_handling TSRMLS_CC);
10        return;
11    }
12
13    intern = get_custom_object_zval(kafka_conf_object, getThis()); // struct allocated earlier by the create_object hook kafka_conf_new
14    intern->type = KAFKA_CONF; // the union holds an rd_kafka_conf_t, not a topic conf
15    intern->u.conf = rd_kafka_conf_new(); // key call: librdkafka's rd_kafka_conf_new() creates the configuration object
16
17    zend_restore_error_handling(&error_handling TSRMLS_CC); // restore the previous error mode
18}
19/* }}} */

RdKafka\Conf对象的创建钩子函数为 kafka_conf_new(即2.1中注册的create_object),其中关键是kafka_conf_object数据结构。注意下面展示的kafka_conf_new是PHP 5版本的实现(返回zend_object_value):

 1typedef struct _kafka_conf_object { // PHP-side wrapper shared by RdKafka\Conf and RdKafka\TopicConf
 2#if PHP_MAJOR_VERSION < 7 // PHP 5: the embedded zend_object comes first
 3    zend_object                 std;
 4#endif
 5    kafka_conf_type type; // selects the valid union member (KAFKA_CONF or KAFKA_TOPIC_CONF)
 6    union {
 7        rd_kafka_conf_t         *conf;  // key field: librdkafka's global config handle (type == KAFKA_CONF)
 8        rd_kafka_topic_conf_t   *topic_conf; // key field: librdkafka's topic config handle (type == KAFKA_TOPIC_CONF)
 9    } u;
10    kafka_conf_callbacks cbs; // PHP callbacks registered on this config; kafka_init copies them into the Kafka object
11#if PHP_MAJOR_VERSION >= 7 // PHP 7+: the embedded zend_object comes last
12    zend_object                 std;
13#endif
14} kafka_conf_object;
 1static zend_object_value kafka_conf_new(zend_class_entry *class_type TSRMLS_DC) /* {{{ */ // create_object hook for Conf/TopicConf (PHP 5 variant)
 2{
 3    zend_object_value retval;
 4    kafka_conf_object *intern;
 5
 6    intern = alloc_object(intern, class_type); // allocate the kafka_conf_object (alloc_object is a project helper)
 7    zend_object_std_init(&intern->std, class_type TSRMLS_CC); // initialise the embedded standard zend_object
 8    object_properties_init(&intern->std, class_type); // initialise the class's declared properties
 9
10    STORE_OBJECT(retval, intern, (zend_objects_store_dtor_t) zend_objects_destroy_object, kafka_conf_free, NULL); // put the object into the object store with kafka_conf_free as its free handler
11    SET_OBJECT_HANDLERS(retval, &handlers); // attach the extension's object handler table
12
13    return retval; // handle of the new object, returned to the engine
14}
15/* }}} */

2.3 Conf对象Set函数

 1PHP_METHOD(RdKafka__Conf, set) // RdKafka\Conf::set(string $name, string $value): forwards one property to librdkafka
 2{
 3    char *name;
 4    arglen_t name_len;
 5    char *value;
 6    arglen_t value_len;
 7    kafka_conf_object *intern;
 8    rd_kafka_conf_res_t ret = 0; // 0 == RD_KAFKA_CONF_OK, kept if type matches neither case below
 9    char errstr[512]; // receives librdkafka's error text
10
11    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss", &name, &name_len, &value, &value_len) == FAILURE) { // "ss" = two required strings
12        return;
13    }
14
15    intern = get_kafka_conf_object(getThis() TSRMLS_CC); // the kafka_conf_object allocated by kafka_conf_new
16    if (!intern) {
17        return;
18    }
19
20    errstr[0] = '\0'; // start with an empty message
21
22    switch (intern->type) { // which librdkafka config this object wraps
23        case KAFKA_CONF: // global (client) config -> rd_kafka_conf_set
24            ret = rd_kafka_conf_set(intern->u.conf, name, value, errstr, sizeof(errstr));
25            break;
26        case KAFKA_TOPIC_CONF: // topic config -> rd_kafka_topic_conf_set
27            ret = rd_kafka_topic_conf_set(intern->u.topic_conf, name, value, errstr, sizeof(errstr));
28            break;
29    }
30
31    switch (ret) { // ret is librdkafka's result; on failure throw ce_kafka_exception with errstr as message and ret as code
32        case RD_KAFKA_CONF_UNKNOWN: // -2: unknown property name
33            zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_CONF_UNKNOWN TSRMLS_CC);
34            return;
35        case RD_KAFKA_CONF_INVALID: // -1: invalid value
36            zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_CONF_INVALID TSRMLS_CC);
37            return;
38        case RD_KAFKA_CONF_OK: // 0: success
39            break;
40    }
41}
42/* }}} */

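set阶段就是根据对象类型调用librdkafka提供的设置函数(rd_kafka_conf_set 或 rd_kafka_topic_conf_set)传递参数;如果librdkafka返回RD_KAFKA_CONF_UNKNOWN或RD_KAFKA_CONF_INVALID,则以errstr为消息抛出异常。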

三. RdKafka\Producer类 内核实现

3.1 Producer 类内核注册

1INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Producer", kafka_producer_fe); // class entry RdKafka\Producer with method table kafka_producer_fe
2ce_kafka_producer = rdkafka_register_internal_class_ex(&ce, ce_kafka TSRMLS_CC); // registered with ce_kafka as (presumably) parent class, so the RdKafka\Kafka methods are inherited
3// runs during module initialisation: registers the class structure with the engine
1static const zend_function_entry kafka_producer_fe[] = { // RdKafka\Producer's own methods: only the constructor
2    PHP_ME(RdKafka__Producer, __construct, arginfo_kafka_producer___construct, ZEND_ACC_PUBLIC)
3    PHP_FE_END // terminating sentinel entry
4};

接下来看一下\RdKafka\Producer的构造函数,这个阶段的关键是kafka_init函数(见4.1节):

 1PHP_METHOD(RdKafka__Producer, __construct) // RdKafka\Producer::__construct(?RdKafka\Conf $conf = null)
 2{
 3    zval *zconf = NULL; // stays NULL when no Conf is passed
 4    zend_error_handling error_handling;
 5
 6    zend_replace_error_handling(EH_THROW, spl_ce_InvalidArgumentException, &error_handling TSRMLS_CC); // parse errors become InvalidArgumentException
 7
 8    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|O!", &zconf, ce_kafka_conf) == FAILURE) { // "|O!" = optional RdKafka\Conf instance, null allowed
 9        zend_restore_error_handling(&error_handling TSRMLS_CC);
10        return;
11    }
12
13    kafka_init(getThis(), RD_KAFKA_PRODUCER, zconf TSRMLS_CC); // create the librdkafka handle with type RD_KAFKA_PRODUCER, i.e. a producer
14
15    zend_restore_error_handling(&error_handling TSRMLS_CC);
16}
17/* }}} */

四. 重要公共内核函数分析

4.1 kafka_init 内核函数(生产者、消费者)

这是创建Kafka生产者、消费者句柄(rd_kafka_t)的内核函数,具体实现如下:

 1static void kafka_init(zval *this_ptr, rd_kafka_type_t type, zval *zconf TSRMLS_DC) /* {{{ */ // creates the librdkafka handle (producer or consumer) behind a PHP RdKafka object
 2{
 3    char errstr[512]; // receives librdkafka's error text if rd_kafka_new fails
 4    rd_kafka_t *rk;
 5    kafka_object *intern; // the key internal struct of the PHP Kafka object
 6    kafka_conf_object *conf_intern;
 7    rd_kafka_conf_t *conf = NULL; // stays NULL when no usable Conf was given
 8
 9    intern = get_custom_object_zval(kafka_object, this_ptr); // fetch the kafka_object struct behind this_ptr
10    intern->type = type;
11
12    if (zconf) { // a Conf object was passed from PHP: use its internal struct
13        conf_intern = get_kafka_conf_object(zconf TSRMLS_CC);
14        if (conf_intern) { 
15          // the Conf's internal struct is available:
16          // duplicate its librdkafka config with rd_kafka_conf_dup, so the PHP Conf object stays untouched
17            conf = rd_kafka_conf_dup(conf_intern->u.conf);
18          // copy the PHP callbacks registered on the Conf into this handle's intern->cbs
19            kafka_conf_callbacks_copy(&intern->cbs, &conf_intern->cbs TSRMLS_CC);
20            intern->cbs.zrk = *this_ptr; // let callbacks reach the owning PHP object (zval copied; no refcount increment visible here)
21          // librdkafka: set the application opaque pointer that is passed to callbacks (here: this handle's callback set)
22            rd_kafka_conf_set_opaque(conf, &intern->cbs);
23        }
24    }
25
26    // key step:
27    // create the librdkafka handle of the requested type (producer or consumer); conf may be NULL
28    rk = rd_kafka_new(type, conf, errstr, sizeof(errstr));
29
30    if (rk == NULL) { // creation failed: throw with librdkafka's message. NOTE(review): librdkafka only takes ownership of conf on success, so the duplicated conf looks leaked here — confirm
31        zend_throw_exception(ce_kafka_exception, errstr, 0 TSRMLS_CC);
32        return;
33    }
34
35    if (intern->cbs.log) { // a PHP log callback is registered
36        rd_kafka_set_log_queue(rk, NULL); // NULL = forward librdkafka logs to the main queue so they are served by poll(); presumably log.queue=true is set elsewhere — confirm
37    }
38
39    intern->rk = rk; // key: store the librdkafka handle pointer in the extension's struct
40
41   // consumers only: initialise the consuming and queues hash tables, with their destructor hooks
42    if (type == RD_KAFKA_CONSUMER) {
43        zend_hash_init(&intern->consuming, 0, NULL, (dtor_func_t)toppar_pp_dtor, 0);
44        zend_hash_init(&intern->queues, 0, NULL, (dtor_func_t)kafka_queue_object_pre_free, 0);
45    }
46
47   // producers and consumers both need the topics hash table (filled by newTopic)
48    zend_hash_init(&intern->topics, 0, NULL, (dtor_func_t)kafka_topic_object_pre_free, 0);
49}
50/* }}} */

4.2 addBrokers函数(生产者、消费者)

 1PHP_METHOD(RdKafka__Kafka, addBrokers) // RdKafka\Kafka::addBrokers(string $broker_list): int
 2{
 3    char *broker_list;
 4    arglen_t broker_list_len;
 5    kafka_object *intern;
 6
 7    // "s": parse the single PHP string argument (a string, not an array) into broker_list / broker_list_len 
 8    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &broker_list, &broker_list_len) == FAILURE) {
 9        return;
10    }
11
12    intern = get_kafka_object(getThis() TSRMLS_CC); // Kafka object of $this; may be a producer or a consumer
13    if (!intern) {
14        return;
15    }
16
17   // hand the broker list to librdkafka; the number of brokers it added is returned to PHP
18    RETURN_LONG(rd_kafka_brokers_add(intern->rk, broker_list));
19}
20/* }}} */

4.3 newTopic函数(生产者、消费者)

 1PHP_METHOD(RdKafka__Kafka, newTopic) // RdKafka\Kafka::newTopic(string $topic, ?RdKafka\TopicConf $conf = null)
 2{
 3    char *topic;
 4    arglen_t topic_len;
 5    rd_kafka_topic_t *rkt;
 6    kafka_object *intern;
 7    kafka_topic_object *topic_intern;
 8    zend_class_entry *topic_type;
 9    zval *zconf = NULL;
10    rd_kafka_topic_conf_t *conf = NULL; // stays NULL when no usable TopicConf was given
11    kafka_conf_object *conf_intern;
12   
13    // parse PHP arguments: required topic name, optional TopicConf (null allowed)
14    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|O!", &topic, &topic_len, &zconf, ce_kafka_topic_conf) == FAILURE) {
15        return;
16    }
17
18   // the Kafka object ($this) on whose handle the topic is created
19    intern = get_kafka_object(getThis() TSRMLS_CC); 
20    if (!intern) {
21        return;
22    }
23
24   // if a TopicConf was given, pass librdkafka a duplicate of its topic config
25    if (zconf) {
26        conf_intern = get_kafka_conf_object(zconf TSRMLS_CC);
27        if (conf_intern) {
28            conf = rd_kafka_topic_conf_dup(conf_intern->u.topic_conf);
29        }
30    }
31
32   // librdkafka rd_kafka_topic_new() creates the topic handle on intern->rk
33    rkt = rd_kafka_topic_new(intern->rk, topic, conf);
34
35    if (!rkt) { // NOTE(review): returns null to PHP without throwing an exception
36        return;
37    }
38
39   // pick the PHP topic class from the handle type (consumer vs producer)
40    switch (intern->type) {
41        case RD_KAFKA_CONSUMER:
42            topic_type = ce_kafka_consumer_topic;
43            break;
44        case RD_KAFKA_PRODUCER:
45            topic_type = ce_kafka_producer_topic;
46            break;
47        default: // NOTE(review): rkt is not destroyed on this path nor on the two early returns below — looks like a leak; confirm
48            return;
49    }
50
51    // create the PHP topic object as the return value
52    if (object_init_ex(return_value, topic_type) != SUCCESS) {
53        return;
54    }
55
56    topic_intern = get_custom_object_zval(kafka_topic_object, return_value);
57    if (!topic_intern) {
58        return;
59    }
60
61    topic_intern->rkt = rkt; // the PHP topic object now holds the librdkafka topic handle
62#if PHP_MAJOR_VERSION >= 7
63    topic_intern->zrk = *getThis();
64#else
65    topic_intern->zrk = getThis();
66#endif
67    Z_ADDREF_P(P_ZEVAL(topic_intern->zrk)); // the topic holds a reference to its Kafka object
68
69    zend_hash_index_add_ptr(&intern->topics, (zend_ulong)topic_intern, topic_intern); // track the topic in intern->topics, keyed by its pointer value
70}
71/* }}} */

4.4 poll函数

poll用于轮询Kafka句柄上的事件,事件会触发应用程序注册的回调函数(例如消息投递报告回调)。timeout_ms参数指定该调用阻塞等待事件的最长时间(单位为毫秒):非阻塞调用时设置为0,需要无限等待时设置为-1。返回值为本次处理的事件数。

 1PHP_METHOD(RdKafka__Kafka, poll) // RdKafka\Kafka::poll(int $timeout_ms): int
 2{
 3    kafka_object *intern;
 4    zend_long timeout; // milliseconds: 0 = non-blocking, -1 = wait indefinitely
 5
 6    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &timeout) == FAILURE) { // "l" = one required integer
 7        return;
 8    }
 9
10    // fetch the Kafka object of $this
11    intern = get_kafka_object(getThis() TSRMLS_CC);
12    if (!intern) {
13        return;
14    }
15
16    // librdkafka rd_kafka_poll(): wait for and serve events; its return value goes back to PHP
17    RETURN_LONG(rd_kafka_poll(intern->rk, timeout));
18}

4.5 flush函数

等待所有未完成的生产请求结束。通常应在销毁生产者实例之前调用,以确保所有排队中及发送中的生产请求都已完成。该函数内部会调用rd_kafka_poll(),因此会触发回调。

如果已启用RD_KAFKA_EVENT_DR(通过rd_kafka_conf_set_events函数),该函数不会调用rd_kafka_poll(),而是等待librdkafka所处理的消息计数降为零。这要求应用程序在单独的线程中处理(serve)事件队列。在此模式下只统计消息,不统计其他类型的排队事件。

返回值:如果在所有未完成的请求完成之前就达到了timeout_ms,返回RD_KAFKA_RESP_ERR__TIMED_OUT;否则返回RD_KAFKA_RESP_ERR_NO_ERROR。

 1/* {{{ proto int RdKafka\Kafka::flush(int $timeout_ms)
 2   Wait until all outstanding produce requests, et.al, are completed. Returns RD_KAFKA_RESP_ERR__TIMED_OUT if timeout_ms elapses first, else RD_KAFKA_RESP_ERR_NO_ERROR. */
 3ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_flush, 0, 0, 1) // arginfo: one required argument
 4    ZEND_ARG_INFO(0, timeout_ms)
 5ZEND_END_ARG_INFO()
 6
 7PHP_METHOD(RdKafka__Kafka, flush)
 8{
 9    kafka_object *intern;
10    zend_long timeout; // milliseconds
11
12    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &timeout) == FAILURE) { // "l" = one required integer
13        return;
14    }
15    // fetch the Kafka object of $this
16    intern = get_kafka_object(getThis() TSRMLS_CC);
17    if (!intern) {
18        return;
19    }
20
21    // intern->rk may belong to a producer or a consumer; flushing is meant for producers
22    // librdkafka rd_kafka_flush() waits up to timeout ms and calls rd_kafka_poll() internally; its error code is returned to PHP
23    RETURN_LONG(rd_kafka_flush(intern->rk, timeout));
24}
25/* }}} */