先安装zookeeper

docker run -d --restart=always --name zookeeper --volume /etc/localtime:/etc/localtime -p 2181:2181 -p 2888:2888 -p 3888:3888 -t zookeeper

再安装kafka

docker run -d --name kafka --publish 9092:9092 \
--link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.1.22 \
--env KAFKA_ADVERTISED_PORT=9092 --restart=always \
--volume /etc/localtime:/etc/localtime \
wurstmeister/kafka


192.168.1.22为访问地址


下面为application.yml关于kafka配置

kafka:

  producer:
retries: 1
servers: 192.168.1.22:9092
linger: 1
batch:
  size: 4096
buffer:
  memory: 40960

  consumer:
auto:
  offset:
reset: latest
  commit:
interval: 100
servers: 192.168.1.22:9092
session:
  timeout: 20000
enable:
  auto:
commit: true
concurrency: 10
group:
  id: ${spring.application.name}-group


java消费者配置类

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(8);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}

java生产者配置类

@Configuration
@EnableKafka
public class KafkaProducerConfig {
   @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}

发送kafka消息类

@Component
@Slf4j
public class KafkaMessageProducer {

   @Resource
   KafkaTemplate kafkaTemplate;

   /**
    * 发送消息
    *
    * @param message
    */
   public void sendMessage(ProducerMessage message) {
      log.info("kafka 请求参数:[{}]", JsonUtils.toJSONString(message));
      try {
         if (StringUtils.isNotBlank(message.getKey())) {
            this.kafkaTemplate.send(message.getTopic(), message.getKey(), message.getBody());
         } else {
            this.kafkaTemplate.send(message.getTopic(), message.getBody());
         }
      } catch (Exception e) {
         log.error("发送kafka异常:[{}]", message, e.getMessage(), e);
      }
      log.info("kafka response:[{}]", message);
   }
}

接受kafka消息

@KafkaListener(topics = {Constants.CREATE_ORDER_DELIVERY_TOPIC})
public void createReceivingAddress(String message){
   log.info("CREATE_ORDER_DELIVERY_TOPIC kafak message:{[]}",message);
   ReceivingAddress receivingAddress = JsonUtils.parseObject(message,ReceivingAddress.class);
   if(receivingAddress!=null){
      ReceivingAddress receivingAddressOld = receivingAddressService.findOneByEntity(receivingAddress);
      if(receivingAddressOld==null){
         receivingAddress.setStatus(DeliveryStatus.TO_BE_PAID);
         receivingAddressService.create(receivingAddress);
         return;
      }else{
         log.error("已存在订单:{[]}",JsonUtils.toJSONString(receivingAddressOld));
      }
   }else{
      log.error("CREATE_ORDER_DELIVERY_TOPIC kafka 消息异常");
   }
}