[Real World Haskell翻译]第24章 并发和多核编程 第一部分并发编程

第24章 并发和多核编程 第一部分并发编程

当我们写这本书的时候,CPU架构正在以比过去几十年间更快的速度变化。

并发和并行的定义

并发程序需要同时执行多个不相关任务。考虑游戏服务器的例子:它通常是由几十个组件组成,每个组件都与外部世界有着复杂的交互。一个组件可能处理多用户聊天;几个会将处理几个玩家的输入并回复他们状态的更新,而另一个执行物理计算。
并发程序的正确执行并不需要多核,尽管可能会提高性能和响应速度。
与此相反,一个并行程序解决了一个单一的问题。考虑一个尝试在单只股票的价格波动来预测下一分钟的金融模型。如果我们要将此模型应用于每一支正在交易的股票,例如,估计哪一支股票我们应该买和卖,如果这个模型运行在500个核上而不是在1个核上,我们希望更快地得到答案。这表明,并行程序通常不取决于多个内核的存在才能正常工作。
另一个有用的并发和并行程序之间的区别在于他们与外界的交互。根据定义,并发程序持续地处理网络协议,数据库和与此类似。典型的并行程序可能会更专注:将数据组织成流,压缩(用更少的IO),然后将流传出。
许多传统的语言,并发和并行编程之间的边界进一步模糊,因为他们迫使程序员使用相同的原语来建构两种程序。
在本章中,我们将关注运行在单一操作系统进程上的并发和并行程序。 使用线程进行并发编程 作为并发程序组成的一部分,大多数编程语言提供了创建多个独立的控制线程的方式。 Haskell也不例外,虽然在Haskell中线程编程比其他语言看起来有点不同

在Haskell中,一个线程是一个独立于其他线程执行的IO action。创建一个线程,我们导入Control.Concurrent的模块并使用forkIO函数: ghci> :m +Control.Concurrent
ghci> :t forkIO
forkIO :: IO () -> IO ThreadId
ghci> :m +System.Directory
ghci> forkIO (writeFile "xyzzy" "seo craic nua!") >> doesFileExist "xyzzy"
False 新线程几乎是立即开始执行,同时创建它的线程仍在并发地执行。该线程会在它的IO action结束时停止执行。 线程是不确定的 GHC的runtime组件并不指定执行线程的顺序。这导致在前面的例子中,在原始线程检查新线程创建的文件xyzzy时,它可能被创建,或者还没被创建。如果我们尝试这个例子,然后删除xyzzy再试一次,我们可能会在第二次得到不同的结果。 隐藏延时 假设我们有一个大文件需要压缩并写入到磁盘,但我们要足够快地处理用户的输入,以使她感觉到我们的程序立即响应。如果我们使用forkIO在一个单独的线程将文件写出,我们就可以同时做这两件事: -- file: ch24/Compressor.hs
import Control.Concurrent (forkIO)
import Control.Exception (handle)
import Control.Monad (forever)
import qualified Data.ByteString.Lazy as L
import System.Console.Readline (readline) -- Provided by the 'zlib' package on http://hackage.haskell.org/
import Codec.Compression.GZip (compress) main = do
maybeLine <- readline "Enter a file to compress> "
case maybeLine of
Nothing -> return () -- user entered EOF
Just "" -> return () -- treat no name as "want to quit"
Just name -> do
handle print $ do
content <- L.readFile name
forkIO (compressFile name content)
return ()
main
where compressFile path = L.writeFile (path ++ ".gz") . compress 因为我们在这里使用的是lazy ByteString I/O,我们在主线程中所需做的所有就是打开该文件。实际读数发生在其它线程中。
如果用户输入的文件名不存在,handle print的使用提供给我们一种廉价的方式来打印错误消息。 简单的线程之间的通信 两个线程之间共享信息的最简单的方法是让他们都使用同一个变量。在我们的文件压缩的例子中,主线程与其他线程共享文件的名称和它的内容。因为Haskell的数据默认情况下是不可改变的,这会造成无风险:没有线程可以修改其他线程的文件的名称或它的内容。
我们往往需要线程积极地互相通信。例如,GHC不为线程提供判断其他线程目前是否正在执行,已完成或已崩溃的方法。但是,它提供了一个同步的变量类型,MVar,我们可以用它来为自己创造这种能力。
一个MVar就像一个单元素的盒子:它可以是满或空。我们可以把一些东西放进去,使其充满,或拿东西出来,使其空: ghci> :t putMVar
putMVar :: MVar a -> a -> IO ()
ghci> :t takeMVar
takeMVar :: MVar a -> IO a %就像我们即将展示的那样,GHC的线程是非常轻量级的。如果运行时提供一种方法来检查每个线程的状态,每一个线程的开销会增加,即使这个信息从未使用过。 如果我们试图把一个值放进满的MVar,我们的线程将会休眠,直到另一个线程把值取出去。同样,如果我们试图从一个空的MVar中取值,我们的线程进入睡眠状态,直到其他线程放一个值进去: -- file: ch24/MVarExample.hs
import Control.Concurrent
communicate = do
m <- newEmptyMVar
forkIO $ do
v <- takeMVar m
putStrLn ("received " ++ show v)
putStrLn "sending"
putMVar m "wake up!" newEmptyMVar函数有一个描述性的名称。要创建一个非空的MVar,我们使用newMVar: ghci> :t newEmptyMVar
newEmptyMVar :: IO (MVar a)
ghci> :t newMVar
newMVar :: a -> IO (MVar a) 让我们在GHC中运行我们的例子: ghci> :load MVarExample
[1 of 1] Compiling Main ( MVarExample.hs, interpreted )
Ok, modules loaded: Main.
ghci> communicate
sending
received "wake up!" 如果你有在传统编程语言下进行并发编程的背景,你能想到MVaras对于两个熟悉的目的是有用的:
•发送一个消息从一个线程到另一个,例如,通知。
•对一块可变数据提供互斥就是在线程*享。当它没有被使用时,我们把数据放入MVar。一个线程暂时取出它读取或修改。 主线程和等待其他线程 GHC的运行时系统对待程序的原始控制线程不同于其他线程。当这个线程执行完成时,运行时系统认为整体程序已完成。如果任何其他线程仍在执行,他们都将终止。
因此,当我们有长期运行的线程不能被杀死时,我们需要特别安排,确保直到其它线程完成时主线程才能完成。让我们开发一个小型库,使这个很容易做到: -- file: ch24/NiceFork.hs
import Control.Concurrent
import Control.Exception (Exception, try)
import qualified Data.Map as M data ThreadStatus = Running
| Finished -- terminated normally
| Threw Exception -- killed by uncaught exception
deriving (Eq, Show) -- | Create a new thread manager.
newManager :: IO ThreadManager -- | Create a new managed thread.
forkManaged :: ThreadManager -> IO () -> IO ThreadId -- | Immediately return the status of a managed thread.
getStatus :: ThreadManager -> ThreadId -> IO (Maybe ThreadStatus) -- | Block until a specific managed thread terminates.
waitFor :: ThreadManager -> ThreadId -> IO (Maybe ThreadStatus) -- | Block until all managed threads terminate.
waitAll :: ThreadManager -> IO () 使用通常的做法我们保持我们的ThreadManager类型抽象:我们把它包在一个newtype中并防止客户端创建这种类型的值。在我们的模块导出部分,我们列出了类型构造器和构建manager的IO action,但我们不导出数据构造器: -- file: ch24/NiceFork.hs
module NiceFork
(
ThreadManager
, newManager
, forkManaged
, getStatus
, waitFor
, waitAll
) where 对于ThreadManager的实现,我们维护了从线程ID到线程状态的映射。我们会参考这个作为线程的映射: -- file: ch24/NiceFork.hs
newtype ThreadManager =
Mgr (MVar (M.Map ThreadId (MVar ThreadStatus)))
deriving (Eq) newManager = Mgr `fmap` newMVar M.empty 我们有两个级别的MVar在这里使用。我们保持Map在一个MVar中。这让我们通过用新版本替换它“修改”Map。我们也确保任何线程使用Map都看到一致的内容。
对于我们管理的每个线程,我们维护一个MVar。每个线程MVar初始为空,这表示该线程正在执行。当线程完成或未捕获的异常杀死时,我们把这个信息送入MVar。
创建一个线程并查看其状态,我们必须执行一点点记录的工作: -- file: ch24/NiceFork.hs
forkManaged (Mgr mgr) body =
modifyMVar mgr $ \m -> do
state <- newEmptyMVar
tid <- forkIO $ do
result <- try body
putMVar state (either Threw (const Finished) result)
return (M.insert tid state m, tid) 安全修改MVar 前面的代码中我们在forkManaged中的modifyMVar函数是非常有用的。它是一个takeMVar和putMVar的安全的结合: ghci> :t modifyMVar
modifyMVar :: MVar a -> (a -> IO (a, b)) -> IO b 它从MVar中取值并传递给一个函数。此函数产生一个新的值并返回一个结果。如果函数抛出一个异常,modifyMVar把原始值重新放入MVar,否则,它放入新的值。它返回函数的其它元素作为它自己的结果。
当我们使用modifyMVar替代使用takeMVar和putMVar手动管理一个MVar,我们避免了两种常见的并发错误:
•忘记把一个值重新放入MVar。这可能会导致死锁,在这种情况下,一些线程会永远等待一个MVar,但永远不会有一个值放入它。
•如果没有考虑到会抛出一个异常的可能性,扰乱一段代码的流。这会导致对putMVar的调用产生,但实际上没有发生,一样会导致死锁。
由于这些不错的安全特性,使用modifyMVar是明智的。 安全资源管理:一个容易实现的好主意 我们可以采取modifyMVar遵照的并把它应用到很多其他资源管理情况的模式。下面是该模式的步骤:
1。获取资源。
2。把资源传给一个用它做一些事情的函数。
3。一定要释放资源,即使函数抛出一个异常。如果发生这种情况,重新抛出异常,然后应用程序代码可以捕获。
安全性不谈,这种方法还有一个好处:它可以使我们的代码更短更容易理解。正如我们可以在之前的代码清单里的forkManaged中看到的,Haskell匿名函数的轻量级语法使得这种风格的代码不引人注目。
下面是modifyMVar的定义,这让你可以看到这种模式的一种特定形式: -- file: ch24/ModifyMVar.hs
import Control.Concurrent (MVar, putMVar, takeMVar)
import Control.Exception (block, catch, throw, unblock)
import Prelude hiding (catch) -- use Control.Exception's version
modifyMVar :: MVar a -> (a -> IO (a,b)) -> IO b
modifyMVar m io =
block $ do
a <- takeMVar m
(b,r) <- unblock (io a) `catch` \e ->
putMVar m a >> throw e
putMVar m b
return r 无论你的工作是和网络连接,数据库handles,还是和用C库管理的数据相关,你应该都很容易采用上面的代码来适应您的特定需求。 寻找线程状态 我们的getStatus函数告诉我们线程的当前状态。如果该线程不再被管理,它返回Nothing: -- file: ch24/NiceFork.hs
getStatus (Mgr mgr) tid =
modifyMVar mgr $ \m ->
case M.lookup tid m of
Nothing -> return (m, Nothing)
Just st -> tryTakeMVar st >>= \mst -> case mst of
Nothing -> return (m, Just Running)
Just sth -> return (M.delete tid m, Just sth) 如果线程仍在运行,只返回Just Running。否则,它表明了为什么线程被终止并停止管理线程。
如果tryTakeMVar函数发现MVar是空的,它立即返回Nothing而不是阻塞: ghci> :t tryTakeMVar
tryTakeMVar :: MVar a -> IO (Maybe a) 否则,它通常从MVar中取值。
waitFor函数有类似的行为,但它不是立即返回,而是直到给定线程在返回前终止时阻塞: -- file: ch24/NiceFork.hs
waitFor (Mgr mgr) tid = do
maybeDone <- modifyMVar mgr $ \m ->
return $ case M.updateLookupWithKey (\_ _ -> Nothing) tid m of
(Nothing, _) -> (m, Nothing)
(done, m') -> (m', done)
case maybeDone of
Nothing -> return Nothing
Just st -> Just `fmap` takeMVar st 首先提取持有线程的状态的MVar,如果它存在。Map类型的updateLookupWithKey函数是有用的,它综合了查找、修改和删除值: ghci> :m +Data.Map
ghci> :t updateLookupWithKey
updateLookupWithKey :: (Ord k) =>
(k -> a -> Maybe a) -> k -> Map k a -> (Maybe a, Map k a) 在这种情况下,我们总是要删除MVar持有的线程状态,如果它存在,我们的线程管理器将不再管理线程。如果有要提取的值,我们从MVar取得线程的退出状态并返回它。
我们最终的有用的函数只是简单地等待目前管理的所有线程完成并忽略他们的退出状态: -- file: ch24/NiceFork.hs
waitAll (Mgr mgr) = modifyMVar mgr elems >>= mapM_ takeMVar
where elems m = return (M.empty, M.elems m) 写出更紧凑的代码 我们对waitFor的定义有点不太满意,因为我们或多或少在两个相同的情景下执行:在modifyMVar函数的内部调用,和在它返回值的时候再次调用。
果然,我们可以运用一个函数让我们轻易消除这种重复。这个问题中的函数是join,来自Control.Monad模块: ghci> :m +Control.Monad
ghci> :t join
join :: (Monad m) => m (m a) -> m a The trick here is to see that we can get rid of the second caseexpression by having the first one return the IOaction that we should perform once we return from modifyMVar.We’ll use jointo execute the action: -- file: ch24/NiceFork.hs
waitFor2 (Mgr mgr) tid =
join . modifyMVar mgr $ \m ->
return $ case M.updateLookupWithKey (\_ _ -> Nothing) tid m of
(Nothing, _) -> (m, return Nothing)
(Just st, m') -> (m', Just `fmap` takeMVar st) 这是一个有趣的想法:我们可以在纯的代码中创建一个monadic函数或action,然后传递它,直到我们在可以使用它的monad中结束。这是编写代码的敏捷方法,once you develop an eye for when it makes sense. 通过channels通信 线程之间的一次性通信,MVar是非常好的。另一种类型,Chan,提供了一个单向通信的渠道。下面是使用它的简单的例子: -- file: ch24/Chan.hs
import Control.Concurrent
import Control.Concurrent.Chan
chanExample = do
ch <- newChan
forkIO $ do
writeChan ch "hello world"
writeChan ch "now i quit"
readChan ch >>= print
readChan ch >>= print 如果一个Chan是空的,readChan阻塞,直到存在一个值可以读取。 writeChan函数从不阻塞;它立即把一个新值写入Chan。 需要知道的有用的东西 MVar和Chan并非严格 像大多数的haskell容器类型,MVar和Chan是不严格的:从不计算其内容。我们提到这一点,不是因为它是个问题,而是因为它是常见的盲点。人们倾向于假设,这些类型是严格的,也许是因为他们用在IO monad中。
至于其他的容器类型,关于MVar和Chan类型严格的错误假设往往是一个空间或性能缺陷的主因。这是一个看似正常的情景。
我们fork一个线程在另一个核执行一些昂贵的计算: -- file: ch24/Expensive.hs
import Control.Concurrent
notQuiteRight = do
mv <- newEmptyMVar
forkIO $ expensiveComputation_stricter mv
someOtherActivity
result <- takeMVar mv
print result 它似乎做了一些东西,并把结果返回给MVar: -- file: ch24/Expensive.hs
expensiveComputation mv = do
let a = "this is "
b = "not really "
c = "all that expensive"
putMVar mv (a ++ b ++ c) 当我们把结果从父线程的MVar中取出并尝试用它做一些事情,我们的线程开始飞快地计算,因为我们从未强迫计算发生在其它的线程!
像往常一样,这个解决方案很直接,我们知道这有一个潜在的问题:我们给forked线程增加严格性,以确保计算发生这里。此严格最好是添加在一个位置,以避免我们可能会忘记添加它的可能: -- file: ch24/ModifyMVarStrict.hs
{-# LANGUAGE BangPatterns #-} import Control.Concurrent (MVar, putMVar, takeMVar)
import Control.Exception (block, catch, throw, unblock)
import Prelude hiding (catch) -- use Control.Exception's version modifyMVar_strict :: MVar a -> (a -> IO a) -> IO ()
modifyMVar_strict m io = block $ do
a <- takeMVar m
!b <- unblock (io a) `catch` \e ->
putMVar m a >> throw e
putMVar m b %检查Hackage总是值得的
%在Hackage包数据库,你会发现一个库,strict-concurrency,提供了MVar和Chan类型的严格版本。 前面的代码中的!模式非常易用,但它并不总是足以确保我们的数据被计算。如需更完整的方法,请参阅第552页的“从计算中分离算法”。 Chan是无限的 由于writeChan总是立即成功执行,使用Chan有一个潜在的风险。如果一个线程向Chan写入比另一个线程从中读取得更频繁,Chan将会增长在一个未检查的方法:未读消息就会堆积起来,读者就会越来越落在后面。 共享状态并发仍然困难 尽管相对其他语言Haskell有不同的原语用于在其他线程之间共享数据,它仍然遭遇了相同的基本问题:编写正确的并发程序仍是极度困难的。事实上,一些在其他语言中并发编程的陷阱同样适用于Haskell。两个较知名的问题是死锁和饥饿。 死锁 在死锁的情况下,两个或多个线程在访问共享资源的时候相冲突而永远卡住。使一个多线程程序死锁的经典方法是忘记获取锁的顺序。这种错误是很常见的,它有一个名字:lock order inversion。 Haskell没有提供锁,MVar类型很容易出现顺序倒置的问题。下面是一个简单的例子: -- file: ch24/LockHierarchy.hs
import Control.Concurrent nestedModification outer inner = do
modifyMVar_ outer $ \x -> do
yield -- force this thread to temporarily yield the CPU
modifyMVar_ inner $ \y -> return (y + 1)
return (x + 1)
putStrLn "done" main = do
a <- newMVar 1
b <- newMVar 2
forkIO $ nestedModification a b
forkIO $ nestedModification b a 如果我们在ghci中运行这个,它通常但并不总是什么也不打印,说明这两个线程都被卡住。
与nestedModification函数相关的问题很容易被发现。在第一个线程,我们先取MVar a,再取b。在第二个中,我们先取b,然后a。如果第一个线程成功取得a,第二个取得b,两个线程将阻塞,都试图取一个已经被另一个线程掏空的MVar,所以都不能进行下去。
在众多的语言中,解决顺序倒置问题的通常做法是始终遵循请求资源时保持一致的顺序。由于这种方法需要手动维护一种编码规范,在实践中很容易迷失。
为了使问题更加复杂,在真正的代码中这些倒置问题可能难以发现。Mvar的获取经常是在不同文件的多个函数中,使目视检查更加困难。更糟糕的是,这些问题往往是间歇性的,这使得他们很难再现,不要想着可以隔离并修复它。 饥饿 并发软件也很容易饥饿,其中一个线程抢占一个共享资源,防止另一个线程使用它。人们很容易想象这是如何发生的:一个线程用一个执行100毫秒的代码块调用modifyMVar,而另一个用一个执行1毫秒的代码块在相同的MVar上调用modifyMVar。第二个线程不能取得进展,直到第一个把一个值放回MVar。
MVar类型不严格的特征不但会引起还会加剧饥饿问题。如果我们把thunk放入MVar计算是很昂贵的,然后在一个线程中把它从MVar中取出,否则看起来它应该很便宜,如果计算thunk,该线程可能会突然变成昂贵的计算。我们在第539页的关于“MVar和Chan非严格”中给出了建议。 有希望吗? 幸运的是,我们这里介绍的并发API决非故事的结局。最近的一个Haskell更新,软件事务内存(STM),既容易,又安全地工作。我们将在第28章讨论。 %练习 %1。Chan类型的实现使用MVar。使用MVar开发一个BoundedChan库。
%你的newBoundedChan函数应该接受一个Int型的参数,限制未读条目的数量需要在BoundedChan中呈现一次。
如果这个限制被触及到,对你的writeBoundedChan函数的调用必须阻塞,直到读取器使用readBoundedChan消耗一个值。
%2。虽然我们已经提到在Hackage库中strict-concurrency package的存在,尝试开发自己的,作为内建类型MVar的封装。遵照经典的haskell实践,使你的库类型安全,如此用户将不会意外地混合使用严格和不严格的MVar。 在GHC中使用多核 默认情况下,GHC生成只使用一个核的程序,即使我们明确地写并发的代码。要使用多个核,我们必须明确地选择这样做。当我们生成一个可执行程序,我们在link时做出这样的选择:
•nonthreaded运行时库运行的所有haskell线程在一个单一的操作系统线程。这种运行时对于创建线程和数据在MVar中的传递是非常高效的。
•threaded运行时库使用多个操作系统线程运行haskell线程。它创建线程和使用MVar需要更大的开销。
如果我们传递-threaded选项给编译器,它将我们的程序和threaded运行时库相链接。当我们在编译库或源文件时,只有当我们最后生成一个可执行文件时,我们并不需要使用-threaded。
即使我们为我们的程序选择threaded运行时,它仍然会默认使用单核,我们必须明确地告诉运行时要使用多少个核。 运行时选项 在我们程序的命令行上,我们可以传递选项给GHC的运行时系统。在把控制权交给我们的代码之前,运行时为了特殊的命令行选项+ RTS扫描程序的参数。它解释了所有的运行时系统选项(直到特殊选项​​-RTS),而不是我们的程序来解释。它从我们的代码中隐藏了所有这些选项。当我们使用System.Environment模块的getArgs函数获得我们的命令行参数,我们不会在列表中找到任何运行时选项。
threaded运行时接受一个选项-N。这需要一个参数,它指定GHC的运行时系统应该使用的核数。选项​​解析器是挑剔的:-N和跟随的数字之间不能有任何空格。选项-N4是可接受,但-N 4不能。 从Haskell寻找可用的核数 模块GHC.Conc导出一个变量,numCapabilities,它告诉我们运行时系统被-N RTS选项指定多少个核: -- file: ch24/NumCapabilities.hs
import GHC.Conc (numCapabilities)
import System.Environment (getArgs) main = do
args <- getArgs
putStrLn $ "command line arguments: " ++ show args
putStrLn $ "number of cores: " ++ show numCapabilities 如果我们编译并运行这个程序,我们可以看到这个选项对于运行时系统是不可见的,但是我们可以看到它可以运行在多少个核上: $ ghc -c NumCapabilities.hs
$ ghc -threaded -o NumCapabilities NumCapabilities.o
$ ./NumCapabilities +RTS -N4 -RTS foo
command line arguments: ["foo"]
number of cores: 4 选择正确的运行时 决定使用哪个运行时并不完全明确。treaded运行时可以使用多个核,它有成本:线程和它们之间共享的数据比nonthreaded运行时更昂贵。
此外,GHC6.8.3版本使用的垃圾收集器是单线程的:当它运行在一个核的时候它暂停所有其他的线程。这限制了我们希望看到的使用多个核时性能的提升。在许多现实世界的并发程序中,个别的线程将其大部分时间花在等待网络请求或响应。在这些情况下,如果一个Haskell程序为数以万计的并发的客户服务,nonthreaded线程的低开销可能会有帮助。例如,代替在四个核上使用threaded运行时的单一的服务器程序,我们可能会看到更好的表现,如果我们设计的服务器同时运行四个副本并使用nonthreaded运行时。
我们这里的目的不是阻止你使用threaded运行时。它不再比nonthreaded运行时昂贵得很多,相比大多数其他编程语言的运行时线程仍然惊人的便宜。我们只希望明确表示切换到线程运行时不一定会导致自动获胜。
上一篇:JQuery选择器,事件,DOM操作,动画


下一篇:Java static块