hadoop搭建,数据可视化,数据分析

一 平台搭建与运维

任务一:大数据平台搭建

子任务一:Hadoop安装配置
# 修改主机名
hostname master && bash
hostname slave1 && bash
hostname slave2 && bash
# 配置hosts文件
vim /etc/hosts
172.18.2.44	master
172.18.14.185	slave1
172.18.14.184	slave2
# 1.解压hadoop和jdk。
mkdir -p /opt/software/
mkdir -p /opt/module/
cp /root/software/package/* /opt/software/
tar -zxvf jdk-8u191-linux-x64.tar.gz -C /opt/module/
tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/
# 2. JDK 环境变量并使其生效。
vim /etc/profile
export JAVA_HOME=/opt/module/jdk1.8.0_191
export PATH=$PATH:$JAVA_HOME/bin
source /etc/profile
# 免密登录
ssh-keygen
ssh-copy-id -i /root/.ssh/id_rsa.pub master
ssh-copy-id -i /root/.ssh/id_rsa.pub slave1
ssh-copy-id -i /root/.ssh/id_rsa.pub slave2
# 分发jdk及配置文件
scp -r /opt/module/jdk1.8.0_191/ root@slave1:/opt/module/
scp /etc/profile root@slave1:/etc/profile
# 修改 hadoop-env.sh 文件,添加 jdk 环境变量
echo "export JAVA_HOME=/opt/module/jdk1.8.0_191" >> /opt/module/hadoop-3.1.3/etc/hadoop/hadoop-env.sh
# vim core-site.xml
<configuration>
<property>
 <name>fs.defaultFS</name>
 <value>hdfs://master:9000</value>
</property>
<property>
 <name>hadoop.tmp.dir</name>
 <value>/opt/module/hadoop-3.1.3/data</value>
</property>
<property>
 <name>hadoop.security.authorization</name>
 <value>true</value>
</property>
</configuration>

# vim hdfs-site.xml
<configuration>
<property>
 <name>dfs.namenode.http-address</name>
 <value>master:9870</value>
</property>
<property>
 <name>dfs.replication</name>
 <value>3</value>
</property>
<property>
 <name>dfs.permissions.enabled</name>
 <value>true</value>
</property>
<property>
 <name>dfs.permissions.superusergroup</name>
 <value>root</value>
</property>
</configuration>

# vim mapred-site.xml
<configuration>
<property>
 <name>mapreduce.framework.name</name>
 <value>yarn</value>
</property>
<property>
 <name>yarn.app.mapreduce.am.env</name>
 <value>HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3</value>
</property>
<property>
 <name>mapreduce.map.env</name>
 <value>HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3</value>
</property>
<property>
 <name>mapreduce.reduce.env</name>
 <value>HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3</value>
</property>
</configuration>

# vim yarn-site.xml
<configuration>
<property>
 <name>yarn.nodemanager.aux-services</name>
 <value>mapreduce_shuffle</value>
</property>
<property>
 <name>yarn.resourcemanager.hostname</name>
 <value>master</value>
</property>
<property>
  <name>yarn.resourcemanager.bind-host</name>
  <value>master</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address</name>
  <value>0.0.0.0:0</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.https.address</name>
  <value>0.0.0.0:0</value>
</property>
<property>
  <name>yarn.nodemanager.webapp.address</name>
  <value>0.0.0.0:0</value>
</property>
<property>
  <name>yarn.nodemanager.webapp.https.address</name>
  <value>0.0.0.0:0</value>
</property>
<property>
  <name>yarn.resourcemanager.scheduler.address</name>
  <value>0.0.0.0:0</value>
</property>
<property>
 <name>yarn.nodemanager.env-whitelist</name>
 <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
</configuration>

# vim workers
master
slave1
slave2
# 分发hadoop
scp -r /opt/module/hadoop-3.1.3/ root@slave1:/opt/module/
# 配置hadoop环境变量
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=$(hadoop classpath)
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export HDFS_JOURNALNODE_USER=root
export HDFS_ZKFC_USER=root
# 分发环境变量
scp /etc/profile root@slave1:/etc/
# 格式化文件系统
source /etc/profile
hdfs namenode -format
# 启动集群
start-all.sh
子任务二:flume安装配置
# 解压
tar -xzvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
vim /etc/profile
export FLUME_HOME=/opt/module/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
flume-ng version
# 修改配置文件
mv /opt/module/apache-flume-1.9.0-bin/conf/flume-env.sh.template /opt/module/apache-flume-1.9.0-bin/conf/flume-env.sh
echo "export JAVA_HOME=/opt/module/jdk1.8.0_191" >> /opt/module/apache-flume-1.9.0-bin/conf/flume-env.sh
# vim flume.conf
# 定义Flume代理名称
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# 定义输入源
agent1.sources.source1.type = exec
agent1.sources.source1.command = cat /opt/module/hadoop-3.1.3/logs/hadoop-root-namenode-master.log
# 定义输出目标
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://master:9000/tmp/flume/
agent1.sinks.sink1.hdfs.filePrefix = log-
agent1.sinks.sink1.hdfs.fileSuffix = .txt
agent1.sinks.sink1.hdfs.rollInterval = 120
agent1.sinks.sink1.hdfs.rollSize = 1024
agent1.sinks.sink1.hdfs.rollCount = 0
# 定义通道
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
# 将输入源与输出目标连接起来
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
# 解决flume和hadoop包冲突
mv /opt/module/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar /opt/module/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar_back
cp /opt/module/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/apache-flume-1.9.0-bin/lib/
# 启动flume传输日志
bin/flume-ng agent -n agent1 -c conf/ -f conf/flume.conf -Dflume.root.logger=INFO,console
hadoop fs -cat /tmp/flume/*
子任务三:Flink on Yarn安装配置
# 解压配置环境变量
tar -xvf /opt/software/flink-1.14.0-bin-scala_2.12.tgz -C /opt/module/
vim /etc/profile
export FLINK_HOME=/opt/module/flink-1.14.0
export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile
# 追加 hadoop 配置
echo "
fs.hdfs.hadoopconf: $HADOOP_HOME/etc/hadoop
classloader.check-leaked-classloader: false
log4j.logger.org.apache.flink: DEBUG, console" >> /opt/module/flink-1.14.0/conf/flink-conf.yaml
# 注意需要将yarn-site.xml文件拷贝到flink的conf目录下,添加如下内容,重启hadoop。
vim /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml

<!-- 以下配置关闭yarn内存检查 -->
<property>
 <name>yarn.nodemanager.pmem-check-enabled</name>
 <value>false</value>
</property>
<property>
 <name>yarn.nodemanager.vmem-pmem-ratio</name>
 <value>5</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>4096</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>4096</value>
</property>
# 拷贝yarn-site.xml到flink中
cp /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml /opt/module/flink-1.14.0/conf/
# 分发hadoop配置文件并重启hadoop集群
scp /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml root@slave1:/opt/module/hadoop-3.1.3/etc/hadoop/
stop-all.sh
start-all.sh
# 离开安全模式
hdfs dfsadmin -safemode leave
# 运行wordcount
flink run -m yarn-cluster -p 2 -yjm 2G -ytm 2G $FLINK_HOME/examples/batch/WordCount.jar

任务二:数据库配置维护

子任务一:数据库配置
# 初始化数据库
/usr/sbin/mysqld --initialize-insecure --console --user=mysql
# 开启MySQL服务
systemctl start mysqld
# 登录MySQL
mysql -uroot -p123456
# 查看 mysql 库下的所有表
show tables in mysql;
# 修改登录权限
select user,host from mysql.user;
update mysql.user set host='%'  where host='localhost';
flush privileges;
# 创建新用户并授权
create user 'user01'@'localhost' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON *.* To 'user01'@'localhost' ;
flush privileges;
子任务二:创建相关表
# 创建数据库
create database test01;
use test01;
# 创建数据表
CREATE TABLE hotel (  
    Id INT PRIMARY KEY,  
    hotel_name VARCHAR(255),  
    City VARCHAR(255),  
    Province VARCHAR(255),  
    Level VARCHAR(255),  
    room_num INT,  
    Score FLOAT,  
    shopping VARCHAR(255)  
);

CREATE TABLE comment (  
    Id INT PRIMARY KEY,  
    Name VARCHAR(255),  
    commentator VARCHAR(255),  
    Score FLOAT,  
    comment_time TIMESTAMP,  
    Content TEXT  
);
子任务三:维护数据表
# 导入本地数据
source /root/eduhq/04shoping/hotel_all_data.sql;
source /root/eduhq/04shoping/comment_all_data.sql;
# 删除id为25的hotel_all数据
delete from hotel_all where id =25;
# 在comment_all表中将id为30的评分改为5
update comment_all set score=5 where id=30;

二 数据获取与处理

任务一:数据获取与清洗

子任务一:数据获取
# vim M2-T1-S1-1.py
# Load the shopping dataset from its CSV file and print the resulting DataFrame.
import  pandas as pd
df = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
print(df)
子任务二:数据清洗
# vim 1.py
# Keep only the rows whose stock ('库存') lies within [10, 10000]; rows with a
# stock below 10 or above 10000 are dropped, and the result is written to shop1.csv.
import  pandas as pd
shopping = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
# between() is inclusive on both ends, i.e. 10 <= stock <= 10000.
in_range = shopping['库存'].between(10, 10000)
data1 = shopping[in_range]
data1.to_csv('/root/eduhq/04shoping/M2/T1/S2/shop1.csv', encoding='utf8', index=False)
print('删除库存小于 10 或库存大于 10000 的数据已经保存~')
# vim 2.py ************************************有问题***************************************************
import pandas as pd

# Drop every row whose product name ('名称') contains "刷单" or "捡漏" and save
# the remaining rows to shop2.csv.
data = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
# na=False treats a missing name as "no match". Without it the mask contains
# NaN for those rows, and negating it / indexing with it raises an error.
is_flagged = data['名称'].str.contains("刷单|捡漏", na=False)
filtered_data = data[~is_flagged]
filtered_data.to_csv("/root/eduhq/04shoping/M2/T1/S2/shop2.csv", encoding='utf8', index=False)

# vim 3.py
import pandas as pd

# Drop every row whose product name ('名称') contains "女装" and save the
# remaining rows to shop3.csv.
data = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
# na=False treats a missing name as "no match". Without it the mask contains
# NaN for those rows, and negating it / indexing with it raises an error.
is_womens_clothing = data['名称'].str.contains("女装", na=False)
filtered_data = data[~is_womens_clothing]
filtered_data.to_csv("/root/eduhq/04shoping/M2/T1/S2/shop3.csv", index=False, encoding='utf8')
# vim 4.py	手机价格为区间数据的,设置为价格区间的平均数
import pandas as pd

def mean_price(x):
    """Convert one raw price cell into a single number.

    Args:
        x: The raw price. A string such as '100-200' is treated as a
            low-high range; any other string, or a non-string value
            (int, float, NaN), is treated as a single price.

    Returns:
        For a range, the mean of the two bounds (each bound rounded to two
        decimals before averaging). Otherwise the price rounded to two
        decimals. NaN input yields NaN.

    Raises:
        ValueError: If a string part cannot be parsed as a number, or a
            range does not consist of exactly two '-'-separated parts.
    """
    # Cells that pandas already parsed as numbers (or NaN for missing values)
    # are not strings; `'-' in x` would raise TypeError on them.
    if not isinstance(x, str):
        return round(float(x), 2)

    if '-' in x:
        low, high = x.split('-')
        return (round(float(low), 2) + round(float(high), 2)) / 2

    return round(float(x), 2)
def main():
    """Read the shopping CSV, normalise the '价格' column, and save shop4.csv."""
    source_csv = '/root/eduhq/04shoping/shopping.csv'
    target_csv = '/root/eduhq/04shoping/M2/T1/S2/shop4.csv'

    frame = pd.read_csv(source_csv)
    # Every raw price cell is passed through mean_price.
    frame['价格'] = frame['价格'].apply(mean_price)

    frame.to_csv(target_csv, encoding='utf8', index=False)
    print('平均价格数据已保存~')
if __name__ == '__main__':
    main()

任务二:数据标注

# vim 1.py  cat model_comment.csv | grep 正向 | wc -l
import pandas as pd
from snownlp import SnowNLP

# Load the comment data
df = pd.read_csv('/root/eduhq/04shoping/model_comment.csv', encoding='utf-8')  # adjust to the correct file path and encoding

# 定义情感倾向标注函数
def get_sentiment_label(sentiment):
    """Map a sentiment score to its label.

    Scores of 0.6 or more are '正向', scores strictly above 0.4 (and below
    0.6) are '中性', and everything else is '负向'.
    """
    if sentiment >= 0.6:
        return '正向'
    return '中性' if sentiment > 0.4 else '负向'

# Label the sentiment of every comment and collect the rows in a new DataFrame.
# NOTE(review): the column list names '编号' twice (first and last) — confirm the
# duplicated column is really wanted in the output files.
standard_df = pd.DataFrame(columns=['编号', '手机品牌', '评论信息', '情感倾向', '编号'])
for index, row in df.iterrows():
    comment = row['评论信息']
    # The SnowNLP sentiment score is turned into a label by get_sentiment_label.
    sentiment = SnowNLP(comment).sentiments
    label = get_sentiment_label(sentiment)
    # Both '编号' columns receive index+1.
    standard_df.loc[index] = [index+1, row['手机品牌'], comment, label, index+1] 

# Save the labelled result as a CSV file and as an Excel sheet
standard_df.to_csv('/root/eduhq/04shoping/M2/T2/S1/model_sen.csv', encoding='utf-8', index=False)  # adjust the output file encoding as needed
standard_df.to_excel('/root/eduhq/04shoping/M2/T2/S1/model_sen.xlsx', sheet_name='sheet1', index=False)

任务三:数据统计

子任务一:HDFS文件上传下载
# 上传文件
hdfs dfs -mkdir /input/
hdfs dfs -put mobile.txt /input/
hdfs dfs -cp /input/mobile.txt /user/hive/warehouse/
子任务二:处理异常数据
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import 
上一篇:Java 虚拟机是什么?——探秘 JVM 的核心机制!


下一篇:探讨Facebook的AI研究:未来社交平台的技术前瞻