GemFire 异步写和同步读

异步写介绍

概览

许多应用使用关系型数据库来持久化数据。这种方式有数据延迟和瓶颈问题。GemFire使用Write-Behind技术,以内存的速度访问数据,以异步的形式把数据更新到数据源。


Write-Behind模式中,更新缓存条目异步地写到后台数据库。这一特性,GemFire使用了独创的队列实现机制,Gateway Queue网关队列能够关联多个数据区域网关封装了一个高可靠的队列实现机制,线程根据数据批处理大小,和特定的应用回调来处理队列更新。

GemFireGateway Queue 技术本身用来对跨WAN网的分布式缓存数据进行传输和实时更新。跨WAN网的远程集群和本地关系型数据库,GemFire传输数据的方式相同,都是把小量数据,快速更新合并为一个大的数据块。利用TCP/IP 保证可靠传输,跨WAN网异步数据的速率和本地系统同步数据的速率基本相同。

 

传统关系型数据库基于磁盘的数据传输特点是大数据块/高延时/高吞吐,而GemFire Write-Behind技术是基于内存的,数据传输特点是小数据块/快速更新/低延时/高吞吐,利用不断传输小数据块,持续更新数据来达到低延时和高吞吐量。

 

GemFire Gateway Queue技术通过维护队列实例,并行地队列化缓存更新,消息周期性地与主备网关队列实例通信,来投递数据更新。

 

在故障切换方面,如果某个队列更新出现问题,则GemFire可以把更新切换到备用队列上,从而不影响应用正常使用。可以指定多个备用队列来达到队列冗余的目的。

 

GemFire  Write-behind 特性总结:

1. 异步更新数据到外部数据存储来提高应用的性能。应用直接与数据库进行交互,可能会受到数据库事务控制的影响而降低应用的性能。

2. GemFire Gateway Queue提供高可靠,保证达到零数据丢失,外部数据源故障丝毫不影响应用正常运行。

3. 比关系型数据库支持更多的并发用户,扩展“大事务更新”吞吐量,增强应用的能力。

4.    通过合并来减少数据库负载。相同键的多次更新能够合并成一次更新,只有最后的数据条目写入数据库中。

 

 

 

Gateways通过gateway hub来创建。一个Hub负责管理一个到多个Gateways。你可以配置从多个分布式系统来接受复制事件。比如,一个gateway负责队列化更新事件,把更新事件写到数据库中,同时,另一个gateway负责把更新事件传到另一个数据库中。

 

每一个gateway管理一个队列。队列被2个以上节点维护,主网关Hub或者多个备网关Hub都有自己的队列。这些队列可以配置到内存中,或者部分溢出到磁盘中,或者所有全部溢出到磁盘中。当然了,这种磁盘写,对于GemFire是没有瓶颈的,我们通过异步磁盘写模式来维护我们的磁盘文件。

 

GemFire队列溢出磁盘只是内存的一个扩展。与关系型数据库的做法是不同的,关系型数据库把数据写到物理磁盘上,而RondomAccessFile缓冲区更新是永久删除的。GemFire也支持内存数据拷贝备份到磁盘上,来保障高可用。即使在最差的应用场景中,应用选择同步磁盘锁来备份Gateway Queue 状态,GemFire架构也仍然有非常强大的优势: GemFire集群使用各个节点的本地磁盘,这样就给了整个集群聚合的吞吐能力。

 

在冗余方面,GemFire自动切换队列的故障,通过把当前队列的条目拷贝到备份节点来达到队列的冗余。GemFire会周期性地在备份节点重建冗余队列。当主队列由于异常情况不能使用,备份队列马上接管成为主队列,这一期间是没有数据丢失的。

由于一些技术人员可能不熟悉HA模式:当故障切换期间,在备用队列的一些事件,可能已经被处理了。GemFire标识了这些可能的重复事件允许应用回调。比如,如果事件是一个’create’通告,插入一条记录到数据库,如果事件被标记为重复,那么数据库监听器可能忽略’primary key violation’

 

当外部数据源长时间不可用,那么持久化是必须的操作。这样当集群宕机或关闭时,也不会出现更新事件丢失的情况,也可以消去不断增长的队列,避免内存溢出。

 

异步写实现 代码和配置

 

应用可以在任一节点配置Gateway Hub。每一个数据区域能够配置启用,路由事件到Gateway Hub。多Hub被用来并行分发事件到外部数据源。比如,新的订单能够被并行路由到生产端。

所有相关的数据区域能配置使用一个Hub在外部数据源保存事件更新,避免数据完整性冲突。


Gateway listener

GatewayEventLister 接口定义了一个方法: public boolean processEvents(List events);

List events 包含GatewayEvents

 

监听器接口实现

下面是一个GatewayEventLister的简单实现。



package com.example.listener;
import com.example.common.Data;
import com.example.db.DBPersistence;

import com. gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.util.GatewayEvent;
import com.gemstone.gemfire.cache.util.GatewayEventListener;
import com.gemstone.gemfire.cache.Operation;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class TestGatewayListener implements GatewayEventListener, Declarable {

  public boolean processEvents(List events) {
    for (Iterator i = events.iterator(); i.hasNext(); ) {
      GatewayEvent ge = (GatewayEvent) i.next();
      process(ge);
    }
    return true;
  }

  public void init(Properties p) {}

  protected void process(GatewayEvent ge) {
       String key = (String) ge.getKey();
       Data data = (Data) ge.getDeserializedValue();
       PreparedStatement pstmt=null;

       Connection conn = DBPersistence.getConnection();

       try {
           if ( ge.getOperation().equals(Operation.UPDATE))
           {
               pstmt = conn.prepareStatement("Update Data set value = ?, price = ? where id = ?");
               pstmt.setString(1, data.getValue());
               pstmt.setInt(2, data.getPrice());
               pstmt.setString(3, key);
           }
           else if ( ge.getOperation().equals(Operation.CREATE))
           {
               pstmt = conn.prepareStatement("Insert Data (id, value, price) values (?, ?, ?)");
               pstmt.setString(1, key);
               pstmt.setString(2, data.getValue());
               pstmt.setInt(3, data.getPrice());
           }
           else if ( ge.getOperation().equals(Operation.DESTROY))
           {
               pstmt = conn.prepareStatement("Delete from Data where key =>");
               pstmt.setString(1, key);
           }

           if ( pstmt != null)
           {
               pstmt.execute();
               pstmt.close();
           }
       }
       catch (SQLException se) {
           se.printStackTrace();
           throw new CacheWriterException ( se );
       }
  }
 }

以上的GatewayEventListener接受回调,和操作类型,根据这些信息来更新数据库。这个事件接受一个包含键值对的参数。

 

Data Class定义如下:


package com.example.common;

import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;

import com.gemstone.gemfire.DataSerializable;
import com.gemstone.gemfire.DataSerializer;

public class Data implements DataSerializable {
    private int id;
    private String value;
    private int price;

    public Data()     {
    }

    public Data(int i, String s, int p)
    {
        id = i;
        value = s;
        price =p;
    }
    public void setId(int i)
    {
        id=i;
    }

    public int getId()
    {
        return id;
    }

    public void setValue(String s)
    {
        value = s;
    }

    public String getValue()
    {
        return value;
    }

    public int getPrice()
    {
        return price;
    }

    public void setPrice(int p)
    {
        price=p;
    }

    public void fromData(DataInput in) throws IOException, ClassNotFoundException
    {
        try {
            id = DataSerializer.readPrimitiveInt ( in );
            value = DataSerializer.readString(in);
            price = DataSerializer.readPrimitiveInt(in);
        } catch ( Exception e ) {
            System.out.println ( "Deserialize Error: " + e );
            e.printStackTrace();
        }
    }

    public void toData(DataOutput out) throws IOException
    {
        DataSerializer.writePrimitiveInt(id, out);
        DataSerializer.writeString(value, out);
        DataSerializer.writePrimitiveInt(price, out);
    }
}

配置网关

Cache cache = Cache.getAnyInstance();
GatewayHub hub = cache.addGatewayHub("DBWriterHub", \-1);
hub.setStartupPolicy(GatewayHub.STARTUP_POLICY_PRIMARY);
Gateway gateway = hub.addGateway("DBWriter");
gateway.addListener(new TestGatewayListener());
 
// Start the gateway hub using the start method.

hub.start();


数据区域路由事件到Gateway Hubs:

RegionAttributes ra = AttributesFactory.create();ra.setEnableGateway(true);
ra.setGatewayHubId("DBWriterHub");
 

... set other attributes and create the region ...


XML配置


<gateway-hub id="DBWriterHub" port="-1" startup-policy="primary">
   <gateway id="DBWriter">
      <gateway-listener>
         <class-name>com.example.listener.TestGatewayListener</class-name>
      </gateway-listener>
   </gateway>
</gateway-hub>

为缓存配置多Gateway Hubs

Gateway Hubs能够在单缓存中定义。这一特性关联'GatewayHub Startup Policy'和'Region to GatewayHub Distribution',跨多个VMs来传播Gateway负载。


代码配置

Cache cache = Cache.getAnyInstance();GatewayHub hub1 = cache.addGatewayHub("OrderDBWriter", \-1);
hub1.setStartupPolicy(GatewayHub.STARTUP_POLICY_PRIMARY);
Gateway gateway1 = hub1.addGateway("OrderDBWriter");
gateway1.addListener(new OrderDBGatewayListener());
hub1.start();
 
GatewayHub hub2 = cache.addGatewayHub("ProductDBWriter", \-1);
hub2.setStartupPolicy(GatewayHub.STARTUP_POLICY_SECONDARY);
Gateway gateway2 = hub2.addGateway("ProductDBWriter");
gateway2.addListener(new ProductDBGatewayListener());

hub2.start();


XML配置

 

<gateway-hub id="OrderDBWriter" port="-1" startup-policy="primary">
   <gateway id="OrderDBWriter">
      <gateway-listener>
         <class-name>OrderDBGatewayListener</class-name>
      </gateway-listener>
   </gateway>
</gateway-hub>
 
<gateway-hub id="ProductDBWriter" port="-1" startup-policy="secondary">
   <gateway id="ProductDBWriter">
     <gateway-listener>
        <class-name>ProductDBGatewayListener</class-name>
     </gateway-listener>
   </gateway>

</gateway-hub>


GatewayHub 启动策略

GatewayHubs有三种启动策略: primary, secondary none。启动策略none 是缺省的。在这个模式中,GatewayHub尝试获得一个特定的分布式锁。如果成功,那么就是 primary。如果不成功,就变成secondary。如果GatewayHub被设定成primary,尝试获得分布式锁。如果失败的话就抛出GatewayExceptionGatewayHub在这种情况下不启动。如果GatewayHub被设定成secondary,尝试获得分布式锁。如果锁已经被其他GatewayHub获得,他就释放这个锁,继续获得锁,每60秒一次。如果失败的话就抛出GatewayException。在60秒期间,GatewayHub的状态是secondary

 

代码配置



Cache cache = Cache.getAnyInstance();
GatewayHub hub = cache.addGatewayHub("OrderDBWriter", -1);
hub.setStartupPolicy(GatewayHub.STARTUP_POLICY_SECONDARY);
Gateway gateway = hub.addGateway("OrderDBWriter");
gateway.addListener(new OrderDBGatewayListener());
hub.start();


XML配置


区域到GatewayHub 分布

 

一个区域能够指定GatewayHub去分布他的事件。使用’hub-id’属性。如果’hub-id’区域属性被设定,然后任何的Region事件被分布到带有此hub-idGatewayHub上。这样允许负载在不同的VM上传播。比如,区域1能够配置成分布事件到GatewayHub1,区域2能够配置成分布事件到GatewayHub2。如果GatewayHub1VM1上是primary的,GatewayHub1在相同VM上是secondary的,同时在VM2上正好相反,那么VM1将要分布区域1事件,VM2将要分布区域2事件。

代码配置


Cache cache = Cache.getAnyInstance();
AttributesFactory factory = new AttributesFactory();
factory.setGatewayHubId("OrderDBWriter");
RegionAttributes attributes = factory.createRegionAttributes();
Region region = cache.createRegion("Orders", attributes);



XML配置



<region name="Orders">
   <region-attributes scope="local" enable-gateway="true" hub-id="OrderDBWriter"/>
</region>


同步写

当一个用户请求一个数据条目时,比如条目key1,在内存中已经不存在了,如果同步读开启了,GemFire将要从外部数据源载入需要的条目key1。我们通过在区域上定义一个data loader 来开启同步读数据功能。Loader get操作中调用,把值返回给调用线程。Cache loader行为的依赖于区域的类型。

 

在分区区域中的数据载入

由于要处理大量数据,已分区的区域支持分区载入数据。每一个cache loader只载入本地分区定义的数据。如果配置了数据冗余,只要本地分区持有主拷贝那么数据就能载入。分区载入需要Cache Loader安装在每一个分区上。

 

如果你使用JDBC连接,那么每一个分区必须连接到数据源。如图1所示,三个成员需要三个JDBC连接,此图与图2做比较,三个成员共享一个JDBC连接。

 

分区需要更多的JDBC连接




在非分区区域中的数据载入

 

在非分区区域中,Cache Loader可用于其他所有成员中。Loaders常常只在缓存的子集中定义。如果需要一个Loader,所有可用的Loader被调用,使用开销最小的Loader,直至数据被载入或者所有Loader被尝试调用过。

在接下来的图中,分布式系统的成员运行在不同的机器上。数据载入从M1开始运行。事件监听器只接受在本地缓存中的回调。    


在非分区区域中的数据载入

 

在非分区区域中,Cache Loader可用于其他所有成员中。Loaders常常只在缓存的子集中定义。如果需要一个Loader,所有可用的Loader被调用,使用开销最小的Loader,直至数据被载入或者所有Loader被尝试调用过。

在接下来的图中,分布式系统的成员运行在不同的机器上。数据载入从M1开始运行。事件监听器只接受在本地缓存中的回调。

 

 

图2 非分区区域中的Cache Loader

 

带有Local Scope区域的数据载入

 

带有Local Scope区域中,Cache Loader只在定义的成员中可用。无论值是否在本地缓存中存在Loader都被调用。

 

实现 Cache Loader

实现Cache Loader需要在cache.xml 文件中声明Loader。或者在代码中实现com.gemstone.GemFire.cache.Declarable

 

CacheLoader 接口有一个单独的回调方法,load,你的应用在任何时候都可以调用这个方法来从缓存外边查询数据。有关更详细的信息,请看相关Java 文档。但是你不能在load()方法中直接调用Region方法。这样做将导致Cache Loader 阻塞,这样会损伤分布式系统的性能。

 

 

XML代码



<region name="exampleRegion">
  <region-attributes>
    <cache-loader>
      <class-name>quickstart.SimpleCacheLoader</class-name>
    </cache-loader>
  </region-attributes>
</region>


代码实现


public class SimpleCacheLoader implements CacheLoader, Declarable {

    PreparedStatement pstmt=null;
    Connection conn = null;

   public Object load(LoaderHelper helper) {
     String key = (String) helper.getKey();
     System.out.println(" Loader called to retrieve value for " + key);

     ResultSet rs = null;
     // Create a value using the suffix number of the key (key1, key2, etc.)
     try {
             pstmt = conn.prepareStatement("Select Data where id = ?");
             pstmt.setString(1, key);
             rs = pstmt.execute();
         } catch (SQLException se) {
              se.printStackTrace();
              throw new CacheLoaderException ( se );
         }
     return rs;
  }
  public void close() { // do nothing }
  public void init(Properties props) {
    conn = DriverManager.getConnection(URL, props);
  }
}

带有数据库连接的CacheLoader 声明



<cache-loader>
   <class-name>com.company.data.DatabaseLoader</class-name>
   <parameter name="URL">
      <string>jdbc:cloudscape:rmi:MyData</string>
   </parameter>
</cache-loader>


上一篇:LINUX高性能服务器读书笔记之程序规范


下一篇:大搜索时代下的网络拓扑搜索定位