Hadoop learning from 0 to 1 -- Chapter 12 Hadoop data compression

1. Compression overview

Compression can effectively reduce the number of bytes read from and written to the underlying storage system, and it makes better use of network bandwidth and disk space. When an MR program runs, I/O operations, network transfers, and the Shuffle and Merge phases consume a great deal of time, especially with large data volumes and intensive workloads, so using data compression is very important.

Since disk I/O and network bandwidth are scarce resources in Hadoop, data compression helps conserve them by minimizing disk I/O and network transfer. Compression can be enabled at any MapReduce stage. However, although the CPU overhead of compressing and decompressing is modest, the resulting speedup and resource savings are not free.

2. Compression strategy and principle

Compression is an optimization strategy to improve the running efficiency of Hadoop.

Compressing the data that Mappers and Reducers work with reduces disk I/O and improves the running speed of MR programs.

Note: compression reduces disk I/O but increases the CPU load. Used appropriately, it improves performance; used improperly, it can reduce performance.

Basic principles of compression:

  1. For compute-intensive (CPU-bound) jobs, use compression sparingly.
  2. For I/O-intensive jobs, use compression more aggressively.

3. Compression codecs supported by MR

| Compression format | Built into Hadoop? | Algorithm | File extension | Splittable? | Changes needed after switching to this format |
| --- | --- | --- | --- | --- | --- |
| DEFLATE | Yes, use directly | DEFLATE | .deflate | No | None; processed like plain text |
| Gzip | Yes, use directly | DEFLATE | .gz | No | None; processed like plain text |
| bzip2 | Yes, use directly | bzip2 | .bz2 | Yes | None; processed like plain text |
| LZO | No, requires installation | LZO | .lzo | Yes | Build an index and specify the input format |
| Snappy | No, requires installation | Snappy | .snappy | No | None; processed like plain text |

To support a variety of compression/decompression algorithms, Hadoop provides codec (compressor/decompressor) classes, as shown in the following table:

| Compression format | Codec class |
| --- | --- |
| DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
| gzip | org.apache.hadoop.io.compress.GzipCodec |
| bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
| LZO | com.hadoop.compression.lzo.LzopCodec |
| Snappy | org.apache.hadoop.io.compress.SnappyCodec |

Comparison of compression performance

| Compression algorithm | Original file size | Compressed file size | Compression speed | Decompression speed |
| --- | --- | --- | --- | --- |
| gzip | 8.3 GB | 1.8 GB | 17.5 MB/s | 58 MB/s |
| bzip2 | 8.3 GB | 1.1 GB | 2.4 MB/s | 9.5 MB/s |
| LZO | 8.3 GB | 2.9 GB | 49.3 MB/s | 74.6 MB/s |

http://google.github.io/snappy/

According to the Snappy project, on a single core of a Core i7 CPU in 64-bit mode, Snappy compresses at about 250 MB/s or more and decompresses at about 500 MB/s or more.

4. Selection of compression mode

4.1 Gzip compression

Advantages: a relatively high compression ratio and fast compression/decompression speed. Hadoop supports it out of the box, so processing Gzip files in an application is the same as processing plain text. Most Linux systems ship with the gzip command, which makes it easy to use.

Disadvantages: Split is not supported.

Application scenario: when each file after compression stays within about 130 MB (roughly one block size), Gzip compression is worth considering; for example, compressing one day's or one hour's worth of logs into a single Gzip file.

4.2 Bzip2 compression

Advantages: supports Split; offers a compression ratio higher than Gzip's; ships with Hadoop, so it is easy to use.

Disadvantages: slow compression / decompression speed.

Application scenario: suitable when speed is not critical but a high compression ratio is required; when large output data must be compressed and archived to reduce disk usage and will rarely be accessed later; or when a single large text file needs to be compressed to save storage space while still supporting Split and remaining compatible with existing applications.

4.3 Lzo compression

Advantages: fast compression/decompression speed and a reasonable compression ratio. It supports Split and is the most popular compression format in Hadoop. The lzop command can be installed on Linux systems, which makes it convenient to use.

Disadvantages: the compression ratio is lower than Gzip's; Hadoop does not support it natively, so it must be installed; and files in LZO format require special handling in applications (to support Split, an index must be built and the InputFormat must be set to the LZO input format).

Application scenario: worth considering for large files that are still larger than 200 MB after compression; the larger a single file is, the more pronounced LZO's advantage becomes (a sketch of the indexing and InputFormat setup follows below).
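The special handling mentioned above can look roughly like the following. This is a minimal sketch, assuming the third-party hadoop-lzo library is installed and on the classpath (the same project that provides the com.hadoop.compression.lzo.LzopCodec listed earlier); the class and tool names below come from that library rather than from Hadoop itself, so verify them against your installed version.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class LzoInputSketch {

    // Sketch only. Typical workflow with hadoop-lzo:
    //   1. Upload the .lzo files to HDFS.
    //   2. Build split indexes with the library's indexer tool
    //      (com.hadoop.compression.lzo.DistributedLzoIndexer) so large files become splittable.
    //   3. Point the job at the LZO-aware input format, as shown here.
    public static Job newLzoJob(Configuration conf) throws Exception {
        Job job = Job.getInstance(conf);
        // LzoTextInputFormat is provided by hadoop-lzo, not by Hadoop itself.
        job.setInputFormatClass(com.hadoop.mapreduce.LzoTextInputFormat.class);
        return job;
    }
}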

4.4 Snappy compression

Advantages: fast compression speed and a reasonable compression ratio.

Disadvantages: does not support Split; the compression ratio is lower than Gzip's; and Hadoop does not support it natively, so it must be installed.

Application scenario: when the Map output of a MapReduce job is large, Snappy serves well as the compression format for the intermediate data between Map and Reduce, or for the output of one MapReduce job that is the input of another.

5. Selection of compression position

Compression can be enabled at any stage of MapReduce, as shown in the figure:

(Figure: MapReduce data compression)

6. Compression parameter configuration

To enable compression in Hadoop, you can configure the following parameters:

| Parameter | Default value | Stage | Recommendation |
| --- | --- | --- | --- |
| io.compression.codecs (in core-site.xml) | org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec | Input compression | Hadoop uses the file extension to determine whether a codec is supported |
| mapreduce.map.output.compress (in mapred-site.xml) | false | Mapper output | Set to true to enable compression |
| mapreduce.map.output.compress.codec (in mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | Mapper output | LZO or Snappy codecs are often used to compress data at this stage |
| mapreduce.output.fileoutputformat.compress (in mapred-site.xml) | false | Reducer output | Set to true to enable compression |
| mapreduce.output.fileoutputformat.compress.codec (in mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | Reducer output | Use a standard tool or codec, such as gzip or bzip2 |
| mapreduce.output.fileoutputformat.compress.type (in mapred-site.xml) | RECORD | Reducer output | Compression type used for SequenceFile output: NONE or BLOCK |
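For per-job settings, the same properties can also be set programmatically on the Configuration object before the Job is created, instead of (or in addition to) the site files. Below is a minimal sketch; the codec choice (BZip2Codec) and the class name CompressionConfigSketch are placeholders for illustration only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;

public class CompressionConfigSketch {

    public static Configuration build() {
        Configuration conf = new Configuration();

        // Compress the intermediate map output
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                BZip2Codec.class, CompressionCodec.class);

        // Compress the final (reducer) output
        conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
        conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
                BZip2Codec.class, CompressionCodec.class);

        // Compression type used when the output is a SequenceFile (NONE, RECORD or BLOCK)
        conf.set("mapreduce.output.fileoutputformat.compress.type",
                SequenceFile.CompressionType.BLOCK.toString());

        return conf;
    }
}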

7. Practical compression cases

7.1 Compressing and decompressing a data stream

CompressionCodec provides two methods that make it easy to compress and decompress data streams.

  • Compression: to compress data being written to a stream, call createOutputStream(OutputStream out) to obtain a CompressionOutputStream, then write to it so the data is compressed on its way to the underlying stream.
  • Decompression: to decompress data read from an input stream, call createInputStream(InputStream in) to obtain a CompressionInputStream, then read the decompressed data from the underlying stream.

7.2 Testing the Hadoop compression codecs

The example below compresses a file and then decompresses it again; it works with any of the built-in codecs, for example:

  • DEFLATE: org.apache.hadoop.io.compress.DefaultCodec
  • gzip: org.apache.hadoop.io.compress.GzipCodec
  • bzip2: org.apache.hadoop.io.compress.BZip2Codec

package com.bigdata.hadoop.mapreduce.compress;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;

public class TestCompress {

    public static void main(String[] args) throws Exception {
        // Class.forName() needs the fully qualified name, so use getName() rather than getSimpleName()
        String method = BZip2Codec.class.getName();
        compress("/data.txt", method);
        decompress("/data.txt.bz2");
    }

    // 1. Compress
    private static void compress(String filename, String method) throws Exception {

        // (1) Get input stream
        FileInputStream fis = new FileInputStream(new File(filename));

        Class codecClass = Class.forName(method);

        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());

        // (2) Get output stream
        FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
        CompressionOutputStream cos = codec.createOutputStream(fos);

        // (3) Copy the input stream into the compression stream
        IOUtils.copyBytes(fis, cos, 1024*1024*5, false);

        // (4) Close resource
        cos.close();
        fos.close();
        fis.close();
    }

    // 2. Decompress
    private static void decompress(String filename) throws FileNotFoundException, IOException {

        // (0) verify whether it can be decompressed
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

        CompressionCodec codec = factory.getCodec(new Path(filename));

        if (codec == null) {
            System.out.println("cannot find codec for file " + filename);
            return;
        }

        // (1) Get input stream
        CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));

        // (2) Get output stream
        FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));

        // (3) Copy the decompressed stream into the output stream
        IOUtils.copyBytes(cis, fos, 1024*1024*5, false);

        // (4) Close resource
        cis.close();
        fos.close();
    }
}
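With an existing /data.txt at the given path, running main first produces /data.txt.bz2 (the extension comes from the codec's getDefaultExtension()), and the decompress step then writes the restored content to /data.txt.bz2.decoded. To test another codec, change both the codec class in main and the file name passed to decompress accordingly.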

7.3 Compressing the Map output

Even if the input and output files of your MapReduce job are uncompressed, you can still compress the intermediate output of the Map tasks, because that output is written to disk and transferred over the network to the Reduce nodes. Compressing it can bring a substantial performance gain. To do so, just set two properties; the code below shows how.

Hadoop ships with codecs you can use here, such as BZip2Codec and DefaultCodec.

  1. Mapper remains unchanged

    package com.bigdata.hadoop.mapreduce.compress;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    
        Text k = new Text();
        IntWritable v = new IntWritable(1);
    
        @Override
        protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
    
            // 1 get a row
            String line = value.toString();
    
            // 2 cutting
            String[] words = line.split(" ");
    
            // 3 cycle write
            for(String word:words){
                k.set(word);
                context.write(k, v);
            }
        }
    }
    
  2. Reducer remains unchanged

    package com.bigdata.hadoop.mapreduce.compress;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
        IntWritable v = new IntWritable();
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                              Context context) throws IOException, InterruptedException {
    
            int sum = 0;
    
            // 1 Summary
            for(IntWritable value:values){
                sum += value.get();
            }
    
            v.set(sum);
    
            // 2 output
            context.write(key, v);
        }
    }
    
  3. Enable compression on the Driver side

    package com.bigdata.hadoop.mapreduce.compress;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCountDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            Configuration configuration = new Configuration();
    
            // Enable map output compression
            configuration.setBoolean("mapreduce.map.output.compress", true);
            // Set the map output compression mode
            configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
    
            Job job = Job.getInstance(configuration);
    
            job.setJarByClass(WordCountDriver.class);
    
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            boolean result = job.waitForCompletion(true);
    
            System.exit(result ? 0 : 1);
        }
    }
    

7.4 Compressing the Reduce output

The Mapper and Reducer remain unchanged (see 7.3).

The Driver below is again based on the WordCount case.

package com.bigdata.hadoop.mapreduce.compress;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration configuration = new Configuration();

        // Enable map output compression
        configuration.setBoolean("mapreduce.map.output.compress", true);
        // Set the map output compression mode
        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Set the output compression on the reduce side
        FileOutputFormat.setCompressOutput(job, true);

        // Set compression mode
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}
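Note that with output compression enabled, the part files written to the output directory carry the codec's default extension (for example, part-r-00000.bz2 here), and downstream jobs or the decompress example from 7.2 can read them back with the same codec.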
