迭代器
迭代器是访问集合元素的一种方式,迭代器从对象的第一个元素开始访问,知道所有元素被访问完成。迭代器只能往前访问,不能通过索引访问。
类型内部使用__iter__()方法转为迭代器,使用__next__()取值。
特点:
- 访问者不需要关心迭代器内部的结构,仅需通过next()方法不断去取下一个内容
- 不能随机访问集合中的某个值 ,只能从头到尾依次访问
- 访问到一半时不能往回退
- 便于循环比较大的数据集合,节省内存
迭代器协议
对象必须提供一个__next__方法执行该方法要么返回迭代中的下一项,要么返回一个Stopiteration异常以终止迭代
迭代对象
实现了迭代协议(for循环机制就基于可迭代协议)
s_str = "hello world!"
# 使用__iter__()方法转变为可迭代对象
s_str_iter = s_str.__iter__()
print(s_str_iter)
# 使用可迭代对象里的next方法取值 (一直取到报异常:StopIteration)
print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__())
# print(s_str_iter.__next__()) # for 循环本质 1:先执行__iter__()转化为可迭代对象(遵循迭代协议) 2:然后依次执行__next__()方法 3:最后捕捉StopIteration异常 dic = {"name": "张三", "age": 18, "sex": "男"}
dic_iter = dic.__iter__()
print(dic_iter.__next__())
print(dic_iter.__next__()) # 模拟for循环输出
str = "1232456"
str_iter = str.__iter__() while True:
try:
# print(str_iter.__next__())
# 系统next()放和可迭代对象内部的__next__方法是一样的
print(next(str_iter))
except Exception as e:
print(e)
break
生成器
·一个函数调用时返回一个迭代器,那么这个函数叫做生成器,如果函数中包含yield,那么这个函数就叫做生成器(自动生成或实现了可迭代协议),生成器的本质就是迭代器
1:生成器函数:内部使用了yield关键字 2:生成器表达式:列表推导式,返回的就是生成器
yield关键字
def func():
yield 1
yield 2
yield 3
yield 4
# 定义生成器
def my_rang(start, stop, step=0):
while start <= stop:
yield start + step
start += 1 # 获取生成器
number = my_rang(1, 7)
# 依次获取值
# print(number.__next__())
# print(number.__next__())
# print(number.__next__())
# print(number.__next__())
# print(number.__next__())
# print(number.__next__())
# print(number.__next__())
# 便利获取值
for i in my_rang(1, 7):
print(i)
print("========================生成100个包子案例===========================") def product_baozi():
li = []
for i in range(100):
li.append("包子%s" % i)
return li l = product_baozi()
print(list(l)) def product_pro():
for i in range(100):
print('正在生产包子.....')
yield '以产包子%d' % i
print('正在售卖包子') p_pro = product_pro()
print(p_pro.__next__())
print(p_pro.__next__())
print(p_pro.__next__()) # 生成器效率高 内存占用小
# yield总结:
# 1、把函数做成迭代器
# 2、对比return,可以返回多次值,可以挂起/保存函数的运行状态 print("=========================人口普查案例=======================") # def get_population():
# res =[]
# with open("../files/population", "r", encoding="utf-8") as f:
# for i in f:
# res.append(i)
# return res
#
# p_gen = get_population()
# print(p_gen) def get_population():
with open("../files/population", "r", encoding="utf-8") as f:
for i in f:
yield i # 获取生成器
p_gen = get_population() # 统计总人口数
count = sum(eval(i)["population"] for i in p_gen)
print(count) ppp = get_population()
for i in ppp:
print("%s 占统计人口的 %d %%" % (eval(i)['city'], count // eval(i)['population'])) print('=========================跑步案例=======================') def run():
i = 0
while True:
time.sleep(1)
print('from run')
yield i
i += 1 f = run()
t = 0;
while t <= 1:
print(f.__next__())
t += 1
else:
print("运行完成") # 几乎不占内存
su = sum(i for i in range(100))
print(su) # 三元表达式
age = 10
t = True if age > 10 else False # 列表推导式
lit = [i for i in range(100) if i % 2 == 0]
print(lit) def test():
for i in range(4):
yield i t = test()
t1 = (i for i in t)
t2 = (i for i in t1)
print(list(t1))
# t2为空,因为t2取自t1 ,在上面t1已经打印并且释放完了,所以t2为空
print(list(t2))
生产者和消费者
import time # 生产包子
def consumer(name):
print("我是【%s】,我准备开始吃包子了" % name)
while True:
baozi = yield
time.sleep(1)
print("%s 很开心把【%s】吃掉了" % (name, baozi)) # 消费包子
def producer():
c1 = consumer("张三")
c2 = consumer("李四")
c2.__next__()
c1.__next__()
for i in range(10):
time.sleep(1)
c1.send('包子 %s' % i)
c2.send('包子 %s' % i) producer()
装饰器
本质就是函数,为其它函数添加附加功能。2个原则,1:不修改被修饰的源代码,2:不修改被修饰函数的调用方式。
装饰器 = 高阶函数 + 函数嵌套+ 闭包
函数对象
#1 可以被引用
· #2 可以当作参数传递
#3 返回值可以是函数
#3 可以当作容器类型的元素
函数嵌套
在函数内部再次定义函数
def text1():
def text2():
print('我是在text1函数内部定义的text2函数。')
text2()
text1()
高阶函数
函数接收的参数是一个函数名称或者函数的返回值是一个函数名
def text1():
def text2():
print('我是在text1函数内部定义的text2函数。') return text2 # 返回text2函数内存地址 # 调用 # text1()()
# text1() 是调用text返回的都是text2的地址,
t = text1()
print(t)
# 真正调用函数
t()
闭包
在第五篇函数中有讲到.......
装饰器代码案例
为一个函数添加一个运行时间计算的方法
def text(func):
def text1():
start_time = time.time()
func()
end_time = time.time()
print("foo函数运行时间为:%s" % (end_time - start_time)) return text1 def foo():
print('来自foo函数运行模块....') foo = text(foo)
foo()
@装饰器名称 ----------》python语法糖
def text(func):
def text1():
start_time = time.time()
func()
end_time = time.time()
print("foo函数运行时间为:%s" % (end_time - start_time)) return text1 @text # foo = text(foo)
def foo():
print('来自foo函数运行模块....') # @text 相当于foo = text(foo)
# foo = text(foo)
# foo()
# 为改变调用方式
foo()
装饰器带参数、返回值
from 数据库操作.MySqlHelper import MySqlHelper
from 数据库操作.RedisHelper import RedisHelper
from hashlib import sha1 ## 装饰器返回值
# def text1(func):
# def text2():
# func()
# return "text2返回值"
# return text2
#
# @text1
# def text():
# print('这是text1')
#
# t = text()
# print(t) # # 装饰器带参数
# def text1(func):
# def text2(a,b):
# func(a,b)
# return "text2返回值"
# return text2
#
# @text1
# def text(a,b):
# print('这是text1')
# print("a+b=%s"%(a+b))
#
# t = text(13,14)
# print(t) # 装饰器带参数
def text1(func):
def text2(*args, **kwargs):
func(*args, **kwargs)
return "text2返回值" return text2 @text1
def text(a, b):
print('这是text1')
print("a+b=%s" % (a + b)) text(15, 18)
# t = text(13, 14)
# print(t)
装饰器综合案例
不同数据库的登录验证
# -*- coding: utf-8 -*- # 声明字符编码
# coding:utf-8
from 数据库操作.MySqlHelper import MySqlHelper
from 数据库操作.RedisHelper import RedisHelper
from hashlib import sha1 print("========================验证功能========================") # 定义登录返回的消息格式
result_msg = {"success": False, 'message': ''} # 定义文件读取生成器
def file_yeid():
with open("../files/users", 'r', encoding='utf-8') as f:
for row in f:
yield row # file登录验证
def file_login(user_name, user_pwd):
# 获取文件生成器
users = file_yeid()
for row in users:
try:
user = eval(row)
print(user)
if user['user_name'] == user_name:
if user['user_pwd'] == user_pwd:
result_msg['success'] = True
result_msg['message'] = '登录成功'
else:
result_msg['success'] = False
result_msg['message'] = '密码错误'
break
except Exception as e:
print('账号文件内容异常。。。。')
else:
result_msg['success'] = False
result_msg['message'] = '用户名不存在'
return result_msg # redis登录验证
def redis_login(user_name, user_pwd):
redis = RedisHelper(host="118.24.53.196", port=6379)
name_value = redis.get(user_name)
print("redis用户名:%s"%name_value)
if name_value is None:
result_msg['success'] = False
result_msg['message'] = '用户名不存在'
else:
name_value = name_value.decode("utf8")
if name_value == user_pwd:
result_msg['success'] = True
result_msg['message'] = '登录成功'
else:
result_msg['success'] = False
result_msg['message'] = '密码错误'
return result_msg # MySql登录验证
def mysql_login(user_name, user_pwd):
# 从数据库查询
sql_helper = MySqlHelper(host='127.0.0.1', user='admin', password='123456', db='python')
sql = "SELECT * FROM users WHERE uname = %s"
params = [user_name]
user_info = sql_helper.fetchall(sql, params)
print(user_info)
if user_info is None:
result_msg['success'] = False
result_msg['message'] = '用户名不存在'
elif user_info[0][2] == user_pwd:
result_msg['success'] = True
result_msg['message'] = '登录成功' else:
result_msg['success'] = False
result_msg['message'] = '密码错误' return result_msg def login_verification(db_type):
def login_gener(func):
# 接收用户输入
user_name = input("请输入用户名:")
user_pwd = input("请输入密码:")
# 用户密码加密操作
s1 = sha1()
s1.update(user_pwd.encode('utf-8'))
pwd_sha = s1.hexdigest()
print("密码加密后:%s" % pwd_sha) def login(*args, **kwargs): resut = ""
if db_type == 'file_db':
resut = file_login(user_name, pwd_sha)
print('file登录信息:%s' % resut)
elif db_type == 'redis_db':
resut = redis_login(user_name, pwd_sha)
print('redis登录信息:%s' % resut)
elif db_type == 'mysql_db': resut = mysql_login(user_name, pwd_sha)
print('mysql登录信息:%s' % resut)
else:
print('暂无当前数据库类型的验证!!')
return
if resut['success'] == True:
func(user_name)
else:
func("游客") return login return login_gener @login_verification(db_type='redis_db')
def index(name):
print("欢迎[%s]来到首页。。。。。。。。。" % name) @login_verification(db_type='redis_db')
def home(name):
print("欢迎[%s]来到家目录" % name) # index("hehe")
home() # 单个测试
# print(file_login('lisi', 'c3be7cf2877085fed38e0ec93b009e547e2929e0'))
# print(redis_login('admin1', 'c3be7cf2877085fed38e0ec93b009e547e2929e'))
# print(mysql_login('admin','c3be7cf2877085fed38e0ec93b009e547e2929e0'))# 密码为:123
MySqlHelper:
# -*- coding: utf-8 -*- # 声明字符编码
# coding:utf-8
import pymysql class MySqlHelper(object): def __init__(self, host, user, password, db, charset='utf8', port=3306):
""" 初始化 """
self.host = host , # 服务器名称
self.port = port # 3306, 3306
self.user = user , # 用户名
self.password = password , # 密码
self.db = db #'python', # 数据库名称
self.charset = charset #'utf8' # 字符编码
self.conn = None
self.cursor = None def connect(self):
"""链接数据库"""
db_config = {
'host': self.host, # 服务器名称
'port': self.port, # 3306
'user': self.user, # 用户名
'password': self.password, # 密码
'db': self.db, # 数据库名称
'charset': self.charset # 字符编码
}
self.conn = pymysql.connect(**db_config)
self.cursor = self.conn.cursor() def close(self):
"""关闭数据库"""
self.cursor.close()
self.conn.close() def execute(self, sql, params=[]):
"""执行数据库 增删改"""
try:
self.connect()
self.cursor.execute(sql, params)
self.conn.commit()
self.close()
print("OK")
except Exception as e:
print("异常信息如下:")
print("%s" % e) def fetchall(self, sql, params=[]):
"""数据查询"""
try:
self.connect()
self.cursor.execute(sql, params)
result = self.cursor.fetchall()
self.close()
return result
except Exception as e:
print('%s' % e)
RedisHelper
# -*- coding: utf-8 -*- # 声明字符编码
# coding:utf-8 from redis import *
# 这种连接不需要指定数据库及密码
# = StrictRedis(host="127.0.0.1", port=6379) class RedisHelper():
def __init__(self, host, port):
self.host = host
self.port = port
self.__redis = StrictRedis(host, port) def set(self, key, value):
self.__redis.set(key, value) def get(self, key):
return self.__redis.get(key)