diff --git a/CMakeLists.txt b/CMakeLists.txt index 52d36bb4..998fd439 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -125,7 +125,30 @@ endif() #Find packages find_package(Threads REQUIRED) find_package(OpenSSL REQUIRED) -find_package(yaml-cpp REQUIRED) + +# Try to find yaml-cpp, fetch if not found +find_package(yaml-cpp QUIET) +if(NOT yaml-cpp_FOUND) + message(STATUS "yaml-cpp not found, fetching from source...") + include(FetchContent) + # Set policy for older cmake_minimum_required in yaml-cpp + set(CMAKE_POLICY_DEFAULT_CMP0048 NEW) + if(POLICY CMP0077) + cmake_policy(SET CMP0077 NEW) + endif() + FetchContent_Declare( + yaml-cpp + GIT_REPOSITORY https://github.com/jbeder/yaml-cpp.git + GIT_TAG yaml-cpp-0.7.0 + ) + set(YAML_CPP_BUILD_TESTS OFF CACHE BOOL "" FORCE) + set(YAML_CPP_BUILD_TOOLS OFF CACHE BOOL "" FORCE) + set(YAML_CPP_BUILD_CONTRIB OFF CACHE BOOL "" FORCE) + set(YAML_CPP_FORMAT_SOURCE OFF CACHE BOOL "" FORCE) + set(YAML_BUILD_SHARED_LIBS OFF CACHE BOOL "" FORCE) + FetchContent_MakeAvailable(yaml-cpp) + message(STATUS "yaml-cpp fetched successfully") +endif() #Find libevent find_package(PkgConfig) @@ -297,9 +320,23 @@ else() message(STATUS "nghttp2 support disabled") endif() -# fmt library for formatting - use system-installed version -find_package(fmt REQUIRED) -message(STATUS "Found fmt: ${fmt_VERSION}") +# fmt library for formatting - try system, fetch if not found +find_package(fmt QUIET) +if(NOT fmt_FOUND) + message(STATUS "fmt not found, fetching from source...") + FetchContent_Declare( + fmt + GIT_REPOSITORY https://github.com/fmtlib/fmt.git + GIT_TAG 10.2.1 + ) + set(FMT_INSTALL OFF CACHE BOOL "" FORCE) + set(FMT_TEST OFF CACHE BOOL "" FORCE) + set(FMT_DOC OFF CACHE BOOL "" FORCE) + FetchContent_MakeAvailable(fmt) + message(STATUS "fmt fetched successfully") +else() + message(STATUS "Found fmt: ${fmt_VERSION}") +endif() # nlohmann/json FetchContent_Declare( diff --git a/build-mingw.sh b/build-mingw.sh new file mode 100644 index 00000000..c93ddbcf --- /dev/null +++ b/build-mingw.sh @@ -0,0 +1,59 @@ +#!/bin/bash +# Build script for GopherMCP with MinGW on Cygwin +# Usage: ./build-mingw.sh [clean|release|debug] + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +BUILD_DIR="${SCRIPT_DIR}/build-mingw" +BUILD_TYPE="${1:-Release}" + +# Handle clean option +if [ "$1" == "clean" ]; then + echo "Cleaning build directory..." + rm -rf "${BUILD_DIR}" + echo "Done." + exit 0 +fi + +# Set build type +if [ "$1" == "debug" ]; then + BUILD_TYPE="Debug" +elif [ "$1" == "release" ]; then + BUILD_TYPE="Release" +fi + +echo "========================================" +echo "GopherMCP MinGW Build Script" +echo "========================================" +echo "Build type: ${BUILD_TYPE}" +echo "Build directory: ${BUILD_DIR}" +echo "" + +# Create build directory +mkdir -p "${BUILD_DIR}" +cd "${BUILD_DIR}" + +# Configure with CMake +echo "Configuring with CMake..." +cmake -DCMAKE_TOOLCHAIN_FILE="${SCRIPT_DIR}/cmake/mingw-w64-toolchain.cmake" \ + -DCMAKE_BUILD_TYPE="${BUILD_TYPE}" \ + -DCMAKE_POLICY_VERSION_MINIMUM=3.5 \ + -DBUILD_EXAMPLES=ON \ + -DBUILD_TESTS=OFF \ + -DBUILD_SHARED_LIBS=OFF \ + -DBUILD_STATIC_LIBS=ON \ + "${SCRIPT_DIR}" + +# Build +echo "" +echo "Building mcp_example_server..." +cmake --build . --target mcp_example_server -j$(nproc 2>/dev/null || echo 4) + +echo "" +echo "========================================" +echo "Build complete!" +echo "========================================" +echo "Executable: ${BUILD_DIR}/examples/mcp/mcp_example_server.exe" +echo "" +echo "To run: ${BUILD_DIR}/examples/mcp/mcp_example_server.exe --help" diff --git a/cmake/mingw-w64-toolchain.cmake b/cmake/mingw-w64-toolchain.cmake new file mode 100644 index 00000000..a5546118 --- /dev/null +++ b/cmake/mingw-w64-toolchain.cmake @@ -0,0 +1,73 @@ +# MinGW-w64 Cross-Compilation Toolchain for Cygwin +# Usage: cmake -DCMAKE_TOOLCHAIN_FILE=../cmake/mingw-w64-toolchain.cmake .. + +set(CMAKE_SYSTEM_NAME Windows) +set(CMAKE_SYSTEM_PROCESSOR x86_64) + +# Specify the cross compilers +set(CMAKE_C_COMPILER x86_64-w64-mingw32-gcc.exe) +set(CMAKE_CXX_COMPILER x86_64-w64-mingw32-g++.exe) +set(CMAKE_RC_COMPILER x86_64-w64-mingw32-windres.exe) +set(CMAKE_AR x86_64-w64-mingw32-ar.exe) +set(CMAKE_RANLIB x86_64-w64-mingw32-ranlib.exe) + +# MinGW sysroot paths +set(MINGW_SYSROOT /usr/x86_64-w64-mingw32/sys-root/mingw) + +# Target environment - Cygwin's MinGW sysroot +set(CMAKE_FIND_ROOT_PATH + ${MINGW_SYSROOT} + /usr/x86_64-w64-mingw32 +) + +# Search for programs in the build host directories +set(CMAKE_FIND_ROOT_PATH_MODE_PROGRAM NEVER) +# Search for libraries and headers in the target directories +set(CMAKE_FIND_ROOT_PATH_MODE_LIBRARY ONLY) +set(CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY) +set(CMAKE_FIND_ROOT_PATH_MODE_PACKAGE ONLY) + +# OpenSSL paths for MinGW +set(OPENSSL_ROOT_DIR ${MINGW_SYSROOT}) +set(OPENSSL_INCLUDE_DIR ${MINGW_SYSROOT}/include) +set(OPENSSL_CRYPTO_LIBRARY ${MINGW_SYSROOT}/lib/libcrypto.dll.a) +set(OPENSSL_SSL_LIBRARY ${MINGW_SYSROOT}/lib/libssl.dll.a) + +# yaml-cpp paths for MinGW +set(yaml-cpp_DIR ${MINGW_SYSROOT}/lib/cmake/yaml-cpp) +set(YAML_CPP_INCLUDE_DIR ${MINGW_SYSROOT}/include) +set(YAML_CPP_LIBRARIES ${MINGW_SYSROOT}/lib/libyaml-cpp.dll.a) + +# libevent paths for MinGW +set(LIBEVENT_INCLUDE_DIRS ${MINGW_SYSROOT}/include) +set(LIBEVENT_LIBRARIES + ${MINGW_SYSROOT}/lib/libevent.dll.a + ${MINGW_SYSROOT}/lib/libevent_core.dll.a +) + +# fmt library paths +set(fmt_DIR ${MINGW_SYSROOT}/lib/cmake/fmt) + +# pkg-config path for MinGW packages +set(ENV{PKG_CONFIG_PATH} "${MINGW_SYSROOT}/lib/pkgconfig") +set(PKG_CONFIG_EXECUTABLE /usr/bin/x86_64-w64-mingw32-pkg-config) + +# Additional include/library paths +set(CMAKE_INCLUDE_PATH ${MINGW_SYSROOT}/include) +set(CMAKE_LIBRARY_PATH ${MINGW_SYSROOT}/lib) +set(CMAKE_PREFIX_PATH ${MINGW_SYSROOT}) + +# Windows platform definitions +set(WIN32 TRUE) +set(MINGW TRUE) +add_definitions(-D_WIN32 -DWIN32 -D_WINDOWS -DMINGW) + +# Ensure Windows socket libraries are linked +link_libraries(ws2_32 mswsock) + +# Disable features that may cause issues with cross-compilation +set(CMAKE_CROSSCOMPILING TRUE) + +# Static linking of libgcc and libstdc++ for easier distribution +set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -static-libstdc++") +set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -static-libgcc -static-libstdc++") diff --git a/examples/mcp/CMakeLists.txt b/examples/mcp/CMakeLists.txt index d989e7a3..7b6c3175 100644 --- a/examples/mcp/CMakeLists.txt +++ b/examples/mcp/CMakeLists.txt @@ -47,4 +47,11 @@ install(TARGETS mcp_example_client mcp_example_server mcp_config_example_server install(FILES ../configs/mcp_server_example.json DESTINATION share/mcp/examples/configs -) \ No newline at end of file +) + +# Windows-specific libraries +if(WIN32) + target_link_libraries(mcp_example_client ws2_32 mswsock) + target_link_libraries(mcp_example_server ws2_32 mswsock) + target_link_libraries(mcp_config_example_server ws2_32 mswsock) +endif() \ No newline at end of file diff --git a/examples/mcp/mcp_example_server.cc b/examples/mcp/mcp_example_server.cc index a92a8f08..6abb60e7 100644 --- a/examples/mcp/mcp_example_server.cc +++ b/examples/mcp/mcp_example_server.cc @@ -94,10 +94,16 @@ #include #include #include -#include #include #include + +// Platform-specific includes for signal/shutdown handling +#ifdef _WIN32 +#include +#else +#include #include // for _exit +#endif #include "mcp/json/json_bridge.h" #include "mcp/server/mcp_server.h" @@ -134,6 +140,39 @@ struct ServerOptions { std::string http_health_path = "/health"; }; +// Platform-specific shutdown handler +#ifdef _WIN32 +// Windows Console Control Handler +BOOL WINAPI ConsoleCtrlHandler(DWORD ctrlType) { + switch (ctrlType) { + case CTRL_C_EVENT: + case CTRL_BREAK_EVENT: + case CTRL_CLOSE_EVENT: + case CTRL_SHUTDOWN_EVENT: + std::cerr << "\n[INFO] Received console control event " << ctrlType + << ", initiating graceful shutdown..." << std::endl; + + // Set the shutdown flag + g_shutdown = true; + + // Notify the shutdown monitor thread + g_shutdown_cv.notify_all(); + + // For safety, if we receive multiple signals, force exit + static std::atomic signal_count(0); + signal_count++; + if (signal_count > 1) { + std::cerr << "\n[INFO] Force shutdown after multiple signals..." + << std::endl; + ExitProcess(0); + } + return TRUE; + default: + return FALSE; + } +} +#else +// Unix signal handler void signal_handler(int signal) { // Signal handlers should do minimal work // Just set the flag and notify - actual shutdown happens in main thread @@ -158,6 +197,7 @@ void signal_handler(int signal) { _exit(0); } } +#endif void printUsage(const char* program) { std::cerr << "USAGE: " << program << " [options]\n\n"; @@ -847,9 +887,13 @@ void printStatistics(const McpServer& server) { } int main(int argc, char* argv[]) { - // Install signal handlers + // Install platform-specific signal/shutdown handlers +#ifdef _WIN32 + SetConsoleCtrlHandler(ConsoleCtrlHandler, TRUE); +#else signal(SIGINT, signal_handler); signal(SIGTERM, signal_handler); +#endif // Parse command-line options ServerOptions options = parseArguments(argc, argv); diff --git a/include/mcp/event/event_loop.h b/include/mcp/event/event_loop.h index abbdf4f1..11feaea3 100644 --- a/include/mcp/event/event_loop.h +++ b/include/mcp/event/event_loop.h @@ -9,11 +9,24 @@ #include #include +#ifdef _WIN32 +#include +#endif + #include "mcp/core/compat.h" namespace mcp { namespace event { +// Platform-specific socket/fd type for event monitoring +// On Windows, socket handles are SOCKET type +// On Unix/Linux, file descriptors are 32-bit int +#ifdef _WIN32 +using os_fd_t = SOCKET; +#else +using os_fd_t = int; +#endif + // Forward declaration class WatermarkFactory { public: @@ -84,11 +97,12 @@ enum class FileTriggerType { // Determine platform-preferred event type constexpr FileTriggerType determinePlatformPreferredEventType() { -#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || \ - defined(FORCE_LEVEL_EVENTS) - // Windows doesn't support native edge triggers, use emulated - // FORCE_LEVEL_EVENTS allows testing Windows behavior on POSIX - return FileTriggerType::EmulatedEdge; +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) + // Windows select() only supports level-triggered mode + return FileTriggerType::Level; +#elif defined(FORCE_LEVEL_EVENTS) + // FORCE_LEVEL_EVENTS allows testing level-triggered behavior on POSIX + return FileTriggerType::Level; #elif defined(__APPLE__) || defined(__FreeBSD__) // macOS/BSD: Use level-triggered to avoid issues // Edge-triggered with EV_CLEAR causes problems with our event handling @@ -309,8 +323,9 @@ class Dispatcher : public DispatcherBase { /** * Create a file event that monitors a file descriptor. + * @param fd Platform-specific socket/fd (os_fd_t: int on Unix, uintptr_t on Windows) */ - virtual FileEventPtr createFileEvent(int fd, + virtual FileEventPtr createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger, uint32_t events) = 0; diff --git a/include/mcp/event/libevent_dispatcher.h b/include/mcp/event/libevent_dispatcher.h index 67e739a0..9b40bacc 100644 --- a/include/mcp/event/libevent_dispatcher.h +++ b/include/mcp/event/libevent_dispatcher.h @@ -18,6 +18,15 @@ namespace event { // Rename to avoid conflict with struct event using libevent_event = struct event; +// Platform-compatible socket type to match libevent's evutil_socket_t +// On Windows, uses SOCKET type (same as os_fd_t) +// On Unix/Linux, uses int (32-bit) +#ifdef _WIN32 +using libevent_socket_t = SOCKET; +#else +using libevent_socket_t = int; +#endif + /** * @brief Libevent-based implementation of the Dispatcher interface * @@ -40,7 +49,7 @@ class LibeventDispatcher : public Dispatcher { void registerWatchdog(const WatchDogSharedPtr& watchdog, std::chrono::milliseconds min_touch_interval) override; - FileEventPtr createFileEvent(int fd, + FileEventPtr createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger, uint32_t events) override; @@ -87,7 +96,7 @@ class LibeventDispatcher : public Dispatcher { class FileEventImpl : public FileEvent { public: FileEventImpl(LibeventDispatcher& dispatcher, - int fd, + os_fd_t fd, FileReadyCb cb, FileTriggerType trigger, uint32_t events); @@ -99,13 +108,13 @@ class LibeventDispatcher : public Dispatcher { void registerEventIfEmulatedEdge(uint32_t event) override; private: - static void eventCallback(int fd, short events, void* arg); + static void eventCallback(libevent_socket_t fd, short events, void* arg); void assignEvents(uint32_t events); void updateEvents(uint32_t events); void mergeInjectedEventsAndRunCb(uint32_t events); LibeventDispatcher& dispatcher_; - int fd_; + os_fd_t fd_; FileReadyCb cb_; FileTriggerType trigger_; libevent_event* event_; @@ -129,7 +138,7 @@ class LibeventDispatcher : public Dispatcher { bool enabled() override; private: - static void timerCallback(int fd, short events, void* arg); + static void timerCallback(libevent_socket_t fd, short events, void* arg); LibeventDispatcher& dispatcher_; TimerCb cb_; @@ -165,7 +174,7 @@ class LibeventDispatcher : public Dispatcher { ~SignalEventImpl() override; private: - static void signalCallback(int fd, short events, void* arg); + static void signalCallback(libevent_socket_t fd, short events, void* arg); LibeventDispatcher& dispatcher_; int signal_num_; @@ -185,7 +194,7 @@ class LibeventDispatcher : public Dispatcher { void runDeferredDeletes(); void touchWatchdog(); void initializeLibevent(); - static void postWakeupCallback(int fd, short events, void* arg); + static void postWakeupCallback(libevent_socket_t fd, short events, void* arg); // Member variables const std::string name_; @@ -196,7 +205,7 @@ class LibeventDispatcher : public Dispatcher { // Post callback handling std::mutex post_mutex_; std::queue post_callbacks_; - int wakeup_fd_[2]; // Pipe for waking up event loop + libevent_socket_t wakeup_fd_[2]; // Pipe for waking up event loop libevent_event* wakeup_event_; // Deferred deletion diff --git a/include/mcp/filter/filter_event.h b/include/mcp/filter/filter_event.h index ea88ca75..c33dc183 100644 --- a/include/mcp/filter/filter_event.h +++ b/include/mcp/filter/filter_event.h @@ -63,6 +63,10 @@ enum class FilterEventType { * * Severity classification for filtering and prioritization of events. */ +// Windows defines ERROR as a macro in wingdi.h, undef it to avoid conflict +#ifdef ERROR +#undef ERROR +#endif enum class FilterEventSeverity { TRACE = 0, ///< Trace-level diagnostic information DEBUG = 1, ///< Debug-level information diff --git a/include/mcp/http/http_parser.h b/include/mcp/http/http_parser.h index 0053e383..4abbb855 100644 --- a/include/mcp/http/http_parser.h +++ b/include/mcp/http/http_parser.h @@ -47,6 +47,10 @@ enum class HttpStatusCode : uint16_t { }; // HTTP methods +// Note: Windows defines DELETE as a macro in winnt.h, so we need to undefine it +#ifdef DELETE +#undef DELETE +#endif enum class HttpMethod { GET, POST, diff --git a/include/mcp/network/address.h b/include/mcp/network/address.h index 2dddcb8c..2aeabbff 100644 --- a/include/mcp/network/address.h +++ b/include/mcp/network/address.h @@ -1,6 +1,7 @@ #ifndef MCP_NETWORK_ADDRESS_H #define MCP_NETWORK_ADDRESS_H +#include #include #include #include diff --git a/include/mcp/network/address_impl.h b/include/mcp/network/address_impl.h index 853c5ad2..97150739 100644 --- a/include/mcp/network/address_impl.h +++ b/include/mcp/network/address_impl.h @@ -71,6 +71,7 @@ class Ipv6Instance : public Ip { sockaddr_in6 addr_; }; +#ifndef _WIN32 /** * Unix domain socket address implementation */ @@ -99,6 +100,7 @@ class PipeInstance : public Pipe { std::string path_; mode_t mode_; }; +#endif // !_WIN32 } // namespace Address } // namespace network diff --git a/include/mcp/network/connection_utility.h b/include/mcp/network/connection_utility.h index 12620f27..dd3a7ce5 100644 --- a/include/mcp/network/connection_utility.h +++ b/include/mcp/network/connection_utility.h @@ -2,6 +2,7 @@ #include +#include "mcp/network/io_handle.h" // For os_fd_t #include "mcp/network/socket.h" namespace mcp { @@ -14,9 +15,9 @@ class SocketConfigUtility { public: /** * Set socket options optimized for performance (simple interface) - * @param fd The file descriptor to configure + * @param fd Platform-specific socket/fd (os_fd_t: int on Unix, SOCKET on Windows) */ - static void setSocketOptions(int fd); + static void setSocketOptions(os_fd_t fd); /** * Configure a socket for optimal connection settings diff --git a/include/mcp/network/io_handle.h b/include/mcp/network/io_handle.h index ab545de6..4167502e 100644 --- a/include/mcp/network/io_handle.h +++ b/include/mcp/network/io_handle.h @@ -20,9 +20,11 @@ class IoHandle; using IoHandlePtr = std::unique_ptr; // Platform-specific socket type +// On Windows, uses SOCKET type directly +// On Unix/Linux, uses int file descriptor #ifdef _WIN32 -using os_fd_t = uintptr_t; -constexpr os_fd_t INVALID_SOCKET_FD = static_cast(-1); +using os_fd_t = SOCKET; +constexpr os_fd_t INVALID_SOCKET_FD = INVALID_SOCKET; #else using os_fd_t = int; constexpr os_fd_t INVALID_SOCKET_FD = -1; diff --git a/include/mcp/network/socket_interface.h b/include/mcp/network/socket_interface.h index 482d889f..dddb99d5 100644 --- a/include/mcp/network/socket_interface.h +++ b/include/mcp/network/socket_interface.h @@ -243,9 +243,9 @@ SocketInterface& socketInterface(); /** * Set a custom socket interface (for testing). - * @param interface Custom interface + * @param iface Custom interface */ -void setSocketInterface(SocketInterfacePtr interface); +void setSocketInterface(SocketInterfacePtr iface); /** * Reset to default socket interface. diff --git a/include/mcp/network/socket_interface_impl.h b/include/mcp/network/socket_interface_impl.h index 644ac473..57002191 100644 --- a/include/mcp/network/socket_interface_impl.h +++ b/include/mcp/network/socket_interface_impl.h @@ -84,7 +84,7 @@ class SocketInterfaceImpl : public SocketInterface { bool supportsReusePort() const override; private: - void setNonBlocking(os_fd_t fd); + int setNonBlocking(os_fd_t fd); // Returns 0 on success, -1 on error void setCloseOnExec(os_fd_t fd); std::string detectPlatform(); void initializeCapabilities(); diff --git a/include/mcp/protocol/mcp_protocol_state_machine.h b/include/mcp/protocol/mcp_protocol_state_machine.h index 5b5a62fa..92cb0406 100644 --- a/include/mcp/protocol/mcp_protocol_state_machine.h +++ b/include/mcp/protocol/mcp_protocol_state_machine.h @@ -35,6 +35,10 @@ namespace protocol { * These states track MCP-specific protocol phases like initialization and * capability negotiation. */ +// Windows defines ERROR as a macro in wingdi.h, undef it to avoid conflict +#ifdef ERROR +#undef ERROR +#endif enum class McpProtocolState { // Initial state - no connection DISCONNECTED, diff --git a/include/mcp/transport/pipe_io_handle.h b/include/mcp/transport/pipe_io_handle.h index 85fc9fa4..6384a7de 100644 --- a/include/mcp/transport/pipe_io_handle.h +++ b/include/mcp/transport/pipe_io_handle.h @@ -1,7 +1,13 @@ #pragma once +// Platform-specific includes +#ifdef _WIN32 +#include +#include +#else #include #include +#endif #include "mcp/network/io_socket_handle_impl.h" @@ -23,7 +29,11 @@ class PipeIoHandle : public network::IoSocketHandleImpl { ~PipeIoHandle() override { // Close write FD if still open if (write_fd_ >= 0) { +#ifdef _WIN32 + closesocket(write_fd_); +#else ::close(write_fd_); +#endif write_fd_ = -1; } } @@ -39,11 +49,15 @@ class PipeIoHandle : public network::IoSocketHandleImpl { return network::IoCallResult::success(0); } - // Use write() for pipes (not send() or writev()) // Writing to pipes: handle one slice at a time for simplicity size_t total_written = 0; for (size_t i = 0; i < num_slices; ++i) { +#ifdef _WIN32 + int result = send(write_fd_, static_cast(slices[i].mem_), + static_cast(slices[i].len_), 0); +#else ssize_t result = ::write(write_fd_, slices[i].mem_, slices[i].len_); +#endif if (result >= 0) { total_written += result; if (static_cast(result) < slices[i].len_) { @@ -51,6 +65,16 @@ class PipeIoHandle : public network::IoSocketHandleImpl { break; } } else { +#ifdef _WIN32 + int err = WSAGetLastError(); + if (total_written > 0) { + return network::IoCallResult::success(total_written); + } + if (err == WSAEWOULDBLOCK) { + return network::IoCallResult::error(EAGAIN); + } + return network::IoCallResult::error(err); +#else int err = errno; if (total_written > 0) { // Return what we've written so far @@ -62,6 +86,7 @@ class PipeIoHandle : public network::IoSocketHandleImpl { return network::IoCallResult::error(err); } return network::IoCallResult::error(err); +#endif } } return network::IoCallResult::success(total_written); @@ -71,7 +96,11 @@ class PipeIoHandle : public network::IoSocketHandleImpl { network::IoVoidResult close() override { // Close write FD first if (write_fd_ >= 0) { +#ifdef _WIN32 + closesocket(write_fd_); +#else ::close(write_fd_); +#endif write_fd_ = -1; } // Then close read FD via parent class diff --git a/src/echo/echo_stdio_transport_advanced.cc b/src/echo/echo_stdio_transport_advanced.cc index e31aaffa..173fedb3 100644 --- a/src/echo/echo_stdio_transport_advanced.cc +++ b/src/echo/echo_stdio_transport_advanced.cc @@ -7,9 +7,29 @@ #include #include -#include #include + +#ifdef _WIN32 +#include +#include +#define read(fd, buf, len) _read(fd, buf, static_cast(len)) +#define write(fd, buf, len) _write(fd, buf, static_cast(len)) +#ifndef STDIN_FILENO +#define STDIN_FILENO 0 +#endif +#ifndef STDOUT_FILENO +#define STDOUT_FILENO 1 +#endif +#ifndef EAGAIN +#define EAGAIN WSAEWOULDBLOCK +#endif +#ifndef EWOULDBLOCK +#define EWOULDBLOCK WSAEWOULDBLOCK +#endif +#else +#include #include +#endif namespace mcp { namespace echo { @@ -165,10 +185,17 @@ void StdioEchoTransport::readThread() { } void StdioEchoTransport::setNonBlocking(int fd) { +#ifdef _WIN32 + // On Windows, stdin/stdout don't support ioctlsocket + // Non-blocking mode for console I/O requires different approach + // For now, we rely on polling with timeout + (void)fd; // Suppress unused parameter warning +#else int flags = fcntl(fd, F_GETFL, 0); if (flags != -1) { fcntl(fd, F_SETFL, flags | O_NONBLOCK); } +#endif } } // namespace echo diff --git a/src/event/libevent_dispatcher.cc b/src/event/libevent_dispatcher.cc index 6f9aca3c..9685cac5 100644 --- a/src/event/libevent_dispatcher.cc +++ b/src/event/libevent_dispatcher.cc @@ -4,7 +4,15 @@ #include #include #include +#include + +// Platform-specific includes +#ifdef _WIN32 +#include +#include +#else #include +#endif #include #include @@ -103,10 +111,18 @@ LibeventDispatcher::~LibeventDispatcher() { } if (wakeup_fd_[0] >= 0) { +#ifdef _WIN32 + evutil_closesocket(wakeup_fd_[0]); +#else close(wakeup_fd_[0]); +#endif } if (wakeup_fd_[1] >= 0) { +#ifdef _WIN32 + evutil_closesocket(wakeup_fd_[1]); +#else close(wakeup_fd_[1]); +#endif } if (base_) { @@ -134,18 +150,39 @@ void LibeventDispatcher::initializeLibevent() { throw std::runtime_error("Failed to create event base"); } + // Debug: Print which backend libevent is using + const char* method = event_base_get_method(base_); + std::cerr << "[DEBUG LIBEVENT] Created event base using backend: " + << (method ? method : "unknown") << std::endl; + // Create pipe for waking up the event loop + // TODO We can keep evutil_socketpair as platformm independent code + // and remove pipe later +#ifdef _WIN32 + // On Windows, use evutil_socketpair which emulates Unix socketpair + // evutil_socketpair expects evutil_socket_t* (intptr_t*), cast from our SOCKET type + evutil_socket_t temp_fds[2]; + if (evutil_socketpair(AF_INET, SOCK_STREAM, 0, temp_fds) != 0) { + throw std::runtime_error("Failed to create wakeup pipe"); + } + wakeup_fd_[0] = static_cast(temp_fds[0]); + wakeup_fd_[1] = static_cast(temp_fds[1]); +#else if (pipe(wakeup_fd_) != 0) { throw std::runtime_error("Failed to create wakeup pipe"); } +#endif // Make pipe non-blocking - evutil_make_socket_nonblocking(wakeup_fd_[0]); - evutil_make_socket_nonblocking(wakeup_fd_[1]); + evutil_make_socket_nonblocking(static_cast(wakeup_fd_[0])); + evutil_make_socket_nonblocking(static_cast(wakeup_fd_[1])); // Create wakeup event - wakeup_event_ = event_new(base_, wakeup_fd_[0], EV_READ | EV_PERSIST, - &LibeventDispatcher::postWakeupCallback, this); + // Cast callback to match libevent's expected signature (evutil_socket_t is intptr_t) + wakeup_event_ = event_new(base_, static_cast(wakeup_fd_[0]), + EV_READ | EV_PERSIST, + reinterpret_cast(&LibeventDispatcher::postWakeupCallback), + this); if (!wakeup_event_) { throw std::runtime_error("Failed to create wakeup event"); } @@ -171,7 +208,11 @@ void LibeventDispatcher::post(PostCb callback) { if (need_wakeup && !isThreadSafe()) { // Wake up the event loop char byte = 1; +#ifdef _WIN32 + int rc = send(wakeup_fd_[1], &byte, 1, 0); +#else ssize_t rc = write(wakeup_fd_[1], &byte, 1); +#endif (void)rc; // Ignore EAGAIN } } @@ -209,10 +250,13 @@ void LibeventDispatcher::registerWatchdog( touchWatchdog(); } -FileEventPtr LibeventDispatcher::createFileEvent(int fd, +FileEventPtr LibeventDispatcher::createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger, uint32_t events) { + std::cerr << "[DEBUG LIBEVENT] createFileEvent: fd=" << fd + << " events=" << events + << " trigger=" << static_cast(trigger) << std::endl; assert(thread_id_ == std::thread::id() || isThreadSafe()); return std::make_unique(*this, fd, std::move(cb), trigger, events); @@ -271,6 +315,9 @@ SignalEventPtr LibeventDispatcher::listenForSignal(int signal_num, } void LibeventDispatcher::run(RunType type) { + std::cerr << "[DEBUG LIBEVENT] run() called with type=" << static_cast(type) + << " base_ptr=" << (void*)base_ << std::endl; + // Reset exit flag to allow dispatcher reuse exit_requested_ = false; thread_id_ = std::this_thread::get_id(); @@ -281,22 +328,45 @@ void LibeventDispatcher::run(RunType type) { int flags = 0; switch (type) { case RunType::Block: + std::cerr << "[DEBUG LIBEVENT] Using Block mode" << std::endl; // Run until no more events break; case RunType::NonBlock: flags = EVLOOP_NONBLOCK; break; case RunType::RunUntilExit: + std::cerr << "[DEBUG LIBEVENT] Entering RunUntilExit loop" << std::endl; while (!exit_requested_) { updateApproximateMonotonicTime(); - event_base_loop(base_, EVLOOP_ONCE); + int loop_result = event_base_loop(base_, EVLOOP_ONCE); + std::cerr << "[DEBUG LIBEVENT] event_base_loop returned: " << loop_result + << " exit_requested=" << exit_requested_ << std::endl; runPostCallbacks(); } return; } updateApproximateMonotonicTime(); - event_base_loop(base_, flags); + int num_events = event_base_get_num_events(base_, EVENT_BASE_COUNT_ADDED); + std::cerr << "[DEBUG LIBEVENT] Calling event_base_loop with flags=" << flags + << " num_events=" << num_events << std::endl; + std::cerr.flush(); + +#ifdef _WIN32 + // On Windows, try dispatch() for blocking mode - might handle sockets better + int result; + if (flags == 0) { + std::cerr << "[DEBUG LIBEVENT] Using event_base_dispatch() for Windows" << std::endl; + std::cerr.flush(); + result = event_base_dispatch(base_); + } else { + result = event_base_loop(base_, flags); + } +#else + int result = event_base_loop(base_, flags); +#endif + std::cerr << "[DEBUG LIBEVENT] event loop returned: " << result << std::endl; + std::cerr.flush(); runPostCallbacks(); } @@ -352,14 +422,18 @@ void LibeventDispatcher::shutdown() { } } -void LibeventDispatcher::postWakeupCallback(int fd, +void LibeventDispatcher::postWakeupCallback(libevent_socket_t fd, short /*events*/, void* arg) { auto* dispatcher = static_cast(arg); // Drain the pipe char buffer[256]; +#ifdef _WIN32 + while (recv(fd, buffer, sizeof(buffer), 0) > 0) { +#else while (read(fd, buffer, sizeof(buffer)) > 0) { +#endif // Continue draining } @@ -402,7 +476,7 @@ void LibeventDispatcher::touchWatchdog() { // FileEventImpl implementation LibeventDispatcher::FileEventImpl::FileEventImpl(LibeventDispatcher& dispatcher, - int fd, + os_fd_t fd, FileReadyCb cb, FileTriggerType trigger, uint32_t events) @@ -412,12 +486,18 @@ LibeventDispatcher::FileEventImpl::FileEventImpl(LibeventDispatcher& dispatcher, trigger_(trigger), enabled_events_(0) { #ifdef _WIN32 - // Windows doesn't support edge triggers with libevent - if (trigger == FileTriggerType::Edge) { - trigger_ = FileTriggerType::EmulatedEdge; - } -#endif - + // Windows only supports level-triggered mode via select() + // PlatformDefaultTriggerType should be Level on Windows + if (trigger != FileTriggerType::Level) { + throw std::runtime_error( + "Windows only supports FileTriggerType::Level. " + "Use PlatformDefaultTriggerType for cross-platform code."); + } + // On Windows, don't create the event here - create it in assignEvents() + // with the proper flags from the start. This ensures the socket is properly + // registered with select() when the event is created. + event_ = nullptr; +#else // Validate EmulatedEdge usage if constexpr (PlatformDefaultTriggerType != FileTriggerType::EmulatedEdge) { if (trigger_ == FileTriggerType::EmulatedEdge) { @@ -433,11 +513,16 @@ LibeventDispatcher::FileEventImpl::FileEventImpl(LibeventDispatcher& dispatcher, throw std::runtime_error("Failed to create file event"); } + std::cerr << "[DEBUG LIBEVENT] FileEventImpl created: fd=" << fd_ + << " event_ptr=" << (void*)event_ + << " base_ptr=" << (void*)dispatcher_.base() << std::endl; + if (trigger_ == FileTriggerType::EmulatedEdge) { // Create activation callback for emulated edge support activation_cb_ = std::make_unique( dispatcher_, [this]() { mergeInjectedEventsAndRunCb(0); }); } +#endif setEnabled(events); } @@ -478,6 +563,22 @@ void LibeventDispatcher::FileEventImpl::setEnabled(uint32_t events) { } void LibeventDispatcher::FileEventImpl::updateEvents(uint32_t events) { +#ifdef _WIN32 + // On Windows, assignEvents() creates/recreates the event entirely. + // event_ may be nullptr initially (deferred creation from constructor). + bool had_events = event_added_; + event_added_ = false; + + if (events != 0) { + assignEvents(events); + event_added_ = true; + } else if (had_events && event_) { + // No new events requested, just delete and free the old event + event_del(event_); + event_free(event_); + event_ = nullptr; + } +#else if (event_) { // Only call event_del if the event was previously added to the event base // This prevents the "event has no event_base set" warning @@ -491,19 +592,85 @@ void LibeventDispatcher::FileEventImpl::updateEvents(uint32_t events) { event_added_ = true; } } +#endif } void LibeventDispatcher::FileEventImpl::assignEvents(uint32_t events) { short libevent_events = toLibeventEvents(events, trigger_); +#ifdef _WIN32 + // On Windows, we create events fresh each time with the proper flags. + // This is because Windows select() requires the socket to be properly + // registered when the event is created, not reassigned later. + // Free old event if it exists (from previous assignEvents call) + if (event_) { + event_del(event_); + event_free(event_); + std::cerr << "[DEBUG LIBEVENT] Windows: freed old event" << std::endl; + } + + // Debug: Use a simple C callback wrapper to test if the issue is with our callback + // Store 'this' in a static map since we'll pass nullptr to event_new to test + static std::unordered_map fd_to_impl; + fd_to_impl[fd_] = this; + + static auto debug_callback = [](evutil_socket_t fd, short what, void* arg) { + std::cerr << "[DEBUG LIBEVENT] *** C CALLBACK WRAPPER FIRED! fd=" << fd + << " what=" << what << " arg=" << arg << " ***" << std::endl; + std::cerr.flush(); + // Look up the real impl from our map + auto it = fd_to_impl.find(fd); + if (it != fd_to_impl.end()) { + FileEventImpl::eventCallback(fd, what, it->second); + } + }; + + // Create new event with proper flags from the start + // TEST: Pass nullptr like the working direct test socket + event_ = event_new(dispatcher_.base(), fd_, libevent_events, + debug_callback, nullptr); + if (!event_) { + std::cerr << "[DEBUG LIBEVENT] ERROR: event_new failed in assignEvents!" << std::endl; + return; + } + int add_result = event_add(event_, nullptr); + + std::cerr << "[DEBUG LIBEVENT] Windows: created event with flags, fd=" << fd_ + << " libevent_events=" << libevent_events + << " event_ptr=" << (void*)event_ + << " base_ptr=" << (void*)dispatcher_.base() + << " callback=" << (void*)+debug_callback + << " this=" << (void*)this + << " add_result=" << add_result << std::endl; + + short pending = event_pending(event_, EV_READ | EV_WRITE | EV_TIMEOUT, nullptr); + std::cerr << "[DEBUG LIBEVENT] Windows: event_pending=" + << pending << " event_fd=" << event_get_fd(event_) + << " event_events=" << event_get_events(event_) << std::endl; + + static bool dumped_events = false; + if (!dumped_events) { + std::cerr << "[DEBUG LIBEVENT] Dumping event_base events once for debugging" + << std::endl; + event_base_dump_events(dispatcher_.base(), stderr); + dumped_events = true; + } +#else event_assign(event_, dispatcher_.base(), fd_, libevent_events, &FileEventImpl::eventCallback, this); - event_add(event_, nullptr); + int add_result = event_add(event_, nullptr); + std::cerr << "[DEBUG LIBEVENT] event_add called: fd=" << fd_ + << " libevent_events=" << libevent_events + << " result=" << add_result << std::endl; +#endif } -void LibeventDispatcher::FileEventImpl::eventCallback(int fd, +void LibeventDispatcher::FileEventImpl::eventCallback(libevent_socket_t fd, short events, void* arg) { + std::cerr << "[DEBUG LIBEVENT] eventCallback fired: fd=" << fd + << " events=" << events << std::endl; + auto* file_event = static_cast(arg); // Update approximate time before callback @@ -573,7 +740,9 @@ void LibeventDispatcher::FileEventImpl::registerEventIfEmulatedEdge( LibeventDispatcher::TimerImpl::TimerImpl(LibeventDispatcher& dispatcher, TimerCb cb) : dispatcher_(dispatcher), cb_(std::move(cb)), enabled_(false) { - event_ = evtimer_new(dispatcher_.base(), &TimerImpl::timerCallback, this); + event_ = evtimer_new(dispatcher_.base(), + reinterpret_cast(&TimerImpl::timerCallback), + this); if (!event_) { throw std::runtime_error("Failed to create timer"); } @@ -615,7 +784,7 @@ void LibeventDispatcher::TimerImpl::enableHRTimer( bool LibeventDispatcher::TimerImpl::enabled() { return enabled_; } -void LibeventDispatcher::TimerImpl::timerCallback(int /*fd*/, +void LibeventDispatcher::TimerImpl::timerCallback(libevent_socket_t /*fd*/, short /*events*/, void* arg) { auto* timer = static_cast(arg); @@ -690,7 +859,8 @@ LibeventDispatcher::SignalEventImpl::SignalEventImpl( LibeventDispatcher& dispatcher, int signal_num, SignalCb cb) : dispatcher_(dispatcher), signal_num_(signal_num), cb_(std::move(cb)) { event_ = evsignal_new(dispatcher_.base(), signal_num_, - &SignalEventImpl::signalCallback, this); + reinterpret_cast(&SignalEventImpl::signalCallback), + this); if (!event_) { throw std::runtime_error("Failed to create signal event"); } @@ -705,7 +875,7 @@ LibeventDispatcher::SignalEventImpl::~SignalEventImpl() { } } -void LibeventDispatcher::SignalEventImpl::signalCallback(int /*fd*/, +void LibeventDispatcher::SignalEventImpl::signalCallback(libevent_socket_t /*fd*/, short /*events*/, void* arg) { auto* signal_event = static_cast(arg); diff --git a/src/filter/protocol_detection_filter.cc b/src/filter/protocol_detection_filter.cc index 833b77d7..3d63d347 100644 --- a/src/filter/protocol_detection_filter.cc +++ b/src/filter/protocol_detection_filter.cc @@ -8,6 +8,25 @@ #include #include +#ifdef _WIN32 +// memmem is a GNU extension, provide implementation for Windows +static void* memmem(const void* haystack, size_t haystacklen, + const void* needle, size_t needlelen) { + if (needlelen == 0) return const_cast(haystack); + if (haystacklen < needlelen) return nullptr; + + const char* h = static_cast(haystack); + const char* n = static_cast(needle); + + for (size_t i = 0; i <= haystacklen - needlelen; ++i) { + if (memcmp(h + i, n, needlelen) == 0) { + return const_cast(static_cast(h + i)); + } + } + return nullptr; +} +#endif + #include "mcp/buffer.h" #include "mcp/filter/http_routing_filter.h" #include "mcp/filter/sse_codec_filter.h" diff --git a/src/logging/CMakeLists.txt b/src/logging/CMakeLists.txt index 8ea98022..b9fac791 100644 --- a/src/logging/CMakeLists.txt +++ b/src/logging/CMakeLists.txt @@ -1,7 +1,11 @@ # Logging library CMake configuration cmake_minimum_required(VERSION 3.14) -find_package(fmt REQUIRED CONFIG) +# fmt should already be available from parent CMakeLists.txt +# Only find if not already defined +if(NOT TARGET fmt::fmt) + find_package(fmt REQUIRED CONFIG) +endif() # Collect logging sources set(MCP_LOGGING_SOURCES diff --git a/src/mcp_connection_manager.cc b/src/mcp_connection_manager.cc index 1ea73a2c..9b70a1aa 100644 --- a/src/mcp_connection_manager.cc +++ b/src/mcp_connection_manager.cc @@ -3,7 +3,12 @@ #include #include +#ifdef _WIN32 +#include +#include +#else #include // For TCP_NODELAY +#endif #include "mcp/core/result.h" #include "mcp/filter/http_sse_filter_chain_factory.h" @@ -113,10 +118,19 @@ VoidResult McpConnectionManager::connect() { config_.stdio_config->stdin_fd, config_.stdio_config->stdout_fd); // Create pipe addresses +#ifndef _WIN32 auto local_address = std::make_shared( "/tmp/test_stdio_in"); auto remote_address = std::make_shared( "/tmp/test_stdio_out"); +#else + // Windows: Use loopback placeholder addresses for test pipes + // (PipeInstance is Unix-only, use Ipv4Instance as placeholder) + auto local_address = std::make_shared( + "127.0.0.1", 0); + auto remote_address = std::make_shared( + "127.0.0.1", 0); +#endif // Create the connection socket socket_wrapper = std::make_unique( diff --git a/src/network/connection_impl.cc b/src/network/connection_impl.cc index f90100e4..5d4ec726 100644 --- a/src/network/connection_impl.cc +++ b/src/network/connection_impl.cc @@ -4,9 +4,14 @@ #include #include +#ifdef _WIN32 +#include +#include +#else #include #include #include +#endif #include "mcp/buffer.h" #include "mcp/event/event_loop.h" diff --git a/src/network/connection_manager_impl.cc b/src/network/connection_manager_impl.cc index 10614916..32b64755 100644 --- a/src/network/connection_manager_impl.cc +++ b/src/network/connection_manager_impl.cc @@ -1,6 +1,10 @@ #include +#ifdef _WIN32 +#include +#else #include +#endif #include "mcp/network/connection_impl.h" #include "mcp/network/connection_manager.h" diff --git a/src/network/connection_utility.cc b/src/network/connection_utility.cc index aa53aca1..063ed7ec 100644 --- a/src/network/connection_utility.cc +++ b/src/network/connection_utility.cc @@ -1,27 +1,33 @@ #include "mcp/network/connection_utility.h" +#ifdef _WIN32 +#include +#include +#else #include - #include #include #include +#endif #include "mcp/network/socket_option_impl.h" namespace mcp { namespace network { -void SocketConfigUtility::setSocketOptions(int fd) { - if (fd < 0) +void SocketConfigUtility::setSocketOptions(os_fd_t fd) { + if (fd == INVALID_SOCKET_FD) return; // Set TCP_NODELAY to disable Nagle's algorithm int flag = 1; - setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)); + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, + reinterpret_cast(&flag), sizeof(flag)); // Enable keep-alive int keepalive = 1; - setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive)); + setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, + reinterpret_cast(&keepalive), sizeof(keepalive)); // Set keep-alive parameters #ifdef __linux__ @@ -58,8 +64,10 @@ void SocketConfigUtility::setSocketOptions(int fd) { // Set socket buffer sizes int sndbuf = 256 * 1024; int rcvbuf = 256 * 1024; - setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)); - setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); + setsockopt(fd, SOL_SOCKET, SO_SNDBUF, + reinterpret_cast(&sndbuf), sizeof(sndbuf)); + setsockopt(fd, SOL_SOCKET, SO_RCVBUF, + reinterpret_cast(&rcvbuf), sizeof(rcvbuf)); } void SocketConfigUtility::configureSocket(Socket& socket, @@ -74,11 +82,12 @@ void SocketConfigUtility::configureKeepAlive(Socket& socket, std::chrono::seconds idle_time, std::chrono::seconds interval, uint32_t probes) { - int fd = socket.ioHandle().fd(); + os_fd_t fd = socket.ioHandle().fd(); // Enable/disable keep-alive int keep_alive = enable ? 1 : 0; - setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, sizeof(keep_alive)); + setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, + reinterpret_cast(&keep_alive), sizeof(keep_alive)); if (!enable) return; @@ -104,25 +113,28 @@ void SocketConfigUtility::configureKeepAlive(Socket& socket, void SocketConfigUtility::configureBufferSizes(Socket& socket, uint32_t receive_buffer_size, uint32_t send_buffer_size) { - int fd = socket.ioHandle().fd(); + os_fd_t fd = socket.ioHandle().fd(); if (receive_buffer_size > 0) { int size = receive_buffer_size; - setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)); + setsockopt(fd, SOL_SOCKET, SO_RCVBUF, + reinterpret_cast(&size), sizeof(size)); } if (send_buffer_size > 0) { int size = send_buffer_size; - setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)); + setsockopt(fd, SOL_SOCKET, SO_SNDBUF, + reinterpret_cast(&size), sizeof(size)); } } void SocketConfigUtility::configureForLowLatency(Socket& socket) { - int fd = socket.ioHandle().fd(); + os_fd_t fd = socket.ioHandle().fd(); // Disable Nagle's algorithm int nodelay = 1; - setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay)); + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, + reinterpret_cast(&nodelay), sizeof(nodelay)); #ifdef __linux__ // Enable TCP_QUICKACK for lower latency on Linux @@ -135,11 +147,12 @@ void SocketConfigUtility::configureForLowLatency(Socket& socket) { } void SocketConfigUtility::configureForHighThroughput(Socket& socket) { - int fd = socket.ioHandle().fd(); + os_fd_t fd = socket.ioHandle().fd(); // Enable Nagle's algorithm for better throughput int nodelay = 0; - setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay)); + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, + reinterpret_cast(&nodelay), sizeof(nodelay)); // Use larger buffers for higher throughput configureBufferSizes(socket, 512 * 1024, 512 * 1024); diff --git a/src/network/io_socket_handle_impl.cc b/src/network/io_socket_handle_impl.cc index c990a678..dd2afc9b 100644 --- a/src/network/io_socket_handle_impl.cc +++ b/src/network/io_socket_handle_impl.cc @@ -2,6 +2,7 @@ #include #include +#include #ifdef _WIN32 #include @@ -72,7 +73,13 @@ IoSocketHandleImpl::~IoSocketHandleImpl() { void IoSocketHandleImpl::setNonBlocking() { #ifdef _WIN32 u_long mode = 1; - ::ioctlsocket(fd_, FIONBIO, &mode); + int result = ::ioctlsocket(fd_, FIONBIO, &mode); + std::cerr << "[DEBUG SOCKET] IoSocketHandleImpl::setNonBlocking: fd=" << fd_ + << " result=" << result; + if (result != 0) { + std::cerr << " WSAError=" << WSAGetLastError(); + } + std::cerr << std::endl; #else int flags = ::fcntl(fd_, F_GETFL, 0); if (flags != -1) { @@ -361,7 +368,7 @@ IoCallResult IoSocketHandleImpl::recvmsg( // Set peer address msg.peer_address = - addressFromSockAddr(peer_addr, peer_addr_len, socket_v6only_); + Address::addressFromSockAddr(peer_addr, peer_addr_len, socket_v6only_); // Windows doesn't provide local address in recvfrom msg.truncated = (flags & MSG_PARTIAL) != 0; @@ -514,21 +521,63 @@ IoResult IoSocketHandleImpl::bind( return IoResult::error(EBADF); } + std::cerr << "[DEBUG SOCKET] IoSocketHandleImpl::bind() called: fd=" << fd_ + << " addr=" << (address ? address->asStringView() : "") + << std::endl; int result = ::bind(fd_, address->sockAddr(), address->sockAddrLen()); if (result == 0) { + sockaddr_storage local_addr; + socklen_t local_len = sizeof(local_addr); + int local_result = ::getsockname( + fd_, reinterpret_cast(&local_addr), &local_len); + if (local_result == 0) { + auto addr = Address::addressFromSockAddr(local_addr, local_len, + socket_v6only_); + std::cerr << "[DEBUG SOCKET] bind() local address: " + << (addr ? addr->asStringView() : "") << std::endl; + } else { + std::cerr << "[DEBUG SOCKET] bind() getsockname failed: " + << getLastSocketError() << std::endl; + } return IoResult::success(0); } else { + std::cerr << "[DEBUG SOCKET] bind() failed: error=" << getLastSocketError() + << std::endl; return IoResult::error(getLastSocketError()); } } IoResult IoSocketHandleImpl::listen(int backlog) { + std::cerr << "[DEBUG SOCKET] IoSocketHandleImpl::listen() called: fd=" << fd_ + << " backlog=" << backlog << std::endl; + if (!isOpen()) { + std::cerr << "[DEBUG SOCKET] listen() failed: socket not open" << std::endl; return IoResult::error(EBADF); } + std::cerr << "[DEBUG SOCKET] ::listen() before "; int result = ::listen(fd_, backlog); + std::cerr << "[DEBUG SOCKET] ::listen() returned: " << result; +#ifdef _WIN32 + if (result != 0) { + std::cerr << " WSAError=" << WSAGetLastError(); + } +#endif + std::cerr << std::endl; + if (result == 0) { + int accept_conn = 0; + socklen_t optlen = sizeof(accept_conn); + int opt_result = + ::getsockopt(fd_, SOL_SOCKET, SO_ACCEPTCONN, + reinterpret_cast(&accept_conn), &optlen); + if (opt_result == 0) { + std::cerr << "[DEBUG SOCKET] SO_ACCEPTCONN=" << accept_conn << std::endl; + } else { + std::cerr << "[DEBUG SOCKET] SO_ACCEPTCONN check failed: " + << getLastSocketError() << std::endl; + } return IoResult::success(0); } else { return IoResult::error(getLastSocketError()); @@ -546,8 +595,12 @@ IoResult IoSocketHandleImpl::accept() { #ifdef _WIN32 SOCKET new_fd = ::accept(fd_, reinterpret_cast(&addr), &addr_len); if (new_fd != INVALID_SOCKET) { + std::cerr << "[DEBUG SOCKET] accept() success: fd=" << new_fd << std::endl; return IoResult::success( std::make_unique(new_fd, socket_v6only_, domain_)); + } else { + std::cerr << "[DEBUG SOCKET] accept() failed: WSAError=" + << WSAGetLastError() << std::endl; } #else #ifdef __linux__ @@ -817,14 +870,22 @@ void IoSocketHandleImpl::configureInitialCongestionWindow( IoHandlePtr IoSocketHandleImpl::duplicate() { #ifdef _WIN32 WSAPROTOCOL_INFO info; + std::cerr << "[DEBUG SOCKET] IoSocketHandleImpl::duplicate() called: fd=" << fd_ << std::endl; if (::WSADuplicateSocket(fd_, ::GetCurrentProcessId(), &info) == 0) { + std::cerr << "[DEBUG SOCKET] WSADuplicateSocket() success, calling WSASocket()" << std::endl; SOCKET new_fd = ::WSASocket(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, &info, 0, WSA_FLAG_OVERLAPPED); + std::cerr << "[DEBUG SOCKET] WSASocket() returned: new_fd=" << new_fd + << " (INVALID_SOCKET=" << INVALID_SOCKET << ")" << std::endl; if (new_fd != INVALID_SOCKET) { return std::make_unique(new_fd, socket_v6only_, domain_); + } else { + std::cerr << "[DEBUG SOCKET] WSASocket() failed: WSAError=" << WSAGetLastError() << std::endl; } + } else { + std::cerr << "[DEBUG SOCKET] WSADuplicateSocket() failed: WSAError=" << WSAGetLastError() << std::endl; } #else int new_fd = ::dup(fd_); @@ -851,4 +912,4 @@ IoHandlePtr createIoSocketHandle(os_fd_t fd, } } // namespace network -} // namespace mcp \ No newline at end of file +} // namespace mcp diff --git a/src/network/listener_impl.cc b/src/network/listener_impl.cc index 0ce5e372..7073ad1f 100644 --- a/src/network/listener_impl.cc +++ b/src/network/listener_impl.cc @@ -1,9 +1,14 @@ #include #include -#include +#ifdef _WIN32 +#include +#include +#else +#include #include #include +#endif #include "mcp/core/result.h" #include "mcp/network/connection_impl.h" @@ -98,6 +103,9 @@ ActiveListener::ActiveListener(event::Dispatcher& dispatcher, ActiveListener::~ActiveListener() { disable(); } VoidResult ActiveListener::listen() { + std::cerr << "[DEBUG LISTENER] ActiveListener::listen() called: bind_to_port=" + << config_.bind_to_port << " address=" + << config_.address->asStringView() << std::endl; // Create socket if (config_.bind_to_port) { // Use the global createListenSocket function @@ -117,6 +125,8 @@ VoidResult ActiveListener::listen() { } socket_ = std::move(socket); + std::cerr << "[DEBUG LISTENER] listen socket created: fd=" + << socket_->ioHandle().fd() << std::endl; // Call listen() to start accepting connections auto listen_result = @@ -127,6 +137,8 @@ VoidResult ActiveListener::listen() { err.message = "Failed to listen on socket"; return makeVoidError(err); } + std::cerr << "[DEBUG LISTENER] listen() succeeded: backlog=" + << config_.backlog << std::endl; // Apply socket options if (config_.socket_options) { @@ -144,11 +156,13 @@ VoidResult ActiveListener::listen() { int val = 1; socket_->setSocketOption(SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); - // Set SO_REUSEPORT if requested + // Set SO_REUSEPORT if requested (not available on Windows) +#ifndef _WIN32 if (config_.enable_reuse_port) { int val = 1; socket_->setSocketOption(SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)); } +#endif // createListenSocket already binds and listens if bind_to_port is true, // so we don't need to do it again @@ -160,6 +174,9 @@ VoidResult ActiveListener::listen() { [this](uint32_t events) { onSocketEvent(events); }, event::PlatformDefaultTriggerType, // Use platform-specific default static_cast(event::FileReadyType::Closed)); + std::cerr << "[DEBUG LISTENER] file_event created: " + << (file_event_ ? "SUCCESS" : "FAILED") << " fd=" + << socket_->ioHandle().fd() << std::endl; if (enabled_) { file_event_->setEnabled(static_cast(event::FileReadyType::Read)); @@ -171,6 +188,8 @@ VoidResult ActiveListener::listen() { void ActiveListener::disable() { enabled_ = false; if (file_event_) { + std::cerr << "[DEBUG LISTENER] ActiveListener::disable() fd=" + << socket_->ioHandle().fd() << std::endl; file_event_->setEnabled(0); } } @@ -178,6 +197,8 @@ void ActiveListener::disable() { void ActiveListener::enable() { enabled_ = true; if (file_event_) { + std::cerr << "[DEBUG LISTENER] ActiveListener::enable() fd=" + << socket_->ioHandle().fd() << std::endl; file_event_->setEnabled(static_cast(event::FileReadyType::Read)); } } @@ -209,6 +230,8 @@ void ActiveListener::doAccept() { reinterpret_cast(&addr), &addr_len); if (!accept_result.ok()) { + std::cerr << "[DEBUG LISTENER] accept() failed: error=" + << accept_result.error_code() << std::endl; if (accept_result.error_code() == EAGAIN || accept_result.error_code() == EWOULDBLOCK) { // No more connections to accept @@ -400,4 +423,4 @@ void ListenerManagerImpl::stopListeners() { } } // namespace network -} // namespace mcp \ No newline at end of file +} // namespace mcp diff --git a/src/network/socket_impl.cc b/src/network/socket_impl.cc index 5b0fd723..00465c6e 100644 --- a/src/network/socket_impl.cc +++ b/src/network/socket_impl.cc @@ -145,6 +145,8 @@ const IoHandle& SocketImpl::ioHandle() const { return *io_handle_; } void SocketImpl::close() { if (io_handle_) { + std::cerr << "[DEBUG SOCKET] SocketImpl::close() fd=" << io_handle_->fd() + << std::endl; io_handle_->close(); } } @@ -170,6 +172,8 @@ IoResult SocketImpl::listen(int backlog) { return IoResult::error(EBADF); } + std::cerr << "[DEBUG SOCKET] SocketImpl::listen() fd=" << io_handle_->fd() + << " backlog=" << backlog << std::endl; return io_handle_->listen(backlog); } @@ -761,4 +765,4 @@ bool applySocketOptions(Socket& socket, } } // namespace network -} // namespace mcp \ No newline at end of file +} // namespace mcp diff --git a/src/network/socket_interface_impl.cc b/src/network/socket_interface_impl.cc index d5cc9d6c..ff3a518f 100644 --- a/src/network/socket_interface_impl.cc +++ b/src/network/socket_interface_impl.cc @@ -1,14 +1,22 @@ #include "mcp/network/socket_interface_impl.h" #include +#include #include #include #ifdef _WIN32 -#include #include #include +#include #pragma comment(lib, "ws2_32.lib") +// Define O_NONBLOCK for Windows compatibility +#ifndef O_NONBLOCK +#define O_NONBLOCK 0x0004 +#endif +#ifndef FD_CLOEXEC +#define FD_CLOEXEC 1 +#endif #else #include #include @@ -118,13 +126,20 @@ IoResult SocketInterfaceImpl::socket( Address::Type addr_type, optional version, bool socket_v6only) { + std::cerr << "[DEBUG SOCKET] socket() requested: type=" << static_cast(type) + << " addr_type=" << static_cast(addr_type) + << " version=" << (version.has_value() ? static_cast(*version) : -1) + << " v6only=" << socket_v6only << std::endl; + int domain = addressTypeToDomain(addr_type, version); if (domain < 0) { + std::cerr << "[DEBUG SOCKET] socket() failed: unsupported domain" << std::endl; return IoResult::error(SOCKET_ERROR_AFNOSUPPORT); } int sock_type = socketTypeToInt(type); if (sock_type < 0) { + std::cerr << "[DEBUG SOCKET] socket() failed: invalid type" << std::endl; return IoResult::error(SOCKET_ERROR_INVAL); } @@ -135,12 +150,19 @@ IoResult SocketInterfaceImpl::socket( os_fd_t fd = ::socket(domain, sock_type, 0); if (fd == INVALID_SOCKET_FD) { + std::cerr << "[DEBUG SOCKET] ::socket() failed: error=" << getLastSocketError() + << std::endl; return IoResult::error(getLastSocketError()); } + std::cerr << "[DEBUG SOCKET] ::socket() success: fd=" << fd << std::endl; // Set non-blocking and close-on-exec on other platforms #ifndef __linux__ - setNonBlocking(fd); + int nb_result = setNonBlocking(fd); + if (nb_result != 0) { + std::cerr << "[ERROR SOCKET] Failed to set socket non-blocking: fd=" << fd << std::endl; + // Continue anyway - socket may still work + } setCloseOnExec(fd); #endif @@ -207,14 +229,20 @@ IoResult SocketInterfaceImpl::duplicate(os_fd_t fd) { #ifdef _WIN32 // Windows socket duplication is more complex WSAPROTOCOL_INFO info; + std::cerr << "[DEBUG SOCKET] SocketInterfaceImpl::duplicate() called: fd=" << fd << std::endl; if (::WSADuplicateSocket(fd, ::GetCurrentProcessId(), &info) != 0) { + std::cerr << "[DEBUG SOCKET] WSADuplicateSocket() failed: WSAError=" << WSAGetLastError() << std::endl; return IoResult::error(getLastSocketError()); } + std::cerr << "[DEBUG SOCKET] WSADuplicateSocket() success, calling WSASocket()" << std::endl; os_fd_t new_fd = ::WSASocket(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, &info, 0, WSA_FLAG_OVERLAPPED); + std::cerr << "[DEBUG SOCKET] WSASocket() returned: new_fd=" << new_fd + << " (INVALID_SOCKET=" << INVALID_SOCKET << ")" << std::endl; if (new_fd == INVALID_SOCKET) { + std::cerr << "[DEBUG SOCKET] WSASocket() failed: WSAError=" << WSAGetLastError() << std::endl; return IoResult::error(getLastSocketError()); } return IoResult::success(new_fd); @@ -303,18 +331,48 @@ IoResult SocketInterfaceImpl::getsockopt( IoResult SocketInterfaceImpl::bind(os_fd_t fd, const Address::Instance& addr) { + std::cerr << "[DEBUG SOCKET] bind() called: fd=" << fd + << " addr=" << addr.asStringView() << std::endl; int result = ::bind(fd, addr.sockAddr(), addr.sockAddrLen()); if (result < 0) { + std::cerr << "[DEBUG SOCKET] ::bind() failed: error=" << getLastSocketError() + << std::endl; return IoResult::error(getLastSocketError()); } + std::cerr << "[DEBUG SOCKET] ::bind() success" << std::endl; return IoResult::success(0); } IoResult SocketInterfaceImpl::listen(os_fd_t fd, int backlog) { + std::cerr << "[DEBUG SOCKET] ::listen() before 1"; + std::cerr << "[DEBUG SOCKET] SocketInterfaceImpl::listen() called: fd=" << fd + << " backlog=" << backlog << std::endl; + int result = ::listen(fd, backlog); + std::cerr << "[DEBUG SOCKET] ::listen() returned: " << result; +#ifdef _WIN32 + if (result != 0) { + std::cerr << " WSAError=" << WSAGetLastError(); + } +#endif + std::cerr << std::endl; + if (result < 0) { return IoResult::error(getLastSocketError()); } + + int accept_conn = 0; + socklen_t optlen = sizeof(accept_conn); + int opt_result = + ::getsockopt(fd, SOL_SOCKET, SO_ACCEPTCONN, + reinterpret_cast(&accept_conn), &optlen); + if (opt_result == 0) { + std::cerr << "[DEBUG SOCKET] SO_ACCEPTCONN=" << accept_conn << std::endl; + } else { + std::cerr << "[DEBUG SOCKET] SO_ACCEPTCONN check failed: " + << getLastSocketError() << std::endl; + } + return IoResult::success(0); } @@ -337,8 +395,12 @@ IoResult SocketInterfaceImpl::accept(os_fd_t fd, #else os_fd_t new_fd = ::accept(fd, addr, addrlen); if (new_fd != INVALID_SOCKET_FD) { + std::cerr << "[DEBUG SOCKET] accept() success: fd=" << new_fd << std::endl; setNonBlocking(new_fd); setCloseOnExec(new_fd); + } else { + std::cerr << "[DEBUG SOCKET] accept() failed: error=" << getLastSocketError() + << std::endl; } #endif @@ -481,15 +543,36 @@ bool SocketInterfaceImpl::supportsReusePort() const { return supports_reuse_port_; } -void SocketInterfaceImpl::setNonBlocking(os_fd_t fd) { +int SocketInterfaceImpl::setNonBlocking(os_fd_t fd) { #ifdef _WIN32 u_long mode = 1; - ::ioctlsocket(fd, FIONBIO, &mode); + int result = ::ioctlsocket(fd, FIONBIO, &mode); + std::cerr << "[DEBUG SOCKET] setNonBlocking (ioctlsocket): fd=" << fd + << " mode=" << mode << " result=" << result; + if (result != 0) { + int err = WSAGetLastError(); + std::cerr << " WSAError=" << err << " FAILED"; + } else { + std::cerr << " SUCCESS"; + } + std::cerr << std::endl; + return result; #else int flags = ::fcntl(fd, F_GETFL, 0); - if (flags >= 0) { - ::fcntl(fd, F_SETFL, flags | O_NONBLOCK); + if (flags < 0) { + std::cerr << "[DEBUG SOCKET] setNonBlocking: fcntl(F_GETFL) failed fd=" << fd + << " errno=" << errno << std::endl; + return -1; + } + int result = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK); + std::cerr << "[DEBUG SOCKET] setNonBlocking: fd=" << fd << " result=" << result; + if (result < 0) { + std::cerr << " errno=" << errno << " FAILED"; + } else { + std::cerr << " SUCCESS"; } + std::cerr << std::endl; + return result; #endif } @@ -555,7 +638,7 @@ IoResult SocketInterfaceImpl::emulateSocketPairWindows(SocketType type, auto listen_result = socket(type, Address::Type::Ip, Address::IpVersion::v4, false); if (!listen_result.ok()) { - return IoResult::error(*listen_result.error_code); + return IoResult::error(listen_result.error_code()); } os_fd_t listen_fd = *listen_result; @@ -586,7 +669,7 @@ IoResult SocketInterfaceImpl::emulateSocketPairWindows(SocketType type, socket(type, Address::Type::Ip, Address::IpVersion::v4, false); if (!connect_result.ok()) { ::closesocket(listen_fd); - return IoResult::error(*connect_result.error_code); + return IoResult::error(connect_result.error_code()); } fds[0] = *connect_result; @@ -636,9 +719,9 @@ SocketInterface& socketInterface() { return *g_socket_interface; } -void setSocketInterface(SocketInterfacePtr interface) { +void setSocketInterface(SocketInterfacePtr iface) { std::lock_guard lock(g_socket_interface_mutex); - g_socket_interface = std::move(interface); + g_socket_interface = std::move(iface); } void resetSocketInterface() { @@ -673,4 +756,4 @@ SocketInterfacePtr createIoUringSocketInterface() { #endif } // namespace network -} // namespace mcp \ No newline at end of file +} // namespace mcp diff --git a/src/network/tcp_server_listener_impl.cc b/src/network/tcp_server_listener_impl.cc index 2e022d13..854ba717 100644 --- a/src/network/tcp_server_listener_impl.cc +++ b/src/network/tcp_server_listener_impl.cc @@ -4,12 +4,19 @@ */ #include -#include #include -#include +#ifdef _WIN32 +#include +#include +#include +#include +#else +#include +#include #include #include +#endif #include "mcp/network/connection_impl.h" #include "mcp/network/io_socket_handle_impl.h" @@ -132,7 +139,7 @@ TcpListenerImpl::TcpListenerImpl(event::Dispatcher& dispatcher, << std::endl; if (bind_to_port_ && socket_) { - int fd = socket_->ioHandle().fd(); + os_fd_t fd = socket_->ioHandle().fd(); std::cerr << "[DEBUG] Creating file event for listener fd: " << fd << std::endl; @@ -179,8 +186,20 @@ void TcpListenerImpl::enable() { enabled_ = true; if (file_event_) { - std::cerr << "[DEBUG] Enabling file event for listener fd: " - << socket_->ioHandle().fd() << std::endl; + os_fd_t fd = socket_->ioHandle().fd(); + std::cerr << "[DEBUG] Enabling file event for listener fd: " << fd << std::endl; + +#ifdef _WIN32 + // Debug: Test if select() can see this socket at all + fd_set read_fds; + FD_ZERO(&read_fds); + FD_SET(fd, &read_fds); // fd is already os_fd_t (SOCKET on Windows) + struct timeval tv = {0, 0}; // Non-blocking poll + int sel_result = select(0, &read_fds, NULL, NULL, &tv); + std::cerr << "[DEBUG] Manual select() test: result=" << sel_result + << " WSAError=" << WSAGetLastError() << std::endl; +#endif + file_event_->setEnabled(static_cast(event::FileReadyType::Read)); } else { std::cerr << "[ERROR] No file event to enable for listener!" << std::endl; @@ -198,8 +217,11 @@ void TcpListenerImpl::configureLoadShedPoints(LoadShedPoint& load_shed_point) { } void TcpListenerImpl::onSocketEvent(uint32_t events) { + std::cerr << "[DEBUG LISTENER] onSocketEvent called: events=" << events << std::endl; + // Only handle read events (new connections) if (!(events & static_cast(event::FileReadyType::Read))) { + std::cerr << "[DEBUG LISTENER] Not a read event, returning" << std::endl; return; } @@ -245,11 +267,29 @@ bool TcpListenerImpl::doAccept() { sockaddr_storage addr; socklen_t addr_len = sizeof(addr); + std::cerr << "[DEBUG LISTENER] Calling accept() on fd=" + << socket_->ioHandle().fd() << std::endl; + // Accept new connection - int new_fd = ::accept(socket_->ioHandle().fd(), - reinterpret_cast(&addr), &addr_len); + // On Windows, accept() returns SOCKET (uintptr_t), on Linux returns int +#ifdef _WIN32 + os_fd_t new_fd = ::accept(socket_->ioHandle().fd(), + reinterpret_cast(&addr), &addr_len); + bool accept_failed = (new_fd == INVALID_SOCKET); +#else + os_fd_t new_fd = ::accept(socket_->ioHandle().fd(), + reinterpret_cast(&addr), &addr_len); + bool accept_failed = (new_fd < 0); +#endif - if (new_fd < 0) { + std::cerr << "[DEBUG LISTENER] accept() returned: new_fd=" << new_fd + << " errno=" << errno << std::endl; + + if (accept_failed) { +#ifdef _WIN32 + std::cerr << "[DEBUG LISTENER] accept() WSAError=" << WSAGetLastError() + << std::endl; +#endif // Would block or error if (errno == EAGAIN || errno == EWOULDBLOCK) { return false; // No more connections available @@ -267,6 +307,10 @@ bool TcpListenerImpl::doAccept() { auto io_handle = std::make_unique(new_fd); // Set non-blocking mode immediately +#ifdef _WIN32 + u_long mode = 1; + ioctlsocket(new_fd, FIONBIO, &mode); +#else int flags = fcntl(new_fd, F_GETFL, 0); if (flags >= 0) { fcntl(new_fd, F_SETFL, flags | O_NONBLOCK); @@ -274,11 +318,16 @@ bool TcpListenerImpl::doAccept() { // Set close-on-exec fcntl(new_fd, F_SETFD, FD_CLOEXEC); +#endif // Create address from sockaddr auto remote_address = Address::addressFromSockAddr(addr, addr_len); if (!remote_address) { +#ifdef _WIN32 + ::closesocket(new_fd); +#else ::close(new_fd); +#endif return true; // Continue accepting } @@ -674,4 +723,4 @@ std::unique_ptr ListenerFactory::createTcpListener( } } // namespace network -} // namespace mcp \ No newline at end of file +} // namespace mcp diff --git a/src/network/transport_socket.cc b/src/network/transport_socket.cc index 333e3e9f..07edbfe1 100644 --- a/src/network/transport_socket.cc +++ b/src/network/transport_socket.cc @@ -1,9 +1,13 @@ #include "mcp/network/transport_socket.h" +#ifdef _WIN32 +#include +#include +#else #include #include - #include +#endif #include "mcp/network/socket_interface_impl.h" diff --git a/src/transport/stdio_pipe_transport.cc b/src/transport/stdio_pipe_transport.cc index 4e632a16..d5cee024 100644 --- a/src/transport/stdio_pipe_transport.cc +++ b/src/transport/stdio_pipe_transport.cc @@ -2,11 +2,42 @@ #include #include -#include #include +#include + +#ifdef _WIN32 +#include +#include +#include +#include +// Windows pipe/file operations +// Note: Using pipe_close instead of close to avoid conflict with TransportIoResult::close() +#define pipe(fds) _pipe(fds, 4096, _O_BINARY) +#define pipe_close(fd) _close(fd) +#define pipe_read(fd, buf, len) _read(fd, buf, static_cast(len)) +#define pipe_write(fd, buf, len) _write(fd, buf, static_cast(len)) +#define usleep(us) Sleep((us) / 1000) +#ifndef EAGAIN +#define EAGAIN WSAEWOULDBLOCK +#endif +#ifndef EWOULDBLOCK +#define EWOULDBLOCK WSAEWOULDBLOCK +#endif +#ifndef EPIPE +#define EPIPE ERROR_BROKEN_PIPE +#endif +#ifndef ECONNRESET +#define ECONNRESET WSAECONNRESET +#endif +#else +#include #include #include -#include +// Unix: use standard functions directly +#define pipe_close(fd) ::close(fd) +#define pipe_read(fd, buf, len) ::read(fd, buf, len) +#define pipe_write(fd, buf, len) ::write(fd, buf, len) +#endif #include "mcp/network/address.h" #include "mcp/network/address_impl.h" @@ -33,14 +64,14 @@ StdioPipeTransport::~StdioPipeTransport() { // Close write end of stdin pipe to signal EOF to the reader thread // This will wake up the bridgeStdinToPipe thread if it's blocked on write() if (stdin_to_conn_pipe_[1] != -1) { - ::close(stdin_to_conn_pipe_[1]); + pipe_close(stdin_to_conn_pipe_[1]); stdin_to_conn_pipe_[1] = -1; } // Close read end of stdout pipe to signal EOF to the writer thread // This will wake up the bridgePipeToStdout thread if it's blocked on read() if (conn_to_stdout_pipe_[0] != -1) { - ::close(conn_to_stdout_pipe_[0]); + pipe_close(conn_to_stdout_pipe_[0]); conn_to_stdout_pipe_[0] = -1; } @@ -56,18 +87,18 @@ StdioPipeTransport::~StdioPipeTransport() { // Close remaining pipe ends that weren't transferred to ConnectionSocketImpl // Only close if fd != -1 (i.e., not transferred via takePipeSocket) if (stdin_to_conn_pipe_[0] != -1) { - ::close(stdin_to_conn_pipe_[0]); + pipe_close(stdin_to_conn_pipe_[0]); stdin_to_conn_pipe_[0] = -1; } if (conn_to_stdout_pipe_[1] != -1) { - ::close(conn_to_stdout_pipe_[1]); + pipe_close(conn_to_stdout_pipe_[1]); conn_to_stdout_pipe_[1] = -1; } } VoidResult StdioPipeTransport::initialize() { // Create pipes - if (::pipe(stdin_to_conn_pipe_) == -1) { + if (pipe(stdin_to_conn_pipe_) == -1) { failure_reason_ = "Failed to create stdin pipe: "; failure_reason_ += strerror(errno); Error err; @@ -76,9 +107,9 @@ VoidResult StdioPipeTransport::initialize() { return makeVoidError(err); } - if (::pipe(conn_to_stdout_pipe_) == -1) { - ::close(stdin_to_conn_pipe_[0]); - ::close(stdin_to_conn_pipe_[1]); + if (pipe(conn_to_stdout_pipe_) == -1) { + pipe_close(stdin_to_conn_pipe_[0]); + pipe_close(stdin_to_conn_pipe_[1]); failure_reason_ = "Failed to create stdout pipe: "; failure_reason_ += strerror(errno); Error err; @@ -101,10 +132,10 @@ VoidResult StdioPipeTransport::initialize() { // Verify the handle was created successfully if (!io_handle || !io_handle->isOpen()) { - ::close(stdin_to_conn_pipe_[0]); - ::close(stdin_to_conn_pipe_[1]); - ::close(conn_to_stdout_pipe_[0]); - ::close(conn_to_stdout_pipe_[1]); + pipe_close(stdin_to_conn_pipe_[0]); + pipe_close(stdin_to_conn_pipe_[1]); + pipe_close(conn_to_stdout_pipe_[0]); + pipe_close(conn_to_stdout_pipe_[1]); failure_reason_ = "Failed to create IO handle for pipes"; Error err; err.code = -1; @@ -113,10 +144,19 @@ VoidResult StdioPipeTransport::initialize() { } // Create pipe addresses +#ifdef _WIN32 + // On Windows, use IPv4 loopback addresses as placeholders since PipeInstance + // (Unix domain sockets) is not available + auto local_address = + std::make_shared("127.0.0.1", 0); + auto remote_address = + std::make_shared("127.0.0.1", 0); +#else auto local_address = std::make_shared("/tmp/mcp_stdio_in"); auto remote_address = std::make_shared("/tmp/mcp_stdio_out"); +#endif // Create the connection socket that ConnectionImpl will use // IMPORTANT: The io_handle takes ownership of stdin_to_conn_pipe_[0] and @@ -201,7 +241,7 @@ void StdioPipeTransport::closeSocket(network::ConnectionEvent event) { // Close write end of stdin pipe to signal EOF to ConnectionImpl if (stdin_to_conn_pipe_[1] != -1) { - ::close(stdin_to_conn_pipe_[1]); + pipe_close(stdin_to_conn_pipe_[1]); stdin_to_conn_pipe_[1] = -1; } @@ -358,6 +398,69 @@ void StdioPipeTransport::bridgeStdinToPipe(int stdin_fd, std::vector buffer(config_.buffer_size); +#ifdef _WIN32 + // Windows version: Use WaitForSingleObject with stdin handle + // Windows select() only works with sockets, not file descriptors + HANDLE stdin_handle = GetStdHandle(STD_INPUT_HANDLE); + + while (*running) { + // Wait for input with 100ms timeout + DWORD wait_result = WaitForSingleObject(stdin_handle, 100); + + if (wait_result == WAIT_TIMEOUT) { + // Timeout - no data available, loop back to check *running + continue; + } + + if (wait_result != WAIT_OBJECT_0) { + // Error waiting + failure_reason_ = "Error waiting for stdin"; + break; + } + + // Data might be available, try to read + // Use CRT _read for consistency with _pipe + int bytes_read = pipe_read(stdin_fd, buffer.data(), static_cast(buffer.size())); + + if (bytes_read > 0) { + // Write all data to the pipe + size_t total_written = 0; + while (total_written < static_cast(bytes_read) && *running) { + int bytes_written = + pipe_write(write_pipe_fd, buffer.data() + total_written, + static_cast(bytes_read - total_written)); + + if (bytes_written > 0) { + total_written += bytes_written; + } else if (bytes_written == -1) { + int err = errno; + if (err != EAGAIN && err != EWOULDBLOCK) { + // Error writing to pipe + failure_reason_ = "Error writing to stdin pipe: "; + failure_reason_ += strerror(err); + *running = false; + break; + } + // Otherwise, retry + Sleep(1); // Sleep 1ms + } + } + } else if (bytes_read == 0) { + // EOF on stdin + break; + } else { + // Error reading from stdin + int err = errno; + if (err != EAGAIN && err != EWOULDBLOCK) { + failure_reason_ = "Error reading from stdin: "; + failure_reason_ += strerror(err); + break; + } + // Otherwise, retry + } + } +#else + // Unix version: Use select() with timeout while (*running) { // Use select() with 100ms timeout to wait for data while remaining responsive fd_set readfds; @@ -368,7 +471,7 @@ void StdioPipeTransport::bridgeStdinToPipe(int stdin_fd, tv.tv_sec = 0; tv.tv_usec = 100000; // 100ms timeout - int select_result = ::select(stdin_fd + 1, &readfds, nullptr, nullptr, &tv); + int select_result = select(stdin_fd + 1, &readfds, nullptr, nullptr, &tv); if (select_result < 0) { // select() error @@ -388,15 +491,15 @@ void StdioPipeTransport::bridgeStdinToPipe(int stdin_fd, } // Data is available, read it - ssize_t bytes_read = ::read(stdin_fd, buffer.data(), buffer.size()); + ssize_t bytes_read = pipe_read(stdin_fd, buffer.data(), buffer.size()); if (bytes_read > 0) { // Write all data to the pipe size_t total_written = 0; while (total_written < static_cast(bytes_read) && *running) { ssize_t bytes_written = - ::write(write_pipe_fd, buffer.data() + total_written, - bytes_read - total_written); + pipe_write(write_pipe_fd, buffer.data() + total_written, + bytes_read - total_written); if (bytes_written > 0) { total_written += bytes_written; @@ -427,10 +530,11 @@ void StdioPipeTransport::bridgeStdinToPipe(int stdin_fd, // Otherwise, retry via select() timeout } } +#endif // Close write end of pipe to signal EOF if (write_pipe_fd != -1) { - ::close(write_pipe_fd); + pipe_close(write_pipe_fd); // Note: Don't update member variable since it may be accessed elsewhere } } @@ -447,6 +551,61 @@ void StdioPipeTransport::bridgePipeToStdout(int read_pipe_fd, std::vector buffer(config_.buffer_size); +#ifdef _WIN32 + // Windows version: Use polling with timeout since select() doesn't work + // with pipe file descriptors on Windows + while (*running) { + // Try to read from pipe (may block briefly) + // On Windows, _read on pipes will block if no data is available + // We use smaller reads with Sleep to remain responsive + int bytes_read = pipe_read(read_pipe_fd, buffer.data(), static_cast(buffer.size())); + + if (bytes_read > 0) { + // Write all data to stdout + size_t total_written = 0; + while (total_written < static_cast(bytes_read) && *running) { + int bytes_written = + pipe_write(stdout_fd, buffer.data() + total_written, + static_cast(bytes_read - total_written)); + + if (bytes_written > 0) { + total_written += bytes_written; + + // Flush stdout after each write to ensure data is sent immediately + // This is critical for tests to receive the data + if (stdout_fd == 1) { + fflush(stdout); + } + } else if (bytes_written == -1) { + int err = errno; + if (err != EAGAIN && err != EWOULDBLOCK) { + // Error writing to stdout + failure_reason_ = "Error writing to stdout: "; + failure_reason_ += strerror(err); + *running = false; + break; + } + // Otherwise, retry + Sleep(1); // Sleep 1ms + } + } + } else if (bytes_read == 0) { + // EOF on pipe (ConnectionImpl closed) + break; + } else { + // Error reading from pipe + int err = errno; + if (err != EAGAIN && err != EWOULDBLOCK) { + failure_reason_ = "Error reading from stdout pipe: "; + failure_reason_ += strerror(err); + break; + } + // No data available, sleep briefly + Sleep(10); // Sleep 10ms + } + } +#else + // Unix version: Use select() with timeout while (*running) { // Use select() with 100ms timeout to wait for data while remaining responsive fd_set readfds; @@ -457,7 +616,7 @@ void StdioPipeTransport::bridgePipeToStdout(int read_pipe_fd, tv.tv_sec = 0; tv.tv_usec = 100000; // 100ms timeout - int select_result = ::select(read_pipe_fd + 1, &readfds, nullptr, nullptr, &tv); + int select_result = select(read_pipe_fd + 1, &readfds, nullptr, nullptr, &tv); if (select_result < 0) { // select() error @@ -477,15 +636,15 @@ void StdioPipeTransport::bridgePipeToStdout(int read_pipe_fd, } // Data is available, read it - ssize_t bytes_read = ::read(read_pipe_fd, buffer.data(), buffer.size()); + ssize_t bytes_read = pipe_read(read_pipe_fd, buffer.data(), buffer.size()); if (bytes_read > 0) { // Write all data to stdout size_t total_written = 0; while (total_written < static_cast(bytes_read) && *running) { ssize_t bytes_written = - ::write(stdout_fd, buffer.data() + total_written, - bytes_read - total_written); + pipe_write(stdout_fd, buffer.data() + total_written, + bytes_read - total_written); if (bytes_written > 0) { total_written += bytes_written; @@ -522,15 +681,23 @@ void StdioPipeTransport::bridgePipeToStdout(int read_pipe_fd, // Otherwise, retry via select() timeout } } +#endif // Close read end of pipe if (read_pipe_fd != -1) { - ::close(read_pipe_fd); + pipe_close(read_pipe_fd); // Note: Don't update member variable since it may be accessed elsewhere } } void StdioPipeTransport::setNonBlocking(int fd) { +#ifdef _WIN32 + // On Windows, _pipe() creates CRT file descriptors that don't support + // non-blocking mode via ioctlsocket. For pipe I/O, we rely on the + // bridge threads to handle blocking reads/writes. + // This is a no-op on Windows for pipes. + (void)fd; +#else int flags = fcntl(fd, F_GETFL, 0); if (flags == -1) { failure_reason_ = "Failed to get file descriptor flags"; @@ -540,6 +707,7 @@ void StdioPipeTransport::setNonBlocking(int fd) { if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { failure_reason_ = "Failed to set non-blocking mode"; } +#endif } // StdioPipeTransportFactory implementation diff --git a/src/transport/stdio_transport_socket.cc b/src/transport/stdio_transport_socket.cc index 706a19a5..e4337d75 100644 --- a/src/transport/stdio_transport_socket.cc +++ b/src/transport/stdio_transport_socket.cc @@ -1,8 +1,14 @@ #include "mcp/transport/stdio_transport_socket.h" #include + +#ifdef _WIN32 +#include +#include +#else #include #include +#endif #include "mcp/buffer.h" @@ -107,6 +113,12 @@ void StdioTransportSocket::onConnected() { } void StdioTransportSocket::setNonBlocking(int fd) { +#ifdef _WIN32 + u_long mode = 1; + if (ioctlsocket(fd, FIONBIO, &mode) != 0) { + failure_reason_ = "Failed to set non-blocking mode"; + } +#else int flags = fcntl(fd, F_GETFL, 0); if (flags == -1) { failure_reason_ = "Failed to get file descriptor flags"; @@ -116,6 +128,7 @@ void StdioTransportSocket::setNonBlocking(int fd) { if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { failure_reason_ = "Failed to set non-blocking mode"; } +#endif } TransportIoResult StdioTransportSocket::performRead(Buffer& buffer) { diff --git a/src/transport/tcp_transport_socket.cc b/src/transport/tcp_transport_socket.cc index ccf77fef..b33cf697 100644 --- a/src/transport/tcp_transport_socket.cc +++ b/src/transport/tcp_transport_socket.cc @@ -6,10 +6,24 @@ #include "mcp/transport/tcp_transport_socket.h" #include -#include +#ifdef _WIN32 +#include +#include +#include +// Windows uses SD_SEND instead of SHUT_WR +#ifndef SHUT_WR +#define SHUT_WR SD_SEND +#endif +// MSG_NOSIGNAL is Linux-specific (prevents SIGPIPE), not needed on Windows +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif +#else +#include #include #include +#endif #include "mcp/buffer.h" #include "mcp/network/socket.h" @@ -125,7 +139,10 @@ network::TransportIoResult TcpTransportSocket::doRead(Buffer& buffer) { } // Perform the actual read - ssize_t bytes_read = ::recv(io_handle.fd(), slice.mem_, slice.len_, 0); + // Note: Windows recv() expects char* buffer, POSIX expects void* + ssize_t bytes_read = ::recv(io_handle.fd(), + static_cast(slice.mem_), + static_cast(slice.len_), 0); if (bytes_read > 0) { // Successful read @@ -230,8 +247,10 @@ network::TransportIoResult TcpTransportSocket::doWrite(Buffer& buffer, const uint8_t* data = static_cast(slice.mem_); while (remaining > 0) { + // Note: Windows send() expects const char* buffer, POSIX expects const void* ssize_t bytes_written = - ::send(io_handle.fd(), data, remaining, MSG_NOSIGNAL); + ::send(io_handle.fd(), reinterpret_cast(data), + static_cast(remaining), MSG_NOSIGNAL); if (bytes_written > 0) { total_written += bytes_written; diff --git a/src/transport/tcp_transport_socket_state_machine.cc b/src/transport/tcp_transport_socket_state_machine.cc index 2ba49193..50ad6f3d 100644 --- a/src/transport/tcp_transport_socket_state_machine.cc +++ b/src/transport/tcp_transport_socket_state_machine.cc @@ -5,8 +5,13 @@ #include "mcp/transport/tcp_transport_socket_state_machine.h" +#ifdef _WIN32 +#include +#include +#else #include #include +#endif #include "mcp/buffer.h"