Skip to content

Conversation

@sylwiaszunejko
Copy link
Collaborator

@sylwiaszunejko sylwiaszunejko commented Dec 18, 2025

This PR fixes inefficiencies in the host initialization mechanism when bootstrapping a cluster.

Previously, the driver created Host instances with connections from the contact points provided in the cluster configuration using random host IDs. After establishing the control connection and reading from system.peers, these initial Host instances were discarded and replaced with new ones created using the correct host metadata. This approach resulted in unnecessary creation and teardown of multiple connections.

Changes

  • The control connection is now initialized only using the endpoints specified in the cluster configuration.
  • After a successful control connection is established, the driver reads from system.local and system.peers.
  • Based on this metadata, Host instances are created with the correct host_id values.
  • Connections are then initialized directly on these properly constructed Host instances.

@sylwiaszunejko
Copy link
Collaborator Author

Some tests are still failing, but I wanted to ask if the direction is good @dkropachev

@sylwiaszunejko
Copy link
Collaborator Author

@Lorak-mmk maybe you know, why this test assumes that the new_host should be different?

def test_get_control_connection_host(self):
        """
        Test to validate Cluster.get_control_connection_host() metadata

        @since 3.5.0
        @jira_ticket PYTHON-583
        @expected_result the control connection metadata should accurately reflect cluster state.

        @test_category metadata
        """

        host = self.cluster.get_control_connection_host()
        assert host == None

        self.session = self.cluster.connect()
        cc_host = self.cluster.control_connection._connection.host

        host = self.cluster.get_control_connection_host()
        assert host.address == cc_host
        assert host.is_up == True

        # reconnect and make sure that the new host is reflected correctly
        self.cluster.control_connection._reconnect()
        new_host = self.cluster.get_control_connection_host()
        assert host != new_host

@Lorak-mmk
Copy link

Lorak-mmk commented Dec 18, 2025

I have no idea.
In Rust Driver we have logic that if CC breaks then we try to connect it to all other hosts (because the one it was connected to is presumed non-working for now).
I see no such logic in Python Driver. This part was added in commit 2796ee5:
image

Was this test passing until now and non-flaky? If so, then perhaps there is such logic somewhere.

@Lorak-mmk
Copy link

Now that I think of it: I see that driver uses LBP to decide order of hosts to connect. See _connect_host_in_lbp and _reconnect_internal.
LBP uses by default is Round Robin, so on reconnect it will start from a different host than at the beginning, right? It would explain why each CC reconnect should land at different host in healthy cluster.

@sylwiaszunejko
Copy link
Collaborator Author

Now that I think of it: I see that driver uses LBP to decide order of hosts to connect. See _connect_host_in_lbp and _reconnect_internal. LBP uses by default is Round Robin, so on reconnect it will start from a different host than at the beginning, right? It would explain why each CC reconnect should land at different host in healthy cluster.

Makes sense, second question: in this test:

def test_profile_lb_swap(self):
        """
        Tests that profile load balancing policies are not shared

        Creates two LBP, runs a few queries, and validates that each LBP is execised
        seperately between EP's

        @since 3.5
        @jira_ticket PYTHON-569
        @expected_result LBP should not be shared.

        @test_category config_profiles
        """
        query = "select release_version from system.local where key='local'"
        rr1 = ExecutionProfile(load_balancing_policy=RoundRobinPolicy())
        rr2 = ExecutionProfile(load_balancing_policy=RoundRobinPolicy())
        exec_profiles = {'rr1': rr1, 'rr2': rr2}
        with TestCluster(execution_profiles=exec_profiles) as cluster:
            session = cluster.connect(wait_for_all_pools=True)

            # default is DCA RR for all hosts
            expected_hosts = set(cluster.metadata.all_hosts())
            rr1_queried_hosts = set()
            rr2_queried_hosts = set()

            rs = session.execute(query, execution_profile='rr1')
            rr1_queried_hosts.add(rs.response_future._current_host)
            rs = session.execute(query, execution_profile='rr2')
            rr2_queried_hosts.add(rs.response_future._current_host)

            assert rr2_queried_hosts == rr1_queried_hosts

in this tests it is assumed that both queries should use the same host, as they use different instances of RoundRobinPolicy and they start from the same host? But how this can be true if the position when we start is randomized here: https://github.com/scylladb/python-driver/blob/master/cassandra/policies.py#L182

@Lorak-mmk
Copy link

No idea. Perhaps populate is not called for those policies for some reason, and they are populated using on_up/down etc?
Try to print a log / stacktrace in populate and run this test.

Comment on lines +267 to +269
if not self.local_dc:
self.local_dc = dc
return HostDistance.LOCAL
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not be in this PR

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sylwiaszunejko, what is the reason for having it here ?

@sylwiaszunejko
Copy link
Collaborator Author

Now that I think of it: I see that driver uses LBP to decide order of hosts to connect. See _connect_host_in_lbp and _reconnect_internal. LBP uses by default is Round Robin, so on reconnect it will start from a different host than at the beginning, right? It would explain why each CC reconnect should land at different host in healthy cluster.

Makes sense, second question: in this test:

def test_profile_lb_swap(self):
        """
        Tests that profile load balancing policies are not shared

        Creates two LBP, runs a few queries, and validates that each LBP is execised
        seperately between EP's

        @since 3.5
        @jira_ticket PYTHON-569
        @expected_result LBP should not be shared.

        @test_category config_profiles
        """
        query = "select release_version from system.local where key='local'"
        rr1 = ExecutionProfile(load_balancing_policy=RoundRobinPolicy())
        rr2 = ExecutionProfile(load_balancing_policy=RoundRobinPolicy())
        exec_profiles = {'rr1': rr1, 'rr2': rr2}
        with TestCluster(execution_profiles=exec_profiles) as cluster:
            session = cluster.connect(wait_for_all_pools=True)

            # default is DCA RR for all hosts
            expected_hosts = set(cluster.metadata.all_hosts())
            rr1_queried_hosts = set()
            rr2_queried_hosts = set()

            rs = session.execute(query, execution_profile='rr1')
            rr1_queried_hosts.add(rs.response_future._current_host)
            rs = session.execute(query, execution_profile='rr2')
            rr2_queried_hosts.add(rs.response_future._current_host)

            assert rr2_queried_hosts == rr1_queried_hosts

in this tests it is assumed that both queries should use the same host, as they use different instances of RoundRobinPolicy and they start from the same host? But how this can be true if the position when we start is randomized here: https://github.com/scylladb/python-driver/blob/master/cassandra/policies.py#L182

This test was working because populate was called before cc was created, so we only knew about contact points provided in cluster config (so only one host) I believe current approach (calling populate on lbp after creating cc so we can update lbp with all known hosts) is much better so we should remove this test @Lorak-mmk WDYT?

@Lorak-mmk
Copy link

In the previous approach (calling populate with one host) were the on_add calls correct (so one call for each host, besides CC host)?
If so, then both versions are correct. I think we could then switch to proposed version.

@Lorak-mmk
Copy link

You could then adjust the test, not remove it.

@sylwiaszunejko
Copy link
Collaborator Author

sylwiaszunejko commented Dec 19, 2025

In the previous approach (calling populate with one host) were the on_add calls correct (so one call for each host, besides CC host)? If so, then both versions are correct. I think we could then switch to proposed version.

on_add is called properly, but if there is only one host during populate the starting position for RoundRobinPolicy is always the same even if some hosts are added later:

if len(hosts) > 1:
            self._position = randint(0, len(hosts) - 1)

@sylwiaszunejko sylwiaszunejko force-pushed the remove_random_ids branch 2 times, most recently from adddec1 to 3e864fc Compare December 20, 2025 12:58
Let control connection use resolved contact points from
cluster config if lbp is not yet initialized.
@sylwiaszunejko sylwiaszunejko self-assigned this Dec 22, 2025
@sylwiaszunejko sylwiaszunejko marked this pull request as ready for review December 22, 2025 13:21
Comment on lines 3524 to 3562
def _connect_host(self):
errors = {}

lbp = (
self._cluster.load_balancing_policy
if self._cluster._config_mode == _ConfigMode.LEGACY else
self._cluster._default_load_balancing_policy
)

# use endpoints from the default LBP if it is already initialized
for host in lbp.make_query_plan():
try:
return (self._try_connect(host), None)
return (self._try_connect(host.endpoint), None)
except ConnectionException as exc:
errors[str(host.endpoint)] = exc
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
self._cluster.signal_connection_failure(host, exc, is_host_addition=False)
except Exception as exc:
errors[str(host.endpoint)] = exc
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
if self._is_shutdown:
raise DriverException("[control connection] Reconnection in progress during shutdown")


# if lbp not initialized use contact points provided to the cluster
if len(errors) == 0:
for endpoint in self._cluster.endpoints_resolved:
try:
return (self._try_connect(endpoint), None)
except ConnectionException as exc:
errors[str(endpoint)] = exc
log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True)
self._cluster.signal_connection_failure(endpoint, exc, is_host_addition=False)
except Exception as exc:
errors[str(endpoint)] = exc
log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True)
if self._is_shutdown:
raise DriverException("[control connection] Reconnection in progress during shutdown")

return (None, errors)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make it simple solving #622 on the way:

    def _connect_host(self):
        errors = {}

        lbp = self._cluster.load_balancing_policy \
            if self._cluster._config_mode == _ConfigMode.LEGACY else self._cluster._default_load_balancing_policy

        # use endpoints from the default LBP if it is already initialized
        for endpoint in itertools.chain((host.endpoint for host in lbp.make_query_plan()), self._cluster.endpoints_resolved):
            try:
                return (self._try_connect(endpoint), None)
            except Exception as exc:
                errors[str(endpoint)] = exc
                log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True)
            if self._is_shutdown:
                raise DriverException("[control connection] Reconnection in progress during shutdown")

        return (None, errors)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please also find a better name for it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants