目录
Python生产消费Kafka实例

概述

Kafka 是一个开源的分布式流处理平台,其简化了不同数据系统的集成。流指的是一个数据管道,应用能够通过流不断地接收数据。Kafka 作为流处理系统主要有两个用处:

  • 数据集成: Kafka 捕捉事件流或数据变化流,并将这些数据送给其它数据系统,如关系型数据库,键值对数据库或者数据仓库。
  • 流处理:Kafka接收事件流并保存在一个只能追加的队列里,该队列称为日志(log)。日志里的信息是不可变的,因此支持连续实时的数据处理和流转换,并使结果在系统级别可访问。

相比于其它技术,Kafka 拥有更高的吞吐量,内置分区,副本和容错率。这些使得 Kafka 成为大规模消息处理应用的良好解决方案。

Kafka 系统有三个主要的部分:

  1. 生产者(Producer): 产生原始数据的服务。
  2. 中间人(Broker): Kafka 是生产者和消费者之间的中间人,它使用API来获取和发布数据。
  3. 消费者(Consumer): 使用中间人发布的数据的服务。

安装模块

Python 连接 Kafka 常用的第三方库有 kafka-python 和 pykafka。kafka-python 使用者较多,是比较成熟的库,它不支持 ZooKeeper。pykafka 是 Samsa 的升级版本,使用 samsa 连接 ZooKeeper:生产者直接连接 Kafka 服务器列表,消费者才使用 ZooKeeper。

1
pip install kafka-python

生产与消费实例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# -*- coding: utf-8 -*-

'''''
使用kafka-Python 1.3.3模块
# pip install kafka==1.3.5
# pip install kafka-python==1.3.5
'''

import sys
import time
import json

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError


# Broker address and topic used by this demo script.
# NOTE(review): "KAFAKA" is a misspelling of "KAFKA"; the names are kept as-is
# because the rest of the script refers to them.
KAFAKA_HOST = "10.151.160.247"
KAFAKA_PORT = 9092
# Alternative topic (the one the consumer-only example further down reads).
#KAFAKA_TOPIC = "log_fluent_health"
KAFAKA_TOPIC = "test_kafka"


class Kafka_producer():
    """Producer module: tag every message with a fixed key.

    Every message is sent to ``kafkatopic`` with ``key`` attached, so messages
    from different producers can be told apart by their key.
    """

    def __init__(self, kafkahost, kafkaport, kafkatopic, key):
        """Create a KafkaProducer connected to ``kafkahost:kafkaport``.

        :param kafkahost: broker host name or IP address.
        :param kafkaport: broker port.
        :param kafkatopic: topic every message is sent to.
        :param key: message key as a str, or None to send messages without a key.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.key = key
        print("producer:h,p,t,k", kafkahost, kafkaport, kafkatopic, key)
        bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort,
        )
        print("boot svr:", bootstrap_servers)
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    def sendjsondata(self, params):
        """Serialize ``params`` to JSON, send it with this producer's key, then flush.

        The payload is UTF-8 encoded JSON with non-ASCII characters kept as-is.
        KafkaError raised while sending is printed, not propagated.

        :param params: any JSON-serializable Python object.
        """
        try:
            params_message = json.dumps(params, ensure_ascii=False)
            print(params_message)
            v = params_message.encode('utf-8')
            # Use the key stored by __init__. A bare ``key`` would only resolve
            # to a module global that exists when this file runs as a script.
            k = self.key.encode('utf-8') if self.key is not None else None
            print("send msg:(k,v)", k, v)
            self.producer.send(self.kafkatopic, key=k, value=v)
            self.producer.flush()
        except KafkaError as e:
            print(e)

class Kafka_consumer():
    """Consumer module: consume the messages of a topic under a given group id."""

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid, key=None):
        """Subscribe a KafkaConsumer to ``kafkatopic`` on ``kafkahost:kafkaport``.

        :param kafkahost: broker host name or IP address.
        :param kafkaport: broker port.
        :param kafkatopic: topic to subscribe to.
        :param groupid: consumer group id.
        :param key: optional key, only stored as ``self.key``; nothing in this
            class reads it.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        # Explicit parameter: the previous body read an undefined name here,
        # which raised NameError unless a module-level ``key`` existed.
        self.key = key
        self.consumer = KafkaConsumer(
            self.kafkatopic,
            group_id=self.groupid,
            bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort,
            ),
        )

    def consume_data(self):
        """Yield each record produced by iterating the consumer.

        A KeyboardInterrupt raised while waiting for the next record is
        printed and ends the generator instead of propagating.
        """
        try:
            for message in self.consumer:
                yield message
        except KeyboardInterrupt as e:
            print(e)


def main(xtype, group, key):
    """Exercise the producer ("p") or the consumer ("c").

    :param xtype: "p" sends 100 JSON messages, one per second;
        "c" consumes and prints messages until interrupted.
    :param group: consumer group id (used only when xtype == "c").
    :param key: message key (used only when xtype == "p").
    """
    if xtype == "p":
        # Producer: every message is tagged with ``key``.
        producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
        print("===========> producer:", producer)
        for _id in range(100):
            params = [{"msg0": _id}, {"msg1": _id}]
            producer.sendjsondata(params)
            time.sleep(1)
    elif xtype == "c":
        # Consumer: print key, value and offset of every record.
        consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
        print("===========> consumer:", consumer)
        for msg in consumer.consume_data():
            print('msg---------------->k,v', msg.key, msg.value)
            print('offset---------------->', msg.offset)
    else:
        # Previously an unknown mode silently did nothing.
        print("unknown xtype %r: expected 'p' (producer) or 'c' (consumer)" % (xtype,))
# print(msgs)

if __name__ == '__main__':
    # Usage: python test_kafka.py <p|c> <group> <key>
    # Fail with a usage message rather than an IndexError on missing args.
    if len(sys.argv) < 4:
        sys.exit("usage: %s <p|c> <group> <key>" % sys.argv[0])
    xtype = sys.argv[1]
    group = sys.argv[2]
    key = sys.argv[3]
    main(xtype, group, key)
    print(sys.argv[1])

生产消息

1
python test_kafka.py p g k

消费消息

1
python test_kafka.py c g k

以消费实例说明

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import json, demjson
import numpy as np

from kafka import KafkaConsumer
from kafka.errors import KafkaError

# Broker address and topic read by this consumer example.
# NOTE(review): "KAFAKA" is a misspelling of "KAFKA"; kept because main() uses these names.
KAFAKA_HOST = "10.151.160.247"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "log_fluent_health"


class Kafka_consumer():
    """Consumer module: consume the messages of a topic under a given group id."""

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        """Store the connection settings and subscribe a KafkaConsumer.

        :param kafkahost: broker host name or IP address.
        :param kafkaport: broker port.
        :param kafkatopic: topic to subscribe to.
        :param groupid: consumer group id.
        """
        self.kafkaHost, self.kafkaPort = kafkahost, kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        servers = "{}:{}".format(self.kafkaHost, self.kafkaPort)
        self.consumer = KafkaConsumer(
            self.kafkatopic,
            bootstrap_servers=servers,
            group_id=self.groupid,
        )

    def consume_data(self):
        """Yield each record produced by iterating the consumer.

        A KeyboardInterrupt raised while waiting for the next record is
        printed and ends the generator.
        """
        records = self.consumer
        try:
            for record in records:
                yield record
        except KeyboardInterrupt as interrupt:
            print(interrupt)

def main(group='g'):
    """Consume KAFAKA_TOPIC and print every record's key, value and offset.

    :param group: consumer group id; defaults to 'g'.
    """
    consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
    print("===========> consumer:", consumer)
    for msg in consumer.consume_data():
        print('msg---------------->k,v', msg.key, msg.value)
        print('offset---------------->', msg.offset)


# Without this entry point, running the script would only define main() and exit.
if __name__ == '__main__':
    main()

运行结果:

1
2
3
4
offset----------------> 18215735556
msg---------------->k,v None b'{"traceId":-2892788567737060969,"deploy_env":null,"baggage":{},"tag_name":"monitor.monitor_soa_rpc","ip":"127.0.0.1","operationName":"RPC","init_id":"1544976000925-localhost-70","parentId":0,"tags":{"providerip":"10.151.31.130:8095","role":"consumer","method":"collect","sampler.type":"probabilistic","sampler.param":1.0,"concurrent":0,"consumerip":"10.151.31.138","interface":"com.yeepay.g3.utils.soa.service.NetworkMonitorService","totalCount":2,"elapsed":8,"output":0,"max.concurrent":1,"input":0,"port":0,"failure":0,"success":2,"max.elapsed":5,"max.input":0,"max.output":0,"capplication":"yqt-cashier-app"},"spanId":-2892788567737060969,"hostname":"localhost","delay":90,"appname":"soa","linenum":93022,"id":null,"time":"2018-12-17T08:35:52.415"}'

...

从运行结果可以看出 msg.value 的类型为 bytes。因实际需求,msg.value 需要保存为 JSON 格式文件。此时,我们联想到 Python 中 JSON 处理涉及的四个函数:json.dumps()、json.loads()、json.dump()、json.load()。它们的区别:

  • json.dumps( ):函数是将 Python 对象编码成 JSON 字符串。字典转化为字符串可用到此函数。
  • json.loads( ):函数是把 JSON 字符串变成 Python 的数据类型。字符串转化为字典可用到此函数。
  • json.dump( ):函数是可以将 Python 对象编码成 JSON 字符串,并写入到JSON文件中。
  • json.load( ):读取JSON文件。
1
2
3
4
5
for msg in message:
    print('msg---------------->k,v', msg.key,msg.value)
    print('offset---------------->', msg.offset)
    # Intentionally failing demo: this raises the TypeError shown below,
    # because the record's value is bytes (see the b'...' output above)
    # and json.dumps cannot encode bytes.
    msgs = json.dumps(msg)
    print(msgs)

运行结果出错:

1
TypeError: Object of type 'bytes' is not JSON serializable

因为要序列化的对象中含有 bytes 类型的数据(msg.value),json.dumps 无法对其编码。解决办法是自定义一个编码类(继承 json.JSONEncoder)并在编码时传入:只要检查到 bytes 类型的数据,就把它转化成 str 类型。

1
2
3
4
5
6
7
8
class MyEncoder(json.JSONEncoder):
    """JSON encoder that additionally accepts ``bytes`` values.

    ``bytes`` are decoded as UTF-8 and emitted as JSON strings. Any other
    unsupported type is handed to the base encoder, which raises TypeError.
    """

    def default(self, obj):
        if not isinstance(obj, bytes):
            return super().default(obj)
        return obj.decode('utf-8')
for msg in message:
    # Close each file handle instead of leaking one per message. Mode 'w'
    # still means info.json only holds the most recent record.
    with open('info.json', 'w', encoding='utf-8') as ff:
        json.dump(msg.value, ff, cls=MyEncoder, indent=4, ensure_ascii=False)

json.dumps 序列化时默认使用 ASCII 编码,中文字符会被转义;指定 ensure_ascii=False 可以直接输出中文。indent 指定缩进,对 JSON 进行格式化输出。cls 指定自定义的编码器类(这里是 MyEncoder)。
查看 info.json 文件,发现整条消息被当作一个字符串写入,其中的引号被转义,出现了“\”。

1
2
3
"{\"traceId\":2851729268110980367,\"deploy_env\":null,\"baggage\":{},\"tag_name\":\"monitor.monitor_yop_center\",\"ip\":\"180.30.17.13\",\"operationName\":\"REQUEST\",\"data_center\":\"QA\",\"init_id\":\"1545013900874-yop-center-z-554ccdbf8b-mjd2w-1\",\"parentId\":0,\"tags\":{\"apiGroupCode\":\"bankfront-hessian\",\"apiUri\":\"\\/rest\\/v1.0\\/bankfront-hessian\\/SyncTradeOrder\",\"sampler.type\":\"probabilistic\",\"requestId\":\"68c492a3-695d-40b5-b6e2-7ae2d8852837\",\"sampler.param\":1.0,\"requestMethod\":\"POST\",\"guid\":\"425b41eea8d2419ab265ddbbcba52d7b\",\"serverIp\":\"180.30.17.13\",\"requestIp\":\"10.151.32.230\",\"appKey\":\"OPR:10040040286\",\"customerNo\":\"\"},\"spanId\":2851729268110980367,\"hostname\":\"yop-center-z-554ccdbf8b-mjd2w\",\"delay\":3,\"appname\":\"yop-center\",\"linenum\":2509802,\"id\":null,\"time\":\"2018-12-17T08:49:08.524\"}"

...

为去掉转义的反斜杠“\”,使 JSON 能被正常解析(不带转义字符),在网上查找到相应的方法。

  • 使用eval函数
    eval
      功能:将字符串 str 当成有效的表达式来求值并返回计算结果。注意:eval 会执行任意 Python 代码,切勿用于处理不可信的外部数据(如 Kafka 消息)。
      语法: eval(source[, globals[, locals]]) -> value
      参数:
        source:一个Python表达式或函数compile()返回的代码对象
        globals:可选。必须是dictionary
        locals:可选。任意map对象
    实例如下:
1
2
3
4
import json

raw = """{\\"age\\":18}"""  # the text {\"age\":18}
# eval() parses the text as a Python string literal, which turns \" into ".
# WARNING: eval executes arbitrary code; never use it on untrusted input
# such as Kafka messages.
str1 = eval("'{}'".format(raw))
print(str1)
print(json.loads(str1))
  • 使用正则表达式:
1
2
3
4
import json
import re

raw = """{\\"age\\":18}"""  # the text {\"age\":18}
# Strip every backslash. This also destroys legitimate JSON escapes inside
# values (e.g. \/ or \n), so it only suits simple inputs like this one.
str2 = re.sub(r'\\', '', raw)
print(str2)
print(json.loads(str2))

将这两种方法运用到“消费实例”中,控制台输出的字符串去掉了转义,但 JSON 文件中仍出现“\”,因此需另外找方法。无意中看到第三方模块 demjson(需通过 pip install demjson 安装,并非 Python 自带)。它可以很方便地把 dict、list 等数据编码成 JSON 字符串,也可以把 JSON 字符串解码还原为 Python 对象。实例如下:

1
2
3
4
5
6
7
8
9
10
#coding:utf8
import demjson

data = [{'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}]
# Named ``encoded`` rather than ``json`` so the stdlib json module is not
# shadowed (the consumer example below calls json.dump).
encoded = demjson.encode(data)
data2 = demjson.decode(encoded)
print(type(data))
print(type(encoded))
print(type(data2))
print(encoded)
print(data2)

运行结果:

1
2
3
4
5
<class 'list'>
<class 'str'>
<class 'list'>
[{"a":1,"b":2,"c":3,"d":4,"e":5}]
[{'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}]

将此方法运用到“消费实例”中

1
2
3
4
for msg in message:
    # Decode the JSON text into Python objects first, so json.dump writes a
    # real JSON object instead of one escaped string. Decoding before opening
    # also avoids truncating info1.json when a message fails to decode.
    # NOTE: for standard JSON like the sample above, the stdlib
    # json.loads(msg.value.decode('utf-8')) would do the same without demjson.
    jsontest = demjson.decode(msg.value)
    with open('info1.json', 'w', encoding='utf-8') as fw:
        json.dump(jsontest, fw, indent=4, ensure_ascii=False)

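查看 info1.json 文件,JSON 被正确解析。原因在于 msg.value 本身已经是 JSON 文本:之前直接 json.dump 会把它当作普通字符串再编码一次,于是引号被转义。先把它解码为 Python 对象,再 json.dump,写入的就是正常的 JSON 对象。解码可以用 demjson.decode;对标准 JSON,用标准库的 json.loads 同样可以。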

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
"traceId": 2851729268110980367,
"deploy_env": null,
"baggage": {},
"tag_name": "monitor.monitor_yop_center",
"ip": "180.30.17.13",
"operationName": "REQUEST",
"data_center": "QA",
"init_id": "1545013900874-yop-center-z-554ccdbf8b-mjd2w-1",
"parentId": 0,
"tags": {
"apiGroupCode": "bankfront-hessian",
"apiUri": "/rest/v1.0/bankfront-hessian/SyncTradeOrder",
"sampler.type": "probabilistic",
"requestId": "68c492a3-695d-40b5-b6e2-7ae2d8852837",
"sampler.param": 1.0,
"requestMethod": "POST",
"guid": "425b41eea8d2419ab265ddbbcba52d7b",
"serverIp": "180.30.17.13",
"requestIp": "10.151.32.230",
"appKey": "OPR:10040040286",
"customerNo": ""
},

}
文章作者: Kylen Chan
文章链接: https://booku.ltd/posts/python-kafka/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Kylen's Blog

评论