Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions cmd/wsh/cmd/wshcmd-connserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ var serverCmd = &cobra.Command{
RunE: serverRun,
}

// Tunables for the background job-log cleanup performed by
// startJobLogCleanup / cleanupOldJobLogs.
const (
	// JobLogRetentionTime is the age threshold: a job log whose modification
	// time is older than this is removed by cleanupOldJobLogs.
	JobLogRetentionTime time.Duration = 48 * time.Hour

	// JobLogCleanupDelay is how long startJobLogCleanup sleeps before running
	// the first cleanup pass.
	JobLogCleanupDelay time.Duration = 10 * time.Second

	// JobLogCleanupInterval is the ticker period between subsequent cleanup
	// passes.
	JobLogCleanupInterval time.Duration = time.Hour
)

var connServerRouter bool
var connServerRouterDomainSocket bool
var connServerConnName string
Expand All @@ -53,6 +59,61 @@ func init() {
rootCmd.AddCommand(serverCmd)
}

// cleanupOldJobLogs removes stale job log files from the directory returned by
// wavebase.GetRemoteJobLogDir.
//
// Only non-directory entries whose name ends in ".log" are considered. A file
// is deleted when its modification time is older than JobLogRetentionTime.
// The pass is best-effort: failures are logged and never returned.
func cleanupOldJobLogs() {
	jobDir := wavebase.GetRemoteJobLogDir()
	entries, err := os.ReadDir(jobDir)
	if err != nil {
		// A missing directory simply means there is nothing to clean up.
		// Anything else (permissions, I/O errors) would otherwise disable
		// cleanup silently, so surface it in the log.
		if !os.IsNotExist(err) {
			log.Printf("error reading job log dir %s: %v", jobDir, err)
		}
		return
	}

	cutoffTime := time.Now().Add(-JobLogRetentionTime)

	for _, entry := range entries {
		name := entry.Name()
		if entry.IsDir() || !strings.HasSuffix(name, ".log") {
			continue
		}

		info, err := entry.Info()
		if err != nil {
			// The entry could not be stat'ed (e.g. it disappeared after
			// ReadDir); skip it and try again on the next pass.
			continue
		}
		if !info.ModTime().Before(cutoffTime) {
			continue
		}

		filePath := filepath.Join(jobDir, name)
		if err := os.Remove(filePath); err != nil {
			// A file that is already gone (e.g. removed by another process)
			// is the outcome we wanted, so it is not reported as an error.
			if !os.IsNotExist(err) {
				log.Printf("error removing old job log file %s: %v", filePath, err)
			}
			continue
		}
		log.Printf("removed old job log file: %s", filePath)
	}
}

// startJobLogCleanup launches a background goroutine that periodically prunes
// old job logs. The goroutine waits JobLogCleanupDelay, runs cleanupOldJobLogs
// once, and then runs it again on every tick of a JobLogCleanupInterval
// ticker. It returns immediately; panics inside the goroutine are handed to
// panichandler.PanicHandler.
func startJobLogCleanup() {
	cleanupLoop := func() {
		defer func() {
			panichandler.PanicHandler("startJobLogCleanup", recover())
		}()

		// Initial pass, delayed by JobLogCleanupDelay.
		time.Sleep(JobLogCleanupDelay)
		cleanupOldJobLogs()

		// Recurring passes, one per tick.
		tick := time.NewTicker(JobLogCleanupInterval)
		defer tick.Stop()
		for {
			<-tick.C
			cleanupOldJobLogs()
		}
	}
	go cleanupLoop()
}

func getRemoteDomainSocketName() string {
homeDir := wavebase.GetHomeDir()
return filepath.Join(homeDir, wavebase.RemoteWaveHomeDirName, wavebase.RemoteDomainSocketBaseName)
Expand Down Expand Up @@ -218,6 +279,7 @@ func serverRunRouter() error {
}()
wshremote.RunSysInfoLoop(client, connServerConnName)
}()
startJobLogCleanup()
log.Printf("running server, successfully started")
select {}
}
Expand Down Expand Up @@ -324,6 +386,7 @@ func serverRunRouterDomainSocket(jwtToken string) error {
}()
wshremote.RunSysInfoLoop(client, connServerConnName)
}()
startJobLogCleanup()

log.Printf("running server (router-domainsocket mode), successfully started")
select {}
Expand All @@ -346,6 +409,7 @@ func serverRunNormal(jwtToken string) error {
}()
wshremote.RunSysInfoLoop(RpcClient, RpcContext.Conn)
}()
startJobLogCleanup()
select {} // run forever
}

Expand Down
24 changes: 12 additions & 12 deletions pkg/jobmanager/jobmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ func SetupJobManager(clientId string, jobId string, publicKeyBytes []byte, jobAu
return fmt.Errorf("failed to daemonize: %w", err)
}

go func() {
defer func() {
panichandler.PanicHandler("JobManager:keepalive", recover())
}()
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for range ticker.C {
log.Printf("keepalive: job manager active\n")
}
}()

return nil
}

Expand Down Expand Up @@ -365,25 +376,14 @@ func (jm *JobManager) StartStream(msc *MainServerConn) error {
return nil
}

// GetJobSocketPath returns the path of the socket file for jobId:
// /tmp/waveterm-<uid>/<jobId>.sock, where <uid> is the value of os.Getuid().
func GetJobSocketPath(jobId string) string {
	uidDirName := fmt.Sprintf("waveterm-%d", os.Getuid())
	return filepath.Join("/tmp", uidDirName, jobId+".sock")
}

// GetJobFilePath returns the path of a per-job file:
// <home>/.waveterm/jobs/<clientId>/<jobId>.<extension>, where <home> is the
// value of wavebase.GetHomeDir().
func GetJobFilePath(clientId string, jobId string, extension string) string {
	fileName := fmt.Sprintf("%s.%s", jobId, extension)
	return filepath.Join(wavebase.GetHomeDir(), ".waveterm", "jobs", clientId, fileName)
}

func MakeJobDomainSocket(clientId string, jobId string) error {
socketDir := filepath.Join("/tmp", fmt.Sprintf("waveterm-%d", os.Getuid()))
err := os.MkdirAll(socketDir, 0700)
if err != nil {
return fmt.Errorf("failed to create socket directory: %w", err)
}

socketPath := GetJobSocketPath(jobId)
socketPath := wavebase.GetRemoteJobSocketPath(jobId)

os.Remove(socketPath)

Expand Down
4 changes: 3 additions & 1 deletion pkg/jobmanager/jobmanager_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"path/filepath"
"syscall"

"github.com/wavetermdev/waveterm/pkg/wavebase"
"golang.org/x/sys/unix"
)

Expand All @@ -32,7 +33,7 @@ func daemonize(clientId string, jobId string) error {
}
devNull.Close()

logPath := GetJobFilePath(clientId, jobId, "log")
logPath := wavebase.GetRemoteJobFilePath(jobId, "log")
logDir := filepath.Dir(logPath)
err = os.MkdirAll(logDir, 0700)
if err != nil {
Expand All @@ -54,6 +55,7 @@ func daemonize(clientId string, jobId string) error {

log.SetOutput(logFile)
log.Printf("job manager daemonized, logging to %s\n", logPath)
log.Printf("job owner clientid: %s\n", clientId)

signal.Ignore(syscall.SIGHUP)

Expand Down
5 changes: 2 additions & 3 deletions pkg/jobmanager/mainserverconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type routedDataSender struct {
}

func (rds *routedDataSender) SendData(dataPk wshrpc.CommandStreamData) {
log.Printf("SendData: sending seq=%d, len=%d, eof=%t, error=%s, route=%s",
dataPk.Seq, len(dataPk.Data64), dataPk.Eof, dataPk.Error, rds.route)
// log.Printf("SendData: sending seq=%d, len=%d, eof=%t, error=%s, route=%s",
// dataPk.Seq, len(dataPk.Data64), dataPk.Eof, dataPk.Error, rds.route)
err := wshclient.StreamDataCommand(rds.wshRpc, dataPk, &wshrpc.RpcOpts{NoResponse: true, Route: rds.route})
if err != nil {
log.Printf("SendData: error sending stream data: %v\n", err)
Expand Down Expand Up @@ -132,4 +132,3 @@ func (msc *MainServerConn) JobInputCommand(ctx context.Context, data wshrpc.Comm
WshCmdJobManager.InputQueue.QueueItem(data.InputSessionId, data.SeqNum, data)
return nil
}

6 changes: 3 additions & 3 deletions pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func mergeActivity(curActivity *telemetrydata.TEventProps, newActivity telemetry
// ignores the timestamp in tevent, and uses the current time
func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error {
eventTs := time.Now()
// compute to 2-hour boundary, and round up to next 2-hour boundary
eventTs = eventTs.Truncate(2 * time.Hour).Add(2 * time.Hour)
// compute to 1-hour boundary, and round up to next 1-hour boundary
eventTs = eventTs.Truncate(time.Hour).Add(time.Hour)

return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error {
// find event that matches this timestamp with event name "app:activity"
Expand Down Expand Up @@ -195,7 +195,7 @@ func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) err

func TruncateActivityTEventForShutdown(ctx context.Context) error {
nowTs := time.Now()
eventTs := nowTs.Truncate(2 * time.Hour).Add(2 * time.Hour)
eventTs := nowTs.Truncate(time.Hour).Add(time.Hour)
return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error {
// find event that matches this timestamp with event name "app:activity"
uuidStr := tx.GetString(`SELECT uuid FROM db_tevent WHERE ts = ? AND event = ?`, eventTs.UnixMilli(), ActivityEventName)
Expand Down
19 changes: 19 additions & 0 deletions pkg/wavebase/wavebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,3 +435,22 @@ func getSystemSummary(ctx context.Context) string {
return fmt.Sprintf("%s (%s)", runtime.GOOS, runtime.GOARCH)
}
}

// GetRemoteJobSocketPath returns the job socket path on the remote machine:
// /tmp/waveterm-<uid>/<jobId>.sock, where <uid> is the value of os.Getuid().
func GetRemoteJobSocketPath(jobId string) string {
	uidDirName := fmt.Sprintf("waveterm-%d", os.Getuid())
	return filepath.Join("/tmp", uidDirName, jobId+".sock")
}

// GetRemoteJobFilePath returns the path of a job file on the remote machine:
// <jobLogDir>/<jobId>.<extension>, where <jobLogDir> is the value of
// GetRemoteJobLogDir().
func GetRemoteJobFilePath(jobId string, extension string) string {
	fileName := fmt.Sprintf("%s.%s", jobId, extension)
	return filepath.Join(GetRemoteJobLogDir(), fileName)
}

// GetRemoteJobLogDir returns the job file directory on remote machines:
// <home>/.waveterm/jobs, where <home> is the value of GetHomeDir().
func GetRemoteJobLogDir() string {
	return filepath.Join(GetHomeDir(), ".waveterm", "jobs")
}
4 changes: 2 additions & 2 deletions pkg/wshrpc/wshremote/wshremote_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"time"

"github.com/shirou/gopsutil/v4/process"
"github.com/wavetermdev/waveterm/pkg/jobmanager"
"github.com/wavetermdev/waveterm/pkg/wavebase"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
"github.com/wavetermdev/waveterm/pkg/wshutil"
Expand All @@ -43,7 +43,7 @@ func isProcessRunning(pid int, pidStartTs int64) (*process.Process, error) {

// returns jobRouteId, cleanupFunc, error
func (impl *ServerImpl) connectToJobManager(ctx context.Context, jobId string, mainServerJwtToken string) (string, func(), error) {
socketPath := jobmanager.GetJobSocketPath(jobId)
socketPath := wavebase.GetRemoteJobSocketPath(jobId)
log.Printf("connectToJobManager: connecting to socket: %s\n", socketPath)
conn, err := net.Dial("unix", socketPath)
if err != nil {
Expand Down
Loading