运维开发网

Kafka的简单使用

运维开发网 https://www.qedev.com 2020-04-01 09:17 出处:网络 作者:运维开发网整理
首先通过Maven引入Kafka的依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency> 先来看Producer(生产者): public static void main(String[] arg…
首先通过Maven引入Kafka的依赖:

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>0.10.0.0</version>

</dependency>

先来看Producer(生产者):

/**
 * Sends one {@code RoleBean} record to the {@code linlin} topic and prints the
 * partition number reported in the returned record metadata.
 *
 * <p>The call blocks on the send result. The producer is closed in a
 * {@code finally} block so it is released whether the send succeeds or fails.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
	// Kafka producer configuration
	Properties props = new Properties();
	props.put("bootstrap.servers", "localhost:9092");
	props.put("acks", "all");
	props.put("retries", 0);
	props.put("batch.size", 16384);
	props.put("linger.ms", 1);
	props.put("buffer.memory", 33554432);
	// Serializer used for record keys
	props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
	// Serializer used for record values; JSON is the usual choice, but this
	// example plugs in a custom object serializer instead
	props.put("value.serializer", "com.zfsy.syyx.kafka.MySeri");

	Producer<String, RoleBean> producer = new KafkaProducer<String, RoleBean>(props);
	// Remember an interrupt and restore it only after close(): close() may
	// block, and an already-interrupted thread could cut that short.
	boolean interrupted = false;
	try {
		RoleBean bean = new RoleBean();
		bean.setDm("test");
		bean.setMc("测试2");
		// Target topic is "linlin", record key is "test03"
		ProducerRecord<String, RoleBean> record = new ProducerRecord<String, RoleBean>("linlin", "test03", bean);
		// send() returns a Future; get() below waits for the outcome
		Future<RecordMetadata> future = producer.send(record);
		System.out.println(future.get().partition());
	} catch (InterruptedException e) {
		interrupted = true;
		e.printStackTrace();
	} catch (ExecutionException e) {
		e.printStackTrace();
	} finally {
		// Previously the producer was never closed
		producer.close();
		if (interrupted) {
			Thread.currentThread().interrupt();
		}
	}
}

然后就是消费者:

// Consumer configuration
			Properties props2 = new Properties();
			props2.put("bootstrap.servers", "localhost:9092");
			props2.put("group.id", "test");
			// Auto-commit is off, so offsets are committed explicitly in the loop below
			props2.put("enable.auto.commit", "false");
			// NOTE(review): per the Kafka consumer config docs this interval only
			// applies when enable.auto.commit is true; kept so the config is unchanged
			props2.put("auto.commit.interval.ms", "1000");
			props2.put("session.timeout.ms", "30000");
			// Deserializer used for record keys
			props2.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
			// Deserializer used for record values
			props2.put("value.deserializer", "com.zfsy.syyx.kafka.MySeri");
			Consumer<String, RoleBean> consumer = new KafkaConsumer<String, RoleBean>(props2);
			try {
				// Subscribe to the "linlin" topic
				consumer.subscribe(Lists.newArrayList("linlin"));
				// Poll forever; the loop only ends when an exception is thrown
				while (true) {
					ConsumerRecords<String, RoleBean> records = consumer.poll(1000);
					for (ConsumerRecord<String, RoleBean> record2 : records) {
						RoleBean value = record2.value();
						if (value == null) {
							// MySeri.deserialize returns null when it cannot read the
							// payload; skip instead of failing with a NullPointerException
							continue;
						}
						System.out.println(value.getDm());
						System.out.println(value.getMc());
					}
					if (!records.isEmpty()) {
						// Commit after the batch has been handled; without this the
						// group never records progress because auto-commit is disabled
						consumer.commitSync();
					}
				}
			} finally {
				// Release the consumer when the loop is left via an exception
				consumer.close();
			}
			
		} 

MySeri源码:

public class MySeri implements Deserializer<Object>, Serializer<Object> {

	/**
	 * Does nothing: the body is empty, so neither argument is read.
	 *
	 * @param configs configuration entries supplied by the caller; ignored
	 * @param isKey   flag supplied by the caller; ignored (presumably tells key
	 *                use from value use; confirm against the Kafka interface docs)
	 */
	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
		// No configuration is needed by this implementation.

	}

	/**
	 * Rebuilds an object from bytes using Java's built-in object serialization.
	 *
	 * <p>SECURITY NOTE(review): {@link ObjectInputStream#readObject()} on bytes
	 * from an untrusted source is a known remote-code-execution vector. Only use
	 * this on topics whose producers are trusted, or switch to a data-only
	 * format such as JSON.
	 *
	 * @param topic topic the bytes belong to; not used
	 * @param data  serialized bytes; may be {@code null}
	 * @return the deserialized object, or {@code null} when {@code data} is
	 *         {@code null} or cannot be deserialized
	 */
	@Override
	public Object deserialize(String topic, byte[] data) {
		if (data == null) {
			// ByteArrayInputStream rejects a null buffer with a NullPointerException
			return null;
		}
		try (ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(data))) {
			return objectIn.readObject();
		} catch (Exception e) {
			// Broad catch kept on purpose: callers rely on getting null back
			// for any unreadable payload rather than an exception
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * Does nothing: the body is empty and this class declares no fields to release.
	 */
	@Override
	public void close() {
		// Nothing to clean up.

	}

	/**
	 * Converts an object to bytes using Java's built-in object serialization.
	 *
	 * <p>NOTE(review): a failed serialization is reported as {@code null} rather
	 * than an exception, so the caller cannot tell it apart from an absent
	 * payload. Consider throwing instead once callers are checked.
	 *
	 * @param topic topic the object is being sent to; not used
	 * @param data  object to serialize; a {@code null} reference is passed to
	 *              {@code writeObject} unchanged
	 * @return the serialized bytes, or {@code null} if an {@link IOException}
	 *         occurs while writing
	 */
	@Override
	public byte[] serialize(String topic, Object data) {
		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		try (ObjectOutputStream objectOut = new ObjectOutputStream(buffer)) {
			objectOut.writeObject(data);
			// Flush explicitly so the byte array is complete before it is copied,
			// rather than relying on when the stream drains its internal buffer
			objectOut.flush();
			return buffer.toByteArray();
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}
	}
0

精彩评论

暂无评论...
验证码 换一张
取 消