@ankurcha
Created October 23, 2015 05:29
Helper class to convert Apache Avro schema to BigQuery Table schema
package com.mallo64.dataflow;
import com.google.api.client.json.GenericJson;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.avro.Schema.*;
/**
 * DoFn that converts Avro SpecificRecord elements into BigQuery TableRow objects.
 */
public class AvroToBigQuery<TRecord extends SpecificRecord> extends DoFn<TRecord, TableRow> {

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        processContext.output(getTableRow(processContext.element()));
    }

    /** Encodes a top-level record as a TableRow. */
    static TableRow getTableRow(SpecificRecord record) {
        TableRow row = new TableRow();
        encode(record, row);
        return row;
    }

    /** Encodes a nested record as a TableCell. */
    static TableCell getTableCell(SpecificRecord record) {
        TableCell cell = new TableCell();
        encode(record, cell);
        return cell;
    }

    /** Copies every field of the record into the target JSON object, keyed by field name. */
    private static void encode(SpecificRecord record, GenericJson row) {
        Schema schema = record.getSchema();
        schema.getFields().forEach(field -> {
            Type type = field.schema().getType();
            switch (type) {
                case RECORD:
                    // Nested records are encoded recursively.
                    row.set(field.name(), getTableCell((SpecificRecord) record.get(field.pos())));
                    break;
                case INT:
                case LONG:
                    row.set(field.name(), ((Number) record.get(field.pos())).longValue());
                    break;
                case BOOLEAN:
                    row.set(field.name(), record.get(field.pos()));
                    break;
                case FLOAT:
                case DOUBLE:
                    row.set(field.name(), ((Number) record.get(field.pos())).doubleValue());
                    break;
                default:
                    // Every other Avro type (including UNION, so nullable fields) is
                    // stringified; a null value therefore becomes the string "null".
                    row.set(field.name(), String.valueOf(record.get(field.pos())));
            }
        });
    }

    /** Builds the BigQuery table schema for a top-level Avro record schema. */
    public static TableSchema getTableSchemaRecord(Schema schema) {
        return new TableSchema().setFields(getFieldsSchema(schema.getFields()));
    }

    /** Maps each Avro field to a BigQuery column definition. */
    static List<TableFieldSchema> getFieldsSchema(List<Schema.Field> fields) {
        return fields.stream().map(field -> {
            TableFieldSchema column = new TableFieldSchema().setName(field.name());
            Type type = field.schema().getType();
            switch (type) {
                case RECORD:
                    column.setType("RECORD");
                    // Recurse into the nested record's own fields. Passing the
                    // enclosing `fields` list here would recurse forever.
                    column.setFields(getFieldsSchema(field.schema().getFields()));
                    break;
                case INT:
                case LONG:
                    column.setType("INTEGER");
                    break;
                case BOOLEAN:
                    column.setType("BOOLEAN");
                    break;
                case FLOAT:
                case DOUBLE:
                    column.setType("FLOAT");
                    break;
                default:
                    column.setType("STRING");
            }
            return column;
        }).collect(Collectors.toList());
    }
}
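
For a nested RECORD field, getFieldsSchema has to descend into that field's own schema rather than the list it is currently iterating. The following is a minimal, dependency-free sketch of that recursion. The classes AvroField, AvroType and Column are hypothetical stand-ins for Avro's Schema.Field and BigQuery's TableFieldSchema, not part of either library, and the type mapping simply mirrors the one in the gist.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for Schema.Field and TableFieldSchema, used only
// to illustrate the recursive mapping without any external dependency.
public class SchemaMappingSketch {

    enum AvroType { RECORD, INT, LONG, BOOLEAN, FLOAT, DOUBLE, STRING }

    static final class AvroField {
        final String name;
        final AvroType type;
        final List<AvroField> children; // non-empty only for RECORD fields

        AvroField(String name, AvroType type, AvroField... children) {
            this.name = name;
            this.type = type;
            this.children = Arrays.asList(children);
        }
    }

    static final class Column {
        final String name;
        final String type;
        final List<Column> fields;

        Column(String name, String type, List<Column> fields) {
            this.name = name;
            this.type = type;
            this.fields = fields;
        }

        @Override
        public String toString() {
            return fields.isEmpty() ? name + ":" + type : name + ":" + type + fields;
        }
    }

    // Same type mapping as the gist: INT/LONG -> INTEGER, FLOAT/DOUBLE -> FLOAT,
    // BOOLEAN -> BOOLEAN, RECORD -> RECORD (recursively), anything else -> STRING.
    static List<Column> toColumns(List<AvroField> fields) {
        List<Column> columns = new ArrayList<>();
        for (AvroField field : fields) {
            List<Column> nested = Collections.emptyList();
            String type;
            switch (field.type) {
                case RECORD:
                    type = "RECORD";
                    // Descend into the nested record's own fields.
                    nested = toColumns(field.children);
                    break;
                case INT:
                case LONG:
                    type = "INTEGER";
                    break;
                case BOOLEAN:
                    type = "BOOLEAN";
                    break;
                case FLOAT:
                case DOUBLE:
                    type = "FLOAT";
                    break;
                default:
                    type = "STRING";
            }
            columns.add(new Column(field.name, type, nested));
        }
        return columns;
    }

    public static void main(String[] args) {
        List<AvroField> schema = Arrays.asList(
                new AvroField("id", AvroType.LONG),
                new AvroField("address", AvroType.RECORD,
                        new AvroField("city", AvroType.STRING),
                        new AvroField("zip", AvroType.INT)));
        // prints [id:INTEGER, address:RECORD[city:STRING, zip:INTEGER]]
        System.out.println(toColumns(schema));
    }
}
```

Because each RECORD call works on a strictly smaller sub-schema, the recursion terminates for any non-recursive Avro schema.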
@gadaldo

gadaldo commented Aug 22, 2016

That's a cool helper, thanks. I wonder if there is a helper to do the opposite.

@pablod

pablod commented Mar 15, 2017

useful!

@darrenhaken

> That's a cool helper, thanks. Wonder if there is an helper to do the opposite.

Did you ever get the reverse?
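
No reverse helper appears in this thread. As a rough sketch of what the type-name half of one might look like, the snippet below inverts the gist's mapping. It is an assumption-laden illustration, not an existing API: the class and method names are hypothetical, and the mapping is lossy because the gist collapses INT/LONG into INTEGER, FLOAT/DOUBLE into FLOAT, and every other Avro type into STRING, so the reverse can only pick the widest candidate.

```java
// Hypothetical reverse of the gist's type mapping: BigQuery legacy type name
// to an Avro type name. Not part of Avro or the BigQuery client library.
public class ReverseTypeSketch {

    static String toAvroTypeName(String bigQueryType) {
        switch (bigQueryType) {
            case "INTEGER":
                return "long";    // INT and LONG both became INTEGER; long is the safe choice
            case "FLOAT":
                return "double";  // FLOAT and DOUBLE both became FLOAT
            case "BOOLEAN":
                return "boolean";
            case "RECORD":
                return "record";  // nested fields would be converted recursively
            case "STRING":
                return "string";  // the original Avro type (bytes, enum, union, ...) is not recoverable
            default:
                throw new IllegalArgumentException("Unsupported BigQuery type: " + bigQueryType);
        }
    }

    public static void main(String[] args) {
        System.out.println(toAvroTypeName("INTEGER")); // prints long
        System.out.println(toAvroTypeName("FLOAT"));   // prints double
    }
}
```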

@uraniumdawn

Hello guys. Can you help me find out how I can use the TableSchema to create an empty BQ table? The new BQ Java SDK version pushes me to use the new class that represents the BQ schema: com.google.cloud.bigquery.Schema. It is not as flexible for creating a new BQ schema dynamically as the approach in the example above.
