-
Notifications
You must be signed in to change notification settings - Fork 74
Description
Hi, and thank you for this repo it is incredibly useful!
However, I'm having problems running your notebook. I'm not really sure what exactly the problem is (so sorry from the ramblings below) as I've tried a number of different things and getting different issues even after reverting all changes back to your exact code.
Initially, upon executing the notebook, everything apart from the consumer seemed to work. I could see messages being written to the Kafka topic in Grafana. However, the consumer complained that it didn't have the Kafka dependency available.
I found that the Kafka JAR hadn't been downloaded, as indicated in the image below of Zeppelin's interpreter page:

I couldn't even wget the JAR from inside the Zeppelin container (I believe the request timedout). However, now (after reverting my changes) it seems I can wget in the container, but Zeppelin is still unable to fetch them itself.
Putting aside the network issue, I've tried to manually add the dependency using:
%sh
mkdir /zeppelin/dep
cd /zeppelin/dep && wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8_2.11/2.0.2/spark-streaming-kafka-0-8_2.11-2.0.2.jar
%producer.dep
z.reset()
z.load("/zeppelin/dep/spark-streaming-kafka-0-8_2.11-2.0.2.jar")
%consumer.dep
z.reset()
z.load("/zeppelin/dep/spark-streaming-kafka-0-8_2.11-2.0.2.jar")
but now the producer times out when connecting to Kafka:
KafkaTimeoutErrorTraceback (most recent call last)
<ipython-input-6-49d1d1bf1849> in <module>()
20 topic=KAFKA_TOPIC,
21 key=str(row_dict["_c0"]).encode("utf-8"),
---> 22 value=json.dumps(row_dict).encode("utf-8"))
23
24 try:
/opt/conda/lib/python2.7/site-packages/kafka/producer/kafka.pyc in send(self, topic, value, key, headers, partition, timestamp_ms)
562 key_bytes = value_bytes = None
563 try:
--> 564 self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
565
566 key_bytes = self._serialize(
/opt/conda/lib/python2.7/site-packages/kafka/producer/kafka.pyc in _wait_on_metadata(self, topic, max_wait)
689 if not metadata_event.is_set():
690 raise Errors.KafkaTimeoutError(
--> 691 "Failed to update metadata after %.1f secs." % (max_wait,))
692 elif topic in self._metadata.unauthorized_topics:
693 raise Errors.TopicAuthorizationFailedError(topic)
KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
This is all on a fresh set of containers (docker-compose down && docker-compose up -d).
Any help would be greatly appreciated!