各组件命令

【02】Kafka主题的增、删、查

  • 增: bin/kafka-topics.sh --create --topic flink_kafka --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

  • 删: bin/kafka-topics.sh --delete --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092

  • 查看Topic信息: bin/kafka-topics.sh --describe --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092

  • 列举Topic: bin/kafka-topics.sh --list -bootstrap-server node1:9092,node2:9092,node3:9092


启动Kafka自带的控制台生产者进程 或者 消费者进程

  • 生产者:bin/kafka-console-producer.sh --topic flink_kafka --broker-list node1:9092,node2:9092,node3:9092
  • 注意:broker-list与bootstrap-server功能上一致,但语义上不一样,前者把它作为存储目录、后者把它作为读取目录
  • 消费者:bin/kafka-console-consumer.sh --topic flink_kafka --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning
  • (不写--from-beginning默认从最新的offset消费)

========= MySQL ==========
【01】. 导入SQL文件数据
source /opt/insurance.sql
mysql -uroot -p --default-character-set=utf8mb4 insurance </opt/insurance/1_data_mysql/insurance.sql

【02】. 备份MySQL数据库到一个SQL文件:
mysqldump -uroot -p --databases insurance > /opt/insurance/1_data_mysql/insurance.sql

=【 Linux 】==
cat /etc/profile
dos2unix ./sqoop.sh
yum -y install dos2unix
yum -y install lrzsz
cat -v ./sqoop.sh
chmod +x ./sqoop.sh

nc -lk 9999

=【 集群启动命令 】==

  • 【01】. 启动Hadoop集群服务(HDFS和Yarn):
    /export/server/hadoop/sbin/start-all.sh

  • 【02】. 启动Hive服务(hive metastore 和hiveserver2): (jdbc:hive2://node3:10000)
    nohup hive --service metastore &
    nohup hiveserver2 start &

  • 【03】. 启动spark-thrifserver服务 (jdbc:hive2://node3:10001)
    /export/server/spark/sbin/start-thriftserver.sh
    --hiveconf hive.server2.thrift.port=10001
    --hiveconf hive.server2.thrift.bind.host=node3
    --master local[*]


文件系统:

  • hdfs://node1:8020/flink-checkpoints/checkpoint
  • file://

JDBC

套路4步:

  • 1、加载驱动类
    • Class.forName("com.mysql.jdbc.Driver")
  • 2、定义变量(3个变量)
    • Connection (水管道)
    • Statement 【PreparedStatement】(阀门)
    • ResultSet (抽水机)
  • 3、查询SQL
  • 4、获取结果
  • 5、关闭连接(关闭3个))
// 定义变量
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
//加载MySQL驱动
Class.forName("com.mysql.jdbc.Driver");
// 获得数据库连接
Connection conn = DriverManager.getConnection("jdbc:msql://127.0.0.1:3306/samsung","root","123456");
// 创建Statement\PreparedStatement对象
Statement stmt = conn.createStatement();
ps = conn.prepareStatement(sql);
// 执行查询,获取数据集
ResultSet rs = stmt.executeQuery(sql);
ResultSet rs = ps.executeQuery()
// 关闭连接
rs.cloas
上一篇:Kafka参数zookeeper和bootstrap-server的区别


下一篇:Flink java作为消费者连接虚拟机中的kafka/或本地的kafka,并解决java.net.UnknownHostException报错