首页 » 网站建设 » kafkaphpcreattopic技巧_0254若何运用StreamSets实时采集Kafka并入库Kudu

kafkaphpcreattopic技巧_0254若何运用StreamSets实时采集Kafka并入库Kudu

访客 2024-11-15 0

扫一扫用手机浏览

文章目录 [+]

Fayson的github:https://github.com/fayson/cdhproject

提示:代码块部分可以旁边滑动查看噢

kafkaphpcreattopic技巧_0254若何运用StreamSets实时采集Kafka并入库Kudu

1.文档编写目的

kafkaphpcreattopic技巧_0254若何运用StreamSets实时采集Kafka并入库Kudu
(图片来自网络侵删)

Fayson在前面的文章《

如何利用StreamSets实现MySQL中变革数据实时写入Kudu

》,本篇文章紧张先容如何利用StreamSets实时采集Kafka的数据并将采集的数据写入Kudu。

内容概述

1.测试环境准备

2.准备生产Kafka数据脚本

3.配置StreamSets

4.流程测试及数据验证

测试环境

1.RedHat7.4

2.CM和CDH版本为cdh5.13.3

3.kafka3.0.0(0.11.0)

4.Kudu 1.5.0

前置条件

1.集群已安装Kafka并正常运行

2.集群未启用Kerberos

2.测试环境准备

1.通过如下命令创建测试topic

kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic kafka2kudu_topic

(可旁边滑动)

2.通过Hue利用Impala创建一个Kudu表,创建脚本如下:

CREATE TABLE ods_deal_daily_kudu ( id STRING COMPRESSION snappy, name STRING COMPRESSION snappy, sex STRING COMPRESSION snappy, city STRING COMPRESSION snappy, occupation STRING COMPRESSION snappy, mobile_phone_num STRING COMPRESSION snappy, fix_phone_num STRING COMPRESSION snappy, bank_name STRING COMPRESSION snappy, address STRING COMPRESSION snappy, marriage STRING COMPRESSION snappy, child_num INT COMPRESSION snappy, PRIMARY KEY (id)) PARTITION BY HASH PARTITIONS 16STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='master.gzyh.com');

(可旁边滑动)

这里在创建Kudu表的时候增加了kudu.master的配置,如果在Impala中未启用集成kudu的配置则须要增加该参数,在Impala中配置向如下:

3..准备测试数据文件

共600条测试数据,数据的id是唯一的。

3.生产Kafka

在这里Fayson读取的是本地的数据文件,将每行文件解析并封装为json数据,实时的发送给Kafka。

1.创建Maven工程,工程的pom.xml文件内容如下:

<project xmlns=\"大众http://maven.apache.org/POM/4.0.0\"大众 xmlns:xsi=\"大众http://www.w3.org/2001/XMLSchema-instance\"大众 xsi:schemaLocation=\"大众http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd\"大众> <parent> <artifactId>cdh-project</artifactId> <groupId>com.cloudera</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>kafka-demo</artifactId> <packaging>jar</packaging> <name>kafka-demo</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency> <dependency> <groupId>net.sf.json-lib</groupId> <artifactId>json-lib</artifactId> <version>2.4</version> <classifier>jdk15</classifier> </dependency> </dependencies></project>

(可旁边滑动)

2.编写ReadFileToKafka.java文件内容如下:

package com.cloudera.nokerberos;import net.sf.json.JSONObject;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import java.io.;import java.util.HashMap;import java.util.Map;import java.util.Properties;/ package: com.cloudera.nokerberos describe: 通过读取本地text文件将文件内容解析并组装为JSON发送到Kafka creat_user: Fayson email: htechinfo@163.com creat_date: 2018/4/27 creat_time: 下午4:42 公众年夜众号:Hadoop实操 /public class ReadFileToKafka { public static String confPath = System.getProperty(\"大众user.dir\"大众) + File.separator + \"大众conf\"大众; public static void main(String[] args) { if(args.length < 1) { System.out.print(\"大众短缺输入参数,请指定要处理的text文件\"大众); System.exit(1); } String filePath = args[0]; BufferedReader reader = null; try { Properties appProperties = new Properties(); appProperties.load(new FileInputStream(new File(confPath + File.separator + \"大众app.properties\"大众))); String brokerlist = String.valueOf(appProperties.get(\公众bootstrap.servers\公众)); String topic_name = String.valueOf(appProperties.get(\"大众topic.name\公众)); Properties props = getKafkaProps(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerlist); Producer<String, String> producer = new KafkaProducer<String, String>(props); reader = new BufferedReader(new FileReader(filePath)); String tempString = null; int line = 1; // 一次读入一行,直到读入null为文件结束 while ((tempString = reader.readLine()) != null) { String detailJson = parseJSON(tempString); ProducerRecord record = new ProducerRecord<String, String>(topic_name, detailJson); producer.send(record); line++; } reader.close(); producer.flush(); producer.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (reader != null) { try { reader.close(); } catch (IOException e1) { } } } } / 将txt文件中的每行数据解析并组装为json字符串 @param tempString @return / private static String parseJSON(String tempString) { if(tempString != null && tempString.length() > 0) { Map<String, String> resultMap = null; String[] detail = tempString.split(\"大众\001\"大众); resultMap = new HashMap<>(); resultMap.put(\公众id\公众, detail[0]); resultMap.put(\"大众name\"大众, detail[1]); resultMap.put(\公众sex\"大众, detail[2]); resultMap.put(\"大众city\公众, detail[3]); resultMap.put(\"大众occupation\"大众, detail[4]); resultMap.put(\公众mobile_phone_num\"大众, detail[5]); resultMap.put(\公众fix_phone_num\公众, detail[6]); resultMap.put(\"大众bank_name\"大众, detail[7]); resultMap.put(\公众address\公众, detail[8]); resultMap.put(\"大众marriage\公众, detail[9]); resultMap.put(\"大众child_num\"大众, detail[10]); return JSONObject.fromObject(resultMap).toString(); } return null; } / 初始化Kafka配置 @return / private static Properties getKafkaProps() { try{ Properties props = new Properties(); props.put(ProducerConfig.ACKS_CONFIG, \"大众all\公众); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000); //批量发送 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, \"大众org.apache.kafka.common.serialization.StringSerializer\公众); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, \"大众org.apache.kafka.common.serialization.StringSerializer\"大众); return props; } catch (Exception e) { e.printStackTrace(); } return null; }}

(可旁边滑动)

3.将编写好的代码利用mvn命令打包

在工程目录利用mvn cleanpackage命令进行编译打包

4.编写脚本run.sh脚本运行jar包

运行脚本目录构造如下

run.sh脚本内容如下

[root@master kafka-run]# vim run.sh #!/bin/bash########################################## 创建Topic# kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic ods_deal_daily_topic########################################JAVA_HOME=/usr/java/jdk1.8.0_131#要读取的文件read_file=$1for file in `ls lib/jar`do CLASSPATH=$CLASSPATH:$filedoneexport CLASSPATH${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadFileToKafka $read_file

(可旁边滑动)

conf目录下的配置文件app.properties内容如下

[root@master kafka-run]# vim conf/app.properties bootstrap.servers=cdh01.gzyh.com:9092,cdh02.gzyh.com:9092,cdh03.gzyh.com:9092topic.name=ods_deal_daily_topic

(可旁边滑动)

lib目录的依赖包

依赖包可以在命令行利用mvn命令导出:

mvn dependency:copy-dependencies -DoutputDirectory=/tmp/lib

(可旁边滑动)

数据文件内容:

4.在StreamSets上创建Pipline

1.登录StreamSets,创建一个kafka2kudu的Pipline

2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka根本信息

3.配置Kafka干系信息,如Broker、ZK及Topic

4.配置数据格式化办法,写入Kafka的数据为JSON格式,以是这里选择JSON

5.添加Kudu模块及配置基本信息

6.配置Kudu的Master、Table、Operation等

Kudu Masters:可以配置多个,多个地址以“,”分割

Table Name:如果利用Impala创建的Kudu表则须要添加impala::前缀

Field to ColumnMapping:配置Json中key与Kudu表的column的映射关系,如果字段名称同等则不须要配置。

DefaultOpertation:设置操作类型如:insert、upsert、delete

Kudu模块高等配置利用默认配置

5.流程测试验证

1.启动kafka2kudu的Pipline,启动成功如下图显示

2.在命令走运行run.sh脚本向Kafka发送

[root@master kafka-run]# sh run.sh ods_user_600.txt

(可旁边滑动)

上面实行了两次脚本。

3.在命令走运行run.sh脚本向Kafka发送

点击Kudu模块,查看监控信息

4.查看Kudu的ods_deal_daily_kudu表内容

入库的数据总条数

可以看到ods_deal_daily_kudu表的总条数与准备的测试数据量同等。

GitHub地址:

https://github.com/fayson/cdhproject/tree/master/kafkademo/readlocalfile-kafka-shell

https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/nokerberos/ReadFileToKafka.java

提示:代码块部分可以旁边滑动查看噢

为天地立心,为平生易近立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请利用手机打开并单击图片放大查看。

推举关注Hadoop实操,第一韶光,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信"大众年夜众号Hadoop实操

标签:

相关文章

php反射场景感化技巧_php反射机制用法详解

面向工具编程中工具被授予了自省的能力,而这个自省的过程便是反射。反射,直不雅观理解便是根据到达地找到出发地和来源。比如,一个光秃秃...

网站建设 2024-12-11 阅读0 评论0