16.11.18

A quick and dirty event processor

I recently had to build an event processor.

The problem: I had a cluster of dev machines that we were using for hot compilation of branches from
some of our engineers. They'd go down randomly... and I didn't have a global metrics solution in place
for monitoring infra. So I wanted some evidence that things were messed up, and I just needed to parse
events.
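
The program further down parses each event by splitting on spaces and checking whether the line contains "online". If you want malformed events rejected instead of silently treated as "offline", a slightly stricter parser might look like this. It's a sketch that assumes the "<node-id> online|offline" format from the simulated feed; `Event` and `parseEvent` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is a hypothetical parsed form of lines like "node1 online".
type Event struct {
	Node   string
	Online bool
}

// parseEvent assumes the "<node-id> online|offline" format of the simulated
// feed; any other shape is rejected with an error instead of being guessed at.
func parseEvent(line string) (Event, error) {
	fields := strings.Fields(line)
	if len(fields) != 2 {
		return Event{}, fmt.Errorf("malformed event %q", line)
	}
	switch fields[1] {
	case "online":
		return Event{Node: fields[0], Online: true}, nil
	case "offline":
		return Event{Node: fields[0], Online: false}, nil
	default:
		return Event{}, fmt.Errorf("unknown state %q in %q", fields[1], line)
	}
}

func main() {
	for _, line := range []string{"node1 online", "node1 offline", "node1 rebooting"} {
		ev, err := parseEvent(line)
		if err != nil {
			fmt.Println("skip:", err)
			continue
		}
		fmt.Printf("%s online=%v\n", ev.Node, ev.Online)
	}
}
```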

For fun, I figured making it concurrent and pretending I was running a data center of 10,000 machines
would be cool. Totally not necessary... but we have to do stuff like this in other places, so I figured
I'd use this as a simple, generalizable example of how to build the simplest possible stateful work queue
in Go, with variable-length transactions that may stick around for a long time.
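
One `select` case per channel is fine for two event streams, but it doesn't scale to 10,000 machines. A common Go approach is to fan many streams into a single channel first. This is a sketch of that fan-in, not part of the original program; `merge` and `source` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
)

// merge fans any number of event streams into one channel, so the consumer
// needs a single receive instead of one select case per source. The output
// channel is closed once every input has been closed and drained.
func merge(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan string) {
			defer wg.Done()
			for msg := range in {
				out <- msg
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// source simulates one machine's event stream (hypothetical helper).
func source(id int) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		ch <- fmt.Sprintf("node%d online", id)
		ch <- fmt.Sprintf("node%d offline", id)
	}()
	return ch
}

func main() {
	const machines = 10000
	inputs := make([]<-chan string, 0, machines)
	for i := 0; i < machines; i++ {
		inputs = append(inputs, source(i))
	}
	count := 0
	for range merge(inputs...) {
		count++
	}
	fmt.Println("events received:", count) // two events per machine
}
```

Goroutines are cheap enough that one forwarding goroutine per source is usually fine at this scale.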

In this case a "whole transaction" runs from the event where we first see a server come up to the event
where we see it go down, i.e. it can be days long. Assuming that this program doesn't go down, getting
uptime / downtime is pretty obvious: just mark when it's up, and then mark when it goes down.

Note: this example isn't 100% complete; I had to simplify it to make it easily reusable.

The key components are:
- the lambda (closure) that you send for each operation type.
- the fact that all map access is serialized (a must in Go, since maps aren't safe for concurrent use).
- the fact that all concurrency is handled by channels, not by canned concurrent data structures.
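
The program below serializes map access with a `sync.Mutex`. If you'd rather push the "concurrency through channels" idea all the way, one option (sketched here, not what the program below does) is to let a single goroutine own the map and send it the closures themselves. `store`, `newStore`, `markUp`, and `count` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical stand-in for the Node type in this post.
type node struct {
	up, down int
}

// op is the "lambda" sent for each operation type; it runs on the owner goroutine.
type op func(nodes map[string]*node)

// store hands every operation to one owner goroutine, so the map needs no lock.
type store struct {
	ops chan op
}

func newStore() *store {
	s := &store{ops: make(chan op)}
	go func() {
		nodes := map[string]*node{} // only this goroutine ever touches the map
		for f := range s.ops {
			f(nodes)
		}
	}()
	return s
}

// markUp is one operation type: create the node if needed and record its up time.
func (s *store) markUp(id string, ts int) {
	s.ops <- func(m map[string]*node) {
		n, ok := m[id]
		if !ok {
			n = &node{}
			m[id] = n
		}
		n.up = ts
	}
}

// count is a read, routed through the same channel with a reply channel.
func (s *store) count() int {
	reply := make(chan int)
	s.ops <- func(m map[string]*node) { reply <- len(m) }
	return <-reply
}

func main() {
	s := newStore()
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(i int) { // many producers, one writer
			defer wg.Done()
			s.markUp(fmt.Sprintf("node%d", i), i)
		}(i)
	}
	wg.Wait()
	fmt.Println("nodes tracked:", s.count())
}
```

Because the owner runs one closure at a time, a read like `count` always sees every operation that was sent before it.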

All of this is just to check when nodes in our cluster were up / down over time. Concurrent data structures
are usually the thing new Go engineers don't know how to deal with when they first start. The fact is,
you usually don't need them; instead, you use channels to deal with concurrency. Writing to data
structures doesn't (usually) need to be concurrent, because writes happen quickly. What you often want
is to run the calculations behind a mutation concurrently, and serialize the writes themselves.
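
A sketch of "compute concurrently, serialize the writes": a few workers do the per-event calculation in parallel and hand results to one goroutine, the only one that writes to the map. `record`, `result`, and `computeAll` are hypothetical names, and `down - up` stands in for whatever expensive work a real mutation needs.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical completed up/down pair for one node.
type record struct {
	id       string
	up, down int
}

// result is what a worker hands to the single writer.
type result struct {
	id     string
	uptime int
}

// computeAll runs the per-record calculation on several workers in parallel,
// but only the calling goroutine ever writes to the map.
func computeAll(records []record, workers int) map[string]int {
	jobs := make(chan record)
	results := make(chan result)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for r := range jobs {
				// Stand-in for real work: runs concurrently, touches no shared state.
				results <- result{id: r.id, uptime: r.down - r.up}
			}
		}()
	}

	// Feed the jobs, then close results once every worker has finished.
	go func() {
		for _, r := range records {
			jobs <- r
		}
		close(jobs)
		wg.Wait()
		close(results)
	}()

	// Serialized writes: one goroutine owns the map, so no lock is needed.
	uptimes := map[string]int{}
	for res := range results {
		uptimes[res.id] = res.uptime
	}
	return uptimes
}

func main() {
	recs := []record{{"node1", 0, 4}, {"node2", 1, 5}, {"node3", 2, 6}}
	uptimes := computeAll(recs, 2)
	fmt.Println("node2 uptime:", uptimes["node2"])
}
```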

I tried to generalize the pattern we use for our blackduck operator in the code below.
It should be easy to copy and paste any time you need to process a lot of events
and keep some state floating around before you can close out a transaction.

In this case, once uptime can be calculated, you could for example periodically delete the node objects
after serializing their total uptime to disk (or wherever).


package main

import (
    "fmt"
    "strings"
    "sync"
    "time"
)

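// Node tracks one up -> down cycle for a machine, as Unix timestamps in seconds.
type Node struct {
    id   string
    up   int
    down int
}

// processUp marks when the node came online; a second "online" is unexpected.
func (u *Node) processUp() {
    if u.up > 0 {
        panic("already up.")
    }
    u.up = int(time.Now().Unix())
}

// processDown marks when the node went offline; a second "offline" is unexpected.
func (u *Node) processDown() {
    if u.down > 0 {
        panic("already down.")
    }
    u.down = int(time.Now().Unix())
}

// uptime is only defined once the whole transaction (up, then down) has been seen.
func (u *Node) uptime() (int, error) {
    if u.down == 0 || u.up == 0 {
        return -1, fmt.Errorf("cycle not completed")
    }
    return u.down - u.up, nil
}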

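// Go maps are not safe for concurrent use, so every access to m goes through mutex.
var mutex sync.Mutex
var m map[string]*Node

func init() {
    m = map[string]*Node{}
}

// addOrGetUser returns the node for id, creating it on first sight, and applies
// op (if non-nil) while still holding the lock, so the mutation is serialized too.
func addOrGetUser(id string, op func(u *Node)) *Node {
    mutex.Lock()
    defer mutex.Unlock()
    n, ok := m[id]
    if !ok {
        n = &Node{id: id}
        m[id] = n
    }
    if op != nil {
        op(n)
    }
    return n
}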

func r() time.Duration {
    return 1 * time.Second
}
func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    // done is closed once the simulated event source has sent everything.
    done := make(chan struct{})

    // Simulated event source: two streams of "online" / "offline" events.
    go func() {
        defer close(done)
        ch1 <- "node1 online"
        time.Sleep(r())
        ch2 <- "node2 online"
        time.Sleep(r())
        ch1 <- "node3 online"
        time.Sleep(r())
        ch2 <- "node4 online"
        time.Sleep(r())
        ch1 <- "node1 offline"
        time.Sleep(r())
        ch2 <- "node2 offline"
        time.Sleep(r())
        ch1 <- "node3 offline"
        time.Sleep(r())
        ch2 <- "node4 offline"
    }()

    // process hands addOrGetUser a closure describing the state change for this event.
    process := func(msg string) {
        id := strings.Split(msg, " ")[0]
        addOrGetUser(id, func(op *Node) {
            if strings.Contains(msg, "online") {
                op.processUp()
                return
            }
            op.processDown()
            // The transaction is closed, so uptime can be reported.
            if up, err := op.uptime(); err == nil {
                fmt.Printf("%s was up for %ds\n", op.id, up)
            }
        })
    }

    for {
        select {
        case msg1 := <-ch1:
            process(msg1)
        case msg2 := <-ch2:
            process(msg2)
        case <-done:
            // Without this exit, main would block forever once the producer
            // returns, and the runtime would report a deadlock.
            return
        }
    }
}
