https://www.cnblogs.com/1ssqq1lxr/p/10417005.html
由于公司业务需求,需要搭建一套实时处理数据平台,基于多方面调研选择了Flink.
- 初始化Swarm环境(也可以选择k8s)
部署zookeeper集群 基于docker-compose ,使用 docker stack 部署在容器中,由于zookeeper存在数据持久化存储,这块后面可以考虑共享存储方案.
services:
zoo1:
image: zookeeper
restart: always
hostname: zoo1
ports:
- :
environment:
ZOO_MY_ID:
ZOO_SERVERS: server.=0.0.0.0:: server.=zoo2:: server.=zoo3:: zoo2:
image: zookeeper
restart: always
hostname: zoo2
ports:
- :
environment:
ZOO_MY_ID:
ZOO_SERVERS: server.=zoo1:: server.=0.0.0.0:: server.=zoo3:: zoo3:
image: zookeeper
restart: always
hostname: zoo3
ports:
- :
environment:
ZOO_MY_ID:
ZOO_SERVERS: server.=zoo1:: server.=zoo2:: server.=0.0.0.0::
- 部署flink镜像
version: "" services:
jobmanager:
image: flink:1.7.-scala_2.-alpine
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager:
image: flink:1.7.-scala_2.-alpine
command: taskmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
此时只是一个jobmanager 存在单机问题,可以考虑将容器内部的 fluentd.conf 挂载出来,配置zookeeper HA。
- 对于扩充 TaskManager直接 docker service scala TaskManager-NAME=3即可
Flink案例demo,采用读取kafka中数据实时处理,然后将结果存储到influxDb中展示
// 实时流main
public class SportRealTimeJob { public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
KafkaConnector connector = new KafkaConnector("192.168.30.60:9092","big-data");
env
.addSource(connector.getConsumerConnector(Lists.newArrayList("test0")))
.<MessageBody>flatMap((sentence,out)->{
MessageBody body=JSON.parseObject(sentence, MessageBody.class);
out.collect(body);
})
.shuffle()
.keyBy(messageBody -> messageBody.getPhone()+messageBody.getUserId())
.timeWindow(Time.seconds())
.reduce((t0, t1) -> new MessageBody(t0.getUserId(),t0.getPhone(),t0.getValue()+t1.getValue()))
.addSink(new InfluxWriter())
.setParallelism();
env.execute("Window WordCount");
} }
// 数据处理实体类demo
@Data
@Measurement(name = "sport")
public class MessageBody { @Column(name = "userId",tag = true)
private String userId; @Column(name = "phone",tag = true)
private String phone; @Column(name = "value")
private int value; public MessageBody() {
} public MessageBody(String userId, String phone, int value) {
this.userId = userId;
this.phone = phone;
this.value = value;
}
}
// 自定义数据输出源
public class InfluxWriter extends RichSinkFunction<MessageBody> { private InfluxTemplate template; @Override
public void open(Configuration parameters) throws Exception {
InfluxBean bean= InfluxBean.builder().dbName("game")
.url("http://localhost:8086")
.username("admin")
.password("admin")
.build();
template = new SimpleInfluxTemplate(bean);
} @Override
public void close() throws Exception {
template.close();
} @Override
public void invoke(MessageBody value, Context context) throws Exception {
template.write(Point.measurement("sport")
.addField("value",value.getValue())
.tag("userId",String.valueOf(value.getUserId()))
.tag("phone",value.getPhone())
.time(context.currentProcessingTime(), TimeUnit.MILLISECONDS).build());
}
}
// influxDb操作类
public class SimpleInfluxTemplate implements InfluxTemplate { private final InfluxDB db; public SimpleInfluxTemplate(InfluxBean bean){
this.db= InfluxDBFactory.connect(bean.getUrl(), bean.getUsername(), bean.getPassword());
db.setDatabase(bean.getDbName());
db.enableBatch(BatchOptions.DEFAULTS.exceptionHandler(
(failedPoints, throwable) -> {
/* custom error handling here */ })
.consistency(InfluxDB.ConsistencyLevel.ALL)
.bufferLimit()
);
} @Override
public void write(Point point) {
db.write(point);
} @Override
public void bentchWrite(BatchPoints points) {
db.write(points);
} @Override
public <T> List<T> query(Query query, Class<T> tClass) {
QueryResult result=db.query(query);
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); // thread-safe - can be reused
return resultMapper.toPOJO(result, tClass);
} @Override
public void close() {
db.close();
} public interface InfluxTemplate { void write(Point point); void bentchWrite(BatchPoints points); <T> List<T> query(Query query, Class<T> tClass); void close();
} @ToString
@Getter
@Setter
@Builder
public class InfluxBean { private String url; private String username; private String password; private String dbName; }