@@ -8,6 +8,8 @@ import com.segment.analytics.kotlin.core.utilities.SegmentInstant
88import com.segment.analytics.kotlin.core.utilities.getString
99import com.segment.analytics.kotlin.core.utilities.putInContext
1010import com.segment.analytics.kotlin.core.utilities.updateJsonObject
11+ import com.segment.analytics.kotlin.core.utilities.set
12+ import com.segment.analytics.kotlin.core.utils.StubAfterPlugin
1113import com.segment.analytics.kotlin.core.utils.StubPlugin
1214import com.segment.analytics.kotlin.core.utils.TestRunPlugin
1315import com.segment.analytics.kotlin.core.utils.clearPersistentStorage
@@ -17,7 +19,6 @@ import io.mockk.*
1719import kotlinx.coroutines.runBlocking
1820import kotlinx.coroutines.test.TestScope
1921import kotlinx.coroutines.test.UnconfinedTestDispatcher
20- import kotlinx.coroutines.test.runBlockingTest
2122import kotlinx.coroutines.test.runTest
2223import kotlinx.serialization.json.buildJsonObject
2324import kotlinx.serialization.json.jsonObject
@@ -34,6 +35,7 @@ import java.io.ByteArrayInputStream
3435import java.net.HttpURLConnection
3536import java.util.Date
3637import java.util.UUID
38+ import java.util.concurrent.Semaphore
3739
3840@TestInstance(TestInstance .Lifecycle .PER_CLASS )
3941class AnalyticsTests {
@@ -979,4 +981,169 @@ class AnalyticsTests {
979981 context = baseContext
980982 integrations = emptyJsonObject
981983 }
984+ }
985+
986+ @TestInstance(TestInstance .Lifecycle .PER_METHOD )
987+ class AsyncAnalyticsTests {
988+ private lateinit var analytics: Analytics
989+
990+ private lateinit var afterPlugin: StubAfterPlugin
991+
992+ private lateinit var httpSemaphore: Semaphore
993+
994+ private lateinit var assertSemaphore: Semaphore
995+
996+ private lateinit var actual: CapturingSlot <BaseEvent >
997+
998+ @BeforeEach
999+ fun setup () {
1000+ httpSemaphore = Semaphore (0 )
1001+ assertSemaphore = Semaphore (0 )
1002+
1003+ val settings = """
1004+ {"integrations":{"Segment.io":{"apiKey":"1vNgUqwJeCHmqgI9S1sOm9UHCyfYqbaQ"}},"plan":{},"edgeFunction":{}}
1005+ """ .trimIndent()
1006+ mockkConstructor(HTTPClient ::class )
1007+ val settingsStream = ByteArrayInputStream (
1008+ settings.toByteArray()
1009+ )
1010+ val httpConnection: HttpURLConnection = mockk()
1011+ val connection = object : Connection (httpConnection, settingsStream, null ) {}
1012+ every { anyConstructed<HTTPClient >().settings(" cdn-settings.segment.com/v1" ) } answers {
1013+ // suspend http calls until we tracked events
1014+ // this will force events get into startup queue
1015+ httpSemaphore.acquire()
1016+ connection
1017+ }
1018+
1019+ afterPlugin = spyk(StubAfterPlugin ())
1020+ actual = slot<BaseEvent >()
1021+ every { afterPlugin.execute(capture(actual)) } answers {
1022+ val input = firstArg<BaseEvent ?>()
1023+ // since this is an after plugin, when its execute function is called,
1024+ // it is guaranteed that the enrichment closure has been called.
1025+ // so we can release the semaphore on assertions.
1026+ assertSemaphore.release()
1027+ input
1028+ }
1029+ analytics = Analytics (Configuration (writeKey = " 123" , application = " Test" ))
1030+ analytics.add(afterPlugin)
1031+ }
1032+
1033+ @Test
1034+ fun `startup queue should replay with track enrichment closure` () {
1035+ val expectedEvent = " foo"
1036+ val expectedAnonymousId = " bar"
1037+
1038+ analytics.track(expectedEvent) {
1039+ it?.anonymousId = expectedAnonymousId
1040+ it
1041+ }
1042+
1043+ // now we have tracked event, i.e. event added to startup queue
1044+ // release the semaphore put on http client, so we startup queue will replay the events
1045+ httpSemaphore.release()
1046+ // now we need to wait for events being fully replayed before making assertions
1047+ assertSemaphore.acquire()
1048+
1049+ assertTrue(actual.isCaptured)
1050+ actual.captured.let {
1051+ assertTrue(it is TrackEvent )
1052+ val e = it as TrackEvent
1053+ assertTrue(e.properties.isEmpty())
1054+ assertEquals(expectedEvent, e.event)
1055+ assertEquals(expectedAnonymousId, e.anonymousId)
1056+ }
1057+ }
1058+
1059+ @Disabled
1060+ @Test
1061+ fun `startup queue should replay with identify enrichment closure` () {
1062+ val expected = buildJsonObject {
1063+ put(" foo" , " baz" )
1064+ }
1065+ val expectedUserId = " newUserId"
1066+
1067+ analytics.identify(expectedUserId) {
1068+ if (it is IdentifyEvent ) {
1069+ it.traits = updateJsonObject(it.traits) {
1070+ it[" foo" ] = " baz"
1071+ }
1072+ }
1073+ it
1074+ }
1075+
1076+ // now we have tracked event, i.e. event added to startup queue
1077+ // release the semaphore put on http client, so we startup queue will replay the events
1078+ httpSemaphore.release()
1079+ // now we need to wait for events being fully replayed before making assertions
1080+ assertSemaphore.acquire()
1081+
1082+ val actualUserId = analytics.userId()
1083+
1084+ assertTrue(actual.isCaptured)
1085+ actual.captured.let {
1086+ assertTrue(it is IdentifyEvent )
1087+ val e = it as IdentifyEvent
1088+ assertEquals(expected, e.traits)
1089+ assertEquals(expectedUserId, actualUserId)
1090+ }
1091+ }
1092+
1093+ @Disabled
1094+ @Test
1095+ fun `startup queue should replay with group enrichment closure` () {
1096+ val expected = buildJsonObject {
1097+ put(" foo" , " baz" )
1098+ }
1099+ val expectedGroupId = " foo"
1100+
1101+ analytics.group(expectedGroupId) {
1102+ if (it is GroupEvent ) {
1103+ it.traits = updateJsonObject(it.traits) {
1104+ it[" foo" ] = " baz"
1105+ }
1106+ }
1107+ it
1108+ }
1109+
1110+ // now we have tracked event, i.e. event added to startup queue
1111+ // release the semaphore put on http client, so we startup queue will replay the events
1112+ httpSemaphore.release()
1113+ // now we need to wait for events being fully replayed before making assertions
1114+ assertSemaphore.acquire()
1115+
1116+ assertTrue(actual.isCaptured)
1117+ actual.captured.let {
1118+ assertTrue(it is GroupEvent )
1119+ val e = it as GroupEvent
1120+ assertEquals(expected, e.traits)
1121+ assertEquals(expectedGroupId, e.groupId)
1122+ }
1123+ }
1124+
1125+ @Disabled
1126+ @Test
1127+ fun `startup queue should replay with alias enrichment closure` () {
1128+ val expected = " bar"
1129+
1130+ analytics.alias(expected) {
1131+ it?.anonymousId = " test"
1132+ it
1133+ }
1134+
1135+ // now we have tracked event, i.e. event added to startup queue
1136+ // release the semaphore put on http client, so we startup queue will replay the events
1137+ httpSemaphore.release()
1138+ // now we need to wait for events being fully replayed before making assertions
1139+ assertSemaphore.acquire()
1140+
1141+ assertTrue(actual.isCaptured)
1142+ actual.captured.let {
1143+ assertTrue(it is AliasEvent )
1144+ val e = it as AliasEvent
1145+ assertEquals(expected, e.userId)
1146+ assertEquals(" test" , e.anonymousId)
1147+ }
1148+ }
9821149}
0 commit comments