FlinkMysqlSource

/**

 * 自定义Mysql Source

 */

public class CustomerMysqlSourceDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获得自定义Source对象

        DataStreamSource<UserInfo> mysqlSource = env.addSource(new MyMysqlSource());

        mysqlSource.print();

        env.execute("CustomerMySQLSourceDemo");

    }

    /**

     自定义Mysql Source实现类

     */

    public static class MyMysqlSource extends RichSourceFunction<UserInfo> {

        private Connection connection = null;       // 定义数据库连接对象

        private PreparedStatement ps = null;        // 定义PreparedStatement对象

        /*

        使用open方法, 这个方法在实例化类的时候会执行一次, 比较适合用来做数据库连接

         */

        @Override

        public void open(Configuration parameters) throws Exception {

            super.open(parameters);

            // 加载数据库驱动

            Class.forName("com.mysql.jdbc.Driver");

            // 创建数据库连接

            String url = "jdbc:mysql://node01:3306/flinkdemo?useUnicode=true&characterEncoding=utf-8&useSSL=false";

            this.connection = DriverManager.getConnection(url, "root", "123456");

            // 准备PreparedStatement对象

            this.ps = connection.prepareStatement("SELECT id, username, password, name FROM user");

        }

        /*

        使用close方法, 这个方法在销毁实例的时候会执行一次, 比较适合用来关闭连接

         */

        @Override

        public void close() throws Exception {

            super.close();

            // 关闭资源

            if (this.ps != null) this.ps.close();

            if (this.connection != null) this.connection.close();

        }

        @Override

        public void run(SourceContext<UserInfo> ctx) throws Exception {

            ResultSet resultSet = ps.executeQuery();

            while (resultSet.next()) {

                int id = resultSet.getInt("id");

                String username = resultSet.getString("username");

                String password = resultSet.getString("password");

                String name = resultSet.getString("name");

                ctx.collect(new UserInfo(id, username, password, name));

            }

        }

        @Override

        public void cancel() {

            System.out.println("任务被取消......");

        }

    }

    /**

     数据定义类, POJO

     */

    @Data

    @AllArgsConstructor

    @NoArgsConstructor

    public static class UserInfo {

        int id;

        String username;

        String password;

        String name;

    }

}

上一篇:吴裕雄 Bootstrap 前端框架开发——Bootstrap 网格系统实例:响应式的列重置


下一篇:航班信息系统(JDBC)