switching over to BigQuery storage API #14

Merged · 1 commit · Dec 7, 2024

Changes from all commits
pkg/output/bq/bq.go: 269 changes (197 additions, 72 deletions)
@@ -7,11 +7,19 @@ import (
 	"fmt"
 	"sort"
 	"strings"
+	"time"
 
 	"cloud.google.com/go/bigquery"
+	"cloud.google.com/go/bigquery/storage/managedwriter"
+	adapt "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
 	log "github.com/sirupsen/logrus"
 	bq_schema "github.com/stanfordio/skyfall/pkg/output/bq/schema"
 	"google.golang.org/api/iterator"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/reflect/protoreflect"
+	"google.golang.org/protobuf/types/descriptorpb"
+	"google.golang.org/protobuf/types/dynamicpb"
 )
 
 type BQ struct {
@@ -162,120 +170,237 @@ func (bq BQ) GetBackfillSeqno() (int64, error) {
 	return maxSeq, nil
 }
 
+// Clean the map values to handle pointers, nested structures, and key transformations
 func cleanOutput(value map[string]interface{}) map[string]interface{} {
 	for k, v := range value {
-		// Check if the key starts with "$" and replace it with "_"
+		// Handle pointer dereferencing
+		switch v := v.(type) {
+		case *string:
+			value[k] = derefString(v)
+		case *int64:
+			value[k] = derefInt64(v)
+		case *float64:
+			value[k] = derefFloat64(v)
+		}
+
+		// Transform keys starting with "$" to "_"
 		if strings.HasPrefix(k, "$") {
 			newKey := "_" + k[1:]
-			value[newKey] = v
+			value[newKey] = value[k]
 			delete(value, k)
 		}
 
-		// If the value is a map, recursively clean it
+		// Recursively clean nested maps
 		if subMap, ok := v.(map[string]interface{}); ok {
 			value[k] = cleanOutput(subMap)
 		}
+
+		// Handle specific fields like timestamps
+		if k == "CreatedAt" || k == "PulledTimestamp" || k == "IndexedAt" {
+			value[k] = parseTimestamp(value[k])
+		}
+
+		// Convert "Full" to JSON string if it's a map
+		if k == "Full" {
+			value[k] = convertFullToJSON(value[k])
+		}
 	}
 	return value
 }
 
-type MapValueSaver struct {
-	Data map[string]interface{}
-}
-
-func (mvs MapValueSaver) Save() (row map[string]bigquery.Value, insertID string, err error) {
-	row = make(map[string]bigquery.Value)
-	for k, v := range mvs.Data {
-		row[k] = v
-	}
-	// Generate a unique insertID if needed, or return "" if not
-	insertID = ""
-	return row, insertID, nil
-}
-
-func prepareForWrite(value map[string]interface{}) (out *MapValueSaver, err error) {
-	// Set "Full" to the JSON representation of "Full"
-	fullMarshalled, err := json.Marshal(value["Full"])
-	if err != nil {
-		log.Errorf("Failed to marshal event: %+v", err)
-		return nil, err
-	}
-	value["Full"] = string(fullMarshalled)
-
-	cleaned := cleanOutput(value)
-
-	// Insert the "_Raw" field as a string
-	json, err := json.Marshal(cleaned)
-	if err != nil {
-		log.Errorf("Failed to marshal event: %+v", err)
-		return nil, err
-	}
-
-	cleaned["_Raw"] = string(json)
-
-	mvs := MapValueSaver{Data: cleaned} // Heap allocated, thanks Go.
-
-	return &mvs, nil
-}
+func derefString(p *string) string {
+	if p != nil {
+		return *p
+	}
+	return ""
+}
+
+func derefInt64(p *int64) int64 {
+	if p != nil {
+		return *p
+	}
+	return 0
+}
+
+func derefFloat64(p *float64) float64 {
+	if p != nil {
+		return *p
+	}
+	return 0.0
+}
+
+func parseTimestamp(v interface{}) int64 {
+	if t, ok := v.(string); ok {
+		parsedTime, err := time.Parse(time.RFC3339, t)
+		if err != nil {
+			log.Printf("Invalid timestamp format: %v", err)
+			return 0
+		}
+		return parsedTime.Unix()
+	}
+	return 0
+}
+
+func convertFullToJSON(v interface{}) string {
+	switch v := v.(type) {
+	case map[string]interface{}:
+		fullMarshalled, err := json.Marshal(v)
+		if err != nil {
+			log.Printf("Failed to marshal 'Full' field: %v", err)
+			return ""
+		}
+		return string(fullMarshalled)
+	case string:
+		return v
+	default:
+		return fmt.Sprintf("%v", v)
+	}
+}
+
+func prepareForWrite(value map[string]interface{}) ([]byte, error) {
+	cleaned := cleanOutput(value)
+
+	// Remove _Raw field at the root level
+	delete(cleaned, "_Raw")
+
+	// Remove ReplyCount field under Projection -> LikedPost
+	if projection, ok := cleaned["Projection"].(map[string]interface{}); ok {
+		if likedPost, ok := projection["LikedPost"].(map[string]interface{}); ok {
+			delete(likedPost, "ReplyCount")
+		}
+		// Remove ReplyCount field under Projection -> RepostedPost
+		if repostedPost, ok := projection["RepostedPost"].(map[string]interface{}); ok {
+			delete(repostedPost, "ReplyCount")
+		}
+	}
+
+	// Marshal the cleaned map into JSON bytes
+	return json.Marshal(cleaned)
+}
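To make the cleaning pipeline concrete, here is a minimal sketch of what the new prepareForWrite produces for a sample record. This example is not part of the diff, and the field values are invented for illustration:

func ExamplePrepareForWrite() {
	value := map[string]interface{}{
		"$type":     "app.bsky.feed.post",                 // "$"-prefixed key is renamed to "_type"
		"CreatedAt": "2024-12-07T12:00:00Z",               // RFC3339 string becomes a Unix timestamp
		"Full":      map[string]interface{}{"text": "hi"}, // nested map is flattened to a JSON string
	}
	row, err := prepareForWrite(value)
	if err != nil {
		fmt.Println("prepare failed:", err)
		return
	}
	// row is a JSON byte slice, ready to be unmarshaled into the dynamic
	// proto message that the managed stream expects.
	fmt.Println(string(row))
}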

 // Streams data to BigQuery
 func (bq BQ) StreamOutput(ctx context.Context) error {
-	log.Infof("Streaming output to BigQuery table: %+v", bq.OutputTable.FullyQualifiedName())
+	log.Infof("Starting to stream output to BigQuery table: %+v", bq.OutputTable.FullyQualifiedName())
 
-	_, cancel := context.WithCancel(ctx)
+	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	inserter := bq.OutputTable.Inserter()
-	inserter.IgnoreUnknownValues = true
-
-MainLoop:
-	for {
-		var values []map[string]interface{}
-
-		// Wait for at least one value in the channel
-		value, ok := <-bq.OutputChannel
-		if !ok {
-			break MainLoop // Channel is closed
-		}
-		values = append(values, value)
-
-		// Collect remaining values from the channel until it is empty
-	ChannelCollector:
-		for {
-			select {
-			case value, ok := <-bq.OutputChannel:
-				if !ok {
-					break MainLoop // Channel is closed
-				}
-				values = append(values, value)
-
-				// If there are more than 500 values, write them now
-				if len(values) >= 500 {
-					break ChannelCollector
-				}
-			default:
-				break ChannelCollector // Channel is empty
-			}
-		}
-
-		// Prepare the values for writing
-		var preparedValues []*MapValueSaver
-		for _, value := range values {
-			preparedValue, err := prepareForWrite(value)
-			if err != nil {
-				log.Errorf("Failed to prepare value for writing: %+v", err)
-				continue
-			}
-			preparedValues = append(preparedValues, preparedValue)
-		}
-
-		// Do the write in a batch (not a BigQuery *batch*, just, like, a
-		// semantic batch)
-		if err := inserter.Put(ctx, preparedValues); err != nil {
-			log.Errorf("Failed to write output: %+v", err)
-		}
-		log.Infof("Wrote %d rows to BigQuery", len(values))
-	}
-
-	return nil
+	// Set up the schema descriptors
+	tableSchema := bq_schema.GetSchema()
+	messageDescriptor, descriptor, err := setupDynamicDescriptors(tableSchema)
+	if err != nil {
+		log.Errorf("Failed to create descriptors: %v", err)
+		return err
+	}
+
+	client, err := managedwriter.NewClient(ctx, bigquery.DetectProjectID)
+	if err != nil {
+		log.Errorf("Failed to create managed writer client: %v", err)
+		return err
+	}
+	defer client.Close()
+
+	destinationTable := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", bq.OutputTable.ProjectID, bq.OutputTable.DatasetID, bq.OutputTable.TableID)
+	managedStream, err := client.NewManagedStream(ctx,
+		managedwriter.WithSchemaDescriptor(descriptor),
+		managedwriter.WithDestinationTable(destinationTable),
+	)
+	if err != nil {
+		log.Errorf("Failed to create managed stream: %v", err)
+		return err
+	}
+
+	// Stream processing loop
+	var buffer []map[string]interface{}
+	for {
+		select {
+		case value, ok := <-bq.OutputChannel:
+			if !ok {
+				log.Info("Channel closed, exiting.")
+				return nil
+			}
+			buffer = append(buffer, value)
+
+			// Flush if buffer size reaches threshold
+			if len(buffer) >= 250 {
+				if err := bq.flushBuffer(ctx, managedStream, messageDescriptor, buffer); err != nil {
+					log.Errorf("Failed to flush buffer: %v", err)
+					continue
+				} else {
+					// Log the number of rows uploaded to BigQuery
+					log.Infof("Buffer flushed! Rows uploaded: %d", len(buffer))
+				}
+				buffer = nil // Clear the buffer after flushing
+			}
+
+		case <-ctx.Done():
+			log.Warn("Context canceled, stopping.")
+			return ctx.Err()
+		}
+	}
 }

+func (bq BQ) flushBuffer(ctx context.Context, managedStream *managedwriter.ManagedStream, descriptor protoreflect.MessageDescriptor, buffer []map[string]interface{}) error {
+	var encodedRows [][]byte
+	for _, value := range buffer {
+		preparedValue, err := prepareForWrite(value)
+		if err != nil {
+			log.Errorf("Failed to prepare value for writing: %+v", err)
+			continue
+		}
+
+		message := dynamicpb.NewMessage(descriptor)
+		if err := protojson.Unmarshal(preparedValue, message); err != nil {
+			log.Errorf("Failed to unmarshal into proto message: %+v", err)
+			continue
+		}
+
+		encodedRow, err := proto.Marshal(message)
+		if err != nil {
+			log.Errorf("Failed to marshal proto message: %+v", err)
+			continue
+		}
+		encodedRows = append(encodedRows, encodedRow)
+	}
+
+	if len(encodedRows) == 0 {
+		log.Warn("No rows to write.")
+		return nil
+	}
+
+	result, err := managedStream.AppendRows(ctx, encodedRows)
+	if err != nil {
+		return fmt.Errorf("failed to append rows: %w", err)
+	}
+
+	_, err = result.GetResult(ctx)
+	return err
+}
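One edge case worth noting: when OutputChannel closes, StreamOutput returns immediately, so any rows still sitting in buffer (fewer than 250) are never flushed. A hypothetical helper, not part of this PR, could reuse flushBuffer to drain the remainder, with the `!ok` branch calling it before `return nil`:

// Hypothetical helper, not part of this PR: flush whatever is left in the
// buffer before StreamOutput returns, so a final partial batch (fewer than
// 250 rows) is not silently dropped when the channel closes.
func (bq BQ) flushRemaining(ctx context.Context, managedStream *managedwriter.ManagedStream, descriptor protoreflect.MessageDescriptor, buffer []map[string]interface{}) error {
	if len(buffer) == 0 {
		return nil
	}
	log.Infof("Flushing final partial batch of %d rows", len(buffer))
	return bq.flushBuffer(ctx, managedStream, descriptor, buffer)
}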

+func setupDynamicDescriptors(schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto, error) {
+	// Convert BigQuery schema to storage schema
+	convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to convert BigQuery schema to storage schema: %v", err)
+	}
+
+	// Convert storage schema to proto2 descriptor
+	descriptor, err := adapt.StorageSchemaToProto2Descriptor(convertedSchema, "root")
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to convert storage schema to proto2 descriptor: %v", err)
+	}
+
+	// Ensure the descriptor is a MessageDescriptor
+	messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor)
+	if !ok {
+		return nil, nil, fmt.Errorf("descriptor is not a MessageDescriptor")
+	}
+
+	// Normalize the descriptor
+	dp, err := adapt.NormalizeDescriptor(messageDescriptor)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to normalize descriptor: %v", err)
+	}
+
+	return messageDescriptor, dp, nil
+}
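Putting it together, a caller might drive the new streaming path as in the sketch below. This wiring is hypothetical (not part of this PR) and assumes BQ.OutputChannel is a plain chan map[string]interface{}:

// Hypothetical wiring, not part of this PR: run the streaming loop in the
// background and feed it events; rows accumulate and flush to the Storage
// Write API in batches of 250.
func runPipeline(ctx context.Context, bq BQ, events []map[string]interface{}) error {
	errCh := make(chan error, 1)
	go func() { errCh <- bq.StreamOutput(ctx) }()

	for _, ev := range events {
		bq.OutputChannel <- ev
	}
	close(bq.OutputChannel) // StreamOutput returns nil once the channel drains

	return <-errCh
}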