征服 Kestrel + XMemcached

原文链接:https://my.oschina.net/mohaiyong/blog/221298 征服Kestrel,介绍XMemcached对于Kestrel的支持实现。

关于XMemcached具体代码,可以参考 Memcached笔记——(二)XMemcached&Spring集成

这里为了代码简洁,直接使用Spring+XMemcached集成模式,先给出Spring的配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans
	xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:p="http://www.springframework.org/schema/p"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"
>
	<!-- http://code.google.com/p/xmemcached/wiki/Spring_Integration -->
	<bean
		id="memcachedClientBuilder"
		class="net.rubyeye.xmemcached.XMemcachedClientBuilder"
		p:connectionPoolSize="${kestrel.connectionPoolSize}"
		p:failureMode="${kestrel.failureMode}"
	>
		<!-- XMemcachedClientBuilder have two arguments.First is server list,and 
			second is weights array. -->
		<constructor-arg>
			<list>
				<bean class="java.net.InetSocketAddress">
					<constructor-arg>
						<value>${kestrel.server1.host}</value>
					</constructor-arg>
					<constructor-arg>
						<value>${kestrel.server1.port}</value>
					</constructor-arg>
				</bean>
			</list>
		</constructor-arg>
		<constructor-arg>
			<list>
				<value>${kestrel.server1.weight}</value>
			</list>
		</constructor-arg>
		<property name="commandFactory">
			<bean class="net.rubyeye.xmemcached.command.KestrelCommandFactory" />
		</property>
		<property name="sessionLocator">
			<bean class="net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator" />
		</property>
		<property name="transcoder">
			<bean class="net.rubyeye.xmemcached.transcoders.SerializingTranscoder" />
		</property>
	</bean>
	<!-- Use factory bean to build memcached client -->
	<bean
		id="memcachedClient"
		factory-bean="memcachedClientBuilder"
		factory-method="build"
		destroy-method="shutdown" />
</beans>


注意:这里要使用KestrelCommandFactory
<property name="commandFactory">
	<bean class="net.rubyeye.xmemcached.command.KestrelCommandFactory" />
</property>


注意:KestrelCommandFactory会在数据头加一个4字节的flag,支持存储任意可序列化类型。(从我的理解就是标记消息长度 )。如果收发双方Client不同,为确保移植性,需要将数据转为String格式,舍弃这个flag!
<bean
		id="memcachedClient"
		factory-bean="memcachedClientBuilder"
		factory-method="build"
		destroy-method="shutdown"
	>
		<!-- 如果收发双方使用Client不同,开启该选项 -->
		<property
			name="primitiveAsString"
			value="true" />
	</bean>

如果开启上述选项,则不能支持序列化操作,仅支持String类型!!!

附kestrel.properties:
#连接池大小即客户端个数
kestrel.connectionPoolSize=50
kestrel.failureMode=true
#server1
kestrel.server1.host=10.11.155.24
kestrel.server1.port=22133
kestrel.server1.weight=1


做一个简要的TestUnit:
import static junit.framework.Assert.*;
import net.rubyeye.xmemcached.MemcachedClient;

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Kestrel 测试
 * 
 * @author Snowolf
 * @version 1.0
 * @since 1.0
 */
public class KestrelTest {

        // 队列名称
	private final static String QUEUE_NAME = "KQ";

	private ApplicationContext app;
	private MemcachedClient memcachedClient;

	/**
	 * @throws java.lang.Exception
	 */
	@Before
	public void before() throws Exception {
		app = new ClassPathXmlApplicationContext("applicationContext.xml");
		memcachedClient = (MemcachedClient) app.getBean("memcachedClient");
	}

	/**
	 * 发包测试
	 */
	@Test
	public void send() {

		for (int i = 0; i < 100; i++) {
			try {
				memcachedClient.set(QUEUE_NAME, 0, i);
			} catch (Exception e) {
				fail(e.getMessage());
				e.printStackTrace();
			}
		}
	}

	/**
	 * 收包测试
	 */
	@Test
	public void receive() {
		Integer value;
		try {
			while (true) {
				value = (Integer) memcachedClient.get(QUEUE_NAME);
				if (value == null) {
					break;
				}
				System.out.println(value);
			}
		} catch (Exception e) {
			fail(e.getMessage());
			e.printStackTrace();
		}
	}
}

可通过上述代码进行服务的简要测试,同样可作为Kestrel+XMemcached接入项目的参考实现!


简述:
  • 发包服务持续将数据SET到队列KQ中
  • 收包服务持续将数据从队列KE中GET出来

队列服务,都是基本概念,队首出队,队尾入队,适用于异步消息传输。
Kestrel基本参照Memcached协议,可恢复,确保服务重启后可以保存重启前的消息队列,也就是不会丢消息。
依靠XMemcahed,可以做Kestrel集群,分布式扩充Kestrel服务!

相关工程代码,详见附件!


相关链接:
征服 Kestrel
征服 Kestrel + XMemcached
征服 Kestrel + XMemcached + Spring TaskExecutor

转载于:https://my.oschina.net/mohaiyong/blog/221298

上一篇:将Xyplorer设为windows默认的资源管理器,及设为左右双窗口


下一篇:将Scala应用程序与NewRelic Java Agent集成