[Hadoop: The Definitive Guide, 4th Edition] Chapter 4: Hadoop I/O [notes + code]

4.1 Data Integrity

The usual way to detect corrupted data is to compute a checksum for the data when it first enters the system, and again whenever it is transmitted. If the newly computed checksum does not match the original exactly, the data is considered corrupt.

  • Note that it is possible for the checksum itself to be corrupt while the data is intact, but this is very unlikely, because a checksum is much smaller than the data it protects.

A commonly used error-detecting code is CRC-32 (32-bit cyclic redundancy check), which computes a 32-bit integer checksum for input of any size.
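
As a rough illustration (using the JDK's java.util.zip.CRC32 class rather than Hadoop's internal checksum code), a small checksum is enough to reveal corruption in a much larger payload:

import java.util.zip.CRC32;

public class Crc32Demo {
    public static void main(String[] args) {
        byte[] data = "a block of data to protect".getBytes();
        CRC32 crc = new CRC32();
        crc.update(data);
        long original = crc.getValue();   // 32-bit checksum of the original bytes

        data[3] ^= 0x01;                  // flip a single bit to simulate corruption
        crc.reset();
        crc.update(data);

        System.out.println(crc.getValue() == original
                ? "data looks intact" : "corruption detected");
    }
}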

4.1.1 Data Integrity in HDFS

When a client writes data, it is sent to a pipeline of datanodes, and the checksum is verified at the last datanode in the pipeline. If an error is detected, the client receives a ChecksumException, a subclass of IOException.

When a client reads data from a datanode, it also verifies checksums, comparing them with the ones stored at the datanode. Each datanode keeps a persistent log of checksum verifications, so it knows when each of its blocks was last verified. After a client successfully verifies a block, it tells the datanode, which updates its log. Keeping such statistics is valuable for detecting bad disks.

In addition to verification on client reads, each datanode runs a DataBlockScanner in a background thread that periodically verifies all the blocks stored on the datanode. This guards against corruption caused by bit rot in the physical storage media.

Because HDFS stores replicas of each block, it can repair a corrupted block by copying one of the intact replicas.

  • How it works

If a client detects an error while reading a block, it reports the bad block and the datanode it was reading from to the namenode before throwing a ChecksumException. The namenode marks that replica as corrupt so that it stops directing clients to it and does not copy it to another datanode. It then schedules a new copy of the block to be replicated from one of the good replicas, after which the corrupt replica is deleted.

Sometimes, however, you may not want corrupt files to be deleted automatically, because their contents might still be salvageable.

Before calling open() to read a file, you can disable checksum verification by passing false to the setVerifyChecksum() method on FileSystem. From the shell, the equivalent is the -ignoreCrc option with -get or the equivalent -copyToLocal command.
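
A minimal sketch of both approaches (the class name and file path below are illustrative, not from the book):

import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ReadIgnoringChecksum {
    public static void main(String[] args) throws Exception {
        String uri = args[0];                 // a possibly corrupt file
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        fs.setVerifyChecksum(false);          // disable client-side checksum checks
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}

The shell equivalent:

% hadoop fs -get -ignoreCrc part-00000 .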

4.2 Compression

  • Faster compression and decompression usually comes at the cost of space savings: there is a trade-off between time and space (see the gzip example below).
  • gzip (like ZIP) is a general-purpose compression tool that sits in the middle of the space/time trade-off; its -1 option selects the fastest compression method:
gzip -1 file
  • LZO is optimized for speed, so its compression ratio is somewhat lower.
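
For example, the trade-off can be tuned directly on the command line:

% gzip -1 file     # optimize for speed: fast, but a larger file.gz
% gzip -9 file     # optimize for space: slower, but a smaller file.gz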

4.2.1 Codecs

The following program compresses data read from standard input and writes it to standard output, using the codec class named on the command line:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamCompressor {
    public static void main(String[] args) throws Exception {
        String codecClassname = args[0];
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration conf = new Configuration();
        CompressionCodec codec =
                (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        // Wrap standard output in a compressing stream and copy stdin into it.
        CompressionOutputStream out = codec.createOutputStream(System.out);
        IOUtils.copyBytes(System.in, out, 4096, false);
        out.finish();   // flush the compressed data without closing System.out
    }
}

Using the StreamCompressor program above with GzipCodec to compress the string "Text", then decompressing it from standard input with gunzip:

% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec \
| gunzip -
Text

4.2.3 Using Compression in MapReduce

The following program uses CompressionCodecFactory to infer the codec from a file's extension and then decompresses the file:

import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class FileDecompressor {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path inputPath = new Path(uri);
        // Map the file extension (e.g. .gz) to the matching codec.
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(inputPath);
        if (codec == null) {
            System.err.println("No codec found for " + uri);
            System.exit(1);
        }
        // Strip the compression suffix to form the output file name.
        String outputUri =
                CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
        InputStream in = null;
        OutputStream out = null;
        try {
            in = codec.createInputStream(fs.open(inputPath));
            out = fs.create(new Path(outputUri));
            IOUtils.copyBytes(in, out, conf);
        } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }
    }
}
% hadoop FileDecompressor file.gz

Compressors and decompressors can be expensive to create, so Hadoop provides CodecPool to reuse them when compressing or decompressing many streams; a pooled variant of StreamCompressor is sketched below. The MapReduce driver that follows it shows how to produce compressed job output.
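
A sketch of such a pooled compressor, which compresses data read from standard input and writes it to standard output (same usage as StreamCompressor above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.util.ReflectionUtils;

public class PooledStreamCompressor {
    public static void main(String[] args) throws Exception {
        String codecClassname = args[0];
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration conf = new Configuration();
        CompressionCodec codec =
                (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        Compressor compressor = null;
        try {
            // Borrow a compressor from the pool instead of creating a new one.
            compressor = CodecPool.getCompressor(codec);
            CompressionOutputStream out =
                    codec.createOutputStream(System.out, compressor);
            IOUtils.copyBytes(System.in, out, 4096, false);
            out.finish();
        } finally {
            // Return the compressor so it can be reused.
            CodecPool.returnCompressor(compressor);
        }
    }
}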

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperatureWithCompression {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureWithCompression <input path> <output path>");
            System.exit(-1);
        }
        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Compress the job output with gzip.
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The job writes its output as gzip-compressed part files.

Usage:

% hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output

Result:

% gunzip -c output/part-r-00000.gz
1949 111
1950 22
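
Compression can also be applied to the intermediate map output to reduce the amount of data transferred during the shuffle. A hedged sketch of the relevant driver fragment, using the Hadoop 2.x property names (it additionally needs an import of org.apache.hadoop.io.compress.CompressionCodec):

Configuration conf = new Configuration();
// Compress intermediate map output with gzip.
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
        GzipCodec.class, CompressionCodec.class);
Job job = new Job(conf);
// ...the rest of the driver is configured as before...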

4.3 Serialization

A good serialization format has four desirable characteristics:

  • Compact

    A compact format makes the best use of network bandwidth, the scarcest resource in a data center.

  • Fast

    Interprocess communication forms the backbone of a distributed system, so serialization and deserialization overhead must be kept to a minimum.

  • Extensible

    Protocols change over time to meet new requirements, so it should be straightforward to evolve the protocol in a controlled way for clients and servers. For example, it should be possible to add a new argument to a method call and have new servers accept messages in the old format (without the new argument) from old clients.

  • Interoperable

    For some systems, it is desirable to support clients written in different languages talking to the server, so the format needs to be designed with this in mind.

Hadoop uses its own serialization format, Writables, which is compact and fast, though not so easy to extend or to use from languages other than Java. Writables are at the core of Hadoop: MapReduce programs use them to serialize keys and values.
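
As a rough sketch of the mechanics (the helper class and method names here are my own, not from the book), a Writable is serialized by calling write() on a DataOutput and restored by calling readFields() on a DataInput:

import java.io.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

public class WritableRoundTrip {
    // Serialize a Writable into a byte array.
    static byte[] serialize(Writable w) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(out);
        w.write(dataOut);
        dataOut.close();
        return out.toByteArray();
    }

    // Populate a Writable from a byte array.
    static void deserialize(Writable w, byte[] bytes) throws IOException {
        DataInputStream dataIn =
                new DataInputStream(new ByteArrayInputStream(bytes));
        w.readFields(dataIn);
        dataIn.close();
    }

    public static void main(String[] args) throws IOException {
        IntWritable writable = new IntWritable(163);
        byte[] bytes = serialize(writable);      // an IntWritable occupies 4 bytes
        IntWritable copy = new IntWritable();
        deserialize(copy, bytes);
        System.out.println(bytes.length + " bytes, value = " + copy.get());
    }
}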

A custom Writable that represents a pair of Text fields:

import java.io.*;
import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }
}
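
For sorting in MapReduce it usually pays to also provide a RawComparator that compares records directly in their serialized form, without deserializing them. The following is a sketch of such a comparator for TextPair, written as a static nested class inside TextPair and following the pattern used by Hadoop's built-in Writable comparators (treat the details as illustrative):

    public static class Comparator extends WritableComparator {
        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

        public Comparator() {
            super(TextPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            try {
                // Length of the first Text field = size of its vint header + encoded length.
                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
                int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
                if (cmp != 0) {
                    return cmp;
                }
                // First fields are equal, so compare the second fields.
                return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                               b2, s2 + firstL2, l2 - firstL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }
    }

Registering it with WritableComparator.define(TextPair.class, new Comparator()) in a static initializer of TextPair makes it the default comparator for the class.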

4.4 File-Based Data Structures

Some applications need a special data structure to hold their data. For MapReduce-based processing, putting each blob of binary data into its own file does not scale well, so Hadoop provides a number of higher-level containers, such as SequenceFile.

Writing a SequenceFile:

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {
    private static final String[] DATA = { "One, two, buckle my shoe", "Three, four, shut the door",
            "Five, six, pick up sticks", "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };

    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);                    // keys count down from 100
                value.set(DATA[i % DATA.length]);    // values cycle through DATA
                // Print the position at which the next record starts, then append it.
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}
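
To run it, pass the path of the sequence file to create (the file name here is only an example):

% hadoop SequenceFileWriteDemo numbers.seq

Each line printed shows the position at which the record starts, followed by the key and the value.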

Reading a SequenceFile:

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadDemo {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            // Instantiate key and value objects of the types recorded in the file header.
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : "";  // mark records that follow a sync point
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                position = reader.getPosition(); // beginning of next record
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
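
The reader can also be repositioned within the file; a brief sketch (pos is any byte offset you have recorded):

// seek() requires an exact record boundary; otherwise the next call to next() fails.
reader.seek(pos);
// sync() moves the reader to the next sync point at or after pos and never fails.
reader.sync(pos);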
