Apache Kudu 简介
Kudu是Cloudera开源的新型列式存储系统,是Apache Hadoop生态圈的新成员之一(incubating),专门为了对快速变化的数据进行快速的分析
特性:
- OLAP
- 对数据扫描(scan)和随机访问(random access)同时具有高性能,简化用户复杂的混合架构
- 支持单条或批量的数据读写,支持schema的创建修改
- 既可以当作简单的key-value 使用,也可以作为复杂的几百不同的强类型属性。
常见的几个应用场景:
- 实时更新的应用。刚刚到达的数据就马上要被终端用户使用访问到
- 时间序列相关的应用,需要同时支持
- 根据海量历史数据查询
- 非常快地返回关于单个实体的细粒度查询
- 实时预测模型的应用,支持根据所有历史数据周期地更新模型
1.1 基本框架
Kudu是用于存储结构化(structured)的表(Table)。表有预定义的带类型的列(Columns),每张表有一个主键(primary key)。主键带有唯一性(uniqueness)限制,可作为索引用来支持快速的random access。
Kudu的表是由很多数据子集构成的,表被水平拆分成多个Tablets. Kudu用以每个tablet为一个单元来实现数据的durability。Tablet有多个副本,同时在多个节点上进行持久化。
Kudu有两种类型的组件,Master Server和Tablet Server。Master负责管理元数据。这些元数据包括talbet的基本信息,位置信息。Master还作为负载均衡服务器,监听Tablet Server的健康状态。对于副本数过低的Tablet,Master会在起replication任务来提高其副本数。Master的所有信息都在内存中cache,因此速度非常快。每次查询都在百毫秒级别。Kudu支持多个Master,不过只有一个active Master,其余只是作为灾备,不提供服务。
Tablet Server上存了10~100个Tablets,每个Tablet有3(或5)个副本存放在不同的Tablet Server上,每个Tablet同时只有一个leader副本,这个副本对用户提供修改操作,然后将修改结果同步给follower。Follower只提供读服务,不提供修改服务。副本之间使用raft协议来实现High Availability,当leader所在的节点发生故障时,followers会重新选举leader。根据官方的数据,其MTTR约为5秒,对client端几乎没有影响。Raft协议的另一个作用是实现Consistency。Client对leader的修改操作,需要同步到N/2+1个节点上,该操作才算成功。
Kudu采用了类似log-structured存储系统的方式,增删改操作都放在内存中的buffer,然后才merge到持久化的列式存储中。Kudu还是用了WALs来对内存中的buffer进行灾备。
1.2 列式存储
持久化的列式存储存储,与HBase完全不同,而是使用了类似Parquet的方式,同一个列在磁盘上是作为一个连续的块进行存放的。
1
| SELECT COUNT(*) FROM tweets WHERE user_name = ‘newsycbot’;
|
我们只需要查询User_name这个block即可。同一个列的数据是集中的,而且是相同格式的,Kudu可以对数据进行编码,例如字典编码,行长编码,bitshuffle等。通过这种方式可以很大的减少数据在磁盘上的大小,提高吞吐率。除此之外,用户可以选择使用通用的压缩格式对数据进行压缩,如LZ4, gzip, 或bzip2
1.3 vs Hbase
- 在scan和range查询上,kudu比HBase快很多,而random access则比HBase稍慢
ApacheKudu_javaAPI操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
| import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.*; import org.junit.Assert; import org.junit.Test;
import java.util.ArrayList; import java.util.List;
public class KuduUT { public static String tableName = "testkudu004"; public static String KUDU_HOST = "52.82.62.xx"; @Test public void testKuduDDL(){ KuduClient client = new KuduClient.KuduClientBuilder(KUDU_HOST).build(); List<ColumnSchema> columns = new ArrayList(2); columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING) .key(true) .build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("value",Type.STRING).build()); List<String> rangeKeys = new ArrayList<>(); rangeKeys.add("key"); Schema schema = new Schema(columns);
try { client.createTable(tableName, schema, new CreateTableOptions().setRangePartitionColumns(rangeKeys)); client.getTablesList().getTablesList().forEach(str-> System.out.println(str)); client.close(); } catch (KuduException e) { e.printStackTrace(); }
}
@Test public void testKuduListTables(){ KuduClient client = new KuduClient.KuduClientBuilder(KUDU_HOST).build(); try { client.getTablesList().getTablesList().forEach(str-> System.out.println(str)); client.close(); } catch (KuduException e) { e.printStackTrace(); }
}
@Test public void testKuduDML(){ KuduClient client = new KuduClient.KuduClientBuilder(KUDU_HOST).build(); try { KuduTable table = client.openTable(tableName); KuduSession session = client.newSession(); session.setTimeoutMillis(60000); System.out.println("--------insert ---------"); for (int i = 0; i < 10; i++) { Insert insert = table.newInsert(); PartialRow row = insert.getRow(); row.addString(0, "key_"+i); row.addString(1, "value_"+i); session.apply(insert); } printlnData(client,table);
System.out.println("--------update ---------"); Update update = table.newUpdate(); PartialRow row1 = update.getRow(); row1.addString("key","key_5"); row1.addString("value","修改值"); session.apply(update); printlnData(client,table);
System.out.println("--------delete ---------"); Delete delete = table.newDelete(); PartialRow row2 = delete.getRow(); row2.addString(0,"key_0"); session.apply(delete);
printlnData(client,table); client.close(); } catch (KuduException e) { e.printStackTrace(); } }
@Test public void testQueryTableData(){ KuduClient client = new KuduClient.KuduClientBuilder(KUDU_HOST).build(); try { KuduTable table = client.openTable(tableName); printlnData(client,table); } catch (KuduException e) { e.printStackTrace(); } }
@Test public void testComplexQuery(){ KuduClient client = new KuduClient.KuduClientBuilder(KUDU_HOST).build(); try { long start = System.currentTimeMillis(); KuduTable table = client.openTable(tableName);
KuduPredicate predicate = KuduPredicate.newComparisonPredicate( table.getSchema().getColumn("value"), KuduPredicate.ComparisonOp.EQUAL, "修改值" ); KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table); builder.addPredicate(predicate); KuduScanner scanner = builder.build();
long stop = System.currentTimeMillis(); System.out.println("time:"+(stop-start)); printScanner(scanner); scanner.close(); } catch (KuduException e) { e.printStackTrace(); }finally { try { client.close(); } catch (KuduException e) { e.printStackTrace(); } } }
public static void printScanner(KuduScanner scanner ) throws KuduException { while (scanner.hasMoreRows()){ RowResultIterator results = scanner.nextRows(); while (results.hasNext()) { RowResult result = results.next(); System.out.println("{\"key\":"+ result.getString(0)+",\"value\":\""+result.getString(1)+"\"}"); } } }
public static void printlnData(KuduClient client,KuduTable table) throws KuduException { List<String> projectColumns = new ArrayList<>(); projectColumns.add("key"); projectColumns.add("value");
KuduScanner scanner = client.newScannerBuilder(table) .setProjectedColumnNames(projectColumns) .build(); printScanner(scanner); }
}
|