beehive/deployment.go

382 lines
8.4 KiB
Go

package beehive
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io/fs"
"log"
"os"
"path"
"path/filepath"
"regexp"
"strings"
"time"
)
// Markers used by the text formats this file reads and writes.
const (
// eventPrefix and eventSuffix frame every line handleEvents reads from the
// DockerEvents output: a line has the form eventPrefix + status + eventSuffix.
eventPrefix = "BEE"
eventSuffix = "HIVE"
// variablePrefix introduces a placeholder in a template file; Interpolate
// replaces variablePrefix+NAME with the value registered under NAME.
variablePrefix = "BEEHIVE_VAR_"
)
// DeploymentStatus identifies the kind of lifecycle event observed for a
// deployment. Its zero value is StatusUnknown.
type DeploymentStatus int
// Known deployment statuses, numbered by iota starting at StatusUnknown = 0.
//
// Note: Entries must only be appended to this list. Inserting or reordering
// an entry changes the numeric value of every status declared after it.
//
// NOTE(review): the names look like Docker container event actions; confirm
// against whatever DockerEvents emits.
const (
StatusUnknown DeploymentStatus = iota
StatusAttach
StatusCommit
StatusCopy
StatusCreate
StatusDestroy
StatusDetach
StatusDie
StatusExecCreate
StatusExecDetach
StatusExecDie
StatusExecStart
StatusExport
StatusHealthStatus
StatusKill
StatusOOM
StatusPause
StatusRename
StatusResize
StatusRestart
StatusStart
StatusStop
StatusTop
StatusUnpause
StatusUpdate
)
// DeploymentStatusLabels maps the textual label of a status to its
// DeploymentStatus value. It is the lookup table behind ParseDeploymentStatus
// (label to status) and DeploymentStatusLabel (status to label).
// StatusUnknown deliberately has no label.
var DeploymentStatusLabels = map[string]DeploymentStatus{
"attach": StatusAttach,
"commit": StatusCommit,
"copy": StatusCopy,
"create": StatusCreate,
"destroy": StatusDestroy,
"detach": StatusDetach,
"die": StatusDie,
"exec_create": StatusExecCreate,
"exec_detach": StatusExecDetach,
"exec_die": StatusExecDie,
"exec_start": StatusExecStart,
"export": StatusExport,
"health_status": StatusHealthStatus,
"kill": StatusKill,
"oom": StatusOOM,
"pause": StatusPause,
"rename": StatusRename,
"resize": StatusResize,
"restart": StatusRestart,
"start": StatusStart,
"stop": StatusStop,
"top": StatusTop,
"unpause": StatusUnpause,
"update": StatusUpdate,
}
// DeploymentStatusLabel returns the textual label registered for status in
// DeploymentStatusLabels. It returns the empty string when no label maps to
// status.
func DeploymentStatusLabel(status DeploymentStatus) string {
	label := ""
	for name, candidate := range DeploymentStatusLabels {
		if candidate != status {
			continue
		}
		label = name
		break
	}
	return label
}
// RecordedDeploymentStatuses lists the statuses that handleEvents appends to
// Deployment.Events. An event whose status is not in this list is parsed and
// then dropped.
var RecordedDeploymentStatuses = []DeploymentStatus{
StatusCreate,
StatusDestroy,
StatusDie,
StatusKill,
StatusOOM,
StatusRestart,
StatusStart,
StatusStop,
}
// ParseDeploymentStatus converts a textual status label into its
// DeploymentStatus. The match against DeploymentStatusLabels is exact; a label
// that is not registered yields the zero DeploymentStatus (StatusUnknown).
func ParseDeploymentStatus(status string) DeploymentStatus {
	if parsed, known := DeploymentStatusLabels[status]; known {
		return parsed
	}
	// The zero value of DeploymentStatus is StatusUnknown.
	var unknown DeploymentStatus
	return unknown
}
// DeploymentEvent records that a deployment was observed in a given status at
// a given moment.
type DeploymentEvent struct {
// Time is when the event was recorded, in Unix seconds (the producers in
// this file fill it with time.Now().Unix()).
Time int64
// Status is the lifecycle status that was observed.
Status DeploymentStatus
}
// Deployment describes one deployed instance of a festoon on a worker.
//
// The fields without a `json:"-"` tag are what interpolateAndCopy serializes
// into the metadata line of the deployed docker-compose.yml and what
// LoadDeployment reads back from it.
type Deployment struct {
// ID distinguishes deployments of the same festoon; Label joins it to
// Festoon as "<Festoon>-<ID>".
ID int
// NOTE(review): UID is not read in the visible part of this file;
// presumably the id of the owning user. Confirm against callers.
UID int
// NOTE(review): Ports is not read in the visible part of this file;
// presumably the ports assigned to the deployment. Confirm against callers.
Ports []int
// Festoon names the template directory under Worker.FestoonsDir that this
// deployment is built from. deploy accepts only ASCII letters and digits.
Festoon string
// Worker supplies the host-level settings used by this file (IP,
// FestoonsDir, DeploymentsDir, Legacy). It is never serialized, so a
// Deployment returned by LoadDeployment has a nil Worker.
Worker *Worker `json:"-"`
// Events is the in-memory log of recorded status changes. It is never
// serialized.
Events []DeploymentEvent `json:"-"`
}
// metadataPrefix starts the comment line in a deployed docker-compose.yml that
// carries the Deployment encoded as JSON. interpolateAndCopy writes that line
// and LoadDeployment parses it.
//
// TODO: loading this metadata may be unnecessary; the name of the deployment
// directory (see Label and Dir) already identifies the deployment.
var metadataPrefix = []byte("# BEEHIVE_METADATA ")
// LoadDeployment reconstructs a Deployment from the metadata comment embedded
// in dir/docker-compose.yml and starts watching its events.
//
// The file is scanned line by line for the first line that begins with
// metadataPrefix; the remainder of that line is decoded as JSON into a new
// Deployment. Fields tagged `json:"-"` (Worker, Events) are not restored.
// On success a goroutine running handleEvents is started for the deployment;
// that goroutine is never stopped by this function.
//
// Errors:
//   - the os.ReadFile error, returned unwrapped so callers can keep using
//     os.IsNotExist on it;
//   - the json.Unmarshal error when the metadata line is malformed;
//   - a wrapped scanner error when the file cannot be scanned (for example
//     bufio.ErrTooLong for a line over the scanner's token limit);
//   - "no metadata found" when the file has no metadata line.
func LoadDeployment(dir string) (*Deployment, error) {
	composeFilePath := path.Join(dir, "docker-compose.yml")
	file, err := os.ReadFile(composeFilePath)
	if err != nil {
		return nil, err
	}

	scanner := bufio.NewScanner(bytes.NewReader(file))
	for scanner.Scan() {
		line := scanner.Bytes()
		// HasPrefix already rejects empty lines and non-comment lines.
		if !bytes.HasPrefix(line, metadataPrefix) {
			continue
		}
		d := &Deployment{}
		if err := json.Unmarshal(line[len(metadataPrefix):], d); err != nil {
			return nil, err
		}
		go d.handleEvents()
		return d, nil
	}
	// Scan also stops on failure; without this check an unreadable file would
	// be misreported as simply lacking metadata.
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scanning %s: %w", composeFilePath, err)
	}
	return nil, fmt.Errorf("no metadata found")
}
// Interpolate reads the template at filePath and returns its contents with
// every placeholder replaced.
//
// A placeholder is variablePrefix followed by a label. The available labels
// are:
//   - every key of customValues;
//   - "ID", the deployment label "<Festoon>-<ID>" (overrides a custom "ID");
//   - "IP", d.Worker.IP (overrides a custom "IP");
//   - "QUOTED_<label>" for each label above: the same value wrapped in double
//     quotes with embedded double quotes backslash-escaped. A derived
//     "QUOTED_<label>" overrides a custom value of the same name.
//
// customValues may be nil and is never modified. d.Worker must be non-nil.
//
// Substitution is deterministic: longer labels are replaced first, so a label
// that is a prefix of another (for example "A" and "AB") cannot clobber the
// longer placeholder. Values that themselves contain variablePrefix are
// inserted literally and are not substituted again.
//
// The only error returned is the one from os.ReadFile.
func (d *Deployment) Interpolate(filePath string, customValues map[string]string) ([]byte, error) {
	buf, err := os.ReadFile(filePath)
	if err != nil {
		return nil, err
	}

	// Values containing variablePrefix are rewritten to this stand-in while
	// substituting so they cannot be mistaken for placeholders, and restored
	// at the end.
	// NOTE(review): the final restore also rewrites any literal
	// "BEEHIVE-VAR-" already present in the template; kept as in the original.
	const escapedPrefix = "BEEHIVE-VAR-"

	// Collect the plain values in a private map so the caller's map is left
	// untouched and a nil map is accepted.
	plain := make(map[string]string, len(customValues)+2)
	for label, value := range customValues {
		plain[label] = value
	}
	plain["ID"] = fmt.Sprintf("%s-%d", d.Festoon, d.ID)
	plain["IP"] = d.Worker.IP

	// Build the final table in two passes over the plain values, so that no
	// key is inserted into a map while it is being ranged over and derived
	// QUOTED_ entries always win over same-named plain entries.
	replacements := make(map[string]string, 2*len(plain))
	for label, value := range plain {
		replacements[label] = strings.ReplaceAll(value, variablePrefix, escapedPrefix)
	}
	for label := range plain {
		escaped := replacements[label]
		replacements["QUOTED_"+label] = `"` + strings.ReplaceAll(escaped, `"`, `\"`) + `"`
	}

	// Order the labels longest first, ties broken alphabetically. The set is
	// tiny, so a plain insertion sort keeps this block free of extra imports.
	before := func(a, b string) bool {
		if len(a) != len(b) {
			return len(a) > len(b)
		}
		return a < b
	}
	labels := make([]string, 0, len(replacements))
	for label := range replacements {
		i := len(labels)
		labels = append(labels, label)
		for i > 0 && before(label, labels[i-1]) {
			labels[i] = labels[i-1]
			i--
		}
		labels[i] = label
	}

	// Apply substitutions.
	for _, label := range labels {
		buf = bytes.ReplaceAll(buf, []byte(variablePrefix+label), []byte(replacements[label]))
	}

	// Unescape keywords.
	buf = bytes.ReplaceAll(buf, []byte(escapedPrefix), []byte(variablePrefix))
	return buf, nil
}
// interpolateAndCopy renders the template inFile with Interpolate (no custom
// values) and writes the result to outFile with mode 0600.
//
// When the base name of outFile is "docker-compose.yml", the output is
// prefixed with a metadata comment line (metadataPrefix followed by the JSON
// encoding of d) and a blank line, which is what LoadDeployment later reads.
// NOTE(review): this applies to any file with that base name, including ones
// copied from a festoon's data tree; confirm that is intended.
//
// It returns the first error from Interpolate, json.Marshal or os.WriteFile.
func (d *Deployment) interpolateAndCopy(inFile string, outFile string) error {
	data, err := d.Interpolate(inFile, make(map[string]string))
	if err != nil {
		return err
	}

	if path.Base(outFile) == "docker-compose.yml" {
		metadata, err := json.Marshal(d)
		if err != nil {
			return err
		}
		// Assemble the header in a fresh buffer. Appending directly to the
		// package-level metadataPrefix could write into its spare capacity,
		// which every call would then share.
		withHeader := make([]byte, 0, len(metadataPrefix)+len(metadata)+2+len(data))
		withHeader = append(withHeader, metadataPrefix...)
		withHeader = append(withHeader, metadata...)
		withHeader = append(withHeader, '\n', '\n')
		withHeader = append(withHeader, data...)
		data = withHeader
	}

	return os.WriteFile(outFile, data, 0600)
}
// deploy renders the deployment's festoon into d.Dir() and brings it up.
//
// Steps, in order:
//  1. Validate d.Festoon: it must consist solely of ASCII letters and digits.
//  2. Make sure d.Dir() exists as a directory. If it already contains a
//     docker-compose.yml, the running deployment is stopped first; a failure
//     to stop is logged and otherwise ignored (best effort).
//  3. Interpolate the festoon's docker-compose.yml into d.Dir().
//  4. If the festoon has a "data" directory, mirror it into d.Dir()/data,
//     interpolating every file on the way.
//  5. Run "up -d" through DockerCompose. If its stderr reports that the
//     deployment is already up to date, a StatusAttach event is appended to
//     d.Events and the call succeeds regardless of the returned error.
//
// d.Worker must be non-nil. Every failure is returned to the caller; this
// function does not terminate the process.
func (d *Deployment) deploy() error {
	// The pattern also rejects empty and whitespace-only names.
	match, err := regexp.MatchString(`^[a-zA-Z0-9]+$`, d.Festoon)
	if err != nil {
		return err
	}
	if !match {
		return fmt.Errorf("unknown festoon: %s", d.Festoon)
	}

	festoonPath := filepath.Join(d.Worker.FestoonsDir, d.Festoon)
	outDir := d.Dir()
	composeOut := filepath.Join(outDir, "docker-compose.yml")

	fileInfo, err := os.Stat(outDir)
	switch {
	case err == nil && !fileInfo.IsDir():
		return fmt.Errorf("invalid output directory: %s", outDir)
	case err == nil:
		// The directory exists: stop whatever was deployed there before.
		_, statErr := os.Stat(composeOut)
		switch {
		case statErr == nil:
			log.Printf("Stopping deployment %s...", d.Label())
			if _, _, stopErr := DockerCompose(outDir, []string{"stop"}, d.Worker.Legacy); stopErr != nil {
				log.Printf("failed to stop running deployment: %s", stopErr)
			}
		case !os.IsNotExist(statErr):
			return fmt.Errorf("failed to check for existing docker-compose.yml: %w", statErr)
		}
	case os.IsNotExist(err):
		if mkErr := os.MkdirAll(outDir, 0700); mkErr != nil {
			return mkErr
		}
	default:
		return err
	}

	log.Println("Interpolating and copying files...")
	err = d.interpolateAndCopy(filepath.Join(festoonPath, "docker-compose.yml"), composeOut)
	if err != nil {
		return err
	}

	// The data directory is optional; anything at that path that is not a
	// directory is ignored.
	srcData := filepath.Join(festoonPath, "data")
	fileInfo, err = os.Stat(srcData)
	switch {
	case err == nil:
		if fileInfo.IsDir() {
			if err := d.copyFestoonData(srcData, filepath.Join(outDir, "data")); err != nil {
				return err
			}
		}
	case !os.IsNotExist(err):
		return err
	}

	log.Printf("Starting deployment %s...", d.Label())
	_, stdErr, err := DockerCompose(outDir, []string{"up", "-d"}, d.Worker.Legacy)
	if bytes.Contains(stdErr, []byte(fmt.Sprintf("%s is up-to-date", d.Label()))) {
		log.Printf("Warning: %s was already up", d.Label())
		d.Events = append(d.Events, DeploymentEvent{
			Time:   time.Now().Unix(),
			Status: StatusAttach,
		})
		return nil
	}
	if err != nil {
		return fmt.Errorf("failed to bring deployment up: %w", err)
	}
	return nil
}

// copyFestoonData mirrors the tree rooted at srcDir into dstDir: directories
// (including dstDir itself) are created with mode 0700 and every other entry
// is passed through interpolateAndCopy. The walk stops at the first error,
// including errors reported by the walk itself.
func (d *Deployment) copyFestoonData(srcDir, dstDir string) error {
	return filepath.WalkDir(srcDir, func(filePath string, dirEntry fs.DirEntry, walkErr error) error {
		if walkErr != nil {
			// dirEntry may be nil here, so bail out before touching it.
			return walkErr
		}
		relativePath, err := filepath.Rel(srcDir, filePath)
		if err != nil {
			return fmt.Errorf("unexpected file path: %s", filePath)
		}
		// For the root relativePath is ".", so outPath is dstDir itself and
		// the destination root exists before any file is written into it.
		outPath := filepath.Join(dstDir, relativePath)
		if dirEntry.IsDir() {
			return os.MkdirAll(outPath, 0700)
		}
		return d.interpolateAndCopy(filePath, outPath)
	})
}
// Label returns the deployment's name: the festoon and the deployment ID
// joined by a hyphen, e.g. "web-7".
func (d *Deployment) Label() string {
	festoon, id := d.Festoon, d.ID
	return fmt.Sprintf("%s-%d", festoon, id)
}
// Dir returns the directory holding this deployment's files: an entry named
// after Label inside the worker's DeploymentsDir. d.Worker must be non-nil.
func (d *Deployment) Dir() string {
	root := d.Worker.DeploymentsDir
	name := d.Label()
	return path.Join(root, name)
}
// handleEvents follows the event stream that DockerEvents opens for this
// deployment and appends the interesting events to d.Events. It never
// returns, so it is meant to run in its own goroutine.
//
// Every line read from the stream's stdout must be framed as
// eventPrefix + status + eventSuffix. A frame with an empty status is
// skipped, an unknown status is logged as a warning, and only statuses listed
// in RecordedDeploymentStatuses are recorded, stamped with the current Unix
// time. Lines read from the stream's stderr are logged with an "EVENT" tag by
// a second goroutine.
//
// The process is terminated through log.Fatal when DockerEvents fails, when
// either scanner reports an error, or when a stdout line is not framed
// correctly.
//
// NOTE(review): d.Events is appended to here without any synchronization
// visible in this function; confirm how readers of Events coordinate.
// NOTE(review): cmd is discarded, so nothing in this function waits for or
// stops the underlying command.
func (d *Deployment) handleEvents() {
	log.Println("HANDLE EVENTS")

	cmd, stdOut, stdErr, err := DockerEvents(d.Label())
	if err != nil {
		log.Fatal(err, stdErr.String())
	}
	_ = cmd

	// Drain stderr: each pass reads whatever is available, then pauses
	// briefly before polling again with a fresh scanner.
	go func() {
		for {
			errLines := bufio.NewScanner(stdErr)
			for errLines.Scan() {
				log.Println("EVENT", string(errLines.Bytes()))
			}
			if errLines.Err() != nil {
				log.Fatal("scanner error", errLines.Err())
			}
			time.Sleep(2 * time.Millisecond)
		}
	}()

	prefix, suffix := []byte(eventPrefix), []byte(eventSuffix)
	for {
		lines := bufio.NewScanner(stdOut)
		for lines.Scan() {
			frame := lines.Bytes()
			if !(bytes.HasPrefix(frame, prefix) && bytes.HasSuffix(frame, suffix)) {
				log.Fatalf("unrecognized event: %s", frame)
			}

			// Nothing between the markers: no status to parse.
			if len(frame) <= len(prefix)+len(suffix) {
				continue
			}

			statusString := string(frame[len(prefix) : len(frame)-len(suffix)])
			status := ParseDeploymentStatus(statusString)
			if status == StatusUnknown {
				log.Printf("Warning: Deployment %s has unknown status %s", d.Label(), statusString)
			}

			recorded := false
			for i := range RecordedDeploymentStatuses {
				if RecordedDeploymentStatuses[i] == status {
					recorded = true
					break
				}
			}
			if recorded {
				d.Events = append(d.Events, DeploymentEvent{
					Time:   time.Now().Unix(),
					Status: status,
				})
			}
		}
		if lines.Err() != nil {
			log.Fatal("scanner error", lines.Err())
		}
		time.Sleep(2 * time.Millisecond)
	}
}