From fee1f1a67d8f5ecbf6a12bafee898002df6b25e6 Mon Sep 17 00:00:00 2001 From: Shizun Ge Date: Tue, 16 Jan 2024 21:06:03 -0800 Subject: [PATCH] re-org files into packages. update copyright. accept multiple ports. --- client.go => client/client.go | 11 +- {coordinates => geoip}/country.go | 8 +- geoip.go => geoip/geoip.go | 12 +- main.go | 225 +++++++++--------------------- metrics/metrics.go | 154 ++++++++++++++++++++ 5 files changed, 234 insertions(+), 176 deletions(-) rename client.go => client/client.go (92%) rename {coordinates => geoip}/country.go (98%) rename geoip.go => geoip/geoip.go (95%) create mode 100644 metrics/metrics.go diff --git a/client.go b/client/client.go similarity index 92% rename from client.go rename to client/client.go index e8b7c8b..17f4c86 100644 --- a/client.go +++ b/client/client.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Shizun Ge +// Copyright (C) 2021-2024 Shizun Ge // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -14,11 +14,12 @@ // along with this program. If not, see . // -package main +package client import ( "math/rand" "net" + "strconv" "sync/atomic" "time" @@ -65,10 +66,14 @@ func NewClient(conn net.Conn, interval time.Duration, maxClients int64) *Client } } -func (c *Client) IpAddr() string { +func (c *Client) RemoteIpAddr() string { return c.conn.RemoteAddr().(*net.TCPAddr).IP.String() } +func (c *Client) LocalPort() string { + return strconv.Itoa(c.conn.LocalAddr().(*net.TCPAddr).Port) +} + func (c *Client) Send(bannerMaxLength int64) (int, error) { if time.Now().Before(c.next) { time.Sleep(c.next.Sub(time.Now())) diff --git a/coordinates/country.go b/geoip/country.go similarity index 98% rename from coordinates/country.go rename to geoip/country.go index e09ca2a..ebe6ad0 100644 --- a/coordinates/country.go +++ b/geoip/country.go @@ -1,4 +1,4 @@ -// Copyright (C) 2023 Shizun Ge +// Copyright (C) 2023-2024 Shizun Ge // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -14,16 +14,16 @@ // along with this program. If not, see . // -package coordinates +package geoip // Map country's ISO to their capital's latitude and longitude. // Country's ISO see https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2 -type Location struct { +type location struct { Latitude float64 Longitude float64 } -var Country = map[string]Location{ +var countryToLocation = map[string]location{ "AD": {42.5, 1.5}, "AE": {24.4511, 54.3969}, "AF": {34.5328, 69.1658}, diff --git a/geoip.go b/geoip/geoip.go similarity index 95% rename from geoip.go rename to geoip/geoip.go index 4f84b00..6701197 100644 --- a/geoip.go +++ b/geoip/geoip.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021-2023 Shizun Ge +// Copyright (C) 2021-2024 Shizun Ge // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -14,7 +14,7 @@ // along with this program. If not, see . // -package main +package geoip import ( "encoding/json" @@ -24,8 +24,6 @@ import ( "net/http" "strings" - "endlessh-go/coordinates" - "github.com/oschwald/geoip2-golang" "github.com/pierrre/geohash" ) @@ -35,8 +33,6 @@ type GeoOption struct { MaxMindDbFileName string } -var () - func composeLocation(country string, region string, city string) string { var locations []string for _, s := range []string{country, region, city} { @@ -120,7 +116,7 @@ func geohashAndLocationFromMaxMindDb(ipAddr, maxMindDbFileName string) (string, iso := cityRecord.Country.IsoCode if latitude == 0 && longitude == 0 { // In case of using Country DB, city is not available. - loc, ok := coordinates.Country[iso] + loc, ok := countryToLocation[iso] if ok { latitude = loc.Latitude longitude = loc.Longitude @@ -138,7 +134,7 @@ func geohashAndLocationFromMaxMindDb(ipAddr, maxMindDbFileName string) (string, return gh, country, location, nil } -func geohashAndLocation(ipAddr string, option GeoOption) (string, string, string, error) { +func GeohashAndLocation(ipAddr string, option GeoOption) (string, string, string, error) { switch option.GeoipSupplier { case "off": return "s000", "Geohash off", "Geohash off", nil diff --git a/main.go b/main.go index 66eeb6b..7e66f7b 100644 --- a/main.go +++ b/main.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021-2023 Shizun Ge +// Copyright (C) 2021-2024 Shizun Ge // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -17,144 +17,21 @@ package main import ( + "endlessh-go/client" + "endlessh-go/geoip" + "endlessh-go/metrics" "flag" "fmt" "net" - "net/http" "os" - "sync/atomic" + "strings" "time" "github.com/golang/glog" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" ) -var ( - numTotalClients int64 - numTotalClientsClosed int64 - numTotalBytes int64 - numTotalMilliseconds int64 - totalClients prometheus.CounterFunc - totalClientsClosed prometheus.CounterFunc - totalBytes prometheus.CounterFunc - totalSeconds prometheus.CounterFunc - clientIP *prometheus.CounterVec - clientSeconds *prometheus.CounterVec -) - -func initPrometheus(prometheusHost, prometheusPort, prometheusEntry string) { - totalClients = prometheus.NewCounterFunc( - prometheus.CounterOpts{ - Name: "endlessh_client_open_count_total", - Help: "Total number of clients that tried to connect to this host.", - }, func() float64 { - return float64(numTotalClients) - }, - ) - totalClientsClosed = prometheus.NewCounterFunc( - prometheus.CounterOpts{ - Name: "endlessh_client_closed_count_total", - Help: "Total number of clients that stopped connecting to this host.", - }, func() float64 { - return float64(numTotalClientsClosed) - }, - ) - totalBytes = prometheus.NewCounterFunc( - prometheus.CounterOpts{ - Name: "endlessh_sent_bytes_total", - Help: "Total bytes sent to clients that tried to connect to this host.", - }, func() float64 { - return float64(numTotalBytes) - }, - ) - totalSeconds = prometheus.NewCounterFunc( - prometheus.CounterOpts{ - Name: "endlessh_trapped_time_seconds_total", - Help: "Total seconds clients spent on endlessh.", - }, func() float64 { - return float64(numTotalMilliseconds) / 1000 - }, - ) - clientIP = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "endlessh_client_open_count", - Help: "Number of connections of clients.", - }, - []string{"ip", "geohash", "country", "location"}, - ) - clientSeconds = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "endlessh_client_trapped_time_seconds", - Help: "Seconds a client spends on endlessh.", - }, - []string{"ip"}, - ) - promReg := prometheus.NewRegistry() - promReg.MustRegister(totalClients) - promReg.MustRegister(totalClientsClosed) - promReg.MustRegister(totalBytes) - promReg.MustRegister(totalSeconds) - promReg.MustRegister(clientIP) - promReg.MustRegister(clientSeconds) - handler := promhttp.HandlerFor(promReg, promhttp.HandlerOpts{EnableOpenMetrics: true}) - http.Handle("/"+prometheusEntry, handler) - go func() { - glog.Infof("Starting Prometheus on %v:%v, entry point is /%v", prometheusHost, prometheusPort, prometheusEntry) - http.ListenAndServe(prometheusHost+":"+prometheusPort, nil) - }() -} - -const ( - recordTypeStart = iota - recordTypeSend = iota - recordTypeStop = iota -) - -type recordEntry struct { - RecordType int - IpAddr string - BytesSent int - MillisecondsSpent int64 -} - -func startRecording(maxClients int64, prometheusEnabled bool, geoOption GeoOption) chan recordEntry { - records := make(chan recordEntry, maxClients) - go func() { - for { - r, more := <-records - if !more { - return - } - if !prometheusEnabled { - continue - } - switch r.RecordType { - case recordTypeStart: - geohash, country, location, err := geohashAndLocation(r.IpAddr, geoOption) - if err != nil { - glog.Warningf("Failed to obatin the geohash of %v: %v.", r.IpAddr, err) - } - clientIP.With(prometheus.Labels{ - "ip": r.IpAddr, - "geohash": geohash, - "country": country, - "location": location}).Inc() - atomic.AddInt64(&numTotalClients, 1) - case recordTypeSend: - clientSeconds.With(prometheus.Labels{"ip": r.IpAddr}).Add(float64(r.MillisecondsSpent) / 1000) - atomic.AddInt64(&numTotalBytes, int64(r.BytesSent)) - atomic.AddInt64(&numTotalMilliseconds, r.MillisecondsSpent) - case recordTypeStop: - atomic.AddInt64(&numTotalClientsClosed, 1) - } - } - }() - return records -} - -func startSending(maxClients int64, bannerMaxLength int64, records chan<- recordEntry) chan *Client { - clients := make(chan *Client, maxClients) +func startSending(maxClients int64, bannerMaxLength int64, records chan<- metrics.RecordEntry) chan *client.Client { + clients := make(chan *client.Client, maxClients) go func() { for { c, more := <-clients @@ -163,20 +40,23 @@ func startSending(maxClients int64, bannerMaxLength int64, records chan<- record } go func() { bytesSent, err := c.Send(bannerMaxLength) - ipAddr := c.IpAddr() + remoteIpAddr := c.RemoteIpAddr() + localPort := c.LocalPort() if err != nil { c.Close() - records <- recordEntry{ - RecordType: recordTypeStop, - IpAddr: ipAddr, + records <- metrics.RecordEntry{ + RecordType: metrics.RecordEntryTypeStop, + IpAddr: remoteIpAddr, + LocalPort: localPort, } return } millisecondsSpent := c.MillisecondsSinceLast() clients <- c - records <- recordEntry{ - RecordType: recordTypeSend, - IpAddr: ipAddr, + records <- metrics.RecordEntry{ + RecordType: metrics.RecordEntryTypeSend, + IpAddr: remoteIpAddr, + LocalPort: localPort, BytesSent: bytesSent, MillisecondsSpent: millisecondsSpent, } @@ -186,39 +66,57 @@ func startSending(maxClients int64, bannerMaxLength int64, records chan<- record return clients } -func startAccepting(maxClients int64, connType, connHost, connPort string, interval time.Duration, clients chan<- *Client, records chan<- recordEntry) { - l, err := net.Listen(connType, connHost+":"+connPort) - if err != nil { - glog.Errorf("Error listening: %v", err) - os.Exit(1) - } - // Close the listener when the application closes. - defer l.Close() - glog.Infof("Listening on %v:%v", connHost, connPort) - for { - // Listen for an incoming connection. - conn, err := l.Accept() +func startAccepting(maxClients int64, connType, connHost, connPort string, interval time.Duration, clients chan<- *client.Client, records chan<- metrics.RecordEntry) { + go func() { + l, err := net.Listen(connType, connHost+":"+connPort) if err != nil { - glog.Errorf("Error accepting connection from port %v: %v", connPort, err) + glog.Errorf("Error listening: %v", err) os.Exit(1) } - c := NewClient(conn, interval, maxClients) - ipAddr := c.IpAddr() - records <- recordEntry{ - RecordType: recordTypeStart, - IpAddr: ipAddr, + // Close the listener when the application closes. + defer l.Close() + glog.Infof("Listening on %v:%v", connHost, connPort) + for { + // Listen for an incoming connection. + conn, err := l.Accept() + if err != nil { + glog.Errorf("Error accepting connection from port %v: %v", connPort, err) + os.Exit(1) + } + c := client.NewClient(conn, interval, maxClients) + remoteIpAddr := c.RemoteIpAddr() + records <- metrics.RecordEntry{ + RecordType: metrics.RecordEntryTypeStart, + IpAddr: remoteIpAddr, + LocalPort: connPort, + } + clients <- c } - clients <- c - } + }() } +type arrayStrings []string + +func (a *arrayStrings) String() string { + return strings.Join(*a, ", ") +} + +func (a *arrayStrings) Set(value string) error { + *a = append(*a, value) + return nil +} + +const defaultPort = "2222" + +var connPorts arrayStrings + func main() { intervalMs := flag.Int("interval_ms", 1000, "Message millisecond delay") bannerMaxLength := flag.Int64("line_length", 32, "Maximum banner line length") maxClients := flag.Int64("max_clients", 4096, "Maximum number of clients") connType := flag.String("conn_type", "tcp", "Connection type. Possible values are tcp, tcp4, tcp6") connHost := flag.String("host", "0.0.0.0", "SSH listening address") - connPort := flag.String("port", "2222", "SSH listening port") + flag.Var(&connPorts, "port", "SSH listening port") prometheusEnabled := flag.Bool("enable_prometheus", false, "Enable prometheus") prometheusHost := flag.String("prometheus_host", "0.0.0.0", "The address for prometheus") prometheusPort := flag.String("prometheus_port", "2112", "The port for prometheus") @@ -236,10 +134,10 @@ func main() { if *connType == "tcp6" && *prometheusHost == "0.0.0.0" { *prometheusHost = "[::]" } - initPrometheus(*prometheusHost, *prometheusPort, *prometheusEntry) + metrics.InitPrometheus(*prometheusHost, *prometheusPort, *prometheusEntry) } - records := startRecording(*maxClients, *prometheusEnabled, GeoOption{ + records := metrics.StartRecording(*maxClients, *prometheusEnabled, geoip.GeoOption{ GeoipSupplier: *geoipSupplier, MaxMindDbFileName: *maxMindDbFileName, }) @@ -250,7 +148,12 @@ func main() { if *connType == "tcp6" && *connHost == "0.0.0.0" { *connHost = "[::]" } - go startAccepting(*maxClients, *connType, *connHost, *connPort, interval, clients, records) + if len(connPorts) == 0 { + connPorts = append(connPorts, defaultPort) + } + for _, connPort := range connPorts { + startAccepting(*maxClients, *connType, *connHost, connPort, interval, clients, records) + } for { time.Sleep(time.Duration(1<<63 - 1)) } diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 0000000..6097953 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,154 @@ +// Copyright (C) 2024 Shizun Ge +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +package metrics + +import ( + "endlessh-go/geoip" + "net/http" + "sync/atomic" + + "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var ( + numTotalClients int64 + numTotalClientsClosed int64 + numTotalBytes int64 + numTotalMilliseconds int64 + totalClients prometheus.CounterFunc + totalClientsClosed prometheus.CounterFunc + totalBytes prometheus.CounterFunc + totalSeconds prometheus.CounterFunc + clientIP *prometheus.CounterVec + clientSeconds *prometheus.CounterVec +) + +func InitPrometheus(prometheusHost, prometheusPort, prometheusEntry string) { + totalClients = prometheus.NewCounterFunc( + prometheus.CounterOpts{ + Name: "endlessh_client_open_count_total", + Help: "Total number of clients that tried to connect to this host.", + }, func() float64 { + return float64(numTotalClients) + }, + ) + totalClientsClosed = prometheus.NewCounterFunc( + prometheus.CounterOpts{ + Name: "endlessh_client_closed_count_total", + Help: "Total number of clients that stopped connecting to this host.", + }, func() float64 { + return float64(numTotalClientsClosed) + }, + ) + totalBytes = prometheus.NewCounterFunc( + prometheus.CounterOpts{ + Name: "endlessh_sent_bytes_total", + Help: "Total bytes sent to clients that tried to connect to this host.", + }, func() float64 { + return float64(numTotalBytes) + }, + ) + totalSeconds = prometheus.NewCounterFunc( + prometheus.CounterOpts{ + Name: "endlessh_trapped_time_seconds_total", + Help: "Total seconds clients spent on endlessh.", + }, func() float64 { + return float64(numTotalMilliseconds) / 1000 + }, + ) + clientIP = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "endlessh_client_open_count", + Help: "Number of connections of clients.", + }, + []string{"ip", "local_port", "geohash", "country", "location"}, + ) + clientSeconds = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "endlessh_client_trapped_time_seconds", + Help: "Seconds a client spends on endlessh.", + }, + []string{"ip", "local_port"}, + ) + promReg := prometheus.NewRegistry() + promReg.MustRegister(totalClients) + promReg.MustRegister(totalClientsClosed) + promReg.MustRegister(totalBytes) + promReg.MustRegister(totalSeconds) + promReg.MustRegister(clientIP) + promReg.MustRegister(clientSeconds) + handler := promhttp.HandlerFor(promReg, promhttp.HandlerOpts{EnableOpenMetrics: true}) + http.Handle("/"+prometheusEntry, handler) + go func() { + glog.Infof("Starting Prometheus on %v:%v, entry point is /%v", prometheusHost, prometheusPort, prometheusEntry) + http.ListenAndServe(prometheusHost+":"+prometheusPort, nil) + }() +} + +const ( + RecordEntryTypeStart = iota + RecordEntryTypeSend = iota + RecordEntryTypeStop = iota +) + +type RecordEntry struct { + RecordType int + IpAddr string + LocalPort string + BytesSent int + MillisecondsSpent int64 +} + +func StartRecording(maxClients int64, prometheusEnabled bool, geoOption geoip.GeoOption) chan RecordEntry { + records := make(chan RecordEntry, maxClients) + go func() { + for { + r, more := <-records + if !more { + return + } + if !prometheusEnabled { + continue + } + switch r.RecordType { + case RecordEntryTypeStart: + geohash, country, location, err := geoip.GeohashAndLocation(r.IpAddr, geoOption) + if err != nil { + glog.Warningf("Failed to obatin the geohash of %v: %v.", r.IpAddr, err) + } + clientIP.With(prometheus.Labels{ + "ip": r.IpAddr, + "local_port": r.LocalPort, + "geohash": geohash, + "country": country, + "location": location}).Inc() + atomic.AddInt64(&numTotalClients, 1) + case RecordEntryTypeSend: + clientSeconds.With(prometheus.Labels{ + "ip": r.IpAddr, + "local_port": r.LocalPort}).Add(float64(r.MillisecondsSpent) / 1000) + atomic.AddInt64(&numTotalBytes, int64(r.BytesSent)) + atomic.AddInt64(&numTotalMilliseconds, r.MillisecondsSpent) + case RecordEntryTypeStop: + atomic.AddInt64(&numTotalClientsClosed, 1) + } + } + }() + return records +}