diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index ca79aba529f..a16c4cc85a5 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -44,23 +44,22 @@
/security-command-center @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers @GoogleCloudPlatform/dee-infra @GoogleCloudPlatform/gcp-security-command-center
/servicedirectory @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers @GoogleCloudPlatform/dee-infra
/webrisk @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers @GoogleCloudPlatform/dee-infra
+/tpu @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers @GoogleCloudPlatform/dee-infra
# DEE Platform Ops (DEEPO)
/errorreporting @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers
/monitoring @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers
-/opencensus @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers
-/trace @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers
# Cloud SDK Databases & Data Analytics teams
# ---* Cloud Native DB
-/bigtable @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers @GoogleCloudPlatform/cloud-native-db-dpes
+/bigtable @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers @GoogleCloudPlatform/cloud-native-db-dpes @GoogleCloudPlatform/bigtable-eng
/memorystore @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers
/spanner @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers @GoogleCloudPlatform/api-spanner-java
# ---* Cloud Storage
/storage @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers @GoogleCloudPlatform/cloud-storage-dpes
/storage-transfer @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers @GoogleCloudPlatform/cloud-storage-dpes
# ---* Infra DB
-/cloud-sql @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers @GoogleCloudPlatform/infra-db-sdk
+/cloud-sql @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers @GoogleCloudPlatform/cloud-sql-connectors
# Data & AI
/aiplatform @GoogleCloudPlatform/java-samples-reviewers @yoshi-approver @GoogleCloudPlatform/cloud-samples-reviewers @GoogleCloudPlatform/text-embedding
diff --git a/.github/blunderbuss.yml b/.github/blunderbuss.yml
index f9bbd1cfac6..d2733b96f92 100644
--- a/.github/blunderbuss.yml
+++ b/.github/blunderbuss.yml
@@ -38,7 +38,7 @@ assign_issues_by:
- labels:
- 'api: cloudsql'
to:
- - GoogleCloudPlatform/infra-db-sdk
+ - GoogleCloudPlatform/cloud-sql-connectors
- labels:
- 'api: spanner'
to:
@@ -115,7 +115,7 @@ assign_prs_by:
- labels:
- 'api: cloudsql'
to:
- - GoogleCloudPlatform/infra-db-sdk
+ - GoogleCloudPlatform/cloud-sql-connectors
- labels:
- 'api: spanner'
to:
diff --git a/.github/renovate.json5 b/.github/renovate.json5
index 5a9bf7c5360..85e6da61771 100644
--- a/.github/renovate.json5
+++ b/.github/renovate.json5
@@ -1,142 +1,136 @@
-// find legacy configuration at https://github.com/GoogleCloudPlatform/java-docs-samples/blob/91792d4da53a12f96032f4556815f7d91f27257b/renovate.json
{
- "extends": [
- "config:recommended",
- ":approveMajorUpdates",
- "schedule:earlyMondays",
- ":ignoreUnstable",
+ extends: [
+ 'config:recommended',
+ ':approveMajorUpdates',
+ 'schedule:earlyMondays',
+ ':ignoreUnstable',
],
- "labels": [
- "dependencies",
- "automerge"
+ labels: [
+ 'dependencies',
+ 'automerge',
],
- "minimumReleaseAge": "7 days",
- "dependencyDashboardLabels": [
- "type: process",
+ minimumReleaseAge: '7 days',
+ dependencyDashboardLabels: [
+ 'type: process',
],
- // discontinue upgrades for java8 code samples
- "ignorePaths": ["**/*java8*/**", "**/*java-8*/**"],
- "packageRules": [
+ ignorePaths: [
+ '**/*java8*/**',
+ '**/*java-8*/**',
+ ],
+ packageRules: [
{
- "matchCategories": [
- "java"
+ matchCategories: [
+ 'java',
+ ],
+ addLabels: [
+ 'lang: java',
],
- "addLabels": [
- "lang: java"
- ]
},
- // TODO: check if auto-merge rules will work at all
{
- "matchUpdateTypes": [
- "minor",
- "patch",
- "digest",
- "lockFileMaintenance"
+ matchUpdateTypes: [
+ 'minor',
+ 'patch',
+ 'digest',
+ 'lockFileMaintenance',
],
- "automerge": true
+ automerge: true,
},
{
- "matchDepTypes": [
- "devDependencies"
+ matchDepTypes: [
+ 'devDependencies',
],
- "automerge": true
+ automerge: true,
},
- // group all Dockerfile dependencies
{
- "matchCategories": [
- "docker"
- ],
- "matchUpdateTypes": [
- "minor",
- "patch",
- "digest",
- "lockFileMaintenance"
- ],
- "groupName": "docker",
- "pinDigests": true,
- "automerge": true
+ matchCategories: [
+ 'docker',
+ ],
+ matchUpdateTypes: [
+ 'minor',
+ 'patch',
+ 'digest',
+ 'lockFileMaintenance',
+ ],
+ groupName: 'docker',
+ pinDigests: true,
+ automerge: true,
},
- // group all terraform dependencies for google providers
{
- "matchCategories": [
- "terraform"
+ matchCategories: [
+ 'terraform',
+ ],
+ matchDepTypes: [
+ 'provider',
+ 'required_provider',
],
- "matchDepTypes": [
- "provider",
- "required_provider"
+ groupName: 'Terraform Google providers',
+ matchPackageNames: [
+ '/^google/',
],
- "matchPackagePatterns": "^google",
- "groupName": "Terraform Google providers",
},
- // *** Java dependency rules:
- // group *ALL* Java dependencies
{
- "matchCategories": [
- "java"
+ matchCategories: [
+ 'java',
],
- "matchUpdateTypes": [
- "minor",
- "patch",
- "digest",
- "lockFileMaintenance"
+ matchUpdateTypes: [
+ 'minor',
+ 'patch',
+ 'digest',
+ 'lockFileMaintenance',
],
- "groupName": "java",
- "automerge": true
+ groupName: 'java',
+ automerge: true,
},
- // do not allow Spring Boot 3 upgrades yet
{
- "matchCategories": [
- "java"
+ matchCategories: [
+ 'java',
],
- "matchPackagePatterns": [
- "org.springframework.boot"
+ matchCurrentVersion: '>=2.0.0, <3.0.0',
+ allowedVersions: '<3',
+ groupName: 'Spring Boot upgrades for v2',
+ description: '@akitsch: Spring Boot V3 requires Java 17',
+ matchPackageNames: [
+ '/org.springframework.boot/',
],
- "matchCurrentVersion": ">=2.0.0, <3.0.0",
- "allowedVersions": "<3",
- "groupName": "Spring Boot upgrades for v2",
- "description": "@akitsch: Spring Boot V3 requires Java 17"
},
- // limit micronaut upgrades for versions <= 4
{
- "matchPackagePatterns": [
- "^io.micronaut"
+ groupName: 'Micronaut packages',
+ allowedVersions: '<4',
+ matchFileNames: [
+ 'appengine-java11/**',
+ 'flexible/java-11/**',
],
- "groupName": "Micronaut packages",
- "allowedVersions": "<4",
- "matchPaths": [
- "appengine-java11/**",
- "flexible/java-11/**"
+ description: '@akitsch: Micronaut V4 requires Java 17',
+ matchPackageNames: [
+ '/^io.micronaut/',
],
- "description": "@akitsch: Micronaut V4 requires Java 17"
},
- // disable Scala dependency upgrades
{
- "matchPackagePatterns": [
- "scala"
+ enabled: false,
+ matchPackageNames: [
+ '/scala/',
],
- "enabled": false
},
{
- "matchPackagePatterns": [
- "^jackson-module-scala"
+ enabled: false,
+ matchPackageNames: [
+ '/^jackson-module-scala/',
],
- "enabled": false
},
- // disable SQL Spark dependency upgrades
{
- "matchPackagePatterns": [
- "^spark-sql"
+ enabled: false,
+ matchPackageNames: [
+ '/^spark-sql/',
],
- "enabled": false
},
{},
],
- "rebaseWhen": "behind-base-branch",
- "semanticCommits": "enabled",
- "vulnerabilityAlerts": {
- "labels": [
- "type:security"
+ rebaseWhen: 'behind-base-branch',
+ semanticCommits: 'enabled',
+ vulnerabilityAlerts: {
+ labels: [
+ 'type:security',
],
- "minimumReleaseAge": null
+ minimumReleaseAge: null,
},
-}
\ No newline at end of file
+}
diff --git a/appengine-java11/appengine-simple-jetty-main/pom.xml b/appengine-java11/appengine-simple-jetty-main/pom.xml
index fbc38d30133..6ad52162b64 100644
--- a/appengine-java11/appengine-simple-jetty-main/pom.xml
+++ b/appengine-java11/appengine-simple-jetty-main/pom.xml
@@ -21,7 +21,7 @@
UTF-81111
- 9.4.54.v20240208
+ 9.4.56.v20240826
diff --git a/appengine-java8/datastore/src/test/java/com/example/appengine/QueriesTest.java b/appengine-java8/datastore/src/test/java/com/example/appengine/QueriesTest.java
index c242a669d41..6c36df45fa8 100644
--- a/appengine-java8/datastore/src/test/java/com/example/appengine/QueriesTest.java
+++ b/appengine-java8/datastore/src/test/java/com/example/appengine/QueriesTest.java
@@ -430,32 +430,6 @@ public void queryInterface_multipleFilters_printsMatchedEntities() throws Except
assertThat(buf.toString()).doesNotContain("Charlie");
}
- @Test
- public void queryInterface_singleFilter_returnsMatchedEntities() throws Exception {
- // Arrange
- Entity a = new Entity("Person", "a");
- a.setProperty("height", 100);
- Entity b = new Entity("Person", "b");
- b.setProperty("height", 150);
- Entity c = new Entity("Person", "c");
- c.setProperty("height", 300);
- datastore.put(ImmutableList.of(a, b, c));
-
- // Act
- long minHeight = 150;
- // [START gae_java8_datastore_interface_2]
- Filter heightMinFilter =
- new FilterPredicate("height", FilterOperator.GREATER_THAN_OR_EQUAL, minHeight);
-
- Query q = new Query("Person").setFilter(heightMinFilter);
- // [END gae_java8_datastore_interface_2]
-
- // Assert
- List results =
- datastore.prepare(q.setKeysOnly()).asList(FetchOptions.Builder.withDefaults());
- assertWithMessage("query results").that(results).containsExactly(b, c);
- }
-
@Test
public void queryInterface_orFilter_printsMatchedEntities() throws Exception {
// Arrange
diff --git a/auth/src/main/java/UndeleteApiKey.java b/auth/src/main/java/UndeleteApiKey.java
new file mode 100644
index 00000000000..cd509c705b3
--- /dev/null
+++ b/auth/src/main/java/UndeleteApiKey.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// [START apikeys_undelete_api_key]
+import com.google.api.apikeys.v2.ApiKeysClient;
+import com.google.api.apikeys.v2.Key;
+import com.google.api.apikeys.v2.UndeleteKeyRequest;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class UndeleteApiKey {
+
+ public static void main(String[] args)
+ throws IOException, ExecutionException, InterruptedException, TimeoutException {
+ // TODO(developer): Replace these variables before running the sample.
+ // Project ID or project number of the Google Cloud project.
+ String projectId = "YOUR_PROJECT_ID";
+ // The API key id to undelete.
+ String keyId = "YOUR_KEY_ID";
+
+ undeleteApiKey(projectId, keyId);
+ }
+
+ // Undeletes an API key.
+ public static void undeleteApiKey(String projectId, String keyId)
+ throws IOException, ExecutionException, InterruptedException, TimeoutException {
+ // Initialize client that will be used to send requests. This client only needs to be created
+ // once, and can be reused for multiple requests.
+ try (ApiKeysClient apiKeysClient = ApiKeysClient.create()) {
+
+ // Initialize the undelete request and set the argument.
+ UndeleteKeyRequest undeleteKeyRequest = UndeleteKeyRequest.newBuilder()
+ .setName(String.format("projects/%s/locations/global/keys/%s", projectId, keyId))
+ .build();
+
+ // Make the request and wait for the operation to complete.
+ Key undeletedKey = apiKeysClient.undeleteKeyAsync(undeleteKeyRequest)
+ .get(3, TimeUnit.MINUTES);
+
+ System.out.printf("Successfully undeleted the API key: %s", undeletedKey.getName());
+ }
+ }
+}
+// [END apikeys_undelete_api_key]
\ No newline at end of file
diff --git a/auth/src/test/java/ApiKeySnippetsIT.java b/auth/src/test/java/ApiKeySnippetsIT.java
index 46a059d2203..7f65313d0e1 100644
--- a/auth/src/test/java/ApiKeySnippetsIT.java
+++ b/auth/src/test/java/ApiKeySnippetsIT.java
@@ -36,7 +36,6 @@
public class ApiKeySnippetsIT {
private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
- private static final String CREDENTIALS = System.getenv("GOOGLE_APPLICATION_CREDENTIALS");
private static Key API_KEY;
private static String API_KEY_STRING;
private ByteArrayOutputStream stdOut;
@@ -79,8 +78,15 @@ public static void cleanup()
String apiKeyId = getApiKeyId(API_KEY);
DeleteApiKey.deleteApiKey(PROJECT_ID, apiKeyId);
- String goal = String.format("Successfully deleted the API key: %s", API_KEY.getName());
- assertThat(stdOut.toString()).contains(goal);
+
+ UndeleteApiKey.undeleteApiKey(PROJECT_ID, apiKeyId);
+ String undeletedKey = String.format("Successfully undeleted the API key: %s",
+ API_KEY.getName());
+ assertThat(stdOut.toString()).contains(undeletedKey);
+
+ DeleteApiKey.deleteApiKey(PROJECT_ID, apiKeyId);
+ String deletedKey = String.format("Successfully deleted the API key: %s", API_KEY.getName());
+ assertThat(stdOut.toString()).contains(deletedKey);
stdOut.close();
System.setOut(out);
diff --git a/automl/src/main/java/com/google/cloud/vision/samples/automl/ClassificationDeployModel.java b/automl/src/main/java/com/google/cloud/vision/samples/automl/ClassificationDeployModel.java
deleted file mode 100644
index 63da52ead0d..00000000000
--- a/automl/src/main/java/com/google/cloud/vision/samples/automl/ClassificationDeployModel.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2019 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.vision.samples.automl;
-
-// [START automl_vision_classification_deploy_model]
-import com.google.api.gax.longrunning.OperationFuture;
-import com.google.cloud.automl.v1beta1.AutoMlClient;
-import com.google.cloud.automl.v1beta1.DeployModelRequest;
-import com.google.cloud.automl.v1beta1.ModelName;
-import com.google.cloud.automl.v1beta1.OperationMetadata;
-import com.google.protobuf.Empty;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-
-class ClassificationDeployModel {
-
- // Deploy a model
- static void classificationDeployModel(String projectId, String modelId)
- throws IOException, ExecutionException, InterruptedException {
- // String projectId = "YOUR_PROJECT_ID";
- // String modelId = "YOUR_MODEL_ID";
-
- // Initialize client that will be used to send requests. This client only needs to be created
- // once, and can be reused for multiple requests. After completing all of your requests, call
- // the "close" method on the client to safely clean up any remaining background resources.
- try (AutoMlClient client = AutoMlClient.create()) {
-
- // Get the full path of the model.
- ModelName modelFullId = ModelName.of(projectId, "us-central1", modelId);
-
- // Build deploy model request.
- DeployModelRequest deployModelRequest =
- DeployModelRequest.newBuilder().setName(modelFullId.toString()).build();
-
- // Deploy a model with the deploy model request.
- OperationFuture future =
- client.deployModelAsync(deployModelRequest);
-
- future.get();
-
- // Display the deployment details of model.
- System.out.println("Model deployment finished");
- }
- }
-}
-// [END automl_vision_classification_deploy_model]
diff --git a/automl/src/test/java/com/google/cloud/translate/automl/DatasetApiIT.java b/automl/src/test/java/com/google/cloud/translate/automl/DatasetApiIT.java
deleted file mode 100644
index bd02bde649e..00000000000
--- a/automl/src/test/java/com/google/cloud/translate/automl/DatasetApiIT.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2018 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.translate.automl;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.api.gax.rpc.NotFoundException;
-import io.grpc.StatusRuntimeException;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.concurrent.ExecutionException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for Automl translation "Dataset API" sample. */
-@Ignore("This test is ignored because the legacy version of AutoML API is deprecated")
-@RunWith(JUnit4.class)
-@SuppressWarnings("checkstyle:abbreviationaswordinname")
-public class DatasetApiIT {
-
- private static final String PROJECT_ID = "java-docs-samples-testing";
- private static final String BUCKET = PROJECT_ID + "-vcm";
- private static final String COMPUTE_REGION = "us-central1";
- private ByteArrayOutputStream bout;
- private PrintStream originalPrintStream;
- private String datasetId = "TEN0000000000000000000";
-
- @Before
- public void setUp() {
- bout = new ByteArrayOutputStream();
- PrintStream out = new PrintStream(bout);
- originalPrintStream = System.out;
- System.setOut(out);
- }
-
- @After
- public void tearDown() {
- // restores print statements in the original method
- System.out.flush();
- System.setOut(originalPrintStream);
- }
-
- @Test
- public void testCreateImportDeleteDataset() throws IOException, InterruptedException {
- try {
- DatasetApi.importData(
- PROJECT_ID, COMPUTE_REGION, datasetId, "gs://" + BUCKET + "/en-ja-short.csv");
- String got = bout.toString();
- assertThat(got).contains("The Dataset doesn't exist ");
- } catch (NotFoundException | ExecutionException | StatusRuntimeException ex) {
- assertThat(ex.getMessage()).contains("The Dataset doesn't exist");
- }
- }
-}
diff --git a/automl/src/test/java/com/google/cloud/vision/samples/automl/ClassificationDeployModelIT.java b/automl/src/test/java/com/google/cloud/vision/samples/automl/ClassificationDeployModelIT.java
index 50095202fa3..7431265fa11 100644
--- a/automl/src/test/java/com/google/cloud/vision/samples/automl/ClassificationDeployModelIT.java
+++ b/automl/src/test/java/com/google/cloud/vision/samples/automl/ClassificationDeployModelIT.java
@@ -45,20 +45,6 @@ public void tearDown() {
System.setOut(null);
}
- @Test
- public void testClassificationDeployModelApi() {
- // As model deployment can take a long time, instead try to deploy a
- // nonexistent model and confirm that the model was not found, but other
- // elements of the request were valid.
- try {
- ClassificationDeployModel.classificationDeployModel(PROJECT_ID, MODEL_ID);
- String got = bout.toString();
- assertThat(got).contains("The model does not exist");
- } catch (IOException | ExecutionException | InterruptedException e) {
- assertThat(e.getMessage()).contains("The model does not exist");
- }
- }
-
@Test
public void testClassificationDeployModelNodeCountApi() {
// As model deployment can take a long time, instead try to deploy a
diff --git a/batch/snippets/src/test/java/com/example/batch/CreateResourcesIT.java b/batch/snippets/src/test/java/com/example/batch/CreateResourcesIT.java
index 03e6ab7ba64..8e4f8242e0b 100644
--- a/batch/snippets/src/test/java/com/example/batch/CreateResourcesIT.java
+++ b/batch/snippets/src/test/java/com/example/batch/CreateResourcesIT.java
@@ -36,6 +36,7 @@
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -135,6 +136,7 @@ private static void safeDeleteJob(String jobName) {
}
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createBatchCustomServiceAccountTest()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
@@ -148,6 +150,7 @@ public void createBatchCustomServiceAccountTest()
Assert.assertNotNull(job.getAllocationPolicy().getServiceAccount().getEmail());
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createBatchUsingSecretManager()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
@@ -164,6 +167,7 @@ public void createBatchUsingSecretManager()
-> taskGroup.getTaskSpec().getEnvironment().containsSecretVariables(variableName)));
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createGpuJobTest()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
@@ -181,6 +185,7 @@ public void createGpuJobTest()
-> instance.getPolicy().getMachineType().contains(machineType)));
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createGpuJobN1Test()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
@@ -199,6 +204,7 @@ public void createGpuJobN1Test()
-> accelerator.getType().contains(gpuType) && accelerator.getCount() == count)));
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createLocalSsdJobTest()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
@@ -217,6 +223,7 @@ public void createLocalSsdJobTest()
-> attachedDisk.getDeviceName().contains(LOCAL_SSD_NAME))));
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createPersistentDiskJobTest()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
@@ -243,6 +250,7 @@ public void createPersistentDiskJobTest()
-> attachedDisk.getDeviceName().contains(NEW_PERSISTENT_DISK_NAME))));
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createBatchNotificationTest()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
@@ -263,6 +271,7 @@ public void createBatchNotificationTest()
&& jobNotification.getMessage().getNewTaskState() == State.FAILED));
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createBatchCustomEventTest()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
@@ -284,6 +293,7 @@ public void createBatchCustomEventTest()
.anyMatch(runnable -> runnable.getDisplayName().equals(displayName))));
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createScriptJobWithNfsTest()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
@@ -303,6 +313,7 @@ public void createScriptJobWithNfsTest()
.anyMatch(volume -> volume.getNfs().getServer().equals(NFS_IP_ADDRESS))));
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createBatchLabelJobTest()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
@@ -324,6 +335,7 @@ public void createBatchLabelJobTest()
Assert.assertTrue(job.getLabelsMap().containsValue(labelValue2));
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createBatchCustomNetworkTest()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
@@ -346,6 +358,7 @@ public void createBatchCustomNetworkTest()
.anyMatch(AllocationPolicy.NetworkInterface::getNoExternalIpAddress));
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createJobWithAllocationPolicyLabelTest()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
@@ -368,6 +381,7 @@ public void createJobWithAllocationPolicyLabelTest()
Assert.assertTrue(job.getAllocationPolicy().getLabelsMap().containsValue(labelValue2));
}
+ @Ignore("Canceling jobs not yet GA")
@Test
public void createBatchRunnableLabelTest()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
diff --git a/batch/snippets/src/test/java/com/example/batch/Util.java b/batch/snippets/src/test/java/com/example/batch/Util.java
index eb4342ac572..5a6635ff71b 100644
--- a/batch/snippets/src/test/java/com/example/batch/Util.java
+++ b/batch/snippets/src/test/java/com/example/batch/Util.java
@@ -109,7 +109,7 @@ public static void waitForJobCompletion(Job job)
String[] jobName = job.getName().split("/");
Instant startTime = Instant.now();
while (WAIT_STATES.contains(job.getStatus().getState())) {
- if (Instant.now().getEpochSecond() - startTime.getEpochSecond() > 900) {
+ if (Instant.now().getEpochSecond() - startTime.getEpochSecond() > 1200) {
throw new Error("Timed out waiting for operation to complete.");
}
job = getJob(jobName[1], jobName[3], jobName[5]);
diff --git a/bigtable/bigtable-proxy/.gitignore b/bigtable/bigtable-proxy/.gitignore
new file mode 100644
index 00000000000..af665abb669
--- /dev/null
+++ b/bigtable/bigtable-proxy/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
diff --git a/bigtable/bigtable-proxy/README.md b/bigtable/bigtable-proxy/README.md
new file mode 100644
index 00000000000..d3e7b4d916e
--- /dev/null
+++ b/bigtable/bigtable-proxy/README.md
@@ -0,0 +1,106 @@
+# Bigtable proxy
+
+## Overview
+
+A simple server meant to be used as a sidecar to maintain a persistent connection to Bigtable and
+collect metrics. The primary purpose is to support applications that can't maintain a long-lived
+gRPC connection (i.e., PHP under Apache).
+
+The proxy is intended to be used as a local sidecar process. The proxy is intended to be shared by
+all processes on the VM that it is running on. Its listening address is hardcoded to `localhost`.
+The proxy will use [Application Default Credentials](https://cloud.google.com/docs/authentication/application-default-credentials)
+for all outbound RPCs.
+
+The proxy will accept local unencrypted connections from Bigtable clients, and:
+- attach credentials
+- export metrics
+- send the RPC over an encrypted channel pool to Bigtable service
+
+## Features
+
+* Metrics - The proxy will track RPC metrics and export them to Google Cloud Monitoring
+* Multi-tenant - The proxy can be used to connect to many different Bigtable instances
+* Credential handling - The proxy has its own set of credentials. It will ignore any inbound
+ credentials from the client
+* Channel pooling - The proxy will maintain and autosize the outbound channel pool to properly
+ load balance RPCs.
+
+## Metrics
+
+The proxy is instrumented with OpenTelemetry and will export those metrics to Google Cloud Monitoring
+in a project of your choosing. The metrics will be published under the namespace
+`workload.googleapis.com`. Available metrics:
+
+* `bigtableproxy.server.call.started` The total number of RPCs started, including those that have
+ not completed.
+* `bigtableproxy.client.call.credential.duration` Latency of getting credentials
+* `bigtableproxy.client.call.queue.duration` Duration of how long the outbound side of the proxy had
+ the RPC queued
+* `bigtableproxy.client.call.sent_total_message_size` Total bytes sent per call to Bigtable service
+ (excluding metadata, grpc and transport framing bytes)
+* `bigtableproxy.client.call.rcvd_total_message_size` Total bytes received per call from Bigtable
+ service (excluding metadata, grpc and transport framing bytes)
+* `bigtableproxy.client.gfe.duration` Latency as measured by Google load balancer from the time it
+ received the first byte of the request until it received the first byte of the response from the
+ Cloud Bigtable service.
+* `bigtableproxy.client.gfe.duration_missing.count` Count of calls missing gfe response headers
+* `bigtableproxy.client.call.duration` Total duration of how long the outbound call took
+* `bigtableproxy.server.write_wait.duration` Total amount of time spent waiting for the downstream
+ client to be ready for data.
+* `bigtableproxy.client.channel.count` Number of open channels
+* `bigtableproxy.client.channel_change_count` Number of channel transitions by previous and next
+ states.
+* `bigtableproxy.client.call.max_outstanding_count` Maximum number of concurrent RPCs in a single
+ minute window
+* `bigtableproxy.presence` Counts number of proxy processes (emits 1 per process).
+
+## Requirements
+
+* JVM >= 11
+* Ensure that the service account includes the IAM roles:
+ * `Monitoring Metric Writer`
+ * `Bigtable User`
+* Ensure that the metrics project has `Stackdriver Monitoring API` enabled
+
+## Expected usage
+
+```sh
+# Build the binary
+mvn package
+
+# unpack the binary on the proxy host
+unzip target/bigtable-proxy-0.0.1-SNAPSHOT-bin.zip
+cd bigtable-proxy-0.0.1-SNAPSHOT
+
+# Verify that the proxy has the required permissions using an existing table. Please note that the table
+# data will not be modified, however a test metric will be written.
+./bigtable-verify.sh \
+ --bigtable-project-id=$BIGTABLE_PROJECT_ID \
+ --bigtable-instance-id=$BIGTABLE_INSTANCE_ID \
+ --bigtable-table-id=$BIGTABLE_TABLE_ID \
+ --metrics-project-id=$METRICS_PROJECT_ID
+
+# Then start the proxy on the specified port. The proxy can forward requests for multiple
+# Bigtable projects/instances/tables. However it will export health metrics to a single project
+# specified by `metrics-project-id`.
+./bigtable-proxy.sh \
+ --listen-port=1234 \
+ --metrics-project-id=SOME_GCP_PROJECT
+
+# Start your application, and redirect the bigtable client to connect to the local proxy.
+export BIGTABLE_EMULATOR_HOST="localhost:1234"
+path/to/application/with/bigtable/client
+```
+
+## Configuration
+
+Required options:
+* `--listen-port=` The local port to listen for Bigtable client connections. This needs to
+  match the port in the `BIGTABLE_EMULATOR_HOST="localhost:` environment variable passed to your
+ application.
+* `--metrics-project-id=` The Google Cloud project that should be used to collect metrics
+ emitted from the proxy.
+
+Optional configuration:
+* The environment variable `GOOGLE_APPLICATION_CREDENTIALS` can be used to use a non-default service
+ account. More details can be found here: https://cloud.google.com/docs/authentication/application-default-credentials
diff --git a/bigtable/bigtable-proxy/pom.xml b/bigtable/bigtable-proxy/pom.xml
new file mode 100644
index 00000000000..1eebfccb9a4
--- /dev/null
+++ b/bigtable/bigtable-proxy/pom.xml
@@ -0,0 +1,285 @@
+
+
+ 4.0.0
+
+
+ com.google.cloud.samples
+ shared-configuration
+ 1.2.2
+
+
+
+ com.google.cloud.bigtable
+ bigtable-proxy
+ 0.0.1-SNAPSHOT
+
+
+ 11
+ 11
+ UTF-8
+
+
+
+ 26.50.0
+
+ 1.44.1
+ 1.41.0-alpha
+ 0.33.0
+ 0.33.0
+
+ 2.0.16
+ 1.5.12
+ 1.11.0
+ 4.7.6
+
+ 4.13.2
+ 1.4.4
+
+
+
+
+
+ com.google.cloud
+ libraries-bom
+ ${libraries-bom.version}
+ pom
+ import
+
+
+ io.opentelemetry
+ opentelemetry-bom
+ ${otel.version}
+ pom
+ import
+
+
+ org.mockito
+ mockito-bom
+ 5.14.2
+ pom
+ import
+
+
+
+
+
+
+
+ io.grpc
+ grpc-api
+
+
+ io.grpc
+ grpc-core
+
+
+ io.grpc
+ grpc-netty-shaded
+
+
+ io.grpc
+ grpc-auth
+
+
+ com.google.auth
+ google-auth-library-oauth2-http
+
+
+
+
+
+ com.google.api.grpc
+ grpc-google-cloud-bigtable-v2
+
+
+ com.google.api.grpc
+ proto-google-cloud-bigtable-v2
+
+
+ com.google.api.grpc
+ grpc-google-cloud-bigtable-admin-v2
+
+
+ com.google.api.grpc
+ proto-google-cloud-bigtable-admin-v2
+
+
+ com.google.api.grpc
+ grpc-google-common-protos
+
+
+ com.google.api.grpc
+ proto-google-common-protos
+
+
+
+
+ io.opentelemetry
+ opentelemetry-sdk
+
+
+
+ io.opentelemetry
+ opentelemetry-sdk-metrics
+
+
+
+ com.google.cloud.opentelemetry
+ exporter-metrics
+ ${exporter-metrics.version}
+
+
+
+ com.google.cloud
+ google-cloud-core
+
+
+ io.opentelemetry.contrib
+ opentelemetry-gcp-resources
+ ${otel-contrib.version}
+
+
+ io.opentelemetry
+ opentelemetry-sdk-extension-autoconfigure-spi
+
+
+ com.google.cloud.opentelemetry
+ shared-resourcemapping
+ ${shared-resourcemapping.version}
+
+
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+
+
+ org.slf4j
+ jul-to-slf4j
+ ${slf4j.version}
+
+
+ ch.qos.logback
+ logback-classic
+ ${logback.version}
+
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.auto.value
+ auto-value-annotations
+ ${auto-value.version}
+ provided
+
+
+ info.picocli
+ picocli
+ ${picocli.version}
+
+
+
+
+ io.grpc
+ grpc-testing
+ test
+
+
+ junit
+ junit
+ ${junit.version}
+ test
+
+
+ com.google.truth
+ truth
+ ${truth.version}
+ test
+
+
+ org.mockito
+ mockito-core
+
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.13.0
+
+
+
+ info.picocli
+ picocli-codegen
+ ${picocli.version}
+
+
+ com.google.auto.value
+ auto-value
+ ${auto-value.version}
+
+
+
+
+ -Aproject=${project.groupId}/${project.artifactId}
+
+
+
+
+
+ maven-surefire-plugin
+ 3.5.2
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 3.4.2
+
+
+
+ true
+
+ lib/
+ com.google.cloud.bigtable.examples.proxy.Main
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 3.7.1
+
+
+
+
+ src/main/assembly/assembly.xml
+
+
+
+
+
+ assemble
+
+ single
+
+ package
+
+
+
+
+
+
diff --git a/bigtable/bigtable-proxy/src/main/assembly/assembly.xml b/bigtable/bigtable-proxy/src/main/assembly/assembly.xml
new file mode 100644
index 00000000000..47126e8861f
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/assembly/assembly.xml
@@ -0,0 +1,52 @@
+
+ bin
+
+
+ zip
+
+
+
+
+
+
+ false
+ lib
+ false
+
+
+
+
+
+
+ ${project.basedir}
+
+
+ README*
+ LICENSE*
+ NOTICE*
+
+
+
+
+
+ ${project.build.scriptSourceDirectory}
+
+
+ *.sh
+
+ true
+
+
+
+
+
+ ${project.build.directory}
+
+
+ *.jar
+
+
+
+
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/Main.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/Main.java
new file mode 100644
index 00000000000..b480f3777d8
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/Main.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy;
+
+import com.google.cloud.bigtable.examples.proxy.commands.Serve;
+import com.google.cloud.bigtable.examples.proxy.commands.Verify;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+
+/**
+ * Main entry point for proxy commands under {@link
+ * com.google.cloud.bigtable.examples.proxy.commands}.
+ */
+@Command(
+ subcommands = {Serve.class, Verify.class},
+ name = "bigtable-proxy")
+public final class Main {
+ public static void main(String[] args) {
+ SLF4JBridgeHandler.install();
+ new CommandLine(new Main()).execute(args);
+ }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/ChannelFactory.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/ChannelFactory.java
new file mode 100644
index 00000000000..10c68d7d9e7
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/ChannelFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Copied from
+// https://github.com/googleapis/sdk-platform-java/blob/a333b0709023c971f12a85e5287b6d77d1b57c48/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelFactory.java
+// Changes:
+// - package name
+// - removed InternalApi annotation
+
+package com.google.cloud.bigtable.examples.proxy.channelpool;
+
+import io.grpc.ManagedChannel;
+import java.io.IOException;
+
/**
 * This interface represents a factory for creating one ManagedChannel
 *
 * <p>This is public only for technical reasons, for advanced usage.
 */
public interface ChannelFactory {
  /**
   * Creates a single new {@link ManagedChannel}.
   *
   * @throws IOException if the channel cannot be constructed (e.g. credential loading fails)
   */
  ManagedChannel createSingleChannel() throws IOException;
}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/ChannelPool.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/ChannelPool.java
new file mode 100644
index 00000000000..380d97c9418
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/ChannelPool.java
@@ -0,0 +1,591 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.channelpool;
+
+import com.google.api.core.InternalApi;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+
+/**
+ * A {@link ManagedChannel} that will send requests round-robin via a set of channels.
+ *
+ *
In addition to spreading requests over a set of child connections, the pool will also actively
+ * manage the lifecycle of the channels. Currently, lifecycle management is limited to pre-emptively
+ * replacing channels every hour. In the future it will dynamically size the pool based on number of
+ * outstanding requests.
+ *
+ *
Package-private for internal use.
+ */
+public class ChannelPool extends ManagedChannel {
+ @VisibleForTesting static final Logger LOG = Logger.getLogger(ChannelPool.class.getName());
+ private static final java.time.Duration REFRESH_PERIOD = java.time.Duration.ofMinutes(50);
+
+ private final ChannelPoolSettings settings;
+ private final ChannelFactory channelFactory;
+ private final ScheduledExecutorService executor;
+
+ private final Object entryWriteLock = new Object();
+ @VisibleForTesting final AtomicReference> entries = new AtomicReference<>();
+ private final AtomicInteger indexTicker = new AtomicInteger();
+ private final String authority;
+
+ public static ChannelPool create(ChannelPoolSettings settings, ChannelFactory channelFactory)
+ throws IOException {
+ return new ChannelPool(settings, channelFactory, Executors.newSingleThreadScheduledExecutor());
+ }
+
+ /**
+ * Initializes the channel pool. Assumes that all channels have the same authority.
+ *
+ * @param settings options for controling the ChannelPool sizing behavior
+ * @param channelFactory method to create the channels
+ * @param executor periodically refreshes the channels
+ */
+ @VisibleForTesting
+ ChannelPool(
+ ChannelPoolSettings settings,
+ ChannelFactory channelFactory,
+ ScheduledExecutorService executor)
+ throws IOException {
+ this.settings = settings;
+ this.channelFactory = channelFactory;
+
+ ImmutableList.Builder initialListBuilder = ImmutableList.builder();
+
+ for (int i = 0; i < settings.getInitialChannelCount(); i++) {
+ initialListBuilder.add(new Entry(channelFactory.createSingleChannel()));
+ }
+
+ entries.set(initialListBuilder.build());
+ authority = entries.get().get(0).channel.authority();
+ this.executor = executor;
+
+ if (!settings.isStaticSize()) {
+ executor.scheduleAtFixedRate(
+ this::resizeSafely,
+ ChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
+ ChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
+ TimeUnit.SECONDS);
+ }
+ if (settings.isPreemptiveRefreshEnabled()) {
+ executor.scheduleAtFixedRate(
+ this::refreshSafely,
+ REFRESH_PERIOD.getSeconds(),
+ REFRESH_PERIOD.getSeconds(),
+ TimeUnit.SECONDS);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String authority() {
+ return authority;
+ }
+
+ /**
+ * Create a {@link ClientCall} on a Channel from the pool chosen in a round-robin fashion to the
+ * remote operation specified by the given {@link MethodDescriptor}. The returned {@link
+ * ClientCall} does not trigger any remote behavior until {@link
+ * ClientCall#start(ClientCall.Listener, io.grpc.Metadata)} is invoked.
+ */
+ @Override
+ public ClientCall newCall(
+ MethodDescriptor methodDescriptor, CallOptions callOptions) {
+ return getChannel(indexTicker.getAndIncrement()).newCall(methodDescriptor, callOptions);
+ }
+
+ Channel getChannel(int affinity) {
+ return new AffinityChannel(affinity);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ManagedChannel shutdown() {
+ LOG.fine("Initiating graceful shutdown due to explicit request");
+
+ List localEntries = entries.get();
+ for (Entry entry : localEntries) {
+ entry.channel.shutdown();
+ }
+ if (executor != null) {
+ // shutdownNow will cancel scheduled tasks
+ executor.shutdownNow();
+ }
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isShutdown() {
+ List localEntries = entries.get();
+ for (Entry entry : localEntries) {
+ if (!entry.channel.isShutdown()) {
+ return false;
+ }
+ }
+ return executor == null || executor.isShutdown();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isTerminated() {
+ List localEntries = entries.get();
+ for (Entry entry : localEntries) {
+ if (!entry.channel.isTerminated()) {
+ return false;
+ }
+ }
+
+ return executor == null || executor.isTerminated();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ManagedChannel shutdownNow() {
+ LOG.fine("Initiating immediate shutdown due to explicit request");
+
+ List localEntries = entries.get();
+ for (Entry entry : localEntries) {
+ entry.channel.shutdownNow();
+ }
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ long endTimeNanos = System.nanoTime() + unit.toNanos(timeout);
+ List localEntries = entries.get();
+ for (Entry entry : localEntries) {
+ long awaitTimeNanos = endTimeNanos - System.nanoTime();
+ if (awaitTimeNanos <= 0) {
+ break;
+ }
+ entry.channel.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS);
+ }
+ if (executor != null) {
+ long awaitTimeNanos = endTimeNanos - System.nanoTime();
+ executor.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS);
+ }
+ return isTerminated();
+ }
+
+ private void resizeSafely() {
+ try {
+ synchronized (entryWriteLock) {
+ resize();
+ }
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to resize channel pool", e);
+ }
+ }
+
+ /**
+ * Resize the number of channels based on the number of outstanding RPCs.
+ *
+ *
This method is expected to be called on a fixed interval. On every invocation it will:
+ *
+ *
+ *
Get the maximum number of outstanding RPCs since last invocation
+ *
Determine a valid range of number of channels to handle that many outstanding RPCs
+ *
If the current number of channel falls outside of that range, add or remove at most
+ * {@link ChannelPoolSettings#MAX_RESIZE_DELTA} to get closer to middle of that range.
+ *
+ *
+ *
Not threadsafe, must be called under the entryWriteLock monitor
+ */
+ @VisibleForTesting
+ void resize() {
+ List localEntries = entries.get();
+ // Estimate the peak of RPCs in the last interval by summing the peak of RPCs per channel
+ int actualOutstandingRpcs =
+ localEntries.stream().mapToInt(Entry::getAndResetMaxOutstanding).sum();
+
+ // Number of channels if each channel operated at max capacity
+ int minChannels =
+ (int) Math.ceil(actualOutstandingRpcs / (double) settings.getMaxRpcsPerChannel());
+ // Limit the threshold to absolute range
+ if (minChannels < settings.getMinChannelCount()) {
+ minChannels = settings.getMinChannelCount();
+ }
+
+ // Number of channels if each channel operated at minimum capacity
+ // Note: getMinRpcsPerChannel() can return 0, but division by 0 shouldn't cause a problem.
+ int maxChannels =
+ (int) Math.ceil(actualOutstandingRpcs / (double) settings.getMinRpcsPerChannel());
+ // Limit the threshold to absolute range
+ if (maxChannels > settings.getMaxChannelCount()) {
+ maxChannels = settings.getMaxChannelCount();
+ }
+ if (maxChannels < minChannels) {
+ maxChannels = minChannels;
+ }
+
+ // If the pool were to be resized, try to aim for the middle of the bound, but limit rate of
+ // change.
+ int tentativeTarget = (maxChannels + minChannels) / 2;
+ int currentSize = localEntries.size();
+ int delta = tentativeTarget - currentSize;
+ int dampenedTarget = tentativeTarget;
+ if (Math.abs(delta) > ChannelPoolSettings.MAX_RESIZE_DELTA) {
+ dampenedTarget =
+ currentSize + (int) Math.copySign(ChannelPoolSettings.MAX_RESIZE_DELTA, delta);
+ }
+
+ // Only resize the pool when thresholds are crossed
+ if (localEntries.size() < minChannels) {
+ LOG.fine(
+ String.format(
+ "Detected throughput peak of %d, expanding channel pool size: %d -> %d.",
+ actualOutstandingRpcs, currentSize, dampenedTarget));
+
+ expand(dampenedTarget);
+ } else if (localEntries.size() > maxChannels) {
+ LOG.fine(
+ String.format(
+ "Detected throughput drop to %d, shrinking channel pool size: %d -> %d.",
+ actualOutstandingRpcs, currentSize, dampenedTarget));
+
+ shrink(dampenedTarget);
+ }
+ }
+
+ /** Not threadsafe, must be called under the entryWriteLock monitor */
+ private void shrink(int desiredSize) {
+ ImmutableList localEntries = entries.get();
+ Preconditions.checkState(
+ localEntries.size() >= desiredSize, "current size is already smaller than the desired");
+
+ // Set the new list
+ entries.set(localEntries.subList(0, desiredSize));
+ // clean up removed entries
+ List removed = localEntries.subList(desiredSize, localEntries.size());
+ removed.forEach(Entry::requestShutdown);
+ }
+
+ /** Not threadsafe, must be called under the entryWriteLock monitor */
+ private void expand(int desiredSize) {
+ List localEntries = entries.get();
+ Preconditions.checkState(
+ localEntries.size() <= desiredSize, "current size is already bigger than the desired");
+
+ ImmutableList.Builder newEntries = ImmutableList.builder().addAll(localEntries);
+
+ for (int i = 0; i < desiredSize - localEntries.size(); i++) {
+ try {
+ newEntries.add(new Entry(channelFactory.createSingleChannel()));
+ } catch (IOException e) {
+ LOG.log(Level.WARNING, "Failed to add channel", e);
+ }
+ }
+
+ entries.set(newEntries.build());
+ }
+
+ private void refreshSafely() {
+ try {
+ refresh();
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to pre-emptively refresh channnels", e);
+ }
+ }
+
+ /**
+ * Replace all of the channels in the channel pool with fresh ones. This is meant to mitigate the
+ * hourly GFE disconnects by giving clients the ability to prime the channel on reconnect.
+ *
+ *
This is done on a best effort basis. If the replacement channel fails to construct, the old
+ * channel will continue to be used.
+ */
+ @InternalApi("Visible for testing")
+ void refresh() {
+ // Note: synchronization is necessary in case refresh is called concurrently:
+ // - thread1 fails to replace a single entry
+ // - thread2 succeeds replacing an entry
+ // - thread1 loses the race to replace the list
+ // - then thread2 will shut down channel that thread1 will put back into circulation (after it
+ // replaces the list)
+ synchronized (entryWriteLock) {
+ LOG.fine("Refreshing all channels");
+ ArrayList newEntries = new ArrayList<>(entries.get());
+
+ for (int i = 0; i < newEntries.size(); i++) {
+ try {
+ newEntries.set(i, new Entry(channelFactory.createSingleChannel()));
+ } catch (IOException e) {
+ LOG.log(Level.WARNING, "Failed to refresh channel, leaving old channel", e);
+ }
+ }
+
+ ImmutableList replacedEntries = entries.getAndSet(ImmutableList.copyOf(newEntries));
+
+ // Shutdown the channels that were cycled out.
+ for (Entry e : replacedEntries) {
+ if (!newEntries.contains(e)) {
+ e.requestShutdown();
+ }
+ }
+ }
+ }
+
+ /**
+ * Get and retain a Channel Entry. The returned Entry will have its rpc count incremented,
+ * preventing it from getting recycled.
+ */
+ Entry getRetainedEntry(int affinity) {
+ // The maximum number of concurrent calls to this method for any given time span is at most 2,
+ // so the loop can actually be 2 times. But going for 5 times for a safety margin for potential
+ // code evolving
+ for (int i = 0; i < 5; i++) {
+ Entry entry = getEntry(affinity);
+ if (entry.retain()) {
+ return entry;
+ }
+ }
+ // It is unlikely to reach here unless the pool code evolves to increase the maximum possible
+ // concurrent calls to this method. If it does, this is a bug in the channel pool implementation
+ // the number of retries above should be greater than the number of contending maintenance
+ // tasks.
+ throw new IllegalStateException("Bug: failed to retain a channel");
+ }
+
+ /**
+ * Returns one of the channels managed by this pool. The pool continues to "own" the channel, and
+ * the caller should not shut it down.
+ *
+ * @param affinity Two calls to this method with the same affinity returns the same channel most
+ * of the time, if the channel pool was refreshed since the last call, a new channel will be
+ * returned. The reverse is not true: Two calls with different affinities might return the
+ * same channel. However, the implementation should attempt to spread load evenly.
+ */
+ private Entry getEntry(int affinity) {
+ List localEntries = entries.get();
+
+ int index = Math.abs(affinity % localEntries.size());
+
+ return localEntries.get(index);
+ }
+
+ /** Bundles a gRPC {@link ManagedChannel} with some usage accounting. */
+ static class Entry {
+ private final ManagedChannel channel;
+
+ /**
+ * The primary purpose of keeping a count for outstanding RPCs is to track when a channel is
+ * safe to close. In grpc, initialization & starting of rpcs is split between 2 methods:
+ * Channel#newCall() and ClientCall#start. gRPC already has a mechanism to safely close channels
+ * that have rpcs that have been started. However, it does not protect calls that have been
+ * created but not started. In the sequence: Channel#newCall() Channel#shutdown()
+ * ClientCall#Start(), gRpc will error out the call telling the caller that the channel is
+ * shutdown.
+ *
+ *
Hence, the increment of outstanding RPCs has to happen when the ClientCall is initialized,
+ * as part of Channel#newCall(), not after the ClientCall is started. The decrement of
+ * outstanding RPCs has to happen when the ClientCall is closed or the ClientCall failed to
+ * start.
+ */
+ @VisibleForTesting final AtomicInteger outstandingRpcs = new AtomicInteger(0);
+
+ private final AtomicInteger maxOutstanding = new AtomicInteger();
+
+ // Flag that the channel should be closed once all of the outstanding RPC complete.
+ private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+ // Flag that the channel has been closed.
+ private final AtomicBoolean shutdownInitiated = new AtomicBoolean();
+
+ private Entry(ManagedChannel channel) {
+ this.channel = channel;
+ }
+
+ int getAndResetMaxOutstanding() {
+ return maxOutstanding.getAndSet(outstandingRpcs.get());
+ }
+
+ /**
+ * Try to increment the outstanding RPC count. The method will return false if the channel is
+ * closing and the caller should pick a different channel. If the method returned true, the
+ * channel has been successfully retained and it is the responsibility of the caller to release
+ * it.
+ */
+ private boolean retain() {
+ // register desire to start RPC
+ int currentOutstanding = outstandingRpcs.incrementAndGet();
+
+ // Rough book keeping
+ int prevMax = maxOutstanding.get();
+ if (currentOutstanding > prevMax) {
+ maxOutstanding.incrementAndGet();
+ }
+
+ // abort if the channel is closing
+ if (shutdownRequested.get()) {
+ release();
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Notify the channel that the number of outstanding RPCs has decreased. If shutdown has been
+ * previously requested, this method will shutdown the channel if its the last outstanding RPC.
+ */
+ private void release() {
+ int newCount = outstandingRpcs.decrementAndGet();
+ if (newCount < 0) {
+ LOG.log(Level.WARNING, "Bug! Reference count is negative (" + newCount + ")!");
+ }
+
+ // Must check outstandingRpcs after shutdownRequested (in reverse order of retain()) to ensure
+ // mutual exclusion.
+ if (shutdownRequested.get() && outstandingRpcs.get() == 0) {
+ shutdown();
+ }
+ }
+
+ /**
+ * Request a shutdown. The actual shutdown will be delayed until there are no more outstanding
+ * RPCs.
+ */
+ private void requestShutdown() {
+ shutdownRequested.set(true);
+ if (outstandingRpcs.get() == 0) {
+ shutdown();
+ }
+ }
+
+ /** Ensure that shutdown is only called once. */
+ private void shutdown() {
+ if (shutdownInitiated.compareAndSet(false, true)) {
+ channel.shutdown();
+ }
+ }
+ }
+
+ /** Thin wrapper to ensure that new calls are properly reference counted. */
+ private class AffinityChannel extends Channel {
+ private final int affinity;
+
+ public AffinityChannel(int affinity) {
+ this.affinity = affinity;
+ }
+
+ @Override
+ public String authority() {
+ return authority;
+ }
+
+ @Override
+ public ClientCall newCall(
+ MethodDescriptor methodDescriptor, CallOptions callOptions) {
+
+ Entry entry = getRetainedEntry(affinity);
+
+ return new ReleasingClientCall<>(entry.channel.newCall(methodDescriptor, callOptions), entry);
+ }
+ }
+
+ /** ClientCall wrapper that makes sure to decrement the outstanding RPC count on completion. */
+ static class ReleasingClientCall extends SimpleForwardingClientCall {
+ @Nullable private CancellationException cancellationException;
+ final Entry entry;
+ private final AtomicBoolean wasClosed = new AtomicBoolean();
+ private final AtomicBoolean wasReleased = new AtomicBoolean();
+
+ public ReleasingClientCall(ClientCall delegate, Entry entry) {
+ super(delegate);
+ this.entry = entry;
+ }
+
+ @Override
+ public void start(Listener responseListener, Metadata headers) {
+ if (cancellationException != null) {
+ throw new IllegalStateException("Call is already cancelled", cancellationException);
+ }
+ try {
+ super.start(
+ new SimpleForwardingClientCallListener(responseListener) {
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ if (!wasClosed.compareAndSet(false, true)) {
+ LOG.log(
+ Level.WARNING,
+ "Call is being closed more than once. Please make sure that onClose() is"
+ + " not being manually called.");
+ return;
+ }
+ try {
+ super.onClose(status, trailers);
+ } finally {
+ if (wasReleased.compareAndSet(false, true)) {
+ entry.release();
+ } else {
+ LOG.log(
+ Level.WARNING,
+ "Entry was released before the call is closed. This may be due to an"
+ + " exception on start of the call.");
+ }
+ }
+ }
+ },
+ headers);
+ } catch (Exception e) {
+ // In case start failed, make sure to release
+ if (wasReleased.compareAndSet(false, true)) {
+ entry.release();
+ } else {
+ LOG.log(
+ Level.WARNING,
+ "The entry is already released. This indicates that onClose() has already been"
+ + " called previously");
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void cancel(@Nullable String message, @Nullable Throwable cause) {
+ this.cancellationException = new CancellationException(message);
+ super.cancel(message, cause);
+ }
+ }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/ChannelPoolSettings.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/ChannelPoolSettings.java
new file mode 100644
index 00000000000..6788e95f485
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/ChannelPoolSettings.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.channelpool;
+
+import com.google.api.core.BetaApi;
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+
+/**
+ * Settings to control {@link ChannelPool} behavior.
+ *
+ *
To facilitate low latency/high throughout applications, gax provides a {@link ChannelPool}.
+ * The pool is meant to facilitate high throughput/low latency clients. By splitting load across
+ * multiple gRPC channels the client can spread load across multiple frontends and overcome gRPC's
+ * limit of 100 concurrent RPCs per channel. However oversizing the {@link ChannelPool} can lead to
+ * underutilized channels which will lead to high tail latency due to GFEs disconnecting idle
+ * channels.
+ *
+ *
The {@link ChannelPool} is designed to adapt to varying traffic patterns by tracking
+ * outstanding RPCs and resizing the pool size. This class configures the behavior. In general
+ * clients should aim to have less than 50 concurrent RPCs per channel and at least 1 outstanding
+ * per channel per minute.
+ *
+ *
The settings in this class will be applied every minute.
+ */
+@BetaApi("surface for channel pool sizing is not yet stable")
+@AutoValue
+public abstract class ChannelPoolSettings {
+ /** How often to check and possibly resize the {@link ChannelPool}. */
+ static final Duration RESIZE_INTERVAL = Duration.ofMinutes(1);
+ /** The maximum number of channels that can be added or removed at a time. */
+ static final int MAX_RESIZE_DELTA = 2;
+
+ /**
+ * Threshold to start scaling down the channel pool.
+ *
+ *
When the average of the maximum number of outstanding RPCs in a single minute drop below
+ * this threshold, channels will be removed from the pool.
+ */
+ public abstract int getMinRpcsPerChannel();
+
+ /**
+ * Threshold to start scaling up the channel pool.
+ *
+ *
When the average of the maximum number of outstanding RPCs in a single minute surpass this
+ * threshold, channels will be added to the pool. For google services, gRPC channels will start
+ * locally queuing RPC when there are 100 concurrent RPCs.
+ */
+ public abstract int getMaxRpcsPerChannel();
+
+ /**
+ * The absolute minimum size of the channel pool.
+ *
+ *
Regardless of the current throughput, the number of channels will not drop below this limit
+ */
+ public abstract int getMinChannelCount();
+
+ /**
+ * The absolute maximum size of the channel pool.
+ *
+ *
Regardless of the current throughput, the number of channels will not exceed this limit
+ */
+ public abstract int getMaxChannelCount();
+
+ /**
+ * The initial size of the channel pool.
+ *
+ *
During client construction the client open this many connections. This will be scaled up or
+ * down in the next period.
+ */
+ public abstract int getInitialChannelCount();
+
+ /**
+ * If all of the channels should be replaced on an hourly basis.
+ *
+ *
The GFE will forcibly disconnect active channels after an hour. To minimize the cost of
+ * reconnects, this will create a new channel asynchronuously, prime it and then swap it with an
+ * old channel.
+ */
+ public abstract boolean isPreemptiveRefreshEnabled();
+
+ /** Helper to check if the {@link ChannelPool} implementation can skip dynamic size logic */
+ boolean isStaticSize() {
+ // When range is restricted to a single size
+ if (getMinChannelCount() == getMaxChannelCount()) {
+ return true;
+ }
+ // When the scaling threshold are not set
+ if (getMinRpcsPerChannel() == 0 && getMaxRpcsPerChannel() == Integer.MAX_VALUE) {
+ return true;
+ }
+
+ return false;
+ }
+
+ public abstract Builder toBuilder();
+
+ public static ChannelPoolSettings staticallySized(int size) {
+ return builder()
+ .setInitialChannelCount(size)
+ .setMinRpcsPerChannel(0)
+ .setMaxRpcsPerChannel(Integer.MAX_VALUE)
+ .setMinChannelCount(size)
+ .setMaxChannelCount(size)
+ .build();
+ }
+
+ public static Builder builder() {
+ return new AutoValue_ChannelPoolSettings.Builder()
+ .setInitialChannelCount(1)
+ .setMinChannelCount(1)
+ .setMaxChannelCount(200)
+ .setMinRpcsPerChannel(0)
+ .setMaxRpcsPerChannel(Integer.MAX_VALUE)
+ .setPreemptiveRefreshEnabled(false);
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setMinRpcsPerChannel(int count);
+
+ public abstract Builder setMaxRpcsPerChannel(int count);
+
+ public abstract Builder setMinChannelCount(int count);
+
+ public abstract Builder setMaxChannelCount(int count);
+
+ public abstract Builder setInitialChannelCount(int count);
+
+ public abstract Builder setPreemptiveRefreshEnabled(boolean enabled);
+
+ abstract ChannelPoolSettings autoBuild();
+
+ public ChannelPoolSettings build() {
+ ChannelPoolSettings s = autoBuild();
+
+ Preconditions.checkState(
+ s.getMinRpcsPerChannel() <= s.getMaxRpcsPerChannel(), "rpcsPerChannel range is invalid");
+ Preconditions.checkState(
+ s.getMinChannelCount() > 0, "Minimum channel count must be at least 1");
+ Preconditions.checkState(
+ s.getMinChannelCount() <= s.getMaxRpcsPerChannel(), "absolute channel range is invalid");
+ Preconditions.checkState(
+ s.getMinChannelCount() <= s.getInitialChannelCount(),
+ "initial channel count be at least minChannelCount");
+ Preconditions.checkState(
+ s.getInitialChannelCount() <= s.getMaxChannelCount(),
+ "initial channel count must be less than maxChannelCount");
+ Preconditions.checkState(
+ s.getInitialChannelCount() > 0, "Initial channel count must be greater than 0");
+ return s;
+ }
+ }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/DataChannel.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/DataChannel.java
new file mode 100644
index 00000000000..a2b3dd7fced
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/DataChannel.java
@@ -0,0 +1,387 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.channelpool;
+
+import com.google.bigtable.v2.BigtableGrpc;
+import com.google.bigtable.v2.PingAndWarmRequest;
+import com.google.bigtable.v2.PingAndWarmResponse;
+import com.google.cloud.bigtable.examples.proxy.core.CallLabels;
+import com.google.cloud.bigtable.examples.proxy.core.CallLabels.PrimingKey;
+import com.google.cloud.bigtable.examples.proxy.metrics.Metrics;
+import com.google.cloud.bigtable.examples.proxy.metrics.Tracer;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.grpc.CallCredentials;
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.ClientCall.Listener;
+import io.grpc.ConnectivityState;
+import io.grpc.Deadline;
+import io.grpc.ExperimentalApi;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decorator for a Bigtable data plane connection to add channel warming via PingAndWarm. Channel
+ * warming will happen on creation and then every 3 minutes (with jitter).
+ */
+public class DataChannel extends ManagedChannel {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataChannel.class);
+
+  // Request header asking GFE to return encrypted debug headers, and the response
+  // header those debug values come back on. (Generic <String> restored: both keys
+  // use the ASCII string marshaller.)
+  private static final Metadata.Key<String> GFE_DEBUG_REQ_HEADER =
+      Key.of("X-Return-Encrypted-Headers", Metadata.ASCII_STRING_MARSHALLER);
+  private static final Metadata.Key<String> GFE_DEBUG_RESP_HEADER =
+      Key.of("X-Encrypted-Debug-Headers", Metadata.ASCII_STRING_MARSHALLER);
+
+  private static final Duration WARM_PERIOD = Duration.ofMinutes(3);
+  private static final Duration MAX_JITTER = Duration.ofSeconds(10);
+
+  private final Random random = new Random();
+  private final ManagedChannel inner;
+  private final Metrics metrics;
+  private final ResourceCollector resourceCollector;
+  private final CallCredentials callCredentials;
+  private final ScheduledExecutorService warmingExecutor;
+  private volatile ScheduledFuture<?> antiIdleTask;
+
+  private final AtomicBoolean closed = new AtomicBoolean();
+  // Guards (re)scheduling of antiIdleTask against a concurrent shutdown.
+  private final Object scheduleLock = new Object();
+
+  public DataChannel(
+      ResourceCollector resourceCollector,
+      String userAgent,
+      CallCredentials callCredentials,
+      String endpoint,
+      int port,
+      ScheduledExecutorService warmingExecutor,
+      Metrics metrics) {
+    this.resourceCollector = resourceCollector;
+
+    this.callCredentials = callCredentials;
+    inner =
+        ManagedChannelBuilder.forAddress(endpoint, port)
+            .userAgent(userAgent)
+            .disableRetry()
+            .maxInboundMessageSize(256 * 1024 * 1024)
+            .keepAliveTime(30, TimeUnit.SECONDS)
+            .keepAliveTimeout(10, TimeUnit.SECONDS)
+            .build();
+
+    this.warmingExecutor = warmingExecutor;
+    this.metrics = metrics;
+
+    // Begin recording connectivity state transitions into metrics.
+    new StateTransitionWatcher().run();
+
+    // Warm synchronously so the channel is usable as soon as construction returns.
+    // On failure, make sure the underlying channel is not leaked before rethrowing.
+    try {
+      warm();
+    } catch (RuntimeException e) {
+      try {
+        inner.shutdown();
+      } catch (RuntimeException e2) {
+        e.addSuppressed(e2);
+      }
+      throw e;
+    }
+
+    antiIdleTask =
+        warmingExecutor.schedule(this::warmTask, nextWarmup().toMillis(), TimeUnit.MILLISECONDS);
+    metrics.updateChannelCount(1);
+  }
+
+  /** Returns the next warming delay: WARM_PERIOD minus a random jitter in [0, MAX_JITTER). */
+  private Duration nextWarmup() {
+    return WARM_PERIOD.minus(
+        Duration.ofMillis((long) (MAX_JITTER.toMillis() * random.nextDouble())));
+  }
+
+  /** Periodic warming task; forces a reconnect on failure and reschedules itself. */
+  private void warmTask() {
+    try {
+      warm();
+    } catch (RuntimeException e) {
+      LOGGER.warn("anti idle ping failed, forcing reconnect", e);
+      inner.enterIdle();
+    } finally {
+      synchronized (scheduleLock) {
+        if (!closed.get()) {
+          antiIdleTask =
+              warmingExecutor.schedule(
+                  this::warmTask, nextWarmup().toMillis(), TimeUnit.MILLISECONDS);
+        }
+      }
+    }
+  }
+
+  /**
+   * Sends a PingAndWarm for every collected priming key and waits for the results.
+   *
+   * <p>Permanent per-key errors are counted as successes and the offending key is evicted.
+   * Throws when failures outnumber successes, which callers treat as a channel problem.
+   */
+  private void warm() {
+    List<PrimingKey> primingKeys = resourceCollector.getPrimingKeys();
+    if (primingKeys.isEmpty()) {
+      return;
+    }
+
+    LOGGER.debug("Warming channel {} with: {}", inner, primingKeys);
+
+    List<ListenableFuture<PingAndWarmResponse>> futures =
+        primingKeys.stream().map(this::sendPingAndWarm).collect(Collectors.toList());
+
+    int successCount = 0;
+    int failures = 0;
+    for (ListenableFuture<PingAndWarmResponse> future : futures) {
+      // successCount + failures equals the number of futures processed so far, so
+      // this indexes the priming key matching the current future.
+      PrimingKey request = primingKeys.get(successCount + failures);
+      try {
+        future.get();
+        successCount++;
+      } catch (ExecutionException e) {
+        // All permanent errors are ignored and treated as a success.
+        // The priming request that generated the error will be dropped.
+        if (e.getCause() instanceof PingAndWarmException) {
+          PingAndWarmException se = (PingAndWarmException) e.getCause();
+
+          switch (se.getStatus().getCode()) {
+            case INTERNAL:
+            case PERMISSION_DENIED:
+            case NOT_FOUND:
+            case UNAUTHENTICATED:
+              successCount++;
+              // drop the priming request for permanent errors
+              resourceCollector.evict(request);
+              continue;
+            default:
+              // noop
+          }
+          LOGGER.warn(
+              "Failed to prime channel with request: {}, status: {}, debug response headers: {}",
+              request,
+              se.getStatus(),
+              Optional.ofNullable(se.getDebugHeaders()).orElse(""));
+        } else {
+          LOGGER.warn("Unexpected failure priming channel with request: {}", request, e.getCause());
+        }
+
+        failures++;
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag before surfacing the failure.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted while priming channel with request: " + request, e);
+      }
+    }
+    if (successCount < failures) {
+      throw new RuntimeException("Most of the priming requests failed");
+    }
+  }
+
+  /** Sends a single PingAndWarm RPC and returns a future for its (single) response. */
+  private ListenableFuture<PingAndWarmResponse> sendPingAndWarm(PrimingKey primingKey) {
+    Metadata metadata = primingKey.composeMetadata();
+    metadata.put(GFE_DEBUG_REQ_HEADER, "gfe_response_only");
+    // (A previous no-op rebuild of the request via toBuilder().setName(getName())
+    // was removed: it copied the name onto itself.)
+    PingAndWarmRequest request = primingKey.composeProto();
+
+    CallLabels callLabels = CallLabels.create(BigtableGrpc.getPingAndWarmMethod(), metadata);
+    Tracer tracer = new Tracer(metrics, callLabels);
+
+    CallOptions callOptions =
+        CallOptions.DEFAULT
+            .withCallCredentials(callCredentials)
+            .withDeadline(Deadline.after(1, TimeUnit.MINUTES));
+    callOptions = tracer.injectIntoCallOptions(callOptions);
+
+    ClientCall<PingAndWarmRequest, PingAndWarmResponse> call =
+        inner.newCall(BigtableGrpc.getPingAndWarmMethod(), callOptions);
+
+    SettableFuture<PingAndWarmResponse> f = SettableFuture.create();
+    call.start(
+        new Listener<>() {
+          String debugHeaders = null;
+
+          @Override
+          public void onMessage(PingAndWarmResponse response) {
+            if (!f.set(response)) {
+              // TODO: set a metric
+              LOGGER.warn("PingAndWarm returned multiple responses");
+            }
+          }
+
+          @Override
+          public void onHeaders(Metadata headers) {
+            debugHeaders = headers.get(GFE_DEBUG_RESP_HEADER);
+          }
+
+          @Override
+          public void onClose(Status status, Metadata trailers) {
+            tracer.onCallFinished(status);
+
+            // setException is a no-op when a response was already delivered; the
+            // "missing a response" case only takes effect if the call closed OK
+            // without ever producing a message.
+            if (status.isOk()) {
+              f.setException(
+                  new PingAndWarmException(
+                      "PingAndWarm was missing a response", debugHeaders, trailers, status));
+            } else {
+              f.setException(
+                  new PingAndWarmException("PingAndWarm failed", debugHeaders, trailers, status));
+            }
+          }
+        },
+        metadata);
+    call.sendMessage(request);
+    call.halfClose();
+    call.request(Integer.MAX_VALUE);
+
+    return f;
+  }
+
+  /** Carries the gRPC status, trailers and GFE debug headers of a failed PingAndWarm. */
+  static class PingAndWarmException extends RuntimeException {
+
+    private final String debugHeaders;
+    private final Metadata trailers;
+    private final Status status;
+
+    public PingAndWarmException(
+        String message, String debugHeaders, Metadata trailers, Status status) {
+      // BUG FIX: the message parameter was previously ignored and the status was
+      // concatenated into a String.format pattern (any '%' in the status would
+      // have thrown). Use plain concatenation with the caller-supplied message.
+      super(message + ", status: " + status);
+      this.debugHeaders = debugHeaders;
+      this.trailers = trailers;
+      this.status = status;
+    }
+
+    public String getDebugHeaders() {
+      return debugHeaders;
+    }
+
+    public Metadata getTrailers() {
+      return trailers;
+    }
+
+    public Status getStatus() {
+      return status;
+    }
+  }
+
+  @Override
+  public ManagedChannel shutdown() {
+    final boolean closing;
+
+    synchronized (scheduleLock) {
+      closing = closed.compareAndSet(false, true);
+      antiIdleTask.cancel(true);
+    }
+    // Only the first shutdown/shutdownNow call decrements the channel count.
+    if (closing) {
+      metrics.updateChannelCount(-1);
+    }
+
+    return inner.shutdown();
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return inner.isShutdown();
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return inner.isTerminated();
+  }
+
+  @Override
+  public ManagedChannel shutdownNow() {
+    final boolean closing;
+
+    synchronized (scheduleLock) {
+      closing = closed.compareAndSet(false, true);
+      antiIdleTask.cancel(true);
+    }
+
+    if (closing) {
+      metrics.updateChannelCount(-1);
+    }
+
+    return inner.shutdownNow();
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return inner.awaitTermination(timeout, unit);
+  }
+
+  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4359")
+  @Override
+  public ConnectivityState getState(boolean requestConnection) {
+    return inner.getState(requestConnection);
+  }
+
+  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4359")
+  @Override
+  public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
+    inner.notifyWhenStateChanged(source, callback);
+  }
+
+  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4056")
+  @Override
+  public void resetConnectBackoff() {
+    inner.resetConnectBackoff();
+  }
+
+  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4056")
+  @Override
+  public void enterIdle() {
+    inner.enterIdle();
+  }
+
+  @Override
+  public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
+      MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
+    Tracer tracer =
+        Optional.ofNullable(Tracer.extractTracerFromCallOptions(callOptions))
+            .orElseThrow(
+                () ->
+                    new IllegalStateException(
+                        "DataChannel failed to extract Tracer from CallOptions"));
+    // Record the call's labels so future warming cycles can prime this resource.
+    resourceCollector.collect(tracer.getCallLabels());
+
+    return inner.newCall(methodDescriptor, callOptions);
+  }
+
+  @Override
+  public String authority() {
+    return inner.authority();
+  }
+
+  /** Re-registers itself after every connectivity change to record transitions in metrics. */
+  class StateTransitionWatcher implements Runnable {
+    private ConnectivityState prevState = null;
+
+    @Override
+    public void run() {
+      if (closed.get()) {
+        return;
+      }
+
+      ConnectivityState newState = inner.getState(false);
+      metrics.recordChannelStateChange(prevState, newState);
+      prevState = newState;
+      inner.notifyWhenStateChanged(prevState, this);
+    }
+  }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/ResourceCollector.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/ResourceCollector.java
new file mode 100644
index 00000000000..d36fb630ef3
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/channelpool/ResourceCollector.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.channelpool;
+
+import com.google.cloud.bigtable.examples.proxy.core.CallLabels;
+import com.google.cloud.bigtable.examples.proxy.core.CallLabels.ParsingException;
+import com.google.cloud.bigtable.examples.proxy.core.CallLabels.PrimingKey;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Collects priming keys from live calls so data channels can be warmed via PingAndWarm. */
+public class ResourceCollector {
+  private static final Logger LOG = LoggerFactory.getLogger(ResourceCollector.class);
+
+  // Bounded, expiring set of priming keys; the Boolean value is unused.
+  // (Generic <PrimingKey, Boolean> restored.)
+  private final Cache<PrimingKey, Boolean> primingKeys =
+      CacheBuilder.newBuilder().expireAfterWrite(Duration.ofHours(1)).maximumSize(100).build();
+
+  /** Records the priming key derived from the given call labels, if one can be parsed. */
+  public void collect(CallLabels labels) {
+    try {
+      PrimingKey.from(labels).ifPresent(k -> primingKeys.put(k, true));
+    } catch (ParsingException e) {
+      // Best-effort: a malformed call must not break the caller's RPC.
+      LOG.warn("Failed to collect priming request for {}", labels, e);
+    }
+  }
+
+  /** Returns an immutable snapshot of the currently known priming keys. */
+  public List<PrimingKey> getPrimingKeys() {
+    return ImmutableList.copyOf(primingKeys.asMap().keySet());
+  }
+
+  /** Drops a key, e.g. after a permanent PingAndWarm error. */
+  public void evict(PrimingKey request) {
+    primingKeys.invalidate(request);
+  }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/commands/Endpoint.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/commands/Endpoint.java
new file mode 100644
index 00000000000..4319cdbfcfe
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/commands/Endpoint.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.commands;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Preconditions;
+import picocli.CommandLine.ITypeConverter;
+
+/** Value type for a {@code host:port} network endpoint. */
+@AutoValue
+abstract class Endpoint {
+  abstract String getName();
+
+  abstract int getPort();
+
+  /** Renders as {@code name:port}. */
+  @Override
+  public String toString() {
+    return String.format("%s:%d", getName(), getPort());
+  }
+
+  static Endpoint create(String name, int port) {
+    return new AutoValue_Endpoint(name, port);
+  }
+
+  /** picocli converter parsing {@code name:port}. (Generic ITypeConverter<Endpoint> restored.) */
+  static class ArgConverter implements ITypeConverter<Endpoint> {
+    @Override
+    public Endpoint convert(String s) throws Exception {
+      // lastIndexOf so names that themselves contain colons keep everything
+      // before the final colon.
+      int i = s.lastIndexOf(":");
+      // BUG FIX (message): was "endpoint must of the form `name:port`".
+      Preconditions.checkArgument(i > 0, "endpoint must be of the form `name:port`");
+
+      String name = s.substring(0, i);
+      int port = Integer.parseInt(s.substring(i + 1));
+      return Endpoint.create(name, port);
+    }
+  }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/commands/Serve.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/commands/Serve.java
new file mode 100644
index 00000000000..797c861632d
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/commands/Serve.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.commands;
+
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.bigtable.admin.v2.BigtableInstanceAdminGrpc;
+import com.google.bigtable.admin.v2.BigtableTableAdminGrpc;
+import com.google.bigtable.v2.BigtableGrpc;
+import com.google.cloud.bigtable.examples.proxy.channelpool.ChannelPool;
+import com.google.cloud.bigtable.examples.proxy.channelpool.ChannelPoolSettings;
+import com.google.cloud.bigtable.examples.proxy.channelpool.DataChannel;
+import com.google.cloud.bigtable.examples.proxy.channelpool.ResourceCollector;
+import com.google.cloud.bigtable.examples.proxy.core.ProxyHandler;
+import com.google.cloud.bigtable.examples.proxy.core.Registry;
+import com.google.cloud.bigtable.examples.proxy.metrics.InstrumentedCallCredentials;
+import com.google.cloud.bigtable.examples.proxy.metrics.Metrics;
+import com.google.cloud.bigtable.examples.proxy.metrics.MetricsImpl;
+import com.google.common.collect.ImmutableMap;
+import com.google.longrunning.OperationsGrpc;
+import io.grpc.CallCredentials;
+import io.grpc.InsecureServerCredentials;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Server;
+import io.grpc.ServerCallHandler;
+import io.grpc.auth.MoreCallCredentials;
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Help.Visibility;
+import picocli.CommandLine.Option;
+
+@Command(name = "serve", description = "Start the proxy server")
+public class Serve implements Callable<Void> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Serve.class);
+
+  @Option(
+      names = "--listen-port",
+      required = true,
+      description = "Local port to accept connections on")
+  int listenPort;
+
+  @Option(names = "--useragent", showDefaultValue = Visibility.ALWAYS)
+  String userAgent = "bigtable-java-proxy";
+
+  @Option(
+      names = "--bigtable-data-endpoint",
+      converter = Endpoint.ArgConverter.class,
+      showDefaultValue = Visibility.ALWAYS)
+  Endpoint dataEndpoint = Endpoint.create("bigtable.googleapis.com", 443);
+
+  @Option(
+      names = "--bigtable-admin-endpoint",
+      converter = Endpoint.ArgConverter.class,
+      showDefaultValue = Visibility.ALWAYS)
+  Endpoint adminEndpoint = Endpoint.create("bigtableadmin.googleapis.com", 443);
+
+  @Option(
+      names = "--metrics-project-id",
+      required = true,
+      description = "The project id where metrics should be exported")
+  String metricsProjectId = null;
+
+  // Package-private fields; start() only creates them when null — presumably so
+  // tests can inject fakes. TODO(review): confirm against the test suite.
+  ManagedChannel adminChannel = null;
+  ManagedChannel dataChannel = null;
+  Credentials credentials = null;
+  Server server;
+  Metrics metrics;
+  private ScheduledExecutorService refreshExecutor;
+
+  @Override
+  public Void call() throws Exception {
+    start();
+    server.awaitTermination();
+    cleanup();
+    return null;
+  }
+
+  /** Wires up credentials, channels and the local gRPC server, then starts listening. */
+  void start() throws IOException {
+    if (credentials == null) {
+      credentials = GoogleCredentials.getApplicationDefault();
+    }
+    CallCredentials callCredentials =
+        new InstrumentedCallCredentials(MoreCallCredentials.from(credentials));
+
+    if (metrics == null) {
+      // InstrumentedCallCredentials expect to only be called when a Tracer is available in the
+      // CallOptions. This is only true for DataChannel pingAndWarm and things invoked by
+      // ProxyHandler. MetricsImpl does not do this, so it must get undecorated credentials.
+      metrics = new MetricsImpl(credentials, metricsProjectId);
+    }
+
+    ResourceCollector resourceCollector = new ResourceCollector();
+    refreshExecutor = Executors.newSingleThreadScheduledExecutor();
+
+    ChannelPoolSettings poolSettings =
+        ChannelPoolSettings.builder()
+            .setInitialChannelCount(10)
+            .setMinChannelCount(2)
+            .setMaxChannelCount(20)
+            .setMinRpcsPerChannel(5)
+            .setMaxRpcsPerChannel(50)
+            .setPreemptiveRefreshEnabled(true)
+            .build();
+
+    if (dataChannel == null) {
+      dataChannel =
+          ChannelPool.create(
+              poolSettings,
+              () ->
+                  new DataChannel(
+                      resourceCollector,
+                      userAgent,
+                      callCredentials,
+                      dataEndpoint.getName(),
+                      dataEndpoint.getPort(),
+                      refreshExecutor,
+                      metrics));
+    }
+
+    if (adminChannel == null) {
+      adminChannel =
+          ManagedChannelBuilder.forAddress(adminEndpoint.getName(), adminEndpoint.getPort())
+              .userAgent(userAgent)
+              .disableRetry()
+              .build();
+    }
+
+    // Data-plane traffic goes through the warmed channel pool; admin and
+    // long-running-operation traffic goes through a plain channel.
+    // (Generic Map<String, ServerCallHandler<byte[], byte[]>> restored — the
+    // diamond on ProxyHandler<> needs the target type to infer against.)
+    Map<String, ServerCallHandler<byte[], byte[]>> serviceMap =
+        ImmutableMap.of(
+            BigtableGrpc.SERVICE_NAME,
+            new ProxyHandler<>(metrics, dataChannel, callCredentials),
+            BigtableInstanceAdminGrpc.SERVICE_NAME,
+            new ProxyHandler<>(metrics, adminChannel, callCredentials),
+            BigtableTableAdminGrpc.SERVICE_NAME,
+            new ProxyHandler<>(metrics, adminChannel, callCredentials),
+            OperationsGrpc.SERVICE_NAME,
+            new ProxyHandler<>(metrics, adminChannel, callCredentials));
+
+    server =
+        NettyServerBuilder.forAddress(
+                new InetSocketAddress("localhost", listenPort), InsecureServerCredentials.create())
+            .fallbackHandlerRegistry(new Registry(serviceMap))
+            .maxInboundMessageSize(256 * 1024 * 1024)
+            .build();
+
+    server.start();
+    LOGGER.info("Listening on port {}", server.getPort());
+  }
+
+  /** Releases background resources; invoked after the server terminates. */
+  void cleanup() throws InterruptedException {
+    refreshExecutor.shutdown();
+    dataChannel.shutdown();
+    adminChannel.shutdown();
+  }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/commands/Verify.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/commands/Verify.java
new file mode 100644
index 00000000000..669385e4421
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/commands/Verify.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.commands;
+
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.bigtable.v2.BigtableGrpc;
+import com.google.bigtable.v2.BigtableGrpc.BigtableBlockingStub;
+import com.google.bigtable.v2.CheckAndMutateRowRequest;
+import com.google.bigtable.v2.CheckAndMutateRowResponse;
+import com.google.bigtable.v2.Mutation;
+import com.google.bigtable.v2.Mutation.DeleteFromRow;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsResponse;
+import com.google.bigtable.v2.RowFilter;
+import com.google.bigtable.v2.RowFilter.Chain;
+import com.google.bigtable.v2.RowSet;
+import com.google.cloud.bigtable.examples.proxy.metrics.MetricsImpl;
+import com.google.cloud.opentelemetry.metric.GoogleCloudMetricExporter;
+import com.google.cloud.opentelemetry.metric.MetricConfiguration;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.grpc.CallCredentials;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.Deadline;
+import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.MethodDescriptor;
+import io.grpc.StatusRuntimeException;
+import io.grpc.auth.MoreCallCredentials;
+import io.opentelemetry.contrib.gcp.resource.GCPResourceProvider;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import io.opentelemetry.sdk.resources.Resource;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Help.Visibility;
+import picocli.CommandLine.Option;
+
+@Command(name = "verify", description = "Verify environment is properly set up")
+public class Verify implements Callable<Void> {
+  @Option(
+      names = "--bigtable-project-id",
+      required = true,
+      description = "Project that contains a Bigtable instance to use for connectivity test")
+  String bigtableProjectId;
+
+  @Option(
+      names = "--bigtable-instance-id",
+      required = true,
+      description = "Bigtable instance to use for connectivity test")
+  String bigtableInstanceId;
+
+  @Option(
+      names = "--bigtable-table-id",
+      required = true,
+      description = "Bigtable table to use for connectivity test")
+  String bigtableTableId;
+
+  @Option(
+      names = "--metrics-project-id",
+      required = true,
+      description = "The project id where metrics should be exported")
+  String metricsProjectId = null;
+
+  @Option(
+      names = "--bigtable-data-endpoint",
+      converter = Endpoint.ArgConverter.class,
+      showDefaultValue = Visibility.ALWAYS)
+  Endpoint dataEndpoint = Endpoint.create("bigtable.googleapis.com", 443);
+
+  // Package-private; only populated when null — presumably so tests can inject
+  // fake credentials. TODO(review): confirm.
+  Credentials credentials = null;
+
+  @Override
+  public Void call() throws Exception {
+    if (credentials == null) {
+      credentials = GoogleCredentials.getApplicationDefault();
+    }
+    checkBigtable(
+        MoreCallCredentials.from(credentials),
+        String.format(
+            "projects/%s/instances/%s/tables/%s",
+            bigtableProjectId, bigtableInstanceId, bigtableTableId));
+
+    checkMetrics(credentials);
+    return null;
+  }
+
+  /** Probes Bigtable read and read/write access using harmless no-op requests. */
+  private void checkBigtable(CallCredentials callCredentials, String tableName) {
+    ManagedChannel channel =
+        ManagedChannelBuilder.forAddress(dataEndpoint.getName(), dataEndpoint.getPort()).build();
+
+    try {
+      Metadata md = new Metadata();
+
+      md.put(
+          Key.of("x-goog-request-params", Metadata.ASCII_STRING_MARSHALLER),
+          String.format(
+              "table_name=%s&app_profile_id=%s",
+              URLEncoder.encode(tableName, StandardCharsets.UTF_8), ""));
+
+      BigtableBlockingStub stub =
+          BigtableGrpc.newBlockingStub(channel)
+              .withCallCredentials(callCredentials)
+              .withInterceptors(new MetadataInterceptor(md));
+
+      // Read a row that should not exist; the RPC succeeding (even with zero
+      // rows) demonstrates read access.
+      ReadRowsRequest readRequest =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  String.format(
+                      "projects/%s/instances/%s/tables/%s",
+                      bigtableProjectId, bigtableInstanceId, bigtableTableId))
+              .setRowsLimit(1)
+              .setRows(
+                  RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("some-nonexistent-row")))
+              .setFilter(
+                  RowFilter.newBuilder()
+                      .setChain(
+                          Chain.newBuilder()
+                              .addFilters(RowFilter.newBuilder().setCellsPerRowLimitFilter(1))
+                              .addFilters(
+                                  RowFilter.newBuilder().setStripValueTransformer(true).build())))
+              .build();
+
+      Iterator<ReadRowsResponse> readIt =
+          stub.withDeadline(Deadline.after(1, TimeUnit.SECONDS)).readRows(readRequest);
+
+      try {
+        while (readIt.hasNext()) {
+          readIt.next();
+        }
+        System.out.println("Bigtable Read: OK");
+      } catch (StatusRuntimeException e) {
+        System.out.println("Bigtable Read: Failed - " + e.getStatus());
+        return;
+      }
+
+      // A conditional mutation whose predicate never matches: exercises write
+      // permission without modifying any data.
+      CheckAndMutateRowRequest rwReq =
+          CheckAndMutateRowRequest.newBuilder()
+              .setTableName(tableName)
+              .setRowKey(ByteString.copyFromUtf8("some-non-existent-row"))
+              .setPredicateFilter(RowFilter.newBuilder().setBlockAllFilter(true))
+              .addTrueMutations(
+                  Mutation.newBuilder().setDeleteFromRow(DeleteFromRow.getDefaultInstance()))
+              .build();
+
+      try {
+        CheckAndMutateRowResponse ignored = stub.checkAndMutateRow(rwReq);
+        System.out.println("Bigtable Read/Write: OK");
+      } catch (StatusRuntimeException e) {
+        System.out.println("Bigtable Read/Write: Failed - " + e.getStatus());
+      }
+    } finally {
+      channel.shutdown();
+    }
+  }
+
+  /** Exports a single test measurement to verify metric-write permissions. */
+  void checkMetrics(Credentials creds) {
+    MetricConfiguration config =
+        MetricConfiguration.builder()
+            .setCredentials(creds)
+            .setProjectId(metricsProjectId)
+            .setInstrumentationLibraryLabelsEnabled(false)
+            .build();
+
+    GCPResourceProvider resourceProvider = new GCPResourceProvider();
+    Resource resource = Resource.create(resourceProvider.getAttributes());
+    ImmutableList<MetricData> metricData =
+        ImmutableList.of(MetricsImpl.generateTestPresenceMeasurement(resource));
+
+    try (MetricExporter exporter = GoogleCloudMetricExporter.createWithConfiguration(config)) {
+      CompletableResultCode result = exporter.export(metricData);
+      result.join(1, TimeUnit.MINUTES);
+
+      System.out.println("Metrics resource: " + resource);
+      if (result.isSuccess()) {
+        System.out.println("Metrics write: OK");
+      } else {
+        System.out.println("Metrics write: FAILED: " + result.getFailureThrowable().getMessage());
+      }
+    }
+  }
+
+  /** Client interceptor that merges a fixed set of headers into every outgoing call. */
+  private static class MetadataInterceptor implements ClientInterceptor {
+    private final Metadata metadata;
+
+    private MetadataInterceptor(Metadata metadata) {
+      this.metadata = metadata;
+    }
+
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+      return new SimpleForwardingClientCall<>(next.newCall(method, callOptions)) {
+        @Override
+        public void start(Listener<RespT> responseListener, Metadata headers) {
+          headers.merge(metadata);
+          super.start(responseListener, headers);
+        }
+      };
+    }
+  }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/commands/package-info.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/commands/package-info.java
new file mode 100644
index 00000000000..e3b143a9fe9
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/commands/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Contains all the command implementations for the proxy server. */
+package com.google.cloud.bigtable.examples.proxy.commands;
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/ByteMarshaller.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/ByteMarshaller.java
new file mode 100644
index 00000000000..e8d3611045f
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/ByteMarshaller.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.core;
+
+import com.google.common.io.ByteStreams;
+import io.grpc.MethodDescriptor;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+class ByteMarshaller implements MethodDescriptor.Marshaller<byte[]> {
+
+ @Override
+ public byte[] parse(InputStream stream) {
+ try {
+ return ByteStreams.toByteArray(stream);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public InputStream stream(byte[] value) {
+ return new ByteArrayInputStream(value);
+ }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/CallLabels.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/CallLabels.java
new file mode 100644
index 00000000000..cdd3c6f5e38
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/CallLabels.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.core;
+
+import com.google.auto.value.AutoValue;
+import com.google.bigtable.v2.PingAndWarmRequest;
+import com.google.bigtable.v2.PingAndWarmRequest.Builder;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.MethodDescriptor;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A value class to encapsulate call identity.
+ *
+ * <p>This call extracts relevant information from request headers and makes it accessible to
+ * metrics & the upstream client. The primary headers consulted are:
+ *
+ * <ul>
+ *   <li>{@code x-goog-request-params} - contains the resource and app profile id
+ *   <li>{@code google-cloud-resource-prefix} - the previous version of {@code
+ *       x-goog-request-params}, used as a fallback
+ *   <li>{@code x-goog-cbt-cookie-routing} - an opaque blob used for routing RPCs on the serverside
+ *   <li>{@code bigtable-features} - the client's available features
+ *   <li>{@code x-goog-api-client} - contains the client info of the downstream client
+ * </ul>
+ */
+@AutoValue
+public abstract class CallLabels {
+ private static final Logger LOG = LoggerFactory.getLogger(CallLabels.class);
+
+ // All RLS headers
+ static final Key<String> REQUEST_PARAMS =
+ Key.of("x-goog-request-params", Metadata.ASCII_STRING_MARSHALLER);
+ static final Key<String> LEGACY_RESOURCE_PREFIX =
+ Key.of("google-cloud-resource-prefix", Metadata.ASCII_STRING_MARSHALLER);
+ static final Key<String> ROUTING_COOKIE =
+ Key.of("x-goog-cbt-cookie-routing", Metadata.ASCII_STRING_MARSHALLER);
+ static final Key<String> FEATURE_FLAGS =
+ Key.of("bigtable-features", Metadata.ASCII_STRING_MARSHALLER);
+ static final Key<String> API_CLIENT =
+ Key.of("x-goog-api-client", Metadata.ASCII_STRING_MARSHALLER);
+
+ enum ResourceNameType {
+ Parent("parent", 0),
+ Name("name", 1),
+ TableName("table_name", 2);
+
+ private final String name;
+ private final int priority;
+
+ ResourceNameType(String name, int priority) {
+ this.name = name;
+ this.priority = priority;
+ }
+ }
+
+ @AutoValue
+ abstract static class ResourceName {
+
+ abstract ResourceNameType getType();
+
+ abstract String getValue();
+
+ static ResourceName create(ResourceNameType type, String value) {
+ return new AutoValue_CallLabels_ResourceName(type, value);
+ }
+ }
+
+ public abstract String getMethodName();
+
+ abstract Optional<String> getRequestParams();
+
+ abstract Optional<String> getLegacyResourcePrefix();
+
+ abstract Optional<String> getRoutingCookie();
+
+ abstract Optional<String> getEncodedFeatures();
+
+ public abstract Optional<String> getApiClient();
+
+ public static CallLabels create(MethodDescriptor<?, ?> method, Metadata headers) {
+ Optional<String> apiClient = Optional.ofNullable(headers.get(API_CLIENT));
+
+ Optional<String> requestParams = Optional.ofNullable(headers.get(REQUEST_PARAMS));
+ Optional<String> legacyResourcePrefix =
+ Optional.ofNullable(headers.get(LEGACY_RESOURCE_PREFIX));
+ Optional<String> routingCookie = Optional.ofNullable(headers.get(ROUTING_COOKIE));
+ Optional<String> encodedFeatures = Optional.ofNullable(headers.get(FEATURE_FLAGS));
+
+ return create(
+ method, requestParams, legacyResourcePrefix, routingCookie, encodedFeatures, apiClient);
+ }
+
+ @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+ @VisibleForTesting
+ public static CallLabels create(
+ MethodDescriptor<?, ?> method,
+ Optional<String> requestParams,
+ Optional<String> legacyResourcePrefix,
+ Optional<String> routingCookie,
+ Optional<String> encodedFeatures,
+ Optional<String> apiClient) {
+
+ return new AutoValue_CallLabels(
+ method.getFullMethodName(),
+ requestParams,
+ legacyResourcePrefix,
+ routingCookie,
+ encodedFeatures,
+ apiClient);
+ }
+
+ /**
+ * Extracts the resource name, will use {@link #getRequestParams()} if present, otherwise falls
+ * back on {@link #getLegacyResourcePrefix()}. If neither is present, {@link Optional#empty()} is
+ * returned. If there was an issue extracting, a {@link ParsingException} is thrown. In the
+ * primary case, the value will be url decoded.
+ */
+ public Optional<String> extractResourceName() throws ParsingException {
+ if (getRequestParams().isEmpty()) {
+ return getLegacyResourcePrefix();
+ }
+
+ String requestParams = getRequestParams().orElse("");
+ String[] encodedKvPairs = requestParams.split("&");
+ Optional<ResourceName> resourceName = Optional.empty();
+
+ for (String encodedKv : encodedKvPairs) {
+ String[] split = encodedKv.split("=", 2);
+ if (split.length != 2) {
+ continue;
+ }
+ String encodedKey = split[0];
+ String encodedValue = split[1];
+ if (encodedKey.isEmpty() || encodedValue.isEmpty()) {
+ continue;
+ }
+
+ Optional<ResourceNameType> newType = findType(encodedKey);
+
+ if (newType.isEmpty()) {
+ continue;
+ }
+ // Skip if we previously found a resource name and the new resource name type has a lower
+ // priority
+ if (resourceName.isPresent()
+ && newType.get().priority <= resourceName.get().getType().priority) {
+ continue;
+ }
+ String decodedValue = percentDecode(encodedValue);
+
+ resourceName = Optional.of(ResourceName.create(newType.get(), decodedValue));
+ }
+ return resourceName.map(ResourceName::getValue);
+ }
+
+ private static Optional<ResourceNameType> findType(String key) {
+ for (ResourceNameType type : ResourceNameType.values()) {
+ if (type.name.equals(key)) {
+ return Optional.of(type);
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Extracts the app profile id from {@link #getRequestParams()}. Returns {@link Optional#empty()}
+ * if the key is missing. The value will be url decoded.
+ */
+ public Optional<String> extractAppProfileId() throws ParsingException {
+ String requestParams = getRequestParams().orElse("");
+
+ for (String encodedPair : requestParams.split("&")) {
+ if (!encodedPair.startsWith("app_profile_id=")) {
+ continue;
+ }
+ String[] parts = encodedPair.split("=", 2);
+ String encodedValue = parts.length > 1 ? parts[1] : "";
+ return Optional.of(percentDecode(encodedValue));
+ }
+ return Optional.empty();
+ }
+
+ private static String percentDecode(String s) throws ParsingException {
+ try {
+ return URLDecoder.decode(s, StandardCharsets.UTF_8);
+ } catch (RuntimeException e) {
+ throw new ParsingException("Failed to url decode " + s, e);
+ }
+ }
+
+ /**
+ * Can be derived from {@link CallLabels} to create a priming request to keep the channel active
+ * for future RPCs.
+ */
+ @AutoValue
+ public abstract static class PrimingKey {
+ protected abstract Map<String, String> getMetadata();
+
+ protected abstract String getName();
+
+ protected abstract Optional<String> getAppProfileId();
+
+ public static Optional<PrimingKey> from(CallLabels labels) throws ParsingException {
+ final ImmutableMap.Builder<String, String> md = ImmutableMap.builder();
+
+ Optional<String> resourceName = labels.extractResourceName();
+ if (resourceName.isEmpty()) {
+ return Optional.empty();
+ }
+ String[] resourceNameParts = resourceName.get().split("/", 5);
+ if (resourceNameParts.length < 4
+ || !resourceNameParts[0].equals("projects")
+ || !resourceNameParts[2].equals("instances")) {
+ return Optional.empty();
+ }
+ String instanceName =
+ "projects/" + resourceNameParts[1] + "/instances/" + resourceNameParts[3];
+ StringBuilder reqParams =
+ new StringBuilder()
+ .append("name=")
+ .append(URLEncoder.encode(instanceName, StandardCharsets.UTF_8));
+
+ Optional<String> appProfileId = labels.extractAppProfileId();
+ appProfileId.ifPresent(val -> reqParams.append("&app_profile_id=").append(val));
+ md.put(REQUEST_PARAMS.name(), reqParams.toString());
+
+ labels
+ .getLegacyResourcePrefix()
+ .ifPresent(ignored -> md.put(LEGACY_RESOURCE_PREFIX.name(), instanceName));
+
+ labels.getRoutingCookie().ifPresent(c -> md.put(ROUTING_COOKIE.name(), c));
+
+ labels.getEncodedFeatures().ifPresent(c -> md.put(FEATURE_FLAGS.name(), c));
+
+ labels.getApiClient().ifPresent(c -> md.put(API_CLIENT.name(), c));
+
+ return Optional.of(
+ new AutoValue_CallLabels_PrimingKey(md.build(), instanceName, appProfileId));
+ }
+
+ public Metadata composeMetadata() {
+ Metadata md = new Metadata();
+ for (Entry<String, String> e : getMetadata().entrySet()) {
+ md.put(Key.of(e.getKey(), Metadata.ASCII_STRING_MARSHALLER), e.getValue());
+ }
+ return md;
+ }
+
+ public PingAndWarmRequest composeProto() {
+ Builder builder = PingAndWarmRequest.newBuilder().setName(getName());
+ getAppProfileId().ifPresent(builder::setAppProfileId);
+ return builder.build();
+ }
+ }
+
+ public static class ParsingException extends Exception {
+
+ public ParsingException(String message) {
+ super(message);
+ }
+
+ public ParsingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/CallProxy.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/CallProxy.java
new file mode 100644
index 00000000000..6285bc5896f
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/CallProxy.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.core;
+
+import com.google.cloud.bigtable.examples.proxy.metrics.Tracer;
+import com.google.common.base.Stopwatch;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.Status;
+import javax.annotation.concurrent.GuardedBy;
+
+/** A per gRPC call proxy. */
+class CallProxy<ReqT, RespT> {
+
+ private final Tracer tracer;
+ final RequestProxy serverCallListener;
+ final ResponseProxy clientCallListener;
+
+ private final Stopwatch downstreamStopwatch = Stopwatch.createUnstarted();
+
+ /**
+ * @param tracer a lifecycle observer to publish metrics.
+ * @param serverCall the incoming server call. This will be triggered by a customer client.
+ * @param clientCall the outgoing call to Bigtable service. This will be created by {@link
+ * ProxyHandler}
+ */
+ public CallProxy(
+ Tracer tracer, ServerCall<ReqT, RespT> serverCall, ClientCall<ReqT, RespT> clientCall) {
+ this.tracer = tracer;
+ // Listen for incoming request messages and send them to the upstream ClientCall
+ // The RequestProxy will respect back pressure from the ClientCall and only request a new
+ // message from the incoming rpc when the upstream client call is ready,
+ serverCallListener = new RequestProxy(clientCall);
+
+ // Listen for response messages from the upstream ClientCall and relay them to the customer's
+ // client. This will respect backpressure and request new messages from the upstream when the
+ // customer's client is ready.
+ clientCallListener = new ResponseProxy(serverCall);
+ }
+
+ /**
+ * Back pressure aware message pump of request messages from a customer's downstream client to
+ * upstream Bigtable service.
+ *
+ * <p>Additional messages are requested from the downstream while the upstream's isReady() flag is
+ * set. As soon as the upstream signals that it is full by returning false for isReady(), {@link
+ * RequestProxy} will remember the need to get more messages from downstream and then wait
+ * until the upstream signals readiness via onClientReady().
+ *
+ * <p>Please note that in the current Bigtable protocol, all client RPCs are unary. Until that
+ * changes, this proxy will only have a single iteration. However, it is designed generically to
+ * support future use cases.
+ */
+ private class RequestProxy extends ServerCall.Listener<ReqT> {
+
+ private final ClientCall<ReqT, RespT> clientCall;
+
+ @GuardedBy("this")
+ private boolean needToRequest;
+
+ public RequestProxy(ClientCall<ReqT, RespT> clientCall) {
+ this.clientCall = clientCall;
+ }
+
+ @Override
+ public void onCancel() {
+ clientCall.cancel("Server cancelled", null);
+ }
+
+ @Override
+ public void onHalfClose() {
+ clientCall.halfClose();
+ }
+
+ @Override
+ public void onMessage(ReqT message) {
+ clientCall.sendMessage(message);
+ synchronized (this) {
+ if (clientCall.isReady()) {
+ clientCallListener.serverCall.request(1);
+ } else {
+ // The outgoing call is not ready for more requests. Stop requesting additional data and
+ // wait for it to catch up.
+ needToRequest = true;
+ }
+ }
+ }
+
+ @Override
+ public void onReady() {
+ clientCallListener.onServerReady();
+ }
+
+ // Called from ResponseProxy, which is a different thread than the ServerCall.Listener
+ // callbacks.
+ synchronized void onClientReady() {
+ if (needToRequest) {
+ // When the upstream client is ready for another request message from the customer's client,
+ // ask for one more message.
+ clientCallListener.serverCall.request(1);
+ needToRequest = false;
+ }
+ }
+ }
+
+ /**
+ * Back pressure aware message pump of response messages from upstream Bigtable service to a
+ * customer's downstream client.
+ *
+ * <p>Additional messages are requested from the upstream while the downstream's isReady() flag is
+ * set. As soon as the downstream signals that it is full by returning false for isReady(), {@link
+ * ResponseProxy} will remember the need to get more messages from upstream and then wait
+ * until the downstream signals readiness via onServerReady().
+ */
+ private class ResponseProxy extends ClientCall.Listener<RespT> {
+
+ private final ServerCall<?, RespT> serverCall;
+
+ @GuardedBy("this")
+ private boolean needToRequest;
+
+ public ResponseProxy(ServerCall<?, RespT> serverCall) {
+ this.serverCall = serverCall;
+ }
+
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ tracer.onCallFinished(status);
+
+ serverCall.close(status, trailers);
+ }
+
+ @Override
+ public void onHeaders(Metadata headers) {
+ serverCall.sendHeaders(headers);
+ }
+
+ @Override
+ public void onMessage(RespT message) {
+ serverCall.sendMessage(message);
+ synchronized (this) {
+ if (serverCall.isReady()) {
+ serverCallListener.clientCall.request(1);
+ } else {
+ // The incoming call is not ready for more responses. Stop requesting additional data
+ // and wait for it to catch up.
+ needToRequest = true;
+ downstreamStopwatch.reset().start();
+ }
+ }
+ }
+
+ @Override
+ public void onReady() {
+ serverCallListener.onClientReady();
+ }
+
+ // Called from RequestProxy, which is a different thread than the ClientCall.Listener
+ // callbacks.
+ synchronized void onServerReady() {
+ if (downstreamStopwatch.isRunning()) {
+ tracer.onDownstreamLatency(downstreamStopwatch.elapsed());
+ downstreamStopwatch.stop();
+ }
+ if (needToRequest) {
+ serverCallListener.clientCall.request(1);
+ needToRequest = false;
+ }
+ }
+ }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/ProxyHandler.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/ProxyHandler.java
new file mode 100644
index 00000000000..dfdbdd24ba2
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/ProxyHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.core;
+
+import com.google.cloud.bigtable.examples.proxy.metrics.Metrics;
+import com.google.cloud.bigtable.examples.proxy.metrics.Tracer;
+import io.grpc.CallCredentials;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+
+/** A factory pairing of an incoming server call to an outgoing client call. */
+public final class ProxyHandler<ReqT, RespT> implements ServerCallHandler<ReqT, RespT> {
+ private static final Metadata.Key<String> AUTHORIZATION_KEY =
+ Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER);
+
+ private final Metrics metrics;
+ private final Channel channel;
+ private final CallCredentials callCredentials;
+
+ public ProxyHandler(Metrics metrics, Channel channel, CallCredentials callCredentials) {
+ this.metrics = metrics;
+ this.channel = channel;
+ this.callCredentials = callCredentials;
+ }
+
+ @Override
+ public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> serverCall, Metadata headers) {
+ CallLabels callLabels = CallLabels.create(serverCall.getMethodDescriptor(), headers);
+ Tracer tracer = new Tracer(metrics, callLabels);
+
+ // Inject proxy credentials
+ CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(callCredentials);
+ callOptions = tracer.injectIntoCallOptions(callOptions);
+
+ // Strip incoming credentials
+ headers.removeAll(AUTHORIZATION_KEY);
+
+ ClientCall<ReqT, RespT> clientCall =
+ channel.newCall(serverCall.getMethodDescriptor(), callOptions);
+
+ CallProxy<ReqT, RespT> proxy = new CallProxy<>(tracer, serverCall, clientCall);
+ clientCall.start(proxy.clientCallListener, headers);
+ serverCall.request(1);
+ clientCall.request(1);
+ return proxy.serverCallListener;
+ }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/Registry.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/Registry.java
new file mode 100644
index 00000000000..bed62c292e0
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/core/Registry.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.core;
+
+import com.google.common.collect.ImmutableMap;
+import io.grpc.HandlerRegistry;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerMethodDefinition;
+import java.util.Map;
+
+/**
+ * Contains the service name -> handler mapping. This acts as an aggregate service.
+ *
+ * <p>The handlers treat requests and responses as raw byte arrays.
+ */
+public class Registry extends HandlerRegistry {
+ private final MethodDescriptor.Marshaller<byte[]> byteMarshaller = new ByteMarshaller();
+ private final Map<String, ServerCallHandler<byte[], byte[]>> serviceMap;
+
+ public Registry(Map<String, ServerCallHandler<byte[], byte[]>> serviceMap) {
+ this.serviceMap = ImmutableMap.copyOf(serviceMap);
+ }
+
+ @Override
+ public ServerMethodDefinition<?, ?> lookupMethod(String methodName, String authority) {
+ MethodDescriptor<byte[], byte[]> methodDescriptor =
+ MethodDescriptor.newBuilder(byteMarshaller, byteMarshaller)
+ .setFullMethodName(methodName)
+ .setType(MethodDescriptor.MethodType.UNKNOWN)
+ .build();
+
+ ServerCallHandler<byte[], byte[]> handler = serviceMap.get(methodDescriptor.getServiceName());
+ if (handler == null) {
+ return null;
+ }
+
+ return ServerMethodDefinition.create(methodDescriptor, handler);
+ }
+}
diff --git a/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/metrics/InstrumentedCallCredentials.java b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/metrics/InstrumentedCallCredentials.java
new file mode 100644
index 00000000000..14d1454a22f
--- /dev/null
+++ b/bigtable/bigtable-proxy/src/main/java/com/google/cloud/bigtable/examples/proxy/metrics/InstrumentedCallCredentials.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigtable.examples.proxy.metrics;
+
+import com.google.cloud.bigtable.examples.proxy.channelpool.DataChannel;
+import com.google.cloud.bigtable.examples.proxy.core.CallLabels.PrimingKey;
+import com.google.cloud.bigtable.examples.proxy.core.ProxyHandler;
+import com.google.common.base.Stopwatch;
+import io.grpc.CallCredentials;
+import io.grpc.CallOptions;
+import io.grpc.InternalMayRequireSpecificExecutor;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.Status;
+import java.time.Duration;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link CallCredentials} decorator that tracks latency for fetching credentials.
+ *
+ * <p>This expects that all RPCs that use these credentials embed a {@link Tracer} in the {@link
+ * io.grpc.CallOptions} using {@link Tracer#injectIntoCallOptions(CallOptions)}.
+ *
+ *