【python】——sql模拟

一.作业需求:

【python】——sql模拟

当然此表你在文件存储时可以这样表示

1,Alex Li,22,13651054608,IT,2013-04-01

现需要对这个员工信息文件,实现增删改查操作

  1. 可进行模糊查询,语法至少支持下面3种:
    1.   select name,age from staff_table where age > 22
    2.   select  * from staff_table where dept = "IT"
    3. select  * from staff_table where enroll_date like "2013"
    4. 查到的信息,打印后,最后面还要显示查到的条数
  2. 可创建新员工纪录,以phone做唯一键,staff_id需自增
  3. 可删除指定员工信息纪录,输入员工id,即可删除
  4. 可修改员工信息,语法如下:
    1.   UPDATE staff_table SET dept="Market" WHERE where dept = "IT"

 注意:以上需求,要充分使用函数,请尽你的最大限度来减少重复代码!

二.流程图

【python】——sql模拟

三.代码

1.数据库文件

name:mysql.user

,陈浩元,,,运维,--
,刘阔,,,运维,--
,夏婉婉,,,测试工程师,--
,彭思龙,,,测试攻城狮,--
,党禾迪,,,运维,--
,孙京国,,,运维工程师,--
,王冬,,,运维,--
,章曾,,,运维,--
,陈俊明,,,运维背锅者,--
,戴成旭,,,运维,--
,肖静,, ,None,--
,王振山,,,运维,--
,韩韬,, ,系统集成,--
,韩钰柯,,,运维, --
,王兴平,,,运维工程师,--
,崔晓娜,,,运维,--
,田晨超,,,Linux爱好者,--
,张志凡,,,运维工程师,--
,曾志高翔,,,运维,--
,翁建伟,,,无,--
,王华强,,,测试,--
,蔡志亮,,,运维,--
,颜明建,,,打杂,--
,李毅,,,QA,--
,刘振涛,,,高级运维工程师,--
,李善超,,,网络安全,--
,金煜东,,,运维,--
,许晨阳,,,运维,--
,孙克东,,,运维工程师,--
,王朝,,,DBA,--
,段晓冲,,,运维,--
,江帆,,,运维,--
,李众,,,运维工程师,--
,何金刚,,,运维工程师,--
,张美玲,,,linux运维,--
,唐磊,,,测试,--
,黄骁瀚,,,系统工程师,--
,李英琦,,,运维,--
,刘亮亮,,,运维,--
,赵宏宇,,,Linux运维工程师,--
,黄宏伟,,,运维,--
,何玲,, ,数据分析师,--
,杨建波,,,系统工程师,--
,倪耀毅,,,自动化测试,--
,乔路,,,运维,--
,赵凯,,,运维工程师,--

2.代码

#!/usr/bin/env python
#-*- coding:utf-8 -*-
#Author:__Json.Zzgx__
import os
def sql_cmd(sql):
parse_func={
'insert':insert_cmd,
'delete':delete_cmd,
'update':update_cmd,
'select':select_cmd,
}
sql_l=sql.split(' ')
func=sql_l[0]
res=''
if func in parse_func:
res=parse_func[func](sql_l)
return res def insert_cmd(sql_1):
sql_dic={
'func':insert,
'into':[],
'values':[]
}
return handle_cmd(sql_1,sql_dic) def delete_cmd(sql_1):
sql_dic={
'func':delete,
'from':[],
'where':[]
}
return handle_cmd(sql_1,sql_dic) def update_cmd(sql_1):
sql_dic={
'func':update,
'update':[],
'set':[],
'where':[]
}
return handle_cmd(sql_1,sql_dic)
def select_cmd(sql_1):
sql_dic={
'func':select,
'select':[],
'from':[],
'where':[],
'limit':[],
}
return handle_cmd(sql_1,sql_dic) def handle_cmd(sql_l,sql_dic):
tag=False
for item in sql_l:
if item in sql_dic:
tag=True
key=item
if tag and item not in sql_dic:
sql_dic[key].append(item)
if sql_dic.get('where'):
sql_dic['where']=where_cmd(sql_dic.get('where'))
return sql_dic def where_cmd(where_l):
res=[]
key=['and','or','not']
char=''
for i in where_l:
if i not in key:
char=char+i
if i in key:
char=three_cmd(char)
res.append(char)
res.append(i)
char=''
char = three_cmd(char)
res.append(char)
return res def three_cmd(str):
key=['>','<','=']
res=[]
char=''
for i in str:
if i not in key:
char += i
if i in key:
res.append(char)
res.append(i)
char=''
res.append(char)
if len(res) == 1:
res=res[0].split('like')
res.insert(1,'like')
return res def sql_action(sql_dic):
return sql_dic["func"](sql_dic) def select(sql_dic):
db=sql_dic["from"][0].strip()
f=open("%s" %(db),"r",encoding="utf-8")
where_res=where_action(f,sql_dic["where"])
f.close()
limit_res=limit_action(sql_dic["limit"],where_res)
select_res=select_action(sql_dic["select"],limit_res)
return select_res def insert(sql_dic):
try:
ph=sql_dic["values"][0].strip("'").split(",")[2]
ph=sql_dic["values"][0].strip('"').split(",")[2]
except:
pass
sql="select * from mysql.user where phone = %s" %ph
a=sql_action(sql_cmd(sql))[1]
if len(a):
return "你的手机号%s已存在,更换其他手机号!" %ph
else:
id=1
db=sql_dic["into"][0].strip()
with open("%s" %db,"ab+") as f ,\
open("%s"%db,'r', encoding="utf-8") as f1:
for line in f1:
id=id+1
try:
record=sql_dic["values"][0].strip("'").split(",")
record=sql_dic["values"][0].strip('"').split(",")
except:
pass
record.insert(0,str(id))
record_new=",".join(record)+"\n"
f.write(record_new.encode("utf-8"))
f.flush()
return "%s插入成功" %record_new def delete(sql_dic):
db=sql_dic["from"][0].strip()
filename="%s" %(db)
filename_new=db+"_new"
file=open(filename,'r',encoding="utf-8")
where_res=where_action(file,sql_dic["where"])
file.close()
if len(where_res)==0:
return "删除记录不存在!"
else:
tmp=[]
for line in where_res:
tmp.append(line.strip())
with open(filename,'r',encoding="utf-8") as f,\
open(filename_new,'w',encoding="utf-8") as f1:
id = 0
for f_line in f:
if f_line.strip() in tmp:
continue
else:
id=1+id
con = f_line.strip("'").split(",")
con = f_line.strip('"').split(",")
con[0]=str(id)
content=",".join(con)
f1.write(content)
f1.flush()
os.rename(filename,filename+"b")
os.rename(filename_new,filename)
os.remove(filename+"b")
return "共计删除%s条数据" %len(where_res) def update(sql_dic):
db=sql_dic["update"][0].strip()
name="%s" %(db)
name_new=db+"_new"
file=open(name,'r',encoding="utf-8")
records=where_action(file,sql_dic["where"])
records_new=[]
file.close()
set_tmp=sql_dic["set"]
data_tmp={}
for i in set_tmp[0].split(","):
if len(i):
s=i.split("=")
try:
data_tmp[s[0]]=str(s[1].strip("'"))
data_tmp[s[0]]=str(s[1].strip('"'))
except:
pass
if len(records)==0:
return "要修改的记录不存在!"
else:
for line in records:
dic={}
dic['id'],dic['name'],dic['age'],dic['phone'],dic['dept'],dic['date']=line.split(",")
for data_k in data_tmp:
if data_k in dic:
dic[data_k]=data_tmp[data_k]
str_tmp=[]
for k in dic:
str_tmp.append(dic[k])
records_new.append(str_tmp)
str_tmp=[]
for line in records:
str_tmp.append(line.strip())
with open(name,'r',encoding="utf-8") as f,\
open(name_new,'w',encoding="utf-8") as f1:
for f_line in f:
if f_line.strip() in str_tmp:
temp=records_new[str_tmp.index(f_line.strip())]
temp=",".join(temp)
f1.write(temp)
continue
f1.write(f_line)
f1.flush()
os.rename(name,name+"b")
os.rename(name_new,name)
os.remove(name+"b")
return "共计修改%s条数" %len(records) def where_action(f,where_sql):
res=[]
if len(where_sql):
for line in f:
dic={}
dic['id'], dic['name'], dic['age'], dic['phone'], dic['dept'], dic['date'] = line.split(",") logic_res=logic_action(dic,where_sql)
if logic_res:
res.append(line)
else:
res=f.readlines() return res def logic_action(dic,where_l):
res=[]
for i in where_l:
if type(i) is list:
a,o,b=i
if i[1] == '=':
o="%s=" %i[1]
dic_v=""
if dic[a].isdigit():
dic_v=int(dic[a])
b=int(b)
else:
dic_v="'%s'" %dic[a]
if o != 'like':
if type(b)==str:
try:
b= b.strip("'")
b= b.strip('"')
except:
pass
b="'%s'" %b
i=str(eval("%s%s%s" %(dic_v,o,b)))
else:
try:
b= b.strip("'")
b= b.strip('"')
except:
pass
if b in dic_v:
i='True'
else:
i='False'
res.append(i) res=eval(' '.join(res))
return res def limit_action(limit_sql,where_res):
if len(limit_sql)!=0:
index=int(limit_sql[0])
res=where_res[0:index]
else:
res=where_res
return res def select_action(select_sql,limit_res):
res=[]
select_field=select_sql
if select_sql[0]=="*":
res=limit_res
select_field=['id', 'name', 'age', 'phone', 'dept', 'date']
else:
for line in limit_res:
dic={}
dic['id'], dic['name'], dic['age'], dic['phone'], dic['dept'], dic['date'] = line.split(",")
r=[]
for field in select_field[0].split(","):
r.append(dic[field])
res.append(",".join(r))
return [select_field,res] if __name__ == '__main__':
print("表名:mysql.user,字段:id,name,age,phone,dept,date")
while True:
sql=input("sql> ").strip()
if sql == 'exit':break
if len(sql) == 0 :continue
sql_dic=sql_cmd(sql)
if len(sql_dic) == 0:continue
res=sql_action(sql_dic)
if type(res)==list:
for line in res[1]:
print(line.strip())
print("\n共计查询出%s条数据" %len(res[1]))
else:
print(res)

【开源是一种精神,分享是一种美德】

— By GoodCook

— 笔者QQ:253097001

— 欢迎大家随时来交流

—原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。

上一篇:k8s(5)-拓展服务


下一篇:php中jpgraph库的使用