diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go index ba8d139f..1cbfd96a 100644 --- a/internal/pkg/crawl/crawl.go +++ b/internal/pkg/crawl/crawl.go @@ -161,6 +161,8 @@ func (c *Crawl) Start() (err error) { c.Log.WithFields(c.genLogFields(nil, nil, log.Fields)).Warn(log.Message) case logrus.InfoLevel: c.Log.WithFields(c.genLogFields(nil, nil, log.Fields)).Info(log.Message) + case logrus.DebugLevel: + c.Log.WithFields(c.genLogFields(nil, nil, log.Fields)).Debug(log.Message) } } }() diff --git a/internal/pkg/queue/dequeue.go b/internal/pkg/queue/dequeue.go index 8c331c27..5da6847b 100644 --- a/internal/pkg/queue/dequeue.go +++ b/internal/pkg/queue/dequeue.go @@ -3,6 +3,8 @@ package queue import ( "fmt" "io" + + "github.com/sirupsen/logrus" ) // Dequeue removes and returns the next item from the queue @@ -16,22 +18,30 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) { defer q.mutex.Unlock() for len(q.hostOrder) == 0 { - q.cond.Wait() // This unlocks the mutex while waiting + q.LoggingChan <- &LogMessage{ + Message: "Waiting for items to be enqueued", + Level: logrus.DebugLevel, + } + q.cond.Wait() } - // Loop through hosts until we find one with items or we've checked all hosts hostsChecked := 0 for hostsChecked < len(q.hostOrder) { host := q.hostOrder[q.currentHost] positions := q.hostIndex[host] + q.LoggingChan <- &LogMessage{ + Message: fmt.Sprintf("Checking host %s, positions: %d", host, len(positions)), + Level: logrus.DebugLevel, + } + if len(positions) == 0 { // Remove this host from the order and index q.hostOrder = append(q.hostOrder[:q.currentHost], q.hostOrder[q.currentHost+1:]...) delete(q.hostIndex, host) if len(q.hostOrder) == 0 { q.currentHost = 0 - continue // This will cause the outer loop to check again + continue } q.currentHost = q.currentHost % len(q.hostOrder) hostsChecked++ @@ -42,18 +52,22 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) { position := positions[0] q.hostIndex[host] = positions[1:] - // Seek to position and decode item + q.LoggingChan <- &LogMessage{ + Message: fmt.Sprintf("Dequeuing item at position %d for host %s", position, host), + Level: logrus.DebugLevel, + } + _, err := q.queueFile.Seek(int64(position), io.SeekStart) if err != nil { return nil, fmt.Errorf("failed to seek to item position: %w", err) } + var item Item err = q.queueDecoder.Decode(&item) if err != nil { return nil, fmt.Errorf("failed to decode item: %w", err) } - // Move to next host q.currentHost = (q.currentHost + 1) % len(q.hostOrder) // Update stats @@ -67,6 +81,8 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) { return &item, nil } + fmt.Println("After Loop") + // If we've checked all hosts and found no items, loop back to wait again return q.Dequeue() } diff --git a/internal/pkg/queue/queue.go b/internal/pkg/queue/queue.go index c09b3d6d..0b668b32 100644 --- a/internal/pkg/queue/queue.go +++ b/internal/pkg/queue/queue.go @@ -62,6 +62,10 @@ type Item struct { BypassSeencheck string } +func init() { + gob.Register(Item{}) +} + func NewPersistentGroupedQueue(queueDirPath string, loggingChan chan *LogMessage) (*PersistentGroupedQueue, error) { err := os.MkdirAll(queueDirPath, 0755) if err != nil { diff --git a/internal/pkg/queue/queue_test.go b/internal/pkg/queue/queue_test.go index caa89eb4..8481a1ab 100644 --- a/internal/pkg/queue/queue_test.go +++ b/internal/pkg/queue/queue_test.go @@ -1,10 +1,12 @@ package queue import ( + "fmt" "net/url" "os" "path" "testing" + "time" ) func TestNewItem(t *testing.T) { @@ -192,3 +194,63 @@ func TestPersistentGroupedQueue_Close(t *testing.T) { t.Errorf("Expected ErrQueueClosed on Enqueue after Close, got: %v", err) } } + +func TestLargeScaleEnqueueDequeue(t *testing.T) { + // Increase test timeout + t.Parallel() + + tempDir, err := os.MkdirTemp("", "queue_test") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + queuePath := path.Join(tempDir, "test_queue") + loggingChan := make(chan *LogMessage, 10000) + + q, err := NewPersistentGroupedQueue(queuePath, loggingChan) + if err != nil { + t.Fatalf("Failed to create new queue: %v", err) + } + defer q.Close() + + numItems := 1000 + hosts := []string{"example.com", "test.org", "sample.net", "demo.io"} + + // Enqueue items + startEnqueue := time.Now() + for i := 0; i < numItems; i++ { + host := hosts[i%len(hosts)] + u, _ := url.Parse(fmt.Sprintf("https://%s/page%d", host, i)) + item := NewItem(u, nil, "page", 1, fmt.Sprintf("id-%d", i), false) + err := q.Enqueue(item) + if err != nil { + t.Fatalf("Failed to enqueue item %d: %v", i, err) + } + } + enqueueTime := time.Since(startEnqueue) + t.Logf("Enqueue time for %d items: %v", numItems, enqueueTime) + + // Dequeue items + startDequeue := time.Now() + dequeuedItems := make(map[string]bool) + for i := 0; i < numItems; i++ { + item, err := q.Dequeue() + if err != nil { + t.Fatalf("Failed to dequeue item %d: %v", i, err) + } + if item == nil { + t.Fatalf("Dequeued nil item at position %d", i) + } + if dequeuedItems[item.ID] { + t.Errorf("Item with ID %s dequeued more than once", item.ID) + } + + dequeuedItems[item.ID] = true + } + dequeueTime := time.Since(startDequeue) + + t.Logf("Dequeue time for %d items: %v", numItems, dequeueTime) + t.Logf("Average enqueue time per item: %v", enqueueTime/time.Duration(numItems)) + t.Logf("Average dequeue time per item: %v", dequeueTime/time.Duration(numItems)) +}