diff --git a/protocol/logger.go b/protocol/logger.go
new file mode 100644
index 0000000..ddc7b1f
--- /dev/null
+++ b/protocol/logger.go
@@ -0,0 +1,27 @@
+/* Copyright (c) 2018 Gregor Riepl
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package protocol
+
+import (
+ "github.com/onitake/restreamer/util"
+)
+
+const (
+ moduleProtocol = "protocol"
+)
+
+var logger util.Logger = util.NewGlobalModuleLogger(moduleProtocol, nil)
diff --git a/protocol/rtp.go b/protocol/rtp.go
new file mode 100644
index 0000000..28326f0
--- /dev/null
+++ b/protocol/rtp.go
@@ -0,0 +1,180 @@
+/* Copyright (c) 2019 Gregor Riepl
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package protocol
+
+import (
+ "encoding/binary"
+ "errors"
+ "io"
+)
+
+const (
+ DefaultRtpPacketSize int = 1500
+ minHeaderSize int = 12
+)
+
+var (
+ ErrInvalidRtpPacketSize = errors.New("Invalid RTP packet size")
+ ErrInvalidRtpVersion = errors.New("Invalid RTP version")
+)
+
+type RtpPayloadType uint8
+const (
+ RtpPayloadTypePCMU RtpPayloadType = 0
+ RtpPayloadTypeGSM RtpPayloadType = 3
+ RtpPayloadTypeG723 RtpPayloadType = 4
+ RtpPayloadTypeDVI4 RtpPayloadType = 5
+ RtpPayloadTypeDVI4_2 RtpPayloadType = 6
+ RtpPayloadTypeLPC RtpPayloadType = 7
+ RtpPayloadTypePCMA RtpPayloadType = 8
+ RtpPayloadTypeG722 RtpPayloadType = 9
+ RtpPayloadTypeL16 RtpPayloadType = 10
+ RtpPayloadTypeL16_2 RtpPayloadType = 11
+ RtpPayloadTypeQCELP RtpPayloadType = 12
+ RtpPayloadTypeCN RtpPayloadType = 13
+ RtpPayloadTypeMPA RtpPayloadType = 14
+ RtpPayloadTypeG728 RtpPayloadType = 15
+ RtpPayloadTypeDVI4_3 RtpPayloadType = 16
+ RtpPayloadTypeDVI4_4 RtpPayloadType = 17
+ RtpPayloadTypeG729 RtpPayloadType = 18
+ RtpPayloadTypeCelB RtpPayloadType = 25
+ RtpPayloadTypeJPEG RtpPayloadType = 26
+ RtpPayloadTypeNV RtpPayloadType = 28
+ RtpPayloadTypeH261 RtpPayloadType = 31
+ RtpPayloadTypeMPV RtpPayloadType = 32
+ RtpPayloadTypeMP2T RtpPayloadType = 33
+ RtpPayloadTypeH263 RtpPayloadType = 34
+)
+
+// RtpPacket represents a decoded RTP packet.
+// The header fields are dissected, while the payload is contained as a byte slice.
+type RtpPacket struct {
+ // Version contains the RTP protocol version.
+ // Only version 2 is supported.
+ Version uint8
+ // Padding is true if the packet hat the padding flag set.
+ // The extension header and/or payload is always truncated to the actual size.
+ Padding bool
+ // Marker is true if the packet hat the marker bit set.
+ Marker bool
+ // PayloadType describes the kind of data contained in the packet.
+ // See the RtpPayloadType constants for known values. Not that some ranges
+ // are dynamically assigned and must be defined by the application.
+ PayloadType RtpPayloadType
+ // SequenceNumber is a 16-bit sequence number that helps with correct ordering
+ // wen reassembling the stream.
+ SequenceNumber uint16
+ // Timestamp is an application-defined absolute timestamp that should be
+ // used as the basis for ES timestamps. It can also be ignored if the
+ // packaged protocol has sufficiently well-defined timestamps.
+ Timestamp uint32
+ // Ssrc contains the value of the SSRC field.
+ Ssrc uint32
+ // Csrc contains the values of the CSRC fields.
+ // If the packet had no CSRCs, it is an empty list (nil).
+ Csrc []uint32
+ // Extension contains the full extension header, including the length and ... fields.
+ Extension []byte
+ // Payload contains the actual data part of the packet.
+ // It is truncated to the packet size, excluding padding (if it was enabled)
+ Payload []byte
+}
+
+// RtpReader is a packet reader on top of an underlying standard reader.
+// It has a configurable maximum packet size.
+type RtpReader struct {
+ // Reader is the underlying I/O facility
+ Reader io.Reader
+ // PacketSize is the maximum packet size that can be read.
+ // If zero, a default packet size of 1500 octets will be used.
+ PacketSize int
+}
+
+// ReadRtpPacket reads and returns one RTP packet from the underlying reader.
+//
+// If the packet was larger than the maximum packet size, excess data will be dropped.
+// An incomplete packet, together with a non-nil error will be returned in this case.
+//
+// If the buffer was to small to read even the RTP header, a nil packet along with
+// ErrInvalidRtpPacket will be returned.
+//
+// If the prtocol version is not equal to 2, ErrInvalidRtpVersion is returned.
+func (r *RtpReader) ReadRtpPacket() (*RtpPacket, error) {
+ p := &RtpPacket{}
+
+ psize := r.PacketSize
+ if psize == 0 {
+ psize = DefaultRtpPacketSize
+ }
+ data := make([]byte, psize)
+ n, err := r.Reader.Read(data)
+ if n == 0 && err != nil {
+ return nil, err
+ }
+ if n < minHeaderSize {
+ return nil, ErrInvalidRtpPacketSize
+ }
+
+ p.Version = (data[0] & 0xc0) >> 6
+ if p.Version != 2 {
+ logger.Logkv(
+ "event", "error",
+ "error", "rtp_version",
+ "version", p.Version,
+ "message", "Invalid RTP version",
+ )
+ return nil, ErrInvalidRtpVersion
+ }
+ p.Padding = data[0] & 0x20 != 0
+ p.Marker = data[1] & 0x80 != 0
+
+ extension := data[0] & 0x10 != 0
+ csrcc := int(data[0] & 0x0f)
+ xhlen := 0
+ if extension {
+ xhlen = 4
+ }
+ if n < minHeaderSize + 4 * csrcc + xhlen {
+ return nil, ErrInvalidRtpPacketSize
+ }
+
+ p.PayloadType = RtpPayloadType(data[1])
+ p.SequenceNumber = binary.BigEndian.Uint16(data[2:4])
+ p.Timestamp = binary.BigEndian.Uint32(data[4:8])
+ p.Ssrc = binary.BigEndian.Uint32(data[8:12])
+ p.Csrc = make([]uint32, csrcc)
+ offset := minHeaderSize
+
+ for i := 0; i < csrcc; i++ {
+ p.Csrc[i] = binary.BigEndian.Uint32(data[offset:(offset+4)])
+ offset += 4
+ }
+
+ if extension {
+ xlen := int(binary.BigEndian.Uint16(data[(offset+2):(offset+4)]))
+ if n < offset + 4 + xlen {
+ return nil, ErrInvalidRtpPacketSize
+ }
+ p.Extension = data[offset:(offset+4+xlen)]
+ offset += 4 + xlen
+ }
+
+ // TODO truncate padding
+ p.Payload = data[offset:n]
+
+ return p, nil
+}
diff --git a/protocol/rtp_test.go b/protocol/rtp_test.go
new file mode 100644
index 0000000..549136b
--- /dev/null
+++ b/protocol/rtp_test.go
@@ -0,0 +1,376 @@
+/* Copyright (c) 2019 Gregor Riepl
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package protocol
+
+import (
+ "bytes"
+ //"io"
+ "testing"
+)
+
+func listEqual(a, b []uint32) bool {
+ if len(a) != len(b) {
+ return false
+ }
+ for i := range a {
+ if a[i] != b[i] {
+ return false
+ }
+ }
+ return true
+}
+
+func comparePackets(t *testing.T, x, g *RtpPacket) {
+ if x == nil && g != nil {
+ t.Error("Expected a nil packet, got non-nil")
+ }
+ if x != nil && g == nil {
+ t.Error("Expected a non-nil packet, got nil")
+ }
+ if x.Version != g.Version {
+ t.Errorf("Got incorrect version: %d Expected: %d", g.Version, x.Version)
+ }
+ if x.Padding != g.Padding {
+ t.Errorf("Got incorrect padding flag: %v Expected: %v", g.Padding, x.Padding)
+ }
+ if x.Marker != g.Marker {
+ t.Errorf("Got incorrect marker flag: %v Expected: %v", g.Marker, x.Marker)
+ }
+ if x.PayloadType != g.PayloadType {
+ t.Errorf("Got incorrect payload type: %v Expected: %v", g.PayloadType, x.PayloadType)
+ }
+ if x.SequenceNumber != g.SequenceNumber {
+ t.Errorf("Got incorrect sequence number: %v Expected: %v", g.SequenceNumber, x.SequenceNumber)
+ }
+ if x.Timestamp != g.Timestamp {
+ t.Errorf("Got incorrect timestamp: %v Expected: %v", g.Timestamp, x.Timestamp)
+ }
+ if x.Ssrc != g.Ssrc {
+ t.Errorf("Got incorrect SSRC: %v Expected: %v", g.Ssrc, x.Ssrc)
+ }
+ if !listEqual(x.Csrc, g.Csrc) {
+ t.Errorf("CSRC doesn't match: %v Expected: %v", g.Csrc, x.Csrc)
+ }
+ if !bytes.Equal(x.Extension, g.Extension) {
+ t.Errorf("Extension doesn't match: %v Expected: %v", g.Extension, x.Extension)
+ }
+ if !bytes.Equal(x.Payload, g.Payload) {
+ t.Errorf("Payload doesn't match: %v Expected: %v", g.Payload, x.Payload)
+ }
+}
+
+func TestRtpEmptyPacket(t *testing.T) {
+ d := []byte{}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if s != nil || err == nil {
+ t.Fatalf("Expected non-nil error and nil packet")
+ }
+}
+
+func TestRtpIncompleteHeader(t *testing.T) {
+ d := []byte{0x02, 0x21, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+ for i := 1; i < 11; i++ {
+ b := bytes.NewBuffer(d[0:i])
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if s != nil || err != ErrInvalidRtpPacketSize {
+ t.Fatalf("Expected non-nil error and nil packet: %v", err)
+ }
+ }
+}
+
+func TestRtpInvalidVersion(t *testing.T) {
+ d := []byte{0x01, 0x21, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if s != nil || err != ErrInvalidRtpVersion {
+ t.Fatalf("Expected non-nil error and nil packet: %v", err)
+ }
+}
+
+func TestRtpMissingCsrc(t *testing.T) {
+ d := []byte{0x12, 0x21, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if s != nil || err != ErrInvalidRtpPacketSize {
+ t.Fatalf("Expected non-nil error and nil packet: %v", err)
+ }
+}
+
+func TestRtpMissingExtension(t *testing.T) {
+ d := []byte{0x0e, 0x21, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if s != nil || err != ErrInvalidRtpPacketSize {
+ t.Fatalf("Expected non-nil error and nil packet: %v", err)
+ }
+}
+
+func TestRtpIncompleteExtension(t *testing.T) {
+ d := []byte{0x0e, 0x21, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x22, 0x00, 0x04}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if s != nil || err != ErrInvalidRtpPacketSize {
+ t.Fatalf("Expected non-nil error and nil packet: %v", err)
+ }
+}
+
+func TestRtpHeaderOnly(t *testing.T) {
+ d := []byte{0x02, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if err != nil {
+ t.Fatalf("Got an error instead of a packet: %v", err)
+ }
+ x := &RtpPacket{
+ Version: 2,
+ Padding: false,
+ Marker: false,
+ PayloadType: RtpPayloadTypeMP2T,
+ SequenceNumber: 0x0123,
+ Timestamp: 0x456789ab,
+ Ssrc: 0xcdefaa55,
+ Csrc: nil,
+ Extension: nil,
+ Payload: nil,
+ }
+ comparePackets(t, x, s)
+}
+
+func TestRtpHeaderCsrc(t *testing.T) {
+ d := []byte{0x26, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x10, 0x0e, 0x20, 0x0d, 0x40, 0x0b, 0x80, 0x07}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if err != nil {
+ t.Fatalf("Got an error instead of a packet: %v", err)
+ }
+ x := &RtpPacket{
+ Version: 2,
+ Padding: true,
+ Marker: false,
+ PayloadType: RtpPayloadTypeMP2T,
+ SequenceNumber: 0x0123,
+ Timestamp: 0x456789ab,
+ Ssrc: 0xcdefaa55,
+ Csrc: []uint32{
+ 0x100e200d,
+ 0x400b8007,
+ },
+ Extension: nil,
+ Payload: nil,
+ }
+ comparePackets(t, x, s)
+}
+
+func TestRtpHeaderExtension(t *testing.T) {
+ d := []byte{0x0e, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if err != nil {
+ t.Fatalf("Got an error instead of a packet: %v", err)
+ }
+ x := &RtpPacket{
+ Version: 2,
+ Padding: true,
+ Marker: false,
+ PayloadType: RtpPayloadTypeMP2T,
+ SequenceNumber: 0x0123,
+ Timestamp: 0x456789ab,
+ Ssrc: 0xcdefaa55,
+ Csrc: nil,
+ Extension: []byte{
+ 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7,
+ },
+ Payload: nil,
+ }
+ comparePackets(t, x, s)
+}
+
+func TestRtpHeaderCsrcExtension(t *testing.T) {
+ d := []byte{0x2e, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x10, 0x0e, 0x20, 0x0d, 0x40, 0x0b, 0x80, 0x07, 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if err != nil {
+ t.Fatalf("Got an error instead of a packet: %v", err)
+ }
+ x := &RtpPacket{
+ Version: 2,
+ Padding: true,
+ Marker: false,
+ PayloadType: RtpPayloadTypeMP2T,
+ SequenceNumber: 0x0123,
+ Timestamp: 0x456789ab,
+ Ssrc: 0xcdefaa55,
+ Csrc: []uint32{
+ 0x100e200d,
+ 0x400b8007,
+ },
+ Extension: []byte{
+ 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7,
+ },
+ Payload: nil,
+ }
+ comparePackets(t, x, s)
+}
+
+func TestRtpPayload(t *testing.T) {
+ d := []byte{0x02, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if err != nil {
+ t.Fatalf("Got an error instead of a packet: %v", err)
+ }
+ x := &RtpPacket{
+ Version: 2,
+ Padding: false,
+ Marker: false,
+ PayloadType: RtpPayloadTypeMP2T,
+ SequenceNumber: 0x0123,
+ Timestamp: 0x456789ab,
+ Ssrc: 0xcdefaa55,
+ Csrc: nil,
+ Extension: nil,
+ Payload: []byte{
+ 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70,
+ },
+ }
+ comparePackets(t, x, s)
+}
+
+func TestRtpCsrcPayload(t *testing.T) {
+ d := []byte{0x26, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x10, 0x0e, 0x20, 0x0d, 0x40, 0x0b, 0x80, 0x07, 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if err != nil {
+ t.Fatalf("Got an error instead of a packet: %v", err)
+ }
+ x := &RtpPacket{
+ Version: 2,
+ Padding: true,
+ Marker: false,
+ PayloadType: RtpPayloadTypeMP2T,
+ SequenceNumber: 0x0123,
+ Timestamp: 0x456789ab,
+ Ssrc: 0xcdefaa55,
+ Csrc: []uint32{
+ 0x100e200d,
+ 0x400b8007,
+ },
+ Extension: nil,
+ Payload: []byte{
+ 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70,
+ },
+ }
+ comparePackets(t, x, s)
+}
+
+func TestRtpExtensionPayload(t *testing.T) {
+ d := []byte{0x0e, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7, 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if err != nil {
+ t.Fatalf("Got an error instead of a packet: %v", err)
+ }
+ x := &RtpPacket{
+ Version: 2,
+ Padding: true,
+ Marker: false,
+ PayloadType: RtpPayloadTypeMP2T,
+ SequenceNumber: 0x0123,
+ Timestamp: 0x456789ab,
+ Ssrc: 0xcdefaa55,
+ Csrc: nil,
+ Extension: []byte{
+ 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7,
+ },
+ Payload: []byte{
+ 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70,
+ },
+ }
+ comparePackets(t, x, s)
+}
+
+func TestRtpCsrcExtensionPayload(t *testing.T) {
+ d := []byte{0x2e, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x10, 0x0e, 0x20, 0x0d, 0x40, 0x0b, 0x80, 0x07, 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7, 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70}
+ b := bytes.NewBuffer(d)
+ r := RtpReader{
+ Reader: b,
+ }
+ s, err := r.ReadRtpPacket()
+ if err != nil {
+ t.Fatalf("Got an error instead of a packet: %v", err)
+ }
+ x := &RtpPacket{
+ Version: 2,
+ Padding: true,
+ Marker: false,
+ PayloadType: RtpPayloadTypeMP2T,
+ SequenceNumber: 0x0123,
+ Timestamp: 0x456789ab,
+ Ssrc: 0xcdefaa55,
+ Csrc: []uint32{
+ 0x100e200d,
+ 0x400b8007,
+ },
+ Extension: []byte{
+ 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7,
+ },
+ Payload: []byte{
+ 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70,
+ },
+ }
+ comparePackets(t, x, s)
+}
diff --git a/protocol/rtpbridge.go b/protocol/rtpbridge.go
new file mode 100644
index 0000000..eb97423
--- /dev/null
+++ b/protocol/rtpbridge.go
@@ -0,0 +1,162 @@
+/* Copyright (c) 2019 Gregor Riepl
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package protocol
+
+import (
+ "bytes"
+ "errors"
+ "io"
+ "github.com/onitake/restreamer/util"
+ "fmt"
+)
+
+var (
+ ErrRtpBridgeInvalidPacketType = errors.New("Invalid packet type")
+ ErrRtpBridgePacketTooSmall = errors.New("Payload too small")
+ ErrRtpBridgeWaitingForMore = errors.New("Waiting for more data")
+)
+
+type RtpBridge struct {
+ reader *RtpReader
+ seqnum int
+ queue *util.SequenceQueue
+ buffer *bytes.Buffer
+}
+
+func NewRtpBridge(reader io.Reader, psize int, lookahead int) *RtpBridge {
+ return &RtpBridge{
+ reader: &RtpReader{
+ Reader: reader,
+ PacketSize: psize,
+ },
+ seqnum: -1,
+ queue: util.NewSequenceQueue(lookahead),
+ // this allocates a bit too much (we're only interested in the payload),
+ // but will work no matter what size the header has
+ buffer: bytes.NewBuffer(make([]byte, 0, psize)),
+ }
+}
+
+func (b *RtpBridge) packetIntoQueue() error {
+ p, err := b.reader.ReadRtpPacket()
+ if err != nil {
+ return err
+ }
+ if p.PayloadType != RtpPayloadTypeMP2T {
+ // we don't accept anything besides MPEG-2 TS packets
+ return ErrRtpBridgeInvalidPacketType
+ }
+ if len(p.Payload) < MpegTsPacketSize {
+ // RTP packets need to contain at least one whole MP2TS packet
+ return ErrRtpBridgeInvalidPacketType
+ }
+ seq := int(p.SequenceNumber)
+ // check if this is the first packet at start or after a long drought
+ var pos int
+ if b.queue.Length() == 0 {
+ pos = 0
+ b.seqnum = seq
+ } else {
+ // calculate the queue position relative to the last fetched sequence number
+ pos = util.AbsSub(b.seqnum, seq)
+ // check if we're too far ahead from the head of the queue
+ newseq := pos - (b.queue.Length() - 1)
+ if newseq > 0 {
+ // to make room for a packet this far ahead, we need to get rid of some previous slots first
+ for i := 0; i < newseq; i++ {
+ // TODO instead of dropping all of them, we could pop all non-empty slots and enqueue the data.
+ // but this would require another queue for the reassembled output...
+ // or moving from a pull to push design.
+ b.queue.Pop()
+ }
+ logger.Logkv(
+ "event", "error",
+ "error", "rtp_drop",
+ "packets", newseq,
+ "message", fmt.Sprintf("Packets lost: %d", newseq),
+ )
+
+ // update pos and seqnum with the new base index
+ pos -= newseq
+ b.seqnum += newseq
+ }
+ }
+ old, err := b.queue.Insert(pos, p)
+ if err != nil {
+ // should never happen, maybe just panic here?
+ return err
+ }
+ if old != nil {
+ logger.Logkv(
+ "event", "error",
+ "error", "rtp_dup",
+ "message", "Packet with duplicate sequence number overwritten",
+ )
+ }
+ return nil
+}
+
+func (b *RtpBridge) nextPacket() (*RtpPacket, error) {
+ p, err := b.queue.Peek()
+ if err != nil {
+ return nil, err
+ }
+ if p != nil {
+ // we already fetched the next packet, just update the head now
+ b.queue.Pop()
+ // this would panic on type mismatch, but the queue only contains RtpPackets and nils are already excluded
+ rtp := p.(*RtpPacket)
+ return rtp, nil
+ }
+ // still waiting for the next packet in line
+ return nil, nil
+}
+
+func (b *RtpBridge) Read(p []byte) (n int, err error) {
+ // TODO MPEG2-TS packets may also be 204 bytes long, which includes a 32-bit checksum.
+ // This is currently unsupported.
+
+ // the payload may only contain whole MP2TS packets - we don't do partial reassembly
+ if b.buffer.Len() < MpegTsPacketSize {
+ // partial or empty - clear the buffer
+ b.buffer.Reset()
+ // try to read the next packet
+ err = b.packetIntoQueue()
+ if err != nil {
+ return 0, err
+ }
+ // then pop the next packet from the queue (with possible reordering)
+ p, err := b.nextPacket()
+ if err != nil {
+ return 0, err
+ }
+ if p == nil {
+ // no packet available yet, we need to wait for more segments
+ return 0, ErrRtpBridgeWaitingForMore
+ }
+ b.buffer.Write(p.Payload)
+ }
+ return b.buffer.Read(p)
+}
+
+func (b *RtpBridge) Close() error {
+ if c, ok := b.reader.Reader.(io.Closer); ok {
+ // close the underlying reader if it supports Close()
+ return c.Close()
+ }
+ return nil
+}
diff --git a/streaming/client.go b/streaming/client.go
index bd73e7b..e68fdf5 100644
--- a/streaming/client.go
+++ b/streaming/client.go
@@ -31,6 +31,10 @@ import (
"time"
)
+const (
+ defaultPacketQueueSize int = 16
+)
+
var (
// ErrInvalidProtocol is thrown when an invalid protocol was specified.
// See the docs and example config for a list of supported protocols.
@@ -152,6 +156,8 @@ type Client struct {
readBufferSize int
// packetSize defines the size of individual datagram packets (UDP)
packetSize int
+ // packetQueueSize defines the number of RTP packets to buffer for sequence reordering
+ packetQueueSize int
}
// NewClient constructs a new streaming HTTP client, without connecting the socket yet.
@@ -237,6 +243,7 @@ func NewClient(name string, uris []string, streamer *Streamer, timeout uint, rec
interf: pintf,
readBufferSize: int(bufferSize * protocol.MpegTsPacketSize),
packetSize: int(packetSize),
+ packetQueueSize: defaultPacketQueueSize,
}
return &client, nil
}
@@ -463,6 +470,37 @@ func (client *Client) start(url *url.URL) error {
}
conn.SetReadBuffer(client.readBufferSize)
client.input = protocol.NewFixedReader(conn, client.packetSize)
+ case "rtp":
+ addr, err := net.ResolveUDPAddr("udp", url.Host)
+ if err != nil {
+ return err
+ }
+ var conn *net.UDPConn
+ if addr.IP.IsMulticast() {
+ logger.Logkv(
+ "event", eventClientOpenUdpMulticast,
+ "address", addr,
+ "message", fmt.Sprintf("Joining UDP multicast group %s on interface %v.", url.Host, client.interf),
+ )
+ var err error
+ conn, err = net.ListenMulticastUDP("udp", client.interf, addr)
+ if err != nil {
+ return err
+ }
+ } else {
+ logger.Logkv(
+ "event", eventClientOpenUdp,
+ "address", addr,
+ "message", fmt.Sprintf("Connecting to UDP address %s.", addr),
+ )
+ var err error
+ conn, err = net.ListenUDP("udp", addr)
+ if err != nil {
+ return err
+ }
+ }
+ conn.SetReadBuffer(client.readBufferSize)
+ client.input = protocol.NewRtpBridge(conn, client.packetSize, client.packetQueueSize)
default:
return ErrInvalidProtocol
}
diff --git a/util/abs.go b/util/abs.go
new file mode 100644
index 0000000..762f4ef
--- /dev/null
+++ b/util/abs.go
@@ -0,0 +1,44 @@
+/* Copyright (c) 2019 Gregor Riepl
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package util
+
+// AbsSubInt32 calculates absolute difference between a and b, or: | a - b |
+// Arguments and return type are int32.
+func AbsSubInt32(a, b int32) int32 {
+ if a > b {
+ return a - b
+ }
+ return b - a
+}
+
+// AbsSubInt64 calculates absolute difference between a and b, or: | a - b |
+// Arguments and return type are int64.
+func AbsSubInt64(a, b int64) int64 {
+ if a > b {
+ return a - b
+ }
+ return b - a
+}
+
+// AbsSub calculates absolute difference between a and b, or: | a - b |
+// Arguments and return type are int.
+func AbsSub(a, b int) int {
+ if a > b {
+ return a - b
+ }
+ return b - a
+}
diff --git a/util/abs_test.go b/util/abs_test.go
new file mode 100644
index 0000000..8e9fad8
--- /dev/null
+++ b/util/abs_test.go
@@ -0,0 +1,94 @@
+/* Copyright (c) 2019 Gregor Riepl
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package util
+
+import (
+ "testing"
+)
+
+const intSize = 4 + (^uint(0)>>63) * 4
+
+func TestAbsSubInt32(t *testing.T) {
+ va := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 }
+ vb := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 }
+ for ia := 0; ia < len(va); ia++ {
+ for ib := 0; ib < len(vb); ib++ {
+ a := va[ia]
+ b := vb[ib]
+ r := AbsSubInt32(a, b)
+ var x int32
+ if a > b {
+ x = a - b
+ } else {
+ x = b - a
+ }
+ if r != x {
+ t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r)
+ }
+ }
+ }
+}
+
+func TestAbsSubInt64(t *testing.T) {
+ va := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 }
+ vb := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 }
+ for ia := 0; ia < len(va); ia++ {
+ for ib := 0; ib < len(vb); ib++ {
+ a := va[ia]
+ b := vb[ib]
+ r := AbsSubInt64(a, b)
+ var x int64
+ if a > b {
+ x = a - b
+ } else {
+ x = b - a
+ }
+ if r != x {
+ t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r)
+ }
+ }
+ }
+}
+
+func TestAbsSub(t *testing.T) {
+ v64 := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff }
+ va := []int{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 }
+ vb := []int{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 }
+ if intSize == 8 {
+ for _, a64 := range v64 {
+ va = append(va, int(a64))
+ vb = append(vb, int(a64))
+ }
+ }
+ for ia := 0; ia < len(va); ia++ {
+ for ib := 0; ib < len(vb); ib++ {
+ a := va[ia]
+ b := vb[ib]
+ t.Logf("Testing |%d - %d|", a, b)
+ r := AbsSub(a, b)
+ var x int
+ if a > b {
+ x = a - b
+ } else {
+ x = b - a
+ }
+ if r != x {
+ t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r)
+ }
+ }
+ }
+}
diff --git a/util/sequencequeue.go b/util/sequencequeue.go
new file mode 100644
index 0000000..6e82b95
--- /dev/null
+++ b/util/sequencequeue.go
@@ -0,0 +1,77 @@
+/* Copyright (c) 2019 Gregor Riepl
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package util
+
+import (
+ "errors"
+)
+
+var (
+ ErrSequenceQueueEmpty = errors.New("Queue is empty")
+ ErrSequenceQueueOutOfBounds = errors.New("Insert index out of bounds")
+)
+
+type SequenceQueue struct {
+ queue []interface{}
+ head int
+ tail int
+ length int
+}
+
+func NewSequenceQueue(bound int) *SequenceQueue {
+ return &SequenceQueue{
+ queue: make([]interface{}, bound),
+ }
+}
+
+func (f *SequenceQueue) Length() int {
+ return f.length
+}
+
+func (f *SequenceQueue) Insert(position int, value interface{}) (old interface{}, err error) {
+ if position < 0 || position >= len(f.queue) {
+ return nil, ErrSequenceQueueOutOfBounds
+ }
+ index := (f.head + position) % len(f.queue)
+ if position >= f.length {
+ f.queue[index] = value
+ f.tail = (index + 1) % len(f.queue)
+ f.length = position + 1
+ } else {
+ old = f.queue[index]
+ f.queue[index] = value
+ }
+ return old, nil
+}
+
+func (f *SequenceQueue) Peek() (interface{}, error) {
+ if f.length == 0 {
+ return nil, ErrSequenceQueueEmpty
+ }
+ return f.queue[f.head], nil
+}
+
+func (f *SequenceQueue) Pop() (interface{}, error) {
+ if f.length == 0 {
+ return nil, ErrSequenceQueueEmpty
+ }
+ ret := f.queue[f.head]
+ f.queue[f.head] = nil
+ f.head = (f.head + 1) % len(f.queue)
+ f.length--
+ return ret, nil
+}
diff --git a/util/sequencequeue_test.go b/util/sequencequeue_test.go
new file mode 100644
index 0000000..a3d2d20
--- /dev/null
+++ b/util/sequencequeue_test.go
@@ -0,0 +1,166 @@
+/* Copyright (c) 2019 Gregor Riepl
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package util
+
+import (
+ "testing"
+)
+
+func TestSequenceQueuePush(t *testing.T) {
+ q := NewSequenceQueue(1)
+ o, e := q.Insert(0, "A")
+ if e != nil {
+ t.Fatalf("Insert returned error: %v", e)
+ }
+ if o != nil {
+ t.Errorf("Old value was not nil: %v", o)
+ }
+ l := q.Length()
+ if l != 1 {
+ t.Errorf("Length after insert is %d instead of %d", l, 1)
+ }
+ if l != q.Length() {
+ t.Errorf("Second length call mismatch")
+ }
+}
+
+func TestSequenceQueuePushOob(t *testing.T) {
+ q := NewSequenceQueue(1)
+ o, e := q.Insert(1, "A")
+ if e != ErrSequenceQueueOutOfBounds {
+ t.Fatalf("Didn't get out of bounds error but: %v", e)
+ }
+ if o != nil {
+ t.Errorf("Old value was not nil: %v", o)
+ }
+ o, e = q.Insert(-1, "B")
+ if e != ErrSequenceQueueOutOfBounds {
+ t.Fatalf("Didn't get out of bounds error but: %v", e)
+ }
+ if o != nil {
+ t.Errorf("Old value was not nil: %v", o)
+ }
+ l := q.Length()
+ if l != 0 {
+ t.Errorf("Length after insert is %d instead of %d", l, 0)
+ }
+}
+
+func TestSequenceQueuePushPop(t *testing.T) {
+ q := NewSequenceQueue(1)
+ o, e := q.Insert(0, "A")
+ if e != nil {
+ t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length)
+ t.Fatalf("Insert returned error: %v", e)
+ }
+ if o != nil {
+ t.Errorf("Old value was not nil: %v", o)
+ }
+ r, e := q.Pop()
+ if e != nil {
+ t.Fatalf("Got error: %v", e)
+ }
+ if r != "A" {
+ t.Errorf("Got value: %v", r)
+ t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length)
+ }
+}
+
+func TestSequenceQueuePushOccupied(t *testing.T) {
+ q := NewSequenceQueue(1)
+ o, e := q.Insert(0, "A")
+ if e != nil {
+ t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length)
+ t.Fatalf("Insert returned error: %v", e)
+ }
+ if o != nil {
+ t.Errorf("Old value was not nil: %v", o)
+ }
+ o, e = q.Insert(0, "A")
+ if o == nil {
+ t.Errorf("Old value was nil")
+ }
+}
+
+func TestSequenceQueuePushMutiple(t *testing.T) {
+ q := NewSequenceQueue(10)
+ for i := 0; i < 10; i++ {
+ o, e := q.Insert(i, i)
+ if e != nil {
+ t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length)
+ t.Fatalf("Insert returned error: %v", e)
+ }
+ if o != nil {
+ t.Errorf("Old value was not nil: %v", o)
+ }
+ }
+ for i := 0; i < 10; i++ {
+ r, e := q.Pop()
+ if e != nil {
+ t.Fatalf("Got error: %v", e)
+ }
+ if r != i {
+ t.Errorf("Got value: %v expected: %v", r, i)
+ t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length)
+ }
+ }
+ r, e := q.Pop()
+ if e != ErrSequenceQueueEmpty || r != nil {
+ t.Fatalf("Expected empty queue, got: %v value=%v", e, r)
+ }
+}
+
+func TestSequenceQueuePushReverse(t *testing.T) {
+ q := NewSequenceQueue(10)
+ for i := 9; i >= 0; i-- {
+ o, e := q.Insert(i, i)
+ if e != nil {
+ t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length)
+ t.Fatalf("Insert returned error: %v", e)
+ }
+ if o != nil {
+ t.Errorf("Old value was not nil: %v", o)
+ }
+ if i != 0 {
+ r, e := q.Peek()
+ if e != nil {
+ t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length)
+ t.Fatalf("Insert returned error: %v", e)
+ }
+ if r != nil {
+ t.Errorf("Got value: %v expected: %v", r, nil)
+ }
+ }
+ }
+ for i := 0; i < 10; i++ {
+ r, e := q.Pop()
+ if e != nil {
+ t.Fatalf("Got error: %v", e)
+ }
+ if r != i {
+ t.Errorf("Got value: %v expected: %v", r, i)
+ t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length)
+ }
+ }
+ r, e := q.Pop()
+ if e != ErrSequenceQueueEmpty {
+ t.Fatalf("Expected empty queue, got: %v value=%v", e, r)
+ }
+ if r != nil {
+ t.Errorf("Got value: %v expected: %v", r, nil)
+ }
+}