
Releases: floydspace/effect-kafka

v0.9.0

21 Jul 18:21
9eddcf9

Minor Changes

  • 1493848 Thanks @floydspace! - Implement methods for defining and parsing Kafka messages with Effect Schema

    How to use:

    1. To parse a whole raw Kafka message, use MessageRouter.schemaRaw together with the ConsumerSchema helper methods to define the schema of the message.
    import { Console, Effect, Schema } from "effect";
    import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka";
    
    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "test-topic",
        MessageRouter.schemaRaw(
          Schema.Struct({
            topic: Schema.Literal("test-topic"),
            partition: Schema.Number,
            offset: Schema.NumberFromString,
            key: Schema.NullOr(ConsumerSchema.Number),
            value: ConsumerSchema.String,
          }),
        ).pipe(
          Effect.flatMap(({ topic, partition, ...message }) =>
            Console.log({
              topic,
              partition,
              offset: message.offset,
              key: message.key,
              value: message.value,
            }),
          ),
        ),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    2. If you expect the Kafka message value to be a JSON string, use MessageRouter.schemaJson instead. The value property of the schema can then be defined as a Schema.Struct.
    import { Console, Effect, Schema } from "effect";
    import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka";
    
    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "test-topic",
        MessageRouter.schemaJson(
          Schema.Struct({
            topic: Schema.Literal("test-topic"),
            partition: Schema.Number,
            offset: Schema.NumberFromString,
            key: Schema.NullOr(ConsumerSchema.Number),
            value: Schema.Struct({ message: Schema.String }),
          }),
        ).pipe(
          Effect.flatMap(({ topic, partition, ...message }) =>
            Console.log({
              topic,
              partition,
              offset: message.offset,
              key: message.key,
              value: message.value.message,
            }),
          ),
        ),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    3. To parse only the raw value of a Kafka message, use ConsumerRecord.schemaValueRaw together with the ConsumerSchema helper methods to define the schema of the message value.
    import { Console, Effect } from "effect";
    import { Consumer, ConsumerRecord, ConsumerSchema, MessageRouter } from "effect-kafka";
    
    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "customers",
        ConsumerRecord.schemaValueRaw(ConsumerSchema.String).pipe(Effect.flatMap((value) => Console.log(value))),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    4. To parse only the value of a Kafka message as JSON, use ConsumerRecord.schemaValueJson. The value schema can then be defined as a Schema.Struct.
    import { Console, Effect, Schema } from "effect";
    import { Consumer, ConsumerRecord, MessageRouter } from "effect-kafka";
    
    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "customers",
        ConsumerRecord.schemaValueJson(Schema.Struct({ message: Schema.String })).pipe(
          Effect.flatMap((value) => Console.log(value)),
        ),
      ),
      Consumer.serve({ groupId: "group" }),
    );
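To make the raw versus JSON distinction concrete, the following dependency-free sketch shows the two decoding paths on a message value. It is not effect-kafka's implementation, which goes through Effect Schema. The names `RawValue`, `decodeRawString`, and `decodeJsonValue` are hypothetical and exist only for this illustration.

```typescript
// Hypothetical helpers for illustration only; they are not part of effect-kafka.
// A Kafka message value arrives as bytes (or null for a tombstone).
type RawValue = Uint8Array | null;

// "Raw" path: the bytes are decoded to a string and returned as-is.
const decodeRawString = (value: RawValue): string => {
  if (value === null) {
    throw new Error("message value is null");
  }
  return new TextDecoder().decode(value);
};

// "JSON" path: the decoded string is additionally parsed as JSON
// and checked against the expected shape { message: string }.
const decodeJsonValue = (value: RawValue): { message: string } => {
  const parsed: unknown = JSON.parse(decodeRawString(value));
  if (
    typeof parsed !== "object" ||
    parsed === null ||
    typeof (parsed as { message?: unknown }).message !== "string"
  ) {
    throw new Error("value does not match { message: string }");
  }
  return { message: (parsed as { message: string }).message };
};

// The same bytes seen through both paths.
const bytes = new TextEncoder().encode('{"message":"hello"}');
const rawResult = decodeRawString(bytes);
const jsonResult = decodeJsonValue(bytes);
```

In the raw path the handler receives the undecoded JSON text, while in the JSON path it receives a structured object, which is why the value schema can be a Schema.Struct in the schemaJson and schemaValueJson variants.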

Patch Changes

v0.8.0

03 Jul 22:04
2802852

Minor Changes

v0.7.1

07 Jun 17:12
df50027

Patch Changes

v0.7.0

21 Jan 08:37
af69d7f

Minor Changes

v0.6.0

24 Nov 02:09
65558b9

Minor Changes

  • #29 d3c8fc0 Thanks @floydspace! - Define UnknownProducerError and use it as a fallback for all errors that happen in the producer engine
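
As a rough illustration of the fallback idea, the sketch below wraps any error that is not already a recognized producer error. The class shapes, the `original` field, `KnownProducerError`, and `toProducerError` are all assumptions made for this example; the actual UnknownProducerError in effect-kafka may be defined differently.

```typescript
// Hypothetical shapes for illustration only; not effect-kafka's definitions.
class KnownProducerError extends Error {
  readonly _tag = "KnownProducerError";
}

class UnknownProducerError extends Error {
  readonly _tag = "UnknownProducerError";
  // Keeps the original failure so it can still be inspected or logged.
  readonly original: unknown;

  constructor(original: unknown) {
    super(original instanceof Error ? original.message : String(original));
    this.original = original;
  }
}

// Recognized errors pass through unchanged; everything else is wrapped.
const toProducerError = (
  error: unknown,
): KnownProducerError | UnknownProducerError => {
  const isKnown =
    typeof error === "object" &&
    error !== null &&
    (error as { _tag?: unknown })._tag === "KnownProducerError";
  return isKnown ? (error as KnownProducerError) : new UnknownProducerError(error);
};
```

With a fallback like this, callers can match on a closed set of `_tag` values instead of handling arbitrary thrown values.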

Patch Changes

v0.5.2

22 Nov 12:29
d7d0ca5

Patch Changes

v0.5.1

20 Nov 22:41
08b19d2

Patch Changes

v0.5.0

20 Nov 22:10
87ddb2f

Minor Changes

  • #22 42f2df9 Thanks @floydspace! - Improve build configuration, expose sub packages and fix optional peer dependencies bug, closes #21

v0.4.3

18 Nov 21:11
310264f

Patch Changes

v0.4.2

16 Nov 13:33
80de819

Patch Changes

  • 0ef48b3 Thanks @floydspace! - forward rdkafka logs, use Logger.withMinimumLogLevel to enable desired log levels, closes #11