Custom transformation
Users can follow this guide to apply custom transformations at the row level to all columns in their schema, including primary key columns. This guide can be used to specify custom transformation logic for existing columns, to populate new columns in Spanner that didn’t exist in the source, or to filter a row during migration. For ease of use, we have implemented the CustomTransformationFetcher class in v2/spanner-custom-shard, which can be updated according to the user’s logic.
Custom transformation workflow

Points to note regarding the workflow:
- The workflow will execute once per event and will also be triggered during retries.
- Not all column values need to be returned; only the returned columns will be updated. The rest will be migrated as they are from the source.
- Users can add new columns in the response that are not present in the source but exist in Spanner. These column values will be written to Spanner, and the data types of the returned values must be compatible with the Spanner schema.
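The last two points can be pictured as an overlay of the returned columns on top of the source row. The sketch below is only a mental model of the documented behaviour, not the pipeline's actual code. `OverlaySketch` and the column names are hypothetical, and it assumes source and Spanner column names match.

```java
import java.util.HashMap;
import java.util.Map;

final class OverlaySketch {
  /**
   * Models the documented behaviour: returned columns override the source
   * values, new columns are added, and everything else passes through as is.
   */
  static Map<String, Object> overlay(
      Map<String, Object> sourceRow, Map<String, Object> responseRow) {
    Map<String, Object> merged = new HashMap<>(sourceRow);
    merged.putAll(responseRow);
    return merged;
  }
}
```

With a source row `{id=1, name="a"}` and a response row `{name="A", full_name="A B"}`, the overlay keeps `id`, replaces `name`, and adds `full_name`.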
Methods in CustomTransformationFetcher
- init() - This is an initialization method that will be called once during the pipeline setup. It is used to initialize the custom jar with custom parameters.
- toSpannerRow() - This method applies custom transformations to the incoming source record and is expected to return a subset of the Spanner row.
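To make the shape of these two methods concrete, here is a minimal, self-contained sketch. `RequestSketch`, `ResponseSketch` and `FetcherSketch` are hypothetical stand-ins so the example compiles on its own; in the real module you would edit CustomTransformationFetcher and use the template's own request and response types. The `String` parameter of `init`, along with the table and column names, is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MigrationTransformationRequest.
final class RequestSketch {
  final String tableName;
  final String shardId;
  final String eventType;
  final Map<String, Object> requestRow;

  RequestSketch(String tableName, String shardId, String eventType,
      Map<String, Object> requestRow) {
    this.tableName = tableName;
    this.shardId = shardId;
    this.eventType = eventType;
    this.requestRow = requestRow;
  }
}

// Hypothetical stand-in for MigrationTransformationResponse.
final class ResponseSketch {
  final Map<String, Object> responseRow;
  final boolean isEventFiltered;

  ResponseSketch(Map<String, Object> responseRow, boolean isEventFiltered) {
    this.responseRow = responseRow;
    this.isEventFiltered = isEventFiltered;
  }
}

final class FetcherSketch {
  private String defaultRegion = "";

  // Assumption: the custom parameters arrive as a single String at setup time.
  void init(String customParameters) {
    defaultRegion = customParameters == null ? "" : customParameters;
  }

  // Returns only the columns that need custom values; an empty map means
  // "no custom values for this row".
  ResponseSketch toSpannerRow(RequestSketch request) {
    Map<String, Object> responseRow = new HashMap<>();
    if ("Customers".equals(request.tableName)) {
      Object first = request.requestRow.get("first_name");
      Object last = request.requestRow.get("last_name");
      responseRow.put("full_name", first + " " + last); // Spanner-only column
      responseRow.put("region", defaultRegion);         // value taken from init()
    }
    return new ResponseSketch(responseRow, false);
  }
}
```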
Parameter details
Request
MigrationTransformationRequest contains the following information -
- tableName - Name of the source table to which the event belongs.
- shardId - Logical shard id of the record.
- eventType - The event type can be INSERT, UPDATE-INSERT, UPDATE, UPDATE-DELETE or DELETE.
- requestRow - A map of type Map<java.lang.String, java.lang.Object>, where the key is the source column name and the value is the source column value.
The following table outlines the Java object type that will be sent in the request to the toSpannerRow for each MySQL datatype:
| MYSQL datatype | Java object type |
|---|---|
| BIGINT | Long |
| BINARY | String (hex encoded) |
| BIT | Long |
| BLOB | String (hex encoded) |
| BOOLEAN | Long |
| CHAR | String |
| DATE | String (Format: yyyy-MM-dd) |
| DATETIME | String (UTC format) |
| DECIMAL | String |
| DOUBLE | Double |
| ENUM | String |
| FLOAT | Double |
| INT | Long |
| JSON | String |
| LONGBLOB | String (hex encoded) |
| LONGTEXT | String |
| MEDIUMBLOB | String (hex encoded) |
| MEDIUMINT | Long |
| MEDIUMTEXT | String |
| SET | String |
| SMALLINT | Long |
| TEXT | String |
| TIME | String (Format: HH-mm-ss) |
| TIMESTAMP | String (UTC format) |
| TINYBLOB | String (hex encoded) |
| TINYINT | Long |
| TINYTEXT | String |
| VARBINARY | String (hex encoded) |
| VARCHAR | String |
Values in the request for spatial data type columns will be NULL if not supported by the source connector.
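As an illustration of working with these request types, the sketch below decodes three representative values: a hex-encoded BLOB, a BOOLEAN delivered as Long, and a DATE string. `RequestValueSketch` and its method names are hypothetical. The helpers assume the value is non-null, so real code should check for null first, since request values can be NULL.

```java
import java.time.LocalDate;

final class RequestValueSketch {
  // BINARY/BLOB family: hex-encoded String -> raw bytes.
  static byte[] hexToBytes(String hex) {
    if (hex.length() % 2 != 0) {
      throw new IllegalArgumentException("odd-length hex string");
    }
    byte[] out = new byte[hex.length() / 2];
    for (int i = 0; i < out.length; i++) {
      int hi = Character.digit(hex.charAt(2 * i), 16);
      int lo = Character.digit(hex.charAt(2 * i + 1), 16);
      if (hi < 0 || lo < 0) {
        throw new IllegalArgumentException("non-hex character in input");
      }
      out[i] = (byte) ((hi << 4) | lo);
    }
    return out;
  }

  // BOOLEAN arrives as a Long; this sketch treats any non-zero value as true.
  static boolean toBoolean(Object value) {
    return ((Long) value) != 0L;
  }

  // DATE arrives as a yyyy-MM-dd String, which LocalDate.parse accepts directly.
  static LocalDate toDate(Object value) {
    return LocalDate.parse((String) value);
  }
}
```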
Response
MigrationTransformationResponse contains the following information -
- responseRow - A map of type Map<java.lang.String, java.lang.Object>, where the key is the Spanner column name and the value is the Spanner column value.
- isEventFiltered - If set to true, the event will be skipped and not written to Spanner.
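A filtering decision could look like the sketch below, whose result would be used as the isEventFiltered value of the response. The rule itself (skipping rows flagged as soft-deleted) and the `is_deleted` column are hypothetical. The only thing taken from this guide is that a MySQL BOOLEAN column arrives as a Long.

```java
import java.util.Map;

final class FilterSketch {
  // Returns true when the event should be skipped (hypothetical rule).
  static boolean shouldFilter(Map<String, Object> requestRow) {
    Object flag = requestRow.get("is_deleted"); // hypothetical BOOLEAN column -> Long
    return flag instanceof Long && ((Long) flag) != 0L;
  }
}
```

A missing or NULL flag is treated as "do not filter", so rows are only dropped when the flag is explicitly set.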
Values in the response row must be compatible with their corresponding Spanner column types.
The following table outlines the recommended Java object types for each Spanner data type (GSQL and PostgreSQL dialects) to ensure successful data insertion:
| Spanner datatype (GSQL dialect) | Spanner datatype (PostgreSQL dialect) | Java object type |
|---|---|---|
| INT64 | bigint | Long |
| FLOAT32 | real | Double |
| FLOAT64 | double precision | Double |
| NUMERIC | numeric | String |
| BOOL | boolean | Boolean |
| STRING | text/character varying | String |
| BYTES | bytea | Hex Encoded string |
| DATE | date | String (Format: yyyy-MM-dd) |
| TIMESTAMP | timestamp with time zone | String (in UTC format, e.g., 2023-05-23T12:34:56Z) |
| JSON | jsonb | String |
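The helpers below sketch one way to turn common Java values into the representations listed in the table. `SpannerValueSketch` is a hypothetical name. Whether the pipeline expects lower-case or upper-case hex, or accepts fractional seconds in timestamps, is not stated in this guide, so treat those details as assumptions to verify against your own pipeline.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;

final class SpannerValueSketch {
  // BYTES / bytea: raw bytes -> hex-encoded String (lower case here).
  static String bytesToHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder(bytes.length * 2);
    for (byte b : bytes) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }

  // TIMESTAMP: Instant.toString() yields ISO-8601 in UTC, e.g. 2023-05-23T12:34:56Z.
  static String timestampUtc(Instant instant) {
    return instant.toString();
  }

  // NUMERIC: plain decimal String without scientific notation.
  static String numeric(BigDecimal value) {
    return value.toPlainString();
  }

  // DATE: LocalDate.toString() yields yyyy-MM-dd for four-digit years.
  static String date(LocalDate value) {
    return value.toString();
  }
}
```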
Please refer to the sample implementation of toSpannerRow for most MySQL datatype columns here.
Steps to implement custom transformation
- Check out the Dataflow code from GitHub:
  git clone https://github.com/GoogleCloudPlatform/DataflowTemplates.git
- Update the logic in the CustomTransformationFetcher class in v2/spanner-custom-shard.
- If any custom parameters are needed in the custom transformation logic, they can be passed to the init method of the custom class. The init method is invoked once per worker setup.
- Test the modified code by writing unit and cross-functional tests in CustomTransformationFetcherTest.java.
- Build the spanner-custom-shard module by running the commands below:
  cd v2/spanner-custom-shard
  mvn install
- Upload the built JAR, located in v2/spanner-custom-shard/target with the name spanner-custom-shard-1.0-SNAPSHOT.jar, to a GCS bucket.
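For the custom-parameters step, the init method needs to turn whatever it receives into something the transformation can use. This guide does not specify the parameter format, so the sketch below assumes a single comma-separated `key=value` String. Both that format and the `InitParamsSketch` name are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

final class InitParamsSketch {
  // Parses "k1=v1,k2=v2" into a map; malformed pairs are ignored.
  static Map<String, String> parse(String customParameters) {
    Map<String, String> params = new HashMap<>();
    if (customParameters == null || customParameters.trim().isEmpty()) {
      return params;
    }
    for (String pair : customParameters.split(",")) {
      String[] kv = pair.split("=", 2);
      if (kv.length == 2) {
        params.put(kv[0].trim(), kv[1].trim());
      }
    }
    return params;
  }
}
```

Parsing once in init and keeping the result in a field avoids repeating this work for every record.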
Error handling
- If the value of a column returned by the custom JAR cannot be parsed into the corresponding Spanner column data type, the pipeline marks the record as failed and labels it as a SEVERE error in the DLQ.
- If the custom JAR returns NULL for a NOT NULL column, the record fails during insertion and is labelled as a SEVERE error in the DLQ.
- If the custom JAR throws an exception while processing a record, the pipeline marks the record as failed, labels it as a SEVERE error in the DLQ, and increments the Custom Transformation Exceptions metric.
- If the custom JAR returns an extra column that is not present in the Spanner schema, the record is labelled as a SEVERE error and moved to the DLQ.
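Two of these failure modes (unknown columns and NULL for NOT NULL columns) can be caught in unit tests before the JAR is deployed. The sketch below is one possible check. `ResponseCheckSketch` is hypothetical, and the two column sets are assumed to be supplied by you from your own Spanner schema, since the custom JAR is not described here as having access to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ResponseCheckSketch {
  // Returns a description of each problem found in the response row.
  static List<String> problems(
      Map<String, Object> responseRow,
      Set<String> spannerColumns,
      Set<String> notNullColumns) {
    List<String> found = new ArrayList<>();
    for (Map.Entry<String, Object> entry : responseRow.entrySet()) {
      String column = entry.getKey();
      if (!spannerColumns.contains(column)) {
        found.add("column not in Spanner schema: " + column);
      } else if (entry.getValue() == null && notNullColumns.contains(column)) {
        found.add("NULL returned for NOT NULL column: " + column);
      }
    }
    return found;
  }
}
```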
Best practices
- Avoid time-consuming operations in the custom JAR. The transformation runs once per record, so slow operations can slow down the whole pipeline.
- Ensure idempotency and account for retries.
- Be cautious with logging in the toSpannerRow method, as logging per row can cause throttling.
- Throw an InvalidTransformationException if an error occurs while processing a particular event in the custom JAR.
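The idempotency and exception points can be combined in one small sketch: derive generated values deterministically from the input, so that a retry produces the same row, and signal bad input with an exception instead of returning a partial result. The `InvalidTransformationException` class below is a local stand-in declared only so the example compiles; in the real module you would use the template's own class. `order_id` and `stableId` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

// Local stand-in for the template's exception type.
class InvalidTransformationException extends Exception {
  InvalidTransformationException(String message) {
    super(message);
  }
}

final class BestPracticeSketch {
  // Name-based UUIDs are deterministic: the same input always yields the same
  // id, unlike UUID.randomUUID(), which would differ on every retry.
  static String stableId(Map<String, Object> requestRow)
      throws InvalidTransformationException {
    Object orderId = requestRow.get("order_id");
    if (orderId == null) {
      throw new InvalidTransformationException("order_id is missing");
    }
    byte[] name = orderId.toString().getBytes(StandardCharsets.UTF_8);
    return UUID.nameUUIDFromBytes(name).toString();
  }
}
```

The same reasoning applies to timestamps: prefer a value carried in the source row over the current wall-clock time if the column must survive retries unchanged.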