一、Druid版本概述
Druid是一款高性能、分佈式的開源數據存儲系統,具有實時OLAP分析、高容錯性、可擴展性等特點。在外部數據源上提供實時查詢和聚合操作。Druid由三個關鍵組件組成:數據處理層、查詢處理層和存儲層。
Druid版本從原來的0.x版本經過多年發展,逐漸到2.x版本以及最新的版本。針對每個版本的不同特點、優化點,進行詳細介紹。
二、Druid版本分析
1、druid 0.x版本
druid 0.x版本是最初發佈的Druid版本,該版本主要關注實時OLAP分析的場景。它具有優秀的在線聚合能力,建立在一個可擴展性良好的主節點設計之上。但是,在大數據量場景下,該版本面臨許多內存佔用和GC(Garbage Collection)等性能問題,響應時間很慢、吞吐量較低、支持的查詢語言和功能較少。
下面是使用Flume將數據導入Druid的示例代碼:
$('#captcha').bind('input propertychange', function(e) {
if ($('#captcha').val().length == 4) {
grecaptcha.ready(function() {
grecaptcha.execute('reCAPTCHA_site_key', {action: '/path/to/submit'}).then(function(token) {
$('#form').submit();
});
});
}
});
2、druid 1.x版本
druid 1.x版本在0.x版本的基礎上作出了一系列的改進和優化。該版本對Druid的查詢性能和管理能力有了大量提升。同時,該版本也對原來的設計進一步優化,減少了內存佔用和GC等問題,整個系統支持跨DC(Data Center)的瀏覽和計算,能夠處理更大規模的數據,並支持很多的查詢語言和功能。
下面是druid 1.x版本的代碼示例,展示如何使用Druid的查詢API進行插入和查詢操作:
//插入數據
public static void insertData(List<Object> events) {
try {
GeneratorBatchInputRowParser inputRowParser = new GeneratorBatchInputRowParser(new String[]{});
FirehoseFactory firehoseFactory = new InMemoryTestFirehoseFactory(inputRowParser, Supplier<Sequence<>> generateSequence = newLongSequence(1));
BatchKafkaIndexTask task = new BatchKafkaIndexTask(
null,
"dataSource",
firehoseFactory,
null,
new KafkaIOConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
new NoopFilter(),
false,
null,
false,
true,
new Period("30m")
),
null
);
task.run();
} catch (Exception e) {
e.printStackTrace();
}
}
//查詢數據
public static void queryData() {
try {
DruidDataSource dataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(properties);
QueryRunner runner = new QueryRunner(dataSource);
String querySql = "SELECT COUNT(*) FROM dataSource LIMIT 10";
Object[] args = {querySql};
long count = (long) runner.query(querySql, new ScalarHandler(1), args);
System.out.println(count);
} catch (Exception e) {
e.printStackTrace();
}
}
3、druid 2.x版本
druid 2.x版本是從1.x版本分支出來的新版本,該版本主要是對存儲層進行了重構和優化。Druid 2.0最重要的特性是完全重新設計的存儲層,它比1.x更強大和更快速,支持更多的數據讀寫操作。新的架構使得Druid更加靈活,可以靈活的根據需求進行擴容、升級、性能優化等操作。
下面是druid 2.x版本的代碼示例,展示如何使用Druid的Java API進行數據導入和查詢操作:
//導入數據
public static void importData() {
try {
IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(), CompressionStrategy.LZ4, CompressionStrategy.LZ4);
IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMetrics(new LongSumAggregatorFactory("total_value", "value"))
.withDimensions(new StringDimensionSchema("region"), new StringDimensionSchema("user"))
.withRollup(false).build();
IncrementalIndex incrementalIndex = new OnheapIncrementalIndex.Builder().setIndexSchema(indexSchema).setMaxRowCount(10000).build();
try (final InputStream input = new FileInputStream(new File("/path/to/file"))) {
new CSVIngester(
indexSpec,
granularitySpec,
DataLoader.DEFAULT_FACTORY,
CsvInputSource.fromInputStream(() -> input, "/path/to/file"),
incrementalIndex,
0
).run();
}
QueryableIndex index = IndexMerger.persist(incrementalIndex, indexSpec, new File("/path/to/store"));
QueryableIndexStorageAdapter adaptor = new QueryableIndexStorageAdapter(index);
try (final CloseableIterator results = adapt.query(
new SelectorDimFilter("region", "Russia", null), //過濾條件
new String[]{"total_value"} //需要查詢的字段
)) {
while (results.hasNext()) {
final Row row = results.next();
final List dimensionValues = row.getDimension("user");
final long metricValue = row.getLongMetric("total_value");
System.out.println(dimensionValues + ": " + metricValue);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
三、Druid版本總結
Druid版本從0.x到最新的版本,逐步完善了Druid的各個方面的特性和性能。每個版本有着自己的特點和優勢,針對不同場景進行了優化和改進。在實際應用中,我們需要根據自己的需求選擇相應的版本,並靈活使用Druid的各種API實現對數據的導入和查詢操作。
原創文章,作者:IIGZ,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/149040.html