Counting words with NATS
16 Oct 2014
I am still going through Exercises in Programming Style. After diving into Haskell and transducers for a bit, and since I have mentioned NATS to other HackerSchoolers, I decided to use it for the map-reduce programming example. It is loosely based on the dist-adder example from Derek.
NATS server
There is a readily available gnatsd Docker container that we can use to start the server that the clients will communicate through:
sudo docker run -p 4222:4222 apcera/gnatsd
Aggregator
I call it the aggregator: this process is in charge of splitting the text into ranges of lines and dispatching each range to another node, which computes the word frequencies for it. The aggregator then adds up the partial results as they come back.
require 'nats/client'
require 'json'

NATS.start {
  # Run options
  $stdout.sync = true
  ["TERM", "INT"].each { |sig| trap(sig) { NATS.stop } }

  SRC_ROOT = File.join(File.expand_path("."), "src", "exercises-in-programming-style")
  PRIDE_AND_PREJUDICE = File.join(SRC_ROOT, "pride-and-prejudice.txt")
  STOP_WORDS = File.join(SRC_ROOT, 'stop_words.txt')

  # Compute the stop words once.
  # This payload is small enough to be transmitted
  # to the frequency counters via the channel.
  @stop_words = File.read(STOP_WORDS).split(',')
  @stop_words << ('a'..'z').to_a # also the alphabet
  @stop_words.flatten!.uniq!

  # Initialize
  @words = Hash.new {|h,k| h[k] = 0 }
  @available_computing_nodes = []

  # Discovery channel: counters announce themselves here
  NATS.subscribe('pride-prejudice.discovery') do |msg, reply, sub|
    computing_node = JSON.parse(msg)
    unless @available_computing_nodes.include?(computing_node)
      puts "[DISCOVERED] :: #{computing_node}"
      @available_computing_nodes << computing_node
      puts "[AVAILABLE NODES] :: #{@available_computing_nodes.count}"
    end
  end

  # Example payload: {"id"=>3, "results"=>{"words"=>{"test"=>1}}}
  NATS.subscribe('pride-prejudice.responses') do |msg, reply, sub|
    results = JSON.parse(msg)
    puts "[DONE] :: Job #{results['id']} is done."

    # Mark the job as done
    @chunks[results['id']][:done] = true

    begin
      # Add the partial results to the running totals
      counted_words = results['results']['words']
      counted_words.each_pair do |w, c|
        @words[w] += c
      end
    rescue => e
      puts "Error while trying to count the words..."
      puts e
      puts e.backtrace
    end

    puts "TOP counted words so far"
    @words.sort {|a,b| a[1] <=> b[1]}.reverse[0...25].each do |k, v|
      puts "#{k} - #{v}"
    end
  end

  puts "Waiting 5 seconds to get resources for the job..."
  EM.add_timer(5) do
    pride_and_prejudice_text = File.read(PRIDE_AND_PREJUDICE)
    total_lines = pride_and_prejudice_text.lines.count
    puts "Total lines to split: #{total_lines}"

    # Most likely the lines cannot be split evenly across the nodes.
    # Once the first chunk is set, the remainder is added to the chunk size
    # used for the following chunks, and the last chunk is capped at total_lines.
    chunk_size = total_lines / (@available_computing_nodes.count)
    out_of_chunk = total_lines % @available_computing_nodes.count
    puts "Chunk size per node: #{chunk_size}"

    # Divide the lines in chunks according to the number of available nodes.
    # Ranges are inclusive: the counters read lines[start..end].
    @chunks = {} # {index => {:start, :end, :done, :stop_words }}
    chunk_start = 0
    chunk_end = 0
    1.upto(@available_computing_nodes.count) do |n|
      chunk_end += chunk_size
      if out_of_chunk > 0
        chunk_size += out_of_chunk
        out_of_chunk = 0
      end
      chunk_end = [chunk_end, total_lines].min
      @chunks[n] = {:start => chunk_start, :end => chunk_end, :done => false, :stop_words => @stop_words }
      chunk_start = chunk_end + 1
    end

    @chunks.each do |job|
      job_id, range = job

      # Only want one counter to respond to this
      NATS.request('pride-prejudice.requests', nil, :max => 1) do |response|
        node = JSON.parse(response)
        puts "[REQUEST] :: Job ##{job_id} needs to be done. Anyone can help? Range is (#{range[:start]}:#{range[:end]})"
        NATS.publish("pride-prejudice.#{node['id']}.compute", job.to_json) do
          puts "[HOPING] :: #{range[:start]} -- #{range[:end]} to be done by #{node['id']}."
        end
      end
    end
  end
}
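The chunking step can also be looked at in isolation. The following is a minimal sketch of an alternative split, not the code the aggregator above runs: `split_into_chunks` is a hypothetical helper that hands one extra line to each of the first few chunks, so its ranges differ slightly from the ones the aggregator prints. It also assumes we would rather raise an error than divide by zero when no node was discovered.

```ruby
# Hypothetical helper, not part of the original aggregator.
# Splits total_lines into `nodes` inclusive, zero-based line ranges and
# gives one extra line to each of the first (total_lines % nodes) chunks.
def split_into_chunks(total_lines, nodes)
  raise ArgumentError, "need at least one node" if nodes < 1

  base, extra = total_lines.divmod(nodes)
  chunks = {}
  chunk_start = 0
  1.upto(nodes) do |n|
    size = base + (n <= extra ? 1 : 0)
    chunk_end = chunk_start + size - 1 # inclusive end index
    chunks[n] = { :start => chunk_start, :end => chunk_end, :done => false }
    chunk_start = chunk_end + 1
  end
  chunks
end

# Same totals as in the run shown in this post: 13426 lines, 3 nodes.
split_into_chunks(13426, 3).each do |job_id, range|
  puts "Job ##{job_id}: (#{range[:start]}:#{range[:end]})"
end
```

Because the ranges are inclusive and never overlap, a counter can keep using `lines[start..end]` on them without reading a line twice.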
Word Frequency Counter
Each counter receives a range of lines to process, along with the stop words to ignore, and replies with its partial word frequencies when done.
While receiving tasks, each counter taints itself a little: it delays its reply to new requests according to the number of tasks it is in charge of processing.
require 'nats/client'
require 'securerandom'
require 'json'

$stdout.sync = true
["TERM", "INT"].each { |sig| trap(sig) { NATS.stop } }

SRC_ROOT = File.join(File.expand_path("."), "src", "exercises-in-programming-style")
PRIDE_AND_PREJUDICE = File.join(SRC_ROOT, "pride-and-prejudice.txt")
ID = SecureRandom.uuid
INFO = {'id' => ID }

def compute(range)
  range_start = range['start'].to_i
  range_end = range['end'].to_i
  stop_words = range['stop_words']
  words_frequency = Hash.new {|h,k| h[k] = 0 }

  # Read local copy of the document and fetch that range of lines
  # (an out-of-bounds range yields nil, so fall back to an empty list)
  lines = File.read(PRIDE_AND_PREJUDICE).lines[range_start..range_end] || []
  lines.each do |line|
    line.gsub!(/[^a-zA-Z0-9]/, " ") # replace non-alphanumeric characters with spaces
    words = line.split(" ")
    words.each do |w|
      next if stop_words.include?(w.downcase)
      words_frequency[w.downcase] += 1
    end
  end

  results = {'words' => words_frequency }
  results
end

NATS.start do
  @offerings = 0

  # Announce ourselves to the aggregator every second
  EM.add_periodic_timer(1) do
    NATS.publish('pride-prejudice.discovery', INFO.to_json)
  end

  NATS.subscribe('pride-prejudice.requests') do |msg, reply, sub|
    EM.add_timer(@offerings) { NATS.publish(reply, INFO.to_json) }
    @offerings += 1 # taint ourselves: the next reply is delayed one more second
  end

  NATS.subscribe("pride-prejudice.#{ID}.compute") do |msg, reply, sub|
    job = JSON.parse(msg)
    job_id, range = job
    puts "[OK] :: Start to work on (#{range['start']}:#{range['end']})"
    results = compute(range)
    @offerings -= 1 # job finished: remove one unit of taint

    job_done = {
      :id => job_id,
      :results => results
    }
    NATS.publish("pride-prejudice.responses", job_done.to_json)
  end
end
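To see the tainting on its own, here is a small simulation without NATS or EventMachine. It is only a sketch: the counters are plain hashes, and `new_counter`, `reply_delay`, `first_responder` and `finish_job` are hypothetical names made up for the illustration. It mirrors what the counter above does: every counter subscribed to `pride-prejudice.requests` schedules a reply after `@offerings` seconds and then increments `@offerings`, and since the aggregator asks for `:max => 1`, only the earliest reply is used.

```ruby
# Hypothetical stand-ins for frequency counters: plain hashes, no NATS involved.
def new_counter(id)
  { :id => id, :offerings => 0 }
end

# Seconds a counter would wait before answering a request
# (the value passed to EM.add_timer in the real counter).
def reply_delay(counter)
  counter[:offerings]
end

# One request: every subscribed counter schedules a reply and taints itself.
# Only the earliest reply is used, so the least tainted counter wins.
def first_responder(counters)
  winner = counters.min_by { |c| reply_delay(c) }
  counters.each { |c| c[:offerings] += 1 }
  winner
end

# Finishing a compute job removes one unit of taint.
def finish_job(counter)
  counter[:offerings] -= 1
end

a = new_counter("a")
b = new_counter("b")
b[:offerings] = 1                  # pretend "b" is already tainted by an earlier request

puts first_responder([a, b])[:id]  # "a" replies after 0s, "b" after 1s
2.times { finish_job(b) }          # "b" completes two jobs and sheds its taint
puts first_responder([a, b])[:id]  # now "b" replies after 0s, "a" after 1s
```

Note that in this scheme a counter is tainted by every request it sees, whether or not its reply wins, and only loses taint by finishing a job.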
Output
Once the run is done, the output looks similar to this:
nats-server -- started with pid 15660
aggregator -- Waiting 5 seconds to get resources for the job...
aggregator -- [DISCOVERED] :: {"id"=>"251adacb-0605-4e96-8904-9fd5f239fce2"}
aggregator -- [AVAILABLE NODES] :: 1
aggregator -- [DISCOVERED] :: {"id"=>"a97518f9-2c10-4362-9800-5ec8e57651b9"}
aggregator -- [AVAILABLE NODES] :: 2
aggregator -- [DISCOVERED] :: {"id"=>"4ce45026-3cba-4209-a17b-e27e425366c0"}
aggregator -- [AVAILABLE NODES] :: 3
aggregator -- Total lines to split: 13426
aggregator -- Chunk size per node: 4475
aggregator -- [REQUEST] :: Job #1 needs to be done. Anyone can help? Range is (0:4475)
aggregator -- [HOPING] :: 0 -- 4475 to be done by 4ce45026-3cba-4209-a17b-e27e425366c0.
frequency-counter:1 -- [OK] :: Start to work on (0:4475)
aggregator -- [DONE] :: Job 1 is done.
aggregator -- TOP counted words so far
aggregator -- mr - 395
aggregator -- elizabeth - 202
aggregator -- very - 198
aggregator -- bingley - 181
aggregator -- darcy - 162
aggregator -- bennet - 160
aggregator -- miss - 142
aggregator -- such - 131
aggregator -- much - 130
aggregator -- mrs - 121
aggregator -- jane - 105
aggregator -- more - 102
aggregator -- collins - 95
aggregator -- one - 95
aggregator -- though - 77
aggregator -- think - 74
aggregator -- being - 73
aggregator -- know - 72
aggregator -- never - 71
aggregator -- lady - 70
aggregator -- well - 69
aggregator -- good - 67
aggregator -- man - 65
aggregator -- soon - 64
aggregator -- before - 63
aggregator -- [REQUEST] :: Job #2 needs to be done. Anyone can help? Range is (4476:8951)
aggregator -- [HOPING] :: 4476 -- 8951 to be done by 251adacb-0605-4e96-8904-9fd5f239fce2.
frequency-counter:3 -- [OK] :: Start to work on (4476:8951)
aggregator -- [DONE] :: Job 2 is done.
aggregator -- TOP counted words so far
aggregator -- mr - 630
aggregator -- elizabeth - 437
aggregator -- very - 378
aggregator -- darcy - 327
aggregator -- such - 251
aggregator -- bingley - 245
aggregator -- much - 243
aggregator -- miss - 240
aggregator -- mrs - 239
aggregator -- more - 218
aggregator -- bennet - 206
aggregator -- one - 187
aggregator -- jane - 185
aggregator -- herself - 173
aggregator -- collins - 167
aggregator -- think - 152
aggregator -- lady - 149
aggregator -- before - 146
aggregator -- though - 143
aggregator -- well - 141
aggregator -- never - 140
aggregator -- sister - 139
aggregator -- little - 136
aggregator -- soon - 133
aggregator -- know - 133
aggregator -- [REQUEST] :: Job #3 needs to be done. Anyone can help? Range is (8952:13426)
aggregator -- [HOPING] :: 8952 -- 13426 to be done by 251adacb-0605-4e96-8904-9fd5f239fce2.
frequency-counter:3 -- [OK] :: Start to work on (8952:13426)
aggregator -- [DONE] :: Job 3 is done.
aggregator -- TOP counted words so far
aggregator -- mr - 786
aggregator -- elizabeth - 635
aggregator -- very - 488
aggregator -- darcy - 418
aggregator -- such - 395
aggregator -- mrs - 343
aggregator -- much - 329
aggregator -- more - 327
aggregator -- bennet - 323
aggregator -- bingley - 306
aggregator -- jane - 295
aggregator -- miss - 283
aggregator -- one - 275
aggregator -- know - 239
aggregator -- before - 229
aggregator -- herself - 227
aggregator -- though - 226
aggregator -- well - 224
aggregator -- never - 220
aggregator -- sister - 218
aggregator -- soon - 216
aggregator -- think - 211
aggregator -- now - 209
aggregator -- time - 203
aggregator -- good - 201
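The running totals in the log come from two steps: each counter computes the frequencies for its range, and the aggregator folds every partial result into one hash. The following is a minimal sketch of both steps in a single process, with no NATS involved. The sample lines and stop words are made up for the illustration, and `count_words` and `merge_counts!` are hypothetical names; the tokenization follows the counter's `compute` method.

```ruby
require 'json'

# Hypothetical sample data standing in for the book and the stop words file.
lines = [
  "It is a truth universally acknowledged,",
  "that a single man in possession of a good fortune,",
  "must be in want of a wife.",
  "A good man, a good fortune."
]
stop_words = %w[it is a that in of be must]

# Map step: count the words of some lines, skipping the stop words.
def count_words(lines, stop_words)
  frequency = Hash.new(0)
  lines.each do |line|
    line.gsub(/[^a-zA-Z0-9]/, " ").split(" ").each do |w|
      word = w.downcase
      next if stop_words.include?(word)
      frequency[word] += 1
    end
  end
  frequency
end

# Reduce step: fold one partial result into the running totals.
def merge_counts!(totals, partial)
  partial.each_pair { |word, count| totals[word] += count }
  totals
end

totals = Hash.new(0)
partials = [count_words(lines[0..1], stop_words), count_words(lines[2..3], stop_words)]
partials.each do |partial|
  # Round-trip through JSON, the way the results travel between processes.
  merge_counts!(totals, JSON.parse(partial.to_json))
end

top = totals.sort_by { |_, count| -count }.first(3)
top.each { |word, count| puts "#{word} - #{count}" }
```

Since merging is just addition per word, the order in which the partial results arrive does not change the final totals.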
Further work
- Extend this example so that it becomes fault tolerant, and see how defending against failures impacts the style.
- Dispatch to other nodes using a different client.
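As a starting point for the first item, one possible direction is to remember when each job was dispatched and to pick out the jobs that never got a response. This is only a sketch under assumptions: the `:dispatched_at` key, the helper names and the timeout are hypothetical and not part of the aggregator shown earlier.

```ruby
# Hypothetical bookkeeping: record when a job was handed to a counter.
def mark_dispatched(chunks, job_id, now)
  chunks[job_id][:dispatched_at] = now
end

# Ids of jobs dispatched more than `timeout` seconds ago that are still not done.
def overdue_jobs(chunks, now, timeout)
  chunks.select { |_, chunk|
    !chunk[:done] && chunk[:dispatched_at] && (now - chunk[:dispatched_at]) > timeout
  }.keys
end

chunks = {
  1 => { :start => 0,    :end => 4475, :done => true  },
  2 => { :start => 4476, :end => 8951, :done => false }
}
mark_dispatched(chunks, 1, 0)
mark_dispatched(chunks, 2, 0)

# 30 seconds later, with an assumed timeout of 10 seconds, job 2 is overdue.
puts overdue_jobs(chunks, 30, 10).inspect
```

The aggregator could run such a check from `EM.add_periodic_timer` and issue a new `NATS.request` for every overdue job. It would then also need to ignore responses for jobs already marked as done, otherwise a slow counter answering late would get its words counted twice.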