Perl多线程(2):数据共享和线程安全

线程数据共享

在介绍Perl解释器线程的时候一直强调,Perl解释器线程在被创建出来的时候,将从父线程中拷贝数据到子线程中,使得数据是线程私有的,并且数据是线程隔离的。如果真的想要在线程间共享数据,需要显式使用threads::shared模块来扩展threads模块的功能。这个模块必须在先导入了threads模块的情况下使用,否则threads::shared模块里的功能都将没效果。

use threads;
use threads::shared;

要共享数据,只需使用threads::shared模块的share方法即可,也可以直接将数据标记上:shared属性的方式来共享。例如:

my $answer = 43;
my @arr = qw(1 2 3);
share($answer);
share(@arr);

my $answer :shared = 43;
my @arr :shared = qw(1 2 3);
my %hash :shared = (one=>1, two=>2, three=>3);

使用share()和:shared的区别在于后面这种标记的方式是在编译期间完成的。另外,使用:shared属性标记的方式可以直接共享引用类型的数据,但是share()不允许,它使用prototype限制了只允许接收变量类型的参数,可以使用&share()调用子程序的方式禁用prototype:

my $aref = &share([1 2 3]);

例如:

#!/usr/bin/perl
use strict;
use warnings;
use 5.010;

use threads;
use threads::shared;

my $foo :shared = 1;
my $bar = 1;
threads->create(
    sub {
        $foo++;
        $bar++;
        say "new thread: \$foo=$foo";   # 2
        say "new thread: \$bar=$bar";   # 2 
    }
)->join();

say "main thread: \$foo=$foo";   # 2
say "main thread: \$bar=$bar";   # 1

什么数据会共享

并非所有数据都可以共享,只有普通变量、数组、hash以及已共享数据的引用可以共享。也就是:

Ordinary scalars
Array refs
Hash refs
Scalar refs
Objects based on the above

例如:

use threads;
use threads::shared;

my $var = 1;     #  未共享数据
my $svar :shared = 2;    # 共享标量
my @arr :shared = qw(perl python shell);   # 共享数组
my %hash :shared;       # 共享hash

my $thr = threads->new(\&mysub);

sub mysub {
    $hash{a} = 1;       # 成功
    $hash{b} = $var;    # 成功:$var是普通变量
    $hash{c} = \$svar;  # 成功:\$svar是已共享标量
    $hash{d} = @arr;    # 成功:普通数组
    $hash{e} = \@arr;   # 成功:已共享数组的引用

    # $hash{f} = \$var; # 失败并die:$var未共享标量的引用
    # $hash{g} = [];    # 失败:未共享数组的引用
    # $hash{h} = {a=>1};# 失败:未共享hash的引用
}

$thr->join();     # join后文解释
while( my ($key, $value) = each %hash ){
    say "$key => $value";
}

如果共享hash或array类型,那么里面的所有元素都对外可见,但并不意味着里面的元素是共享的。共享hash/array和共享它们里面的元素是独立的,共享hash/array只意味着共享它们自身,但里面的元素会暴露。反之,可以直接共享某个元素,但hash/array自身不共享。(经测试,数组的元素无法共享,hash的元素可正常共享)

数据共享的问题:竞态

当多个线程在某一时刻都访问或修改同一个共享数据时,就会出现竞态问题(race condition),它意味着多线程的数据竞争问题。

例如:

use threads;
use threads::shared;

my $x :shared = 1;
my $thr1 = threads->new(\&sub1);
my $thr2 = threads->new(\&sub2);

$thr1->join();
$thr2->join();
print "$x\n";

sub sub1 { my $foo = $x; $x = $foo + 1; }
sub sub1 { my $bar = $x; $x = $bar + 1; }

执行上面的程序,结果可能会输出2或3,因为两个线程可能都取得x=1的值,也可能后一个线程取得前一个线程加法之后的值。

之所以会发生竞态问题,是因为对多个线程对同个数据的访问和修改时间点无法保证,这个时候数据是线程不安全的,也可称之为线程数据不同步。

所以,要解决数据竞态问题,必须对共享数据的步骤进行协调,比如修改数据时必须保证只能有一个线程去修改,这可以通过锁的方式来实现。

变量锁

threads::shared模块中提供了一个lock()方法,用来将共享数据进行独占锁定,被锁定的数据无法被其它线程修改,直到释放锁其它线程才可以获取锁并修改数据。

例如:

use threads;
use threads::shared;

my $var :shared = 3;

sub mysub {
    ...
    lock($var);
    ...
}  # 锁在这里自动被释放

没有unlock()这样直接释放锁的方法,而是在退出当前作用域的时候自动释放锁,就像词法变量一样。

另外,锁住hash和数组的时候,仅仅只是锁住它们自身,但lock()无法去锁hash/array中的元素。所以:

lock $myhash{'abc'};    # Error
lock %myhash;           # Correct

如果真想基于容器中元素进行锁定,可以使用线程信号量模块Thread::Semaphore,后文会介绍。

另外,lock()是可以递归的,在退出最外层lock()的作用域时释放锁。且递归时重复锁定同一个变量是幂等的。例如:

my $x :shared;
doit();
sub doit {
    {
        {
            lock($x); # Wait for lock
            lock($x); # 没任何作用,因为已经锁过一次了
            {
                lock($x); # 没任何作用,因为已经锁过一次了
                {
                    lock($x); # 没任何作用,因为已经锁过一次了
                    lockit_some_more();
                }
            }
        } # *** Implicit unlock here ***
    }
}
sub lockit_some_more {
    lock($x); # 没任何作用,因为已经锁过一次了
} # Nothing happens here

死锁问题

使用锁来协调共享数据的步骤能解决竞态问题,但是如果协调不好,很容易出现死锁问题。死锁是指两个或更多进程/线程互相等待锁的释放,导致每一个进程/线程都无法释放,从而出现无限等待的死循环问题。

例如:

use threads;
use threads::shared;

my $x :shared = 4;
my $y :shared = 'foo';

my $thr1 = threads->create(
    sub {
        lock($x);
        sleep 3;
        lock($y);
    }
);

my $thr2 = threads->create(
    sub {
        lock($y);
        sleep 3;
        lock($x);
    }
);

sleep 10;

上面的例子只要运行,两个线程将会出现死锁问题,因为thr1线程锁住$x、thr2锁住$y后,thr1申请$y的锁将等待thr2先释放,同理thr2申请$x的锁将等待thr1先释放。于是出现了互相等待的僵局,谁也不会也无法释放。

解决死锁最简单且最佳的方式是保证所有线程以相同的顺序去锁住每一个数据。例如,所有线程都以先锁住$x,再锁住$y,最后锁住$z的方式去执行代码。

另一个避免死锁的解决方案是尽可能让锁住共享数据的时间段变短,这样出现僵局的几率就会小很多。

但是这两种方式很多时候都派不上用场,因为需要用到锁的情况可能会比较复杂。下面介绍几种方式。

线程队列(Thread::Queue)

(Thread::Queue)队列数据结构(FIFO)是线程安全的,它保证了某些线程从一端写入数据,另一些线程从另一端读取数据。只要队列已经满了,写入操作就自动被阻塞直到有空间支持写操作,只要队列空了,读取操作就会自动阻塞直到队列中有数据可读。这种模式自身就保证了线程安全性。

在Perl中要使用线程队列,需要使用Thread::Queue模块,使用方式很简单。如下示例:

#!/usr/bin/perl
use strict;
use warnings;

use threads;
use Thread::Queue;

# 创建一个线程队列
my $DataQueue = Thread::Queue->new();

# 创建线程
my $thr = threads->new(
    sub {
        # 在循环中读取队列
        while (my $DataElement = $DataQueue->dequeue()) {
            print "Poped $DataElement off the queue\n";
        }
    }
);

# 向队列中写入一个数据
$DataQueue->enqueue(12);
sleep 1;

# 再次写入队列3个数据
$DataQueue->enqueue('a','b','c');
sleep 3;

# 关闭队列,让读取端不再阻塞
$DataQueue->enqueue(undef);

# 等待子线程并为其收尸
$thr->join();

关于Thread::Queue模块的用法,参见:https://www.cnblogs.com/f-ck-need-u/p/10422293.html

Thread::Semaphore

Thread::Semaphore实现了线程信号量,可以通过up()和down()来操作信号量,up()表示增加信号量的值,down()表示减信号量的值,按照锁的角度来看这是申请锁的操作,只要减法操作后信号量的值为负数,这次减法操作就会被阻塞,就像被锁住。

通过Thread::Semaphore的new()方法来创建一个信号量,如果不给任何参数,则默认创建一个信号量值为1的信号量。如果给new()一个整数值N,则表示创建一个信号量值为N的信号量。

使用信号量实现锁机制的示例:

#!/usr/bin/perl

use 5.010;
use threads;
use Thread::Semaphore;

# 新建一个信号量
my $sem = Thread::Semaphore->new();

# 全局共享变量
my $gbvar :shared = 0;

my $thr1 = threads->create(\&mysub, 1);
my $thr2 = threads->create(\&mysub, 2);
my $thr3 = threads->create(\&mysub, 3);

# 每个线程给全局共享变量依次加10
sub mysub {
    my $thr_id = shift;
    my $try_left = 10;
    my $local_value;
    sleep 1;
    while($try_left--){
        # 相当于获取锁
        $sem->down();

        $local_value = $gbvar;
        say "$try_left tries left for sub $thr_id "."(\$gbvar is $gbvar)";
        sleep 1;
        $local_value++;
        $gbvar = $local_value;
        
        # 相当于释放锁
        $sem->up();
    }
}

$thr1->join();
$thr2->join();
$thr3->join();

由于信号量可以锁住任何片段的代码,所以它的锁机制非常灵活。

实际上,up()和down()每次操作默认都只增、减1个信号量的值,但可以给它们传递参数来一次性请求加N、减N个信号量值,对于减法操作,如果请求减N导致信号量的值为负数,则该减法操作被阻塞,直到有足够的信号量完成这次减法。这时的信号量就像是一个计数器一样。

例如:

use threads;
use Thread::Semaphore;

# 创建一个信号量值为5的信号量
my $sem = Thread::Semaphore->new(5);

my $thr1 = threads->new(\&sub1);
my $thr2 = threads->new(\&sub1);

sub sub1 {
    # 申请锁
    $sem->down(5);  # 一次减5
    ... do something here ...
    $sem->up(5);  # 一次加5
}

$thr1->join();
$thr2->join();
上一篇:[原创]nginx添加module之threads


下一篇:mysql工具类日志(binlog、slowlog、errorlog)