diff --git a/pkg/hydrator/hydrator.go b/pkg/hydrator/hydrator.go index 7b9d90e..1a9eebf 100644 --- a/pkg/hydrator/hydrator.go +++ b/pkg/hydrator/hydrator.go @@ -255,6 +255,29 @@ func (h *Hydrator) flattenFullProfile(profile *bsky.ActorDefs_ProfileViewDetaile return } +func (h *Hydrator) flattenFacets(facets []*bsky.RichtextFacet) (hashtags []string, urls []string) { + hashtags = []string{} + urls = []string{} + if facets != nil { + for _, facet := range facets { + if facet != nil { + features := facet.Features + for _, feature := range features { + if feature.RichtextFacet_Tag != nil { + tag := feature.RichtextFacet_Tag.Tag + hashtags = append(hashtags, tag) + } + if feature.RichtextFacet_Link != nil { + url := feature.RichtextFacet_Link.Uri + urls = append(urls, url) + } + } + } + } + } + return +} + func (h *Hydrator) flattenPostView(post *bsky.FeedDefs_PostView) (result map[string]interface{}) { if post == nil { return nil @@ -287,6 +310,10 @@ func (h *Hydrator) flattenPostView(post *bsky.FeedDefs_PostView) (result map[str result["Embed"] = h.flattenEmbed(rec.Embed) } + hashtags, urls := h.flattenFacets(rec.Facets) + result["Hashtags"] = hashtags + result["URLs"] = urls + return } @@ -316,6 +343,10 @@ func (h *Hydrator) flattenPost(post *bsky.FeedPost) (result map[string]interface result["Embed"] = h.flattenEmbed(post.Embed) } + hashtags, urls := h.flattenFacets(post.Facets) + result["Hashtags"] = hashtags + result["URLs"] = urls + return } diff --git a/pkg/output/bq/bq.go b/pkg/output/bq/bq.go index 03a140c..51bac1a 100644 --- a/pkg/output/bq/bq.go +++ b/pkg/output/bq/bq.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "sort" "strings" "cloud.google.com/go/bigquery" @@ -52,8 +53,21 @@ func New(ctx context.Context, tablePath string, outputChannel chan map[string]in return &bq, nil } +func sortSchema(schema bigquery.Schema) { + sort.Slice(schema, func(i, j int) bool { + return schema[i].Name < schema[j].Name + }) + for _, field := range schema { + if field.Type == bigquery.RecordFieldType { + sortSchema(field.Schema) + } + } +} + // Helper function to compare two schemas func schemasAreEqual(schema1, schema2 bigquery.Schema) bool { + sortSchema(schema1) + sortSchema(schema2) if len(schema1) != len(schema2) { return false } diff --git a/pkg/output/bq/schema/schema.go b/pkg/output/bq/schema/schema.go index 31007c3..7f6d72c 100644 --- a/pkg/output/bq/schema/schema.go +++ b/pkg/output/bq/schema/schema.go @@ -276,6 +276,16 @@ func GetSchema() bigquery.Schema { "name": "RepostCount", "type": "INTEGER" }, + { + "mode": "REPEATED", + "name": "Hashtags", + "type": "STRING" + }, + { + "mode": "REPEATED", + "name": "URLs", + "type": "STRING" + }, { "name": "Text", "type": "STRING" @@ -399,6 +409,16 @@ func GetSchema() bigquery.Schema { "name": "ReplyParentCID", "type": "STRING" }, + { + "mode": "REPEATED", + "name": "Hashtags", + "type": "STRING" + }, + { + "mode": "REPEATED", + "name": "URLs", + "type": "STRING" + }, { "name": "Text", "type": "STRING" @@ -566,6 +586,16 @@ func GetSchema() bigquery.Schema { "name": "RepostCount", "type": "INTEGER" }, + { + "mode": "REPEATED", + "name": "Hashtags", + "type": "STRING" + }, + { + "mode": "REPEATED", + "name": "URLs", + "type": "STRING" + }, { "name": "Text", "type": "STRING"