This article follows the DataCrunch introduction and focuses on DataCrunch design and technical implementation.

It details how we run analytics computations on all our scientific datasets in their native format (NetCDF, HDF, ...), across terabytes of data and millions of files, with only about 30 lines of Python code (or Matlab, R, ...).

Requirements

Here are the constraints that led the design choices for the DataCrunch implementation:

  • Make it easier to use huge volumes of data for quick analytics or data mining, with high performance and heavily distributed jobs over up to hundreds of terabytes: use Hadoop, since at this scale it appears to be the most (only?) appealing solution for our needs. It is not designed to deal out of the box with binary structured data like our datasets, but the challenge of using it efficiently was interesting.

  • Don't break compatibility with existing software and user practices

    • Language: our scientists mostly use Python, IDL, Matlab, ... for data analysis. There is no way we could ask them to write Java code...
    • Filesystem: using HDFS would break compatibility with most existing software. Moreover, it is not designed to be everyday storage for working with small amounts of data (high latency, no cache, ...).

Some drastic choices...

  • Use Python with Hadoop Streaming: not the most efficient way to work with Hadoop (especially with binary data...), but it allows quick prototyping and the language is well known in our team. A good compromise between performance and convenience is fine for us.

  • Use the MooseFS distributed filesystem instead of Hadoop HDFS: it may look strange to use a storage layer other than HDFS with Hadoop (although this is more common nowadays, cf. the MapR distribution for example). Using MooseFS under heavy Hadoop Map/Reduce processing load was not an issue (unlike NFS... yes, we tried ;-)). MooseFS is also our main filesystem and already contains most of our data, so there is no need to copy it to HDFS. The major drawback is the loss of data locality, since we don't have it natively, although this could be solved in some way (more on this HERE! TODO). In practice, because of the way we use our binary structured data (i.e. reading only slices of data at a time), the performance impact was not significant. And what about bandwidth? Well, even during heavy processing it is far from being a bottleneck, so who cares?
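To illustrate the two choices above, here is a minimal sketch of a Hadoop Streaming mapper in Python that receives file paths rather than file content, and opens each file through the regular filesystem mount. It is not the DataCrunch code: the function name `map_paths`, the choice of key (file extension) and the value (file size, as a placeholder for real work on the data) are all assumptions made for illustration. The only Hadoop Streaming convention relied on is that a mapper reads lines on stdin and writes tab-separated key/value lines on stdout.

```python
import os
import sys


def map_paths(lines, out=sys.stdout):
    """Streaming-style mapper: each input line is a file path, not file content."""
    for line in lines:
        path = line.strip()
        if not path:
            continue  # skip blank lines in the listing
        # The file is read through the normal POSIX mount (MooseFS in the
        # article). Here we only take its size, as a placeholder for real work.
        size = os.path.getsize(path)
        # Hypothetical key: the file extension ("none" when there is none).
        key = os.path.splitext(path)[1] or "none"
        # Hadoop Streaming expects "key<TAB>value" lines on stdout.
        out.write(f"{key}\t{size}\n")


# In an actual streaming script, the entry point would simply be:
#     map_paths(sys.stdin)
```

Because the mapper only handles paths, the binary files never travel through the streaming pipe; each task reads just the slices it needs from the shared filesystem.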

How does it work, roughly

  • Inputs:

    • 1 listing of files to process (usually with Hadoop, the input would be the data to stream, but since we don't care about data locality and don't want to stream the binary structured data, this is an acceptable choice). This way, the files to process are distributed among the mappers.
    • 1 configuration file for the job (containing the outputs expected by the user, options, etc.), transmitted to the mappers through a Hadoop environment variable.
  • Mapper: the map processing is based on 2 Python files. The first comes from the DataCrunch framework and helps with data analysis (e.g. preparing data for the reduce phase, performing indexing in HBase). The second is the User (or Data) mapper, which usually consists of a few methods to implement (inherited from the main Mapper class of DataCrunch) to handle data heterogeneity: getTime(), getLatLon(), getVariable().

  • Reducer: the default reducer computes basic statistics per key (min, max, mean, std, percentiles, sum, ...). There is not much intelligence in the reducer, but it can already provide a lot of interesting results when the keys are carefully chosen in the mapper. Of course, the reducer can be customized for specific studies.

  • Final outputs: the keys emitted by the mapper depend on the job configuration file (built from the user request). Here are a few of the available outputs (mean/max/std... per day/month/year...): time series, maps, Hovmöller diagrams, file listings, counters... Depending on the final format expected, the reducer results are post-processed to generate outputs that are easy to use for the user or for the DataCrunch web GUI (PNG generation, JSON files, ...).
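The mapper/reducer split described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the DataCrunch framework itself: the method names getTime(), getLatLon() and getVariable() come from the article, but their signatures, the `map()` method, the `FakeSstMapper` class, the file naming convention and the data values are all hypothetical. The reducer also groups pairs in memory, whereas a real streaming reducer would read sorted key/value lines from stdin.

```python
import math
from collections import defaultdict
from datetime import datetime


class Mapper:
    """Hypothetical stand-in for the framework's base mapper class."""

    def getTime(self, path):
        raise NotImplementedError

    def getLatLon(self, path):
        raise NotImplementedError

    def getVariable(self, path, name):
        raise NotImplementedError

    def map(self, path, name):
        # Key on the month, so the reducer yields monthly statistics.
        key = self.getTime(path).strftime("%Y-%m")
        for value in self.getVariable(path, name):
            yield key, value


class FakeSstMapper(Mapper):
    """User mapper for an imaginary dataset; a real one would read NetCDF/HDF."""

    def getTime(self, path):
        # Assumed naming convention: <YYYYMMDD>_sst.nc
        return datetime.strptime(path.split("_")[0], "%Y%m%d")

    def getLatLon(self, path):
        # Unused by this monthly key; it would serve for spatial keys (maps).
        return [(48.0, -5.0), (48.5, -5.0)]

    def getVariable(self, path, name):
        # Made-up values keep the sketch runnable without any data file.
        return [12.0, 14.0]


def reduce_stats(pairs):
    """Compute basic statistics per key from (key, value) pairs."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    stats = {}
    for key, values in groups.items():
        n = len(values)
        mean = sum(values) / n
        # Population standard deviation (divides by n).
        std = math.sqrt(sum((v - mean) ** 2 for v in values) / n)
        stats[key] = {"count": n, "min": min(values), "max": max(values),
                      "sum": sum(values), "mean": mean, "std": std}
    return stats


mapper = FakeSstMapper()
files = ["20120101_sst.nc", "20120115_sst.nc"]
pairs = [kv for path in files for kv in mapper.map(path, "sst")]
print(reduce_stats(pairs))
```

Changing only the key built in `map()` (day, year, grid cell, ...) changes the kind of output produced, which is why a generic statistics reducer goes a long way.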

To make it short: with this basic design, we can efficiently extract meaningful information from terabyte-scale datasets with millions of files on our existing filesystem, using only 1 Python file (usually ~30 lines in Python or Matlab to read the data) and Map/Reduce distributed processing. Thanks to Python with Hadoop Streaming and the DataCrunch framework (2 Python classes + Django for the web GUI and a few bash scripts for convenience), processing time is a matter of minutes!

