diff --git a/middleware/cmd/middleware/main.go b/middleware/cmd/middleware/main.go index d5e118dc..3f41d200 100644 --- a/middleware/cmd/middleware/main.go +++ b/middleware/cmd/middleware/main.go @@ -4,6 +4,7 @@ package main import ( "flag" + "fmt" "log" "net/http" @@ -22,6 +23,8 @@ func main() { bbbConfigScript := flag.String("bbbconfigscript", "/opt/shift/scripts/bbb-config.sh", "Path to the bbb-config file that allows setting system configuration") flag.Parse() + electrsAddress := fmt.Sprintf("localhost:%s", *electrsRPCPort) + argumentMap := make(map[string]string) argumentMap["bitcoinRPCUser"] = *bitcoinRPCUser argumentMap["bitcoinRPCPassword"] = *bitcoinRPCPassword @@ -43,7 +46,7 @@ func main() { middleware := middleware.NewMiddleware(argumentMap) log.Println("--------------- Started middleware --------------") - handlers := handlers.NewHandlers(middleware, *dataDir) + handlers := handlers.NewHandlers(middleware, *dataDir, electrsAddress) log.Println("Binding middleware api to port 8845") if err := http.ListenAndServe(":8845", handlers.Router); err != nil { diff --git a/middleware/src/electrum/electrum.go b/middleware/src/electrum/electrum.go new file mode 100644 index 00000000..ff3330bd --- /dev/null +++ b/middleware/src/electrum/electrum.go @@ -0,0 +1,48 @@ +// Package electrum reads messages from a connected electrum server over tcp and passes the read value to a callback function. It also sends messages to the electrum server over tcp +package electrum + +import ( + "bufio" + "fmt" + "log" + "net" +) + +// Electrum makes a connection to an Electrum server and proxies messages. +type Electrum struct { + connection net.Conn + onMessageReceived func([]byte) +} + +// NewElectrum creates a new Electrum instance and tries to connect to the server. +func NewElectrum(address string, onMessageReceived func([]byte)) (*Electrum, error) { + connection, err := net.Dial("tcp", address) + if err != nil { + return nil, err + } + electrum := &Electrum{ + connection: connection, + onMessageReceived: onMessageReceived, + } + go electrum.read() + return electrum, nil +} + +// read raw message from Electrum server +func (electrum *Electrum) read() { + reader := bufio.NewReader(electrum.connection) + for { + line, err := reader.ReadBytes(byte('\n')) + if err != nil { + log.Println(fmt.Sprintf("electrum read error: %v", err)) + break + } + electrum.onMessageReceived(line) + } +} + +// Send a raw message to the Electrum server. +func (electrum *Electrum) Send(msg []byte) error { + _, err := electrum.connection.Write(msg) + return err +} diff --git a/middleware/src/handlers/handlers.go b/middleware/src/handlers/handlers.go index 0bf8e890..61a66e80 100644 --- a/middleware/src/handlers/handlers.go +++ b/middleware/src/handlers/handlers.go @@ -14,6 +14,11 @@ import ( "github.com/gorilla/websocket" ) +const ( + opElectrum = byte('e') + opRPC = byte('r') +) + // Middleware provides an interface to the middleware package. type Middleware interface { // Start triggers the main middleware event loop that emits events to be caught by the handlers. @@ -30,6 +35,7 @@ type Handlers struct { upgrader websocket.Upgrader middleware Middleware middlewareEvents <-chan []byte + electrumAddress string noiseConfig *noisemanager.NoiseConfig nClients int @@ -38,16 +44,21 @@ type Handlers struct { } // NewHandlers returns a handler instance. -func NewHandlers(middlewareInstance Middleware, dataDir string) *Handlers { +func NewHandlers( + middlewareInstance Middleware, + dataDir string, + electrumAddress string, +) *Handlers { router := mux.NewRouter() handlers := &Handlers{ - middleware: middlewareInstance, - Router: router, - upgrader: websocket.Upgrader{}, - noiseConfig: noisemanager.NewNoiseConfig(dataDir), - nClients: 0, - clientsMap: make(map[int]chan<- []byte), + middleware: middlewareInstance, + electrumAddress: electrumAddress, + Router: router, + upgrader: websocket.Upgrader{}, + noiseConfig: noisemanager.NewNoiseConfig(dataDir), + nClients: 0, + clientsMap: make(map[int]chan<- []byte), } handlers.Router.HandleFunc("/", handlers.rootHandler).Methods("GET") handlers.Router.HandleFunc("/ws", handlers.wsHandler) @@ -96,11 +107,25 @@ func (handlers *Handlers) wsHandler(w http.ResponseWriter, r *http.Request) { log.Println(err.Error() + "Noise connection failed to initialize") return } - server := rpcserver.NewRPCServer(handlers.middleware) + writeChan := make(chan []byte) + onElectrumMessageReceived := func(msg []byte) { + writeChan <- append([]byte{opElectrum}, msg...) + } + server := rpcserver.NewRPCServer( + handlers.middleware, + handlers.electrumAddress, + onElectrumMessageReceived, + ) + go func() { + for { + msg := <-server.RPCConnection.WriteChan() + writeChan <- append([]byte{opRPC}, msg...) + } + }() handlers.mu.Lock() - handlers.clientsMap[handlers.nClients] = server.RPCConnection.WriteChan() - handlers.runWebsocket(ws, server.RPCConnection.ReadChan(), server.RPCConnection.WriteChan(), handlers.nClients) + handlers.clientsMap[handlers.nClients] = writeChan + handlers.runWebsocket(ws, server.RPCConnection.ReadChan(), writeChan, handlers.nClients) handlers.nClients++ handlers.mu.Unlock() go server.Serve() diff --git a/middleware/src/handlers/handlers_test.go b/middleware/src/handlers/handlers_test.go index 87e32f1a..2f958b8d 100644 --- a/middleware/src/handlers/handlers_test.go +++ b/middleware/src/handlers/handlers_test.go @@ -1,6 +1,8 @@ package handlers_test import ( + "fmt" + middleware "github.com/digitalbitbox/bitbox-base/middleware/src" "github.com/digitalbitbox/bitbox-base/middleware/src/handlers" "github.com/stretchr/testify/require" @@ -33,7 +35,11 @@ func TestRootHandler(t *testing.T) { argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh" middlewareInstance := middleware.NewMiddleware(argumentMap) - handlers := handlers.NewHandlers(middlewareInstance, ".base") + handlers := handlers.NewHandlers( + middlewareInstance, + ".base", + fmt.Sprintf("localhost:%s", argumentMap["electrsRPCPort"]), + ) req, err := http.NewRequest("GET", "/", nil) require.NoError(t, err) rr := httptest.NewRecorder() @@ -53,7 +59,11 @@ func TestWebsocketHandler(t *testing.T) { argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh" middlewareInstance := middleware.NewMiddleware(argumentMap) - handlers := handlers.NewHandlers(middlewareInstance, ".base") + handlers := handlers.NewHandlers( + middlewareInstance, + ".base", + fmt.Sprintf("localhost:%s", argumentMap["electrsRPCPort"]), + ) rr := httptest.NewServer(handlers.Router) defer rr.Close() diff --git a/middleware/src/handlers/websocket.go b/middleware/src/handlers/websocket.go index 8f55d4ac..41462879 100644 --- a/middleware/src/handlers/websocket.go +++ b/middleware/src/handlers/websocket.go @@ -16,9 +16,13 @@ const ( // It takes four arguments, a websocket connection, a read and a write channel. // // The goroutines close client upon exit or dues to a send/receive error. -func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- []byte, writeChan <-chan []byte, clientID int) { +func (handlers *Handlers) runWebsocket( + client *websocket.Conn, + readChan chan<- []byte, + writeChan <-chan []byte, + clientID int) { - const maxMessageSize = 512 + const maxMessageSize = 1024 * 1024 // this channel is used to break the write loop, when the read loop breaks closeChan := make(chan struct{}) @@ -46,7 +50,7 @@ func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- [ } if msg[0] == opICanHasPairinVerificashun { msg = handlers.noiseConfig.CheckVerification() - err = client.WriteMessage(websocket.TextMessage, msg) + err = client.WriteMessage(websocket.BinaryMessage, msg) if err != nil { log.Println("Error, websocket failed to write channel hash verification message") } @@ -77,7 +81,7 @@ func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- [ _ = client.WriteMessage(websocket.CloseMessage, []byte{}) return } - err := client.WriteMessage(websocket.TextMessage, handlers.noiseConfig.Encrypt(message)) + err := client.WriteMessage(websocket.BinaryMessage, handlers.noiseConfig.Encrypt(message)) if err != nil { log.Println("Error, websocket closed unexpectedly in the writing loop") _ = client.WriteMessage(websocket.CloseMessage, []byte{}) diff --git a/middleware/src/middleware.go b/middleware/src/middleware.go index 1aac7c6f..2ce4f312 100644 --- a/middleware/src/middleware.go +++ b/middleware/src/middleware.go @@ -18,9 +18,10 @@ const ( // Middleware connects to services on the base with provided parrameters and emits events for the handler. type Middleware struct { - info SampleInfoResponse - environment system.Environment - events chan []byte + info SampleInfoResponse + environment system.Environment + events chan []byte + electrumEvents chan []byte } // NewMiddleware returns a new instance of the middleware @@ -28,7 +29,8 @@ func NewMiddleware(argumentMap map[string]string) *Middleware { middleware := &Middleware{ environment: system.NewEnvironment(argumentMap), //TODO(TheCharlatan) find a better way to increase the channel size - events: make(chan []byte), //the channel size needs to be increased every time we had an extra endpoint + events: make(chan []byte), //the channel size needs to be increased every time we had an extra endpoint + electrumEvents: make(chan []byte), info: SampleInfoResponse{ Blocks: 0, Difficulty: 0.0, @@ -134,7 +136,10 @@ func (middleware *Middleware) ResyncBitcoin() ResyncBitcoinResponse { // SystemEnv returns a GetEnvResponse struct in response to a rpcserver request func (middleware *Middleware) SystemEnv() GetEnvResponse { - response := GetEnvResponse{Network: middleware.environment.Network, ElectrsRPCPort: middleware.environment.ElectrsRPCPort} + response := GetEnvResponse{ + Network: middleware.environment.Network, + ElectrsRPCPort: middleware.environment.ElectrsRPCPort, + } return response } diff --git a/middleware/src/rpcserver/rpcserver.go b/middleware/src/rpcserver/rpcserver.go index 26dd1ab5..f1a64eb5 100644 --- a/middleware/src/rpcserver/rpcserver.go +++ b/middleware/src/rpcserver/rpcserver.go @@ -5,6 +5,7 @@ import ( "net/rpc" middleware "github.com/digitalbitbox/bitbox-base/middleware/src" + "github.com/digitalbitbox/bitbox-base/middleware/src/electrum" ) type rpcConn struct { @@ -34,7 +35,7 @@ func (conn *rpcConn) Read(p []byte) (n int, err error) { } func (conn *rpcConn) Write(p []byte) (n int, err error) { - conn.writeChan <- append([]byte("r"), p...) + conn.writeChan <- p return len(p), nil } @@ -49,17 +50,27 @@ type Middleware interface { SampleInfo() middleware.SampleInfoResponse } +type Electrum interface { + Send(msg []byte) error +} + // RPCServer provides rpc calls to the middleware type RPCServer struct { - middleware Middleware - RPCConnection *rpcConn + middleware Middleware + electrum Electrum + electrumAddress string + onElectrumMessageReceived func(msg []byte) + RPCConnection *rpcConn } // NewRPCServer returns a new RPCServer -func NewRPCServer(middleware Middleware) *RPCServer { +func NewRPCServer(middleware Middleware, electrumAddress string, onElectrumMessageReceived func(msg []byte)) *RPCServer { //, electrum Electrum) *RPCServer { server := &RPCServer{ - middleware: middleware, - RPCConnection: newRPCConn(), + middleware: middleware, + //electrum: electrum, + electrumAddress: electrumAddress, + onElectrumMessageReceived: onElectrumMessageReceived, + RPCConnection: newRPCConn(), } err := rpc.Register(server) if err != nil { @@ -90,6 +101,21 @@ func (server *RPCServer) GetSampleInfo(args int, reply *middleware.SampleInfoRes return nil } +// ElectrumSend sends a message to the Electrum server on the connection owned by the client. +func (server *RPCServer) ElectrumSend( + args struct{ Msg []byte }, + reply *struct{}) error { + if server.electrum == nil { + electrumClient, err := electrum.NewElectrum(server.electrumAddress, server.onElectrumMessageReceived) + server.electrum = electrumClient + if err != nil { + log.Println(err.Error() + "Electrum connection failed to initialize") + return err + } + } + return server.electrum.Send(args.Msg) +} + func (server *RPCServer) Serve() { rpc.ServeConn(server.RPCConnection) } diff --git a/middleware/src/rpcserver/rpcserver_test.go b/middleware/src/rpcserver/rpcserver_test.go index bc740969..d91360d5 100644 --- a/middleware/src/rpcserver/rpcserver_test.go +++ b/middleware/src/rpcserver/rpcserver_test.go @@ -39,8 +39,7 @@ func TestRPCServer(t *testing.T) { argumentMap["network"] = "testnet" argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh" middlewareInstance := middleware.NewMiddleware(argumentMap) - - rpcServer := rpcserver.NewRPCServer(middlewareInstance) + rpcServer := rpcserver.NewRPCServer(middlewareInstance, "localhost:80801", func([]byte) {}) serverWriteChan := rpcServer.RPCConnection.WriteChan() serverReadChan := rpcServer.RPCConnection.ReadChan() @@ -63,9 +62,11 @@ func TestRPCServer(t *testing.T) { msgRequest := <-clientWriteChan serverReadChan <- msgRequest msgResponse := <-serverWriteChan + //t.Logf("significant byte: %s", string(msgResponse[0])) t.Logf("response message %s", string(msgResponse)) // Cut off the significant Byte in the response - clientReadChan <- msgResponse[1:] + //t.Logf("significant byte: %s", string(msgResponse[1])) + clientReadChan <- msgResponse wg.Wait() t.Logf("reply: %v", reply) require.Equal(t, "testnet", reply.Network) @@ -86,7 +87,7 @@ func TestRPCServer(t *testing.T) { msgResponse = <-serverWriteChan t.Logf("Resync Bitcoin Response %q", string(msgResponse)) // Cut off the significant Byte in the response - clientReadChan <- msgResponse[1:] + clientReadChan <- msgResponse wg.Wait() require.Equal(t, false, resyncReply.Success) }