要读取的文件为:/user/hdfs/stdin.xml
<?xml version="1.0" encoding="UTF-8"?>
<request>
<jobinstanceid>SK9cohJD4yklcD8dJuZXDA</jobinstanceid>
<context>
<property name="userName" value="xdf"/>
<property name="queueName" value="queue1"/>
<property name="processId" value="dns"/>
<property name="jobId" value="jobID"/>
<property name="hiveServerAddress" value="IP:port "/>
<property name="databaseName" value="wx"/>
<property name="basePath" value="HDFS_BasePath1/20141216/jobinstanceid/${operator.name}"/>
</context> <operator name="convert" alias="lowerUpperCaseConvert" class="lowerUpperCaseConvert">
<parameterlist name="fields">
<parametermap fieldname="name" fieldvalue="m_uuid()" fieldtype="String"/>
</parameterlist>
</operator>
<datasets>
<dataset name="inport1">
<row>default.test1</row>
</dataset>
</datasets>
</request>
要存的文件为:/user/hdfs/stdin.xml
<?xml version="1.0" encoding="UTF-8"?> <response>
<jobinstanceid>SK9cohJD4yklcD8dJuZXDA</jobinstanceid>
<datasets>
<dataset name="outport1">
<row>default.tmp_e93eba2c_f22d_4dc1_9e86_a342a0ea0625</row>
</dataset>
</datasets>
<operatortracker>
<portcounter name="inport1" dataCount="4"/>
<portcounter name="outport1" dataCount="4"/>
</operatortracker>
</response>
读stdin.xml文件的实现如下:
public List<Map> parseStdinXml(String xmlParams) throws Exception { String userName = null;
String operatorName = null;
String dbName = null;
String inputTabName = null;
String strs = null;
String fieldName = null;
String fieldType = null;
String jobinstanceid = null;
int fieldCount = 0; List<Map> list = new ArrayList<Map>();
Map<String, String> map = new HashMap<String, String>();
Document document = DocumentHelper.parseText(xmlParams); // 将字符串转化为xml
Element node1 = document.getRootElement(); // 获得根节点
Iterator iter1 = node1.elementIterator(); // 获取根节点下的子节点
while (iter1.hasNext()) {
Element node2 = (Element) iter1.next(); // 获取jobinstanceid
if ("jobinstanceid".equals(node2.getName())) {
jobinstanceid = node2.getText();
map.put("jobinstanceid", jobinstanceid);
}
// 获取通用参数
if ("context".equals(node2.getName())) {
Iterator iter2 = node2.elementIterator();
while (iter2.hasNext()) {
Element node3 = (Element) iter2.next();
if ("property".equals(node3.getName())) {
if ("userName".equals(node3.attributeValue("name"))) {
userName = node3.attributeValue("value");
}
}
map.put("userName", userName);
}
} // 获取算子参数
if ("operator".equals(node2.getName())) {
operatorName = node2.attributeValue("name");
map.put("operatorName", operatorName);
Iterator iter2 = node2.elementIterator();
while (iter2.hasNext()) {
Element node3 = (Element) iter2.next();
if ("parameterlist".equals(node3.getName())) {
if ("fields".equals(node3.attributeValue("name"))) {
Iterator iter3 = node3.elementIterator();
while (iter3.hasNext()) {
Element node4 = (Element) iter3.next();
if ("parametermap".equals(node4.getName())) {
fieldName = node4
.attributeValue("fieldname");
fieldType = node4
.attributeValue("fieldtype");
fieldCount++;
map.put("fieldName" + fieldCount, fieldName);
map.put("fieldType" + fieldCount, fieldType);
}
}
}
}
}
map.put("fieldCount", Integer.toString(fieldCount));
}
// 获取输入数据库
if ("datasets".equals(node2.getName())) {
Iterator iter2 = node2.elementIterator();
while (iter2.hasNext()) {
Element node3 = (Element) iter2.next();
if ("inport1".equals(node3.attributeValue("name"))) {
Iterator iter3 = node3.elementIterator();
while (iter3.hasNext()) {
Element node4 = (Element) iter3.next();
strs = node4.getText();
}
}
if (!"".equals(strs.trim())) {
String[] arr = strs.split("\\.");
dbName = arr[0];
inputTabName = arr[1];
}
map.put("dbName", dbName);
map.put("inputTabName", inputTabName);
}
}
}
list.add(map);
return list;
}
存stdout.xml文件的实现如下:
public void genStdoutXml(String fileName, List<Map> listOut) { String jobinstance = null;
String dbName = null;
String outputTable = null;
String outputDataCount = null;
String inputDataCount = null; dbName = listOut.get(0).get("dbName").toString();
jobinstance = listOut.get(0).get("jobinstanceid").toString();
outputTable = listOut.get(0).get("outputTable").toString();
inputDataCount = listOut.get(0).get("inputDataCount").toString();
outputDataCount = listOut.get(0).get("outputDataCount").toString(); Document document = DocumentHelper.createDocument();
Element response = document.addElement("response");
Element jobinstanceid = response.addElement("jobinstanceid");
jobinstanceid.setText(jobinstance);
Element datasets = response.addElement("datasets");
Element dataset = datasets.addElement("dataset");
dataset.addAttribute("name", "outport1");
Element row = dataset.addElement("row");
row.setText(dbName + "." + outputTable);
Element operatortracker = response.addElement("operatortracker");
Element portcounter1 = operatortracker.addElement("portcounter");
portcounter1.addAttribute("name", "inport1");
portcounter1.addAttribute("dataCount", inputDataCount);
Element portcounter2 = operatortracker.addElement("portcounter");
portcounter2.addAttribute("name", "outport1");
portcounter2.addAttribute("dataCount", outputDataCount); try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(fileName), conf);
OutputStream out = fs.create(new Path(fileName),
new Progressable() {
public void progress() {
}
});
OutputFormat format = OutputFormat.createPrettyPrint();
format.setEncoding("UTF-8");
XMLWriter xmlWriter = new XMLWriter(out, format);
xmlWriter.write(document);
xmlWriter.close();
} catch (IOException e) {
System.out.println(e.getMessage());
} }