Skip to content

Commit 0728459

Browse files
authored
GG-31859 [IGNITE-14465 Add] the ability to set and get cluster state (#37)
(cherry picked from commit 7c1d0cc)
1 parent 718eed4 commit 0728459

File tree

19 files changed

+723
-85
lines changed

19 files changed

+723
-85
lines changed

pygridgain/aio_client.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from itertools import chain
1919
from typing import Iterable, Type, Union, Any, Dict
2020

21+
from .aio_cluster import AioCluster
2122
from .api import cache_get_node_partitions_async
2223
from .api.binary import get_binary_type_async, put_binary_type_async
2324
from .api.cache_config import cache_get_names_async
@@ -93,7 +94,7 @@ async def _connect(self, nodes):
9394

9495
if not self.partition_aware:
9596
try:
96-
if self.protocol_version is None:
97+
if self.protocol_context is None:
9798
# open connection before adding to the pool
9899
await conn.connect()
99100

@@ -121,7 +122,7 @@ async def _connect(self, nodes):
121122

122123
await asyncio.gather(*reconnect_coro, return_exceptions=True)
123124

124-
if self.protocol_version is None:
125+
if self.protocol_context is None:
125126
raise ReconnectError('Can not connect.')
126127

127128
async def close(self):
@@ -302,7 +303,7 @@ async def get_best_node(
302303
most probably contains the needed key-value pair. See IEP-23.
303304
304305
This method is not a part of the public API. Unless you wish to
305-
extend the `pyignite` capabilities (with additional testing, logging,
306+
extend the `pygridgain` capabilities (with additional testing, logging,
306307
examining connections, et c.) you probably should not use it.
307308
308309
:param cache: Ignite cache, cache name or cache id,
@@ -461,3 +462,11 @@ def sql(
461462
return AioSqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type,
462463
distributed_joins, local, replicated_only, enforce_join_order, collocated,
463464
lazy, include_field_names, max_rows, timeout)
465+
466+
def get_cluster(self) -> 'AioCluster':
467+
"""
468+
Gets client cluster facade.
469+
470+
:return: AioClient cluster facade.
471+
"""
472+
return AioCluster(self)

pygridgain/aio_cluster.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#
2+
# Copyright 2021 GridGain Systems, Inc. and Contributors.
3+
#
4+
# Licensed under the GridGain Community Edition License (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
"""
17+
This module contains `AioCluster` that lets you get info and change state of the
18+
whole cluster asynchronously.
19+
"""
20+
from pygridgain.api.cluster import cluster_get_state_async, cluster_set_state_async
21+
from pygridgain.exceptions import ClusterError
22+
from pygridgain.utils import status_to_exception
23+
24+
25+
class AioCluster:
26+
"""
27+
Ignite cluster abstraction. Users should never use this class directly,
28+
but construct its instances with
29+
:py:meth:`~pygridgain.aio_client.AioClient.get_cluster` method instead.
30+
"""
31+
32+
def __init__(self, client: 'AioClient'):
33+
self._client = client
34+
35+
@status_to_exception(ClusterError)
36+
async def get_state(self):
37+
"""
38+
Gets current cluster state.
39+
40+
:return: Current cluster state. This is one of ClusterState.INACTIVE,
41+
ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
42+
"""
43+
return await cluster_get_state_async(await self._client.random_node())
44+
45+
@status_to_exception(ClusterError)
46+
async def set_state(self, state):
47+
"""
48+
Changes current cluster state to the given.
49+
50+
Note: Deactivation clears in-memory caches (without persistence)
51+
including the system caches.
52+
53+
:param state: New cluster state. This is one of ClusterState.INACTIVE,
54+
ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
55+
"""
56+
return await cluster_set_state_async(await self._client.random_node(), state)

pygridgain/api/cluster.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#
2+
# Copyright 2021 GridGain Systems, Inc. and Contributors.
3+
#
4+
# Licensed under the GridGain Community Edition License (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
from pygridgain.api import APIResult
17+
from pygridgain.connection import AioConnection, Connection
18+
from pygridgain.datatypes import Byte
19+
from pygridgain.exceptions import NotSupportedByClusterError
20+
from pygridgain.queries import Query, query_perform
21+
from pygridgain.queries.op_codes import OP_CLUSTER_GET_STATE, OP_CLUSTER_CHANGE_STATE
22+
23+
24+
def cluster_get_state(connection: 'Connection', query_id=None) -> 'APIResult':
25+
"""
26+
Get cluster state.
27+
28+
:param connection: Connection to use,
29+
:param query_id: (optional) a value generated by client and returned as-is
30+
in response.query_id. When the parameter is omitted, a random value
31+
is generated,
32+
:return: API result data object. Contains zero status and a state
33+
retrieved on success, non-zero status and an error description on failure.
34+
"""
35+
return __cluster_get_state(connection, query_id)
36+
37+
38+
async def cluster_get_state_async(connection: 'AioConnection', query_id=None) -> 'APIResult':
39+
"""
40+
Async version of cluster_get_state
41+
"""
42+
return await __cluster_get_state(connection, query_id)
43+
44+
45+
def __post_process_get_state(result):
46+
if result.status == 0:
47+
result.value = result.value['state']
48+
return result
49+
50+
51+
def __cluster_get_state(connection, query_id):
52+
if not connection.protocol_context.is_cluster_api_supported():
53+
raise NotSupportedByClusterError('Cluster API is not supported by the cluster')
54+
55+
query_struct = Query(OP_CLUSTER_GET_STATE, query_id=query_id)
56+
return query_perform(
57+
query_struct, connection,
58+
response_config=[('state', Byte)],
59+
post_process_fun=__post_process_get_state
60+
)
61+
62+
63+
def cluster_set_state(connection: 'Connection', state: int, query_id=None) -> 'APIResult':
64+
"""
65+
Set cluster state.
66+
67+
:param connection: Connection to use,
68+
:param state: State to set,
69+
:param query_id: (optional) a value generated by client and returned as-is
70+
in response.query_id. When the parameter is omitted, a random value
71+
is generated,
72+
:return: API result data object. Contains zero status if a value
73+
is written, non-zero status and an error description otherwise.
74+
"""
75+
return __cluster_set_state(connection, state, query_id)
76+
77+
78+
async def cluster_set_state_async(connection: 'AioConnection', state: int, query_id=None) -> 'APIResult':
79+
"""
80+
Async version of cluster_get_state
81+
"""
82+
return await __cluster_set_state(connection, state, query_id)
83+
84+
85+
def __post_process_set_state(result):
86+
if result.status == 0:
87+
result.value = result.value['state']
88+
return result
89+
90+
91+
def __cluster_set_state(connection, state, query_id):
92+
if not connection.protocol_context.is_cluster_api_supported():
93+
raise NotSupportedByClusterError('Cluster API is not supported by the cluster')
94+
95+
query_struct = Query(
96+
OP_CLUSTER_CHANGE_STATE,
97+
[
98+
('state', Byte)
99+
],
100+
query_id=query_id
101+
)
102+
return query_perform(
103+
query_struct, connection,
104+
query_params={
105+
'state': state,
106+
}
107+
)

pygridgain/client.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
from .api import cache_get_node_partitions
5050
from .api.binary import get_binary_type, put_binary_type
5151
from .api.cache_config import cache_get_names
52+
from .cluster import Cluster
5253
from .cursors import SqlFieldsCursor
5354
from .cache import Cache, create_cache, get_cache, get_or_create_cache, BaseCache
5455
from .connection import Connection
@@ -83,32 +84,32 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = False, *
8384
self._partition_aware = partition_aware
8485
self.affinity_version = (0, 0)
8586
self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
86-
self._protocol_version = None
87+
self._protocol_context = None
8788

8889
@property
89-
def protocol_version(self):
90+
def protocol_context(self):
9091
"""
91-
Returns the tuple of major, minor, and revision numbers of the used
92-
thin protocol version, or None, if no connection to the GridGain
92+
Returns protocol context, or None, if no connection to the Ignite
9393
cluster was not yet established.
9494
9595
This method is not a part of the public API. Unless you wish to
9696
extend the `pygridgain` capabilities (with additional testing,
9797
logging, examining connections, et c.) you probably should not use it.
9898
"""
99-
return self._protocol_version
99+
return self._protocol_context
100100

101-
@protocol_version.setter
102-
def protocol_version(self, value):
103-
self._protocol_version = value
101+
@protocol_context.setter
102+
def protocol_context(self, value):
103+
self._protocol_context = value
104104

105105
@property
106106
def partition_aware(self):
107107
return self._partition_aware and self.partition_awareness_supported_by_protocol
108108

109109
@property
110110
def partition_awareness_supported_by_protocol(self):
111-
return self.protocol_version is not None and self.protocol_version >= (1, 4, 0)
111+
return self.protocol_context is not None \
112+
and self.protocol_context.is_partition_awareness_supported()
112113

113114
@property
114115
def compact_footer(self) -> bool:
@@ -379,7 +380,7 @@ def _connect(self, nodes):
379380
conn = Connection(self, host, port, **self._connection_args)
380381

381382
try:
382-
if self.protocol_version is None or self.partition_aware:
383+
if self.protocol_context is None or self.partition_aware:
383384
# open connection before adding to the pool
384385
conn.connect()
385386

@@ -396,7 +397,7 @@ def _connect(self, nodes):
396397

397398
self._nodes.append(conn)
398399

399-
if self.protocol_version is None:
400+
if self.protocol_context is None:
400401
raise ReconnectError('Can not connect.')
401402

402403
def close(self):
@@ -578,7 +579,7 @@ def get_best_node(
578579
most probably contains the needed key-value pair. See IEP-23.
579580
580581
This method is not a part of the public API. Unless you wish to
581-
extend the `pyignite` capabilities (with additional testing, logging,
582+
extend the `pygridgain` capabilities (with additional testing, logging,
582583
examining connections, et c.) you probably should not use it.
583584
584585
:param cache: Ignite cache, cache name or cache id,
@@ -728,3 +729,11 @@ def sql(
728729
return SqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type, distributed_joins,
729730
local, replicated_only, enforce_join_order, collocated, lazy, include_field_names,
730731
max_rows, timeout)
732+
733+
def get_cluster(self) -> 'Cluster':
734+
"""
735+
Gets client cluster facade.
736+
737+
:return: Client cluster facade.
738+
"""
739+
return Cluster(self)

pygridgain/cluster.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#
2+
# Copyright 2021 GridGain Systems, Inc. and Contributors.
3+
#
4+
# Licensed under the GridGain Community Edition License (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
"""
18+
This module contains `Cluster` that lets you get info and change state of the
19+
whole cluster.
20+
"""
21+
from pygridgain.api.cluster import cluster_get_state, cluster_set_state
22+
from pygridgain.exceptions import ClusterError
23+
from pygridgain.utils import status_to_exception
24+
25+
26+
class Cluster:
27+
"""
28+
Ignite cluster abstraction. Users should never use this class directly,
29+
but construct its instances with
30+
:py:meth:`~pygridgain.client.Client.get_cluster` method instead.
31+
"""
32+
33+
def __init__(self, client: 'Client'):
34+
self._client = client
35+
36+
@status_to_exception(ClusterError)
37+
def get_state(self):
38+
"""
39+
Gets current cluster state.
40+
41+
:return: Current cluster state. This is one of ClusterState.INACTIVE,
42+
ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
43+
"""
44+
return cluster_get_state(self._client.random_node)
45+
46+
@status_to_exception(ClusterError)
47+
def set_state(self, state):
48+
"""
49+
Changes current cluster state to the given.
50+
51+
Note: Deactivation clears in-memory caches (without persistence)
52+
including the system caches.
53+
54+
:param state: New cluster state. This is one of ClusterState.INACTIVE,
55+
ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
56+
"""
57+
return cluster_set_state(self._client.random_node, state)

0 commit comments

Comments
 (0)