@@ -85,7 +85,8 @@ class SegmentDestination(
8585 var initialDelay = flushIntervalInMillis
8686
8787 // If we have events in queue flush them
88- val eventFilePaths = parseFilePaths(storage.read(Storage .Constants .Events )) // should we switch to extension function?
88+ val eventFilePaths =
89+ parseFilePaths(storage.read(Storage .Constants .Events )) // should we switch to extension function?
8990 if (eventFilePaths.isNotEmpty()) {
9091 initialDelay = 0
9192 }
@@ -115,6 +116,9 @@ class SegmentDestination(
115116 }
116117
117118 private fun performFlush () {
119+ if (eventCount.get() < 1 ) {
120+ return
121+ }
118122 val fileUrls = parseFilePaths(storage.read(Storage .Constants .Events ))
119123 if (fileUrls.isEmpty()) {
120124 analytics.log(" No events to upload" )
@@ -124,9 +128,14 @@ class SegmentDestination(
124128 for (fileUrl in fileUrls) {
125129 try {
126130 val connection = httpClient.upload(apiHost, apiKey)
131+ val file = File (fileUrl)
132+ // flush is executed in a thread pool and file could have been deleted by another thread
133+ if (! file.exists()) {
134+ continue
135+ }
127136 connection.outputStream?.let {
128137 // Write the payloads into the OutputStream.
129- val fileInputStream = FileInputStream (File (fileUrl) )
138+ val fileInputStream = FileInputStream (file )
130139 fileInputStream.copyTo(connection.outputStream)
131140 fileInputStream.close()
132141 connection.outputStream.close()
@@ -151,11 +160,13 @@ class SegmentDestination(
151160 )
152161 }
153162 } catch (e: Exception ) {
154- analytics.log("""
163+ analytics.log(
164+ """
155165 | Error uploading events from batch file
156166 | fileUrl="$fileUrl "
157167 | msg=${e.message}
158- """ .trimMargin(), type = LogType .ERROR )
168+ """ .trimMargin(), type = LogType .ERROR
169+ )
159170 e.printStackTrace()
160171 }
161172 }
0 commit comments