Releases · floydspace/effect-kafka
v0.9.0
Minor Changes
- 1493848
Thanks @floydspace! - Implement methods for defining and parsing kafka messages with effect Schema. How to use:
  - When you parse a raw kafka message, you can use `MessageRouter.schemaRaw` in combination with the `ConsumerSchema` helper methods to define the schema of the message.
```ts
import { Console, Effect, Schema } from "effect";
import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka";

const ConsumerLive = MessageRouter.empty.pipe(
  MessageRouter.subscribe(
    "test-topic",
    MessageRouter.schemaRaw(
      Schema.Struct({
        topic: Schema.Literal("test-topic"),
        partition: Schema.Number,
        offset: Schema.NumberFromString,
        key: Schema.NullOr(ConsumerSchema.Number),
        value: ConsumerSchema.String,
      }),
    ).pipe(
      Effect.flatMap(({ topic, partition, ...message }) =>
        Console.log({
          topic,
          partition,
          offset: message.offset,
          key: message.key,
          value: message.value,
        }),
      ),
    ),
  ),
  Consumer.serve({ groupId: "group" }),
);
```
  - When you parse a raw kafka message, you can use `MessageRouter.schemaJson` if you expect the kafka message `value` to be a JSON string, so the `value` schema property can be defined as a `Schema.Struct`.
```ts
import { Console, Effect, Schema } from "effect";
import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka";

const ConsumerLive = MessageRouter.empty.pipe(
  MessageRouter.subscribe(
    "test-topic",
    MessageRouter.schemaJson(
      Schema.Struct({
        topic: Schema.Literal("test-topic"),
        partition: Schema.Number,
        offset: Schema.NumberFromString,
        key: Schema.NullOr(ConsumerSchema.Number),
        value: Schema.Struct({ message: Schema.String }),
      }),
    ).pipe(
      Effect.flatMap(({ topic, partition, ...message }) =>
        Console.log({
          topic,
          partition,
          offset: message.offset,
          key: message.key,
          value: message.value.message,
        }),
      ),
    ),
  ),
  Consumer.serve({ groupId: "group" }),
);
```
  - When you parse only the kafka message `value` as a raw value, you can use `ConsumerRecord.schemaValueRaw` in combination with the `ConsumerSchema` helper methods to define the schema of the message value.
```ts
import { Console, Effect } from "effect";
import { Consumer, ConsumerRecord, ConsumerSchema, MessageRouter } from "effect-kafka";

const ConsumerLive = MessageRouter.empty.pipe(
  MessageRouter.subscribe(
    "customers",
    ConsumerRecord.schemaValueRaw(ConsumerSchema.String).pipe(
      Effect.flatMap((value) => Console.log(value)),
    ),
  ),
  Consumer.serve({ groupId: "group" }),
);
```
  - When you parse only the kafka message `value` as a JSON value, you can use `ConsumerRecord.schemaValueJson`, so the `value` schema can be defined as a `Schema.Struct`.
```ts
import { Console, Effect, Schema } from "effect";
import { Consumer, ConsumerRecord, MessageRouter } from "effect-kafka";

const ConsumerLive = MessageRouter.empty.pipe(
  MessageRouter.subscribe(
    "customers",
    ConsumerRecord.schemaValueJson(Schema.Struct({ message: Schema.String })).pipe(
      Effect.flatMap((value) => Console.log(value)),
    ),
  ),
  Consumer.serve({ groupId: "group" }),
);
```
Patch Changes
- f564387
Thanks @floydspace! - upgrade @confluentinc/kafka-javascript to the stable v1
v0.8.0
Minor Changes
- #32
3fab074
Thanks @floydspace! - Implement an MVP platformatic kafka instance
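A minimal sketch of wiring a consumer to the new platformatic kafka engine. The module path `effect-kafka/PlatformaticKafka`, the `PlatformaticKafkaInstance.layer` export, and its options shape are assumptions made for illustration; check the package documentation for the actual names.

```ts
import { NodeRuntime } from "@effect/platform-node";
import { Console, Layer } from "effect";
import { Consumer, MessageRouter } from "effect-kafka";
// NOTE: hypothetical module path and export name, used only for illustration.
import { PlatformaticKafkaInstance } from "effect-kafka/PlatformaticKafka";

const ConsumerLive = MessageRouter.empty.pipe(
  MessageRouter.subscribe("test-topic", Console.log("got a message")),
  Consumer.serve({ groupId: "group" }),
);

// The broker options shape below is an assumption as well.
const KafkaLive = PlatformaticKafkaInstance.layer({ brokers: ["localhost:9092"] });

NodeRuntime.runMain(Layer.launch(ConsumerLive.pipe(Layer.provide(KafkaLive))));
```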
v0.7.1
Patch Changes
- #34
936a300
Thanks @floydspace! - fix hanging stream when handler fails, closes #33
v0.7.0
v0.6.0
Minor Changes
- #29
d3c8fc0
Thanks @floydspace! - define UnknownProducerError and use it as a fallback for all errors that happen in the producer engine
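A minimal sketch of handling the new fallback error. It assumes `Producer.send` accepts a KafkaJS-style record and that `UnknownProducerError` is a tagged error whose tag is "UnknownProducerError"; both are assumptions, so treat this as illustrative only.

```ts
import { Console, Effect } from "effect";
import { Producer } from "effect-kafka";

// Assumed payload shape (KafkaJS-style); the real `Producer.send` signature may differ.
const produce = Producer.send({
  topic: "test-topic",
  messages: [{ value: "hello" }],
}).pipe(
  // Any producer-engine failure without a more specific representation now
  // surfaces as UnknownProducerError (tag name assumed), so it can be handled
  // as a typed, expected error rather than a defect.
  Effect.catchTag("UnknownProducerError", (error) =>
    Console.error("producer failed", error),
  ),
);
```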
Patch Changes
- #29
d3c8fc0
Thanks @floydspace! - Poll and retry in case a QueueFull error is raised
- #20
4c5f113
Thanks @floydspace! - implement the ability to catch errors raised in a MessageRouter handler
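One way to guard a handler, built from the APIs shown in the v0.9.0 examples above plus plain Effect combinators; it sketches the pattern this change enables, not necessarily the exact mechanism added in the PR.

```ts
import { Console, Effect, Schema } from "effect";
import { Consumer, ConsumerRecord, MessageRouter } from "effect-kafka";

const ConsumerLive = MessageRouter.empty.pipe(
  MessageRouter.subscribe(
    "customers",
    ConsumerRecord.schemaValueJson(Schema.Struct({ message: Schema.String })).pipe(
      Effect.flatMap((value) => Console.log(value)),
      // Recover from any handler failure (e.g. a schema parse error) so one
      // bad message does not take the whole consumer stream down.
      Effect.catchAll((error) => Console.error("failed to handle message", error)),
    ),
  ),
  Consumer.serve({ groupId: "group" }),
);
```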
v0.5.2
Patch Changes
- #25
21df29a
Thanks @floydspace! - fix esm distro
v0.5.1
Patch Changes
- d9a8d4d
Thanks @floydspace! - fix a bug in internal types
v0.5.0
Minor Changes
- #22
42f2df9
Thanks @floydspace! - Improve build configuration, expose sub-packages, and fix the optional peer dependencies bug, closes #21
v0.4.3
Patch Changes
- #18
64bbd83
Thanks @floydspace! - proxy more config options for rdkafka
- 7bd1b0c
Thanks @floydspace! - implement an in-memory kafka instance, closes #9
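A short sketch of running a consumer against the in-memory engine, which is handy for local runs and tests. The module path `effect-kafka/InMemoryKafka` and the `InMemoryKafkaInstance.layer` export are assumptions made for illustration; check the package for the actual names.

```ts
import { Console, Layer } from "effect";
import { Consumer, MessageRouter } from "effect-kafka";
// NOTE: hypothetical module path and export name, used only for illustration.
import { InMemoryKafkaInstance } from "effect-kafka/InMemoryKafka";

const ConsumerLive = MessageRouter.empty.pipe(
  MessageRouter.subscribe("test-topic", Console.log("got a message")),
  Consumer.serve({ groupId: "group" }),
);

// No real broker needed: the consumer layer is backed by the in-memory engine.
const TestLive = ConsumerLive.pipe(Layer.provide(InMemoryKafkaInstance.layer()));
```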
v0.4.2
Patch Changes
- 0ef48b3
Thanks @floydspace! - forward rdkafka logs, use `Logger.withMinimumLogLevel` to enable the desired log levels, closes #11
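A short sketch of making the forwarded rdkafka logs visible with the standard effect logger controls; `MainLive` is a stand-in for a consumer layer wired to the rdkafka instance (an assumption for illustration).

```ts
import { Layer, Logger, LogLevel } from "effect";

// Stand-in for a consumer layer wired to the rdkafka instance
// (see the v0.9.0 examples above for how such a layer is built).
declare const MainLive: Layer.Layer<never>;

// rdkafka logs are forwarded to the effect logger; lowering the minimum
// log level makes the debug output visible.
const main = Layer.launch(MainLive).pipe(
  Logger.withMinimumLogLevel(LogLevel.Debug),
);
```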