diff --git a/controllers/admin/hardware.go b/controllers/admin/hardware.go index 3bdbde6e0..fc036bc3f 100644 --- a/controllers/admin/hardware.go +++ b/controllers/admin/hardware.go @@ -10,7 +10,7 @@ import ( // GetHardwareStats will return hardware utilization over time. func GetHardwareStats(w http.ResponseWriter, r *http.Request) { - m := metrics.Metrics + m := metrics.GetMetrics() w.Header().Set("Content-Type", "application/json") err := json.NewEncoder(w).Encode(m) diff --git a/controllers/admin/video.go b/controllers/admin/video.go new file mode 100644 index 000000000..a6cf5a588 --- /dev/null +++ b/controllers/admin/video.go @@ -0,0 +1,60 @@ +package admin + +import ( + "encoding/json" + "net/http" + + "github.com/owncast/owncast/core" + "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/metrics" + log "github.com/sirupsen/logrus" +) + +// GetVideoPlaybackMetrics returns video playback metrics. +func GetVideoPlaybackMetrics(w http.ResponseWriter, r *http.Request) { + type response struct { + Errors []metrics.TimestampedValue `json:"errors"` + QualityVariantChanges []metrics.TimestampedValue `json:"qualityVariantChanges"` + Latency []metrics.TimestampedValue `json:"latency"` + SegmentDownloadDuration []metrics.TimestampedValue `json:"segmentDownloadDuration"` + SlowestDownloadRate []metrics.TimestampedValue `json:"minPlayerBitrate"` + AvailableBitrates []int `json:"availableBitrates"` + SegmentLength int `json:"segmentLength"` + } + + availableBitrates := []int{} + var segmentLength int + if core.GetCurrentBroadcast() != nil { + segmentLength = core.GetCurrentBroadcast().LatencyLevel.SecondsPerSegment + for _, variants := range core.GetCurrentBroadcast().OutputSettings { + availableBitrates = append(availableBitrates, variants.VideoBitrate) + } + } else { + segmentLength = data.GetStreamLatencyLevel().SecondsPerSegment + for _, variants := range data.GetStreamOutputVariants() { + availableBitrates = append(availableBitrates, variants.VideoBitrate) + } + } + + errors := metrics.GetPlaybackErrorCountOverTime() + latency := metrics.GetLatencyOverTime() + durations := metrics.GetDownloadDurationsOverTime() + minPlayerBitrate := metrics.GetSlowestDownloadRateOverTime() + qualityVariantChanges := metrics.GetQualityVariantChangesOverTime() + + resp := response{ + AvailableBitrates: availableBitrates, + Errors: errors, + Latency: latency, + SegmentLength: segmentLength, + SegmentDownloadDuration: durations, + SlowestDownloadRate: minPlayerBitrate, + QualityVariantChanges: qualityVariantChanges, + } + + w.Header().Set("Content-Type", "application/json") + err := json.NewEncoder(w).Encode(resp) + if err != nil { + log.Errorln(err) + } +} diff --git a/controllers/playbackMetrics.go b/controllers/playbackMetrics.go new file mode 100644 index 000000000..2b2f29df9 --- /dev/null +++ b/controllers/playbackMetrics.go @@ -0,0 +1,40 @@ +package controllers + +import ( + "encoding/json" + "net/http" + + "github.com/owncast/owncast/metrics" + log "github.com/sirupsen/logrus" +) + +// ReportPlaybackMetrics will accept playback metrics from a client and save +// them for future video health reporting. +func ReportPlaybackMetrics(w http.ResponseWriter, r *http.Request) { + if r.Method != POST { + WriteSimpleResponse(w, false, r.Method+" not supported") + return + } + + type reportPlaybackMetricsRequest struct { + Bandwidth float64 `json:"bandwidth"` + Latency float64 `json:"latency"` + Errors float64 `json:"errors"` + DownloadDuration float64 `json:"downloadDuration"` + QualityVariantChanges float64 `json:"qualityVariantChanges"` + } + + decoder := json.NewDecoder(r.Body) + var request reportPlaybackMetricsRequest + if err := decoder.Decode(&request); err != nil { + log.Errorln("error decoding playback metrics payload:", err) + WriteSimpleResponse(w, false, err.Error()) + return + } + + metrics.RegisterPlaybackErrorCount(request.Errors) + metrics.RegisterPlayerBandwidth(request.Bandwidth) + metrics.RegisterPlayerLatency(request.Latency) + metrics.RegisterPlayerSegmentDownloadDuration(request.DownloadDuration) + metrics.RegisterQualityVariantChangesCount(request.QualityVariantChanges) +} diff --git a/metrics/alerting.go b/metrics/alerting.go index 58452de99..ad3f9ad04 100644 --- a/metrics/alerting.go +++ b/metrics/alerting.go @@ -6,17 +6,21 @@ import ( log "github.com/sirupsen/logrus" ) -const maxCPUAlertingThresholdPCT = 85 -const maxRAMAlertingThresholdPCT = 85 -const maxDiskAlertingThresholdPCT = 90 +const ( + maxCPUAlertingThresholdPCT = 85 + maxRAMAlertingThresholdPCT = 85 + maxDiskAlertingThresholdPCT = 90 +) -var inCPUAlertingState = false -var inRAMAlertingState = false -var inDiskAlertingState = false +var ( + inCPUAlertingState = false + inRAMAlertingState = false + inDiskAlertingState = false +) var errorResetDuration = time.Minute * 5 -const alertingError = "The %s utilization of %d%% could cause problems with video generation and delivery. Visit the documentation at http://owncast.online/docs/troubleshooting/ if you are experiencing issues." +const alertingError = "The %s utilization of %f%% could cause problems with video generation and delivery. Visit the documentation at http://owncast.online/docs/troubleshooting/ if you are experiencing issues." func handleAlerting() { handleCPUAlerting() @@ -25,11 +29,11 @@ func handleAlerting() { } func handleCPUAlerting() { - if len(Metrics.CPUUtilizations) < 2 { + if len(metrics.CPUUtilizations) < 2 { return } - avg := recentAverage(Metrics.CPUUtilizations) + avg := recentAverage(metrics.CPUUtilizations) if avg > maxCPUAlertingThresholdPCT && !inCPUAlertingState { log.Warnf(alertingError, "CPU", avg) inCPUAlertingState = true @@ -43,11 +47,11 @@ func handleCPUAlerting() { } func handleRAMAlerting() { - if len(Metrics.RAMUtilizations) < 2 { + if len(metrics.RAMUtilizations) < 2 { return } - avg := recentAverage(Metrics.RAMUtilizations) + avg := recentAverage(metrics.RAMUtilizations) if avg > maxRAMAlertingThresholdPCT && !inRAMAlertingState { log.Warnf(alertingError, "memory", avg) inRAMAlertingState = true @@ -61,11 +65,11 @@ func handleRAMAlerting() { } func handleDiskAlerting() { - if len(Metrics.DiskUtilizations) < 2 { + if len(metrics.DiskUtilizations) < 2 { return } - avg := recentAverage(Metrics.DiskUtilizations) + avg := recentAverage(metrics.DiskUtilizations) if avg > maxDiskAlertingThresholdPCT && !inDiskAlertingState { log.Warnf(alertingError, "disk", avg) @@ -79,6 +83,6 @@ func handleDiskAlerting() { } } -func recentAverage(values []timestampedValue) int { +func recentAverage(values []TimestampedValue) float64 { return (values[len(values)-1].Value + values[len(values)-2].Value) / 2 } diff --git a/metrics/hardware.go b/metrics/hardware.go index 99c967040..7b749827a 100644 --- a/metrics/hardware.go +++ b/metrics/hardware.go @@ -14,8 +14,8 @@ import ( const maxCollectionValues = 500 func collectCPUUtilization() { - if len(Metrics.CPUUtilizations) > maxCollectionValues { - Metrics.CPUUtilizations = Metrics.CPUUtilizations[1:] + if len(metrics.CPUUtilizations) > maxCollectionValues { + metrics.CPUUtilizations = metrics.CPUUtilizations[1:] } v, err := cpu.Percent(0, false) @@ -24,29 +24,29 @@ func collectCPUUtilization() { return } - metricValue := timestampedValue{time.Now(), int(v[0])} - Metrics.CPUUtilizations = append(Metrics.CPUUtilizations, metricValue) - cpuUsage.Set(float64(metricValue.Value)) + metricValue := TimestampedValue{time.Now(), v[0]} + metrics.CPUUtilizations = append(metrics.CPUUtilizations, metricValue) + cpuUsage.Set(metricValue.Value) } func collectRAMUtilization() { - if len(Metrics.RAMUtilizations) > maxCollectionValues { - Metrics.RAMUtilizations = Metrics.RAMUtilizations[1:] + if len(metrics.RAMUtilizations) > maxCollectionValues { + metrics.RAMUtilizations = metrics.RAMUtilizations[1:] } memoryUsage, _ := mem.VirtualMemory() - metricValue := timestampedValue{time.Now(), int(memoryUsage.UsedPercent)} - Metrics.RAMUtilizations = append(Metrics.RAMUtilizations, metricValue) + metricValue := TimestampedValue{time.Now(), memoryUsage.UsedPercent} + metrics.RAMUtilizations = append(metrics.RAMUtilizations, metricValue) } func collectDiskUtilization() { path := "./" diskUse, _ := disk.Usage(path) - if len(Metrics.DiskUtilizations) > maxCollectionValues { - Metrics.DiskUtilizations = Metrics.DiskUtilizations[1:] + if len(metrics.DiskUtilizations) > maxCollectionValues { + metrics.DiskUtilizations = metrics.DiskUtilizations[1:] } - metricValue := timestampedValue{time.Now(), int(diskUse.UsedPercent)} - Metrics.DiskUtilizations = append(Metrics.DiskUtilizations, metricValue) + metricValue := TimestampedValue{time.Now(), diskUse.UsedPercent} + metrics.DiskUtilizations = append(metrics.DiskUtilizations, metricValue) } diff --git a/metrics/metrics.go b/metrics/metrics.go index dbbbcbd4b..0c42b5d07 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -6,22 +6,32 @@ import ( "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/data" "github.com/owncast/owncast/models" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" ) // How often we poll for updates. -const metricsPollingInterval = 1 * time.Minute +const hardwareMetricsPollingInterval = 1 * time.Minute + +const ( + // How often we poll for updates. + viewerMetricsPollingInterval = 2 * time.Minute + activeChatClientCountKey = "chat_client_count" + activeViewerCountKey = "viewer_count" +) // CollectedMetrics stores different collected + timestamped values. type CollectedMetrics struct { - CPUUtilizations []timestampedValue `json:"cpu"` - RAMUtilizations []timestampedValue `json:"memory"` - DiskUtilizations []timestampedValue `json:"disk"` + CPUUtilizations []TimestampedValue `json:"cpu"` + RAMUtilizations []TimestampedValue `json:"memory"` + DiskUtilizations []TimestampedValue `json:"disk"` + errorCount []TimestampedValue `json:"-"` + lowestBitrate []TimestampedValue `json:"-"` + segmentDownloadSeconds []TimestampedValue `json:"-"` + averageLatency []TimestampedValue `json:"-"` + qualityVariantChanges []TimestampedValue `json:"-"` } // Metrics is the shared Metrics instance. -var Metrics *CollectedMetrics +var metrics *CollectedMetrics // Start will begin the metrics collection and alerting. func Start(getStatus func() models.Status) { @@ -34,40 +44,12 @@ func Start(getStatus func() models.Status) { "host": host, } - activeViewerCount = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "owncast_instance_active_viewer_count", - Help: "The number of viewers.", - ConstLabels: labels, - }) + setupPrometheusCollectors() - activeChatClientCount = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "owncast_instance_active_chat_client_count", - Help: "The number of connected chat clients.", - ConstLabels: labels, - }) - - chatUserCount = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "owncast_instance_total_chat_users", - Help: "The total number of chat users on this Owncast instance.", - ConstLabels: labels, - }) - - currentChatMessageCount = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "owncast_instance_current_chat_message_count", - Help: "The number of chat messages currently saved before cleanup.", - ConstLabels: labels, - }) - - cpuUsage = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "owncast_instance_cpu_use_pct", - Help: "CPU percentage used as seen within Owncast", - ConstLabels: labels, - }) - - Metrics = new(CollectedMetrics) + metrics = new(CollectedMetrics) go startViewerCollectionMetrics() - for range time.Tick(metricsPollingInterval) { + for range time.Tick(hardwareMetricsPollingInterval) { handlePolling() } } @@ -78,6 +60,17 @@ func handlePolling() { collectRAMUtilization() collectDiskUtilization() + collectPlaybackErrorCount() + collectLatencyValues() + collectSegmentDownloadDuration() + collectLowestBandwidth() + collectQualityVariantChanges() + // Alerting handleAlerting() } + +// GetMetrics will return the collected metrics. +func GetMetrics() *CollectedMetrics { + return metrics +} diff --git a/metrics/playback.go b/metrics/playback.go new file mode 100644 index 000000000..be110c42b --- /dev/null +++ b/metrics/playback.go @@ -0,0 +1,168 @@ +package metrics + +import ( + "math" + "time" + + "github.com/owncast/owncast/utils" +) + +// Playback error counts reported since the last time we collected metrics. +var ( + windowedErrorCounts = []float64{} + windowedQualityVariantChanges = []float64{} + windowedBandwidths = []float64{} + windowedLatencies = []float64{} + windowedDownloadDurations = []float64{} +) + +// RegisterPlaybackErrorCount will add to the windowed playback error count. +func RegisterPlaybackErrorCount(count float64) { + windowedErrorCounts = append(windowedErrorCounts, count) +} + +// RegisterQualityVariantChangesCount will add to the windowed quality variant +// change count. +func RegisterQualityVariantChangesCount(count float64) { + windowedQualityVariantChanges = append(windowedQualityVariantChanges, count) +} + +// RegisterPlayerBandwidth will add to the windowed playback bandwidth. +func RegisterPlayerBandwidth(kbps float64) { + windowedBandwidths = append(windowedBandwidths, kbps) +} + +// RegisterPlayerLatency will add to the windowed player latency values. +func RegisterPlayerLatency(seconds float64) { + windowedLatencies = append(windowedLatencies, seconds) +} + +// RegisterPlayerSegmentDownloadDuration will add to the windowed player segment +// download duration values. +func RegisterPlayerSegmentDownloadDuration(seconds float64) { + windowedDownloadDurations = append(windowedDownloadDurations, seconds) +} + +// collectPlaybackErrorCount will take all of the error counts each individual +// player reported and average them into a single metric. This is done so +// one person with bad connectivity doesn't make it look like everything is +// horrible for everyone. +func collectPlaybackErrorCount() { + count := utils.Sum(windowedErrorCounts) + windowedErrorCounts = []float64{} + + metrics.errorCount = append(metrics.errorCount, TimestampedValue{ + Time: time.Now(), + Value: count, + }) + + if len(metrics.errorCount) > maxCollectionValues { + metrics.errorCount = metrics.errorCount[1:] + } + + // Save to Prometheus collector. + playbackErrorCount.Set(count) +} + +func collectSegmentDownloadDuration() { + val := 0.0 + + if len(windowedDownloadDurations) > 0 { + val = utils.Avg(windowedDownloadDurations) + windowedDownloadDurations = []float64{} + } + metrics.segmentDownloadSeconds = append(metrics.segmentDownloadSeconds, TimestampedValue{ + Time: time.Now(), + Value: val, + }) + + if len(metrics.segmentDownloadSeconds) > maxCollectionValues { + metrics.segmentDownloadSeconds = metrics.segmentDownloadSeconds[1:] + } +} + +// GetDownloadDurationsOverTime will return a window of durations errors over time. +func GetDownloadDurationsOverTime() []TimestampedValue { + return metrics.segmentDownloadSeconds +} + +// GetPlaybackErrorCountOverTime will return a window of playback errors over time. +func GetPlaybackErrorCountOverTime() []TimestampedValue { + return metrics.errorCount +} + +func collectLatencyValues() { + val := 0.0 + + if len(windowedLatencies) > 0 { + val = utils.Avg(windowedLatencies) + val = math.Round(val) + windowedLatencies = []float64{} + } + + metrics.averageLatency = append(metrics.averageLatency, TimestampedValue{ + Time: time.Now(), + Value: val, + }) + + if len(metrics.averageLatency) > maxCollectionValues { + metrics.averageLatency = metrics.averageLatency[1:] + } +} + +// GetLatencyOverTime will return the min, max and avg latency values over time. +func GetLatencyOverTime() []TimestampedValue { + if len(metrics.averageLatency) == 0 { + return []TimestampedValue{} + } + + return metrics.averageLatency +} + +// collectLowestBandwidth will collect the lowest bandwidth currently collected +// so we can report to the streamer the worst possible streaming condition +// being experienced. +func collectLowestBandwidth() { + val := 0.0 + + if len(windowedBandwidths) > 0 { + val, _ = utils.MinMax(windowedBandwidths) + val = math.Round(val) + windowedBandwidths = []float64{} + } + + metrics.lowestBitrate = append(metrics.lowestBitrate, TimestampedValue{ + Time: time.Now(), + Value: math.Round(val), + }) + + if len(metrics.lowestBitrate) > maxCollectionValues { + metrics.lowestBitrate = metrics.lowestBitrate[1:] + } +} + +// GetSlowestDownloadRateOverTime will return the collected lowest bandwidth values +// over time. +func GetSlowestDownloadRateOverTime() []TimestampedValue { + if len(metrics.lowestBitrate) == 0 { + return []TimestampedValue{} + } + + return metrics.lowestBitrate +} + +func collectQualityVariantChanges() { + count := utils.Sum(windowedQualityVariantChanges) + windowedQualityVariantChanges = []float64{} + + metrics.qualityVariantChanges = append(metrics.qualityVariantChanges, TimestampedValue{ + Time: time.Now(), + Value: count, + }) +} + +// GetQualityVariantChangesOverTime will return the collected quality variant +// changes. +func GetQualityVariantChangesOverTime() []TimestampedValue { + return metrics.qualityVariantChanges +} diff --git a/metrics/prometheus.go b/metrics/prometheus.go index 89c1bb717..3fbe4235e 100644 --- a/metrics/prometheus.go +++ b/metrics/prometheus.go @@ -2,6 +2,7 @@ package metrics import ( "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) var ( @@ -11,4 +12,44 @@ var ( cpuUsage prometheus.Gauge chatUserCount prometheus.Gauge currentChatMessageCount prometheus.Gauge + playbackErrorCount prometheus.Gauge ) + +func setupPrometheusCollectors() { + // Setup the Prometheus collectors. + activeViewerCount = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "owncast_instance_active_viewer_count", + Help: "The number of viewers.", + ConstLabels: labels, + }) + + activeChatClientCount = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "owncast_instance_active_chat_client_count", + Help: "The number of connected chat clients.", + ConstLabels: labels, + }) + + chatUserCount = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "owncast_instance_total_chat_users", + Help: "The total number of chat users on this Owncast instance.", + ConstLabels: labels, + }) + + currentChatMessageCount = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "owncast_instance_current_chat_message_count", + Help: "The number of chat messages currently saved before cleanup.", + ConstLabels: labels, + }) + + playbackErrorCount = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "owncast_instance_playback_error_count", + Help: "Errors collected from players within this window", + ConstLabels: labels, + }) + + cpuUsage = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "owncast_instance_cpu_usage", + Help: "CPU usage as seen internally to Owncast.", + ConstLabels: labels, + }) +} diff --git a/metrics/timestampedValue.go b/metrics/timestampedValue.go index 0eba30a84..89a99231f 100644 --- a/metrics/timestampedValue.go +++ b/metrics/timestampedValue.go @@ -6,15 +6,16 @@ import ( "github.com/nakabonne/tstorage" ) -type timestampedValue struct { +// TimestampedValue is a value with a timestamp. +type TimestampedValue struct { Time time.Time `json:"time"` - Value int `json:"value"` + Value float64 `json:"value"` } -func makeTimestampedValuesFromDatapoints(dp []*tstorage.DataPoint) []timestampedValue { - tv := []timestampedValue{} +func makeTimestampedValuesFromDatapoints(dp []*tstorage.DataPoint) []TimestampedValue { + tv := []TimestampedValue{} for _, d := range dp { - tv = append(tv, timestampedValue{Time: time.Unix(d.Timestamp, 0), Value: int(d.Value)}) + tv = append(tv, TimestampedValue{Time: time.Unix(d.Timestamp, 0), Value: d.Value}) } return tv diff --git a/metrics/viewers.go b/metrics/viewers.go index 79539be90..40dd8c5dc 100644 --- a/metrics/viewers.go +++ b/metrics/viewers.go @@ -10,9 +10,6 @@ import ( log "github.com/sirupsen/logrus" ) -// How often we poll for updates. -const viewerMetricsPollingInterval = 2 * time.Minute - var storage tstorage.Storage func startViewerCollectionMetrics() { @@ -36,8 +33,20 @@ func collectViewerCount() { return } - // Save to our Prometheus collector. - activeViewerCount.Set(float64(core.GetStatus().ViewerCount)) + count := core.GetStatus().ViewerCount + + // Save active viewer count to our Prometheus collector. + activeViewerCount.Set(float64(count)) + + // Insert active viewer count into our on-disk time series storage. + if err := storage.InsertRows([]tstorage.Row{ + { + Metric: activeViewerCountKey, + DataPoint: tstorage.DataPoint{Timestamp: time.Now().Unix(), Value: float64(count)}, + }, + }); err != nil { + log.Errorln(err) + } } func collectChatClientCount() { @@ -46,15 +55,18 @@ func collectChatClientCount() { // Total message count cmc := data.GetMessagesCount() + // Insert message count into Prometheus collector. currentChatMessageCount.Set(float64(cmc)) // Total user count uc := data.GetUsersCount() + // Insert user count into Prometheus collector. chatUserCount.Set(float64(uc)) + // Insert active chat user count into our on-disk time series storage. if err := storage.InsertRows([]tstorage.Row{ { - Metric: "viewercount", + Metric: activeChatClientCountKey, DataPoint: tstorage.DataPoint{Timestamp: time.Now().Unix(), Value: float64(count)}, }, }); err != nil { @@ -63,8 +75,19 @@ func collectChatClientCount() { } // GetViewersOverTime will return a window of viewer counts over time. -func GetViewersOverTime(start, end time.Time) []timestampedValue { - p, err := storage.Select("viewercount", nil, start.Unix(), end.Unix()) +func GetViewersOverTime(start, end time.Time) []TimestampedValue { + p, err := storage.Select(activeViewerCountKey, nil, start.Unix(), end.Unix()) + if err != nil && err != tstorage.ErrNoDataPoints { + log.Errorln(err) + } + datapoints := makeTimestampedValuesFromDatapoints(p) + + return datapoints +} + +// GetChatClientCountOverTime will return a window of connected chat clients over time. +func GetChatClientCountOverTime(start, end time.Time) []TimestampedValue { + p, err := storage.Select(activeChatClientCountKey, nil, start.Unix(), end.Unix()) if err != nil && err != tstorage.ErrNoDataPoints { log.Errorln(err) } diff --git a/router/router.go b/router/router.go index 3d03f423e..5f1f4b03b 100644 --- a/router/router.go +++ b/router/router.go @@ -80,6 +80,9 @@ func Start() error { // return followers http.HandleFunc("/api/followers", middleware.HandlePagination(controllers.GetFollowers)) + // save client video playback metrics + http.HandleFunc("/api/metrics/playback", controllers.ReportPlaybackMetrics) + // Authenticated admin requests // Current inbound broadcaster @@ -294,6 +297,9 @@ func Start() error { // set custom style css http.HandleFunc("/api/admin/config/customstyles", middleware.RequireAdminAuth(admin.SetCustomStyles)) + // Video playback metrics + http.HandleFunc("/api/admin/metrics/video", middleware.RequireAdminAuth(admin.GetVideoPlaybackMetrics)) + // Inline chat moderation actions // Update chat message visibility diff --git a/utils/performanceTimer.go b/utils/performanceTimer.go index 30206c7a2..83f1a98e2 100644 --- a/utils/performanceTimer.go +++ b/utils/performanceTimer.go @@ -41,7 +41,7 @@ func GetAveragePerformance(key string) float64 { } _durationStorage[key] = removeHighValue(_durationStorage[key]) - return avg(_durationStorage[key]) + return Avg(_durationStorage[key]) } func removeHighValue(values []float64) []float64 { @@ -49,7 +49,8 @@ func removeHighValue(values []float64) []float64 { return values[:len(values)-1] } -func avg(values []float64) float64 { +// Avg will return the average value from a slice of float64s. +func Avg(values []float64) float64 { total := 0.0 for _, number := range values { total += number @@ -57,3 +58,27 @@ func avg(values []float64) float64 { average := total / float64(len(values)) return average } + +// Sum returns the sum of a slice of values. +func Sum(values []float64) float64 { + total := 0.0 + for _, number := range values { + total += number + } + return total +} + +// MinMax will return the min and max values from a slice of float64s. +func MinMax(array []float64) (float64, float64) { + max := array[0] + min := array[0] + for _, value := range array { + if max < value { + max = value + } + if min > value { + min = value + } + } + return min, max +} diff --git a/webroot/js/components/player.js b/webroot/js/components/player.js index 9f76b34b0..12dfbdcf7 100644 --- a/webroot/js/components/player.js +++ b/webroot/js/components/player.js @@ -3,9 +3,37 @@ import videojs from '/js/web_modules/videojs/dist/video.min.js'; import { getLocalStorage, setLocalStorage } from '../utils/helpers.js'; import { PLAYER_VOLUME, URL_STREAM } from '../utils/constants.js'; +import PlaybackMetrics from '../metrics/playback.js'; +import LatencyCompensator from './latencyCompensator.js'; const VIDEO_ID = 'video'; +const EVENTS = [ + 'loadstart', + 'progress', + 'suspend', + 'abort', + 'error', + 'emptied', + 'stalled', + 'loadedmetadata', + 'loadeddata', + 'canplay', + 'canplaythrough', + 'playing', + 'waiting', + 'seeking', + 'seeked', + 'ended', + 'durationchange', + 'timeupdate', + 'play', + 'pause', + 'ratechange', + 'resize', + 'volumechange', +]; + // Video setup const VIDEO_SRC = { src: URL_STREAM, @@ -37,11 +65,45 @@ const VIDEO_OPTIONS = { export const POSTER_DEFAULT = `/img/logo.png`; export const POSTER_THUMB = `/thumbnail.jpg`; +function getCurrentlyPlayingSegment(tech, old_segment = null) { + var target_media = tech.vhs.playlists.media(); + var snapshot_time = tech.currentTime(); + + var segment; + var segment_time; + + // Itinerate trough available segments and get first within which snapshot_time is + for (var i = 0, l = target_media.segments.length; i < l; i++) { + // Note: segment.end may be undefined or is not properly set + if (snapshot_time < target_media.segments[i].end) { + segment = target_media.segments[i]; + break; + } + } + + // Null segment_time in case it's lower then 0. + if (segment) { + segment_time = Math.max( + 0, + snapshot_time - (segment.end - segment.duration) + ); + // Because early segments don't have end property + } else { + segment = target_media.segments[0]; + segment_time = 0; + } + + return segment; +} + class OwncastPlayer { constructor() { window.VIDEOJS_NO_DYNAMIC_STYLE = true; // style override + this.playbackMetrics = new PlaybackMetrics(); + this.vjsPlayer = null; + this.latencyCompensator = null; this.appPlayerReadyCallback = null; this.appPlayerPlayingCallback = null; @@ -54,8 +116,9 @@ class OwncastPlayer { this.handleVolume = this.handleVolume.bind(this); this.handleEnded = this.handleEnded.bind(this); this.handleError = this.handleError.bind(this); + this.handleWaiting = this.handleWaiting.bind(this); + this.handleNoLongerBuffering = this.handleNoLongerBuffering.bind(this); this.addQualitySelector = this.addQualitySelector.bind(this); - this.qualitySelectionMenu = null; } @@ -63,11 +126,33 @@ class OwncastPlayer { this.addAirplay(); this.addQualitySelector(); + // Keep a reference of the standard vjs xhr function. + const oldVjsXhrCallback = videojs.xhr; + + // Override the xhr function to track segment download time. + videojs.Vhs.xhr = (...args) => { + if (args[0].uri.match('.ts')) { + const start = new Date(); + + const cb = args[1]; + args[1] = (request, error, response) => { + const end = new Date(); + const delta = end.getTime() - start.getTime(); + this.playbackMetrics.trackSegmentDownloadTime(delta); + cb(request, error, response); + }; + } + + return oldVjsXhrCallback(...args); + }; + + // Add a cachebuster param to playlist URLs. videojs.Vhs.xhr.beforeRequest = (options) => { if (options.uri.match('m3u8')) { const cachebuster = Math.random().toString(16).substr(2, 8); options.uri = `${options.uri}?cachebust=${cachebuster}`; } + return options; }; @@ -100,16 +185,39 @@ class OwncastPlayer { } handleReady() { - this.log('on Ready'); this.vjsPlayer.on('error', this.handleError); this.vjsPlayer.on('playing', this.handlePlaying); + this.vjsPlayer.on('waiting', this.handleWaiting); + this.vjsPlayer.on('canplaythrough', this.handleNoLongerBuffering); this.vjsPlayer.on('volumechange', this.handleVolume); this.vjsPlayer.on('ended', this.handleEnded); + this.vjsPlayer.on('ready', () => { + const tech = this.vjsPlayer.tech({ IWillNotUseThisInPlugins: true }); + tech.on('usage', (e) => { + if (e.name === 'vhs-unknown-waiting') { + this.playbackMetrics.incrementErrorCount(1); + } + + if (e.name === 'vhs-rendition-change-abr') { + // Quality variant has changed + this.playbackMetrics.incrementQualityVariantChanges(); + } + }); + + // Variant changed + const trackElements = this.vjsPlayer.textTracks(); + trackElements.addEventListener('cuechange', function (c) { + console.log(c); + }); + }); + if (this.appPlayerReadyCallback) { // start polling this.appPlayerReadyCallback(); } + + this.vjsPlayer.log.level('debug'); } handleVolume() { @@ -125,6 +233,22 @@ class OwncastPlayer { // start polling this.appPlayerPlayingCallback(); } + + setInterval(() => { + const tech = this.vjsPlayer.tech({ IWillNotUseThisInPlugins: true }); + const bandwidth = tech.vhs.systemBandwidth; + this.playbackMetrics.trackBandwidth(bandwidth); + + try { + const segment = getCurrentlyPlayingSegment(tech); + const segmentTime = segment.dateTimeObject.getTime(); + const now = new Date().getTime(); + const latency = now - segmentTime; + this.playbackMetrics.trackLatency(latency); + } catch (err) { + console.warn(err); + } + }, 5000); } handleEnded() { @@ -139,6 +263,17 @@ class OwncastPlayer { if (this.appPlayerEndedCallback) { this.appPlayerEndedCallback(); } + + this.playbackMetrics.incrementErrorCount(1); + } + + handleWaiting(e) { + // this.playbackMetrics.incrementErrorCount(1); + this.playbackMetrics.isBuffering = true; + } + + handleNoLongerBuffering() { + this.playbackMetrics.isBuffering = false; } log(message) { diff --git a/webroot/js/metrics/playback.js b/webroot/js/metrics/playback.js new file mode 100644 index 000000000..b468a8594 --- /dev/null +++ b/webroot/js/metrics/playback.js @@ -0,0 +1,98 @@ +import { URL_PLAYBACK_METRICS } from '../utils/constants.js'; +const METRICS_SEND_INTERVAL = 10000; + +class PlaybackMetrics { + constructor() { + this.hasPerformedInitialVariantChange = false; + + this.segmentDownloadTime = []; + this.bandwidthTracking = []; + this.latencyTracking = []; + this.errors = 0; + this.qualityVariantChanges = 0; + this.isBuffering = false; + + setInterval(() => { + this.send(); + }, METRICS_SEND_INTERVAL); + } + + incrementErrorCount(count) { + this.errors += count; + } + + incrementQualityVariantChanges() { + // We always start the player at the lowest quality, so let's just not + // count the first change. + if (!this.hasPerformedInitialVariantChange) { + this.hasPerformedInitialVariantChange = true; + return; + } + this.qualityVariantChanges++; + } + + trackSegmentDownloadTime(seconds) { + this.segmentDownloadTime.push(seconds); + } + + trackBandwidth(bps) { + this.bandwidthTracking.push(bps); + } + + trackLatency(latency) { + this.latencyTracking.push(latency); + } + + async send() { + if ( + this.segmentDownloadTime.length < 4 || + this.bandwidthTracking.length < 4 + ) { + return; + } + + const errorCount = this.errors; + const average = (arr) => arr.reduce((p, c) => p + c, 0) / arr.length; + + const averageDownloadDuration = average(this.segmentDownloadTime) / 1000; + const roundedAverageDownloadDuration = + Math.round(averageDownloadDuration * 1000) / 1000; + + const averageBandwidth = average(this.bandwidthTracking) / 1000; + const roundedAverageBandwidth = Math.round(averageBandwidth * 1000) / 1000; + + const averageLatency = average(this.latencyTracking) / 1000; + const roundedAverageLatency = Math.round(averageLatency * 1000) / 1000; + + const data = { + bandwidth: roundedAverageBandwidth, + latency: roundedAverageLatency, + downloadDuration: roundedAverageDownloadDuration, + errors: errorCount + this.isBuffering ? 1 : 0, + qualityVariantChanges: this.qualityVariantChanges, + }; + this.errors = 0; + this.qualityVariantChanges = 0; + this.segmentDownloadTime = []; + this.bandwidthTracking = []; + this.latencyTracking = []; + + const options = { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(data), + }; + + try { + fetch(URL_PLAYBACK_METRICS, options); + } catch (e) { + console.error(e); + } + + // console.log(data); + } +} + +export default PlaybackMetrics; diff --git a/webroot/js/utils/constants.js b/webroot/js/utils/constants.js index ddb4bf3f9..9cb9f04c9 100644 --- a/webroot/js/utils/constants.js +++ b/webroot/js/utils/constants.js @@ -17,6 +17,7 @@ export const URL_WEBSOCKET = `${ }://${location.host}/ws`; export const URL_CHAT_REGISTRATION = `/api/chat/register`; export const URL_FOLLOWERS = `/api/followers`; +export const URL_PLAYBACK_METRICS = `/api/metrics/playback`; export const TIMER_STATUS_UPDATE = 5000; // ms export const TIMER_DISABLE_CHAT_AFTER_OFFLINE = 5 * 60 * 1000; // 5 mins