Kafka 获取指定时间内的消息并写入指定文件

from confluent_kafka import Consumer, TopicPartition
import time


def str_to_timestamp(str_time, format_type='%Y-%m-%d %H:%M:%S'):
    """Convert a formatted date/time string into an epoch value in milliseconds.

    The string is parsed with ``time.strptime`` and converted with
    ``time.mktime``, so it is interpreted in the machine's local time zone.
    The result is truncated to whole seconds before being scaled to
    milliseconds.

    Args:
        str_time: The date/time text to parse.
        format_type: ``strptime`` format describing ``str_time``.

    Returns:
        Integer milliseconds since the epoch (always a multiple of 1000).

    Raises:
        ValueError: If ``str_time`` does not match ``format_type``.
    """
    parsed = time.strptime(str_time, format_type)
    epoch_seconds = int(time.mktime(parsed))
    return epoch_seconds * 1000

def listdata_to_file(list_data, file='abnormal.logs'):
    """Write every string in ``list_data`` to ``file``, one per line.

    The file is opened in ``"w"`` mode, so any existing content is replaced.
    It is written as UTF-8 explicitly rather than in the platform's locale
    encoding, so non-ASCII text does not fail or get mangled on systems whose
    default encoding is not UTF-8.

    Args:
        list_data: Iterable of strings; a newline is appended to each.
        file: Path of the output file.
    """
    with open(file, "w", encoding="utf-8") as f:
        f.writelines(line + '\n' for line in list_data)

# Broker address list and consumer group id (placeholder values).
KAFKASERVERS = 'xxxxxxxxxxxxx'
GROUPNAME = 'xxxxxxxxxxx'

# Settings passed to the confluent_kafka Consumer; the SASL credentials
# below are placeholders as well.
consumer_settings = {
    'bootstrap.servers': KAFKASERVERS,
    'group.id': GROUPNAME,
    'auto.offset.reset': 'earliest',
    'session.timeout.ms': 6000,
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'xxxxx',
    'sasl.password': 'xxxxxxx',
}
c = Consumer(consumer_settings)

# Topic to read from, and the time window to extract.
topic = 'test'
str_time_begin = '2021-07-12 14:03:00'
str_time_end = '2021-07-12 14:06:00'
# Output file name: the topic followed by the start time, with spaces and
# colons turned into dashes.
file_name = '{}{}'.format(topic, str_time_begin.translate(str.maketrans(' :', '--')))


# Look up the partitions that currently exist for the topic.
cluster_data = c.list_topics(topic=topic)
topic_data = cluster_data.topics[topic]
available_partitions = topic_data.partitions

# Partitions are assigned manually (no subscribe) so that each one can be
# started at the offset matching the begin timestamp.
# NOTE: the timestamps are in milliseconds.
timestamp_begin = str_to_timestamp(str_time_begin)
timestamp_end = str_to_timestamp(str_time_end)
# Use the actual partition ids from the metadata instead of assuming they
# are exactly 0..n-1.
tps = [TopicPartition(topic, tp, timestamp_begin) for tp in available_partitions]
offsets = c.offsets_for_times(tps)
c.assign(offsets)

list_data = []
# Partitions that may still deliver messages inside the time window.
pending_partitions = {tp.partition for tp in offsets}
try:
    while pending_partitions:
        # Maximum time to block waiting for a message; an idle second is
        # treated as "no more data".
        msg = c.poll(1.0)
        if msg is None:
            break
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        partition = msg.partition()
        if partition not in pending_partitions:
            # Left-over message from a partition that already passed the window.
            continue

        # Timestamp Kafka recorded for this message.
        kafka_timestamp = msg.timestamp()[1]
        if kafka_timestamp > timestamp_end:
            # Only this partition is past the window. Stop reading it, but keep
            # draining the others: stopping the whole loop here would drop
            # in-window messages that are still unread in other partitions.
            pending_partitions.discard(partition)
            c.pause([TopicPartition(topic, partition)])
            continue

        payload = msg.value()
        if payload is None:
            # A message without a value has nothing to decode or store.
            continue
        text = payload.decode('utf-8')
        list_data.append(text)
        # Checkpoint to disk once per 5000 messages rather than rewriting the
        # whole file on every message after the 5000th.
        if len(list_data) % 5000 == 0:
            listdata_to_file(list_data, file=file_name)
        print('Received message: {}'.format(text))

    listdata_to_file(list_data, file=file_name)
    c.unassign()
finally:
    # Always release the consumer, even if the loop raised.
    c.close()

 

上一篇:LC981-基于时间的键值存储


下一篇:力扣 981. 基于时间的键值存储