Java Hive UDTF 将WKT格式的Geomotry转换成GeoJSON


背景知识

WKT(Well-known text)是一种文本标记语言,用于表示矢量几何对象、空间参照系统及空间参照系统之间的转换。它的二进制表示方式,亦即WKB(well-known-binary)则胜于在传输和在数据库中存储相同的信息。

GeoJSON是一种对各种地理数据结构进行编 码的格式,可以表示几何、特征或者特征集合。

WTK

点 POINT(6 10)

线 LINESTRING(44 4,11 44,33 25)

面 POLYGON((1 1,2 2,3 3,4 4,5 5,1 1),(11 11,2 13,34 43,34 42,52 52,11 11))

多点 MULTIPOINT(44 4,11 44,33 25)

多线 MULTILINESTRING((3 4,10 50,20 25),(-5 -8,-10 -8,-15 -4))

多面 MULTIPOLYGON(((1 1,5 1,5 5,1 5,1 1),(2 2,2 3,3 3,3 2,2 2)),((6 3,9 2,9 4,6 3)))

几何集合 GEOMETRYCOLLECTION(POINT(4 6),LINESTRING(4 6,7 10))

对应geojson

点 {"type":"Point","coordinates":[6,10]}

线 {"type":"LineString","coordinates":[[44, 4],[11, 44],[33, 25]]}

面 {"type":"Polygon","coordinates":[[[1, 1],[2, 2],[3, 3],[4, 4],[5, 5],[1, 1]],[[11, 11],[2, 13],[34, 43],[34, 42],[52, 52],[11, 11]]]}

多点 {"type":"MultiPoint","coordinates":[[44, 4],[11, 44],[33, 25]]}

多线 {"type":"MultiLineString","coordinates":[[[3, 4],[10, 50],[20, 25]],[[-5, -8],[-10, -8],[-15, -4]]}

多面 {"type":"MultiPolygon","coordinates":[[[3, 4],[10, 50],[20, 25]],[[-5, -8],[-10, -8],[-15, -4]]]}

几何集合 {"type":"FeatureCollection","features":[{"type":"Feature","geometry":{"type":"Point","coordinates":[4,6]},},{"type":"Feature","geometry":{"type":"LineString","coordinates":[[[4,6],[7,10]]}]}

具体需求

目前MULTIPOLYGON不能直接转成geojson,主要是业务方不能处理多面,需要对多面进行拆分成多个Polygon ,具体到hive数据就是实现udtf对这种多面类型的数据进行解析处理,拆分成多行一行代表一个Polygon。

解决方案

方法一:调研库函数直接封装进行处理:比如 vividsolutions jts包 进行处理

vividsolutions比较强大,其实现原理如下:

Geometry类作为基础数据类型,单点、单线、单面都继承Geometry去实现,同时定义了一个GeometryCollection来实现Geometry 底层用Geometry类型的数组数据结果进行存储,多点、多线、多面都继承自GeometryCollection ,基本上解析后只需要对类型进行判断,然后根据类型进行层层处理 比如:多面类型里面有多个面,对多面进行循环解析就可以了。

如下代码片段

// 如下传如一个 Geometry 基础类,这里会利用java多态进行判断 
public static JSONObject parsePolygon2Geojson(Geometry geom) throws ParseException {
   JSONObject jsonObject = new JSONObject();
   String type = geom.getGeometryType();
   jsonObject.put("type", geom.getGeometryType());
   if ("MultiPolygon".equalsIgnoreCase(type)) {
     JSONArray coordJson = new JSONArray();
     //geom 实际上底层数据结构是数组,可以从中获取单个Polygon
     for (int i = 0; i < geom.getNumGeometries(); i++) {
       // polygon
       coordJson.add(parsePolygon(geom.getGeometryN(i)));
     }
     jsonObject.put("coordinates", coordJson);
   } else if ("Polygon".equalsIgnoreCase(type)) {
     jsonObject.put("coordinates", parsePolygon(geom));
   } else if ("MultiLineString".equalsIgnoreCase(type)) {
     JSONArray coordJson = new JSONArray();
     for (int i = 0; i < geom.getNumGeometries(); i++) {
       List<List<Double>> line = new ArrayList<>();
       for (Coordinate coordinate : geom.getGeometryN(i).getCoordinates()) {
         line.add(getPoint(coordinate));
       }
       coordJson.add(line);
     }
     jsonObject.put("coordinates", coordJson);
   } else if ("Point".equalsIgnoreCase(type)) {
     jsonObject.put("coordinates", getPoint(geom.getCoordinate()));
   } else if ("MultiPoint".equalsIgnoreCase(type) || "LineString".equalsIgnoreCase(type)) {
     List<List<Double>> coordJson = new ArrayList<>();
     for (Coordinate coordinate : geom.getCoordinates()) {
       coordJson.add(getPoint(coordinate));
     }
     jsonObject.put("coordinates", coordJson);
   } else {
     jsonObject.put("coordinates", new JSONArray());
   }
   return jsonObject;
 }

方法二:暴力求解 按照点-多点-线-多线-面-多面 层次关系进行数据解析

测试案例

案例一: wkt 格式数据转为geojson 给google map使用

数据格式:

wkt: MULTIPOLYGON(((1 1,5 1,5 5,1 5,1 1),(2 2,2 3,3 3,3 2,2 2)),((6 3,9 2,9 4,6 3))) : 这里包含两个Polygon

geoJson: {"type":"MultiPolygon","coordinates":[[[[1,1],[5,1],[5,5],[1,5],[1,1]],[[2,2],[2,3],[3,3],[3,2],[2,2]]],[[[6,3],[9,2],[9,4],[6,3]]]]}

拆分成多个 POLYGON

转换后返回数组UDTF循环处理每个对象

[{"coordinates":[[[1.0,1.0],[5.0,1.0],[5.0,5.0],[1.0,5.0],[1.0,1.0]],[[2.0,2.0],[2.0,3.0],[3.0,3.0],[3.0,2.0],[2.0,2.0]]],"type":"Polygon"},{"coordinates":[[[6.0,3.0],[9.0,2.0],[9.0,4.0],[6.0,3.0]]],"type":"Polygon"}]

一共有【2】个POLYGON转换后的结果第:【1】 个POLYGON 结果 : {"coordinates":[[[1.0,1.0],[5.0,1.0],[5.0,5.0],[1.0,5.0],[1.0,1.0]],[[2.0,2.0],[2.0,3.0],[3.0,3.0],[3.0,2.0],[2.0,2.0]]],"type":"Polygon"}

一共有【2】个POLYGON转换后的结果第:【2】 个POLYGON 结果 : {"coordinates":[[[6.0,3.0],[9.0,2.0],[9.0,4.0],[6.0,3.0]]],"type":"Polygon"}

测试案例二:单独POLYGON类型数据

原始Wkt字符串: POLYGON((1 1,2 2,3 3,4 4,5 5,1 1),(11 11,2 13,34 43,34 42,52 52,11 11))

转换后返回数组UDTF循环处理每个对象:[{"coordinates":[[[1.0,1.0],[2.0,2.0],[3.0,3.0],[4.0,4.0],[5.0,5.0],[1.0,1.0]],[[11.0,11.0],[2.0,13.0],[34.0,43.0],[34.0,42.0],[52.0,52.0],[11.0,11.0]]],"type":"Polygon"}]

一共有【1】个POLYGON转换后的结果第:【1】 个POLYGON 结果 : {"coordinates":[[[1.0,1.0],[2.0,2.0],[3.0,3.0],[4.0,4.0],[5.0,5.0],[1.0,1.0]],[[11.0,11.0],[2.0,13.0],[34.0,43.0],[34.0,42.0],[52.0,52.0],[11.0,11.0]]],"type":"Polygon"}

HIVE UDTF封装

hive支持三种类型的UDF函数:

  • 普通UDF函数

操作单个数据行,且产生一个数据作为输出。例如(数学函数,字符串函数)

  • 聚合udf (UDAF)

接受多个数据行,并产生一个数据行作为输出。例如(COUNT,MAX函数等)

  • 表生成UDF(UDTF)

接受一个数据行,然后返回产生多个数据行(一个表作为输出)

这里的需求是MULTIPOLYGON拆成POLYGON所以需要实现UDTF函数

UDTF自定义函数的实现:

UDTF函数的实现必须通过继承抽象类GenericUDTF,并且要实现initialize, process,close 函数

  • Hive 调用 initialize 方法来确定传入参数的类型并确定 UDTF 生成表的每个字段的数据类型(即输入类型和输出类型。initialize 方法必须返回一个生成表的字段的相应的 StructObjectInspector。
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
    //1.判断参数个数   参数都是用结构体对象检查器来封装的,注意返回值类型也是
    //如果参数的个数不是1个则抛出异常
    if (argOIs.getAllStructFieldRefs().size() != 1) {
        throw new UDFArgumentLengthException("输出参数个数不对---");
    }
    //2.判断参数类型
    //如果参数的类型不是String则抛出异常
    String typeName = argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector().getTypeName();
    //注意hive中的字符串是  string
    if (!"string".equals(typeName)) {
        throw new UDFArgumentTypeException(0, "函数参数类型不对");
    }
    //3.返回值
    //StructObjectInspector  是<key,value>形式的  key为列名,value 为列的类型,所以要用list封装
    List<String> fieldName = new ArrayList<>();
    List<ObjectInspector> fieldOIs = new ArrayList<>();
    fieldName.add("item"); //设置默认列名
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  //设置列的类型为string
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldName, fieldOIs); //返回值类型封装
}
  • 初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。
@Override
public void process(Object[] objects) throws HiveException {
    String wktStr = objects[0].toString();
    try {
        JSONArray jsonArray = WKTUtil.parsePolygonArrayGeojsonByStr(wktStr);
        if (jsonArray != null && jsonArray.size() > 0) {
            for (Object polygy :jsonArray) {
               forward(polygy);
            }
        }
    } catch (ParseException e) {
        e.printStackTrace();
    }
}
  • 调用close()方法,对需要清理的方法进行清理。



参考文档

https://zhuanlan.zhihu.com/p/357614752

https://www.jianshu.com/p/ac352ceab9cd

https://www.jianshu.com/p/7ba944bb8775

上一篇:《LabVIEW 虚拟仪器程序设计从入门到精通(第二版)》一第一篇 LabVIEW技术基础


下一篇:微信公众平台身份验证出问题 回应称开发哥正在修复