Step 3: Create a topic
./kafka-topics.sh --create --bootstrap-server 10.1.6.134:9092,10.1.8.188:9092,10.1.0.177:9092 --replication-factor 1 --partitions 1 --topic
panshi_ml_sorder
./kafka-topics.sh --create --zookeeper 10.1.6.134:2181,10.1.8.188:2181,10.1.0.177:2181 --replication-factor 1 --partitions 1 --topic panshi_ml_sorder
Let's create a topic named "test" with a single partition and only one replica:> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testWe can now see that topic if we run the list topic command:> bin/kafka-topics.sh --list --zookeeper localhost:2181test> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testThis is a messageThis is another message> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0So there is no surprise there—the original topic has no replicas and is on server 0, the only server in our cluster when we created it.Let's publish a few messages to our new topic:> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic...my test message 1my test message 2^CNow let's consume these messages:> bin/kafka-console-consumer.sh --zookeeper 125.39.193.243:2181 --from-beginning --topic mgr...my test message 1my test message 2^C> bin/kafka-console-consumer.sh --zookeeper kafka-zoo-svc:2181 --from-beginning --topic mgr
_____________________________________________________________________
from pykafka import KafkaClient
client = KafkaClient(hosts="125.39.193.243:9092")
client.topics
topic = client.topics['mgr']
producer = topic.get_producer()
producer.produce('test message 1')
______________________________________________________________________
概念问题
kafaka和zookeeper的群集,使用samsa的时候生产者和消费者都连接了zookeeper,但是我跟峰云(大数据大牛,运维屌丝逆转)沟通,他们使用的时候是生产者直接连接kafaka服务器列表,消费者才用zookeeper。这也解决了我看pykafka文档,只有消费者才连接zookeeper的困惑,所以问题解决,直接按照文档搞起。
生产者123456>>> from pykafka import KafkaClient>>> client = KafkaClient(hosts="192.168.1.1:9092, 192.168.1.2:9092") # 可接受多个Client这是重点>>> client.topics # 查看所有topic>>> topic = client.topics['my.test'] # 选择一个topic>>> producer = topic.get_producer()>>> producer.produce(['test message ' + str(i ** 2) for i in range(4)]) # 加了个str官方的例子py2.7跑不过
消费者
12345>>> balanced_consumer = topic.get_balanced_consumer( consumer_group='testgroup', auto_commit_enable=True, # 设置为Flase的时候不需要添加 consumer_group zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot' # 这里就是连接多个zk)
_______________________________________________________________________
myron