MapReduce中Map/Reduce作业和map/reduce函数的区别是什么?
mapreduce作业提交源码分析我们在编写mapreduce程序的时候,首先需要编写map函数和reduce函数。完成mapper和reducer的编写后,进行job的配置;job配置完成后,调用job。 submit()方法完成作业的提交。那我们思考一下,job最终如何完成作业(job)的提交呢?粗略想一下,job必然需要通过某种方式连接到jobtracker,因为只有这样才能将job提交到jobtracker上进行调度执行。 还需要考虑一下,我们自己编写的mapper和reducer,即jar文件如何传送到jobtracker上呢?其中有一种最简单也比较直观的方法,直接通过soc...全部
mapreduce作业提交源码分析我们在编写mapreduce程序的时候,首先需要编写map函数和reduce函数。完成mapper和reducer的编写后,进行job的配置;job配置完成后,调用job。
submit()方法完成作业的提交。那我们思考一下,job最终如何完成作业(job)的提交呢?粗略想一下,job必然需要通过某种方式连接到jobtracker,因为只有这样才能将job提交到jobtracker上进行调度执行。
还需要考虑一下,我们自己编写的mapper和reducer,即jar文件如何传送到jobtracker上呢?其中有一种最简单也比较直观的方法,直接通过socket传输给jobtracker,由jobtracker再传输给tasktracker(注意:mapreduce并没有采用这种方法)。
第三个需要考虑的内容是,jobtracker如何将用户作业的配置转化成map task和reduce task。下面我们来分析一下mapreduce这些功能的实现。首先在class job内部通过jobclient完成作业的提交,最终由jobclient完成与jobtracker的交互功能。
在jobclient的构造函数中,通过调用rpc完成与jobtracker连接的建立。完成建立后,jobclient首先确定job相关文件的存放位置(我们上面提到mapreduce没有采用将jar即其他文件传输给jobtracker的方式,而是将这些文件保存到hdfs当中,并且可以根据用户的配置存放多份)。
至于该存放目录的分配是通过调用rpc访问jobtracker的方法来进行分配的,下面看一下jobtracker的分配代码:final path stagingrootdir = new path(conf。
get("mapreduce。jobtracker。staging。root。dir","/tmp/hadoop/mapred/staging"));final filesystem fs = stagingrootdir。
getfilesystem(conf);return fs。makequalified(new path(stagingrootdir, user "/。staging"))。tostring();注意上面代码所生成的stagingrootdir是所有job文件的存放目录,是一个根目录,并不单指当前job。
完成job存放目录的分配后,jobclient向jobtracker申请一个jobid(通过rpc,注意基本上jobclient与jobtracker的所有通信都是通过rpc完成的,如果下文没有显示著名也应该属于这种情况)。
jobid jobid = jobsubmitclient。getnewjobid();下面是jobtracker。getnewjobid的具体实现:publicsynchronized jobid getnewjobid() throws ioexception {returnnew jobid(gettrackeridentifier(), nextjobid );}获得jobid后,将该jobid与上面的stagingrootdir组合就构成了job文件的具体存放地址的构建。
进行这些相关工作后,jobclient将相关的文件存储到hdfs当中。收起