mökv

Mar 4th 2025

A distributed key value store implemented in Go.

I built it following the book Distributed Services with Go by Travis Jeffery.

Set and Delete operations are routed to the leader node, which replicates them to the follower nodes through Raft before confirming success. Get requests are distributed to follower nodes in a round-robin fashion. The data is stored in an in-memory map for simplicity.
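
Each node's storage layer is essentially a map guarded by a read-write mutex. The snippet below is a minimal sketch of such a store, not mökv's exact type (the real one lives in internal/kv and also wraps the Raft node):

import "sync"

// A sketch of the in-memory storage layer: a map guarded by an RWMutex.
type store struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func newStore() *store {
	return &store{data: make(map[string][]byte)}
}

func (s *store) Get(key string) ([]byte, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

func (s *store) Set(key string, value []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

func (s *store) Delete(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, key)
}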

Check out the code here.

API

The service exposes a gRPC API for interacting with the key value store:

internal/api/kv.proto
service KV {
    rpc Get(GetRequest) returns (GetResponse) {}
    rpc Set(SetRequest) returns (SetResponse) {}
    rpc Delete(DeleteRequest) returns (DeleteResponse) {}
    rpc List(google.protobuf.Empty) returns (stream GetResponse) {}
    rpc GetServers(google.protobuf.Empty) returns (GetServersResponse){}
}
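
With the generated Go bindings, talking to the store looks roughly like this. It's a sketch: api.NewKVClient is the real generated constructor (it also appears in the resolver below), but the request/response field names, the address, the import path, and the insecure credentials are assumptions:

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	api "path/to/mokv/internal/api" // placeholder import path for the generated package
)

func example() error {
	// Dial a single node directly; the resolver and picker described below
	// are what actually spread requests across the cluster.
	conn, err := grpc.Dial("localhost:8400", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	defer conn.Close()

	client := api.NewKVClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Field names below are assumptions based on the proto definitions.
	if _, err := client.Set(ctx, &api.SetRequest{Key: "greeting", Value: []byte("hello")}); err != nil {
		return err
	}
	res, err := client.Get(ctx, &api.GetRequest{Key: "greeting"})
	if err != nil {
		return err
	}
	log.Printf("got %s", res.Value)
	return nil
}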

Raft Consensus

mökv uses HashiCorp's Raft library to implement distributed consensus. Raft ensures that all nodes in the cluster maintain the same state by replicating a log of operations across them. For node discovery I used Serf.
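
Wiring up the Raft node itself follows hashicorp/raft's usual pattern. The sketch below uses the library's in-memory stores and transport to stay short; mökv's real setup (persistent stores, a network transport, TLS) differs:

import "github.com/hashicorp/raft"

func newRaftNode(id string, fsm raft.FSM) (*raft.Raft, error) {
	config := raft.DefaultConfig()
	config.LocalID = raft.ServerID(id)

	// In-memory stores and transport keep the sketch small; a real node
	// would persist its log and talk to peers over the network.
	logStore := raft.NewInmemStore()
	stableStore := raft.NewInmemStore()
	snapshotStore := raft.NewInmemSnapshotStore()
	_, transport := raft.NewInmemTransport("")

	r, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshotStore, transport)
	if err != nil {
		return nil, err
	}

	// Only the very first node bootstraps the cluster, with itself as the
	// single voter; every other node is added later via AddVoter (see Join below).
	err = r.BootstrapCluster(raft.Configuration{
		Servers: []raft.Server{{ID: config.LocalID, Address: transport.LocalAddr()}},
	}).Error()
	if err != nil {
		return nil, err
	}
	return r, nil
}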

internal/discovery/membership.go
type Handler interface {
	Join(name, addr string) error
	Leave(name string) error
}

func (m *Membership) handleJoin(member serf.Member) {
    // ...

    // When Serf detects a new node
    err := m.handler.Join(member.Name, member.Tags["rpc_addr"])
    // ...
}
internal/kv/kv.go
// The Join function fulfills the Handler interface above
func (kv *KV) Join(id, addr string) error {
    // ...

    // Executed by Raft leader to add a new voter
    addFuture := kv.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(addr), 0, time.Second*10)
    return addFuture.Error()
}

Then to replicate the Delete operation to all nodes:

internal/kv/kv.go
func (kv *KV) Delete(key string) error {
	// Replicate Delete
	_, err := kv.apply(DeleteRequestType, &api.DeleteRequest{Key: key})
	if err != nil {
		return fmt.Errorf("failed to apply replication deleting key: %s from kv: %w", key, err)
	}
	return nil
}

func (kv *KV) apply(reqType RequestType, req proto.Message) (any, error) {
	var buf bytes.Buffer
	// Write the reqType byte to the buffer to differentiate from other requests
	// later on
	_, err := buf.Write([]byte{byte(reqType)})
	if err != nil {
		return nil, err
	}
	b, err := proto.Marshal(req)
	if err != nil {
		return nil, err
	}
	// Add the actual request after that first byte to the buffer
	_, err = buf.Write(b)
	if err != nil {
		return nil, err
	}
	timeout := 10 * time.Second
    // This Apply call will trigger the finite state machine's Apply operation below
	future := kv.raft.Apply(buf.Bytes(), timeout)
	if future.Error() != nil {
		return nil, future.Error()
	}
	res := future.Response()
	if err, ok := res.(error); ok {
		return nil, err
	}
	return res, nil
}

// This will get called on every node in the cluster
func (fsm *fsm) Apply(log *raft.Log) any {
    // ...

	reqType := RequestType(log.Data[0])
	switch reqType {
	case SetRequestType:
		return fsm.applySet(log.Data[1:])
	case DeleteRequestType:
		return fsm.applyDelete(log.Data[1:])
	}
	return nil
}

func (fsm *fsm) applyDelete(b []byte) any {
	var req api.DeleteRequest
	err := proto.Unmarshal(b, &req)
	if err != nil {
		return err
	}
	err = fsm.kv.Delete(req.Key)
	if err != nil {
		return err
	}
	return &api.DeleteResponse{Ok: true}
}
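
The Set path is symmetric. Here is a sketch of applySet, assuming api.SetRequest carries Key and Value fields, that SetResponse mirrors DeleteResponse's Ok field, and that the underlying store exposes a matching Set method:

func (fsm *fsm) applySet(b []byte) any {
	var req api.SetRequest
	err := proto.Unmarshal(b, &req)
	if err != nil {
		return err
	}
	// The store's Set signature and the response fields are assumptions,
	// mirroring applyDelete above.
	err = fsm.kv.Set(req.Key, req.Value)
	if err != nil {
		return err
	}
	return &api.SetResponse{Ok: true}
}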

For more details check the README.

Client-Side Load Balancing

For client-side load balancing I used gRPC's custom name resolver and picker APIs.

internal/discovery/resolver.go
func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) {
	r.mu.Lock()
	defer r.mu.Unlock()

    // Create a client to request the list of servers
	client := api.NewKVClient(r.resolverConn)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

    // Get the servers
	res, err := client.GetServers(ctx, &emptypb.Empty{})
	if err != nil {
		slog.Error("failed to resolve server", "err", err)
		return
	}
	var addrs []resolver.Address
	for _, server := range res.Servers {
		slog.Info("got server", "server", server)
		addrs = append(addrs, resolver.Address{
			Addr: server.RpcAddr,
			Attributes: attributes.New( // Add this so the picker knows which one is the leader
				"is_leader",
				server.IsLeader,
			),
		})
	}

	r.clientConn.UpdateState(resolver.State{
		Addresses:     addrs,
		ServiceConfig: r.serviceConfig,
	})
}
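
On the client side, the resolver's Builder registers a custom scheme and connections are opened against it. A sketch of what dialing the cluster might look like; the scheme name and credentials are assumptions, not mökv's actual values:

import (
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func dialCluster(addr string) (*grpc.ClientConn, error) {
	// "mokv" stands in for whatever scheme the resolver's Builder registers.
	// gRPC hands the target to that resolver, which feeds the discovered
	// addresses (tagged with is_leader) to the picker below.
	return grpc.Dial(
		fmt.Sprintf("mokv:///%s", addr),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
}
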
internal/discovery/picker.go
func (p *Picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	var result balancer.PickResult
    // ...

    // If this is a Set or Delete operation, route it to the leader node
	if strings.Contains(info.FullMethodName, "Set") ||
		strings.Contains(info.FullMethodName, "Delete") {
		if p.leader == nil {
			return result, balancer.ErrNoSubConnAvailable
		}
		result.SubConn = p.leader
		return result, nil
	}

    // Otherwise pick the next follower (round-robin)
	if len(p.followers) > 0 {
		result.SubConn = p.nextFollower()
		return result, nil
	}

    // ...

	return result, balancer.ErrNoSubConnAvailable
}
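
nextFollower isn't shown above; the round-robin behaviour mentioned at the start usually comes down to an atomic counter over the follower sub-connections. A sketch (the current field is an assumption, not mökv's exact code):

import (
	"sync/atomic"

	"google.golang.org/grpc/balancer"
)

// Cycle through the follower sub-connections with an atomic counter so
// concurrent Picks stay safe without extra locking. Pick only calls this
// when at least one follower is available.
func (p *Picker) nextFollower() balancer.SubConn {
	cur := atomic.AddUint64(&p.current, 1)
	idx := int(cur % uint64(len(p.followers)))
	return p.followers[idx]
}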

Final Thoughts

This is a project to learn more about distributed systems, Go and gRPC. It's not intended to be used in production.