// Package stream provides a small channel-based pipeline for filtering,
// sorting, grouping and aggregating arbitrary values.
package stream

import (
	"log"
	"reflect"
	"sort"
)

type (
	// Stream is a channel from which elements can be drained.
	Stream chan interface{}

	// BufferStream bundles a Stream with a Size value.
	// NOTE(review): not used by the code visible here; Size is presumably
	// the intended buffer capacity. Confirm before relying on it.
	BufferStream struct {
		Stream
		Size int
	}

	// FilterFunc reports whether an element should be retained.
	FilterFunc func(i interface{}) bool

	// LessFunc reports whether i must sort before j. It is used by Sort.
	LessFunc func(i, j interface{}) bool

	// KeyFunc derives a key (for grouping and joining) or a numeric value
	// (for Sum) from an element.
	KeyFunc func(i interface{}) interface{}
)

// NewStream returns an empty, unbuffered Stream.
func NewStream() Stream {
	return make(chan interface{})
}

// From feeds every element of source into s from a background goroutine and
// closes s once the source is exhausted. It returns s so calls can be chained.
//
// source must be a slice or an array; any other kind (including nil) panics.
func (s Stream) From(source interface{}) Stream {
	v := reflect.ValueOf(source)
	switch v.Kind() {
	case reflect.Slice, reflect.Array:
		go func() {
			defer close(s)
			for i := 0; i < v.Len(); i++ {
				s <- v.Index(i).Interface()
			}
		}()
	default:
		// The message is kept as-is so callers that recover on it keep working.
		panic("got a non-slice kind source")
	}
	return s
}

// Retain returns a Stream that carries only the elements of s for which f
// returns true. Filtering happens lazily in a background goroutine.
func (s Stream) Retain(f FilterFunc) Stream {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for item := range s {
			if f(item) {
				out <- item
			}
		}
	}()
	return out
}

// Sort drains s completely, orders the elements with lessFunc and returns a
// new Stream that yields them in sorted order. The sort is not guaranteed to
// be stable.
func (s Stream) Sort(lessFunc LessFunc) Stream {
	var items []interface{}
	for item := range s {
		items = append(items, item)
	}
	sort.Slice(items, func(a, b int) bool {
		return lessFunc(items[a], items[b])
	})
	return NewStream().From(items)
}

// Reverse drains s completely and returns a new Stream that yields the
// elements in the opposite order.
func (s Stream) Reverse() Stream {
	var items []interface{}
	for item := range s {
		items = append(items, item)
	}
	for lo, hi := 0, len(items)-1; lo < hi; lo, hi = lo+1, hi-1 {
		items[lo], items[hi] = items[hi], items[lo]
	}
	return NewStream().From(items)
}

// Print is a sink that logs every element of s with log.Println.
func (s Stream) Print() {
	for item := range s {
		log.Println(item)
	}
}

// Size is a sink that drains s and returns the number of elements it carried.
func (s Stream) Size() int {
	count := 0
	for range s {
		count++
	}
	return count
}

// First returns a Stream that yields at most the first n elements of s.
//
// The returned Stream is closed as soon as n elements have been delivered (or
// s is exhausted, whichever comes first), so consumers never wait for the
// tail of a long or never-ending upstream. A negative n is treated as 0.
func (s Stream) First(n int) Stream {
	if n < 0 {
		// make(chan, n) panics on a negative size.
		n = 0
	}
	// The buffer holds all n elements, so sends below never block.
	out := make(chan interface{}, n)
	go func() {
		remaining := n
		if remaining == 0 {
			close(out)
		}
		for item := range s {
			if remaining == 0 {
				// Keep draining so upstream producers are not left
				// blocked on a send nobody will receive.
				continue
			}
			out <- item
			remaining--
			if remaining == 0 {
				close(out)
			}
		}
		if remaining > 0 {
			// Upstream ended before n elements were seen.
			close(out)
		}
	}()
	return out
}

// FirstOne is a sink that returns the first element of s and true, or
// (nil, false) when s carries no elements.
func (s Stream) FirstOne() (interface{}, bool) {
	var (
		first interface{}
		found bool
	)
	for item := range s.First(1) {
		first, found = item, true
	}
	return first, found
}

// Sum is a sink that applies f to every element of s and adds up the results
// as float64. Signed integers, unsigned integers and floats are summed;
// values of any other kind (including nil) are ignored.
func (s Stream) Sum(f KeyFunc) float64 {
	total := 0.0
	for item := range s {
		v := reflect.ValueOf(f(item))
		switch v.Kind() {
		case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
			total += float64(v.Int())
		case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
			total += float64(v.Uint())
		case reflect.Float32, reflect.Float64:
			total += v.Float()
		}
	}
	return total
}
package stream type ( GroupedRecord struct { Key interface{} Value []interface{} } AggregatedRecord struct { Key interface{} Value interface{} } AggFunc func(elements []interface{}) interface{} GroupedStream chan GroupedRecord AggregatedGroupedStream chan AggregatedRecord ) func (s Stream) GroupBy(f KeyFunc) GroupedStream { c := make(chan GroupedRecord) cache := make(map[interface{}][]interface{}) for i := range s { key := f(i) cache[key] = append(cache[key], i) } go func() { defer close(c) for k, v := range cache { c <- GroupedRecord{ Key: k, Value: v, } } }() return c } func (gs GroupedStream) Agg(aggFunc AggFunc) AggregatedGroupedStream { c := make(chan AggregatedRecord) go func() { defer close(c) for gr := range gs { c <- AggregatedRecord{ Key: gr.Key, Value: aggFunc(gr.Value), } } }() return c } func (ags AggregatedGroupedStream) Gather() map[interface{}]interface{} { r := make(map[interface{}]interface{}) for i := range ags { r[i.Key] = i.Value } return r }
package stream

import "log"

type (
	// JoinedValue is one matched (left, right) pair produced by Join.
	JoinedValue struct {
		Left  interface{}
		Right interface{}
	}

	// JoinedRecord holds every matched pair that shares one join key.
	JoinedRecord struct {
		Key   interface{}
		Value []JoinedValue
	}

	// JoinedStream is a channel of JoinedRecord values.
	JoinedStream chan JoinedRecord

	// JoinedFilterFunc reports whether a matched pair should be kept.
	JoinedFilterFunc func(left, right interface{}) bool

	// JoinAggFunc combines a matched pair into a single value.
	JoinAggFunc func(left, right interface{}) interface{}
)

// Join performs an inner join of left and right. It drains left and then
// right completely, pairs every left element with every right element whose
// key (leftBy / rightBy) is equal, and returns one JoinedRecord per key that
// has at least one match.
//
// Records are emitted in the order their keys first matched while walking the
// left elements; pairs inside a record are ordered by left element, then by
// right element. Keys are used as map keys, so leftBy and rightBy must return
// comparable values; an unhashable key (such as a slice) panics.
func Join(left, right Stream, leftBy, rightBy KeyFunc) JoinedStream {
	var leftItems, rightItems []interface{}
	for item := range left {
		leftItems = append(leftItems, item)
	}
	for item := range right {
		rightItems = append(rightItems, item)
	}

	// Index the right side once so each left element finds its matches by
	// lookup instead of rescanning (and re-keying) every right element.
	rightByKey := make(map[interface{}][]interface{}, len(rightItems))
	for _, r := range rightItems {
		key := rightBy(r)
		rightByKey[key] = append(rightByKey[key], r)
	}

	// position maps a key to the index of its record so that emission order
	// is deterministic instead of following randomized map iteration.
	position := make(map[interface{}]int)
	var records []JoinedRecord
	for _, l := range leftItems {
		key := leftBy(l)
		matches := rightByKey[key]
		if len(matches) == 0 {
			continue
		}
		pos, seen := position[key]
		if !seen {
			pos = len(records)
			position[key] = pos
			records = append(records, JoinedRecord{Key: key})
		}
		for _, r := range matches {
			records[pos].Value = append(records[pos].Value, JoinedValue{Left: l, Right: r})
		}
	}

	out := make(chan JoinedRecord)
	go func() {
		defer close(out)
		for _, rec := range records {
			out <- rec
		}
	}()
	return out
}

// Filter returns a JoinedStream that keeps, for every record of js, only the
// pairs accepted by f. Records left without any pair are dropped.
func (js JoinedStream) Filter(f JoinedFilterFunc) JoinedStream {
	out := make(chan JoinedRecord)
	go func() {
		defer close(out)
		for rec := range js {
			var kept []JoinedValue
			for _, pair := range rec.Value {
				if f(pair.Left, pair.Right) {
					kept = append(kept, pair)
				}
			}
			if len(kept) == 0 {
				continue
			}
			out <- JoinedRecord{Key: rec.Key, Value: kept}
		}
	}()
	return out
}

// Print is a sink that logs every record of js with log.Println.
func (js JoinedStream) Print() {
	for rec := range js {
		log.Println(rec)
	}
}

// Fold converts a JoinedStream into a GroupedStream: every (left, right) pair
// of a record is combined by f, and the combined values are emitted under the
// record's key.
func (js JoinedStream) Fold(f JoinAggFunc) GroupedStream {
	out := make(chan GroupedRecord)
	go func() {
		defer close(out)
		for rec := range js {
			folded := make([]interface{}, 0, len(rec.Value))
			for _, pair := range rec.Value {
				folded = append(folded, f(pair.Left, pair.Right))
			}
			out <- GroupedRecord{Key: rec.Key, Value: folded}
		}
	}()
	return out
}