diff --git a/p2p/protocol.go b/p2p/protocol.go index 63aec284..d5afccf3 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "errors" "fmt" + "io" "math/big" "time" @@ -14,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/eth/protocols/eth" ethp2p "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/rlp" "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -370,16 +372,25 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { } func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { - var txs eth.TransactionsPacket - if err := msg.Decode(&txs); err != nil { - return err + payload, err := io.ReadAll(msg.Payload) + if err != nil { + return fmt.Errorf("failed to read transactions payload: %w", err) } + var rawTxs []rlp.RawValue + if err := rlp.DecodeBytes(payload, &rawTxs); err != nil { + c.logger.Warn().Err(err).Msg("Failed to decode transactions") + return nil + } + + txs := decodeTxs(rawTxs) tfs := time.Now() - c.countMsgReceived(txs.Name(), float64(len(txs))) + c.countMsgReceived((ð.TransactionsPacket{}).Name(), float64(len(txs))) - c.db.WriteTransactions(ctx, c.node, txs, tfs) + if len(txs) > 0 { + c.db.WriteTransactions(ctx, c.node, txs, tfs) + } return nil } @@ -449,18 +460,18 @@ func (c *conn) handleGetBlockBodies(msg ethp2p.Msg) error { } func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error { - var packet eth.BlockBodiesPacket + var packet eth.BlockBodiesRLPPacket if err := msg.Decode(&packet); err != nil { return err } tfs := time.Now() - if len(packet.BlockBodiesResponse) == 0 { + if len(packet.BlockBodiesRLPResponse) == 0 { return nil } - c.countMsgReceived(packet.Name(), float64(len(packet.BlockBodiesResponse))) + c.countMsgReceived((ð.BlockBodiesPacket{}).Name(), float64(len(packet.BlockBodiesRLPResponse))) hash, ok := c.requests.Get(packet.RequestId) if !ok { @@ -474,7 +485,18 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error { return nil } - body := packet.BlockBodiesResponse[0] + var decoded rawBlockBody + if err := rlp.DecodeBytes(packet.BlockBodiesRLPResponse[0], &decoded); err != nil { + c.logger.Warn().Err(err).Msg("Failed to decode block body") + return nil + } + + body := ð.BlockBody{ + Transactions: decodeTxs(decoded.Transactions), + Uncles: decoded.Uncles, + Withdrawals: decoded.Withdrawals, + } + c.db.WriteBlockBody(ctx, body, hash, tfs) // Update cache to store body @@ -487,26 +509,39 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error { } func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { - var block eth.NewBlockPacket - if err := msg.Decode(&block); err != nil { - return err + payload, err := io.ReadAll(msg.Payload) + if err != nil { + return fmt.Errorf("failed to read new block payload: %w", err) } + var raw rawNewBlockPacket + if err := rlp.DecodeBytes(payload, &raw); err != nil { + c.logger.Warn().Err(err).Msg("Failed to decode new block") + return nil + } + + block := types.NewBlockWithHeader(raw.Block.Header).WithBody(types.Body{ + Transactions: decodeTxs(raw.Block.Txs), + Uncles: raw.Block.Uncles, + Withdrawals: raw.Block.Withdrawals, + }) + packet := ð.NewBlockPacket{Block: block, TD: raw.TD} + tfs := time.Now() - hash := block.Block.Hash() + hash := packet.Block.Hash() - c.countMsgReceived(block.Name(), 1) + c.countMsgReceived(packet.Name(), 1) // Set the head block if newer. - if c.conns.UpdateHeadBlock(block) { + if c.conns.UpdateHeadBlock(*packet) { c.logger.Info(). Str("hash", hash.Hex()). - Uint64("number", block.Block.Number().Uint64()). - Str("td", block.TD.String()). + Uint64("number", packet.Block.Number().Uint64()). + Str("td", packet.TD.String()). Msg("Updated head block") } - if err := c.getParentBlock(ctx, block.Block.Header()); err != nil { + if err := c.getParentBlock(ctx, packet.Block.Header()); err != nil { return err } @@ -515,17 +550,17 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { return nil } - c.db.WriteBlock(ctx, c.node, block.Block, block.TD, tfs) + c.db.WriteBlock(ctx, c.node, packet.Block, packet.TD, tfs) // Update cache to store the full block c.conns.Blocks().Add(hash, BlockCache{ - Header: block.Block.Header(), + Header: packet.Block.Header(), Body: ð.BlockBody{ - Transactions: block.Block.Transactions(), - Uncles: block.Block.Uncles(), - Withdrawals: block.Block.Withdrawals(), + Transactions: packet.Block.Transactions(), + Uncles: packet.Block.Uncles(), + Withdrawals: packet.Block.Withdrawals(), }, - TD: block.TD, + TD: packet.TD, }) return nil @@ -549,7 +584,7 @@ func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) er var name string switch version { - case 67, 68: + case 67, 68, 69: var txs eth.NewPooledTransactionHashesPacket if err := msg.Decode(&txs); err != nil { return err @@ -572,16 +607,28 @@ func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) er } func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) error { - var packet eth.PooledTransactionsPacket - if err := msg.Decode(&packet); err != nil { - return err + payload, err := io.ReadAll(msg.Payload) + if err != nil { + return fmt.Errorf("failed to read pooled transactions payload: %w", err) + } + + var raw rawPooledTransactionsPacket + if err := rlp.DecodeBytes(payload, &raw); err != nil { + c.logger.Warn().Err(err).Msg("Failed to decode pooled transactions") + return nil + } + + packet := ð.PooledTransactionsPacket{ + PooledTransactionsResponse: decodeTxs(raw.Txs), } tfs := time.Now() c.countMsgReceived(packet.Name(), float64(len(packet.PooledTransactionsResponse))) - c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse, tfs) + if len(packet.PooledTransactionsResponse) > 0 { + c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse, tfs) + } return nil } diff --git a/p2p/types.go b/p2p/types.go index fb88d5d8..56402663 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -3,11 +3,14 @@ package p2p import ( "crypto/ecdsa" "fmt" + "math/big" "slices" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/stateless" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/p2p" @@ -15,6 +18,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/rlpx" "github.com/ethereum/go-ethereum/rlp" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" ) type Message interface { @@ -402,3 +406,79 @@ func (c *rlpxConn) hasCap(name string, version uint) bool { cap := p2p.Cap{Name: name, Version: version} return slices.Contains(c.peerCaps, cap) && slices.Contains(c.caps, cap) } + +// rawBlockBody is used to decode block bodies with any transaction type. +type rawBlockBody struct { + Transactions []rlp.RawValue + Uncles []*types.Header + Withdrawals []*types.Withdrawal `rlp:"optional"` +} + +// rawBlock is used to decode blocks with any transaction type. +type rawBlock struct { + Header *types.Header + Txs []rlp.RawValue + Uncles []*types.Header + Withdrawals []*types.Withdrawal `rlp:"optional"` +} + +// rawNewBlockPacket is used to decode NewBlockMsg with any transaction type. +type rawNewBlockPacket struct { + Block rawBlock + TD *big.Int +} + +// rawPooledTransactionsPacket is used to decode PooledTransactionsMsg with any transaction type. +type rawPooledTransactionsPacket struct { + RequestId uint64 + Txs []rlp.RawValue +} + +// decodeTx attempts to decode a transaction from an RLP-encoded raw value. +func decodeTx(raw []byte) *types.Transaction { + if len(raw) == 0 { + return nil + } + + var bytes []byte + if rlp.DecodeBytes(raw, &bytes) == nil { + tx := new(types.Transaction) + if tx.UnmarshalBinary(bytes) == nil { + return tx + } + + log.Warn(). + Uint8("type", bytes[0]). + Int("size", len(bytes)). + Str("hash", crypto.Keccak256Hash(bytes).Hex()). + Msg("Failed to decode transaction") + + return nil + } + + tx := new(types.Transaction) + if tx.UnmarshalBinary(raw) == nil { + return tx + } + + log.Warn(). + Uint8("prefix", raw[0]). + Int("size", len(raw)). + Str("hash", crypto.Keccak256Hash(raw).Hex()). + Msg("Failed to decode transaction") + + return nil +} + +// decodeTxs decodes a list of transactions, returning only successfully decoded ones. +func decodeTxs(rawTxs []rlp.RawValue) []*types.Transaction { + var txs []*types.Transaction + + for _, raw := range rawTxs { + if tx := decodeTx(raw); tx != nil { + txs = append(txs, tx) + } + } + + return txs +}