Kafka
2020-09-02 18:19:58
myron



Step 3: Create a topic

 

./kafka-topics.sh --create --bootstrap-server 10.1.6.134:9092,10.1.8.188:9092,10.1.0.177:9092 --replication-factor 1 --partitions 1 --topic panshi_ml_sorder


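The same command for older Kafka releases, whose kafka-topics.sh talks to ZooKeeper instead of the brokers:
./kafka-topics.sh --create --zookeeper 10.1.6.134:2181,10.1.8.188:2181,10.1.0.177:2181 --replication-factor 1 --partitions 1 --topic panshi_ml_sorder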
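When the same topic has to be created on more than one cluster, the command above can be assembled in a script. The following is only a minimal sketch and not part of the original notes: build_create_topic_args is a hypothetical helper that builds the argument list from the flags already used above, and it assumes kafka-topics.sh sits in the current directory and accepts --bootstrap-server.

```python
def build_create_topic_args(bootstrap_servers, topic, partitions=1,
                            replication_factor=1,
                            script="./kafka-topics.sh"):
    """Build the argument list for creating a topic (hypothetical helper)."""
    return [
        script, "--create",
        "--bootstrap-server", ",".join(bootstrap_servers),
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]


args = build_create_topic_args(
    ["10.1.6.134:9092", "10.1.8.188:9092", "10.1.0.177:9092"],
    "panshi_ml_sorder",
)
print(" ".join(args))

# To run it against a live cluster (not executed here):
#   import subprocess
#   subprocess.run(args, check=True)
```

Passing the arguments as a list rather than one shell string avoids quoting problems if a topic name ever contains unusual characters.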



Let's create a topic named "test" with a single partition and only one replica:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
We can now see that topic if we run the list topic command:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
So there is no surprise there—the original topic has no replicas and is on server 0, the only server in our cluster when we created it.
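In the describe output, Leader is the id of the broker serving reads and writes for the partition, Replicas lists the brokers holding a copy, and Isr lists the replicas currently in sync. To check these values from a script, the partition line can be parsed. This is a rough sketch that assumes exactly the line layout shown above; parse_partition_line is a hypothetical helper, and other Kafka versions may print extra fields that it does not handle.

```python
def parse_partition_line(line):
    """Parse one partition line of `kafka-topics.sh --describe` output."""
    tokens = line.split()
    fields = {}
    # Tokens alternate between "Key:" and its value, e.g. "Leader:", "0".
    for key, value in zip(tokens[0::2], tokens[1::2]):
        fields[key.rstrip(":").lower()] = value
    return {
        "topic": fields["topic"],
        "partition": int(fields["partition"]),
        "leader": int(fields["leader"]),
        # Broker ids are comma-separated when there is more than one.
        "replicas": [int(b) for b in fields["replicas"].split(",")],
        "isr": [int(b) for b in fields["isr"].split(",")],
    }


info = parse_partition_line(
    "Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0")
print(info)
```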
Let's publish a few messages to our new topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Now let's consume these messages:
> bin/kafka-console-consumer.sh --zookeeper 125.39.193.243:2181 --from-beginning --topic mgr
...
my test message 1
my test message 2
^C

> bin/kafka-console-consumer.sh --zookeeper kafka-zoo-svc:2181 --from-beginning --topic mgr
_____________________________________________________________________

from pykafka import KafkaClient
client = KafkaClient(hosts="125.39.193.243:9092")  # Kafka broker address, not ZooKeeper
client.topics  # list all topics
topic = client.topics['mgr']  # select a topic
producer = topic.get_producer()
producer.produce('test message 1')



______________________________________________________________________

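Conceptual question
With a Kafka and ZooKeeper cluster, both the producer and the consumer connected to ZooKeeper when I was using samsa. But I talked with Fengyun (a big-data expert who worked his way up from ops), and in their setup the producer connects directly to the list of Kafka brokers; only the consumer uses ZooKeeper. That also cleared up my confusion about why, in the pykafka documentation, only the consumer connects to ZooKeeper. Problem solved: just follow the documentation.
Producer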
>>> from pykafka import KafkaClient
>>> client = KafkaClient(hosts="192.168.1.1:9092, 192.168.1.2:9092") # accepts multiple brokers; this is the key point
>>> client.topics # list all topics
>>> topic = client.topics['my.test'] # select a topic
>>> producer = topic.get_producer()
>>> producer.produce(['test message ' + str(i ** 2) for i in range(4)]) # added str(); the official example did not run on py2.7
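The list comprehension above yields four messages whose suffixes are the squares 0, 1, 4 and 9; str() is needed because concatenating a string with an integer raises TypeError. As a sketch for Python 3, where as far as I know newer pykafka releases expect bytes payloads (an assumption to check against the installed version), the messages can be built and encoded as below. build_messages is a hypothetical helper name and nothing here talks to a broker.

```python
def build_messages(count):
    """Return UTF-8 encoded test messages (hypothetical helper)."""
    return [("test message " + str(i ** 2)).encode("utf-8")
            for i in range(count)]


messages = build_messages(4)
print(messages)

# Without str(), the concatenation fails before anything is sent:
try:
    "test message " + 2 ** 2
except TypeError as exc:
    print("TypeError:", exc)
```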
Consumer
>>> balanced_consumer = topic.get_balanced_consumer(
...     consumer_group='testgroup',
...     auto_commit_enable=True, # when set to False, consumer_group does not need to be added
...     zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot' # multiple ZooKeeper nodes are listed here
... )
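The zookeeper_connect value is a comma-separated list of host:port pairs, optionally followed by a chroot path that applies to all of the hosts. A quick way to sanity-check such a string before handing it to a consumer is to split it apart. This is a minimal sketch: parse_zookeeper_connect is a hypothetical helper, it falls back to ZooKeeper's default client port 2181 when no port is given, and it does not handle IPv6 addresses.

```python
def parse_zookeeper_connect(connect):
    """Split 'host:port,host:port/chroot' into (hosts, chroot)."""
    hosts_part, slash, chroot = connect.partition("/")
    hosts = []
    for entry in hosts_part.split(","):
        host, _, port = entry.strip().partition(":")
        # Fall back to ZooKeeper's default client port when none is given.
        hosts.append((host, int(port) if port else 2181))
    return hosts, (slash + chroot) if slash else None


hosts, chroot = parse_zookeeper_connect(
    "myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot")
print(hosts, chroot)
```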



_______________________________________________________________________
