From 23d32bf5a0cefa99eeffa3f1cb43df8b9b969525 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Sun, 15 Apr 2018 13:54:39 +0200 Subject: [PATCH] Convert to elasticsearch logging --- .gitignore | 2 +- README.md | 2 + config.yml | 12 ++++- main.go | 141 ++++++++++++++++++++++++++++------------------------- 4 files changed, 88 insertions(+), 69 deletions(-) diff --git a/.gitignore b/.gitignore index d9438ac..a45afa5 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1 @@ -rsyslog_cron +elastic_cron diff --git a/README.md b/README.md index 26a5595..cf67342 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ This project is a quick and dirty replacement for running a cron daemon inside d --- elasticsearch: + index: 'elastic_cron-%{+YYYY.MM.dd}' servers: - http://localhost:9200 auth: [username, password] @@ -38,6 +39,7 @@ jobs: ``` - `elasticsearch` + - `index` - Name of the index to write messages to (understands same date specifier as ES beats) - `servers` - List of elasticsearch instances of the same cluster to log to - `auth` - List consisting of two elements: username and password - `schedule` - consists of 6 instead of the normal 5 fields: diff --git a/config.yml b/config.yml index 714a629..b0fe8da 100644 --- a/config.yml +++ b/config.yml @@ -1,6 +1,16 @@ --- -rsyslog_target: logs.myserver.com:12383 + +elasticsearch: + index: 'elastic_cron-%{+YYYY.MM.dd}' + servers: + - http://localhost:9200 + auth: [username, password] + jobs: - name: date schedule: "0 * * * * *" cmd: "/bin/date" + args: + - "+%+" + +... diff --git a/main.go b/main.go index 27c279a..70fb905 100644 --- a/main.go +++ b/main.go @@ -2,17 +2,20 @@ package main import ( "fmt" - "io/ioutil" - "log" "net/http" "os" "os/exec" + "regexp" "strings" "time" "github.com/Luzifer/rconfig" + "github.com/elastic/beats/libbeat/common/dtfmt" + "github.com/olivere/elastic" "github.com/robfig/cron" + log "github.com/sirupsen/logrus" "golang.org/x/net/context" + elogrus "gopkg.in/sohlich/elogrus.v3" "gopkg.in/yaml.v2" ) @@ -22,15 +25,17 @@ var ( Hostname string `flag:"hostname" description:"Overwrite system hostname"` PingTimout time.Duration `flag:"ping-timeout" default:"1s" description:"Timeout for success / failure pings"` }{} - version = "dev" - logstream = make(chan *message, 1000) + version = "dev" ) type cronConfig struct { - RSyslogTarget string `yaml:"rsyslog_target"` - LogTemplate string `yaml:"log_template"` - Jobs []cronJob `yaml:"jobs"` + Elasticsearch struct { + Auth []string `yaml:"auth"` + Index string `yaml:"index"` + Servers []string `yaml:"servers"` + } `yaml:"elasticsearch"` + Jobs []cronJob `yaml:"jobs"` } type cronJob struct { @@ -43,7 +48,7 @@ type cronJob struct { } func init() { - rconfig.Parse(&cfg) + rconfig.ParseAndValidate(&cfg) if cfg.Hostname == "" { hostname, _ := os.Hostname() @@ -51,19 +56,24 @@ func init() { } } +func readConfig() (*cronConfig, error) { + fp, err := os.Open(cfg.ConfigFile) + if err != nil { + return nil, err + } + defer fp.Close() + + cc := &cronConfig{} + cc.Elasticsearch.Index = "elastic_cron-%{+YYYY.MM.dd}" + return cc, yaml.NewDecoder(fp).Decode(cc) +} + func main() { - body, err := ioutil.ReadFile(cfg.ConfigFile) + cc, err := readConfig() if err != nil { log.Fatalf("Unable to read config file: %s", err) } - cc := cronConfig{ - LogTemplate: `<{{ syslogpri .Severity }}>{{ .Date.Format "Jan 02 15:04:05" }} {{ .Hostname }} {{ .JobName }}: {{ .Message }}`, - } - if err := yaml.Unmarshal(body, &cc); err != nil { - log.Fatalf("Unable to parse config file: %s", err) - } - c := cron.New() for i := range cc.Jobs { @@ -73,28 +83,54 @@ func main() { } } - c.Start() - - logadapter, err := NewSyslogAdapter(cc.RSyslogTarget, cc.LogTemplate) - if err != nil { - log.Fatalf("Unable to open syslog connection: %s", err) + opts := []elastic.ClientOptionFunc{ + elastic.SetURL(cc.Elasticsearch.Servers...), + } + + if cc.Elasticsearch.Auth != nil && len(cc.Elasticsearch.Auth) == 2 && cc.Elasticsearch.Auth[0] != "" { + opts = append(opts, elastic.SetBasicAuth(cc.Elasticsearch.Auth[0], cc.Elasticsearch.Auth[1])) + } + + esClient, err := elastic.NewSimpleClient(opts...) + if err != nil { + log.WithError(err).Fatal("Unable to create elasticsearch client") + } + + hook, err := elogrus.NewElasticHookWithFunc(esClient, cfg.Hostname, log.InfoLevel, getIndexNameFunc(cc)) + if err != nil { + log.WithError(err).Fatal("Unable to create elasticsearch log hook") + } + log.AddHook(hook) + + c.Run() +} + +func getIndexNameFunc(cc *cronConfig) func() string { + if !strings.Contains(cc.Elasticsearch.Index, `%{+`) { + // Simple string without date expansion + return func() string { return cc.Elasticsearch.Index } + } + + return func() string { + rex := regexp.MustCompile(`%{\+([^}]+)}`) + return rex.ReplaceAllStringFunc(cc.Elasticsearch.Index, func(f string) string { + f = strings.TrimSuffix(strings.TrimPrefix(f, `%{+`), `}`) + d, _ := dtfmt.Format(time.Now(), f) + return d + }) } - logadapter.Stream(logstream) } func getJobExecutor(job cronJob) func() { return func() { - stdout := &messageChanWriter{ - jobName: job.Name, - msgChan: logstream, - severity: 6, // Informational - } + logger := log.WithFields(log.Fields{ + "job": job.Name, + }) - stderr := &messageChanWriter{ - jobName: job.Name, - msgChan: logstream, - severity: 3, // Error - } + stdout := logger.WriterLevel(log.InfoLevel) + defer stdout.Close() + stderr := logger.WriterLevel(log.ErrorLevel) + defer stderr.Close() fmt.Fprintln(stdout, "[SYS] Starting job") @@ -105,26 +141,26 @@ func getJobExecutor(job cronJob) func() { err := cmd.Run() switch err.(type) { case nil: - fmt.Fprintln(stdout, "[SYS] Command execution successful") + logger.Info("[SYS] Command execution successful") go func(url string) { if err := doPing(url); err != nil { - fmt.Fprintf(stderr, "[SYS] Ping to URL %q caused an error: %s", url, err) + logger.WithError(err).Errorf("[SYS] Ping to URL %q caused an error", url) } }(job.PingSuccess) case *exec.ExitError: - fmt.Fprintln(stderr, "[SYS] Command exited with unexpected exit code != 0") + logger.Info("[SYS] Command exited with unexpected exit code != 0") go func(url string) { if err := doPing(url); err != nil { - fmt.Fprintf(stderr, "[SYS] Ping to URL %q caused an error: %s", url, err) + logger.WithError(err).Errorf("[SYS] Ping to URL %q caused an error", url) } }(job.PingFailure) default: - fmt.Fprintf(stderr, "[SYS] Execution caused error: %s\n", err) + logger.WithError(err).Error("[SYS] Execution caused error") go func(url string) { if err := doPing(url); err != nil { - fmt.Fprintf(stderr, "[SYS] Ping to URL %q caused an error: %s", url, err) + logger.WithError(err).Errorf("[SYS] Ping to URL %q caused an error", url) } }(job.PingFailure) @@ -155,32 +191,3 @@ func doPing(url string) error { return nil } - -type messageChanWriter struct { - jobName string - msgChan chan *message - severity int - - buffer []byte -} - -func (m *messageChanWriter) Write(p []byte) (n int, err error) { - n = len(p) - err = nil - - m.buffer = append(m.buffer, p...) - if strings.Contains(string(m.buffer), "\n") { - lines := strings.Split(string(m.buffer), "\n") - for _, l := range lines[:len(lines)-1] { - m.msgChan <- &message{ - Date: time.Now(), - JobName: m.jobName, - Message: l, - Severity: m.severity, - } - } - m.buffer = []byte(lines[len(lines)-1]) - } - - return -}