-
Notifications
You must be signed in to change notification settings - Fork 0
/
Connection.php
135 lines (110 loc) · 3.68 KB
/
Connection.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
<?php
/**
* RTCKit\ESL\Connection Class
*/
declare(strict_types = 1);
namespace RTCKit\ESL;
use RTCKit\ESL\Exception\ESLException;
/**
* ESL connection class
*/
class Connection implements ConnectionInterface
{
public int $role;
public string $buffer = '';
private MessageInterface $message;
/**
* Constructs a new connection
*
* @param int $role
*/
public function __construct(int $role)
{
$this->role = $role;
}
/**
* Consumes incoming ESL traffic (raw bytes)
*
* @param string $chunk
* @param list<MessageInterface> $messages
* @return int
*/
public function consume(string $chunk, ?array &$messages = []): int
{
/** @var int */
$status = self::READY;
/** @var list<MessageInterface> */
$messages = [];
if (!isset($chunk[0])) {
/* Nothing to consume! */
return $status;
}
if (!isset($this->buffer[0])) {
$this->buffer = $chunk;
} else {
$this->buffer .= $chunk;
}
do {
if (!isset($this->message)) {
/* We're waiting for new messages */
$blocks = explode(MessageInterface::MESSAGE_SEPARATOR, $this->buffer, 2);
if (count($blocks) === 1) {
/* We don't have a whole message just yet */
$status = self::WAIT_MESSAGE;
break;
}
if (($this->role === self::INBOUND_CLIENT) || ($this->role === self::OUTBOUND_SERVER)) {
$this->message = Response::parse($this->buffer);
} else {
$this->message = Request::parse($this->buffer);
}
$this->buffer = $blocks[1];
} else {
/* We are waiting for the remainder of the body of the current message */
}
$bodyLength = strlen($this->buffer);
$contentLength = $this->message->getHeader(AbstractHeader::CONTENT_LENGTH);
if (isset($contentLength)) {
$contentLength = (int)$contentLength;
if ($bodyLength < $contentLength) {
/* We need to wait for more body bytes */
$status = self::WAIT_BODY;
break;
} else if ($bodyLength > $contentLength) {
/* Move the remainder of bytes into the buffer and save the body */
$this->message->setBody(substr($this->buffer, 0, $contentLength));
$this->buffer = ltrim(substr($this->buffer, $contentLength));
} else {
/* We're all set! */
$this->message->setBody($this->buffer);
$this->buffer = '';
}
} else {
$this->message->setBody('');
}
$messages[] = $this->message;
unset($this->message);
$status = self::SUCCESS;
} while ($status === self::SUCCESS);
return (isset($messages[0])) ? self::SUCCESS : $status;
}
/**
* Initiates the sending of an outgoing ESL message
*
* @param MessageInterface $message
*/
public function emit(MessageInterface $message): void
{
$this->emitBytes($message->render());
}
/**
* Performs the actual sending of an ESL message.
* The library handling the I/O must properly implement this method.
*
* @param string $bytes
*/
protected function emitBytes(string $bytes): void
{
throw new ESLException(get_class($this) . '::emitBytes() not implemented');
}
}