BigQueryでのSTRUCT型を伴うSELECT文での重複行の削除についてのメモ
まず、前提として、BigQUeryにはunique constraint的な機能が存在しない。なので、あれこれと頑張ってユニーク性を保つ作業をしたくなることがある。テキトーに検索してみてもそういう話のstack overflowの質問やそれへの回答のようなものは見つかる。
- Google BigQuery There are no primary key or unique constraints, how do you prevent duplicated records being inserted? - Stack Overflow
- distinct - Delete duplicate rows from a BigQuery table - Stack Overflow
unique constraintが存在しないだとか、重複除去したテーブル(的なもの)を如何にして作るかなどは今回言及したいことではないので諸々省略することにする。
ユニーク性を保証したくなる時
BigQueryは追記ベースで考えたほうが良い。そういうアーキテクチャであることは分かっている。まぁそれは置いておいて、ユニーク性を保証したくなる事がある。例えば、アプリのデーターベースのバックアップを取っているときに、同一の行をBigQueryに転送してしまうことがある1。
利便性やバックアップのために、複数の世代のデータをBigQueryに保持しておくということはよくあることだと思う。ここで、最新の断面だけが欲しい場合に、以下の様な形でテーブルfooに対するfoo_latest的なviewなどを設定しておくことで、分析などで利用するときに最新断面とのjoinをやりやすくというような設定はされているとする。
-- こんなに陽に書くことはなく、viewか何かになっていることが多い WITH indexTable AS ( SELECT DISTINCT id, MAX(_updatedAt) as _updatedAt FROM <database name> GROUP BY id ) SELECT DISTINCT o.* FROM <database name> as o INNER JOIN indexTable as oo ON o.id == oo.id AND o._updatedAt = oo._updatedAt
一方で、重複すること自体は防げず同じようなデータが登録されている状態になりうる。
viewを作ることに限らず、ユニーク性を担保する方法は色々あるが、基本的には重複行を排除した結果を別のテーブルなどに保存する(materialized viewも含む)という形なのではないかと思う。そのテーブルを作るためのSELECT文さえ作ってしまえばあとは何とでもなる。
このときの重複行を排除したSELECT文をどうやって作るか?ということに関するモヤモヤが今回の主題。
前提確認
まずは、実際のデータを使って状況を整理してみることにする。ここで、昨日の記事が役に立つ2。一切実データを使うことなくqueryのみで状況を確認していきたい。
とりあえず、usersテーブル的なものが存在すると仮定する。これを対象に色々やっていきたい。
アプリ上での最新断面の状態
とりあえずfooさんbarさん2人のユーザーが居るくらいの雑なデータセット。
CREATE TEMP FUNCTION Now0() RETURNS TIMESTAMP AS (TIMESTAMP_SECONDS(1628038568)); WITH users AS ( SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL SELECT 2, "bar", Now0() ) SELECT * FROM users
この時のデータの出力は以下のようなもの。
[ { "id": "1", "name": "foo", "_updatedAt": "2021-08-04 00:56:08 UTC" }, { "id": "2", "name": "bar", "_updatedAt": "2021-08-04 00:56:08 UTC" } ]
これが、アプリ側から見える世界。
複数世代に跨ったデータをBigQuery側は保持
さて、先程話したとおり、BigQueryには複数世代のデータが入っているとする。 そんなわけで過去のデータも持っていることにする。_updatedAtが異なる同一idのデータも保持することにする。
CREATE TEMP FUNCTION Now0() RETURNS TIMESTAMP AS (TIMESTAMP_SECONDS(1628038568)); CREATE TEMP FUNCTION Now1() RETURNS TIMESTAMP AS (TIMESTAMP_SUB(Now0(), INTERVAL 1 DAY)); WITH users AS ( SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL SELECT 2, "bar", Now0() UNION ALL -- これが過去のもの SELECT 1, "fo", Now1() ) SELECT * FROM users
id=1の行が2つある
[ { "id": "1", "name": "foo", "_updatedAt": "2021-08-04 00:56:08 UTC" }, { "id": "2", "name": "bar", "_updatedAt": "2021-08-04 00:56:08 UTC" }, { "id": "1", "name": "fo", "_updatedAt": "2021-08-03 00:56:08 UTC" } ]
これをviewなどを作ることで最新断面だけを見るということはできる様になっている。これも模倣してみる。
CREATE TEMP FUNCTION Now0() RETURNS TIMESTAMP AS (TIMESTAMP_SECONDS(1628038568)); CREATE TEMP FUNCTION Now1() RETURNS TIMESTAMP AS (TIMESTAMP_SUB(Now0(), INTERVAL 1 DAY)); WITH users AS ( SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL SELECT 2, "bar", Now0() UNION ALL SELECT 1, "fo", Now1() ), users_latest AS ( -- 実際はviewなど SELECT x.* FROM users as x INNER JOIN (SELECT DISTINCT o.id, MAX(o._updatedAt) as _updatedAt FROM users as o GROUP BY id) as y ON x.id = y.id AND x._updatedAt = y._updatedAt ) SELECT * FROM users_latest
一見したところ良い感じ。
[ { "id": "1", "name": "foo", "_updatedAt": "2021-08-04 00:56:08 UTC" }, { "id": "2", "name": "bar", "_updatedAt": "2021-08-04 00:56:08 UTC" } ]
最新断面だけが参照可能な世界。
本当に同じデータが重複行として含まれることがある
ところで、先ほどのview(的なもの)は、_updatedAtのMAXで見ていたので、本当に同じデータが含まれていた場合には二重に出力されることになる。
CREATE TEMP FUNCTION Now0() RETURNS TIMESTAMP AS (TIMESTAMP_SECONDS(1628038568)); CREATE TEMP FUNCTION Now1() RETURNS TIMESTAMP AS (TIMESTAMP_SUB(Now0(), INTERVAL 1 DAY)); WITH users AS ( SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL -- 完全に同じ SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL SELECT 2, "bar", Now0() UNION ALL SELECT 1, "fo", Now1() ), users_latest AS ( SELECT x.* FROM users as x INNER JOIN (SELECT DISTINCT o.id, MAX(o._updatedAt) as _updatedAt FROM users as o GROUP BY id) as y ON x.id = y.id AND x._updatedAt = y._updatedAt ) SELECT * FROM users_latest
MAXで絞り込んたものとのjoinなので当然。
[ { "id": "1", "name": "foo", "_updatedAt": "2021-08-04 00:56:08 UTC" }, { "id": "1", "name": "foo", "_updatedAt": "2021-08-04 00:56:08 UTC" }, { "id": "2", "name": "bar", "_updatedAt": "2021-08-04 00:56:08 UTC" } ]
対応方法はSELECT DISTINCTで絞ったりなどで。
CREATE TEMP FUNCTION Now0() RETURNS TIMESTAMP AS (TIMESTAMP_SECONDS(1628038568)); CREATE TEMP FUNCTION Now1() RETURNS TIMESTAMP AS (TIMESTAMP_SUB(Now0(), INTERVAL 1 DAY)); WITH users AS ( SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL SELECT 2, "bar", Now0() UNION ALL SELECT 1, "fo", Now1() ), users_latest AS ( SELECT DISTINCT -- distinct x.* FROM users as x INNER JOIN (SELECT DISTINCT o.id, MAX(o._updatedAt) as _updatedAt FROM users as o GROUP BY id) as y ON x.id = y.id AND x._updatedAt = y._updatedAt ) SELECT * FROM users_latest
はい。
[ { "id": "1", "name": "foo", "_updatedAt": "2021-08-04 00:56:08 UTC" }, { "id": "2", "name": "bar", "_updatedAt": "2021-08-04 00:56:08 UTC" } ]
これでめでたしめでたしと思いきや、ようやく本題に入れる。
本題
ようやく、本題。
STRUCT型はdistinctができない
重複除去のためのdistinctがSTRUCT型を使った場合には効かない。というかエラーになってしまう。これがだるい。
意気揚々と以下のようなqueryを実行するとエラーになってしまう。悲しい。テーブルpointsのpがSTRUCT型。
WITH points AS ( SELECT 1 as id, STRUCT<x int64, y int64>(10, 10) as p UNION ALL SELECT 2, STRUCT(0 as x, 10 as y) UNION ALL SELECT 2, STRUCT(0 as x, 10 as y) ) SELECT DISTINCT * FROM points
こういうエラー。
Column p of type STRUCT cannot be used in SELECT DISTINCT at [6:18]
なるほど。仕方がないのでGROUP BYで逃げる。これもだめ。結局フィールドごとに明示的に指定しなければいけないらしい。
WITH points AS ( SELECT 1 as id, STRUCT<x int64, y int64>(10, 10) as p UNION ALL SELECT 2, STRUCT(0 as x, 10 as y) UNION ALL SELECT 2, STRUCT(0 as x, 10 as y) ) SELECT * FROM points as t GROUP BY t.id, t.p -- Grouping by expressions of type STRUCT is not allowed at [8:9]
明示的に指定する必要があるのは良いのだけれど、指定した瞬間にflattenされてしまってだるい。
WITH points AS ( SELECT 1 as id, STRUCT<x int64, y int64>(10, 10) as p UNION ALL SELECT 2, STRUCT(0 as x, 10 as y) UNION ALL SELECT 2, STRUCT(0 as x, 10 as y) ) SELECT t.id, -- t.p, -- SELECT list expression references t.p which is neither grouped nor aggregated at [8:3] t.p.x, t.p.y FROM points as t GROUP BY t.id, t.p.x, t.p.y
そんなわけで同じ形状を保とうと思ったらもう一度作り直す必要がある3。
WITH points AS ( SELECT 1 as id, STRUCT<x int64, y int64>(10, 10) as p UNION ALL SELECT 2, STRUCT(0 as x, 10 as y) UNION ALL SELECT 2, STRUCT(0 as x, 10 as y) ) SELECT t.id, STRUCT(t.p.x as x, t.p.y as y) as p FROM points as t GROUP BY t.id, t.p.x, t.p.y
大変だった。structも自由に利用できる任意のものについての最新断面の提供ということを考えると、SQL文のコピペでは対応できずに頑張って生成するしかないのかもしれない。一方でなんだかそれが必要になるというのもオーバーワーク気味なんじゃ?という気がしている。そういうモヤモヤ。
[ { "id": "1", "p": { "x": "10", "y": "10" } }, { "id": "2", "p": { "x": "0", "y": "10" } } ]
ただ、考えてみると、元となるソースがdocument指向のDBではなくRDBMSなどであった場合はそもそも初めからflattenなのであまり問題にならないのかもしれない?
まとめ
- BigQueryで最新断面とのjoinがしたくなる。
- STRUCT型ではDISTINCTが効かない。だるい。
gist
追記:
後日、TobakuCptlsmさんにtwitterでROW_NUMBER()と組み合わせる方法を教えてもらいました。実はROW_NUMBER()の存在自体は知っていたのですが、行番号に依存したコードはあまりSQL的には嬉しくないんじゃないか?と思ったりなどして記事に含めるのを辞めていたのでした
その後、色々bigqueryのドキュメントを読んでいくうちに、CDC(Change Data Capture)を利用したデータの同期のドキュメントでも同様の説明があったことから利用しても良いかもしれないと思うようになりました。
すこし形は違いますが以下の様な例の部分です。
bq mk --view \ "SELECT * EXCEPT(change_type, row_num) FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY change_id DESC) AS row_num FROM ( SELECT * EXCEPT(change_type), change_type FROM \`$(gcloud config get-value project).cdc_tutorial.session_delta\` UNION ALL SELECT *, 'I' FROM \`$(gcloud config get-value project).cdc_tutorial.session_main\`)) WHERE row_num = 1 AND change_type <> 'D'" \ cdc_tutorial.session_latest_v
あとは、viewではなくmaterialized viewないしは別テーブルへのqueryになっていればROW_NUMBER()を使う形でも悪くないかなと思うようになりました。
参考
- Google BigQuery There are no primary key or unique constraints, how do you prevent duplicated records being inserted? - Stack Overflow
- 標準 SQL のタイムスタンプ関数 | BigQuery | Google Cloud
- 標準 SQL ユーザー定義関数 | BigQuery | Google Cloud
- 標準 SQL のデータ型 | BigQuery | Google Cloud
- 標準 SQL の番号付け関数 | BigQuery | Google Cloud
- 変更データ キャプチャを使用した BigQuery へのデータベース レプリケーション | Cloud アーキテクチャ センター
-
例えばembulkなどでBigQueryに転送していることを思い浮かべてもらえると↩
-
実際のところ、この記事を書くために昨日の記事を書いた↩
-
group by部分はformatを使って手抜きをする方法はあるかもしれない。雑でよければ。https://sucrose.hatenablog.com/entry/2018/10/09/232753↩
SQLでWITH句を使ってテーブルにデータをINSERTせずにqueryを確かめたい。あるいはSQLの復習。
主に手抜きのため。bigqueryのドキュメントの実行例で使われていてなるほどなーと思ったのでメモ。 ついでに色々なJOINの利用例を列挙してみる。
WITH句を使って仮想的なtableと見做す
WITH句自体は概ねどのような処理系でもサポートしているみたい。これを使うのが今回の肝。
- bigquery https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax?hl=ja#with_clause
- sqlite https://www.sqlite.org/lang_with.html
- postgresql https://www.postgresql.jp/document/9.0/html/queries-with.html
- mysql https://dev.mysql.com/doc/refman/8.0/en/with.html
例えば、以下はbigqueryでのコードの実行例。データとqueryが全て1つの文に入っているので状況がわかりやすい。
https://cloud.google.com/bigquery/docs/reference/standard-sql/timestamp_functions?hl=ja#extract
WITH Timestamps AS ( SELECT TIMESTAMP("2005-01-03 12:34:56+00") AS timestamp_value UNION ALL SELECT TIMESTAMP("2007-12-31 12:00:00+00") UNION ALL SELECT TIMESTAMP("2009-01-01 12:00:00+00") UNION ALL SELECT TIMESTAMP("2009-12-31 12:00:00+00") UNION ALL SELECT TIMESTAMP("2017-01-02 12:00:00+00") UNION ALL SELECT TIMESTAMP("2017-05-26 12:00:00+00") ) SELECT timestamp_value, EXTRACT(ISOYEAR FROM timestamp_value) AS isoyear, EXTRACT(ISOWEEK FROM timestamp_value) AS isoweek, EXTRACT(YEAR FROM timestamp_value) AS year, EXTRACT(WEEK FROM timestamp_value) AS week FROM Timestamps ORDER BY timestamp_value; -- Results may differ, depending upon the environment and time zone where this query was executed. +-------------------------+---------+---------+------+------+ | timestamp_value | isoyear | isoweek | year | week | +-------------------------+---------+---------+------+------+ | 2005-01-03 12:34:56 UTC | 2005 | 1 | 2005 | 1 | | 2007-12-31 12:00:00 UTC | 2008 | 1 | 2007 | 52 | | 2009-01-01 12:00:00 UTC | 2009 | 1 | 2009 | 0 | | 2009-12-31 12:00:00 UTC | 2009 | 53 | 2009 | 52 | | 2017-01-02 12:00:00 UTC | 2017 | 1 | 2017 | 1 | | 2017-05-26 12:00:00 UTC | 2017 | 21 | 2017 | 21 | +-------------------------+---------+---------+------+------+
(以降は諸々の関係上sqliteでの実行結果になっている)
通常のCTEの使い方
ちなみに普通はどうやって使うかと言えば、RECURSIVE付きで再帰的に問い合わせたりして使う。
WITH RECURSIVE f(n, name) AS ( SELECT 1 as n, 'f' as name UNION ALL SELECT n+1 as n, 'f'|| substr('oooooooooooo', 1, n) as name FROM f LIMIT 10 ) SELECT * from f
n = 1 name = f n = 2 name = fo n = 3 name = foo n = 4 name = fooo n = 5 name = foooo
これらを悪用して仮想的にテーブルがあるかのようにSELECT文を作りqueryの練習をすることができる。
queryの実行例
team,userみたいなテーブルがあるとする。テキトーにjoinを試してみることにしよう。
joinなし
WITH teams AS ( SELECT 1 as id, 'x' as name UNION ALL SELECT 2, 'y' UNION ALL SELECT 3, 'z' ) SELECT id, name FROM teams
普通のquery。withを除けば既存のSELECT文の例に近い。
id = 1 name = x id = 2 name = y id = 3 name = z
INNER JOIN
WITH teams AS ( SELECT 1 as id, 'x' as name UNION ALL SELECT 2, 'y' UNION ALL SELECT 3, 'z' ), users as ( SELECT 10 as id, 'foo' as name, 1 as team_id UNION ALL SELECT 20, 'bar', 1 UNION ALL SELECT 30, 'boo', 3 ) SELECT t.id as team_id, t.name as team_name, u.id as user_id, u.name as user_name FROM teams as t INNER JOIN users as u ON t.id = u.team_id
よくある例。
team_id = 1 team_name = x user_id = 10 user_name = foo team_id = 1 team_name = x user_id = 2 user_name = bar team_id = 3 team_name = z user_id = 3 user_name = boo
teamごとのuser数なんかを数えたい場合。
WITH teams AS ( SELECT 1 as id, 'x' as name UNION ALL SELECT 2, 'y' UNION ALL SELECT 3, 'z' ), users as ( SELECT 10 as id, 'foo' as name, 1 as team_id UNION ALL SELECT 20, 'bar', 1 UNION ALL SELECT 30, 'boo', 3 ) SELECT t.id as team_id, t.name as team_name, count(*) as c FROM teams as t INNER JOIN users as u ON t.id = u.team_id GROUP BY t.id
team_id = 1 team_name = x c = 2 team_id = 3 team_name = z c = 1
LEFT OUTER JOIN
userを持たないteamの分も数えたいのでouter joinを使う。対応する行が存在しない側は値がNULLになるのでcountではなくsumで数える。
WITH teams AS ( SELECT 1 as id, 'x' as name UNION ALL SELECT 2, 'y' UNION ALL SELECT 3, 'z' ), users as ( SELECT 10 as id, 'foo' as name, 1 as team_id UNION ALL SELECT 20, 'bar', 1 UNION ALL SELECT 30, 'boo', 3 ) SELECT t.id as team_id, t.name as team_name, sum(CASE WHEN u.id is NULL then 0 ELSE 1 END) as c FROM teams as t LEFT OUTER JOIN users as u ON t.id = u.team_id GROUP BY t.id
team_id = 1 team_name = x c = 2 team_id = 2 team_name = y c = 0 team_id = 3 team_name = z c = 1
many to many
多対多の関係の場合には中間テーブルが必要になる。
WITH teams AS ( SELECT 1 as id, 'x' as name UNION ALL SELECT 2, 'y' UNION ALL SELECT 3, 'z' ), users as ( SELECT 10 as id, 'foo' as name -- x,yに所属 UNION ALL SELECT 20, 'bar' -- x,zに所属 UNION ALL SELECT 30, 'boo' -- xに所属 ), teams2users as ( SELECT 1 as team_id, 10 as user_id UNION ALL SELECT 1, 20 UNION ALL SELECT 1, 30 UNION ALL SELECT 2, 10 UNION ALL SELECT 3, 20 ) SELECT t.id as team_id, t.name as team_name, u.id as user_id, u.name as user_name FROM teams as t INNER JOIN users as u INNER JOIN teams2users as xref ON t.id = xref.team_id AND xref.user_id = u.id
team_id = 1 team_name = x user_id = 10 user_name = foo team_id = 1 team_name = x user_id = 20 user_name = bar team_id = 1 team_name = x user_id = 30 user_name = boo team_id = 2 team_name = y user_id = 10 user_name = foo team_id = 3 team_name = z user_id = 20 user_name = bar
CROSS JOIN
tableの行数とqueryの行数が乖離したようなsummary的なものを1発で作りたい時。 例えば、teamに対する所属期間のような列があるとして、それを年次で集計したい場合など
そういう実用的な例の前にトリビアルな例を。基本的には直積なので3行のものと3行のものを組み合わせたら9行のものができる。
WITH ids AS ( (SELECT 1 as id UNION ALL SELECT 2 UNION ALL 3) ), ys AS ( (SELECT 'foo' as name UNION ALL SELECT 'bar' UNION ALL 'boo') ) SELECT * FROM ids CROSS JOIN ys
はい。
1 foo 1 bar 1 boo 2 foo 2 bar 2 boo 3 foo 3 bar 3 boo
これを使って以下をやってみる。
例えば、teamに対する所属期間のような列があるとして、それを年次で集計したい場合など
WITH teams AS ( SELECT 1 as id, 'x' as name UNION ALL SELECT 2, 'y' UNION ALL SELECT 3, 'z' ), users as ( SELECT 10 as id, 'foo' as name -- x,yに所属 UNION ALL SELECT 20, 'bar' -- x,zに所属 UNION ALL SELECT 30, 'boo' -- xに所属 ), teams2users as ( SELECT 1 as team_id, 10 as user_id, '2019' as start, '2020' as `end` UNION ALL SELECT 1, 20, '2020', '2020' UNION ALL SELECT 1, 30, '2020', NULL UNION ALL SELECT 2, 10, '2020', NULL UNION ALL SELECT 3, 20, '2021', '2021' ), years as ( SELECT '2019' as year UNION ALL SELECT '2020' UNION ALL SELECT '2021' UNION ALL SELECT '2022' ) SELECT y.year as year, t.name as team_name, sum(CASE WHEN u.id is NULL then 0 ELSE 1 END) as c, group_concat(u.name) as names FROM teams as t INNER JOIN teams2users as xref ON t.id = xref.team_id LEFT OUTER JOIN users as u ON xref.user_id = u.id CROSS JOIN years as y ON xref.start <= y.year AND (xref.`end` IS NULL OR xref.`end` >= y.year) GROUP BY y.year, t.id, t.name
year = 2019 team_name = x c = 1 names = foo year = 2020 team_name = x c = 3 names = foo,bar,boo year = 2020 team_name = y c = 1 names = foo year = 2021 team_name = x c = 1 names = boo year = 2021 team_name = y c = 1 names = foo year = 2021 team_name = z c = 1 names = bar year = 2022 team_name = x c = 1 names = boo year = 2022 team_name = y c = 1 names = foo
misc
DISTINCT, having, ORDER BY, window関数とかは省略。
gist
追記: (bigqueryならではの別解)
後日、TobakuCptlsmさんにtwitterでbigqueryでArrayを使って表現する方法を教えてもらいました。こちらの方が他のプログラミング言語に似たような形式なので直感的かもしれません。
WITH points AS ( select * from UNNEST(ARRAY<STRUCT<id int, p STRUCT<x int64, y int64>>>[ (1, (0, 10)), (2, (0, 10)), (2, (0, 10)) ]) ) SELECT * FROM points
こちらのgist https://gist.github.com/TobCap/83d6d874f40e265b45cdb0c992317e00