Skip to content

Commit

Permalink
RSDK-7821 Log basic metrics from signaling answerer (#282)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjirewis authored May 1, 2024
1 parent f9381f0 commit 168c284
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 0 deletions.
92 changes: 92 additions & 0 deletions rpc/answerer_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package rpc

import (
"fmt"
"sync"
"time"

"github.com/edaniels/golog"
"github.com/pion/webrtc/v3"
)

// timeFormatStr is the time.Format layout applied to every timestamp emitted
// in the answerer stats log message: date and time with millisecond precision
// followed by a numeric zone offset ("Z" when the time is in UTC).
// Copied from DefaultTimeFormatStr in RDK.
const timeFormatStr = "2006-01-02T15:04:05.000Z0700"

// answererStats is a collection of measurements/information gathered during
// the course of a single connection establishment attempt from a signaling
// answerer. We log the contents of the struct as a single INFO log message
// upon successful or failed connection establishment to avoid emitting dozens
// of logs during the answering process and cluttering regular robot logs.
// Answerer stats are only logged in production for external signalers.
//
// The struct contains a sync.Mutex and must therefore be used through a
// pointer and never copied.
type answererStats struct {
// mu guards all fields on answererStats.
mu sync.Mutex

// success reports whether the connection establishment attempt completed
// successfully. It is not logged as a field; it selects the log message.
success bool

// Stats below will be logged.

// answerRequestInitReceived is the time at which the init stage of the
// answer request was received. It stays nil until then; a nil value on an
// unsuccessful attempt suppresses logging entirely.
answerRequestInitReceived *time.Time
// numAnswerUpdates counts the answer updates the answerer attempted to send
// for this attempt.
numAnswerUpdates int
// "local" ICE candidates are those generated by the signaling answerer, so
// these represent server-side candidates.
localICECandidates []*localICECandidate
// "remote" ICE candidates are those received from the caller, so these
// represent client-side candidates.
remoteICECandidates []*remoteICECandidate
}

// log emits the gathered stats as a single INFO message on logger. Whether and
// what it logs depends on the outcome of the attempt:
//
//   - as.success: logs "Connection establishment succeeded".
//   - !as.success && as.answerRequestInitReceived != nil: a clear failure;
//     logs "Connection establishment failed".
//   - !as.success && as.answerRequestInitReceived == nil: another answerer
//     picked up the connection establishment attempt; logs nothing.
//
// log acquires as.mu itself, so it cannot be called while holding the mutex.
func (as *answererStats) log(logger golog.Logger) {
	as.mu.Lock()
	defer as.mu.Unlock()

	msg := "Connection establishment succeeded"
	if !as.success {
		// Without an init message this answerer never really took part in the
		// attempt, so there is nothing worth reporting.
		if as.answerRequestInitReceived == nil {
			return
		}
		msg = "Connection establishment failed"
	}

	// fields is a flat list of alternating keys and values, as expected by
	// Infow.
	var fields []any
	if as.answerRequestInitReceived != nil {
		fAnswerRequestInitReceived := as.answerRequestInitReceived.Format(timeFormatStr)
		fields = append(fields, "answerRequestInitReceived", fAnswerRequestInitReceived)
	}
	fields = append(fields, "numAnswerUpdates", as.numAnswerUpdates)

	// Render candidates to strings up front so the log message carries plain
	// text rather than pointers.
	var lics, rics []string
	for _, lic := range as.localICECandidates {
		lics = append(lics, lic.String())
	}
	for _, ric := range as.remoteICECandidates {
		rics = append(rics, ric.String())
	}
	fields = append(fields, "localICECandidates", lics)
	fields = append(fields, "remoteICECandidates", rics)

	logger.Infow(msg, fields...)
}

// localICECandidate pairs an ICE candidate gathered by the signaling answerer
// (server side) with the time it was recorded. Field order matters: values are
// constructed with unkeyed composite literals.
type localICECandidate struct {
// gatheredAt is the time at which the answerer recorded the candidate.
gatheredAt time.Time
// candidate is the gathered candidate itself.
candidate *webrtc.ICECandidate
}

// String renders the candidate for the stats log message as
// "at: <gatheredAt>, candidate: <candidate>", with the timestamp formatted
// using timeFormatStr.
func (lic *localICECandidate) String() string {
	return fmt.Sprintf(
		"at: %v, candidate: %s",
		lic.gatheredAt.Format(timeFormatStr),
		lic.candidate,
	)
}

// remoteICECandidate pairs an ICE candidate received from the caller (client
// side) with the time it was recorded. Field order matters: values are
// constructed with unkeyed composite literals.
type remoteICECandidate struct {
// receivedAt is the time at which the answerer recorded the candidate.
receivedAt time.Time
// candidate is the received candidate; String dereferences it, so it must be
// non-nil.
candidate *webrtc.ICECandidateInit
}

// String renders the candidate for the stats log message as
// "at: <receivedAt>, candidate: <candidate string>", with the timestamp
// formatted using timeFormatStr. ric.candidate must be non-nil.
func (ric *remoteICECandidate) String() string {
	return fmt.Sprintf(
		"at: %v, candidate: %v",
		ric.receivedAt.Format(timeFormatStr),
		ric.candidate.Candidate,
	)
}
2 changes: 2 additions & 0 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ func NewServer(logger golog.Logger, opts ...ServerOption) (Server, error) {
sOpts.webrtcOpts.ExternalSignalingDialOpts,
config,
logger.Named("external_signaler"),
true, // logStats == true
))
} else {
sOpts.webrtcOpts.EnableInternalSignaling = true
Expand Down Expand Up @@ -642,6 +643,7 @@ func NewServer(logger golog.Logger, opts ...ServerOption) (Server, error) {
answererDialOpts,
config,
logger.Named("internal_signaler"),
false, // logStats == false
))
}
}
Expand Down
2 changes: 2 additions & 0 deletions rpc/wrtc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func testWebRTCClientServer(t *testing.T, signalingCallQueue WebRTCCallQueue, lo
[]DialOption{WithInsecure()},
webrtc.Configuration{},
logger,
false, // `logStats == false` to quiet logs
)
answerer.Start()

Expand Down Expand Up @@ -488,6 +489,7 @@ func testWebRTCClientAnswerConcurrent(t *testing.T, signalingCallQueue WebRTCCal
[]DialOption{WithInsecure()},
webrtc.Configuration{},
logger,
false, // `logStats == false` to quiet logs
)
answerer.Start()

Expand Down
35 changes: 35 additions & 0 deletions rpc/wrtc_signaling_answerer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type webrtcSignalingAnswerer struct {

closeCtx context.Context
logger golog.Logger

// When logStats is true, an INFO level log message containing metrics gathered during connection
// establishment will be output at the end of every connection establishment attempt. See comments on
// `answererStats` for more info.
logStats bool
}

// newWebRTCSignalingAnswerer makes an answerer that will connect to and listen for calls at the given
Expand All @@ -57,6 +62,7 @@ func newWebRTCSignalingAnswerer(
dialOpts []DialOption,
webrtcConfig webrtc.Configuration,
logger golog.Logger,
logStats bool,
) *webrtcSignalingAnswerer {
dialOptsCopy := make([]DialOption, len(dialOpts))
copy(dialOptsCopy, dialOpts)
Expand All @@ -71,6 +77,7 @@ func newWebRTCSignalingAnswerer(
cancelAnswerWorkers: cancel,
closeCtx: closeCtx,
logger: logger,
logStats: logStats,
}
}

Expand Down Expand Up @@ -238,6 +245,14 @@ func (ans *webrtcSignalingAnswerer) Stop() {
// the designated WebRTC data channel is passed off to the underlying Server which
// is then used as the server end of a gRPC connection.
func (ans *webrtcSignalingAnswerer) answer(client webrtcpb.SignalingService_AnswerClient) (err error) {
// Maintain and eventually log a collection of stats for each answering attempt.
stats := &answererStats{}
defer func() {
if ans.logStats {
stats.log(ans.logger)
}
}()

resp, err := client.Recv()
if err != nil {
return err
Expand All @@ -256,6 +271,10 @@ func (ans *webrtcSignalingAnswerer) answer(client webrtcpb.SignalingService_Answ
},
})
}
answerRequestInitReceived := time.Now()
stats.mu.Lock()
stats.answerRequestInitReceived = &answerRequestInitReceived
stats.mu.Unlock()
init := initStage.Init

disableTrickle := false
Expand Down Expand Up @@ -283,6 +302,10 @@ func (ans *webrtcSignalingAnswerer) answer(client webrtcpb.SignalingService_Answ
defer func() {
if !(successful && err == nil) {
err = multierr.Combine(err, pc.Close())
} else {
stats.mu.Lock()
stats.success = true
stats.mu.Unlock()
}
}()

Expand Down Expand Up @@ -333,6 +356,10 @@ func (ans *webrtcSignalingAnswerer) answer(client webrtcpb.SignalingService_Answ
return
}
if icecandidate != nil {
localCand := &localICECandidate{time.Now(), icecandidate}
stats.mu.Lock()
stats.localICECandidates = append(stats.localICECandidates, localCand)
stats.mu.Unlock()
pendingCandidates.Add(1)
if icecandidate.Typ == webrtc.ICECandidateTypeHost {
waitOneHostOnce.Do(func() {
Expand Down Expand Up @@ -390,6 +417,10 @@ func (ans *webrtcSignalingAnswerer) answer(client webrtcpb.SignalingService_Answ
}); err != nil {
sendErr(err)
}

stats.mu.Lock()
stats.numAnswerUpdates++
stats.mu.Unlock()
})
})

Expand Down Expand Up @@ -456,6 +487,10 @@ func (ans *webrtcSignalingAnswerer) answer(client webrtcpb.SignalingService_Answ
return errors.Errorf("uuid mismatch; have=%q want=%q", ansResp.Uuid, uuid)
}
cand := iceCandidateFromProto(s.Update.Candidate)
remoteCand := &remoteICECandidate{time.Now(), &cand}
stats.mu.Lock()
stats.remoteICECandidates = append(stats.remoteICECandidates, remoteCand)
stats.mu.Unlock()
if err := pc.AddICECandidate(cand); err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions rpc/wrtc_signaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func testWebRTCSignaling(t *testing.T, signalingCallQueue WebRTCCallQueue, logge
[]DialOption{WithInsecure()},
webrtc.Configuration{},
logger,
false, // `logStats == false` to quiet logs
)
answerer.Start()

Expand Down Expand Up @@ -180,6 +181,7 @@ func TestWebRTCAnswererImmediateStop(t *testing.T) {
[]DialOption{WithInsecure()},
webrtc.Configuration{},
logger,
false, // `logStats == false` to quiet logs
)

// Running both asynchronously means Stop will potentially happen before Start,
Expand Down

0 comments on commit 168c284

Please sign in to comment.