Using Faust with Confluent cloud #449

@dantodor

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I'm trying to use Faust to do some processing on Kafka messages in Confluent Cloud.
Here's the example code I'm using:

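import faust
from faust.types.auth import AuthProtocol
# Assumption: the report does not show where create_ssl_context comes from;
# aiokafka.helpers provides a function with this name.
from aiokafka.helpers import create_ssl_context

# SASL/PLAIN credentials over TLS (placeholders kept as in the report)
broker_credentials = faust.SASLCredentials(
    mechanism=faust.types.auth.SASLMechanism.PLAIN,
    ssl_context=create_ssl_context(),
    username='XXX',
    password='XXX',
)

broker_credentials.protocol = AuthProtocol.SASL_SSL

app = faust.App(
    'agents-demo',
    broker='kafka://XXX.cloud',
    store='memory://',
    broker_credentials=broker_credentials,
)

# Existing topic with raw string values
greetings_topic = app.topic('test-greetings', value_type=str, value_serializer='raw')


# Consumer: print every message read from the topic
@app.agent(greetings_topic)
async def greet(stream):
    async for greeting in stream:
        print(greeting)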

The topic already exists and contains some messages; I'm just trying to read them and print them out.

Expected behavior

I would expect the messages to be printed to the console.

Actual behavior

It retries a few times, then stops with this message:

[ERROR] [^Worker]: Error: PolicyViolationError('Cannot create topic: agents-demo-__assignor-__leader (44): Topic replication factor must be 3')

I also tried setting TOPIC_PARTITIONS=1 and TOPIC_REPLICATION_FACTOR=1, but that did not help.

What am I doing wrong here?
Thank you!
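
The error above says the broker's policy requires a replication factor of 3, so a value of 1 would be rejected as well. A minimal sketch of what is likely worth trying, assuming the `topic_replication_factor` app setting is what Faust applies to the topics it creates itself (such as `agents-demo-__assignor-__leader`). The value 3 is taken from the error message; the guarded import is only there so the sketch can be read and run without Faust installed.

```python
# Value named in the broker's PolicyViolationError.
REQUIRED_REPLICATION_FACTOR = 3

app_settings = {
    "broker": "kafka://XXX.cloud",  # placeholder, as in the report
    "store": "memory://",
    # Assumption: this setting is used when Faust declares its own topics,
    # including the leader topic that fails to be created in the traceback.
    "topic_replication_factor": REQUIRED_REPLICATION_FACTOR,
}

try:
    import faust
except ImportError:  # sketch stays runnable without Faust installed
    faust = None

if faust is not None:
    # broker_credentials would be passed here exactly as in the report.
    app = faust.App("agents-demo", **app_settings)
```

If the cluster enforces other topic policies, the same approach of matching the app settings to the broker's requirements would apply.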

Full traceback

Traceback (most recent call last):
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/worker.py", line 273, in execute_from_commandline
    self.loop.run_until_complete(self._starting_fut)
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
    await child.maybe_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
    await self.start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
    await child.maybe_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
    await self.start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
    await self.on_start()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/assignor/leader_assignor.py", line 20, in on_start
    await self._enable_leader_topic()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/assignor/leader_assignor.py", line 24, in _enable_leader_topic
    await leader_topic.maybe_declare()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
    result = await self.fun(*self.args, **self.kwargs)
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/topics.py", line 455, in maybe_declare
    await self.declare()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/topics.py", line 468, in declare
    await producer.create_topic(
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 996, in create_topic
    await cast(Transport, self.transport)._create_topic(
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1164, in _create_topic
    await wrap()
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
    result = await self.fun(*self.args, **self.kwargs)
  File "/Users/dantodor/opt/anaconda3/envs/faust2/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1252, in _really_create_topic
    raise for_code(code)(
kafka.errors.PolicyViolationError: [Error 44] PolicyViolationError: Cannot create topic: agents-demo-__assignor-__leader (44): Topic replication factor must be 3
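
The last line of the traceback carries three separate pieces of information: the topic that could not be created (the app id `agents-demo` followed by `-__assignor-__leader`, declared from `faust/assignor/leader_assignor.py`), the Kafka error code (44), and the replication factor the broker demands. The sketch below pulls them apart; the regular expression is a hypothetical pattern written for this one message and is not part of Faust or Kafka.

```python
import re

# Error text copied from the traceback above.
message = (
    "Cannot create topic: agents-demo-__assignor-__leader (44): "
    "Topic replication factor must be 3"
)

# Hypothetical pattern that matches only this message format.
pattern = re.compile(
    r"Cannot create topic: (?P<topic>\S+) \((?P<code>\d+)\): "
    r"Topic replication factor must be (?P<factor>\d+)"
)

match = pattern.search(message)
if match is None:
    raise ValueError("unexpected message format")

topic = match["topic"]                  # topic Faust tried to create
error_code = int(match["code"])         # Kafka error code
required_factor = int(match["factor"])  # replication factor the broker requires

print(topic, error_code, required_factor)
```

Read this way, the failure concerns a topic Faust creates on its own rather than `test-greetings`, and the value to match is the broker's required factor.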

Versions

  • Python version 3.8
  • Faust version 0.10.4
  • Operating system OS X
  • Kafka version Confluent Cloud
  • RocksDB version (if applicable)
