A data engineer is designing a Pandas UDF to process financial time-series data with complex calculations that require maintaining state across rows within each stock-symbol group. The function must be efficient and scalable. Which approach solves the problem with minimum overhead while preserving data integrity?
A.
Use a scalar_iter Pandas UDF with iterator-based processing, implementing state management through persistent storage (Delta tables) that gets updated after each batch to maintain continuity across iterator chunks.
B.
Use a scalar Pandas UDF that processes the entire dataset at once, implementing custom partitioning logic within the UDF to group by stock symbol and maintain state using global variables shared across all executor processes.
C.
Use applyInPandas on a Spark DataFrame so that each stock symbol group is received as a pandas DataFrame, allowing processing within each group while maintaining state variables local to each group’s processing function.
D.
Use a grouped-aggregate Pandas UDF that processes each stock symbol group independently, maintaining state through intermediate aggregation results that get passed between successive UDF calls via broadcast variables.
Answer: C
applyInPandas is the documented grouped Pandas API for processing each group as a pandas DataFrame. Spark passes all columns for each group together, which allows per-group state to be maintained naturally inside the function. By contrast, scalar Pandas UDFs are batch-oriented Series-to-Series operations, not group-state processing tools. (Apache Spark)
This is why option C is the intended best answer among the listed choices. Option A adds unnecessary external persistence overhead, option B relies on unsupported global executor state, and option D misuses grouped aggregation semantics for row-by-row stateful logic. Spark also documents applyInPandas specifically as a grouped operation, while scalar Pandas UDFs process row batches and concatenate results rather than preserving grouped state semantics. (Apache Spark)
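The per-group stateful logic that option C describes can be sketched with plain pandas, since applyInPandas hands each group to the function as an ordinary pandas DataFrame. This is a minimal illustration with made-up price/volume columns and a running VWAP as the stateful calculation; with Spark the same function would be passed to df.groupBy("symbol").applyInPandas(func, schema=...).

```python
import pandas as pd

def add_running_vwap(pdf: pd.DataFrame) -> pd.DataFrame:
    # State local to this group's call: running sums for a cumulative
    # volume-weighted average price (an illustrative stateful metric).
    cum_pv = 0.0
    cum_vol = 0.0
    vwap = []
    for price, vol in zip(pdf["price"], pdf["volume"]):
        cum_pv += price * vol
        cum_vol += vol
        vwap.append(cum_pv / cum_vol)
    out = pdf.copy()
    out["running_vwap"] = vwap
    return out

# Emulate what Spark does under applyInPandas: hand each symbol's
# rows to the function as a standalone pandas DataFrame.
trades = pd.DataFrame({
    "symbol": ["AAPL", "AAPL", "MSFT"],
    "price": [10.0, 20.0, 100.0],
    "volume": [1.0, 1.0, 2.0],
})
pieces = [add_running_vwap(pdf) for _, pdf in trades.groupby("symbol")]
result = pd.concat(pieces).sort_index()
```

Because the state variables live inside the function, each group's computation is isolated, which is exactly what the global-variable approach in option B cannot guarantee across executor processes.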
======
QUESTION NO: 13
To identify the top users consuming compute resources, a data engineering team needs to monitor usage within their Databricks workspace for better resource utilization and cost control. The team decided to use Databricks system tables, available under the system catalog in Unity Catalog, to gain detailed visibility into workspace activity. Which SQL query should the team run from the system catalog to achieve this?
A.
SELECT
sku_name,
identity_metadata.created_by AS user_email,
SUM(usage_quantity * usage_unit) AS total_dbus
FROM system.billing.usage
GROUP BY user_email, sku_name
ORDER BY total_dbus DESC
LIMIT 10
B.
SELECT
sku_name,
identity_metadata.created_by AS user_email,
COUNT(usage_quantity) AS total_dbus
FROM system.billing.usage
GROUP BY user_email, sku_name
ORDER BY total_dbus DESC
LIMIT 10
C.
SELECT
identity_metadata.run_as AS user_email,
SUM(usage_quantity) AS total_dbus
FROM system.billing.usage
GROUP BY user_email
ORDER BY total_dbus DESC
LIMIT 10
D.
SELECT
sku_name,
usage_metadata.run_name AS user_email,
SUM(usage_quantity) AS total_dbus
FROM system.billing.usage
GROUP BY user_email, sku_name
ORDER BY total_dbus DESC
LIMIT 10
Answer: C
Databricks documents system.billing.usage as the correct system table for billable usage analysis, and it documents identity_metadata.run_as as the field that records who ran supported workloads such as jobs, notebooks, and Lakeflow Spark Declarative Pipelines. For “top users consuming compute resources,” summing usage_quantity by identity_metadata.run_as is the correct conceptual approach. (Databricks Documentation)
The other options are not aligned with the documented schema or metric usage. identity_metadata.created_by is not the general compute-consumer identity field for jobs and notebook workloads; it applies to specific products such as Databricks Apps and certain agent workloads. usage_quantity should be summed, not counted, and usage_unit is not something you multiply into DBUs in the way shown. usage_metadata.run_name is not the documented user identity field for this purpose. As written, option C is the only option that matches the official identity model for user-attributed compute consumption. (Databricks Documentation)
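The logic option C expresses in SQL can be sketched in plain Python; the sample rows and identities below are made up for illustration, shaped loosely like system.billing.usage records:

```python
from collections import defaultdict

# Hypothetical rows: each carries the identity_metadata.run_as identity
# and a usage_quantity measured in DBUs.
usage_rows = [
    {"run_as": "alice@example.com", "usage_quantity": 4.0},
    {"run_as": "bob@example.com", "usage_quantity": 1.5},
    {"run_as": "alice@example.com", "usage_quantity": 2.5},
]

# SUM(usage_quantity) GROUP BY run_as: total DBUs per user. A COUNT,
# as in option B, would only tally rows and lose actual consumption.
totals = defaultdict(float)
for row in usage_rows:
    totals[row["run_as"]] += row["usage_quantity"]

top_users = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
```

Sorting the summed totals in descending order and taking the head is the Python analogue of ORDER BY total_dbus DESC LIMIT 10.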
======
QUESTION NO: 15
Which approach demonstrates a modular and testable way to use DataFrame.transform for ETL code in PySpark?
A.
def transform_data(input_df):
    # transformation logic here
    return output_df
test_input = spark.createDataFrame([(1, "a")], ["id", "value"])
assertDataFrameEqual(transform_data(test_input), expected)
B.
def upper_value(df):
    return df.withColumn("value_upper", upper(col("value")))
def filter_positive(df):
    return df.filter(df["id"] > 0)
pipeline_df = df.transform(upper_value).transform(filter_positive)
C.
class Pipeline:
    def transform(self, df):
        return df.withColumn("value_upper", upper(col("value")))
pipeline = Pipeline()
assertDataFrameEqual(pipeline.transform(test_input), expected)
D.
def upper_transform(df):
    return df.withColumn("value_upper", upper(col("value")))
actual = test_input.transform(upper_transform)
assertDataFrameEqual(actual, expected)
Answer: B
Apache Spark documents DataFrame.transform(func, *args, **kwargs) as concise syntax for chaining custom transformations, where the function takes a DataFrame and returns a DataFrame. The official example explicitly shows chained transforms, which makes option B the most modular and idiomatic ETL design among the choices. (Apache Spark)
Option D shows a valid single transform and test, but it does not demonstrate the modular pipeline composition aspect as clearly as B. Option A does not actually use DataFrame.transform, and option C wraps logic in a class method but does not demonstrate the documented chaining pattern that transform is designed for. (Apache Spark)
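The same composition pattern can be exercised locally without a Spark session: pandas' DataFrame.pipe plays the role that Spark's DataFrame.transform plays in option B. This is a minimal sketch with hypothetical id/value columns:

```python
import pandas as pd

def upper_value(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(value_upper=df["value"].str.upper())

def filter_positive(df: pd.DataFrame) -> pd.DataFrame:
    return df[df["id"] > 0]

df = pd.DataFrame({"id": [1, -2, 3], "value": ["a", "b", "c"]})
# Each step takes a DataFrame and returns a DataFrame, so steps stay
# independently unit-testable and compose into a readable pipeline.
pipeline_df = df.pipe(upper_value).pipe(filter_positive)
```

The design payoff is identical in both APIs: each function can be tested in isolation, and the pipeline reads as a linear chain rather than nested calls.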
======
QUESTION NO: 19
A data engineer is configuring a Databricks Asset Bundle to deploy a job with granular permissions. The requirements are:
Grant the data-engineers group CAN_MANAGE access to the job.
Ensure the auditors group can view the job but not modify or run it.
Avoid granting unintended permissions to other users or groups.
How should the data engineer deploy the job while meeting the requirements?
A.
resources:
  jobs:
    my-job:
      name: data-pipeline
      tasks: (...)
      job_clusters: (...)
      permissions:
        - group_name: data-engineers
          level: CAN_MANAGE
        - group_name: auditors
          level: CAN_VIEW
B.
resources:
  jobs:
    my-job:
      name: data-pipeline
      tasks: (...)
      job_clusters: (...)
      permissions:
        - group_name: data-engineers
          level: CAN_MANAGE
        - group_name: auditors
          level: CAN_VIEW
C.
resources:
  jobs:
    my-job:
      name: data-pipeline
      tasks: [...]
      job_clusters: [...]
      permissions:
        - group_name: data-engineers
          level: CAN_MANAGE
      permissions:
        - group_name: auditors
          level: CAN_VIEW
D.
permissions:
  - group_name: data-engineers
    level: CAN_MANAGE
  - group_name: auditors
    level: CAN_VIEW
resources:
  jobs:
    my-job:
      name: data-pipeline
      tasks: [...]
      job_clusters: [...]
Answer: B
Databricks documents that resource-specific permissions for bundle resources can be defined under the resource itself, such as resources.jobs.<job>.permissions, using group_name and level. The documented syntax supports CAN_VIEW, CAN_MANAGE, and related permission levels, which matches option B. (Databricks Documentation)
Option C is invalid because it repeats the permissions key incorrectly. Option D applies top-level permissions more broadly across bundle resources instead of scoping them specifically to the job, which does not best satisfy the “avoid unintended permissions” requirement. Option B is therefore the correct and properly scoped configuration. (Databricks Documentation)
======
QUESTION NO: 21
A data engineering team uses Databricks Lakehouse Monitoring to track the percent_null metric for a critical column in their Delta table. The profile metrics table (prod_catalog.prod_schema.customer_data_profile_metrics) stores hourly percent_null values. The team wants to trigger an alert when the daily average of percent_null exceeds 5% for three consecutive days, while ensuring notifications are not spammed during sustained issues. Which SQL alert configuration achieves this goal while minimizing false positives and redundant notifications?
A.
WITH daily_avg AS (
SELECT
DATE_TRUNC('DAY', window.end) AS day,
AVG(percent_null) AS avg_null
FROM prod_catalog.prod_schema.customer_data_profile_metrics
GROUP BY DATE_TRUNC('DAY', window.end)
)
SELECT day, avg_null
FROM daily_avg
ORDER BY day DESC
LIMIT 3
Alert Condition: ALL avg_null > 5 for the latest 3 rows
Notification Frequency: Just once
B.
SELECT percent_null
FROM prod_catalog.prod_schema.customer_data_profile_metrics
WHERE window.end >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
Alert Condition: percent_null > 5
Notification Frequency: At most every 24 hours
C.
SELECT AVG(percent_null) AS daily_avg
FROM prod_catalog.prod_schema.customer_data_profile_metrics
WHERE window.end >= CURRENT_TIMESTAMP - INTERVAL '3' DAY
Alert Condition: daily_avg > 5
Notification Frequency: Each time alert is evaluated
D.
SELECT SUM(CASE WHEN percent_null > 5 THEN 1 ELSE 0 END) AS violation_days
FROM prod_catalog.prod_schema.customer_data_profile_metrics
WHERE window.end >= CURRENT_TIMESTAMP - INTERVAL '3' DAY
Alert Condition: violation_days >= 3
Notification Frequency: Just once
Answer: A
Databricks SQL alerts support alert conditions over aggregated query results, including AVG, and they support notification-frequency behavior that avoids repeated alerts during a sustained triggered state. Databricks documents that with Just Once, a notification is sent when the alert changes from OK to TRIGGERED, but not repeatedly while it remains triggered. (Databricks Documentation)
Option A is the only choice that correctly computes a daily average, checks the latest three daily rows, and pairs that with the anti-spam Just Once notification behavior. Option B checks only raw hourly values over one day, option C averages across the entire three-day span rather than requiring three consecutive daily breaches, and option D counts hourly threshold violations instead of true daily-average violations. (Databricks Documentation)
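The alert predicate in option A can be sketched as a small pure function: the query returns the latest three daily averages (newest first), and the alert fires only when all of them breach the 5% threshold.

```python
def should_trigger(latest_daily_avgs, threshold=5.0, days=3):
    """Trigger only when every one of the latest `days` daily
    averages exceeds the threshold (mirrors 'ALL avg_null > 5
    for the latest 3 rows')."""
    window = latest_daily_avgs[:days]
    # Require a full window: fewer than `days` rows cannot trigger.
    return len(window) == days and all(v > threshold for v in window)
```

A single healthy day resets the condition, which is what distinguishes option A from option C's single average over the whole three-day span.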
======
QUESTION NO: 24
A data engineer is using Lakeflow Spark Declarative Pipelines Expectations to track the data quality of incoming sensor data. Periodically, sensors send bad readings that are out of range, and the team is currently flagging those rows with a warning and writing them to the silver table along with the good data. They have been given a new requirement: the bad rows need to be quarantined in a separate quarantine table and no longer included in the silver table.
This is the existing code for the silver table:
@dlt.table
@dlt.expect("valid_sensor_reading", "reading < 120")
def silver_sensor_readings():
    return spark.readStream.table("bronze_sensor_readings")
Which code will satisfy the requirements?
A.
@dlt.table
@dlt.expect_or_drop("valid_sensor_reading", "reading < 120")
def silver_sensor_readings():
    return spark.readStream.table("bronze_sensor_readings")
B.
@dlt.table
@dlt.expect_or_drop("valid_sensor_reading", "reading < 120")
def silver_sensor_readings():
    return spark.readStream.table("bronze_sensor_readings")
@dlt.table
@dlt.expect("invalid_sensor_reading", "reading >= 120")
def quarantine_sensor_readings():
    return spark.readStream.table("bronze_sensor_readings")
C.
@dlt.table
@dlt.expect_or_drop("valid_sensor_reading", "reading < 120")
def silver_sensor_readings():
    return spark.readStream.table("bronze_sensor_readings")
@dlt.table
@dlt.expect("invalid_sensor_reading", "reading < 120")
def quarantine_sensor_readings():
    return spark.readStream.table("bronze_sensor_readings")
D.
@dlt.table
@dlt.expect("valid_sensor_reading", "reading < 120")
def silver_sensor_readings():
    return spark.readStream.table("bronze_sensor_readings")
@dlt.table
@dlt.expect("invalid_sensor_reading", "reading >= 120")
def quarantine_sensor_readings():
    return spark.readStream.table("bronze_sensor_readings")
Answer: B
Databricks documents that expect retains invalid records in the target dataset, while expect_or_drop drops invalid records before writing to the target. Therefore, the silver table must use expect_or_drop so bad records are excluded from silver. (Databricks Documentation)
Databricks also documents a quarantine pattern in which invalid records are separated for downstream processing, but the fully documented pattern uses an intermediate quarantine dataset with an is_quarantined flag and then derives valid and invalid paths from it. None of the listed options exactly matches the official quarantine pattern. As written, option B is the closest intended answer because it at least creates a separate quarantine table and removes invalid rows from silver, but strictly speaking, the documented quarantine implementation is more explicit than any option shown here. (Databricks Documentation)
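The flag-based quarantine pattern described above can be sketched with plain pandas (sample sensor rows are made up; in a pipeline the flagged dataset would be an intermediate table):

```python
import pandas as pd

# Flag every row with is_quarantined, then derive both outputs
# from the single flagged dataset.
readings = pd.DataFrame({"sensor": ["s1", "s2", "s3"],
                         "reading": [95.0, 130.0, 110.0]})
flagged = readings.assign(is_quarantined=readings["reading"] >= 120)
silver = flagged[~flagged["is_quarantined"]].drop(columns="is_quarantined")
quarantine = flagged[flagged["is_quarantined"]].drop(columns="is_quarantined")
```

Deriving both tables from one flagged dataset guarantees the two predicates cannot drift apart, which is the advantage the documented pattern has over option B's pair of independent expectations.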
======
QUESTION NO: 26
A facilities-monitoring team is building a near-real-time Power BI dashboard off the Delta table device_readings:
device_id STRING — unique sensor ID
event_ts TIMESTAMP — ingestion timestamp (UTC)
temperature_c DOUBLE — temperature in °C
notes STRING
For each sensor, the team needs one row per non-overlapping 5-minute interval, offset by 2 minutes (for example, intervals like 00:02–00:07, 00:07–00:12, and so on), showing the average temperature in that slice. The result must include each interval’s start and end timestamps so downstream tools can plot time-series bars correctly. Which query satisfies the requirement?
A.
WITH buckets AS (
SELECT
device_id,
window(event_ts, '5 minutes', '5 minutes', '2 minutes') AS win,
temperature_c
FROM device_readings
)
SELECT
device_id,
win.start AS bucket_start,
win.end AS bucket_end,
AVG(temperature_c) AS avg_temp_5m
FROM buckets
GROUP BY device_id, win
ORDER BY device_id, bucket_start;
B.
SELECT
device_id,
window.start AS bucket_start,
window.end AS bucket_end,
AVG(temperature_c) AS avg_temp_5m
FROM device_readings
GROUP BY device_id, window(event_ts, '5 minutes', '5 minutes', '2 minutes')
ORDER BY device_id, bucket_start;
C.
SELECT
device_id,
date_trunc('minute', event_ts - INTERVAL 2 MINUTES) + INTERVAL 2 MINUTES AS bucket_start,
date_trunc('minute', event_ts - INTERVAL 2 MINUTES) + INTERVAL 7 MINUTES AS bucket_end,
AVG(temperature_c) AS avg_temp_5m
FROM device_readings
GROUP BY device_id, date_trunc('minute', event_ts - INTERVAL 2 MINUTES)
ORDER BY device_id, bucket_start;
D.
SELECT
device_id,
event_ts,
AVG(temperature_c) OVER (
PARTITION BY device_id
ORDER BY event_ts
RANGE BETWEEN INTERVAL 5 MINUTES PRECEDING AND CURRENT ROW
) AS avg_temp_5m
FROM device_readings
WINDOW w AS (window(event_ts, '5 minutes', '2 minutes'));
Answer: A
Spark documents window(timeColumn, windowDuration, slideDuration=None, startTime=None) for time bucketing. The startTime argument is specifically the offset from the epoch used to align window boundaries, and the output is a window struct with start and end fields. That exactly matches the requirement for 5-minute non-overlapping intervals offset by 2 minutes. (Apache Spark)
Option A correctly uses window(event_ts, '5 minutes', '5 minutes', '2 minutes'), which creates tumbling 5-minute windows offset by 2 minutes and then exposes win.start and win.end. Option B is malformed in how it references the generated window column, option C creates minute-aligned groupings rather than true 5-minute tumbling windows, and option D computes a rolling window average instead of one row per non-overlapping time bucket. (Apache Spark)
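The boundary-alignment arithmetic behind startTime can be sketched in pure Python: floor the timestamp relative to the 2-minute offset, in whole 5-minute steps. The helper below is illustrative, not Spark's implementation.

```python
from datetime import datetime, timedelta, timezone

def bucket(ts, width_min=5, offset_min=2):
    """Return (start, end) of the tumbling window containing ts,
    with windows of width_min minutes aligned to offset_min past
    each width boundary (e.g. 00:02-00:07, 00:07-00:12, ...)."""
    width = timedelta(minutes=width_min)
    offset = timedelta(minutes=offset_min)
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Count whole windows elapsed since the offset-shifted epoch.
    n = (ts - epoch - offset) // width
    start = epoch + offset + n * width
    return start, start + width
```

A reading at 00:03 falls in the 00:02–00:07 bucket and one at 00:08 falls in 00:07–00:12, matching the intervals the question specifies.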
