eBPF: Handling Events in Userspace


Objective

  • To understand how eBPF events are handled in user space by various open-source projects
    • To learn how they cope with a massive volume of events

Note

Currently, only the Tetragon project is covered.

Reasoning

Once eBPF events have been written by a kernel-space hook into a ring buffer or perf buffer, they become available for consumption from user space.

The following steps are usually performed in user-space code:

  1. Preparing the ring buffer / perf buffer reader
  2. Reading records from the buffer
  3. Processing the raw samples

Tetragon

Preparation

The perf event reader is created from a pinned perf map. [Source]

snippet.go
...
pinOpts := ebpf.LoadPinOptions{}
perfMap, err := ebpf.LoadPinnedMap(k.PerfConfig.MapName, &pinOpts)
if err != nil {
	return fmt.Errorf("opening pinned map '%s' failed: %w", k.PerfConfig.MapName, err)
}
defer perfMap.Close()

rbSize := k.getRBSize(int(perfMap.MaxEntries()))
perfReader, err := perf.NewReader(perfMap, rbSize)
...

Reading

A goroutine is launched to read records from perfReader and add them to eventsQueue (a buffered channel). If the channel is full, the record is dropped and counted instead of blocking the reader.

[Source]

snippet2.go
...
// We spawn go routine to read and process perf events,
// connected with main app through eventsQueue channel.
eventsQueue := make(chan *perf.Record, k.getRBQueueSize())

// Listeners are ready and about to start reading from perf reader, tell
// user everything is ready.
k.log.Info("Listening for events...")

// Start reading records from the perf array. Reads until the reader is closed.
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
go func() {
	defer wg.Done()
	for stopCtx.Err() == nil {
		record, err := perfReader.Read()
		if err != nil {
			// NOTE(JM and Djalal): count and log errors while excluding the stopping context
			if stopCtx.Err() == nil {
				RingbufErrors.Inc()
				errorCnt := getCounterValue(RingbufErrors)
				k.log.Warn("Reading bpf events failed", "errors", errorCnt, logfields.Error, err)
			}
		} else {
			if len(record.RawSample) > 0 {
				select {
				case eventsQueue <- &record:
				default:
					// eventsQueue channel is full, drop the event
					queueLost.Inc()
				}
				RingbufReceived.Inc()
			}

			if record.LostSamples > 0 {
				RingbufLost.Add(float64(record.LostSamples))
			}
		}
	}
}()
...
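The drop-on-full behaviour in the reader goroutine relies on a non-blocking channel send (select with a default case). The following is a minimal, standard-library-only sketch of that pattern. Record, enqueue and produce are hypothetical names used for illustration; they are not part of Tetragon or cilium/ebpf.

```go
package main

import "fmt"

// Record is a stand-in for perf.Record; only the payload matters here.
type Record struct {
	RawSample []byte
}

// enqueue tries to hand a record to the queue without blocking.
// It reports false when the queue is full and the record was dropped.
func enqueue(queue chan<- *Record, rec *Record) bool {
	select {
	case queue <- rec:
		return true
	default:
		return false
	}
}

// produce offers n records to the queue and returns how many were
// queued and how many were dropped.
func produce(queue chan<- *Record, n int) (queued, dropped int) {
	for i := 0; i < n; i++ {
		if enqueue(queue, &Record{RawSample: []byte{byte(i)}}) {
			queued++
		} else {
			dropped++
		}
	}
	return queued, dropped
}

func main() {
	// Tiny capacity and no consumer, so drops are easy to observe.
	queue := make(chan *Record, 2)
	queued, dropped := produce(queue, 5)
	fmt.Printf("queued=%d dropped=%d\n", queued, dropped) // queued=2 dropped=3
}
```

The trade-off is that the reader never stalls on a slow consumer, which keeps the kernel-side buffer draining, at the cost of losing events in user space when the queue is full. Counting the drops, as the original code does with queueLost, makes that loss visible.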

Another goroutine is launched to read records from eventsQueue and pass them to receiveEvent() for processing. It exits when the stop context is cancelled; any records still in the queue at that point are left unprocessed.

[Source]

snippet3.go
...
// Start processing records from perf.
wg.Add(1)
go func() {
	defer wg.Done()
	for {
		select {
		case event := <-eventsQueue:
			k.receiveEvent(event.RawSample)
			queueReceived.Inc()
		case <-stopCtx.Done():
			k.log.Info("Listening for events completed.", logfields.Error, stopCtx.Err())
			k.log.Debug(fmt.Sprintf("Unprocessed events in RB queue: %d", len(eventsQueue)))
			return
		}
	}
}()
...
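The consumer loop above can be reduced to a small standard-library sketch: process queued samples until a context is cancelled, then report how many were left behind. The names consume and runDemo are hypothetical, and the demo deliberately waits until every sample has been handled before cancelling so that its result is deterministic.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// consume processes queued samples until ctx is cancelled and returns
// how many samples were still queued at shutdown.
func consume(ctx context.Context, queue <-chan []byte, handle func([]byte)) int {
	for {
		select {
		case sample := <-queue:
			handle(sample)
		case <-ctx.Done():
			return len(queue)
		}
	}
}

// runDemo queues n samples (n > 0), waits until all of them have been
// handled, then stops the consumer.
func runDemo(n int) (handled, leftover int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	queue := make(chan []byte, n)
	allHandled := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		leftover = consume(ctx, queue, func([]byte) {
			handled++
			if handled == n {
				close(allHandled)
			}
		})
	}()

	for i := 0; i < n; i++ {
		queue <- []byte{byte(i)}
	}
	<-allHandled
	cancel()
	wg.Wait() // after this, reading handled and leftover is race-free
	return handled, leftover
}

func main() {
	handled, leftover := runDemo(3)
	fmt.Printf("handled=%d leftover=%d\n", handled, leftover) // handled=3 leftover=0
}
```

In a real shutdown the cancellation can arrive while samples are still queued, because select picks among ready cases at random. That is why the original code logs the number of unprocessed events instead of assuming the queue is empty.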

Processing

When receiveEvent() is called, it:

  • converts the raw bytes into events by passing the data to HandlePerfData()
  • sends the resulting events to the registered listeners

[Source]

snippet4.go
func (k *Observer) receiveEvent(data []byte) {
	var timer time.Time
	if option.Config.EnableMsgHandlingLatency {
		timer = time.Now()
	}

	op, events, err := HandlePerfData(data)
	opcodemetrics.OpTotalInc(ops.OpCode(op))
	if err != nil {
		errormetrics.HandlerErrorsInc(ops.OpCode(op), err.kind)
		switch err.kind {
		case errormetrics.HandlePerfUnknownOp:
			k.log.Debug("unknown opcode ignored", "opcode", err.opcode)
		default:
			k.log.Debug("error occurred in event handler", "opcode", err.opcode, logfields.Error, err)
		}
	}
	for _, event := range events {
		k.observerListeners(event)
	}
	if option.Config.EnableMsgHandlingLatency {
		opcodemetrics.LatencyStats.WithLabelValues(strconv.FormatUint(uint64(op), 10)).Observe(float64(time.Since(timer).Microseconds()))
	}
}
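The snippet does not show what observerListeners() does internally. As an illustration of the general fan-out idea only, here is a minimal sketch in which every decoded event is handed to each registered listener. The Listener interface, the Observer type and the choice to drop a listener whose Notify call fails are all assumptions of this sketch, not confirmed Tetragon behaviour, and the sketch does no locking.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a placeholder for a decoded event.
type Event struct{ Op byte }

// Listener is a hypothetical consumer interface (exporter, logger, ...).
type Listener interface {
	Notify(ev Event) error
}

// Observer keeps the set of registered listeners.
type Observer struct {
	listeners map[Listener]struct{}
}

func NewObserver() *Observer {
	return &Observer{listeners: make(map[Listener]struct{})}
}

func (o *Observer) AddListener(l Listener) { o.listeners[l] = struct{}{} }

// notifyAll hands the event to every listener and removes listeners
// whose Notify call fails (an assumption of this sketch).
func (o *Observer) notifyAll(ev Event) {
	for l := range o.listeners {
		if err := l.Notify(ev); err != nil {
			delete(o.listeners, l) // deleting while ranging over a map is allowed in Go
		}
	}
}

// recorder counts the events it sees and can be forced to fail.
type recorder struct {
	seen int
	fail bool
}

func (r *recorder) Notify(Event) error {
	r.seen++
	if r.fail {
		return errors.New("listener unavailable")
	}
	return nil
}

// runFanOut sends two events to one healthy and one failing listener.
func runFanOut() (healthySeen, brokenSeen, remaining int) {
	o := NewObserver()
	healthy, broken := &recorder{}, &recorder{fail: true}
	o.AddListener(healthy)
	o.AddListener(broken)
	o.notifyAll(Event{Op: 1})
	o.notifyAll(Event{Op: 2})
	return healthy.seen, broken.seen, len(o.listeners)
}

func main() {
	h, b, n := runFanOut()
	fmt.Printf("healthy=%d broken=%d listeners=%d\n", h, b, n) // healthy=2 broken=1 listeners=1
}
```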

When HandlePerfData() is called, it:

  • looks up the event-specific handler using the first byte of the sample (the opcode)
  • calls that handler to parse the raw bytes

[Source]

snippet5.go
func HandlePerfData(data []byte) (byte, []Event, *HandlePerfError) {
	op := data[0]
	r := bytes.NewReader(data)
	// These ops handlers are registered by RegisterEventHandlerAtInit().
	handler, ok := eventHandler[op]
	if !ok {
		return op, nil, &HandlePerfError{
			kind:   errormetrics.HandlePerfUnknownOp,
			err:    fmt.Errorf("unknown op: %d", op),
			opcode: op,
		}
	}

	events, err := handler(r)
	if err != nil {
		return op, events, &HandlePerfError{
			kind:   errormetrics.HandlePerfHandlerError,
			err:    fmt.Errorf("handler for op %d failed: %w", op, err),
			opcode: op,
		}
	}
	return op, events, nil
}
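To make the opcode-based dispatch concrete, here is a minimal, self-contained sketch of the same idea: a handler table keyed by the first byte, with each handler decoding the rest of the sample. The opcode value, the wire layout (one opcode byte, three padding bytes, a little-endian PID) and the names dispatch, handlers and opExec are invented for this example and are not Tetragon's actual format. Unlike HandlePerfData(), which is only called with non-empty samples, the sketch also guards against an empty slice.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// Event is a placeholder for a decoded event.
type Event struct {
	Op  byte
	PID uint32
}

type handlerFunc func(r *bytes.Reader) ([]Event, error)

const opExec byte = 5 // hypothetical opcode value

// handlers maps an opcode to the function that decodes that sample type.
var handlers = map[byte]handlerFunc{
	opExec: func(r *bytes.Reader) ([]Event, error) {
		// Assumed wire layout: opcode, 3 padding bytes, little-endian PID.
		var msg struct {
			Op  uint8
			Pad [3]uint8
			PID uint32
		}
		if err := binary.Read(r, binary.LittleEndian, &msg); err != nil {
			return nil, err
		}
		return []Event{{Op: msg.Op, PID: msg.PID}}, nil
	},
}

// dispatch selects a handler by the first byte and lets it parse the sample.
func dispatch(data []byte) (byte, []Event, error) {
	if len(data) == 0 {
		return 0, nil, errors.New("empty sample")
	}
	op := data[0]
	h, ok := handlers[op]
	if !ok {
		return op, nil, fmt.Errorf("unknown op: %d", op)
	}
	events, err := h(bytes.NewReader(data))
	if err != nil {
		return op, events, fmt.Errorf("handler for op %d failed: %w", op, err)
	}
	return op, events, nil
}

func main() {
	sample := []byte{5, 0, 0, 0, 42, 0, 0, 0}
	op, events, err := dispatch(sample)
	fmt.Println(op, events, err) // 5 [{5 42}] <nil>
}
```

A table lookup keeps the hot path to a single map access per sample and lets each event type register its own parser without the dispatcher needing to know the layouts.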
