Kafka的Producer发送采取的是异步发送的办法。在发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
普通配置和异步/同步发送acks:设置发送数据是否须要做事真个反馈,有三个值0,1,-1。retries:如果要求失落败,生产者可以自动重试,将重试指定为0,以是不会重试。batch.size:生产者为每个分区掩护未发送记录的缓冲区。 这些缓冲区的大小由batch.size配置指定。linger.ms:默认情形下,纵然缓冲区中还有其他未利用的空间,缓冲区也可以立即发送。如果要减少要求数,可以将linger.ms设置为大于0的值。这将指示生产者在发送要求之前等待该毫秒数,以希望会有更多记录来添补buffer.memory:掌握生产者可用于缓冲的内存总量。如果记录的发送速率超过了将记录发送到做事器的速率,则该缓冲区空间将被耗尽。当缓冲区空间用尽时,其他发送调用将壅塞。阻挡韶光的阈值由max.block.ms确定,此阈值之后将引发TimeoutException。send()方法是异步的。调用时,它将记录添加到暂挂记录缓冲区中中并立即返回。这使生产者可以将各个记录一起批处理以提高效率。

这种办法不会抛出非常很难定位问题,一样平常不该用该办法
异步带回调函数的API回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,解释发送成功,如果Exception不为null,解释发送失落败。
同步发送
同步发送的意思便是,一条发送之后,会壅塞当前哨程,直至返回ack。由于send方法返回的是一个Future工具,根据Futrue工具的特点,也可以实现同步发送的效果,只需在调用Future工具的get方发即可。
同步发送API 必要时等待打算完成,然后检索其结果。由于send调用是异步的,因此它将为RecordMetadata返回Future,该Future将分配给该记录。 在此将来调用get()将壅塞,直到关联的要求完成,然后返回记录的元数据或引发发送记录时发生的任何非常。