Structured Streaming 的文章参考这里: Spark 2.0 Structured Streaming 分析。2.0的时候只是把架子搭建起来了,当时也只支持FileSource(监控目录增量文件),到2.0.2后支持Kafka了,也就进入实用阶段了,目前只支持0.10的Kafka。Structured Streaming 采用dataframe API,并且对流式计算重新进行了抽象,个人认为Spark streaming 更灵活,Structured Streaming 在某些场景则更方便,但是在StreamingPro中他们之间则没太大区别,唯一能够体现出来的是,Structured Streaming 使得checkpoint真的进入实用阶段。
假设我们都放在/tmp目录下
新建一个文件,/tmp/ss-test.json,内容如下:
{
"scalamaptojson": {
"desc": "测试",
"strategy": "spark",
"algorithm": [],
"ref": [
],
"compositor": [
{
"name": "ss.source.mock",
"params": [{"duration1":["1","2","3"]}]
},
{
"name": "ss.table",
"params": [{"tableName": "test"}]
},
{
"name": "ss.sql",
"params": [
{
"sql": "select value + 100 from test",
"outputTableName": "test2"
}
]
},
{
"name": "ss.output",
"params": [
{
"mode": "append",
"format": "console"
}
]
}
],
"configParams": {
}
}
}
StreamingPro 现在支持短名称了,不用写那么冗长的package名。
- ss 开头指的是structrued streaming。
- batch 则是spark 批处理
- stream 则是 spark streaming
逻辑:
- 配置模拟数据
- 映射为表
- 使用SQL查询
- 输出(console)
如果是接的kafka,则配置如下即可:
{
"name": "ss.source",
"params": [{
"format":"kafka"
"kaka.bootstrap.servers":"host1:port1,host2:port2",
"subscribe":"topic1,topic2"
}]
}
运行
./bin/spark-submit --class streaming.core.StreamingApp \
--master local[2] \
--name test \
/tmp/streamingpro-0.4.7-SNAPSHOT-online-2.0.2.jar \
-streaming.name test \
-streaming.platform ss \
-streaming.checkpoint file:///tmp/ss \
-streaming.job.file.path file:///tmp/ss-test.json