mirror of
https://github.com/Luzifer/elastic_cron.git
synced 2024-12-22 18:31:20 +00:00
Initial version
This commit is contained in:
commit
5a2eb7abf9
4 changed files with 226 additions and 0 deletions
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
|||
rsyslog_cron
|
6
config.yml
Normal file
6
config.yml
Normal file
|
@ -0,0 +1,6 @@
|
|||
---
|
||||
rsyslog_target: logs.myserver.com:12383
|
||||
jobs:
|
||||
- name: date
|
||||
schedule: "0 * * * * *"
|
||||
cmd: "/bin/date"
|
137
main.go
Normal file
137
main.go
Normal file
|
@ -0,0 +1,137 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gopkg.in/yaml.v2"
|
||||
|
||||
"github.com/Luzifer/rconfig"
|
||||
"github.com/robfig/cron"
|
||||
)
|
||||
|
||||
var (
|
||||
cfg = struct {
|
||||
ConfigFile string `flag:"config" default:"config.yaml" description:"Cron definition file"`
|
||||
Hostname string `flag:"hostname" description:"Overwrite system hostname"`
|
||||
}{}
|
||||
version = "dev"
|
||||
|
||||
logstream = make(chan *message, 1000)
|
||||
)
|
||||
|
||||
type cronConfig struct {
|
||||
RSyslogTarget string `yaml:"rsyslog_target"`
|
||||
Jobs []cronJob `yaml:"jobs"`
|
||||
}
|
||||
|
||||
type cronJob struct {
|
||||
Name string `yaml:"name"`
|
||||
Schedule string `yaml:"schedule"`
|
||||
Command string `yaml:"cmd"`
|
||||
Arguments []string `yaml:"args"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
rconfig.Parse(&cfg)
|
||||
|
||||
if cfg.Hostname == "" {
|
||||
hostname, _ := os.Hostname()
|
||||
cfg.Hostname = hostname
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
body, err := ioutil.ReadFile(cfg.ConfigFile)
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to read config file: %s")
|
||||
}
|
||||
|
||||
cc := cronConfig{}
|
||||
if err := yaml.Unmarshal(body, &cc); err != nil {
|
||||
log.Fatalf("Unable to parse config file: %s", err)
|
||||
}
|
||||
|
||||
c := cron.New()
|
||||
|
||||
for i := range cc.Jobs {
|
||||
job := cc.Jobs[i]
|
||||
if err := c.AddFunc(job.Schedule, getJobExecutor(job)); err != nil {
|
||||
log.Fatalf("Unable to add job '%s': %s", job.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
c.Start()
|
||||
|
||||
logadapter, err := NewSyslogAdapter(cc.RSyslogTarget)
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to open syslog connection: %s", err)
|
||||
}
|
||||
logadapter.Stream(logstream)
|
||||
}
|
||||
|
||||
func getJobExecutor(job cronJob) func() {
|
||||
return func() {
|
||||
stdout := &messageChanWriter{
|
||||
jobName: job.Name,
|
||||
msgChan: logstream,
|
||||
severity: 6, // Informational
|
||||
}
|
||||
|
||||
stderr := &messageChanWriter{
|
||||
jobName: job.Name,
|
||||
msgChan: logstream,
|
||||
severity: 3, // Error
|
||||
}
|
||||
|
||||
fmt.Fprintln(stdout, "[SYS] Starting job")
|
||||
|
||||
cmd := exec.Command(job.Command, job.Arguments...)
|
||||
cmd.Stdout = stdout
|
||||
cmd.Stderr = stderr
|
||||
|
||||
err := cmd.Run()
|
||||
switch err.(type) {
|
||||
case nil:
|
||||
fmt.Fprintln(stdout, "[SYS] Command execution successful")
|
||||
case *exec.ExitError:
|
||||
fmt.Fprintln(stderr, "[SYS] Command exited with unexpected exit code != 0")
|
||||
default:
|
||||
fmt.Fprintf(stderr, "[SYS] Execution caused error: %s\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type messageChanWriter struct {
|
||||
jobName string
|
||||
msgChan chan *message
|
||||
severity int
|
||||
|
||||
buffer []byte
|
||||
}
|
||||
|
||||
func (m *messageChanWriter) Write(p []byte) (n int, err error) {
|
||||
n = len(p)
|
||||
err = nil
|
||||
|
||||
m.buffer = append(m.buffer, p...)
|
||||
if strings.Contains(string(m.buffer), "\n") {
|
||||
lines := strings.Split(string(m.buffer), "\n")
|
||||
for _, l := range lines[:len(lines)-1] {
|
||||
m.msgChan <- &message{
|
||||
Date: time.Now(),
|
||||
JobName: m.jobName,
|
||||
Message: l,
|
||||
Severity: m.severity,
|
||||
}
|
||||
}
|
||||
m.buffer = []byte(lines[len(lines)-1])
|
||||
}
|
||||
|
||||
return
|
||||
}
|
82
syslog.go
Normal file
82
syslog.go
Normal file
|
@ -0,0 +1,82 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff"
|
||||
)
|
||||
|
||||
const (
|
||||
readWriteTimeout = 1 * time.Second
|
||||
tcpDialTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
func NewSyslogAdapter(address string) (*SyslogAdapter, error) {
|
||||
return &SyslogAdapter{
|
||||
address: address,
|
||||
dialer: &net.Dialer{
|
||||
Timeout: tcpDialTimeout,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
type message struct {
|
||||
Date time.Time
|
||||
JobName string
|
||||
Message string
|
||||
Severity int
|
||||
}
|
||||
|
||||
type SyslogAdapter struct {
|
||||
address string
|
||||
dialer *net.Dialer
|
||||
}
|
||||
|
||||
func (a *SyslogAdapter) Stream(logstream chan *message) {
|
||||
backoff.Retry(func() error {
|
||||
conn, err := a.dialer.Dial("tcp", a.address)
|
||||
if err != nil {
|
||||
fmt.Printf("syslog: Unable to dial to remote address\n")
|
||||
return fmt.Errorf("Catch me if you can.")
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
b := new(bytes.Buffer)
|
||||
for msg := range logstream {
|
||||
b.Reset()
|
||||
|
||||
fmt.Fprintf(b, "<%d>%s %s %s: %s\n",
|
||||
16*8+msg.Severity,
|
||||
msg.Date.Format("Jan 02 15:04:05"),
|
||||
cfg.Hostname,
|
||||
msg.JobName,
|
||||
msg.Message,
|
||||
)
|
||||
|
||||
if err := conn.SetDeadline(time.Now().Add(readWriteTimeout)); err != nil {
|
||||
fmt.Printf("syslog: Unable to set deadline: %s\n", err)
|
||||
return fmt.Errorf("Catch me if you can.")
|
||||
}
|
||||
|
||||
logLine := b.Bytes()
|
||||
written, err := io.Copy(conn, b)
|
||||
|
||||
if err != nil {
|
||||
if written > 0 {
|
||||
fmt.Printf("syslog: (%d/%d) %s\n", written, len(logLine), err)
|
||||
} else {
|
||||
fmt.Printf("syslog: %s\n", err)
|
||||
}
|
||||
return fmt.Errorf("syslog: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("syslog: I got out of the channel watch. This should never happen.\n")
|
||||
return fmt.Errorf("Wat? Why am I here?")
|
||||
|
||||
}, &backoff.ZeroBackOff{})
|
||||
}
|
Loading…
Reference in a new issue