From d93011f7dae7addcda325417815024672bc93547 Mon Sep 17 00:00:00 2001 From: Gregor Riepl Date: Tue, 9 Jul 2019 08:25:20 +0200 Subject: [PATCH 01/10] Added RTP packet dissector --- protocol/rtp.go | 154 ++++++++++++++++++ protocol/rtp_test.go | 376 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 530 insertions(+) create mode 100644 protocol/rtp.go create mode 100644 protocol/rtp_test.go diff --git a/protocol/rtp.go b/protocol/rtp.go new file mode 100644 index 0000000..eca68e5 --- /dev/null +++ b/protocol/rtp.go @@ -0,0 +1,154 @@ +/* Copyright (c) 2019 Gregor Riepl + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package protocol + +import ( + "encoding/binary" + "errors" + "io" +) + +const ( + DefaultRtpPacketSize int = 1500 + minHeaderSize int = 12 +) + +var ( + ErrInvalidRtpPacketSize = errors.New("Invalid RTP packet size") + ErrInvalidRtpVersion = errors.New("Invalid RTP version") +) + +type RtpPayloadType uint8 +const ( + RtpPayloadTypePCMU RtpPayloadType = 0 + RtpPayloadTypeGSM RtpPayloadType = 3 + RtpPayloadTypeG723 RtpPayloadType = 4 + RtpPayloadTypeDVI4 RtpPayloadType = 5 + RtpPayloadTypeDVI4_2 RtpPayloadType = 6 + RtpPayloadTypeLPC RtpPayloadType = 7 + RtpPayloadTypePCMA RtpPayloadType = 8 + RtpPayloadTypeG722 RtpPayloadType = 9 + RtpPayloadTypeL16 RtpPayloadType = 10 + RtpPayloadTypeL16_2 RtpPayloadType = 11 + RtpPayloadTypeQCELP RtpPayloadType = 12 + RtpPayloadTypeCN RtpPayloadType = 13 + RtpPayloadTypeMPA RtpPayloadType = 14 + RtpPayloadTypeG728 RtpPayloadType = 15 + RtpPayloadTypeDVI4_3 RtpPayloadType = 16 + RtpPayloadTypeDVI4_4 RtpPayloadType = 17 + RtpPayloadTypeG729 RtpPayloadType = 18 + RtpPayloadTypeCelB RtpPayloadType = 25 + RtpPayloadTypeJPEG RtpPayloadType = 26 + RtpPayloadTypeNV RtpPayloadType = 28 + RtpPayloadTypeH261 RtpPayloadType = 31 + RtpPayloadTypeMPV RtpPayloadType = 32 + RtpPayloadTypeMP2T RtpPayloadType = 33 + RtpPayloadTypeH263 RtpPayloadType = 34 +) + +// RtpPacket represents a decoded RTP packet. +// The header fields are dissected, while the payload is contained as a byte slice. +type RtpPacket struct { + Version uint8 + Padding bool + Marker bool + PayloadType RtpPayloadType + SequenceNumber uint16 + Timestamp uint32 + Ssrc uint32 + Csrc []uint32 + Extension []byte + Payload []byte +} + +// RtpReader is packet reader on top of an underlying standard reader. +// It has a configurable maximum packet size. +type RtpReader struct { + // Reader is the underlying I/O facility + Reader io.Reader + // PacketSize is the maximum packet size that can be read. + // If zero, a default packet size of 1500 octets will be used. + PacketSize int +} + +// ReadRtpPacket reads and returns one RTP packet from the underlying reader. +// +// If the packet was larger than the maximum packet size, excess data will be dropped. +// An incomplete packet, together with a non-nil error will be returned in this case. +// +// If the buffer was to small to read even the RTP header, a nil packet along with +// ErrInvalidRtpPacket will be returned. +// +// If the prtocol version is not equal to 2, ErrInvalidRtpVersion is returned. +func (r *RtpReader) ReadRtpPacket() (*RtpPacket, error) { + p := &RtpPacket{} + + psize := r.PacketSize + if psize == 0 { + psize = DefaultRtpPacketSize + } + data := make([]byte, psize) + n, err := r.Reader.Read(data) + if n == 0 && err != nil { + return nil, err + } + if n < minHeaderSize { + return nil, ErrInvalidRtpPacketSize + } + + p.Version = data[0] & 0x03 + if p.Version != 2 { + return nil, ErrInvalidRtpVersion + } + p.Padding = data[0] & 0x04 != 0 + p.Marker = data[0] & 0x80 != 0 + + extension := data[0] & 0x08 != 0 + csrcc := int((data[0] & 0x70) >> 4) + xhlen := 0 + if extension { + xhlen = 4 + } + if n < minHeaderSize + 4 * csrcc + xhlen { + return nil, ErrInvalidRtpPacketSize + } + + p.PayloadType = RtpPayloadType(data[1]) + p.SequenceNumber = binary.BigEndian.Uint16(data[2:4]) + p.Timestamp = binary.BigEndian.Uint32(data[4:8]) + p.Ssrc = binary.BigEndian.Uint32(data[8:12]) + p.Csrc = make([]uint32, csrcc) + offset := minHeaderSize + + for i := 0; i < csrcc; i++ { + p.Csrc[i] = binary.BigEndian.Uint32(data[offset:(offset+4)]) + offset += 4 + } + + if extension { + xlen := int(binary.BigEndian.Uint16(data[(offset+2):(offset+4)])) + if n < offset + 4 + xlen { + return nil, ErrInvalidRtpPacketSize + } + p.Extension = data[offset:(offset+4+xlen)] + offset += 4 + xlen + } + + p.Payload = data[offset:n] + + return p, nil +} diff --git a/protocol/rtp_test.go b/protocol/rtp_test.go new file mode 100644 index 0000000..549136b --- /dev/null +++ b/protocol/rtp_test.go @@ -0,0 +1,376 @@ +/* Copyright (c) 2019 Gregor Riepl + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package protocol + +import ( + "bytes" + //"io" + "testing" +) + +func listEqual(a, b []uint32) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +func comparePackets(t *testing.T, x, g *RtpPacket) { + if x == nil && g != nil { + t.Error("Expected a nil packet, got non-nil") + } + if x != nil && g == nil { + t.Error("Expected a non-nil packet, got nil") + } + if x.Version != g.Version { + t.Errorf("Got incorrect version: %d Expected: %d", g.Version, x.Version) + } + if x.Padding != g.Padding { + t.Errorf("Got incorrect padding flag: %v Expected: %v", g.Padding, x.Padding) + } + if x.Marker != g.Marker { + t.Errorf("Got incorrect marker flag: %v Expected: %v", g.Marker, x.Marker) + } + if x.PayloadType != g.PayloadType { + t.Errorf("Got incorrect payload type: %v Expected: %v", g.PayloadType, x.PayloadType) + } + if x.SequenceNumber != g.SequenceNumber { + t.Errorf("Got incorrect sequence number: %v Expected: %v", g.SequenceNumber, x.SequenceNumber) + } + if x.Timestamp != g.Timestamp { + t.Errorf("Got incorrect timestamp: %v Expected: %v", g.Timestamp, x.Timestamp) + } + if x.Ssrc != g.Ssrc { + t.Errorf("Got incorrect SSRC: %v Expected: %v", g.Ssrc, x.Ssrc) + } + if !listEqual(x.Csrc, g.Csrc) { + t.Errorf("CSRC doesn't match: %v Expected: %v", g.Csrc, x.Csrc) + } + if !bytes.Equal(x.Extension, g.Extension) { + t.Errorf("Extension doesn't match: %v Expected: %v", g.Extension, x.Extension) + } + if !bytes.Equal(x.Payload, g.Payload) { + t.Errorf("Payload doesn't match: %v Expected: %v", g.Payload, x.Payload) + } +} + +func TestRtpEmptyPacket(t *testing.T) { + d := []byte{} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if s != nil || err == nil { + t.Fatalf("Expected non-nil error and nil packet") + } +} + +func TestRtpIncompleteHeader(t *testing.T) { + d := []byte{0x02, 0x21, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + for i := 1; i < 11; i++ { + b := bytes.NewBuffer(d[0:i]) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if s != nil || err != ErrInvalidRtpPacketSize { + t.Fatalf("Expected non-nil error and nil packet: %v", err) + } + } +} + +func TestRtpInvalidVersion(t *testing.T) { + d := []byte{0x01, 0x21, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if s != nil || err != ErrInvalidRtpVersion { + t.Fatalf("Expected non-nil error and nil packet: %v", err) + } +} + +func TestRtpMissingCsrc(t *testing.T) { + d := []byte{0x12, 0x21, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if s != nil || err != ErrInvalidRtpPacketSize { + t.Fatalf("Expected non-nil error and nil packet: %v", err) + } +} + +func TestRtpMissingExtension(t *testing.T) { + d := []byte{0x0e, 0x21, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if s != nil || err != ErrInvalidRtpPacketSize { + t.Fatalf("Expected non-nil error and nil packet: %v", err) + } +} + +func TestRtpIncompleteExtension(t *testing.T) { + d := []byte{0x0e, 0x21, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x22, 0x00, 0x04} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if s != nil || err != ErrInvalidRtpPacketSize { + t.Fatalf("Expected non-nil error and nil packet: %v", err) + } +} + +func TestRtpHeaderOnly(t *testing.T) { + d := []byte{0x02, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if err != nil { + t.Fatalf("Got an error instead of a packet: %v", err) + } + x := &RtpPacket{ + Version: 2, + Padding: false, + Marker: false, + PayloadType: RtpPayloadTypeMP2T, + SequenceNumber: 0x0123, + Timestamp: 0x456789ab, + Ssrc: 0xcdefaa55, + Csrc: nil, + Extension: nil, + Payload: nil, + } + comparePackets(t, x, s) +} + +func TestRtpHeaderCsrc(t *testing.T) { + d := []byte{0x26, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x10, 0x0e, 0x20, 0x0d, 0x40, 0x0b, 0x80, 0x07} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if err != nil { + t.Fatalf("Got an error instead of a packet: %v", err) + } + x := &RtpPacket{ + Version: 2, + Padding: true, + Marker: false, + PayloadType: RtpPayloadTypeMP2T, + SequenceNumber: 0x0123, + Timestamp: 0x456789ab, + Ssrc: 0xcdefaa55, + Csrc: []uint32{ + 0x100e200d, + 0x400b8007, + }, + Extension: nil, + Payload: nil, + } + comparePackets(t, x, s) +} + +func TestRtpHeaderExtension(t *testing.T) { + d := []byte{0x0e, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if err != nil { + t.Fatalf("Got an error instead of a packet: %v", err) + } + x := &RtpPacket{ + Version: 2, + Padding: true, + Marker: false, + PayloadType: RtpPayloadTypeMP2T, + SequenceNumber: 0x0123, + Timestamp: 0x456789ab, + Ssrc: 0xcdefaa55, + Csrc: nil, + Extension: []byte{ + 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7, + }, + Payload: nil, + } + comparePackets(t, x, s) +} + +func TestRtpHeaderCsrcExtension(t *testing.T) { + d := []byte{0x2e, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x10, 0x0e, 0x20, 0x0d, 0x40, 0x0b, 0x80, 0x07, 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if err != nil { + t.Fatalf("Got an error instead of a packet: %v", err) + } + x := &RtpPacket{ + Version: 2, + Padding: true, + Marker: false, + PayloadType: RtpPayloadTypeMP2T, + SequenceNumber: 0x0123, + Timestamp: 0x456789ab, + Ssrc: 0xcdefaa55, + Csrc: []uint32{ + 0x100e200d, + 0x400b8007, + }, + Extension: []byte{ + 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7, + }, + Payload: nil, + } + comparePackets(t, x, s) +} + +func TestRtpPayload(t *testing.T) { + d := []byte{0x02, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if err != nil { + t.Fatalf("Got an error instead of a packet: %v", err) + } + x := &RtpPacket{ + Version: 2, + Padding: false, + Marker: false, + PayloadType: RtpPayloadTypeMP2T, + SequenceNumber: 0x0123, + Timestamp: 0x456789ab, + Ssrc: 0xcdefaa55, + Csrc: nil, + Extension: nil, + Payload: []byte{ + 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70, + }, + } + comparePackets(t, x, s) +} + +func TestRtpCsrcPayload(t *testing.T) { + d := []byte{0x26, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x10, 0x0e, 0x20, 0x0d, 0x40, 0x0b, 0x80, 0x07, 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if err != nil { + t.Fatalf("Got an error instead of a packet: %v", err) + } + x := &RtpPacket{ + Version: 2, + Padding: true, + Marker: false, + PayloadType: RtpPayloadTypeMP2T, + SequenceNumber: 0x0123, + Timestamp: 0x456789ab, + Ssrc: 0xcdefaa55, + Csrc: []uint32{ + 0x100e200d, + 0x400b8007, + }, + Extension: nil, + Payload: []byte{ + 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70, + }, + } + comparePackets(t, x, s) +} + +func TestRtpExtensionPayload(t *testing.T) { + d := []byte{0x0e, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7, 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if err != nil { + t.Fatalf("Got an error instead of a packet: %v", err) + } + x := &RtpPacket{ + Version: 2, + Padding: true, + Marker: false, + PayloadType: RtpPayloadTypeMP2T, + SequenceNumber: 0x0123, + Timestamp: 0x456789ab, + Ssrc: 0xcdefaa55, + Csrc: nil, + Extension: []byte{ + 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7, + }, + Payload: []byte{ + 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70, + }, + } + comparePackets(t, x, s) +} + +func TestRtpCsrcExtensionPayload(t *testing.T) { + d := []byte{0x2e, 0x21, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0xaa, 0x55, 0x10, 0x0e, 0x20, 0x0d, 0x40, 0x0b, 0x80, 0x07, 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7, 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70} + b := bytes.NewBuffer(d) + r := RtpReader{ + Reader: b, + } + s, err := r.ReadRtpPacket() + if err != nil { + t.Fatalf("Got an error instead of a packet: %v", err) + } + x := &RtpPacket{ + Version: 2, + Padding: true, + Marker: false, + PayloadType: RtpPayloadTypeMP2T, + SequenceNumber: 0x0123, + Timestamp: 0x456789ab, + Ssrc: 0xcdefaa55, + Csrc: []uint32{ + 0x100e200d, + 0x400b8007, + }, + Extension: []byte{ + 0x11, 0xff, 0x00, 0x08, 0x15, 0xae, 0x25, 0xad, 0x45, 0xab, 0x85, 0xa7, + }, + Payload: []byte{ + 0x01, 0x02, 0x03, 0x04, 0x50, 0x60, 0x70, + }, + } + comparePackets(t, x, s) +} From a1015823e64bde764e5a8702304da511c95f5b82 Mon Sep 17 00:00:00 2001 From: Gregor Riepl Date: Thu, 25 Jul 2019 09:43:15 +0200 Subject: [PATCH 02/10] Implemented a queue for packet reordering --- util/sequencequeue.go | 78 ++++++++++++++++++++ util/sequencequeue_test.go | 145 +++++++++++++++++++++++++++++++++++++ 2 files changed, 223 insertions(+) create mode 100644 util/sequencequeue.go create mode 100644 util/sequencequeue_test.go diff --git a/util/sequencequeue.go b/util/sequencequeue.go new file mode 100644 index 0000000..342ec0c --- /dev/null +++ b/util/sequencequeue.go @@ -0,0 +1,78 @@ +/* Copyright (c) 2019 Gregor Riepl + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package util + +import ( + "errors" +) + +var ( + ErrSequenceQueueEmpty = errors.New("Queue is empty") + ErrSequenceQueueOccupied = errors.New("Queue position is already occupied") + ErrSequenceQueueOutOfBounds = errors.New("Insert index out of bounds") + ErrSequenceQueueSlotEmpty = errors.New("Queue slot is empty") +) + +type SequenceQueue struct { + queue []interface{} + head int + tail int + length int +} + +func NewSequenceQueue(bound int) *SequenceQueue { + return &SequenceQueue{ + queue: make([]interface{}, bound), + } +} + +func (f *SequenceQueue) Length() int { + return f.length +} + +func (f *SequenceQueue) Insert(position int, value interface{}) error { + if position < 0 || position >= len(f.queue) { + return ErrSequenceQueueOutOfBounds + } + index := (f.head + position) % len(f.queue) + if position >= f.length { + f.queue[index] = value + f.tail = (index + 1) % len(f.queue) + f.length = position + 1 + } else { + if f.queue[index] == nil { + f.queue[index] = value + } else { + return ErrSequenceQueueOccupied + } + } + return nil +} + +func (f *SequenceQueue) Pop() (interface{}, error) { + if f.length == 0 { + return nil, ErrSequenceQueueEmpty + } + ret := f.queue[f.head] + if ret == nil { + return nil, ErrSequenceQueueSlotEmpty + } + f.queue[f.head] = nil + f.head = (f.head + 1) % len(f.queue) + f.length-- + return ret, nil +} diff --git a/util/sequencequeue_test.go b/util/sequencequeue_test.go new file mode 100644 index 0000000..e2241e8 --- /dev/null +++ b/util/sequencequeue_test.go @@ -0,0 +1,145 @@ +/* Copyright (c) 2019 Gregor Riepl + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package util + +import ( + "testing" +) + +func TestSequenceQueuePush(t *testing.T) { + q := NewSequenceQueue(1) + e := q.Insert(0, "A") + if e != nil { + t.Errorf("Insert returned error: %v", e) + } + l := q.Length() + if l != 1 { + t.Errorf("Length after insert is %d instead of %d", l, 1) + } + if l != q.Length() { + t.Errorf("Second length call mismatch") + } +} + +func TestSequenceQueuePushOob(t *testing.T) { + q := NewSequenceQueue(1) + e := q.Insert(1, "A") + if e != ErrSequenceQueueOutOfBounds { + t.Errorf("Didn't get out of bounds error but: %v", e) + } + e = q.Insert(-1, "B") + if e != ErrSequenceQueueOutOfBounds { + t.Errorf("Didn't get out of bounds error but: %v", e) + } + l := q.Length() + if l != 0 { + t.Errorf("Length after insert is %d instead of %d", l, 0) + } +} + +func TestSequenceQueuePushPop(t *testing.T) { + q := NewSequenceQueue(1) + e := q.Insert(0, "A") + if e != nil { + t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) + t.Errorf("Insert returned error: %v", e) + } + r, e := q.Pop() + if e != nil { + t.Fatalf("Got error: %v", e) + } + if r != "A" { + t.Errorf("Got value: %v", r) + t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) + } +} + +func TestSequenceQueuePushOccupied(t *testing.T) { + q := NewSequenceQueue(1) + e := q.Insert(0, "A") + if e != nil { + t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) + t.Errorf("Insert returned error: %v", e) + } + e = q.Insert(0, "A") + if e != ErrSequenceQueueOccupied { + t.Fatalf("Got nil instead of error") + } +} + +func TestSequenceQueuePushMutiple(t *testing.T) { + q := NewSequenceQueue(10) + for i := 0; i < 10; i++ { + e := q.Insert(i, i) + if e != nil { + t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) + t.Errorf("Insert returned error: %v", e) + } + } + for i := 0; i < 10; i++ { + r, e := q.Pop() + if e != nil { + t.Fatalf("Got error: %v", e) + } + if r != i { + t.Errorf("Got value: %v expected: %v", r, i) + t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) + } + } + r, e := q.Pop() + if e != ErrSequenceQueueEmpty || r != nil { + t.Fatalf("Expected empty queue, got: %v value=%v", e, r) + } +} + +func TestSequenceQueuePushReverse(t *testing.T) { + q := NewSequenceQueue(10) + for i := 9; i >= 0; i-- { + e := q.Insert(i, i) + if e != nil { + t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) + t.Errorf("Insert returned error: %v", e) + } + if i != 0 { + r, e := q.Pop() + if e != ErrSequenceQueueSlotEmpty { + t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) + t.Fatalf("Expected empty slot at head, got: %v", e) + } + if r != nil { + t.Errorf("Got value: %v expected: %v", r, nil) + } + } + } + for i := 0; i < 10; i++ { + r, e := q.Pop() + if e != nil { + t.Fatalf("Got error: %v", e) + } + if r != i { + t.Errorf("Got value: %v expected: %v", r, i) + t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) + } + } + r, e := q.Pop() + if e != ErrSequenceQueueEmpty { + t.Fatalf("Expected empty queue, got: %v value=%v", e, r) + } + if r != nil { + t.Errorf("Got value: %v expected: %v", r, nil) + } +} From 09fc905cd5fb704375ba405e6c18a5edab7bfb07 Mon Sep 17 00:00:00 2001 From: Gregor Riepl Date: Thu, 25 Jul 2019 09:43:43 +0200 Subject: [PATCH 03/10] Document RtpPacket fields --- protocol/rtp.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/protocol/rtp.go b/protocol/rtp.go index eca68e5..1ba543c 100644 --- a/protocol/rtp.go +++ b/protocol/rtp.go @@ -63,15 +63,34 @@ const ( // RtpPacket represents a decoded RTP packet. // The header fields are dissected, while the payload is contained as a byte slice. type RtpPacket struct { + // Version contains the RTP protocol version. + // Only version 2 is supported. Version uint8 + // Padding is true if the packet hat the padding flag set. + // The extension header and/or payload is always truncated to the actual size. Padding bool + // Marker is true if the packet hat the marker bit set. Marker bool + // PayloadType describes the kind of data contained in the packet. + // See the RtpPayloadType constants for known values. Not that some ranges + // are dynamically assigned and must be defined by the application. PayloadType RtpPayloadType + // SequenceNumber is a 16-bit sequence number that helps with correct ordering + // wen reassembling the stream. SequenceNumber uint16 + // Timestamp is an application-defined absolute timestamp that should be + // used as the basis for ES timestamps. It can also be ignored if the + // packaged protocol has sufficiently well-defined timestamps. Timestamp uint32 + // Ssrc contains the value of the SSRC field. Ssrc uint32 + // Csrc contains the values of the CSRC fields. + // If the packet had no CSRCs, it is an empty list (nil). Csrc []uint32 + // Extension contains the full extension header, including the length and ... fields. Extension []byte + // Payload contains the actual data part of the packet. + // It is truncated to the packet size, excluding padding (if it was enabled) Payload []byte } @@ -148,6 +167,7 @@ func (r *RtpReader) ReadRtpPacket() (*RtpPacket, error) { offset += 4 + xlen } + // TODO truncate padding p.Payload = data[offset:n] return p, nil From 96f5f5c8ea4afb78f0905dd8cd6fbe6697fd8bf8 Mon Sep 17 00:00:00 2001 From: Gregor Riepl Date: Tue, 30 Jul 2019 19:03:15 +0200 Subject: [PATCH 04/10] Generalised seqQ interface and removed redundant error codes --- util/sequencequeue.go | 25 ++++++++-------- util/sequencequeue_test.go | 61 +++++++++++++++++++++++++------------- 2 files changed, 53 insertions(+), 33 deletions(-) diff --git a/util/sequencequeue.go b/util/sequencequeue.go index 342ec0c..6e82b95 100644 --- a/util/sequencequeue.go +++ b/util/sequencequeue.go @@ -22,9 +22,7 @@ import ( var ( ErrSequenceQueueEmpty = errors.New("Queue is empty") - ErrSequenceQueueOccupied = errors.New("Queue position is already occupied") ErrSequenceQueueOutOfBounds = errors.New("Insert index out of bounds") - ErrSequenceQueueSlotEmpty = errors.New("Queue slot is empty") ) type SequenceQueue struct { @@ -44,9 +42,9 @@ func (f *SequenceQueue) Length() int { return f.length } -func (f *SequenceQueue) Insert(position int, value interface{}) error { +func (f *SequenceQueue) Insert(position int, value interface{}) (old interface{}, err error) { if position < 0 || position >= len(f.queue) { - return ErrSequenceQueueOutOfBounds + return nil, ErrSequenceQueueOutOfBounds } index := (f.head + position) % len(f.queue) if position >= f.length { @@ -54,13 +52,17 @@ func (f *SequenceQueue) Insert(position int, value interface{}) error { f.tail = (index + 1) % len(f.queue) f.length = position + 1 } else { - if f.queue[index] == nil { - f.queue[index] = value - } else { - return ErrSequenceQueueOccupied - } + old = f.queue[index] + f.queue[index] = value } - return nil + return old, nil +} + +func (f *SequenceQueue) Peek() (interface{}, error) { + if f.length == 0 { + return nil, ErrSequenceQueueEmpty + } + return f.queue[f.head], nil } func (f *SequenceQueue) Pop() (interface{}, error) { @@ -68,9 +70,6 @@ func (f *SequenceQueue) Pop() (interface{}, error) { return nil, ErrSequenceQueueEmpty } ret := f.queue[f.head] - if ret == nil { - return nil, ErrSequenceQueueSlotEmpty - } f.queue[f.head] = nil f.head = (f.head + 1) % len(f.queue) f.length-- diff --git a/util/sequencequeue_test.go b/util/sequencequeue_test.go index e2241e8..a3d2d20 100644 --- a/util/sequencequeue_test.go +++ b/util/sequencequeue_test.go @@ -22,9 +22,12 @@ import ( func TestSequenceQueuePush(t *testing.T) { q := NewSequenceQueue(1) - e := q.Insert(0, "A") + o, e := q.Insert(0, "A") if e != nil { - t.Errorf("Insert returned error: %v", e) + t.Fatalf("Insert returned error: %v", e) + } + if o != nil { + t.Errorf("Old value was not nil: %v", o) } l := q.Length() if l != 1 { @@ -37,13 +40,19 @@ func TestSequenceQueuePush(t *testing.T) { func TestSequenceQueuePushOob(t *testing.T) { q := NewSequenceQueue(1) - e := q.Insert(1, "A") + o, e := q.Insert(1, "A") if e != ErrSequenceQueueOutOfBounds { - t.Errorf("Didn't get out of bounds error but: %v", e) + t.Fatalf("Didn't get out of bounds error but: %v", e) + } + if o != nil { + t.Errorf("Old value was not nil: %v", o) } - e = q.Insert(-1, "B") + o, e = q.Insert(-1, "B") if e != ErrSequenceQueueOutOfBounds { - t.Errorf("Didn't get out of bounds error but: %v", e) + t.Fatalf("Didn't get out of bounds error but: %v", e) + } + if o != nil { + t.Errorf("Old value was not nil: %v", o) } l := q.Length() if l != 0 { @@ -53,10 +62,13 @@ func TestSequenceQueuePushOob(t *testing.T) { func TestSequenceQueuePushPop(t *testing.T) { q := NewSequenceQueue(1) - e := q.Insert(0, "A") + o, e := q.Insert(0, "A") if e != nil { t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) - t.Errorf("Insert returned error: %v", e) + t.Fatalf("Insert returned error: %v", e) + } + if o != nil { + t.Errorf("Old value was not nil: %v", o) } r, e := q.Pop() if e != nil { @@ -70,24 +82,30 @@ func TestSequenceQueuePushPop(t *testing.T) { func TestSequenceQueuePushOccupied(t *testing.T) { q := NewSequenceQueue(1) - e := q.Insert(0, "A") + o, e := q.Insert(0, "A") if e != nil { t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) - t.Errorf("Insert returned error: %v", e) + t.Fatalf("Insert returned error: %v", e) } - e = q.Insert(0, "A") - if e != ErrSequenceQueueOccupied { - t.Fatalf("Got nil instead of error") + if o != nil { + t.Errorf("Old value was not nil: %v", o) + } + o, e = q.Insert(0, "A") + if o == nil { + t.Errorf("Old value was nil") } } func TestSequenceQueuePushMutiple(t *testing.T) { q := NewSequenceQueue(10) for i := 0; i < 10; i++ { - e := q.Insert(i, i) + o, e := q.Insert(i, i) if e != nil { t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) - t.Errorf("Insert returned error: %v", e) + t.Fatalf("Insert returned error: %v", e) + } + if o != nil { + t.Errorf("Old value was not nil: %v", o) } } for i := 0; i < 10; i++ { @@ -109,16 +127,19 @@ func TestSequenceQueuePushMutiple(t *testing.T) { func TestSequenceQueuePushReverse(t *testing.T) { q := NewSequenceQueue(10) for i := 9; i >= 0; i-- { - e := q.Insert(i, i) + o, e := q.Insert(i, i) if e != nil { t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) - t.Errorf("Insert returned error: %v", e) + t.Fatalf("Insert returned error: %v", e) + } + if o != nil { + t.Errorf("Old value was not nil: %v", o) } if i != 0 { - r, e := q.Pop() - if e != ErrSequenceQueueSlotEmpty { + r, e := q.Peek() + if e != nil { t.Logf("head=%d tail=%d length=%d", q.head, q.tail, q.length) - t.Fatalf("Expected empty slot at head, got: %v", e) + t.Fatalf("Insert returned error: %v", e) } if r != nil { t.Errorf("Got value: %v expected: %v", r, nil) From 520fea1da4bfa35904922f5753ac6d0082f93d51 Mon Sep 17 00:00:00 2001 From: Gregor Riepl Date: Wed, 31 Jul 2019 19:15:56 +0200 Subject: [PATCH 05/10] Added absolute difference utility --- util/abs.go | 44 +++++++++++++++++++++++ util/abs_32_test.go | 86 +++++++++++++++++++++++++++++++++++++++++++++ util/abs_64_test.go | 86 +++++++++++++++++++++++++++++++++++++++++++++ util/abs_test.go | 63 +++++++++++++++++++++++++++++++++ 4 files changed, 279 insertions(+) create mode 100644 util/abs.go create mode 100644 util/abs_32_test.go create mode 100644 util/abs_64_test.go create mode 100644 util/abs_test.go diff --git a/util/abs.go b/util/abs.go new file mode 100644 index 0000000..762f4ef --- /dev/null +++ b/util/abs.go @@ -0,0 +1,44 @@ +/* Copyright (c) 2019 Gregor Riepl + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package util + +// AbsSubInt32 calculates absolute difference between a and b, or: | a - b | +// Arguments and return type are int32. +func AbsSubInt32(a, b int32) int32 { + if a > b { + return a - b + } + return b - a +} + +// AbsSubInt64 calculates absolute difference between a and b, or: | a - b | +// Arguments and return type are int64. +func AbsSubInt64(a, b int64) int64 { + if a > b { + return a - b + } + return b - a +} + +// AbsSub calculates absolute difference between a and b, or: | a - b | +// Arguments and return type are int. +func AbsSub(a, b int) int { + if a > b { + return a - b + } + return b - a +} diff --git a/util/abs_32_test.go b/util/abs_32_test.go new file mode 100644 index 0000000..3658b6c --- /dev/null +++ b/util/abs_32_test.go @@ -0,0 +1,86 @@ +// +build 386 arm + +/* Copyright (c) 2019 Gregor Riepl + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package util + +import ( + "testing" +) + +func TestAbsSubInt32_32(t *testing.T) { + va := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + vb := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + for ia := 0; ia < len(va); ia++ { + for ib := 0; ib < len(vb); ib++ { + a := va[ia] + b := vb[ib] + r := AbsSubInt32(a, b) + var x int32 + if a > b { + x = a - b + } else { + x = b - a + } + if r != x { + t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) + } + } + } +} + +func TestAbsSubInt64_32(t *testing.T) { + va := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + vb := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + for ia := 0; ia < len(va); ia++ { + for ib := 0; ib < len(vb); ib++ { + a := va[ia] + b := vb[ib] + r := AbsSubInt64(a, b) + var x int64 + if a > b { + x = a - b + } else { + x = b - a + } + if r != x { + t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) + } + } + } +} + +func TestAbsSub_32(t *testing.T) { + va := []int{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + vb := []int{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + for ia := 0; ia < len(va); ia++ { + for ib := 0; ib < len(vb); ib++ { + a := va[ia] + b := vb[ib] + r := AbsSub(a, b) + var x int + if a > b { + x = a - b + } else { + x = b - a + } + if r != x { + t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) + } + } + } +} diff --git a/util/abs_64_test.go b/util/abs_64_test.go new file mode 100644 index 0000000..345266f --- /dev/null +++ b/util/abs_64_test.go @@ -0,0 +1,86 @@ +// +build amd64 arm64 ppc64 + +/* Copyright (c) 2019 Gregor Riepl + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package util + +import ( + "testing" +) + +func TestAbsSubInt32_64(t *testing.T) { + va := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + vb := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + for ia := 0; ia < len(va); ia++ { + for ib := 0; ib < len(vb); ib++ { + a := va[ia] + b := vb[ib] + r := AbsSubInt32(a, b) + var x int32 + if a > b { + x = a - b + } else { + x = b - a + } + if r != x { + t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) + } + } + } +} + +func TestAbsSubInt64_64(t *testing.T) { + va := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + vb := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + for ia := 0; ia < len(va); ia++ { + for ib := 0; ib < len(vb); ib++ { + a := va[ia] + b := vb[ib] + r := AbsSubInt64(a, b) + var x int64 + if a > b { + x = a - b + } else { + x = b - a + } + if r != x { + t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) + } + } + } +} + +func TestAbsSub_64(t *testing.T) { + va := []int{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + vb := []int{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + for ia := 0; ia < len(va); ia++ { + for ib := 0; ib < len(vb); ib++ { + a := va[ia] + b := vb[ib] + r := AbsSub(a, b) + var x int + if a > b { + x = a - b + } else { + x = b - a + } + if r != x { + t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) + } + } + } +} diff --git a/util/abs_test.go b/util/abs_test.go new file mode 100644 index 0000000..17f4faa --- /dev/null +++ b/util/abs_test.go @@ -0,0 +1,63 @@ +/* Copyright (c) 2019 Gregor Riepl + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package util + +import ( + "testing" +) + +func TestAbsSubInt32(t *testing.T) { + va := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + vb := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + for ia := 0; ia < len(va); ia++ { + for ib := 0; ib < len(vb); ib++ { + a := va[ia] + b := vb[ib] + r := AbsSubInt32(a, b) + var x int32 + if a > b { + x = a - b + } else { + x = b - a + } + if r != x { + t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) + } + } + } +} + +func TestAbsSubInt64(t *testing.T) { + va := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + vb := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + for ia := 0; ia < len(va); ia++ { + for ib := 0; ib < len(vb); ib++ { + a := va[ia] + b := vb[ib] + r := AbsSubInt64(a, b) + var x int64 + if a > b { + x = a - b + } else { + x = b - a + } + if r != x { + t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) + } + } + } +} From 48f0cd0dc78af9e260ea1717ed1a2e26363b553f Mon Sep 17 00:00:00 2001 From: Gregor Riepl Date: Wed, 7 Aug 2019 19:47:54 +0200 Subject: [PATCH 06/10] Fixed RTP packet dissector --- protocol/rtp.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/protocol/rtp.go b/protocol/rtp.go index 1ba543c..28326f0 100644 --- a/protocol/rtp.go +++ b/protocol/rtp.go @@ -94,7 +94,7 @@ type RtpPacket struct { Payload []byte } -// RtpReader is packet reader on top of an underlying standard reader. +// RtpReader is a packet reader on top of an underlying standard reader. // It has a configurable maximum packet size. type RtpReader struct { // Reader is the underlying I/O facility @@ -129,15 +129,21 @@ func (r *RtpReader) ReadRtpPacket() (*RtpPacket, error) { return nil, ErrInvalidRtpPacketSize } - p.Version = data[0] & 0x03 + p.Version = (data[0] & 0xc0) >> 6 if p.Version != 2 { + logger.Logkv( + "event", "error", + "error", "rtp_version", + "version", p.Version, + "message", "Invalid RTP version", + ) return nil, ErrInvalidRtpVersion } - p.Padding = data[0] & 0x04 != 0 - p.Marker = data[0] & 0x80 != 0 + p.Padding = data[0] & 0x20 != 0 + p.Marker = data[1] & 0x80 != 0 - extension := data[0] & 0x08 != 0 - csrcc := int((data[0] & 0x70) >> 4) + extension := data[0] & 0x10 != 0 + csrcc := int(data[0] & 0x0f) xhlen := 0 if extension { xhlen = 4 From a68a273bef9670366a7de4b73df30394b422a0b8 Mon Sep 17 00:00:00 2001 From: Gregor Riepl Date: Wed, 7 Aug 2019 19:48:20 +0200 Subject: [PATCH 07/10] Unified absSub unit test between 32/64 archs --- util/abs_32_test.go | 86 --------------------------------------------- util/abs_64_test.go | 86 --------------------------------------------- util/abs_test.go | 31 ++++++++++++++++ 3 files changed, 31 insertions(+), 172 deletions(-) delete mode 100644 util/abs_32_test.go delete mode 100644 util/abs_64_test.go diff --git a/util/abs_32_test.go b/util/abs_32_test.go deleted file mode 100644 index 3658b6c..0000000 --- a/util/abs_32_test.go +++ /dev/null @@ -1,86 +0,0 @@ -// +build 386 arm - -/* Copyright (c) 2019 Gregor Riepl - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package util - -import ( - "testing" -) - -func TestAbsSubInt32_32(t *testing.T) { - va := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } - vb := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } - for ia := 0; ia < len(va); ia++ { - for ib := 0; ib < len(vb); ib++ { - a := va[ia] - b := vb[ib] - r := AbsSubInt32(a, b) - var x int32 - if a > b { - x = a - b - } else { - x = b - a - } - if r != x { - t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) - } - } - } -} - -func TestAbsSubInt64_32(t *testing.T) { - va := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } - vb := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } - for ia := 0; ia < len(va); ia++ { - for ib := 0; ib < len(vb); ib++ { - a := va[ia] - b := vb[ib] - r := AbsSubInt64(a, b) - var x int64 - if a > b { - x = a - b - } else { - x = b - a - } - if r != x { - t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) - } - } - } -} - -func TestAbsSub_32(t *testing.T) { - va := []int{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } - vb := []int{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } - for ia := 0; ia < len(va); ia++ { - for ib := 0; ib < len(vb); ib++ { - a := va[ia] - b := vb[ib] - r := AbsSub(a, b) - var x int - if a > b { - x = a - b - } else { - x = b - a - } - if r != x { - t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) - } - } - } -} diff --git a/util/abs_64_test.go b/util/abs_64_test.go deleted file mode 100644 index 345266f..0000000 --- a/util/abs_64_test.go +++ /dev/null @@ -1,86 +0,0 @@ -// +build amd64 arm64 ppc64 - -/* Copyright (c) 2019 Gregor Riepl - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package util - -import ( - "testing" -) - -func TestAbsSubInt32_64(t *testing.T) { - va := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } - vb := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } - for ia := 0; ia < len(va); ia++ { - for ib := 0; ib < len(vb); ib++ { - a := va[ia] - b := vb[ib] - r := AbsSubInt32(a, b) - var x int32 - if a > b { - x = a - b - } else { - x = b - a - } - if r != x { - t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) - } - } - } -} - -func TestAbsSubInt64_64(t *testing.T) { - va := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } - vb := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } - for ia := 0; ia < len(va); ia++ { - for ib := 0; ib < len(vb); ib++ { - a := va[ia] - b := vb[ib] - r := AbsSubInt64(a, b) - var x int64 - if a > b { - x = a - b - } else { - x = b - a - } - if r != x { - t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) - } - } - } -} - -func TestAbsSub_64(t *testing.T) { - va := []int{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } - vb := []int{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff, -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } - for ia := 0; ia < len(va); ia++ { - for ib := 0; ib < len(vb); ib++ { - a := va[ia] - b := vb[ib] - r := AbsSub(a, b) - var x int - if a > b { - x = a - b - } else { - x = b - a - } - if r != x { - t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) - } - } - } -} diff --git a/util/abs_test.go b/util/abs_test.go index 17f4faa..8e9fad8 100644 --- a/util/abs_test.go +++ b/util/abs_test.go @@ -20,6 +20,8 @@ import ( "testing" ) +const intSize = 4 + (^uint(0)>>63) * 4 + func TestAbsSubInt32(t *testing.T) { va := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } vb := []int32{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } @@ -61,3 +63,32 @@ func TestAbsSubInt64(t *testing.T) { } } } + +func TestAbsSub(t *testing.T) { + v64 := []int64{ -0x7ffffffffffffffe, 0x7ffffffffffffffe, -0x7fffffffffffffff, 0x7fffffffffffffff } + va := []int{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + vb := []int{ -0x7ffffffe, 0x7ffffffe, -0x7fffffff, 0x7fffffff, -2, 2, -1, -1, 0, 0 } + if intSize == 8 { + for _, a64 := range v64 { + va = append(va, int(a64)) + vb = append(vb, int(a64)) + } + } + for ia := 0; ia < len(va); ia++ { + for ib := 0; ib < len(vb); ib++ { + a := va[ia] + b := vb[ib] + t.Logf("Testing |%d - %d|", a, b) + r := AbsSub(a, b) + var x int + if a > b { + x = a - b + } else { + x = b - a + } + if r != x { + t.Errorf("AbsSub(%d, %d) should be %d, got %d", a, b, x, r) + } + } + } +} From e0851f7da13bb4636d444fa7f0f6f6ae4a319aee Mon Sep 17 00:00:00 2001 From: Gregor Riepl Date: Wed, 7 Aug 2019 19:48:56 +0200 Subject: [PATCH 08/10] Added logger to protocol module --- protocol/logger.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 protocol/logger.go diff --git a/protocol/logger.go b/protocol/logger.go new file mode 100644 index 0000000..ddc7b1f --- /dev/null +++ b/protocol/logger.go @@ -0,0 +1,27 @@ +/* Copyright (c) 2018 Gregor Riepl + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package protocol + +import ( + "github.com/onitake/restreamer/util" +) + +const ( + moduleProtocol = "protocol" +) + +var logger util.Logger = util.NewGlobalModuleLogger(moduleProtocol, nil) From 0686192802789e2711abfeee9e11be46ab0e7423 Mon Sep 17 00:00:00 2001 From: Gregor Riepl Date: Wed, 7 Aug 2019 19:49:10 +0200 Subject: [PATCH 09/10] Implemented RTP->MP2TS bridging module --- protocol/rtpbridge.go | 162 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 protocol/rtpbridge.go diff --git a/protocol/rtpbridge.go b/protocol/rtpbridge.go new file mode 100644 index 0000000..eb97423 --- /dev/null +++ b/protocol/rtpbridge.go @@ -0,0 +1,162 @@ +/* Copyright (c) 2019 Gregor Riepl + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package protocol + +import ( + "bytes" + "errors" + "io" + "github.com/onitake/restreamer/util" + "fmt" +) + +var ( + ErrRtpBridgeInvalidPacketType = errors.New("Invalid packet type") + ErrRtpBridgePacketTooSmall = errors.New("Payload too small") + ErrRtpBridgeWaitingForMore = errors.New("Waiting for more data") +) + +type RtpBridge struct { + reader *RtpReader + seqnum int + queue *util.SequenceQueue + buffer *bytes.Buffer +} + +func NewRtpBridge(reader io.Reader, psize int, lookahead int) *RtpBridge { + return &RtpBridge{ + reader: &RtpReader{ + Reader: reader, + PacketSize: psize, + }, + seqnum: -1, + queue: util.NewSequenceQueue(lookahead), + // this allocates a bit too much (we're only interested in the payload), + // but will work no matter what size the header has + buffer: bytes.NewBuffer(make([]byte, 0, psize)), + } +} + +func (b *RtpBridge) packetIntoQueue() error { + p, err := b.reader.ReadRtpPacket() + if err != nil { + return err + } + if p.PayloadType != RtpPayloadTypeMP2T { + // we don't accept anything besides MPEG-2 TS packets + return ErrRtpBridgeInvalidPacketType + } + if len(p.Payload) < MpegTsPacketSize { + // RTP packets need to contain at least one whole MP2TS packet + return ErrRtpBridgeInvalidPacketType + } + seq := int(p.SequenceNumber) + // check if this is the first packet at start or after a long drought + var pos int + if b.queue.Length() == 0 { + pos = 0 + b.seqnum = seq + } else { + // calculate the queue position relative to the last fetched sequence number + pos = util.AbsSub(b.seqnum, seq) + // check if we're too far ahead from the head of the queue + newseq := pos - (b.queue.Length() - 1) + if newseq > 0 { + // to make room for a packet this far ahead, we need to get rid of some previous slots first + for i := 0; i < newseq; i++ { + // TODO instead of dropping all of them, we could pop all non-empty slots and enqueue the data. + // but this would require another queue for the reassembled output... + // or moving from a pull to push design. + b.queue.Pop() + } + logger.Logkv( + "event", "error", + "error", "rtp_drop", + "packets", newseq, + "message", fmt.Sprintf("Packets lost: %d", newseq), + ) + + // update pos and seqnum with the new base index + pos -= newseq + b.seqnum += newseq + } + } + old, err := b.queue.Insert(pos, p) + if err != nil { + // should never happen, maybe just panic here? + return err + } + if old != nil { + logger.Logkv( + "event", "error", + "error", "rtp_dup", + "message", "Packet with duplicate sequence number overwritten", + ) + } + return nil +} + +func (b *RtpBridge) nextPacket() (*RtpPacket, error) { + p, err := b.queue.Peek() + if err != nil { + return nil, err + } + if p != nil { + // we already fetched the next packet, just update the head now + b.queue.Pop() + // this would panic on type mismatch, but the queue only contains RtpPackets and nils are already excluded + rtp := p.(*RtpPacket) + return rtp, nil + } + // still waiting for the next packet in line + return nil, nil +} + +func (b *RtpBridge) Read(p []byte) (n int, err error) { + // TODO MPEG2-TS packets may also be 204 bytes long, which includes a 32-bit checksum. + // This is currently unsupported. + + // the payload may only contain whole MP2TS packets - we don't do partial reassembly + if b.buffer.Len() < MpegTsPacketSize { + // partial or empty - clear the buffer + b.buffer.Reset() + // try to read the next packet + err = b.packetIntoQueue() + if err != nil { + return 0, err + } + // then pop the next packet from the queue (with possible reordering) + p, err := b.nextPacket() + if err != nil { + return 0, err + } + if p == nil { + // no packet available yet, we need to wait for more segments + return 0, ErrRtpBridgeWaitingForMore + } + b.buffer.Write(p.Payload) + } + return b.buffer.Read(p) +} + +func (b *RtpBridge) Close() error { + if c, ok := b.reader.Reader.(io.Closer); ok { + // close the underlying reader if it supports Close() + return c.Close() + } + return nil +} From 529ef7efc2982b39e406f886f1419c199f18cf31 Mon Sep 17 00:00:00 2001 From: Gregor Riepl Date: Wed, 7 Aug 2019 19:49:29 +0200 Subject: [PATCH 10/10] Implemented RTP support in streaming client --- streaming/client.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/streaming/client.go b/streaming/client.go index bd73e7b..e68fdf5 100644 --- a/streaming/client.go +++ b/streaming/client.go @@ -31,6 +31,10 @@ import ( "time" ) +const ( + defaultPacketQueueSize int = 16 +) + var ( // ErrInvalidProtocol is thrown when an invalid protocol was specified. // See the docs and example config for a list of supported protocols. @@ -152,6 +156,8 @@ type Client struct { readBufferSize int // packetSize defines the size of individual datagram packets (UDP) packetSize int + // packetQueueSize defines the number of RTP packets to buffer for sequence reordering + packetQueueSize int } // NewClient constructs a new streaming HTTP client, without connecting the socket yet. @@ -237,6 +243,7 @@ func NewClient(name string, uris []string, streamer *Streamer, timeout uint, rec interf: pintf, readBufferSize: int(bufferSize * protocol.MpegTsPacketSize), packetSize: int(packetSize), + packetQueueSize: defaultPacketQueueSize, } return &client, nil } @@ -463,6 +470,37 @@ func (client *Client) start(url *url.URL) error { } conn.SetReadBuffer(client.readBufferSize) client.input = protocol.NewFixedReader(conn, client.packetSize) + case "rtp": + addr, err := net.ResolveUDPAddr("udp", url.Host) + if err != nil { + return err + } + var conn *net.UDPConn + if addr.IP.IsMulticast() { + logger.Logkv( + "event", eventClientOpenUdpMulticast, + "address", addr, + "message", fmt.Sprintf("Joining UDP multicast group %s on interface %v.", url.Host, client.interf), + ) + var err error + conn, err = net.ListenMulticastUDP("udp", client.interf, addr) + if err != nil { + return err + } + } else { + logger.Logkv( + "event", eventClientOpenUdp, + "address", addr, + "message", fmt.Sprintf("Connecting to UDP address %s.", addr), + ) + var err error + conn, err = net.ListenUDP("udp", addr) + if err != nil { + return err + } + } + conn.SetReadBuffer(client.readBufferSize) + client.input = protocol.NewRtpBridge(conn, client.packetSize, client.packetQueueSize) default: return ErrInvalidProtocol }