红联Linux门户
Linux帮助

rabbitmq和kafka简单的性能测试

发布时间:2016-02-16 09:52:56来源:linux网站作者:风荷举

测试环境:ubuntu 15.10 64位

cpu:inter core i7-4790 3.60GHZ * 8

内存:16GB

硬盘:ssd 120GB

软件环境:rabbmitmq 3.6.0   kafka0.8.1  (均为单机本机运行)


测试结果:

kafka :消费速度: 37,586 /s  生产速度: 448,753 /s

rabbitmq: 消费速度: 20,807 /s  生产速度  16.413 /s


出现问题:

rabbitmq 生产4分钟左右出现队列阻塞,无法继续添加数据,1分钟后恢复,再过大约1分钟又出现此现象并以约1分钟为间隔出现此问题。

rabbitmq 生产对象时有不小的几率(约 1/20)添加队列失败,报出的错误是“tcp链接重置”

其他并无任何问题。


结论:

很明显的看出kafka的性能远超rabbitmq。不过这也是理所当然的,毕竟2个消息队列实现的协议是不一样的,处理消息的场景也大有不同。rabbitmq适合处理一些数据严谨的消息,比如说支付消息,社交消息等不能丢失的数据。kafka是批量操作切不报证数据是否能完整的到达消费者端,所以适合一些大量的营销消息的场景。


kafka:

package main
import (
"github.com/Shopify/sarama"
"os"
"os/signal"
"sync"
"log"
"time"
)


func main() {
go producer()
//go consumer()
time.Sleep(10*time.Minute)
}

func producer()  {
config :=sarama.NewConfig()
config.Producer.Return.Successes = true
proder,err := sarama.NewAsyncProducer([]string{"localhost:9092"},config)
if err != nil {
panic(err)
}

signals :=make(chan  os.Signal,1)
signal.Notify(signals,os.Interrupt)

var (
wg  sync.WaitGroup
enqueued, successes, errors int
)

wg.Add(1)
go func() {
defer  wg.Done()
for _=range proder.Successes(){
successes++
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for err := range proder.Errors(){
log.Println(err)
errors++
}
}()

go func() {
t1 := time.NewTicker(time.Second)
for{
<- t1.C
log.Println(enqueued)
}
}()

ProducerLoop:

for{
message :=&sarama.ProducerMessage{Topic:"test",Value:sarama.StringEncoder("testing 123")}
select {
case proder.Input() <- message:
enqueued++

case <- signals:
proder.AsyncClose()
break ProducerLoop
}

}

wg.Wait()
log.Println("Successfully produced:%d;errors:%d\n",successes,errors)

}

func consumer()  {
coner,err := sarama.NewConsumer([]string{"localhost:9092"},nil)
if err != nil {
panic(err)
}

defer func() {
if err :=coner.Close(); err !=nil{
log.Fatalln(err)
}
}()

partitionConsumer ,err := coner.ConsumePartition("test",0,sarama.OffsetNewest)
if err != nil {
panic(err)
}

defer func() {
if err := partitionConsumer.Close();err!=nil{
log.Fatalln(err)
}
}()

signals := make(chan os.Signal,1)
signal.Notify(signals,os.Interrupt)
consumed:=0

go func() {
t1 := time.NewTicker(time.Second)
for{
<- t1.C
log.Println(consumed)
}
}()

ConsumerLoop:
for{
select {
case _ = <-partitionConsumer.Messages():

consumed++
//log.Println( string(msg.Value),"  =>  ",consumed)
case <-signals:
break ConsumerLoop
}
}

log.Printf("Consumed: %d\n", consumed)
}


rabbitmq:

package main

import (
"github.com/streadway/amqp"
"time"
"fmt"
"log"
)

const (
queueName = "push.msg.q"
exchange  = "t.msg.ex"
mqurl ="amqp://shimeng:shimeng1015@192.168.155.106:5672/push"

)

var conn *amqp.Connection
var channel *amqp.Channel

func main() {
fmt.Println(1)
//push()
receive()
//fmt.Println("end")
//close()
}

func failOnErr(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}

func mqConnect() {
var err error
conn, err = amqp.Dial(mqurl)
if err != nil {
log.Println(1)
log.Fatalln(err)
}
fmt.Println(5)
channel, err = conn.Channel()
if err != nil {
fmt.Println(2)
log.Fatalln(err)
}else {
fmt.Println("a")
}
}

func push() {
count := 0
if channel == nil {
fmt.Println(2)
mqConnect()
}else {
fmt.Println(3)
}
msgContent := "hello world!"
t1 := time.NewTicker(time.Second)

go func() {
for{
<- t1.C
log.Println(count)
}
}()

for{
err := channel.Publish(exchange, "test", false, false, amqp.Publishing{
ContentType: "text/plain",
Body:[]byte(msgContent),
})
if err != nil {

}else {
count ++
}

}

}

func receive() {
if channel == nil {
mqConnect()
}
count :=0
msgs, err := channel.Consume(queueName, "", true, false, false, false, nil)
failOnErr(err, "")

forever := make(chan bool)

t1 := time.NewTicker(time.Second)
go func() {
for{
<- t1.C
log.Println(count)
}
}()
go func() {
//fmt.Println(*msgs)
for _= range msgs {
count ++
//s := BytesToString(&(d.Body))
//count++
//fmt.Printf("receve msg is :%s -- %d\n", *s, count)
}
}()

fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
<-forever
}


本文永久更新地址:http://www.linuxdiyf.com/linux/18107.html