Counting words with NATS

I'm still going through the Exercises in Programming Style, after diving into Haskell and transducers for a bit. Since I've mentioned NATS to other HackerSchoolers, I decided to use it for the map-reduce programming example. It is loosely based on the dist-adder example from Derek.

NATS server

There is a readily available gnatsd Docker container that we can use to start the server that the clients will communicate through:

sudo docker run -p 4222:4222 apcera/gnatsd

Aggregator

I call it the aggregator: this process splits the text into ranges of lines and dispatches each range to another node, which computes the word frequencies for it. The aggregator then merges the partial results as they come back.

require 'nats/client'
require 'json'

NATS.start {

  # Run options
  $stdout.sync = true
  ["TERM", "INT"].each { |sig| trap(sig) { NATS.stop } }
  SRC_ROOT   = File.join(File.expand_path("."), "src", "exercises-in-programming-style")
  PRIDE_AND_PREJUDICE = File.join(SRC_ROOT, "pride-and-prejudice.txt")
  STOP_WORDS = File.join(SRC_ROOT, 'stop_words.txt')

  # Compute the stop words once.
  # This payload information is small enough that it will be transmitted
  # to the frequency counters via the channel
  @stop_words = File.read(STOP_WORDS).split(',')
  @stop_words << ('a'..'z').to_a # also the alphabet
  @stop_words.flatten!.uniq!

  # Initialize
  @words = Hash.new {|h,k| h[k] = 0 }
  @available_computing_nodes = []

  # Discovery Channel
  NATS.subscribe('pride-prejudice.discovery') do |msg, reply, sub|
    computing_node = JSON.parse(msg)
    unless @available_computing_nodes.include?(computing_node)
      puts "[DISCOVERED]      :: #{computing_node}"
      @available_computing_nodes << computing_node
      puts "[AVAILABLE NODES] :: #{@available_computing_nodes.count}"
    end
  end

  # Example response payload: {"id"=>3, "results"=>{"words"=>{"test"=>1}}}
  NATS.subscribe('pride-prejudice.responses') do |msg, reply, sub|
    results = JSON.parse(msg)
    puts "[DONE]      :: Job #{results['id']} is done."

    # Mark the job as done
    @chunks[results['id']][:done]    = true

    begin
      # Use the partial results and start to count the words
      counted_words = results['results']['words']
      counted_words.each_pair do |w, c|
        @words[w] += c
      end
    rescue => e
      puts "Error while trying to count the words..."
      puts e
      puts e.backtrace
    end

    puts "TOP counted words so far"
    @words.sort {|a,b| a[1] <=> b[1]}.reverse[0...25].each do |k, v|
      puts "#{k}  -  #{v}"
    end
  end

  puts "Waiting 5 seconds to get resources for the job..."
  EM.add_timer(5) do
    pride_and_prejudice_text  = File.read(PRIDE_AND_PREJUDICE)
    total_lines = pride_and_prejudice_text.lines.count
    puts "Total lines to split: #{total_lines}"

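    # Without any discovered node there is nothing to divide the work by
    if @available_computing_nodes.empty?
      puts "No computing nodes discovered, stopping."
      NATS.stop
      next
    end

    # Most likely cannot split the computation perfectly into the number of nodes,
    # so the remaining lines are folded into the chunk size used below
    # (ranges are inclusive and the last one is clamped to total_lines)
    chunk_size = total_lines / (@available_computing_nodes.count)
    out_of_chunk = total_lines % @available_computing_nodes.count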
    puts "Chunk size per node: #{chunk_size}"

    # Read the file, count the number of lines, and divide in chunks
    # according to the number of available nodes
    @chunks = {} # {index => {:start, :end, :done, :stop_words }}
    chunk_start = 0
    chunk_end   = 0
    1.upto(@available_computing_nodes.count) do |n|
      chunk_end += chunk_size
      if out_of_chunk > 0
        chunk_size += out_of_chunk
        out_of_chunk = 0
      end
      chunk_end  = [chunk_end, total_lines].min
      @chunks[n]  = {:start => chunk_start, :end => chunk_end, :done => false, :stop_words => @stop_words }
      chunk_start = chunk_end + 1
    end

    @chunks.each do |job|
      job_id, range = job

      # Only want one counter to respond to this
      NATS.request('pride-prejudice.requests', nil, :max => 1) do |response|
        node = JSON.parse(response)
        puts "[REQUEST]   :: Job ##{job_id} needs to be done. Anyone can help? Range is (#{range[:start]}:#{range[:end]})"
        NATS.publish("pride-prejudice.#{node['id']}.compute", job.to_json) do
          puts "[HOPING]    :: #{range[:start]} -- #{range[:end]} to be done by #{node['id']}."
        end
      end
    end
  end
}
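
The chunking step is the easiest part to get subtly wrong, so here is a minimal sketch of it as a standalone function. It is not the loop from the listing above: the name split_ranges is hypothetical, and it assumes the extra lines are spread one per range starting from the first, with the last index being total_lines - 1.

```ruby
# Split `total_lines` lines into `nodes` contiguous, inclusive index ranges.
# The first `total_lines % nodes` ranges get one extra line each.
# (split_ranges is a made-up helper for this sketch.)
def split_ranges(total_lines, nodes)
  raise ArgumentError, "need at least one node" if nodes < 1

  base, extra = total_lines.divmod(nodes)
  start = 0
  Array.new(nodes) do |i|
    size  = base + (i < extra ? 1 : 0)
    range = { :start => start, :end => start + size - 1 }
    start += size
    range
  end
end

split_ranges(13426, 3).each_with_index do |r, i|
  puts "Job #{i + 1}: #{r[:start]}..#{r[:end]}"
end
```

Keeping this as a pure function means it can be checked without a NATS server or any computing nodes around.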

Word Frequency Counter

Each counter then receives a range of lines to process, along with the stop words to ignore, and replies with its partial word frequencies when done.

While receiving tasks, the counter taints itself a little according to the number of tasks it is in charge of processing, which delays its next offer to help.
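
To make the tainting idea concrete, here is a simplified in-memory model with no NATS and no timers. The names Counter and pick_counter are hypothetical, and the model assumes only the counter that gets a job is tainted, which is an idealized version of what the listing below does (there, every counter that sees a request bumps its delay).

```ruby
# Simplified model of the "taint" idea; Counter and pick_counter are
# made-up names for this sketch only.
Counter = Struct.new(:id, :offerings)

# A counter replies after `offerings` seconds and the requester keeps only
# the first reply, so the counter with the smallest delay gets the job.
def pick_counter(counters)
  counters.min_by(&:offerings)
end

counters = [Counter.new("a", 0), Counter.new("b", 0)]

first = pick_counter(counters)
first.offerings += 1    # the first counter is busy, so it would answer later

second = pick_counter(counters)
second.offerings += 1   # the other counter takes the next job

first.offerings -= 1    # the first counter finished and answers quickly again
third = pick_counter(counters)

puts [first.id, second.id, third.id].inspect
```

The point of the delay is that the aggregator never has to track load itself: busy counters simply lose the race to reply.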

require 'nats/client'
require 'securerandom'
require 'json'

$stdout.sync = true
["TERM", "INT"].each { |sig| trap(sig) { NATS.stop } }
SRC_ROOT = File.join(File.expand_path("."), "src", "exercises-in-programming-style")
PRIDE_AND_PREJUDICE = File.join(SRC_ROOT, "pride-and-prejudice.txt")

ID   = SecureRandom.uuid
INFO = {'id' => ID }

def compute(range)
  range_start     = range['start'].to_i
  range_end       = range['end'].to_i
  stop_words      = range['stop_words']
  words_frequency = Hash.new {|h,k| h[k] = 0 }

  # Read local copy of the document and fetch that range of lines
  lines = File.read(PRIDE_AND_PREJUDICE).lines[range_start..range_end]
  lines.each do |line|
    line.gsub!(/[^a-zA-Z0-9]/, " ") # remove non alphanumeric
    words = line.split(" ")
    words.each do |w|
      next if stop_words.include?(w.downcase)
      words_frequency[w.downcase] += 1
    end
  end

  results = {'words' => words_frequency }

  results
end

NATS.start do

  @offerings = 0

  EM.add_periodic_timer(1) do
    NATS.publish('pride-prejudice.discovery', INFO.to_json)
  end

  NATS.subscribe('pride-prejudice.requests') do |msg, reply, sub|
    EM.add_timer(@offerings) { NATS.publish(reply, INFO.to_json) }
    @offerings += 1 # increase the taint delay for the next offer
  end

  NATS.subscribe("pride-prejudice.#{ID}.compute") do |msg, reply, sub|
    job = JSON.parse(msg)

    job_id, range = job
    puts "[OK]        :: Start to work on (#{range['start']}:#{range['end']})"
    results = compute(range)
    @offerings -= 1 # job finished, so decrease the taint delay

    job_done = {
     :id      => job_id,
     :results => results
    }
    NATS.publish("pride-prejudice.responses", job_done.to_json)
  end
end
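
One detail worth spelling out is why the aggregator reads range[:start] while the counter reads range['start']: the job goes through JSON on the way, and symbol keys come back as strings. A small sketch of that round trip, using only the json library and made-up sample values:

```ruby
require 'json'

# A job as the aggregator iterates it: [job_id, range] with symbol keys.
# The values here are sample data for illustration.
job = [1, { :start => 0, :end => 4475, :done => false, :stop_words => %w[a the] }]

# What travels over the wire, and what the counter gets back after parsing.
payload       = job.to_json
job_id, range = JSON.parse(payload)

puts payload
puts range['start'].inspect  # string keys work after parsing
puts range[:start].inspect   # symbol keys are gone

# The reply goes the other way; the integer id survives the round trip,
# so it can still be used to look up a hash keyed by integers.
reply = JSON.parse({ :id => job_id, :results => { 'words' => { 'darcy' => 2 } } }.to_json)
puts reply['id'] == 1
```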

Output

Once the run is done, the output looks similar to this:

nats-server         -- started with pid 15660
aggregator          -- Waiting 5 seconds to get resources for the job...
aggregator          -- [DISCOVERED]      :: {"id"=>"251adacb-0605-4e96-8904-9fd5f239fce2"}
aggregator          -- [AVAILABLE NODES] :: 1
aggregator          -- [DISCOVERED]      :: {"id"=>"a97518f9-2c10-4362-9800-5ec8e57651b9"}
aggregator          -- [AVAILABLE NODES] :: 2
aggregator          -- [DISCOVERED]      :: {"id"=>"4ce45026-3cba-4209-a17b-e27e425366c0"}
aggregator          -- [AVAILABLE NODES] :: 3
aggregator          -- Total lines to split: 13426
aggregator          -- Chunk size per node: 4475
aggregator          -- [REQUEST]   :: Job #1 needs to be done. Anyone can help? Range is (0:4475)
aggregator          -- [HOPING]    :: 0 -- 4475 to be done by 4ce45026-3cba-4209-a17b-e27e425366c0.
frequency-counter:1 -- [OK]        :: Start to work on (0:4475)
aggregator          -- [DONE]      :: Job 1 is done.
aggregator          -- TOP counted words so far
aggregator          -- mr  -  395
aggregator          -- elizabeth  -  202
aggregator          -- very  -  198
aggregator          -- bingley  -  181
aggregator          -- darcy  -  162
aggregator          -- bennet  -  160
aggregator          -- miss  -  142
aggregator          -- such  -  131
aggregator          -- much  -  130
aggregator          -- mrs  -  121
aggregator          -- jane  -  105
aggregator          -- more  -  102
aggregator          -- collins  -  95
aggregator          -- one  -  95
aggregator          -- though  -  77
aggregator          -- think  -  74
aggregator          -- being  -  73
aggregator          -- know  -  72
aggregator          -- never  -  71
aggregator          -- lady  -  70
aggregator          -- well  -  69
aggregator          -- good  -  67
aggregator          -- man  -  65
aggregator          -- soon  -  64
aggregator          -- before  -  63
aggregator          -- [REQUEST]   :: Job #2 needs to be done. Anyone can help? Range is (4476:8951)
aggregator          -- [HOPING]    :: 4476 -- 8951 to be done by 251adacb-0605-4e96-8904-9fd5f239fce2.
frequency-counter:3 -- [OK]        :: Start to work on (4476:8951)
aggregator          -- [DONE]      :: Job 2 is done.
aggregator          -- TOP counted words so far
aggregator          -- mr  -  630
aggregator          -- elizabeth  -  437
aggregator          -- very  -  378
aggregator          -- darcy  -  327
aggregator          -- such  -  251
aggregator          -- bingley  -  245
aggregator          -- much  -  243
aggregator          -- miss  -  240
aggregator          -- mrs  -  239
aggregator          -- more  -  218
aggregator          -- bennet  -  206
aggregator          -- one  -  187
aggregator          -- jane  -  185
aggregator          -- herself  -  173
aggregator          -- collins  -  167
aggregator          -- think  -  152
aggregator          -- lady  -  149
aggregator          -- before  -  146
aggregator          -- though  -  143
aggregator          -- well  -  141
aggregator          -- never  -  140
aggregator          -- sister  -  139
aggregator          -- little  -  136
aggregator          -- soon  -  133
aggregator          -- know  -  133
aggregator          -- [REQUEST]   :: Job #3 needs to be done. Anyone can help? Range is (8952:13426)
aggregator          -- [HOPING]    :: 8952 -- 13426 to be done by 251adacb-0605-4e96-8904-9fd5f239fce2.
frequency-counter:3 -- [OK]        :: Start to work on (8952:13426)
aggregator          -- [DONE]      :: Job 3 is done.
aggregator          -- TOP counted words so far
aggregator          -- mr  -  786
aggregator          -- elizabeth  -  635
aggregator          -- very  -  488
aggregator          -- darcy  -  418
aggregator          -- such  -  395
aggregator          -- mrs  -  343
aggregator          -- much  -  329
aggregator          -- more  -  327
aggregator          -- bennet  -  323
aggregator          -- bingley  -  306
aggregator          -- jane  -  295
aggregator          -- miss  -  283
aggregator          -- one  -  275
aggregator          -- know  -  239
aggregator          -- before  -  229
aggregator          -- herself  -  227
aggregator          -- though  -  226
aggregator          -- well  -  224
aggregator          -- never  -  220
aggregator          -- sister  -  218
aggregator          -- soon  -  216
aggregator          -- think  -  211
aggregator          -- now  -  209
aggregator          -- time  -  203
aggregator          -- good  -  201
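
The running totals in the log come from merging each partial result into one hash as the replies arrive. As a minimal sketch of that reduce step on its own, with made-up partial counts standing in for the counters' replies:

```ruby
# Merge partial word counts and list the top entries.
# `partials` is invented sample data, not output from a real run.
partials = [
  { 'darcy' => 2, 'elizabeth' => 3 },
  { 'darcy' => 1, 'bennet' => 4 },
  { 'elizabeth' => 2 }
]

totals = Hash.new(0)
partials.each do |counts|
  counts.each_pair { |word, count| totals[word] += count }
end

# Sort by descending count and keep the first two entries.
top = totals.sort_by { |_word, count| -count }.first(2)
top.each { |word, count| puts "#{word}  -  #{count}" }
```

Because addition is commutative, the order in which the partial results arrive does not change the final totals, only the intermediate rankings.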

Further work