diff --git a/cmd/jsonstream.go b/cmd/jsonstream.go index 493c06b..3bdc4e7 100644 --- a/cmd/jsonstream.go +++ b/cmd/jsonstream.go @@ -1,24 +1,69 @@ package main import ( - "encoding/json" - "flag" "fmt" "io" "os" "github.com/brettlangdon/jsonstream" + "github.com/brettlangdon/jsonstream/vendor/github.com/alexflint/go-arg" ) -var input io.Reader -var inputFile string +var args struct { + File string `arg:"-f,help:JSON stream file to read from"` + TSV bool `arg:"-t,help:Reformat the JSON stream to TSV '\t'"` + Key bool `arg:"-k,help:Whether or not to include the key in --tsv. '=\t='"` + Keys []string `arg:"positional,help:Which keys from the input JSON stream to include in the output"` +} func init() { - flag.StringVar(&inputFile, "file", nil, "") + arg.MustParse(&args) +} + +func getReader() (reader *jsonstream.Reader, err error) { + var input io.Reader + input = os.Stdin + if args.File != "" { + input, err = os.Open(args.File) + } + + if err == nil { + reader = jsonstream.NewReader(input, args.Keys) + } + return reader, err +} + +func getFormatter() (formatter *jsonstream.Formatter, err error) { + var format jsonstream.FormatType + format = jsonstream.FormatJSON + + if args.TSV { + format = jsonstream.FormatTSV + if args.Key { + format = jsonstream.FormatTSVKey + } + } + + if err == nil { + formatter = jsonstream.NewFormatter(format) + } + return formatter, err } func main() { - reader := jsonstream.NewReader(os.Stdin) + var err error + var reader *jsonstream.Reader + var formatter *jsonstream.Formatter + + reader, err = getReader() + if err != nil { + panic(err) + } + formatter, err = getFormatter() + if err != nil { + panic(err) + } + for { data, err := reader.ReadLine() if err == io.EOF { @@ -29,7 +74,7 @@ func main() { } var output []byte - output, err = json.Marshal(data) + output, err = formatter.Format(data) if err != nil { panic(err) } diff --git a/formatter.go b/formatter.go new file mode 100644 index 0000000..0a79f01 --- /dev/null +++ b/formatter.go @@ -0,0 +1,72 @@ +package jsonstream + +import ( + "bytes" + "encoding/json" + "fmt" + "sort" +) + +type FormatType int + +const ( + FormatJSON FormatType = iota + FormatTSV + FormatTSVKey +) + +type Formatter struct { + format FormatType +} + +func NewFormatter(format FormatType) *Formatter { + return &Formatter{ + format: format, + } +} + +func (formatter *Formatter) Format(data interface{}) (buf []byte, err error) { + if formatter.format == FormatJSON { + buf, err = formatter.formatJSON(data) + } else if formatter.format == FormatTSV { + buf, err = formatter.formatTSV(data, false) + } else if formatter.format == FormatTSVKey { + buf, err = formatter.formatTSV(data, true) + } else { + err = fmt.Errorf("Unknown FormatType '%v+'. Options are FormatJSON=0, or FormatTSV=1", formatter.format) + } + return buf, err +} + +func (formatter *Formatter) formatJSON(data interface{}) (buf []byte, err error) { + return json.Marshal(data) +} + +func (formatter *Formatter) formatTSV(data interface{}, keyed bool) (buf []byte, err error) { + var fields map[string]interface{} + fields, err = getAsMap(data) + if err != nil { + return buf, err + } + + var values [][]byte + var keys []string + for key := range fields { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, key := range keys { + var value []byte + value, err = formatter.formatJSON(fields[key]) + if err != nil { + break + } + if keyed { + value = []byte(fmt.Sprintf("%s=%s", key, value)) + } + values = append(values, value) + } + buf = bytes.Join(values, []byte{'\t'}) + return buf, err +} diff --git a/reader.go b/reader.go index bd7ab79..616b6ba 100644 --- a/reader.go +++ b/reader.go @@ -9,14 +9,37 @@ import ( type Reader struct { buffer *bufio.Reader + keys map[string]bool } -func NewReader(r io.Reader) *Reader { +func NewReader(r io.Reader, k []string) *Reader { + var keys map[string]bool + keys = make(map[string]bool, 0) + for _, key := range k { + keys[key] = true + } return &Reader{ buffer: bufio.NewReader(r), + keys: keys, } } +func (reader *Reader) processData(data interface{}) (processed map[string]interface{}, err error) { + var fields map[string]interface{} + fields, err = getAsMap(data) + if err != nil { + return processed, err + } + + processed = make(map[string]interface{}) + for key, value := range fields { + if _, ok := reader.keys[key]; ok { + processed[key] = value + } + } + return processed, err +} + func (reader *Reader) ReadLine() (data interface{}, err error) { var line []byte var isPrefix bool @@ -26,8 +49,23 @@ func (reader *Reader) ReadLine() (data interface{}, err error) { err = errors.New("Line exceeds the length of the buffer") } - if err == nil { - err = json.Unmarshal(line, &data) + if err != nil { + return data, err + } + + // skip empty lines, we'll fail at processing them anyways + if len(line) == 0 { + return reader.ReadLine() + } + + err = json.Unmarshal(line, &data) + + if err != nil { + return data, err + } + + if len(reader.keys) > 0 { + data, err = reader.processData(data) } return data, err diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..474a16c --- /dev/null +++ b/utils.go @@ -0,0 +1,16 @@ +package jsonstream + +import ( + "fmt" + "reflect" +) + +func getAsMap(data interface{}) (fields map[string]interface{}, err error) { + switch value := data.(type) { + case map[string]interface{}: + fields = value + default: + err = fmt.Errorf("Unexpected data type '%s', expected 'map[string]interface{}'.", reflect.TypeOf(value)) + } + return fields, err +}