beehive/server.go

239 lines
5.1 KiB
Go

package beehive
import (
"bufio"
"encoding/json"
"fmt"
"log"
"math/rand"
"net"
"strconv"
"strings"
"sync"
"time"
)
// Bounds used by allocatePorts when choosing the first port of the range
// handed to a deployment.
const (
minPort = 10000
maxPort = 49151
)
// serverWelcomeMessage is written to a worker's connection, followed by a
// newline, once the worker has authenticated successfully.
const serverWelcomeMessage = "BUZZ"
// ServerAuthentication is the JSON message a worker must send as the first
// line on a new connection in order to authenticate.
type ServerAuthentication struct {
// Worker is the ID of the connecting worker; values <= 0 are rejected.
Worker int
// Password must match the entry for Worker in Server.WorkerConfig.
Password string
}
// Server accepts TCP connections from workers, authenticates them and
// exchanges newline-delimited JSON tasks and results with them.
type Server struct {
// Address is the TCP address the server listens on.
Address string
// WorkerConfig maps a worker ID to the password that worker must present.
WorkerConfig map[int]string
// Clients maps a worker ID to its currently registered connection.
Clients map[int]*Client
// ClientsLock must be held while accessing Clients.
ClientsLock sync.Mutex
}
// NewServer builds a Server for the given listen address and worker
// passwords and starts its accept loop in a background goroutine.
//
// The returned error is currently always nil: failures to listen are handled
// inside the background goroutine rather than reported here.
func NewServer(address string, workerConfig map[int]string) (*Server, error) {
	server := new(Server)
	server.Address = address
	server.WorkerConfig = workerConfig
	server.Clients = map[int]*Client{}

	go server.listen()

	return server, nil
}
// listen binds a TCP listener to s.Address and serves every accepted
// connection on its own goroutine. It never returns; a failure to listen or
// to accept terminates the process via log.Fatalf.
func (s *Server) listen() {
	ln, err := net.Listen("tcp", s.Address)
	if err != nil {
		log.Fatalf("failed to listen on %s: %s", s.Address, err)
	}

	for {
		conn, acceptErr := ln.Accept()
		if acceptErr != nil {
			log.Fatalf("failed to accept connection on %s: %s", s.Address, acceptErr)
		}
		// NewClient runs on this goroutine; only the handling is concurrent.
		go s.handleConnection(NewClient(conn))
	}
}
// sendTestingTask sends a fixed sequence of tasks to the given client's
// worker: a health check, a deployment of the "openttd" festoon with ports
// 10500-10509, and, ten seconds later, a second health check.
//
// It blocks the calling goroutine for the duration of the ten second pause.
func (s *Server) sendTestingTask(c *Client) {
	// Each health check carries the current time in nanoseconds.
	sendHealthCheck := func() {
		now := strconv.FormatInt(time.Now().UnixNano(), 10)
		s.sendTask(c.Worker, NewTask(TaskHealth, map[string]string{"time": now}))
	}

	sendHealthCheck()

	const (
		firstPort = 10500
		portCount = 10
	)
	deployment := &Deployment{
		ID:      1,
		Festoon: "openttd",
	}
	parameters := map[string]string{
		"id":      fmt.Sprintf("%d", deployment.ID),
		"festoon": deployment.Festoon,
		"ports":   strconv.Itoa(portCount),
	}
	for i := 0; i < portCount; i++ {
		parameters["port_"+strconv.Itoa(i)] = strconv.Itoa(firstPort + i)
	}
	s.sendTask(c.Worker, NewTask(TaskDeploy, parameters))

	time.Sleep(10 * time.Second)

	sendHealthCheck()
}
// addClient registers c under its worker ID while holding ClientsLock. If a
// client is already registered for that worker, its connection is closed
// before it is replaced.
func (s *Server) addClient(c *Client) {
	s.ClientsLock.Lock()
	defer s.ClientsLock.Unlock()

	if previous := s.Clients[c.Worker]; previous != nil {
		previous.Conn.Close()
		log.Printf("Dropped existing connection with worker %d", c.Worker)
	}

	s.Clients[c.Worker] = c
}
// handleRead reads newline-delimited JSON messages from the client's
// connection until the connection ends or the client misbehaves.
//
// The first message must be a valid ServerAuthentication; every later
// message is decoded as a Result. Any invalid or unexpected message makes
// handleRead return, which ends handling of this client only: input from a
// remote worker must never be able to terminate the server process.
func (s *Server) handleRead(c *Client) {
	var authenticated bool
	scanner := bufio.NewScanner(c.Conn)
	for scanner.Scan() {
		message := scanner.Bytes()

		if !authenticated {
			// The authentication message contains the worker's password, so
			// it is deliberately not written to the log.
			if err := s.authenticateClient(c, message); err != nil {
				log.Printf("rejected connection: %s", err)
				return
			}
			authenticated = true
			continue
		}

		log.Printf(" <- %s", message)
		if err := s.processResult(message); err != nil {
			log.Printf("dropping worker %d: %s", c.Worker, err)
			return
		}
	}
	// Scan also stops on read errors and on lines longer than the scanner's
	// buffer; surface those instead of treating them as a clean disconnect.
	if err := scanner.Err(); err != nil {
		log.Printf("failed to read from worker %d: %s", c.Worker, err)
	}
}

// authenticateClient validates the authentication message sent by c. On
// success it sends the welcome message, records the worker ID on c,
// registers c with the server and sends the testing tasks.
func (s *Server) authenticateClient(c *Client, message []byte) error {
	challenge := &ServerAuthentication{}
	if err := json.Unmarshal(message, challenge); err != nil {
		return fmt.Errorf("invalid authentication message: %w", err)
	}
	if challenge.Worker <= 0 {
		return fmt.Errorf("invalid worker ID %d", challenge.Worker)
	}

	// An unknown worker, an empty configured password and a wrong password
	// are all treated the same way.
	password, ok := s.WorkerConfig[challenge.Worker]
	if !ok || password == "" || password != challenge.Password {
		return fmt.Errorf("authentication failed for worker %d", challenge.Worker)
	}

	if _, err := c.Conn.Write([]byte(serverWelcomeMessage + "\n")); err != nil {
		return fmt.Errorf("sending welcome message to worker %d: %w", challenge.Worker, err)
	}
	c.Worker = challenge.Worker
	log.Printf("Worker %d connected", c.Worker)
	s.addClient(c)

	// TODO
	s.sendTestingTask(c)
	return nil
}

// processResult decodes a single Result message and logs its contents. It
// returns an error when the message cannot be decoded or has an unknown type
// or deployment status.
func (s *Server) processResult(message []byte) error {
	result := &Result{}
	if err := json.Unmarshal(message, result); err != nil {
		return fmt.Errorf("failed to unmarshal %s: %w", message, err)
	}
	log.Printf(" <- result: %+v", result)

	switch result.Type {
	case TaskHealth:
		return s.logHealthResult(result)
	case TaskDeploy:
		switch status := result.Parameters["status"]; status {
		case "ok":
			log.Printf("Deployment %s completed", result.Parameters["id"])
			return nil
		default:
			return fmt.Errorf("unknown deployment status: %q", status)
		}
	default:
		return fmt.Errorf("unknown result type %d", result.Type)
	}
}

// logHealthResult logs the round-trip time of a health check and the
// deployment events reported with it. Events are carried in parameters whose
// key is "events_" followed by the deployment identifier.
func (s *Server) logHealthResult(result *Result) error {
	resultTime, err := strconv.ParseInt(result.Parameters["time"], 10, 64)
	if err != nil {
		return fmt.Errorf("invalid health check time: %w", err)
	}
	// Both timestamps are in nanoseconds; report the difference in milliseconds.
	log.Printf("health check time: %.0fms", float64(time.Now().UnixNano()-resultTime)/1000000)

	for key, value := range result.Parameters {
		deployment, ok := strings.CutPrefix(key, "events_")
		if !ok {
			continue
		}
		var events []DeploymentEvent
		if err := json.Unmarshal([]byte(value), &events); err != nil {
			return fmt.Errorf("invalid events for deployment %s: %w", deployment, err)
		}
		log.Printf("deployment %s events:", deployment)
		for _, event := range events {
			label := DeploymentStatusLabel(event.Status)
			if label == "" {
				label = "(UNKNOWN)"
			}
			log.Printf(" %s %s", time.Unix(event.Time, 0).Format(time.DateTime), label)
		}
	}
	return nil
}
// allocatePorts picks a random block of allocatePortRange consecutive ports
// between minPort and maxPort, aligned to a multiple of allocatePortRange.
//
// NOTE: the selected ports are not yet stored or returned.
func (s *Server) allocatePorts(d *Deployment) {
	const allocatePortRange = 10

	// TODO check if port range is already allocated, if so return that

	// rand.Intn panics when its argument is not positive, so the bound must
	// be the constant width of the usable range rather than a second random
	// draw that can fall below minPort.
	startPort := minPort + rand.Intn(maxPort-minPort-allocatePortRange)
	// Round down so that every block starts on a multiple of the block size.
	startPort -= startPort % allocatePortRange

	ports := make([]int, allocatePortRange)
	for i := range ports {
		ports[i] = startPort + i
	}
}
// handleConnection serves a single client: it reads from the client until
// handleRead returns, then closes the connection and unregisters the client.
func (s *Server) handleConnection(c *Client) {
	s.handleRead(c)

	// handleRead also returns when it rejects a client, in which case the
	// connection is still open; close it so it is not leaked. Closing an
	// already closed connection is harmless.
	c.Conn.Close()

	s.ClientsLock.Lock()
	defer s.ClientsLock.Unlock()

	// addClient closes the previous connection of a worker that reconnects.
	// When that previous connection unwinds through here it must not remove
	// the newer client registered under the same worker ID.
	if s.Clients[c.Worker] == c {
		delete(s.Clients, c.Worker)
	}
	log.Printf("Worker %d disconnected", c.Worker)
}
// sendTask encodes t as a single line of JSON and queues it on the outgoing
// channel of the client registered for workerID.
//
// It returns false when no client is registered for workerID or when t
// cannot be encoded, and true once the message has been queued. It must not
// be called while ClientsLock is held.
func (s *Server) sendTask(workerID int, t *TaskMessage) bool {
	// Clients is modified concurrently by addClient and handleConnection, so
	// the lookup has to happen under the lock. The lock is released before
	// the channel send so that a slow client cannot stall other workers.
	s.ClientsLock.Lock()
	client := s.Clients[workerID]
	s.ClientsLock.Unlock()
	if client == nil {
		return false
	}

	taskJSON, err := json.Marshal(t)
	if err != nil {
		// A task that cannot be encoded should not take the server down.
		log.Printf("failed to marshal task for worker %d: %s", workerID, err)
		return false
	}

	if t.Type == TaskDeploy {
		log.Printf("Deployment %s initiated", t.Parameters["id"])
	}

	client.Out <- append(taskJSON, byte('\n'))
	return true
}