Hyperledger Fabric: ZooKeeper- and Kafka-based Cluster Configuration Explained

Overview

While setting up a Hyperledger Fabric environment we work with a configtx.yaml file (see "Hyperledger Fabric 1.0 from scratch (8): multi-node cluster production deployment"). That file is mainly used to build the genesis block (before building the genesis block, the full set of certificate files for all nodes must be generated first). Its Orderer section contains an OrdererType parameter, which can be set to "solo" or "kafka"; the environments in the earlier posts all used solo, i.e. single-node consensus.

This post describes a consensus setup implemented with Kafka (a distributed message queue).

The reason for using a Kafka cluster is simple: it gives the orderers' consensus and ordering service room for fault tolerance. When we submit a transaction to a peer node, the peer produces (or, via the SDK, returns) a read/write set, and that result is sent to an orderer node for consensus and ordering. If the orderer suddenly goes down at that point, the request fails and data may be lost. On top of that, the current SDK's callback for transactions sent to the orderer takes an extremely long time; during bulk data imports the callback is effectively unusable.

Hence, in a production deployment the orderers need fault tolerance, which in practice means running a cluster of orderer nodes; that cluster depends on Kafka and ZooKeeper.

crypto-config.yaml

This file was introduced before; it is consumed by the cryptogen tool. It describes the network topology and lets us generate a set of certificates and private keys for each organization and for the components belonging to it. Each organization is assigned a unique root certificate that binds its specific components (peers and orderers) to it. Transactions and communications in Hyperledger Fabric are signed with a node's private key (keystore) and then verified with its public key (signcerts).

crypto-config.yaml has an OrdererOrgs section in which we can set the maximum number of orderer nodes allowed in the deployment and their names, specifically under OrdererOrgs > Specs, as shown below:

[Figure: OrdererOrgs > Specs in crypto-config.yaml, defining three orderer hosts]

The configuration above assumes three orderer servers; adjust the count to suit your own platform.

We can then generate the required certificate material with:

./bin/cryptogen generate --config=./crypto-config.yaml

The material is generated under the ordererOrganizations directory; browsing into the orderers directory under the organization's directory shows the following structure:

[Figure: generated orderer directory structure under ordererOrganizations]

That is, the verification files for all the orderer nodes we need have been generated.
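As a quick sanity check, the layout cryptogen produces can be sketched programmatically. This is a minimal sketch, assuming the example.com domain and orderer hostnames used in the compose files later in this article, and the standard Fabric MSP sub-directory names:

```python
# Sketch of the directory tree cryptogen is expected to produce for three
# orderers. Hostnames and the example.com domain are assumptions taken from
# the compose files later in this article.
ORDERERS = ["orderer0.example.com", "orderer1.example.com", "orderer2.example.com"]
MSP_DIRS = ["admincerts", "cacerts", "keystore", "signcerts", "tlscacerts"]

def expected_paths(domain="example.com"):
    base = f"crypto-config/ordererOrganizations/{domain}/orderers"
    paths = []
    for orderer in ORDERERS:
        for d in MSP_DIRS:          # per-node MSP material (keys + certs)
            paths.append(f"{base}/{orderer}/msp/{d}")
        paths.append(f"{base}/{orderer}/tls")  # per-node TLS material
    return paths

for p in expected_paths():
    print(p)
```

Comparing this list against the actual output directory is an easy way to confirm cryptogen picked up all three Specs entries.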

configtx.yaml

This is the file mentioned at the start of this post; it is used to build the material needed for the genesis block.

Since crypto-config.yaml specifies and creates three orderer node configurations, we define the Orderer section of configtx.yaml to match:

 ################################################################################
 #
 #   SECTION: Orderer
 #
 #   - This section defines the values to encode into a config transaction or
 #   genesis block for orderer related parameters
 #
 ################################################################################
 Orderer: &OrdererExample

     # Orderer Type: The orderer implementation to start
     # Available types are "solo" and "kafka"
     OrdererType: kafka

     Addresses:
         - orderer0.example.com:7050
         - orderer1.example.com:7050
         - orderer2.example.com:7050

     # Batch Timeout: The amount of time to wait before creating a batch
     BatchTimeout: 2s

     # Batch Size: Controls the number of messages batched into a block
     BatchSize:

         # Max Message Count: The maximum number of messages to permit in a batch
         MaxMessageCount: 10

         # Absolute Max Bytes: The absolute maximum number of bytes allowed for
         # the serialized messages in a batch.
         # This sets the maximum block size: each block is at most
         # Orderer.AbsoluteMaxBytes bytes (headers excluded). Call this value A
         # and remember it; it affects how the Kafka brokers are configured.
         AbsoluteMaxBytes: 99 MB

         # Preferred Max Bytes: The preferred maximum number of bytes allowed for
         # the serialized messages in a batch. A message larger than the preferred
         # max bytes will result in a batch larger than preferred max bytes.
         # This sets the preferred block size. Kafka gives higher throughput on
         # relatively small messages; it is best to keep blocks under 1 MB.
         PreferredMaxBytes: 512 KB

     Kafka:
         # Brokers: A list of Kafka brokers to which the orderer connects
         # NOTE: Use IP:port notation
         # At least two broker addresses (IP:port) from the Kafka cluster.
         # The list does not need to be exhaustive (these are seed brokers);
         # they are the brokers this orderer connects to.
         Brokers:
             - x.x.x.x:9092
             - x.x.x.xx:9092

     # Organizations is the list of orgs which are defined as participants on
     # the orderer side of the network
     Organizations:

The main settings in this file are all annotated; use the comments to tune the values for your own production platform. In particular, OrdererType must be set to kafka.
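Before generating the genesis block it is worth checking that the batch settings are self-consistent. A minimal sketch, assuming the sizes shown above (99 MB absolute, 512 KB preferred; both are illustrative defaults, not mandated values):

```python
# Sanity-check the Orderer batch settings from configtx.yaml.
MB = 1024 * 1024
KB = 1024

orderer = {
    "OrdererType": "kafka",
    "BatchTimeout": "2s",
    "MaxMessageCount": 10,
    "AbsoluteMaxBytes": 99 * MB,    # "A" in the Kafka sizing notes below
    "PreferredMaxBytes": 512 * KB,
}

def check(cfg):
    # This article's setup requires the kafka orderer type.
    assert cfg["OrdererType"] == "kafka"
    # The preferred block size can never exceed the absolute maximum.
    assert cfg["PreferredMaxBytes"] <= cfg["AbsoluteMaxBytes"]
    # Kafka favours small messages; keep the preferred size at or under 1 MB.
    assert cfg["PreferredMaxBytes"] <= 1 * MB
    return True

print(check(orderer))
```

If any assertion fails, fix configtx.yaml before running configtxgen, since the limits are baked into the genesis block.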

With crypto-config and configtx done, the next step is to configure the startup YAML files for ZooKeeper, Kafka and the orderers.

zookeeper

The ZooKeeper YAML mainly pins down the ports the ensemble members use to talk to each other. The official demo is already fairly detailed; the configuration is as follows:

 # Copyright IBM Corp. All Rights Reserved.
 #
 # SPDX-License-Identifier: Apache-2.0
 #
 # How ZooKeeper basically operates:
 # 1. Elect a leader.
 # 2. Synchronize data.
 # 3. There are many leader-election algorithms, but the election criteria are
 #    the same.
 # 4. The leader must hold the highest execution ID, akin to root privileges.
 # 5. A majority of the machines in the cluster must respond to and follow the
 #    elected leader.

 version: '2'

 services:

   zookeeper1:
     container_name: zookeeper1
     hostname: zookeeper1
     image: hyperledger/fabric-zookeeper
     restart: always
     environment:
       # ========================================================================
       #     Reference: https://zookeeper.apache.org/doc/r3.4.9/zookeeperAdmin.html#sc_configuration
       # ========================================================================
       #
       # myid
       # The ID must be unique within the ensemble and should have a value
       # between 1 and 255.
       - ZOO_MY_ID=1
       #
       # server.x=[hostname]:nnnnn[:nnnnn]
       # The list of servers that make up the ZK ensemble. The list that is used
       # by the clients must match the list of ZooKeeper servers that each ZK
       # server has. There are two port numbers `nnnnn`. The first is what
       # followers use to connect to the leader, while the second is for leader
       # election.
       - ZOO_SERVERS=server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888
     ports:
       - "2181:2181"
       - "2888:2888"
       - "3888:3888"
     extra_hosts:
       - "zookeeper1:x.x.x.x"
       - "zookeeper2:x.x.x.xx"
       - "zookeeper3:x.x.x.xxx"
       - "kafka1:xx.x.x.x"
       - "kafka2:xx.x.x.xx"
       - "kafka3:xx.x.x.xxx"
       - "kafka4:xx.x.x.xxxx"

The ZooKeeper ensemble will have 3, 5 or 7 members. The size needs to be odd to avoid split-brain scenarios, and greater than 1 to avoid a single point of failure. More than 7 ZooKeeper servers is considered overkill.

Shown above is zookeeper1's file; the files for zookeeper2, zookeeper3 and so on are analogous, except that the IDs must never be the same.
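The odd-size rule follows directly from quorum arithmetic: ZooKeeper needs a strict majority to serve requests, and an even-sized ensemble tolerates no more failures than the odd ensemble one node smaller. A short sketch:

```python
# Quorum math for a ZooKeeper ensemble of n servers.
def quorum(n):
    # A strict majority of n.
    return n // 2 + 1

def tolerated_failures(n):
    # Servers that can be lost while a majority still remains.
    return n - quorum(n)

for n in range(1, 8):
    print(f"ensemble={n}  quorum={quorum(n)}  tolerates={tolerated_failures(n)}")
```

Running this shows that 4 servers tolerate one failure just like 3 do, and 6 tolerate two just like 5, so the even sizes buy nothing but extra coordination cost.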

kafka

Kafka's configuration is closely tied to both ZooKeeper and the orderers. The details are written into the comments below; if anything is unclear, feel free to ask in the comments. kafka1's YAML is as follows:

 # Copyright IBM Corp. All Rights Reserved.
 #
 # SPDX-License-Identifier: Apache-2.0

 version: '2'

 services:

   kafka1:
     container_name: kafka1
     hostname: kafka1
     image: hyperledger/fabric-kafka
     restart: always
     environment:
       # ========================================================================
       #     Reference: https://kafka.apache.org/documentation/#configuration
       # ========================================================================
       #
       # broker.id
       - KAFKA_BROKER_ID=0
       #
       # min.insync.replicas
       # Let the value of this setting be M. Data is considered committed when
       # it is written to at least M replicas (which are then considered in-sync
       # and belong to the in-sync replica set, or ISR). In any other case, the
       # write operation returns an error. Then:
       # 1. If up to N-M replicas -- out of the N (see default.replication.factor
       #    below) that the channel data is written to -- become unavailable,
       #    operations proceed normally.
       # 2. If more replicas become unavailable, Kafka cannot maintain an ISR set
       #    of M, so it stops accepting writes. Reads work without issues. The
       #    channel becomes writeable again when M replicas get in-sync.
       - KAFKA_MIN_INSYNC_REPLICAS=2
       #
       # default.replication.factor
       # Let the value of this setting be N. A replication factor of N means that
       # each channel will have its data replicated to N brokers. These are the
       # candidates for the ISR set of a channel. As we noted in the
       # min.insync.replicas section above, not all of these brokers have to be
       # available all the time. In this sample configuration we choose a
       # default.replication.factor of K-1 (where K is the total number of brokers in
       # our Kafka cluster) so as to have the largest possible candidate set for
       # a channel's ISR. We explicitly avoid setting N equal to K because
       # channel creations cannot go forward if less than N brokers are up. If N
       # were set equal to K, a single broker going down would mean that we would
       # not be able to create new channels, i.e. the crash fault tolerance of
       # the ordering service would be non-existent.
       - KAFKA_DEFAULT_REPLICATION_FACTOR=3
       #
       # zookeper.connect
       # Point to the set of Zookeeper nodes comprising a ZK ensemble.
       - KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
       #
       # zookeeper.connection.timeout.ms
       # The max time that the client waits to establish a connection to
       # Zookeeper. If not set, the value in zookeeper.session.timeout.ms (below)
       # is used.
       #- KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS =
       #
       # zookeeper.session.timeout.ms
       #- KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS =
       #
       # socket.request.max.bytes
       # The maximum number of bytes in a socket request. ATTN: If you set this
       # env var, make sure to update `brokerConfig.Producer.MaxMessageBytes` in
       # `newBrokerConfig()` in `fabric/orderer/kafka/config.go` accordingly.
       #- KAFKA_SOCKET_REQUEST_MAX_BYTES=104857600 # 100 * 1024 * 1024 B
       #
       # message.max.bytes
       # The maximum size of envelope that the broker can receive.
       #
       # configtx.yaml sets the maximum block size (see its AbsoluteMaxBytes
       # parameter): each block is at most Orderer.AbsoluteMaxBytes bytes
       # (headers excluded). Call that value A (99 MB here). message.max.bytes
       # and replica.fetch.max.bytes should be set to a value larger than A,
       # with some buffer room for headers -- 1 MB is more than enough. The
       # settings must satisfy:
       # Orderer.AbsoluteMaxBytes < replica.fetch.max.bytes <= message.max.bytes
       # (More completely, message.max.bytes should be strictly smaller than
       # socket.request.max.bytes, which defaults to 100 MB. If you want blocks
       # larger than 100 MB you must edit the hard-coded value of
       # brokerConfig.Producer.MaxMessageBytes in fabric/orderer/kafka/config.go
       # and rebuild the binary, which is not recommended.)
       - KAFKA_MESSAGE_MAX_BYTES=103809024 # 99 * 1024 * 1024 B
       #
       # replica.fetch.max.bytes
       # The number of bytes of messages to attempt to fetch for each channel.
       # This is not an absolute maximum, if the fetched envelope is larger than
       # this value, the envelope will still be returned to ensure that progress
       # can be made. The maximum message size accepted by the broker is defined
       # via message.max.bytes above.
       - KAFKA_REPLICA_FETCH_MAX_BYTES=103809024 # 99 * 1024 * 1024 B
       #
       # unclean.leader.election.enable
       # Data consistency is key in a blockchain environment. We cannot have a
       # leader chosen outside of the in-sync replica set, or we run the risk of
       # overwriting the offsets that the previous leader produced, and --as a
       # result-- rewriting the blockchain that the orderers produce.
       - KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=false
       #
       # log.retention.ms
       # Until the ordering service in Fabric adds support for pruning of the
       # Kafka logs, time-based retention should be disabled so as to prevent
       # segments from expiring. (Size-based retention -- see
       # log.retention.bytes -- is disabled by default so there is no need to set
       # it explicitly.)
       - KAFKA_LOG_RETENTION_MS=-1
     ports:
       - "9092:9092"
     extra_hosts:
       - "zookeeper1:x.x.x.x"
       - "zookeeper2:x.x.x.xx"
       - "zookeeper3:x.x.x.xxx"
       - "kafka1:xx.x.x.x"
       - "kafka2:xx.x.x.xx"
       - "kafka3:xx.x.x.xxx"
       - "kafka4:xx.x.x.xxxx"

Kafka needs at least 4 servers to form the cluster; that is the minimum node count for crash fault tolerance. With 4 brokers, the cluster tolerates one broker crash: after one broker stops serving, channels can still be read and written, and new channels can still be created.

As noted in the configuration comments, the minimum number of in-sync replicas (min.insync.replicas, M) must be greater than 1, i.e. at least 2;

also as noted in the comments, the default number of replicas holding channel data (default.replication.factor, N) must be greater than M, i.e. at least 3;

to preserve fault tolerance, the Kafka cluster therefore needs at least 4 servers, which allows one broker to fail.
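The arithmetic behind those minimums, plus the byte-limit ordering from the comments above, can be sketched as follows. The 512 KB header buffer is an illustrative assumption of mine (the article only requires "some buffer, at most 1 MB"); K, M and N follow the 4-broker example:

```python
# K = brokers in the cluster, N = default.replication.factor,
# M = min.insync.replicas -- values from the 4-broker example above.
K = 4
N = K - 1   # largest ISR candidate set that still allows channel creation
M = 2

assert 1 < M < N < K
# Writes keep working while at most N - M replicas are down:
print("broker crashes tolerated:", N - M)

# Byte limits (A = Orderer.AbsoluteMaxBytes from configtx.yaml):
A = 99 * 1024 * 1024
replica_fetch_max_bytes = A + 512 * 1024        # A plus header buffer (assumed)
message_max_bytes = replica_fetch_max_bytes
socket_request_max_bytes = 100 * 1024 * 1024    # Kafka's default

# The ordering the article's comments require:
assert A < replica_fetch_max_bytes <= message_max_bytes < socket_request_max_bytes
```

If you change AbsoluteMaxBytes in configtx.yaml, re-run a check like this against the broker settings before redeploying.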

orderer

The orderer service node configuration changes somewhat relative to solo, mainly by adding a few parameters, as shown below:

 # Copyright IBM Corp. All Rights Reserved.
 #
 # SPDX-License-Identifier: Apache-2.0

 version: '2'

 services:

   orderer0.example.com:
     container_name: orderer0.example.com
     image: hyperledger/fabric-orderer
     environment:
       - CORE_VM_DOCKER_HOSTCONFIG_NETWORKMODE=example_default
       - ORDERER_GENERAL_LOGLEVEL=error
       # - ORDERER_GENERAL_LOGLEVEL=debug
       - ORDERER_GENERAL_LISTENADDRESS=0.0.0.0
       - ORDERER_GENERAL_LISTENPORT=7050
       #- ORDERER_GENERAL_GENESISPROFILE=ExampleOrdererGenesis
       - ORDERER_GENERAL_GENESISMETHOD=file
       - ORDERER_GENERAL_GENESISFILE=/var/hyperledger/orderer/orderer.genesis.block
       - ORDERER_GENERAL_LOCALMSPID=ExampleMSP
       - ORDERER_GENERAL_LOCALMSPDIR=/var/hyperledger/orderer/msp
       #- ORDERER_GENERAL_LEDGERTYPE=ram
       #- ORDERER_GENERAL_LEDGERTYPE=file
       # enabled TLS
       - ORDERER_GENERAL_TLS_ENABLED=false
       - ORDERER_GENERAL_TLS_PRIVATEKEY=/var/hyperledger/orderer/tls/server.key
       - ORDERER_GENERAL_TLS_CERTIFICATE=/var/hyperledger/orderer/tls/server.crt
       - ORDERER_GENERAL_TLS_ROOTCAS=[/var/hyperledger/orderer/tls/ca.crt]
       - ORDERER_KAFKA_RETRY_LONGINTERVAL=10s
       - ORDERER_KAFKA_RETRY_LONGTOTAL=100s
       - ORDERER_KAFKA_RETRY_SHORTINTERVAL=1s
       - ORDERER_KAFKA_RETRY_SHORTTOTAL=30s
       - ORDERER_KAFKA_VERBOSE=true
       - ORDERER_KAFKA_BROKERS=[xx.x.x.x:9092,xx.x.x.xx:9092,xx.x.x.xxx:9092,xx.x.x.xxxx:9092]
     working_dir: /opt/gopath/src/github.com/hyperledger/fabric
     command: orderer
     volumes:
       - ../config/channel-artifacts/genesis.block:/var/hyperledger/orderer/orderer.genesis.block
       - ../config/crypto-config/ordererOrganizations/example.com/orderers/orderer0.example.com/msp:/var/hyperledger/orderer/msp
       - ../config/crypto-config/ordererOrganizations/example.com/orderers/orderer0.example.com/tls/:/var/hyperledger/orderer/tls
     networks:
       default:
         aliases:
           - example
     ports:
       - 7050:7050
     extra_hosts:
       - "kafka1:xx.x.x.x"
       - "kafka2:xx.x.x.xx"
       - "kafka3:xx.x.x.xxx"
       - "kafka4:xx.x.x.xxxx"

Startup order

With the ZooKeeper/Kafka cluster scheme, the startup order follows the dependencies: start the ZooKeeper ensemble first, then the Kafka cluster, and finally the orderer cluster.

If anything is still unclear, refer to the official demo: the dc-orderer-kafka-base.yml and dc-orderer-kafka.yml files in the fabric/bddtests directory.

Promoting romeovan's reply (comment #15) here so more readers can see it:

One more caveat for everyone: when deploying Hyperledger's ZooKeeper across multiple servers, you need to modify a parameter in the docker-compose ZooKeeper config:
append quorumListenOnAllIPs=true after ZOO_SERVERS, otherwise you will very likely hit the exception: This ZooKeeper instance is not currently serving requests.
Without this parameter, ZooKeeper only binds to the first network interface, i.e. it opens its listening ports on 127.0.0.1 and not on the interface with the real IP.
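Applied to the zookeeper1 compose file above, the fix from that comment looks roughly like this (a sketch: server IDs, hostnames and the 2888/3888 ports are the ones used earlier in this article):

```yaml
    environment:
      - ZOO_MY_ID=1
      # Listen for quorum/election traffic on all interfaces, not only the
      # interface matching the configured hostname:
      - ZOO_SERVERS=server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888 quorumListenOnAllIPs=true
```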

See also Will.Hu's working TLS-enabled setup (comment #18); he has shared the repo here:

https://github.com/keenkit/fabric-sample-with-kafka
