-
Notifications
You must be signed in to change notification settings - Fork 474
[server] Support generate and execute reblance plan #1452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
061ed6b to
6bf4aca
Compare
6bf4aca to
d67fc39
Compare
d88c76c to
434a4f4
Compare
d67fc39 to
780e238
Compare
967ccb0 to
85341ca
Compare
60fd7ef to
ed3a37a
Compare
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java
Outdated
Show resolved
Hide resolved
f391276 to
b03b9eb
Compare
b03b9eb to
270af0a
Compare
270af0a to
911a84e
Compare
18e28c0 to
541782e
Compare
platinumhamburg
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@swuferhong Thanks for driving this critical and important work—I have a few comments on this PR.
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java
Show resolved
Hide resolved
| public RebalancePlan( | ||
| RebalanceStatus rebalanceStatus, Map<TableBucket, RebalancePlanForBucket> bucketPlan) { | ||
| String rebalanceId, | ||
| RebalanceStatus rebalanceStatus, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The inclusion of RebalanceStatus inside RebalancePlan is confusing to me. From the overall design, it appears that the concepts of Plan and Execution are intentionally separated—an elegant approach that aligns with common software architecture principles. However, in the current implementation, they seem tightly coupled.
Beyond this issue, the name rebalanceId feels ambiguous: does it refer to a planId, a progressId, or is it meant to unify both? This coupling isn’t ideal. For example, after a plan is generated, its execution might fail and be retried, which would break the one-to-one mapping between a plan and its corresponding progress instance.
After a broader review, I realized the original design intent: in fact, RebalanceProgress (or RebalanceTask) is the first-class entity. The Plan is merely an internal, ephemeral artifact—generated once per RebalanceProgress, never reused, never externally observed, and not subject to retries.
Given this understanding, perhaps what should be stored in ZooKeeper isn’t the RebalancePlan class itself, but rather RebalanceProgress, with the plan as an internal field within it. @swuferhong @wuchong What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@platinumhamburg I agree.
How about this:
CompletableFuture<RebalancePlan> rebalance(List<GoalType> priorityGoals, boolean dryRun);
==>
// triggers a rebalance task, and returns the rebalance id, in the future we can make the plan generation async
CompletableFuture<String> rebalance(List<GoalType> priorityGoals);
// returns a dry run plan for the goals, introduce in future, remove for now.
// CompletableFuture<RebalancePlan> rebalancePlan(List<GoalType> priorityGoals);
// return optional progress. empty if no rebalance in progress.
CompletableFuture<Optional<RebalanceProgress>> listRebalanceProgress(@Nullable String rebalanceId);
// RebalanceProgress still contains rebalance id, status, and plan to check the detailed progress per bucket.
// In ZK, we should json serialize RebalanceTask to distinguish class `RebalanceProgress` and `RebalancePlan`
// because it records the top-level status, but not record per-bucket rebalance statusWhat do you think @swuferhong ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. The new api: CompletableFuture<RebalancePlan> rebalancePlan(List<GoalType> priorityGoals); will be introduce in an individual issue.
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
Outdated
Show resolved
Hide resolved
...ver/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
Show resolved
Hide resolved
...rver/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerTest.java
Outdated
Show resolved
Hide resolved
...erver/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
Outdated
Show resolved
Hide resolved
fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/Goal.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/Goal.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/Goal.java
Show resolved
Hide resolved
...ver/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java
Outdated
Show resolved
Hide resolved
...ver/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java
Show resolved
Hide resolved
...ver/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java
Show resolved
Hide resolved
6616c63 to
d16bd4f
Compare
platinumhamburg
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some comments.
fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java
Show resolved
Hide resolved
fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java
Show resolved
Hide resolved
...ver/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java
Outdated
Show resolved
Hide resolved
...java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionAbstractGoal.java
Outdated
Show resolved
Hide resolved
...java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionAbstractGoal.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java
Show resolved
Hide resolved
...-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReplicaReassignment.java
Outdated
Show resolved
Hide resolved
...-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReplicaReassignment.java
Outdated
Show resolved
Hide resolved
...-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReplicaReassignment.java
Outdated
Show resolved
Hide resolved
...-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReplicaReassignment.java
Outdated
Show resolved
Hide resolved
5a0e50b to
995e3d2
Compare
995e3d2 to
6d4f006
Compare
...ink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java
Show resolved
Hide resolved
...nk/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/RebalanceProcedure.java
Show resolved
Hide resolved
...s-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
Show resolved
Hide resolved
...java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionAbstractGoal.java
Show resolved
Hide resolved
...rc/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
Show resolved
Hide resolved
.../src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
Show resolved
Hide resolved
| (replica) -> { | ||
| // For rebalance case. the replica state already set to null in method | ||
| // stopRemovedReplicasOfReassignedBucket. so we need not reset it again. | ||
| if (coordinatorContext.getReplicaState(replica) != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that null is a very ambiguous state—would it be better to define a clear, explicit state for use in rebalance scenarios?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null is not define only for reblance. NonExist will change the state in coordinatorContext to null.
Purpose
Linked issue: #1397
Brief change log
Tests
API and Format
Documentation