Skip to content

Conversation

floydspace
Copy link
Owner

@floydspace floydspace commented Nov 17, 2024

closes #16

here is the way to define those particular options:

import { NodeRuntime } from "@effect/platform-node";
import { Effect, Logger, LogLevel } from "effect";
import { ConfluentRdKafkaInstance, Producer } from "../src";

const program = Producer.sendScoped({
  topic: "test-topic",
  messages: [{ value: "Hello, effect-kafka user!" }, { value: "How are you, effect-kafka user?" }],
}).pipe(
  Producer.withProducerOptions({
    idempotent: false,
    partitioner: "consistent_random",
    queueBuffering: {
      maxMessages: 1000000,
      maxKbytes: 1048576,
      maxMs: 100,
    },
    batching: {
      maxMessages: 100000,
      maxBytes: 4194304,
    },
    stickyPartitioning: {
      lingerMs: 25,
    },
  }),
);

const KafkaLive = ConfluentRdKafkaInstance.layer({
  "metadata.broker.list": "localhost:19092",
  // "fetch.max.bytes": 4194304, // ConsumerOptions.maxBytes
  // "max.partition.fetch.bytes": 1048576, // ConsumerOptions.maxBytesPerPartition
  // "queue.buffering.max.kbytes": 1048576, // ProducerOptions.queueBuffering.maxKbytes
  // "queue.buffering.max.messages": 1000000, // ProducerOptions.queueBuffering.maxMessages
  "message.max.bytes": 4194304,
  // "batch.size": "4194304", // ProducerOptions.batching.maxBytes
  // "batch.num.messages": "100000", // ProducerOptions.batching.maxMessages
  // "linger.ms": "100", // ProducerOptions.queueBuffering.maxMs
  // "sticky.partitioning.linger.ms": "25", // ProducerOptions.stickyPartitioning.lingerMs
  // "enable.idempotence": "false", // ProducerOptions.idempotent
  "max.in.flight.requests.per.connection": 1000000,
  // "partitioner": "consistent_random", // ProducerOptions.partitioner
});
const MainLive = Effect.scoped(program).pipe(Effect.provide(KafkaLive), Logger.withMinimumLogLevel(LogLevel.Debug));

NodeRuntime.runMain(MainLive);

Copy link

changeset-bot bot commented Nov 17, 2024

🦋 Changeset detected

Latest commit: 6e88d17

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 1 package
Name Type
effect-kafka Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@floydspace floydspace marked this pull request as draft November 17, 2024 12:49
@floydspace floydspace marked this pull request as ready for review November 18, 2024 21:07
@floydspace floydspace merged commit 64bbd83 into main Nov 18, 2024
3 checks passed
@floydspace floydspace deleted the feat/16-rdkafka-config branch November 18, 2024 21:09
@github-actions github-actions bot mentioned this pull request Nov 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add support for more configuration of the lidrdkafka producer

1 participant