Part 1: Platform Setup and Operations
Task 1: Big Data Platform Setup
Subtask 1: Hadoop Installation and Configuration
hostname master && bash    # on master
hostname slave1 && bash    # on slave1
hostname slave2 && bash    # on slave2
vim /etc/hosts
172.18.2.44 master
172.18.14.185 slave1
172.18.14.184 slave2
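A quick optional check that the hostnames resolve from each node:
ping -c 1 master
ping -c 1 slave1
ping -c 1 slave2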
mkdir -p /opt/software/
mkdir -p /opt/module/
cp /root/software/package/* /opt/software/
tar -zxvf jdk-8u191-linux-x64.tar.gz -C /opt/module/
tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/
vim /etc/profile
export JAVA_HOME=/opt/module/jdk1.8.0_191
export PATH=$PATH:$JAVA_HOME/bin
source /etc/profile
ssh-keygen
ssh-copy-id -i /root/.ssh/id_rsa.pub master
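The original step only authorizes login to master; assuming the later scp steps also need passwordless access to the slaves, the same command can be repeated for them:
ssh-copy-id -i /root/.ssh/id_rsa.pub slave1
ssh-copy-id -i /root/.ssh/id_rsa.pub slave2
ssh slave1 hostname    # should print "slave1" without asking for a password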
scp -r /opt/module/jdk1.8.0_191/ root@slave1:/opt/module/    # repeat for slave2
scp /etc/profile root@slave1:/etc/profile                    # repeat for slave2, then run source /etc/profile on each slave
echo "export JAVA_HOME=/opt/module/jdk1.8.0_191" >> /opt/module/hadoop-3.1.3/etc/hadoop/hadoop-env.sh
# vim core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://master:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-3.1.3/data</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
</configuration>
# vim hdfs-site.xml
<configuration>
<property>
<name>dfs.namenode.http-address</name>
<value>master:9870</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.permissions.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.permissions.superusergroup</name>
<value>root</value>
</property>
</configuration>
# vim mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3</value>
</property>
</configuration>
# vim yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>master</value>
</property>
<property>
<name>yarn.resourcemanager.bind-host</name>
<value>master</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>0.0.0.0:0</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.https.address</name>
<value>0.0.0.0:0</value>
</property>
<property>
<name>yarn.nodemanager.webapp.address</name>
<value>0.0.0.0:0</value>
</property>
<property>
<name>yarn.nodemanager.webapp.https.address</name>
<value>0.0.0.0:0</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>0.0.0.0:0</value>
</property>
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
</configuration>
# vim workers
master
slave1
slave2
scp -r /opt/module/hadoop-3.1.3/ root@slave1:/opt/module/    # repeat for slave2
# vim /etc/profile
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=$(hadoop classpath)
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export HDFS_JOURNALNODE_USER=root
export HDFS_ZKFC_USER=root
scp /etc/profile root@slave1:/etc/    # repeat for slave2
source /etc/profile
hdfs namenode -format
start-all.sh
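An optional check, not part of the original steps, that all daemons came up on the three nodes:
jps                      # master should show NameNode, SecondaryNameNode, ResourceManager plus DataNode and NodeManager
hdfs dfsadmin -report    # all three DataNodes should be listed
yarn node -list          # all three NodeManagers should be RUNNING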
Subtask 2: Flume Installation and Configuration
tar -xzvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
vim /etc/profile
export FLUME_HOME=/opt/module/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
flume-ng version
mv /opt/module/apache-flume-1.9.0-bin/conf/flume-env.sh.template /opt/module/apache-flume-1.9.0-bin/conf/flume-env.sh
echo "export JAVA_HOME=/opt/module/jdk1.8.0_191" >> /opt/module/apache-flume-1.9.0-bin/conf/flume-env.sh
# vim /opt/module/apache-flume-1.9.0-bin/conf/flume.conf
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
agent1.sources.source1.type = exec
agent1.sources.source1.command = cat /opt/module/hadoop-3.1.3/logs/hadoop-root-namenode-master.log
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://master:9000/tmp/flume/
agent1.sinks.sink1.hdfs.filePrefix = log-
agent1.sinks.sink1.hdfs.fileSuffix = .txt
agent1.sinks.sink1.hdfs.rollInterval = 120
agent1.sinks.sink1.hdfs.rollSize = 1024
agent1.sinks.sink1.hdfs.rollCount = 0
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
mv /opt/module/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar /opt/module/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar_back
cp /opt/module/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/apache-flume-1.9.0-bin/lib/
cd $FLUME_HOME
bin/flume-ng agent -n agent1 -c conf/ -f conf/flume.conf -Dflume.root.logger=INFO,console
hadoop fs -cat /tmp/flume/*
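Optionally, the rolled files can be listed first to confirm the log- prefix and .txt suffix took effect:
hdfs dfs -ls /tmp/flume/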
Subtask 3: Flink on YARN Installation and Configuration
tar -xvf /opt/software/flink-1.14.0-bin-scala_2.12.tgz -C /opt/module/
vim /etc/profile
export FLINK_HOME=/opt/module/flink-1.14.0
export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile
echo "
fs.hdfs.hadoopconf: $HADOOP_HOME/etc/hadoop
classloader.check-leaked-classloader: false
log4j.logger.org.apache.flink: DEBUG, console" >> /opt/module/flink-1.14.0/conf/flink-conf.yaml
# Note: add the following properties to yarn-site.xml, copy the file into Flink's conf directory, then restart Hadoop.
vim /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>5</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>4096</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>4096</value>
</property>
cp /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml /opt/module/flink-1.14.0/conf/
scp /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml root@slave1:/opt/module/hadoop-3.1.3/etc/hadoop/    # repeat for slave2
stop-all.sh
start-all.sh
hdfs dfsadmin -safemode leave
flink run -m yarn-cluster -p 2 -yjm 2G -ytm 2G $FLINK_HOME/examples/batch/WordCount.jar
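An optional check that the WordCount job actually ran on YARN and finished:
yarn application -list -appStates ALL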
Task 2: Database Configuration and Maintenance
Subtask 1: Database Configuration
/usr/sbin/mysqld          # start mysqld directly if systemd is unavailable (e.g. inside a container)
systemctl start mysqld    # otherwise start it as a service
mysql -uroot -p123456
show tables in mysql;
select user,host from mysql.user;
update mysql.user set host='%' where user='root' and host='localhost';
flush privileges;
create user 'user01'@'localhost' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON *.* TO 'user01'@'localhost';
flush privileges;
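An optional verification that the account and its grants took effect:
SHOW GRANTS FOR 'user01'@'localhost';
SELECT user, host FROM mysql.user WHERE user = 'user01';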
Subtask 2: Creating the Tables
create database test01;
use test01;
CREATE TABLE hotel (
Id INT PRIMARY KEY,
hotel_name VARCHAR(255),
City VARCHAR(255),
Province VARCHAR(255),
Level VARCHAR(255),
room_num INT,
Score FLOAT,
shopping VARCHAR(255)
);
CREATE TABLE comment (
Id INT PRIMARY KEY,
Name VARCHAR(255),
commentator VARCHAR(255),
Score FLOAT,
comment_time TIMESTAMP,
Content TEXT
);
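A quick optional check of the table structures:
SHOW TABLES;
DESC hotel;
DESC comment;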
Subtask 3: Maintaining the Data Tables
# Import the local data dumps
source /root/eduhq/04shoping/hotel_all_data.sql;
source /root/eduhq/04shoping/comment_all_data.sql;
# Delete the row with id 25 from hotel_all
delete from hotel_all where id = 25;
# In comment_all, change the score of the row with id 30 to 5
update comment_all set score = 5 where id = 30;
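Optional checks that the delete and the update took effect:
SELECT * FROM hotel_all WHERE id = 25;            -- should return an empty set
SELECT id, score FROM comment_all WHERE id = 30;  -- score should now be 5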
Part 2: Data Acquisition and Processing
Task 1: Data Acquisition and Cleaning
Subtask 1: Data Acquisition
import pandas as pd
df = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
print(df)
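A small optional check of the number of rows and the first few records:
print(df.shape)
print(df.head())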
Subtask 2: Data Cleaning
import pandas as pd
data1 = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
data1 = data1[(data1['库存'] >= 10) & (data1['库存'] <= 10000)]
data1.to_csv('/root/eduhq/04shoping/M2/T1/S2/shop1.csv', encoding='utf8', index=False)
print('Rows with stock < 10 or > 10000 have been removed and the result saved')
import pandas as pd
data = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
filtered_data = data[~data['名称'].str.contains("刷单|捡漏")]
filtered_data.to_csv("/root/eduhq/04shoping/M2/T1/S2/shop2.csv", encoding='utf8', index=False)
import pandas as pd
data = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
filtered_data = data[~data['名称'].str.contains("女装")]
filtered_data.to_csv("/root/eduhq/04shoping/M2/T1/S2/shop3.csv", index=False, encoding='utf8')
import pandas as pd

def mean_price(x):
    # a "low-high" price range is averaged; a single value is rounded to two decimals
    if '-' in x:
        low, high = x.split('-')
        price = (round(float(low), 2) + round(float(high), 2)) / 2
        return price
    else:
        return round(float(x), 2)

def main():
    data = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
    data['价格'] = data['价格'].apply(lambda x: mean_price(x))
    data.to_csv('/root/eduhq/04shoping/M2/T1/S2/shop4.csv', encoding='utf8', index=False)
    print('Average price data saved')

if __name__ == '__main__':
    main()
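For reference only, a sketch that chains the four cleaning steps into a single pass; the output file shop_all.csv is hypothetical and not one of the required outputs:
import pandas as pd

def mean_price(x):
    # same rule as above: average a "low-high" range, otherwise keep the single price
    if '-' in str(x):
        low, high = str(x).split('-')
        return (float(low) + float(high)) / 2
    return round(float(x), 2)

data = pd.read_csv('/root/eduhq/04shoping/shopping.csv')
data = data[(data['库存'] >= 10) & (data['库存'] <= 10000)]           # stock filter
data = data[~data['名称'].str.contains('刷单|捡漏|女装', na=False)]    # title filters
data['价格'] = data['价格'].apply(mean_price)
data.to_csv('/root/eduhq/04shoping/M2/T1/S2/shop_all.csv', encoding='utf8', index=False)  # hypothetical path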
Task 2: Data Labeling
import pandas as pd
from snownlp import SnowNLP
df = pd.read_csv('/root/eduhq/04shoping/model_comment.csv', encoding='utf-8')
def get_sentiment_label(sentiment):
    # SnowNLP sentiment score: >= 0.6 positive (正向), > 0.4 neutral (中性), otherwise negative (负向)
    if sentiment >= 0.6:
        return '正向'
    elif sentiment > 0.4:
        return '中性'
    else:
        return '负向'

standard_df = pd.DataFrame(columns=['编号', '手机品牌', '评论信息', '情感倾向'])
for index, row in df.iterrows():
    comment = row['评论信息']
    sentiment = SnowNLP(comment).sentiments
    label = get_sentiment_label(sentiment)
    standard_df.loc[index] = [index + 1, row['手机品牌'], comment, label]
standard_df.to_csv('/root/eduhq/04shoping/M2/T2/S1/model_sen.csv', encoding='utf-8', index=False)
standard_df.to_excel('/root/eduhq/04shoping/M2/T2/S1/model_sen.xlsx', sheet_name='sheet1', index=False)
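An optional summary of how the labels are distributed:
print(standard_df['情感倾向'].value_counts())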
Task 3: Data Statistics
Subtask 1: HDFS File Upload and Download
hdfs dfs -mkdir /input/
hdfs dfs -put mobile.txt /input/
hdfs dfs -cp /input/mobile.txt /user/hive/warehouse/
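The subtask also covers downloading; assuming the file should be pulled back to a local directory (the target /root/ is an assumption), a sketch:
hdfs dfs -ls /input/                     # verify the upload
hdfs dfs -get /input/mobile.txt /root/   # download; local target path is an assumption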
Subtask 2: Handling Anomalous Data
# vim pom.xml
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import