From 5a2eb7abf934479069c5bd0ecb7a9476c7a6b3aa Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Fri, 26 Feb 2016 18:44:10 +0100 Subject: [PATCH] Initial version --- .gitignore | 1 + config.yml | 6 +++ main.go | 137 +++++++++++++++++++++++++++++++++++++++++++++++++++++ syslog.go | 82 ++++++++++++++++++++++++++++++++ 4 files changed, 226 insertions(+) create mode 100644 .gitignore create mode 100644 config.yml create mode 100644 main.go create mode 100644 syslog.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d9438ac --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +rsyslog_cron diff --git a/config.yml b/config.yml new file mode 100644 index 0000000..714a629 --- /dev/null +++ b/config.yml @@ -0,0 +1,6 @@ +--- +rsyslog_target: logs.myserver.com:12383 +jobs: + - name: date + schedule: "0 * * * * *" + cmd: "/bin/date" diff --git a/main.go b/main.go new file mode 100644 index 0000000..841b8b8 --- /dev/null +++ b/main.go @@ -0,0 +1,137 @@ +package main + +import ( + "fmt" + "io/ioutil" + "log" + "os" + "os/exec" + "strings" + "time" + + "gopkg.in/yaml.v2" + + "github.com/Luzifer/rconfig" + "github.com/robfig/cron" +) + +var ( + cfg = struct { + ConfigFile string `flag:"config" default:"config.yaml" description:"Cron definition file"` + Hostname string `flag:"hostname" description:"Overwrite system hostname"` + }{} + version = "dev" + + logstream = make(chan *message, 1000) +) + +type cronConfig struct { + RSyslogTarget string `yaml:"rsyslog_target"` + Jobs []cronJob `yaml:"jobs"` +} + +type cronJob struct { + Name string `yaml:"name"` + Schedule string `yaml:"schedule"` + Command string `yaml:"cmd"` + Arguments []string `yaml:"args"` +} + +func init() { + rconfig.Parse(&cfg) + + if cfg.Hostname == "" { + hostname, _ := os.Hostname() + cfg.Hostname = hostname + } +} + +func main() { + body, err := ioutil.ReadFile(cfg.ConfigFile) + if err != nil { + log.Fatalf("Unable to read config file: %s") + } + + cc := cronConfig{} + if err := yaml.Unmarshal(body, &cc); err != nil { + log.Fatalf("Unable to parse config file: %s", err) + } + + c := cron.New() + + for i := range cc.Jobs { + job := cc.Jobs[i] + if err := c.AddFunc(job.Schedule, getJobExecutor(job)); err != nil { + log.Fatalf("Unable to add job '%s': %s", job.Name, err) + } + } + + c.Start() + + logadapter, err := NewSyslogAdapter(cc.RSyslogTarget) + if err != nil { + log.Fatalf("Unable to open syslog connection: %s", err) + } + logadapter.Stream(logstream) +} + +func getJobExecutor(job cronJob) func() { + return func() { + stdout := &messageChanWriter{ + jobName: job.Name, + msgChan: logstream, + severity: 6, // Informational + } + + stderr := &messageChanWriter{ + jobName: job.Name, + msgChan: logstream, + severity: 3, // Error + } + + fmt.Fprintln(stdout, "[SYS] Starting job") + + cmd := exec.Command(job.Command, job.Arguments...) + cmd.Stdout = stdout + cmd.Stderr = stderr + + err := cmd.Run() + switch err.(type) { + case nil: + fmt.Fprintln(stdout, "[SYS] Command execution successful") + case *exec.ExitError: + fmt.Fprintln(stderr, "[SYS] Command exited with unexpected exit code != 0") + default: + fmt.Fprintf(stderr, "[SYS] Execution caused error: %s\n", err) + } + } +} + +type messageChanWriter struct { + jobName string + msgChan chan *message + severity int + + buffer []byte +} + +func (m *messageChanWriter) Write(p []byte) (n int, err error) { + n = len(p) + err = nil + + m.buffer = append(m.buffer, p...) + if strings.Contains(string(m.buffer), "\n") { + lines := strings.Split(string(m.buffer), "\n") + for _, l := range lines[:len(lines)-1] { + m.msgChan <- &message{ + Date: time.Now(), + JobName: m.jobName, + Message: l, + Severity: m.severity, + } + } + m.buffer = []byte(lines[len(lines)-1]) + } + + return +} diff --git a/syslog.go b/syslog.go new file mode 100644 index 0000000..5b449a1 --- /dev/null +++ b/syslog.go @@ -0,0 +1,82 @@ +package main + +import ( + "bytes" + "fmt" + "io" + "net" + "time" + + "github.com/cenkalti/backoff" +) + +const ( + readWriteTimeout = 1 * time.Second + tcpDialTimeout = 5 * time.Second +) + +func NewSyslogAdapter(address string) (*SyslogAdapter, error) { + return &SyslogAdapter{ + address: address, + dialer: &net.Dialer{ + Timeout: tcpDialTimeout, + }, + }, nil +} + +type message struct { + Date time.Time + JobName string + Message string + Severity int +} + +type SyslogAdapter struct { + address string + dialer *net.Dialer +} + +func (a *SyslogAdapter) Stream(logstream chan *message) { + backoff.Retry(func() error { + conn, err := a.dialer.Dial("tcp", a.address) + if err != nil { + fmt.Printf("syslog: Unable to dial to remote address\n") + return fmt.Errorf("Catch me if you can.") + } + defer conn.Close() + + b := new(bytes.Buffer) + for msg := range logstream { + b.Reset() + + fmt.Fprintf(b, "<%d>%s %s %s: %s\n", + 16*8+msg.Severity, + msg.Date.Format("Jan 02 15:04:05"), + cfg.Hostname, + msg.JobName, + msg.Message, + ) + + if err := conn.SetDeadline(time.Now().Add(readWriteTimeout)); err != nil { + fmt.Printf("syslog: Unable to set deadline: %s\n", err) + return fmt.Errorf("Catch me if you can.") + } + + logLine := b.Bytes() + written, err := io.Copy(conn, b) + + if err != nil { + if written > 0 { + fmt.Printf("syslog: (%d/%d) %s\n", written, len(logLine), err) + } else { + fmt.Printf("syslog: %s\n", err) + } + return fmt.Errorf("syslog: %s", err) + } + } + + fmt.Printf("syslog: I got out of the channel watch. This should never happen.\n") + return fmt.Errorf("Wat? Why am I here?") + + }, &backoff.ZeroBackOff{}) +}