引言
讲到协程,首先来介绍一下线程和协程的区别
lua协程和多线程
相同之处:拥有自己独立的桟、局部变量和PC计数器,同时又与其他协程共享全局变量和其他大部分东西
不同之处:一个多线程程序可以同时运行几个线程(并发执行、抢占),而协程却需要彼此协作地运行,并非真正的多线程,即一个多协程程序在同一时间只能运行一个协程,并且正在执行的协程只会在其显式地要求挂起(suspend)时,它的执行才会暂停(无抢占、无并发)。
注意:
Lua中的协程无法在外部将其停止,有可能导致程序阻塞
运行的是主线程时调用coroutine.yield()会报错LuaException: attempt to yield from outside a coroutine
协程函数的相关函数方法
方法 | 描述 | 返回值 |
---|---|---|
coroutine.create (f) | 创建一个主体函数为 f 的新协程。 f 必须是一个 Lua 的函数。 返回这个新协程,它是一个类型为 "thread" 的对象,和 resume 配合使用的时候就唤醒函数调用 | 返回它的控制器(一个对象为thread)的对象 |
coroutine.resume (co [, val1, ···]) | 使协同从挂起变为运行. (1)激活coroutine,也就是让协程函数开始运行;(2)唤醒yield,使挂起的协同接着上次的地方继续运行。该函数可以传入参数 | 运行成功返回true和前一个调用cyiled中传入的参数;反之false+错误信息 |
coroutine.yield (···) | 挂起正在调用的协程的执行。 传递给 yield 的参数都会转为 resume 的额外返回值。 |
返回resume中传入的参数 |
coroutine.status (co) | 以字符串形式返回协程 co 的状态: 当协程正在运行(它就是调用 status 的那个) ,返回 "running"; 如果协程调用 yield 挂起或是还没有开始运行,返回 "suspended"; 如果协程是活动的,都并不在运行(即它正在延续其它协程),返回 "normal"; 如果协程运行完主体函数或因错误停止,返回 "dead"。 | running,suspended,normal,dead |
coroutine.wrap (f) | 创建一个主体函数为 f 的新协程。 f 必须是一个 Lua 的函数。 返回一个函数, 每次调用该函数都会延续该协程。 传给这个函数的参数都会作为 resume 的额外参数。 和 resume 返回相同的值, 只是没有第一个布尔量。 如果发生任何错误,抛出这个错误。 | . |
coroutine.running() | 返回当前正在运行的协程加一个布尔量。 如果当前运行的协程是主线程,其为真。 | |
coroutine.isyieldable () | 如果正在运行的协程可以让出,则返回真。不在主线程中或不在一个无法让出的 C 函数中时,当前协程是可让出的。 |
下面举例来说明协程的简单创建使用,第一个是create,看下例子了解return参数的应用
print("coroutine start!");
--没有yield的协程
local newCor1 = coroutine.create(function()
return 1,"a"
end)
local ret1, num1, str1 = coroutine.resume(newCor1)
print("1-----", ret1, num1, str1)
--包含一个yield的协程,主要看参数相关
local newCor2 = coroutine.create(function(x)
x = x+10;
--str和y的值为resume传入的值
local str, y = coroutine.yield(x);
return str, y + x
end)
local ret2, x = coroutine.resume(newCor2, 50)
print("2-----", x)
local ret3, str2, y = coroutine.resume(newCor2, "sss", 100);
print("2-----", str2, y)
--输出如下:
coroutine start!
1----- true 1 a
2----- 60
2----- sss 160
接着可以看看wrap的方式,具体的区别在于,wrap的返回值是一个函数,所以唤起协程的时候不需要调用resume方法,直接调用wrap返回的函数即可。还有就是和resume返回的不同的是,返回值中少了协程是否成功运行的布尔值。
local newCor3 = coroutine.wrap(function(x)
x = x - 10;
local y = coroutine.yield(x);
return y;
end)
--不需要resume函数来唤起,直接调用wrap返回的值
local ret4 = newCor3(100);
print("3-----", ret4)
local ret5 = newCor3(10);
print("3-----", ret5)
3----- 90
3----- 10
继续体会一下用法。
co = coroutine.create(
function(i)
print(i);
end
)
print(coroutine.status(co)) --suspended
coroutine.resume(co, 1) -- 1
print(coroutine.status(co)) -- dead
coroutine.resume(co, 1)
print("----------")
print(coroutine.status(co)) --dead
print(coroutine.resume(co)) --false cannot resume dead coroutine
co = coroutine.wrap(
function(i)
print(i);
end
)
co(1)
print("----------")
co2 = coroutine.create(
function()
for i=1,10 do
print(i)
if i == 3 then
print(coroutine.status(co2)) --running
print(coroutine.running()) --thread:XXXXXX
end
coroutine.yield() --这里被挂起,也就是说for循环只会进行一次
end
end
)
coroutine.resume(co2) --1
coroutine.resume(co2) --2
coroutine.resume(co2) --3
print(coroutine.status(co2)) -- suspended
print(coroutine.running())
print("----------")
suspended
1
dead
----------
1
----------
1
2
3
running
thread: 0x7f9c12c07c78 false
suspended
thread: 0x7f9c13001008 true
----------
coroutine.running就可以看出来,coroutine在底层实现就是一个线程。
当create一个coroutine的时候就是在新线程中注册了一个事件。
当使用resume触发事件的时候,create的coroutine函数就被执行了,当遇到yield的时候就代表挂起当前线程,等候再次resume触发事件。
function foo (a)
print("foo 函数输出", a)
print("执行次数", 1,os.date())
return coroutine.yield(2 * a) -- 返回 2*a 的值
end
co = coroutine.create(function (a , b)
print("第一次协同程序执行输出", a, b) -- co-body 1 10
local r = foo(a + 1)
print("第二次协同程序执行输出", r)
local r, s = coroutine.yield(a + b, a - b) -- a,b的值为第一次调用协同程序时传入
print("第三次协同程序执行输出", r, s)
return b, "结束协同程序" -- b的值为第二次调用协同程序时传入
end)
print("main", coroutine.resume(co, 1, 10)) -- true, 4
print("--分割线----")
print("main", coroutine.resume(co, "r")) -- true 11 -9
print("---分割线---")
print("main", coroutine.resume(co, "x", "y")) -- true 10 end
print("---分割线---")
print("main", coroutine.resume(co, "x", "y")) -- cannot resume dead coroutine
print("---分割线---")
第一次协同程序执行输出 1 10
foo 函数输出 2
执行次数 1 Tue Nov 5 13:23:41 2019
main true 4
--分割线----
第二次协同程序执行输出 r
main true 11 -9
---分割线---
第三次协同程序执行输出 x y
main true 10 结束协同程序
---分割线---
main false cannot resume dead coroutine
---分割线---
总结:
- 调用resume,将协同程序唤醒,resume操作成功返回true,否则返回false;
- 协同程序运行,运行到yield语句;
- yield挂起协同程序,第一次resume返回;(注意:此处yield返回,参数是resume的参数)
- 第二次resume,再次唤醒协同程序;(注意:此处resume的参数中,除了第一个参数,剩下的参数将作为yield的参数)
- yield返回;
- 协同程序继续运行;
- 如果使用的协同程序继续运行完成后继续调用 resume方法则输出:cannot resume dead coroutine
resume和yield的配合强大之处在于,resume处于主程中,它将外部状态(数据)传入到协同程序内部;而yield则将内部的状态(数据)返回到主程中。
总体来说有点像断点再执行的样子。yield函数可以将正在运行的coroutine 挂起,并可以在适当的时候再重新被唤醒,然后继续运行,这是协程的精髓。
协程执行唯一性
当一个coroutine A在resume另一个coroutine B时,A的状态没有变为suspended,我们不能去resume它;但是它也不是running状态,因为当前正在running的是B。这时A的状态其实就是normal 状态了。
前面已经提到了返回值的case,下面再单独列出来加深理解
- main函数中没有yield,调用resume时,多余的参数,都被传递给main函数作为参数,下面的示例,1 2 3分别就是a b c的值了:
co = coroutine.create(function (a,b,c)
print(a,b,c)
end
)
coroutine.resume(co,1,2,3)
1 2 3
- main函数中有yield,所有被传递给yield的参数,都被返回。因此resume的返回值,除了标志正确运行的true外,还有传递给yield的参数值:
co = coroutine.create(function(a,b)
coroutine.yield(a+b,a-b)end)
a,b,c = coroutine.resume(co,10,20)
print(a,b,c)
true 30 -10
- yield也会把多余的参数返回给对应的resume,如下:
co = coroutine.create(function()
print("co",coroutine.yield())
end)
print(coroutine.status(co))
coroutine.resume(co)
print(coroutine.status(co))
print(coroutine.resume(co,4,5))
suspended
suspended
co 4 5
true
但是这里有一个现象,resume第一次返回竟然是空?没有执行起来,是不是很奇怪?再看一下例子:
coroutineFunc = function (a, b)
for i = 1, 10 do
print(i, a, b)
coroutine.yield()
end
end
co2 = coroutine.create(coroutineFunc) --创建协同程序co2
coroutine.resume(co2, 100, 200) -- 1 100 200 开启协同,传入参数用于初始化
coroutine.resume(co2) -- 2 100 200
coroutine.resume(co2, 500, 600) -- 3 100 200 继续协同,传入参数无效
co3 = coroutine.create(coroutineFunc) --创建协同程序co3
coroutine.resume(co3, 300, 400) -- 1 300 400 开启协同,传入参数用于初始化
coroutine.resume(co3) -- 2 300 400
coroutine.resume(co3) -- 3 300 400
协同中的参数传递形势很灵活,一定要注意区分,在启动coroutine的时候,resume的参数是传给主程序的;在唤醒yield的时候,参数是传递给yield的。看下面这个例子:
co = coroutine.create(function (a, b) print("co", a, b, coroutine.yield()) end)
coroutine.resume(co, 1, 2) --没输出结果,注意两个数字参数是传递给函数的
coroutine.resume(co, 3, 4, 5) --co 1 2 3 4 5,这里的两个数字参数由resume传递给yield
resume-yield来交换数据:
(1)resume把参数传给程序(相当于函数的参数调用);
(2)数据由yield传递给resume;
(3)resume的参数传递给yield;
(4)协同代码结束时的返回值,也会传给resume
再废话一步,举个例子解释上面的作用,完美百分百让你理解传递逻辑,
coroutineFunc = function (a, b)
key = a
print ("co",a)
coroutine.yield(b)
end
co2 = coroutine.create(coroutineFunc)
print(coroutine.resume(co2, 100, 200))
--co 100
--true 200
我们都知道resume返回的值;一部分是协同本身return的结果,另一部分执行成功返回true+value(这个value是yield传入的),这样是不是理清楚了.
当一个coroutine结束的时候,main函数的所有返回值都被返回给resume:
-
co = coroutine.create(function()
return 6,7 end)
print (coroutine.resume(co))true 6 7
总结
我们在同一个coroutine中,很少会将上面介绍的这些功能全都用上,但是所有这些功能都是很useful的。
目前为止,我们已经了解了Lua中coroutine的一些知识了。下面我们需要明确几个概念。Lua提供的是asymmetric coroutine,意思是说,它需要一个函数(yield)来挂起一个coroutine,但需要另一个函数(resume)来唤醒这个被挂起的coroutine。对应的,一些语言提供了symmetric coroutine,用来切换当前coroutine的函数只有一个。
有人想把Lua的coroutine称为semi-coroutine,但是这个词已经被用作别的意义了,用来表示一个被限制了一些功能来实现出来的coroutine,这样的coroutine,只有在一个coroutine的调用堆栈中,没有剩余任何挂起的调用时,才会被挂起,换句话说,就是只有main可以挂起。Python中的generator好像就是这样一个类似的semi-coroutine。
跟asymmetric coroutine和symmetric coroutine的区别不同,coroutine和generator(Python中的)的不同在于,generator并么有coroutine的功能强大,一些用coroutine可实现的有趣的功能,用generator是实现不了的。Lua提供了一个功能完整的coroutine,如果有人喜欢symmetric coroutine,可以自己简单的进行一下封装。
pipes和filters
下面这个例子是resume 和yield很好的实际应用。
function receive (prod)
local status, value = coroutine.resume(prod)
return value
end
function send (x)
coroutine.yield(x)
end
function producer()
return coroutine.create(function ()
while true do
local x = io.read() -- produce new value
print("我进来了吗?1")
send(x)
print("我进来了吗?2")
end
end)
end
function consumer (prod)
while true do
local x = receive(prod) -- receive from producer
io.write(x, "\n") -- consume new value
end
end
p = producer()
consumer(p)
简言之这个程序的执行顺序先调用consumer, 然后recv函数去resume唤醒producer,produce一个值,send给consumer,然后继续等待下一次resume唤醒。
同样下面这个例子是上面的拆分,帮助理解过程。
function send(x)
coroutine.yield(x)
end
co = coroutine.create(
function()
while true do
local x = io.read()
send(x) end
end)
print(coroutine.status(co))
print(coroutine.resume(co))
我们可以继续扩展一下上面的例子,增加一个filter,在producer和consumer之间做一些数据转换啥的。那么filter里都做些什么呢?我们先看一下没加filter之前的逻辑,基本就是producer去send,send to consumer,consumer去recv,recv from producer,可以这么理解吧。加了filter之后呢,因为filter需要对data做一些转换操作,因此这时的逻辑为,producer去send,send tofilter,filter去recv,recv from producer,filter去send,send to consumer,consumer去recv,recv from filter.
function send(x)
coroutine.yield(x)
end
function receive (prod)
local status, value = coroutine.resume(prod)
print("echo 1")
print("value is",value)
return value
end
function producer()
return coroutine.create(function ()
print("echo 3")
while true do
local x = io.read()
send(x)
end
end)
end
function consumer(prod)
while true do
print("why ?")
local x = receive(prod)
if x then
print("echo 2")
io.write(x, '\n')
else
break
end
end
end
function filter(prod)
return coroutine.create(function ()
for line = 1, math.huge do
print("echo ")
local x = receive(prod)
x = string.format('%5d %s', line, x)
send(x)
end
end)
end
p = producer()
f = filter(p)
consumer(f)
印有字串辅助看懂整个sequence,可以看到, consumer执行,透过receive resume叫起filter的协程,同时filter进来后又在透过receive的resume再叫起producer的协程。
讲到这里,你是否想起了unix中的pipe?coroutine怎么说也是multithreading的一种。使用pipe,每个task得以在各自的process里执行,而是用coroutine,每个task在各自的coroutine中执行。pipe在writer(producer)和reader(consumer)之间提供了一个buffer,因此相对的运行速度还是相当可以的。这个是pipe很重要的一个特性,因为process间通信,代价还是有点大的。使用coroutine,不同task之间的切换成本更小,基本上也就是一个函数调用,因此,writer和reader几乎可以说是齐头并进了啊。
用coroutine实现迭代器
我们可以把迭代器 循环看成是一个特殊的producer-consumer例子:迭代器produce,循环体consume。下面我们就看一下coroutine为我们提供的强大的功能,用coroutine来实现迭代器。
我们来遍历一个数组的全排列。先看一下普通的loop实现,代码如下:
function printResult(a)
for i = 1, #a do
io.write(a[i], ' ')
end
io.write('\n')
end
function permgen(a, n)
n = n or #a
if n <= 1 then
printResult(a)
else
for i = 1, n do
a[n], a[i] = a[i], a[n]
permgen(a, n-1)
a[n], a[i] = a[i], a[n]
end
end
end
permgen({1,2,3})
2 3 1
3 2 1
3 1 2
1 3 2
2 1 3
1 2 3
采用协程实现如下:
function printResult(a)
for i = 1, #a do
io.write(a[i], ' ')
end
io.write('\n')
end
function permgen(a, n)
n = n or #a
if n <= 1 then
coroutine.yield(a)
else
for i = 1, n do
a[n], a[i] = a[i], a[n]
permgen(a, n-1)
a[n], a[i] = a[i], a[n]
end
end
end
function permutations(a)
local co = coroutine.create(function () permgen(a) end)
return function ()
local code, res = coroutine.resume(co)
return res
end
end
for p in permutations({"a", "b", "c"}) do
printResult(p)
end
b c a
c b a
c a b
a c b
b a c
a b c
permutations 函数使用了一个Lua中的常规模式,将在函数中去resume一个对应的coroutine进行封装。Lua对这种模式提供了一个函数coroutine.wap 。跟create 一样,wrap 创建一个新的coroutine ,但是并不返回给coroutine,而是返回一个函数,调用这个函数,对应的coroutine就被唤醒去运行。跟原来的resume 不同的是,该函数不会返回errcode作为第一个返回值,一旦有error发生,就退出了(类似C语言的assert)。使用wrap, permutations可以如下实现:
function permutations (a)
return coroutine.wrap(function () permgen(a) end)
end
wrap 比create 跟简单,它实在的返回了我们最需要的东西:一个可以唤醒对应coroutine的函数。 但是不够灵活。没有办法去检查wrap 创建的coroutine的status, 也不能检查runtime-error(没有返回errcode,而是直接assert)
非抢占式多线程
我们知道,coroutine运行一系列的协作的多线程。每个coroutine相当于一个thread。一个yield-resume对可以在不同的thread之间切换控制权。但是,跟常规的multithr不同,coroutine是非抢占式的。一个coroutine在运行的时候,不可能被其他的coroutine从外部将其挂起,只有由其本身显式地调用yield才会挂起,并交出控制权。对一些程序来说,这没有任何问题,相反,因为非抢占式的缘故,程序变得更加简单。我们不需要担心同步问题的bug,因为在threads之间的同步都是显式的。我们只需要保证在对的时刻调用yield就可以了。
但是,使用非抢占式multithreading,不管哪个thread调用了一个阻塞的操作,那么整个程序都会被阻塞,这是不能容忍的。由于这个原因,很多程序员并不认为coroutine可以替代传统的multithread,但是,下面我们可以看到一个有趣的解决办法。
一个很典型的multithreading场景:通过http下载多个remote files。我们先来看下如何下载一个文件,这需要使用LuaSocket库,
local socket = require("socket")
require("socket")
host = "www.w3.org"
file = "/standards/xml/schema"
c = assert(socket.connect(host, 80))
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n") -- 注意GET后和HTTP前面的空格
while true do
local s, status, partial = c:receive(2^10)
io.write(s or partial)
if status == "closed" then
break
end
end
c:close()
现在我们就知道怎么下载一个文件了。现在回到前面说的下载多个remote files的问题。当我们接收一个remote file的时候,程序花费了大多数时间去等待数据的到来,也就是在receive函数的调用是阻塞。因此,如果能够同时下载所有的files,那么程序的运行速度会快很多。下面我们看一下如何用coroutine来模拟这个实现。我们为每一个下载任务创建一个thread,在一个thread没有数据可用的时候,就调用yield 将程序控制权交给一个简单的dispatcher,由dispatcher来唤醒另一个thread。下面我们先把之前的代码写成一个函数,但是有少许改动,不再将file的内容输出到stdout了,而只是间的的输出filesize。
function download(host, file)
local c = assert(socket.connect(host, 80))
local count = 0 -- counts number of bytes read
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
while true do
local s, status, partial = receive(c)
count = count + #(s or partial)
if status == "closed" then
break
end
end
c:close()
print(file, count)
end
function receive (connection)
return connection:receive(2^10)
end
但是,如果要同时下载多文件的话,这个函数必须非阻塞地接收数据。在没有数据接收的时候,就调用yield挂起,交出控制权。实现应该如下:
function receive(connection)
connection:settimeout(0) -- do not block
local s, status, partial = connection:receive(2^10)
if status == "timeout" then
coroutine.yield(connection)
end
return s or partial, status
end
settimeout(0)将这个连接设为非阻塞模式。当status变为“timeout”时,意味着该操作还没完成就返回了,这种情况下,该thread就yield。传递给yield的non-false参数,告诉dispatcher该线程仍然在运行。注意,即使timeout了,该连接还是会返回它已经收到的东西,存在partial变量中。
下面的代码展示了一个简单的dispatcher。表threads保存了一系列的运行中的thread。函数get 确保每个下载任务都单独一个thread。dispatcher本身是一个循环,不断的遍历所有的thread,一个一个的去resume。如果一个下载任务已经完成,一定要将该thread从表thread中删除。当没有thread在运行的时候,循环就停止了。
最后,程序创建它需要的threads,并调用dispatcher。例如,从w3c网站下载四个文档,程序如下所示:
local socket = require("socket")
function receive(connection)
connection:settimeout(0) -- do not block
local s, status, partial = connection:receive(2^10)
if status == "timeout" then
coroutine.yield(connection)
end
return s or partial, status
end
function download(host, file)
local c = assert(socket.connect(host, 80))
local count = 0 -- counts number of bytes read
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
while true do
local s, status, partial = receive(c)
count = count + #(s or partial)
if status == "closed" then
break
end
end
c:close()
print(file, count)
end
threads = {} -- list of all live threads
function get(host, file)
-- create coroutine
local co = coroutine.create(function ()
download(host, file)
end)
-- intert it in the list
table.insert(threads, co)
end
function dispatch()
local i = 1
while true do
if threads[i] == nil then -- no more threads?
if threads[1] == nil then -- list is empty?
break
end
i = 1 -- restart the loop
end
local status, res = coroutine.resume(threads[i])
if not res then -- thread finished its task?
table.remove(threads, i)
else
i = i + 1
end
end
end
host = "www.w3.org"
get(host, "/TR/html401/html40.txt")
get(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
get(host, "/TR/REC-html32.html")
get(host, "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
dispatch() -- main loop
/TR/html401/html40.txt 629
/TR/REC-html32.html 606
/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt 229699
/TR/2002/REC-xhtml1-20020801/xhtml1.pdf 115777
程序运行了10s左右,4个文件已经下载完成
又重新用阻塞式的顺序下载重试了一下,需要时间本地测试时变长,不知道是不是网路问题,阻塞的多文件下载代码如下,其实就是上面几段代码放在一块了
local socket = require("socket")
function receive (connection)
return connection:receive(2^10)
end
function download(host, file)
local c = assert(socket.connect(host, 80))
local count = 0 -- counts number of bytes read
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
while true do
local s, status, partial = receive(c)
count = count + #(s or partial)
if status == "closed" then
break
end
end
c:close()
print(file, count)
end
host = "www.w3.org"
download(host, "/TR/html401/html40.txt")
download(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
download(host, "/TR/REC-html32.html")
download(host, "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
除了速度以外,其他有没有优化空间呢,答案是,有。当没有thread有数据接收时,dispatcher遍历了每一个thread去看它有没有数据过来,结果这个过程比阻塞式的版本多耗费了30倍的cpu。
为了避免这个情况,我们使用LuaSocket提供的select函数。它运行程序在等待一组sockets状态改变时阻塞。代码改动比较少,在循环中,收集timeout的连接到表connections 中,当所有的连接都timeout了,dispatcher调用select 来等待这些连接改变状态。该版本的程序,在博主开发环境测试,只需7s不到,就下载完成4个文件,除此之外,对cpu的消耗也小了很多,只比阻塞版本多一点点而已。新的dispatch代码如下:
function dispatch()
local i = 1
local connections = {}
while true do
if threads[i] == nil then -- no more threads?
if threads[1] == nil then -- list is empty?
break
end
i = 1 -- restart the loop
connections = {}
end
local status, res = coroutine.resume(threads[i])
if not res then -- thread finished its task?
table.remove(threads, i)
else
i = i + 1
connections[#connections + 1] = res
if #connections == #threads then -- all threads blocked?
socket.select(connections)
end
end
end
end
协程讲述完毕,上面的例子很好的阐述了其实现,供各位参考。