Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道和流应用。以下是使用 PHP 操作 Kafka 的基本步骤和示例,帮助您快速上手。
1. 环境准备
1.1 安装 Kafka
首先,您需要在本地或服务器上安装 Kafka。可以从 Kafka 官方网站 下载并解压 Kafka。
1.2 启动 Kafka 和 Zookeeper
Kafka 依赖 Zookeeper,因此需要先启动 Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
然后启动 Kafka 服务器:
bin/kafka-server-start.sh config/server.properties
1.3 安装 PHP 扩展
为了在 PHP 中使用 Kafka,您可以使用 php-rdkafka
扩展。您可以通过 PECL 安装:
sudo pecl install rdkafka
确保在 php.ini
文件中添加以下行:
extension=rdkafka.so
2. 创建 Kafka 主题
在使用 Kafka 之前,您需要创建一个主题。可以使用以下命令创建一个名为 test
的主题:
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
3. PHP 生产者示例
下面是一个简单的 PHP 生产者示例,用于发送消息到 Kafka 主题:
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // 设置 broker 地址
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test");
for ($i = 0; $i < 10; $i++) {
$message = "Message $i";
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
echo "Sent: $message\n";
}
$producer->flush(10000); // 确保消息发送
?>
4. PHP 消费者示例
下面是一个简单的 PHP 消费者示例,用于从 Kafka 主题中接收消息:
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // 设置 broker 地址
$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers("localhost:9092");
$topic = $consumer->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_END); // 从最新位置开始消费
while (true) {
$message = $topic->consume(0, 1000); // 等待消息
if ($message->err) {
if ($message->err == RD_KAFKA_RESP_ERR__TIMED_OUT) {
continue;
}
echo "Consumer error: {$message->err}\n";
break;
}
echo "Received: {$message->payload}\n";
}
?>
5. 运行示例
运行生产者:在一个终端中运行生产者脚本:
php producer.php
运行消费者:在另一个终端中运行消费者脚本:
php consumer.php
您应该能够看到生产者发送的消息被消费者接收。