diff --git a/cmd/wsh/cmd/wshcmd-connserver.go b/cmd/wsh/cmd/wshcmd-connserver.go index 86f6c2a923..8fde91dc3c 100644 --- a/cmd/wsh/cmd/wshcmd-connserver.go +++ b/cmd/wsh/cmd/wshcmd-connserver.go @@ -38,6 +38,12 @@ var serverCmd = &cobra.Command{ RunE: serverRun, } +const ( + JobLogRetentionTime = 48 * time.Hour + JobLogCleanupDelay = 10 * time.Second + JobLogCleanupInterval = 1 * time.Hour +) + var connServerRouter bool var connServerRouterDomainSocket bool var connServerConnName string @@ -53,6 +59,61 @@ func init() { rootCmd.AddCommand(serverCmd) } +func cleanupOldJobLogs() { + jobDir := wavebase.GetRemoteJobLogDir() + entries, err := os.ReadDir(jobDir) + if err != nil { + return + } + + cutoffTime := time.Now().Add(-JobLogRetentionTime) + + for _, entry := range entries { + if entry.IsDir() { + continue + } + + name := entry.Name() + if !strings.HasSuffix(name, ".log") { + continue + } + + info, err := entry.Info() + if err != nil { + continue + } + + if info.ModTime().Before(cutoffTime) { + filePath := filepath.Join(jobDir, name) + err := os.Remove(filePath) + if err != nil { + log.Printf("error removing old job log file %s: %v", filePath, err) + } else { + log.Printf("removed old job log file: %s", filePath) + } + } + } +} + +func startJobLogCleanup() { + go func() { + defer func() { + panichandler.PanicHandler("startJobLogCleanup", recover()) + }() + + time.Sleep(JobLogCleanupDelay) + + cleanupOldJobLogs() + + ticker := time.NewTicker(JobLogCleanupInterval) + defer ticker.Stop() + + for range ticker.C { + cleanupOldJobLogs() + } + }() +} + func getRemoteDomainSocketName() string { homeDir := wavebase.GetHomeDir() return filepath.Join(homeDir, wavebase.RemoteWaveHomeDirName, wavebase.RemoteDomainSocketBaseName) @@ -218,6 +279,7 @@ func serverRunRouter() error { }() wshremote.RunSysInfoLoop(client, connServerConnName) }() + startJobLogCleanup() log.Printf("running server, successfully started") select {} } @@ -324,6 +386,7 @@ func serverRunRouterDomainSocket(jwtToken string) error { }() wshremote.RunSysInfoLoop(client, connServerConnName) }() + startJobLogCleanup() log.Printf("running server (router-domainsocket mode), successfully started") select {} @@ -346,6 +409,7 @@ func serverRunNormal(jwtToken string) error { }() wshremote.RunSysInfoLoop(RpcClient, RpcContext.Conn) }() + startJobLogCleanup() select {} // run forever } diff --git a/pkg/jobmanager/jobmanager.go b/pkg/jobmanager/jobmanager.go index 7afaec047e..dd58bccc52 100644 --- a/pkg/jobmanager/jobmanager.go +++ b/pkg/jobmanager/jobmanager.go @@ -79,6 +79,17 @@ func SetupJobManager(clientId string, jobId string, publicKeyBytes []byte, jobAu return fmt.Errorf("failed to daemonize: %w", err) } + go func() { + defer func() { + panichandler.PanicHandler("JobManager:keepalive", recover()) + }() + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + for range ticker.C { + log.Printf("keepalive: job manager active\n") + } + }() + return nil } @@ -365,17 +376,6 @@ func (jm *JobManager) StartStream(msc *MainServerConn) error { return nil } -func GetJobSocketPath(jobId string) string { - socketDir := filepath.Join("/tmp", fmt.Sprintf("waveterm-%d", os.Getuid())) - return filepath.Join(socketDir, fmt.Sprintf("%s.sock", jobId)) -} - -func GetJobFilePath(clientId string, jobId string, extension string) string { - homeDir := wavebase.GetHomeDir() - jobDir := filepath.Join(homeDir, ".waveterm", "jobs", clientId) - return filepath.Join(jobDir, fmt.Sprintf("%s.%s", jobId, extension)) -} - func MakeJobDomainSocket(clientId string, jobId string) error { socketDir := filepath.Join("/tmp", fmt.Sprintf("waveterm-%d", os.Getuid())) err := os.MkdirAll(socketDir, 0700) @@ -383,7 +383,7 @@ func MakeJobDomainSocket(clientId string, jobId string) error { return fmt.Errorf("failed to create socket directory: %w", err) } - socketPath := GetJobSocketPath(jobId) + socketPath := wavebase.GetRemoteJobSocketPath(jobId) os.Remove(socketPath) diff --git a/pkg/jobmanager/jobmanager_unix.go b/pkg/jobmanager/jobmanager_unix.go index 3ab26ff255..c9c07e1843 100644 --- a/pkg/jobmanager/jobmanager_unix.go +++ b/pkg/jobmanager/jobmanager_unix.go @@ -13,6 +13,7 @@ import ( "path/filepath" "syscall" + "github.com/wavetermdev/waveterm/pkg/wavebase" "golang.org/x/sys/unix" ) @@ -32,7 +33,7 @@ func daemonize(clientId string, jobId string) error { } devNull.Close() - logPath := GetJobFilePath(clientId, jobId, "log") + logPath := wavebase.GetRemoteJobFilePath(jobId, "log") logDir := filepath.Dir(logPath) err = os.MkdirAll(logDir, 0700) if err != nil { @@ -54,6 +55,7 @@ func daemonize(clientId string, jobId string) error { log.SetOutput(logFile) log.Printf("job manager daemonized, logging to %s\n", logPath) + log.Printf("job owner clientid: %s\n", clientId) signal.Ignore(syscall.SIGHUP) diff --git a/pkg/jobmanager/mainserverconn.go b/pkg/jobmanager/mainserverconn.go index de5965128d..33bb10cdfb 100644 --- a/pkg/jobmanager/mainserverconn.go +++ b/pkg/jobmanager/mainserverconn.go @@ -42,8 +42,8 @@ type routedDataSender struct { } func (rds *routedDataSender) SendData(dataPk wshrpc.CommandStreamData) { - log.Printf("SendData: sending seq=%d, len=%d, eof=%t, error=%s, route=%s", - dataPk.Seq, len(dataPk.Data64), dataPk.Eof, dataPk.Error, rds.route) + // log.Printf("SendData: sending seq=%d, len=%d, eof=%t, error=%s, route=%s", + // dataPk.Seq, len(dataPk.Data64), dataPk.Eof, dataPk.Error, rds.route) err := wshclient.StreamDataCommand(rds.wshRpc, dataPk, &wshrpc.RpcOpts{NoResponse: true, Route: rds.route}) if err != nil { log.Printf("SendData: error sending stream data: %v\n", err) @@ -132,4 +132,3 @@ func (msc *MainServerConn) JobInputCommand(ctx context.Context, data wshrpc.Comm WshCmdJobManager.InputQueue.QueueItem(data.InputSessionId, data.SeqNum, data) return nil } - diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 5c632609d6..bac37600ee 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -162,8 +162,8 @@ func mergeActivity(curActivity *telemetrydata.TEventProps, newActivity telemetry // ignores the timestamp in tevent, and uses the current time func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error { eventTs := time.Now() - // compute to 2-hour boundary, and round up to next 2-hour boundary - eventTs = eventTs.Truncate(2 * time.Hour).Add(2 * time.Hour) + // compute to 1-hour boundary, and round up to next 1-hour boundary + eventTs = eventTs.Truncate(time.Hour).Add(time.Hour) return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { // find event that matches this timestamp with event name "app:activity" @@ -195,7 +195,7 @@ func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) err func TruncateActivityTEventForShutdown(ctx context.Context) error { nowTs := time.Now() - eventTs := nowTs.Truncate(2 * time.Hour).Add(2 * time.Hour) + eventTs := nowTs.Truncate(time.Hour).Add(time.Hour) return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { // find event that matches this timestamp with event name "app:activity" uuidStr := tx.GetString(`SELECT uuid FROM db_tevent WHERE ts = ? AND event = ?`, eventTs.UnixMilli(), ActivityEventName) diff --git a/pkg/wavebase/wavebase.go b/pkg/wavebase/wavebase.go index 8bed6e1ccc..822451ebb3 100644 --- a/pkg/wavebase/wavebase.go +++ b/pkg/wavebase/wavebase.go @@ -435,3 +435,22 @@ func getSystemSummary(ctx context.Context) string { return fmt.Sprintf("%s (%s)", runtime.GOOS, runtime.GOARCH) } } + +// job socket path on remote machine +func GetRemoteJobSocketPath(jobId string) string { + socketDir := filepath.Join("/tmp", fmt.Sprintf("waveterm-%d", os.Getuid())) + return filepath.Join(socketDir, fmt.Sprintf("%s.sock", jobId)) +} + +// job file path on remote machine +func GetRemoteJobFilePath(jobId string, extension string) string { + jobDir := GetRemoteJobLogDir() + return filepath.Join(jobDir, fmt.Sprintf("%s.%s", jobId, extension)) +} + +// job file dir on remote machines +func GetRemoteJobLogDir() string { + homeDir := GetHomeDir() + jobDir := filepath.Join(homeDir, ".waveterm", "jobs") + return jobDir +} diff --git a/pkg/wshrpc/wshremote/wshremote_job.go b/pkg/wshrpc/wshremote/wshremote_job.go index 09310e09da..b357116427 100644 --- a/pkg/wshrpc/wshremote/wshremote_job.go +++ b/pkg/wshrpc/wshremote/wshremote_job.go @@ -17,7 +17,7 @@ import ( "time" "github.com/shirou/gopsutil/v4/process" - "github.com/wavetermdev/waveterm/pkg/jobmanager" + "github.com/wavetermdev/waveterm/pkg/wavebase" "github.com/wavetermdev/waveterm/pkg/wshrpc" "github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient" "github.com/wavetermdev/waveterm/pkg/wshutil" @@ -43,7 +43,7 @@ func isProcessRunning(pid int, pidStartTs int64) (*process.Process, error) { // returns jobRouteId, cleanupFunc, error func (impl *ServerImpl) connectToJobManager(ctx context.Context, jobId string, mainServerJwtToken string) (string, func(), error) { - socketPath := jobmanager.GetJobSocketPath(jobId) + socketPath := wavebase.GetRemoteJobSocketPath(jobId) log.Printf("connectToJobManager: connecting to socket: %s\n", socketPath) conn, err := net.Dial("unix", socketPath) if err != nil {