
Merge pull request #14 from Parker-Kasiewicz/skyfall_enhancements
switching over to BigQuery storage API
lxcode authored Dec 7, 2024
2 parents bfe9547 + d78f495 commit 27dea95
Showing 1 changed file with 197 additions and 72 deletions.
269 changes: 197 additions & 72 deletions pkg/output/bq/bq.go
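
In short: this commit replaces the legacy streaming-insert path (Table.Inserter with a custom ValueSaver and Put) with the BigQuery Storage Write API. Each event map is cleaned, marshaled to JSON, decoded into a dynamicpb message shaped by the table schema, and appended in batches of 250 rows through a managedwriter stream. Illustrative sketches follow some of the functions below.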
@@ -7,11 +7,19 @@ import (
 	"fmt"
 	"sort"
 	"strings"
+	"time"
 
 	"cloud.google.com/go/bigquery"
+	"cloud.google.com/go/bigquery/storage/managedwriter"
+	adapt "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
 	log "github.com/sirupsen/logrus"
 	bq_schema "github.com/stanfordio/skyfall/pkg/output/bq/schema"
 	"google.golang.org/api/iterator"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/reflect/protoreflect"
+	"google.golang.org/protobuf/types/descriptorpb"
+	"google.golang.org/protobuf/types/dynamicpb"
 )
 
 type BQ struct {
@@ -162,120 +170,237 @@ func (bq BQ) GetBackfillSeqno() (int64, error) {
 	return maxSeq, nil
 }
 
+// Clean the map values to handle pointers, nested structures, and key transformations
 func cleanOutput(value map[string]interface{}) map[string]interface{} {
 	for k, v := range value {
-		// Check if the key starts with "$" and replace it with "_"
+		// Handle pointer dereferencing
+		switch v := v.(type) {
+		case *string:
+			value[k] = derefString(v)
+		case *int64:
+			value[k] = derefInt64(v)
+		case *float64:
+			value[k] = derefFloat64(v)
+		}
+
+		// Transform keys starting with "$" to "_"
 		if strings.HasPrefix(k, "$") {
 			newKey := "_" + k[1:]
-			value[newKey] = v
+			value[newKey] = value[k]
 			delete(value, k)
 		}
 
-		// If the value is a map, recursively clean it
+		// Recursively clean nested maps
		if subMap, ok := v.(map[string]interface{}); ok {
 			value[k] = cleanOutput(subMap)
 		}
+
+		// Handle specific fields like timestamps
+		if k == "CreatedAt" || k == "PulledTimestamp" || k == "IndexedAt" {
+			value[k] = parseTimestamp(value[k])
+		}
+
+		// Convert "Full" to JSON string if it's a map
+		if k == "Full" {
+			value[k] = convertFullToJSON(value[k])
+		}
 	}
 	return value
 }
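
For orientation, a hypothetical before/after for cleanOutput as it now stands (the input map and its values are invented for illustration; only cleanOutput and its helpers above come from the diff):

	text := "hello"
	in := map[string]interface{}{
		"$type":     "app.bsky.feed.post",
		"Text":      &text,
		"CreatedAt": "2024-12-07T00:00:00Z",
	}
	out := cleanOutput(in)
	// out["_type"] == "app.bsky.feed.post"   ("$" prefix rewritten to "_")
	// out["Text"] == "hello"                 (*string dereferenced)
	// out["CreatedAt"] == int64(1733529600)  (RFC3339 parsed to Unix seconds)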

-type MapValueSaver struct {
-	Data map[string]interface{}
+func derefString(p *string) string {
+	if p != nil {
+		return *p
+	}
+	return ""
 }
 
-func (mvs MapValueSaver) Save() (row map[string]bigquery.Value, insertID string, err error) {
-	row = make(map[string]bigquery.Value)
-	for k, v := range mvs.Data {
-		row[k] = v
+func derefInt64(p *int64) int64 {
+	if p != nil {
+		return *p
 	}
-	// Generate a unique insertID if needed, or return "" if not
-	insertID = ""
-	return row, insertID, nil
+	return 0
 }

-func prepareForWrite(value map[string]interface{}) (out *MapValueSaver, err error) {
-	// Set "Full" to the JSON representation of "Full"
-	fullMarshalled, err := json.Marshal(value["Full"])
-	if err != nil {
-		log.Errorf("Failed to marshal event: %+v", err)
-		return nil, err
+func derefFloat64(p *float64) float64 {
+	if p != nil {
+		return *p
 	}
-	value["Full"] = string(fullMarshalled)
+	return 0.0
 }

-	cleaned := cleanOutput(value)
+func parseTimestamp(v interface{}) int64 {
+	if t, ok := v.(string); ok {
+		parsedTime, err := time.Parse(time.RFC3339, t)
+		if err != nil {
+			log.Printf("Invalid timestamp format: %v", err)
+			return 0
+		}
+		return parsedTime.Unix()
+	}
+	return 0
+}

-	// Insert the "_Raw" field as a string
-	json, err := json.Marshal(cleaned)
-	if err != nil {
-		log.Errorf("Failed to marshal event: %+v", err)
-		return nil, err
+func convertFullToJSON(v interface{}) string {
+	switch v := v.(type) {
+	case map[string]interface{}:
+		fullMarshalled, err := json.Marshal(v)
+		if err != nil {
+			log.Printf("Failed to marshal 'Full' field: %v", err)
+			return ""
+		}
+		return string(fullMarshalled)
+	case string:
+		return v
+	default:
+		return fmt.Sprintf("%v", v)
 	}
 }

cleaned["_Raw"] = string(json)
func prepareForWrite(value map[string]interface{}) ([]byte, error) {
cleaned := cleanOutput(value)

// Remove _Raw field at the root level
delete(cleaned, "_Raw")

mvs := MapValueSaver{Data: cleaned} // Heap allocated, thanks Go.
// Remove ReplyCount field under Projection -> LikedPost
if projection, ok := cleaned["Projection"].(map[string]interface{}); ok {
if likedPost, ok := projection["LikedPost"].(map[string]interface{}); ok {
delete(likedPost, "ReplyCount")
}
// Remove ReplyCount field under Projection -> RepostedPost
if repostedPost, ok := projection["RepostedPost"].(map[string]interface{}); ok {
delete(repostedPost, "ReplyCount")
}
}

return &mvs, nil
// Marshal the cleaned map into JSON bytes
return json.Marshal(cleaned)
}

 // Streams data to BigQuery
 func (bq BQ) StreamOutput(ctx context.Context) error {
-	log.Infof("Streaming output to BigQuery table: %+v", bq.OutputTable.FullyQualifiedName())
+	log.Infof("Starting to stream output to BigQuery table: %+v", bq.OutputTable.FullyQualifiedName())
 
-	_, cancel := context.WithCancel(ctx)
+	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	inserter := bq.OutputTable.Inserter()
-	inserter.IgnoreUnknownValues = true
-
-MainLoop:
-	for {
-		var values []map[string]interface{}
+	// Set up the schema descriptors
+	tableSchema := bq_schema.GetSchema()
+	messageDescriptor, descriptor, err := setupDynamicDescriptors(tableSchema)
+	if err != nil {
+		log.Errorf("Failed to create descriptors: %v", err)
+		return err
+	}
 
-		// Wait for at least one value in the channel
-		value, ok := <-bq.OutputChannel
-		if !ok {
-			break MainLoop // Channel is closed
-		}
-		values = append(values, value)
-
-		// Collect remaining values from the channel until it is empty
-	ChannelCollector:
-		for {
-			select {
-			case value, ok := <-bq.OutputChannel:
-				if !ok {
-					break MainLoop // Channel is closed
-				}
+	client, err := managedwriter.NewClient(ctx, bigquery.DetectProjectID)
+	if err != nil {
+		log.Errorf("Failed to create managed writer client: %v", err)
+		return err
+	}
+	defer client.Close()
 
-				values = append(values, value)
+	destinationTable := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", bq.OutputTable.ProjectID, bq.OutputTable.DatasetID, bq.OutputTable.TableID)
+	managedStream, err := client.NewManagedStream(ctx,
+		managedwriter.WithSchemaDescriptor(descriptor),
+		managedwriter.WithDestinationTable(destinationTable),
+	)
+	if err != nil {
+		log.Errorf("Failed to create managed stream: %v", err)
+		return err
+	}
 
-				// If there are more than 500 values, write them now
-				if len(values) >= 500 {
-					break ChannelCollector
+	// Stream processing loop
+	var buffer []map[string]interface{}
+	for {
+		select {
+		case value, ok := <-bq.OutputChannel:
+			if !ok {
+				log.Info("Channel closed, exiting.")
+				return nil
+			}
+			buffer = append(buffer, value)
+
+			// Flush if buffer size reaches threshold
+			if len(buffer) >= 250 {
+				if err := bq.flushBuffer(ctx, managedStream, messageDescriptor, buffer); err != nil {
+					log.Errorf("Failed to flush buffer: %v", err)
+					continue
+				} else {
+					// Log the number of rows uploaded to BigQuery
+					log.Infof("Buffer flushed! Rows uploaded: %d", len(buffer))
 				}
-			default:
-				break ChannelCollector // Channel is empty
+				buffer = nil // Clear the buffer after flushing
 			}
+
+		case <-ctx.Done():
+			log.Warn("Context canceled, stopping.")
+			return ctx.Err()
 		}
 	}
 }
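
A sketch of how a caller might drive this loop. The event source and shutdown wiring are hypothetical; only OutputChannel and StreamOutput come from the code above:

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		defer close(bq.OutputChannel) // a closed channel ends StreamOutput cleanly
		for evt := range events {     // events: hypothetical source of map[string]interface{}
			bq.OutputChannel <- evt
		}
	}()

	if err := bq.StreamOutput(ctx); err != nil {
		log.Fatalf("stream failed: %v", err)
	}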

+func (bq BQ) flushBuffer(ctx context.Context, managedStream *managedwriter.ManagedStream, descriptor protoreflect.MessageDescriptor, buffer []map[string]interface{}) error {
+	var encodedRows [][]byte
+	for _, value := range buffer {
+		preparedValue, err := prepareForWrite(value)
+		if err != nil {
+			log.Errorf("Failed to prepare value for writing: %+v", err)
+			continue
+		}
 
-		// Prepare the values for writing
-		var preparedValues []*MapValueSaver
-		for _, value := range values {
-			preparedValue, err := prepareForWrite(value)
-			if err != nil {
-				log.Errorf("Failed to prepare value for writing: %+v", err)
-				continue
-			}
-			preparedValues = append(preparedValues, preparedValue)
+		message := dynamicpb.NewMessage(descriptor)
+		if err := protojson.Unmarshal(preparedValue, message); err != nil {
+			log.Errorf("Failed to unmarshal into proto message: %+v", err)
+			continue
 		}
 
-		// Do the write in a batch (not a BigQuery *batch*, just, like, a
-		// semantic batch)
-		if err := inserter.Put(ctx, preparedValues); err != nil {
-			log.Errorf("Failed to write output: %+v", err)
+		encodedRow, err := proto.Marshal(message)
+		if err != nil {
+			log.Errorf("Failed to marshal proto message: %+v", err)
+			continue
 		}
-		log.Infof("Wrote %d rows to BigQuery", len(values))
+		encodedRows = append(encodedRows, encodedRow)
 	}
 
-	return nil
+	if len(encodedRows) == 0 {
+		log.Warn("No rows to write.")
+		return nil
+	}
+
+	result, err := managedStream.AppendRows(ctx, encodedRows)
+	if err != nil {
+		return fmt.Errorf("failed to append rows: %w", err)
+	}
+
+	_, err = result.GetResult(ctx)
+	return err
 }
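
The encode path inside flushBuffer, shown in isolation with a one-column schema (the schema and row are invented for illustration): JSON bytes from prepareForWrite are decoded into a dynamic message shaped like the table, then serialized as protobuf for AppendRows.

	md, _, err := setupDynamicDescriptors(bigquery.Schema{
		{Name: "Did", Type: bigquery.StringFieldType},
	})
	if err != nil {
		log.Fatalf("descriptor setup failed: %v", err)
	}
	msg := dynamicpb.NewMessage(md)
	if err := protojson.Unmarshal([]byte(`{"Did":"did:plc:example"}`), msg); err != nil {
		log.Fatalf("json -> proto failed: %v", err)
	}
	encoded, err := proto.Marshal(msg)
	if err != nil {
		log.Fatalf("proto marshal failed: %v", err)
	}
	// encoded is one element of the [][]byte batch handed to AppendRows.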

+func setupDynamicDescriptors(schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto, error) {
+	// Convert BigQuery schema to storage schema
+	convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to convert BigQuery schema to storage schema: %v", err)
+	}
+
+	// Convert storage schema to proto2 descriptor
+	descriptor, err := adapt.StorageSchemaToProto2Descriptor(convertedSchema, "root")
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to convert storage schema to proto2 descriptor: %v", err)
+	}
+
+	// Ensure the descriptor is a MessageDescriptor
+	messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor)
+	if !ok {
+		return nil, nil, fmt.Errorf("descriptor is not a MessageDescriptor")
+	}
+
+	// Normalize the descriptor
+	dp, err := adapt.NormalizeDescriptor(messageDescriptor)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to normalize descriptor: %v", err)
+	}
+
+	return messageDescriptor, dp, nil
+}
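
One design note: NewManagedStream is created here without an explicit WithType option, which appears to target the table's default stream (at-least-once append semantics), and flushBuffer's GetResult call blocks until the server acknowledges each batch before the next flush proceeds.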
