Fayson的github:https://github.com/fayson/cdhproject
提示:代码块部分可以旁边滑动查看噢
1.文档编写目的

Fayson在前面的文章《
如何利用StreamSets实现MySQL中变革数据实时写入Kudu
》,本篇文章紧张先容如何利用StreamSets实时采集Kafka的数据并将采集的数据写入Kudu。
内容概述1.测试环境准备
2.准备生产Kafka数据脚本
3.配置StreamSets
4.流程测试及数据验证
测试环境1.RedHat7.4
2.CM和CDH版本为cdh5.13.3
3.kafka3.0.0(0.11.0)
4.Kudu 1.5.0
前置条件1.集群已安装Kafka并正常运行
2.集群未启用Kerberos
2.测试环境准备
1.通过如下命令创建测试topic
kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic kafka2kudu_topic
(可旁边滑动)
2.通过Hue利用Impala创建一个Kudu表,创建脚本如下:
CREATE TABLE ods_deal_daily_kudu ( id STRING COMPRESSION snappy, name STRING COMPRESSION snappy, sex STRING COMPRESSION snappy, city STRING COMPRESSION snappy, occupation STRING COMPRESSION snappy, mobile_phone_num STRING COMPRESSION snappy, fix_phone_num STRING COMPRESSION snappy, bank_name STRING COMPRESSION snappy, address STRING COMPRESSION snappy, marriage STRING COMPRESSION snappy, child_num INT COMPRESSION snappy, PRIMARY KEY (id)) PARTITION BY HASH PARTITIONS 16STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='master.gzyh.com');
(可旁边滑动)
这里在创建Kudu表的时候增加了kudu.master的配置,如果在Impala中未启用集成kudu的配置则须要增加该参数,在Impala中配置向如下:
3..准备测试数据文件
共600条测试数据,数据的id是唯一的。
3.生产Kafka
在这里Fayson读取的是本地的数据文件,将每行文件解析并封装为json数据,实时的发送给Kafka。
1.创建Maven工程,工程的pom.xml文件内容如下:
<project xmlns=\"大众http://maven.apache.org/POM/4.0.0\"大众 xmlns:xsi=\"大众http://www.w3.org/2001/XMLSchema-instance\"大众 xsi:schemaLocation=\"大众http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd\"大众> <parent> <artifactId>cdh-project</artifactId> <groupId>com.cloudera</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>kafka-demo</artifactId> <packaging>jar</packaging> <name>kafka-demo</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency> <dependency> <groupId>net.sf.json-lib</groupId> <artifactId>json-lib</artifactId> <version>2.4</version> <classifier>jdk15</classifier> </dependency> </dependencies></project>
(可旁边滑动)
2.编写ReadFileToKafka.java文件内容如下:
package com.cloudera.nokerberos;import net.sf.json.JSONObject;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import java.io.;import java.util.HashMap;import java.util.Map;import java.util.Properties;/ package: com.cloudera.nokerberos describe: 通过读取本地text文件将文件内容解析并组装为JSON发送到Kafka creat_user: Fayson email: htechinfo@163.com creat_date: 2018/4/27 creat_time: 下午4:42 公众年夜众号:Hadoop实操 /public class ReadFileToKafka { public static String confPath = System.getProperty(\"大众user.dir\"大众) + File.separator + \"大众conf\"大众; public static void main(String[] args) { if(args.length < 1) { System.out.print(\"大众短缺输入参数,请指定要处理的text文件\"大众); System.exit(1); } String filePath = args[0]; BufferedReader reader = null; try { Properties appProperties = new Properties(); appProperties.load(new FileInputStream(new File(confPath + File.separator + \"大众app.properties\"大众))); String brokerlist = String.valueOf(appProperties.get(\公众bootstrap.servers\公众)); String topic_name = String.valueOf(appProperties.get(\"大众topic.name\公众)); Properties props = getKafkaProps(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerlist); Producer<String, String> producer = new KafkaProducer<String, String>(props); reader = new BufferedReader(new FileReader(filePath)); String tempString = null; int line = 1; // 一次读入一行,直到读入null为文件结束 while ((tempString = reader.readLine()) != null) { String detailJson = parseJSON(tempString); ProducerRecord record = new ProducerRecord<String, String>(topic_name, detailJson); producer.send(record); line++; } reader.close(); producer.flush(); producer.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (reader != null) { try { reader.close(); } catch (IOException e1) { } } } } / 将txt文件中的每行数据解析并组装为json字符串 @param tempString @return / private static String parseJSON(String tempString) { if(tempString != null && tempString.length() > 0) { Map<String, String> resultMap = null; String[] detail = tempString.split(\"大众\001\"大众); resultMap = new HashMap<>(); resultMap.put(\公众id\公众, detail[0]); resultMap.put(\"大众name\"大众, detail[1]); resultMap.put(\公众sex\"大众, detail[2]); resultMap.put(\"大众city\公众, detail[3]); resultMap.put(\"大众occupation\"大众, detail[4]); resultMap.put(\公众mobile_phone_num\"大众, detail[5]); resultMap.put(\公众fix_phone_num\公众, detail[6]); resultMap.put(\"大众bank_name\"大众, detail[7]); resultMap.put(\公众address\公众, detail[8]); resultMap.put(\"大众marriage\公众, detail[9]); resultMap.put(\"大众child_num\"大众, detail[10]); return JSONObject.fromObject(resultMap).toString(); } return null; } / 初始化Kafka配置 @return / private static Properties getKafkaProps() { try{ Properties props = new Properties(); props.put(ProducerConfig.ACKS_CONFIG, \"大众all\公众); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000); //批量发送 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, \"大众org.apache.kafka.common.serialization.StringSerializer\公众); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, \"大众org.apache.kafka.common.serialization.StringSerializer\"大众); return props; } catch (Exception e) { e.printStackTrace(); } return null; }}
(可旁边滑动)
3.将编写好的代码利用mvn命令打包
在工程目录利用mvn cleanpackage命令进行编译打包
4.编写脚本run.sh脚本运行jar包
运行脚本目录构造如下
run.sh脚本内容如下
[root@master kafka-run]# vim run.sh #!/bin/bash########################################## 创建Topic# kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic ods_deal_daily_topic########################################JAVA_HOME=/usr/java/jdk1.8.0_131#要读取的文件read_file=$1for file in `ls lib/jar`do CLASSPATH=$CLASSPATH:$filedoneexport CLASSPATH${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadFileToKafka $read_file
(可旁边滑动)
conf目录下的配置文件app.properties内容如下
[root@master kafka-run]# vim conf/app.properties bootstrap.servers=cdh01.gzyh.com:9092,cdh02.gzyh.com:9092,cdh03.gzyh.com:9092topic.name=ods_deal_daily_topic
(可旁边滑动)
lib目录的依赖包
依赖包可以在命令行利用mvn命令导出:
mvn dependency:copy-dependencies -DoutputDirectory=/tmp/lib
(可旁边滑动)
数据文件内容:
4.在StreamSets上创建Pipline
1.登录StreamSets,创建一个kafka2kudu的Pipline
2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka根本信息
3.配置Kafka干系信息,如Broker、ZK及Topic
4.配置数据格式化办法,写入Kafka的数据为JSON格式,以是这里选择JSON
5.添加Kudu模块及配置基本信息
6.配置Kudu的Master、Table、Operation等
Kudu Masters:可以配置多个,多个地址以“,”分割
Table Name:如果利用Impala创建的Kudu表则须要添加impala::前缀
Field to ColumnMapping:配置Json中key与Kudu表的column的映射关系,如果字段名称同等则不须要配置。
DefaultOpertation:设置操作类型如:insert、upsert、delete
Kudu模块高等配置利用默认配置
5.流程测试验证
1.启动kafka2kudu的Pipline,启动成功如下图显示
2.在命令走运行run.sh脚本向Kafka发送
[root@master kafka-run]# sh run.sh ods_user_600.txt
(可旁边滑动)
上面实行了两次脚本。
3.在命令走运行run.sh脚本向Kafka发送
点击Kudu模块,查看监控信息
4.查看Kudu的ods_deal_daily_kudu表内容
入库的数据总条数
可以看到ods_deal_daily_kudu表的总条数与准备的测试数据量同等。
GitHub地址:
https://github.com/fayson/cdhproject/tree/master/kafkademo/readlocalfile-kafka-shell
https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/nokerberos/ReadFileToKafka.java
提示:代码块部分可以旁边滑动查看噢
为天地立心,为平生易近立命,为往圣继绝学,为万世开太平。
温馨提示:要看高清无码套图,请利用手机打开并单击图片放大查看。
推举关注Hadoop实操,第一韶光,分享更多Hadoop干货,欢迎转发和分享。
原创文章,欢迎转载,转载请注明:转载自微信"大众年夜众号Hadoop实操