"""Dump the messages a Kafka topic received inside a time window to a file.

The window is given as two local-time strings.  Every partition of the topic
is positioned at the first offset whose broker timestamp is at or after the
window start, and messages are collected until each partition has moved past
the window end.
"""
import time

# Number of decoded messages buffered in memory before they are appended to
# the output file.
BATCH_SIZE = 5000
# Consecutive empty polls tolerated before the run is considered finished.
# A single empty poll is not proof that the partitions are drained: the first
# fetch after assign() may take longer than one poll timeout.
MAX_EMPTY_POLLS = 3


def str_to_timestamp(str_time: str, format_type: str = '%Y-%m-%d %H:%M:%S') -> int:
    """Convert a formatted time string to epoch milliseconds.

    The string is parsed with ``time.strptime`` and interpreted as local time
    (``time.mktime``).  The result has whole-second resolution.

    Args:
        str_time: The time to convert, e.g. ``'2021-07-12 14:03:00'``.
        format_type: ``strptime`` format describing ``str_time``.

    Returns:
        Milliseconds since the epoch (always a multiple of 1000).

    Raises:
        ValueError: If ``str_time`` does not match ``format_type``.
    """
    time_array = time.strptime(str_time, format_type)
    return int(time.mktime(time_array)) * 1000


def listdata_to_file(list_data, file='abnormal.logs', mode='w'):
    """Write each string of ``list_data`` to ``file`` as one line.

    Args:
        list_data: Iterable of strings; a newline is added after each one.
        file: Path of the output file.
        mode: ``open`` mode; ``'w'`` (default) replaces the file, ``'a'``
            appends to it.
    """
    # The payloads are decoded as UTF-8, so write them as UTF-8 as well
    # instead of relying on the locale's default encoding.
    with open(file, mode, encoding='utf-8') as f:
        f.writelines(line + '\n' for line in list_data)


KAFKASERVERS = 'xxxxxxxxxxxxx'
GROUPNAME = 'xxxxxxxxxxx'

# Topic name
topic = 'test'

# Time window (local time) of the messages to export
str_time_begin = '2021-07-12 14:03:00'
str_time_end = '2021-07-12 14:06:00'

file_name = topic + str_time_begin.replace(" ", "-").replace(":", "-")


def main():
    """Consume ``topic`` between the configured times and write ``file_name``.

    Raises:
        RuntimeError: If the broker reports an error for the topic metadata.
    """
    # Imported here so the helpers above stay importable on machines that do
    # not have the Kafka client installed.
    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({
        'bootstrap.servers': KAFKASERVERS,
        'group.id': GROUPNAME,
        'auto.offset.reset': 'earliest',
        'session.timeout.ms': 6000,
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': 'xxxxx',
        'sasl.password': 'xxxxxxx',
    })

    try:
        # Find out which partitions the topic currently has.
        cluster_data = consumer.list_topics(topic=topic)
        topic_data = cluster_data.topics[topic]
        if topic_data.error is not None:
            raise RuntimeError(
                "Cannot read metadata for topic {!r}: {}".format(topic, topic_data.error)
            )
        # ``partitions`` is keyed by partition id; ids need not be 0..n-1.
        partition_ids = sorted(topic_data.partitions)

        # Position every partition at the first offset whose timestamp is at
        # or after the window start.  Timestamps are in milliseconds.
        timestamp_begin = str_to_timestamp(str_time_begin)
        timestamp_end = str_to_timestamp(str_time_end)
        tps = [TopicPartition(topic, pid, timestamp_begin) for pid in partition_ids]
        offsets = consumer.offsets_for_times(tps)
        consumer.assign(offsets)

        # Start from an empty file; batches are appended below.
        listdata_to_file([], file=file_name)

        pending = set(partition_ids)  # partitions still inside the window
        batch = []
        empty_polls = 0
        while pending:
            # Maximum time to block waiting for a message.
            msg = consumer.poll(1.0)
            if msg is None:
                empty_polls += 1
                if empty_polls >= MAX_EMPTY_POLLS:
                    break
                continue
            empty_polls = 0

            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue

            partition = msg.partition()
            if partition not in pending:
                # Leftover from a partition that already passed the window.
                continue

            # Timestamp attached to the message by Kafka, in milliseconds.
            kafka_timestamp = msg.timestamp()[1]
            if kafka_timestamp > timestamp_end:
                # Only this partition is done; the others may still hold
                # messages inside the window, so keep consuming them.
                pending.discard(partition)
                consumer.pause([TopicPartition(topic, partition)])
                continue

            payload = msg.value()
            if payload is None:
                # Message without a value: nothing to export.
                continue

            text = payload.decode('utf-8')
            batch.append(text)
            print('Received message: {}'.format(text))

            if len(batch) >= BATCH_SIZE:
                listdata_to_file(batch, file=file_name, mode='a')
                batch.clear()

        listdata_to_file(batch, file=file_name, mode='a')
        consumer.unassign()
    finally:
        # Always release the client, even if consuming failed.
        consumer.close()


if __name__ == '__main__':
    main()