// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
	"container/list"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"math"
	"math/rand"
	"net/http"
	"os"
	"strconv"
	"strings"
	"time"

	bigquery "google.golang.org/api/bigquery/v2"
	storage "google.golang.org/api/storage/v1"
)

const (
	GB                         = 1 << 30
	MaxBackoff                 = 30000
	BaseBackoff                = 250
	BackoffGrowthFactor        = 1.8
	BackoffGrowthDamper        = 0.25
	JobStatusDone              = "DONE"
	DatasetAlreadyExists       = "Already Exists: Dataset"
	TableWriteEmptyDisposition = "WRITE_EMPTY"
)

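// init registers the "bigquery" demo (via the package's registerDemo helper)
// along with the OAuth scopes it needs: BigQuery, read-only Cloud Storage,
// and the user's profile.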
func init() {
	scope := fmt.Sprintf("%s %s %s", bigquery.BigqueryScope,
		storage.DevstorageReadOnlyScope,
		"https://www.googleapis.com/auth/userinfo.profile")
	registerDemo("bigquery", scope, bqMain)
}

// This example demonstrates loading objects from Google Cloud Storage into
// BigQuery. Objects are specified by their bucket and a name prefix. Each
// object will be loaded into a new table identified by the object name minus
// any file extension. All tables are added to the specified dataset (one will
// be created if necessary). Currently, tables will not be overwritten and an
// attempt to load an object into a dataset that already contains its table
// will emit an error message indicating the table already exists.
// A schema file must be provided and it will be applied to every object/table.
// Example usage:
//	go-api-demo -clientid="my-clientid" -secret="my-secret" bq myProject
//	  myDataBucket datafile2013070 DataFiles2013
//	  ./datafile_schema.json 100
//
// This will load all objects (e.g. all data files from July 2013) from
// gs://myDataBucket into a (possibly new) BigQuery dataset named DataFiles2013
// using the schema file provided and allowing up to 100 bad records. Assuming
// each object is named like datafileYYYYMMDD.csv.gz and all of July's files are
// stored in the bucket, 9 tables will be created named like datafile201307DD
// where DD ranges from 01 to 09, inclusive.
// When the program completes, it will emit a results line similar to:
//
//	9 files loaded in 3m58s (18m2.708s). Size: 7.18GB Rows: 7130725
//
// The total elapsed time from the start of the first job to the end of the
// last job (effectively wall clock time) is shown. In parentheses is the
// aggregate time taken to load all tables.
func bqMain(client *http.Client, argv []string) {
	if len(argv) != 6 {
		fmt.Fprintln(os.Stderr,
			"Usage: bq project_id bucket prefix dataset schema max_bad_records")
		return
	}

	var (
		project    = argv[0]
		bucket     = argv[1]
		objPrefix  = argv[2]
		datasetId  = argv[3]
		schemaFile = argv[4]
	)
	badRecords, err := strconv.ParseInt(argv[5], 10, 64)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	rand.Seed(time.Now().UnixNano())

	service, err := storage.New(client)
	if err != nil {
		log.Fatalf("Unable to create Storage service: %v", err)
	}

	// Get the list of objects in the bucket matching the specified prefix.
	list := service.Objects.List(bucket)
	list.Prefix(objPrefix)
	objects, err := list.Do()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Create the wrapper and insert the (new) dataset.
	dataset, err := newBQDataset(client, project, datasetId)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	if err = dataset.insert(true); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	objectSource := &tableSource{
		maxBadRecords: badRecords,
		disposition:   TableWriteEmptyDisposition,
	}

	// Load the schema from disk.
	f, err := ioutil.ReadFile(schemaFile)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	if err = json.Unmarshal(f, &objectSource.schema); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Assumes all objects have .csv, .csv.gz (or no) extension.
	tableIdFromObject := func(name string) string {
		return strings.TrimSuffix(strings.TrimSuffix(name, ".gz"), ".csv")
	}

	// A jobset is a way to group a collection of jobs together for monitoring.
	// For this example, we just use the name of the bucket and object prefix.
	jobset := fmt.Sprintf("%s:%s", bucket, objPrefix)
	fmt.Fprintf(os.Stderr, "\nLoading %d objects.\n", len(objects.Items))

	// Load each object into a table of the same name (minus any extension).
	// A successful insert call will inject the job into our queue for monitoring.
	for _, o := range objects.Items {
		objectSource.id = tableIdFromObject(o.Name)
		objectSource.uri = fmt.Sprintf("gs://%s/%s", o.Bucket, o.Name)
		if err = dataset.load(jobset, objectSource); err != nil {
			fmt.Fprintln(os.Stderr, err)
		}
	}

	dataset.monitor(jobset)
}

// Wraps the BigQuery service and dataset and provides some helper functions.
type bqDataset struct {
	project string
	id      string
	bq      *bigquery.Service
	dataset *bigquery.Dataset
	jobsets map[string]*list.List
}

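// newBQDataset wraps the given project and dataset id in a bqDataset. The
// dataset is not created in BigQuery until insert is called on the result.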
func newBQDataset(client *http.Client, dsProj string, dsId string) (*bqDataset,
	error) {

	service, err := bigquery.New(client)
	if err != nil {
		log.Fatalf("Unable to create BigQuery service: %v", err)
	}

	return &bqDataset{
		project: dsProj,
		id:      dsId,
		bq:      service,
		dataset: &bigquery.Dataset{
			DatasetReference: &bigquery.DatasetReference{
				DatasetId: dsId,
				ProjectId: dsProj,
			},
		},
		jobsets: make(map[string]*list.List),
	}, nil
}

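// insert creates the dataset in BigQuery. If existsOK is true, an
// "Already Exists" error from the API is treated as success.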
func (ds *bqDataset) insert(existsOK bool) error {
	call := ds.bq.Datasets.Insert(ds.project, ds.dataset)
	_, err := call.Do()
	if err != nil && (!existsOK || !strings.Contains(err.Error(),
		DatasetAlreadyExists)) {
		return err
	}

	return nil
}

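// tableSource describes a single Cloud Storage object to be loaded into a
// BigQuery table, along with the schema and load options to apply.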
type tableSource struct {
	id            string
	uri           string
	schema        bigquery.TableSchema
	maxBadRecords int64
	disposition   string
}

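// load submits a BigQuery load job for the given source and, on success,
// queues the returned job under jobset so that monitor can track it.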
func (ds *bqDataset) load(jobset string, source *tableSource) error {
	job := &bigquery.Job{
		Configuration: &bigquery.JobConfiguration{
			Load: &bigquery.JobConfigurationLoad{
				DestinationTable: &bigquery.TableReference{
					DatasetId: ds.dataset.DatasetReference.DatasetId,
					ProjectId: ds.project,
					TableId:   source.id,
				},
				MaxBadRecords:    source.maxBadRecords,
				Schema:           &source.schema,
				SourceUris:       []string{source.uri},
				WriteDisposition: source.disposition,
			},
		},
	}

	call := ds.bq.Jobs.Insert(ds.project, job)
	job, err := call.Do()
	if err != nil {
		return err
	}

	_, ok := ds.jobsets[jobset]
	if !ok {
		ds.jobsets[jobset] = list.New()
	}
	ds.jobsets[jobset].PushBack(job)

	return nil
}

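// getJob fetches the current state of the job with the given id.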
func (ds *bqDataset) getJob(id string) (*bigquery.Job, error) {
	return ds.bq.Jobs.Get(ds.project, id).Do()
}

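// monitor polls every job queued under jobset until each reaches the DONE
// state. A pause is taken before each status check; the pause (in
// milliseconds) starts at BaseBackoff and, once per full pass over the queue,
// grows by BackoffGrowthFactor less some random jitter, capped at MaxBackoff.
// The backoff resets whenever the tracked head job leaves the queue, since
// the remaining loads run in BigQuery in parallel. Aggregate statistics are
// printed once all jobs have finished.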
func (ds *bqDataset) monitor(jobset string) {
	jobq, ok := ds.jobsets[jobset]
	if !ok {
		return
	}

	var backoff float64 = BaseBackoff
	pause := func(grow bool) {
		if grow {
			backoff *= BackoffGrowthFactor
			backoff -= (backoff * rand.Float64() * BackoffGrowthDamper)
			backoff = math.Min(backoff, MaxBackoff)
			fmt.Fprintf(os.Stderr, "[%s] Checking remaining %d jobs...\n", jobset,
				1+jobq.Len())
		}
		time.Sleep(time.Duration(backoff) * time.Millisecond)
	}
	var stats jobStats

	// Track a 'head' pending job in the queue for detecting cycling.
	head := ""
	// Loop until all jobs are done - with either success or error.
	for jobq.Len() > 0 {
		jel := jobq.Front()
		job := jel.Value.(*bigquery.Job)
		jobq.Remove(jel)
		jid := job.JobReference.JobId
		loop := false

		// Check and possibly pick a new head job id.
		if len(head) == 0 {
			head = jid
		} else {
			if jid == head {
				loop = true
			}
		}

		// Retrieve the job's current status.
		pause(loop)
		j, err := ds.getJob(jid)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			// In the case of a transient API error, we want to keep the job.
			if j == nil {
				jobq.PushBack(job)
			} else {
				// Must reset the head tracker if the job is discarded.
				if loop {
					head = ""
					backoff = BaseBackoff
				}
			}
			continue
		}

		// Reassign with the updated job data (from Get).
		// We keep using job rather than j below, since j may be nil when Get fails.
		job = j

		if job.Status.State != JobStatusDone {
			jobq.PushBack(job)
			continue
		}

		if res := job.Status.ErrorResult; res != nil {
			fmt.Fprintln(os.Stderr, res.Message)
		} else {
			stat := job.Statistics
			lstat := stat.Load
			stats.files += 1
			stats.bytesIn += lstat.InputFileBytes
			stats.bytesOut += lstat.OutputBytes
			stats.rows += lstat.OutputRows
			stats.elapsed +=
				time.Duration(stat.EndTime-stat.StartTime) * time.Millisecond

			if stats.start.IsZero() {
				stats.start = time.Unix(stat.StartTime/1000, 0)
			} else {
				t := time.Unix(stat.StartTime/1000, 0)
				if stats.start.Sub(t) > 0 {
					stats.start = t
				}
			}

			if stats.finish.IsZero() {
				stats.finish = time.Unix(stat.EndTime/1000, 0)
			} else {
				t := time.Unix(stat.EndTime/1000, 0)
				if t.Sub(stats.finish) > 0 {
					stats.finish = t
				}
			}
		}
		// When the head job is processed, reset the backoff since the loads
		// run in BQ in parallel.
		if loop {
			head = ""
			backoff = BaseBackoff
		}
	}

	fmt.Fprintf(os.Stderr, "%#v\n", stats)
}

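// jobStats accumulates aggregate results across all load jobs in a jobset.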
type jobStats struct {
	// Number of files (sources) loaded.
	files int64
	// Bytes read from source (possibly compressed).
	bytesIn int64
	// Bytes loaded into BigQuery (uncompressed).
	bytesOut int64
	// Rows loaded into BigQuery.
	rows int64
	// Aggregate time spent loading sources into tables.
	elapsed time.Duration
	// Start time of the earliest job.
	start time.Time
	// Finish time of the latest job.
	finish time.Time
}

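// GoString produces the summary line printed by monitor via the %#v verb.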
func (s jobStats) GoString() string {
	return fmt.Sprintf("\n%d files loaded in %v (%v). Size: %.2fGB Rows: %d\n",
		s.files, s.finish.Sub(s.start), s.elapsed, float64(s.bytesOut)/GB,
		s.rows)
}