refactor: move prometheus outside client class

This commit is contained in:
Shizun Ge
2024-01-16 20:16:27 -08:00
parent 2c0ee146ee
commit df4cd39c57
6 changed files with 200 additions and 121 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
endlessh-go

100
client.go
View File

@@ -23,10 +23,12 @@ import (
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
var letterBytes = []byte(" abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890!@#$%^&*()-=_+[]{}|;:',./<>?")
var (
numCurrentClients int64
letterBytes = []byte(" abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890!@#$%^&*()-=_+[]{}|;:',./<>?")
)
func randStringBytes(n int64) []byte {
b := make([]byte, n+1)
@@ -37,75 +39,59 @@ func randStringBytes(n int64) []byte {
return b
}
type client struct {
conn net.Conn
last time.Time
next time.Time
start time.Time
interval time.Duration
geoipSupplier string
geohash string
country string
location string
bytesSent int
prometheusEnabled bool
type Client struct {
conn net.Conn
next time.Time
start time.Time
last time.Time
interval time.Duration
bytesSent int
}
func NewClient(conn net.Conn, interval time.Duration, maxClient int64, geoipSupplier string, prometheusEnabled bool) *client {
addr := conn.RemoteAddr().(*net.TCPAddr)
func NewClient(conn net.Conn, interval time.Duration, maxClients int64) *Client {
for numCurrentClients >= maxClients {
time.Sleep(interval)
}
atomic.AddInt64(&numCurrentClients, 1)
atomic.AddInt64(&numTotalClients, 1)
geohash, country, location, err := geohashAndLocation(addr.IP.String(), geoipSupplier)
if err != nil {
glog.Warningf("Failed to obatin the geohash of %v: %v.", addr.IP, err)
}
if prometheusEnabled {
clientIP.With(prometheus.Labels{
"ip": addr.IP.String(),
"geohash": geohash,
"country": country,
"location": location}).Inc()
}
glog.V(1).Infof("ACCEPT host=%v port=%v n=%v/%v\n", addr.IP, addr.Port, numCurrentClients, maxClient)
return &client{
conn: conn,
last: time.Now(),
next: time.Now().Add(interval),
start: time.Now(),
interval: interval,
geohash: geohash,
country: country,
location: location,
bytesSent: 0,
prometheusEnabled: prometheusEnabled,
addr := conn.RemoteAddr().(*net.TCPAddr)
glog.V(1).Infof("ACCEPT host=%v port=%v n=%v/%v\n", addr.IP, addr.Port, numCurrentClients, maxClients)
return &Client{
conn: conn,
next: time.Now().Add(interval),
start: time.Now(),
last: time.Now(),
interval: interval,
bytesSent: 0,
}
}
func (c *client) Send(bannerMaxLength int64) error {
defer func(c *client) {
addr := c.conn.RemoteAddr().(*net.TCPAddr)
millisecondsSpent := time.Now().Sub(c.last).Milliseconds()
c.last = time.Now()
c.next = time.Now().Add(c.interval)
atomic.AddInt64(&numTotalMilliseconds, millisecondsSpent)
if c.prometheusEnabled {
clientSeconds.With(prometheus.Labels{"ip": addr.IP.String()}).Add(float64(millisecondsSpent) / 1000)
}
}(c)
func (c *Client) IpAddr() string {
return c.conn.RemoteAddr().(*net.TCPAddr).IP.String()
}
func (c *Client) Send(bannerMaxLength int64) (int, error) {
if time.Now().Before(c.next) {
time.Sleep(c.next.Sub(time.Now()))
}
c.next = time.Now().Add(c.interval)
length := rand.Int63n(bannerMaxLength)
bytesSent, err := c.conn.Write(randStringBytes(length))
if err != nil {
return err
return 0, err
}
c.bytesSent += bytesSent
atomic.AddInt64(&numTotalBytes, int64(bytesSent))
return nil
return bytesSent, nil
}
func (c *client) Close() {
func (c *Client) MillisecondsSinceLast() int64 {
millisecondsSpent := time.Now().Sub(c.last).Milliseconds()
c.last = time.Now()
return millisecondsSpent
}
func (c *Client) Close() {
addr := c.conn.RemoteAddr().(*net.TCPAddr)
atomic.AddInt64(&numCurrentClients, -1)
atomic.AddInt64(&numTotalClientsClosed, 1)
glog.V(1).Infof("CLOSE host=%v port=%v time=%v bytes=%v\n", addr.IP, addr.Port, time.Now().Sub(c.start).Seconds(), c.bytesSent)
c.conn.Close()
atomic.AddInt64(&numCurrentClients, -1)
}

View File

@@ -30,9 +30,12 @@ import (
"github.com/pierrre/geohash"
)
var (
maxMindDbFileName *string
)
type GeoOption struct {
GeoipSupplier string
MaxMindDbFileName string
}
var ()
func composeLocation(country string, region string, city string) string {
var locations []string
@@ -69,9 +72,9 @@ type ipapi struct {
Longitude float64 `json:"lon"`
}
func geohashAndLocationFromIpapi(address string) (string, string, string, error) {
func geohashAndLocationFromIpapi(ipAddr string) (string, string, string, error) {
var geo ipapi
response, err := http.Get("http://ip-api.com/json/" + address)
response, err := http.Get("http://ip-api.com/json/" + ipAddr)
if err != nil {
return "s000", "Unknown", "Unknown", err
}
@@ -88,7 +91,7 @@ func geohashAndLocationFromIpapi(address string) (string, string, string, error)
}
if geo.Status != "success" {
return "s000", "Unknown", "Unknown", fmt.Errorf("failed to query %v via ip-api: status: %v, message: %v", address, geo.Status, geo.Message)
return "s000", "Unknown", "Unknown", fmt.Errorf("failed to query %v via ip-api: status: %v, message: %v", ipAddr, geo.Status, geo.Message)
}
gh := geohash.EncodeAuto(geo.Latitude, geo.Longitude)
@@ -98,14 +101,14 @@ func geohashAndLocationFromIpapi(address string) (string, string, string, error)
return gh, country, location, nil
}
func geohashAndLocationFromMaxMindDb(address string) (string, string, string, error) {
db, err := geoip2.Open(*maxMindDbFileName)
func geohashAndLocationFromMaxMindDb(ipAddr, maxMindDbFileName string) (string, string, string, error) {
db, err := geoip2.Open(maxMindDbFileName)
if err != nil {
return "s000", "Unknown", "Unknown", err
}
defer db.Close()
// If you are using strings that may be invalid, check that ip is not nil
ip := net.ParseIP(address)
ip := net.ParseIP(ipAddr)
cityRecord, err := db.City(ip)
if err != nil {
return "s000", "Unknown", "Unknown", err
@@ -135,15 +138,15 @@ func geohashAndLocationFromMaxMindDb(address string) (string, string, string, er
return gh, country, location, nil
}
func geohashAndLocation(address string, geoipSupplier string) (string, string, string, error) {
switch geoipSupplier {
func geohashAndLocation(ipAddr string, option GeoOption) (string, string, string, error) {
switch option.GeoipSupplier {
case "off":
return "s000", "Geohash off", "Geohash off", nil
case "ip-api":
return geohashAndLocationFromIpapi(address)
return geohashAndLocationFromIpapi(ipAddr)
case "max-mind-db":
return geohashAndLocationFromMaxMindDb(address)
return geohashAndLocationFromMaxMindDb(ipAddr, option.MaxMindDbFileName)
default:
return "s000", "Unknown", "Unknown", fmt.Errorf("unknown geoipSupplier %v.", geoipSupplier)
return "s000", "Unknown", "Unknown", fmt.Errorf("unknown geoipSupplier %v.", option.GeoipSupplier)
}
}

4
go.mod
View File

@@ -1,6 +1,8 @@
module endlessh-go
go 1.20
go 1.21.0
toolchain go1.21.4
require (
github.com/golang/glog v1.2.0

15
go.sum
View File

@@ -1,31 +1,43 @@
github.com/Codefor/geohash v0.0.0-20140723084247-1b41c28e3a9d h1:iG9B49Q218F/XxXNRM7k/vWf7MKmLIS8AcJV9cGN4nA=
github.com/Codefor/geohash v0.0.0-20140723084247-1b41c28e3a9d/go.mod h1:RVnhzAX71far8Kc3TQeA0k/dcaEKUnTDSOyet/JCmGI=
github.com/TomiHiltunen/geohash-golang v0.0.0-20150112065804-b3e4e625abfb h1:wumPkzt4zaxO4rHPBrjDK8iZMR41C1qs7njNqlacwQg=
github.com/TomiHiltunen/geohash-golang v0.0.0-20150112065804-b3e4e625abfb/go.mod h1:QiYsIBRQEO+Z4Rz7GoI+dsHVneZNONvhczuA+llOZNM=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/broady/gogeohash v0.0.0-20120525094510-7b2c40d64042 h1:iEdmkrNMLXbM7ecffOAtZJQOQUTE4iMonxrb5opUgE4=
github.com/broady/gogeohash v0.0.0-20120525094510-7b2c40d64042/go.mod h1:f1L9YvXvlt9JTa+A17trQjSMM6bV40f+tHjB+Pi+Fqk=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fanixk/geohash v0.0.0-20150324002647-c1f9b5fa157a h1:Fyfh/dsHFrC6nkX7H7+nFdTd1wROlX/FxEIWVpKYf1U=
github.com/fanixk/geohash v0.0.0-20150324002647-c1f9b5fa157a/go.mod h1:UgNw+PTmmGN8rV7RvjvnBMsoTU8ZXXnaT3hYsDTBlgQ=
github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68=
github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/mmcloughlin/geohash v0.10.0 h1:9w1HchfDfdeLc+jFEf/04D27KP7E2QmpDu52wPbJWRE=
github.com/mmcloughlin/geohash v0.10.0/go.mod h1:oNZxQo5yWJh0eMQEP/8hwQuVx9Z9tjwFUqcTB1SmG0c=
github.com/oschwald/geoip2-golang v1.9.0 h1:uvD3O6fXAXs+usU+UGExshpdP13GAqp4GBrzN7IgKZc=
github.com/oschwald/geoip2-golang v1.9.0/go.mod h1:BHK6TvDyATVQhKNbQBdrj9eAvuwOMi2zSFXizL3K81Y=
github.com/oschwald/maxminddb-golang v1.11.0 h1:aSXMqYR/EPNjGE8epgqwDay+P30hCBZIveY0WZbAWh0=
github.com/oschwald/maxminddb-golang v1.11.0/go.mod h1:YmVI+H0zh3ySFR3w+oz8PCfglAFj3PuCmui13+P9zDg=
github.com/pierrre/assert v0.3.2 h1:wXdlkVN5FVSLEKl6pGijcCYkldgfjRgyheU3C1/by9Q=
github.com/pierrre/assert v0.3.2/go.mod h1:zwOn9QE9/+eSgqR/4iCS9K9dUpkqyl0iih2APCI5d8E=
github.com/pierrre/compare v1.4.2 h1:oabIiWclzAlXG7S/2MYSFDJ/vR34oa/MYrBZh5PNU80=
github.com/pierrre/compare v1.4.2/go.mod h1:EBDtLjy0XYVBEFP4T3pWljpfTwL7X8DqPt9RIP1+svY=
github.com/pierrre/geohash v1.1.1 h1:XCkvOyv/uesenMPhkvsCfIiUalBmGdHlFY0bIWTqb+s=
github.com/pierrre/geohash v1.1.1/go.mod h1:ucAm7cbgGBoVr6cr1t+d3ea5XQ9P5zKHXfS1Qy2iKVY=
github.com/pierrre/go-libs v0.2.14 h1:wAPoOrslKLnha6ow5EKkxxZpo76kOea57efs71A/ZnQ=
github.com/pierrre/go-libs v0.2.14/go.mod h1:eA3pQD5LHZmavOpTpUfO8FszduBNHoFXDWrevDR6Dy8=
github.com/pierrre/pretty v0.0.10 h1:Cb5som+1EpU+x7UA5AMy9I8AY2XkzMBywkLEAdo1JDg=
github.com/pierrre/pretty v0.0.10/go.mod h1:F+Z4XV4T5GIvbr/swCAkuQ2ng81qMaQT9CfI8rKOLdY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
@@ -35,7 +47,9 @@ github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGy
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/the42/cartconvert v1.0.0 h1:g8kt6ic2GEhdcZ61ZP9GsWwhosVo5nCnH1n2/oAQXUU=
github.com/the42/cartconvert v1.0.0/go.mod h1:fWO/msnJVhHqN1yX6OBoxSyfj7TEj1hHiL8bJSQsK30=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -43,3 +57,4 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

170
main.go
View File

@@ -19,10 +19,10 @@ package main
import (
"flag"
"fmt"
"math/rand"
"net"
"net/http"
"os"
"sync/atomic"
"time"
"github.com/golang/glog"
@@ -31,7 +31,6 @@ import (
)
var (
numCurrentClients int64
numTotalClients int64
numTotalClientsClosed int64
numTotalBytes int64
@@ -106,6 +105,113 @@ func initPrometheus(prometheusHost, prometheusPort, prometheusEntry string) {
}()
}
const (
recordTypeStart = iota
recordTypeSend = iota
recordTypeStop = iota
)
type recordEntry struct {
RecordType int
IpAddr string
BytesSent int
MillisecondsSpent int64
}
func startRecording(maxClients int64, prometheusEnabled bool, geoOption GeoOption) chan recordEntry {
records := make(chan recordEntry, maxClients)
go func() {
for {
r, more := <-records
if !more {
return
}
if !prometheusEnabled {
continue
}
switch r.RecordType {
case recordTypeStart:
geohash, country, location, err := geohashAndLocation(r.IpAddr, geoOption)
if err != nil {
glog.Warningf("Failed to obatin the geohash of %v: %v.", r.IpAddr, err)
}
clientIP.With(prometheus.Labels{
"ip": r.IpAddr,
"geohash": geohash,
"country": country,
"location": location}).Inc()
atomic.AddInt64(&numTotalClients, 1)
case recordTypeSend:
clientSeconds.With(prometheus.Labels{"ip": r.IpAddr}).Add(float64(r.MillisecondsSpent) / 1000)
atomic.AddInt64(&numTotalBytes, int64(r.BytesSent))
atomic.AddInt64(&numTotalMilliseconds, r.MillisecondsSpent)
case recordTypeStop:
atomic.AddInt64(&numTotalClientsClosed, 1)
}
}
}()
return records
}
func startSending(maxClients int64, bannerMaxLength int64, records chan<- recordEntry) chan *Client {
clients := make(chan *Client, maxClients)
go func() {
for {
c, more := <-clients
if !more {
return
}
go func() {
bytesSent, err := c.Send(bannerMaxLength)
ipAddr := c.IpAddr()
if err != nil {
c.Close()
records <- recordEntry{
RecordType: recordTypeStop,
IpAddr: ipAddr,
}
return
}
millisecondsSpent := c.MillisecondsSinceLast()
clients <- c
records <- recordEntry{
RecordType: recordTypeSend,
IpAddr: ipAddr,
BytesSent: bytesSent,
MillisecondsSpent: millisecondsSpent,
}
}()
}
}()
return clients
}
func startAccepting(maxClients int64, connType, connHost, connPort string, interval time.Duration, clients chan<- *Client, records chan<- recordEntry) {
l, err := net.Listen(connType, connHost+":"+connPort)
if err != nil {
glog.Errorf("Error listening: %v", err)
os.Exit(1)
}
// Close the listener when the application closes.
defer l.Close()
glog.Infof("Listening on %v:%v", connHost, connPort)
for {
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
glog.Errorf("Error accepting connection from port %v: %v", connPort, err)
os.Exit(1)
}
c := NewClient(conn, interval, maxClients)
ipAddr := c.IpAddr()
records <- recordEntry{
RecordType: recordTypeStart,
IpAddr: ipAddr,
}
clients <- c
}
}
func main() {
intervalMs := flag.Int("interval_ms", 1000, "Message millisecond delay")
bannerMaxLength := flag.Int64("line_length", 32, "Maximum banner line length")
@@ -118,7 +224,7 @@ func main() {
prometheusPort := flag.String("prometheus_port", "2112", "The port for prometheus")
prometheusEntry := flag.String("prometheus_entry", "metrics", "Entry point for prometheus")
geoipSupplier := flag.String("geoip_supplier", "off", "Supplier to obtain Geohash of IPs. Possible values are \"off\", \"ip-api\", \"max-mind-db\"")
maxMindDbFileName = flag.String("max_mind_db", "", "Path to the MaxMind DB file.")
maxMindDbFileName := flag.String("max_mind_db", "", "Path to the MaxMind DB file.")
flag.Usage = func() {
fmt.Fprintf(flag.CommandLine.Output(), "Usage of %v \n", os.Args[0])
@@ -126,60 +232,26 @@ func main() {
}
flag.Parse()
if *connType == "tcp6" && *prometheusHost == "0.0.0.0" {
*prometheusHost = "[::]"
}
if *prometheusEnabled {
if *connType == "tcp6" && *prometheusHost == "0.0.0.0" {
*prometheusHost = "[::]"
}
initPrometheus(*prometheusHost, *prometheusPort, *prometheusEntry)
}
rand.Seed(time.Now().UnixNano())
records := startRecording(*maxClients, *prometheusEnabled, GeoOption{
GeoipSupplier: *geoipSupplier,
MaxMindDbFileName: *maxMindDbFileName,
})
clients := startSending(*maxClients, *bannerMaxLength, records)
interval := time.Duration(*intervalMs) * time.Millisecond
// Listen for incoming connections.
if *connType == "tcp6" && *connHost == "0.0.0.0" {
*connHost = "[::]"
}
l, err := net.Listen(*connType, *connHost+":"+*connPort)
if err != nil {
glog.Errorf("Error listening: %v", err)
os.Exit(1)
go startAccepting(*maxClients, *connType, *connHost, *connPort, interval, clients, records)
for {
time.Sleep(time.Duration(1<<63 - 1))
}
// Close the listener when the application closes.
defer l.Close()
glog.Infof("Listening on %v:%v", *connHost, *connPort)
clients := make(chan *client, *maxClients)
go func() {
for {
c, more := <-clients
if !more {
return
}
if time.Now().Before(c.next) {
time.Sleep(c.next.Sub(time.Now()))
}
err := c.Send(*bannerMaxLength)
if err != nil {
c.Close()
continue
}
go func() { clients <- c }()
}
}()
listener := func() {
for {
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
glog.Errorf("Error accepting: %v", err)
os.Exit(1)
}
// Handle connections in a new goroutine.
for numCurrentClients >= *maxClients {
time.Sleep(interval)
}
clients <- NewClient(conn, interval, *maxClients, *geoipSupplier, *prometheusEnabled)
}
}
listener()
}