Skip to content

Commit 4d470b4

Browse files
committed
Release 5.2 - Welcome LiveRamp 🔪
1 parent 0d8f0d4 commit 4d470b4

File tree

16 files changed

+754
-93
lines changed

16 files changed

+754
-93
lines changed

cloudbuild.sfdc.yaml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ steps:
1919
# init_deployment_config.py leaves the validated config.json file in workspace/config so it's available for other build steps
2020
- name: gcr.io/kittycorn-public/deploy-kittycorn:v2.0
2121
entrypoint: "bash"
22-
id: 'init_deploy_config'
22+
id: "init_deploy_config"
2323
args:
2424
- "-c"
2525
- |-
@@ -35,7 +35,7 @@ steps:
3535
cat ${_CONFIG_FILE}
3636
echo -e "\n--------------------------------"
3737
38-
- name: 'gcr.io/kittycorn-public/deploy-kittycorn:v2.0'
38+
- name: "gcr.io/kittycorn-public/deploy-kittycorn:v2.0"
3939
entrypoint: /bin/bash
4040
args:
4141
- "-c"
@@ -126,9 +126,9 @@ steps:
126126
fi
127127
128128
# Generate required K9 Processing tables in case they don't exist, so as not to break Reporting.
129-
- name: 'gcr.io/kittycorn-public/deploy-kittycorn:v2.0'
129+
- name: "gcr.io/kittycorn-public/deploy-kittycorn:v2.0"
130130
entrypoint: /bin/bash
131-
id: 'generate_k9_placeholder'
131+
id: "generate_k9_placeholder"
132132
args:
133133
- "-c"
134134
- |-
@@ -154,7 +154,7 @@ steps:
154154
155155
# Generate SFDC Reporting bigquery views / tables via Materializer.
156156
- name: gcr.io/kittycorn-public/deploy-kittycorn:v2.0
157-
id: 'reporting'
157+
id: "reporting"
158158
entrypoint: "bash"
159159
args:
160160
- "-c"
@@ -176,9 +176,11 @@ steps:
176176
--config_file "${_CONFIG_FILE}" \
177177
--materializer_settings_file config/reporting_settings.yaml
178178
179+
python3 src/common/annotations_loader.py --annotations-directory "src/annotations"
180+
179181
logsBucket: "gs://$_GCS_BUCKET"
180182
timeout: 10200s
181183
substitutions:
182-
_CONFIG_FILE: "config/sfdc_config.json"
184+
_CONFIG_FILE: "config/config.json"
183185
options:
184186
substitution_option: "ALLOW_LOOSE"
File renamed without changes.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# - id: "{{ project_id_tgt }}.{{ sfdc_datasets_reporting }}.MyViewName"
2+
# description: "View description goes here"
3+
# fields:
4+
# - name: "Field_Name"
5+
# description: "Field description goes here"

src/cdc_dag_generator/generate_dags.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
_THIS_DIR = Path(__file__).resolve().parent
3939

4040
# Config file containing various parameters.
41-
_CONFIG_FILE = Path(_THIS_DIR, "../../config/sfdc_config.json")
41+
_CONFIG_FILE = Path(_THIS_DIR, "../../config/config.json")
4242

4343
# Settings file containing tables to be copied from SFDC.
4444
_SETTINGS_FILE = Path(_THIS_DIR, "../../config/ingestion_settings.yaml")

src/cdc_dag_generator/generate_views.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
_THIS_DIR = Path(__file__).resolve().parent
3737

3838
# Config file containing various parameters.
39-
_CONFIG_FILE = Path(_THIS_DIR, "../../config/sfdc_config.json")
39+
_CONFIG_FILE = Path(_THIS_DIR, "../../config/config.json")
4040

4141
# Settings file containing tables to be copied from SFDC.
4242
_SETTINGS_FILE = Path(_THIS_DIR, "../../config/ingestion_settings.yaml")

src/common/annotations_loader.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
# Copyright 2023 Google LLC
2+
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""
15+
BigQuery Annotations loader
16+
"""
17+
18+
import argparse
19+
from concurrent import futures
20+
import logging
21+
import pathlib
22+
import sys
23+
import typing
24+
import yaml
25+
26+
from google.cloud import bigquery
27+
from google.cloud.exceptions import NotFound
28+
29+
sys.path.append(".")
30+
sys.path.append("./src")
31+
sys.path.append(str(pathlib.Path(__file__).parent))
32+
33+
# pylint:disable=wrong-import-position
34+
from common.py_libs.configs import load_config_file
35+
from common.py_libs.jinja import (apply_jinja_params_dict_to_file,
36+
initialize_jinja_from_config)
37+
38+
_PARALLEL_THREADS = 5
39+
40+
41+
def _load_table_annotations(table_annotations: typing.Dict[str, typing.Any],
42+
client: bigquery.Client):
43+
full_table_id = table_annotations["id"]
44+
table_description = table_annotations["description"]
45+
description_changed = False
46+
schema_changed = False
47+
try:
48+
table = client.get_table(full_table_id)
49+
except NotFound:
50+
logging.info("Table or view `%s` was not found. Skipping it.",
51+
full_table_id)
52+
return
53+
table_description = table.description or table_description
54+
description_changed = table_description != table.description
55+
table.description = table_description
56+
annotation_fields = {
57+
field_item["name"]: field_item["description"]
58+
for field_item in table_annotations["fields"]
59+
}
60+
61+
schema = table.schema.copy()
62+
for index, field in enumerate(schema):
63+
description = field.description or annotation_fields.get(field.name, "")
64+
if description != field.description:
65+
schema_changed = True
66+
field_dict = field.to_api_repr()
67+
field_dict["description"] = description
68+
schema[index] = bigquery.SchemaField.from_api_repr(field_dict)
69+
changes = []
70+
if schema_changed:
71+
table.schema = schema
72+
changes.append("schema")
73+
if description_changed:
74+
changes.append("description")
75+
if len(changes) > 0:
76+
client.update_table(table, changes)
77+
logging.info("Table/view `%s` has been updated.", full_table_id)
78+
else:
79+
logging.info("No changes in `%s`.", full_table_id)
80+
81+
82+
def load_annotations(jinja_dict: typing.Dict[str, typing.Any],
83+
client: bigquery.Client, annotations_file: pathlib.Path):
84+
annotations_yaml = apply_jinja_params_dict_to_file(annotations_file,
85+
jinja_dict)
86+
annotations_dict = yaml.safe_load(annotations_yaml)
87+
if not annotations_dict:
88+
logging.warning("Annotations file `%s` has no parsable content.",
89+
str(annotations_file))
90+
return
91+
logging.info("Loading annotations from `%s`.", str(annotations_file))
92+
93+
threads = []
94+
executor = futures.ThreadPoolExecutor(_PARALLEL_THREADS)
95+
96+
for table_item in annotations_dict:
97+
threads.append(
98+
executor.submit(_load_table_annotations, table_item, client))
99+
futures.wait(threads)
100+
101+
102+
def main(args: typing.Sequence[str]) -> int:
    """Entry point: parse CLI arguments and load every annotations file."""

    arg_parser = argparse.ArgumentParser(description="BigQuery Annotations Loader")
    arg_parser.add_argument("--annotations-directory",
                            help="Annotation files directory",
                            type=str,
                            required=True)
    arg_parser.add_argument("--debug",
                            help="Debugging mode.",
                            action="store_true",
                            default=False,
                            required=False)
    arg_parser.add_argument("--config",
                            help="Data Foundation config.json.",
                            type=str,
                            required=False,
                            default="./config/config.json")
    opts = arg_parser.parse_args(args)

    log_level = logging.DEBUG if opts.debug else logging.INFO
    logging.basicConfig(
        format="%(asctime)s | %(levelname)s | %(message)s",
        level=log_level,
    )

    logging.info("Cortex Annotations Loader for BigQuery.")

    config = load_config_file(opts.config)

    logging.info("Loading BigQuery Annotations.")

    annotations_path = pathlib.Path(opts.annotations_directory)
    if not annotations_path.exists():
        logging.fatal("Directory `%s` doesn't exist.", str(annotations_path))
        return 1

    bq_client = bigquery.Client(project=config["projectIdSource"],
                                location=config["location"])
    jinja_values = initialize_jinja_from_config(config)

    # Apply every annotations file found in the directory.
    for annotations_file in annotations_path.iterdir():
        load_annotations(jinja_values, bq_client, annotations_file.absolute())
    logging.info("BigQuery Annotations has been loaded!")

    return 0
146+
147+
148+
###############################################################
if __name__ == "__main__":
    # Forward CLI arguments (minus the program name) and propagate the
    # loader's exit code to the shell.
    sys.exit(main(sys.argv[1:]))

src/common/materializer/create_bq_object.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import json
2727
import logging
2828
import textwrap
29+
import typing
2930
import sys
3031

3132
from pathlib import Path
@@ -36,6 +37,7 @@
3637
from common.py_libs.dag_generator import generate_file_from_template
3738

3839
from google.cloud.exceptions import BadRequest
40+
from google.cloud.exceptions import NotFound
3941
from google.cloud import bigquery
4042

4143
# NOTE: All paths here are relative to the root directory, unless specified
@@ -221,17 +223,19 @@ def _generate_dag_files(module_name: str, target_dataset_type: str,
221223

222224

223225
def _create_view(bq_client: bigquery.Client, view_name: str,
224-
core_sql: str) -> None:
226+
description: typing.Optional[str], core_sql: str) -> None:
225227
"""Creates BQ Reporting view."""
226-
create_view_sql = ("CREATE OR REPLACE VIEW `" + view_name + "` AS (\n" +
228+
create_view_sql = ("CREATE OR REPLACE VIEW `" + view_name + "` " +
229+
f"OPTIONS(description=\"{description or ''}\") AS (\n" +
227230
textwrap.indent(core_sql, " ") + "\n)")
228231
create_view_job = bq_client.query(create_view_sql)
229232
_ = create_view_job.result()
230233
logging.info("Created view '%s'", view_name)
231234

232235

233236
def _create_table(bq_client: bigquery.Client, full_table_name: str,
234-
sql_str: str, table_setting: dict) -> None:
237+
description: typing.Optional[str], sql_str: str,
238+
table_setting: dict) -> None:
235239
"""Creates empty BQ Reporting table."""
236240

237241
# Steps to create table:
@@ -250,8 +254,10 @@ def _create_table(bq_client: bigquery.Client, full_table_name: str,
250254
temp_table_name = full_table_name + "_temp"
251255
bq_client.delete_table(temp_table_name, not_found_ok=True)
252256
logging.info("Creating temporary table '%s'", temp_table_name)
253-
temp_table_sql = ("CREATE TABLE `" + temp_table_name + "` AS (\n" +
254-
" SELECT * FROM (\n" +
257+
temp_table_sql = ("CREATE TABLE `" + temp_table_name + "` " +
258+
"OPTIONS(expiration_timestamp=TIMESTAMP_ADD(" +
259+
"CURRENT_TIMESTAMP(), INTERVAL 12 HOUR))" +
260+
" AS (\n SELECT * FROM (\n" +
255261
textwrap.indent(sql_str, " ") + "\n)\n" +
256262
" WHERE FALSE\n)")
257263
logging.info("temporary table sql = '%s'", temp_table_sql)
@@ -273,16 +279,25 @@ def _create_table(bq_client: bigquery.Client, full_table_name: str,
273279
cluster_details = table_setting.get("cluster_details")
274280
if cluster_details:
275281
table = bq_materializer.add_cluster_to_table_def(table, cluster_details)
282+
# Add optional description
283+
if description:
284+
table.description = description
276285

277286
try:
278287
_ = bq_client.create_table(table)
279288
logging.info("Created table '%s'", full_table_name)
280289
except Exception as e:
281290
raise e
282291
finally:
283-
# Cleanup - remote temporary table
292+
# Cleanup - remove temporary table
284293
bq_client.delete_table(temp_table_name, not_found_ok=True)
285-
294+
try:
295+
bq_client.get_table(temp_table_name)
296+
logging.warning("⚠️ Couldn't delete temporary table `%s`."
297+
"Please delete it manually. ⚠️",
298+
temp_table_name)
299+
except NotFound:
300+
logging.info("Deleted temp table = %s'", temp_table_name)
286301

287302
def _generate_table_refresh_sql(bq_client: bigquery.Client,
288303
full_table_name: str, sql_str: str) -> str:
@@ -335,6 +350,7 @@ def main():
335350
_validate_sql(bq_client, rendered_sql)
336351

337352
object_type = bq_object_setting["type"]
353+
object_description = bq_object_setting.get("description")
338354

339355
if object_type in ["table", "view"]:
340356
object_name = Path(sql_file).stem
@@ -361,7 +377,8 @@ def main():
361377
# Create view or table, based on object type.
362378
if object_type == "view":
363379
try:
364-
_create_view(bq_client, object_name_full, rendered_sql)
380+
_create_view(bq_client, object_name_full, object_description,
381+
rendered_sql)
365382
except BadRequest as e:
366383
if hasattr(e, "query_job") and e.query_job: # type: ignore
367384
query = e.query_job.query # type: ignore
@@ -376,8 +393,8 @@ def main():
376393
try:
377394
table_setting = bq_object_setting["table_setting"]
378395
bq_materializer.validate_table_setting(table_setting)
379-
_create_table(bq_client, object_name_full, rendered_sql,
380-
table_setting)
396+
_create_table(bq_client, object_name_full, object_description,
397+
rendered_sql, table_setting)
381398
except BadRequest as e:
382399
if hasattr(e, "query_job") and e.query_job: # type: ignore
383400
query = e.query_job.query # type: ignore

src/common/materializer/deploy.sh

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ validate_args() {
8888
#- -------------------
8989

9090
set -o errexit -o noclobber -o nounset -o pipefail
91-
params="$(getopt -o l:t:y:m:c:f:h --long gcs_logs_bucket:,gcs_tgt_bucket:,target_type:,module_name:,config_file:,materializer_settings_file:,help --name "$0" -- "$@")"
91+
params="$(getopt -o l:t:y:m:c:f:k:h --long gcs_logs_bucket:,gcs_tgt_bucket:,target_type:,module_name:,config_file:,materializer_settings_file:,k9_manifest:,help --name "$0" -- "$@")"
9292
eval set -- "$params"
9393

9494
while true; do
@@ -117,6 +117,10 @@ while true; do
117117
MATERIALIZER_SETTINGS_FILE=$2
118118
shift 2
119119
;;
120+
-k | --k9_manifest)
121+
K9_MANIFEST_FILE=$2
122+
shift 2
123+
;;
120124
-h | --help)
121125
usage
122126
exit
@@ -145,6 +149,7 @@ echo " GCS_TGT_BUCKET = '${GCS_TGT_BUCKET}'"
145149
echo " MODULE_NAME = '${MODULE_NAME}'"
146150
echo " TGT_DATASET_TYPE = '${TGT_DATASET_TYPE}'"
147151
echo " CONFIG_FILE = '${CONFIG_FILE}'"
152+
echo " K9_MANIFEST_FILE = '${K9_MANIFEST_FILE}'"
148153
echo " MATERIALIZER_SETTINGS_FILE = '${MATERIALIZER_SETTINGS_FILE}'"
149154

150155
# Set various directories that help navigate to various things.
@@ -159,15 +164,16 @@ python3 "$THIS_DIR"/generate_build_files.py \
159164
--module_name "${MODULE_NAME}" \
160165
--config_file "${CONFIG_FILE}" \
161166
--target_dataset_type "${TGT_DATASET_TYPE}" \
162-
--materializer_settings_file "${MATERIALIZER_SETTINGS_FILE}"
167+
--materializer_settings_file "${MATERIALIZER_SETTINGS_FILE}" \
168+
--k9_manifest_file "${K9_MANIFEST_FILE}"
163169

164170
echo "Build files generated successfully."
165171

166172
# We may have one or more build files. Let's run all of them.
167173
set +e
168174
failure=0
169175
echo "Executing generated gcloud build files...."
170-
for build_file_name in "${GENERATED_FILES_PARENT_DIR}"/"${MODULE_NAME}"/cloudbuild.materializer.create_bq_objects.*.yaml; do
176+
for build_file_name in "${GENERATED_FILES_PARENT_DIR}"/"${MODULE_NAME}"/cloudbuild.materializer.*.yaml; do
171177
[[ -e "$build_file_name" ]] || break
172178
echo -e "gcloud builds submit . --config=\"${build_file_name}\" --substitutions=_GCS_LOGS_BUCKET=\"${GCS_LOGS_BUCKET}\",_GCS_TGT_BUCKET=\"${GCS_TGT_BUCKET}\" "
173179
gcloud builds submit . --config="${build_file_name}" --substitutions=_GCS_LOGS_BUCKET="${GCS_LOGS_BUCKET}",_GCS_TGT_BUCKET="${GCS_TGT_BUCKET}"

0 commit comments

Comments
 (0)