Discuz!NT中集成Memcached分布式缓存

大约在两年前我写过一篇关于Discuz!NT缓存架构的文章,在那篇文章的结尾介绍了在IIS中如果开启多个
应用程序池会造成多个缓存实例之间数据同步的问题。虽然给出了一个解决方案,但无形中却把压力转移到了
磁盘I/O上(多个进程并发访问cache.config文件)。其实从那时起我就开始关注有什么更好的方案,当然今
天本文中所说的Memcached,以及Velocity等这类的分布式缓存方案之前都考虑过,但一直未能决定该使用那
个。起码Velocity要在.net 4.0之后才会提供,虽然是原生态,但有些远水解不了近火。

我想真正等到Velocity能堪当重任还要等上一段时间。于是我就开始将注意力转移到了Memcached,必定
有Facebook这只“超级小白鼠”使用它并且反响还不错。所以就开始尝试动手在产品中集成Memcached。

其实在之前的那篇关于Discuz!NT缓存架构的文章中已提到过,使用了设计模式中的“策略模式”来构造。
所以为了与以往使用缓存的代码格式相兼容,所以这里采用新添加MemCachedStrategy(MemCached策略)
来构造一个缓存策略类以便于当管理后台开启“MemCached”时以“MemCached策略模式”来做为当前系统默认
的策略模式。

其代码段如下(Discuz.Cache/MemCached.cs):


/// <summary>
/// MemCache缓存策略类
/// </summary>
public class MemCachedStrategy : Discuz.Cache.ICacheStrategy
{     /// <summary>
    /// 添加指定ID的对象
    /// </summary>
    /// <param name="objId"></param>
    /// <param name="o"></param>
    public void AddObject(string objId, object o)
    {
        RemoveObject(objId);
        if (TimeOut > 0)
        {
            MemCachedManager.CacheClient.Set(objId, o, System.DateTime.Now.AddMinutes(TimeOut));
        }
        else
        {
            MemCachedManager.CacheClient.Set(objId, o);
        }
    }     /// <summary>
    /// 添加指定ID的对象(关联指定文件组)
    /// </summary>
    /// <param name="objId"></param>
    /// <param name="o"></param>
    /// <param name="files"></param>
    public void AddObjectWithFileChange(string objId, object o, string[] files)
    {
        ;
    }     /// <summary>
    /// 添加指定ID的对象(关联指定键值组)
    /// </summary>
    /// <param name="objId"></param>
    /// <param name="o"></param>
    /// <param name="dependKey"></param>
    public void AddObjectWithDepend(string objId, object o, string[] dependKey)
    {
        ;
    }     /// <summary>
    /// 移除指定ID的对象
    /// </summary>
    /// <param name="objId"></param>
    public void RemoveObject(string objId)
    {
        if (MemCachedManager.CacheClient.KeyExists(objId))
            MemCachedManager.CacheClient.Delete(objId);
    }     /// <summary>
    /// 返回指定ID的对象
    /// </summary>
    /// <param name="objId"></param>
    /// <returns></returns>
    public object RetrieveObject(string objId)
    {
        return MemCachedManager.CacheClient.Get(objId);
    }     /// <summary>
    /// 到期时间
    /// </summary>
    public int TimeOut { set; get; }
}

上面类实现的接口Discuz.Cache.ICacheStrategy定义如下:


/// <summary>
 /// 公共缓存策略接口
 /// </summary>
 public interface ICacheStrategy
 {
     /// <summary>
     /// 添加指定ID的对象
     /// </summary>
     /// <param name="objId"></param>
     /// <param name="o"></param>
     void AddObject(string objId, object o);      /// <summary>
     /// 添加指定ID的对象(关联指定文件组)
     /// </summary>
     /// <param name="objId"></param>
     /// <param name="o"></param>
     /// <param name="files"></param>
     void AddObjectWithFileChange(string objId, object o, string[] files);      /// <summary>
     /// 添加指定ID的对象(关联指定键值组)
     /// </summary>
     /// <param name="objId"></param>
     /// <param name="o"></param>
     /// <param name="dependKey"></param>
     void AddObjectWithDepend(string objId, object o, string[] dependKey);      /// <summary>
     /// 移除指定ID的对象
     /// </summary>
     /// <param name="objId"></param>
     void RemoveObject(string objId);      /// <summary>
     /// 返回指定ID的对象
     /// </summary>
     /// <param name="objId"></param>
     /// <returns></returns>
     object RetrieveObject(string objId);      /// <summary>
     /// 到期时间
     /// </summary>
     int TimeOut { set;get;}
}

当然在MemCachedStrategy类中还有一个对象要加以说明,就是MemCachedManager,该类主要是对
Memcached一些常操作和相关初始化实例调用的“封装”,下面是是其变量定义和初始化构造方法的代码:


/// <summary>
/// MemCache管理操作类
/// </summary>
public sealed class MemCachedManager
{
    #region 静态方法和属性
    private static MemcachedClient mc = null;     private static SockIOPool pool = null;     private static MemCachedConfigInfo memCachedConfigInfo = MemCachedConfigs.GetConfig();     private static string [] serverList = null;     static MemCachedManager()
    {
        CreateManager();
    }     private static void CreateManager()
    {
        serverList = Utils.SplitString(memCachedConfigInfo.ServerList, ""r"n");         pool = SockIOPool.GetInstance(memCachedConfigInfo.PoolName);
        pool.SetServers(serverList);
        pool.InitConnections = memCachedConfigInfo.IntConnections;//初始化链接数
        pool.MinConnections = memCachedConfigInfo.MinConnections;//最少链接数
        pool.MaxConnections = memCachedConfigInfo.MaxConnections;//最大连接数
        pool.SocketConnectTimeout = memCachedConfigInfo.SocketConnectTimeout;//Socket链接超时时间
        pool.SocketTimeout = memCachedConfigInfo.SocketTimeout;// Socket超时时间
        pool.MaintenanceSleep = memCachedConfigInfo.MaintenanceSleep;//维护线程休息时间
        pool.Failover = memCachedConfigInfo.FailOver; //失效转移(一种备份操作模式)
        pool.Nagle = memCachedConfigInfo.Nagle;//是否用nagle算法启动socket
        pool.HashingAlgorithm = HashingAlgorithm.NewCompatibleHash;
        pool.Initialize();
                 mc = new MemcachedClient();
        mc.PoolName = memCachedConfigInfo.PoolName;
        mc.EnableCompression = false;
    }     /// <summary>
    /// 缓存服务器地址列表
    /// </summary>
    public static string[] ServerList
    {
        set
        {
            if (value != null)
                serverList = value;
        }
        get { return serverList; }
    }     /// <summary>
    /// 客户端缓存操作对象
    /// </summary>
    public static MemcachedClient CacheClient
    {
        get
        {
            if (mc == null)
                CreateManager();             return mc;
        }
    }     public static void Dispose()
    {
        if (pool != null)
            pool.Shutdown();
    }
    

上面代码中构造方法会初始化一个池来管理执行Socket链接,并提供静态属性CacheClient以便MemCachedStrategy
来调用。

当然我还在这个管理操作类中添加了几个方法分别用于检测当前有效的分布式缓存服务器的列表,向指定(或全部)
缓存服务器发送特定stats命令来获取当前缓存服务器上的数据信息和内存分配信息等,相应的方法如下(详情见注释):


/// <summary>
/// 获取当前缓存键值所存储在的服务器
/// </summary>
/// <param name="key">当前缓存键</param>
/// <returns>当前缓存键值所存储在的服务器</returns>
public static string GetSocketHost(string key)
{
    string hostName = "";
    SockIO sock = null;
    try
    {
        sock = SockIOPool.GetInstance(memCachedConfigInfo.PoolName).GetSock(key);
        if (sock != null)
        {
            hostName = sock.Host;
        }
    }
    finally
    {
        if (sock != null)
            sock.Close();
    }
    return hostName;
} /// <summary>
/// 获取有效的服务器地址
/// </summary>
/// <returns>有效的服务器地</returns>
public static string[] GetConnectedSocketHost()
{
    SockIO sock = null;
    string connectedHost = null;
    foreach (string hostName in serverList)
    {
        if (!Discuz.Common.Utils.StrIsNullOrEmpty(hostName))
        {
            try
            {
                sock = SockIOPool.GetInstance(memCachedConfigInfo.PoolName).GetConnection(hostName);
                if (sock != null)
                {
                    connectedHost = Discuz.Common.Utils.MergeString(hostName, connectedHost);
                }
            }
            finally
            {
                if (sock != null)
                    sock.Close();
            }
        }
    }
    return Discuz.Common.Utils.SplitString(connectedHost, ",");
} /// <summary>
/// 获取服务器端缓存的数据信息
/// </summary>
/// <returns>返回信息</returns>
public static ArrayList GetStats()
{
    ArrayList arrayList = new ArrayList();
    foreach (string server in serverList)
    {
        arrayList.Add(server);
    }
    return GetStats(arrayList, Stats.Default, null);
} /// <summary>
/// 获取服务器端缓存的数据信息
/// </summary>
/// <param name="serverArrayList">要访问的服务列表</param>
/// <returns>返回信息</returns>
public static ArrayList GetStats(ArrayList serverArrayList, Stats statsCommand, string param)
{
    ArrayList statsArray = new ArrayList();
    param =  Utils.StrIsNullOrEmpty(param) ? "" : param.Trim().ToLower();     string commandstr = "stats";
    //转换stats命令参数
    switch (statsCommand)
    {
        case Stats.Reset: { commandstr = "stats reset"; break; }
        case Stats.Malloc: { commandstr = "stats malloc"; break; }
        case Stats.Maps: { commandstr = "stats maps"; break; }
        case Stats.Sizes: { commandstr = "stats sizes"; break; }
        case Stats.Slabs: { commandstr = "stats slabs"; break; }
        case Stats.Items: { commandstr = "stats"; break; }
        case Stats.CachedDump:
        {
            string[] statsparams = Utils.SplitString(param, " ");
            if(statsparams.Length == 2)
                if(Utils.IsNumericArray(statsparams))
                    commandstr = "stats cachedump  " + param;             break;                     
        }
        case Stats.Detail:
            {
                if(string.Equals(param, "on") || string.Equals(param, "off") || string.Equals(param, "dump"))
                    commandstr = "stats detail " + param.Trim();                 break;
            }
        default: { commandstr = "stats"; break; }
    }
    //加载返回值
    Hashtable stats = MemCachedManager.CacheClient.Stats(serverArrayList, commandstr);
    foreach (string key in stats.Keys)
    {
        statsArray.Add(key);
        Hashtable values = (Hashtable)stats[key];
        foreach (string key2 in values.Keys)
        {
            statsArray.Add(key2 + ":" + values[key2]);
        }
    }
    return statsArray;
} /// <summary>
/// Stats命令行参数
/// </summary>
public enum Stats
{
    /// <summary>
    /// stats : 显示服务器信息, 统计数据等
    /// </summary>
    Default = 0,
    /// <summary>
    /// stats reset : 清空统计数据
    /// </summary>
    Reset = 1,
    /// <summary>
    /// stats malloc : 显示内存分配数据
    /// </summary>
    Malloc = 2,
    /// <summary>
    /// stats maps : 显示"/proc/self/maps"数据
    /// </summary>
    Maps =3,
    /// <summary>
    /// stats sizes
    /// </summary>
    Sizes = 4,
    /// <summary>
    /// stats slabs : 显示各个slab的信息,包括chunk的大小,数目,使用情况等
    /// </summary>
    Slabs = 5,
    /// <summary>
    /// stats items : 显示各个slab中item的数目和最老item的年龄(最后一次访问距离现在的秒数)
    /// </summary>
    Items = 6,
    /// <summary>
    /// stats cachedump slab_id limit_num : 显示某个slab中的前 limit_num 个 key 列表
    /// </summary>
    CachedDump =7,
    /// <summary>
    /// stats detail [on|off|dump] : 设置或者显示详细操作记录   on:打开详细操作记录  off:关闭详细操作记录 dump: 显示详细操作记录(每一个键值get,set,hit,del的次数)
    /// </summary>
    Detail = 8
}

当然在配置初始化缓存链接池时使用了配置文件方式(memcached.config)来管理相关参数,其info信息
类说明如下(Discuz.Config/MemCachedConfigInfo.cs):


/// <summary>
/// MemCached配置信息类文件
/// </summary>
public class MemCachedConfigInfo : IConfigInfo
{
    private bool _applyMemCached;
    /// <summary>
    /// 是否应用MemCached
    /// </summary>
    public bool ApplyMemCached
    {
        get
        {
            return _applyMemCached;
        }
        set
        {
            _applyMemCached = value;
        }
    }     private string _serverList;
    /// <summary>
    /// 链接地址
    /// </summary>
    public string ServerList
    {
        get
        {
            return _serverList;
        }
        set
        {
            _serverList = value;
        }
    }     private string _poolName;
    /// <summary>
    /// 链接池名称
    /// </summary>
    public string PoolName
    {
        get
        {
            return Utils.StrIsNullOrEmpty(_poolName) ? "DiscuzNT_MemCache" : _poolName;
        }
        set
        {
            _poolName = value;
        }
    }     private int _intConnections;
    /// <summary>
    /// 初始化链接数
    /// </summary>
    public int IntConnections
    {
        get
        {
            return _intConnections > 0 ? _intConnections : 3;
        }
        set
        {
            _intConnections = value;
        }
    }     private int _minConnections;
    /// <summary>
    /// 最少链接数
    /// </summary>
    public int MinConnections
    {
        get
        {
            return _minConnections > 0 ? _minConnections : 3;
        }
        set
        {
            _minConnections = value;
        }
    }     private int _maxConnections;
    /// <summary>
    /// 最大连接数
    /// </summary>
    public int MaxConnections
    {
        get
        {
            return _maxConnections > 0 ?_maxConnections : 5;
        }
        set
        {
            _maxConnections = value;
        }
    }     private int _socketConnectTimeout;
    /// <summary>
    /// Socket链接超时时间
    /// </summary>
    public int SocketConnectTimeout
    {
        get
        {
            return _socketConnectTimeout > 1000 ? _socketConnectTimeout : 1000;
        }
        set
        {
            _socketConnectTimeout = value;
        }
    }     private int _socketTimeout;
    /// <summary>
    /// socket超时时间
    /// </summary>
    public int SocketTimeout
    {
        get
        {
            return _socketTimeout > 1000 ? _maintenanceSleep : 3000;
        }
        set
        {
            _socketTimeout = value;
        }
    }     private int _maintenanceSleep;
    /// <summary>
    /// 维护线程休息时间
    /// </summary>
    public int MaintenanceSleep
    {
        get
        {
            return _maintenanceSleep > 0 ? _maintenanceSleep : 30;
        }
        set
        {
            _maintenanceSleep = value;
        }
    }     private bool _failOver;
    /// <summary>
    /// 链接失败后是否重启,详情参见http://baike.baidu.com/view/1084309.htm
    /// </summary>
    public bool FailOver
    {
        get
        {
            return _failOver;
        }
        set
        {
            _failOver = value;
        }
    }     private bool _nagle;
    /// <summary>
    /// 是否用nagle算法启动socket
    /// </summary>
    public bool Nagle
    {
        get
        {
            return _nagle;
        }
        set
        {
            _nagle = value;
        }
    }
}      

这些参数我们通过注释应该有一些了解,可以说memcached的主要性能都是通过这些参数来决定的,大家
应根据自己公司产品和应用的实际情况配置相应的数值。

当然,做完这一步之后就是对调用“缓存策略”的主体类进行修改来,使其根据对管理后台的设计来决定
加载什么样的缓存策略,如下:


/// <summary>
/// Discuz!NT缓存类
/// 对Discuz!NT论坛缓存进行全局控制管理
/// </summary>
public class DNTCache
{
    .
    
    //通过该变量决定是否启用MemCached
    private static bool applyMemCached = MemCachedConfigs.GetConfig().ApplyMemCached;          /// <summary>
    /// 构造函数
    /// </summary>
    private DNTCache()
    {
        if (applyMemCached)
            cs = new MemCachedStrategy();
        else
        {
            cs = new DefaultCacheStrategy();             objectXmlMap = rootXml.CreateElement("Cache");
            //建立内部XML文档.
            rootXml.AppendChild(objectXmlMap);             //LogVisitor clv = new CacheLogVisitor();
            //cs.Accept(clv);             cacheConfigTimer.AutoReset = true;
            cacheConfigTimer.Enabled = true;
            cacheConfigTimer.Elapsed += new System.Timers.ElapsedEventHandler(Timer_Elapsed);
            cacheConfigTimer.Start();
        }
    }
    
    

到这里,主要的开发和修改基本上就告一段落了。下面开始介绍一下如果使用Stats命令来查看缓存的
分配和使用等情况。之前在枚举类型Stats中看到该命令有几个主要的参数,分别是:


    stats
    stats reset
    stats malloc
    stats maps
    stats sizes
    stats slabs
    stats items
    stats cachedump slab_id limit_num
    stats detail [on|off|dump]

而JAVAEYE的 robbin 写过一篇文章:贴一段遍历memcached缓存对象的小脚本,来介绍如何使用其中的   
“stats cachedump”来获取信息。受这篇文章的启发,我将MemCachedClient.cs文件中的Stats方法加以修
改,添加了一个command参数(字符串型),这样就可以向缓存服务器发送上面所说的那几种类型的命令了。

测试代码如下:


protected void Submit_Click(object sender, EventArgs e)
{
    ArrayList arrayList = new ArrayList();
    arrayList.Add("10.0.1.52:11211");//缓存服务器的地址     StateResult.DataSource = MemCachedManager.GetStats(arrayList, (MemCachedManager.Stats)         
                                     Utils.StrToInt(StatsParam.SelectedValue, 0), Param.Text);
    StateResult.DataBind();            

页面代码如下:
    
    Discuz!NT中集成Memcached分布式缓存

Discuz!NT中集成Memcached分布式缓存

Discuz!NT中集成Memcached分布式缓存

我这样做的目的有两个,一个是避免每次都使用telnet协议远程登陆缓存服务器并输入相应的命令行
参数(我记忆力不好,参数多了之后就爱忘)。二是将来会把这个页面功能内置到管理后台上,以便后台
管理员可以动态监测每台缓存服务器上的数据。

好了,到这里今天的内容就差不多了。在本文中我们看到了使用设计模式的好处,通过它我们可以让
自己写的代码支持“变化”。这里不妨再多说几句,大家看到了velocity在使用上也是很方便,如果可以
的话,未来可以也会将velocity做成一个“缓存策略”,这样站长或管理员就可以根据自己公司的实际情
况来加以灵活配置了。

上一篇:【比赛】NOIP2018 铺设道路


下一篇:hdu2444The Accomodation of Students (最大匹配+推断是否为二分图)