import redis
import msgpack


class rediscon():
    """Thin convenience wrapper around a single ``redis.Redis`` client.

    Every method forwards directly to the same-purpose redis-py call on
    ``self.conn`` and returns whatever that call returns.
    """

    def __init__(self, host, db=0, pwd="", port=6379):
        """Create the underlying client.

        Args:
            host: Redis server host name or address.
            db: Database index to select.
            pwd: Password passed to the client.
            port: TCP port of the server.
        """
        self.conn = redis.Redis(host=host, db=db, password=pwd, port=port)

    @staticmethod
    def __conn(host, db=0, pwd="", port=6379):
        """Build and return a standalone ``redis.Redis`` client.

        Not referenced by any other code visible in this file; kept so the
        class surface is unchanged.
        """
        return redis.Redis(host=host, db=db, password=pwd, port=port)

    def redisKeys(self, key):
        """Return the keys matching the pattern ``key`` (``KEYS``)."""
        return self.conn.keys(key)

    def redisGet(self, key):
        """Return the value stored at ``key`` (``GET``)."""
        return self.conn.get(key)

    def redisDel(self, key):
        """Delete ``key`` (``DEL``)."""
        return self.conn.delete(key)

    def redisSet(self, primary, key):
        """Store a value (``SET``).

        Despite the parameter names, ``primary`` is the key and ``key`` is
        the value written under it.
        """
        return self.conn.set(primary, key)

    def redisFlush(self):
        """Issue ``FLUSHALL`` on the server."""
        return self.conn.flushall()

    def redisSmembers(self, name):
        """Return the members of the set stored at ``name`` (``SMEMBERS``)."""
        return self.conn.smembers(name)

    def redisExists(self, name):
        """Return the result of ``EXISTS`` for ``name``."""
        return self.conn.exists(name)

    def redisType(self, name):
        """Return the type of the value stored at ``name`` (``TYPE``)."""
        return self.conn.type(name)


class redis_main():
    """Interactive console operations (query / delete / insert) on Redis.

    ``tmp`` caches the keys returned by the most recent ``check`` search so
    that ``dinfor`` can delete exactly that result set.
    """

    # Class-level default kept only so ``redis_main.tmp`` still resolves.
    # It is never mutated in place: every instance binds its own list in
    # ``__init__`` and methods rebind ``self.tmp`` instead of clearing it.
    tmp = []

    def __init__(self, hostname, pwd="test1:test123test"):
        """Connect to ``hostname`` using the default db and port.

        Args:
            hostname: Redis server host name or address.
            pwd: Password for the server. The default preserves the
                previously hard-coded value.
        """
        # NOTE(review): a credential is embedded in source as the default;
        # callers should pass ``pwd`` explicitly (e.g. from configuration).
        self.r = rediscon(host=hostname, pwd=pwd)
        self.tmp = []

    @staticmethod
    def _unpack(raw):
        """Decode ``raw`` as msgpack, returning ``raw`` itself on failure.

        ``ininfor`` stores values without msgpack encoding, so string keys
        are not guaranteed to hold msgpack data. Without this fallback one
        such value aborts a whole listing.
        """
        try:
            return msgpack.unpackb(raw)
        except (ValueError, msgpack.UnpackException):
            return raw

    def check(self):
        """Search keys by a pattern read from stdin, then show the result.

        The matching keys are cached in ``self.tmp`` and printed. The user
        is then asked repeatedly (empty or unknown answers re-prompt) to
        pick one of:

        * ``1`` - decode every matched key via ``cinfor``;
        * ``2`` - read one exact key name and decode only that key;
        * ``3`` - leave without decoding.

        Returns:
            For ``1``: a list with one ``cinfor`` result per matched key
            (the cached, empty key list itself when nothing matched).
            For ``2``: ``[[name, cinfor(name)]]``.
            For ``3``: the string ``"退出"``.
        """
        Input_name = input("请输入查询条件 >>>>")
        self.tmp = self.r.redisKeys(Input_name)
        print("查询的数据>>>>" + str(self.tmp) + "\n")
        while True:
            Input_Select = input("1.查询到所有的内容解码 2.输入精准查询的内容 3.不解码 >>>>")
            if Input_Select == '1':
                if not self.tmp:
                    return self.tmp
                return [self.cinfor(con=key) for key in self.tmp]
            if Input_Select == '2':
                Input_content = input("请输入要精准查询的内容 :>>>")
                return [[Input_content, self.cinfor(con=Input_content)]]
            if Input_Select == '3':
                return "退出"

    def dinfor(self):
        """Delete data according to a choice read from stdin.

        ``1`` deletes every key cached by the last ``check``; ``2`` flushes
        the server. Either way the cached key list is reset, since those
        keys no longer exist afterwards.

        Returns:
            ``"delete done \\n"`` after choice ``1``; ``None`` otherwise.
        """
        data = input("1.删除当前查询的所有数据 , 2.清空redis所有数据 >>>")
        if data == '1':
            for key in self.tmp:
                self.r.redisDel(key)
            self.tmp = []
            return "delete done \n"
        if data == '2':
            self.r.redisFlush()
            self.tmp = []
        return None

    def ininfor(self):
        """Read a key and a value from stdin and store the pair with SET.

        The value is written as entered (no msgpack encoding).
        """
        key = input("请输入键值>>>>")
        value = input("请输入vlue值>>>>")
        self.r.redisSet(key, value)

    def cinfor(self, con=None):
        """Fetch the value stored at key ``con``, dispatching on its type.

        Args:
            con: Key to look up.

        Returns:
            ``[con, value]`` where ``value`` is the msgpack-decoded payload
            (raw bytes if it is not valid msgpack) for a ``string`` key or
            the member set for a ``set`` key. For any other type, the type
            name reported by Redis as a ``str``.
        """
        kind = self.r.redisType(con).decode()
        if kind == "string":
            return [con, self._unpack(self.r.redisGet(con))]
        if kind == "set":
            return [con, self.r.redisSmembers(con)]
        return kind


def main(hostname):
    """Run the interactive menu loop against the Redis server at ``hostname``.

    Menu: ``1`` search, ``2`` delete, ``3`` insert, ``4`` exact-key lookup,
    ``6`` quit. Empty or unknown input shows the menu again.
    """
    # Option 4 is listed because it was wired in the dispatch table but
    # neither reachable nor advertised before.
    msg = ">>>1.进入查询; 2.删除 3.插入 4.精准查询 6.退出"
    run = redis_main(hostname)
    type_select = {
        '1': run.check,
        '2': run.dinfor,
        '3': run.ininfor,
        '4': run.cinfor,
    }
    while True:
        print(msg)
        # Read outside the try block so EOF on stdin ends the program
        # instead of being caught and re-prompted forever.
        name = input("输入内容: ")
        if name == '6':
            break
        handler = type_select.get(name)
        if handler is None:
            continue
        try:
            if name == '4':
                # Only the exact-key lookup takes an argument.
                data_main1 = input("请输入精准查询的key内容>>>")
                result = handler(con=data_main1)
            else:
                result = handler()
        except Exception as e:
            # Top-level boundary of an interactive tool: report the error
            # and keep the session alive.
            error_name1 = "UnicodeError: label empty or too long"
            if error_name1 in str(e):
                print("请检查连接的地址,是否有问题")
            else:
                print(e)
        else:
            # Actions without a result (insert, flush) print nothing
            # rather than the literal "None".
            if result is not None:
                print(result)


if __name__ == "__main__":
    main("localhost")