import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FraudDetection {
/**
 * Builds and runs the streaming job.
 *
 * <p>Text lines are read from the socket at {@code localhost:9999}, each line is
 * parsed into a {@link MyTransaction}, the stream is keyed by account id, the
 * {@code amount} field is aggregated with {@code sum("amount")}, and the result
 * is printed.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if the job cannot be built or its execution fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Alternative source kept for reference (not enabled):
    // env.addSource(new TransactionSource()).name("transaction")
    DataStream<MyTransaction> myTransactions = env.socketTextStream("localhost", 9999)
            .map(new MapFunction<String, MyTransaction>() {
                @Override
                public MyTransaction map(String value) throws Exception {
                    // No state is kept on the function object; each record is parsed independently.
                    return parseTransaction(value);
                }
            });

    myTransactions
            .keyBy(new KeySelector<MyTransaction, String>() {
                @Override
                public String getKey(MyTransaction value) throws Exception {
                    return value.getAccounId();
                }
            }, TypeInformation.of(String.class))
            .sum("amount")
            .print();

    env.execute();
}

/**
 * Parses one input line of the form {@code <account id>,<amount>}.
 *
 * <p>Both fields are trimmed. Any fields after the second one are ignored.
 *
 * @param line the raw text line
 * @return the parsed transaction
 * @throws IllegalArgumentException if the line has fewer than two comma-separated fields
 * @throws NumberFormatException if the amount field is not a valid {@code long}
 */
private static MyTransaction parseTransaction(String line) {
    String[] fields = line.split(",");
    if (fields.length < 2) {
        // Fail with the offending input instead of a bare ArrayIndexOutOfBoundsException.
        throw new IllegalArgumentException(
                "Expected \"<account id>,<amount>\" but got: \"" + line + "\"");
    }
    // parseLong yields a primitive directly; valueOf would box and immediately unbox.
    return new MyTransaction(fields[0].trim(), Long.parseLong(fields[1].trim()));
}
/**
 * A single transaction: an account identifier and an amount.
 *
 * <p>The class is a plain mutable bean: a public no-argument constructor,
 * private fields, and a getter/setter pair for each field.
 */
public static class MyTransaction {

    /** Identifier of the account the transaction belongs to. */
    private String accounId;

    /** Amount of the transaction. */
    private long amount;

    /** Creates an empty transaction; fields keep their Java defaults ({@code null} and {@code 0}). */
    public MyTransaction() {
    }

    /**
     * Creates a transaction with both fields set.
     *
     * @param accounId the account identifier
     * @param amount the transaction amount
     */
    public MyTransaction(String accounId, long amount) {
        this.amount = amount;
        this.accounId = accounId;
    }

    /** @return the account identifier */
    public String getAccounId() {
        return this.accounId;
    }

    /** @param accounId the account identifier to store */
    public void setAccounId(String accounId) {
        this.accounId = accounId;
    }

    /** @return the transaction amount */
    public long getAmount() {
        return this.amount;
    }

    /** @param amount the transaction amount to store */
    public void setAmount(long amount) {
        this.amount = amount;
    }

    /**
     * Renders the transaction as a JSON-like string with the keys
     * {@code accounId} and {@code amount}. The account id is inserted verbatim
     * (no escaping is applied).
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("{\"accounId\":\"").append(accounId).append("\",");
        text.append("\"amount\":").append(amount).append("}");
        return text.toString();
    }
}
}
官网介绍的使用可转化为流的类型