Progress
Weekly progress reports are maintained here.
- Clean up the Maven setup and set up the POMs properly.
- Implemented a unit test for DistRedirects and bash scripts to automate verification/testing of extraction output.
- Separated config files and classes for Spark configuration and normal extraction configuration.
- Code review + merge the stuff above
- Fix issue #9 (Some Extractors fail with ConcurrentModificationException) - we can't go forward unless all the extractors work.
- Regarding the extractors that do work: parallel extraction works well, and template redirects extraction using DistRedirects works too. As of now DistributedExtractionJob collects all results in memory and then writes to the output file. Need to fix that.
- Fix issue #4 - we need to write a Hadoop InputFormat class that reads the multistream bz2 file, splits it and parses it into WikiPages - this will be used by the Spark code to build up an RDD distributed over N nodes (a sketch of how such an InputFormat plugs into Spark follows this list). It will also work for dumps on HDFS as well as on the local file system.
- All extractors except the (stateful) InfoboxExtractor now work; issue #9 is fixed.
- Maven uber jars are used for ease of Spark deployment.
- Refactored DistIOUtils code to make Kryo-based saving/loading RDDs fast.
- Scalable job outputs - iterate over RDD elements locally and write to a local file instead of collecting the whole output in memory.
- Milestone 2 practically finished. What's left: Scalable input with InputFormat + better code documentation.
- The pull request "Many code changes to make everything work with Path and XmlInputFormat" constitutes the bulk of the work done.
- Hadoop works with Paths and FileSystems, which provide an abstraction over files and file systems. In other words, Hadoop encapsulates whether files live on a distributed or a local file system and lets us work purely with Paths and InputFormats. The aforementioned PR consists of many code changes to make the framework Path-friendly.
- Added the Mahout InputFormat.
- A couple more commits are left to officially complete milestones 2 and 3 together:
- Merging DistConfig and Config into a single DistConfig (subclassing and delegation) to solve the problem discussed in this pull request.
- Adding support for compressed files
- Multiple languages in parallel using a FIFO/Fair scheduler.
- Discussed parallel/distributed downloads in this issue and came up with an implementation plan.
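As a rough illustration of the InputFormat item above (issue #4): once such an InputFormat exists, Spark can build the distributed RDD of pages straight from it with `newAPIHadoopFile`. This is only a sketch; `WikiPageInputFormat` is a hypothetical stand-in for the class still to be written, and the real framework would parse the raw XML into WikiPage objects rather than keep plain strings.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object WikiPageRDDSketch {
  // Hypothetical stand-in: it would split the multistream .bz2 dump and emit
  // one raw <page>...</page> element per record.
  class WikiPageInputFormat extends FileInputFormat[LongWritable, Text] {
    def createRecordReader(split: InputSplit, ctx: TaskAttemptContext): RecordReader[LongWritable, Text] = ???
  }

  // Building the distributed RDD from that InputFormat. The same call works for
  // dumps on HDFS ("hdfs://...") and on the local file system ("file://...")
  // because Hadoop resolves the FileSystem implementation from the path's scheme.
  def rawPages(sc: SparkContext, dumpPath: String): RDD[String] =
    sc.newAPIHadoopFile(
      dumpPath,
      classOf[WikiPageInputFormat],
      classOf[LongWritable],  // key: byte offset of the record in the dump
      classOf[Text],          // value: the raw page XML
      new Configuration()
    ).map { case (_, xml) => xml.toString }  // parsed into WikiPages downstream
}
```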
I think a good time to test out the framework will be once milestone 3 is done - do some proper testing and draw up plots to show concrete results, for example:
- WC (wall clock) Time vs No. of machines, for wiki dump size (GB) = A, B, C, D
- WC Time vs wiki dump size (GB), for no. of machines = A, B, C, D
- WC Time vs No. of resulting RDF edges, for no. of machines = A, B, C, D
- Comparison of WC Time taken for (a) FIFO scheduling vs (b) Fair scheduling - when running parallel multiple language extractions (the insights we draw from this should be important)
- Resource-utilization graphs to test Spark/Hadoop-based settings - CPU load, disk I/O, and whether the load is properly distributed among the nodes. You can often get 2-3x improvements by crafting your getSplits method correctly (a small split-size tuning sketch follows below). Ganglia graphs of resource utilization are superb for Hadoop scheduler-specific tests: comparing schedulers, testing the Hadoop delay scheduler, and seeing how much delay works best for our case. You can find some examples here, if you scroll to the end: http://www.infoq.com/articles/HadoopInputFormat.
Plots 1 and 2 would look roughly like the figure from Efficient Large-Scale Graph Analysis in MapReduce, with each line representing one of the values (A, B, C, D, ...) and the X axis representing dump size or no. of machines.
On the other hand, it might be a good idea to run a few small tests for now just to check that the distributed framework doesn't blow up in any obvious way while processing, say, the English Wikipedia dump, and reserve the proper testing and graph-plotting for later (once we fix InfoboxExtractor - Christopher gave me some tips about that too which will come in handy later).
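On the getSplits point above: a minimal tuning sketch (REPL/spark-shell friendly), assuming the standard Hadoop 2.x FileInputFormat properties, showing how split sizes - and hence the number of Spark partitions and the load balance across nodes - can be adjusted from the driver without touching the InputFormat. The sizes are arbitrary examples, not recommendations.

```scala
import org.apache.hadoop.conf.Configuration

// Example only: cap/raise Hadoop input split sizes so the dump is cut into
// enough splits (= Spark partitions) to keep every core on every node busy.
val hadoopConf = new Configuration()
hadoopConf.setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024)  // at most 64 MB per split
hadoopConf.setLong("mapreduce.input.fileinputformat.split.minsize", 16L * 1024 * 1024)  // avoid lots of tiny splits
// pass hadoopConf to newAPIHadoopFile(...); for full control, override getSplits
// in the InputFormat itself and verify the effect with Ganglia's per-node graphs.
```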
- The merged pull request "Completes milestone 3: Single united DistConfig class instead of two; Support for parallel multiple language extraction" makes two key changes:
- Merge all config functionality into the single `DistConfig` class. Quoting from the PR, `DistConfig` wraps a `Config` instance and handles the logic required for selecting which properties should come from which files. Also, `ConfigLoader` is no longer extended by `DistConfigLoader`, and `ExecutionJob` is no longer extended by `DistExecutionJob` - the inheritance was adding increasingly needless trouble.
- Support for parallel extraction of multiple languages was added.
- This almost concludes milestone 3, except for two features we need to add:
- A custom `OutputFormat` so that we can write the outputs via Hadoop's file system API - this will avoid streaming the whole output through the master node, as currently happens.
- Support for reading compressed input.
- Set the Hadoop version to 2.2.0, make certain changes to fix compile errors (Hadoop API changes?) and move from the deprecated API to the newer one.
- A custom InputFormat called `DBpediaWikiPageInputFormat` was added (using two helper classes, `SeekableInputStream` and `ByteMatcher`) that supports compressed input. It also supports splittable compression formats like bz2, which is a huge benefit for us. Here's how: say we have the input file enwiki-latest-pages-articles-multistream.xml.bz2, which is > 10 GB. It is compressed such that Hadoop's splittable input stream implementations can break it into chunks and decompress them on the fly in parallel - parallel decompression, and subsequently parallel operations on the data (see the sketch after this list).
- A custom OutputFormat called `DBpediaCompositeOutputFormat` is used to write outputs in parallel instead of streaming them through the master iteratively and writing to the Hadoop FileSystem - this eliminates a big performance bottleneck.
- Some changes to clean up the Destination classes: https://github.com/dbpedia/distributed-extraction-framework/pull/31
- Make logging level and number of job submission threads configurable. Add logging for failed tasks in DBpediaJobProgressListener - https://github.com/dbpedia/distributed-extraction-framework/pull/32
- Add RichHadoopPath.getSchemeWithFileName and use it instead of Path.toString: https://github.com/dbpedia/distributed-extraction-framework/pull/34
- Remove all synchronization from OutputFormats - https://github.com/dbpedia/distributed-extraction-framework/pull/30
- Small fixes and enhancements
- Work on a Python script to set up a Spark+HDFS cluster on Google Compute Engine.
- Test out the framework on a GCE cluster, try multiple configurations, come up with some sensible values, fix some bugs.
- Move redirect computation to `DistExtractionJob` so that it doesn't become a bottleneck. Previously, in a multiple-language extraction, all the redirects for all languages would be computed before the actual parallel extraction started. Even though redirects for each individual language were computed in parallel, the per-language redirect extraction jobs themselves were executed sequentially! Moving redirects into `DistExtractionJob` makes a single language a single logical job, and the framework is now truly parallel.
- Get all the code written so far in `nildev2` merged to `master`. Distributed extraction is in a pretty stable state now.
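To make the splittable-bz2 point above concrete, here is a minimal sketch (not the framework's actual code) of the mechanism a record reader can rely on, using Hadoop's standard compression API; `start` and `end` stand for the byte boundaries of the split handed to the reader.

```scala
import java.io.InputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CodecPool, CompressionCodecFactory, SplittableCompressionCodec}

object SplitReaderSketch {
  /** Open a decompressed stream over just the bytes in [start, end) of `path`. */
  def openSplit(path: Path, start: Long, end: Long, conf: Configuration): InputStream = {
    val fs    = path.getFileSystem(conf)
    val codec = new CompressionCodecFactory(conf).getCodec(path) // BZip2Codec for *.bz2
    codec match {
      case null =>
        fs.open(path) // uncompressed input: just read the raw file
      case splittable: SplittableCompressionCodec =>
        // bz2 consists of independently decompressible blocks, so each task can
        // decompress only the blocks inside its own split - this is what lets a
        // >10 GB enwiki dump be read by many tasks in parallel.
        splittable.createInputStream(fs.open(path), CodecPool.getDecompressor(codec),
          start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK)
      case other =>
        // non-splittable codecs (e.g. gzip) force the whole file into one split
        other.createInputStream(fs.open(path), CodecPool.getDecompressor(other))
    }
  }
}
```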
Scalability and speedup plots and statistics are to follow later, including resource utilization graphs for distributed extraction, but for now, here are some time measurements:
Cluster: 1 master (2 cores, 7.5 GB RAM - GCE n1-standard-2), 2 slaves (4 cores, 15 GB RAM each - GCE n1-standard-4), with 4 workers on each slave
Languages: en
Original extraction framework on a single slave node, using as many threads as there are cores:
- Redirects extraction: 2 hrs. 23 min.
- Running extractors: 5 hrs. 7 min.
- Total time: 7 hrs. 10 min.
Distributed extraction framework:
- Redirects extraction: 39 min.
- Running extractors: 2 hrs. 42 min.
- Total time: 3 hrs. 21 min.
Overall Speedup: 2.14
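For reference, the speedup figure is simply the ratio of the total wall-clock times: 7 hrs. 10 min. = 430 min and 3 hrs. 21 min. = 201 min, so 430 / 201 ≈ 2.14.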
- Distributed downloads
- If we get time, or after GSoC: Scalability and speedup plots of distributed extraction and distributed downloads over a cluster. Write up a report with the results.
- If we get time, or after GSoC: Streaming extraction: download chunks of X MB and send them to Spark for computation in a streaming fashion. The redirects are extracted and aggregated (to get the redirects map for the whole language) and saved to disk. Once all the redirects are extracted (which will of course be when the whole language dump has been downloaded and processed), we can send the dump to Spark and run the extractors on it.
- Make InfoboxExtractor work.
- Clean up some code, add docs. Distributed extraction is pretty much wrapped up.
- Rename the single child module to extraction. Create a new maven module called common and shift some code from extraction to it. These will be needed by both download and extraction.
- Create the download module and begin work on distributed downloads. Build the `DistDownloadConfig` class.
- Discuss and finalize the overall approach towards distributed downloads. Re: http://typesafe.com/activator/template/akka-distributed-workers and http://nileshc.com/blog/2014/06/distributed_downloads_of_wikipedia_xml_dumps_part_1/
- Add the akka message classes to be used for inter-actor communication.
- Implement the worker-side download job runner logic, the download job tracker and the worker actor itself. A new mixin called `ActoredCounter` was added to replace the role of `Counter` (tracking and logging progress using a callback).
- Add the frontend/driver-node-side actors: DownloadClient --> Master --> DownloadResultConsumer (a rough sketch of this message flow appears at the end of this page).
- Numerous features and fixes that constitute the bulk of the pull request: https://github.com/dbpedia/distributed-extraction-framework/pull/56/commits
- Polish up the docs and comments.
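For orientation, here is a minimal sketch of the download message flow described above, written with classic Akka actors in the spirit of the akka-distributed-workers template linked earlier. Apart from the Master / DownloadClient / DownloadResultConsumer roles, everything here (message names, fields, the idle-worker bookkeeping) is made up for illustration; the real protocol and the `ActoredCounter` progress reporting live in the PR linked above.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef}

// Hypothetical protocol messages (illustrative names only)
case class DownloadJob(language: String, url: String)
case class JobDone(job: DownloadJob, bytes: Long)
case object WorkerReady

// Master role: queues jobs submitted by the DownloadClient, hands them to idle
// workers, and forwards finished results to the DownloadResultConsumer.
class Master(consumer: ActorRef) extends Actor with ActorLogging {
  private var pending = Vector.empty[DownloadJob]
  private var idle    = Vector.empty[ActorRef]

  private def dispatch(): Unit =
    while (pending.nonEmpty && idle.nonEmpty) {
      idle.head ! pending.head
      idle = idle.tail
      pending = pending.tail
    }

  def receive = {
    case job: DownloadJob =>          // submitted by the DownloadClient
      pending :+= job
      dispatch()
    case WorkerReady =>               // a worker registers or asks for more work
      idle :+= sender()
      dispatch()
    case done: JobDone =>             // a worker finished one dump file
      log.info(s"finished ${done.job.url} (${done.bytes} bytes)")
      consumer ! done                 // DownloadResultConsumer aggregates the results
  }
}

// Worker role: pulls one job at a time; the real worker runs the download job
// runner here and reports progress through the ActoredCounter callback.
class Worker(master: ActorRef) extends Actor {
  override def preStart(): Unit = master ! WorkerReady

  def receive = {
    case job: DownloadJob =>
      // ... perform the actual download of job.url here ...
      master ! JobDone(job, bytes = 0L)
      master ! WorkerReady            // ready for the next job
  }
}
```

The pull-based loop (workers ask for work rather than having it pushed to them) is the main idea borrowed from the distributed-workers template.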