mirror of https://github.com/Luzifer/elastic_cron.git synced 2024-12-22 18:31:20 +00:00

Convert to elasticsearch logging

Knut Ahlers 2018-04-15 13:54:39 +02:00
parent c752a62b0f
commit 23d32bf5a0
Signed by: luzifer
GPG key ID: DC2729FDD34BE99E
4 changed files with 88 additions and 69 deletions

.gitignore

@@ -1 +1 @@
-rsyslog_cron
+elastic_cron

README.md

@@ -21,6 +21,7 @@ This project is a quick and dirty replacement for running a cron daemon inside d
 ---
 elasticsearch:
+  index: 'elastic_cron-%{+YYYY.MM.dd}'
   servers:
     - http://localhost:9200
   auth: [username, password]
@@ -38,6 +39,7 @@ jobs:
 ```
 - `elasticsearch`
+  - `index` - Name of the index to write messages to (understands same date specifier as ES beats)
  - `servers` - List of elasticsearch instances of the same cluster to log to
  - `auth` - List consisting of two elements: username and password
- `schedule` - consists of 6 instead of the normal 5 fields:
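For context on the new `index` option: the `%{+...}` placeholder uses the same Joda-style date patterns as the Elastic beats index templates, and the commit resolves it with the beats `dtfmt` package (see `getIndexNameFunc` in main.go below). A minimal standalone sketch, not part of the commit, of how the default pattern expands:

package main

import (
    "fmt"
    "time"

    "github.com/elastic/beats/libbeat/common/dtfmt"
)

func main() {
    // "YYYY.MM.dd" is the pattern embedded in 'elastic_cron-%{+YYYY.MM.dd}'.
    // dtfmt renders it for a given point in time (here the commit date),
    // which should yield an index name like "elastic_cron-2018.04.15".
    suffix, err := dtfmt.Format(time.Date(2018, 4, 15, 13, 54, 39, 0, time.UTC), "YYYY.MM.dd")
    if err != nil {
        panic(err)
    }
    fmt.Println("elastic_cron-" + suffix)
}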


@@ -1,6 +1,16 @@
 ---
-rsyslog_target: logs.myserver.com:12383
+elasticsearch:
+  index: 'elastic_cron-%{+YYYY.MM.dd}'
+  servers:
+    - http://localhost:9200
+  auth: [username, password]
+
 jobs:
   - name: date
     schedule: "0 * * * * *"
     cmd: "/bin/date"
+    args:
+      - "+%+"
+...
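The `schedule` in this example uses the six-field spec (seconds first) that robfig/cron parses, so "0 * * * * *" fires at second 0 of every minute. A minimal sketch, outside the commit, of that behaviour:

package main

import (
    "fmt"
    "time"

    "github.com/robfig/cron"
)

func main() {
    c := cron.New()
    // Six fields: second, minute, hour, day of month, month, day of week.
    // "0 * * * * *" matches second 0 of every minute, i.e. once per minute.
    if err := c.AddFunc("0 * * * * *", func() { fmt.Println("tick:", time.Now()) }); err != nil {
        panic(err)
    }
    c.Start()
    defer c.Stop()

    time.Sleep(65 * time.Second) // long enough to see the job fire at least once
}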

main.go

@@ -2,17 +2,20 @@ package main
 
 import (
     "fmt"
-    "io/ioutil"
-    "log"
     "net/http"
     "os"
     "os/exec"
+    "regexp"
     "strings"
     "time"
 
     "github.com/Luzifer/rconfig"
+    "github.com/elastic/beats/libbeat/common/dtfmt"
+    "github.com/olivere/elastic"
     "github.com/robfig/cron"
+    log "github.com/sirupsen/logrus"
     "golang.org/x/net/context"
+    elogrus "gopkg.in/sohlich/elogrus.v3"
     "gopkg.in/yaml.v2"
 )
@@ -22,14 +25,16 @@
         Hostname   string        `flag:"hostname" description:"Overwrite system hostname"`
         PingTimout time.Duration `flag:"ping-timeout" default:"1s" description:"Timeout for success / failure pings"`
     }{}
-    version   = "dev"
-    logstream = make(chan *message, 1000)
+
+    version = "dev"
 )
 
 type cronConfig struct {
-    RSyslogTarget string    `yaml:"rsyslog_target"`
-    LogTemplate   string    `yaml:"log_template"`
+    Elasticsearch struct {
+        Auth    []string `yaml:"auth"`
+        Index   string   `yaml:"index"`
+        Servers []string `yaml:"servers"`
+    } `yaml:"elasticsearch"`
     Jobs          []cronJob `yaml:"jobs"`
 }
@@ -43,7 +48,7 @@ type cronJob struct {
 }
 
 func init() {
-    rconfig.Parse(&cfg)
+    rconfig.ParseAndValidate(&cfg)
 
     if cfg.Hostname == "" {
         hostname, _ := os.Hostname()
@@ -51,19 +56,24 @@ func init() {
     }
 }
 
+func readConfig() (*cronConfig, error) {
+    fp, err := os.Open(cfg.ConfigFile)
+    if err != nil {
+        return nil, err
+    }
+    defer fp.Close()
+
+    cc := &cronConfig{}
+    cc.Elasticsearch.Index = "elastic_cron-%{+YYYY.MM.dd}"
+
+    return cc, yaml.NewDecoder(fp).Decode(cc)
+}
+
 func main() {
-    body, err := ioutil.ReadFile(cfg.ConfigFile)
+    cc, err := readConfig()
     if err != nil {
         log.Fatalf("Unable to read config file: %s", err)
     }
 
-    cc := cronConfig{
-        LogTemplate: `<{{ syslogpri .Severity }}>{{ .Date.Format "Jan 02 15:04:05" }} {{ .Hostname }} {{ .JobName }}: {{ .Message }}`,
-    }
-
-    if err := yaml.Unmarshal(body, &cc); err != nil {
-        log.Fatalf("Unable to parse config file: %s", err)
-    }
-
     c := cron.New()
 
     for i := range cc.Jobs {
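readConfig presets the index name before decoding because yaml.v2 only overwrites fields the document actually mentions, so the default survives whenever the config file omits `index`. A trimmed-down, self-contained sketch of that pattern (the struct and input here are illustrative, not taken from the repo):

package main

import (
    "fmt"

    "gopkg.in/yaml.v2"
)

// Reduced copy of the config struct, just to show the default-then-decode pattern.
type conf struct {
    Elasticsearch struct {
        Index   string   `yaml:"index"`
        Servers []string `yaml:"servers"`
    } `yaml:"elasticsearch"`
}

func main() {
    c := &conf{}
    c.Elasticsearch.Index = "elastic_cron-%{+YYYY.MM.dd}" // default, set before decoding

    in := "elasticsearch:\n  servers:\n    - http://localhost:9200\n"
    if err := yaml.Unmarshal([]byte(in), c); err != nil {
        panic(err)
    }

    // The document never mentions 'index', so the preset default is still in place.
    fmt.Println(c.Elasticsearch.Index, c.Elasticsearch.Servers)
}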
@@ -73,28 +83,54 @@ func main() {
         }
     }
 
-    c.Start()
-
-    logadapter, err := NewSyslogAdapter(cc.RSyslogTarget, cc.LogTemplate)
-    if err != nil {
-        log.Fatalf("Unable to open syslog connection: %s", err)
-    }
-
-    logadapter.Stream(logstream)
-}
+    opts := []elastic.ClientOptionFunc{
+        elastic.SetURL(cc.Elasticsearch.Servers...),
+    }
+
+    if cc.Elasticsearch.Auth != nil && len(cc.Elasticsearch.Auth) == 2 && cc.Elasticsearch.Auth[0] != "" {
+        opts = append(opts, elastic.SetBasicAuth(cc.Elasticsearch.Auth[0], cc.Elasticsearch.Auth[1]))
+    }
+
+    esClient, err := elastic.NewSimpleClient(opts...)
+    if err != nil {
+        log.WithError(err).Fatal("Unable to create elasticsearch client")
+    }
+
+    hook, err := elogrus.NewElasticHookWithFunc(esClient, cfg.Hostname, log.InfoLevel, getIndexNameFunc(cc))
+    if err != nil {
+        log.WithError(err).Fatal("Unable to create elasticsearch log hook")
+    }
+    log.AddHook(hook)
+
+    c.Run()
+}
+
+func getIndexNameFunc(cc *cronConfig) func() string {
+    if !strings.Contains(cc.Elasticsearch.Index, `%{+`) {
+        // Simple string without date expansion
+        return func() string { return cc.Elasticsearch.Index }
+    }
+
+    return func() string {
+        rex := regexp.MustCompile(`%{\+([^}]+)}`)
+        return rex.ReplaceAllStringFunc(cc.Elasticsearch.Index, func(f string) string {
+            f = strings.TrimSuffix(strings.TrimPrefix(f, `%{+`), `}`)
+            d, _ := dtfmt.Format(time.Now(), f)
+            return d
+        })
+    }
+}
 
 func getJobExecutor(job cronJob) func() {
     return func() {
-        stdout := &messageChanWriter{
-            jobName:  job.Name,
-            msgChan:  logstream,
-            severity: 6, // Informational
-        }
-
-        stderr := &messageChanWriter{
-            jobName:  job.Name,
-            msgChan:  logstream,
-            severity: 3, // Error
-        }
+        logger := log.WithFields(log.Fields{
+            "job": job.Name,
+        })
+
+        stdout := logger.WriterLevel(log.InfoLevel)
+        defer stdout.Close()
+        stderr := logger.WriterLevel(log.ErrorLevel)
+        defer stderr.Close()
 
         fmt.Fprintln(stdout, "[SYS] Starting job")
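The rewritten getJobExecutor swaps the hand-rolled messageChanWriter (removed further down) for logrus WriterLevel pipes: every line a job writes becomes its own log entry, which the elasticsearch hook then ships as a separate document. A standalone sketch of that wiring; the exec.Cmd plumbing sits in context lines this diff view collapsed, so treat those details as an assumption:

package main

import (
    "os/exec"

    log "github.com/sirupsen/logrus"
)

func main() {
    logger := log.WithField("job", "date")

    // WriterLevel returns an io.PipeWriter; logrus splits what is written to it
    // on newlines and emits one entry per line at the chosen level.
    stdout := logger.WriterLevel(log.InfoLevel)
    defer stdout.Close()
    stderr := logger.WriterLevel(log.ErrorLevel)
    defer stderr.Close()

    cmd := exec.Command("/bin/date", "+%+")
    cmd.Stdout = stdout
    cmd.Stderr = stderr

    if err := cmd.Run(); err != nil {
        logger.WithError(err).Error("command failed")
    }
}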
@@ -105,26 +141,26 @@ func getJobExecutor(job cronJob) func() {
         err := cmd.Run()
 
         switch err.(type) {
         case nil:
-            fmt.Fprintln(stdout, "[SYS] Command execution successful")
+            logger.Info("[SYS] Command execution successful")
             go func(url string) {
                 if err := doPing(url); err != nil {
-                    fmt.Fprintf(stderr, "[SYS] Ping to URL %q caused an error: %s", url, err)
+                    logger.WithError(err).Errorf("[SYS] Ping to URL %q caused an error", url)
                 }
             }(job.PingSuccess)
 
         case *exec.ExitError:
-            fmt.Fprintln(stderr, "[SYS] Command exited with unexpected exit code != 0")
+            logger.Info("[SYS] Command exited with unexpected exit code != 0")
             go func(url string) {
                 if err := doPing(url); err != nil {
-                    fmt.Fprintf(stderr, "[SYS] Ping to URL %q caused an error: %s", url, err)
+                    logger.WithError(err).Errorf("[SYS] Ping to URL %q caused an error", url)
                 }
             }(job.PingFailure)
 
         default:
-            fmt.Fprintf(stderr, "[SYS] Execution caused error: %s\n", err)
+            logger.WithError(err).Error("[SYS] Execution caused error")
             go func(url string) {
                 if err := doPing(url); err != nil {
-                    fmt.Fprintf(stderr, "[SYS] Ping to URL %q caused an error: %s", url, err)
+                    logger.WithError(err).Errorf("[SYS] Ping to URL %q caused an error", url)
                 }
             }(job.PingFailure)
@@ -155,32 +191,3 @@ func doPing(url string) error {
 
     return nil
 }
-
-type messageChanWriter struct {
-    jobName  string
-    msgChan  chan *message
-    severity int
-    buffer   []byte
-}
-
-func (m *messageChanWriter) Write(p []byte) (n int, err error) {
-    n = len(p)
-    err = nil
-
-    m.buffer = append(m.buffer, p...)
-
-    if strings.Contains(string(m.buffer), "\n") {
-        lines := strings.Split(string(m.buffer), "\n")
-        for _, l := range lines[:len(lines)-1] {
-            m.msgChan <- &message{
-                Date:     time.Now(),
-                JobName:  m.jobName,
-                Message:  l,
-                Severity: m.severity,
-            }
-        }
-        m.buffer = []byte(lines[len(lines)-1])
-    }
-
-    return
-}