Setting up knative eventing

let's send some messages

Published December 13, 2021 #knative, #kubernetes, #eventing

Here's a walkthrough of how to set up knative eventing. I couldn't find this information in one place, and it took longer than I expected to sort out.

There are a couple of ways you can use eventing. One is to send messages directly to specific services. I walk through this below as a direct event.

Another is to use a broker and a trigger. A broker is basically the place you send stuff to, and then you define specific listeners by creating a trigger.

A third way is to use a channel and then create a subscription to that channel.

What's the difference?

Brokers/Triggers and Channels/Subscriptions are very similar. The code you use to send an event (in knative parlance, an event source) is exactly the same, and so is the code you use to receive an event (an event sink). The difference is in the deployment configuration and in which underlying service provides the event routing.

Note that the sources described in the knative documentation are the default ones. Writing your own is as simple as creating a binding, which gives your service a K_SINK environment variable holding the address to send events to, so it doesn't need to be all that complicated.
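As a rough sketch of what a hand-rolled source boils down to, the following builds a CloudEvents request in binary content mode (attributes carried as ce- headers, payload in the body) using only Ruby's standard library. The helper name build_event_request and the fallback URL are made up for illustration; the ruby-send service later in this post uses the cloud_events gem instead.

```ruby
require "json"
require "net/http"
require "securerandom"
require "uri"

# Build (but do not send) an HTTP POST carrying a CloudEvent in binary
# content mode: attributes go in ce-* headers, the payload goes in the body.
# `build_event_request` is a hypothetical helper, not part of any library.
def build_event_request(sink_url, type:, source:, data:)
  request = Net::HTTP::Post.new(URI(sink_url))
  request["ce-specversion"] = "1.0"
  request["ce-id"]          = SecureRandom.uuid
  request["ce-source"]      = source
  request["ce-type"]        = type
  request["content-type"]   = "application/json"
  request.body = JSON.generate(data)
  request
end

# K_SINK is injected by the binding; the fallback URL is only a placeholder.
sink = ENV.fetch("K_SINK", "http://localhost:8080/")
request = build_event_request(sink,
                              type:   "com.example.someevent",
                              source: "/mycontext",
                              data:   { message: "Hello" })
puts request["ce-type"]
```

Sending it would then be a matter of handing the request to Net::HTTP for the sink's host and port. Nothing here depends on whether the sink is a service, a broker, or a channel.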

Also a bit confusing: brokers are themselves sinks as well as sources.

With brokers and triggers, you can set up event type filtering on the trigger itself so that your service only sees messages of a specific type.
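To make the idea concrete, here is a small illustration of exact-match attribute filtering in the spirit of a trigger filter. It is not knative's implementation, just a sketch of the behaviour; matches_filter? is a hypothetical helper and the event hashes are trimmed-down examples.

```ruby
# Illustrative only: an event passes when every attribute named in the filter
# has exactly the expected value. `matches_filter?` is a hypothetical helper.
def matches_filter?(event_attributes, filter)
  filter.all? { |name, expected| event_attributes[name] == expected }
end

ping   = { "type" => "dev.knative.sources.ping", "source" => "/apis/v1/namespaces/default/pingsources/heartbeat-source" }
other  = { "type" => "com.example.someevent", "source" => "/mycontext" }
filter = { "type" => "dev.knative.sources.ping" }

puts matches_filter?(ping, filter)   # true
puts matches_filter?(other, filter)  # false
```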

For channels, I think you need to do the filtering on the receiving side.
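If the filtering does end up on the receiving side, it could look something like this sketch, where the subscriber simply ignores event types it doesn't care about. WANTED_TYPES, wanted? and handle are hypothetical names, and the event is assumed to already be decoded into a hash.

```ruby
# Hypothetical receiver-side filter for a channel subscriber.
WANTED_TYPES = ["dev.knative.sources.ping"].freeze

# True when the event's type is one this service wants to process.
def wanted?(event_hash, wanted_types = WANTED_TYPES)
  wanted_types.include?(event_hash["type"])
end

# Store wanted events and report what happened to the caller.
def handle(event_hash, store)
  return :ignored unless wanted?(event_hash)

  store << event_hash
  :stored
end
```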

Install knative-eventing

Since we set up the knative operator previously, let's tell it to turn on eventing.

eventing.yaml

  apiVersion: v1
  kind: Namespace
  metadata:
    name: knative-eventing
  ---
  apiVersion: operator.knative.dev/v1alpha1
  kind: KnativeEventing
  metadata:
    name: knative-eventing
    namespace: knative-eventing

And then apply:

  kubectl apply -f eventing.yaml
namespace/knative-eventing configured
knativeeventing.operator.knative.dev/knative-eventing unchanged

Check the status of the deployment

  kubectl get deployment -n knative-eventing
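If you'd rather have a script tell you when everything is up, one option is to ask kubectl for JSON and compare desired and ready replica counts. This is only a sketch: unready_deployments is a hypothetical helper, and it assumes the standard Deployment fields spec.replicas and status.readyReplicas.

```ruby
require "json"

# Given the JSON from `kubectl get deployment -n knative-eventing -o json`,
# return the names of deployments whose ready replica count is below the
# desired count. `unready_deployments` is a hypothetical helper.
def unready_deployments(kubectl_json)
  JSON.parse(kubectl_json).fetch("items", []).filter_map do |deployment|
    desired = deployment.dig("spec", "replicas") || 1
    ready   = deployment.dig("status", "readyReplicas") || 0
    deployment.dig("metadata", "name") if ready < desired
  end
end
```

Feeding it the output of the kubectl command above with -o json added should give an empty array once the install has settled.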

Deploy the test services

Create the echo service

(Code is below)

  kn service create ruby-echo --image wschenk/ruby-echo -a networking.knative.dev/disableAutoTLS=true

Create the send service

(Code is below)

  kn service create ruby-send --image wschenk/ruby-send -a networking.knative.dev/disableAutoTLS=true

Receiving events

Direct event test

Create the source

This creates an event source that sends an event every minute. We tell it to send the message directly to ruby-echo, the service that we just created.

  kn source ping create heartbeat-source \
    --schedule "*/1 * * * *" \
    --sink ksvc:ruby-echo
Ping source 'heartbeat-source' created in namespace 'default'.

View results

This is deployed on my domain; yours will be different.

  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
{"specversion"=>"1.0", "id"=>"0dd72dfa-eac9-40a1-b48b-00df271cd62e", "source"=>"/apis/v1/namespaces/default/pingsources/heartbeat-source", "type"=>"dev.knative.sources.ping", "data_encoded"=>"", "data"=>"", "time"=>"2021-12-14T18:50:00.060257784Z"}

Cleanup

  kn source ping delete heartbeat-source
Ping source 'heartbeat-source' deleted in namespace 'default'.

Broker and Trigger test

Create the Broker

  kn broker create default
Broker 'default' successfully created in namespace 'default'.

Create the source

Here we are using the ping source again, but pointing it to broker:default instead of the service.

  kn source ping create heartbeat-source \
    --schedule "*/1 * * * *" \
    --sink broker:default
Ping source 'heartbeat-source' created in namespace 'default'.

Create the trigger

Now that we are pinging the broker, we need to tell the broker to send that message type to our service, ruby-echo.

  kn trigger create heartbeat-trigger \
     --broker default \
     --filter type=dev.knative.sources.ping \
     --sink ksvc:ruby-echo
Trigger 'heartbeat-trigger' successfully created in namespace 'default'.

View results

  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
{"specversion"=>"1.0", "id"=>"1aef1313-e8ea-4b14-9d91-691f6ecbcc03", "source"=>"/apis/v1/namespaces/default/pingsources/heartbeat-source", "type"=>"dev.knative.sources.ping", "data_encoded"=>"", "data"=>"", "time"=>"2021-12-14T18:57:00.319157126Z", "knativearrivaltime"=>"2021-12-14T18:57:00.319736536Z"}
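The broker output above carries an extra knativearrivaltime attribute alongside time. As a small aside, the gap between the two can be computed like this; arrival_delay_ns is a hypothetical helper, and the values are the ones from the output above.

```ruby
require "time"

# Nanoseconds between the event's `time` and its `knativearrivaltime`.
# Rationals are used so the nanosecond digits are not lost to float rounding.
def arrival_delay_ns(event_hash)
  sent    = Time.iso8601(event_hash.fetch("time"))
  arrived = Time.iso8601(event_hash.fetch("knativearrivaltime"))
  ((arrived.to_r - sent.to_r) * 1_000_000_000).to_i
end

event = {
  "time"               => "2021-12-14T18:57:00.319157126Z",
  "knativearrivaltime" => "2021-12-14T18:57:00.319736536Z"
}
puts arrival_delay_ns(event)  # 579410
```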

Cleanup

  kn broker delete default
  kn source ping delete heartbeat-source
  kn trigger delete heartbeat-trigger
Broker 'default' successfully deleted in namespace 'default'.
Ping source 'heartbeat-source' deleted in namespace 'default'.
Trigger 'heartbeat-trigger' deleted in namespace 'default'.

Channel test

Channel

Simple.

  kn channel create heartbeat
Channel 'heartbeat' created in namespace 'default'.

Ping the heartbeat channel

Here the ping source sends to channel:heartbeat.

  kn source ping create heartbeat-source \
    --schedule "*/1 * * * *" \
    --sink channel:heartbeat
Ping source 'heartbeat-source' created in namespace 'default'.

Subscribe to the event

Here we create the subscription, which goes to our service.

  kn subscription create heartbeat-sub \
     --channel heartbeat \
     --sink ruby-echo
Subscription 'heartbeat-sub' created in namespace 'default'.

See the results

  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
{"specversion"=>"1.0", "id"=>"88171ffc-3008-4df4-98cd-c157ef5a1aea", "source"=>"/apis/v1/namespaces/default/pingsources/heartbeat-source", "type"=>"dev.knative.sources.ping", "data_encoded"=>"", "data"=>"", "time"=>"2021-12-14T18:59:00.082372715Z"}

Cleanup

  kn channel delete heartbeat
  kn source ping delete heartbeat-source
  kn subscription delete heartbeat-sub
Channel 'heartbeat' deleted in namespace 'default'.
Ping source 'heartbeat-source' deleted in namespace 'default'.
Subscription 'heartbeat-sub' deleted in namespace 'default'.

Sending events

Now we can look at creating and sending events. The receiving side is the same, but we need to create a service that will send a message. In each scenario this will be the same code, but depending upon how we deploy it, it will have a different K_SINK value in the environment.

Direct binding

Create direct binding

  kn source binding create direct-binding \
     --subject Service:serving.knative.dev/v1:ruby-send \
     --sink ruby-echo
Sink binding 'direct-binding' created in namespace 'default'.

Test

  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
  curl http://ruby-send.default.gitgratitude.com?message=Hello
sent
  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
{"specversion"=>"1.0", "id"=>"1234-1234-1234", "source"=>"/mycontext", "type"=>"com.example.someevent", "data_encoded"=>"{\"message\":\"Hello\"}", "data"=>{"message"=>"Hello"}, "datacontenttype"=>"application/json"}
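The curl test works for a one-word message, but anything with spaces or punctuation needs encoding in the query string. A minimal sketch of building that URL in Ruby follows; send_url is a hypothetical helper and the host is a placeholder for your own domain.

```ruby
require "uri"

# Build the ruby-send test URL with the message safely form-encoded.
# `send_url` is a hypothetical helper; the host below is a placeholder.
def send_url(base_url, message)
  uri = URI(base_url)
  uri.query = URI.encode_www_form(message: message)
  uri.to_s
end

puts send_url("http://ruby-send.default.example.com/", "Hello world")
```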

Clean up

  kn source binding delete direct-binding

Through a broker

Create a broker

  kn broker create default

Create the trigger

  kn trigger create send-trigger \
     --broker default \
     --filter type=com.example.someevent \
     --sink ksvc:ruby-echo
Trigger 'send-trigger' successfully created in namespace 'default'.

Create the binding

  kn source binding create broker-binding \
     --subject Service:serving.knative.dev/v1:ruby-send \
     --sink broker:default
Sink binding 'broker-binding' created in namespace 'default'.

Test

  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
  curl http://ruby-send.default.gitgratitude.com?message=Hello
sent
  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
{"specversion"=>"1.0", "id"=>"1234-1234-1234", "source"=>"/mycontext", "type"=>"com.example.someevent", "data_encoded"=>"{\"message\":\"Hello\"}", "data"=>{"message"=>"Hello"}, "datacontenttype"=>"application/json", "knativearrivaltime"=>"2021-12-14T23:44:09.238452314Z"}

Cleanup

  kn trigger delete send-trigger
  kn source binding delete broker-binding
Trigger 'send-trigger' deleted in namespace 'default'.
Sink binding 'broker-binding' deleted in namespace 'default'.

On a channel

Create channel

Simple.

  kn channel create chatter

Create source binding to the channel

  kn source binding create chat-binding \
     --subject Service:serving.knative.dev/v1:ruby-send \
     --sink channel:chatter

Subscribe to the event

Here we create the subscription, which goes to our service.

  kn subscription create message-sub \
     --channel chatter \
     --sink ruby-echo
Subscription 'message-sub' created in namespace 'default'.

Test

  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
  curl http://ruby-send.default.gitgratitude.com?message=Hello
sent
  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
{"specversion"=>"1.0", "id"=>"1234-1234-1234", "source"=>"/mycontext", "type"=>"com.example.someevent", "data_encoded"=>"{\"message\":\"Hello\"}", "data"=>{"message"=>"Hello"}, "datacontenttype"=>"application/json"}

Cleanup

  kn channel delete chatter
  kn source binding delete chat-binding
  kn subscription delete message-sub

Code

ruby-echo

  require 'sinatra'
  require "cloud_events"

  class EventHolder
    def self.add_event( e )
      @events ||= []
      @events << e.to_h
    end

    def self.eventstring
      @events ||= []
      @events.join( "\n" )
    end
  end

  get '/' do
    "Hello from the ruby echo service\n#{EventHolder.eventstring}\n"
  end

  cloud_events_http = CloudEvents::HttpBinding.default

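  post "/" do
    # Decode the incoming CloudEvent from the Rack environment
    event = cloud_events_http.decode_event request.env
    logger.info "Received CloudEvent: #{event.to_h}"
    EventHolder.add_event( event )
    # Acknowledge with an empty 2xx instead of returning the events array as the body
    204
  end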
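For a feel of what the decode step is working with, here is a simplified sketch that pulls the CloudEvents attributes out of a Rack env by hand, assuming the event arrived in binary content mode (attributes as ce- headers). ce_attributes is a hypothetical helper and not part of the cloud_events gem, which handles more cases than this.

```ruby
# Pull CloudEvents attributes out of a Rack env for a binary-mode request.
# Rack exposes the header `ce-type` as `HTTP_CE_TYPE`, and so on.
# `ce_attributes` is a hypothetical helper, not part of the cloud_events gem.
def ce_attributes(rack_env)
  rack_env.each_with_object({}) do |(key, value), attributes|
    next unless key.start_with?("HTTP_CE_")

    attributes[key.delete_prefix("HTTP_CE_").downcase] = value
  end
end

# A trimmed-down env resembling a ping event; the values are examples.
env = {
  "REQUEST_METHOD"      => "POST",
  "HTTP_CE_SPECVERSION" => "1.0",
  "HTTP_CE_TYPE"        => "dev.knative.sources.ping",
  "HTTP_CE_ID"          => "0dd72dfa-eac9-40a1-b48b-00df271cd62e"
}
p ce_attributes(env)
```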

ruby-send

  require 'sinatra'
  require "cloud_events"
  require "net/http"
  require "uri"

  def send_message( data )
    event = CloudEvents::Event.create spec_version:      "1.0",
                                      id:                "1234-1234-1234",
                                      source:            "/mycontext",
                                      type:              "com.example.someevent",
                                      data_content_type: "application/json",
                                      data:              data

    cloud_events_http = CloudEvents::HttpBinding.default
    headers, body = cloud_events_http.encode_event event
    Net::HTTP.post URI(ENV['K_SINK']), body, headers
  end

  get '/' do
    if params[:message]
      if ENV['K_SINK']
        send_message( {message: params[:message] })
        return "sent"
      else
        return "K_SINK not defined"
      end
    else
      return 'Try passing in a message'
    end
  end


  get '/keys' do
    ENV.keys.collect { |key| "#{key}=#{ENV[key]}" }.join( "\n" )
  end
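One thing to note about ruby-send is that every event goes out with the same id, "1234-1234-1234". CloudEvents expects the combination of source and id to be unique per event, so a real service would probably generate one each time. A minimal sketch, where fresh_event_attributes is a hypothetical helper whose keys mirror the keyword arguments used in send_message above:

```ruby
require "securerandom"

# Per-event attributes with a freshly generated id, as an alternative to a
# fixed id. `fresh_event_attributes` is a hypothetical helper.
def fresh_event_attributes(type:, source:)
  {
    spec_version: "1.0",
    id:           SecureRandom.uuid,
    source:       source,
    type:         type
  }
end

first  = fresh_event_attributes(type: "com.example.someevent", source: "/mycontext")
second = fresh_event_attributes(type: "com.example.someevent", source: "/mycontext")
puts first[:id] != second[:id]  # true
```

The resulting hash could be splatted into the CloudEvents::Event.create call along with data_content_type and data.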

References

  1. https://knative.dev/docs/eventing/sinks/

  2. https://knative.dev/docs/eventing/sources/ping-source/

  3. https://opensource.com/article/21/2/knative-eventing
