mökv
Mar 4th 2025

A distributed key value store implemented in Go. I built it following the book Distributed Services with Go by Travis Jeffery.
Set and Delete operations are routed to the leader node, which replicates them across all follower nodes before confirming success. Get requests are distributed to follower nodes in a round-robin fashion. The data is stored in an in-memory map for simplicity.
Check out the code here.
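The store behind each node is nothing fancy. As a minimal sketch (hypothetical, the actual code is in the repo), a mutex-guarded in-memory map is enough:

// Hypothetical in-memory store guarded by a sync.RWMutex; the real
// implementation lives in the repository.
type Store struct {
    mu   sync.RWMutex
    data map[string][]byte
}

func NewStore() *Store {
    return &Store{data: make(map[string][]byte)}
}

func (s *Store) Set(key string, value []byte) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.data[key] = value
}

func (s *Store) Get(key string) ([]byte, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    value, ok := s.data[key]
    if !ok {
        return nil, fmt.Errorf("key %q not found", key)
    }
    return value, nil
}

func (s *Store) Delete(key string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.data, key)
}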
API
The service exposes a gRPC API for interacting with the key value store:
service KV {
  rpc Get(GetRequest) returns (GetResponse) {}
  rpc Set(SetRequest) returns (SetResponse) {}
  rpc Delete(DeleteRequest) returns (DeleteResponse) {}
  rpc List(google.protobuf.Empty) returns (stream GetResponse) {}
  rpc GetServers(google.protobuf.Empty) returns (GetServersResponse) {}
}
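A client generated from this definition is used like any other gRPC client. Here is a hypothetical example, assuming the generated Go package is called api, the request messages use Key/Value fields, and a node is listening on localhost:8400:

// Hypothetical client usage; the package name "api", the field names, and the
// localhost:8400 address are assumptions. Requires google.golang.org/grpc
// and google.golang.org/grpc/credentials/insecure.
conn, err := grpc.Dial("localhost:8400",
    grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

client := api.NewKVClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if _, err := client.Set(ctx, &api.SetRequest{Key: "greeting", Value: []byte("hello")}); err != nil {
    log.Fatal(err)
}
res, err := client.Get(ctx, &api.GetRequest{Key: "greeting"})
if err != nil {
    log.Fatal(err)
}
fmt.Println(string(res.Value))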
Raft Consensus
mökv uses HashiCorp's Raft library to implement distributed consensus. Raft makes sure that all nodes in the cluster maintain the same state by replicating a log of operations across them. For node discovery I used Serf. When Serf detects a new member, the membership layer notifies a Handler, which the KV store implements to add the node as a Raft voter:
type Handler interface {
    Join(name, addr string) error
    Leave(name string) error
}

func (m *Membership) handleJoin(member serf.Member) {
    // ...
    // When Serf detects a new node
    err := m.handler.Join(member.Name, member.Tags["rpc_addr"])
    // ...
}
// The Join function fulfills the Handler interface above
func (kv *KV) Join(id, addr string) error {
    // ...
    // Executed by the Raft leader to add a new voter
    addFuture := kv.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(addr), 0, time.Second*10)
    return addFuture.Error()
}
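For context, kv.raft is a *raft.Raft instance. A minimal, hypothetical sketch of how one might be wired up with hashicorp/raft, using the in-memory stores and transport for brevity (the actual setup in the repo uses persistent stores and a network transport):

// Hypothetical wiring of a hashicorp/raft node; in-memory stores and
// transport are used here only to keep the sketch short.
func newRaft(id string, fsm raft.FSM) (*raft.Raft, error) {
    config := raft.DefaultConfig()
    config.LocalID = raft.ServerID(id)

    logStore := raft.NewInmemStore()    // replicated log entries
    stableStore := raft.NewInmemStore() // current term, vote, etc.
    snapshotStore := raft.NewInmemSnapshotStore()
    addr, transport := raft.NewInmemTransport("")

    r, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshotStore, transport)
    if err != nil {
        return nil, err
    }

    // Bootstrap a single-node cluster; the other nodes join later via
    // AddVoter when Serf reports them (see Join above).
    future := r.BootstrapCluster(raft.Configuration{
        Servers: []raft.Server{{ID: config.LocalID, Address: addr}},
    })
    return r, future.Error()
}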
Then, to replicate the Delete operation to all nodes:
func (kv *KV) Delete(key string) error {
    // Replicate Delete
    _, err := kv.apply(DeleteRequestType, &api.DeleteRequest{Key: key})
    if err != nil {
        return fmt.Errorf("failed to apply replication deleting key: %s from kv: %w", key, err)
    }
    return nil
}
func (kv *KV) apply(reqType RequestType, req proto.Message) (any, error) {
    var buf bytes.Buffer
    // Write the reqType byte to the buffer to differentiate from other
    // requests later on
    _, err := buf.Write([]byte{byte(reqType)})
    if err != nil {
        return nil, err
    }
    b, err := proto.Marshal(req)
    if err != nil {
        return nil, err
    }
    // Add the actual request after that first byte to the buffer
    _, err = buf.Write(b)
    if err != nil {
        return nil, err
    }
    timeout := 10 * time.Second
    // This Apply call will trigger the finite state machine's Apply operation below
    future := kv.raft.Apply(buf.Bytes(), timeout)
    if future.Error() != nil {
        return nil, future.Error()
    }
    res := future.Response()
    if err, ok := res.(error); ok {
        return nil, err
    }
    return res, nil
}
// This will get called on every node in the cluster
func (fsm *fsm) Apply(log *raft.Log) any {
    // ...
    reqType := RequestType(log.Data[0])
    switch reqType {
    case SetRequestType:
        return fsm.applySet(log.Data[1:])
    case DeleteRequestType:
        return fsm.applyDelete(log.Data[1:])
    }
    return nil
}
func (fsm *fsm) applyDelete(b []byte) any {
    var req api.DeleteRequest
    err := proto.Unmarshal(b, &req)
    if err != nil {
        return err
    }
    err = fsm.kv.Delete(req.Key)
    if err != nil {
        return err
    }
    return &api.DeleteResponse{Ok: true}
}
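The Set path is symmetric. A hypothetical applySet, assuming SetRequest carries Key and Value fields and that the store behind fsm.kv exposes a matching Set method:

// Hypothetical Set counterpart to applyDelete; the field and method
// names here are assumptions, not the project's actual code.
func (fsm *fsm) applySet(b []byte) any {
    var req api.SetRequest
    if err := proto.Unmarshal(b, &req); err != nil {
        return err
    }
    if err := fsm.kv.Set(req.Key, req.Value); err != nil {
        return err
    }
    return &api.SetResponse{Ok: true}
}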
For more details, check the README.
Client-Side Load Balancing
For load balancing I used the gRPC Name Resolver and Picker.
func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) {
    r.mu.Lock()
    defer r.mu.Unlock()
    // Create a client to request the list of servers
    client := api.NewKVClient(r.resolverConn)
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    // Get the servers
    res, err := client.GetServers(ctx, &emptypb.Empty{})
    if err != nil {
        slog.Error("failed to resolve server", "err", err)
        return
    }
    var addrs []resolver.Address
    for _, server := range res.Servers {
        slog.Info("got server", "server", server)
        addrs = append(addrs, resolver.Address{
            Addr: server.RpcAddr,
            // Add this attribute so the picker knows which server is the leader
            Attributes: attributes.New(
                "is_leader",
                server.IsLeader,
            ),
        })
    }
    r.clientConn.UpdateState(resolver.State{
        Addresses:     addrs,
        ServiceConfig: r.serviceConfig,
    })
}
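The r.serviceConfig passed to UpdateState is what tells gRPC to use the custom balancer. A hypothetical way to build it, typically done once when the resolver is constructed (assuming the balancer was registered under the name "mokv"):

// Hypothetical: parse a service config that selects the custom balancer.
// Assumes it was registered via balancer.Register under the name "mokv";
// cc is the resolver.ClientConn handed to the resolver's Build method.
r.serviceConfig = cc.ParseServiceConfig(
    fmt.Sprintf(`{"loadBalancingConfig":[{"%s":{}}]}`, "mokv"),
)

The picker then uses the is_leader attribute attached above to tell the leader apart from the followers and routes each RPC accordingly: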
func (p *Picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
    var result balancer.PickResult
    // ...
    // If this is a Set or Delete operation, route it to the leader node
    if strings.Contains(info.FullMethodName, "Set") ||
        strings.Contains(info.FullMethodName, "Delete") {
        if p.leader == nil {
            return result, balancer.ErrNoSubConnAvailable
        }
        result.SubConn = p.leader
        return result, nil
    }
    // Otherwise pick the next follower in round-robin order
    if len(p.followers) > 0 {
        result.SubConn = p.nextFollower()
        return result, nil
    }
    // ...
    return result, balancer.ErrNoSubConnAvailable
}
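nextFollower isn't shown above; a hypothetical round-robin implementation, assuming the Picker keeps a uint64 counter and a slice of follower subconnections, could be as simple as:

// Hypothetical round-robin helper; assumes the Picker has a
// "current uint64" field and a "followers []balancer.SubConn" slice.
func (p *Picker) nextFollower() balancer.SubConn {
    cur := atomic.AddUint64(&p.current, 1)
    return p.followers[cur%uint64(len(p.followers))]
}

Routing writes through the leader is necessary because only the Raft leader can append to the replicated log; serving reads from followers spreads the load, at the cost of occasionally reading slightly stale data.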
Final Thoughts
This is a project to learn more about distributed systems, Go, and gRPC. It's not intended to be used in production.