Skip to content

Commit 43d87db

Browse files
committed
feat: add socket auditor for forwarding logs to coder agent
Add SocketAuditor that sends audit logs to the Coder workspace agent via a Unix socket. This enables boundary audit events to be forwarded to coderd for centralized logging. Implementation notes: - Batching: 10 logs or 5 seconds, whichever comes first - Wire format: length-prefixed protobuf. proto imported from AgentAPI to simplify boundary -> agent -> coderd forwarding to start.
1 parent 9c9c878 commit 43d87db

File tree

6 files changed

+786
-87
lines changed

6 files changed

+786
-87
lines changed

audit/multi_auditor.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package audit
2+
3+
// MultiAuditor wraps multiple auditors and sends audit events to all of them.
4+
type MultiAuditor struct {
5+
auditors []Auditor
6+
}
7+
8+
// NewMultiAuditor creates a new MultiAuditor that sends to all provided auditors.
9+
func NewMultiAuditor(auditors ...Auditor) *MultiAuditor {
10+
return &MultiAuditor{auditors: auditors}
11+
}
12+
13+
// AuditRequest sends the request to all wrapped auditors.
14+
func (m *MultiAuditor) AuditRequest(req Request) {
15+
for _, a := range m.auditors {
16+
a.AuditRequest(req)
17+
}
18+
}

audit/socket_auditor.go

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
package audit
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"log/slog"
7+
"net"
8+
"time"
9+
10+
"google.golang.org/protobuf/proto"
11+
"google.golang.org/protobuf/types/known/timestamppb"
12+
13+
agentproto "github.com/coder/coder/v2/agent/proto"
14+
)
15+
16+
const (
17+
defaultBatchSize = 10
18+
defaultBatchTimerDuration = 5 * time.Second
19+
// DefaultAuditSocketPath is the well-known path for the boundary audit socket.
20+
// The expectation is the Coder agent listens on this socket to receive audit logs.
21+
DefaultAuditSocketPath = "/tmp/boundary-audit.sock"
22+
)
23+
24+
// SocketAuditor implements the Auditor interface. It sends logs to the
25+
// workspace agent's boundary log proxy socket. It queues logs and sends
26+
// them in batches using a batch size and timer. The internal queue operates
27+
// as a FIFO i.e., logs are sent in the order they are received and dropped
28+
// if the queue is full.
29+
type SocketAuditor struct {
30+
socketPath string
31+
logger *slog.Logger
32+
logCh chan *agentproto.BoundaryLog
33+
batchSize int
34+
batchTimerDuration time.Duration
35+
36+
// onFlushAttempt is called after each flush attempt (intended for testing).
37+
onFlushAttempt func()
38+
}
39+
40+
// NewSocketAuditor creates a new SocketAuditor that sends logs to the agent's
41+
// boundary log proxy socket at DefaultAuditSocketPath after SocketAuditor.Loop
42+
// is called.
43+
func NewSocketAuditor(logger *slog.Logger) *SocketAuditor {
44+
return &SocketAuditor{
45+
socketPath: DefaultAuditSocketPath,
46+
logger: logger,
47+
logCh: make(chan *agentproto.BoundaryLog, 2*defaultBatchSize),
48+
batchSize: defaultBatchSize,
49+
batchTimerDuration: defaultBatchTimerDuration,
50+
}
51+
}
52+
53+
// AuditRequest implements the Auditor interface. It queues the log to be sent to the
54+
// agent in a batch.
55+
func (s *SocketAuditor) AuditRequest(req Request) {
56+
httpReq := &agentproto.BoundaryLog_HttpRequest{
57+
Method: req.Method,
58+
Url: req.URL,
59+
}
60+
// Only include the matched rule for denied requests, as documented in
61+
// the proto schema.
62+
if !req.Allowed {
63+
httpReq.MatchedRule = req.Rule
64+
}
65+
66+
log := &agentproto.BoundaryLog{
67+
Allowed: req.Allowed,
68+
Time: timestamppb.Now(),
69+
Resource: &agentproto.BoundaryLog_HttpRequest_{HttpRequest: httpReq},
70+
}
71+
72+
select {
73+
case s.logCh <- log:
74+
default:
75+
s.logger.Warn("audit log dropped, channel full")
76+
}
77+
}
78+
79+
// flushErr represents an error from flush, distinguishing between
80+
// permanent errors (bad data) and transient errors (network issues).
81+
type flushErr struct {
82+
err error
83+
permanent bool
84+
}
85+
86+
func (e *flushErr) Error() string { return e.err.Error() }
87+
88+
// flush sends the current batch of logs to the given connection.
89+
func flush(conn net.Conn, logs []*agentproto.BoundaryLog) *flushErr {
90+
if len(logs) == 0 {
91+
return nil
92+
}
93+
94+
req := &agentproto.ReportBoundaryLogsRequest{
95+
Logs: logs,
96+
}
97+
98+
data, err := proto.Marshal(req)
99+
if err != nil {
100+
return &flushErr{err: err, permanent: true}
101+
}
102+
103+
if err := binary.Write(conn, binary.BigEndian, uint32(len(data))); err != nil {
104+
return &flushErr{err: err}
105+
}
106+
if _, err := conn.Write(data); err != nil {
107+
return &flushErr{err: err}
108+
}
109+
return nil
110+
}
111+
112+
// Loop handles the I/O to send audit logs to the agent.
113+
func (s *SocketAuditor) Loop(ctx context.Context) {
114+
var conn net.Conn
115+
batch := make([]*agentproto.BoundaryLog, 0, s.batchSize)
116+
t := time.NewTimer(0)
117+
t.Stop()
118+
119+
// connect attempts to establish a connection to the socket.
120+
connect := func() {
121+
if conn != nil {
122+
return
123+
}
124+
var err error
125+
conn, err = net.Dial("unix", s.socketPath)
126+
if err != nil {
127+
s.logger.Warn("failed to connect to audit socket", "path", s.socketPath, "error", err)
128+
conn = nil
129+
}
130+
}
131+
132+
// closeConn closes the current connection if open.
133+
closeConn := func() {
134+
if conn != nil {
135+
_ = conn.Close()
136+
conn = nil
137+
}
138+
}
139+
140+
// clearBatch resets the length of the batch and frees memory while preserving
141+
// the batch slice backing array.
142+
clearBatch := func() {
143+
for i := range len(batch) {
144+
batch[i] = nil
145+
}
146+
batch = batch[:0]
147+
}
148+
149+
// doFlush flushes the batch and handles errors by reconnecting.
150+
doFlush := func() {
151+
t.Stop()
152+
defer func() {
153+
if s.onFlushAttempt != nil {
154+
s.onFlushAttempt()
155+
}
156+
}()
157+
if len(batch) == 0 {
158+
return
159+
}
160+
connect()
161+
if conn == nil {
162+
// No connection: logs will be retried on next flush.
163+
return
164+
}
165+
166+
if err := flush(conn, batch); err != nil {
167+
s.logger.Warn("failed to flush audit logs", "error", err)
168+
if err.permanent {
169+
// Data error: discard batch to avoid infinite retries.
170+
clearBatch()
171+
} else {
172+
// Network error: close connection but keep batch for a future retry.
173+
closeConn()
174+
}
175+
return
176+
}
177+
178+
clearBatch()
179+
}
180+
181+
connect()
182+
183+
for {
184+
select {
185+
case <-ctx.Done():
186+
// Drain any pending logs before the last flush. Not concerned about
187+
// growing the batch slice here since we're exiting.
188+
drain:
189+
for {
190+
select {
191+
case log := <-s.logCh:
192+
batch = append(batch, log)
193+
default:
194+
break drain
195+
}
196+
}
197+
198+
doFlush()
199+
closeConn()
200+
return
201+
case <-t.C:
202+
doFlush()
203+
case log := <-s.logCh:
204+
// If batch is at capacity, attempt flushing first and drop the log if
205+
// the batch still full.
206+
if len(batch) >= s.batchSize {
207+
doFlush()
208+
if len(batch) >= s.batchSize {
209+
s.logger.Warn("audit log dropped, batch full")
210+
continue
211+
}
212+
}
213+
214+
batch = append(batch, log)
215+
216+
if len(batch) == 1 {
217+
t.Reset(s.batchTimerDuration)
218+
}
219+
220+
if len(batch) >= s.batchSize {
221+
doFlush()
222+
}
223+
}
224+
}
225+
}

0 commit comments

Comments
 (0)