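Overview
Kafka is an open-source distributed stream-processing platform that simplifies integration between different data systems. A stream is a data pipeline through which an application continuously receives data. As a streaming system, Kafka has two main uses:
- Data integration: Kafka captures streams of events or data changes and delivers them to other data systems, such as relational databases, key-value stores, or data warehouses.
- Stream processing: Kafka receives event streams and stores them in an append-only queue called the log. Records in the log are immutable, which supports continuous real-time processing and stream transformation and makes the results accessible system-wide.
Compared with other technologies, Kafka offers higher throughput together with built-in partitioning, replication, and fault tolerance. These properties make Kafka a good fit for large-scale message-processing applications.
A Kafka system has three main parts:
- Producer: a service that generates the raw data.
- Broker: Kafka acts as the intermediary between producers and consumers, receiving and publishing data through its API.
- Consumer: a service that uses the data published by the broker.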
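To make the append-only log and the producer/consumer roles more concrete, here is a minimal in-memory sketch. It is not Kafka and does not use any Kafka API; the `ToyLog` class and its method names are hypothetical and exist only for illustration.

```python
class ToyLog:
    """In-memory stand-in for a single append-only log (not Kafka itself)."""

    def __init__(self):
        self._records = []

    def append(self, key, value):
        """Producer side: add a record at the end and return its offset."""
        self._records.append((key, value))
        return len(self._records) - 1

    def read_from(self, offset):
        """Consumer side: return every record at or after the given offset."""
        return list(self._records[offset:])


log = ToyLog()
first = log.append("k", "hello")
second = log.append("k", "world")

# Two independent readers start from their own offsets over the same records.
consumer_a = log.read_from(0)
consumer_b = log.read_from(1)
print(first, second)
print(consumer_a)
print(consumer_b)
```

Records are only ever appended, never modified, so each reader can keep its own position without affecting the others. This is roughly the idea behind the `offset` field that appears in the consumer output later on.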
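Installing the module
The two commonly used Python client libraries for Kafka are kafka-python and pykafka (both are third-party packages, not part of the standard library). kafka-python has more users and is the more mature of the two; it has no ZooKeeper support. pykafka is the successor to Samsa, which connected through ZooKeeper; with pykafka the producer connects directly to the list of Kafka servers, and only the consumer uses ZooKeeper.
Producer and consumer example code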
```python
"""
Uses the kafka-python 1.3.3 module
# pip install kafka==1.3.5
# pip install kafka-python==1.3.5
"""
import sys
import time
import json

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

KAFAKA_HOST = "10.151.160.247"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "test_kafka"


class Kafka_producer():
    """Producer: messages are distinguished by key."""

    def __init__(self, kafkahost, kafkaport, kafkatopic, key):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.key = key
        print("producer:h,p,t,k", kafkahost, kafkaport, kafkatopic, key)
        bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort
        )
        print("boot svr:", bootstrap_servers)
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    def sendjsondata(self, params):
        try:
            parmas_message = json.dumps(params, ensure_ascii=False)
            producer = self.producer
            print(parmas_message)
            # Kafka transports bytes, so encode both key and value
            v = parmas_message.encode('utf-8')
            k = self.key.encode('utf-8')  # was the global `key`; use the instance attribute
            print("send msg:(k,v)", k, v)
            producer.send(self.kafkatopic, key=k, value=v)
            producer.flush()
        except KafkaError as e:
            print(e)


class Kafka_consumer():
    """Consumer: different group ids consume the topic's messages independently."""

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.consumer = KafkaConsumer(
            self.kafkatopic,
            group_id=self.groupid,
            bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort
            )
        )

    def consume_data(self):
        try:
            for message in self.consumer:
                yield message
        except KeyboardInterrupt as e:
            print(e)


def main(xtype, group, key):
    """Test the consumer and the producer."""
    if xtype == "p":
        producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
        print("===========> producer:", producer)
        for _id in range(100):
            params = [{"msg0": _id}, {"msg1": _id}]
            producer.sendjsondata(params)
            time.sleep(1)

    if xtype == 'c':
        consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
        print("===========> consumer:", consumer)
        message = consumer.consume_data()
        for msg in message:
            print('msg---------------->k,v', msg.key, msg.value)
            print('offset---------------->', msg.offset)


if __name__ == '__main__':
    xtype = sys.argv[1]
    group = sys.argv[2]
    key = sys.argv[3]
    main(xtype, group, key)
    print(sys.argv[1])
```
Producing messages
```
python test_kafka.py p g k
```
Consuming messages
```
python test_kafka.py c g k
```
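The producer above encodes the key and the value by hand before every `send`. As a sketch of an alternative, the two helpers below do the same encoding as standalone functions; `serialize_key` and `serialize_value` are hypothetical names, not part of the original script. kafka-python's `KafkaProducer` accepts `key_serializer` and `value_serializer` callables, so helpers like these could be passed in at construction time. That wiring needs a reachable broker, so it is left as a comment here.

```python
import json


def serialize_value(obj):
    """Encode a Python object as UTF-8 JSON bytes."""
    return json.dumps(obj, ensure_ascii=False).encode("utf-8")


def serialize_key(key):
    """Encode a string key as UTF-8 bytes."""
    return key.encode("utf-8")


payload = [{"msg0": 1}, {"msg1": 1}]
print(serialize_key("k"), serialize_value(payload))

# Assumed wiring (requires a running broker, so it is not executed here):
# from kafka import KafkaProducer
# producer = KafkaProducer(
#     bootstrap_servers="10.151.160.247:9092",
#     key_serializer=serialize_key,
#     value_serializer=serialize_value,
# )
# producer.send("test_kafka", key="k", value=payload)
```

With serializers configured this way, the calling code can pass plain Python objects to `send` and the encoding lives in one place.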
Walkthrough using the consumer example
Code:
```python
import json, demjson

from kafka import KafkaConsumer
from kafka.errors import KafkaError

KAFAKA_HOST = "10.151.160.247"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "log_fluent_health"


class Kafka_consumer():
    """Consumer: different group ids consume the topic's messages independently."""

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.consumer = KafkaConsumer(
            self.kafkatopic,
            group_id=self.groupid,
            bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort
            )
        )

    def consume_data(self):
        try:
            for message in self.consumer:
                yield message
        except KeyboardInterrupt as e:
            print(e)


def main():
    consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, 'g')
    print("===========> consumer:", consumer)
    message = consumer.consume_data()
    for msg in message:
        print('msg---------------->k,v', msg.key, msg.value)
        print('offset---------------->', msg.offset)


if __name__ == '__main__':
    main()
```
Output:
```
offset----------------> 18215735556
msg---------------->k,v None b'{"traceId":-2892788567737060969,"deploy_env":null,"baggage":{},"tag_name":"monitor.monitor_soa_rpc","ip":"127.0.0.1","operationName":"RPC","init_id":"1544976000925-localhost-70","parentId":0,"tags":{"providerip":"10.151.31.130:8095","role":"consumer","method":"collect","sampler.type":"probabilistic","sampler.param":1.0,"concurrent":0,"consumerip":"10.151.31.138","interface":"com.yeepay.g3.utils.soa.service.NetworkMonitorService","totalCount":2,"elapsed":8,"output":0,"max.concurrent":1,"input":0,"port":0,"failure":0,"success":2,"max.elapsed":5,"max.input":0,"max.output":0,"capplication":"yqt-cashier-app"},"spanId":-2892788567737060969,"hostname":"localhost","delay":90,"appname":"soa","linenum":93022,"id":null,"time":"2018-12-17T08:35:52.415"}'
...
```
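The output shows that msg.value is of type bytes. For our use case, msg.value needs to be saved as a JSON file. This brings to mind the four functions Python provides for handling JSON: json.dumps() and json.loads(), and json.dump() and json.load(). They differ as follows:
- json.dumps(): encodes a Python object into a JSON string. Use it to turn a dict into a string.
- json.loads(): decodes a JSON string into Python data types. Use it to turn a string into a dict.
- json.dump(): encodes a Python object as JSON and writes it to a JSON file.
- json.load(): reads a JSON file and decodes it into a Python object.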
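The four functions listed above can be exercised in a few lines. This is a minimal sketch; the `record` dict is made-up sample data, and `io.StringIO` stands in for an open file so that nothing is written to disk.

```python
import io
import json

record = {"appname": "soa", "delay": 90}

text = json.dumps(record)        # Python object -> JSON string
restored = json.loads(text)      # JSON string -> Python object

buffer = io.StringIO()           # stands in for an open text file
json.dump(record, buffer)        # Python object -> JSON written to the file-like object
buffer.seek(0)
loaded = json.load(buffer)       # file-like object -> Python object

print(type(text).__name__, type(restored).__name__, loaded == record)
```

The versions ending in `s` work on strings in memory, while `dump` and `load` work on file objects.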
```python
for msg in message:
    print('msg---------------->k,v', msg.key, msg.value)
    print('offset---------------->', msg.offset)
    msgs = json.dumps(msg)
    print(msgs)
```
Running this fails with:
```
TypeError: Object of type 'bytes' is not JSON serializable
```
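The error occurs because json.dumps finds bytes data inside the object it is asked to serialize and cannot encode it. One fix is to write an encoder class and pass it to the encoding function: whenever the encoder meets bytes data, it converts it to str.
```python
class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        # Convert bytes to str; defer everything else to the base class
        if isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        return json.JSONEncoder.default(self, obj)

for msg in message:
    # Mode 'w' overwrites info.json on every message
    with open('info.json', 'w', encoding='utf-8') as ff:
        json.dump(msg.value, ff, cls=MyEncoder, indent=4, ensure_ascii=False)
```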
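By default, json.dumps escapes non-ASCII characters when serializing, so Chinese text comes out as \uXXXX sequences; passing ensure_ascii=False writes the Chinese characters as-is. indent sets the indentation used to pretty-print the JSON. cls specifies the custom JSONEncoder subclass to use for serialization (here, MyEncoder).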
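The effect of these three parameters can be seen side by side in a small sketch. The `BytesEncoder` class and the `data` dict are hypothetical and only illustrate the parameters; they are not taken from the consumer script.

```python
import json


class BytesEncoder(json.JSONEncoder):
    """Hypothetical encoder: decode bytes as UTF-8 text, defer everything else."""

    def default(self, obj):
        if isinstance(obj, bytes):
            return obj.decode("utf-8")
        return super().default(obj)


data = {"city": "北京", "raw": b"abc"}

escaped = json.dumps(data, cls=BytesEncoder)                              # non-ASCII escaped
readable = json.dumps(data, cls=BytesEncoder, ensure_ascii=False)         # non-ASCII kept
pretty = json.dumps(data, cls=BytesEncoder, ensure_ascii=False, indent=4) # multi-line

print(escaped)
print(readable)
print(pretty)
```

Without `cls=BytesEncoder`, all three calls would raise the same `TypeError` as before, because the `raw` value is bytes.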
Opening info.json shows that the string has been escaped and now contains "\" characters.
```
"{\"traceId\":2851729268110980367,\"deploy_env\":null,\"baggage\":{},\"tag_name\":\"monitor.monitor_yop_center\",\"ip\":\"180.30.17.13\",\"operationName\":\"REQUEST\",\"data_center\":\"QA\",\"init_id\":\"1545013900874-yop-center-z-554ccdbf8b-mjd2w-1\",\"parentId\":0,\"tags\":{\"apiGroupCode\":\"bankfront-hessian\",\"apiUri\":\"\\/rest\\/v1.0\\/bankfront-hessian\\/SyncTradeOrder\",\"sampler.type\":\"probabilistic\",\"requestId\":\"68c492a3-695d-40b5-b6e2-7ae2d8852837\",\"sampler.param\":1.0,\"requestMethod\":\"POST\",\"guid\":\"425b41eea8d2419ab265ddbbcba52d7b\",\"serverIp\":\"180.30.17.13\",\"requestIp\":\"10.151.32.230\",\"appKey\":\"OPR:10040040286\",\"customerNo\":\"\"},\"spanId\":2851729268110980367,\"hostname\":\"yop-center-z-554ccdbf8b-mjd2w\",\"delay\":3,\"appname\":\"yop-center\",\"linenum\":2509802,\"id\":null,\"time\":\"2018-12-17T08:49:08.524\"}"
...
```
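A likely reason for the backslashes is that the value reaching the serializer is text that already contains JSON, so the json module writes it as one quoted JSON string and escapes every inner quote. The sketch below reproduces that effect on made-up sample data and compares it with serializing the equivalent dict.

```python
import json

text = '{"appname":"soa","delay":90}'      # JSON text, held as a Python str
obj = {"appname": "soa", "delay": 90}      # the same data as a Python dict

from_str = json.dumps(text)   # the whole text becomes one quoted JSON string
from_dict = json.dumps(obj)   # a JSON object, nothing to escape

print(from_str)
print(from_dict)
```

The first line printed is wrapped in quotes with `\"` inside, matching the shape of the file content above; the second is a plain JSON object.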
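To get rid of the escaping backslashes "\" so that the JSON can be parsed normally (without escape characters), I looked up a few approaches online.
- Using the eval function
eval
Purpose: evaluates the string as a valid expression and returns the result.
Syntax: eval(source[, globals[, locals]]) -> value
Parameters:
source: a Python expression, or a code object returned by compile()
globals: optional; must be a dictionary
locals: optional; any mapping object
Example:
```python
import json

s = """{\\"age\\":18}"""
str1 = eval("'{}'".format(s))
print(str1)
print(json.loads(str1))
```
- Using re.sub to strip the backslashes
```python
import json
import re

s = """{\\"age\\":18}"""
str2 = re.sub(r'\\', '', s)
print(str2)
print(json.loads(str2))
```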
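Because eval executes whatever expression it is given, a more restrictive variant of the eval approach is `ast.literal_eval` from the standard library, which only accepts Python literals. The sketch below is a substitute technique, not the one used in the original examples, and it shares the same limitation: it breaks if the text itself contains a single quote.

```python
import ast
import json

escaped = '{\\"age\\":18}'   # the characters {\"age\":18}

# Wrap the text in single quotes and let Python parse it as a string literal,
# which resolves the \" escape sequences.
unescaped = ast.literal_eval("'{}'".format(escaped))

print(unescaped)
print(json.loads(unescaped))
```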
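Applying these two methods to the consumer example, the string printed to the console was unescaped correctly, but the JSON file still contained "\", so another approach was needed. I then came across demjson, a third-party Python module (it is not part of the standard library). It makes it easy to encode JSON-style data such as dict and list into a string, and to decode such a string back into Python objects. Example:
```python
import demjson

data = [{'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}]

# Named json_str rather than json to avoid shadowing the json module
json_str = demjson.encode(data)
data2 = demjson.decode(json_str)
print(type(data))
print(type(json_str))
print(type(data2))
print(json_str)
print(data2)
```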
Output:
```
<class 'list'>
<class 'str'>
<class 'list'>
[{"a":1,"b":2,"c":3,"d":4,"e":5}]
[{'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}]
```
Applying this method to the consumer example:
```python
for msg in message:
    # Mode 'w' overwrites info1.json on every message
    with open('info1.json', 'w', encoding='utf-8') as fw:
        jsontest = demjson.decode(msg.value)
        json.dump(jsontest, fw, indent=4, ensure_ascii=False)
```
Opening info1.json shows that the JSON is now parsed correctly.
```
{
    "traceId": 2851729268110980367,
    "deploy_env": null,
    "baggage": {},
    "tag_name": "monitor.monitor_yop_center",
    "ip": "180.30.17.13",
    "operationName": "REQUEST",
    "data_center": "QA",
    "init_id": "1545013900874-yop-center-z-554ccdbf8b-mjd2w-1",
    "parentId": 0,
    "tags": {
        "apiGroupCode": "bankfront-hessian",
        "apiUri": "/rest/v1.0/bankfront-hessian/SyncTradeOrder",
        "sampler.type": "probabilistic",
        "requestId": "68c492a3-695d-40b5-b6e2-7ae2d8852837",
        "sampler.param": 1.0,
        "requestMethod": "POST",
        "guid": "425b41eea8d2419ab265ddbbcba52d7b",
        "serverIp": "180.30.17.13",
        "requestIp": "10.151.32.230",
        "appKey": "OPR:10040040286",
        "customerNo": ""
    },
}
```
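For comparison, the standard library alone may be enough here: since Python 3.6, `json.loads` accepts bytes directly, so the message value can be parsed into a dict before it is dumped. The sketch below is an alternative to the demjson approach, not the method used above; the `raw` payload is made-up sample data shaped like `msg.value`, and `info_stdlib.json` is a hypothetical file name written to a temporary directory.

```python
import json
import os
import tempfile

# Sample bytes shaped like msg.value (made up for illustration)
raw = b'{"appname":"yop-center","delay":3,"tags":{"customerNo":""}}'

parsed = json.loads(raw)   # bytes -> dict

path = os.path.join(tempfile.mkdtemp(), "info_stdlib.json")
with open(path, "w", encoding="utf-8") as fw:
    json.dump(parsed, fw, indent=4, ensure_ascii=False)

with open(path, encoding="utf-8") as fr:
    content = fr.read()
print(content)
```

Because the object handed to `json.dump` is a dict rather than a string, the file contains a regular indented JSON object with no escaped quotes.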