Running Org Babel workloads on top of Mesos with Go and the Docker Containerizer
25 Feb 2015

One of the things I left pending from my stay at Hacker School was to combine a couple of the technologies that have caught most of my attention: Mesos and Org mode.
Although I’m a big fan of the literate programming and active documents that Org Babel enables for Emacs, there are some limitations in the way it works. It is perfect for local runs, where the results end up in place within Emacs, but it does not cope well with long-running processes:
#+BEGIN_SRC sh
while true; do
  echo "hanging the Emacs session :P"
done
#+END_SRC
Also, in case you want to run a code block remotely, you need to do it via ssh. Combining this with #+call blocks is still very cool though:
#+name: install-emacs
#+BEGIN_SRC sh :dir /ubuntu@192.168.0.7:/home/ubuntu
sudo apt-get update
sudo apt-get install -y emacs24-nox org-mode
#+END_SRC

#+call: install-emacs() :dir /ubuntu@192.168.0.8:/home/ubuntu
#+call: install-emacs() :dir /ubuntu@192.168.0.9:/home/ubuntu
For some cases, like provisioning and installing packages (usually the domain of tools like Chef, Puppet, Ansible, etc.), this may work well, but for running jobs remotely we still have the same issues: the session hangs and the code blocks run sequentially:
#+name: run-remotely
#+BEGIN_SRC sh :dir /ubuntu@192.168.0.7:/home/ubuntu
while true; do
  echo "called sequentially, blocks, not really what we want"
done
#+END_SRC

#+call: run-remotely() :dir /ubuntu@192.168.0.8:/home/ubuntu
#+call: run-remotely() :dir /ubuntu@192.168.0.9:/home/ubuntu
Not only that: we also have to specify, for every run, both the credentials and the resources we are using for the workloads. Luckily, resource management is an area where Mesos really shines.
Implementation using the Mesos Go bindings
Mesos, with its level-of-indirection approach, exposes a set of APIs that we can rely on to write custom schedulers in different runtimes. The mesos-go bindings in particular seem to be in active development, so I decided to base the scheduler on them.
Then, another thing we need is something that can understand Org mode documents. For this, I implemented a basic Org mode parser in Go, which the scheduler loads to orchestrate the run.
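To give an idea of its shape, here is a minimal sketch of what such a parser could look like. This is a simplified stand-in, not the actual implementation: the OrgSrcBlock fields mirror the ones used by the scheduler below, but details like header arguments given on the #+BEGIN_SRC line itself are glossed over:

package org

import (
	"bufio"
	"strings"
)

// OrgSrcBlock holds a single #+BEGIN_SRC block together with its
// #+name: and #+header: metadata.
type OrgSrcBlock struct {
	Name       string            // value of the preceding #+name: line
	Headers    map[string]string // header args, e.g. ":cpus" -> "2"
	RawContent string            // body between #+BEGIN_SRC and #+END_SRC
}

// Parse walks an Org mode document and collects its src blocks.
func Parse(doc string) []*OrgSrcBlock {
	var (
		blocks []*OrgSrcBlock
		body   []string
		inSrc  bool
	)
	cur := &OrgSrcBlock{Headers: map[string]string{}}

	scanner := bufio.NewScanner(strings.NewReader(doc))
	for scanner.Scan() {
		line := scanner.Text()
		trimmed := strings.TrimSpace(line)
		switch {
		case strings.HasPrefix(trimmed, "#+END_SRC"):
			cur.RawContent = strings.Join(body, "\n")
			blocks = append(blocks, cur)
			cur = &OrgSrcBlock{Headers: map[string]string{}}
			body, inSrc = nil, false
		case inSrc:
			body = append(body, line)
		case strings.HasPrefix(trimmed, "#+name:"):
			cur.Name = strings.TrimSpace(strings.TrimPrefix(trimmed, "#+name:"))
		case strings.HasPrefix(trimmed, "#+header:"):
			// Header args come in pairs, e.g. ":cpus 2 :mem 128".
			fields := strings.Fields(strings.TrimPrefix(trimmed, "#+header:"))
			for i := 0; i+1 < len(fields); i += 2 {
				cur.Headers[fields[i]] = fields[i+1]
			}
		case strings.HasPrefix(trimmed, "#+BEGIN_SRC"):
			inSrc = true
		}
	}
	return blocks
}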
Using the Go bindings, we start defining our scheduler:
type OrgBabelScheduler struct {
	tasksLaunched int                 // how many tasks we have launched so far
	tasksFinished int                 // how many tasks have reached a terminal state
	blocks        []*org.OrgSrcBlock  // code blocks parsed from the Org mode document
}
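Before looking at the callbacks, here is roughly how such a scheduler gets registered and run. This follows the usual mesos-go driver setup rather than quoting the actual implementation; `blocks` stands for the parsed src blocks, and the resource floor constants used later by ResourceOffers are given illustrative values:

// Assumes the usual mesos-go (circa 2015) imports:
//   mesos "github.com/mesos/mesos-go/mesosproto"
//   sched "github.com/mesos/mesos-go/scheduler"
//   util  "github.com/mesos/mesos-go/mesosutil"
//   "github.com/gogo/protobuf/proto"

// Illustrative defaults; the real values may differ in the full source.
const (
	MIN_CPUS_PER_TASK = 1
	MIN_MEM_PER_TASK  = 128
)

fwinfo := &mesos.FrameworkInfo{
	User: proto.String(""), // Mesos will fill in the current user
	Name: proto.String("Org Babel Scheduler"),
}

driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
	Master:    "192.168.0.7:5050",
	Framework: fwinfo,
	Scheduler: &OrgBabelScheduler{blocks: blocks},
})
if err != nil {
	log.Fatalln("unable to create a SchedulerDriver:", err)
}
if stat, err := driver.Run(); err != nil {
	log.Printf("framework stopped with status %s and error: %v", stat.String(), err)
}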
And below is a basic example of how the ResourceOffers handler would look. Once the contents of the Org mode document have been loaded, we inspect each code block's header arguments to procure the resources we want:
func (sched *OrgBabelScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {
	// We will get many resource offerings,
	// but sometimes the resources being offered will not be enough,
	// so we will need to implement backing off in case that happens.
	for _, offer := range offers {
		...
		var tasks []*mesos.TaskInfo

		for _, src := range sched.blocks {
			sched.tasksLaunched++

			taskId := &mesos.TaskID{
				Value: proto.String(strconv.Itoa(sched.tasksLaunched)),
			}

			// Should build the command properly depending on the runtime.
			// Currently only sh is supported, but that is good enough
			// since I can just call the runtime from there.
			cmd := src.RawContent

			fmt.Println("[OFFER ] Executing this code block:", src.Name, src.Headers)

			// The code block specifies the resources it should allocate.
			//
			taskCpus := MIN_CPUS_PER_TASK
			if src.Headers[":cpus"] != "" {
				taskCpus, _ = strconv.Atoi(src.Headers[":cpus"])
			}

			taskMem := MIN_MEM_PER_TASK
			if src.Headers[":mem"] != "" {
				taskMem, _ = strconv.Atoi(src.Headers[":mem"])
			}

			task := &mesos.TaskInfo{
				Name:    proto.String("ob-mesos-" + taskId.GetValue()),
				TaskId:  taskId,
				SlaveId: offer.SlaveId,
				// Executor: sched.executor,
				Resources: []*mesos.Resource{
					util.NewScalarResource("cpus", float64(taskCpus)),
					util.NewScalarResource("mem", float64(taskMem)),
				},
				Command: &mesos.CommandInfo{
					Value: proto.String(cmd),
				},
			}

			// Run within a Docker container if :dockerimage is specified.
			//
			if len(src.Headers[":dockerimage"]) > 0 {
				task.Container = &mesos.ContainerInfo{
					Type: mesos.ContainerInfo_DOCKER.Enum(),
					Docker: &mesos.ContainerInfo_DockerInfo{
						Image: proto.String(src.Headers[":dockerimage"]),
						// ...
					},
				}
			}

			fmt.Printf("[OFFER ] Prepared to launch task:%s with offer %s \n", task.GetName(), offer.Id.GetValue())

			tasks = append(tasks, task)
		}

		fmt.Println("[OFFER ] Launching ", len(tasks), "tasks for offer", offer.Id.GetValue())
		driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, &mesos.Filters{RefuseSeconds: proto.Float64(1)})
	}
}
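Another callback worth sketching is StatusUpdate, which produces the [STATUS] lines in the logs below. Here is a minimal version, assuming we simply count finished tasks and stop the driver once every code block has run; the real implementation may handle failed or lost tasks differently:

func (sched *OrgBabelScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
	fmt.Println("[STATUS] task", status.TaskId.GetValue(), "is in state", status.GetState().String())

	if status.GetState() == mesos.TaskState_TASK_FINISHED {
		sched.tasksFinished++
	}
	// Once every code block has finished, there is nothing left to do.
	if sched.tasksFinished >= len(sched.blocks) {
		driver.Stop(false)
	}
}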
The full source of the implementation can be found here, written in literate programming with Org mode, of course.
Basic usage example: Running two commands in parallel
A basic example would be something like this:
#+title: Example of running Org Babel workloads on Mesos

*** Mesos settings

#+address: 192.168.0.7
#+master: 192.168.0.7:5050

*** Code blocks

#+name: hello-mesos
#+header: :cpus 2 :mem 128
#+BEGIN_SRC sh
while true; do
  echo "hello world from Org Babel!!!"
  sleep 1
done
#+END_SRC

#+name: date-example
#+header: :cpus 2 :mem 256
#+BEGIN_SRC sh
while true; do
  echo "Telling the time!"
  date
  sleep 1
done
#+END_SRC
Here I am defining two code blocks, allocating 2 cpus to each; the second one also gets a bit more memory than the other. The only Mesos-related configuration we need is the location of a Mesos master and the IP address of the server from which we are running the scheduler.
Results
The end result of this experiment looks like this:
$ go run examples/org_scheduler.go -f org/job.org -logtostderr=true
...
[REGIST] Framework Registered with Master &MasterInfo{Id:*20150225-084641-117483712-5050-23902,Ip:*117483712,Port:*5050,Pid:*master@192.168.0.7:5050,Hostname:*192.168.0.7,XXX_unrecognized:[],}
[OFFER ] offerId = 20150225-084641-117483712-5050-23902-O60 , cpus = 4 , mem = 2812
[OFFER ] Executing this code block: hello-mesos map[:procs:5 :cpus:2 :mem:128]
[OFFER ] Prepared to launch task:ob-mesos-1 with offer 20150225-084641-117483712-5050-23902-O60
[OFFER ] Executing this code block: date-example map[:cpus:2 :mem:256]
[OFFER ] Prepared to launch task:ob-mesos-2 with offer 20150225-084641-117483712-5050-23902-O60
[OFFER ] Launching 2 tasks for offer 20150225-084641-117483712-5050-23902-O60
[STATUS] task 1 is in state TASK_RUNNING
[STATUS] task 2 is in state TASK_RUNNING
We can also check the logs within the Mesos slave sandbox.
Containerizer usage example: Running a command within a Docker container
Thanks to the built-in Docker containerizer from Mesos, it is also possible to specify a Docker image so that the command gets executed within a container (provided the slaves were started with the --containerizers=docker,mesos flag).
In this example, the first block specifies the entrypoint and sets :dockerimage redis, so that the redis image is pulled and run. The second block then talks to that redis container, updating a timestamp every second.
#+title: Org + Docker + Mesos + Redis Example

#+address: 192.168.0.7
#+master: 192.168.0.7:5050

Here is a redis server; we can define the entrypoint to be used
within the Docker image via an Org Babel block:

#+name: redis-server
#+header: :cpus 2 :mem 512 :dockerimage redis
#+BEGIN_SRC sh
redis-server
#+END_SRC

In parallel, there will be another job running which will be setting
the time in the redis server:

#+name: set-time-in-redis
#+header: :cpus 2 :mem 256
#+BEGIN_SRC sh
while true; do
  echo "SET time `date '+%s'`" | nc 127.0.0.1 6379
  sleep 1
done
#+END_SRC
Results
Output would be:
[OFFER ] offerId = 20150225-174751-117483712-5050-13334-O1376 , cpus = 4 , mem = 2812
[OFFER ] Executing this code block: redis-server map[:mem:512 :dockerimage:redis :cpus:2]
[OFFER ] Prepared to launch task:ob-mesos-1 with offer 20150225-174751-117483712-5050-13334-O1376
[OFFER ] Executing this code block: set-time-in-redis map[:mem:256 :cpus:2]
[OFFER ] Prepared to launch task:ob-mesos-2 with offer 20150225-174751-117483712-5050-13334-O1376
[OFFER ] Launching 2 tasks for offer 20150225-174751-117483712-5050-13334-O1376
[STATUS] task 2 is in state TASK_RUNNING
[STATUS] task 1 is in state TASK_RUNNING
[OFFER ] offerId = 20150225-174751-117483712-5050-13334-O1377 , cpus = 0 , mem = 2044
And we can also confirm that it has been run by the Docker engine:
sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1a8b3c964c3e redis:latest "\"/bin/sh -c redis- 17 minutes ago Up 17 minutes mesos-88de0870-b613-4bda-9ed4-30995834ccab
and that a timestamp has been set:
telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
get time
$10
1424882889
We can also check the logs within the sandbox:
==> /tmp/mesos/slaves/20150223-223304-117483712-5050-29395-S0/frameworks/20150225-174751-117483712-5050-13334-0018/executors/1/runs/88de0870-b613-4bda-9ed4-30995834ccab/stdout <==
[8] 25 Feb 16:25:07.322 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
_._
_.-``__ ''-._
_.-`` `. `_. ''-._ Redis 2.8.17 (00000000/0) 64 bit
.-`` .-```. ```\/ _.,_ ''-._
( ' , .-` | `, ) Running in stand alone mode
|`-._`-...-` __...-.``-._|'` _.-'| Port: 6379
| `-._ `._ / _.-' | PID: 8
`-._ `-._ `-./ _.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' | http://redis.io
`-._ `-._`-.__.-'_.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' |
`-._ `-._`-.__.-'_.-' _.-'
`-._ `-.__.-' _.-'
`-._ _.-'
`-.__.-'
[8] 25 Feb 16:25:07.349 # Server started, Redis version 2.8.17
Conclusion
But why even use Org mode for this? Well, the best thing I think it has going for it is that we are transparently adding value to the way we execute our workloads, along with their description.
By having a document format where code blocks are first-class citizens, we can manipulate the way we run something without losing the thought process of how we ran it in the first place (since human writing is also a first-class citizen), thus emphasizing both reproducibility and readability.
Locally, the code blocks would still eval using the Org mode active document features, but by dispatching them to something like the OrgBabelScheduler, we can lift those code blocks and run them in a distributed fashion.
There are still some ideas I have around this and the possibilities it would open for reproducible research.
Feedback is very welcome! You can also follow me on Twitter if you find this interesting…