
Weekly progress reports are maintained here.

Week 1: May 19 - May 25

Done

  • Cleaned up the Maven configuration and set up the POMs properly.
  • Implemented a unit test for DistRedirects and bash scripts to automate verification/testing of the extraction output.
  • Separated the config files and classes for Spark configuration from those for normal extraction configuration (see the sketch after this list).
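
A minimal sketch of that separation, with made-up file and property names (the actual keys live in the project's config classes), just to illustrate keeping Spark settings apart from extraction settings:

```scala
import java.io.FileReader
import java.util.Properties
import org.apache.spark.SparkConf

// Spark-specific settings come from their own (hypothetical) properties file...
val sparkProps = new Properties()
sparkProps.load(new FileReader("dist-config.properties"))
val sparkConf = new SparkConf()
  .setMaster(sparkProps.getProperty("spark-master", "local[4]"))
  .setAppName(sparkProps.getProperty("spark-appname", "dbpedia-extraction"))

// ...while the normal extraction settings are loaded separately.
val extractionProps = new Properties()
extractionProps.load(new FileReader("extraction.properties"))
```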

Priority Todo

  • Code review and merging of the work above.
  • Fix issue #9 (Some Extractors fail with ConcurrentModificationException) - we can't really move forward until all extractors work.
  • Regarding the extractors that do work: parallel extraction works well, and template redirects extraction using DistRedirects works too. As of now, DistributedExtractionJob collects all results in memory and then writes them to the output file; this needs to be fixed.
  • Fix issue #4 - write a Hadoop InputFormat class that reads the multistream bz2 file, splits it and parses it into WikiPages. The Spark code will use this to build up an RDD distributed over N nodes, and it will work for dumps both on HDFS and on the local file system (see the skeleton after this list).
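
A bare skeleton of what such a class could look like with Hadoop's new mapreduce API; this is not the eventual implementation, the (key, value) types are placeholders, and a LineRecordReader stands in for the real page-boundary-aware reader:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, LineRecordReader}

// Placeholder name; the real class would parse <page>...</page> blocks into WikiPages.
class WikiPageInputFormatSketch extends FileInputFormat[LongWritable, Text] {
  override def createRecordReader(split: InputSplit,
                                  context: TaskAttemptContext): RecordReader[LongWritable, Text] =
    new LineRecordReader() // stand-in: a real reader would scan for page boundaries
}
```

Spark would then build the RDD with SparkContext.newAPIHadoopFile, and since Hadoop resolves paths through its FileSystem API, the same code works for dumps on HDFS and on the local file system.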

Weeks 2 and 3: May 26 - June 8

Done

  • All extractors work except the (stateful) InfoboxExtractor; issue #9 is fixed.
  • Maven uber jars are used for ease of Spark deployment.
  • Refactored the DistIOUtils code to make Kryo-based saving/loading of RDDs fast.
  • Scalable job outputs - iterate over the RDD elements locally and write them to a local file instead of collecting the whole output in memory (see the sketch after this list).
  • Milestone 2 is practically finished. What's left: scalable input with an InputFormat, plus better code documentation.
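
A minimal sketch of the scalable-output idea, assuming the results are available as an RDD of strings and the output path is a local file: RDD.toLocalIterator pulls one partition at a time to the driver, so the whole result never has to fit in driver memory.

```scala
import java.io.PrintWriter
import org.apache.spark.rdd.RDD

// Stream the RDD to the driver partition by partition instead of collect()-ing
// everything, and append each element to a local output file.
def writeToLocalFile(results: RDD[String], outputFile: String): Unit = {
  val writer = new PrintWriter(outputFile)
  try results.toLocalIterator.foreach(line => writer.println(line))
  finally writer.close()
}
```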

Week 4: June 9 - June 15

Done

  • The pull request "Many code changes to make everything work with Path and XmlInputFormat" constitutes the bulk of the work done.
  • Hadoop works with Paths and FileSystems, which provide an abstraction over files and file systems. That is to say, Hadoop encapsulates whether files live on a distributed or a local file system and lets us work with Paths and InputFormats (see the sketch after this list). The aforementioned PR consists of many code changes to make the framework Path-friendly.
  • Added the Mahout InputFormat.
  • A couple more commits are left to officially complete milestone2 and milestone3 together:
    • Merging DistConfig and Config into a single DistConfig (subclassing and delegation) to solve the problem discussed in this pull request.
    • Adding support for compressed files
    • Multiple languages in parallel using a FIFO/Fair scheduler.
  • Discussed parallel/distributed downloads in this issue and came up with an implementation plan.
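
As a minimal illustration of that abstraction (not code from the PR), the same few lines read a file regardless of whether the path points at HDFS or the local file system; Path.getFileSystem resolves the right FileSystem implementation from the URI scheme.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Works for "hdfs://namenode/dumps/enwiki.xml" as well as "file:///tmp/enwiki.xml".
def firstLine(pathString: String): String = {
  val path = new Path(pathString)
  val fs   = path.getFileSystem(new Configuration())
  val in   = fs.open(path)
  try scala.io.Source.fromInputStream(in).getLines().next()
  finally in.close()
}
```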

Discussion on Testing

I think a good time to test out the framework will be once milestone3 is done - then we can do some proper testing and draw up plots that show concrete results. Some examples:

  1. WC (wall clock) Time vs No. of machines, for wiki dump size (GB) = A, B, C, D
  2. WC Time vs wiki dump size (GB), for no. of machines = A, B, C, D
  3. WC Time vs No. of resulting RDF edges, for no. of machines = A, B, C, D
  4. Comparison of WC time for (a) FIFO scheduling vs (b) fair scheduling when running multiple language extractions in parallel (the insights we draw from this should be important)
  5. Resource-utilization graphs to test Spark/Hadoop-based settings - CPU load, disk I/O, proper distribution of load among the nodes. You can often get 2-3x improvements by crafting your getSplits method correctly. Ganglia resource-utilization graphs are superb for Hadoop scheduler-specific tests: comparing schedulers, testing the Hadoop delay scheduler, and seeing how much delay works best for our case. You can find some examples here, if you scroll to the end: http://www.infoq.com/articles/HadoopInputFormat.

1 and 2 would look roughly like the figure below (taken from Efficient Large-Scale Graph Analysis in MapReduce) if you take each line to represent one of (A, B, C, D, ...) and the X axis to represent dump size or no. of machines:

<Figure 6: Mat-Vec performance: Pegasus Naive vs Naive vs Partitioned; subgraph-size = 2^19, partition-size = 2^18, fraction of border edges = 20%.> from https://www.cs.purdue.edu/homes/kkambatl/papers/tr-graph-analysis.pdf

On the other hand, it might be a good idea to do a few small tests for now, just to check that the distributed framework doesn't blow up in any obvious way while processing, say, the English Wikipedia dump, and to reserve the proper testing and graph plotting for later (once we fix InfoboxExtractor - Christopher gave me some tips about that too, which will come in handy later).

Weeks 5 and 6: June 16 - June 29

Done

  • The merged pull request "Completes milestone 3: Single united DistConfig class instead of two; Support for parallel multiple language extraction" makes two key changes:
    • Merge all config functionality into the single DistConfig class (see the sketch after this list). Quoting from the PR: DistConfig wraps a Config instance and handles the logic required for selecting which properties should come from which files. Also, ConfigLoader is no longer extended by DistConfigLoader and ExecutionJob is no longer extended by DistExecutionJob - the inheritance was adding increasingly needless trouble.
    • Support for parallel extraction of multiple languages was added.
  • This almost concludes milestone3, except for two features we still need to add:
    • A custom OutputFormat so that we can write the outputs via Hadoop's file system API - this will avoid streaming the whole output through the master node, as currently happens.
    • Support for reading compressed input.
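
A rough sketch of the wrap-and-delegate idea behind the new DistConfig; the constructor arguments, fields and property names below are made up for illustration, and the real DistConfig has a much richer API:

```scala
import java.util.Properties

// Stand-in for the framework's general extraction Config.
class Config(props: Properties) {
  def dumpDir: String = props.getProperty("base-dir")
}

// DistConfig wraps a Config instead of extending it, delegating general extraction
// settings to it and answering distributed-specific ones itself.
class DistConfig(extractionProps: Properties, distProps: Properties) {
  private val general = new Config(extractionProps)

  def dumpDir: String     = general.dumpDir                       // delegated
  def sparkMaster: String = distProps.getProperty("spark-master") // distributed-only
}
```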

Week 7: June 30 - July 6

Done

  1. Set the Hadoop version to 2.2.0, make certain changes to prevent compile errors (Hadoop API changes?) and move from the deprecated API to the newer one.
  2. A custom InputFormat called DBpediaWikiPageInputFormat was added (it uses two classes, SeekableInputStream and ByteMatcher) that supports compressed input. It also supports splittable compression codecs like bz2, which is a huge benefit for us. Here's how: say we have the input file enwiki-latest-pages-articles-multistream.xml.bz2, which is > 10 GB. It is compressed in such a way that Hadoop's splittable input stream implementations can break it into chunks and decompress them on the fly in parallel - parallel decompression, and consequently parallel operations on the data.
  3. A custom OutputFormat called DBpediaCompositeOutputFormat is used to write outputs in parallel instead of streaming them through the master iteratively and writing to the Hadoop FileSystem - this eliminates a big performance bottleneck (see the sketch after this list).
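
A minimal sketch of how these two formats plug into Spark. TextInputFormat and TextOutputFormat stand in for DBpediaWikiPageInputFormat and DBpediaCompositeOutputFormat (whose actual key/value types are not shown here), and the paths are made up:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local[4]", "io-format-sketch")

// Reading: with a splittable codec like bz2, each Hadoop split is decompressed
// and processed by a different worker in parallel.
val pages = sc.newAPIHadoopFile(
  "enwiki-latest-pages-articles-multistream.xml.bz2",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

// Writing: every worker writes its own partitions straight to the Hadoop
// FileSystem, so nothing is funnelled through the master.
val lines = pages.map { case (offset, text) => (new Text(offset.toString), new Text(text.toString)) }
lines.saveAsNewAPIHadoopFile(
  "output-sketch",
  classOf[Text], classOf[Text],
  classOf[TextOutputFormat[Text, Text]])
```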

Week 8: July 6 - July 13

Done

  1. Some changes to clean up the Destination classes: https://github.com/dbpedia/distributed-extraction-framework/pull/31
  2. Make logging level and number of job submission threads configurable. Add logging for failed tasks in DBpediaJobProgressListener - https://github.com/dbpedia/distributed-extraction-framework/pull/32

Week 9: July 13 - July 20

Done

  1. Add RichHadoopPath.getSchemeWithFileName and use it instead of Path.toString: https://github.com/dbpedia/distributed-extraction-framework/pull/34
  2. Remove all synchronization from OutputFormats - https://github.com/dbpedia/distributed-extraction-framework/pull/30
  3. Small fixes and enhancements

Week 10: July 21 - July 27

Done

  1. Work on a Python script to setup a Spark+HDFS cluster on Google Compute Engine.
  2. Test out the framework on a GCE cluster, try multiple configurations, come up with some sensible values, fix some bugs.
  3. Move redirect computation into DistExtractionJob so that it doesn't become a bottleneck. Previously, in a multiple-language extraction, all redirects for all languages were computed before the actual parallel extraction started; even though redirects for an individual language were computed in parallel, the per-language redirect extraction jobs ran sequentially! Moving redirects into DistExtractionJob makes each language a single logical job, and the framework is now truly parallel (see the sketch after this list).
  4. Get all the code written so far in nildev2 merged into master. Distributed extraction is in a pretty stable state now.
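
A minimal sketch of the scheduling idea (not the framework's actual code): each language becomes one logical job, and jobs are submitted from separate threads so the cluster scheduler can interleave them. With spark.scheduler.mode set to FAIR, concurrently submitted jobs share cluster resources instead of running strictly FIFO.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical stand-in for a per-language DistExtractionJob: compute redirects
// for the language first, then run the extractors on its dump.
def runLanguage(lang: String): Unit =
  println(s"redirects + extraction for $lang")

val languages = Seq("en", "de", "fr")
implicit val ec = ExecutionContext.fromExecutorService(
  Executors.newFixedThreadPool(languages.size))

// One logical job per language, all submitted concurrently.
val jobs = languages.map(lang => Future(runLanguage(lang)))
Await.result(Future.sequence(jobs), Duration.Inf)
```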

Scalability and speedup plots and statistics are to follow later, including resource utilization graphs for distributed extraction, but for now, here are some time measurements:

Cluster: 1 master (2 cores, 7.5 GB RAM - GCE n1-standard-2) and 2 slaves (4 cores, 15 GB RAM each - GCE n1-standard-4), with 4 workers on each slave

Languages: en

Original extraction framework on a single slave node, using as many threads as cores:

  • Redirects extraction: 2 hrs. 23 min.
  • Running extractors: 5 hrs. 7 min.
  • Total time: 7 hrs. 10 min.

Distributed extraction framework:

  • Redirects extraction: 39 min.
  • Running extractors: 2 hrs. 42 min.
  • Total time: 3 hrs. 21 min.

Overall Speedup: 2.14 (7 hrs. 10 min. / 3 hrs. 21 min. = 430 min / 201 min ≈ 2.14)

Todo:

  1. Distributed downloads
  2. If we get time, or after GSoC: Scalability and speedup plots of distributed extraction and distributed downloads over a cluster. Write up a report with the results.
  3. If we get time, or after GSoC: streaming extraction - download chunks of X MB and send them to Spark for computation in a streaming fashion. The redirects are extracted, aggregated (to obtain the redirects map for the whole language) and saved to disk. Once all the redirects are extracted (which will of course be when the whole language dump has been downloaded and processed), we can have Spark run the extractors on the dump.

Week 11: July 28 - August 3

Done

  1. Make InfoboxExtractor work.
  2. Clean up some code, add docs. Distributed extraction is pretty much wrapped up.

Week 12: August 4 - August 10

Done

  1. Rename the single child module to extraction. Create a new Maven module called common and move some code from extraction into it. These will be needed by both download and extraction.
  2. Create the download module and begin work on distributed downloads. Build the DistDownloadConfig class.
  3. Discuss and finalize the overall approach towards distributed downloads. Re: http://typesafe.com/activator/template/akka-distributed-workers and http://nileshc.com/blog/2014/06/distributed_downloads_of_wikipedia_xml_dumps_part_1/

Week 13: August 11 - August 18

Done

  1. Add the Akka message classes to be used for inter-actor communication.
  2. Implement the worker-side download job runner logic, the download job tracker and the worker actor itself. A new mixin called ActoredCounter was added to replace the role of Counter (it tracks and logs progress using a callback).
  3. Add the frontend/driver-node-side actors: DownloadClient --> Master --> DownloadResultConsumer (see the sketch at the end of this list).
  4. Numerous features and fixes that constitute the bulk of the pull request: https://github.com/dbpedia/distributed-extraction-framework/pull/56/commits
  5. Polish up the docs and comments.
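
To make the message flow concrete, here is a self-contained sketch of the DownloadClient --> Master --> DownloadResultConsumer pipeline; the message classes and actor internals below are simplified assumptions, not the framework's actual protocol:

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Hypothetical messages standing in for the real Akka message classes.
case class DownloadJob(url: String)
case class DownloadResult(url: String, succeeded: Boolean)

class DownloadResultConsumer extends Actor {
  def receive = {
    case DownloadResult(url, ok) => println(s"$url finished, success = $ok")
  }
}

class Master(consumer: ActorRef) extends Actor {
  def receive = {
    case DownloadJob(url) =>
      // In the real framework a worker actor performs the download and reports
      // progress; here we just acknowledge the job and forward a result.
      consumer ! DownloadResult(url, succeeded = true)
  }
}

class DownloadClient(master: ActorRef) extends Actor {
  def receive = {
    case url: String => master ! DownloadJob(url)
  }
}

object DownloadSketch extends App {
  val system   = ActorSystem("downloads")
  val consumer = system.actorOf(Props[DownloadResultConsumer], "consumer")
  val master   = system.actorOf(Props(new Master(consumer)), "master")
  val client   = system.actorOf(Props(new DownloadClient(master)), "client")
  client ! "http://dumps.wikimedia.org/enwiki/"
}
```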