请教 - 如何用jmeter导入csv文件到kafka ???

问题

我想用jmeter导入csv文件到kafka,但是不知道这个kafka的参数格式怎么写->kafka_key和kafka_message,并实现每秒传入50条数据

报错信息

只导入了“XH,CKSJ,CX,SFZRKMC,RKSJ,BZ,SFZCKMC,CP"这些变量名

pic04



环境


你这里只让发送一个请求,肯定就会只发送一个呀

如果数据有20条就需要发送的线程数为20。
如果数据有20条,发送的线程数设置为30也可以,但是还是只会跑20条数据,因为20条数据之后对应的没有数据进行请求了

那参数该怎么设置呢,现在只传进来了变量名,没有传实际的值

kafka_key和kafka_message这里不知道怎么填,我想实现传进一整行的数据(即包括XH,CKSJ,CX,SFZRKMC等列数据)

你是想要直接传递参数值,还是想要传递变量名加参数值呢?
如果是只想传递参数值,则需要设置变量名跟是否忽略第一行选择true;
如果是想要传递变量名加参数值则直接这样即可,只改请求线程数

其实和http请求发送参数一样,只不过是协议改变了

想要传递变量名加参数值,我修改以后还是报错



pic07

错误截图完整一些~右边看不到啦

2022-12-23 11:48:26,124 INFO o.a.j.e.StandardJMeterEngine: Running the test!
2022-12-23 11:48:26,124 INFO o.a.j.s.SampleEvent: List of sample_variables:
2022-12-23 11:48:26,124 INFO o.a.j.p.j.s.JavaSampler: Created class: co.signal.kafkameter.KafkaProducerSampler. Uses tearDownTest:
2022-12-23 11:48:26,125 INFO o.a.j.g.u.JMeterMenuBar: setRunning(true, local)
2022-12-23 11:48:26,195 INFO o.a.j.e.StandardJMeterEngine: Starting ThreadGroup: 1 : Thread Group
2022-12-23 11:48:26,195 INFO o.a.j.e.StandardJMeterEngine: Starting 1 threads for group Thread Group.
2022-12-23 11:48:26,195 INFO o.a.j.e.StandardJMeterEngine: Thread will continue on error
2022-12-23 11:48:26,195 INFO o.a.j.t.ThreadGroup: Starting thread group… number=1 threads=1 ramp-up=1 delayedStart=false
2022-12-23 11:48:26,195 INFO o.a.j.t.ThreadGroup: Started thread group number 1
2022-12-23 11:48:26,195 INFO o.a.j.e.StandardJMeterEngine: All thread groups have been started
2022-12-23 11:48:26,196 INFO o.a.j.t.JMeterThread: Thread started: Thread Group 1-1
2022-12-23 11:48:26,196 INFO o.a.j.s.FileServer: Stored: C:/Users/小短腿zz/Desktop/The-road-of-data-division-s-growth.-master/高速公路ETC入深圳数据_2920000403621.csv
2022-12-23 11:48:26,197 INFO o.a.k.c.p.ProducerConfig: ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [hadoop102:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes =
key.serializer = class kafka.serializer.NullEncoder
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters =
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class kafka.serializer.DefaultEncoder

2022-12-23 11:48:26,198 INFO o.a.k.c.p.KafkaProducer: [Producer clientId=producer-10] Closing the Kafka producer with timeoutMillis = 0 ms.
2022-12-23 11:48:26,198 ERROR o.a.j.t.JMeterThread: Error while processing sampler: ‘Java Request’.
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:433) ~[jmeter.backendlistener.kafka-1.0.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298) ~[jmeter.backendlistener.kafka-1.0.0.jar:?]
at co.signal.kafkameter.KafkaProducerSampler.setupTest(KafkaProducerSampler.java:143) ~[kafkameter-0.2.0.jar:?]
at org.apache.jmeter.protocol.java.sampler.JavaSampler.sample(JavaSampler.java:194) ~[ApacheJMeter_java.jar:5.5]
at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_311]
Caused by: org.apache.kafka.common.KafkaException: Could not find a public no-argument constructor for kafka.serializer.NullEncoder
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:323) ~[jmeter.backendlistener.kafka-1.0.0.jar:?]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:370) ~[jmeter.backendlistener.kafka-1.0.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:360) ~[jmeter.backendlistener.kafka-1.0.0.jar:?]
… 8 more
Caused by: java.lang.NoSuchMethodException: kafka.serializer.NullEncoder.()
at java.lang.Class.getConstructor0(Class.java:3082) ~[?:1.8.0_311]
at java.lang.Class.getDeclaredConstructor(Class.java:2178) ~[?:1.8.0_311]
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:321) ~[jmeter.backendlistener.kafka-1.0.0.jar:?]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:370) ~[jmeter.backendlistener.kafka-1.0.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:360) ~[jmeter.backendlistener.kafka-1.0.0.jar:?]
… 8 more
2022-12-23 11:48:26,198 INFO o.a.j.t.JMeterThread: Thread is done: Thread Group 1-1
2022-12-23 11:48:26,198 INFO o.a.j.t.JMeterThread: Thread finished: Thread Group 1-1
2022-12-23 11:48:26,199 INFO o.a.j.e.StandardJMeterEngine: Notifying test listeners of end of test
2022-12-23 11:48:26,199 INFO o.a.j.s.FileServer: Close: C:/Users/小短腿zz/Desktop/The-road-of-data-division-s-growth.-master/高速公路ETC入深圳数据_2920000403621.csv
2022-12-23 11:48:26,199 WARN o.a.j.e.StandardJMeterEngine: Error encountered during shutdown of org.apache.jmeter.protocol.java.sampler.JavaSampler@4fc824a1
java.lang.NullPointerException: null
at co.signal.kafkameter.KafkaProducerSampler.teardownTest(KafkaProducerSampler.java:148) ~[kafkameter-0.2.0.jar:?]
at org.apache.jmeter.protocol.java.sampler.JavaSampler.testEnded(JavaSampler.java:298) ~[ApacheJMeter_java.jar:5.5]
at org.apache.jmeter.engine.StandardJMeterEngine.notifyTestListenersOfEnd(StandardJMeterEngine.java:230) ~[ApacheJMeter_core.jar:5.5]
at org.apache.jmeter.engine.StandardJMeterEngine.run(StandardJMeterEngine.java:507) ~[ApacheJMeter_core.jar:5.5]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_311]
2022-12-23 11:48:26,199 INFO o.a.j.g.u.JMeterMenuBar: setRunning(false, local)

kafka的消费者端没有消费到任何消息

不知您是否可以,用我的数据源在您的机子上发送数据到kafka试试,如果成功我看看截图(参数列表等设置),应该就明白啦

嗯,我下个jar包试一下

总结起来是两个问题,用CSV文件上传一行行数据(变量名+值)到Kafka,然后JavaRequest和CSV文件里的参数列表该怎么填写?是否要设置全局变量或者引用变量之类的?(因为我发现消费者端只收到${XH},${CKSJ},${CX},${SFZRKMC},${RKSJ},${BZ},${SFZCKMC},${CP},即各个变量名,而不是变量名对应的值),总之就是要把csv数据传进kafka

老师您好,我已经解决了数据导入的问题,现在我想实现每秒导入50条数据,该怎么实现呀?

这样是每秒50条数据吗 然后一共持续10秒