From ca0bf942d65d694a1a2ee2aa384726b1c0f61e60 Mon Sep 17 00:00:00 2001 From: catpineapple Date: Thu, 11 Dec 2025 18:54:31 +0800 Subject: [PATCH] enable pvc storage modify --- api/disaggregated/v1/types.go | 10 + config/crd/bases/crds.yaml | 18 + ....doris.com_dorisdisaggregatedclusters.yaml | 18 + config/operator/disaggregated-operator.yaml | 1 + config/operator/operator-sign-ns.yaml | 428 ++++++++++++++++++ config/operator/operator.yaml | 1 + doc/dorisctl_usage_cn.md | 132 ++++++ ....doris.com_dorisdisaggregatedclusters.yaml | 18 + pkg/common/utils/mysql/mysql.go | 4 +- .../utils/resource/persistent_volume_claim.go | 41 +- .../computegroups/controller.go | 112 ++--- .../disaggregated_fe/controller.go | 18 +- .../metaservice/controller.go | 21 +- .../disaggregated_subcontroller.go | 136 +++++- 14 files changed, 867 insertions(+), 91 deletions(-) create mode 100644 config/operator/operator-sign-ns.yaml create mode 100644 doc/dorisctl_usage_cn.md diff --git a/api/disaggregated/v1/types.go b/api/disaggregated/v1/types.go index 2712ce30..30a416e4 100644 --- a/api/disaggregated/v1/types.go +++ b/api/disaggregated/v1/types.go @@ -246,8 +246,18 @@ type PersistentVolume struct { //Annotation for PVC pods. Users can adapt the storage authentication and pv binding of the cloud platform through configuration. //It only takes effect in the first configuration and cannot be added or modified later. Annotations map[string]string `json:"annotations,omitempty"` + + // defines pvc provisioner, default is ''. + PVCProvisioner PVCProvisioner `json:"provisioner,omitempty"` } +type PVCProvisioner string + +// Possible values of PVC provisioner +const ( + PVCProvisionerOperator PVCProvisioner = "operator" +) + type Secret struct { //specify the secret need to be mounted in deployed namespace. SecretName string `json:"secretName,omitempty"` diff --git a/config/crd/bases/crds.yaml b/config/crd/bases/crds.yaml index 6c776c0a..af694b13 100644 --- a/config/crd/bases/crds.yaml +++ b/config/crd/bases/crds.yaml @@ -10841,6 +10841,9 @@ spec: the PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object persistentVolumes: description: volume template for mountPath @@ -11071,6 +11074,9 @@ spec: the PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object type: array replicas: @@ -13079,6 +13085,9 @@ spec: PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object persistentVolumes: description: volume template for mountPath @@ -13308,6 +13317,9 @@ spec: the PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object type: array replicas: @@ -15327,6 +15339,9 @@ spec: PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object persistentVolumes: description: volume template for mountPath @@ -15556,6 +15571,9 @@ spec: the PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object type: array replicas: diff --git a/config/crd/bases/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml b/config/crd/bases/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml index c4121592..c1e753eb 100644 --- a/config/crd/bases/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml +++ b/config/crd/bases/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml @@ -1700,6 +1700,9 @@ spec: the PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object persistentVolumes: description: volume template for mountPath @@ -1930,6 +1933,9 @@ spec: the PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object type: array replicas: @@ -3938,6 +3944,9 @@ spec: PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object persistentVolumes: description: volume template for mountPath @@ -4167,6 +4176,9 @@ spec: the PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object type: array replicas: @@ -6186,6 +6198,9 @@ spec: PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object persistentVolumes: description: volume template for mountPath @@ -6415,6 +6430,9 @@ spec: the PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object type: array replicas: diff --git a/config/operator/disaggregated-operator.yaml b/config/operator/disaggregated-operator.yaml index db73b990..6adf837e 100644 --- a/config/operator/disaggregated-operator.yaml +++ b/config/operator/disaggregated-operator.yaml @@ -162,6 +162,7 @@ rules: - get - list - watch + - create - update - patch - delete diff --git a/config/operator/operator-sign-ns.yaml b/config/operator/operator-sign-ns.yaml new file mode 100644 index 00000000..fa2005fd --- /dev/null +++ b/config/operator/operator-sign-ns.yaml @@ -0,0 +1,428 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# permissions to do leader election. +apiVersion: v1 +kind: Namespace +metadata: + labels: + control-plane: doris-operator + app.kubernetes.io/name: namespace + app.kubernetes.io/instance: doris + app.kubernetes.io/component: doris-operator + app.kubernetes.io/part-of: doris-operator + name: doris +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + labels: + app.kubernetes.io/name: role + app.kubernetes.io/instance: leader-election-role + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: doris-operator + app.kubernetes.io/part-of: doris-operator + app.kubernetes.io/managed-by: kustomize + name: leader-election-role + namespace: doris +rules: + - apiGroups: + - "" + resources: + - configmaps + verbs: + - get + - list + - watch + - create + - update + - patch + - delete + - apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - get + - list + - watch + - create + - update + - patch + - delete + - apiGroups: + - "" + resources: + - events + verbs: + - create + - patch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + labels: + app.kubernetes.io/name: rolebinding + app.kubernetes.io/instance: leader-election-rolebinding + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: doris-operator + app.kubernetes.io/part-of: doris-operator + app.kubernetes.io/managed-by: kustomize + name: leader-election-rolebinding + namespace: doris +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: leader-election-role +subjects: + - kind: ServiceAccount + name: doris-operator + namespace: doris +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + labels: + app.kubernetes.io/name: role + app.kubernetes.io/instance: doris-operator + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: doris-operator + app.kubernetes.io/part-of: doris-operator + app.kubernetes.io/managed-by: kustomize + name: doris-operator + namespace: doris +rules: + - apiGroups: + - apps + resources: + - statefulsets + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - apps + resources: + - statefulsets/status + verbs: + - get + - apiGroups: + - autoscaling + resources: + - horizontalpodautoscalers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - "" + resources: + - configmaps + verbs: + - get + - list + - watch + - apiGroups: + - "" + resources: + - endpoints + verbs: + - get + - list + - watch + - apiGroups: + - "" + resources: + - pods + verbs: + - get + - list + - watch + - update + - apiGroups: + - "" + resources: + - persistentvolumeclaims + verbs: + - get + - list + - watch + - create + - update + - patch + - delete + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - update + - list + - watch + - apiGroups: + - "" + resources: + - serviceaccounts + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - "" + resources: + - services + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - doris.selectdb.com + resources: + - dorisclusters + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - disaggregated.cluster.doris.com + resources: + - dorisdisaggregatedclusters + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - apps.foundationdb.org + resources: + - foundationdbclusters + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - doris.selectdb.com + resources: + - dorisclusters/finalizers + verbs: + - update + - apiGroups: + - doris.selectdb.com + resources: + - dorisclusters/status + verbs: + - get + - patch + - update + - apiGroups: + - disaggregated.cluster.doris.com + resources: + - dorisdisaggregatedclusters/status + verbs: + - get + - patch + - update + - apiGroups: + - rbac.authorization.k8s.io + resources: + - rolebindings + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + labels: + app.kubernetes.io/name: rolebinding + app.kubernetes.io/instance: doris-operator-rolebinding + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: doris-operator + app.kubernetes.io/part-of: doris-operator + app.kubernetes.io/managed-by: kustomize + name: doris-operator-rolebinding + namespace: doris +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: doris-operator +subjects: + - kind: ServiceAccount + name: doris-operator + namespace: doris +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + labels: + app.kubernetes.io/name: serviceaccount + app.kubernetes.io/instance: controller-doris-operator-sa + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: doris-operator + app.kubernetes.io/part-of: doris-operator + app.kubernetes.io/managed-by: kustomize + name: doris-operator + namespace: doris +--- +apiVersion: v1 +kind: Secret +metadata: + name: doris-operator-secret-cert + namespace: doris + labels: + control-plane: doris-operator + app.kubernetes.io/instance: doris-operator +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/created-by: doris-operator + app.kubernetes.io/part-of: doris-operator + name: doris-operator-service + namespace: doris +spec: + ports: + - name: https + port: 443 + targetPort: 9443 + selector: + control-plane: doris-operator + app.kubernetes.io/name: deployment + app.kubernetes.io/instance: doris-operator + app.kubernetes.io/created-by: doris-operator + app.kubernetes.io/part-of: doris-operator +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: doris-operator + namespace: doris + labels: + control-plane: doris-operator + app.kubernetes.io/name: deployment + app.kubernetes.io/instance: doris-operator + app.kubernetes.io/created-by: doris-operator + app.kubernetes.io/part-of: doris-operator +spec: + selector: + matchLabels: + control-plane: doris-operator + replicas: 1 + template: + metadata: + annotations: + kubectl.kubernetes.io/default-container: doris-operator + labels: + control-plane: doris-operator + app.kubernetes.io/name: deployment + app.kubernetes.io/instance: doris-operator + app.kubernetes.io/created-by: doris-operator + app.kubernetes.io/part-of: doris-operator + spec: + securityContext: + runAsNonRoot: true + containers: + - command: + - /dorisoperator + args: + - --leader-elect + - -namespace=doris + image: apache/doris:operator-latest + imagePullPolicy: Always + name: dorisoperator + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - "ALL" + env: + - name: ENABLE_WEBHOOK + value: "false" + - name: START_DISAGGREGATED_OPERATOR + value: "true" + - name: OPERATOR_NAMESPACE + value: "doris" + - name: OPERATOR_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: SERVICE_NAME + value: doris-operator-service + livenessProbe: + httpGet: + path: /healthz + port: 8081 + initialDelaySeconds: 15 + periodSeconds: 20 + readinessProbe: + httpGet: + path: /readyz + port: 8081 + initialDelaySeconds: 5 + periodSeconds: 10 + resources: + limits: + cpu: "2" + memory: 4Gi + requests: + cpu: "1" + memory: 2Gi + volumeMounts: + - mountPath: /tmp/k8s-webhook-server/serving-certs + name: cert + readOnly: true + volumes: + - name: cert + secret: + defaultMode: 420 + secretName: doris-operator-secret-cert + serviceAccountName: doris-operator + terminationGracePeriodSeconds: 10 \ No newline at end of file diff --git a/config/operator/operator.yaml b/config/operator/operator.yaml index db73b990..6adf837e 100644 --- a/config/operator/operator.yaml +++ b/config/operator/operator.yaml @@ -162,6 +162,7 @@ rules: - get - list - watch + - create - update - patch - delete diff --git a/doc/dorisctl_usage_cn.md b/doc/dorisctl_usage_cn.md new file mode 100644 index 00000000..6116eae3 --- /dev/null +++ b/doc/dorisctl_usage_cn.md @@ -0,0 +1,132 @@ + +# `dorisctl` 使用指南 + +> 此文档由AI生成,仅供参考。 + +`dorisctl` 是 Doris Operator 自带的命令行工具,用于从集群前端(FE)拉取节点元数据,帮助运维人员快速排查集群状态。它通过 FE 暴露的 MySQL 协议执行 `SHOW FRONTENDS` / `SHOW BACKENDS` 等语句,因此使用前需要确保能够以有权限的账号连接到 FE。 + +> 当前版本聚焦于只读能力,核心命令为 `get`。未来扩展命令时,文档会随版本更新。 + +## 构建与安装 + +仓库根目录的 `Makefile` 已内置构建流程。 + +- **推荐方式**:在仓库根目录执行 `make build`,可同时生成 `bin/dorisctl`、`bin/dorisoperator` 等二进制。 +- **单独构建**:执行 `go build -o bin/dorisctl ./cmd/dorisctl`。 + +构建完成后,将 `bin/` 目录加入 `PATH`,或直接使用绝对路径运行。 + +## 连接前的准备与全局参数 + +所有子命令共享同一组全局参数,用于描述 FE 连接信息: + +| 参数 | 说明 | 备注 | +| --- | --- | --- | +| `--fe-host` | FE 对外访问地址 | **必填**。可以是域名或 IP。 | +| `--query-port` | FE MySQL 协议端口 | 默认 `9030`。如果 FE 使用自定义端口需要显式指定。 | +| `--user` | 登录用户名 | **必填**。必须具备执行 `SHOW FRONTENDS/BACKENDS` 的权限。 | +| `--password` | 登录密码 | 可以通过环境变量/交互方式置入,命令行将回显。 | +| `--ssl-ca` | CA 根证书路径 | 如果 FE 启用了 TLS,则需同时提供 `--ssl-cert` 和 `--ssl-key`。 | +| `--ssl-cert` | 客户端证书路径 | | +| `--ssl-key` | 客户端私钥路径 | | + +这些参数会传递给内部的 Doris 客户端(`pkg/common/cmd/util/client.go`),后者基于 `mysql` 驱动建立连接。TLS 选项存在缺一不可的约束:只要指定了 `--ssl-ca`,就必须同时提供 `--ssl-cert` 与 `--ssl-key`。 + +### 认证与权限小贴士 + +- 建议为 `dorisctl` 准备只读账号,至少授予 `SHOW FRONTENDS`、`SHOW BACKENDS` 权限。 +- 若连接失败或权限不足,`dorisctl` 会直接输出驱动返回的错误信息,可据此排查网络、防火墙或账号策略问题。 + +## 基本用法 + +命令模板: + +``` +dorisctl [全局参数] <子命令> <资源类型> <资源标识> [命令参数] +``` + +- 全局参数可以放在命令任意位置,最佳实践是在子命令前显式传入。 +- 当前仅实现 `get` 子命令,资源类型支持 `node`;`computegroup` 为预留关键字,暂未实现(调用会无输出)。 + +### `get` ——查询节点元数据 + +`get` 子命令用于查看单个 FE 或 BE 节点的详细状态。执行流程如下: + +1. 建立到 FE 的 MySQL 连接。 +2. 顺序执行 `SHOW FRONTENDS` 和 `SHOW BACKENDS`。 +3. 根据传入的 `资源标识`(节点 `Host` 字段)匹配到对应记录。 +4. 将结果以 JSON 形式输出到标准输出。 + +语法: + +``` +dorisctl [全局参数] get node [-o <输出选项>] +``` + +- `` 必须与 FE/BE 在 `SHOW` 结果中的 `Host` 字段完全一致。 +- 如果目标节点在两类列表均不存在,将不会返回内容(也不报错)。 + +#### 输出控制 `-o, --output` + +- 默认输出为格式化 JSON。 +- 当使用 `-o custom-columns=<字段路径>` 时,可以提取指定字段: + - 常规字段采用 `gjson` 语法,例如:`-o custom-columns=role` 输出 FE 的角色。 + - 当字段位于 BE 的标签(`Tag`,JSON 字符串)内时,可使用 `tag.<子字段>`,工具会自动展开标签 JSON。例如: + + ```bash + dorisctl --fe-host fe.example.com \ + --user monitor --password secret \ + get node be-1.example.com \ + -o custom-columns=tag.compute_group_name + ``` + + 如果标签缺失或字段不存在,将返回空行。 + +> 目前 `yaml` 等选项并未单独实现,传入 `-o yaml` 时与默认输出相同。 + +#### 示例 + +- **查看 FE 节点详情** + + ```bash + dorisctl --fe-host fe-1.prod.svc.cluster.local \ + --user monitor --password ***** \ + get node fe-1.prod.svc.cluster.local + ``` + +- **查询 BE 标签中的计算组信息** + + ```bash + dorisctl --fe-host fe-1.prod.svc.cluster.local \ + --user monitor --password ***** \ + get node be-3.prod.svc.cluster.local \ + -o custom-columns=tag.compute_group_name + ``` + +- **启用 TLS 访问 FE** + + ```bash + dorisctl --fe-host fe-ssl.prod.svc.cluster.local \ + --query-port 9430 \ + --user monitor --password ***** \ + --ssl-ca /etc/doris/ca.pem \ + --ssl-cert /etc/doris/client.crt \ + --ssl-key /etc/doris/client.key \ + get node be-3.prod.svc.cluster.local + ``` + +## 常见问题排查 + +| 场景 | 现象 | 建议处理 | +| --- | --- | --- | +| 连接失败 | 输出类似 `dial tcp: lookup ...` 或 `i/o timeout` | 检查 FE 地址/端口、防火墙或 K8s Service 是否暴露 MySQL 端口。 | +| 认证失败 | 输出 `Access denied for user` | 确认用户/密码或账号权限;若使用 LDAP/外部认证,需在 FE 侧开启相应配置。 | +| 输出为空 | 命令执行正常但无内容 | 核实 `Host` 是否与 Doris 显示字段一致,必要时先登录 FE 手动执行 `SHOW FRONTENDS`/`SHOW BACKENDS`。 | +| `custom-columns` 返回空字符串 | 字段名称不匹配 | 使用 `dorisctl ... get node ` 默认输出查看真实 JSON 字段,确认路径后再组合 `custom-columns`。 | + +## 后续规划 + +- `computegroup` 资源读取逻辑目前为空壳,如需此能力可关注后续版本或自行在 `pkg/common/cmd/get/get.go` 中实现。 +- 如果需要批量查询/过滤,可考虑在外层脚本结合 `dorisctl` 与 `jq`/`gjson` 等工具。 + +如在使用过程中遇到新的问题,欢迎在仓库 Issue 中反馈。 \ No newline at end of file diff --git a/helm-charts/doris-operator/crds/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml b/helm-charts/doris-operator/crds/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml index c4121592..c1e753eb 100644 --- a/helm-charts/doris-operator/crds/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml +++ b/helm-charts/doris-operator/crds/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml @@ -1700,6 +1700,9 @@ spec: the PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object persistentVolumes: description: volume template for mountPath @@ -1930,6 +1933,9 @@ spec: the PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object type: array replicas: @@ -3938,6 +3944,9 @@ spec: PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object persistentVolumes: description: volume template for mountPath @@ -4167,6 +4176,9 @@ spec: the PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object type: array replicas: @@ -6186,6 +6198,9 @@ spec: PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object persistentVolumes: description: volume template for mountPath @@ -6415,6 +6430,9 @@ spec: the PersistentVolume backing this claim. type: string type: object + provisioner: + description: defines pvc provisioner, default is ''. + type: string type: object type: array replicas: diff --git a/pkg/common/utils/mysql/mysql.go b/pkg/common/utils/mysql/mysql.go index eba693da..faa0076c 100644 --- a/pkg/common/utils/mysql/mysql.go +++ b/pkg/common/utils/mysql/mysql.go @@ -149,13 +149,13 @@ func (db *DB) Select(dest interface{}, query string, args ...interface{}) error func (db *DB) ShowFrontends() ([]*Frontend, error) { var fes []*Frontend - err := db.Select(&fes, "show frontends") + err := db.Unsafe().Select(&fes, "show frontends") return fes, err } func (db *DB) ShowBackends() ([]*Backend, error) { var bes []*Backend - err := db.Select(&bes, "show backends") + err := db.Unsafe().Select(&bes, "show backends") return bes, err } diff --git a/pkg/common/utils/resource/persistent_volume_claim.go b/pkg/common/utils/resource/persistent_volume_claim.go index bbcd6742..8a9f122c 100644 --- a/pkg/common/utils/resource/persistent_volume_claim.go +++ b/pkg/common/utils/resource/persistent_volume_claim.go @@ -18,6 +18,9 @@ package resource import ( + "strings" + + dv1 "github.com/apache/doris-operator/api/disaggregated/v1" dorisv1 "github.com/apache/doris-operator/api/doris/v1" "github.com/apache/doris-operator/pkg/common/utils/doris" "github.com/apache/doris-operator/pkg/common/utils/hash" @@ -25,7 +28,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" - "strings" ) var ( @@ -33,6 +35,11 @@ var ( pvc_manager_annotation = "selectdb.doris.com/pvc-manager" ) +const ( + pvcFinalizerApache = "apache.doris.org/pvc-finalizer" + PVCManagerAnnotationApache = "apache.doris.org/pvc-manager" +) + func BuildPVCName(stsName, ordinal, volumeName string) string { pvcName := stsName + "-" + ordinal if volumeName != "" { @@ -57,6 +64,24 @@ func BuildPVC(volume dorisv1.PersistentVolume, labels map[string]string, namespa return pvc } +func BuildDisaggregatedPVC( + pvcTemplate corev1.PersistentVolumeClaim, + labels map[string]string, + namespace, stsName, ordinal string) corev1.PersistentVolumeClaim { + + pvc := corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: BuildPVCName(stsName, ordinal, pvcTemplate.Name), + Namespace: namespace, + Labels: labels, + Annotations: pvcTemplate.Annotations, + Finalizers: []string{pvcFinalizerApache}, + }, + Spec: pvcTemplate.Spec, + } + return pvc +} + // finalAnnotations is a combination of user annotations and operator default annotations func buildPVCAnnotations(volume dorisv1.PersistentVolume) Annotations { annotations := Annotations{} @@ -71,6 +96,20 @@ func buildPVCAnnotations(volume dorisv1.PersistentVolume) Annotations { return annotations } +// BuildDisaggregatedPVCAnnotations finalAnnotations is a combination of user annotations and operator default annotations +func BuildDisaggregatedPVCAnnotations(volume dv1.PersistentVolume) Annotations { + annotations := Annotations{} + if volume.PVCProvisioner == dv1.PVCProvisionerOperator { + annotations.Add(PVCManagerAnnotationApache, "operator") + annotations.Add(dorisv1.ComponentResourceHash, hash.HashObject(volume.PersistentVolumeClaimSpec)) + } + + if volume.Annotations != nil && len(volume.Annotations) > 0 { + annotations.AddAnnotation(volume.Annotations) + } + return annotations +} + func getDefaultDorisHome(componentType dorisv1.ComponentType) string { switch componentType { case dorisv1.Component_FE: diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go index 3976a7b3..a2222c4c 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go @@ -18,29 +18,29 @@ package computegroups import ( - "context" - "encoding/json" - "errors" - "fmt" - "regexp" - "strconv" - "strings" - "sync" - - dv1 "github.com/apache/doris-operator/api/disaggregated/v1" - "github.com/apache/doris-operator/pkg/common/utils" - "github.com/apache/doris-operator/pkg/common/utils/k8s" - "github.com/apache/doris-operator/pkg/common/utils/mysql" - "github.com/apache/doris-operator/pkg/common/utils/resource" - "github.com/apache/doris-operator/pkg/common/utils/set" - sc "github.com/apache/doris-operator/pkg/controller/sub_controller" - appv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" + "context" + "encoding/json" + "errors" + "fmt" + "regexp" + "strconv" + "strings" + "sync" + + dv1 "github.com/apache/doris-operator/api/disaggregated/v1" + "github.com/apache/doris-operator/pkg/common/utils" + "github.com/apache/doris-operator/pkg/common/utils/k8s" + "github.com/apache/doris-operator/pkg/common/utils/mysql" + "github.com/apache/doris-operator/pkg/common/utils/resource" + "github.com/apache/doris-operator/pkg/common/utils/set" + sc "github.com/apache/doris-operator/pkg/controller/sub_controller" + appv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" ) var _ sc.DisaggregatedSubController = &DisaggregatedComputeGroupsController{} @@ -160,6 +160,12 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C event, err = dcgs.reconcileStatefulset(ctx, st, ddc, cg) if err != nil { klog.Errorf("disaggregatedComputeGroupsController reconcile statefulset namespace %s name %s failed, err=%s", st.Namespace, st.Name, err.Error()) + return event, err + } + + event, err = dcgs.ReconcilePVC(ctx, ddc, cvs, dv1.DisaggregatedBE, st, cg) + if err != nil { + klog.Errorf("computeGroupSync ReconcilePVC failed, namespace: %s, ddc name %s, cgName: %s, error=%s!", ddc.Namespace, ddc.Name, cg.UniqueId, err.Error()) } return event, err @@ -167,13 +173,13 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C // reconcileStatefulset return bool means reconcile print error message. func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) { - //use new default value before apply new statefulset, when creating and apply spec change. - ndf := func(st *appv1.StatefulSet, est *appv1.StatefulSet) { - dcgs.useNewDefaultValuesInStatefulset(st) - } + //use new default value before apply new statefulset, when creating and apply spec change. + ndf := func(st *appv1.StatefulSet, est *appv1.StatefulSet) { + dcgs.useNewDefaultValuesInStatefulset(st) + } - var est appv1.StatefulSet - if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) { + var est appv1.StatefulSet + if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) { // add downlaodAPI volume Mounts dcgs.DisaggregatedSubDefaultController.AddDownwardAPI(st) //if err = k8s.CreateClientObject(ctx, dcgs.K8sclient, st); err != nil { @@ -181,14 +187,15 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte // return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err //} - //use apply replace create, if use create the default image not replace with be image and annotation for equal not assign. - if err = k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool { - //creating use the function to assign equal annotation. - return resource.StatefulsetDeepEqualWithKey(st ,est, dv1.DisaggregatedSpecHashValueAnnotation, false) - }, ndf); err != nil { - klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) - return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err - } + //use apply replace create, if use create the default image not replace with be image and annotation for equal not assign. + if err = k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(new, est *appv1.StatefulSet) bool { + dcgs.RestrictConditionsEqual(new, est) + //creating use the function to assign equal annotation. + return resource.StatefulsetDeepEqualWithKey(new, est, dv1.DisaggregatedSpecHashValueAnnotation, false) + }, ndf); err != nil { + klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) + return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err + } return nil, nil } else if err != nil { @@ -201,20 +208,22 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset preApplyStatefulSet namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err } + + // be decimmission processing, skip apply statefulset. if skipApplyStatefulset(cluster, cg) { return nil, nil } - - if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool { + if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(new, est *appv1.StatefulSet) bool { + dcgs.RestrictConditionsEqual(new, est) //store annotations "doris.disaggregated.cluster/generation={generation}" on statefulset //store annotations "doris.disaggregated.cluster/update-{uniqueid}=true/false" on DorisDisaggregatedCluster - equal := resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false) + equal := resource.StatefulsetDeepEqualWithKey(new, est, dv1.DisaggregatedSpecHashValueAnnotation, false) if !equal { - if len(st.Annotations) == 0 { - st.Annotations = map[string]string{} + if len(new.Annotations) == 0 { + new.Annotations = map[string]string{} } - st_annos := (resource.Annotations)(st.Annotations) + st_annos := (resource.Annotations)(new.Annotations) st_annos.Add(dv1.UpdateStatefulsetGeneration, strconv.FormatInt(cluster.Generation, 10)) if len(cluster.Annotations) == 0 { cluster.Annotations = map[string]string{} @@ -222,7 +231,7 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte ddc_annos := (resource.Annotations)(cluster.Annotations) msUniqueIdKey := strings.ToLower(fmt.Sprintf(dv1.UpdateStatefulsetName, cluster.GetCGStatefulsetName(cg))) ddc_annos.Add(msUniqueIdKey, "true") - dcgs.DisaggregatedSubDefaultController.AddDownwardAPI(st) + dcgs.DisaggregatedSubDefaultController.AddDownwardAPI(new) } return equal @@ -574,9 +583,8 @@ func (dcgs *DisaggregatedComputeGroupsController) UpdateComponentStatus(obj clie } } - for _, cgs := range ddc.Status.ComputeGroupStatuses { - if cgs.ComputeGroupId == "" { + if cgs.ComputeGroupId == "" { dcgs.recordComputeGroupIds(ddc) break } @@ -601,7 +609,7 @@ func (dcgs *DisaggregatedComputeGroupsController) UpdateComponentStatus(obj clie return errors.New(errMs) } -func(dcgs *DisaggregatedComputeGroupsController) recordComputeGroupIds(ddc *dv1.DorisDisaggregatedCluster) error { +func (dcgs *DisaggregatedComputeGroupsController) recordComputeGroupIds(ddc *dv1.DorisDisaggregatedCluster) error { // get user and password adminUserName, password := dcgs.GetManagementAdminUserAndPWD(context.Background(), ddc) @@ -624,7 +632,7 @@ func(dcgs *DisaggregatedComputeGroupsController) recordComputeGroupIds(ddc *dv1. klog.Errorf("DisaggregatedComputeGroupsController recordComputeGroupIds new doris client failed,err=%s", err.Error()) return err } - defer db.Close() + defer db.Close() backends, err := db.ShowBackends() if err != nil { @@ -634,7 +642,7 @@ func(dcgs *DisaggregatedComputeGroupsController) recordComputeGroupIds(ddc *dv1. m := map[string]string{} //statefulsetname:computegroupid for _, backend := range backends { - tags :=map[string]string{} + tags := map[string]string{} err = json.Unmarshal([]byte(backend.Tag), &tags) if err != nil { klog.Errorf("DisaggregatedComputeGroupsController recordComputeGroupIds backend tag stirng to map failed, tag: %s, err: %s", backend.Tag, err.Error()) @@ -647,19 +655,18 @@ func(dcgs *DisaggregatedComputeGroupsController) recordComputeGroupIds(ddc *dv1. } podName := strings.Split(backend.Host, ".")[0] - re,_ := regexp.Compile("(.*)-[0-9]+$") + re, _ := regexp.Compile("(.*)-[0-9]+$") matchs := re.FindStringSubmatch(podName) stsName := matchs[len(matchs)-1] m[stsName] = tags[mysql.COMPUTE_GROUP_ID] } - for i,cgs := range ddc.Status.ComputeGroupStatuses { + for i, cgs := range ddc.Status.ComputeGroupStatuses { ddc.Status.ComputeGroupStatuses[i].ComputeGroupId = m[cgs.StatefulsetName] } return nil } - func (dcgs *DisaggregatedComputeGroupsController) updateCGStatus(ddc *dv1.DorisDisaggregatedCluster, cgs *dv1.ComputeGroupStatus) error { stfName := cgs.StatefulsetName sts, err := k8s.GetStatefulSet(context.Background(), dcgs.K8sclient, ddc.Namespace, stfName) @@ -684,7 +691,6 @@ func (dcgs *DisaggregatedComputeGroupsController) updateCGStatus(ddc *dv1.DorisD return err } - updateRevision := sts.Status.UpdateRevision //check all pods controlled by new statefulset. allUpdated := dcgs.DisaggregatedSubDefaultController.StatefulsetControlledPodsAllUseNewUpdateRevision(updateRevision, podList.Items) diff --git a/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go index cef6e4c1..f9c60d44 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go @@ -117,6 +117,11 @@ func (dfc *DisaggregatedFEController) Sync(ctx context.Context, obj client.Objec return err } + event, err = dfc.ReconcilePVC(ctx, ddc, confMap, v1.DisaggregatedFE, st, nil) + if err != nil { + klog.Errorf("FE Sync ReconcilePVC failed, namespace: %s, ddc name %s, error=%s!", ddc.Namespace, ddc.Name, err.Error()) + } + return nil } @@ -308,15 +313,16 @@ func (dfc *DisaggregatedFEController) reconcileStatefulset(ctx context.Context, } // apply fe StatefulSet - if err := k8s.ApplyStatefulSet(ctx, dfc.K8sclient, st, func(st, est *appv1.StatefulSet) bool { + if err := k8s.ApplyStatefulSet(ctx, dfc.K8sclient, st, func(new, est *appv1.StatefulSet) bool { + dfc.RestrictConditionsEqual(new, est) //store annotations "doris.disaggregated.cluster/generation={generation}" on statefulset //store annotations "doris.disaggregated.cluster/update-{uniqueid}=true/false" on DorisDisaggregatedCluster - equal := resource.StatefulsetDeepEqualWithKey(st, est, v1.DisaggregatedSpecHashValueAnnotation, false) + equal := resource.StatefulsetDeepEqualWithKey(new, est, v1.DisaggregatedSpecHashValueAnnotation, false) if !equal { - if len(st.Annotations) == 0 { - st.Annotations = map[string]string{} + if len(new.Annotations) == 0 { + new.Annotations = map[string]string{} } - st_annos := (resource.Annotations)(st.Annotations) + st_annos := (resource.Annotations)(new.Annotations) st_annos.Add(v1.UpdateStatefulsetGeneration, strconv.FormatInt(cluster.Generation, 10)) if len(cluster.Annotations) == 0 { cluster.Annotations = map[string]string{} @@ -335,7 +341,7 @@ func (dfc *DisaggregatedFEController) reconcileStatefulset(ctx context.Context, // RecycleResources pvc resource for fe recycle func (dfc *DisaggregatedFEController) recycleResources(ctx context.Context, ddc *v1.DorisDisaggregatedCluster) error { - if ddc.Spec.FeSpec.PersistentVolume != nil || len(ddc.Spec.FeSpec.PersistentVolumes) != 0{ + if ddc.Spec.FeSpec.PersistentVolume != nil || len(ddc.Spec.FeSpec.PersistentVolumes) != 0 { return dfc.listAndDeletePersistentVolumeClaim(ctx, ddc) } return nil diff --git a/pkg/controller/sub_controller/disaggregated_cluster/metaservice/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/metaservice/controller.go index 0ac5166f..947c0534 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/metaservice/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/metaservice/controller.go @@ -21,6 +21,9 @@ import ( "context" "errors" "fmt" + "strconv" + "strings" + "github.com/apache/doris-operator/api/disaggregated/v1" "github.com/apache/doris-operator/pkg/common/utils/k8s" "github.com/apache/doris-operator/pkg/common/utils/resource" @@ -32,8 +35,6 @@ import ( "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "strconv" - "strings" ) type DisaggregatedMSController struct { @@ -184,6 +185,11 @@ func (dms *DisaggregatedMSController) Sync(ctx context.Context, obj client.Objec return err } + event, err = dms.ReconcilePVC(ctx, ddc, confMap, v1.DisaggregatedMS, st, nil) + if err != nil { + klog.Errorf("MS Sync ReconcilePVC failed, namespace: %s, ddc name %s, error=%s!", ddc.Namespace, ddc.Name, err.Error()) + } + return nil } @@ -201,15 +207,16 @@ func (dms *DisaggregatedMSController) reconcileStatefulset(ctx context.Context, return nil, err } - if err := k8s.ApplyStatefulSet(ctx, dms.K8sclient, st, func(st, est *appv1.StatefulSet) bool { + if err := k8s.ApplyStatefulSet(ctx, dms.K8sclient, st, func(new, est *appv1.StatefulSet) bool { + dms.RestrictConditionsEqual(new, est) //store annotations "doris.disaggregated.cluster/generation={generation}" on statefulset //store annotations "doris.disaggregated.cluster/update-{uniqueid}=true/false" on DorisDisaggregatedCluster - equal := resource.StatefulsetDeepEqualWithKey(st, est, v1.DisaggregatedSpecHashValueAnnotation, false) + equal := resource.StatefulsetDeepEqualWithKey(new, est, v1.DisaggregatedSpecHashValueAnnotation, false) if !equal { - if len(st.Annotations) == 0 { - st.Annotations = map[string]string{} + if len(new.Annotations) == 0 { + new.Annotations = map[string]string{} } - st_annos := (resource.Annotations)(st.Annotations) + st_annos := (resource.Annotations)(new.Annotations) st_annos.Add(v1.UpdateStatefulsetGeneration, strconv.FormatInt(ddc.Generation, 10)) if len(ddc.Annotations) == 0 { ddc.Annotations = map[string]string{} diff --git a/pkg/controller/sub_controller/disaggregated_subcontroller.go b/pkg/controller/sub_controller/disaggregated_subcontroller.go index ee3928fc..16298173 100644 --- a/pkg/controller/sub_controller/disaggregated_subcontroller.go +++ b/pkg/controller/sub_controller/disaggregated_subcontroller.go @@ -37,6 +37,7 @@ import ( "github.com/spf13/viper" appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" @@ -272,13 +273,13 @@ func (d *DisaggregatedSubDefaultController) CheckSecretExist(ctx context.Context // RestrictConditionsEqual adds two StatefulSet, // It is used to control the conditions for comparing. -// nst StatefulSet - a new StatefulSet -// est StatefulSet - an old StatefulSet -func (d *DisaggregatedSubDefaultController) RestrictConditionsEqual(nst *appv1.StatefulSet, est *appv1.StatefulSet) { +func (d *DisaggregatedSubDefaultController) RestrictConditionsEqual(new *appv1.StatefulSet, old *appv1.StatefulSet) { //shield persistent volume update when the pvcProvider=Operator //in webhook should intercept the volume spec updated when use statefulset pvc. // TODO: updates to statefulset spec for fields other than 'replicas', 'template', 'updateStrategy', 'persistentVolumeClaimRetentionPolicy' and 'minReadySeconds' are forbidden - nst.Spec.VolumeClaimTemplates = est.Spec.VolumeClaimTemplates + if len(old.Spec.VolumeClaimTemplates) != 0 { + new.Spec.VolumeClaimTemplates = old.Spec.VolumeClaimTemplates + } } func (d *DisaggregatedSubDefaultController) GetManagementAdminUserAndPWD(ctx context.Context, ddc *v1.DorisDisaggregatedCluster) (string, string) { @@ -297,11 +298,11 @@ func (d *DisaggregatedSubDefaultController) GetManagementAdminUserAndPWD(ctx con } // add cluster specification on container spec. this is useful to add common spec on different type pods, example: kerberos volume for fe and be. -func(d *DisaggregatedSubDefaultController) AddClusterSpecForPodTemplate(componentType v1.DisaggregatedComponentType, configMap map[string]interface{}, spec *v1.DorisDisaggregatedClusterSpec, pts *corev1.PodTemplateSpec){ +func (d *DisaggregatedSubDefaultController) AddClusterSpecForPodTemplate(componentType v1.DisaggregatedComponentType, configMap map[string]interface{}, spec *v1.DorisDisaggregatedClusterSpec, pts *corev1.PodTemplateSpec) { var c *corev1.Container switch componentType { case v1.DisaggregatedFE: - for i, _ := range pts.Spec.Containers { + for i, _ := range pts.Spec.Containers { if pts.Spec.Containers[i].Name == resource.DISAGGREGATED_FE_MAIN_CONTAINER_NAME { c = &pts.Spec.Containers[i] break @@ -337,8 +338,8 @@ func(d *DisaggregatedSubDefaultController) AddClusterSpecForPodTemplate(componen } -//return which generation had updated the statefulset. -func(d *DisaggregatedSubDefaultController) ReturnStatefulsetUpdatedGeneration(sts *appv1.StatefulSet, annoGenerationKey string) int64 { +// return which generation had updated the statefulset. +func (d *DisaggregatedSubDefaultController) ReturnStatefulsetUpdatedGeneration(sts *appv1.StatefulSet, annoGenerationKey string) int64 { if sts == nil { return 0 } @@ -353,18 +354,17 @@ func(d *DisaggregatedSubDefaultController) ReturnStatefulsetUpdatedGeneration(st return g } -//use statefulset.status.updateRevision and pod `controller-revision-hash` annotation to check pods updated to new revision. -//if all pods used new updateRevision return true, else return false. -func(d *DisaggregatedSubDefaultController) StatefulsetControlledPodsAllUseNewUpdateRevision(stsUpdateRevision string, pods []corev1.Pod) bool { +// use statefulset.status.updateRevision and pod `controller-revision-hash` annotation to check pods updated to new revision. +// if all pods used new updateRevision return true, else return false. +func (d *DisaggregatedSubDefaultController) StatefulsetControlledPodsAllUseNewUpdateRevision(stsUpdateRevision string, pods []corev1.Pod) bool { if stsUpdateRevision == "" { return false } - if len(pods) ==0 { + if len(pods) == 0 { return false } - for _, pod := range pods { labels := pod.Labels podControlledRevision := labels[resource.POD_CONTROLLER_REVISION_HASH_KEY] @@ -377,6 +377,99 @@ func(d *DisaggregatedSubDefaultController) StatefulsetControlledPodsAllUseNewUpd return true } +func (d *DisaggregatedSubDefaultController) ReconcilePVC( + ctx context.Context, + ddc *v1.DorisDisaggregatedCluster, + cm map[string]interface{}, + componentType v1.DisaggregatedComponentType, + sts *appv1.StatefulSet, + cg *v1.ComputeGroup, +) (*Event, error) { + + var commonSpec *v1.CommonSpec + switch componentType { + case v1.DisaggregatedMS: + commonSpec = &ddc.Spec.MetaService.CommonSpec + case v1.DisaggregatedFE: + commonSpec = &ddc.Spec.FeSpec.CommonSpec + case v1.DisaggregatedBE: + commonSpec = &cg.CommonSpec + default: + } + + _, _, pvcTemplates := d.BuildVolumesVolumeMountsAndPVCs(cm, componentType, commonSpec) + + oldPvcList := corev1.PersistentVolumeClaimList{} + selector := sts.Spec.Selector.MatchLabels + if err := d.K8sclient.List(ctx, &oldPvcList, client.InNamespace(ddc.Namespace), client.MatchingLabels(selector)); err != nil { + message := fmt.Sprintf("ReconcilePVC list pvc failed, namespace: %s, name: %s, error: %s", ddc.Namespace, ddc.Name, err.Error()) + klog.Error(message) + return &Event{Type: EventWarning, Reason: PVCListFailed, Message: message}, err + } + + for i := range pvcTemplates { + manager := pvcTemplates[i].Annotations[resource.PVCManagerAnnotationApache] + if manager != string(v1.PVCProvisionerOperator) { + continue + } + for ordinal := range *commonSpec.Replicas { + pvc := resource.BuildDisaggregatedPVC(pvcTemplates[i], selector, ddc.Namespace, sts.Name, strconv.FormatInt(int64(ordinal), 10)) + oldPvc := getPvc(oldPvcList, pvc.Name) + + // need add new pvc, + // however, this feature seems to have some additional limitations and may require recreating the statefulset. + // The impact of this behavior on the stability of the Doris cluster is still under investigation; + // therefore, the recreating statefulset needs to be performed manually. + if oldPvc == nil { + if err := d.K8sclient.Create(ctx, &pvc); err != nil && !apierrors.IsAlreadyExists(err) { + message := fmt.Sprintf("ReconcilePVC create pvc failed, namespace: %s, name: %s create pvc %s, error %s.", ddc.Namespace, ddc.Name, pvc.Name, err.Error()) + //d.K8srecorder.Event(ddc, string(EventWarning), PVCCreateFailed, message) + klog.Error(message) + return &Event{Type: EventWarning, Reason: PVCCreateFailed, Message: message}, err + } + message := fmt.Sprintf("ReconcilePVC create pvc, namespace: %s, name: %s create pvc %s .", ddc.Namespace, ddc.Name, pvc.Name) + klog.Infof(message) + d.K8srecorder.Event(ddc, string(EventNormal), PVCCreate, message) + continue + } + + oldQuantity := oldPvc.Spec.Resources.Requests[corev1.ResourceStorage] + newQuantity := pvc.Spec.Resources.Requests[corev1.ResourceStorage] + //if !oldQuantity.Equal(newQuantity){ + if oldQuantity.Cmp(newQuantity) == -1 { + // pvc need update + oldPvc.Spec.Resources.Requests[corev1.ResourceStorage] = newQuantity + if err := d.K8sclient.Patch(ctx, oldPvc, client.Merge); err != nil { + message := fmt.Sprintf("ReconcilePVC patch pvc failed, namespace: %s, ddc name: %s, patch pvc %s, error: %s", ddc.Namespace, ddc.Name, pvc.Name, err.Error()) + klog.Errorf(message) + return &Event{Type: EventWarning, Reason: PVCUpdateFailed, Message: message}, err + } + message := fmt.Sprintf("ReconcilePVC patch pvc, namespace: %s, ddc name: %s update pvc %s .", ddc.Namespace, ddc.Name, pvc.Name) + klog.Infof(message) + d.K8srecorder.Event(ddc, string(EventNormal), PVCUpdate, message) + } + + if oldQuantity.Cmp(newQuantity) == 1 { + message := fmt.Sprintf("ReconcilePVC pvc resize is rejected, PVC shrinking is not supported. namespace: %s, ddc name: %s, resize pvc %s", ddc.Namespace, ddc.Name, pvc.Name) + klog.Warningf(message) + d.K8srecorder.Event(ddc, string(EventWarning), PVCUpdateFailed, message) + } + + } + } + + return nil, nil +} + +func getPvc(pvcs corev1.PersistentVolumeClaimList, pvcName string) *corev1.PersistentVolumeClaim { + for _, pvc := range pvcs.Items { + if pvc.Name == pvcName { + return &pvc + } + } + return nil +} + func (d *DisaggregatedSubDefaultController) BuildVolumesVolumeMountsAndPVCs(confMap map[string]interface{}, componentType v1.DisaggregatedComponentType, commonSpec *v1.CommonSpec) ([]corev1.Volume, []corev1.VolumeMount, []corev1.PersistentVolumeClaim) { if commonSpec.PersistentVolume == nil && len(commonSpec.PersistentVolumes) == 0 { vs, vms := d.getEmptyDirVolumesVolumeMounts(confMap, componentType) @@ -390,7 +483,7 @@ func (d *DisaggregatedSubDefaultController) BuildVolumesVolumeMountsAndPVCs(conf return d.PersistentVolumeArrayBuildVolumesVolumeMountsAndPVCs(commonSpec, confMap, componentType) } -// the old config before 25.2.1, the requiredPaths should filter log path before call this function. +// PersistentVolumeBuildVolumesVolumeMountsAndPVCs the old config before 25.2.1, the requiredPaths should filter log path before call this function. func (d *DisaggregatedSubDefaultController) PersistentVolumeBuildVolumesVolumeMountsAndPVCs(commonSpec *v1.CommonSpec, confMap map[string]interface{}, componentType v1.DisaggregatedComponentType) ([]corev1.Volume, []corev1.VolumeMount, []corev1.PersistentVolumeClaim) { v1pv := commonSpec.PersistentVolume if v1pv == nil { @@ -446,7 +539,6 @@ func (d *DisaggregatedSubDefaultController) PersistentVolumeBuildVolumesVolumeMo } - var vs []corev1.Volume var vms []corev1.VolumeMount var pvcs []corev1.PersistentVolumeClaim @@ -461,7 +553,7 @@ func (d *DisaggregatedSubDefaultController) PersistentVolumeBuildVolumesVolumeMo pvcs = append(pvcs, corev1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Name: name, - Annotations: v1pv.Annotations, + Annotations: resource.BuildDisaggregatedPVCAnnotations(*v1pv), }, Spec: *v1pv.PersistentVolumeClaimSpec.DeepCopy(), }) @@ -470,7 +562,7 @@ func (d *DisaggregatedSubDefaultController) PersistentVolumeBuildVolumesVolumeMo return vs, vms, pvcs } -// use array of PersistentVolume, the new config from 25.2.x +// PersistentVolumeArrayBuildVolumesVolumeMountsAndPVCs use array of PersistentVolume, the new config from 25.2.x func (d *DisaggregatedSubDefaultController) PersistentVolumeArrayBuildVolumesVolumeMountsAndPVCs(commonSpec *v1.CommonSpec, confMap map[string]interface{}, componentType v1.DisaggregatedComponentType) ([]corev1.Volume, []corev1.VolumeMount, []corev1.PersistentVolumeClaim) { var requiredPaths []string @@ -548,8 +640,8 @@ func (d *DisaggregatedSubDefaultController) PersistentVolumeArrayBuildVolumesVol //generate pvc from the last path in requiredPaths, the mountPath that configured by user is the highest wight, so first use the v1pv to generate pvc not template v1pv. ss := set.NewSetString() - for i:= len(requiredPaths); i > 0; i-- { - path := requiredPaths[i -1] + for i := len(requiredPaths); i > 0; i-- { + path := requiredPaths[i-1] //if the path have build volume, vm, pvc, skip it. if ss.Find(path) { continue @@ -569,7 +661,7 @@ func (d *DisaggregatedSubDefaultController) PersistentVolumeArrayBuildVolumesVol pvcs = append(pvcs, corev1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Name: metadataName, - Annotations: pv.Annotations, + Annotations: resource.BuildDisaggregatedPVCAnnotations(*pv), }, Spec: *pv.PersistentVolumeClaimSpec.DeepCopy(), }) @@ -585,7 +677,7 @@ func (d *DisaggregatedSubDefaultController) PersistentVolumeArrayBuildVolumesVol pvcs = append(pvcs, corev1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Name: metadataName, - Annotations: commonSpec.PersistentVolumes[ti].Annotations, + Annotations: resource.BuildDisaggregatedPVCAnnotations(commonSpec.PersistentVolumes[ti]), }, Spec: *commonSpec.PersistentVolumes[ti].PersistentVolumeClaimSpec.DeepCopy(), }) @@ -609,7 +701,7 @@ func (d *DisaggregatedSubDefaultController) getEmptyDirVolumesVolumeMounts(confM } // this function is a compensation, because the DownwardAPI annotations and labels are not mount in pod, so this function amends。 -func(d *DisaggregatedSubDefaultController) AddDownwardAPI(st *appv1.StatefulSet) { +func (d *DisaggregatedSubDefaultController) AddDownwardAPI(st *appv1.StatefulSet) { t := &st.Spec.Template for index, _ := range t.Spec.Containers { if t.Spec.Containers[index].Name == resource.DISAGGREGATED_FE_MAIN_CONTAINER_NAME || t.Spec.Containers[index].Name == resource.DISAGGREGATED_BE_MAIN_CONTAINER_NAME ||