PHP扩展之kafka安装应用案例详解
话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。
实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装吧。我以CentOS6.4为例,64位。
一. 首先确认下jdk有没有安装使用命令
23
4
[root@localhost ~]
# java -version
java version
"1.8.0_73"
Java(TM) SE Runtime Environment (build 1.8.0_73-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)
如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:
oracle/technetwork/java/javase/downloads/jdk8-downloads-2133151
到这个地址下载jdk8版本,我下载的是jdk-8u73-linux-x64.tar.gz,然后解压到/usr/local/jdk/下。
然后打开/etc/profile文件
[root@localhost ~]
# vim /etc/profile
把下面这段代码写到文件里
23
export
JAVA_HOME=
/usr/local/jdk/jdk1
.8.0_73
export
CLASSPATH=.:$JAVA_HOME
/lib/tools
.jar:$JAVA_HOME
/lib/dt
.jar
export
PATH=$JAVA_HOME
/bin
:$PATH
最后
[root@localhost ~]
# source /etc/profile
这时jdk就生效了,可以使用 java -version验证下。
二. 接下来安装Kafka1. 下载Kafka
到kafka.apache.org/downloads下载相应的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz。
2. 下载完解压到你喜欢的目录
我是解压到 /usr/local/kafka/kafka_2.9.1-0.8.2.2
3. 运行默认的Kafka
启动Zookeeper server
[root@localhost kafka_2.9.1-0.8.2.2]
# sh bin/zookeeper-server-start.sh config/zookeeper.properties &
启动Kafka server
[root@localhost kafka_2.9.1-0.8.2.2]
# sh bin/kafka-server-start.sh config/server.properties &
运行生产者producer
[root@localhost kafka_2.9.1-0.8.2.2]
# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
运行消费者consumer
[root@localhost kafka_2.9.1-0.8.2.2]
# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
这样,在producer那边输入内容,consumer马上就能接收到。
4. 当有跨机的producer或consumer连接时
需要配置config/server.properties的host.name,要不然跨机的连不上。
三. Kafka-PHP扩展使用了一圈,就github/nmred/kafka-php可以用。
我是使用composer安装的,以下是示例:
producer.php
23
4
5
6
7
8
9
10
11
12
13
14
15
<?php
require
'vendor/autoload.php'
;
while
(1) {
$part
= mt_rand(0, 1);
$produce
= \Kafka\Produce::getInstance(
'kafka0:2181'
, 3000);
// get available partitions
$partitions
=
$produce
->getAvailablePartitions(
'topic_name'
);
var_dump(
$partitions
);
// send message
$produce
->setRequireAck(-1);
$produce
->setMessages(
'topic_name'
, 0,
array
(
date
(
'Y-m-d H:i:s'
));
sleep(3);
}
consumer.php
23
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
require
'vendor/autoload.php'
;
$consumer
= \Kafka\Consumer::getInstance(
'kafka0:2181'
);
$group
=
'topic_name'
;
$consumer
->setGroup(
$group
);
$consumer
->setFromOffset(true);
$consumer
->setTopic(
'topic_name'
, 0);
$consumer
->setMaxBytes(102400);
$result
=
$consumer
->fetch();
print_r(
$result
);
foreach
(
$result
as
$topicName
=>
$partition
) {
foreach
(
$partition
as
$partId
=>
$messageSet
) {
var_dump(
$partition
->getHighOffset());
foreach
(
$messageSet
as
$message
) {
var_dump((string)
$message
);
}
var_dump(
$partition
->getMessageOffset());
}
}
到此这篇关于PHP扩展之kafka安装应用案例详解的文章就介绍到这了,更多相关PHP扩展之kafka安装应用内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
上一篇:php curl发起get与post网络请求案例详解
下一篇:实例解析PHP定时器的具体实现