BigQueryでのSTRUCT型を伴うSELECT文での重複行の削除についてのメモ

まず、前提として、BigQUeryにはunique constraint的な機能が存在しない。なので、あれこれと頑張ってユニーク性を保つ作業をしたくなることがある。テキトーに検索してみてもそういう話のstack overflowの質問やそれへの回答のようなものは見つかる。

unique constraintが存在しないだとか、重複除去したテーブル(的なもの)を如何にして作るかなどは今回言及したいことではないので諸々省略することにする。

ユニーク性を保証したくなる時

BigQueryは追記ベースで考えたほうが良い。そういうアーキテクチャであることは分かっている。まぁそれは置いておいて、ユニーク性を保証したくなる事がある。例えば、アプリのデーターベースのバックアップを取っているときに、同一の行をBigQueryに転送してしまうことがある1

利便性やバックアップのために、複数の世代のデータをBigQueryに保持しておくということはよくあることだと思う。ここで、最新の断面だけが欲しい場合に、以下の様な形でテーブルfooに対するfoo_latest的なviewなどを設定しておくことで、分析などで利用するときに最新断面とのjoinをやりやすくというような設定はされているとする。

-- こんなに陽に書くことはなく、viewか何かになっていることが多い
WITH indexTable AS (
  SELECT DISTINCT
     id,
     MAX(_updatedAt) as _updatedAt
  FROM
    <database name>
  GROUP BY
    id
)
SELECT DISTINCT
  o.*
FROM
  <database name> as o
  INNER JOIN indexTable as oo ON o.id == oo.id AND o._updatedAt = oo._updatedAt

一方で、重複すること自体は防げず同じようなデータが登録されている状態になりうる。

viewを作ることに限らず、ユニーク性を担保する方法は色々あるが、基本的には重複行を排除した結果を別のテーブルなどに保存する(materialized viewも含む)という形なのではないかと思う。そのテーブルを作るためのSELECT文さえ作ってしまえばあとは何とでもなる。

このときの重複行を排除したSELECT文をどうやって作るか?ということに関するモヤモヤが今回の主題。

前提確認

まずは、実際のデータを使って状況を整理してみることにする。ここで、昨日の記事が役に立つ2。一切実データを使うことなくqueryのみで状況を確認していきたい。

とりあえず、usersテーブル的なものが存在すると仮定する。これを対象に色々やっていきたい。

アプリ上での最新断面の状態

とりあえずfooさんbarさん2人のユーザーが居るくらいの雑なデータセット

CREATE TEMP FUNCTION Now0()
  RETURNS TIMESTAMP
  AS (TIMESTAMP_SECONDS(1628038568));

WITH users AS (
    SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL
    SELECT 2, "bar", Now0()
)
SELECT
  *
FROM
  users  

この時のデータの出力は以下のようなもの。

[
  {
    "id": "1",
    "name": "foo",
    "_updatedAt": "2021-08-04 00:56:08 UTC"
  },
  {
    "id": "2",
    "name": "bar",
    "_updatedAt": "2021-08-04 00:56:08 UTC"
  }
]

これが、アプリ側から見える世界。

複数世代に跨ったデータをBigQuery側は保持

さて、先程話したとおり、BigQueryには複数世代のデータが入っているとする。 そんなわけで過去のデータも持っていることにする。_updatedAtが異なる同一idのデータも保持することにする。

CREATE TEMP FUNCTION Now0()
  RETURNS TIMESTAMP
  AS (TIMESTAMP_SECONDS(1628038568));

CREATE TEMP FUNCTION Now1()
  RETURNS TIMESTAMP
  AS (TIMESTAMP_SUB(Now0(), INTERVAL 1 DAY));


WITH users AS (
    SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL
    SELECT 2, "bar", Now0() UNION ALL
    -- これが過去のもの
    SELECT 1, "fo", Now1()
)
SELECT
  *
FROM
  users  

id=1の行が2つある

[
  {
    "id": "1",
    "name": "foo",
    "_updatedAt": "2021-08-04 00:56:08 UTC"
  },
  {
    "id": "2",
    "name": "bar",
    "_updatedAt": "2021-08-04 00:56:08 UTC"
  },
  {
    "id": "1",
    "name": "fo",
    "_updatedAt": "2021-08-03 00:56:08 UTC"
  }
]

これをviewなどを作ることで最新断面だけを見るということはできる様になっている。これも模倣してみる。

CREATE TEMP FUNCTION Now0()
  RETURNS TIMESTAMP
  AS (TIMESTAMP_SECONDS(1628038568));

CREATE TEMP FUNCTION Now1()
  RETURNS TIMESTAMP
  AS (TIMESTAMP_SUB(Now0(), INTERVAL 1 DAY));


WITH users AS (
  SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL
  SELECT 2, "bar", Now0() UNION ALL
  SELECT 1, "fo", Now1()
),
users_latest AS ( -- 実際はviewなど
  SELECT
   x.* 
  FROM users as x
  INNER JOIN (SELECT DISTINCT o.id, MAX(o._updatedAt) as _updatedAt FROM users as o GROUP BY id) as y ON x.id = y.id AND x._updatedAt = y._updatedAt
)
SELECT
  *
FROM
  users_latest

一見したところ良い感じ。

[
  {
    "id": "1",
    "name": "foo",
    "_updatedAt": "2021-08-04 00:56:08 UTC"
  },
  {
    "id": "2",
    "name": "bar",
    "_updatedAt": "2021-08-04 00:56:08 UTC"
  }
]

最新断面だけが参照可能な世界。

本当に同じデータが重複行として含まれることがある

ところで、先ほどのview(的なもの)は、_updatedAtのMAXで見ていたので、本当に同じデータが含まれていた場合には二重に出力されることになる。

CREATE TEMP FUNCTION Now0()
  RETURNS TIMESTAMP
  AS (TIMESTAMP_SECONDS(1628038568));

CREATE TEMP FUNCTION Now1()
  RETURNS TIMESTAMP
  AS (TIMESTAMP_SUB(Now0(), INTERVAL 1 DAY));

WITH users AS (
  SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL
-- 完全に同じ
  SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL
  SELECT 2, "bar", Now0() UNION ALL
  SELECT 1, "fo", Now1()
),
users_latest AS (
  SELECT
   x.* 
  FROM users as x
  INNER JOIN (SELECT DISTINCT o.id, MAX(o._updatedAt) as _updatedAt FROM users as o GROUP BY id) as y ON x.id = y.id AND x._updatedAt = y._updatedAt
)
SELECT
  *
FROM
  users_latest

MAXで絞り込んたものとのjoinなので当然。

[
  {
    "id": "1",
    "name": "foo",
    "_updatedAt": "2021-08-04 00:56:08 UTC"
  },
  {
    "id": "1",
    "name": "foo",
    "_updatedAt": "2021-08-04 00:56:08 UTC"
  },
  {
    "id": "2",
    "name": "bar",
    "_updatedAt": "2021-08-04 00:56:08 UTC"
  }
]

対応方法はSELECT DISTINCTで絞ったりなどで。

CREATE TEMP FUNCTION Now0()
  RETURNS TIMESTAMP
  AS (TIMESTAMP_SECONDS(1628038568));

CREATE TEMP FUNCTION Now1()
  RETURNS TIMESTAMP
  AS (TIMESTAMP_SUB(Now0(), INTERVAL 1 DAY));

WITH users AS (
  SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL
  SELECT 1 as id, "foo" as name, Now0() as _updatedAt UNION ALL
  SELECT 2, "bar", Now0() UNION ALL
  SELECT 1, "fo", Now1()
),
users_latest AS (
  SELECT DISTINCT -- distinct
   x.* 
  FROM users as x
  INNER JOIN (SELECT DISTINCT o.id, MAX(o._updatedAt) as _updatedAt FROM users as o GROUP BY id) as y ON x.id = y.id AND x._updatedAt = y._updatedAt
)
SELECT
  *
FROM
  users_latest

はい。

[
  {
    "id": "1",
    "name": "foo",
    "_updatedAt": "2021-08-04 00:56:08 UTC"
  },
  {
    "id": "2",
    "name": "bar",
    "_updatedAt": "2021-08-04 00:56:08 UTC"
  }
]

これでめでたしめでたしと思いきや、ようやく本題に入れる。

本題

ようやく、本題。

STRUCT型はdistinctができない

重複除去のためのdistinctがSTRUCT型を使った場合には効かない。というかエラーになってしまう。これがだるい。

意気揚々と以下のようなqueryを実行するとエラーになってしまう。悲しい。テーブルpointsのpがSTRUCT型。

WITH points AS (
    SELECT 1 as id, STRUCT<x int64, y int64>(10, 10) as p UNION ALL
    SELECT 2, STRUCT(0 as x, 10 as y) UNION ALL
    SELECT 2, STRUCT(0 as x, 10 as y)
)
SELECT DISTINCT * FROM points

こういうエラー。

Column p of type STRUCT cannot be used in SELECT DISTINCT at [6:18]

なるほど。仕方がないのでGROUP BYで逃げる。これもだめ。結局フィールドごとに明示的に指定しなければいけないらしい。

WITH points AS (
    SELECT 1 as id, STRUCT<x int64, y int64>(10, 10) as p UNION ALL
    SELECT 2, STRUCT(0 as x, 10 as y) UNION ALL
    SELECT 2, STRUCT(0 as x, 10 as y)
)
SELECT * FROM points as t
GROUP BY
  t.id, t.p  -- Grouping by expressions of type STRUCT is not allowed at [8:9]

明示的に指定する必要があるのは良いのだけれど、指定した瞬間にflattenされてしまってだるい。

WITH points AS (
    SELECT 1 as id, STRUCT<x int64, y int64>(10, 10) as p UNION ALL
    SELECT 2, STRUCT(0 as x, 10 as y) UNION ALL
    SELECT 2, STRUCT(0 as x, 10 as y)
)
SELECT
  t.id,
--  t.p, -- SELECT list expression references t.p which is neither grouped nor aggregated at [8:3]
  t.p.x,
  t.p.y
FROM points as t
GROUP BY
  t.id, t.p.x, t.p.y

そんなわけで同じ形状を保とうと思ったらもう一度作り直す必要がある3

WITH points AS (
    SELECT 1 as id, STRUCT<x int64, y int64>(10, 10) as p UNION ALL
    SELECT 2, STRUCT(0 as x, 10 as y) UNION ALL
    SELECT 2, STRUCT(0 as x, 10 as y)
)
SELECT
  t.id,
  STRUCT(t.p.x as x, t.p.y as y) as p
FROM points as t
GROUP BY
  t.id, t.p.x, t.p.y

大変だった。structも自由に利用できる任意のものについての最新断面の提供ということを考えると、SQL文のコピペでは対応できずに頑張って生成するしかないのかもしれない。一方でなんだかそれが必要になるというのもオーバーワーク気味なんじゃ?という気がしている。そういうモヤモヤ。

[
  {
    "id": "1",
    "p": {
      "x": "10",
      "y": "10"
    }
  },
  {
    "id": "2",
    "p": {
      "x": "0",
      "y": "10"
    }
  }
]

ただ、考えてみると、元となるソースがdocument指向のDBではなくRDBMSなどであった場合はそもそも初めからflattenなのであまり問題にならないのかもしれない?

まとめ

  • BigQueryで最新断面とのjoinがしたくなる。
  • STRUCT型ではDISTINCTが効かない。だるい。

gist

追記:

後日、TobakuCptlsmさんにtwitterでROW_NUMBER()と組み合わせる方法を教えてもらいました。実はROW_NUMBER()の存在自体は知っていたのですが、行番号に依存したコードはあまりSQL的には嬉しくないんじゃないか?と思ったりなどして記事に含めるのを辞めていたのでした

その後、色々bigqueryのドキュメントを読んでいくうちに、CDC(Change Data Capture)を利用したデータの同期のドキュメントでも同様の説明があったことから利用しても良いかもしれないと思うようになりました。

すこし形は違いますが以下の様な例の部分です。

bq mk --view \
"SELECT * EXCEPT(change_type, row_num)
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY change_id DESC) AS row_num
  FROM (
    SELECT * EXCEPT(change_type), change_type
    FROM \`$(gcloud config get-value project).cdc_tutorial.session_delta\` UNION ALL
    SELECT *, 'I'
    FROM \`$(gcloud config get-value project).cdc_tutorial.session_main\`))
WHERE
  row_num = 1
  AND change_type <> 'D'" \
 cdc_tutorial.session_latest_v

あとは、viewではなくmaterialized viewないしは別テーブルへのqueryになっていればROW_NUMBER()を使う形でも悪くないかなと思うようになりました。

参考


  1. 例えばembulkなどでBigQueryに転送していることを思い浮かべてもらえると

  2. 実際のところ、この記事を書くために昨日の記事を書いた

  3. group by部分はformatを使って手抜きをする方法はあるかもしれない。雑でよければ。https://sucrose.hatenablog.com/entry/2018/10/09/232753

SQLでWITH句を使ってテーブルにデータをINSERTせずにqueryを確かめたい。あるいはSQLの復習。

主に手抜きのため。bigqueryのドキュメントの実行例で使われていてなるほどなーと思ったのでメモ。 ついでに色々なJOINの利用例を列挙してみる。

WITH句を使って仮想的なtableと見做す

WITH句自体は概ねどのような処理系でもサポートしているみたい。これを使うのが今回の肝。

例えば、以下はbigqueryでのコードの実行例。データとqueryが全て1つの文に入っているので状況がわかりやすい。

https://cloud.google.com/bigquery/docs/reference/standard-sql/timestamp_functions?hl=ja#extract

WITH Timestamps AS (
  SELECT TIMESTAMP("2005-01-03 12:34:56+00") AS timestamp_value UNION ALL
  SELECT TIMESTAMP("2007-12-31 12:00:00+00") UNION ALL
  SELECT TIMESTAMP("2009-01-01 12:00:00+00") UNION ALL
  SELECT TIMESTAMP("2009-12-31 12:00:00+00") UNION ALL
  SELECT TIMESTAMP("2017-01-02 12:00:00+00") UNION ALL
  SELECT TIMESTAMP("2017-05-26 12:00:00+00")
)
SELECT
  timestamp_value,
  EXTRACT(ISOYEAR FROM timestamp_value) AS isoyear,
  EXTRACT(ISOWEEK FROM timestamp_value) AS isoweek,
  EXTRACT(YEAR FROM timestamp_value) AS year,
  EXTRACT(WEEK FROM timestamp_value) AS week
FROM Timestamps
ORDER BY timestamp_value;

-- Results may differ, depending upon the environment and time zone where this query was executed.
+-------------------------+---------+---------+------+------+
| timestamp_value         | isoyear | isoweek | year | week |
+-------------------------+---------+---------+------+------+
| 2005-01-03 12:34:56 UTC | 2005    | 1       | 2005 | 1    |
| 2007-12-31 12:00:00 UTC | 2008    | 1       | 2007 | 52   |
| 2009-01-01 12:00:00 UTC | 2009    | 1       | 2009 | 0    |
| 2009-12-31 12:00:00 UTC | 2009    | 53      | 2009 | 52   |
| 2017-01-02 12:00:00 UTC | 2017    | 1       | 2017 | 1    |
| 2017-05-26 12:00:00 UTC | 2017    | 21      | 2017 | 21   |
+-------------------------+---------+---------+------+------+

(以降は諸々の関係上sqliteでの実行結果になっている)

通常のCTEの使い方

ちなみに普通はどうやって使うかと言えば、RECURSIVE付きで再帰的に問い合わせたりして使う。

WITH RECURSIVE f(n, name) AS (
  SELECT 1 as n, 'f' as name
  UNION ALL SELECT n+1 as n, 'f'|| substr('oooooooooooo', 1, n) as name FROM f LIMIT 10
) SELECT * from f

基底状態とUNION ALLで再帰したqueryを作る

    n = 1
 name = f

    n = 2
 name = fo

    n = 3
 name = foo

    n = 4
 name = fooo

    n = 5
 name = foooo

これらを悪用して仮想的にテーブルがあるかのようにSELECT文を作りqueryの練習をすることができる。

queryの実行例

team,userみたいなテーブルがあるとする。テキトーにjoinを試してみることにしよう。

joinなし

WITH teams AS (
  SELECT 1 as id, 'x' as name
  UNION ALL SELECT 2, 'y'
  UNION ALL SELECT 3, 'z'
)
SELECT
  id,
  name
FROM
 teams

普通のquery。withを除けば既存のSELECT文の例に近い。

   id = 1
 name = x

   id = 2
 name = y

   id = 3
 name = z

INNER JOIN

WITH teams AS (
  SELECT 1 as id, 'x' as name
  UNION ALL SELECT 2, 'y'
  UNION ALL SELECT 3, 'z'
),
users as (
  SELECT 10 as id, 'foo' as name, 1 as team_id
  UNION ALL SELECT 20, 'bar', 1
  UNION ALL SELECT 30, 'boo', 3
)
SELECT
  t.id as team_id,
  t.name as team_name,
  u.id as user_id,
  u.name as user_name
FROM
  teams as t
  INNER JOIN users as u ON t.id = u.team_id

よくある例。

  team_id = 1
team_name = x
  user_id = 10
user_name = foo

  team_id = 1
team_name = x
  user_id = 2
user_name = bar

  team_id = 3
team_name = z
  user_id = 3
user_name = boo

teamごとのuser数なんかを数えたい場合。

WITH teams AS (
  SELECT 1 as id, 'x' as name
  UNION ALL SELECT 2, 'y'
  UNION ALL SELECT 3, 'z'
),
users as (
  SELECT 10 as id, 'foo' as name, 1 as team_id
  UNION ALL SELECT 20, 'bar', 1
  UNION ALL SELECT 30, 'boo', 3
)
SELECT
  t.id as team_id,
  t.name as team_name,
  count(*) as c
FROM
  teams as t
  INNER JOIN users as u ON t.id = u.team_id
GROUP BY
  t.id
  team_id = 1
team_name = x
        c = 2

  team_id = 3
team_name = z
        c = 1

LEFT OUTER JOIN

userを持たないteamの分も数えたいのでouter joinを使う。対応する行が存在しない側は値がNULLになるのでcountではなくsumで数える。

WITH teams AS (
  SELECT 1 as id, 'x' as name
  UNION ALL SELECT 2, 'y'
  UNION ALL SELECT 3, 'z'
),
users as (
  SELECT 10 as id, 'foo' as name, 1 as team_id
  UNION ALL SELECT 20, 'bar', 1
  UNION ALL SELECT 30, 'boo', 3
)
SELECT
  t.id as team_id,
  t.name as team_name,
  sum(CASE WHEN u.id is NULL then 0 ELSE 1 END) as c
FROM
  teams as t
  LEFT OUTER JOIN users as u ON t.id = u.team_id
GROUP BY
  t.id
  team_id = 1
team_name = x
        c = 2

  team_id = 2
team_name = y
        c = 0

  team_id = 3
team_name = z
        c = 1

many to many

多対多の関係の場合には中間テーブルが必要になる。

WITH teams AS (
  SELECT 1 as id, 'x' as name
  UNION ALL SELECT 2, 'y'
  UNION ALL SELECT 3, 'z'
),
users as (
  SELECT 10 as id, 'foo' as name -- x,yに所属
  UNION ALL SELECT 20, 'bar' -- x,zに所属
  UNION ALL SELECT 30, 'boo' -- xに所属
),
teams2users as (
  SELECT 1 as team_id, 10 as user_id
  UNION ALL SELECT 1, 20
  UNION ALL SELECT 1, 30
  UNION ALL SELECT 2, 10
  UNION ALL SELECT 3, 20
)
SELECT
  t.id as team_id,
  t.name as team_name,
  u.id as user_id,
  u.name as user_name
FROM
  teams as t
  INNER JOIN users as u
  INNER JOIN teams2users as xref ON t.id = xref.team_id AND xref.user_id = u.id
  team_id = 1
team_name = x
  user_id = 10
user_name = foo

  team_id = 1
team_name = x
  user_id = 20
user_name = bar

  team_id = 1
team_name = x
  user_id = 30
user_name = boo

  team_id = 2
team_name = y
  user_id = 10
user_name = foo

  team_id = 3
team_name = z
  user_id = 20
user_name = bar

CROSS JOIN

tableの行数とqueryの行数が乖離したようなsummary的なものを1発で作りたい時。 例えば、teamに対する所属期間のような列があるとして、それを年次で集計したい場合など

そういう実用的な例の前にトリビアルな例を。基本的には直積なので3行のものと3行のものを組み合わせたら9行のものができる。

WITH ids AS (
  (SELECT 1 as id UNION ALL SELECT 2 UNION ALL 3)
), ys AS (
  (SELECT 'foo' as name UNION ALL SELECT 'bar' UNION ALL 'boo')
)
SELECT * FROM ids CROSS JOIN ys

はい。

1           foo       
1           bar       
1           boo       
2           foo       
2           bar       
2           boo       
3           foo       
3           bar       
3           boo       

これを使って以下をやってみる。

例えば、teamに対する所属期間のような列があるとして、それを年次で集計したい場合など

WITH teams AS (
  SELECT 1 as id, 'x' as name
  UNION ALL SELECT 2, 'y'
  UNION ALL SELECT 3, 'z'
),
users as (
  SELECT 10 as id, 'foo' as name -- x,yに所属
  UNION ALL SELECT 20, 'bar' -- x,zに所属
  UNION ALL SELECT 30, 'boo' -- xに所属
),
teams2users as (
  SELECT 1 as team_id, 10 as user_id, '2019' as start, '2020' as `end`
  UNION ALL SELECT 1, 20, '2020', '2020'
  UNION ALL SELECT 1, 30, '2020', NULL
  UNION ALL SELECT 2, 10, '2020', NULL
  UNION ALL SELECT 3, 20,  '2021', '2021'
),
years as (
  SELECT '2019' as year
  UNION ALL SELECT '2020'
  UNION ALL SELECT '2021'
  UNION ALL SELECT '2022'
)
SELECT
  y.year as year,
  t.name as team_name,
  sum(CASE WHEN u.id is NULL then 0 ELSE 1 END) as c,
  group_concat(u.name) as names
FROM
  teams as t
  INNER JOIN teams2users as xref ON t.id = xref.team_id
  LEFT OUTER JOIN users as u ON xref.user_id = u.id
  CROSS JOIN years as y ON xref.start <= y.year AND (xref.`end` IS NULL OR xref.`end` >= y.year)
GROUP BY
  y.year, t.id, t.name
     year = 2019
team_name = x
        c = 1
    names = foo

     year = 2020
team_name = x
        c = 3
    names = foo,bar,boo

     year = 2020
team_name = y
        c = 1
    names = foo

     year = 2021
team_name = x
        c = 1
    names = boo

     year = 2021
team_name = y
        c = 1
    names = foo

     year = 2021
team_name = z
        c = 1
    names = bar

     year = 2022
team_name = x
        c = 1
    names = boo

     year = 2022
team_name = y
        c = 1
    names = foo

misc

DISTINCT, having, ORDER BY, window関数とかは省略。

gist

追記: (bigqueryならではの別解)

後日、TobakuCptlsmさんにtwitterでbigqueryでArrayを使って表現する方法を教えてもらいました。こちらの方が他のプログラミング言語に似たような形式なので直感的かもしれません。

WITH points AS (
    select * from UNNEST(ARRAY<STRUCT<id int, p STRUCT<x int64, y int64>>>[
        (1, (0, 10)),
        (2, (0, 10)),
        (2, (0, 10))
    ])
)
SELECT * FROM points

こちらのgist https://gist.github.com/TobCap/83d6d874f40e265b45cdb0c992317e00