Charmed Spark K8s
Canonical | bundle
Channel     | Revision | Published
latest/edge | 4        | 06 Aug 2024
3.4/edge    | 4        | 06 Aug 2024
juju deploy spark-k8s-bundle --channel edge
How to run Apache Spark Streaming against Apache Kafka
This guide shows how to set up Apache Spark Structured Streaming to read data from Apache Kafka.
As a prerequisite, Juju must be installed, together with a Juju controller bootstrapped on a Kubernetes cloud.
First, create a fresh Juju model to be used as a workspace for spark-streaming experiments:
juju add-model spark-streaming
Deploy the Apache ZooKeeper and Apache Kafka K8s charms. A single unit of each is enough:
juju deploy zookeeper-k8s --series=jammy --channel=edge
juju deploy kafka-k8s --series=jammy --channel=edge
juju relate kafka-k8s zookeeper-k8s
Deploy a test producer application to write messages to Charmed Apache Kafka:
juju deploy kafka-test-app --series=jammy --channel=edge --config role=producer --config topic_name=spark-streaming-store --config num_messages=1000
juju relate kafka-test-app kafka-k8s
To consume these messages, Apache Spark has to connect to Apache Kafka, which requires credentials. Deploy the data-integrator charm, which retrieves them:
juju deploy data-integrator --series=jammy --channel=edge --config extra-user-roles=consumer,admin --config topic-name=spark-streaming-store
juju relate data-integrator kafka-k8s
juju run-action data-integrator/0 get-credentials --wait
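The username and password returned by the get-credentials action end up in the kafka.sasl.jaas.config option used later in the pyspark session. As a minimal sketch (the helper name scram_jaas_config is hypothetical and not part of any charm or Spark API), the SCRAM JAAS string can be assembled in Python with the values quoted; rather than assume how Kafka escapes special characters, the sketch simply rejects credentials containing double quotes or backslashes:

```python
def scram_jaas_config(username: str, password: str) -> str:
    """Build a SASL/SCRAM JAAS string for the kafka.sasl.jaas.config option.

    Hypothetical helper, for illustration only.
    """
    for value in (username, password):
        # A double quote or backslash would need escaping inside the quoted
        # JAAS value; refuse such input instead of guessing the escaping rules
        if '"' in value or "\\" in value:
            raise ValueError("credential contains a double quote or backslash")
    return (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )


if __name__ == "__main__":
    # Placeholder credentials; use the values printed by get-credentials
    print(scram_jaas_config("alice", "s3cr3t"))
```

Inside the pyspark shell, the result could be passed as .option("kafka.sasl.jaas.config", scram_jaas_config(username, password)).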
We are using the service account set up in the previous examples.
Set up the environment in a Kubernetes pod launched in the same namespace as the Juju model (spark-streaming in this example). Save the following pod specification as testpod.yaml:
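apiVersion: v1
kind: Pod
metadata:
  name: testpod
spec:
  containers:
  - image:   # image reference missing from the original; use an image that provides spark-client
    name: spark
    ports:
    - containerPort: 18080
    command: ["sleep"]
    args: ["3600"]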
Create the pod in the same namespace as the Juju model, then launch a Bash shell inside it:
kubectl apply -f ./testpod.yaml --namespace=spark-streaming
kubectl exec -it testpod -n spark-streaming -- /bin/bash
Within the test pod shell session, create a Kubernetes cluster configuration so that spark-client can work with the cluster. Then register a service account and launch a pyspark shell to read the structured stream from Apache Kafka:
cd /home/spark
mkdir .kube
cat > .kube/config << EOF
<paste the contents of your kubeconfig file here>
EOF
spark-client.service-account-registry create --username hello --namespace spark-streaming
spark-client.service-account-registry list
spark-client.pyspark --username hello --namespace spark-streaming --conf spark.executor.instances=1 --conf spark.jars.ivy=/tmp --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0
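The --packages values are Maven coordinates of the form groupId:artifactId:version, where the artifact ID ends in the Scala binary version (_2.12) and the trailing version should generally match the Spark version running in the image. A minimal sketch of assembling the same value (the helper kafka_packages is hypothetical; check the Spark and Scala versions of your image before relying on it):

```python
def kafka_packages(spark_version: str, scala_binary: str = "2.12") -> str:
    """Return a comma-separated --packages value for the Kafka connectors.

    Hypothetical helper, for illustration only.
    """
    artifacts = ["spark-streaming-kafka-0-10", "spark-sql-kafka-0-10"]
    return ",".join(
        f"org.apache.spark:{artifact}_{scala_binary}:{spark_version}"
        for artifact in artifacts
    )


if __name__ == "__main__":
    # Reproduces the --packages value used in the command above
    print(kafka_packages("3.2.0"))
```

Inside a running pyspark shell, spark.version reports the Spark version to compare against. For Structured Streaming, spark-sql-kafka-0-10 is the connector actually used; spark-streaming-kafka-0-10 is the DStream integration.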
Within the pyspark shell, use the credentials retrieved previously to read the stream from Apache Kafka:
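from json import loads
from pyspark.sql.functions import col, udf

# Replace with the username and password returned by the get-credentials action
username = "<username>"
password = "<password>"

# Subscribe to the topic written by kafka-test-app, authenticating with SASL/SCRAM
lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-k8s-0.kafka-k8s-endpoints:9092") \
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
    .option("kafka.security.protocol", "SASL_PLAINTEXT") \
    .option("kafka.sasl.jaas.config", f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";') \
    .option("subscribe", "spark-streaming-store") \
    .option("includeHeaders", "true") \
    .load()

# Each record value is a JSON document; extract its "origin" field
get_origin = udf(lambda x: loads(x)["origin"])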
# Keep the partition column so the grouping below can use it
count = lines.withColumn("origin", get_origin(col("value"))).select("origin", "partition")\
    .groupBy("origin", "partition")\