本文实例讲述了php测试kafka项目。分享给大家供大家参考,具体如下:
概述
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
主要应用场景是:日志收集系统和消息系统。
安装kafka-php项目依赖
composer require nmred/kafka-php
produce.php
<?php require './vendor/autoload.php'; date_default_timezone_set('PRC'); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('127.0.0.1:9092'); $config->setBrokerVersion('0.10.2.1'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer(function() { $t = time(); return array( array( 'topic' => 'test', 'value' => $t, 'key' => $t, ), ); }); $producer->success(function($result) { var_export($result); }); $producer->error(function($errorCode) { var_dump('error', $errorCode); }); $producer->send();
consumer.php
<?php require './vendor/autoload.php'; date_default_timezone_set('PRC'); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('127.0.0.1:9092'); $config->setGroupId('test'); $config->setBrokerVersion('0.10.2.1'); $config->setTopics(array('test')); $consumer = new \Kafka\Consumer(); $consumer->start(function($topic, $part, $message) { var_dump($message); });
测试生产者
php produce.php
测试消费者
php consumer.php
更多关于PHP相关内容感兴趣的读者可查看本站专题