Running Org Babel workloads on top of Mesos with Go and the Docker Containerizer

/public/mesos-tasks-output.png

One of the things I left pending from my stay at HackerSchool was to combine a couple of the technologies that have caught most of my attention: Mesos and Org mode.

/public/Mesos-Org.png

Although I’m a big fan of the literate programming and active documents that Org Babel brings to Emacs, there are some limitations in the way it does it. It works perfectly for local runs where we want the results in place within Emacs, but it does not work very well for long-running processes, since evaluating the block blocks the Emacs session:

#+BEGIN_SRC sh
while true; do echo "hanging the Emacs session :P"; done
#+END_SRC

Also, if you want to run a code block remotely, you need to do it via ssh (Org Babel goes through TRAMP when :dir points to a remote path). Combining this with #+call lines is still very cool, though:

#+name: install-emacs
#+BEGIN_SRC sh :dir /ubuntu@192.168.0.7:/home/ubuntu
sudo apt-get update
sudo apt-get install -y emacs24-nox org-mode
#+END_SRC

#+call: install-emacs() :dir /ubuntu@192.168.0.8:/home/ubuntu
#+call: install-emacs() :dir /ubuntu@192.168.0.9:/home/ubuntu

For some cases, like provisioning and installing packages (usually the domain of tools like Chef, Puppet, Ansible, etc.), this may work well. For running jobs remotely, though, we still have the same problem: the session hangs and the code blocks run sequentially:

#+name: run-remotely
#+BEGIN_SRC sh :dir /ubuntu@192.168.0.7:/home/ubuntu
while true; do echo "called sequentially, blocks, not really what we want"; done
#+END_SRC

#+call: run-remotely() :dir /ubuntu@192.168.0.8:/home/ubuntu
#+call: run-remotely() :dir /ubuntu@192.168.0.9:/home/ubuntu

On top of that, we also have to specify both the credentials and the resources used to run the workloads. Luckily, that is an area where Mesos really shines.

Implementation using the Mesos Go bindings

With its level-of-indirection approach, Mesos exposes a set of APIs that we can rely on to write custom schedulers in different runtimes. The mesos-go bindings in particular seem to be under active development, so I decided to base the scheduler on them.

Another thing we need is something that can understand Org mode documents. For this, I implemented a basic Org mode parser in Go, which the scheduler loads to orchestrate the run.
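The parser itself isn't shown in this post. To give a rough idea of what it has to do, here is a minimal sketch (not the actual implementation). It scans the document line by line and, for each #+BEGIN_SRC ... #+END_SRC block, collects the #+name: and #+header: lines that precede it. The SrcBlock type and the ParseSrcBlocks and parseHeaderArgs functions are hypothetical stand-ins. They carry only the fields the scheduler below uses (Name, Headers, RawContent), plus a Lang field added for illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// SrcBlock is a hypothetical stand-in for org.OrgSrcBlock, keeping only
// the fields the scheduler uses plus the block language.
type SrcBlock struct {
	Name       string
	Lang       string
	Headers    map[string]string
	RawContent string
}

// parseHeaderArgs adds ":key value" pairs from s to into, so that
// ":cpus 2 :mem 128" yields ":cpus" -> "2" and ":mem" -> "128".
// A key with no value maps to the empty string.
func parseHeaderArgs(s string, into map[string]string) {
	fields := strings.Fields(s)
	for i := 0; i < len(fields); i++ {
		if !strings.HasPrefix(fields[i], ":") {
			continue
		}
		key, val := fields[i], ""
		if i+1 < len(fields) && !strings.HasPrefix(fields[i+1], ":") {
			val = fields[i+1]
			i++
		}
		into[key] = val
	}
}

// ParseSrcBlocks returns every #+BEGIN_SRC ... #+END_SRC block in doc,
// together with the #+name: and #+header: lines that precede it.
func ParseSrcBlocks(doc string) []*SrcBlock {
	var blocks []*SrcBlock
	var cur *SrcBlock // block being read, nil outside of a block
	var body []string
	pending := &SrcBlock{Headers: map[string]string{}}

	sc := bufio.NewScanner(strings.NewReader(doc))
	for sc.Scan() {
		line := sc.Text()
		trimmed := strings.TrimSpace(line)
		upper := strings.ToUpper(trimmed)
		switch {
		case cur != nil && strings.HasPrefix(upper, "#+END_SRC"):
			cur.RawContent = strings.Join(body, "\n")
			blocks = append(blocks, cur)
			cur, body = nil, nil
			pending = &SrcBlock{Headers: map[string]string{}}
		case cur != nil:
			body = append(body, line) // keep the code verbatim
		case strings.HasPrefix(upper, "#+NAME:"):
			pending.Name = strings.TrimSpace(trimmed[len("#+name:"):])
		case strings.HasPrefix(upper, "#+HEADER:"):
			parseHeaderArgs(trimmed[len("#+header:"):], pending.Headers)
		case strings.HasPrefix(upper, "#+BEGIN_SRC"):
			// "#+BEGIN_SRC sh :dir ...": language first, then header args.
			fields := strings.Fields(trimmed)
			if len(fields) > 1 {
				pending.Lang = fields[1]
			}
			if len(fields) > 2 {
				parseHeaderArgs(strings.Join(fields[2:], " "), pending.Headers)
			}
			cur = pending
		}
	}
	return blocks
}

func main() {
	doc := `#+name: hello-mesos
#+header: :cpus 2 :mem 128
#+BEGIN_SRC sh
echo "hello world from Org Babel!!!"
#+END_SRC`
	for _, b := range ParseSrcBlocks(doc) {
		fmt.Println(b.Name, b.Lang, b.Headers[":cpus"], b.Headers[":mem"])
	}
}
```

Header arguments are kept as strings, exactly as written in the document. Converting them to numbers is left to whoever consumes them, which is what the ResourceOffers handler does.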

Using the Go bindings, we start by defining our scheduler:

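type OrgBabelScheduler struct {
	tasksLaunched int                // number of tasks handed to Mesos so far
	tasksFinished int                // number of tasks reported as finished
	blocks        []*org.OrgSrcBlock // code blocks parsed from the Org document
}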

Below is a basic example of what the ResourceOffers handler looks like. Once the contents of the Org mode document have been loaded, we inspect each code block and its header arguments to request the resources we want:

func (sched *OrgBabelScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {

	// We will get many resource offers, but sometimes the resources
	// being offered will not be enough, so we will need to implement
	// backing off in case that happens.
	for _, offer := range offers {

		// ...

		var tasks []*mesos.TaskInfo

		for _, src := range sched.blocks {
			sched.tasksLaunched++

			taskId := &mesos.TaskID{
				Value: proto.String(strconv.Itoa(sched.tasksLaunched)),
			}

			// Should build the command properly depending on the runtime.
			// Currently only sh is supported, but that is good enough
			// since I can just call the runtime from there.
			cmd := src.RawContent

			fmt.Println("[OFFER ] Executing this code block:", src.Name, src.Headers)

			// The code block specifies the resources it should allocate
			// through its :cpus and :mem header arguments. Keep the
			// minimums when a value is missing or not a valid integer,
			// instead of silently requesting 0.
			taskCpus := MIN_CPUS_PER_TASK
			if v, err := strconv.Atoi(src.Headers[":cpus"]); err == nil {
				taskCpus = v
			}

			taskMem := MIN_MEM_PER_TASK
			if v, err := strconv.Atoi(src.Headers[":mem"]); err == nil {
				taskMem = v
			}

			task := &mesos.TaskInfo{
				Name:    proto.String("ob-mesos-" + taskId.GetValue()),
				TaskId:  taskId,
				SlaveId: offer.SlaveId,
				// Executor: sched.executor,
				Resources: []*mesos.Resource{
					util.NewScalarResource("cpus", float64(taskCpus)),
					util.NewScalarResource("mem", float64(taskMem)),
				},
				Command: &mesos.CommandInfo{
					Value: proto.String(cmd),
				},
			}

			// Run within a Docker container if :dockerimage is specified.
			if image := src.Headers[":dockerimage"]; image != "" {
				task.Container = &mesos.ContainerInfo{
					Type: mesos.ContainerInfo_DOCKER.Enum(),
					Docker: &mesos.ContainerInfo_DockerInfo{
						Image: proto.String(image),
						// ...
					},
				}
			}

			fmt.Printf("[OFFER ] Prepared to launch task:%s with offer %s \n", task.GetName(), offer.Id.GetValue())

			tasks = append(tasks, task)
		}
		fmt.Println("[OFFER ] Launching ", len(tasks), "tasks for offer", offer.Id.GetValue())
		driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, &mesos.Filters{RefuseSeconds: proto.Float64(1)})
	}
}
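As written, the loop launches every code block against every offer. It never compares the requested resources with what the offer contains, and that check presumably lives in the elided part. One way to handle it is first-fit packing:

- Keep the blocks that have not been launched yet in a pending list.
- Subtract each block's :cpus and :mem from the offer while they still fit.
- Leave the remaining blocks for a later offer.

The sketch below uses simplified, hypothetical Block and Offer types instead of the mesos-go protobufs. Its defaultCpus and defaultMem constants are hypothetical stand-ins for MIN_CPUS_PER_TASK and MIN_MEM_PER_TASK.

```go
package main

import (
	"fmt"
	"strconv"
)

// Hypothetical defaults, standing in for MIN_CPUS_PER_TASK / MIN_MEM_PER_TASK.
const (
	defaultCpus = 1.0
	defaultMem  = 128.0
)

// Block is a simplified code block: a name plus its Org header arguments.
type Block struct {
	Name    string
	Headers map[string]string
}

// Offer holds the scalar resources of a single Mesos offer (mem in MB).
type Offer struct {
	ID   string
	Cpus float64
	Mem  float64
}

// headerFloat reads a numeric header argument, falling back to def when
// it is missing, malformed or not positive.
func headerFloat(h map[string]string, key string, def float64) float64 {
	if v, err := strconv.ParseFloat(h[key], 64); err == nil && v > 0 {
		return v
	}
	return def
}

// packOffer assigns pending blocks to the offer, first fit. It returns the
// blocks to launch with this offer and the ones that must wait for another.
func packOffer(offer Offer, pending []Block) (launch, rest []Block) {
	cpus, mem := offer.Cpus, offer.Mem
	for _, b := range pending {
		c := headerFloat(b.Headers, ":cpus", defaultCpus)
		m := headerFloat(b.Headers, ":mem", defaultMem)
		if c <= cpus && m <= mem {
			cpus -= c
			mem -= m
			launch = append(launch, b)
		} else {
			rest = append(rest, b)
		}
	}
	return launch, rest
}

func main() {
	pending := []Block{
		{Name: "hello-mesos", Headers: map[string]string{":cpus": "2", ":mem": "128"}},
		{Name: "date-example", Headers: map[string]string{":cpus": "2", ":mem": "256"}},
		{Name: "third", Headers: map[string]string{":cpus": "1"}}, // hypothetical
	}
	launch, rest := packOffer(Offer{ID: "O60", Cpus: 4, Mem: 2812}, pending)
	fmt.Println(len(launch), "to launch,", len(rest), "pending")
}
```

Take the first offer from the log below (cpus = 4, mem = 2812). The two example blocks fit in it, while the hypothetical third block, which asks for another CPU, stays pending. In the scheduler, the pending list would replace sched.blocks after each offer, so that blocks already launched are not launched again. An offer with nothing left to launch could simply be declined.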

The full source of the implementation can be found here, written as a literate program in Org mode, of course.

Basic usage example: Running two commands in parallel

A basic example would look like this:

#+title: Example of running Org Babel workloads on Mesos

*** Mesos settings

#+address: 192.168.0.7
#+master:  192.168.0.7:5050

*** Code blocks

#+name: hello-mesos
#+header: :cpus 2 :mem 128
#+BEGIN_SRC sh
while true; do 
  echo "hello world from Org Babel!!!"
  sleep 1
done
#+END_SRC

#+name: date-example
#+header: :cpus 2 :mem 256
#+BEGIN_SRC sh
while true; do 
  echo "Telling the time!"
  date
  sleep 1
done
#+END_SRC

Here I define two code blocks and allocate 2 CPUs to each; the second one also gets a bit more memory (256 MB versus 128 MB). The only Mesos-related configuration we need is the location of the Mesos master (#+master) and the IP address of the server from which we run the scheduler (#+address).
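Reading those two settings only requires looking at document-level #+key: lines that sit outside source blocks. The post doesn't show how the scheduler actually reads them, so the sketch below uses a hypothetical Setting helper. The master value is a host:port pair, so net.SplitHostPort can validate it before it is handed to the scheduler driver.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
)

// Setting returns the value of the first "#+key:" line outside of source
// blocks, e.g. Setting(doc, "master"). Matching is case-insensitive.
func Setting(doc, key string) (string, bool) {
	prefix := "#+" + strings.ToLower(key) + ":"
	inSrc := false
	sc := bufio.NewScanner(strings.NewReader(doc))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		lower := strings.ToLower(line)
		switch {
		case strings.HasPrefix(lower, "#+begin_src"):
			inSrc = true
		case strings.HasPrefix(lower, "#+end_src"):
			inSrc = false
		case !inSrc && strings.HasPrefix(lower, prefix):
			return strings.TrimSpace(line[len(prefix):]), true
		}
	}
	return "", false
}

func main() {
	doc := "#+title: Example\n#+address: 192.168.0.7\n#+master:  192.168.0.7:5050\n"
	master, ok := Setting(doc, "master")
	if !ok {
		fmt.Println("no #+master: line")
		return
	}
	host, port, err := net.SplitHostPort(master)
	if err != nil {
		fmt.Println("bad master address:", err)
		return
	}
	address, _ := Setting(doc, "address")
	fmt.Println("master host:", host, "port:", port, "scheduler address:", address)
}
```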

Results

The end result of this experiment looks like this:

$ go run examples/org_scheduler.go -f org/job.org -logtostderr=true 

...
[REGIST] Framework Registered with Master  &MasterInfo{Id:*20150225-084641-117483712-5050-23902,Ip:*117483712,Port:*5050,Pid:*master@192.168.0.7:5050,Hostname:*192.168.0.7,XXX_unrecognized:[],}
[OFFER ] offerId = 20150225-084641-117483712-5050-23902-O60 , cpus = 4 , mem = 2812
[OFFER ] Executing this code block: hello-mesos map[:procs:5 :cpus:2 :mem:128]
[OFFER ] Prepared to launch task:ob-mesos-1 with offer 20150225-084641-117483712-5050-23902-O60 
[OFFER ] Executing this code block: date-example map[:cpus:2 :mem:256]
[OFFER ] Prepared to launch task:ob-mesos-2 with offer 20150225-084641-117483712-5050-23902-O60 
[OFFER ] Launching  2 tasks for offer 20150225-084641-117483712-5050-23902-O60
[STATUS] task 1  is in state  TASK_RUNNING
[STATUS] task 2  is in state  TASK_RUNNING
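The [STATUS] lines come from the scheduler's status-update callback, which isn't shown here. The scheduler struct has a tasksFinished counter. A hypothetical sketch of the bookkeeping it suggests is to count a task as finished the first time it reports a terminal state. To keep the example free of the mesos-go dependency, task states are plain strings, and the tracker type is made up for illustration.

```go
package main

import "fmt"

// terminal lists the Mesos task states after which a task will not run again.
var terminal = map[string]bool{
	"TASK_FINISHED": true,
	"TASK_FAILED":   true,
	"TASK_KILLED":   true,
	"TASK_LOST":     true,
	"TASK_ERROR":    true,
}

// tracker is a hypothetical stand-in for the scheduler's counters.
type tracker struct {
	launched int
	finished int
	state    map[string]string // task ID -> last reported state
}

// update records a status update and reports whether every launched task
// has reached a terminal state. Repeated terminal updates count once.
func (t *tracker) update(taskID, state string) bool {
	fmt.Println("[STATUS] task", taskID, " is in state ", state)
	prev := t.state[taskID]
	t.state[taskID] = state
	if terminal[state] && !terminal[prev] {
		t.finished++
	}
	return t.launched > 0 && t.finished >= t.launched
}

func main() {
	t := &tracker{launched: 2, state: map[string]string{}}
	t.update("1", "TASK_RUNNING")
	t.update("2", "TASK_RUNNING")
	t.update("1", "TASK_FINISHED")
	if t.update("2", "TASK_FAILED") {
		fmt.Println("all tasks are done; the scheduler could stop its driver")
	}
}
```

Both example blocks loop forever, so in this run the tasks stay in TASK_RUNNING and such a counter would never move.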

We can also check the logs within the Mesos slave sandbox:

/public/mesos-tasks.png

Containerizer usage example: Running a command within a Docker container

Thanks to the built-in Docker containerizer in Mesos, it is also possible to specify a Docker image so that the command gets executed within a container.

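In this example, the first block specifies the command to run (redis-server) and sets :dockerimage redis, so that the redis image is pulled and run.

The second block then talks to that Redis server, updating a timestamp in it every second. Reaching it at 127.0.0.1:6379 works for two reasons. First, the Docker containerizer uses host networking by default. Second, both tasks are launched from the same offer, so they land on the same slave.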

#+title:   Org + Docker + Mesos + Redis Example
#+address: 192.168.0.7
#+master:  192.168.0.7:5050

Here is a Redis server. We can define the command to run
within the Docker image via an Org Babel block:

#+name: redis-server
#+header: :cpus 2 :mem 512 :dockerimage redis
#+BEGIN_SRC sh
redis-server
#+END_SRC

In parallel, there will be another job running
which will be setting the time in the redis server:

#+name: set-time-in-redis
#+header: :cpus 2 :mem 256
#+BEGIN_SRC sh
while true; do 
  echo "SET time `date '+%s'`" | nc 127.0.0.1 6379
  sleep 1
done
#+END_SRC

Results

The scheduler output looks like this:

[OFFER ] offerId = 20150225-174751-117483712-5050-13334-O1376 , cpus = 4 , mem = 2812
[OFFER ] Executing this code block: redis-server map[:mem:512 :dockerimage:redis :cpus:2]
[OFFER ] Prepared to launch task:ob-mesos-1 with offer 20150225-174751-117483712-5050-13334-O1376 
[OFFER ] Executing this code block: set-time-in-redis map[:mem:256 :cpus:2]
[OFFER ] Prepared to launch task:ob-mesos-2 with offer 20150225-174751-117483712-5050-13334-O1376 
[OFFER ] Launching  2 tasks for offer 20150225-174751-117483712-5050-13334-O1376
[STATUS] task 2  is in state  TASK_RUNNING
[STATUS] task 1  is in state  TASK_RUNNING
[OFFER ] offerId = 20150225-174751-117483712-5050-13334-O1377 , cpus = 0 , mem = 2044

And we can also confirm that it has been run by the Docker engine:

sudo docker ps

CONTAINER ID        IMAGE               COMMAND                CREATED             STATUS              PORTS               NAMES
1a8b3c964c3e        redis:latest        "\"/bin/sh -c redis-   17 minutes ago      Up 17 minutes                           mesos-88de0870-b613-4bda-9ed4-30995834ccab

and that a timestamp has been set:

telnet 127.0.0.1 6379

Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
get time
$10
1424882889

We can also check the logs within the sandbox:

==> /tmp/mesos/slaves/20150223-223304-117483712-5050-29395-S0/frameworks/20150225-174751-117483712-5050-13334-0018/executors/1/runs/88de0870-b613-4bda-9ed4-30995834ccab/stdout <==
[8] 25 Feb 16:25:07.322 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 2.8.17 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in stand alone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 8
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

[8] 25 Feb 16:25:07.349 # Server started, Redis version 2.8.17

Conclusion

But why use Org mode for this at all? I think its biggest advantage is that the description of a workload travels together with the way we execute it, so we add value to both transparently.

With a document format where code blocks are first-class citizens, we can change the way we run something without losing the thought process behind how we ran it in the first place (since human writing is also a first-class citizen). This emphasizes both reproducibility and readability.

Locally, the code blocks can still be evaluated using the Org mode active document features, but by dispatching them to something like the OrgBabelScheduler, we can lift those code blocks and run them in a distributed fashion.

I still have some more ideas around this and the possibilities it could open up for reproducible research.

Feedback is very welcome! You can also follow me on Twitter if you find this interesting…