上一课大家跟福哥学会了在我们的TFLinux系统上面安装Kafka软件,本日福哥要带着大家学习利用Python去操作Kafka系统的方法。
Python操作Kafka可以利用pykafka这个库来实现,看这个命名是不是很眼熟?对了,类似的库名还有一个便是用来操作MySQL数据库的pymysql。pykafka库的语法和pymysql很像,不过福哥以为Kafka和MySQL之间没有什么可比性,以是福哥选择了其余一个库。
福哥是用kafka-python这个库来操作Kafka的,这个库的设计和我们之前学习的PHP措辞的rdkafka以及Java措辞的KafkaConsumer/KafkaProducer工具很相似,学习起来更加舒畅。

直接通过pip安装kafka-python库就可以。
pipinstallkafka-python
3. 利用
3.1 消费者
3.1.1 观点
低级消费者模式便是一个人接了一个项目,所有事情都要自己一点点做。如果当天没有做完,我会记录做到第几件了,然后第二天来了连续做下面的事情。
3.1.2 代码
低级消费者示例代码,Python作为掌握台程序运行消费者真是太得当了。
fromkafkaimportKafkaConsumertopic="test"group_id="lowLevel"bootstrap_servers="192.168.2.168:9092"consumer=KafkaConsumer(bootstrap_servers=bootstrap_servers,group_id=group_id,auto_offset_reset="latest")consumer.subscribe([topic])whileTrue:msg=consumer.poll(timeout_ms=12000)forrecordsinmsg.values():foriinrange(0,len(records)):record=records[i]print(record.value.decode("utf-8"))
3.1.3 效果
3.2 生产者
3.2.1 代码
生产者示例代码。
fromkafkaimportKafkaProducerimporttimetopic="test"bootstrap_servers="192.168.2.168:9092"producer=KafkaProducer(bootstrap_servers=bootstrap_servers)msg="福哥说:现在是"+time.strftime("%H:%M:%S",time.localtime())print(msg)producer.send(topic,msg)
4. 踩坑
4.1 消费者
4.1.1 壅塞进程
编写消费者代码的时候,福哥特殊不建议利用如下两种办法,由于它会壅塞进程,按Ctrl+C都退不出来。
for办法
formsginconsumer:forrecordsinmsg.values():foriinrange(0,len(records)):record=records[i]print(record.value.decode("utf-8"))
next办法
whileTrue:msg=next(consumer)forrecordsinmsg.values():foriinrange(0,len(records)):record=records[i]print(record.value.decode("utf-8"))
5. 官方文档
5.1 消费者
这是官方的消费者文档
KafkaConsumer — kafka-python 2.0.2-dev documentation
5.2 生产者
这是官方的生产者文档
KafkaProducer — kafka-python 2.0.2-dev documentation
6. 总结本日福哥带着童鞋们学习了利用Python措辞连接操作Kafka系统的方法,Python操作Kafka有pykafka和kafka-python两种办法,大家有空可以自行学习一下pykafka库的利用方法,然后比拟一下,看看是不是和福哥有着一样的不雅观点。
https://m.tongfu.net/home/35/blog/513273.html