Apache Kudu简介及 JavaAPI示例

Apache Kudu 简介

Kudu是Cloudera开源的新型列式存储系统,是Apache Hadoop生态圈的新成员之一(incubating),专门为了对快速变化的数据进行快速的分析

特性:

  • OLAP
  • 对数据扫描(scan)和随机访问(random access)同时具有高性能,简化用户复杂的混合架构
  • 支持单条或批量的数据读写,支持schema的创建修改
  • 既可以当作简单的key-value 使用,也可以作为复杂的几百不同的强类型属性。

常见的几个应用场景:

  1. 实时更新的应用。刚刚到达的数据就马上要被终端用户使用访问到
  2. 时间序列相关的应用,需要同时支持
  3. 根据海量历史数据查询
  4. 非常快地返回关于单个实体的细粒度查询
  5. 实时预测模型的应用,支持根据所有历史数据周期地更新模型

1.1 基本框架

Kudu是用于存储结构化(structured)的表(Table)。表有预定义的带类型的列(Columns),每张表有一个主键(primary key)。主键带有唯一性(uniqueness)限制,可作为索引用来支持快速的random access。

Kudu的表是由很多数据子集构成的,表被水平拆分成多个Tablets. Kudu用以每个tablet为一个单元来实现数据的durability。Tablet有多个副本,同时在多个节点上进行持久化。

Kudu有两种类型的组件,Master Server和Tablet Server。Master负责管理元数据。这些元数据包括talbet的基本信息,位置信息。Master还作为负载均衡服务器,监听Tablet Server的健康状态。对于副本数过低的Tablet,Master会在起replication任务来提高其副本数。Master的所有信息都在内存中cache,因此速度非常快。每次查询都在百毫秒级别。Kudu支持多个Master,不过只有一个active Master,其余只是作为灾备,不提供服务。

Tablet Server上存了10~100个Tablets,每个Tablet有3(或5)个副本存放在不同的Tablet Server上,每个Tablet同时只有一个leader副本,这个副本对用户提供修改操作,然后将修改结果同步给follower。Follower只提供读服务,不提供修改服务。副本之间使用raft协议来实现High Availability,当leader所在的节点发生故障时,followers会重新选举leader。根据官方的数据,其MTTR约为5秒,对client端几乎没有影响。Raft协议的另一个作用是实现Consistency。Client对leader的修改操作,需要同步到N/2+1个节点上,该操作才算成功。

Kudu采用了类似log-structured存储系统的方式,增删改操作都放在内存中的buffer,然后才merge到持久化的列式存储中。Kudu还是用了WALs来对内存中的buffer进行灾备。

1.2 列式存储

持久化的列式存储存储,与HBase完全不同,而是使用了类似Parquet的方式,同一个列在磁盘上是作为一个连续的块进行存放的。

1
SELECT COUNT(*) FROM tweets WHERE user_name = ‘newsycbot’;

我们只需要查询User_name这个block即可。同一个列的数据是集中的,而且是相同格式的,Kudu可以对数据进行编码,例如字典编码,行长编码,bitshuffle等。通过这种方式可以很大的减少数据在磁盘上的大小,提高吞吐率。除此之外,用户可以选择使用通用的压缩格式对数据进行压缩,如LZ4, gzip, 或bzip2

1.3 vs Hbase

  • 在scan和range查询上,kudu比HBase快很多,而random access则比HBase稍慢

ApacheKudu_javaAPI操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/**
* KUDU ut. <br />
*
* @author shenlongguang<https://github.com/ifengkou>
* @date: 2019/5/13
*/
public class KuduUT {
public static String tableName = "testkudu004";
public static String KUDU_HOST = "52.82.62.xx";
@Test
public void testKuduDDL(){
KuduClient client = new KuduClient.KuduClientBuilder(KUDU_HOST).build();
List<ColumnSchema> columns = new ArrayList(2);
columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING)
.key(true)
.build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("value",Type.STRING).build());
List<String> rangeKeys = new ArrayList<>();
rangeKeys.add("key");
Schema schema = new Schema(columns);

try {
client.createTable(tableName, schema,
new CreateTableOptions().setRangePartitionColumns(rangeKeys));
client.getTablesList().getTablesList().forEach(str-> System.out.println(str));
client.close();
} catch (KuduException e) {
e.printStackTrace();
}

}

@Test
public void testKuduListTables(){
KuduClient client = new KuduClient.KuduClientBuilder(KUDU_HOST).build();
try {
client.getTablesList().getTablesList().forEach(str-> System.out.println(str));
client.close();
} catch (KuduException e) {
e.printStackTrace();
}

}

@Test
public void testKuduDML(){
KuduClient client = new KuduClient.KuduClientBuilder(KUDU_HOST).build();
try {
KuduTable table = client.openTable(tableName);
KuduSession session = client.newSession();
session.setTimeoutMillis(60000);
System.out.println("--------insert ---------");
//新增
for (int i = 0; i < 10; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addString(0, "key_"+i);
row.addString(1, "value_"+i);
session.apply(insert);
}
printlnData(client,table);

System.out.println("--------update ---------");
//修改
Update update = table.newUpdate();
PartialRow row1 = update.getRow();
row1.addString("key","key_5");
row1.addString("value","修改值");
session.apply(update);
printlnData(client,table);

System.out.println("--------delete ---------");
//删除
Delete delete = table.newDelete();
PartialRow row2 = delete.getRow();
row2.addString(0,"key_0");//key = key_0的删除
session.apply(delete);

printlnData(client,table);
client.close();
} catch (KuduException e) {
e.printStackTrace();
}
}

@Test
public void testQueryTableData(){
KuduClient client = new KuduClient.KuduClientBuilder(KUDU_HOST).build();
try {
KuduTable table = client.openTable(tableName);
printlnData(client,table);
} catch (KuduException e) {
e.printStackTrace();
}
}

@Test
public void testComplexQuery(){
KuduClient client = new KuduClient.KuduClientBuilder(KUDU_HOST).build();
try {
long start = System.currentTimeMillis();
KuduTable table = client.openTable(tableName);

KuduPredicate predicate = KuduPredicate.newComparisonPredicate(
table.getSchema().getColumn("value"),
KuduPredicate.ComparisonOp.EQUAL,
"修改值"
);
KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);
builder.addPredicate(predicate);
KuduScanner scanner = builder.build();

long stop = System.currentTimeMillis();
System.out.println("time:"+(stop-start));
printScanner(scanner);
scanner.close();
} catch (KuduException e) {
e.printStackTrace();
}finally {
try {
client.close();
} catch (KuduException e) {
e.printStackTrace();
}
}
}

public static void printScanner(KuduScanner scanner ) throws KuduException {
while (scanner.hasMoreRows()){
RowResultIterator results = scanner.nextRows();
while (results.hasNext()) {
RowResult result = results.next();
System.out.println("{\"key\":"+ result.getString(0)+",\"value\":\""+result.getString(1)+"\"}");
}
}
}

public static void printlnData(KuduClient client,KuduTable table) throws KuduException {
List<String> projectColumns = new ArrayList<>();
projectColumns.add("key");
projectColumns.add("value");

KuduScanner scanner = client.newScannerBuilder(table)
.setProjectedColumnNames(projectColumns)
.build();
printScanner(scanner);
}

}

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×