1. JobClient: writes the code, configures the job, and submits the job (a minimal driver sketch follows this list).
2. JobTracker: initializes jobs, assigns them, and coordinates their execution. It is a Java program whose main class is JobTracker.
3. TaskTracker: runs the tasks the job is divided into, that is, executes a Map or Reduce task on the data assigned to it.
4. HDFS: stores the job data and configuration information, and stores the job results.
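To make the JobClient's role concrete, here is a minimal, hedged driver sketch using the old org.apache.hadoop.mapred API; the WordCountMapper and WordCountReducer classes and the command-line paths are illustrative assumptions, not something from the original text.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            // Job configuration: which classes to run and where the data lives.
            JobConf conf = new JobConf(WordCountDriver.class);
            conf.setJobName("word count");

            conf.setMapperClass(WordCountMapper.class);    // hypothetical mapper (sketched later)
            conf.setReducerClass(WordCountReducer.class);  // hypothetical reducer (sketched later)
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            // Job submission: runJob submits the job to the JobTracker and
            // polls its progress until it finishes.
            JobClient.runJob(conf);
        }
    }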
Overall execution flow of a Map/Reduce job:
Code writing -> job configuration -> job submission -> Map task assignment and execution -> handling of intermediate results -> Reduce task assignment and execution -> result output
The execution of each task in turn includes:
Input preparation -> task execution -> result output
Job submission (JobClient):
The runJob method of JobClient creates a JobClient instance and calls its submitJob method; runJob then enters a loop in which it calls the getTaskCompletionEvents method to obtain TaskCompletionEvent instances, polls the job progress every second (progress and status updates are introduced later), and writes the progress to the console. When the job completes it displays the job counters, and when the job fails it records the error on the console.
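The polling behaviour just described can also be reproduced by hand. The following hedged sketch, which assumes the job has been configured as in the driver above, submits the job with JobClient.submitJob and polls the returned RunningJob roughly the way runJob does internally:

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;

    public class ManualJobMonitor {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(ManualJobMonitor.class);
            // ... mapper, reducer, and input/output paths configured as in the driver sketch ...

            JobClient jobClient = new JobClient(conf);
            RunningJob job = jobClient.submitJob(conf);  // hands the job to the JobTracker

            // Poll the JobTracker for progress, roughly as JobClient.runJob does.
            while (!job.isComplete()) {
                System.out.printf("map %.0f%%  reduce %.0f%%%n",
                        job.mapProgress() * 100, job.reduceProgress() * 100);
                Thread.sleep(1000);  // runJob polls about once per second
            }

            if (job.isSuccessful()) {
                System.out.println(job.getCounters());  // display the job counters
            } else {
                System.err.println("Job failed: " + job.getID());
            }
        }
    }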
The job submission process performed by the submitJob method:
1. Request a new JobId from the JobTracker.
2. Check the paths related to the job and return an error if a path is invalid.
3. Compute the input splits of the job and their split information.
4. Copy the job resources (jar file, configuration files, etc.) to shared HDFS, replicating them (controlled by a parameter, default 10 copies) so that tasktrackers can access them, and copy the computed split information to HDFS as well (see the hedged snippet after this list).
5. Call the submitJob() method of the JobTracker object to actually submit the job and tell the JobTracker that the job is ready for execution.
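A hedged note on step 4: in Hadoop 1.x the replication factor used for submitted job resources is commonly exposed as the mapred.submit.replication property (default 10). The snippet below, which assumes a JobConf named conf as in the driver sketch, merely shows how such a parameter would be set; the property name is an assumption, not something stated in the original text.

    // Replicate the job jar/config widely on HDFS so that many tasktrackers
    // can fetch them without overloading a few datanodes (assumed property name).
    conf.setInt("mapred.submit.replication", 10);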
Job initialization (JobTracker):
After receiving the submitJob call, the JobTracker puts it into an internal queue, from which the job scheduler picks it up and initializes it. Job initialization creates a job object.
When a job is scheduled, the JobTracker creates a JobInProgress object to represent it and encapsulates the task and record information in this object so that the status and progress of the tasks can be tracked.
The initialization itself is performed by the initTasks method of the JobInProgress object.
Initialization steps:
1. Read the job.split information of the job from HDFS in preparation for the following steps.
2. Create and initialize the map and reduce tasks. The number of map tasks is determined by the number of input splits, and a TaskInProgress object is generated for each map task to process one split. These objects are first put into nonRunningMapCache for the JobTracker to use when assigning tasks. Next, the number of reduce tasks is taken from the mapred.reduce.tasks attribute of the JobConf, set via the setNumReduceTasks() method (see the sketch after this list), and the reduce tasks are created in the same way as the map tasks.
3. Finally, two initialization tasks are created to initialize the map and reduce sides.
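A hedged sketch of how the reduce task count mentioned in step 2 is usually fixed on the job side, assuming a JobConf named conf; the value 4 is an arbitrary illustration.

    // Two equivalent ways of setting the number of reduce tasks.
    conf.setNumReduceTasks(4);              // API call
    conf.setInt("mapred.reduce.tasks", 4);  // the underlying property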
Task assignment (JobTracker):
Heartbeat messaging: each tasktracker runs a simple loop that periodically sends a heartbeat to the JobTracker. The heartbeat tells the JobTracker that the tasktracker is alive and also acts as a message channel for other information (such as a request for a new task). As part of the heartbeat, the tasktracker indicates whether it is ready to run a new task, and if so, the jobtracker assigns it one.
Choosing the job the task belongs to: before the Jobtracker assigns a task, it must first choose the job the task comes from. The various job scheduling algorithms are introduced later; the default is FIFO job scheduling.
Assigning Map and Reduce tasks: a tasktracker has a fixed number of task slots and can run several Map and Reduce tasks at the same time; the exact number is determined by the tasktracker's number of cores and amount of memory. The default scheduler fills the Map task slots first and then the Reduce task slots. The Jobtracker selects the tasktracker closest to the split file: ideally tasks are data-local, but they can also be rack-local; without any locality, a task has to fetch its data from another rack. Assigning a Reduce task is very simple: the jobtracker just picks the next one from the list of runnable Reduce tasks, without considering data locality.
Task execution (TaskTracker):
When it receives a new task, the TaskTracker runs it locally. The first step in running a task is localization: the configuration, data, program, and other information the task needs are copied to the local machine by localizeJob.
1. Localize the data: copy job.split and job.jar from the shared file system (the distributed cache) to the local disk, and write the job configuration information into job.xml.
2. Create a new local working directory, into which the tasktracker unpacks the job.jar file.
3. Call the launchTaskForJob method to launch the task (this creates a new TaskRunner instance to run the task). A MapTaskRunner is started for a Map task and a ReduceTaskRunner for a Reduce task.
After that, TaskRunner starts a new JVM to run each Map/Reduce task, so that a crash caused by user code cannot bring down the tasktracker; the JVM can still be reused across different tasks, and task JVM reuse is discussed later (a hedged configuration sketch follows).
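A hedged sketch of the JVM reuse setting referred to here, assuming a JobConf named conf and Hadoop 1.x property names:

    // Run up to 10 tasks of the same job in one child JVM; -1 would mean unlimited reuse.
    conf.setNumTasksToExecutePerJvm(10);
    // Equivalent property form (name assumed from Hadoop 1.x):
    conf.setInt("mapred.job.reuse.jvm.num.tasks", 10);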
For a single map, the simplified flow of task execution is:
1. Set up the task execution parameters.
2. Add the Map task information to the temporary file of Child (Child is the main process that runs the map and reduce tasks).
3. Configure the log directory and the communication and output parameters of the map task.
4. Read the input split and create the RecordReader that reads the data.
5. Create a MapRunnable for the Map, which receives records from the RecordReader one by one and calls the Map function to process them (a minimal mapper sketch follows this list).
6. Finally, the output of the map function is collected into the MapOutputBuffer (whose size is controlled by parameters).
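To make step 5 concrete, here is a minimal, hedged sketch of an old-API map function: MapRunner feeds it one record at a time from the RecordReader, and every output call goes through the OutputCollector into the MapOutputBuffer. The word-count logic is an illustration only.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class WordCountMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        // Called once per input record handed over by the RecordReader.
        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    output.collect(word, ONE);  // collected into the MapOutputBuffer
                }
            }
        }
    }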
Streaming and Pipes:
Both Streaming and Pipes run special Map and Reduce tasks that launch and communicate with the user-supplied executable.
Streaming: communicates with the process through standard input and output streams.
Pipes: listens on a socket; it sends a port number to the C++ program so that the two sides can establish a connection.
Progress and status updates:
A job and its tasks have a status, which includes: the run state (running, succeeded, or failed), the map/reduce progress, the values of the job counters, and a status message.
How status messages are communicated to the client:
1. Tracking map task progress: the progress is the proportion of the input that has been processed.
2. For Reduce it is slightly more complicated: the reduce task is divided into three phases (each counting as 1/3), namely copy, sort, and reduce processing. If the reduce function has consumed half of its input, the task progress is 1/3 + 1/3 + 1/6 = 5/6.
3. Task counters: a task has a set of counters that count the events occurring while the task runs.
4. Task progress reports: when a task reports progress, it sets a flag indicating that the status should be sent to the tasktracker. An independent thread checks this flag every three seconds and, if it is set, tells the tasktracker the current status.
5. Tasktracker progress reports: the tasktracker sends a heartbeat to the jobtracker every 5 seconds (the interval depends on the size of the cluster: the larger the cluster, the longer the interval), and the status of everything running on the tasktracker is sent to the jobtracker in that call.
6. Jobtracker consolidation of task reports: the jobtracker builds a global view showing the task status of all machines running jobs.
As mentioned above, the JobClient receives the latest status by querying the JobTracker every second; the getJob method of JobClient also returns a RunningJob instance containing all the status information of the job (a hedged progress-reporting sketch follows).
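A hedged sketch of how a task typically reports the status, progress, and counters described above through the old API's Reporter object; the counter enum is purely illustrative.

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class WordCountReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        // Hypothetical counter group for illustration.
        enum MyCounters { EMPTY_GROUPS }

        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
                reporter.progress();  // sets the flag that the progress thread checks
            }
            if (sum == 0) {
                reporter.incrCounter(MyCounters.EMPTY_GROUPS, 1);  // task counter
            }
            reporter.setStatus("reduced key " + key);  // status message
            output.collect(key, new IntWritable(sum));
        }
    }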
Job completion:
When the jobtracker receives notification that the last task of a job has completed, it sets the job status to successful. When the JobClient polls the status, it learns that the job has completed successfully, prints a message to inform the user, and returns from the runJob method.
If the jobtracker is configured accordingly, it also sends an HTTP job notification to the client; a client that wants to receive this callback can set it through the job.end.notification.url property (a hedged snippet follows).
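A hedged sketch of the notification setting, assuming a JobConf named conf; the host and path in the URL are placeholders, and the $jobId/$jobStatus substitution is the documented Hadoop 1.x behaviour to the best of my knowledge.

    // The JobTracker calls back this URL when the job finishes; $jobId and
    // $jobStatus are expanded by Hadoop. The host and path here are hypothetical.
    conf.set("job.end.notification.url",
             "http://example.com/jobnotify?jobId=$jobId&status=$jobStatus");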
Finally, the jobtracker cleans up its working state for the job, and the tasktrackers do the same, for example by deleting the intermediate output.
Failures:
In practice there are bugs in user code, processes crash, and machines fail, but Hadoop handles these failures well and still completes the job.
1. Task failure
Child task exception: if the user code in a Map/Reduce task throws an exception, the child JVM process sends an error report to its parent tasktracker before exiting, and the error is recorded in the user log. The tasktracker marks this task attempt as failed and frees the task slot to run another task.
Sudden exit of the child JVM: the JVM may exit because of a JVM bug triggered by some special condition in the user code. In that case the tasktracker notices that the process has exited and marks the attempt as failed.
Hanging tasks: once the tasktracker notices that it has not received a progress update for a while, it marks the task as failed, and the child JVM process is killed automatically. The timeout is normally 10 minutes and can be set per job or per cluster through the mapred.task.timeout parameter. Note: if this value is set to 0, a hanging task never releases its task slot, which reduces the efficiency of the whole cluster over time.
Retrying failed tasks: when the jobtracker learns that a task attempt has failed, it reschedules the task; it tries to avoid rescheduling it on a tasktracker where it previously failed. If a task fails more than 4 times it is not retried again. This value can be configured: for map tasks the parameter is mapred.map.max.attempts, and for reduce tasks it is controlled by the mapred.reduce.max.attempts property. If the limit is exceeded, the whole job fails. Sometimes we do not want the whole job to abort when a few tasks fail, because even with some failed tasks part of the job's results may still be useful. In this case you can set the maximum percentage of task failures allowed before the job is declared failed; map tasks and reduce tasks are controlled independently through the mapred.max.map.failures.percent and mapred.max.reduce.failures.percent properties (see the hedged sketch after this subsection).
Killed tasks: killing a task is different from a task failing. A task attempt may be killed because it is a speculative duplicate, or because its tasktracker failed, in which case the jobtracker marks all the task attempts that ran on it as killed. Killed task attempts do not count toward a task's number of attempts, because the attempt was not killed through any fault of the task.
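A hedged sketch collecting the failure-related knobs mentioned above, assuming a JobConf named conf; the concrete values are arbitrary illustrations.

    // Kill a task that reports no progress for 10 minutes (value in milliseconds).
    conf.setLong("mapred.task.timeout", 10 * 60 * 1000L);

    // Retry a failing task attempt at most 4 times before giving up on it.
    conf.setMaxMapAttempts(4);     // mapred.map.max.attempts
    conf.setMaxReduceAttempts(4);  // mapred.reduce.max.attempts

    // Tolerate up to 5% failed tasks without failing the whole job.
    conf.setMaxMapTaskFailuresPercent(5);     // mapred.max.map.failures.percent
    conf.setMaxReduceTaskFailuresPercent(5);  // mapred.max.reduce.failures.percent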
2. Tasktracker failure
A tasktracker fails when it crashes or runs too slowly. It stops sending heartbeats to the jobtracker (or sends them only rarely). The jobtracker notices that the tasktracker has stopped sending heartbeats (the expiry time is set by the mapred.tasktracker.expiry.interval parameter, in milliseconds) and removes it from the pool of tasktrackers eligible for scheduling. For an unfinished job, the jobtracker arranges for map tasks that had completed successfully on the failed tasktracker to be re-run on another tasktracker, because their intermediate output, stored on the local file system of the failed tasktracker, can no longer be reached by the reduce tasks.
Even if a tasktracker has not failed, it may be blacklisted by the jobtracker: if the number of failed tasks on a tasktracker is much higher than the cluster's average, it is blacklisted. A blacklisted tasktracker can be removed from the jobtracker's blacklist by restarting it.
3. Jobtracker failure
In old versions, a JobTracker failure is a single point of failure; in that case the job is doomed to fail.
Job scheduling:
The default FIFO scheduler: jobs run in the order in which they were submitted. You can set the priority through the mapred.job.priority property or the setJobPriority() method (priorities: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW). Note that the FIFO algorithm does not support preemption, so a high-priority job can still be blocked by a long-running low-priority job that is already executing (a hedged priority sketch follows).
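A hedged sketch of setting the priority, assuming a JobConf named conf; the old API's JobConf exposes a setJobPriority method, and the property form is equivalent.

    import org.apache.hadoop.mapred.JobPriority;

    // Either form raises the job's priority under the FIFO scheduler;
    // note that FIFO still will not preempt jobs that are already running.
    conf.setJobPriority(JobPriority.HIGH);
    conf.set("mapred.job.priority", "HIGH");  // equivalent property form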
The Fair Scheduler: its goal is to give every user a fair share of the cluster's capacity. When the cluster is loaded with many jobs, idle task slots are allocated in a way that lets every user share the cluster. By default each user has their own job pool. The FairScheduler supports preemption, so if a pool does not receive its fair share of resources within a certain period of time, the scheduler kills tasks in pools that are using too many resources and gives the task slots to the under-served pool. The FairScheduler is an add-on module; to use it, put its jar file on Hadoop's classpath and configure it through the mapred.jobtracker.taskScheduler parameter (with the value org.apache.hadoop.mapred.FairScheduler).
The Capacity Scheduler:
The cluster consists of many queues, each with an allocated capacity. This is similar to the FairScheduler, except that within each queue jobs are scheduled FIFO. Essentially, the Capacity Scheduler lets users or organizations simulate a separate FIFO cluster for each user.
Shuffle and sort:
MapReduce guarantees that the input to every Reducer is sorted by key. The process by which the system sorts the map outputs and transfers them to the reducers as input is called the shuffle. The shuffle is a part of the code base that is continually being optimized and improved; in many ways, the shuffle is the heart of MapReduce.
The whole shuffle process looks roughly like this:
Map output -> partition -> sort -> spill to disk -> merge spills of the same partition -> (reduce side) copy -> merge and sort -> reduce processing -> output
Map side:
The write buffer: the output of the map function is handled by a collector rather than simply being written to disk. For efficiency it is written to an in-memory buffer and pre-sorted. Each map task has a circular memory buffer for its output; the default buffer size is 100 MB (adjustable through the io.sort.mb parameter). Once the buffer content reaches a threshold (0.8 of the buffer by default), a background thread starts spilling the content to disk. While the spill is in progress, map output continues to be written to the buffer, but if the buffer fills up, the map blocks until the spill to disk finishes. Spills are written in round-robin fashion to the job-specific subdirectories of the directories given by the mapred.local.dir property (a hedged tuning sketch follows).
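A hedged sketch of the map-side buffer parameters just described, assuming a JobConf named conf and Hadoop 1.x property names; the values are illustrative.

    // Size of the circular in-memory map output buffer, in MB (default 100).
    conf.setInt("io.sort.mb", 200);
    // Fraction of the buffer at which the background spill starts (default 0.80);
    // the property name io.sort.spill.percent is assumed from Hadoop 1.x.
    conf.setFloat("io.sort.spill.percent", 0.80f);
    // Maximum number of streams merged at once when combining spills (default 10).
    conf.setInt("io.sort.factor", 10);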
Spilling the buffer: when collect spills the contents of the buffer, it calls the sortAndSpill function, which creates a spill file, sorts the data by key, and writes the data into the file partition by partition. If a combiner class is configured, the combineAndSpill function is called before writing to the file. sortAndSpill writes one spill file each time it is called.
Merging all of a map's spill files: after each map task finishes, the TaskTracker merges all the spill files the map produced. The merge rule is to combine, partition by partition, the data belonging to the same partition across all spill files and write it into a single map output file sorted within each partition. Once the last record of this single partitioned, sorted map output file has been written, the map-side shuffle is finished.
Before writing to disk, the thread first divides the data into partitions corresponding to the reducers it will eventually be sent to. Within each partition, a background thread sorts the data by key in memory. If there is a combiner, it runs on the sorted output.
Whenever the memory buffer reaches the spill threshold, a new spill file is created, so after the map task writes its last output record there may be several spill files. Before the task completes, the spill files are merged into a single partitioned, sorted output file. The configuration property io.sort.factor controls the maximum number of streams that can be merged at once; the default is 10.
If a combiner is specified and there have been at least 3 spills (set by min.num.spills.for.combine), the combiner runs before the output file is written to disk. The point of running the combiner is to make the map output more compact, so that less data is written to the local disk and transferred to the reducers (a hedged combiner sketch follows).
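A hedged sketch of enabling a combiner, assuming a JobConf named conf; reusing the WordCountReducer from the earlier sketch is an illustration and only works because summing counts is commutative and associative.

    // Run the reduce logic on the map side to shrink the spilled output.
    conf.setCombinerClass(WordCountReducer.class);
    // Only bother combining if at least this many spill files exist (default 3).
    conf.setInt("min.num.spills.for.combine", 3);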
Compression while writing: compressing the map output as it is written makes writing faster, saves disk space, and reduces the amount of data transferred to the reducers. By default the output is not compressed, but compression can be enabled by setting mapred.compress.map.output to true. The compression library used is chosen by mapred.map.output.compression.codec (a hedged snippet follows).
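A hedged sketch of enabling map output compression, assuming a JobConf named conf and the Gzip codec that ships with Hadoop:

    import org.apache.hadoop.io.compress.GzipCodec;

    // Compress intermediate map output to cut disk and network traffic.
    conf.setCompressMapOutput(true);                    // mapred.compress.map.output
    conf.setMapOutputCompressorClass(GzipCodec.class);  // mapred.map.output.compression.codec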
Worker threads serving the reducers: reducers fetch the partitions of the map output file over HTTP; the number of worker threads used to serve the file partitions is given by the tasktracker.http.threads property. This setting applies per tasktracker, not per map task slot. The default is 40, and it can be raised as needed on large clusters.
Reduce side:
Copy phase: the reduce task periodically asks the JobTracker for the locations of map outputs. As soon as it obtains a location, it copies the corresponding map output from that TaskTracker to the local machine (if the map output is small it is copied into the memory of the reduce TaskTracker's node, otherwise it is written to its disk), without waiting for all map tasks to finish (this is, of course, also controlled by parameters).
Merge phase: the map output files copied from the TaskTrackers (whether on disk or in memory) are merged while preserving the sort order of the data.
Reduce phase: records are taken from the merged file in turn and processed by the reduce function, and the result is output, usually to HDFS.
The output file of a map sits on the local disk of the tasktracker that ran the map task, and the tasktrackers that run the reduce tasks now need those partition files. The completion time of each map task may differ, but as soon as one completes, the reduce task starts copying its output; this is the copy phase of the reduce task. A reduce task has a small number of copier threads, so it can fetch map outputs in parallel; the default is 5 threads, configurable through the mapred.reduce.parallel.copies property (a hedged tuning sketch follows).
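A hedged sketch of the reduce-side copy and merge knobs discussed in this part, assuming a JobConf named conf and Hadoop 1.x property names; the values shown are illustrative (close to the usual defaults).

    // Number of parallel copier threads fetching map output (default 5).
    conf.setInt("mapred.reduce.parallel.copies", 5);
    // Share of the reduce task's heap used to buffer copied map output in memory.
    conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
    // Buffer fill level and map-output count that trigger a merge-and-spill to disk.
    conf.setFloat("mapred.job.shuffle.merge.percent", 0.66f);
    conf.setInt("mapred.inmem.merge.threshold", 1000);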
How a reducer knows which tasktrackers to fetch map output from: when a map task completes, it notifies its parent tasktracker of the status update, and the tasktracker in turn notifies the jobtracker (through the heartbeat). The jobtracker therefore knows the mapping between map outputs and tasktrackers, and a thread in the reducer periodically asks the jobtracker for the locations of map outputs. Because a reducer might fail, the tasktracker does not delete a map output from its disk as soon as the first reducer has retrieved it; instead it waits until the jobtracker tells it the output can be deleted, which happens only after the job has completed.
If a map output is small enough, it is copied directly into the memory buffer of the reduce tasktracker (whose size, as a percentage of heap space, is controlled by mapred.job.shuffle.input.buffer.percent); otherwise the map output is copied to disk. Once the in-memory buffer reaches a threshold size (mapred.job.shuffle.merge.percent) or holds a threshold number of map outputs (mapred.inmem.merge.threshold), it is merged and spilled to disk.
As the copies on disk accumulate, a background thread merges them into larger, sorted files. Note: to be merged, compressed map outputs must first be decompressed in memory.
Sort phase: when the copy phase completes, the reduce task enters the sort phase, or more precisely the merge phase, which merges the map outputs while preserving their sort order. The merge proceeds in rounds, and the number of files merged per round is determined by the merge factor, so intermediate files may be produced.
Reduce phase: in the final reduce phase, the sorted data is fed directly into the reduce function without first merging the intermediate files into a single file; this last merge can draw from both memory and disk. The output of this phase is written directly to the output file system, typically HDFS.
A detail: the merge here is not a simple merge. For example, with 40 files and a merge factor of 10, the merge is not done by merging 10 files in each of four rounds. Instead, the first round merges only 4 files and the following three rounds merge 10 files each; in the final round, the 4 merged files plus the 6 remaining unmerged files (10 in total) are fed directly into the reduce.