diff --git a/internal/config/config.go b/internal/config/config.go
index c5d1c882..84f8aa3f 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -37,6 +37,7 @@ type Config struct {
 	Tables   Tables     `json:"tables" yaml:"tables"`
 	Statsd   *StatsD    `json:"statsd,omitempty" yaml:"statsd" env:"STATSD"`
 	Computed []Computed `json:"computed" yaml:"computed" env:"COMPUTED"`
+	Filters  []Computed `json:"filters" yaml:"filters" env:"FILTERS"`
 	K8s      *K8s       `json:"k8s,omitempty" yaml:"k8s" env:"K8S"`
 }
 
diff --git a/internal/encoding/block/strip.go b/internal/encoding/block/strip.go
new file mode 100644
index 00000000..857737c3
--- /dev/null
+++ b/internal/encoding/block/strip.go
@@ -0,0 +1,41 @@
+// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved.
+// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file
+
+package block
+
+import (
+	"github.com/kelindar/talaria/internal/column/computed"
+	"github.com/kelindar/talaria/internal/encoding/typeof"
+)
+
+// Strip returns an applyFunc that evaluates the given filters against every row.
+// When a filter yields boolean true and the row's "bch" value equals "test", the
+// row is replaced by the result of NewRow(nil, 0); in every other case the input
+// row is returned unchanged. Filters which return an error, a nil value or a
+// non-boolean value are skipped. The filter schema argument is currently not
+// consulted.
+func Strip(filter *typeof.Schema, computed ...computed.Computed) applyFunc {
+	return func(r Row) (Row, error) {
+		for _, c := range computed {
+			v, err := c.Value(r.Values)
+			if err != nil || v == nil {
+				continue
+			}
+
+			// Use the checked form of the type assertion: a filter returning a
+			// non-boolean value must not panic the ingestion path.
+			if match, ok := v.(bool); !ok || !match {
+				continue
+			}
+
+			// NOTE(review): the "bch" == "test" guard is hard-coded and looks like a
+			// debugging leftover; confirm whether matching rows should be dropped
+			// unconditionally.
+			if r.Values["bch"] == "test" {
+				return NewRow(nil, 0), nil
+			}
+		}
+
+		return r, nil
+	}
+}
diff --git a/internal/encoding/merge/orc.go b/internal/encoding/merge/orc.go
index b1ee90a9..d74b46aa 100644
--- a/internal/encoding/merge/orc.go
+++ b/internal/encoding/merge/orc.go
@@ -13,6 +13,9 @@ import (
 
 // ToOrc merges multiple blocks together and outputs a key and merged orc data
 func ToOrc(blocks []block.Block, schema typeof.Schema) ([]byte, error) {
+	if len(schema) == 0 {
+		return nil, nil
+	}
 	orcSchema, err := orc.SchemaFor(schema)
 	if err != nil {
 		return nil, errors.Internal("merge: error generating orc schema", err)
diff --git a/internal/server/server.go b/internal/server/server.go
index 299f0e7b..30b3fc37 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -66,6 +66,18 @@ func New(conf config.Func, monitor monitor.Monitor, tables ...table.Table) *Serv
 		server.computed = append(server.computed, col)
 	}
 
+	// Load filter columns
+	for _, c := range conf().Filters {
+		col, err := computed.NewComputed(c.Name, c.FuncName, c.Type, c.Func, monitor)
+		if err != nil {
+			monitor.Error(err)
+			continue
+		}
+
+		monitor.Info("server: loaded filter %v of type %v", c.Name, c.Type)
+		server.filter = append(server.filter, col)
+	}
+
 	// Register the gRPC servers
 	talaria.RegisterIngressServer(server.server, server)
 	talaria.RegisterQueryServer(server.server, server)
@@ -86,6 +98,7 @@ type Server struct {
 	cancel   context.CancelFunc     // The cancellation function for the server
 	tables   map[string]table.Table // The list of tables
 	computed []computed.Computed    // The set of computed columns
+	filter   []computed.Computed    // The set of filters
 	s3sqs    *s3sqs.Ingress         // The S3SQS Ingress (optional)
 }
 
diff --git a/internal/server/server_ingest.go b/internal/server/server_ingest.go
index 91df03b0..33bef369 100644
--- a/internal/server/server_ingest.go
+++ b/internal/server/server_ingest.go
@@ -39,7 +39,7 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t
 		}
 
 		// Functions to be applied
-		funcs := []applyFunc{block.Transform(filter, s.computed...)}
+		funcs := []applyFunc{block.Strip(filter, s.filter...), block.Transform(filter, s.computed...)}
 
 		// If table supports streaming, add publishing function
 		if streamer, ok := t.(storage.Streamer); ok {