HADOOP快速入门(packet怎么读)

一 什么是Hadoop

HADOOP是apache旗下的一套开源软件平台HADOOP提供的功能:利用服务器集群,根据用户的自定义业务逻辑,对海量数据进行分布式处理HADOOP的核心组件有

HDFS(分布式文件系统)

YARN(运算资源调度系统)

MAPREDUCE(分布式运算编程框架)

4.广义上来说,HADOOP通常是指一个更广泛的概念——HADOOP生态圈

二 Hadoop生态圈以及各组成部分的简介

HADOOP快速入门

重点组件:

HDFS:分布式文件系统

MAPREDUCE:分布式运算程序开发框架

HIVE:基于大数据技术(文件系统+运算框架)的SQL数据仓库工具

HBASE:基于HADOOP的分布式海量数据库

ZOOKEEPER:分布式协调服务基础组件

Mahout:基于mapreduce/spark/flink等分布式运算框架的机器学习算法库

Oozie:工作流调度框架

Sqoop:数据导入导出工具

Flume:日志数据采集框架

三 Hadoop集群搭建

3.1 集群简介

HADOOP集群具体来说包含两个集群:HDFS集群和YARN集群,两者逻辑上分离,但物理上常在一起

HDFS集群:

负责海量数据的存储,集群中的角色主要有 NameNode / DataNode

YARN集群:

负责海量数据运算时的资源调度,集群中的角色主要有 ResourceManager /NodeManager

(那mapreduce是什么呢?它其实是一个应用程序开发包

3.2 集群安装

3.3 集群启动

初始化HDFS

bin/hadoop namenode -format

启动HDFS

sbin/start-dfs.sh

启动YARN

sbin/start-yarn.sh

3.4 集群初步使用

1 查看集群状态

命令: hdfs dfsadmin –report

也可打开web控制台查看HDFS集群信息,在浏览器打开http://hdp-node:50070/

2 上传文件到HDFS

查看HDFS中的目录信息

命令: hadoop fs –ls /

上传文件

命令: hadoop fs -put ./ scala-2.10.6.tgz to /

从HDFS下载文件

命令:hadoop fs -get /yarn-site.xml

四 HDFS

1 设计思想

设计思想

分而治之:将大文件、大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析;

在大数据系统中作用:

为各类分布式运算框架(如:mapreduce,spark,tez,……)提供数据存储服务

重点概念:文件切块,副本存放,元数据

2 HDFS的概念和特性

首先,它是一个文件系统,用于存储文件,通过统一的命名空间——目录树来定位文件

其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色;

重要特性如下:

HDFS中的文件在物理上是分块存储(block),块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在hadoop2.x版本中是128M,老版本中是64MHDFS文件系统会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data目录结构及文件分块信息(元数据)的管理由namenode节点承担——namenode是HDFS集群主节点,负责维护整个hdfs文件系统的目录树,以及每一个路径(文件)所对应的block块信息(block的id,及所在的datanode服务器)文件的各个block的存储管理由datanode节点承担—- datanode是HDFS集群从节点,每一个block都可以在多个datanode上存储多个副本(副本数量也可以通过参数设置dfs.replication)HDFS是设计成适应一次写入,多次读出的场景,且不支持文件的修改

(注:适合用来做数据分析,并不适合用来做网盘应用,因为,不便修改,延迟大,网络开销大,成本太高)

3 HDFS的shell(命令行客户端)操作

3.1 HDFS命令行客户端使用

HDFS提供shell命令行客户端,使用方法如下:

hadoop fs -ls /

hdfs dfs -ls /

3.2 命令行客户端支持的命令参数

[-appendToFile … ]

[-cat [-ignoreCrc] …]

[-checksum …]

[-chgrp [-R] GROUP PATH…]

[-chmod [-R] PATH…]

[-chown [-R] [OWNER][:[GROUP]] PATH…]

[-copyFromLocal [-f] [-p] … ]

[-copyToLocal [-p] [-ignoreCrc] [-crc] … ]

[-count [-q] …]

[-cp [-f] [-p] … ]

[-createSnapshot []]

[-deleteSnapshot ]

[-df [-h] [ …]]

[-du [-s] [-h] …]

[-expunge]

[-get [-p] [-ignoreCrc] [-crc] … ]

[-getfacl [-R] ]

[-getmerge [-nl] ]

[-help [cmd …]]

[-ls [-d] [-h] [-R] [ …]]

[-mkdir [-p] …]

[-moveFromLocal … ]

[-moveToLocal ]

[-mv … ]

[-put [-f] [-p] … ]

[-renameSnapshot ]

[-rm [-f] [-r|-R] [-skipTrash] …]

[-rmdir [–ignore-fail-on-non-empty] …]

[-setfacl [-R] [{-b|-k} {-m|-x } ]|[–set ]]

[-setrep [-R] [-w] …]

[-stat [format] …]

[-tail [-f] ]

[-test -[defsz] ]

[-text [-ignoreCrc] …]

[-touchz …]

[-usage [cmd …]]

3.2 常用命令参数介绍

-help

功能:输出这个命令参数手册

-ls

功能:显示目录信息

示例:hadoop fs -ls hdfs://hadoop-server01:9000/

备注:这些参数中,所有的hdfs路径都可以简写

–>hadoop fs -ls / 等同于上一条命令的效果

-mkdir

功能:在hdfs上创建目录

示例:hadoop fs -mkdir -p /aaa/bbb/cc/dd

-moveFromLocal

功能:从本地剪切粘贴到hdfs

示例:hadoop fs – moveFromLocal /home/hadoop/a.txt /aaa/bbb/cc/dd

-moveToLocal

功能:从hdfs剪切粘贴到本地

示例:hadoop fs – moveToLocal /aaa/bbb/cc/dd /home/hadoop/a.txt

–appendToFile

功能:追加一个文件到已经存在的文件末尾

示例:hadoop fs -appendToFile ./hello.txt hdfs://hadoop-server01:9000/hello.txt

可以简写为:

Hadoop fs -appendToFile ./hello.txt /hello.txt

-cat

功能:显示文件内容

示例:hadoop fs -cat /hello.txt

-tail

功能:显示一个文件的末尾

示例:hadoop fs -tail /weblog/access_log.1

-text

功能:以字符形式打印一个文件的内容

示例:hadoop fs -text /weblog/access_log.1

-chgrp

-chmod

-chown

功能:linux文件系统中的用法一样,对文件所属权限

示例:

hadoop fs -chmod 666 /hello.txt

hadoop fs -chown someuser:somegrp /hello.txt

-copyFromLocal

功能:从本地文件系统中拷贝文件到hdfs路径去

示例:hadoop fs -copyFromLocal ./jdk.tar.gz /aaa/

-copyToLocal

功能:从hdfs拷贝到本地

示例:hadoop fs -copyToLocal /aaa/jdk.tar.gz

-cp

功能:从hdfs的一个路径拷贝hdfs的另一个路径

示例:hadoop fs -cp /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2

-mv

功能:在hdfs目录中移动文件

示例:hadoop fs -mv /aaa/jdk.tar.gz /

-get

功能:等同于copyToLocal,就是从hdfs下载文件到本地

示例:hadoop fs -get /aaa/jdk.tar.gz

-getmerge

功能:合并下载多个文件

示例:比如hdfs的目录 /aaa/下有多个文件:log.1, log.2,log.3,…

hadoop fs -getmerge /aaa/log.* ./log.sum

-put

功能:等同于copyFromLocal

示例:hadoop fs -put /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2

-rm

功能:删除文件或文件夹

示例:hadoop fs -rm -r /aaa/bbb/

-rmdir

功能:删除空目录

示例:hadoop fs -rmdir /aaa/bbb/ccc

-df

功能:统计文件系统的可用空间信息

示例:hadoop fs -df -h /

-du

功能:统计文件夹的大小信息

示例:

hadoop fs -du -s -h /aaa/*

-count

功能:统计一个指定目录下的文件节点数量

示例:hadoop fs -count /aaa/

-setrep

功能:设置hdfs中文件的副本数量

示例:hadoop fs -setrep 3 /aaa/jdk.tar.gz

4 HDFS的工作机制

4.1 概述

HDFS集群分为两大角色:NameNode、DataNode (Secondary Namenode)NameNode负责管理整个文件系统的元数据DataNode 负责管理用户的文件数据块文件会按照固定的大小(blocksize)切成若干块后分布式存储在若干台datanode上每一个文件块可以有多个副本,并存放在不同的datanode上Datanode会定期向Namenode汇报自身所保存的文件block信息,而namenode则会负责保持文件的副本数量HDFS的内部工作机制对客户端保持透明,客户端请求访问HDFS都是通过向namenode申请来进行

4.2 HDFS写数据流程

4.2.1 概述

客户端要向HDFS写数据,首先要跟namenode通信以确认可以写文件并获得接收文件block的datanode,然后,客户端按顺序将文件逐个block传递给相应datanode,并由接收到block的datanode负责向其他datanode复制block的副本

4.2.2 详细步骤图

HADOOP快速入门

4.2.3 详细步骤解析

1、根namenode通信请求上传文件,namenode检查目标文件是否已存在,父目录是否存在

2、namenode返回是否可以上传

3、client请求第一个 block该传输到哪些datanode服务器上

4、namenode返回3个datanode服务器ABC

5、client请求3台dn中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,然后B调用C,将真个pipeline建立完成,逐级返回客户端

6、client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,A收到一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答

7、当一个block传输完成之后,client再次请求namenode上传第二个block的服务器。

4.3 HDFS读数据流程

4.3.1 概述

客户端将要读取的文件路径发送给namenode,namenode获取文件的元信息(主要是block的存放位置信息)返回给客户端,客户端根据返回的信息找到相应datanode逐个获取文件的block并在客户端本地进行数据追加合并从而获得整个文件

4.3.2 详细步骤图

HADOOP快速入门

4.3.3 详细步骤解析

1、跟namenode通信查询元数据,找到文件块所在的datanode服务器

2、挑选一台datanode(就近原则,然后随机)服务器,请求建立socket流

3、datanode开始发送数据(从磁盘里面读取数据放入流,以packet为单位来做校验)

4、客户端以packet为单位接收,现在本地缓存,然后写入目标文件

5 NameNode工作机制

5.1 namenode职责

负责客户端请求的响应

元数据的管理(查询,修改)

5.2 元数据管理

namenode对数据的管理采用了三种存储形式:

内存元数据(NameSystem)磁盘元数据镜像文件数据操作日志文件(可通过日志运算出元数据)

5.2.1 元数据存储机制

A、内存中有一份完整的元数据(内存meta data)

B、磁盘有一个“准完整”的元数据镜像(fsimage)文件(在namenode的工作目录中)

C、用于衔接内存metadata和持久化元数据镜像fsimage之间的操作日志(edits文件)

注:当客户端对hdfs中的文件进行新增或者修改操作,操作记录首先被记入edits日志文件中,当客户端操作成功后,相应的元数据会更新到内存meta.data中

5.2.2 元数据手动查看

可以通过hdfs的一个工具来查看edits中的信息

bin/hdfs oev -i edits -o edits.xml

bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml

5.2.3 元数据的checkpoint

每隔一段时间,会由secondary namenode将namenode上积累的所有edits和一个最新的fsimage下载到本地,并加载到内存进行merge(这个过程称为checkpoint)

checkpoint的详细过程

HADOOP快速入门

checkpoint操作的触发条件配置参数

dfs.namenode.checkpoint.check.period=60 #检查触发条件是否满足的频率,60秒

dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary

#以上两个参数做checkpoint操作时,secondary namenode的本地工作目录

dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}

dfs.namenode.checkpoint.max-retries=3 #最大重试次数

dfs.namenode.checkpoint.period=3600 #两次checkpoint之间的时间间隔3600秒

dfs.namenode.checkpoint.txns=1000000 #两次checkpoint之间最大的操作记录

checkpoint的附带作用

namenode和secondary namenode的工作目录存储结构完全相同,所以,当namenode故障退出需要重新恢复时,可以从secondary namenode的工作目录中将fsimage拷贝到namenode的工作目录,以恢复namenode的元数据

5.2.4 元数据目录说明

在第一次部署好Hadoop集群的时候,我们需要在NameNode(NN)节点上格式化磁盘:

$HADOOP_HOME/bin/hdfs namenode -format

格式化完成之后,将会在$dfs.namenode.name.dir/current目录下如下的文件结构

current/

|– VERSION

|– edits_*

|– fsimage_0000000000008547077

|– fsimage_0000000000008547077.md5

`– seen_txid

其中的dfs.name.dir是在hdfs-site.xml文件中配置的,默认值如下:

dfs.name.dir

file://${hadoop.tmp.dir}/dfs/name

hadoop.tmp.dir是在core-site.xml中配置的,默认值如下

hadoop.tmp.dir

/tmp/hadoop-${user.name}

A base for other temporary directories.

dfs.namenode.name.dir属性可以配置多个目录,

如/data1/dfs/name,/data2/dfs/name,/data3/dfs/name,….。各个目录存储的文件结构和内容都完全一样,相当于备份,这样做的好处是当其中一个目录损坏了,也不会影响到Hadoop的元数据,特别是当其中一个目录是NFS(网络文件系统Network File System,NFS)之上,即使你这台机器损坏了,元数据也得到保存。

下面对$dfs.namenode.name.dir/current/目录下的文件进行解释。1、VERSION文件是Java属性文件,内容大致如下:

#Fri Nov 15 19:47:46 CST 2013

namespaceID=934548976

clusterID=CID-cdff7d73-93cd-4783-9399-0a22e6dce196

cTime=0

storageType=NAME_NODE

blockpoolID=BP-893790215-192.168.24.72-1383809616115

layoutVersion=-47

其中  (1)、namespaceID是文件系统的唯一标识符,在文件系统首次格式化之后生成的;  (2)、storageType说明这个文件存储的是什么进程的数据结构信息(如果是DataNode,storageType=DATA_NODE);  (3)、cTime表示NameNode存储时间的创建时间,由于我的NameNode没有更新过,所以这里的记录值为0,以后对NameNode升级之后,cTime将会记录更新时间戳;  (4)、layoutVersion表示HDFS永久性数据结构的版本信息, 只要数据结构变更,版本号也要递减,此时的HDFS也需要升级,否则磁盘仍旧是使用旧版本的数据结构,这会导致新版本的NameNode无法使用;  (5)、clusterID是系统生成或手动指定的集群ID,在-clusterid选项中可以使用它

2、$dfs.namenode.name.dir/current/seen_txid非常重要,是存放transactionId的文件,format之后是0,它代表的是namenode里面的edits_*文件的尾数,namenode重启的时候,会按照seen_txid的数字,循序从头跑edits_0000001~到seen_txid的数字。所以当你的hdfs发生异常重启的时候,一定要比对seen_txid内的数字是不是你edits最后的尾数,不然会发生建置namenode时metaData的资料有缺少,导致误删Datanode上多余Block的资讯。

3、$dfs.namenode.name.dir/current目录下在format的同时也会生成fsimage和edits文件,及其对应的md5校验文件。

补充:seen_txid

文件中记录的是edits滚动的序号,每次重启namenode时,namenode就知道要将哪些edits进行加载edits

6 Datanode的工作机制

6.1 概述

1、Datanode工作职责:

存储管理用户的文件块数据

定期向namenode汇报自身所持有的block信息(通过心跳信息上报)

(这点很重要,因为,当集群中发生某些block副本失效时,集群如何恢复block初始副本数量的问题)

dfs.blockreport.intervalMsec

3600000

Determines block reporting interval in milliseconds.

2、Datanode掉线判断时限参数

datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长。HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则超时时长的计算公式为:

timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。

而默认的heartbeat.recheck.interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。

需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。所以,举个例子,如果heartbeat.recheck.interval设置为5000(毫秒),dfs.heartbeat.interval设置为3(秒,默认),则总的超时时间为40秒。

heartbeat.recheck.interval

2000

dfs.heartbeat.interval

1

五 MapReduce

Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;

Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上;

1.1 为什么要MapReduce

(1)海量数据在单机上处理因为硬件资源限制,无法胜任

(2)而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度

(3)引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理

1.2 MapReduce框架结构及核心运行机制

1.2.1 结构

一个完整的mapreduce程序在分布式运行时有三类实例进程:

1、MRAppMaster:负责整个程序的过程调度及状态协调

2、mapTask:负责map阶段的整个数据处理流程

3、ReduceTask:负责reduce阶段的整个数据处理流程

1.2.2 MR程序运行流程

1.2.2.1 流程解析

1.一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程

2.maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:

利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存将缓存中的KV对按照K分区排序后不断溢写到磁盘文件

3.MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)

4.Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储

1.3 MapTask并行度决定机制

maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度

那么,mapTask并行实例是否越多越好呢?其并行度又是如何决定呢?

1.3.1 mapTask并行度的决定机制

一个job的map阶段并行度由客户端在提交job时决定

而客户端对map阶段并行度的规划的基本逻辑为:

将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理

这段逻辑及形成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法

1.4 ReduceTask并行度的决定

reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:

//默认值是1,手动设置为4

job.setNumReduceTasks(4);

如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜

注意: reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask

尽量不要运行太多的reduce task。对大多数job来说,最好rduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小。这个对于小集群而言,尤其重要。

1.5MapReduce程序演示

Hadoop的发布包中内置了一个hadoop-mapreduce-example-2.4.1.jar,这个jar包中有各种MR示例程序,可以通过以下步骤运行:

启动hdfs,yarn

然后在集群中的任意一台服务器上启动执行程序(比如运行wordcount):

hadoop jar hadoop-mapreduce-example-2.4.1.jar wordcount /wordcount/data /wordcount/out

2 MapReduce实践篇

2.1 mapreduce示例编写及编程规范

2.1.1 编程规范

用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)Mapper的输入数据是KV对的形式(KV的类型可自定义)Mapper的输出数据是KV对的形式(KV的类型可自定义)Mapper中的业务逻辑写在map()方法中map()方法(maptask进程)对每一个调用一次Reducer的输入数据类型对应Mapper的输出数据类型,也是KVReducer的业务逻辑写在reduce()方法中Reducetask进程对每一组相同k的组调用一次reduce()方法用户自定义的Mapper和Reducer都要继承各自的父类整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象

1.6 MapReduce原理篇

1.6.1 mapreduce的shuffle机制

概述:

mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle;shuffle: 洗牌、发牌——(核心机制:数据分区,排序,缓存);具体来说:就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序;

主要流程

HADOOP快速入门

shuffle是MR处理流程中的一个过程,它的每一个处理步骤是分散在各个map task和reduce task节点上完成的,整体来看,分为3个操作:

分区partitionSort根据key排序Combiner进行局部value的合并

详细流程

maptask收集我们的map()方法输出的kv对,放到内存缓冲区中从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件 (默认100M)多个溢出文件会被合并成大的溢出文件在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)

Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快

缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认100M

六 yarn

Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序

yarn的重要概念

yarn并不清楚用户提交的程序的运行机制yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源)yarn中的主管角色叫ResourceManageryarn中具体提供运算资源的角色叫NodeManager这样一来,yarn其实就与运行的用户程序完全解耦,就意味着yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreduce、storm程序,spark程序,tez ……所以,spark、storm等运算框架都可以整合在yarn上运行,只要他们各自的框架中有符合yarn规范的资源请求机制即可Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享

原创文章,作者:LYXFFO,如若转载,请注明出处:https://www.beidanyezhu.com/a/95010.html

(0)
LYXFFO的头像LYXFFO
上一篇 2025-07-11
下一篇 2025-07-11

相关推荐

分享本页
返回顶部