tft每日頭條

 > 科技

 > cassandra導出數據

cassandra導出數據

科技 更新时间:2025-01-16 00:52:24

cassandra導出數據(Cassandra時間序列大規模數據建模)1

在開始使用Cassandra和時間序列數據時,人們面臨的最大挑戰之一是理解編寫工作負載對集群的影響。過快地寫入單個分區可能會創建熱點,從而限制向外擴展的能力。分區太大可能會導緻修複、流和讀取性能方面的問題。從大分區的中間讀取會帶來很大的開銷,并導緻GC壓力的增加。Cassandra 4.0應該可以提高大分區的性能,但是它不能完全解決我已經提到的其他問題。在可預見的未來,我們将需要考慮它們的性能影響,并相應地進行計劃。

在這篇文章中,我将讨論一種常見的Cassandra數據建模技術,稱為bucketing。bucketing是一種策略,讓我們可以控制每個分區中存儲多少數據,以及将寫出的數據分散到整個集群。這篇文章将讨論兩種形式的攻擊。當數據模型需要進一步擴展時,可以結合使用這些技術。讀者應該已經熟悉了分區的解剖和基本的CQL命令。

當我們第一次使用Cassandra學習數據建模時,我們可能會看到如下内容:

CREATE TABLE raw_data ( sensor text, ts timeuuid, readint int, primary key(sensor, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_size': 1, 'compaction_window_unit': 'DAYS'};

這是存儲一些非常簡單的傳感器數據的一個很好的第一個數據模型。通常我們收集的數據要比整數複雜得多,但在這篇文章中,我們将關注鍵。我們利用TWCS作為壓縮戰略。TWCS将幫助我們處理壓縮大分區的開銷,這将使我們的CPU和I/O處于控制之下。不幸的是,它仍然有一些明顯的限制。如果我們不使用TTL,那麼當我們接收更多數據時,我們的分區大小将無限地持續增長。如上所述,在修複、流化或從任意時間片讀取數據時,大分區會帶來很大的開銷。

為了分解這個大分區,我們将利用第一種形式的bucketing。我們将根據時間窗口将我們的分區分成更小的分區。理想的大小是将分區保持在100MB以下。例如,如果我們每天存儲50-75MB的數據,那麼每天每個傳感器一個分區就是一個不錯的選擇。隻要分區不超過100MB,我們也可以簡單地使用周(從某個紀元開始)、月和年。無論選擇什麼,留一點增長空間是個好主意。

為此,我們将向分區鍵添加另一個組件。修改之前的數據模型,我們将添加一個day字段:

CREATE TABLE raw_data_by_day (sensor text,day text,ts timeuuid,reading int,primary key((sensor, day), ts)) WITH CLUSTERING ORDER BY (ts DESC) AND COMPACTION = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'DAYS', 'compaction_window_size': 1};

插入到表中需要使用date和now()值(你也可以在你的應用代碼中生成一個TimeUUID):

INSERT INTO raw_data_by_day (sensor, day, ts, reading) VALUES ('mysensor', '2017-01-01', now(), 10);

這是限制每個分區的數據量的一種方法。為了跨多天獲取大量數據,您需要每天發出一個查詢。這樣查詢的好處在于,我們可以将工作分散到整個集群,而不是要求單個節點執行大量工作。我們還可以通過依賴驅動程序中的異步調用并行地發出這些查詢。對于這種用例,Python驅動程序甚至有一個方便的輔助函數:

from itertools import productfrom cassandra.concurrent import execute_concurrent_with_args

days = ["2017-07-01", "2017-07-12", "2017-07-03"] # collecting three days worth of datasession = Cluster(["127.0.0.1"]).connect("blog")prepared = session.prepare("SELECT day, ts, reading FROM raw_data_by_day WHERE sensor = ? and day = ?")

args = product(["mysensor"], days) # args: ('test', '2017-07-01'), ('test', '2017-07-12'), ('test', '2017-07-03')

# driver handles concurrency for youresults = execute_concurrent_with_args(session, prepared, args)

# Results:#[ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36750>),# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36a90>),# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36550>)]

這種技術的一種變體是每個時間窗口使用不同的表。例如,每月使用一個表意味着每年有12個表:

CREATE TABLE raw_data_may_2017 ( sensor text, ts timeuuid, reading int, primary key(sensor, ts)) WITH COMPACTION = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'DAYS', 'compaction_window_size': 1};

這種策略的主要好處是有助于存檔和快速删除舊數據。例如,在每個月的開始,我們可以将上個月的數據以拼花的格式歸檔到HDFS或S3中,利用便宜的存儲來進行分析。當我們不再需要Cassandra中的數據時,我們可以簡單地删除表。您可能會看到,在創建和删除表時需要進行一些額外的維護,因此,這種方法實際上隻有在需要歸檔時才有用。還有其他存檔數據的方法,因此這種類型的bucketing可能是不必要的。

上面的策略主要是防止分區在長時間内變得太大。如果我們有一個可預測的工作負載和有很小變化的分區大小,這是很好的。我們可能會攝入太多的信息,以至于單個節點無法寫出數據,或者一小部分對象的攝入率要高得多。Twitter就是一個很好的例子,有些人擁有數千萬的追随者,但這并不常見。對于我們需要大規模使用的這些類型的賬戶,通常會有一個單獨的代碼路徑

第二種技術在任何給定時間使用多個分區将插入扇出到整個集群。這個策略的好處是,我們可以使用一個分區來處理小卷,使用多個分區來處理大卷。

我們在這個設計中所做的權衡是在讀取時我們需要使用散射聚集,這有明顯的更高的開銷。這可能會使分頁更加困難。我們需要能夠跟蹤我們為每個小發明攝取了多少數據。這是為了确保我們可以選擇正确數量的分區來使用。如果我們使用太多的桶,我們就會在很多分區上執行很多非常小的讀取操作。如果桶太少,我們會得到非常大的分區,這些分區不能很好地壓縮、修複、流處理,并且讀取性能很差。

在這個例子中,我們将研究一個理論模型,它适用于那些在Twitter這樣的社交網絡上關注大量用戶的人。大多數帳戶都可以使用一個單獨的分區來接收消息,但有些人/機器人可能會關注數百萬個帳戶。

免責聲明:我不知道Twitter實際上是如何存儲他們的數據的,這隻是一個簡單的例子來讨論。

CREATE TABLE tweet_stream ( account text, day text, bucket int, ts timeuuid, message text, primary key((account, day, bucket), ts)) WITH CLUSTERING ORDER BY (ts DESC) AND COMPACTION = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'DAYS', 'compaction_window_size': 1};

這個數據模型擴展了前面的數據模型,将bucket添加到分區鍵中。現在,每天都可以從多個桶中獲取數據。當需要讀取時,我們需要從所有分區中獲取所需的結果。為了演示,我們将插入一些數據到我們的分區:

cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 0, now(), 'hi');cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 1, now(), 'hi2');cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 2, now(), 'hi3');cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 3, now(), 'hi4');

如果我們想要十個最新的消息,我們可以這樣做:

from itertools import chainfrom cassandra.util import unix_time_from_uuid1

prepared = session.prepare("SELECT ts, message FROM tweet_stream WHERE account = ? and day = ? and bucket = ? LIMIT 10")# let's get 10 buckets partitions = range(10)# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

args = product(["jon_haddad"], ["2017-07-01"], partitions)

result = execute_concurrent_with_args(session, prepared, args)

# [ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e6d0>),# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d710>),# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d4d0>),# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d950>),# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1db10>),# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dfd0>),# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dd90>),# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d290>),# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e250>),# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e490>)]

results = [x.result_or_exc for x in result]

# append all the results togetherdata = chain(*results) sorted_results = sorted(data, key=lambda x: unix_time_from_uuid1(x.ts), reverse=True)

# newest stuff first# [Row(ts=UUID('e1c59e60-7406-11e7-9458-897782c5d96c'), message=u'hi4'),# Row(ts=UUID('dd6ddd00-7406-11e7-9458-897782c5d96c'), message=u'hi3'),# Row(ts=UUID('d4422560-7406-11e7-9458-897782c5d96c'), message=u'hi2'),# Row(ts=UUID('d17dae30-7406-11e7-9458-897782c5d96c'), message=u'hi')]

這個例子隻使用了10個項目,所以我們可以作為懶惰的程序員,合并列表,然後對它們排序。如果我們想獲取更多的元素我們就需要k路歸并算法。我們将在以後的博客中進一步讨論這個話題。

此時,您應該對如何圍繞集群分發數據和請求有了更好的理解,這使得集群可以比使用單個分區時擴展得更大。記住每個問題都是不同的,沒有萬能的解決方案。

本文:http://jiagoushi.pro/node/1348

,

更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!

查看全部

相关科技资讯推荐

热门科技资讯推荐

网友关注

Copyright 2023-2025 - www.tftnews.com All Rights Reserved