Open
Labels: question (Further information is requested)
Description
Checklist
- I have included information about relevant versions
- I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
I'm trying to use Faust to do some processing on Kafka messages in Confluent Cloud.
Here's the example code I'm using:
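import faust
from faust.types.auth import AuthProtocol, SASLMechanism
# Assumption: create_ssl_context is aiokafka's helper; the original snippet did not show its import.
from aiokafka.helpers import create_ssl_context

# SASL/PLAIN credentials over TLS for the Confluent Cloud broker.
broker_credentials = faust.SASLCredentials(
    mechanism=SASLMechanism.PLAIN,
    ssl_context=create_ssl_context(),
    username='XXX',
    password='XXX',
)
broker_credentials.protocol = AuthProtocol.SASL_SSL

app = faust.App(
    'agents-demo',
    broker='kafka://XXX.cloud',
    store='memory://',
    broker_credentials=broker_credentials,
)

greetings_topic = app.topic('test-greetings', value_type=str, value_serializer='raw')

# Consumer
@app.agent(greetings_topic)
async def greet(stream):
    async for greeting in stream:
        print(greeting)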
The topic already exists and contains some messages; I'm just trying to read them and print them out.
Expected behavior
I would expect the messages to be printed to the console.
Actual behavior
It retries a few times, then stops with this message:
[ERROR] [^Worker]: Error: PolicyViolationError('Cannot create topic: agents-demo-__assignor-__leader (44): Topic replication factor must be 3')
I also tried setting TOPIC_PARTITIONS=1 and TOPIC_REPLICATION_FACTOR=1.
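Since the broker's message asks for a replication factor of 3, a value of 1 would presumably be rejected by the same policy. The following is a minimal, untested sketch of passing the value as an app setting instead. It assumes that Faust's documented `topic_replication_factor` setting is what governs internal topics such as `agents-demo-__assignor-__leader`; the names `REQUIRED_REPLICATION_FACTOR` and `make_app` are hypothetical and exist only for illustration.

```python
# Value taken from the broker's error message ("Topic replication factor must be 3").
REQUIRED_REPLICATION_FACTOR = 3


def make_app(broker_credentials=None):
    """Build the app with the replication factor the broker policy asks for.

    Hypothetical helper: whether this resolves the PolicyViolationError on
    Confluent Cloud is an assumption, not something verified here.
    """
    import faust  # imported lazily so the sketch can be loaded without Faust installed

    return faust.App(
        'agents-demo',
        broker='kafka://XXX.cloud',
        store='memory://',
        broker_credentials=broker_credentials,
        # Applied to topics Faust creates itself (assumed to include the leader topic).
        topic_replication_factor=REQUIRED_REPLICATION_FACTOR,
    )
```

Calling `make_app(broker_credentials)` with the credentials object from the example above would replace the direct `faust.App(...)` call; the agent and topic definitions stay the same.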
What am I doing wrong here?
Thank you!
Full traceback
Traceback (most recent call last):
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/worker.py", line 273, in execute_from_commandline
self.loop.run_until_complete(self._starting_fut)
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
return future.result()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
await child.maybe_start()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
await child.maybe_start()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
await self.on_start()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/assignor/leader_assignor.py", line 20, in on_start
await self._enable_leader_topic()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/assignor/leader_assignor.py", line 24, in _enable_leader_topic
await leader_topic.maybe_declare()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
result = await self.fun(*self.args, **self.kwargs)
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/topics.py", line 455, in maybe_declare
await self.declare()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/topics.py", line 468, in declare
await producer.create_topic(
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 996, in create_topic
await cast(Transport, self.transport)._create_topic(
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1164, in _create_topic
await wrap()
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
result = await self.fun(*self.args, **self.kwargs)
File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1252, in _really_create_topic
raise for_code(code)(
kafka.errors.PolicyViolationError: [Error 44] PolicyViolationError: Cannot create topic: agents-demo-__assignor-__leader (44): Topic replication factor must be 3
Versions
- Python version 3.8
- Faust version 0.10.4
- Operating system OS X
- Kafka version Confluent Cloud
- RocksDB version (if applicable)