patch[vod]: Add Time Limit On S3 Segment Read
Add a time limit to reading segments from S3 so that the segment reader threads are not locked up by network issues.
alwitt committed Jun 23, 2024
1 parent 1cffa1c commit f886681
Showing 7 changed files with 38 additions and 10 deletions.
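
The core of the change is in utils/segment_fetch.go below: each S3 read now runs under a context derived with context.WithTimeout, so a stalled connection returns an error once the deadline passes instead of holding a worker thread indefinitely. A minimal sketch of the pattern, assuming S3Client.GetObject(ctx, bucket, key) returns the object bytes (the helper and its arguments are hypothetical, shown only to illustrate the technique):

import (
    "context"
    "time"
)

// boundedRead fetches one object but gives up after maxWait. S3Client is the
// repository's own interface; boundedRead itself is a hypothetical helper.
func boundedRead(
    parent context.Context, s3 S3Client, bucket, key string, maxWait time.Duration,
) ([]byte, error) {
    // Derive a child context that expires after maxWait; the deferred cancel
    // releases the timer as soon as the read returns.
    readCtxt, cancel := context.WithTimeout(parent, maxWait)
    defer cancel()
    return s3.GetObject(readCtxt, bucket, key)
}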
6 changes: 5 additions & 1 deletion bin/control.go
@@ -226,7 +226,11 @@ func DefineControlNode(

// Define segment reader
vodSegmentReader, err := utils.NewSegmentReader(
parentCtxt, config.VODConfig.SegmentReaderWorkerCount, s3Client, metrics,
parentCtxt,
config.VODConfig.SegmentReaderWorkerCount,
config.VODConfig.SegmentReadMaxWaitTime(),
s3Client,
metrics,
)
if err != nil {
log.WithError(err).WithFields(logTags).Error("Failed to create segment reader")
2 changes: 1 addition & 1 deletion bin/edge.go
@@ -476,7 +476,7 @@ func DefineEdgeNode(

// Define video segment reader
theNode.segmentReader, err = utils.NewSegmentReader(
parentCtxt, config.MonitorConfig.SegmentReaderWorkerCount, nil, metrics,
parentCtxt, config.MonitorConfig.SegmentReaderWorkerCount, 0, nil, metrics,
)
if err != nil {
log.WithError(err).WithFields(logTags).Error("Failed to create video segment reader")
9 changes: 9 additions & 0 deletions common/config.go
@@ -406,13 +406,20 @@ type CentralVODServerConfig struct {
RecordingStorage RecordingStorageConfig `mapstructure:"recordingStorage" json:"recordingStorage" validate:"required,dive"`
// SegmentReaderWorkerCount number of worker threads in the video segment reader
SegmentReaderWorkerCount int `mapstructure:"segmentReaderWorkerCount" json:"segmentReaderWorkerCount" validate:"gte=2,lte=64"`
// SegmentReadMaxWaitTimeInSec max time in seconds to complete a segment read
SegmentReadMaxWaitTimeInSec uint32 `mapstructure:"segmentReadMaxWaitTimeInSec" json:"segmentReadMaxWaitTimeInSec" validate:"gte=10,lte=300"`
}

// SegReceiverTrackingWindow convert SegReceiverTrackingWindowInSec to time.Duration
func (c CentralVODServerConfig) SegReceiverTrackingWindow() time.Duration {
return time.Second * time.Duration(c.SegReceiverTrackingWindowInSec)
}

// SegmentReadMaxWaitTime convert SegmentReadMaxWaitTimeInSec to time.Duration
func (c CentralVODServerConfig) SegmentReadMaxWaitTime() time.Duration {
return time.Second * time.Duration(c.SegmentReadMaxWaitTimeInSec)
}

// ===============================================================================
// Complete Configuration Structures

@@ -526,6 +533,8 @@ func InstallDefaultControlNodeConfigValues() {
viper.SetDefault("vod.segmentReceiverTrackingWindow", 60)
// Default segment reader worker threads
viper.SetDefault("vod.segmentReaderWorkerCount", 4)
// Default max time for segment read
viper.SetDefault("vod.segmentReadMaxWaitTimeInSec", 30)

// Default broadcast channel config
viper.SetDefault("broadcast.pubsub.msgTTL", 600)
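The new setting is stored as whole seconds and converted to a time.Duration only when consumed; the validate tag bounds it to 10-300 seconds, and the viper default of 30 applies when the field is omitted from the config file. A small illustration of the conversion (the struct literal is hypothetical; in bin/control.go the resulting duration is passed straight to utils.NewSegmentReader):

cfg := CentralVODServerConfig{SegmentReadMaxWaitTimeInSec: 30}
maxWait := cfg.SegmentReadMaxWaitTime() // time.Second * 30, i.e. 30s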
3 changes: 3 additions & 0 deletions ref/example-control-node-cfg.yaml
@@ -207,6 +207,9 @@ vod:
# Number of worker threads in the video segment reader
segmentReaderWorkerCount: 4

# Max time in seconds to complete a segment read
segmentReadMaxWaitTimeInSec: 30

#########################################################################################
# System broadcast channel configuration
broadcast:
3 changes: 3 additions & 0 deletions ref/example-control-node-docker-cfg.yaml
@@ -207,6 +207,9 @@ vod:
# Number of worker threads in the video segment reader
segmentReaderWorkerCount: 4

# Max time in seconds to complete a segment read
segmentReadMaxWaitTimeInSec: 30

#########################################################################################
# System broadcast channel configuration
broadcast:
19 changes: 14 additions & 5 deletions utils/segment_fetch.go
@@ -49,6 +49,7 @@ type segmentReader struct {
workerContext context.Context
workerCtxtCancel context.CancelFunc
s3 S3Client
maxSegReadTime time.Duration

/* Metrics Collection Agents */
segmentIOMetrics SegmentMetricsAgent
@@ -60,19 +61,24 @@
NewSegmentReader define new SegmentReader
@param parentContext context.Context - context from which to define the worker context
@param workerCount int - number of parallel read worker to define
@param maxSegReadTime time.Duration - max time allowed to complete a segment read
@param s3 S3Client - S3 client for operating against the S3 server
@param metrics goutils.MetricsCollector - metrics framework client
@return new SegmentReader
*/
func NewSegmentReader(
parentContext context.Context, workerCount int, s3 S3Client, metrics goutils.MetricsCollector,
parentContext context.Context,
workerCount int,
maxSegReadTime time.Duration,
s3 S3Client,
metrics goutils.MetricsCollector,
) (SegmentReader, error) {
logTags := log.Fields{
"module": "utils",
"component": "hls-video-segment-reader",
}
workers, err := goutils.GetNewTaskDemuxProcessorInstance(
parentContext, "segment-readers", workerCount*2, workerCount, logTags,
parentContext, "segment-readers", workerCount, workerCount, logTags,
)
if err != nil {
log.WithError(err).WithFields(logTags).Error("Unable to define worker thread pool")
@@ -93,6 +99,7 @@ func NewSegmentReader(
workerContext: workerCtxt,
workerCtxtCancel: cancel,
s3: s3,
maxSegReadTime: maxSegReadTime,
segmentIOMetrics: nil,
}

@@ -306,6 +313,8 @@ func CleanupObjectKey(orig string) string {
func (r *segmentReader) readSegmentFromS3(
segment common.VideoSegment, fetchURI *url.URL, returnCB SegmentReturnCallback,
) error {
readCtxt, readCtxtCancel := context.WithTimeout(r.workerContext, r.maxSegReadTime)
defer readCtxtCancel()
logTags := r.GetLogTagsForContext(r.workerContext)

if r.s3 == nil {
@@ -328,10 +337,10 @@ func (r *segmentReader) readSegmentFromS3(
WithField("source-id", segment.SourceID).
WithField("segemnt", segment.Name).
WithField("segment-uri", fetchURI.String()).
Debug("Fetching segment from S3")
Info("Fetching segment from S3")

startTime := time.Now().UTC()
content, err := r.s3.GetObject(r.workerContext, sourceBucket, segmentObjectKey)
content, err := r.s3.GetObject(readCtxt, sourceBucket, segmentObjectKey)
if err != nil {
log.
WithError(err).
@@ -350,7 +359,7 @@ func (r *segmentReader) readSegmentFromS3(
WithField("segemnt", segment.Name).
WithField("segment-uri", fetchURI.String()).
WithField("length", len(content)).
Debug("Read segment from S3")
Info("Read segment from S3")

if err := returnCB(r.workerContext, segment.ID, content); err != nil {
log.
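Because the per-read context is derived from r.workerContext, an in-flight GetObject is aborted by whichever happens first: the read deadline expiring or the reader being shut down via its cancel function. A sketch of how the two cases could be told apart from the derived context (the commit itself only logs the returned error; this helper is hypothetical):

import "context"

// classifyReadFailure reports why a context-bound S3 read stopped.
func classifyReadFailure(readCtxt context.Context) string {
    switch readCtxt.Err() {
    case context.DeadlineExceeded:
        return "read exceeded maxSegReadTime (e.g. a stalled connection)"
    case context.Canceled:
        return "reader worker context cancelled during shutdown"
    default:
        return "read failed for a reason unrelated to the context"
    }
}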
6 changes: 3 additions & 3 deletions utils/segment_fetch_test.go
@@ -26,7 +26,7 @@ func TestReadingSegmentFromFile(t *testing.T) {

mockS3 := mocks.NewS3Client(t)

uut, err := utils.NewSegmentReader(utCtxt, 2, mockS3, nil)
uut, err := utils.NewSegmentReader(utCtxt, 2, 0, mockS3, nil)
assert.Nil(err)

// Define test file
@@ -86,7 +86,7 @@ func TestReadingSegmentFromS3(t *testing.T) {

mockS3 := mocks.NewS3Client(t)

uut, err := utils.NewSegmentReader(utCtxt, 2, mockS3, nil)
uut, err := utils.NewSegmentReader(utCtxt, 2, time.Minute, mockS3, nil)
assert.Nil(err)

// Prepare result callback
@@ -108,7 +108,7 @@
// Prepare mock
mockS3.On(
"GetObject",
mock.AnythingOfType("*context.cancelCtx"),
mock.AnythingOfType("*context.timerCtx"),
testBucket,
testObjectPath,
).Return(testContent, nil).Once()
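The mock expectation changes because context.WithTimeout yields the standard library's unexported *context.timerCtx, whereas a plain context.WithCancel yields a *context.cancelCtx. Matching on an unexported concrete type is brittle across Go releases; a behaviour-based matcher with testify's mock.MatchedBy is one alternative (a sketch, not what the commit does):

mockS3.On(
    "GetObject",
    // Accept any context that carries a deadline, whatever its concrete type.
    mock.MatchedBy(func(ctx context.Context) bool {
        _, hasDeadline := ctx.Deadline()
        return hasDeadline
    }),
    testBucket,
    testObjectPath,
).Return(testContent, nil).Once()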
