伍哥原创之rabbitmq在豆荚商城的应用

【伍哥原创】

1,前言

RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。

AMQP 里主要要说两个组件:Exchange 和 Queue (在 AMQP 1.0 里还会有变动),如下图所示,绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker ,这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型:
伍哥原创之rabbitmq在豆荚商城的应用
RabbitMQ作为一个非常成熟的消息队列技术方案,也应用到了豆荚商城项目里面。

2,邮件服务:将慢动作从请求中分离出来

作为一个商城,自然少不了给用户发送邮件。比如注册的时候要发送确认邮件,下单以后发送订单邮件,推广信息也需要发送邮件,类似的情况非常多。
实现邮件发送的通常做法比较简单,就是在HTTP请求中一并完成邮件发送这个动作。而发送邮件依赖于SMTP服务。在小并发的环境下,一切都工作的很正常。

但是,当并发请求上到一定的程度,问题就来了。HTTP必须等待SMTP这个慢动作,如果你需要带附件的话,情况就更糟糕了。
另外一个问题来自于SMTP,当请求过于频密的时候,SMTP就出现超负荷工作的情况,这样各种邮件发送的异常情况就在所难免了。

怎么才能很好的解决这个问题呢?
答案在前面就给出了,就是建立消息队列机制!
其实原理非常简单,就是在内存维护一个队列(queue),如果要发送一封邮件,就往队列里面写一条消息,也就是所谓的信息生产者。
再建立一个进程,处理队列里面的邮件发送,就是所谓的信息消费者。

由于商城是用PHP开发的,所以就需要支持amqp的PHP客户端代码。这里用的是php amqplib (http://code.google.com/p/php-amqplib/)。
首先是连接rabbitmq,获取一个通道,然后是发送消息,最后断开通道和连接。下面是代码示例:

1
2
3
4
5
6
7
8
9
10
11
$queue = 'mail_queue'
$conn = new AMQPConnection( $config [ 'host' ], $config [ 'port' ], $config [ 'user' ], $config [ 'pass' ]);
$channel = $conn ->channel();
$channel ->queue_declare( $queue , false, true, false, false);
$send_data = serialize( $send_msg ); //数据先序列化一下,也可以使用JSON格式化
$msg = new AMQPMessage( $send_data ,
         array ( 'delivery_mode' => 2) //让消息持久化
     );
$channel ->basic_publish( $msg , '' , $queue );
$channel ->close();
$conn ->close();

在项目里当然不能这样写,应该封装成一个分布式服务接口,融入到整个系统代码架构里面,方便其他地方,比如controller,model的使用。

接下来是实现消息的消费程序。这里用的是python的pika。
首先是连接rabbitmq,获取通道,开始消费队列里面的信息。以下的代码写在类里面:

1
2
3
4
5
6
7
self .connection = pika.BlockingConnection(pika.ConnectionParameters(host = self .rmq_host))
self .channel = self .connection.channel()
self .channel.queue_declare(queue = self .rmq_queue, durable = True )
self .channel.basic_qos(prefetch_count = 1 )
# callback里面就是具体处理消息的地方
self .channel.basic_consume( self .callback, queue = self .rmq_queue)
self .channel.start_consuming()

callback回调

1
2
3
4
5
6
7
8
9
10
11
def callback( self , ch, method, properties, body):
         time.sleep( 1 ) #休息一秒才发送邮件
     msg = phpserialize.loads(body) #按PHP的格式做反序列化
     validateutil = ValidateUtil()       
     if validateutil.isEmail(msg[ 'mail_to' ]):
         mail = Mailer()
         mail.setMailTo(msg[ 'mail_to' ])
         mail.setMailSubject(msg[ 'mail_subject' ])
         mail.setMailHtmlBody(msg[ 'mail_body' ])
         mail.sendEmail()
     ch.basic_ack(delivery_tag = method.delivery_tag)

这里只是骨干代码。应该建立一个python的project,在eclipse(加PyDev)里面管理起来。
你还需要用到:配置文件以及配置文件解析库,系统日志,Mailer,phpserialize,ValidateUtil等等辅助类库。
关于PyDev请参考:http://www.ibm.com/developerworks/cn/opensource/os-cn-ecl-pydev/
熟悉了邮件的应用,后面扩展到手机短信通知服务、站内通信消息等等就非常方便了。当然,面对这样的需求,我们就需要在实现时考虑使用可扩展的消息队列的模型了。

3,页面访问统计:通过写缓存减轻DB的负载

对于商城来说,都有商品推荐的功能,比如人气商品推荐。怎么定义人气呢?一般看商品页面的访问量。这里就出现了页面统计的需求了。统计的数据一般需要持久化到DB。

一般来说,某商品页面被访问一次,就应该插入或者更新一次DB记录。这完全没有什么技术难度。
然而当并发连接上到一定水平,DB的性能问题就出来了。因为DB,比如MYSQL,都有一定的锁机制。当出现频繁的insert或者update时,select的速度自然就受到很大制约了。而且打开一次页面就触发一次统计,也就要操作一次DB,那DB不哭才怪!

有见及此,我们就通过消息队列实现了页面访问统计的写缓存。

何谓写缓存?对于某些不需要高实时的数据,比如我们这里的页面访问统计,可以把更新操作先缓存起来,当累积到一定程度时,才进行一次实际的更新。这样的好处是显而易见的,DB操作少了很多,而且也避免的DB锁机制引发的性能问题。

实现写缓存的方式有很多,比如通过memcached来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$page = 'goods_100052' ;
$memcache = memcache_connect( '192.168.1.100' , 11711);
// 通过memcached提供的原子加操作,避免并发访问带来的统计出错.
$count = $memcache ->increment( $page , 1);
if (! $count ) {
     $memcache ->add( $page , 1, false, 0);
     exit ;
}
if ( $count >= 1000) {
     $sql = "update `goods_viewlog` set `count` = `count`+{$count} where `page` = $page" ;
     $result = $mysql ->query( $sql );
     if ( $result ) {
         // 更新成功后,把缓存统计清零
         $memcache ->set( $page , 0, false, 0);
     }
}

我们这里采用了消息队列的实现方式。
消息生产者代码和消息消费者代码和上面介绍的邮件是几乎一样的。唯一不同在于回调函数那里。这里就不再重复说明了。

4,总结

我们在开发消息队列应用特别要注意的是要先搞清楚消息队列的主要概念和机制:比如交换,队列,绑定,持久化等等。
搞清楚了以后,再根据具体的应用类型,定义好消息队列模型。
具体可以参考伍哥前面的文章

上一篇:WPF布局之让你的控件随着窗口等比放大缩小,适应多分辨率满屏填充应用


下一篇:TensorFlow 资料外链