Skip to content

Instantly share code, notes, and snippets.

@briansorahan
Created July 14, 2020 18:51
Show Gist options
  • Save briansorahan/1da8a795ac4ab982652363b6b8b1bcf2 to your computer and use it in GitHub Desktop.
Save briansorahan/1da8a795ac4ab982652363b6b8b1bcf2 to your computer and use it in GitHub Desktop.
Debugging parquet-go
package main
import (
"bufio"
"compress/gzip"
"io"
"log"
"os"
"time"
"github.com/pkg/errors"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/writer"
)
// main opens the gzip-compressed NDJSON input file, wraps it in a gzip
// reader, and hands the decompressed stream to run. Any failure along the way
// is fatal.
func main() {
	const inputPath = "lease_production.ndjson.gz"

	src, err := os.Open(inputPath)
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()

	zr, err := gzip.NewReader(src)
	if err != nil {
		log.Fatal(err)
	}
	defer zr.Close()

	err = run(zr)
	if err != nil {
		log.Fatal(err)
	}
}
// run reads newline-delimited records from r and writes each one to
// lease_production.parquet through a parquet JSON writer built from schema.
//
// A final record that is not terminated by a newline is still processed.
// The parquet writer is stopped and the file writer is closed before run
// returns; if the body succeeded but stopping or closing fails, that failure
// is returned instead of being dropped.
func run(r io.Reader) (err error) {
	var (
		count int64
		br    = bufio.NewReader(r)
		start = time.Now()
	)

	fw, err := local.NewLocalFileWriter("lease_production.parquet")
	if err != nil {
		return errors.Wrap(err, "creating parquet file writer")
	}

	pw, err := writer.NewJSONWriter(schema, fw, 4)
	if err != nil {
		// Best-effort cleanup: the writer-creation error is the one worth
		// reporting, so a secondary close error is intentionally discarded.
		_ = fw.Close()
		return errors.Wrap(err, "creating parquet writer")
	}

	defer func() {
		// Report the first shutdown failure unless the body already failed.
		if stopErr := pw.WriteStop(); stopErr != nil {
			if err == nil {
				err = errors.Wrap(stopErr, "stopping parquet writer")
			}
		} else {
			log.Println("closed parquet writer")
		}
		if closeErr := fw.Close(); closeErr != nil {
			if err == nil {
				err = errors.Wrap(closeErr, "closing file writer")
			}
		} else {
			log.Println("closed file writer")
		}
	}()

	for {
		line, readErr := br.ReadString('\n')
		if readErr != nil && readErr != io.EOF {
			return errors.Wrap(readErr, "reading line")
		}
		// ReadString returns whatever it read before hitting EOF, so a
		// non-empty line here is a last record without a trailing newline.
		if len(line) > 0 {
			if procErr := process(line, pw); procErr != nil {
				return errors.Wrapf(procErr, "processing line %d %s", count, line)
			}
			count++
		}
		if readErr == io.EOF {
			break
		}
	}

	log.Printf("processed %d lines in %s", count, time.Since(start))
	return nil
}
// process writes a single raw record to the parquet JSON writer and adds
// context to any error the writer reports. It returns nil on success.
func process(line string, pw *writer.JSONWriter) error {
	// errors.Wrap yields nil when given a nil error, so no explicit check is needed.
	return errors.Wrap(pw.Write(line), "writing json to parquet")
}
// schema is the JSON schema definition passed to writer.NewJSONWriter in run.
// It declares six REQUIRED UTF8 columns (entity_id, lease_no, district,
// lease_type, field_no, production_date), one OPTIONAL UTF8 column
// (operator_no), and six OPTIONAL DOUBLE columns (oil, casinghead, condensate,
// gas, flow_count, lift_count).
var schema = `{
"Tag": "name=parquet-go-root",
"Fields": [
{"Tag": "name=entity_id, type=UTF8, repetitiontype=REQUIRED"}
,{"Tag": "name=lease_no, type=UTF8, repetitiontype=REQUIRED"}
,{"Tag": "name=district, type=UTF8, repetitiontype=REQUIRED"}
,{"Tag": "name=lease_type, type=UTF8, repetitiontype=REQUIRED"}
,{"Tag": "name=field_no, type=UTF8, repetitiontype=REQUIRED"}
,{"Tag": "name=production_date, type=UTF8, repetitiontype=REQUIRED"}
,{"Tag": "name=operator_no, type=UTF8, repetitiontype=OPTIONAL"}
,{"Tag": "name=oil, type=DOUBLE, repetitiontype=OPTIONAL"}
,{"Tag": "name=casinghead, type=DOUBLE, repetitiontype=OPTIONAL"}
,{"Tag": "name=condensate, type=DOUBLE, repetitiontype=OPTIONAL"}
,{"Tag": "name=gas, type=DOUBLE, repetitiontype=OPTIONAL"}
,{"Tag": "name=flow_count, type=DOUBLE, repetitiontype=OPTIONAL"}
,{"Tag": "name=lift_count, type=DOUBLE, repetitiontype=OPTIONAL"}
]
}
`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment