Skip to content

Commit

Permalink
updated code base to handle missing event header that occurs randomly.
Browse files Browse the repository at this point in the history
updated code base to handle invalid messages by triggering a dispose delegate to allow developers to handle things accordingly
updated code base to handle filtering events on sockets.
  • Loading branch information
roger.castaldo@gmail.com committed Mar 18, 2013
1 parent 39f95ce commit 2bdf53a
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 23 deletions.
Binary file modified FreeswitchConfigSockets.suo
Binary file not shown.
89 changes: 66 additions & 23 deletions Library/ASocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,14 @@ internal static bool StringsEqual(string str1, string str2)
private const string MESSAGE_END_STRING = "\n\n";
private const string REGISTER_EVENT_COMMAND = "event {0}";
private const string REMOVE_EVENT_COMMAND = "nixevent {0}";
private const string EVENT_FILTER_COMMAND = "filter {0} {1}";
private const string REMOVE_EVENT_FILTER_COMMAND = "filter delete {0} {1}";
private const string AUTH_COMMAND = "auth {0}";
private const string BACKGROUND_API_RESPONSE_EVENT = "SWITCH_EVENT_BACKGROUND_JOB";
private const string API_ISSUE_COMMAND = "bgapi {0}";

public delegate void delProcessEventMessage(SocketEvent message);
public delegate void delDisposeInvalidMessage(string message);

private Socket _socket;
protected Socket socket
Expand Down Expand Up @@ -137,6 +140,12 @@ public FreeSwitchLogLevels LogLevel
private Thread _backgroundProcessor;
private Thread _backgroundDataReader;
private ManualResetEvent _mreMessageWaiting;
private delDisposeInvalidMessage _disposeInvalidMesssage;
public delDisposeInvalidMessage DisposeInvalidMessage
{
get { return _disposeInvalidMesssage; }
set { _disposeInvalidMesssage = value; }
}

protected ASocket(Socket socket)
{
Expand Down Expand Up @@ -334,6 +343,16 @@ public void UnRegister(string eventName)
_sendCommand(string.Format(REMOVE_EVENT_COMMAND, eventName));
}

public void RegisterEventFilter(string fieldName, string fieldValue)
{
_sendCommand(string.Format(EVENT_FILTER_COMMAND, fieldName, fieldValue));
}

public void UnRegisterEventFilter(string fieldName, string fieldValue)
{
_sendCommand(string.Format(REMOVE_EVENT_FILTER_COMMAND, fieldName, fieldValue));
}

protected abstract void _processMessageQueue(Queue<ASocketMessage> messages);
protected abstract void _close();
protected abstract void _preSocketReady();
Expand Down Expand Up @@ -408,6 +427,22 @@ private void _MessageProcessorStart()
_processingMessages.RemoveAt(0);
Dictionary<string, string> pars = ASocketMessage.ParseProperties(origMsg);
string subMsg = "";
//fail safe for delayed header
if (!pars.ContainsKey("Content-Type"))
{
if (pars.ContainsKey("Event-Name"))
{
_processingMessages.Insert(0, origMsg);
origMsg = "Content-Type:text/event-plain\nContent-Length:" + origMsg.Length.ToString() + "\n";
pars = ASocketMessage.ParseProperties(origMsg);
}
else
{
if (DisposeInvalidMessage != null)
DisposeInvalidMessage(origMsg);
break;
}
}
if (pars.ContainsKey("Content-Length"))
{
if (_processingMessages.Count > 0)
Expand All @@ -423,39 +458,47 @@ private void _MessageProcessorStart()
}
if (pars["Content-Type"] == "text/event-plain")
{
SocketEvent se;
se = new SocketEvent(subMsg);
if (se["Content-Length"] != null)
if (subMsg == "")
{
if (_processingMessages.Count > 0)
{
se.Message = _processingMessages[0];
_processingMessages.RemoveAt(0);
}
else
{
_processingMessages.Insert(0, origMsg);
_processingMessages.Insert(1, subMsg);
break;
}
_processingMessages.Insert(0, origMsg);
break;
}
if (se.EventName == "BACKGROUND_JOB")
else
{
lock (_commandThreads)
SocketEvent se;
se = new SocketEvent(subMsg);
if (se["Content-Length"] != null)
{
if (_processingMessages.Count > 0)
{
se.Message = _processingMessages[0];
_processingMessages.RemoveAt(0);
}
else
{
_processingMessages.Insert(0, origMsg);
_processingMessages.Insert(1, subMsg);
break;
}
}
if (se.EventName == "BACKGROUND_JOB")
{
if (_commandThreads.ContainsKey(se["Job-UUID"]))
lock (_commandThreads)
{
lock (_awaitingCommandReturns)
if (_commandThreads.ContainsKey(se["Job-UUID"]))
{
_awaitingCommandReturns.Add(se["Job-UUID"], se.Message.Trim('\n'));
lock (_awaitingCommandReturns)
{
_awaitingCommandReturns.Add(se["Job-UUID"], se.Message.Trim('\n'));
}
ManualResetEvent mre = _commandThreads[se["Job-UUID"]];
_commandThreads.Remove(se["Job-UUID"]);
mre.Set();
}
ManualResetEvent mre = _commandThreads[se["Job-UUID"]];
_commandThreads.Remove(se["Job-UUID"]);
mre.Set();
}
}
msgs.Enqueue(se);
}
msgs.Enqueue(se);
}
else if (pars["Content-Type"] == "command/reply")
{
Expand Down
7 changes: 7 additions & 0 deletions Library/Inbound/InboundConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,11 @@ public void PlayAudioFileEndlessly(string filePath, bool eventLock)
ExecuteApplication("endless_playback", filePath, eventLock);
}

public string PlayAndGetDigits(int minDigits, int maxDigits, int tries, long timeout, string terminators, string file, string invalidFile, string regexp, int? digitTimeout)
{
return PlayAndGetDigits(minDigits, maxDigits, tries, timeout, terminators, file, invalidFile, null, regexp, digitTimeout);
}

public string PlayAndGetDigits(int minDigits, int maxDigits, int tries, long timeout, string terminators, string file, string invalidFile,string var, string regexp, int? digitTimeout)
{
if (var==null)
Expand Down Expand Up @@ -876,6 +881,8 @@ public string this[string name]
{
get
{
if (name == null)
return null;
string ret = null;
lock (_properties)
{
Expand Down
8 changes: 8 additions & 0 deletions Library/Inbound/InboundListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ public int Port
get { return _port; }
}

private ASocket.delDisposeInvalidMessage _disposeInvalidMessage;
public ASocket.delDisposeInvalidMessage DisposeInvalidMessage
{
get { return _disposeInvalidMessage; }
set { _disposeInvalidMessage = value; }
}

public delegate void delProcessConnection(InboundConnection conn);

private WrappedTcpListener _listener;
Expand Down Expand Up @@ -59,6 +66,7 @@ private void RecieveClient(IAsyncResult res)
if (clnt != null)
{
InboundConnection conn = new InboundConnection(clnt);
conn.DisposeInvalidMessage = _disposeInvalidMessage;
_connectionProcessor.Invoke(conn);
}
}
Expand Down
7 changes: 7 additions & 0 deletions Tester/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@ public static void Main(string[] args)
Console.WriteLine("Starting up listener socket to test PIN obtainment...");
InboundListener list = new InboundListener(IPAddress.Any, 8084,
new InboundListener.delProcessConnection(ProcessConnection));
list.DisposeInvalidMessage = new ASocket.delDisposeInvalidMessage(DisposeInvalidMessage);
OutboundSocket os = new OutboundSocket(IPAddress.Loopback, 8021, "ClueCon", new ASocket.delProcessEventMessage(ProcessEvent),
null, null, null);
os.DisposeInvalidMessage += new ASocket.delDisposeInvalidMessage(DisposeInvalidMessage);
os.RegisterEvent("all");
Console.WriteLine("Hit Enter to exit...");
Console.ReadLine();
os.Close();
}

public static void DisposeInvalidMessage(string message){
Console.WriteLine("Disposing invalid message...");
Console.WriteLine(message);
}

public static void ProcessEvent(SocketEvent evnt){
Console.WriteLine("Event recieved of type " + evnt.EventName);
}
Expand Down

0 comments on commit 2bdf53a

Please sign in to comment.