Subscribe to container events

This commit is contained in:
Trevor Slocum 2023-04-14 22:20:08 -07:00
parent 4f91af6302
commit 588877e6ee
6 changed files with 233 additions and 26 deletions

View file

@ -25,9 +25,11 @@ func NewClient(conn net.Conn) *Client {
func (c *Client) handleWrite() {
for out := range c.Out {
log.Printf(" -> %s", out)
_, err := c.Conn.Write(out)
if err != nil {
// TODO terminate client
log.Println("write error", err)
continue
}
}

View file

@ -1,6 +1,7 @@
package beehive
import (
"bufio"
"bytes"
"fmt"
"io/fs"
@ -11,6 +12,7 @@ import (
"regexp"
"strconv"
"strings"
"time"
)
type Deployment struct {
@ -104,27 +106,25 @@ func (d *Deployment) deploy() error {
copyDataDir := path.Join(festoonPath, "data")
outDir := path.Join(d.Worker.DeploymentsDir, d.Label())
fileInfo, err := os.Stat(outDir)
fileInfo, err := os.Stat(d.Dir())
if err != nil {
if !os.IsNotExist(err) {
return err
}
err = os.MkdirAll(outDir, 0700)
err = os.MkdirAll(d.Dir(), 0700)
if err != nil {
return err
}
} else if !fileInfo.IsDir() {
return fmt.Errorf("invalid output directory: %s", outDir)
return fmt.Errorf("invalid output directory: %s", d.Dir())
}
err = d.interpolateAndCopy(path.Join(festoonPath, "docker-compose.yml"), path.Join(outDir, "docker-compose.yml"))
err = d.interpolateAndCopy(path.Join(festoonPath, "docker-compose.yml"), path.Join(d.Dir(), "docker-compose.yml"))
if err != nil {
return err
}
outDataPath := path.Join(outDir, "data")
outDataPath := path.Join(d.Dir(), "data")
fileInfo, err = os.Stat(copyDataDir)
if err != nil {
@ -157,9 +157,66 @@ func (d *Deployment) deploy() error {
return err
}
}
stdOut, stdErr, err := DockerCompose(d.Dir(), []string{"up", "-d"})
if bytes.Contains(stdErr, []byte(fmt.Sprintf("%s is up-to-date", d.Label()))) {
log.Printf("Warning: %s was already up", d.Label())
} else if err != nil {
return fmt.Errorf("failed to bring deployment up: %s", err)
}
log.Printf("docker compose stdOut: %s", stdOut)
log.Printf("docker compose stdErr: %s", stdErr)
log.Println("deployment UP!")
return nil
}
func (d *Deployment) Label() string {
return fmt.Sprintf("%s-%d", d.Festoon, d.ID)
}
func (d *Deployment) Dir() string {
return path.Join(d.Worker.DeploymentsDir, d.Label())
}
func (d *Deployment) handleEvents() {
log.Println("HANDLE EVENTS")
cmd, stdOut, stdErr, err := DockerEvents(d.Label())
if err != nil {
log.Fatal(err, stdErr.String())
}
go func() {
for {
scanner := bufio.NewScanner(stdErr)
for scanner.Scan() {
log.Println("EVENT", string(scanner.Bytes()))
}
if scanner.Err() != nil {
log.Fatal("scanner error", scanner.Err())
}
time.Sleep(2 * time.Millisecond)
}
}()
_ = cmd
for {
scanner := bufio.NewScanner(stdOut)
for scanner.Scan() {
b := scanner.Bytes()
if !bytes.HasPrefix(b, []byte("H.net")) || !bytes.HasSuffix(b, []byte("H.net")) {
log.Fatalf("unrecognized event: %s", b)
}
l := len(b)
if l == 10 {
continue
}
log.Println("Container status", string(b[5:l-5]))
}
if scanner.Err() != nil {
log.Fatal("scanner error", scanner.Err())
}
time.Sleep(2 * time.Millisecond)
}
}

View file

@ -1,22 +1,11 @@
package beehive
type ResultType int
// Note: Result types must only be appended to preserve values.
const (
ResultHealth ResultType = iota + 1
ResultDeploy
ResultStart
ResultRestart
ResultStop
)
type Result struct {
Type ResultType
Type TaskType
Parameters map[string]string
}
func NewResult(t ResultType, parameters map[string]string) *Result {
func NewResult(t TaskType, parameters map[string]string) *Result {
return &Result{
Type: t,
Parameters: parameters,

View file

@ -8,6 +8,7 @@ import (
"math/rand"
"net"
"strconv"
"sync"
"time"
)
@ -28,7 +29,8 @@ type Server struct {
WorkerConfig map[int]string
Clients []*Client
Clients []*Client
ClientsLock sync.Mutex
}
func NewServer(address string, workerConfig map[int]string) (*Server, error) {
@ -53,7 +55,6 @@ func (s *Server) listen() {
log.Fatalf("failed to accept connection on %s: %s", s.Address, err)
}
client := NewClient(conn)
s.Clients = append(s.Clients, client)
go s.handleConnection(client)
}
}
@ -85,12 +86,19 @@ func (s *Server) sendTestingTask(c *Client) {
}
t = NewTask(TaskDeploy, parameters)
s.sendTask(c.Worker, t)
t = NewTask(TaskHealth, map[string]string{
"time": fmt.Sprintf("%d", time.Now().UnixNano()),
})
s.sendTask(c.Worker, t)
}
func (s *Server) handleRead(c *Client) {
var readMessage bool
scanner := bufio.NewScanner(c.Conn)
for scanner.Scan() {
log.Printf(" <- %s", scanner.Bytes())
// Authenticate.
if !readMessage {
challenge := &ServerAuthentication{}
@ -131,14 +139,14 @@ func (s *Server) handleRead(c *Client) {
log.Printf(" <- result: %+v", result)
switch result.Type {
case ResultHealth:
case TaskHealth:
resultTime, err := strconv.ParseInt(result.Parameters["time"], 10, 64)
if err != nil {
// TODO disconnect worker
log.Fatal(err)
}
log.Printf("health check time: %.0fms", float64(time.Now().UnixNano()-resultTime)/1000000)
case ResultDeploy:
case TaskDeploy:
switch result.Parameters["status"] {
case "ok":
log.Printf("Deployment %s completed", result.Parameters["id"])
@ -167,8 +175,35 @@ func (s *Server) allocatePorts(d *Deployment) {
}
func (s *Server) handleConnection(c *Client) {
s.ClientsLock.Lock()
// Remove existing client.
for i, client := range s.Clients {
if client.Worker == c.Worker {
client.Conn.Close()
s.Clients = append(s.Clients[:i], s.Clients[i+1:]...)
log.Printf("Dropped existing connection with worker %d", c.Worker)
break
}
}
// Add to clients list.
s.Clients = append(s.Clients, c)
s.ClientsLock.Unlock()
s.handleRead(c)
s.ClientsLock.Lock()
defer s.ClientsLock.Unlock()
for i, client := range s.Clients {
if client.Worker == c.Worker {
s.Clients = append(s.Clients[:i], s.Clients[i+1:]...)
return
}
}
// TODO remove from clients list with mutex
log.Printf("Worker %d disconnected", c.Worker)
}

103
util.go
View file

@ -1,11 +1,18 @@
package beehive
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"path"
"strings"
"sync"
"time"
"sigs.k8s.io/yaml"
)
@ -54,3 +61,99 @@ func Deserialize(object interface{}, path string) error {
return nil
}
func runCommand(dir string, timeout time.Duration, command string, args []string) (*exec.Cmd, *AsyncBuffer, *AsyncBuffer, error) {
stdOut := &AsyncBuffer{}
stdErr := &AsyncBuffer{}
var cmd *exec.Cmd
if timeout != 0 {
ctx, _ := context.WithTimeout(context.Background(), 60*time.Second)
cmd = exec.CommandContext(ctx, command, args...)
} else {
cmd = exec.Command(command, args...)
}
cmd.Dir = dir
cmd.Stdout = stdOut
cmd.Stderr = stdErr
err := cmd.Start()
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
exitCode := exitError.ExitCode()
return cmd, stdOut, stdErr, fmt.Errorf("failed to execute %s %+v: return status %d", command, args, exitCode)
}
return cmd, stdOut, stdErr, fmt.Errorf("failed to execute %s %+v: %v", command, args, err)
}
return cmd, stdOut, stdErr, nil
}
func runCommandAndWait(dir string, timeout time.Duration, command string, args []string) ([]byte, []byte, error) {
log.Printf("Executing %s: %s %s", dir, command, strings.Join(args, " "))
cmd, stdOut, stdErr, err := runCommand(dir, timeout, command, args)
err = cmd.Wait()
if err != nil {
var errExtra string
if stdErr.Len() > 0 {
errExtra = "\n" + stdErr.String()
}
if exitError, ok := err.(*exec.ExitError); ok {
exitCode := exitError.ExitCode()
return stdOut.Bytes(), stdErr.Bytes(), fmt.Errorf("command terminated: %s %+v: return status %d%s%s", command, args, exitCode, stdOut.Bytes(), stdErr.Bytes())
}
return stdOut.Bytes(), stdErr.Bytes(), fmt.Errorf("failed to execute %s %+v: %v%s", command, args, err, errExtra)
}
return stdOut.Bytes(), stdErr.Bytes(), nil
}
func Docker(dir string, args []string) ([]byte, []byte, error) {
return runCommandAndWait(dir, 1*time.Minute, "docker", args)
}
func DockerCompose(dir string, args []string) ([]byte, []byte, error) {
return runCommandAndWait(dir, 1*time.Minute, "docker-compose", args)
}
func DockerEvents(container string) (*exec.Cmd, *AsyncBuffer, *AsyncBuffer, error) {
return runCommand("/", 0, "docker", []string{"events", "--filter", "container=" + container, "--format", "H.net{{ .Status }}H.net"})
}
type AsyncBuffer struct {
b bytes.Buffer
l sync.Mutex
}
func (b *AsyncBuffer) Read(p []byte) (n int, err error) {
b.l.Lock()
defer b.l.Unlock()
return b.b.Read(p)
}
func (b *AsyncBuffer) Write(p []byte) (n int, err error) {
b.l.Lock()
defer b.l.Unlock()
return b.b.Write(p)
}
func (b *AsyncBuffer) String() string {
b.l.Lock()
defer b.l.Unlock()
return b.b.String()
}
func (b *AsyncBuffer) Bytes() []byte {
b.l.Lock()
defer b.l.Unlock()
return b.b.Bytes()
}
func (b *AsyncBuffer) Len() int {
b.l.Lock()
defer b.l.Unlock()
return b.b.Len()
}

View file

@ -50,6 +50,8 @@ func (w *Worker) HandleRead(c *Client) {
var readFirst bool
scanner := bufio.NewScanner(c.Conn)
for scanner.Scan() {
log.Printf(" <- %s", scanner.Bytes())
if !readFirst {
if !bytes.Equal(scanner.Bytes(), []byte(serverWelcomeMessage)) {
log.Fatalf("unexpected server reply: %s", scanner.Bytes())
@ -69,9 +71,24 @@ func (w *Worker) HandleRead(c *Client) {
switch task.Type {
case TaskHealth:
result := NewResult(ResultHealth, map[string]string{
result := NewResult(TaskHealth, map[string]string{
"time": task.Parameters["time"],
})
for _, d := range w.Deployments {
stdOut, stdErr, err := Docker("", []string{"ps", "--filter", "name=" + d.Label(), "--format", "{{ .State }}"})
if err != nil {
log.Printf("failed to check status of deployment %d: %s", d.Label(), err)
}
if len(stdErr) > 0 {
log.Printf("failed to check status of deployment %d: %s", d.Label(), stdErr)
} else if len(stdOut) > 0 {
// TODO validate stdout matches status
result.Parameters[fmt.Sprintf("status_%d", d.ID)] = string(stdOut)
}
}
resultJson, err := json.Marshal(result)
if err != nil {
log.Fatalf("failed to marshal result %+v: %s", result, err)
@ -111,8 +128,12 @@ func (w *Worker) HandleRead(c *Client) {
log.Fatalf("failed to deploy %+v: %s", d, err)
}
go d.handleEvents()
w.Deployments = append(w.Deployments, d)
// Send result
result := NewResult(ResultDeploy, map[string]string{
result := NewResult(TaskDeploy, map[string]string{
"id": task.Parameters["id"],
"status": "ok",
})