首页 » 网站建设 » kafkaphpswoole技巧_swoole结合kafka实现超高机能消息队列

kafkaphpswoole技巧_swoole结合kafka实现超高机能消息队列

访客 2024-11-27 0

扫一扫用手机浏览

文章目录 [+]

利用kafka结合easyswoole异步定时任务已经多进程来实现一个高性能的行列步队做事,紧张用来实现飞飞物联的设备逻辑(规则引擎),比如根据传感器的数据发短信等等。

首先连接kafka,这里的kaka我利用的百度云供应的kafka做事,自己支配太麻烦而且难以掩护,连接的参考例子在这里https://github.com/BCEBIGDATA/kafka-sample-php ,实在最想利用的是微博的那个不该用扩展来连接kafka的库,但是一贯没有办理利用ssl文件连接的问题,因此便是用了rdkafka扩展,首先按照样例中的解释安装librdkafka

kafkaphpswoole技巧_swoole结合kafka实现超高机能消息队列

sh setup-librdkafka.shpecl install rdkafkaecho "extension=rdkafka.so" >> /etc/php.ini //根据实际位置

这样就安装好了librdkafka和php扩展,要把稳的是版本号必须要新一些的,否则利用ssl的会报没有该设置项的非常,排查这个非常花了一晚上的韶光。

kafkaphpswoole技巧_swoole结合kafka实现超高机能消息队列
(图片来自网络侵删)

接下来在easyswoole创建一个连接kafka的基类,在飞飞物联的项目中只会利用到consumer,由于producer的数据是来自天工的设备数据

kafka.php – 连接kafka的基类

namespace App\Lib\Kafka;use \RdKafka\Conf;use \RdKafka\KafkaConsumer;use Swoole\Exception;class Kafka{ private $topic = ''; private $config = [ 'broker' => 'xxxxxxxxx:9092', 'security_protocol' => 'ssl', 'client_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.pem', 'client_key' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.key', 'ca_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/ca.pem', 'group_id' => 'kafka-feifei-swoole-consumer' ]; public function __construct($topic) { if(!extension_loaded(rdkafka)){ throw new Exception('rdkafka.so扩展必须开启'); } if(!isset($topic) || empty($topic)){ throw new Exception('kafak实例化必须设置topic'); } $this->topic = $topic; } public function subscribe(){ $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', $this->config['broker']); $conf->set('group.id', $this->config['group_id'].rand(0,10)); $conf->set('security.protocol', $this->config['security_protocol']); $conf->set('ssl.certificate.location', $this->config['client_pem']); $conf->set('ssl.key.location', $this->config['client_key']); $conf->set('ssl.ca.location', $this->config['ca_pem']); $consumer = new \RdKafka\KafkaConsumer($conf); $consumer->subscribe([$this->topic]); return $consumer; }}

这里须要特殊把稳的是PHPstorm的代码检讨器彷佛找不到rdkafka这个扩展,但是没有关系,我没可以在初始化这个类的时候判断一下扩展是否存在。
这里只实现了消费者,要利用消费者须要实例化的时候传入消费者的topic,然后调用subscribe方法,接下来实际在easyswoole的mainServiceCreate中创建三个进程来处理kafka的订阅事宜

public static function mainServerCreate(EventRegister $register){ // TODO: Implement mainServerCreate() method.// 注册Kafka消费事宜, 开三个进程来处理 $allNum = 3; for($i = 0; $i < $allNum; $i++){ ServerManager::getInstance()->getSwooleServer()->addProcess((new Consumer("consumer_{$i}"))->getProcess()); }}

这里new的Consumer便是处理消费的进程

Consumer.php

<?php/ Created by bingxiong. Date: 4/8/19 Time: 10:22 PM Description: /namespace App\Lib\Kafka;use EasySwoole\Component\Process\AbstractProcess;class Consumer extends AbstractProcess{ private $isRun = false; public function run($arg) { // 在这里处理kafka连接 // TODO: Implement run() method. $this->addTick(500,function (){ if(!$this->isRun){ $this->isRun = true; // 连接kafka并订阅TOPIC $kafka = new Kafka('xxxxxxxxxxxx');//topic $consumer = $kafka->subscribe(); while(true){ try{ $message = $consumer->consume(1201000); if($message){ switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo 'process name is'.$this->getProcessName().'\n'; echo "partition:", $message->partition,", offset:", $message->offset,", ", $message->payload, "\n"; break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } }else{ break; } }catch (\Throwable $throwable){ break; } } $this->isRun = false; } var_dump($this->getProcessName().' task run check'); }); } public function onShutDown() { // TODO: Implement onShutDown() method. } public function onReceive(string $str, ...$args) { // TODO: Implement onReceive() method. }}

这里利用了一个异步任务addTick,如果长期没有的话也会每500秒去检讨一下有没有新的。
这里还是用了一个去世循环,在这里去世循环中持续处理过来之后的逻辑

1.png

现在已经利用swoole+kafka拿到设备的数据了,接下来便是利用异步任务或者异步redis之类的去实行相应的业务逻辑了。

本文作者熊冰,个人网站[Bing的天涯路](https://bingxiong.vip/),转载请注明出处。

标签:

相关文章

微信第三方登录便捷与安全的完美融合

社交平台已成为人们日常生活中不可或缺的一部分。微信作为我国最受欢迎的社交软件之一,拥有庞大的用户群体。为了方便用户在不同平台间切换...

网站建设 2025-02-18 阅读0 评论0

广东高速代码表解码高速公路管理智慧

高速公路作为国家交通动脉,连接着城市与城市,承载着巨大的物流和人流。广东作为我国经济大省,高速公路网络密布,交通流量巨大。为了更好...

网站建设 2025-02-18 阅读0 评论0