@@ -37,7 +37,7 @@ func (p *QueryProcess) RowIter(ctx *sql.Context) (sql.RowIter, error) {
3737 return nil , err
3838 }
3939
40- return & trackedRowIter {iter , p .Notify }, nil
40+ return & trackedRowIter {iter : iter , onDone : p .Notify }, nil
4141}
4242
4343func (p * QueryProcess ) String () string { return p .Child .String () }
@@ -48,12 +48,14 @@ func (p *QueryProcess) String() string { return p.Child.String() }
4848// partition is processed.
4949type ProcessIndexableTable struct {
5050 sql.IndexableTable
51- Notify NotifyFunc
51+ OnPartitionDone NamedNotifyFunc
52+ OnPartitionStart NamedNotifyFunc
53+ OnRowNext NamedNotifyFunc
5254}
5355
5456// NewProcessIndexableTable returns a new ProcessIndexableTable.
55- func NewProcessIndexableTable (t sql.IndexableTable , notify NotifyFunc ) * ProcessIndexableTable {
56- return & ProcessIndexableTable {t , notify }
57+ func NewProcessIndexableTable (t sql.IndexableTable , onPartitionDone , onPartitionStart , OnRowNext NamedNotifyFunc ) * ProcessIndexableTable {
58+ return & ProcessIndexableTable {t , onPartitionDone , onPartitionStart , OnRowNext }
5759}
5860
5961// Underlying implements sql.TableWrapper interface.
@@ -71,7 +73,7 @@ func (t *ProcessIndexableTable) IndexKeyValues(
7173 return nil , err
7274 }
7375
74- return & trackedPartitionIndexKeyValueIter {iter , t .Notify }, nil
76+ return & trackedPartitionIndexKeyValueIter {iter , t .OnPartitionDone , t . OnPartitionStart , t . OnRowNext }, nil
7577}
7678
7779// PartitionRows implements the sql.Table interface.
@@ -81,22 +83,46 @@ func (t *ProcessIndexableTable) PartitionRows(ctx *sql.Context, p sql.Partition)
8183 return nil , err
8284 }
8385
84- return & trackedRowIter {iter , t .Notify }, nil
86+ partitionName := partitionName (p )
87+ if t .OnPartitionStart != nil {
88+ t .OnPartitionStart (partitionName )
89+ }
90+
91+ var onDone NotifyFunc
92+ if t .OnPartitionDone != nil {
93+ onDone = func () {
94+ t .OnPartitionDone (partitionName )
95+ }
96+ }
97+
98+ var onNext NotifyFunc
99+ if t .OnRowNext != nil {
100+ onNext = func () {
101+ t .OnRowNext (partitionName )
102+ }
103+ }
104+
105+ return & trackedRowIter {iter : iter , onNext : onNext , onDone : onDone }, nil
85106}
86107
87108var _ sql.IndexableTable = (* ProcessIndexableTable )(nil )
88109
110+ // NamedNotifyFunc is a function to notify about some event with a string argument.
111+ type NamedNotifyFunc func (name string )
112+
89113// ProcessTable is a wrapper for sql.Tables inside a query process. It
90114// notifies the process manager about the status of a query when a partition
91115// is processed.
92116type ProcessTable struct {
93117 sql.Table
94- Notify NotifyFunc
118+ OnPartitionDone NamedNotifyFunc
119+ OnPartitionStart NamedNotifyFunc
120+ OnRowNext NamedNotifyFunc
95121}
96122
97123// NewProcessTable returns a new ProcessTable.
98- func NewProcessTable (t sql.Table , notify NotifyFunc ) * ProcessTable {
99- return & ProcessTable {t , notify }
124+ func NewProcessTable (t sql.Table , onPartitionDone , onPartitionStart , OnRowNext NamedNotifyFunc ) * ProcessTable {
125+ return & ProcessTable {t , onPartitionDone , onPartitionStart , OnRowNext }
100126}
101127
102128// Underlying implements sql.TableWrapper interface.
@@ -111,18 +137,38 @@ func (t *ProcessTable) PartitionRows(ctx *sql.Context, p sql.Partition) (sql.Row
111137 return nil , err
112138 }
113139
114- return & trackedRowIter {iter , t .Notify }, nil
140+ partitionName := partitionName (p )
141+ if t .OnPartitionStart != nil {
142+ t .OnPartitionStart (partitionName )
143+ }
144+
145+ var onDone NotifyFunc
146+ if t .OnPartitionDone != nil {
147+ onDone = func () {
148+ t .OnPartitionDone (partitionName )
149+ }
150+ }
151+
152+ var onNext NotifyFunc
153+ if t .OnRowNext != nil {
154+ onNext = func () {
155+ t .OnRowNext (partitionName )
156+ }
157+ }
158+
159+ return & trackedRowIter {iter : iter , onNext : onNext , onDone : onDone }, nil
115160}
116161
117162type trackedRowIter struct {
118163 iter sql.RowIter
119- notify NotifyFunc
164+ onDone NotifyFunc
165+ onNext NotifyFunc
120166}
121167
122168func (i * trackedRowIter ) done () {
123- if i .notify != nil {
124- i .notify ()
125- i .notify = nil
169+ if i .onDone != nil {
170+ i .onDone ()
171+ i .onDone = nil
126172 }
127173}
128174
@@ -134,6 +180,11 @@ func (i *trackedRowIter) Next() (sql.Row, error) {
134180 }
135181 return nil , err
136182 }
183+
184+ if i .onNext != nil {
185+ i .onNext ()
186+ }
187+
137188 return row , nil
138189}
139190
@@ -144,7 +195,9 @@ func (i *trackedRowIter) Close() error {
144195
145196type trackedPartitionIndexKeyValueIter struct {
146197 sql.PartitionIndexKeyValueIter
147- notify NotifyFunc
198+ OnPartitionDone NamedNotifyFunc
199+ OnPartitionStart NamedNotifyFunc
200+ OnRowNext NamedNotifyFunc
148201}
149202
150203func (i * trackedPartitionIndexKeyValueIter ) Next () (sql.Partition , sql.IndexKeyValueIter , error ) {
@@ -153,18 +206,38 @@ func (i *trackedPartitionIndexKeyValueIter) Next() (sql.Partition, sql.IndexKeyV
153206 return nil , nil , err
154207 }
155208
156- return p , & trackedIndexKeyValueIter {iter , i .notify }, nil
209+ partitionName := partitionName (p )
210+ if i .OnPartitionStart != nil {
211+ i .OnPartitionStart (partitionName )
212+ }
213+
214+ var onDone NotifyFunc
215+ if i .OnPartitionDone != nil {
216+ onDone = func () {
217+ i .OnPartitionDone (partitionName )
218+ }
219+ }
220+
221+ var onNext NotifyFunc
222+ if i .OnRowNext != nil {
223+ onNext = func () {
224+ i .OnRowNext (partitionName )
225+ }
226+ }
227+
228+ return p , & trackedIndexKeyValueIter {iter , onDone , onNext }, nil
157229}
158230
159231type trackedIndexKeyValueIter struct {
160232 iter sql.IndexKeyValueIter
161- notify NotifyFunc
233+ onDone NotifyFunc
234+ onNext NotifyFunc
162235}
163236
164237func (i * trackedIndexKeyValueIter ) done () {
165- if i .notify != nil {
166- i .notify ()
167- i .notify = nil
238+ if i .onDone != nil {
239+ i .onDone ()
240+ i .onDone = nil
168241 }
169242}
170243
@@ -185,5 +258,16 @@ func (i *trackedIndexKeyValueIter) Next() ([]interface{}, []byte, error) {
185258 return nil , nil , err
186259 }
187260
261+ if i .onNext != nil {
262+ i .onNext ()
263+ }
264+
188265 return v , k , nil
189266}
267+
268+ func partitionName (p sql.Partition ) string {
269+ if n , ok := p .(sql.Nameable ); ok {
270+ return n .Name ()
271+ }
272+ return string (p .Key ())
273+ }
0 commit comments