diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index 33b9ac8..89ee4b0 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -7,25 +7,47 @@ on: types: [checks_requested] jobs: + tests-ubuntu: + uses: ./.github/workflows/test.yml + strategy: + fail-fast: false + matrix: + javaVersion: ['11', '17', '21'] + with: + os: linux-ubuntu-latest + javaVersion: ${{ matrix.javaVersion }} + + tests-windows: + uses: ./.github/workflows/test.yml + strategy: + fail-fast: false + matrix: + javaVersion: ['11', '17', '21'] + with: + os: windows-server-latest + javaVersion: ${{ matrix.javaVersion }} + fmt: runs-on: group: databricks-protected-runner-group labels: linux-ubuntu-latest - steps: - - name: Set up JDK 11 - uses: actions/setup-java@v1 - with: - java-version: 11 + steps: - name: Checkout uses: actions/checkout@v4 - - name: Cache Maven packages - uses: actions/cache@v4 + - name: Set up JDK 11 + uses: actions/setup-java@v4 with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2 + java-version: 11 + distribution: 'temurin' + cache: 'maven' - name: Check formatting - run: mvn --errors spotless:check + run: mvn spotless:check + + - name: Fail on formatting differences + if: failure() + run: | + echo "Code formatting issues detected. Run 'mvn spotless:apply' to fix." + exit 1 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..4c1a263 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,48 @@ +name: Test Workflow + +on: + workflow_call: + inputs: + os: + required: true + type: string + javaVersion: + required: true + type: string + +jobs: + test: + strategy: + fail-fast: false + runs-on: + group: databricks-protected-runner-group + labels: ${{ inputs.os }} + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Unshallow + run: git fetch --prune --unshallow + + - name: Set up JDK ${{ inputs.javaVersion }} + uses: actions/setup-java@v4 + with: + java-version: ${{ inputs.javaVersion }} + distribution: 'temurin' + cache: 'maven' + + - name: Build with Maven + run: mvn clean compile + + - name: Run tests + run: mvn test + + - name: Upload test results + if: always() + uses: actions/upload-artifact@v4 + with: + name: test-results-${{ inputs.os }}-java-${{ inputs.javaVersion }} + path: | + target/surefire-reports/ + target/test-classes/ + retention-days: 7 diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..450ad94 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,17 @@ +# Version changelog + +## Release v0.1.0 + +Initial release of the Databricks Zerobus Ingest SDK for Java. 
+ +### API Changes + +- Added `ZerobusSdk` class for creating ingestion streams +- Added `ZerobusStream` class for managing stateful gRPC streams +- Added `RecordAcknowledgment` for blocking until record acknowledgment +- Added `TableProperties` for configuring table schema and name +- Added `StreamConfigurationOptions` for stream behavior configuration +- Added `ZerobusException` and `NonRetriableException` for error handling +- Added `StreamState` enum for tracking stream lifecycle +- Added utility methods in `ZerobusSdkStubUtils` for gRPC stub management +- Support for Java 8 and higher diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6813a4e..8972e1b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,6 +1,6 @@ -# Contributing to Zerobus SDK for Python +# Contributing to Zerobus SDK for Java -We happily welcome contributions to the Zerobus SDK for Python. We use [GitHub Issues](https://github.com/databricks/zerobus-sdk-py/issues) to track community reported issues and [GitHub Pull Requests](https://github.com/databricks/zerobus-sdk-py/pulls) for accepting changes. +We happily welcome contributions to the Zerobus SDK for Java. We use [GitHub Issues](https://github.com/databricks/zerobus-sdk-java/issues) to track community reported issues and [GitHub Pull Requests](https://github.com/databricks/zerobus-sdk-java/pulls) for accepting changes. Contributions are licensed on a license-in/license-out basis. @@ -20,61 +20,60 @@ Small patches and bug fixes don't need prior communication. ### Prerequisites -- Python 3.7 or higher +- **Java**: 8 or higher - [Download Java](https://adoptium.net/) +- **Maven**: 3.6 or higher - [Download Maven](https://maven.apache.org/download.cgi) +- **Protocol Buffers Compiler** (`protoc`): 24.4 - [Download protoc](https://github.com/protocolbuffers/protobuf/releases/tag/v24.4) - Git -- pip ### Setting Up Your Development Environment 1. **Clone the repository:** ```bash - git clone https://github.com/databricks/zerobus-sdk-py.git - cd zerobus-sdk-py + git clone https://github.com/databricks/zerobus-sdk-java.git + cd zerobus-sdk-java ``` -2. **Create and activate a virtual environment:** +2. **Build the project:** ```bash - make dev + mvn clean install ``` This will: - - Create a virtual environment in `.venv` - - Install the package in development mode with all dev dependencies + - Generate protobuf Java classes + - Compile the source code + - Run tests + - Install the artifact to your local Maven repository -3. **Activate the virtual environment:** +3. **Run tests:** ```bash - source .venv/bin/activate # On Windows: .venv\Scripts\activate + mvn test ``` ## Coding Style -Code style is enforced by a formatter check in your pull request. We use [Black](https://github.com/psf/black) to format our code. Run `make fmt` to ensure your code is properly formatted prior to raising a pull request. +Code style is enforced by [Spotless](https://github.com/diffplug/spotless) in your pull request. We use Google Java Format for code formatting. 
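Spotless is wired into the Maven build via the `spotless-maven-plugin`. As a rough sketch of how such a configuration typically looks (the exact plugin version and rule set in this repository's `pom.xml` may differ):

```xml
<plugin>
  <groupId>com.diffplug.spotless</groupId>
  <artifactId>spotless-maven-plugin</artifactId>
  <!-- Illustrative version; check pom.xml for the one actually used -->
  <version>2.43.0</version>
  <configuration>
    <java>
      <!-- Format Java sources with Google Java Format and drop unused imports -->
      <googleJavaFormat/>
      <removeUnusedImports/>
    </java>
    <pom>
      <!-- Keep pom.xml dependencies and plugins sorted -->
      <sortPom/>
    </pom>
  </configuration>
</plugin>
```

With a configuration along these lines, `mvn spotless:check` fails the build on violations and `mvn spotless:apply` rewrites the files in place, as described in the next section.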
### Running the Formatter Format your code before committing: ```bash -make fmt +mvn spotless:apply ``` -This runs: -- **Black**: Code formatting -- **autoflake**: Remove unused imports -- **isort**: Sort imports +This will format: +- **Java code**: Using Google Java Format +- **Imports**: Organized and unused imports removed +- **pom.xml**: Sorted dependencies and plugins -### Running Linters +### Checking Formatting -Check your code for issues: +Check if your code is properly formatted: ```bash -make lint +mvn spotless:check ``` -This runs: -- **pycodestyle**: Style guide enforcement -- **autoflake**: Check for unused imports - ## Pull Request Process 1. **Create a feature branch:** @@ -85,25 +84,33 @@ This runs: 2. **Make your changes:** - Write clear, concise commit messages - Follow existing code style + - Add tests for new functionality - Update documentation as needed 3. **Format your code:** ```bash - make fmt + mvn spotless:apply + ``` + +4. **Run tests:** + ```bash + mvn test ``` -4. **Commit your changes:** +5. **Commit your changes:** ```bash git add . - git commit -m "Add feature: description of your changes" + git commit -s -m "Add feature: description of your changes" ``` -5. **Push to your fork:** + Note: The `-s` flag signs your commit (required - see below). + +6. **Push to your fork:** ```bash git push origin feature/your-feature-name ``` -6. **Create a Pull Request:** +7. **Create a Pull Request:** - Provide a clear description of changes - Reference any related issues - Ensure all CI checks pass @@ -130,10 +137,11 @@ git commit -s -m "Your commit message" When reviewing code: -- Check for adherence to code style +- Check for adherence to code style (Google Java Format) - Look for potential edge cases - Consider performance implications - Ensure documentation is updated +- Verify tests cover new functionality ## Commit Message Guidelines @@ -148,8 +156,8 @@ Example: ``` Add async stream creation example -- Add async_example.py demonstrating non-blocking ingestion -- Update README with async API documentation +- Add BlockingIngestionExample.java demonstrating synchronous ingestion +- Update README with blocking API documentation Fixes #42 ``` @@ -158,58 +166,109 @@ Fixes #42 ### Updating Documentation -- Update docstrings for all public APIs -- Use Google-style docstrings -- Include examples in docstrings where helpful +- Add Javadoc for all public APIs +- Use standard Javadoc format +- Include `@param`, `@return`, `@throws` tags - Update README.md for user-facing changes - Update examples/ for new features -Example docstring: -```python -def ingest_record(self, record) -> RecordAcknowledgment: - """ - Submits a single record for ingestion into the stream. +Example Javadoc: +```java +/** + * Ingests a single record into the stream. + * + *
<p>
Returns a CompletableFuture that completes when the record is durably written to storage. + * This method may block if the maximum number of in-flight records has been reached. + * + * @param record The protobuf message to ingest + * @return A CompletableFuture that completes when the record is acknowledged + * @throws ZerobusException if the stream is not in a valid state for ingestion + */ +public CompletableFuture ingestRecord(RecordType record) throws ZerobusException { + // ... +} +``` + +## Testing + +### Writing Tests + +- Add unit tests for all new functionality +- Use JUnit 5 for test framework +- Use Mockito for mocking +- Tests should be fast (< 1 second per test) +- Use descriptive test names + +Example test: +```java +@Test +public void testSingleRecordIngestAndAcknowledgment() throws Exception { + // Given + ZerobusStream stream = createTestStream(); + + // When + CompletableFuture result = stream.ingestRecord(testMessage); - This method may block if the maximum number of in-flight records - has been reached. + // Then + result.get(5, TimeUnit.SECONDS); + assertEquals(StreamState.OPENED, stream.getState()); +} +``` - Args: - record: The Protobuf message object to be ingested. +### Running Tests - Returns: - RecordAcknowledgment: An object to wait on for the server's acknowledgment. +```bash +# Run all tests +mvn test - Raises: - ZerobusException: If the stream is not in a valid state for ingestion. +# Run specific test class +mvn test -Dtest=ZerobusSdkTest - Example: - >>> record = AirQuality(device_name="sensor-1", temp=25) - >>> ack = stream.ingest_record(record) - >>> ack.wait_for_ack() - """ +# Run specific test method +mvn test -Dtest=ZerobusSdkTest#testSingleRecordIngestAndAcknowledgment ``` ## Continuous Integration All pull requests must pass CI checks: -- **fmt**: Runs formatting checks (black, autoflake, isort) +- **Build**: `mvn clean compile` +- **Tests**: `mvn test` (on Java 11, 17, 21) +- **Formatting**: `mvn spotless:check` -The formatting check runs `make dev fmt` and then checks for any git differences. If there are differences, the check will fail. +Tests run on both Ubuntu and Windows runners. You can view CI results in the GitHub Actions tab of the pull request. -## Makefile Targets +## Maven Commands + +Useful Maven commands: + +```bash +# Clean build +mvn clean + +# Compile code +mvn compile -Available make targets: +# Run tests +mvn test -- `make dev` - Set up development environment -- `make install` - Install package -- `make build` - Build wheel package -- `make fmt` - Format code with black, autoflake, and isort -- `make lint` - Run linting with pycodestyle -- `make clean` - Remove build artifacts -- `make help` - Show available targets +# Format code +mvn spotless:apply + +# Check formatting +mvn spotless:check + +# Create JARs (regular + fat JAR) +mvn package + +# Install to local Maven repo +mvn install + +# Generate protobuf classes +mvn protobuf:compile +``` ## Versioning @@ -227,11 +286,13 @@ We follow [Semantic Versioning](https://semver.org/): ## Package Name -The package is published on PyPI as `databricks-zerobus-ingest-sdk`. 
+The package is published on Maven Central as: +- **Group ID**: `com.databricks` +- **Artifact ID**: `zerobus-ingest-sdk` ## Code of Conduct - Be respectful and inclusive - Welcome newcomers - Focus on constructive feedback -- Follow the [Python Community Code of Conduct](https://www.python.org/psf/conduct/) +- Follow professional open source etiquette diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..93a5b5f --- /dev/null +++ b/LICENSE @@ -0,0 +1,69 @@ + Databricks License + Copyright (2022) Databricks, Inc. + + Definitions. + + Agreement: The agreement between Databricks, Inc., and you governing + the use of the Databricks Services, as that term is defined in + the Master Cloud Services Agreement (MCSA) located at + www.databricks.com/legal/mcsa. + + Licensed Materials: The source code, object code, data, and/or other + works to which this license applies. + + Scope of Use. You may not use the Licensed Materials except in + connection with your use of the Databricks Services pursuant to + the Agreement. Your use of the Licensed Materials must comply at all + times with any restrictions applicable to the Databricks Services, + generally, and must be used in accordance with any applicable + documentation. You may view, use, copy, modify, publish, and/or + distribute the Licensed Materials solely for the purposes of using + the Licensed Materials within or connecting to the Databricks Services. + If you do not agree to these terms, you may not view, use, copy, + modify, publish, and/or distribute the Licensed Materials. + + Redistribution. You may redistribute and sublicense the Licensed + Materials so long as all use is in compliance with these terms. + In addition: + + - You must give any other recipients a copy of this License; + - You must cause any modified files to carry prominent notices + stating that you changed the files; + - You must retain, in any derivative works that you distribute, + all copyright, patent, trademark, and attribution notices, + excluding those notices that do not pertain to any part of + the derivative works; and + - If a "NOTICE" text file is provided as part of its + distribution, then any derivative works that you distribute + must include a readable copy of the attribution notices + contained within such NOTICE file, excluding those notices + that do not pertain to any part of the derivative works. + + You may add your own copyright statement to your modifications and may + provide additional license terms and conditions for use, reproduction, + or distribution of your modifications, or for any such derivative works + as a whole, provided your use, reproduction, and distribution of + the Licensed Materials otherwise complies with the conditions stated + in this License. + + Termination. This license terminates automatically upon your breach of + these terms or upon the termination of your Agreement. Additionally, + Databricks may terminate this license at any time on notice. Upon + termination, you must permanently delete the Licensed Materials and + all copies thereof. + + DISCLAIMER; LIMITATION OF LIABILITY. + + THE LICENSED MATERIALS ARE PROVIDED “AS-IS” AND WITH ALL FAULTS. + DATABRICKS, ON BEHALF OF ITSELF AND ITS LICENSORS, SPECIFICALLY + DISCLAIMS ALL WARRANTIES RELATING TO THE LICENSED MATERIALS, EXPRESS + AND IMPLIED, INCLUDING, WITHOUT LIMITATION, IMPLIED WARRANTIES, + CONDITIONS AND OTHER TERMS OF MERCHANTABILITY, SATISFACTORY QUALITY OR + FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. 
DATABRICKS AND + ITS LICENSORS TOTAL AGGREGATE LIABILITY RELATING TO OR ARISING OUT OF + YOUR USE OF OR DATABRICKS’ PROVISIONING OF THE LICENSED MATERIALS SHALL + BE LIMITED TO ONE THOUSAND ($1,000) DOLLARS. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR + OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + ARISING FROM, OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS OR + THE USE OR OTHER DEALINGS IN THE LICENSED MATERIALS. \ No newline at end of file diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md new file mode 100644 index 0000000..1b000f6 --- /dev/null +++ b/NEXT_CHANGELOG.md @@ -0,0 +1,13 @@ +# NEXT CHANGELOG + +## Release v0.2.0 + +### New Features and Improvements + +### Bug Fixes + +### Documentation + +### Internal Changes + +### API Changes diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000..1ec04b5 --- /dev/null +++ b/NOTICE @@ -0,0 +1,34 @@ +Copyright (2025) Databricks, Inc. + +This Software includes software developed at Databricks (https://www.databricks.com/) and its use is subject to the included LICENSE file. + +--- + +This Software includes the following open source components: + +## Apache License 2.0 + +### Protocol Buffers (protobuf-java) +Copyright 2008 Google Inc. +https://github.com/protocolbuffers/protobuf +License: https://github.com/protocolbuffers/protobuf/blob/main/LICENSE + +### gRPC Java +Copyright 2014 gRPC authors +https://github.com/grpc/grpc-java +License: https://github.com/grpc/grpc-java/blob/master/LICENSE + +### Netty +Copyright 2014 The Netty Project +https://github.com/netty/netty +License: https://github.com/netty/netty/blob/4.1/LICENSE.txt + +### SLF4J +Copyright (c) 2004-2017 QOS.ch +https://github.com/qos-ch/slf4j +License: https://github.com/qos-ch/slf4j/blob/master/LICENSE.txt + +## CDDL + GPLv2 with classpath exception + +### javax.annotation-api +https://github.com/javaee/javax.annotation diff --git a/README.md b/README.md index ac11dd7..be3bcc3 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,30 @@ # Databricks Zerobus Ingest SDK for Java -The Databricks Zerobus Ingest SDK for Java provides a high-performance client for ingesting data directly into Databricks Delta tables using the Zerobus streaming protocol. +[Public Preview](https://docs.databricks.com/release-notes/release-types.html): This SDK is supported for production use cases and is available to all customers. Databricks is actively working on stabilizing the Zerobus Ingest SDK for Java. Minor version updates may include backwards-incompatible changes. + +We are keen to hear feedback from you on this SDK. Please [file issues](https://github.com/databricks/zerobus-sdk-java/issues), and we will address them. + +The Databricks Zerobus Ingest SDK for Java provides a high-performance client for ingesting data directly into Databricks Delta tables using the Zerobus streaming protocol. 
| See also the [SDK for Rust](https://github.com/databricks/zerobus-sdk-rs) | See also the [SDK for Python](https://github.com/databricks/zerobus-sdk-py) + +## Table of Contents + +- [Features](#features) +- [Requirements](#requirements) +- [Quick Start User Guide](#quick-start-user-guide) + - [Prerequisites](#prerequisites) + - [Building Your Application](#building-your-application) + - [Define Your Protocol Buffer Schema](#define-your-protocol-buffer-schema) + - [Generate Protocol Buffer Schema from Unity Catalog (Alternative)](#generate-protocol-buffer-schema-from-unity-catalog-alternative) + - [Write Your Client Code](#write-your-client-code) + - [Compile and Run](#compile-and-run) +- [Usage Examples](#usage-examples) + - [Blocking Ingestion](#blocking-ingestion) + - [Non-Blocking Ingestion](#non-blocking-ingestion) +- [Configuration](#configuration) +- [Logging](#logging) +- [Error Handling](#error-handling) +- [API Reference](#api-reference) +- [Best Practices](#best-practices) ## Features @@ -21,7 +45,6 @@ The Databricks Zerobus Ingest SDK for Java provides a high-performance client fo **When using the fat JAR** (recommended for most users): - No additional dependencies required - all dependencies are bundled -- Includes `slf4j-simple` for logging out of the box **When using the regular JAR**: - [`protobuf-java` 3.24.0](https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java/3.24.0) @@ -30,7 +53,7 @@ The Databricks Zerobus Ingest SDK for Java provides a high-performance client fo - [`grpc-stub` 1.58.0](https://mvnrepository.com/artifact/io.grpc/grpc-stub/1.58.0) - [`javax.annotation-api` 1.3.2](https://mvnrepository.com/artifact/javax.annotation/javax.annotation-api/1.3.2) - [`slf4j-api` 1.7.36](https://mvnrepository.com/artifact/org.slf4j/slf4j-api/1.7.36) -- [`slf4j-simple` 1.7.36](https://mvnrepository.com/artifact/org.slf4j/slf4j-simple/1.7.36) (or substitute your own SLF4J implementation like [`logback-classic` 1.2.11](https://mvnrepository.com/artifact/ch.qos.logback/logback-classic/1.2.11)) +- An SLF4J implementation such as [`slf4j-simple` 1.7.36](https://mvnrepository.com/artifact/org.slf4j/slf4j-simple/1.7.36) or [`logback-classic` 1.2.11](https://mvnrepository.com/artifact/ch.qos.logback/logback-classic/1.2.11) ### Build Requirements (only for building from source) @@ -176,7 +199,7 @@ message AirQuality { } ``` -Compile the protobuf: +**Compile the protobuf:** ```bash protoc --java_out=src/main/java src/main/proto/record.proto @@ -184,6 +207,95 @@ protoc --java_out=src/main/java src/main/proto/record.proto This generates `src/main/java/com/example/proto/Record.java`. +### Generate Protocol Buffer Schema from Unity Catalog (Alternative) + +Instead of manually writing and compiling your protobuf schema, you can automatically generate it from an existing Unity Catalog table schema using the included `GenerateProto` tool. + +#### Using the Proto Generation Tool + +The `GenerateProto` tool fetches your table schema from Unity Catalog and generates a corresponding proto2 definition file with the correct type mappings. 
+ +**Basic Usage:** + +```bash +java -jar lib/databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \ + --uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \ + --client-id "your-service-principal-application-id" \ + --client-secret "your-service-principal-secret" \ + --table "main.default.air_quality" \ + --output "src/main/proto/record.proto" \ + --proto-msg "AirQuality" +``` + +**Parameters:** +- `--uc-endpoint`: Your workspace URL (e.g., `https://dbc-a1b2c3d4-e5f6.cloud.databricks.com`) +- `--client-id`: Service principal application ID +- `--client-secret`: Service principal secret +- `--table`: Fully qualified table name (catalog.schema.table) +- `--output`: Output path for the generated proto file +- `--proto-msg`: (Optional) Name for the protobuf message (defaults to table name) + +**Example:** + +For a table defined as: +```sql +CREATE TABLE main.default.air_quality ( + device_name STRING, + temp INT, + humidity BIGINT +) +USING DELTA; +``` + +Running the generation tool will create `src/main/proto/record.proto`: +```protobuf +syntax = "proto2"; + +package com.example; + +option java_package = "com.example.proto"; +option java_outer_classname = "Record"; + +message AirQuality { + optional string device_name = 1; + optional int32 temp = 2; + optional int64 humidity = 3; +} +``` + +After generating the proto file, compile it as shown above: +```bash +protoc --java_out=src/main/java src/main/proto/record.proto +``` + +**Type Mappings:** + +The tool automatically maps Unity Catalog types to proto2 types: + +| Delta Type | Proto2 Type | +|-----------|-------------| +| INT, SMALLINT, SHORT | int32 | +| BIGINT, LONG | int64 | +| FLOAT | float | +| DOUBLE | double | +| STRING, VARCHAR | string | +| BOOLEAN | bool | +| BINARY | bytes | +| DATE | int32 | +| TIMESTAMP | int64 | +| ARRAY\ | repeated type | +| MAP\ | map\ | +| STRUCT\ | nested message | + +**Benefits:** +- No manual schema creation required +- Ensures schema consistency between your table and protobuf definitions +- Automatically handles complex types (arrays, maps, structs) +- Reduces errors from manual type mapping +- No need to clone the repository - runs directly from the SDK JAR + +For detailed documentation and examples, see [tools/README.md](tools/README.md). + #### 4. Write Your Client Code Create `src/main/java/com/example/ZerobusClient.java`: @@ -353,24 +465,40 @@ try { ## Logging -The Databricks Zerobus Ingest SDK for Java uses the standard [SLF4J logging framework](https://www.slf4j.org/) and includes `slf4j-simple` by default, so **logging works out of the box** with no additional configuration required. +The Databricks Zerobus Ingest SDK for Java uses the standard [SLF4J logging framework](https://www.slf4j.org/). The SDK only depends on `slf4j-api`, which means **you need to add an SLF4J implementation** to your classpath to see log output. -### Controlling Log Levels +### Adding a Logging Implementation -By default, the SDK logs at **INFO** level. 
To enable debug logging, set the system property when running your application: +**Option 1: Using slf4j-simple** (simplest for getting started) + +Add to your Maven dependencies: +```xml + + org.slf4j + slf4j-simple + 1.7.36 + +``` +Control log levels with system properties: ```bash java -Dorg.slf4j.simpleLogger.log.com.databricks.zerobus=debug -cp "lib/*:out" com.example.ZerobusClient ``` Available log levels: `trace`, `debug`, `info`, `warn`, `error` -### Using a Different Logging Framework +**Option 2: Using Logback** (recommended for production) -If you prefer a different logging framework like **Logback** or **Log4j**, you can substitute `slf4j-simple`: - -**With Logback**, add the following to your `logback.xml`: +Add to your Maven dependencies: +```xml + + ch.qos.logback + logback-classic + 1.2.11 + +``` +Create `logback.xml` in your resources directory: ```xml @@ -387,13 +515,43 @@ If you prefer a different logging framework like **Logback** or **Log4j**, you c ``` -**With Log4j**, add to your `log4j.properties`: +**Option 3: Using Log4j 2** -```properties -log4j.logger.com.databricks.zerobus=DEBUG +Add to your Maven dependencies: +```xml + + org.apache.logging.log4j + log4j-slf4j-impl + 2.20.0 + +``` + +Create `log4j2.xml` in your resources directory: +```xml + + + + + + + + + + + + + ``` -To use a different logging framework, exclude `slf4j-simple` and add your preferred implementation to the classpath. +### No Logging Implementation + +If you don't add an SLF4J implementation, you'll see a warning like: +``` +SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". +SLF4J: Defaulting to no-operation (NOP) logger implementation +``` + +The SDK will still work, but no log messages will be output. ### What Gets Logged @@ -676,7 +834,4 @@ NonRetriableException(String message, Throwable cause) 4. **Error handling**: Implement proper retry logic for retriable errors 5. **Monitoring**: Use `ackCallback` to track ingestion progress 6. **Token refresh**: Tokens are automatically refreshed on stream creation and recovery - -## Disclaimer - -Databricks is actively working on stabilizing the Zerobus Ingest SDK for Java. Minor version updates may include backwards-incompatible changes. +7. 
**Proto generation**: Use the built-in `GenerateProto` tool to automatically generate proto files from your table schemas diff --git a/pom.xml b/pom.xml index 908e47a..3166520 100644 --- a/pom.xml +++ b/pom.xml @@ -3,11 +3,30 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 com.databricks - databricks-zerobus-ingest-sdk + zerobus-ingest-sdk 0.1.0 jar Zerobus Ingest SDK for Java Databricks Zerobus Ingest SDK for Java - Direct ingestion to Delta tables + https://github.com/databricks/zerobus-sdk-java + + + Databricks License + https://github.com/databricks/zerobus-sdk-java/blob/main/LICENSE + repo + + + + + Databricks + https://databricks.com + + + + scm:git:git://github.com/databricks/zerobus-sdk-java.git + scm:git:ssh://github.com:databricks/zerobus-sdk-java.git + https://github.com/databricks/zerobus-sdk-java/tree/main + 1.8 1.8 @@ -48,11 +67,43 @@ slf4j-api 1.7.36 - + org.slf4j slf4j-simple 1.7.36 + test + + + + org.junit.jupiter + junit-jupiter-api + 5.10.0 + test + + + org.junit.jupiter + junit-jupiter-engine + 5.10.0 + test + + + org.mockito + mockito-core + 5.5.0 + test + + + org.mockito + mockito-junit-jupiter + 5.5.0 + test + + + io.grpc + grpc-testing + 1.58.0 + test @@ -69,11 +120,18 @@ + compile-protobuf compile compile-custom + + test-compile-protobuf + + test-compile + + @@ -114,6 +172,12 @@ + + + org.apache.maven.plugins + maven-surefire-plugin + 3.2.1 + org.apache.maven.plugins maven-jar-plugin @@ -138,6 +202,7 @@ ${project.name} ${project.version} + com.databricks.zerobus.tools.GenerateProto @@ -157,6 +222,70 @@ + + + org.apache.maven.plugins + maven-source-plugin + 3.3.0 + + + attach-sources + + jar-no-fork + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.6.2 + + none + true + + + + attach-javadocs + + jar + + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 3.1.0 + + + sign-artifacts + + sign + + verify + + + --pinentry-mode + loopback + + + + + + + + org.sonatype.central + central-publishing-maven-plugin + 0.5.0 + true + + central + false + + diff --git a/src/main/java/com/databricks/zerobus/ZerobusSdk.java b/src/main/java/com/databricks/zerobus/ZerobusSdk.java index 5d312f2..f086137 100644 --- a/src/main/java/com/databricks/zerobus/ZerobusSdk.java +++ b/src/main/java/com/databricks/zerobus/ZerobusSdk.java @@ -159,18 +159,25 @@ public CompletableFuture> try { logger.debug("Creating stream for table: " + tableProperties.getTableName()); - // Generate authentication token - String token = - TokenFactory.getZerobusToken( - tableProperties.getTableName(), - workspaceId, - unityCatalogEndpoint, - clientId, - clientSecret); + // Create a token supplier that generates a fresh token for each gRPC request + java.util.function.Supplier tokenSupplier = + () -> { + try { + return TokenFactory.getZerobusToken( + tableProperties.getTableName(), + workspaceId, + unityCatalogEndpoint, + clientId, + clientSecret); + } catch (NonRetriableException e) { + throw new RuntimeException("Failed to get Zerobus token", e); + } + }; - // Create gRPC stub with authentication + // Create gRPC stub once with token supplier - it will fetch fresh tokens as needed ZerobusGrpc.ZerobusStub stub = - stubFactory.createStub(serverEndpoint, true, tableProperties.getTableName(), token); + stubFactory.createStubWithTokenSupplier( + serverEndpoint, tableProperties.getTableName(), tokenSupplier); ZerobusStream stream = new ZerobusStream<>( diff --git a/src/main/java/com/databricks/zerobus/ZerobusSdkStubUtils.java 
b/src/main/java/com/databricks/zerobus/ZerobusSdkStubUtils.java index 9c9650a..6893cd5 100644 --- a/src/main/java/com/databricks/zerobus/ZerobusSdkStubUtils.java +++ b/src/main/java/com/databricks/zerobus/ZerobusSdkStubUtils.java @@ -20,69 +20,58 @@ class ZerobusSdkStubFactory { // gRPC channel configuration constants private static final int DEFAULT_TLS_PORT = 443; - private static final int DEFAULT_PLAINTEXT_PORT = 80; private static final long KEEP_ALIVE_TIME_SECONDS = 30; private static final long KEEP_ALIVE_TIMEOUT_SECONDS = 10; - private static final int MAX_INBOUND_MESSAGE_SIZE_MB = 100; - private static final int BYTES_PER_MB = 1024 * 1024; - // Protocol prefixes + // Protocol prefix private static final String HTTPS_PREFIX = "https://"; - private static final String HTTP_PREFIX = "http://"; /** - * Creates a new managed gRPC channel. + * Creates a new managed gRPC channel with TLS. * *
<p>
The channel is configured for long-lived streaming with appropriate keep-alive settings and - * message size limits. + * unlimited message size limits. * - * @param endpoint The endpoint URL (may include protocol prefix) - * @param useTls Whether to use TLS encryption + * @param endpoint The endpoint URL (may include https:// prefix) * @return A configured ManagedChannel */ - ManagedChannel createGrpcChannel(String endpoint, boolean useTls) { - EndpointInfo endpointInfo = parseEndpoint(endpoint, useTls); + ManagedChannel createGrpcChannel(String endpoint) { + EndpointInfo endpointInfo = parseEndpoint(endpoint); NettyChannelBuilder builder = - NettyChannelBuilder.forAddress(endpointInfo.host, endpointInfo.port); + NettyChannelBuilder.forAddress(endpointInfo.host, endpointInfo.port).useTransportSecurity(); - // Configure TLS or plaintext - if (useTls) { - builder.useTransportSecurity(); - } else { - builder.usePlaintext(); - } - - // Configure for long-lived streaming connections + // Configure for long-lived streaming connections with unlimited message size return builder .keepAliveTime(KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS) .keepAliveTimeout(KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS) .keepAliveWithoutCalls(true) - .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE_MB * BYTES_PER_MB) + .maxInboundMessageSize(Integer.MAX_VALUE) .build(); } /** - * Creates a new Zerobus gRPC stub with authentication. + * Creates a new Zerobus gRPC stub with dynamic token supplier. * - *
<p>
The stub is configured with an interceptor that adds authentication headers to all outgoing - * requests. + *
<p>
The stub is configured with an interceptor that obtains a fresh token for each request using + * the provided token supplier. This allows token rotation without recreating the stub. * *
<p>
Note: Currently creates a new channel for each stub. Consider reusing channels across * multiple streams for better resource utilization. * * @param endpoint The endpoint URL - * @param useTls Whether to use TLS encryption * @param tableName The target table name - * @param token Authentication token (Bearer token) - * @return A configured ZerobusStub + * @param tokenSupplier Supplier that provides a fresh authentication token for each request + * @return A configured ZerobusStub with unlimited message sizes */ - ZerobusGrpc.ZerobusStub createStub( - String endpoint, boolean useTls, String tableName, String token) { - ManagedChannel channel = createGrpcChannel(endpoint, useTls); - ClientInterceptor authInterceptor = new AuthenticationInterceptor(token, tableName); + ZerobusGrpc.ZerobusStub createStubWithTokenSupplier( + String endpoint, String tableName, java.util.function.Supplier tokenSupplier) { + ManagedChannel channel = createGrpcChannel(endpoint); + ClientInterceptor authInterceptor = new AuthenticationInterceptor(tokenSupplier, tableName); Channel interceptedChannel = io.grpc.ClientInterceptors.intercept(channel, authInterceptor); - return ZerobusGrpc.newStub(interceptedChannel); + return ZerobusGrpc.newStub(interceptedChannel) + .withMaxInboundMessageSize(Integer.MAX_VALUE) + .withMaxOutboundMessageSize(Integer.MAX_VALUE); } /** @@ -97,26 +86,20 @@ static ZerobusSdkStubFactory create() { /** * Parses an endpoint string to extract host and port information. * - * @param endpoint The endpoint string (may include protocol) - * @param useTls Whether TLS is being used (affects default port) + * @param endpoint The endpoint string (may include https:// prefix) * @return Parsed endpoint information */ - private EndpointInfo parseEndpoint(String endpoint, boolean useTls) { + private EndpointInfo parseEndpoint(String endpoint) { // Remove protocol prefix if present String cleanEndpoint = endpoint; if (cleanEndpoint.startsWith(HTTPS_PREFIX)) { cleanEndpoint = cleanEndpoint.substring(HTTPS_PREFIX.length()); - } else if (cleanEndpoint.startsWith(HTTP_PREFIX)) { - cleanEndpoint = cleanEndpoint.substring(HTTP_PREFIX.length()); } - // Split host and port + // Parse host:port format String[] parts = cleanEndpoint.split(":", 2); String host = parts[0]; - int port = - parts.length > 1 - ? Integer.parseInt(parts[1]) - : (useTls ? DEFAULT_TLS_PORT : DEFAULT_PLAINTEXT_PORT); + int port = parts.length > 1 ? Integer.parseInt(parts[1]) : DEFAULT_TLS_PORT; return new EndpointInfo(host, port); } @@ -151,17 +134,17 @@ class AuthenticationInterceptor implements ClientInterceptor { Metadata.Key.of("x-databricks-zerobus-table-name", Metadata.ASCII_STRING_MARSHALLER); private static final String BEARER_PREFIX = "Bearer "; - private final String token; + private final java.util.function.Supplier tokenSupplier; private final String tableName; /** - * Creates a new authentication interceptor. + * Creates a new authentication interceptor with a dynamic token supplier. 
* - * @param token The authentication token (without "Bearer " prefix) + * @param tokenSupplier Supplier that provides a fresh authentication token for each request * @param tableName The target table name */ - AuthenticationInterceptor(String token, String tableName) { - this.token = token; + AuthenticationInterceptor(java.util.function.Supplier tokenSupplier, String tableName) { + this.tokenSupplier = tokenSupplier; this.tableName = tableName; } @@ -172,7 +155,7 @@ public ClientCall interceptCall( next.newCall(method, callOptions)) { @Override public void start(Listener responseListener, Metadata headers) { - headers.put(AUTHORIZATION_HEADER, BEARER_PREFIX + token); + headers.put(AUTHORIZATION_HEADER, BEARER_PREFIX + tokenSupplier.get()); headers.put(TABLE_NAME_HEADER, tableName); super.start(responseListener, headers); } diff --git a/src/main/java/com/databricks/zerobus/ZerobusStream.java b/src/main/java/com/databricks/zerobus/ZerobusStream.java index 2657d2d..8abdd39 100644 --- a/src/main/java/com/databricks/zerobus/ZerobusStream.java +++ b/src/main/java/com/databricks/zerobus/ZerobusStream.java @@ -352,28 +352,10 @@ private CompletableFuture createStream() { () -> { CompletableFuture createStreamTry = new CompletableFuture<>(); - // Generate a fresh token for this stream creation attempt - try { - String token = - TokenFactory.getZerobusToken( - tableProperties.getTableName(), - workspaceId, - unityCatalogEndpoint, - clientId, - clientSecret); - - // Create a new stub with the fresh token - stub = - stubFactory.createStub( - serverEndpoint, true, tableProperties.getTableName(), token); - - logger.debug("Generated new token and created stub for stream"); - } catch (NonRetriableException e) { - createStreamTry.completeExceptionally(e); - return createStreamTry; - } + // The stub was created once with a token supplier, so we don't recreate it here + // The token supplier will provide a fresh token for each gRPC request - // Create the gRPC stream with the new stub + // Create the gRPC stream with the existing stub streamCreatedEvent = Optional.of(new CompletableFuture<>()); stream = Optional.of( diff --git a/src/main/java/com/databricks/zerobus/tools/GenerateProto.java b/src/main/java/com/databricks/zerobus/tools/GenerateProto.java new file mode 100644 index 0000000..e83c14c --- /dev/null +++ b/src/main/java/com/databricks/zerobus/tools/GenerateProto.java @@ -0,0 +1,706 @@ +package com.databricks.zerobus.tools; + +import java.io.BufferedReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Generate proto2 file from Unity Catalog table schema. + * + *
<p>
This tool fetches table schema from Unity Catalog and generates a corresponding proto2 + * definition file. It supports all Delta data types and maps them to appropriate Protocol Buffer + * types. + * + *
<p>
Usage: java GenerateProto --uc-endpoint <endpoint> --client-id <id> + * --client-secret <secret> --table <catalog.schema.table> --output <output.proto> + * [--proto-msg <message_name>] + * + *
<p>
Type mappings: INT -> int32 STRING -> string FLOAT -> float LONG/BIGINT -> int64 + * SHORT/SMALLINT -> int32 DOUBLE -> double BOOLEAN -> bool BINARY -> bytes DATE -> + * int32 TIMESTAMP -> int64 ARRAY<type> -> repeated type MAP<key_type, value_type> + * -> map<key_type, value_type> + */ +public class GenerateProto { + + private static final String USAGE = + "Usage: java GenerateProto \n" + + " --uc-endpoint Unity Catalog endpoint URL\n" + + " --client-id OAuth client ID\n" + + " --client-secret OAuth client secret\n" + + " --table Full table name\n" + + " --output Output path for proto file\n" + + " [--proto-msg ] Name of protobuf message (defaults to table name)\n" + + "\n" + + "Examples:\n" + + " java GenerateProto \\\n" + + " --uc-endpoint \"https://your-workspace.cloud.databricks.com\" \\\n" + + " --client-id \"your-client-id\" \\\n" + + " --client-secret \"your-client-secret\" \\\n" + + " --table \"catalog.schema.table_name\" \\\n" + + " --proto-msg \"TableMessage\" \\\n" + + " --output \"output.proto\"\n" + + "\n" + + "Type mappings:\n" + + " Delta -> Proto2\n" + + " INT -> int32\n" + + " STRING -> string\n" + + " FLOAT -> float\n" + + " LONG -> int64\n" + + " SHORT -> int32\n" + + " DOUBLE -> double\n" + + " BOOLEAN -> bool\n" + + " BINARY -> bytes\n" + + " DATE -> int32\n" + + " TIMESTAMP -> int64\n" + + " ARRAY -> repeated type\n" + + " MAP -> map\n"; + + public static void main(String[] args) { + try { + Args parsedArgs = parseArgs(args); + run(parsedArgs); + System.out.println("Successfully generated proto file at: " + parsedArgs.output); + System.exit(0); + } catch (IllegalArgumentException e) { + System.err.println("Error: " + e.getMessage()); + System.err.println(); + System.err.println(USAGE); + System.exit(1); + } catch (Exception e) { + System.err.println("Error: " + e.getMessage()); + e.printStackTrace(); + System.exit(1); + } + } + + private static void run(Args args) throws Exception { + // Get OAuth token + String token = getOAuthToken(args.ucEndpoint, args.clientId, args.clientSecret); + + // Fetch table information from Unity Catalog + Map tableInfo = fetchTableInfo(args.ucEndpoint, token, args.table); + + // Extract column information + List> columns = extractColumns(tableInfo); + + // Determine message name + String messageName = args.protoMsg != null ? 
args.protoMsg : args.table.split("\\.")[2]; + + // Generate proto file + generateProtoFile(messageName, columns, args.output); + } + + private static Args parseArgs(String[] args) { + Args result = new Args(); + + for (int i = 0; i < args.length; i++) { + String arg = args[i]; + if (arg.startsWith("--")) { + String key = arg.substring(2); + if (i + 1 >= args.length) { + throw new IllegalArgumentException("Missing value for argument: " + arg); + } + String value = args[++i]; + + switch (key) { + case "uc-endpoint": + result.ucEndpoint = value; + break; + case "client-id": + result.clientId = value; + break; + case "client-secret": + result.clientSecret = value; + break; + case "table": + result.table = value; + break; + case "output": + result.output = value; + break; + case "proto-msg": + result.protoMsg = value; + break; + default: + throw new IllegalArgumentException("Unknown argument: " + arg); + } + } + } + + // Validate required arguments + if (result.ucEndpoint == null) { + throw new IllegalArgumentException("Missing required argument: --uc-endpoint"); + } + if (result.clientId == null) { + throw new IllegalArgumentException("Missing required argument: --client-id"); + } + if (result.clientSecret == null) { + throw new IllegalArgumentException("Missing required argument: --client-secret"); + } + if (result.table == null) { + throw new IllegalArgumentException("Missing required argument: --table"); + } + if (result.output == null) { + throw new IllegalArgumentException("Missing required argument: --output"); + } + + return result; + } + + /** + * Obtains an OAuth token using client credentials flow. + * + *
<p>
This method uses basic OAuth 2.0 client credentials flow without resource or authorization + * details. + * + * @param ucEndpoint The Unity Catalog endpoint URL + * @param clientId The OAuth client ID + * @param clientSecret The OAuth client secret + * @return The OAuth access token (JWT) + * @throws Exception if the token request fails + */ + private static String getOAuthToken(String ucEndpoint, String clientId, String clientSecret) + throws Exception { + String urlString = ucEndpoint + "/oidc/v1/token"; + + // Build OAuth 2.0 client credentials request with minimal scope + String formData = "grant_type=client_credentials&scope=all-apis"; + + // Encode credentials for HTTP Basic authentication + String credentials = + Base64.getEncoder() + .encodeToString((clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8)); + + HttpURLConnection connection = (HttpURLConnection) new URL(urlString).openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); + connection.setRequestProperty("Authorization", "Basic " + credentials); + connection.setDoOutput(true); + + OutputStreamWriter writer = + new OutputStreamWriter(connection.getOutputStream(), StandardCharsets.UTF_8); + writer.write(formData); + writer.close(); + + int responseCode = connection.getResponseCode(); + + if (responseCode != 200) { + String errorBody = readStream(connection.getErrorStream()); + throw new IOException("OAuth request failed with status " + responseCode + ": " + errorBody); + } + + String responseBody = readStream(connection.getInputStream()); + + // Extract access token using regex to avoid dependency on a JSON library + Pattern accessTokenPattern = Pattern.compile("\"access_token\"\\s*:\\s*\"([^\"]+)\""); + Matcher matcher = accessTokenPattern.matcher(responseBody); + + if (matcher.find()) { + return matcher.group(1); + } else { + throw new IOException("No access token received from OAuth response"); + } + } + + /** + * Fetch table information from Unity Catalog. + * + * @param endpoint Base URL of the Unity Catalog endpoint + * @param token Authentication token + * @param table Table identifier (catalog.schema.table) + * @return The parsed table information as a Map + * @throws Exception If the HTTP request fails + */ + @SuppressWarnings("unchecked") + private static Map fetchTableInfo(String endpoint, String token, String table) + throws Exception { + String encodedTable = URLEncoder.encode(table, "UTF-8"); + String urlString = endpoint + "/api/2.1/unity-catalog/tables/" + encodedTable; + + HttpURLConnection connection = (HttpURLConnection) new URL(urlString).openConnection(); + connection.setRequestMethod("GET"); + connection.setRequestProperty("Authorization", "Bearer " + token); + connection.setRequestProperty("Content-Type", "application/json"); + + int responseCode = connection.getResponseCode(); + + if (responseCode != 200) { + String errorBody = readStream(connection.getErrorStream()); + throw new IOException( + "Failed to fetch table info with status " + responseCode + ": " + errorBody); + } + + String responseBody = readStream(connection.getInputStream()); + return (Map) parseJson(responseBody); + } + + /** + * Extract column information from the table schema. 
+ * + * @param tableInfo Raw table information from Unity Catalog + * @return List of column information maps + * @throws IllegalArgumentException If the expected schema structure is not found + */ + @SuppressWarnings("unchecked") + private static List> extractColumns(Map tableInfo) { + if (!tableInfo.containsKey("columns")) { + throw new IllegalArgumentException("No columns found in table info"); + } + return (List>) tableInfo.get("columns"); + } + + /** + * Map Unity Catalog column types to proto2 field information. + * + * @param columnType The Unity Catalog column type + * @param nullable Whether the column is nullable + * @return Array containing [field_modifier, proto_type] + * @throws IllegalArgumentException If the column type is not supported + */ + private static String[] getProtoFieldInfo(String columnType, boolean nullable) { + String upperType = columnType.toUpperCase(); + + // Basic type mapping + String protoType = null; + switch (upperType) { + case "SMALLINT": + case "INT": + case "SHORT": + case "DATE": + protoType = "int32"; + break; + case "BIGINT": + case "LONG": + case "TIMESTAMP": + protoType = "int64"; + break; + case "STRING": + protoType = "string"; + break; + case "FLOAT": + protoType = "float"; + break; + case "DOUBLE": + protoType = "double"; + break; + case "BOOLEAN": + protoType = "bool"; + break; + case "BINARY": + protoType = "bytes"; + break; + } + + if (protoType != null) { + return new String[] {nullable ? "optional" : "required", protoType}; + } + + // VARCHAR types + if (upperType.startsWith("VARCHAR")) { + return new String[] {nullable ? "optional" : "required", "string"}; + } + + // Array types + Pattern arrayPattern = Pattern.compile("^ARRAY<(.+)>$"); + Matcher arrayMatcher = arrayPattern.matcher(upperType); + if (arrayMatcher.matches()) { + String elementType = arrayMatcher.group(1).trim(); + String elementProtoType = getBasicProtoType(elementType); + if (elementProtoType == null) { + throw new IllegalArgumentException("Unsupported array element type: " + elementType); + } + return new String[] {"repeated", elementProtoType}; + } + + // Map types + Pattern mapPattern = Pattern.compile("^MAP<(.+),(.+)>$"); + Matcher mapMatcher = mapPattern.matcher(upperType); + if (mapMatcher.matches()) { + String keyType = mapMatcher.group(1).trim(); + String valueType = mapMatcher.group(2).trim(); + + String keyProtoType = getBasicProtoType(keyType); + if (keyProtoType == null) { + throw new IllegalArgumentException("Unsupported map key type: " + keyType); + } + + String valueProtoType = getBasicProtoType(valueType); + if (valueProtoType == null) { + throw new IllegalArgumentException("Unsupported map value type: " + valueType); + } + + return new String[] {"", "map<" + keyProtoType + ", " + valueProtoType + ">"}; + } + + throw new IllegalArgumentException("Unsupported column type: " + columnType); + } + + /** + * Get basic proto type mapping for simple types. 
+ * + * @param type The Unity Catalog type + * @return The proto type or null if not a basic type + */ + private static String getBasicProtoType(String type) { + String upperType = type.toUpperCase(); + switch (upperType) { + case "SMALLINT": + case "INT": + case "SHORT": + case "DATE": + return "int32"; + case "BIGINT": + case "LONG": + case "TIMESTAMP": + return "int64"; + case "STRING": + return "string"; + case "FLOAT": + return "float"; + case "DOUBLE": + return "double"; + case "BOOLEAN": + return "bool"; + case "BINARY": + return "bytes"; + default: + return null; + } + } + + /** + * Generate a proto2 file from the column information. + * + * @param messageName Name of the protobuf message + * @param columns List of column information maps + * @param outputPath Path where to write the proto file + * @throws IOException If the file cannot be written + */ + @SuppressWarnings("unchecked") + private static void generateProtoFile( + String messageName, List> columns, String outputPath) throws IOException { + StringBuilder protoContent = new StringBuilder(); + protoContent.append("syntax = \"proto2\";\n"); + protoContent.append("\n"); + protoContent.append("message ").append(messageName).append(" {\n"); + + // Add fields + int fieldNumber = 1; + for (Map col : columns) { + String fieldName = (String) col.get("name"); + String typeText = (String) col.get("type_text"); + boolean nullable = (Boolean) col.get("nullable"); + + String[] fieldInfo = getProtoFieldInfo(typeText, nullable); + String fieldModifier = fieldInfo[0]; + String protoType = fieldInfo[1]; + + if (fieldModifier.isEmpty()) { + // Map type (no modifier) + protoContent + .append(" ") + .append(protoType) + .append(" ") + .append(fieldName) + .append(" = ") + .append(fieldNumber) + .append(";\n"); + } else { + // Regular field or repeated field + protoContent + .append(" ") + .append(fieldModifier) + .append(" ") + .append(protoType) + .append(" ") + .append(fieldName) + .append(" = ") + .append(fieldNumber) + .append(";\n"); + } + fieldNumber++; + } + + protoContent.append("}\n"); + + // Write to file + try (FileWriter writer = new FileWriter(outputPath)) { + writer.write(protoContent.toString()); + } + } + + /** Helper method to read an input stream to a string. */ + private static String readStream(java.io.InputStream stream) throws IOException { + if (stream == null) { + return "No error details available"; + } + BufferedReader reader = + new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8)); + StringBuilder builder = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + builder.append(line).append("\n"); + } + reader.close(); + return builder.toString(); + } + + /** + * Simple JSON parser for basic objects and arrays. This avoids adding a JSON library dependency. + */ + private static Object parseJson(String json) { + return new SimpleJsonParser(json).parse(); + } + + /** Simple command-line arguments holder. */ + private static class Args { + String ucEndpoint; + String clientId; + String clientSecret; + String table; + String output; + String protoMsg; + } + + /** + * A minimal JSON parser that can handle basic objects, arrays, strings, numbers, booleans, and + * nulls. This is sufficient for parsing Unity Catalog API responses without adding a dependency. 
+ */ + private static class SimpleJsonParser { + private final String json; + private int pos = 0; + + SimpleJsonParser(String json) { + this.json = json.trim(); + } + + Object parse() { + skipWhitespace(); + return parseValue(); + } + + private Object parseValue() { + skipWhitespace(); + char c = peek(); + + if (c == '{') { + return parseObject(); + } else if (c == '[') { + return parseArray(); + } else if (c == '"') { + return parseString(); + } else if (c == 't' || c == 'f') { + return parseBoolean(); + } else if (c == 'n') { + return parseNull(); + } else if (c == '-' || Character.isDigit(c)) { + return parseNumber(); + } else { + throw new IllegalArgumentException("Unexpected character at position " + pos + ": " + c); + } + } + + private Map parseObject() { + Map map = new java.util.HashMap<>(); + consume('{'); + skipWhitespace(); + + if (peek() == '}') { + consume('}'); + return map; + } + + while (true) { + skipWhitespace(); + String key = parseString(); + skipWhitespace(); + consume(':'); + skipWhitespace(); + Object value = parseValue(); + map.put(key, value); + + skipWhitespace(); + char c = peek(); + if (c == '}') { + consume('}'); + break; + } else if (c == ',') { + consume(','); + } else { + throw new IllegalArgumentException("Expected ',' or '}' at position " + pos); + } + } + + return map; + } + + private List parseArray() { + List list = new java.util.ArrayList<>(); + consume('['); + skipWhitespace(); + + if (peek() == ']') { + consume(']'); + return list; + } + + while (true) { + skipWhitespace(); + list.add(parseValue()); + skipWhitespace(); + + char c = peek(); + if (c == ']') { + consume(']'); + break; + } else if (c == ',') { + consume(','); + } else { + throw new IllegalArgumentException("Expected ',' or ']' at position " + pos); + } + } + + return list; + } + + private String parseString() { + consume('"'); + StringBuilder sb = new StringBuilder(); + + while (pos < json.length()) { + char c = json.charAt(pos); + if (c == '"') { + pos++; + return sb.toString(); + } else if (c == '\\') { + pos++; + if (pos >= json.length()) { + throw new IllegalArgumentException("Unterminated string escape"); + } + char escaped = json.charAt(pos); + switch (escaped) { + case '"': + case '\\': + case '/': + sb.append(escaped); + break; + case 'b': + sb.append('\b'); + break; + case 'f': + sb.append('\f'); + break; + case 'n': + sb.append('\n'); + break; + case 'r': + sb.append('\r'); + break; + case 't': + sb.append('\t'); + break; + case 'u': + // Unicode escape + if (pos + 4 >= json.length()) { + throw new IllegalArgumentException("Invalid unicode escape"); + } + String hex = json.substring(pos + 1, pos + 5); + sb.append((char) Integer.parseInt(hex, 16)); + pos += 4; + break; + default: + throw new IllegalArgumentException("Invalid escape character: " + escaped); + } + pos++; + } else { + sb.append(c); + pos++; + } + } + + throw new IllegalArgumentException("Unterminated string"); + } + + private Object parseNumber() { + int start = pos; + if (peek() == '-') { + pos++; + } + + while (pos < json.length() + && (Character.isDigit(json.charAt(pos)) + || json.charAt(pos) == '.' 
+ || json.charAt(pos) == 'e' + || json.charAt(pos) == 'E' + || json.charAt(pos) == '+' + || json.charAt(pos) == '-')) { + pos++; + } + + String numStr = json.substring(start, pos); + if (numStr.contains(".") || numStr.contains("e") || numStr.contains("E")) { + return Double.parseDouble(numStr); + } else { + try { + return Integer.parseInt(numStr); + } catch (NumberFormatException e) { + return Long.parseLong(numStr); + } + } + } + + private Boolean parseBoolean() { + if (json.startsWith("true", pos)) { + pos += 4; + return Boolean.TRUE; + } else if (json.startsWith("false", pos)) { + pos += 5; + return Boolean.FALSE; + } else { + throw new IllegalArgumentException("Invalid boolean at position " + pos); + } + } + + private Object parseNull() { + if (json.startsWith("null", pos)) { + pos += 4; + return null; + } else { + throw new IllegalArgumentException("Invalid null at position " + pos); + } + } + + private char peek() { + if (pos >= json.length()) { + throw new IllegalArgumentException("Unexpected end of JSON"); + } + return json.charAt(pos); + } + + private void consume(char expected) { + char c = peek(); + if (c != expected) { + throw new IllegalArgumentException( + "Expected '" + expected + "' but got '" + c + "' at position " + pos); + } + pos++; + } + + private void skipWhitespace() { + while (pos < json.length() && Character.isWhitespace(json.charAt(pos))) { + pos++; + } + } + } +} diff --git a/src/test/java/com/databricks/zerobus/MockedGrpcServer.java b/src/test/java/com/databricks/zerobus/MockedGrpcServer.java new file mode 100644 index 0000000..5620b98 --- /dev/null +++ b/src/test/java/com/databricks/zerobus/MockedGrpcServer.java @@ -0,0 +1,420 @@ +package com.databricks.zerobus; + +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * MockedGrpcServer simulates server-side gRPC behavior for testing ZerobusSDK without needing a + * real server. + * + *

It intercepts gRPC stream messages, processes them asynchronously, and sends responses back to + * the client based on injected test scenarios. + */ +public class MockedGrpcServer { + private static class AckRecord { + final boolean success; + final long offsetId; + final long delayMs; + final Throwable error; + final boolean writeFailure; + final boolean closeStreamSignal; + + AckRecord( + boolean success, + long offsetId, + long delayMs, + Throwable error, + boolean writeFailure, + boolean closeStreamSignal) { + this.success = success; + this.offsetId = offsetId; + this.delayMs = delayMs; + this.error = error; + this.writeFailure = writeFailure; + this.closeStreamSignal = closeStreamSignal; + } + } + + private static class CreateStreamResponse { + final boolean success; + final long delayMs; + final boolean skip; + final boolean writeFailure; + + CreateStreamResponse(boolean success, long delayMs, boolean skip, boolean writeFailure) { + this.success = success; + this.delayMs = delayMs; + this.skip = skip; + this.writeFailure = writeFailure; + } + } + + private final ExecutorService executorService; + private final List capturedMessages; + private final List injectedAckRecords; + private final List injectedCreateStreamResponses; + private final BlockingQueue messagesToProcess; + + private StreamObserver ackSender; + private long lastReceivedOffsetId = -1; + private volatile boolean serverRunning = false; + private volatile boolean streamReady = true; + private Runnable streamReadyHandler; + + private final ClientCallStreamObserver messageReceiver = + new ClientCallStreamObserver() { + @Override + public void onNext(EphemeralStreamRequest request) { + synchronized (MockedGrpcServer.this) { + capturedMessages.add(request); + messagesToProcess.offer(request); + } + } + + @Override + public void onError(Throwable t) { + stopServerThread(); + } + + @Override + public void onCompleted() { + stopServerThread(); + } + + @Override + public boolean isReady() { + return streamReady; + } + + @Override + public void setOnReadyHandler(Runnable onReadyHandler) { + streamReadyHandler = onReadyHandler; + } + + @Override + public void disableAutoInboundFlowControl() {} + + @Override + public void request(int count) {} + + @Override + public void setMessageCompression(boolean enable) {} + + @Override + public void cancel(String message, Throwable cause) {} + }; + + public MockedGrpcServer() { + this.executorService = Executors.newFixedThreadPool(2); + this.capturedMessages = Collections.synchronizedList(new ArrayList<>()); + this.injectedAckRecords = Collections.synchronizedList(new ArrayList<>()); + this.injectedCreateStreamResponses = Collections.synchronizedList(new ArrayList<>()); + this.messagesToProcess = new LinkedBlockingQueue<>(); + } + + /** Initialize the mocked server with an ack sender and start processing messages. */ + public void initialize(StreamObserver ackSender) { + synchronized (this) { + this.ackSender = ackSender; + this.lastReceivedOffsetId = -1; + this.messagesToProcess.clear(); + startServerThread(); + } + } + + /** Inject a successful ack for a specific record offset with optional delay. */ + public void injectAckRecord(long offsetId, long delayMs) { + injectedAckRecords.add(new AckRecord(true, offsetId, delayMs, null, false, false)); + } + + /** Inject a successful ack for a specific record offset. */ + public void injectAckRecord(long offsetId) { + injectAckRecord(offsetId, 0); + } + + /** Clear all injected ack records. 
*/ + public void clearAckRecords() { + synchronized (injectedAckRecords) { + injectedAckRecords.clear(); + } + } + + /** Inject a failed ingest record response. */ + public void injectFailIngestRecord(long offsetId, long delayMs, Throwable error) { + injectedAckRecords.add(new AckRecord(false, offsetId, delayMs, error, false, false)); + } + + /** Inject a failed ingest record response. */ + public void injectFailIngestRecord(long offsetId) { + injectFailIngestRecord(offsetId, 0, new RuntimeException("Ingest record failed")); + } + + /** Inject a write failure for a specific record offset. */ + public void injectWriteFailureOfRecords(long offsetId, long delayMs) { + injectedAckRecords.add( + new AckRecord( + false, + offsetId, + delayMs, + new RuntimeException("IngestRecord write failure"), + true, + false)); + } + + /** Inject a write failure for a specific record offset. */ + public void injectWriteFailureOfRecords(long offsetId) { + injectWriteFailureOfRecords(offsetId, 0); + } + + /** Inject a non-retriable error for a specific record offset. */ + public void injectNonRetriableError(long offsetId, long delayMs) { + io.grpc.StatusRuntimeException nonRetriableError = + new io.grpc.StatusRuntimeException( + io.grpc.Status.UNAUTHENTICATED.withDescription("Non-retriable gRPC error")); + injectedAckRecords.add( + new AckRecord(false, offsetId, delayMs, nonRetriableError, false, false)); + } + + /** Inject a non-retriable error for a specific record offset. */ + public void injectNonRetriableError(long offsetId) { + injectNonRetriableError(offsetId, 0); + } + + /** Inject a CloseStreamSignal for a specific record offset. */ + public void injectCloseStreamSignal(long offsetId, long delayMs) { + injectedAckRecords.add(new AckRecord(true, offsetId, delayMs, null, false, true)); + } + + /** Inject a CloseStreamSignal for a specific record offset. */ + public void injectCloseStreamSignal(long offsetId) { + injectCloseStreamSignal(offsetId, 0); + } + + /** Inject a successful create stream response with delay. */ + public void injectCreateStreamSuccessWithDelay(long delayMs) { + injectedCreateStreamResponses.add(new CreateStreamResponse(true, delayMs, false, false)); + } + + /** Inject a failed create stream response. */ + public void injectFailCreateStream() { + injectedCreateStreamResponses.add(new CreateStreamResponse(false, 0, false, false)); + } + + /** Inject a skip create stream response (never sends response). */ + public void injectSkipCreateStreamResponse() { + injectedCreateStreamResponses.add(new CreateStreamResponse(false, 0, true, false)); + } + + /** Inject a write failure for create stream. */ + public void injectWriteFailureCreateStream(long delayMs) { + injectedCreateStreamResponses.add(new CreateStreamResponse(false, delayMs, false, true)); + } + + /** Inject a write failure for create stream. */ + public void injectWriteFailureCreateStream() { + injectWriteFailureCreateStream(0); + } + + /** Get all captured messages sent by the client. */ + public List getCapturedMessages() { + synchronized (capturedMessages) { + return new ArrayList<>(capturedMessages); + } + } + + /** Get the message receiver for the client to write to. */ + public ClientCallStreamObserver getMessageReceiver() { + return messageReceiver; + } + + /** Set stream readiness state. 
*/ + public void setStreamReady(boolean ready) { + boolean oldStreamReadyState = streamReady; + streamReady = ready; + + if (streamReady && !oldStreamReadyState && streamReadyHandler != null) { + streamReadyHandler.run(); + } + } + + /** Destroy the mocked server and clean up resources. */ + public void destroy() { + stopServerThread(); + executorService.shutdownNow(); + try { + executorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private void startServerThread() { + synchronized (this) { + if (serverRunning) { + return; + } + serverRunning = true; + } + + executorService.submit( + () -> { + try { + while (serverRunning) { + EphemeralStreamRequest request = messagesToProcess.poll(100, TimeUnit.MILLISECONDS); + if (request == null) { + continue; + } + + processMessage(request); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + // Server thread error, stop processing + } + }); + } + + private void stopServerThread() { + synchronized (this) { + serverRunning = false; + } + } + + private void processMessage(EphemeralStreamRequest request) throws InterruptedException { + if (request.hasCreateStream()) { + handleCreateStream(); + } else if (request.hasIngestRecord()) { + handleIngestRecord(request.getIngestRecord().getOffsetId()); + } + } + + private void handleCreateStream() throws InterruptedException { + synchronized (injectedCreateStreamResponses) { + if (injectedCreateStreamResponses.isEmpty()) { + sendCreateStreamSuccess(); + return; + } + + CreateStreamResponse response = injectedCreateStreamResponses.remove(0); + if (response.skip) { + return; // Never send response + } + + if (response.delayMs > 0) { + Thread.sleep(response.delayMs); + } + + if (response.writeFailure) { + throw new RuntimeException("CreateStream write failure"); + } + + if (response.success) { + sendCreateStreamSuccess(); + } else { + sendError(new RuntimeException("Create stream failed")); + } + } + } + + private void handleIngestRecord(long offset) throws InterruptedException { + if (offset != lastReceivedOffsetId + 1) { + sendError( + new RuntimeException( + String.format( + "Invalid offset Id; expected %d but got %d", lastReceivedOffsetId + 1, offset))); + return; + } + + lastReceivedOffsetId = offset; + + synchronized (injectedAckRecords) { + if (injectedAckRecords.isEmpty()) { + // Default behavior: auto-ack all records when no specific behavior is injected + sendAck(offset); + return; + } + + // Check if there's a specific ack record for this offset + AckRecord matchingRecord = null; + for (int i = 0; i < injectedAckRecords.size(); i++) { + if (injectedAckRecords.get(i).offsetId == offset) { + matchingRecord = injectedAckRecords.remove(i); + break; + } + } + + if (matchingRecord != null) { + // Process the specific injected behavior + if (matchingRecord.delayMs > 0) { + Thread.sleep(matchingRecord.delayMs); + } + + if (matchingRecord.writeFailure) { + throw new RuntimeException("IngestRecord write failure"); + } + + if (matchingRecord.closeStreamSignal) { + sendCloseStreamSignal(); + } else if (matchingRecord.success) { + sendAck(offset); + } else { + Throwable error = + matchingRecord.error != null + ? 
matchingRecord.error + : new RuntimeException("Ingest failed"); + sendError(error); + } + } + // Note: If no matching record found and injectedAckRecords is not empty, + // do NOT send ack (this is intentional for tests that need to test lack of acks) + } + } + + private void sendCreateStreamSuccess() { + if (ackSender != null) { + EphemeralStreamResponse response = + EphemeralStreamResponse.newBuilder() + .setCreateStreamResponse( + CreateIngestStreamResponse.newBuilder().setStreamId("test-stream-id").build()) + .build(); + ackSender.onNext(response); + } + } + + private void sendAck(long offset) { + if (ackSender != null) { + EphemeralStreamResponse response = + EphemeralStreamResponse.newBuilder() + .setIngestRecordResponse( + IngestRecordResponse.newBuilder().setDurabilityAckUpToOffset(offset).build()) + .build(); + ackSender.onNext(response); + } + } + + private void sendCloseStreamSignal() { + if (ackSender != null) { + EphemeralStreamResponse response = + EphemeralStreamResponse.newBuilder() + .setCloseStreamSignal(CloseStreamSignal.newBuilder().build()) + .build(); + ackSender.onNext(response); + } + } + + private void sendError(Throwable error) { + if (ackSender != null) { + ackSender.onError(error); + } + stopServerThread(); + } +} diff --git a/src/test/java/com/databricks/zerobus/ZerobusSdkTest.java b/src/test/java/com/databricks/zerobus/ZerobusSdkTest.java new file mode 100644 index 0000000..d184405 --- /dev/null +++ b/src/test/java/com/databricks/zerobus/ZerobusSdkTest.java @@ -0,0 +1,381 @@ +package com.databricks.zerobus; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +import com.databricks.test.table.TestTableRow.CityPopulationTableRow; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Test suite for ZerobusSdk with mocked gRPC server. + * + *

These tests verify the SDK's core functionality including stream creation, record ingestion, + * acknowledgments, and flush operations without requiring a real Zerobus backend server. + * + *

Best practices followed: - Fast execution (no long sleeps or timeouts) - Clear test names + * describing what is being tested - Proper mock setup and teardown - Testing both success and + * failure paths - Using CompletableFutures for async operations + */ +@ExtendWith(MockitoExtension.class) +public class ZerobusSdkTest { + + private MockedGrpcServer mockedGrpcServer; + private ZerobusGrpc.ZerobusStub zerobusStub; + private ZerobusSdk zerobusSdk; + private ZerobusSdkStubFactory zerobusSdkStubFactory; + private org.mockito.MockedStatic tokenFactoryMock; + + @BeforeEach + public void setUp() { + // Create mocked gRPC server + mockedGrpcServer = new MockedGrpcServer(); + + // Create mocked stub + zerobusStub = mock(ZerobusGrpc.ZerobusStub.class); + + // Create spy on stub factory + zerobusSdkStubFactory = spy(ZerobusSdkStubFactory.create()); + + // Mock TokenFactory to return a fake token + tokenFactoryMock = mockStatic(TokenFactory.class); + tokenFactoryMock + .when( + () -> + TokenFactory.getZerobusToken( + anyString(), anyString(), anyString(), anyString(), anyString())) + .thenReturn("fake-token-for-testing"); + + // Create ZerobusSdk and set mocked stub factory + zerobusSdk = new ZerobusSdk("localhost:50051", "https://test.cloud.databricks.com"); + zerobusSdk.setStubFactory(zerobusSdkStubFactory); + + // Configure stub factory to return our mocked stub with token supplier + doReturn(zerobusStub) + .when(zerobusSdkStubFactory) + .createStubWithTokenSupplier(anyString(), anyString(), any()); + + // Setup mocked stub's ephemeralStream behavior + doAnswer( + invocation -> { + @SuppressWarnings("unchecked") + StreamObserver ackSender = + (StreamObserver) invocation.getArgument(0); + + mockedGrpcServer.initialize(ackSender); + return mockedGrpcServer.getMessageReceiver(); + }) + .when(zerobusStub) + .ephemeralStream(any()); + } + + @AfterEach + public void tearDown() { + if (tokenFactoryMock != null) { + tokenFactoryMock.close(); + } + if (mockedGrpcServer != null) { + mockedGrpcServer.destroy(); + } + mockedGrpcServer = null; + zerobusStub = null; + zerobusSdk = null; + zerobusSdkStubFactory = null; + tokenFactoryMock = null; + } + + @Test + public void testSingleRecordIngestAndAcknowledgment() throws Exception { + // Test basic ingestion: send one record and verify it's acknowledged + mockedGrpcServer.injectAckRecord(0); + + TableProperties tableProperties = + new TableProperties<>("test-table", CityPopulationTableRow.getDefaultInstance()); + StreamConfigurationOptions options = + StreamConfigurationOptions.builder().setRecovery(false).build(); + + ZerobusStream stream = + zerobusSdk.createStream(tableProperties, "client-id", "client-secret", options).get(); + + assertEquals(StreamState.OPENED, stream.getState()); + + CompletableFuture writeCompleted = + stream.ingestRecord( + CityPopulationTableRow.newBuilder() + .setCityName("test-city") + .setPopulation(1000) + .build()); + + // Wait for acknowledgment + writeCompleted.get(5, TimeUnit.SECONDS); + + // Verify no unacked records + Iterator unackedRecords = stream.getUnackedRecords(); + assertFalse(unackedRecords.hasNext()); + + stream.close(); + assertEquals(StreamState.CLOSED, stream.getState()); + } + + @Test + public void testBatchIngestion() throws Exception { + // Test ingesting multiple records in a batch + int batchSize = 100; + + for (int i = 0; i < batchSize; i++) { + mockedGrpcServer.injectAckRecord(i); + } + + TableProperties tableProperties = + new TableProperties<>("test-table", CityPopulationTableRow.getDefaultInstance()); + 
StreamConfigurationOptions options = + StreamConfigurationOptions.builder().setRecovery(false).build(); + + ZerobusStream stream = + zerobusSdk.createStream(tableProperties, "client-id", "client-secret", options).get(); + assertEquals(StreamState.OPENED, stream.getState()); + + // Send records + List> futures = new ArrayList<>(); + for (int i = 0; i < batchSize; i++) { + futures.add( + stream.ingestRecord( + CityPopulationTableRow.newBuilder() + .setCityName("city-" + i) + .setPopulation(1000 + i) + .build())); + } + + // Wait for all acknowledgments + for (CompletableFuture future : futures) { + future.get(5, TimeUnit.SECONDS); + } + + // Verify all records acknowledged + Iterator unackedRecords = stream.getUnackedRecords(); + assertFalse(unackedRecords.hasNext()); + + stream.close(); + assertEquals(StreamState.CLOSED, stream.getState()); + } + + @Test + public void testFlushWaitsForAllAcknowledgments() throws Exception { + // Test that flush() blocks until all inflight records are acknowledged + int numRecords = 10; + mockedGrpcServer.injectAckRecord(numRecords - 1); + + TableProperties tableProperties = + new TableProperties<>("test-table", CityPopulationTableRow.getDefaultInstance()); + StreamConfigurationOptions options = + StreamConfigurationOptions.builder().setRecovery(false).build(); + + ZerobusStream stream = + zerobusSdk.createStream(tableProperties, "client-id", "client-secret", options).get(); + assertEquals(StreamState.OPENED, stream.getState()); + + // Ingest records + for (int i = 0; i < numRecords; i++) { + stream.ingestRecord( + CityPopulationTableRow.newBuilder() + .setCityName("device-" + i) + .setPopulation(20 + i) + .build()); + } + + // Flush should wait for all acks + stream.flush(); + + // Verify no unacked records after flush + Iterator unackedRecords = stream.getUnackedRecords(); + assertFalse(unackedRecords.hasNext()); + + stream.close(); + } + + @Test + public void testEmptyFlushReturnsImmediately() throws Exception { + // Test that flush() on an empty stream returns immediately + TableProperties tableProperties = + new TableProperties<>("test-table", CityPopulationTableRow.getDefaultInstance()); + StreamConfigurationOptions options = + StreamConfigurationOptions.builder().setRecovery(false).build(); + + ZerobusStream stream = + zerobusSdk.createStream(tableProperties, "client-id", "client-secret", options).get(); + + assertEquals(StreamState.OPENED, stream.getState()); + + // Measure flush execution time + long startTime = System.currentTimeMillis(); + stream.flush(); + long endTime = System.currentTimeMillis(); + long flushDuration = endTime - startTime; + + assertTrue( + flushDuration < 100, + "Expected flush to return immediately, but took " + flushDuration + "ms"); + + assertEquals(StreamState.OPENED, stream.getState()); + stream.close(); + } + + @Test + public void testAckCallback() throws Exception { + // Test that ack callbacks are invoked for each acknowledgment + List ackedOffsets = Collections.synchronizedList(new ArrayList<>()); + Consumer ackCallback = + response -> ackedOffsets.add(response.getDurabilityAckUpToOffset()); + + int numRecords = 10; + for (int i = 0; i < numRecords; i++) { + mockedGrpcServer.injectAckRecord(i); + } + + TableProperties tableProperties = + new TableProperties<>("test-table", CityPopulationTableRow.getDefaultInstance()); + StreamConfigurationOptions options = + StreamConfigurationOptions.builder().setRecovery(false).setAckCallback(ackCallback).build(); + + ZerobusStream stream = + zerobusSdk.createStream(tableProperties, 
"client-id", "client-secret", options).get(); + assertEquals(StreamState.OPENED, stream.getState()); + + // Ingest records + List> futures = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + futures.add( + stream.ingestRecord( + CityPopulationTableRow.newBuilder() + .setCityName("test-city-" + i) + .setPopulation(i) + .build())); + } + + // Wait for all records to be acknowledged + for (CompletableFuture future : futures) { + future.get(5, TimeUnit.SECONDS); + } + + stream.flush(); + assertEquals(StreamState.OPENED, stream.getState()); + + // Wait for callbacks to complete - wait until we see the final offset (numRecords - 1) + long deadline = System.currentTimeMillis() + 2000; + boolean foundFinalOffset = false; + while (System.currentTimeMillis() < deadline) { + synchronized (ackedOffsets) { + if (!ackedOffsets.isEmpty() && ackedOffsets.contains((long) (numRecords - 1))) { + foundFinalOffset = true; + break; + } + } + Thread.sleep(10); + } + + // Verify callback was called and final offset was received + assertTrue(foundFinalOffset, "Expected to receive ack for final offset " + (numRecords - 1)); + assertTrue(ackedOffsets.size() > 0, "Expected callback to be called at least once"); + + // Verify the final offset was acknowledged + assertTrue( + ackedOffsets.contains((long) (numRecords - 1)), + "Expected callbacks to include offset " + (numRecords - 1)); + + // Verify unacked records are empty + Iterator unackedRecords = stream.getUnackedRecords(); + assertFalse(unackedRecords.hasNext()); + + stream.close(); + assertEquals(StreamState.CLOSED, stream.getState()); + } + + @Test + public void testCallbackExceptionHandling() throws Exception { + // Test that exceptions in callbacks don't crash the stream + List callbackInvocations = new ArrayList<>(); + List thrownExceptions = new ArrayList<>(); + + Consumer ackCallback = + response -> { + long offsetId = response.getDurabilityAckUpToOffset(); + callbackInvocations.add(offsetId); + + // Throw exception for offset 1 to test error handling + if (offsetId == 1) { + RuntimeException exception = + new RuntimeException("Test exception in callback for offset " + offsetId); + thrownExceptions.add(exception.getMessage()); + throw exception; + } + }; + + int numRecords = 3; + for (int i = 0; i < numRecords; i++) { + mockedGrpcServer.injectAckRecord(i); + } + + TableProperties tableProperties = + new TableProperties<>("test-table", CityPopulationTableRow.getDefaultInstance()); + StreamConfigurationOptions options = + StreamConfigurationOptions.builder().setRecovery(false).setAckCallback(ackCallback).build(); + + ZerobusStream stream = + zerobusSdk.createStream(tableProperties, "client-id", "client-secret", options).get(); + + assertEquals(StreamState.OPENED, stream.getState()); + + List> ingestResults = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + CompletableFuture writeCompleted = + stream.ingestRecord( + CityPopulationTableRow.newBuilder() + .setCityName("error-callback-device-" + i) + .setPopulation(30 + i) + .build()); + ingestResults.add(writeCompleted); + } + + // Wait for all records to be acknowledged (should succeed despite callback exception) + for (CompletableFuture future : ingestResults) { + future.get(5, TimeUnit.SECONDS); + } + + // Wait for callbacks to complete + long deadline = System.currentTimeMillis() + 1000; + while (callbackInvocations.size() < numRecords && System.currentTimeMillis() < deadline) { + Thread.yield(); + } + + // Verify callback was invoked for all acknowledgments (including the one that 
threw) + assertEquals(numRecords, callbackInvocations.size()); + assertTrue(callbackInvocations.contains(0L)); + assertTrue(callbackInvocations.contains(1L)); + assertTrue(callbackInvocations.contains(2L)); + + // Verify the exception was thrown for offset 1 + assertEquals(1, thrownExceptions.size()); + assertTrue(thrownExceptions.get(0).contains("Test exception in callback for offset 1")); + + // Verify stream remains functional + Iterator unackedRecords = stream.getUnackedRecords(); + assertFalse(unackedRecords.hasNext()); + assertEquals(StreamState.OPENED, stream.getState()); + + stream.close(); + assertEquals(StreamState.CLOSED, stream.getState()); + } +} diff --git a/src/test/proto/test_table.proto b/src/test/proto/test_table.proto new file mode 100644 index 0000000..773ed9c --- /dev/null +++ b/src/test/proto/test_table.proto @@ -0,0 +1,11 @@ +syntax = "proto2"; + +package databricks.test.table; + +option java_package = "com.databricks.test.table"; +option java_outer_classname = "TestTableRow"; + +message CityPopulationTableRow { + optional string city_name = 1; + optional int32 population = 2; +} diff --git a/src/test/resources/simplelogger.properties b/src/test/resources/simplelogger.properties new file mode 100644 index 0000000..3ac050e --- /dev/null +++ b/src/test/resources/simplelogger.properties @@ -0,0 +1,20 @@ +# SLF4J Simple Logger configuration for tests +# Reduce noise during test execution while keeping error logs visible + +# Default log level for all loggers +org.slf4j.simpleLogger.defaultLogLevel=error + +# Show date/time in logs +org.slf4j.simpleLogger.showDateTime=true +org.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss.SSS + +# Show thread name +org.slf4j.simpleLogger.showThreadName=false + +# Show logger name +org.slf4j.simpleLogger.showLogName=false + +# Only show errors from ZerobusStream during tests +# This suppresses INFO logs like "Stream created successfully" but keeps ERROR logs +org.slf4j.simpleLogger.log.com.databricks.zerobus.ZerobusStream=error +org.slf4j.simpleLogger.log.com.databricks.zerobus.ZerobusSdk=error diff --git a/tools/README.md b/tools/README.md new file mode 100644 index 0000000..3829d4f --- /dev/null +++ b/tools/README.md @@ -0,0 +1,262 @@ +# Generate Proto Tool + +A standalone tool for generating Protocol Buffer (proto2) definition files from Unity Catalog table schemas. + +## Overview + +The `GenerateProto` tool fetches table schema information from Unity Catalog and automatically generates a corresponding `.proto` file with proper type mappings. This is useful when you need to create Protocol Buffer message definitions that match your Delta table schemas for use with the Zerobus SDK. + +The tool is **packaged within the Zerobus SDK JAR**, so users can run it directly after downloading the SDK without needing to clone the repository. 
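+
+For context, "fetching the table schema" amounts to a single read-only call against the Unity Catalog REST API. The sketch below is illustrative only — it is not the tool's actual implementation; the endpoint path and response field names are assumptions based on the public Unity Catalog `tables` API, and the workspace URL, table name, and token are placeholders — but it shows the kind of metadata the generator works from, using nothing beyond the Java standard library:
+
+```java
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+public class FetchTableSchemaSketch {
+  public static void main(String[] args) throws Exception {
+    String workspaceUrl = "https://your-workspace.cloud.databricks.com"; // placeholder
+    String fullTableName = "catalog.schema.table_name";                  // placeholder
+    String token = "oauth-token-from-client-credentials";                // placeholder
+
+    // Assumed endpoint: GET /api/2.1/unity-catalog/tables/{full_name}
+    URL url = new URL(workspaceUrl + "/api/2.1/unity-catalog/tables/" + fullTableName);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod("GET");
+    conn.setRequestProperty("Authorization", "Bearer " + token);
+
+    // The response JSON includes a "columns" array (name plus type information),
+    // which is what gets translated into proto2 fields.
+    try (BufferedReader reader =
+        new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
+      StringBuilder body = new StringBuilder();
+      String line;
+      while ((line = reader.readLine()) != null) {
+        body.append(line);
+      }
+      System.out.println(body);
+    }
+  }
+}
+```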
+
+## Features
+
+- Fetches table schema directly from Unity Catalog
+- Supports all standard Delta data types
+- Generates proto2 format files
+- Handles complex types (arrays and maps)
+- Uses OAuth 2.0 client credentials authentication
+- No external dependencies beyond the Java standard library
+- Packaged in the SDK JAR for easy distribution
+
+## Requirements
+
+- Java 8 or higher
+- Zerobus SDK JAR (built with `mvn package`)
+- OAuth client ID and client secret with access to Unity Catalog
+- Access to a Unity Catalog endpoint
+
+## Usage
+
+### Method 1: Using the Helper Script (Recommended for Development)
+
+If you have the SDK source repository:
+
+```bash
+# First, build the SDK JAR
+mvn package
+
+# Then run the tool
+./tools/generate_proto.sh \
+  --uc-endpoint "https://your-workspace.cloud.databricks.com" \
+  --client-id "your-client-id" \
+  --client-secret "your-client-secret" \
+  --table "catalog.schema.table_name" \
+  --output "output.proto" \
+  --proto-msg "TableMessage"
+```
+
+### Method 2: Running Directly from the SDK JAR (Recommended for Users)
+
+If you have downloaded the SDK JAR without the source code:
+
+```bash
+# Using the shaded JAR (includes all dependencies)
+java -cp databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
+  com.databricks.zerobus.tools.GenerateProto \
+  --uc-endpoint "https://your-workspace.cloud.databricks.com" \
+  --client-id "your-client-id" \
+  --client-secret "your-client-secret" \
+  --table "catalog.schema.table_name" \
+  --output "output.proto" \
+  --proto-msg "TableMessage"
+```
+
+The shaded JAR also declares a `Main-Class` manifest entry, so the `-jar` form works as well:
+
+```bash
+# Even simpler - just use the -jar flag
+java -jar databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
+  --uc-endpoint "https://your-workspace.cloud.databricks.com" \
+  --client-id "your-client-id" \
+  --client-secret "your-client-secret" \
+  --table "catalog.schema.table_name" \
+  --output "output.proto" \
+  --proto-msg "TableMessage"
+```
+
+## Arguments
+
+| Argument | Required | Description |
+|----------|----------|-------------|
+| `--uc-endpoint` | Yes | Unity Catalog endpoint URL (e.g., `https://your-workspace.cloud.databricks.com`) |
+| `--client-id` | Yes | OAuth client ID for authentication |
+| `--client-secret` | Yes | OAuth client secret for authentication |
+| `--table` | Yes | Full table name in the format `catalog.schema.table_name` |
+| `--output` | Yes | Output path for the generated proto file (e.g., `output.proto`) |
+| `--proto-msg` | No | Name of the protobuf message (defaults to the table name) |
+
+## Type Mappings
+
+The tool automatically maps Delta/Unity Catalog types to Protocol Buffer types:
+
+| Delta Type | Proto2 Type |
+|------------|-------------|
+| `INT`, `SHORT`, `SMALLINT` | `int32` |
+| `LONG`, `BIGINT` | `int64` |
+| `STRING`, `VARCHAR(n)` | `string` |
+| `FLOAT` | `float` |
+| `DOUBLE` | `double` |
+| `BOOLEAN` | `bool` |
+| `BINARY` | `bytes` |
+| `DATE` | `int32` |
+| `TIMESTAMP` | `int64` |
+| `ARRAY<type>` | `repeated type` |
+| `MAP<key, value>` | `map<key, value>` |
+
+## Examples
+
+### Basic Usage
+
+Generate a proto file for a simple table:
+
+**From the SDK JAR:**
+```bash
+java -jar databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
+  --uc-endpoint "https://myworkspace.cloud.databricks.com" \
+  --client-id "abc123" \
+  --client-secret "secret123" \
+  --table "my_catalog.my_schema.users" \
+  --output "users.proto"
+```
+
+**Or, if you have the source repository:**
+```bash
+./tools/generate_proto.sh \
+  --uc-endpoint "https://myworkspace.cloud.databricks.com" \
"https://myworkspace.cloud.databricks.com" \ + --client-id "abc123" \ + --client-secret "secret123" \ + --table "my_catalog.my_schema.users" \ + --output "users.proto" +``` + +This might generate: + +```protobuf +syntax = "proto2"; + +message users { + required int32 user_id = 1; + required string username = 2; + optional string email = 3; + required int64 created_at = 4; +} +``` + +### Custom Message Name + +Specify a custom message name: + +```bash +java -jar databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \ + --uc-endpoint "https://myworkspace.cloud.databricks.com" \ + --client-id "abc123" \ + --client-secret "secret123" \ + --table "my_catalog.my_schema.events" \ + --output "events.proto" \ + --proto-msg "EventRecord" +``` + +### Complex Types + +The tool handles complex types like arrays and maps: + +```bash +java -jar databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \ + --uc-endpoint "https://myworkspace.cloud.databricks.com" \ + --client-id "abc123" \ + --client-secret "secret123" \ + --table "my_catalog.my_schema.products" \ + --output "products.proto" +``` + +If the table has columns like: +- `tags ARRAY` +- `attributes MAP` + +The generated proto will include: +```protobuf +syntax = "proto2"; + +message products { + required int32 product_id = 1; + required string name = 2; + repeated string tags = 3; + map attributes = 4; +} +``` + +## Authentication + +The tool uses OAuth 2.0 client credentials flow to authenticate with Unity Catalog. Unlike the SDK's token generation (which includes resource and authorization details for specific table privileges), this tool uses basic authentication with minimal scope to fetch table metadata. + +The authentication flow: +1. Exchanges client ID and secret for an OAuth token +2. Uses the token to fetch table schema from Unity Catalog API +3. Token is used only for metadata retrieval (read-only operation) + +## Integration with Zerobus SDK + +After generating the `.proto` file: + +1. Place it in your project's proto directory (e.g., `src/main/proto/`) +2. Compile it using the protobuf compiler: + ```bash + protoc --java_out=src/main/java your_proto_file.proto + ``` +3. Use the generated Java classes with the Zerobus SDK: + ```java + TableProperties tableProperties = + new TableProperties<>("catalog.schema.table", YourMessage.getDefaultInstance()); + + ZerobusStream stream = sdk.createStream( + tableProperties, clientId, clientSecret).join(); + ``` + +## Troubleshooting + +### Authentication Errors + +If you receive authentication errors: +- Verify your client ID and secret are correct +- Ensure your OAuth client has access to Unity Catalog +- Check that the endpoint URL is correct + +### Table Not Found + +If the table cannot be found: +- Verify the table name format is `catalog.schema.table` +- Ensure the table exists in Unity Catalog +- Check that your OAuth client has permission to read the table metadata + +### Unsupported Type Errors + +If you encounter unsupported type errors: +- Check if your table uses custom or complex types not listed in the type mappings +- Consider simplifying the column type or manually editing the generated proto file + +## Distribution + +The tool is distributed as part of the Zerobus SDK JAR. When you download or build the SDK, the `GenerateProto` tool is automatically included in the shaded JAR file (`databricks-zerobus-ingest-sdk-*-jar-with-dependencies.jar`). 
+ +Users can run the tool directly from the JAR without needing access to the source code: + +```bash +# Download the SDK JAR (or build it with mvn package) +# Then simply run: +java -jar databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \ + --uc-endpoint "..." \ + --client-id "..." \ + --client-secret "..." \ + --table "..." \ + --output "output.proto" +``` + +## Files + +- `src/main/java/com/databricks/zerobus/tools/GenerateProto.java` - Main tool implementation (packaged in SDK JAR) +- `tools/generate_proto.sh` - Helper script for running from source repository +- `tools/README.md` - This documentation file + +## License + +This tool is part of the Databricks Zerobus SDK for Java. diff --git a/tools/generate_proto.sh b/tools/generate_proto.sh new file mode 100755 index 0000000..9995232 --- /dev/null +++ b/tools/generate_proto.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +# Generate Proto Tool - Helper script for running GenerateProto from the SDK JAR +# +# This script runs the GenerateProto tool to generate proto2 files +# from Unity Catalog table schemas. +# +# The tool is packaged within the Zerobus SDK JAR and can be executed +# directly without needing to clone the repository. + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="${SCRIPT_DIR}/.." +TARGET_DIR="${PROJECT_ROOT}/target" + +# Find the shaded JAR (with dependencies) +SHADED_JAR=$(find "${TARGET_DIR}" -name "databricks-zerobus-ingest-sdk-*-jar-with-dependencies.jar" 2>/dev/null | head -n 1) + +if [ -z "${SHADED_JAR}" ] || [ ! -f "${SHADED_JAR}" ]; then + echo "Error: Zerobus SDK JAR not found in ${TARGET_DIR}" + echo "Please run 'mvn package' first to build the SDK JAR" + exit 1 +fi + +# Run the tool from the JAR +java -cp "${SHADED_JAR}" com.databricks.zerobus.tools.GenerateProto "$@"
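+
+# Example (placeholder values; see tools/README.md for details):
+#   ./tools/generate_proto.sh \
+#     --uc-endpoint "https://your-workspace.cloud.databricks.com" \
+#     --client-id "your-client-id" \
+#     --client-secret "your-client-secret" \
+#     --table "catalog.schema.table_name" \
+#     --output "output.proto"
+#
+# The generated file can then be compiled into Java classes, e.g.:
+#   protoc --java_out=src/main/java output.proto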