图解JanusGraph系列 - 关于JanusGraph图数据批量快速导入的方案和想法(bulk load data)

大家好,我是洋仔,JanusGraph图解系列文章,实时更新~

图数据库文章总目录:

源码分析相关可查看github码文不易,求个star~): https://github.com/YYDreamer/janusgraph

版本:JanusGraph-0.5.2

转载文章请保留以下声明:

作者:洋仔聊编程

微信公众号:匠心Java

原文地址:https://liyangyang.blog.csdn.net/

前言

JanusGraph的批量导入速度一直是用户使用的痛点, 下面会依托官网的介绍和个人理解,聊一下关于图数据批量快速导入的一些方案、方案使用场景和一些想;

写这篇文章的目的主要是为了让大家了解一下janus的导入的一些常用方案,算是一个总结吧,如有疑问或者文章错误,欢迎留言联系我

首先,说一下JanusGraph的批量导入的可配置的优化配置选项 和 基于第三方存储和索引的优化配置选项:

  • 批量导入的配置选项
  • 第三方存储后端的优化选项(Hbase为例)
  • 第三方索引后端的优化选项(ES为例)

之后分析一下数据导入的四个方案:

  • 基于JanusGraph Api的批量导入
  • 基于Gremlin Server的批量导入
  • 使用JanusGraph-utils的批量导入
  • 基于bulk loader 导入方式
  • 基于抽取序列化逻辑生成Hfile离线批量导入

最后聊一下关于批量导入的一些想法;

一:批量导入的优化配置选项

1、批量导入的配置选项

JanusGraph中有许多配置选项和工具可以将大量的图数据更有效地导入。这种导入称为批量加载,与默认的事务性加载相反,默认的事务性加载单个事务只会添加少量数据。

下述介绍了配置选项和工具,这些工具和工具使JanusGraph中的批量加载更加高效。

在继续操作之前,请仔细遵守每个选项的限制和假设,以免丢失数据或损坏数据。

配置选项

janusgraph支持批量导入,可通过相关配置项设置

下面具体看一下对应配置项的详细作用:

批量加载

1) 配置项:storage.batch-loading

启用该配置项,相当于打开了JanusGraph的批量导入开关;

影响:

启用批处理加载会在许多地方禁用JanusGraph内部一致性检查,重要的是会禁用lock锁定来保证分布式一致性;JanusGraph假定要加载到JanusGraph中的数据与图形一致,因此出于性能考虑禁用了自己的一致性检查。

换句话说,我们要在导入数据之前,保证要导入的数据和图中已有的数据不会产生冲突也就是保持一致性!

为什么要禁用一致性检查来提升性能?

在许多批量加载方案中,在加载数据之前确保去数据一致性,然后在将数据加载到数据库比在加载数据到图库时确保数据一致性,消耗的成本要便宜的多。

例如,将现有用户数据文件批量加载到JanusGraph中的用例:假设用户名属性键具有定义的唯一复合索引,即用户名在整个图中必须是唯一的。

那么按名称对数据文件进行排序并过滤出重复项或编写执行此类过滤的Hadoop作业消耗的时间成本,就会比开启一致性检查在导入图数据时janusgraph检查花费的成本要少的多。

基于上述,我们可以启用 storage.batch-loading配置,从而大大减少了批量加载时间,因为JanusGraph不必检查每个添加的用户该名称是否已存在于数据库中。

重要提示

启用storage.batch-loading要求用户确保加载的数据在内部是一致的,并且与图中已存在的任何数据一致。

特别是,启用批处理加载时,并发类型创建会导致严重的数据完整性问题。因此,我们强烈建议通过schema.default = none在图形配置中进行设置来禁用自动类型创建。

优化ID分配

1、ID块大小

1)配置项:ids.block-size

该配置项为配置在分布式id的生成过程中每次获取 id block的大小;

分布式id相关具体可看文章《图解Janusgraph系列-分布式id生成策略分析》

原理:

每个新添加的顶点或边都分配有唯一的ID。JanusGraph的ID池管理器以block的形式获取特定JanusGraph实例的ID。id块获取过程很昂贵,因为它需要保证块的全局唯一分配。

增加 ids.block-size会减少获取次数,但可能会使许多ID未被分配,从而造成浪费。对于事务性工作负载,默认块大小是合理的,但是在批量加载期间,顶点和边的添加要频繁得多,而且要快速连续。

因此,通常建议将块大小增加10倍或更多,具体取决于每台机器要添加的顶点数量。

经验法则

设置ids.block-size为您希望每小时为每个JanusGraph实例添加的顶点数。

重要提示:

必须为所有JanusGraph实例配置相同的值,ids.block-size以确保正确的ID分配。因此,在更改此值之前,请务必关闭所有JanusGraph实例

2、ID Acquisition流程

当许多JanusGraph实例频繁并行分配id块时,不可避免地会出现实例之间的分配冲突,从而减慢了分配过程。

此外,由于大容量加载而导致的增加的写负载可能会使该过程进一步减慢到JanusGraph认为失败并引发异常的程度。可以调整2个配置选项来避免这种情况;

1)配置项:ids.authority.wait-time

配置ID池管理器等待应用程序获取ID块被存储后端确认的时间(以毫秒为单位)。这段时间越短,应用程序在拥挤的存储群集上发生故障的可能性就越大。

经验法则

将其设置为负载下存储后端集群上测量的第95百分位读写时间的总和。

重要说明

所有JanusGraph实例的该值都应该相同。

2)配置项:ids.renew-timeout

配置JanusGraph的ID池管理器在尝试获取新的ID块总共等待的毫秒数。

经验法则

将此值设置为尽可能大,不必为不可恢复的故障等待太久。增加它的唯一缺点是JanusGraph将在不可用的存储后端群集上尝试更长的时间

优化读写

1、缓冲区大小

JanusGraph在数据导入时存在一个缓冲区,用来缓冲当前事务的部分请求,从而可以小批量的写入和执行,从而减少针对存储后端的请求数。在短时间内执行大量写操作时,存储后端可能会因为大量的写请求打入而变得超负荷;

配置项:storage.buffer-size

这些批次的大小由storage.buffer-size来控制。 增加storage.buffer-size可以通过增加缓冲区大小,来使得批次保存更多的请求,从而减少写请求的次数来避免上述失败。

注意:

增加缓冲区大小会增加写请求的等待时间及其失败的可能性。因此,不建议为事务性负载增加此设置,并且应该在批量加载期间仔细尝试此设置的一个合适的值。

2、读写健壮性

在批量加载期间,群集上的负载通常会增加,从而使读和写操作失败的可能性更大(尤其是如上所述,如果缓冲区大小增加了)。

1)配置项:storage.read-attempts

该配置项配置JanusGraph在放弃之前尝试对存储后端执行读取或写入操作的次数。

2)配置项:storage.attempt-wait

改配置项指定JanusGraph在重新尝试失败的后端操作之前将等待的毫秒数。较高的值可以确保重试操作不会进一步增加后端的负载。

注意:

如果在批量加载期间后端上可能会有很高的负载,通常建议增加这些配置选项。

其他

1)配置项:storage.read-attempts

2、第三方存储后端的优化选项

针对于第三方存储的优化分为两部分:

  • 第三方存储集群自身的优化
  • JanusGraph结合第三方存储的优化选项

集群自身的优化

集群自身的优化,本文主要介绍janusgraph相关优化这里就不多说这部分了,主要是提升hbase集群的读写能力;

这里主要还是关注的Hbase的写数据能力优化后的提升!这部分的优化至关重要! 下面举几个例子:

1)配置项: hbase.client.write.buffer

设置buffer的容量

HBase Client会在数据累积到设置的阈值后才提交Region Server。这样做的好处在于可以减少RPC连接次数。

计算一下服务端因此而消耗的内存:hbase.client.write.buffer * hbase.regionserver.handler.count从在减少PRC次数和增加服务器端内存之间找到平衡点。

2)配置项: hbase.regionserver.handler.count

定义每个Region Server上的RPC Handler的数量

Region Server通过RPC Handler接收外部请求并加以处理。所以提升RPC Handler的数量可以一定程度上提高HBase接收请求的能力。

当然,handler数量也不是越大越好,这要取决于节点的硬件情况。

等等各种配置项

3)针对一些CF、RowKey设计之类的优化点,因为这些都是janus预设好的,所以在janusGraph中使用不到;

JanusGraph针对优化

针对于JanusGraph+第三方存储的优化,官网(配置项文档超链接) 给出了一些配置选项,可从其找出对应的配置项;

针对于hbase,我在配置项中找出了对应的一些可能有作用的配置如下:

1)配置项: storage.hbase.compression-algorithm

hbase存储数据压缩算法的配置,我们在《图解图库JanusGraph系列-一文知晓“图数据“底层存储结构》文章中提到有好几个地方都是压缩存储的,此处就是配置的压缩算法;

类型: 枚举值,支持 lzogzsnappylz4bzip2zstd五种压缩算法 和 不压缩配置:none

默认值: gz压缩;

注意:此处配置的算法需要hbase也支持才可以! 如果存储空间足够,可以考虑配置为不压缩,也会提升导入速率!

2)配置项:storage.hbase.skip-schema-check

假设JanusGraph的HBase表和列族已经存在。 如果是这样,JanusGraph将不会检查其 table/ CF 的存在,也不会在任何情况下尝试创建它们。

类型: 布尔值

默认值: false,检查

注意: 可以在数据导入时,将该配置项设置为true,去除table/ CF的检查,这个其实作用不大;因为都是在初始化图实例的时候就去检查了。。

3、第三方索引后端的优化选项

针对于第三方存索引的优化分为两部分:

  • 第三方索引集群自身的优化
  • JanusGraph结合第三方索引的优化选项

集群自身的优化

集群自身的优化,本文主要介绍janusgraph相关优化这里就不多说这部分了,主要是提升索引集群的读写能力;

这里主要还是关注的索引的写数据能力优化后的提升!这部分的优化至关重要!

例如es的线程池参数优化等

JanusGraph针对优化

针对于JanusGraph+第三方索引的优化,官网(配置项文档超链接) 给出了一些配置选项,可从其找出对应的配置项;

针对于es,我再配置项中找出了对应的一些可能有作用的配置如下:

1)配置项: index.[X].elasticsearch.retry_on_conflict

指定在发生冲突时应重试操作多少次。

类型: 整数

默认值: 0次

注意: 增大该值可以提升在批量导入中,发生冲突后解决冲突的几率

3、JVM的优化

JanusGraph基于Java语言编写,则毋庸置疑会用到JVM

对JVM的调优也主要集中到垃圾收集器和堆内存的调优

堆大小调整:

我们在导入图数据时会产生大量的临时数据,这里需要我们调整一个合适的堆空间;

推荐至少为8G

垃圾收集器调优:

如果在使用CMS发现GC过于频繁的话,我们可以考虑将垃圾收集器设置为:G1

这个收集器适用于大堆空间的垃圾收集,有效的减少垃圾收集消耗的时间;

注意:

此处的JVM调优设计JanusGraph java api项目gremlin server部分的JVM调优;

二:基于数据层面的优化

2.1 拆分图 并发执行

在某些情况下,图数据可以分解为多个断开连接的子图。这些子图可以跨多台机器独立地并行加载;不管是采用下述的那种方式加载都可以;

这里有一个前提: 底层第三方存储集群的处理能力没有达到最大; 如果底层存储集群当前的平均cpu已经是80 90%的了,就算拆分多个图也没用,底层存储的处理能力已经给限制住当前的速度了;

这个方式官网上提了一句,这个地方其实很难可以将图拆分为断开的子图,并且针对于拆分为多个子图来说,主要还是依托于底层存储集群的处理能力;

一般情况下,不用拆分图进行一个好的优化后,底层存储集群的处理能力都可以完全调用起来;

2.2 分步骤 并发执行

如果无法分解图形,则分多个步骤加载通常是有益的,也就是将vertex 和 edge 分开导入;

这种方式,需要数据同学做好充分的数据探查,不然可能会产生数据不一致的情况! 下面是步骤(其中最后两个步骤可以在多台计算机上并行执行):

  1. 前提: 确保vertex和edge数据集 删除了重复数据 并且是一致的
  2. 环境配置: 设置batch-loading=true 并且优化上述介绍的其他选项
  3. vertex全量导入: 将所有的vertex及节点对应的property添加到图中。维护一份从顶点ID(由加载的数据用户自定义)到JanusGraph的内部顶点分布式一致性ID(即vertex.getId())的映射,该ID 为64位长
  4. edge全量导入: 使用映射添加所有的边 来查找JanusGraph的顶点id 并使用该id检索顶点。

讲述过程:

假设存在3个用户,“-”号后为对应的自定义的顶点id值(注意,非导入图库中的顶点id,只是标识当前节点的业务id):

user1-1
user2-2
user3-3

上述第三步,我们将这些节点导入到图库中! 产生一个业务id 与 图库中节点的分布式唯一id的对应关系如下:

我们在导入一个点后,janus会返回一个vertex实例对象,通过这个对象就可以拿到对应的图库vertexId

业务id-图库中节点id
1-4261
2-4274
3-4351

注意:这一步骤,我们可以多线程并行导入而无需担心一致性问题,因为节点全部唯一

节点导入完成!

假设存在对应的有3条边如下,

edge1:user1 --> user2
edge2:user1 --> user3
edge3:user2 --> user3

我们通过user1对应业务id:1,而业务id:1对应节点id:4261,我们就可以转化为下述对应关系:

4261 --> 4274
4261 --> 4351
4274 --> 4351

在JanusGraph中通过节点id查询节点,是获取节点的最快方式!!

我们就可以通过id获取图库中对应的vertex对象实例,然后使用addVertex将edge导入!

注意:这一步骤,我们可以多线程并行导入而无需担心一致性问题,因为edge全部唯一

第三个步骤和第四个步骤也可以并行执行,我们在导入点的过程中,可以也同时将源节点和目标节点已经导入到图库中的edge同步入图;

三:批量导入方案

下述介绍一下4种导入方案,其中包含3中批量导入方案;

3.1 方案一:基于JanusGraph Api的数据导入

该方案可以整合上述第二部分二:基于数据层面的优化

涉及方法:

public JanusGraphVertex addVertex(Object... keyValues);
public JanusGraphEdge addEdge(String label, Vertex vertex, Object... keyValues);

在janusGraph的业务项目中,可以开发一个数据导入模块,使用提供的类似于java api等,进行数据的导入;

流程:

这种是最简单的方案,具体的细节,这里就不给出了,节点导入大体流程为下述:

  1. 获取图实例
  2. 获取图实例事务对象
  3. 插入节点
  4. 提交事务

边导入大体流程如下:

  1. 获取图实例
  2. 获取图实例事务对象
  3. 查询源节点 + 目标节点(这个地方可能是性能瓶颈)
  4. 在两个节点中插入边
  5. 提交事务

主要作用:

此方案可以用于数据量较小的情况下使用,例如每天的增量导入等;

优化点:

1、批量事务提交

此处的事务提交,我们可以通过一个常用的优化手段: 处理多个vertex 或者 edge后再提交事务!

可以减少janus与底层存储的交互,减少网络消耗和连接数,提升导入的性能!

处理的个数多少主要还是和底层存储集群相关,几百还是几千这就需要自己调试获取当前环境下的最优配置了

注意:

如果开启了上述提到的storage.batch-loading,则需要你们现在的环境下注意一致性的问题;

例如图库中原本存在一个a节点,你又插入一个a节点,便会有一致性问题;

我们可以通过插入数据前,先通过唯一索引查询节点,节点存在则更新节点,不存在则插入节点;

3.2 方案二:基于Gremlin Server的批量导入

该方案可以整合上述第二部分二:基于数据层面的优化

这里需要我们搭建一个Gremlin server服务器,通过在服务器执行gremlin-server.sh即可,暴露出一个tcp接口;

则可以将对应的gremlin 语句提交到对应的gremlin服务器执行;

具体的流程和第一个方案一致

优化点:

同上一个方案优化点1;

3、gremlin server池参数调整

除了上述给定的一些配置的优化项,还有两个gremlin server的优化项需要调整

  • threadPoolWorke:最大2*core个数,用于处理非阻塞读写的Gremlin服务器可用的线程数;

  • gremlinPool:用于在ScriptEngine中执行实际脚本的“Gremlin”线程的数量。此池表示Gremlin服务器中可用于处理阻塞操作的工作者;

和线程池调优一样,要找出最合适的一个值,太小不好,太大也不好;

注意:

该方案本质上和第一个方案类似,只不过是一个是通过给定的java api提交插入请求,一个直接通过gremlin语句提交插入请求到gremlin server;

3.3 方案三:IBM的janusgraph-utils

这个方案没用过,简单看了一下,这个主要也是通过多线程对数据进行导入;

自己手动组装对应的schema文件,将schema导入到数据库;

然后将组装为特定格式的csv文件中的数据,导入到图库中;

github地址: https://github.com/IBM/janusgraph-utils

优点:

1、使用难度不高,让我们不用再去手写多线程的导入了;减少工作量

2、直连hbase和es,相对于前两种减少了对应的gremlin server和janus server的网络交互

3、支持通过配置文件自动创建Janusgraph schema和index

4、可配置化的线程池大小和每次批量提交的数量

问题:

1、schema和csv文件也是要用户组装出对应格式

2、相对于前两种方式性能提升有限,主要是少了一层网络交互。多线程和批量提交,前两种都可以手动去实现;还需要引入一个新的组件

3、支持janus版本较低,可以手动升级,不难

4、相对于下面两种方案,性能还是较低

3.4 方案四:bulk loader

官方提供的批量导入方式;需要hadoop集群和spark集群的支持;

hadoop和spark集群配置,可以看官网:https://docs.janusgraph.org/advanced-topics/hadoop/

该方案对导入的数据有着严格的要求,支持多住数据格式:jsoncsvxmlkryo

数据要求: 节点、节点对应的属性、节点对应的边需要在一行中(一个json中、一个xml项中)

数据案例: 下面给一下官网的案例,在data目录下:

-- json格式
{"id":2,"label":"song","inE":{"followedBy":[{"id":0,"outV":1,"properties":{"weight":1}},{"id":323,"outV":34,"properties":{"weight":1}}]},"outE":{"followedBy":[{"id":6190,"inV":123,"properties":{"weight":1}},{"id":6191,"inV":50,"properties":{"weight":1}}],"sungBy":[{"id":7666,"inV":525}],"writtenBy":[{"id":7665,"inV":525}]},"properties":{"name":[{"id":3,"value":"IM A MAN"}],"songType":[{"id":5,"value":"cover"}],"performances":[{"id":4,"value":1}]}} -- xml格式
<node id="4"><data key="labelV">song</data><data key="name">BERTHA</data><data key="songType">original</data><data key="performances">394</data></node><node id="5"><data key="labelV">song</data><data key="name">GOING DOWN THE ROAD FEELING BAD</data><data key="songType">cover</data><data key="performances">293</data></node><node id="6"><data key="labelV">song</data><data key="name">MONA</data><data key="songType">cover</data><data key="performances">1</data></node><node id="7"><data key="labelV">song</data><data key="name">WHERE HAVE THE HEROES GONE</data><data key="songType"></data><data key="performances">0</data></node> -- csv格式
2,song,IM A MAN,cover,1 followedBy,50,1|followedBy,123,1|sungBy,525|writtenBy,525 followedBy,1,1|followedBy,34,1

我们可以观察到,这其实是不容易构造的,节点属性边全部需要整合到一块;

数据整理方案: spark的cogroup, cogroup的作用就是将多个 RDD将相同的key jion成一行,从而使用csv格式进行导入,操作实示例如下:

val rdd1 = sc.parallelize(Array(("aa",1),("bb",2),("cc",6)))
val rdd2 = sc.parallelize(Array(("aa",3),("dd",4),("aa",5)))
rdd1.cogroup(rdd2).collect() output:
(aa,(CompactBuffer(1),CompactBuffer(3, 5)))
(dd,(CompactBuffer(),CompactBuffer(4)))
(bb,(CompactBuffer(2),CompactBuffer()))
(cc,(CompactBuffer(6),CompactBuffer()))

这里大家可以参考360对这方面的处理,转化代码github地址:https://github.com/360jinrong/janusgraph-data-importer

注意:

此处的原始数据的准备需要细致,一致性保证完全依赖于原始数据的一致性保证;

3.5 方案五:基于抽取序列化逻辑的生成Hbase File离线批量导入

博主在图库初始化时采用了这种方式,前前后后花费了接近一个月的时间,经过细致的验证,现已应用到生产环境使用,下面介绍一下对应的注意点和主要流程:

方案: 依据源码抽取出对应的序列化逻辑,分布式生成Hfile,将Hfile导入到Hbase;

问题: 人力成本过高,需要看源码抽逻辑,并且需要一个细致的验证;

方案难点:

JanusGraph对于Hbase的数据底层格式,可以看我写的博客:

这两篇博客,一个分析了底层存储的格式,一个进行了相应的源码分析;

流程+验证+建议: 请看我写的另外一个博客:《图解JanusGraph系列-生成Hbase file离线批量导入方案》

这种方式,其实消耗的人力会比较大;另外,对于抽取的逻辑是否开源,这个后续我们会考虑这个问题,开源后地址会同步更新到本文章;

四:几种场景

4.1 图库中已经存在数据

如果图库总已经存在数据,对于3.4 方案四:bulk loader3.5 方案五:基于抽取序列化逻辑的生成Hbase File离线批量导入 这两种方案可能就无法使用了;

我们可以采取两种方式:

  1. 使用第一种方案和第二种方案进行导入(注意数据一致性)
  2. 整体迁移图库,将图库中现有数据和将要导入的数据整体迁移到另外一个新图库,就可以使用4、5方案进行导入

4.2 图数据初始化或者迁移

数据量小,建议使用3.1 方案一:基于JanusGraph Api的数据导入3.2 方案二:基于Gremlin Server的批量导入3.3 方案三:IBM的janusgraph-utils

数据量大,建议使用3.4 方案四:bulk loader3.5 方案五:基于抽取序列化逻辑的生成Hbase File离线批量导入

4.3 单纯只看业务数据量

选择什么方式导入,单纯基于业务数据量给一些个人建议:

  • 小数据量(亿级以下): 直接janusgraph api 或者 gremlin server导入即可,几小时就ok了; 如果想要更快可以使用另外的方式,只是会增加人力成本;
  • 中等数据量(十亿级以下):数据充分探查,开启storage.batch-loading完全可以支持,使用api,2天左右可以完成全量的数据导入
  • 大数据量(百亿级数据):推荐采用bulk load方式,配置hadoop集群,使用spark cluster导入
  • 另一个方案:如果上述还是无法满足你们的需求,可以采用依据源码抽取序列化逻辑生成Hfile,然后离线导入到Hbase的方案,不过这种是花费人力成本最大的一种方式,不过效果也几乎是最好的,尤其是数据量越大效果越明显

总结

数据的批量导入一直是JanusGraph让人难受的地方,经过本文的介绍大家应该有一个大体的认识,针对于百亿级的数据导入,上述的几种方案是可以支持的;

其他:批量导入后,每天的增量采用消息中间件接入JanusGraph api导入即可;

数据导入过程中,针对于不同的底层存储、不同的版本还是会有一些问题,具体的导入的坑大家可以加我v,邀你加群

注意!!!以上仅作为参考,有任何问题可评论或加博主v讨论

参考:

JanusGaph官网

https://www.jianshu.com/p/f372f0ef6c42

https://www.jianshu.com/p/4b59c00a15de/

上一篇:DataTable 快速导入数据库——百万条数据只需几秒


下一篇:外网访问VMware虚拟机