1、实时读取和处理Kafka中的数据。
利用faust.App创建Faust应用程序,并配置应用程序名称,Kafkabroker和序列模式。
接着,我们创建了一个主题,它与Kafka中的主题相对应。
Faust使用Python3.6+异步语法async,定义异步函数greet,并将其注册为Faust应用程序的agent。该函数接收实时数据集greetings,并异步输出每个数据。
import faust
app = faust.App(
'hello-world',
broker='kafka://localhost:9092',
value_serializer='raw',
)
greetings_topic = app.topic('greetings')
@app.agent(greetings_topic)
async def greet(greetings):
async for greeting in greetings:
print(greeting)
$ faust -A hello_world worker -l info
2、充分利用Python的类型提示,可以轻松定义数据模型。
import faust
class Greeting(faust.Record):
from_name: str
to_name: str
app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
@app.agent(topic)
async def hello(greetings):
async for greeting in greetings:
print(f'Hello from {greeting.from_name} to {greeting.to_name}')
@app.timer(interval=1.0)
async def example_sender(app):
await hello.send(
value=Greeting(from_name='Faust', to_name='you'),
)
if __name__ == '__main__':
app.main()
神龙|纯净稳定代理IP免费测试>>>>>>>>天启|企业级代理IP免费测试>>>>>>>>IPIPGO|全球住宅代理IP免费测试