一、Hadoop概述与架构 (一)Hadoop简介
什么是Hadoop
Apache Hadoop:开源分布式存储和计算框架
设计目标:处理大规模数据集的存储和分析
核心理念:移动计算而非移动数据
容错性:硬件故障是常态而非异常
可扩展性:从单机扩展到数千台机器
Hadoop发展历程
2003年:Google发布GFS和MapReduce论文
2006年:Doug Cutting创建Hadoop项目
2008年:Hadoop成为Apache顶级项目
2012年:Hadoop 2.0引入YARN架构
现在:Hadoop 3.x版本持续演进
Hadoop应用场景
大数据存储:PB级数据存储和管理
批处理计算:离线数据分析和处理
数据仓库:企业级数据仓库建设
日志分析:网站访问日志、系统日志分析
机器学习:大规模机器学习数据预处理
(二)Hadoop核心组件
HDFS(Hadoop分布式文件系统)
分布式存储:数据分布在多台机器上
高容错性:数据自动备份,故障自动恢复
高吞吐量:适合大文件的顺序读写
流式数据访问:一次写入,多次读取
商用硬件:运行在普通x86服务器上
MapReduce(分布式计算框架)
编程模型:Map阶段和Reduce阶段
自动并行化:框架自动处理并行执行
容错处理:任务失败自动重试
数据本地性:计算向数据移动
简化编程:隐藏分布式计算复杂性
YARN(资源管理器)
资源管理:集群资源统一管理和调度
多框架支持:支持MapReduce、Spark等
容器化:应用运行在容器中
高可用性:ResourceManager高可用
资源隔离:CPU、内存资源隔离
(三)Hadoop生态系统
数据存储层
HDFS:分布式文件系统
HBase:NoSQL列式数据库
Kudu:实时分析存储引擎
Alluxio:内存分布式存储系统
数据处理层
MapReduce:批处理计算框架
Spark:内存计算框架
Flink:流处理计算框架
Storm:实时流处理系统
数据管理层
Hive:数据仓库软件,SQL查询
Pig:数据流语言和执行环境
Sqoop:关系数据库数据导入导出
Flume:日志收集系统
二、HDFS分布式文件系统 (一)HDFS架构设计
主从架构
NameNode:主节点,管理文件系统元数据
DataNode:从节点,存储实际数据块
Secondary NameNode:辅助NameNode,定期合并元数据
客户端:文件系统访问接口
数据存储机制
块存储:文件分割成固定大小的块(默认128MB)
副本机制:每个块默认存储3个副本
副本放置策略:机架感知,提高可靠性和性能
数据完整性:校验和机制检测数据损坏
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 hdfs dfs -ls / hdfs dfs -mkdir /user/data hdfs dfs -put localfile.txt /user/data/ hdfs dfs -get /user/data/hdfsfile.txt ./ hdfs dfs -cat /user/data/file.txt hdfs dfs -rm /user/data/file.txt hdfs dfsadmin -report hdfs fsck /
(二)NameNode详解
元数据管理
文件系统树:目录结构和文件信息
块映射:文件块到DataNode的映射关系
内存存储:元数据全部加载到内存中
持久化:FSImage和EditLog文件
NameNode高可用(HA)
Active/Standby模式:主备NameNode
共享存储:QJM(Quorum Journal Manager)
自动故障转移:ZKFC(ZooKeeper Failover Controller)
数据同步:实时同步元数据变更
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 <configuration > <property > <name > dfs.nameservices</name > <value > mycluster</value > </property > <property > <name > dfs.ha.namenodes.mycluster</name > <value > nn1,nn2</value > </property > <property > <name > dfs.namenode.rpc-address.mycluster.nn1</name > <value > namenode1:8020</value > </property > <property > <name > dfs.namenode.shared.edits.dir</name > <value > qjournal://journal1:8485;journal2:8485;journal3:8485/mycluster</value > </property > </configuration >
(三)DataNode详解
数据存储
块存储:将文件块存储在本地文件系统
多目录:支持多个存储目录,提高I/O性能
数据校验:定期检查数据块完整性
心跳机制:定期向NameNode报告状态
数据读写流程
写入流程:客户端→NameNode→DataNode管道写入
读取流程:客户端→NameNode获取位置→直接从DataNode读取
数据本地性:优先读取本地或同机架的数据
负载均衡:自动平衡各DataNode的存储负载
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;public class HDFSExample { public static void main (String[] args) throws Exception { Configuration conf = new Configuration (); conf.set("fs.defaultFS" , "hdfs://namenode:8020" ); FileSystem fs = FileSystem.get(conf); Path outputPath = new Path ("/user/data/output.txt" ); FSDataOutputStream out = fs.create(outputPath); out.writeUTF("Hello Hadoop HDFS!" ); out.close(); FSDataInputStream in = fs.open(outputPath); String content = in.readUTF(); System.out.println("文件内容: " + content); in.close(); fs.close(); } }
三、MapReduce计算框架 (一)MapReduce编程模型
Map阶段
输入分片:将输入数据分割成独立的块
Map函数:处理键值对,输出中间结果
分区:根据key将Map输出分配到不同Reducer
排序:对Map输出按key排序
合并:可选的Combiner减少网络传输
Reduce阶段
Shuffle:从Map任务获取中间结果
排序:对相同key的值进行分组
Reduce函数:处理分组后的数据
输出:将最终结果写入HDFS
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class WordCountMapper extends Mapper <LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable (1 ); private Text word = new Text (); @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().toLowerCase().split("\\s+" ); for (String w : words) { word.set(w); context.write(word, one); } } } public class WordCountReducer extends Reducer <Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable (); @Override protected void reduce (Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0 ; for (IntWritable value : values) { sum += value.get(); } result.set(sum); context.write(key, result); } } public class WordCountDriver { public static void main (String[] args) throws Exception { Configuration conf = new Configuration (); Job job = Job.getInstance(conf, "word count" ); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); System.exit(job.waitForCompletion(true ) ? 0 : 1 ); } }
(二)MapReduce执行流程
作业提交
客户端提交作业到ResourceManager
ResourceManager分配ApplicationMaster
ApplicationMaster向ResourceManager申请资源
启动Map和Reduce任务
任务执行
Map任务:读取输入分片,执行Map函数
Shuffle阶段:Map输出传输到Reducer
Reduce任务:执行Reduce函数,输出结果
任务监控:跟踪任务进度和状态
(三)MapReduce优化技术
输入优化
文件格式:使用SequenceFile、Avro等高效格式
压缩:启用输入数据压缩减少I/O
分片大小:调整输入分片大小优化并行度
数据本地性:优化数据分布提高本地性
执行优化
Combiner:减少Map输出数据量
压缩:启用Map输出和最终输出压缩
内存调优:调整JVM堆大小和缓冲区
推测执行:启用推测执行处理慢任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 <configuration > <property > <name > mapreduce.map.output.compress</name > <value > true</value > </property > <property > <name > mapreduce.map.output.compress.codec</name > <value > org.apache.hadoop.io.compress.SnappyCodec</value > </property > <property > <name > mapreduce.map.memory.mb</name > <value > 2048</value > </property > <property > <name > mapreduce.reduce.memory.mb</name > <value > 4096</value > </property > <property > <name > mapreduce.map.speculative</name > <value > true</value > </property > </configuration >
四、YARN资源管理 (一)YARN架构组件
ResourceManager(RM)
全局资源管理:管理整个集群的资源
应用管理:接收作业提交,启动ApplicationMaster
调度器:根据策略分配资源给应用
高可用:支持Active/Standby模式
NodeManager(NM)
节点资源管理:管理单个节点的资源
容器管理:启动和监控容器
健康检查:监控节点健康状态
日志管理:收集和管理应用日志
ApplicationMaster(AM)
应用协调:协调应用的执行
资源申请:向ResourceManager申请资源
任务调度:在分配的容器中启动任务
容错处理:处理任务失败和重试
Container(容器)
资源封装:封装CPU、内存等资源
任务执行:应用任务的执行环境
资源隔离:提供资源隔离保证
生命周期管理:容器的创建、运行、销毁
(二)YARN调度器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 <configuration > <property > <name > yarn.resourcemanager.scheduler.class</name > <value > org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value > </property > <property > <name > yarn.scheduler.capacity.root.queues</name > <value > default,production,development</value > </property > <property > <name > yarn.scheduler.capacity.root.default.capacity</name > <value > 40</value > </property > <property > <name > yarn.scheduler.capacity.root.production.capacity</name > <value > 40</value > </property > <property > <name > yarn.scheduler.capacity.root.development.capacity</name > <value > 20</value > </property > </configuration >
(三)YARN应用开发
应用提交流程
客户端向ResourceManager提交应用
ResourceManager分配容器启动ApplicationMaster
ApplicationMaster向ResourceManager注册
ApplicationMaster申请资源启动任务
任务完成后ApplicationMaster注销
编程接口
Client API:应用提交和监控
ApplicationMaster API:资源申请和任务管理
Container API:容器生命周期管理
Timeline Service:应用历史信息存储
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class YarnClient { public static void main (String[] args) throws Exception { Configuration conf = new Configuration (); YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); yarnClient.start(); YarnClientApplication app = yarnClient.createApplication(); ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); ApplicationId appId = appContext.getApplicationId(); appContext.setApplicationName("MyYarnApp" ); appContext.setApplicationType("MAPREDUCE" ); ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( null , null , null , null , null , null ); appContext.setAMContainerSpec(amContainer); Resource capability = Resource.newInstance(1024 , 1 ); appContext.setResource(capability); yarnClient.submitApplication(appContext); ApplicationReport appReport = yarnClient.getApplicationReport(appId); YarnApplicationState appState = appReport.getYarnApplicationState(); while (appState != YarnApplicationState.FINISHED && appState != YarnApplicationState.KILLED && appState != YarnApplicationState.FAILED) { Thread.sleep(1000 ); appReport = yarnClient.getApplicationReport(appId); appState = appReport.getYarnApplicationState(); } yarnClient.close(); } }
五、Hadoop生态系统组件 (一)Hive数据仓库
Hive概述
SQL接口:提供类SQL查询语言HiveQL
元数据管理:存储表结构和分区信息
数据存储:数据存储在HDFS上
执行引擎:支持MapReduce、Spark、Tez
Hive架构
Hive CLI:命令行接口
HiveServer2:JDBC/ODBC服务
Metastore:元数据存储服务
Driver:查询编译和优化
执行引擎:查询执行引擎
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 CREATE DATABASE IF NOT EXISTS sales_db;USE sales_db; CREATE EXTERNAL TABLE sales_data ( order_id STRING, customer_id STRING, product_id STRING, quantity INT , price DECIMAL (10 ,2 ), order_date STRING ) PARTITIONED BY (year INT , month INT ) STORED AS TEXTFILE LOCATION '/user/data/sales/' ; ALTER TABLE sales_data ADD PARTITION (year = 2023 , month = 12 )LOCATION '/user/data/sales/2023/12/' ; SELECT customer_id, SUM (quantity * price) as total_amount FROM sales_dataWHERE year = 2023 AND month = 12 GROUP BY customer_idORDER BY total_amount DESC LIMIT 10 ; CREATE TABLE customer_summary AS SELECT customer_id, COUNT (* ) as order_count, SUM (quantity * price) as total_spent, AVG (quantity * price) as avg_order_value FROM sales_dataGROUP BY customer_id;
(二)HBase NoSQL数据库
HBase特点
列式存储:按列族存储数据
实时读写:支持随机实时读写
自动分片:Region自动分割和负载均衡
强一致性:提供强一致性保证
水平扩展:支持线性扩展
HBase数据模型
表(Table):数据存储的逻辑单元
行键(Row Key):唯一标识一行数据
列族(Column Family):列的逻辑分组
列限定符(Column Qualifier):列的具体名称
时间戳(Timestamp):数据版本标识
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.util.Bytes;public class HBaseExample { public static void main (String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum" , "zk1,zk2,zk3" ); Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf("user_profile" )); Put put = new Put (Bytes.toBytes("user001" )); put.addColumn(Bytes.toBytes("info" ), Bytes.toBytes("name" ), Bytes.toBytes("张三" )); put.addColumn(Bytes.toBytes("info" ), Bytes.toBytes("age" ), Bytes.toBytes("25" )); put.addColumn(Bytes.toBytes("contact" ), Bytes.toBytes("email" ), Bytes.toBytes("zhangsan@example.com" )); table.put(put); Get get = new Get (Bytes.toBytes("user001" )); Result result = table.get(get); String name = Bytes.toString(result.getValue(Bytes.toBytes("info" ), Bytes.toBytes("name" ))); String age = Bytes.toString(result.getValue(Bytes.toBytes("info" ), Bytes.toBytes("age" ))); System.out.println("姓名: " + name + ", 年龄: " + age); Scan scan = new Scan (); scan.addFamily(Bytes.toBytes("info" )); ResultScanner scanner = table.getScanner(scan); for (Result r : scanner) { String rowKey = Bytes.toString(r.getRow()); String userName = Bytes.toString(r.getValue(Bytes.toBytes("info" ), Bytes.toBytes("name" ))); System.out.println("用户ID: " + rowKey + ", 姓名: " + userName); } scanner.close(); table.close(); connection.close(); } }
(三)Spark计算引擎
Spark优势
内存计算:数据缓存在内存中,提高性能
多语言支持:支持Scala、Java、Python、R
统一平台:批处理、流处理、机器学习、图计算
易用性:丰富的高级API和算子
容错性:RDD血缘关系提供容错能力
Spark核心概念
RDD:弹性分布式数据集
DataFrame:结构化数据抽象
Dataset:类型安全的数据抽象
Spark SQL:结构化数据处理
Spark Streaming:流数据处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from pyspark.sql import SparkSessionfrom pyspark.sql.functions import *spark = SparkSession.builder \ .appName("SparkExample" ) \ .config("spark.sql.adaptive.enabled" , "true" ) \ .getOrCreate() df = spark.read.option("header" , "true" ).csv("hdfs://namenode:8020/user/data/sales.csv" ) result = df.groupBy("customer_id" ) \ .agg( count("order_id" ).alias("order_count" ), sum ("amount" ).alias("total_amount" ), avg("amount" ).alias("avg_amount" ) ) \ .filter (col("order_count" ) > 5 ) \ .orderBy(desc("total_amount" )) result.show(20 ) result.write \ .mode("overwrite" ) \ .option("header" , "true" ) \ .csv("hdfs://namenode:8020/user/output/customer_analysis" ) spark.stop()
六、Hadoop集群部署与管理 (一)集群规划与部署
硬件规划
NameNode:高内存、SSD存储、双网卡
DataNode:大容量存储、多磁盘、高网络带宽
ResourceManager:中等配置、高可用部署
网络:万兆以太网、交换机配置
软件部署
操作系统:CentOS、Ubuntu LTS版本
Java环境:OpenJDK 8或11
Hadoop安装:二进制包部署或编译安装
配置文件:core-site.xml、hdfs-site.xml、yarn-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 #!/bin/bash export JAVA_HOME=/usr/lib/jvm/java-8-openjdkexport HADOOP_HOME=/opt/hadoopexport HADOOP_CONF_DIR=$HADOOP_HOME /etc/hadoopuseradd -m hadoop usermod -aG sudo hadoop su - hadoop -c "ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa" su - hadoop -c "cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys" su - hadoop -c "chmod 600 ~/.ssh/authorized_keys" cd /optwget https://downloads.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz tar -xzf hadoop-3.3.4.tar.gz mv hadoop-3.3.4 hadoopchown -R hadoop:hadoop hadoopecho 'export HADOOP_HOME=/opt/hadoop' >> /home/hadoop/.bashrcecho 'export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop' >> /home/hadoop/.bashrcecho 'export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin' >> /home/hadoop/.bashrcsu - hadoop -c "$HADOOP_HOME /bin/hdfs namenode -format -force" su - hadoop -c "$HADOOP_HOME /sbin/start-dfs.sh" su - hadoop -c "$HADOOP_HOME /sbin/start-yarn.sh"
(二)集群监控与管理
Web界面监控
命令行监控
集群状态:hdfs dfsadmin -report
节点状态:yarn node -list
应用状态:yarn application -list
日志查看:yarn logs -applicationId app_id
第三方监控工具
Ambari:集群管理和监控平台
Cloudera Manager:企业级集群管理
Ganglia:分布式监控系统
Nagios:网络监控系统
Prometheus + Grafana:现代监控方案
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 echo "=== Hadoop集群健康检查 ===" echo "1. HDFS文件系统状态:" hdfs dfsadmin -report | grep -E "Live datanodes|Dead datanodes|DFS Used%" echo "2. HDFS文件系统完整性检查:" hdfs fsck / -files -blocks -locations | tail -10 echo "3. YARN集群状态:" yarn node -list -all | grep -E "RUNNING|UNHEALTHY|LOST" echo "4. 运行中的应用:" yarn application -list -appStates RUNNING echo "5. 集群资源使用情况:" yarn top echo "6. 关键服务进程检查:" jps | grep -E "NameNode|DataNode|ResourceManager|NodeManager"
(三)性能调优与故障排除
HDFS性能调优
块大小优化:根据文件大小调整块大小
副本数量:根据可靠性需求调整副本数
压缩配置:启用数据压缩减少存储和网络开销
缓存配置:配置HDFS缓存提高读性能
YARN性能调优
内存配置:合理配置容器内存大小
CPU配置:启用CPU资源调度
调度器优化:选择合适的调度器和配置
本地化:提高数据本地性减少网络传输
常见故障排除
NameNode故障:检查内存、磁盘空间、网络
DataNode故障:检查磁盘健康、网络连接
作业失败:检查日志、资源配置、数据格式
性能问题:分析瓶颈、优化配置、硬件升级
总结 :Hadoop作为大数据处理的基础平台,提供了可靠的分布式存储和计算能力。HDFS解决了大规模数据存储问题,MapReduce提供了简单易用的分布式计算模型,YARN实现了资源的统一管理和调度。
随着大数据技术的发展,Hadoop生态系统不断丰富,Spark、Flink等新一代计算引擎在某些场景下提供了更好的性能。但Hadoop作为大数据的基石,其稳定性、可靠性和成熟的生态系统仍然使其在企业级大数据应用中占据重要地位。
学习Hadoop需要理解分布式系统的基本概念,掌握HDFS、MapReduce、YARN的核心原理,熟悉生态系统中各组件的使用。在实际应用中,要根据业务需求选择合适的技术栈,合理规划集群架构,做好性能调优和运维管理。掌握Hadoop技术,将为您在大数据领域的发展奠定坚实的基础。