Skip to content
This repository was archived by the owner on Feb 25, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion middleware/cmd/middleware/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package main

import (
"flag"
"fmt"
"log"
"net/http"

Expand All @@ -22,6 +23,8 @@ func main() {
bbbConfigScript := flag.String("bbbconfigscript", "/opt/shift/scripts/bbb-config.sh", "Path to the bbb-config file that allows setting system configuration")
flag.Parse()

electrsAddress := fmt.Sprintf("localhost:%s", *electrsRPCPort)

argumentMap := make(map[string]string)
argumentMap["bitcoinRPCUser"] = *bitcoinRPCUser
argumentMap["bitcoinRPCPassword"] = *bitcoinRPCPassword
Expand All @@ -43,7 +46,7 @@ func main() {
middleware := middleware.NewMiddleware(argumentMap)
log.Println("--------------- Started middleware --------------")

handlers := handlers.NewHandlers(middleware, *dataDir)
handlers := handlers.NewHandlers(middleware, *dataDir, electrsAddress)
log.Println("Binding middleware api to port 8845")

if err := http.ListenAndServe(":8845", handlers.Router); err != nil {
Expand Down
48 changes: 48 additions & 0 deletions middleware/src/electrum/electrum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Package electrum reads messages from a connected electrum server over tcp and passes the read value to a callback function. It also sends messages to the electrum server over tcp
package electrum

import (
"bufio"
"fmt"
"log"
"net"
)

// Electrum makes a connection to an Electrum server and proxies messages.
type Electrum struct {
connection net.Conn
onMessageReceived func([]byte)
}

// NewElectrum creates a new Electrum instance and tries to connect to the server.
func NewElectrum(address string, onMessageReceived func([]byte)) (*Electrum, error) {
connection, err := net.Dial("tcp", address)
if err != nil {
return nil, err
}
electrum := &Electrum{
connection: connection,
onMessageReceived: onMessageReceived,
}
go electrum.read()
return electrum, nil
}

// read raw message from Electrum server
func (electrum *Electrum) read() {
reader := bufio.NewReader(electrum.connection)
for {
line, err := reader.ReadBytes(byte('\n'))
if err != nil {
log.Println(fmt.Sprintf("electrum read error: %v", err))
break
}
electrum.onMessageReceived(line)
}
}

// Send a raw message to the Electrum server.
func (electrum *Electrum) Send(msg []byte) error {
_, err := electrum.connection.Write(msg)
return err
}
45 changes: 35 additions & 10 deletions middleware/src/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
"github.com/gorilla/websocket"
)

const (
opElectrum = byte('e')
opRPC = byte('r')
)

// Middleware provides an interface to the middleware package.
type Middleware interface {
// Start triggers the main middleware event loop that emits events to be caught by the handlers.
Expand All @@ -30,6 +35,7 @@ type Handlers struct {
upgrader websocket.Upgrader
middleware Middleware
middlewareEvents <-chan []byte
electrumAddress string

noiseConfig *noisemanager.NoiseConfig
nClients int
Expand All @@ -38,16 +44,21 @@ type Handlers struct {
}

// NewHandlers returns a handler instance.
func NewHandlers(middlewareInstance Middleware, dataDir string) *Handlers {
func NewHandlers(
middlewareInstance Middleware,
dataDir string,
electrumAddress string,
) *Handlers {
router := mux.NewRouter()

handlers := &Handlers{
middleware: middlewareInstance,
Router: router,
upgrader: websocket.Upgrader{},
noiseConfig: noisemanager.NewNoiseConfig(dataDir),
nClients: 0,
clientsMap: make(map[int]chan<- []byte),
middleware: middlewareInstance,
electrumAddress: electrumAddress,
Router: router,
upgrader: websocket.Upgrader{},
noiseConfig: noisemanager.NewNoiseConfig(dataDir),
nClients: 0,
clientsMap: make(map[int]chan<- []byte),
}
handlers.Router.HandleFunc("/", handlers.rootHandler).Methods("GET")
handlers.Router.HandleFunc("/ws", handlers.wsHandler)
Expand Down Expand Up @@ -96,11 +107,25 @@ func (handlers *Handlers) wsHandler(w http.ResponseWriter, r *http.Request) {
log.Println(err.Error() + "Noise connection failed to initialize")
return
}
server := rpcserver.NewRPCServer(handlers.middleware)
writeChan := make(chan []byte)
onElectrumMessageReceived := func(msg []byte) {
writeChan <- append([]byte{opElectrum}, msg...)
}

server := rpcserver.NewRPCServer(
handlers.middleware,
handlers.electrumAddress,
onElectrumMessageReceived,
)
go func() {
for {
msg := <-server.RPCConnection.WriteChan()
writeChan <- append([]byte{opRPC}, msg...)
}
}()
handlers.mu.Lock()
handlers.clientsMap[handlers.nClients] = server.RPCConnection.WriteChan()
handlers.runWebsocket(ws, server.RPCConnection.ReadChan(), server.RPCConnection.WriteChan(), handlers.nClients)
handlers.clientsMap[handlers.nClients] = writeChan
handlers.runWebsocket(ws, server.RPCConnection.ReadChan(), writeChan, handlers.nClients)
handlers.nClients++
handlers.mu.Unlock()
go server.Serve()
Expand Down
14 changes: 12 additions & 2 deletions middleware/src/handlers/handlers_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package handlers_test

import (
"fmt"

middleware "github.com/digitalbitbox/bitbox-base/middleware/src"
"github.com/digitalbitbox/bitbox-base/middleware/src/handlers"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -33,7 +35,11 @@ func TestRootHandler(t *testing.T) {
argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh"

middlewareInstance := middleware.NewMiddleware(argumentMap)
handlers := handlers.NewHandlers(middlewareInstance, ".base")
handlers := handlers.NewHandlers(
middlewareInstance,
".base",
fmt.Sprintf("localhost:%s", argumentMap["electrsRPCPort"]),
)
req, err := http.NewRequest("GET", "/", nil)
require.NoError(t, err)
rr := httptest.NewRecorder()
Expand All @@ -53,7 +59,11 @@ func TestWebsocketHandler(t *testing.T) {
argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh"

middlewareInstance := middleware.NewMiddleware(argumentMap)
handlers := handlers.NewHandlers(middlewareInstance, ".base")
handlers := handlers.NewHandlers(
middlewareInstance,
".base",
fmt.Sprintf("localhost:%s", argumentMap["electrsRPCPort"]),
)
rr := httptest.NewServer(handlers.Router)
defer rr.Close()

Expand Down
12 changes: 8 additions & 4 deletions middleware/src/handlers/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ const (
// It takes four arguments, a websocket connection, a read and a write channel.
//
// The goroutines close client upon exit or dues to a send/receive error.
func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- []byte, writeChan <-chan []byte, clientID int) {
func (handlers *Handlers) runWebsocket(
client *websocket.Conn,
readChan chan<- []byte,
writeChan <-chan []byte,
clientID int) {

const maxMessageSize = 512
const maxMessageSize = 1024 * 1024
// this channel is used to break the write loop, when the read loop breaks
closeChan := make(chan struct{})

Expand Down Expand Up @@ -46,7 +50,7 @@ func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- [
}
if msg[0] == opICanHasPairinVerificashun {
msg = handlers.noiseConfig.CheckVerification()
err = client.WriteMessage(websocket.TextMessage, msg)
err = client.WriteMessage(websocket.BinaryMessage, msg)
if err != nil {
log.Println("Error, websocket failed to write channel hash verification message")
}
Expand Down Expand Up @@ -77,7 +81,7 @@ func (handlers *Handlers) runWebsocket(client *websocket.Conn, readChan chan<- [
_ = client.WriteMessage(websocket.CloseMessage, []byte{})
return
}
err := client.WriteMessage(websocket.TextMessage, handlers.noiseConfig.Encrypt(message))
err := client.WriteMessage(websocket.BinaryMessage, handlers.noiseConfig.Encrypt(message))
if err != nil {
log.Println("Error, websocket closed unexpectedly in the writing loop")
_ = client.WriteMessage(websocket.CloseMessage, []byte{})
Expand Down
15 changes: 10 additions & 5 deletions middleware/src/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@ const (

// Middleware connects to services on the base with provided parrameters and emits events for the handler.
type Middleware struct {
info SampleInfoResponse
environment system.Environment
events chan []byte
info SampleInfoResponse
environment system.Environment
events chan []byte
electrumEvents chan []byte
}

// NewMiddleware returns a new instance of the middleware
func NewMiddleware(argumentMap map[string]string) *Middleware {
middleware := &Middleware{
environment: system.NewEnvironment(argumentMap),
//TODO(TheCharlatan) find a better way to increase the channel size
events: make(chan []byte), //the channel size needs to be increased every time we had an extra endpoint
events: make(chan []byte), //the channel size needs to be increased every time we had an extra endpoint
electrumEvents: make(chan []byte),
info: SampleInfoResponse{
Blocks: 0,
Difficulty: 0.0,
Expand Down Expand Up @@ -134,7 +136,10 @@ func (middleware *Middleware) ResyncBitcoin() ResyncBitcoinResponse {

// SystemEnv returns a GetEnvResponse struct in response to a rpcserver request
func (middleware *Middleware) SystemEnv() GetEnvResponse {
response := GetEnvResponse{Network: middleware.environment.Network, ElectrsRPCPort: middleware.environment.ElectrsRPCPort}
response := GetEnvResponse{
Network: middleware.environment.Network,
ElectrsRPCPort: middleware.environment.ElectrsRPCPort,
}
return response
}

Expand Down
38 changes: 32 additions & 6 deletions middleware/src/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/rpc"

middleware "github.com/digitalbitbox/bitbox-base/middleware/src"
"github.com/digitalbitbox/bitbox-base/middleware/src/electrum"
)

type rpcConn struct {
Expand Down Expand Up @@ -34,7 +35,7 @@ func (conn *rpcConn) Read(p []byte) (n int, err error) {
}

func (conn *rpcConn) Write(p []byte) (n int, err error) {
conn.writeChan <- append([]byte("r"), p...)
conn.writeChan <- p
return len(p), nil
}

Expand All @@ -49,17 +50,27 @@ type Middleware interface {
SampleInfo() middleware.SampleInfoResponse
}

type Electrum interface {
Send(msg []byte) error
}

// RPCServer provides rpc calls to the middleware
type RPCServer struct {
middleware Middleware
RPCConnection *rpcConn
middleware Middleware
electrum Electrum
electrumAddress string
onElectrumMessageReceived func(msg []byte)
RPCConnection *rpcConn
}

// NewRPCServer returns a new RPCServer
func NewRPCServer(middleware Middleware) *RPCServer {
func NewRPCServer(middleware Middleware, electrumAddress string, onElectrumMessageReceived func(msg []byte)) *RPCServer { //, electrum Electrum) *RPCServer {
server := &RPCServer{
middleware: middleware,
RPCConnection: newRPCConn(),
middleware: middleware,
//electrum: electrum,
electrumAddress: electrumAddress,
onElectrumMessageReceived: onElectrumMessageReceived,
RPCConnection: newRPCConn(),
}
err := rpc.Register(server)
if err != nil {
Expand Down Expand Up @@ -90,6 +101,21 @@ func (server *RPCServer) GetSampleInfo(args int, reply *middleware.SampleInfoRes
return nil
}

// ElectrumSend sends a message to the Electrum server on the connection owned by the client.
func (server *RPCServer) ElectrumSend(
args struct{ Msg []byte },
reply *struct{}) error {
if server.electrum == nil {
electrumClient, err := electrum.NewElectrum(server.electrumAddress, server.onElectrumMessageReceived)
server.electrum = electrumClient
if err != nil {
log.Println(err.Error() + "Electrum connection failed to initialize")
return err
}
}
return server.electrum.Send(args.Msg)
}

func (server *RPCServer) Serve() {
rpc.ServeConn(server.RPCConnection)
}
9 changes: 5 additions & 4 deletions middleware/src/rpcserver/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func TestRPCServer(t *testing.T) {
argumentMap["network"] = "testnet"
argumentMap["bbbConfigScript"] = "/home/bitcoin/script.sh"
middlewareInstance := middleware.NewMiddleware(argumentMap)

rpcServer := rpcserver.NewRPCServer(middlewareInstance)
rpcServer := rpcserver.NewRPCServer(middlewareInstance, "localhost:80801", func([]byte) {})
serverWriteChan := rpcServer.RPCConnection.WriteChan()
serverReadChan := rpcServer.RPCConnection.ReadChan()

Expand All @@ -63,9 +62,11 @@ func TestRPCServer(t *testing.T) {
msgRequest := <-clientWriteChan
serverReadChan <- msgRequest
msgResponse := <-serverWriteChan
//t.Logf("significant byte: %s", string(msgResponse[0]))
t.Logf("response message %s", string(msgResponse))
// Cut off the significant Byte in the response
clientReadChan <- msgResponse[1:]
//t.Logf("significant byte: %s", string(msgResponse[1]))
clientReadChan <- msgResponse
wg.Wait()
t.Logf("reply: %v", reply)
require.Equal(t, "testnet", reply.Network)
Expand All @@ -86,7 +87,7 @@ func TestRPCServer(t *testing.T) {
msgResponse = <-serverWriteChan
t.Logf("Resync Bitcoin Response %q", string(msgResponse))
// Cut off the significant Byte in the response
clientReadChan <- msgResponse[1:]
clientReadChan <- msgResponse
wg.Wait()
require.Equal(t, false, resyncReply.Success)
}