diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml
index 33b9ac8..89ee4b0 100644
--- a/.github/workflows/push.yml
+++ b/.github/workflows/push.yml
@@ -7,25 +7,47 @@ on:
     types: [checks_requested]
 jobs:
+  tests-ubuntu:
+    uses: ./.github/workflows/test.yml
+    strategy:
+      fail-fast: false
+      matrix:
+        javaVersion: ['11', '17', '21']
+    with:
+      os: linux-ubuntu-latest
+      javaVersion: ${{ matrix.javaVersion }}
+
+  tests-windows:
+    uses: ./.github/workflows/test.yml
+    strategy:
+      fail-fast: false
+      matrix:
+        javaVersion: ['11', '17', '21']
+    with:
+      os: windows-server-latest
+      javaVersion: ${{ matrix.javaVersion }}
+
   fmt:
     runs-on:
       group: databricks-protected-runner-group
       labels: linux-ubuntu-latest
-    steps:
-      - name: Set up JDK 11
-        uses: actions/setup-java@v1
-        with:
-          java-version: 11
+    steps:
       - name: Checkout
         uses: actions/checkout@v4
-      - name: Cache Maven packages
-        uses: actions/cache@v4
+      - name: Set up JDK 11
+        uses: actions/setup-java@v4
         with:
-          path: ~/.m2
-          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-m2
+          java-version: 11
+          distribution: 'temurin'
+          cache: 'maven'
       - name: Check formatting
-        run: mvn --errors spotless:check
+        run: mvn spotless:check
+
+      - name: Fail on formatting differences
+        if: failure()
+        run: |
+          echo "Code formatting issues detected. Run 'mvn spotless:apply' to fix."
+          exit 1
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 0000000..4c1a263
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,48 @@
+name: Test Workflow
+
+on:
+  workflow_call:
+    inputs:
+      os:
+        required: true
+        type: string
+      javaVersion:
+        required: true
+        type: string
+
+jobs:
+  test:
+    strategy:
+      fail-fast: false
+    runs-on:
+      group: databricks-protected-runner-group
+      labels: ${{ inputs.os }}
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+
+      - name: Unshallow
+        run: git fetch --prune --unshallow
+
+      - name: Set up JDK ${{ inputs.javaVersion }}
+        uses: actions/setup-java@v4
+        with:
+          java-version: ${{ inputs.javaVersion }}
+          distribution: 'temurin'
+          cache: 'maven'
+
+      - name: Build with Maven
+        run: mvn clean compile
+
+      - name: Run tests
+        run: mvn test
+
+      - name: Upload test results
+        if: always()
+        uses: actions/upload-artifact@v4
+        with:
+          name: test-results-${{ inputs.os }}-java-${{ inputs.javaVersion }}
+          path: |
+            target/surefire-reports/
+            target/test-classes/
+          retention-days: 7
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..450ad94
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,17 @@
+# Version changelog
+
+## Release v0.1.0
+
+Initial release of the Databricks Zerobus Ingest SDK for Java.
+
+### API Changes
+
+- Added `ZerobusSdk` class for creating ingestion streams
+- Added `ZerobusStream` class for managing stateful gRPC streams
+- Added `RecordAcknowledgment` for blocking until record acknowledgment
+- Added `TableProperties` for configuring table schema and name
+- Added `StreamConfigurationOptions` for stream behavior configuration
+- Added `ZerobusException` and `NonRetriableException` for error handling
+- Added `StreamState` enum for tracking stream lifecycle
+- Added utility methods in `ZerobusSdkStubUtils` for gRPC stub management
+- Support for Java 8 and higher
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 6813a4e..8972e1b 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -1,6 +1,6 @@
-# Contributing to Zerobus SDK for Python
+# Contributing to Zerobus SDK for Java
-We happily welcome contributions to the Zerobus SDK for Python. We use [GitHub Issues](https://github.com/databricks/zerobus-sdk-py/issues) to track community reported issues and [GitHub Pull Requests](https://github.com/databricks/zerobus-sdk-py/pulls) for accepting changes.
+We happily welcome contributions to the Zerobus SDK for Java. We use [GitHub Issues](https://github.com/databricks/zerobus-sdk-java/issues) to track community-reported issues and [GitHub Pull Requests](https://github.com/databricks/zerobus-sdk-java/pulls) for accepting changes.
Contributions are licensed on a license-in/license-out basis.
@@ -20,61 +20,60 @@ Small patches and bug fixes don't need prior communication.
### Prerequisites
-- Python 3.7 or higher
+- **Java**: 8 or higher - [Download Java](https://adoptium.net/)
+- **Maven**: 3.6 or higher - [Download Maven](https://maven.apache.org/download.cgi)
+- **Protocol Buffers Compiler** (`protoc`): 24.4 - [Download protoc](https://github.com/protocolbuffers/protobuf/releases/tag/v24.4)
- Git
-- pip
### Setting Up Your Development Environment
1. **Clone the repository:**
```bash
- git clone https://github.com/databricks/zerobus-sdk-py.git
- cd zerobus-sdk-py
+ git clone https://github.com/databricks/zerobus-sdk-java.git
+ cd zerobus-sdk-java
```
-2. **Create and activate a virtual environment:**
+2. **Build the project:**
```bash
- make dev
+ mvn clean install
```
This will:
- - Create a virtual environment in `.venv`
- - Install the package in development mode with all dev dependencies
+ - Generate protobuf Java classes
+ - Compile the source code
+ - Run tests
+ - Install the artifact to your local Maven repository
-3. **Activate the virtual environment:**
+3. **Run tests:**
```bash
- source .venv/bin/activate # On Windows: .venv\Scripts\activate
+ mvn test
```
## Coding Style
-Code style is enforced by a formatter check in your pull request. We use [Black](https://github.com/psf/black) to format our code. Run `make fmt` to ensure your code is properly formatted prior to raising a pull request.
+Code style is enforced by [Spotless](https://github.com/diffplug/spotless) in your pull request. We use Google Java Format for code formatting.
### Running the Formatter
Format your code before committing:
```bash
-make fmt
+mvn spotless:apply
```
-This runs:
-- **Black**: Code formatting
-- **autoflake**: Remove unused imports
-- **isort**: Sort imports
+This will format:
+- **Java code**: Using Google Java Format
+- **Imports**: Organized and unused imports removed
+- **pom.xml**: Sorted dependencies and plugins
-### Running Linters
+### Checking Formatting
-Check your code for issues:
+Check if your code is properly formatted:
```bash
-make lint
+mvn spotless:check
```
-This runs:
-- **pycodestyle**: Style guide enforcement
-- **autoflake**: Check for unused imports
-
## Pull Request Process
1. **Create a feature branch:**
@@ -85,25 +84,33 @@ This runs:
2. **Make your changes:**
- Write clear, concise commit messages
- Follow existing code style
+ - Add tests for new functionality
- Update documentation as needed
3. **Format your code:**
```bash
- make fmt
+ mvn spotless:apply
+ ```
+
+4. **Run tests:**
+ ```bash
+ mvn test
```
-4. **Commit your changes:**
+5. **Commit your changes:**
```bash
git add .
- git commit -m "Add feature: description of your changes"
+ git commit -s -m "Add feature: description of your changes"
```
-5. **Push to your fork:**
+   Note: The `-s` flag adds a Signed-off-by line to your commit (required - see below).
+
+6. **Push to your fork:**
```bash
git push origin feature/your-feature-name
```
-6. **Create a Pull Request:**
+7. **Create a Pull Request:**
- Provide a clear description of changes
- Reference any related issues
- Ensure all CI checks pass
@@ -130,10 +137,11 @@ git commit -s -m "Your commit message"
When reviewing code:
-- Check for adherence to code style
+- Check for adherence to code style (Google Java Format)
- Look for potential edge cases
- Consider performance implications
- Ensure documentation is updated
+- Verify tests cover new functionality
## Commit Message Guidelines
@@ -148,8 +156,8 @@ Example:
```
Add async stream creation example
-- Add async_example.py demonstrating non-blocking ingestion
-- Update README with async API documentation
+- Add BlockingIngestionExample.java demonstrating synchronous ingestion
+- Update README with blocking API documentation
Fixes #42
```
@@ -158,58 +166,109 @@ Fixes #42
### Updating Documentation
-- Update docstrings for all public APIs
-- Use Google-style docstrings
-- Include examples in docstrings where helpful
+- Add Javadoc for all public APIs
+- Use standard Javadoc format
+- Include `@param`, `@return`, `@throws` tags
- Update README.md for user-facing changes
- Update examples/ for new features
-Example docstring:
-```python
-def ingest_record(self, record) -> RecordAcknowledgment:
- """
- Submits a single record for ingestion into the stream.
+Example Javadoc:
+```java
+/**
+ * Ingests a single record into the stream.
+ *
+ *
+ * <p>Returns a CompletableFuture that completes when the record is durably written to storage.
+ * This method may block if the maximum number of in-flight records has been reached.
+ *
+ * @param record The protobuf message to ingest
+ * @return A CompletableFuture that completes when the record is acknowledged
+ * @throws ZerobusException if the stream is not in a valid state for ingestion
+ */
+public CompletableFuture<Void> ingestRecord(RecordType record) throws ZerobusException {
+ // ...
+}
+```
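+
+A caller-side sketch of the documented contract (`stream` and `record` are placeholder names for illustration, not SDK-provided variables):
+```java
+CompletableFuture<Void> ack = stream.ingestRecord(record);
+ack.join(); // blocks until the record is durably written, or completes exceptionally on failure
+```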
+
+## Testing
+
+### Writing Tests
+
+- Add unit tests for all new functionality
+- Use JUnit 5 for test framework
+- Use Mockito for mocking
+- Tests should be fast (< 1 second per test)
+- Use descriptive test names
+
+Example test:
+```java
+@Test
+public void testSingleRecordIngestAndAcknowledgment() throws Exception {
+ // Given
+ ZerobusStream stream = createTestStream();
+
+ // When
+  CompletableFuture<Void> result = stream.ingestRecord(testMessage);
- This method may block if the maximum number of in-flight records
- has been reached.
+ // Then
+ result.get(5, TimeUnit.SECONDS);
+ assertEquals(StreamState.OPENED, stream.getState());
+}
+```
- Args:
- record: The Protobuf message object to be ingested.
+### Running Tests
- Returns:
- RecordAcknowledgment: An object to wait on for the server's acknowledgment.
+```bash
+# Run all tests
+mvn test
- Raises:
- ZerobusException: If the stream is not in a valid state for ingestion.
+# Run specific test class
+mvn test -Dtest=ZerobusSdkTest
- Example:
- >>> record = AirQuality(device_name="sensor-1", temp=25)
- >>> ack = stream.ingest_record(record)
- >>> ack.wait_for_ack()
- """
+# Run specific test method
+mvn test -Dtest=ZerobusSdkTest#testSingleRecordIngestAndAcknowledgment
```
## Continuous Integration
All pull requests must pass CI checks:
-- **fmt**: Runs formatting checks (black, autoflake, isort)
+- **Build**: `mvn clean compile`
+- **Tests**: `mvn test` (on Java 11, 17, 21)
+- **Formatting**: `mvn spotless:check`
-The formatting check runs `make dev fmt` and then checks for any git differences. If there are differences, the check will fail.
+Tests run on both Ubuntu and Windows runners.
You can view CI results in the GitHub Actions tab of the pull request.
-## Makefile Targets
+## Maven Commands
+
+Useful Maven commands:
+
+```bash
+# Clean build
+mvn clean
+
+# Compile code
+mvn compile
-Available make targets:
+# Run tests
+mvn test
-- `make dev` - Set up development environment
-- `make install` - Install package
-- `make build` - Build wheel package
-- `make fmt` - Format code with black, autoflake, and isort
-- `make lint` - Run linting with pycodestyle
-- `make clean` - Remove build artifacts
-- `make help` - Show available targets
+# Format code
+mvn spotless:apply
+
+# Check formatting
+mvn spotless:check
+
+# Create JARs (regular + fat JAR)
+mvn package
+
+# Install to local Maven repo
+mvn install
+
+# Generate protobuf classes
+mvn protobuf:compile
+```
## Versioning
@@ -227,11 +286,13 @@ We follow [Semantic Versioning](https://semver.org/):
## Package Name
-The package is published on PyPI as `databricks-zerobus-ingest-sdk`.
+The package is published on Maven Central as:
+- **Group ID**: `com.databricks`
+- **Artifact ID**: `zerobus-ingest-sdk`
## Code of Conduct
- Be respectful and inclusive
- Welcome newcomers
- Focus on constructive feedback
-- Follow the [Python Community Code of Conduct](https://www.python.org/psf/conduct/)
+- Follow professional open source etiquette
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..93a5b5f
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,69 @@
+ Databricks License
+ Copyright (2022) Databricks, Inc.
+
+ Definitions.
+
+ Agreement: The agreement between Databricks, Inc., and you governing
+ the use of the Databricks Services, as that term is defined in
+ the Master Cloud Services Agreement (MCSA) located at
+ www.databricks.com/legal/mcsa.
+
+ Licensed Materials: The source code, object code, data, and/or other
+ works to which this license applies.
+
+ Scope of Use. You may not use the Licensed Materials except in
+ connection with your use of the Databricks Services pursuant to
+ the Agreement. Your use of the Licensed Materials must comply at all
+ times with any restrictions applicable to the Databricks Services,
+ generally, and must be used in accordance with any applicable
+ documentation. You may view, use, copy, modify, publish, and/or
+ distribute the Licensed Materials solely for the purposes of using
+ the Licensed Materials within or connecting to the Databricks Services.
+ If you do not agree to these terms, you may not view, use, copy,
+ modify, publish, and/or distribute the Licensed Materials.
+
+ Redistribution. You may redistribute and sublicense the Licensed
+ Materials so long as all use is in compliance with these terms.
+ In addition:
+
+ - You must give any other recipients a copy of this License;
+ - You must cause any modified files to carry prominent notices
+ stating that you changed the files;
+ - You must retain, in any derivative works that you distribute,
+ all copyright, patent, trademark, and attribution notices,
+ excluding those notices that do not pertain to any part of
+ the derivative works; and
+ - If a "NOTICE" text file is provided as part of its
+ distribution, then any derivative works that you distribute
+ must include a readable copy of the attribution notices
+ contained within such NOTICE file, excluding those notices
+ that do not pertain to any part of the derivative works.
+
+ You may add your own copyright statement to your modifications and may
+ provide additional license terms and conditions for use, reproduction,
+ or distribution of your modifications, or for any such derivative works
+ as a whole, provided your use, reproduction, and distribution of
+ the Licensed Materials otherwise complies with the conditions stated
+ in this License.
+
+ Termination. This license terminates automatically upon your breach of
+ these terms or upon the termination of your Agreement. Additionally,
+ Databricks may terminate this license at any time on notice. Upon
+ termination, you must permanently delete the Licensed Materials and
+ all copies thereof.
+
+ DISCLAIMER; LIMITATION OF LIABILITY.
+
+ THE LICENSED MATERIALS ARE PROVIDED “AS-IS” AND WITH ALL FAULTS.
+ DATABRICKS, ON BEHALF OF ITSELF AND ITS LICENSORS, SPECIFICALLY
+ DISCLAIMS ALL WARRANTIES RELATING TO THE LICENSED MATERIALS, EXPRESS
+ AND IMPLIED, INCLUDING, WITHOUT LIMITATION, IMPLIED WARRANTIES,
+ CONDITIONS AND OTHER TERMS OF MERCHANTABILITY, SATISFACTORY QUALITY OR
+ FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. DATABRICKS AND
+ ITS LICENSORS TOTAL AGGREGATE LIABILITY RELATING TO OR ARISING OUT OF
+ YOUR USE OF OR DATABRICKS’ PROVISIONING OF THE LICENSED MATERIALS SHALL
+ BE LIMITED TO ONE THOUSAND ($1,000) DOLLARS. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ ARISING FROM, OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS OR
+ THE USE OR OTHER DEALINGS IN THE LICENSED MATERIALS.
\ No newline at end of file
diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md
new file mode 100644
index 0000000..1b000f6
--- /dev/null
+++ b/NEXT_CHANGELOG.md
@@ -0,0 +1,13 @@
+# NEXT CHANGELOG
+
+## Release v0.2.0
+
+### New Features and Improvements
+
+### Bug Fixes
+
+### Documentation
+
+### Internal Changes
+
+### API Changes
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..1ec04b5
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,34 @@
+Copyright (2025) Databricks, Inc.
+
+This Software includes software developed at Databricks (https://www.databricks.com/) and its use is subject to the included LICENSE file.
+
+---
+
+This Software includes the following open source components:
+
+## Apache License 2.0
+
+### Protocol Buffers (protobuf-java)
+Copyright 2008 Google Inc.
+https://github.com/protocolbuffers/protobuf
+License: https://github.com/protocolbuffers/protobuf/blob/main/LICENSE
+
+### gRPC Java
+Copyright 2014 gRPC authors
+https://github.com/grpc/grpc-java
+License: https://github.com/grpc/grpc-java/blob/master/LICENSE
+
+### Netty
+Copyright 2014 The Netty Project
+https://github.com/netty/netty
+License: https://github.com/netty/netty/blob/4.1/LICENSE.txt
+
+### SLF4J
+Copyright (c) 2004-2017 QOS.ch
+https://github.com/qos-ch/slf4j
+License: https://github.com/qos-ch/slf4j/blob/master/LICENSE.txt
+
+## CDDL + GPLv2 with classpath exception
+
+### javax.annotation-api
+https://github.com/javaee/javax.annotation
diff --git a/README.md b/README.md
index ac11dd7..be3bcc3 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,30 @@
# Databricks Zerobus Ingest SDK for Java
-The Databricks Zerobus Ingest SDK for Java provides a high-performance client for ingesting data directly into Databricks Delta tables using the Zerobus streaming protocol.
+[Public Preview](https://docs.databricks.com/release-notes/release-types.html): This SDK is supported for production use cases and is available to all customers. Databricks is actively working on stabilizing the Zerobus Ingest SDK for Java. Minor version updates may include backwards-incompatible changes.
+
+We are keen to hear feedback from you on this SDK. Please [file issues](https://github.com/databricks/zerobus-sdk-java/issues), and we will address them.
+
+The Databricks Zerobus Ingest SDK for Java provides a high-performance client for ingesting data directly into Databricks Delta tables using the Zerobus streaming protocol. See also the [SDK for Rust](https://github.com/databricks/zerobus-sdk-rs) and the [SDK for Python](https://github.com/databricks/zerobus-sdk-py).
+
+## Table of Contents
+
+- [Features](#features)
+- [Requirements](#requirements)
+- [Quick Start User Guide](#quick-start-user-guide)
+ - [Prerequisites](#prerequisites)
+ - [Building Your Application](#building-your-application)
+ - [Define Your Protocol Buffer Schema](#define-your-protocol-buffer-schema)
+ - [Generate Protocol Buffer Schema from Unity Catalog (Alternative)](#generate-protocol-buffer-schema-from-unity-catalog-alternative)
+ - [Write Your Client Code](#write-your-client-code)
+ - [Compile and Run](#compile-and-run)
+- [Usage Examples](#usage-examples)
+ - [Blocking Ingestion](#blocking-ingestion)
+ - [Non-Blocking Ingestion](#non-blocking-ingestion)
+- [Configuration](#configuration)
+- [Logging](#logging)
+- [Error Handling](#error-handling)
+- [API Reference](#api-reference)
+- [Best Practices](#best-practices)
## Features
@@ -21,7 +45,6 @@ The Databricks Zerobus Ingest SDK for Java provides a high-performance client fo
**When using the fat JAR** (recommended for most users):
- No additional dependencies required - all dependencies are bundled
-- Includes `slf4j-simple` for logging out of the box
**When using the regular JAR**:
- [`protobuf-java` 3.24.0](https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java/3.24.0)
@@ -30,7 +53,7 @@ The Databricks Zerobus Ingest SDK for Java provides a high-performance client fo
- [`grpc-stub` 1.58.0](https://mvnrepository.com/artifact/io.grpc/grpc-stub/1.58.0)
- [`javax.annotation-api` 1.3.2](https://mvnrepository.com/artifact/javax.annotation/javax.annotation-api/1.3.2)
- [`slf4j-api` 1.7.36](https://mvnrepository.com/artifact/org.slf4j/slf4j-api/1.7.36)
-- [`slf4j-simple` 1.7.36](https://mvnrepository.com/artifact/org.slf4j/slf4j-simple/1.7.36) (or substitute your own SLF4J implementation like [`logback-classic` 1.2.11](https://mvnrepository.com/artifact/ch.qos.logback/logback-classic/1.2.11))
+- An SLF4J implementation such as [`slf4j-simple` 1.7.36](https://mvnrepository.com/artifact/org.slf4j/slf4j-simple/1.7.36) or [`logback-classic` 1.2.11](https://mvnrepository.com/artifact/ch.qos.logback/logback-classic/1.2.11)
### Build Requirements (only for building from source)
@@ -176,7 +199,7 @@ message AirQuality {
}
```
-Compile the protobuf:
+**Compile the protobuf:**
```bash
protoc --java_out=src/main/java src/main/proto/record.proto
@@ -184,6 +207,95 @@ protoc --java_out=src/main/java src/main/proto/record.proto
This generates `src/main/java/com/example/proto/Record.java`.
+### Generate Protocol Buffer Schema from Unity Catalog (Alternative)
+
+Instead of manually writing and compiling your protobuf schema, you can automatically generate it from an existing Unity Catalog table schema using the included `GenerateProto` tool.
+
+#### Using the Proto Generation Tool
+
+The `GenerateProto` tool fetches your table schema from Unity Catalog and generates a corresponding proto2 definition file with the correct type mappings.
+
+**Basic Usage:**
+
+```bash
+java -jar lib/databricks-zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
+ --uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
+ --client-id "your-service-principal-application-id" \
+ --client-secret "your-service-principal-secret" \
+ --table "main.default.air_quality" \
+ --output "src/main/proto/record.proto" \
+ --proto-msg "AirQuality"
+```
+
+**Parameters:**
+- `--uc-endpoint`: Your workspace URL (e.g., `https://dbc-a1b2c3d4-e5f6.cloud.databricks.com`)
+- `--client-id`: Service principal application ID
+- `--client-secret`: Service principal secret
+- `--table`: Fully qualified table name (catalog.schema.table)
+- `--output`: Output path for the generated proto file
+- `--proto-msg`: (Optional) Name for the protobuf message (defaults to table name)
+
+**Example:**
+
+For a table defined as:
+```sql
+CREATE TABLE main.default.air_quality (
+ device_name STRING,
+ temp INT,
+ humidity BIGINT
+)
+USING DELTA;
+```
+
+Running the generation tool will create `src/main/proto/record.proto`:
+```protobuf
+syntax = "proto2";
+
+package com.example;
+
+option java_package = "com.example.proto";
+option java_outer_classname = "Record";
+
+message AirQuality {
+ optional string device_name = 1;
+ optional int32 temp = 2;
+ optional int64 humidity = 3;
+}
+```
+
+After generating the proto file, compile it as shown above:
+```bash
+protoc --java_out=src/main/java src/main/proto/record.proto
+```
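+
+The generated class uses the standard protobuf-java builder API, so a record can be constructed like this (a sketch based on the `AirQuality` message above; the builder method names are derived from the proto field names):
+```java
+import com.example.proto.Record.AirQuality;
+
+AirQuality reading =
+    AirQuality.newBuilder()
+        .setDeviceName("sensor-1") // device_name STRING
+        .setTemp(25)               // temp INT -> int32
+        .setHumidity(60L)          // humidity BIGINT -> int64
+        .build();
+```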
+
+**Type Mappings:**
+
+The tool automatically maps Unity Catalog types to proto2 types:
+
+| Delta Type | Proto2 Type |
+|-----------|-------------|
+| INT, SMALLINT, SHORT | int32 |
+| BIGINT, LONG | int64 |
+| FLOAT | float |
+| DOUBLE | double |
+| STRING, VARCHAR | string |
+| BOOLEAN | bool |
+| BINARY | bytes |
+| DATE | int32 |
+| TIMESTAMP | int64 |
+| ARRAY\<type\> | repeated type |
+| MAP\<key, value\> | map\<key, value\> |
+| STRUCT\<fields\> | nested message |
+
+**Benefits:**
+- No manual schema creation required
+- Ensures schema consistency between your table and protobuf definitions
+- Automatically handles complex types (arrays, maps, structs)
+- Reduces errors from manual type mapping
+- No need to clone the repository - runs directly from the SDK JAR
+
+For detailed documentation and examples, see [tools/README.md](tools/README.md).
+
#### 4. Write Your Client Code
Create `src/main/java/com/example/ZerobusClient.java`:
@@ -353,24 +465,40 @@ try {
## Logging
-The Databricks Zerobus Ingest SDK for Java uses the standard [SLF4J logging framework](https://www.slf4j.org/) and includes `slf4j-simple` by default, so **logging works out of the box** with no additional configuration required.
+The Databricks Zerobus Ingest SDK for Java uses the standard [SLF4J logging framework](https://www.slf4j.org/). The SDK only depends on `slf4j-api`, which means **you need to add an SLF4J implementation** to your classpath to see log output.
-### Controlling Log Levels
+### Adding a Logging Implementation
-By default, the SDK logs at **INFO** level. To enable debug logging, set the system property when running your application:
+**Option 1: Using slf4j-simple** (simplest for getting started)
+
+Add to your Maven dependencies:
+```xml
+<dependency>
+  <groupId>org.slf4j</groupId>
+  <artifactId>slf4j-simple</artifactId>
+  <version>1.7.36</version>
+</dependency>
+```
+Control log levels with system properties:
```bash
java -Dorg.slf4j.simpleLogger.log.com.databricks.zerobus=debug -cp "lib/*:out" com.example.ZerobusClient
```
Available log levels: `trace`, `debug`, `info`, `warn`, `error`
-### Using a Different Logging Framework
+**Option 2: Using Logback** (recommended for production)
-If you prefer a different logging framework like **Logback** or **Log4j**, you can substitute `slf4j-simple`:
-
-**With Logback**, add the following to your `logback.xml`:
+Add to your Maven dependencies:
+```xml
+<dependency>
+  <groupId>ch.qos.logback</groupId>
+  <artifactId>logback-classic</artifactId>
+  <version>1.2.11</version>
+</dependency>
+```
+Create `logback.xml` in your resources directory:
```xml
@@ -387,13 +515,43 @@ If you prefer a different logging framework like **Logback** or **Log4j**, you c
```
-**With Log4j**, add to your `log4j.properties`:
+**Option 3: Using Log4j 2**
-```properties
-log4j.logger.com.databricks.zerobus=DEBUG
+Add to your Maven dependencies:
+```xml
+<dependency>
+  <groupId>org.apache.logging.log4j</groupId>
+  <artifactId>log4j-slf4j-impl</artifactId>
+  <version>2.20.0</version>
+</dependency>
+```
+
+Create `log4j2.xml` in your resources directory:
+```xml
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration>
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Logger name="com.databricks.zerobus" level="debug"/>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
```
-To use a different logging framework, exclude `slf4j-simple` and add your preferred implementation to the classpath.
+### No Logging Implementation
+
+If you don't add an SLF4J implementation, you'll see a warning like:
+```
+SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
+SLF4J: Defaulting to no-operation (NOP) logger implementation
+```
+
+The SDK will still work, but no log messages will be output.
### What Gets Logged
@@ -676,7 +834,4 @@ NonRetriableException(String message, Throwable cause)
4. **Error handling**: Implement proper retry logic for retriable errors
5. **Monitoring**: Use `ackCallback` to track ingestion progress
6. **Token refresh**: Tokens are automatically refreshed on stream creation and recovery
-
-## Disclaimer
-
-Databricks is actively working on stabilizing the Zerobus Ingest SDK for Java. Minor version updates may include backwards-incompatible changes.
+7. **Proto generation**: Use the built-in `GenerateProto` tool to automatically generate proto files from your table schemas
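+
+A minimal sketch of practices 4 and 5 combined (hedged: the per-record `CompletableFuture` follows the API shown earlier; the retry queue and counter are illustrative application-side names, not SDK API):
+```java
+CompletableFuture<Void> ack = stream.ingestRecord(record);
+ack.whenComplete(
+    (ignored, error) -> {
+      if (error != null) {
+        // Re-submit on retriable failures; let a NonRetriableException surface.
+        retryQueue.add(record); // hypothetical application-side queue
+      } else {
+        acknowledgedCount.incrementAndGet(); // hypothetical AtomicLong progress counter
+      }
+    });
+```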
diff --git a/pom.xml b/pom.xml
index 908e47a..3166520 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,11 +3,30 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.databricks</groupId>
-  <artifactId>databricks-zerobus-ingest-sdk</artifactId>
+  <artifactId>zerobus-ingest-sdk</artifactId>
   <version>0.1.0</version>
   <packaging>jar</packaging>
   <name>Zerobus Ingest SDK for Java</name>
   <description>Databricks Zerobus Ingest SDK for Java - Direct ingestion to Delta tables</description>
+  <url>https://github.com/databricks/zerobus-sdk-java</url>
+
+  <licenses>
+    <license>
+      <name>Databricks License</name>
+      <url>https://github.com/databricks/zerobus-sdk-java/blob/main/LICENSE</url>
+      <distribution>repo</distribution>
+    </license>
+  </licenses>
+
+  <developers>
+    <developer>
+      <name>Databricks</name>
+      <url>https://databricks.com</url>
+    </developer>
+  </developers>
+
+  <scm>
+    <connection>scm:git:git://github.com/databricks/zerobus-sdk-java.git</connection>
+    <developerConnection>scm:git:ssh://github.com:databricks/zerobus-sdk-java.git</developerConnection>
+    <url>https://github.com/databricks/zerobus-sdk-java/tree/main</url>
+  </scm>
+
   <properties>
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>
@@ -48,11 +67,43 @@
       <artifactId>slf4j-api</artifactId>
       <version>1.7.36</version>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-simple</artifactId>
       <version>1.7.36</version>
+      <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <version>5.10.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <version>5.10.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>5.5.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-junit-jupiter</artifactId>
+      <version>5.5.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-testing</artifactId>
+      <version>1.58.0</version>
+      <scope>test</scope>
+    </dependency>
@@ -69,11 +120,18 @@
           <execution>
+            <id>compile-protobuf</id>
             <goals>
               <goal>compile</goal>
               <goal>compile-custom</goal>
             </goals>
           </execution>
+          <execution>
+            <id>test-compile-protobuf</id>
+            <goals>
+              <goal>test-compile</goal>
+            </goals>
+          </execution>
@@ -114,6 +172,12 @@
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>3.2.1</version>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>
@@ -138,6 +202,7 @@
               <Implementation-Title>${project.name}</Implementation-Title>
               <Implementation-Version>${project.version}</Implementation-Version>
+              <Main-Class>com.databricks.zerobus.tools.GenerateProto</Main-Class>
@@ -157,6 +222,70 @@
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <version>3.3.0</version>
+        <executions>
+          <execution>
+            <id>attach-sources</id>
+            <goals>
+              <goal>jar-no-fork</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <version>3.6.2</version>
+        <configuration>
+          <doclint>none</doclint>
+          <quiet>true</quiet>
+        </configuration>
+        <executions>
+          <execution>
+            <id>attach-javadocs</id>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-gpg-plugin</artifactId>
+        <version>3.1.0</version>
+        <executions>
+          <execution>
+            <id>sign-artifacts</id>
+            <goals>
+              <goal>sign</goal>
+            </goals>
+            <phase>verify</phase>
+            <configuration>
+              <gpgArguments>
+                <arg>--pinentry-mode</arg>
+                <arg>loopback</arg>
+              </gpgArguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.sonatype.central</groupId>
+        <artifactId>central-publishing-maven-plugin</artifactId>
+        <version>0.5.0</version>
+        <extensions>true</extensions>
+        <configuration>
+          <publishingServerId>central</publishingServerId>
+          <autoPublish>false</autoPublish>
+        </configuration>
+      </plugin>
diff --git a/src/main/java/com/databricks/zerobus/ZerobusSdk.java b/src/main/java/com/databricks/zerobus/ZerobusSdk.java
index 5d312f2..f086137 100644
--- a/src/main/java/com/databricks/zerobus/ZerobusSdk.java
+++ b/src/main/java/com/databricks/zerobus/ZerobusSdk.java
@@ -159,18 +159,25 @@ public CompletableFuture<ZerobusStream<RecordType>>
try {
logger.debug("Creating stream for table: " + tableProperties.getTableName());
- // Generate authentication token
- String token =
- TokenFactory.getZerobusToken(
- tableProperties.getTableName(),
- workspaceId,
- unityCatalogEndpoint,
- clientId,
- clientSecret);
+ // Create a token supplier that generates a fresh token for each gRPC request
+ java.util.function.Supplier<String> tokenSupplier =
+ () -> {
+ try {
+ return TokenFactory.getZerobusToken(
+ tableProperties.getTableName(),
+ workspaceId,
+ unityCatalogEndpoint,
+ clientId,
+ clientSecret);
+ } catch (NonRetriableException e) {
+ throw new RuntimeException("Failed to get Zerobus token", e);
+ }
+ };
- // Create gRPC stub with authentication
+ // Create gRPC stub once with token supplier - it will fetch fresh tokens as needed
ZerobusGrpc.ZerobusStub stub =
- stubFactory.createStub(serverEndpoint, true, tableProperties.getTableName(), token);
+ stubFactory.createStubWithTokenSupplier(
+ serverEndpoint, tableProperties.getTableName(), tokenSupplier);
ZerobusStream stream =
new ZerobusStream<>(
diff --git a/src/main/java/com/databricks/zerobus/ZerobusSdkStubUtils.java b/src/main/java/com/databricks/zerobus/ZerobusSdkStubUtils.java
index 9c9650a..6893cd5 100644
--- a/src/main/java/com/databricks/zerobus/ZerobusSdkStubUtils.java
+++ b/src/main/java/com/databricks/zerobus/ZerobusSdkStubUtils.java
@@ -20,69 +20,58 @@ class ZerobusSdkStubFactory {
// gRPC channel configuration constants
private static final int DEFAULT_TLS_PORT = 443;
- private static final int DEFAULT_PLAINTEXT_PORT = 80;
private static final long KEEP_ALIVE_TIME_SECONDS = 30;
private static final long KEEP_ALIVE_TIMEOUT_SECONDS = 10;
- private static final int MAX_INBOUND_MESSAGE_SIZE_MB = 100;
- private static final int BYTES_PER_MB = 1024 * 1024;
- // Protocol prefixes
+ // Protocol prefix
private static final String HTTPS_PREFIX = "https://";
- private static final String HTTP_PREFIX = "http://";
/**
- * Creates a new managed gRPC channel.
+ * Creates a new managed gRPC channel with TLS.
*
 *
 * <p>The channel is configured for long-lived streaming with appropriate keep-alive settings and
- * message size limits.
+ * unlimited message size limits.
*
- * @param endpoint The endpoint URL (may include protocol prefix)
- * @param useTls Whether to use TLS encryption
+ * @param endpoint The endpoint URL (may include https:// prefix)
* @return A configured ManagedChannel
*/
- ManagedChannel createGrpcChannel(String endpoint, boolean useTls) {
- EndpointInfo endpointInfo = parseEndpoint(endpoint, useTls);
+ ManagedChannel createGrpcChannel(String endpoint) {
+ EndpointInfo endpointInfo = parseEndpoint(endpoint);
NettyChannelBuilder builder =
- NettyChannelBuilder.forAddress(endpointInfo.host, endpointInfo.port);
+ NettyChannelBuilder.forAddress(endpointInfo.host, endpointInfo.port).useTransportSecurity();
- // Configure TLS or plaintext
- if (useTls) {
- builder.useTransportSecurity();
- } else {
- builder.usePlaintext();
- }
-
- // Configure for long-lived streaming connections
+ // Configure for long-lived streaming connections with unlimited message size
return builder
.keepAliveTime(KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS)
.keepAliveTimeout(KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true)
- .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE_MB * BYTES_PER_MB)
+ .maxInboundMessageSize(Integer.MAX_VALUE)
.build();
}
/**
- * Creates a new Zerobus gRPC stub with authentication.
+ * Creates a new Zerobus gRPC stub with dynamic token supplier.
*
- * <p>The stub is configured with an interceptor that adds authentication headers to all outgoing
- * requests.
+ * <p>The stub is configured with an interceptor that obtains a fresh token for each request using
+ * the provided token supplier. This allows token rotation without recreating the stub.
*
 *
 * <p>Note: Currently creates a new channel for each stub. Consider reusing channels across
* multiple streams for better resource utilization.
*
* @param endpoint The endpoint URL
- * @param useTls Whether to use TLS encryption
* @param tableName The target table name
- * @param token Authentication token (Bearer token)
- * @return A configured ZerobusStub
+ * @param tokenSupplier Supplier that provides a fresh authentication token for each request
+ * @return A configured ZerobusStub with unlimited message sizes
*/
- ZerobusGrpc.ZerobusStub createStub(
- String endpoint, boolean useTls, String tableName, String token) {
- ManagedChannel channel = createGrpcChannel(endpoint, useTls);
- ClientInterceptor authInterceptor = new AuthenticationInterceptor(token, tableName);
+ ZerobusGrpc.ZerobusStub createStubWithTokenSupplier(
+ String endpoint, String tableName, java.util.function.Supplier<String> tokenSupplier) {
+ ManagedChannel channel = createGrpcChannel(endpoint);
+ ClientInterceptor authInterceptor = new AuthenticationInterceptor(tokenSupplier, tableName);
Channel interceptedChannel = io.grpc.ClientInterceptors.intercept(channel, authInterceptor);
- return ZerobusGrpc.newStub(interceptedChannel);
+ return ZerobusGrpc.newStub(interceptedChannel)
+ .withMaxInboundMessageSize(Integer.MAX_VALUE)
+ .withMaxOutboundMessageSize(Integer.MAX_VALUE);
}
/**
@@ -97,26 +86,20 @@ static ZerobusSdkStubFactory create() {
/**
* Parses an endpoint string to extract host and port information.
*
- * @param endpoint The endpoint string (may include protocol)
- * @param useTls Whether TLS is being used (affects default port)
+ * @param endpoint The endpoint string (may include https:// prefix)
* @return Parsed endpoint information
*/
- private EndpointInfo parseEndpoint(String endpoint, boolean useTls) {
+ private EndpointInfo parseEndpoint(String endpoint) {
// Remove protocol prefix if present
String cleanEndpoint = endpoint;
if (cleanEndpoint.startsWith(HTTPS_PREFIX)) {
cleanEndpoint = cleanEndpoint.substring(HTTPS_PREFIX.length());
- } else if (cleanEndpoint.startsWith(HTTP_PREFIX)) {
- cleanEndpoint = cleanEndpoint.substring(HTTP_PREFIX.length());
}
- // Split host and port
+ // Parse host:port format
String[] parts = cleanEndpoint.split(":", 2);
String host = parts[0];
- int port =
- parts.length > 1
- ? Integer.parseInt(parts[1])
- : (useTls ? DEFAULT_TLS_PORT : DEFAULT_PLAINTEXT_PORT);
+ int port = parts.length > 1 ? Integer.parseInt(parts[1]) : DEFAULT_TLS_PORT;
return new EndpointInfo(host, port);
}
@@ -151,17 +134,17 @@ class AuthenticationInterceptor implements ClientInterceptor {
Metadata.Key.of("x-databricks-zerobus-table-name", Metadata.ASCII_STRING_MARSHALLER);
private static final String BEARER_PREFIX = "Bearer ";
- private final String token;
+ private final java.util.function.Supplier<String> tokenSupplier;
private final String tableName;
/**
- * Creates a new authentication interceptor.
+ * Creates a new authentication interceptor with a dynamic token supplier.
*
- * @param token The authentication token (without "Bearer " prefix)
+ * @param tokenSupplier Supplier that provides a fresh authentication token for each request
* @param tableName The target table name
*/
- AuthenticationInterceptor(String token, String tableName) {
- this.token = token;
+ AuthenticationInterceptor(java.util.function.Supplier<String> tokenSupplier, String tableName) {
+ this.tokenSupplier = tokenSupplier;
this.tableName = tableName;
}
@@ -172,7 +155,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
- headers.put(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
+ headers.put(AUTHORIZATION_HEADER, BEARER_PREFIX + tokenSupplier.get());
headers.put(TABLE_NAME_HEADER, tableName);
super.start(responseListener, headers);
}
diff --git a/src/main/java/com/databricks/zerobus/ZerobusStream.java b/src/main/java/com/databricks/zerobus/ZerobusStream.java
index 2657d2d..8abdd39 100644
--- a/src/main/java/com/databricks/zerobus/ZerobusStream.java
+++ b/src/main/java/com/databricks/zerobus/ZerobusStream.java
@@ -352,28 +352,10 @@ private CompletableFuture<Void> createStream() {
() -> {
CompletableFuture<Void> createStreamTry = new CompletableFuture<>();
- // Generate a fresh token for this stream creation attempt
- try {
- String token =
- TokenFactory.getZerobusToken(
- tableProperties.getTableName(),
- workspaceId,
- unityCatalogEndpoint,
- clientId,
- clientSecret);
-
- // Create a new stub with the fresh token
- stub =
- stubFactory.createStub(
- serverEndpoint, true, tableProperties.getTableName(), token);
-
- logger.debug("Generated new token and created stub for stream");
- } catch (NonRetriableException e) {
- createStreamTry.completeExceptionally(e);
- return createStreamTry;
- }
+ // The stub was created once with a token supplier, so we don't recreate it here
+ // The token supplier will provide a fresh token for each gRPC request
- // Create the gRPC stream with the new stub
+ // Create the gRPC stream with the existing stub
streamCreatedEvent = Optional.of(new CompletableFuture<>());
stream =
Optional.of(
diff --git a/src/main/java/com/databricks/zerobus/tools/GenerateProto.java b/src/main/java/com/databricks/zerobus/tools/GenerateProto.java
new file mode 100644
index 0000000..e83c14c
--- /dev/null
+++ b/src/main/java/com/databricks/zerobus/tools/GenerateProto.java
@@ -0,0 +1,706 @@
+package com.databricks.zerobus.tools;
+
+import java.io.BufferedReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Generate proto2 file from Unity Catalog table schema.
+ *
+ *
+ * <p>This tool fetches table schema from Unity Catalog and generates a corresponding proto2
+ * definition file. It supports all Delta data types and maps them to appropriate Protocol Buffer
+ * types.
+ *
+ *