From ccad7286ba18deecb105c5b4e8c2cf77d4d5ec6c Mon Sep 17 00:00:00 2001 From: volodymyr-bondarenko85 <131799099+volodymyr-bondarenko85@users.noreply.github.com> Date: Mon, 15 Jan 2024 17:00:34 +0200 Subject: [PATCH] BPK session close fix (#38) --- .../plugins/broadpeak/BroadpeakPlugin.java | 70 ++++++++++++++++--- 1 file changed, 62 insertions(+), 8 deletions(-) diff --git a/broadpeakplugin/src/main/java/com/kaltura/playkit/plugins/broadpeak/BroadpeakPlugin.java b/broadpeakplugin/src/main/java/com/kaltura/playkit/plugins/broadpeak/BroadpeakPlugin.java index 6f81559..48aee36 100644 --- a/broadpeakplugin/src/main/java/com/kaltura/playkit/plugins/broadpeak/BroadpeakPlugin.java +++ b/broadpeakplugin/src/main/java/com/kaltura/playkit/plugins/broadpeak/BroadpeakPlugin.java @@ -17,7 +17,9 @@ import com.kaltura.playkit.PlayerEvent; import com.kaltura.tvplayer.PKMediaEntryInterceptor; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import tv.broadpeak.smartlib.SmartLib; @@ -31,11 +33,21 @@ public class BroadpeakPlugin extends PKPlugin implements PKMediaEntryInterceptor private static final PKLog log = PKLog.get("BroadpeakPlugin"); private MessageBus messageBus; - private final Map sessionsMap = new HashMap<>(); + private final Map sessionsMap = new HashMap<>(); private Player player; private BroadpeakConfig config; private Context context; + static class StreamingSessionInfo { + private final StreamingSession session; + private boolean updateMediaReceived; + + public StreamingSessionInfo(StreamingSession session) { + this.session = session; + this.updateMediaReceived = false; + } + } + public static final Factory factory = new Factory() { @Override public String getName() { @@ -93,8 +105,9 @@ protected void onLoad(final Player player, Object config, final MessageBus messa this.messageBus.addListener(this, PlayerEvent.stopped, event -> { log.d("PlayerEvent stopped: calling stopStreamingSession"); + PlayerEvent.Stopped stoppedEvent = (PlayerEvent.Stopped)event; // Stop the session in case of Playback stop - stopCurrentStreamingSession(); + stopStreamingSession(stoppedEvent.mediaSourceUrl); }); } @@ -123,6 +136,16 @@ private void addGeneralConfig(BroadpeakConfig bpConfig) { @Override protected void onUpdateMedia(PKMediaConfig mediaConfig) { log.d("Start onUpdateMedia"); + if (mediaConfig != null + && mediaConfig.getMediaEntry() != null + && mediaConfig.getMediaEntry().getSources() != null + && mediaConfig.getMediaEntry().getSources().get(0) != null) { + String currentSession = mediaConfig.getMediaEntry().getSources().get(0).getUrl(); + onUpdateMediaReceivedForStreamingSession(currentSession); + cleanupRunningStreamingSessions(currentSession); + } else { + log.d("Empty mediaConfig, sessions cleanup skipped"); + } } @Override @@ -192,11 +215,42 @@ private void stopCurrentStreamingSession() { private void stopStreamingSession(String sessionKey) { log.d("stopStreamingSession called with sessionKey=[" + sessionKey + "]"); if (sessionKey != null && sessionsMap.containsKey(sessionKey)) { - StreamingSession session = sessionsMap.get(sessionKey); - if (session != null) { - session.stopStreamingSession(); + StreamingSessionInfo sessionInfo = sessionsMap.get(sessionKey); + if (sessionInfo != null) { + if (sessionInfo.updateMediaReceived) { + sessionInfo.session.stopStreamingSession(); + sessionsMap.remove(sessionKey); + } else { + log.d("Session not stopped, updateMediaReceived false"); + } + } else { + log.d("Session not stopped, sessionInfo is null"); } - sessionsMap.remove(sessionKey); + } else { + log.d("sessionMap entry not found"); + } + log.d("Finalizing stopStreamingSession call, number of active sessions = " + sessionsMap.size()); + } + + private void onUpdateMediaReceivedForStreamingSession(String currentSession) { + log.d("cleanupStreamingSessions called with currentSession=[" + currentSession + "]"); + if (currentSession != null && sessionsMap.containsKey(currentSession)) { + StreamingSessionInfo sessionInfo = sessionsMap.get(currentSession); + if (sessionInfo != null) { + sessionInfo.updateMediaReceived = true; + } + } + } + + private void cleanupRunningStreamingSessions(String currentSession) { + List cleanedUpSessions = new ArrayList<>(); + for (Map.Entry sessionEntry : sessionsMap.entrySet()) { + if (!sessionEntry.getKey().equals(currentSession)) { + cleanedUpSessions.add(sessionEntry.getKey()); + } + } + for (String sessionKey : cleanedUpSessions) { + stopStreamingSession(sessionKey); } } @@ -228,7 +282,6 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list sendBroadpeakErrorEvent(errorCode, errorMessage); return; } - sessionsMap.put(source.getUrl(), session); addSessionConfig(session); session.attachPlayer(player, messageBus); @@ -240,13 +293,14 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list // Replace the URL log.d("Apply New Entry URL " + mediaEntry.getName() + " - " + mediaEntry.getId() + " url: " + result.getURL()); source.setUrl(result.getURL()); + sessionsMap.put(source.getUrl(), new StreamingSessionInfo(session)); } else { // Stop the session in case of error if (result != null) { errorCode = result.getErrorCode(); errorMessage = result.getErrorMessage(); } - stopStreamingSession(source.getUrl()); + session.stopStreamingSession(); // send event to MessageBus sendBroadpeakErrorEvent(errorCode, errorMessage); }