Skip to content

Commit c175284

Browse files
committed
Rewrite stages, fixes and refactoring
1 parent e0a73e6 commit c175284

File tree

20 files changed

+463
-408
lines changed

20 files changed

+463
-408
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/Pipeline.kt

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ package com.google.firebase.firestore
1616

1717
import com.google.android.gms.tasks.Task
1818
import com.google.android.gms.tasks.TaskCompletionSource
19-
import com.google.common.collect.FluentIterable
20-
import com.google.common.collect.ImmutableList
2119
import com.google.firebase.Timestamp
2220
import com.google.firebase.firestore.model.DocumentKey
2321
import com.google.firebase.firestore.model.Values
@@ -62,7 +60,7 @@ open class AbstractPipeline
6260
internal constructor(
6361
internal val firestore: FirebaseFirestore,
6462
internal val userDataReader: UserDataReader,
65-
internal val stages: FluentIterable<Stage<*>>
63+
internal val stages: List<Stage<*>>
6664
) {
6765
private fun toStructuredPipelineProto(options: InternalOptions?): StructuredPipeline {
6866
val builder = StructuredPipeline.newBuilder()
@@ -95,7 +93,7 @@ internal constructor(
9593
private val userDataWriter =
9694
UserDataWriter(firestore, DocumentSnapshot.ServerTimestampBehavior.DEFAULT)
9795
private val taskCompletionSource = TaskCompletionSource<PipelineSnapshot>()
98-
private val results: ImmutableList.Builder<PipelineResult> = ImmutableList.builder()
96+
private val results: MutableList<PipelineResult> = mutableListOf()
9997
override fun onDocument(
10098
key: DocumentKey?,
10199
data: Map<String, Value>,
@@ -115,7 +113,7 @@ internal constructor(
115113
}
116114

117115
override fun onComplete(executionTime: Timestamp) {
118-
taskCompletionSource.setResult(PipelineSnapshot(executionTime, results.build()))
116+
taskCompletionSource.setResult(PipelineSnapshot(executionTime, results))
119117
}
120118

121119
override fun onError(exception: FirebaseFirestoreException) {
@@ -131,16 +129,16 @@ class Pipeline
131129
private constructor(
132130
firestore: FirebaseFirestore,
133131
userDataReader: UserDataReader,
134-
stages: FluentIterable<Stage<*>>
132+
stages: List<Stage<*>>
135133
) : AbstractPipeline(firestore, userDataReader, stages) {
136134
internal constructor(
137135
firestore: FirebaseFirestore,
138136
userDataReader: UserDataReader,
139137
stage: Stage<*>
140-
) : this(firestore, userDataReader, FluentIterable.of(stage))
138+
) : this(firestore, userDataReader, listOf(stage))
141139

142140
private fun append(stage: Stage<*>): Pipeline {
143-
return Pipeline(firestore, userDataReader, stages.append(stage))
141+
return Pipeline(firestore, userDataReader, stages.plus(stage))
144142
}
145143

146144
fun execute(): Task<PipelineSnapshot> = execute(null)
@@ -760,17 +758,18 @@ class RealtimePipeline
760758
internal constructor(
761759
firestore: FirebaseFirestore,
762760
userDataReader: UserDataReader,
763-
stages: FluentIterable<Stage<*>>
761+
stages: List<Stage<*>>
764762
) : AbstractPipeline(firestore, userDataReader, stages) {
765763
internal constructor(
766764
firestore: FirebaseFirestore,
767765
userDataReader: UserDataReader,
768766
stage: Stage<*>
769-
) : this(firestore, userDataReader, FluentIterable.of(stage))
767+
) : this(firestore, userDataReader, listOf(stage))
770768

771-
private fun append(stage: Stage<*>): RealtimePipeline {
772-
return RealtimePipeline(firestore, userDataReader, stages.append(stage))
773-
}
769+
private fun with(stages: List<Stage<*>>): RealtimePipeline =
770+
RealtimePipeline(firestore, userDataReader, stages)
771+
772+
private fun append(stage: Stage<*>): RealtimePipeline = with(stages.plus(stage))
774773

775774
fun execute(): Task<PipelineSnapshot> = execute(null)
776775

@@ -790,6 +789,33 @@ internal constructor(
790789
append(SortStage(arrayOf(order, *additionalOrders)))
791790

792791
fun where(condition: BooleanExpr): RealtimePipeline = append(WhereStage(condition))
792+
793+
internal fun rewriteStages(): RealtimePipeline {
794+
var hasOrder = false
795+
return with(
796+
buildList {
797+
for (stage in stages) when (stage) {
798+
// Stages whose semantics depend on ordering
799+
is LimitStage,
800+
is OffsetStage -> {
801+
if (!hasOrder) {
802+
hasOrder = true
803+
add(SortStage.BY_DOCUMENT_ID)
804+
}
805+
add(stage)
806+
}
807+
is SortStage -> {
808+
hasOrder = true
809+
add(stage.withStableOrdering())
810+
}
811+
else -> add(stage)
812+
}
813+
if (!hasOrder) {
814+
add(SortStage.BY_DOCUMENT_ID)
815+
}
816+
}
817+
)
818+
}
793819
}
794820

795821
/**

firebase-firestore/src/main/java/com/google/firebase/firestore/pipeline/evaluation.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ import com.google.common.math.LongMath
2020
import com.google.common.math.LongMath.checkedAdd
2121
import com.google.common.math.LongMath.checkedMultiply
2222
import com.google.common.math.LongMath.checkedSubtract
23-
import com.google.firebase.firestore.FirebaseFirestore
24-
import com.google.firebase.firestore.UserDataReader
23+
import com.google.firebase.firestore.RealtimePipeline
2524
import com.google.firebase.firestore.model.MutableDocument
2625
import com.google.firebase.firestore.model.Values
2726
import com.google.firebase.firestore.model.Values.encodeValue
@@ -43,7 +42,7 @@ import kotlin.math.log10
4342
import kotlin.math.pow
4443
import kotlin.math.sqrt
4544

46-
internal class EvaluationContext(val db: FirebaseFirestore, val userDataReader: UserDataReader)
45+
internal class EvaluationContext(val pipeline: RealtimePipeline)
4746

4847
internal typealias EvaluateDocument = (input: MutableDocument) -> EvaluateResult
4948

firebase-firestore/src/main/java/com/google/firebase/firestore/pipeline/expressions.kt

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ abstract class Expr internal constructor() {
227227
override fun evaluateContext(
228228
context: EvaluationContext
229229
): (input: MutableDocument) -> EvaluateResult {
230-
val result = EvaluateResultValue(toProto(context.userDataReader))
230+
val result = EvaluateResultValue(toProto(context.pipeline.userDataReader))
231231
return { _ -> result }
232232
}
233233
}
@@ -4124,8 +4124,8 @@ abstract class Expr internal constructor() {
41244124

41254125
/** Expressions that have an alias are [Selectable] */
41264126
abstract class Selectable : Expr() {
4127-
internal abstract fun getAlias(): String
4128-
internal abstract fun getExpr(): Expr
4127+
internal abstract val alias: String
4128+
internal abstract val expr: Expr
41294129

41304130
internal companion object {
41314131
fun toSelectable(o: Any): Selectable {
@@ -4140,10 +4140,8 @@ abstract class Selectable : Expr() {
41404140
}
41414141

41424142
/** Represents an expression that will be given the alias in the output document. */
4143-
class ExprWithAlias internal constructor(private val alias: String, private val expr: Expr) :
4143+
class ExprWithAlias internal constructor(override val alias: String, override val expr: Expr) :
41444144
Selectable() {
4145-
override fun getAlias() = alias
4146-
override fun getExpr() = expr
41474145
override fun toProto(userDataReader: UserDataReader): Value = expr.toProto(userDataReader)
41484146
override fun evaluateContext(context: EvaluationContext) = expr.evaluateContext(context)
41494147
}
@@ -4164,12 +4162,16 @@ class Field internal constructor(private val fieldPath: ModelFieldPath) : Select
41644162
*
41654163
* @return An [Field] representing the document ID.
41664164
*/
4167-
@JvmField val DOCUMENT_ID: Field = field(FieldPath.documentId())
4165+
@JvmField val DOCUMENT_ID: Field = Field(KEY_PATH)
4166+
4167+
@JvmField internal val UPDATE_TIME: Field = Field(UPDATE_TIME_PATH)
4168+
4169+
@JvmField internal val CREATE_TIME: Field = Field(CREATE_TIME_PATH)
41684170
}
41694171

4170-
override fun getAlias(): String = fieldPath.canonicalString()
4172+
override val alias: String = fieldPath.canonicalString()
41714173

4172-
override fun getExpr(): Expr = this
4174+
override val expr: Expr = this
41734175

41744176
override fun toProto(userDataReader: UserDataReader) = toProto()
41754177

@@ -4180,7 +4182,8 @@ class Field internal constructor(private val fieldPath: ModelFieldPath) : Select
41804182
block@{ input: MutableDocument ->
41814183
EvaluateResultValue(
41824184
when (fieldPath) {
4183-
KEY_PATH -> encodeValue(DocumentReference.forPath(input.key.path, context.db))
4185+
KEY_PATH ->
4186+
encodeValue(DocumentReference.forPath(input.key.path, context.pipeline.firestore))
41844187
CREATE_TIME_PATH -> encodeValue(input.createTime.timestamp)
41854188
UPDATE_TIME_PATH -> encodeValue(input.version.timestamp)
41864189
else -> input.getField(fieldPath) ?: return@block EvaluateResultUnset

firebase-firestore/src/main/java/com/google/firebase/firestore/pipeline/stage.kt

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import com.google.firebase.firestore.CollectionReference
1818
import com.google.firebase.firestore.FirebaseFirestore
1919
import com.google.firebase.firestore.UserDataReader
2020
import com.google.firebase.firestore.VectorValue
21+
import com.google.firebase.firestore.model.DocumentKey.KEY_FIELD_NAME
2122
import com.google.firebase.firestore.model.MutableDocument
2223
import com.google.firebase.firestore.model.ResourcePath
2324
import com.google.firebase.firestore.model.Values
@@ -292,9 +293,17 @@ internal constructor(
292293
private val fields: Array<out Selectable>,
293294
options: InternalOptions = InternalOptions.EMPTY
294295
) : Stage<AddFieldsStage>("add_fields", options) {
296+
init {
297+
for (field in fields) {
298+
val alias = field.alias
299+
require(alias != Field.DOCUMENT_ID.alias, { "Alias ${Field.DOCUMENT_ID.alias} is reserved" })
300+
require(alias != Field.CREATE_TIME.alias, { "Alias ${Field.CREATE_TIME.alias} is reserved" })
301+
require(alias != Field.UPDATE_TIME.alias, { "Alias ${Field.UPDATE_TIME.alias} is reserved" })
302+
}
303+
}
295304
override fun self(options: InternalOptions) = AddFieldsStage(fields, options)
296305
override fun args(userDataReader: UserDataReader): Sequence<Value> =
297-
sequenceOf(encodeValue(fields.associate { it.getAlias() to it.toProto(userDataReader) }))
306+
sequenceOf(encodeValue(fields.associate { it.alias to it.toProto(userDataReader) }))
298307
}
299308

300309
/**
@@ -368,8 +377,8 @@ internal constructor(
368377
fun withGroups(group: Selectable, vararg additionalGroups: Any) =
369378
AggregateStage(
370379
accumulators,
371-
mapOf(group.getAlias() to group.getExpr())
372-
.plus(additionalGroups.map(Selectable::toSelectable).associateBy(Selectable::getAlias))
380+
mapOf(group.alias to group.expr)
381+
.plus(additionalGroups.map(Selectable::toSelectable).associateBy(Selectable::alias))
373382
)
374383

375384
override fun args(userDataReader: UserDataReader): Sequence<Value> =
@@ -539,7 +548,7 @@ internal constructor(private val offset: Int, options: InternalOptions = Interna
539548
}
540549

541550
internal class SelectStage
542-
private constructor(private val fields: Array<out Selectable>, options: InternalOptions) :
551+
private constructor(internal val fields: Array<out Selectable>, options: InternalOptions) :
543552
Stage<SelectStage>("select", options) {
544553
companion object {
545554
@JvmStatic
@@ -555,14 +564,18 @@ private constructor(private val fields: Array<out Selectable>, options: Internal
555564
}
556565
override fun self(options: InternalOptions) = SelectStage(fields, options)
557566
override fun args(userDataReader: UserDataReader): Sequence<Value> =
558-
sequenceOf(encodeValue(fields.associate { it.getAlias() to it.toProto(userDataReader) }))
567+
sequenceOf(encodeValue(fields.associate { it.alias to it.toProto(userDataReader) }))
559568
}
560569

561570
internal class SortStage
562571
internal constructor(
563572
private val orders: Array<out Ordering>,
564573
options: InternalOptions = InternalOptions.EMPTY
565574
) : Stage<SortStage>("sort", options) {
575+
companion object {
576+
internal val BY_DOCUMENT_ID = SortStage(arrayOf(Field.DOCUMENT_ID.ascending()))
577+
}
578+
566579
override fun self(options: InternalOptions) = SortStage(orders, options)
567580
override fun args(userDataReader: UserDataReader): Sequence<Value> =
568581
orders.asSequence().map { it.toProto(userDataReader) }
@@ -603,6 +616,16 @@ internal constructor(
603616
.forEach { p -> emit(p.first) }
604617
}
605618
}
619+
620+
internal fun withStableOrdering(): SortStage {
621+
val position = orders.indexOfFirst { (it.expr as? Field)?.alias == KEY_FIELD_NAME }
622+
return if (position < 0) {
623+
// Append the DocumentId to orders to make ordering stable.
624+
SortStage(orders.asList().plus(Field.DOCUMENT_ID.ascending()).toTypedArray(), options)
625+
} else {
626+
this
627+
}
628+
}
606629
}
607630

608631
internal class DistinctStage
@@ -612,14 +635,22 @@ internal constructor(
612635
) : Stage<DistinctStage>("distinct", options) {
613636
override fun self(options: InternalOptions) = DistinctStage(groups, options)
614637
override fun args(userDataReader: UserDataReader): Sequence<Value> =
615-
sequenceOf(encodeValue(groups.associate { it.getAlias() to it.toProto(userDataReader) }))
638+
sequenceOf(encodeValue(groups.associate { it.alias to it.toProto(userDataReader) }))
616639
}
617640

618641
internal class RemoveFieldsStage
619642
internal constructor(
620643
private val fields: Array<out Field>,
621644
options: InternalOptions = InternalOptions.EMPTY
622645
) : Stage<RemoveFieldsStage>("remove_fields", options) {
646+
init {
647+
for (field in fields) {
648+
val alias = field.alias
649+
require(alias != Field.DOCUMENT_ID.alias, { "Alias ${Field.DOCUMENT_ID.alias} is required" })
650+
require(alias != Field.CREATE_TIME.alias, { "Alias ${Field.CREATE_TIME.alias} is required" })
651+
require(alias != Field.UPDATE_TIME.alias, { "Alias ${Field.UPDATE_TIME.alias} is required" })
652+
}
653+
}
623654
override fun self(options: InternalOptions) = RemoveFieldsStage(fields, options)
624655
override fun args(userDataReader: UserDataReader): Sequence<Value> =
625656
fields.asSequence().map(Field::toProto)
@@ -746,11 +777,11 @@ internal constructor(
746777
*/
747778
@JvmStatic
748779
fun withField(arrayField: String, alias: String): UnnestStage =
749-
UnnestStage(Expr.field(arrayField).alias(alias))
780+
UnnestStage(Expr.Companion.field(arrayField).alias(alias))
750781
}
751782
override fun self(options: InternalOptions) = UnnestStage(selectable, options)
752783
override fun args(userDataReader: UserDataReader): Sequence<Value> =
753-
sequenceOf(encodeValue(selectable.getAlias()), selectable.toProto(userDataReader))
784+
sequenceOf(encodeValue(selectable.alias), selectable.toProto(userDataReader))
754785

755786
/**
756787
* Adds index field to emitted documents

0 commit comments

Comments
 (0)