Here's a walkthrough of how to set up Knative eventing. I couldn't find this information in one place, and it took longer than I expected to sort out.

There are a couple of ways you can use eventing. One is to send messages directly to specific services. I walk through this below as a direct event.

Another is to use a broker and a trigger. A broker is basically the place you send stuff to, and then you define specific listeners by creating a trigger.

A third way is to use a channel and then create a subscription to that channel.

What's the difference?

Brokers/Triggers and Channels/Subscriptions are very similar. The code you use to send an event (in Knative parlance, an event source) is exactly the same, and the code you use to receive an event (an event sink) is exactly the same. The difference is in the deployment configuration and in the underlying service that provides the event routing.

Note that the sources described in the Knative documentation are the built-in ones, but writing your own is as simple as creating a binding, which gives your service a K_SINK environment variable holding the address to send events to. It doesn't need to be all that complicated.
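
As a rough sketch of what a home-made source has to do with that variable, the helper below reads K_SINK and turns it into a URI, failing loudly when no binding is in place. The helper name sink_uri, the error messages, and the example address are my own inventions; only K_SINK itself comes from Knative.

```ruby
require "uri"

# Hypothetical helper: resolve the sink address that a binding injects.
# `env` defaults to the process environment, but any Hash works too,
# which makes the function easy to try out without a cluster.
def sink_uri(env = ENV)
  raw = env["K_SINK"]
  raise ArgumentError, "K_SINK is not set; is a binding in place?" if raw.nil? || raw.empty?

  uri = URI.parse(raw)
  # URI::HTTPS is a subclass of URI::HTTP, so this accepts both schemes.
  raise ArgumentError, "K_SINK is not an http(s) URL: #{raw}" unless uri.is_a?(URI::HTTP)

  uri
end

# Example with a made-up sink address:
puts sink_uri({ "K_SINK" => "http://sink.example.test/events" }).host
```

Everything after this point is just an HTTP POST to that URI, which is why the same service can be pointed at a service, a broker, or a channel without code changes.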

Also a bit confusing: brokers are themselves sinks as well as sources.

With brokers and triggers, you can set up event type filtering on the trigger itself so that your service only sees messages of a specific type.
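
To make the filtering idea concrete, here is a small sketch of exact-match attribute filtering, which is my understanding of what a trigger filter does. It is an illustration of the semantics, not Knative's code, and the method name trigger_matches? is made up.

```ruby
# Sketch of exact-match filtering: every attribute named in the filter
# must be present on the event with an equal value. An empty filter
# therefore matches every event.
def trigger_matches?(filter, event)
  filter.all? { |attribute, expected| event[attribute] == expected }
end

ping = {
  "type"   => "dev.knative.sources.ping",
  "source" => "/apis/v1/namespaces/default/pingsources/heartbeat-source"
}

puts trigger_matches?({ "type" => "dev.knative.sources.ping" }, ping) # true
puts trigger_matches?({ "type" => "com.example.someevent" }, ping)    # false
```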

For channels, I think you need to do the filtering on the receiving side.
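
If the filtering does end up in the receiver, it could look something like the sketch below: look up a handler by event type and ignore everything else. The dispatch method, the handlers table, and the :ignored return value are all hypothetical names for illustration.

```ruby
# Hypothetical receiving-side filter. `handlers` maps an event type to a
# callable; events with no registered handler are dropped.
def dispatch(event, handlers)
  handler = handlers[event["type"]]
  return :ignored if handler.nil?

  handler.call(event)
end

handlers = {
  "com.example.someevent" => ->(event) { "got message: #{event.dig('data', 'message')}" }
}

puts dispatch({ "type" => "com.example.someevent", "data" => { "message" => "Hello" } }, handlers)
puts dispatch({ "type" => "dev.knative.sources.ping" }, handlers).inspect
```

The receiver still has to answer every delivery with a success status, even for events it ignores, otherwise the sender may treat them as failed.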

Install knative-eventing

Since we've set up the Knative operator previously, let's tell it to turn on eventing.

eventing.yaml

  apiVersion: v1
  kind: Namespace
  metadata:
    name: knative-eventing
  ---
  apiVersion: operator.knative.dev/v1alpha1
  kind: KnativeEventing
  metadata:
    name: knative-eventing
    namespace: knative-eventing

And then apply:

  kubectl apply -f eventing.yaml
namespace/knative-eventing configured
knativeeventing.operator.knative.dev/knative-eventing unchanged

Check the status of the deployment

  kubectl get deployment -n knative-eventing
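
If you'd rather script that check than eyeball it, here's a minimal sketch. It assumes you add -o json to the command above and pass the output in as a string. The helper name unready_deployments and the sample data are mine, made up for illustration.

```ruby
require "json"

# Given the JSON printed by
#   kubectl get deployment -n knative-eventing -o json
# return the names of deployments that have fewer ready replicas than desired.
def unready_deployments(kubectl_json)
  JSON.parse(kubectl_json).fetch("items", []).filter_map do |item|
    desired = item.dig("spec", "replicas") || 1
    ready   = item.dig("status", "readyReplicas") || 0 # the field is absent when nothing is ready
    item.dig("metadata", "name") if ready < desired
  end
end

# Made-up sample, trimmed to the fields the helper reads.
sample = JSON.generate({
  "items" => [
    { "metadata" => { "name" => "eventing-controller" }, "spec" => { "replicas" => 1 }, "status" => { "readyReplicas" => 1 } },
    { "metadata" => { "name" => "eventing-webhook" },    "spec" => { "replicas" => 1 }, "status" => {} }
  ]
})

puts unready_deployments(sample).inspect # ["eventing-webhook"]
```

An empty result means everything the operator created has come up.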

Deploy the test services

Create the echo service

(Code is below)

  kn service create ruby-echo --image wschenk/ruby-echo -a networking.knative.dev/disableAutoTLS=true

Create the send service

(Code is below)

  kn service create ruby-send --image wschenk/ruby-send -a networking.knative.dev/disableAutoTLS=true

Receiving events

Direct event test

Create the source

This creates an event source that sends an event every minute. We tell it to send the message directly to ruby-echo, the service we just created.

  kn source ping create heartbeat-source \
    --schedule "*/1 * * * *" \
    --sink ksvc:ruby-echo
Ping source 'heartbeat-source' created in namespace 'default'.

View results

This is deployed on my domain; yours will be different.

curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
{"specversion"=>"1.0", "id"=>"0dd72dfa-eac9-40a1-b48b-00df271cd62e", "source"=>"/apis/v1/namespaces/default/pingsources/heartbeat-source", "type"=>"dev.knative.sources.ping", "data_encoded"=>"", "data"=>"", "time"=>"2021-12-14T18:50:00.060257784Z"}

Cleanup

  kn source ping delete heartbeat-source
Ping source 'heartbeat-source' deleted in namespace 'default'.

Broker and Trigger test

Create the Broker

kn broker create default
Broker 'default' successfully created in namespace 'default'.

Create the source

Here we are using the ping source again, but pointing it to broker:default instead of the service.

  kn source ping create heartbeat-source \
    --schedule "*/1 * * * *" \
    --sink broker:default
Ping source 'heartbeat-source' created in namespace 'default'.

Create the trigger

Now that we are pinging the broker, we need to tell the broker to send that message type to our service, ruby-echo.

  kn trigger create heartbeat-trigger \
     --broker default \
     --filter type=dev.knative.sources.ping \
     --sink ksvc:ruby-echo
Trigger 'heartbeat-trigger' successfully created in namespace 'default'.

View results

  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
{"specversion"=>"1.0", "id"=>"1aef1313-e8ea-4b14-9d91-691f6ecbcc03", "source"=>"/apis/v1/namespaces/default/pingsources/heartbeat-source", "type"=>"dev.knative.sources.ping", "data_encoded"=>"", "data"=>"", "time"=>"2021-12-14T18:57:00.319157126Z", "knativearrivaltime"=>"2021-12-14T18:57:00.319736536Z"}

Cleanup

  kn broker delete default
  kn source ping delete heartbeat-source
  kn trigger delete heartbeat-trigger
Broker 'default' successfully deleted in namespace 'default'.
Ping source 'heartbeat-source' deleted in namespace 'default'.
Trigger 'heartbeat-trigger' deleted in namespace 'default'.

Channel test

Channel

Simple.

  kn channel create heartbeat
Channel 'heartbeat' created in namespace 'default'.

Ping the heartbeat channel

This time the ping source sends to channel:heartbeat.

  kn source ping create heartbeat-source \
    --schedule "*/1 * * * *" \
    --sink channel:heartbeat
Ping source 'heartbeat-source' created in namespace 'default'.

Subscribe to the event

Here we create the subscription, which goes to our service.

  kn subscription create heartbeat-sub \
     --channel heartbeat \
     --sink ruby-echo
Subscription 'heartbeat-sub' created in namespace 'default'.

See the results

  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
{"specversion"=>"1.0", "id"=>"88171ffc-3008-4df4-98cd-c157ef5a1aea", "source"=>"/apis/v1/namespaces/default/pingsources/heartbeat-source", "type"=>"dev.knative.sources.ping", "data_encoded"=>"", "data"=>"", "time"=>"2021-12-14T18:59:00.082372715Z"}

Cleanup

  kn channel delete heartbeat
  kn source ping delete heartbeat-source
  kn subscription delete heartbeat-sub
Channel 'heartbeat' deleted in namespace 'default'.
Ping source 'heartbeat-source' deleted in namespace 'default'.
Subscription 'heartbeat-sub' deleted in namespace 'default'.

Sending events

Now we can look at creating and sending events. The receiving side is the same, but we need to create a service that will send a message. In each scenario this will be the same code, but depending upon how we deploy it, it will have a different K_SINK value in the environment.

Direct binding

Create direct binding

  kn source binding create direct-binding \
     --subject Service:serving.knative.dev/v1:ruby-send \
     --sink ruby-echo
Sink binding 'direct-binding' created in namespace 'default'.

Test

  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
  curl http://ruby-send.default.gitgratitude.com?message=Hello
sent
  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
{"specversion"=>"1.0", "id"=>"1234-1234-1234", "source"=>"/mycontext", "type"=>"com.example.someevent", "data_encoded"=>"{\"message\":\"Hello\"}", "data"=>{"message"=>"Hello"}, "datacontenttype"=>"application/json"}

Clean up

  kn source binding delete direct-binding

Through a broker

Create a broker

  kn broker create default

Create the trigger

  kn trigger create send-trigger \
     --broker default \
     --filter type=com.example.someevent \
     --sink ksvc:ruby-echo
Trigger 'send-trigger' successfully created in namespace 'default'.

Create the binding

  kn source binding create broker-binding \
     --subject Service:serving.knative.dev/v1:ruby-send \
     --sink broker:default
Sink binding 'broker-binding' created in namespace 'default'.

Test

  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
  curl http://ruby-send.default.gitgratitude.com?message=Hello
sent
  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
{"specversion"=>"1.0", "id"=>"1234-1234-1234", "source"=>"/mycontext", "type"=>"com.example.someevent", "data_encoded"=>"{\"message\":\"Hello\"}", "data"=>{"message"=>"Hello"}, "datacontenttype"=>"application/json", "knativearrivaltime"=>"2021-12-14T23:44:09.238452314Z"}

Cleanup

  kn trigger delete send-trigger
  kn source binding delete broker-binding
Trigger 'send-trigger' deleted in namespace 'default'.
Sink binding 'broker-binding' deleted in namespace 'default'.

On a channel

Create channel

Simple.

  kn channel create chatter

Create source binding to the channel

  kn source binding create chat-binding \
     --subject Service:serving.knative.dev/v1:ruby-send \
     --sink channel:chatter

Subscribe to the event

Here we create the subscription, which goes to our service.

  kn subscription create message-sub \
     --channel chatter \
     --sink ruby-echo
Subscription 'message-sub' created in namespace 'default'.

Test

  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
  curl http://ruby-send.default.gitgratitude.com?message=Hello
sent
  curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
{"specversion"=>"1.0", "id"=>"1234-1234-1234", "source"=>"/mycontext", "type"=>"com.example.someevent", "data_encoded"=>"{\"message\":\"Hello\"}", "data"=>{"message"=>"Hello"}, "datacontenttype"=>"application/json"}

Cleanup

  kn channel delete chatter
  kn source binding delete chat-binding
  kn subscription delete message-sub

Code

ruby-echo

  require 'sinatra'
  require "cloud_events"

  # Keeps every received event in memory so that GET / can show them.
  class EventHolder
    def self.add_event( e )
      @events ||= []
      @events << e.to_h
    end

    def self.eventstring
      @events ||= []
      @events.join( "\n" )
    end
  end

  get '/' do
    "Hello from the ruby echo service\n#{EventHolder.eventstring}\n"
  end

  cloud_events_http = CloudEvents::HttpBinding.default

  # Events are delivered to the sink as HTTP POSTs.
  post "/" do
    event = cloud_events_http.decode_event request.env
    logger.info "Received CloudEvent: #{event.to_h}"
    EventHolder.add_event( event )
    # Reply with an empty success response rather than
    # returning the events array as the response body.
    status 204
  end
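
To take some of the mystery out of decode_event, here is a rough sketch of the simplest case: a binary-mode request, where each CloudEvent attribute travels as a ce-* HTTP header. This is not the gem's implementation (which also handles structured mode, validation, and decoding the data); the method name binary_event_attributes is made up, and it relies on Rack exposing a header like ce-type under the env key HTTP_CE_TYPE.

```ruby
# Pull the CloudEvent attributes out of a Rack env for a binary-mode
# request. Keys that do not start with HTTP_CE_ are ignored.
def binary_event_attributes(env)
  env.each_with_object({}) do |(key, value), attrs|
    name = key.to_s
    next unless name.start_with?("HTTP_CE_")

    attrs[name.delete_prefix("HTTP_CE_").downcase] = value
  end
end

# A made-up env with just the entries that matter here.
env = {
  "REQUEST_METHOD"      => "POST",
  "HTTP_CE_SPECVERSION" => "1.0",
  "HTTP_CE_TYPE"        => "dev.knative.sources.ping",
  "HTTP_CE_ID"          => "0dd72dfa-eac9-40a1-b48b-00df271cd62e"
}

puts binary_event_attributes(env).inspect
```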

ruby-send

  require 'sinatra'
  require "cloud_events"
  require "net/http"
  require "uri"

  # Wrap the data in a CloudEvent and POST it to whatever K_SINK points at.
  def send_message( data )
    # The id is hard-coded to keep the example short;
    # real events should carry a unique id.
    event = CloudEvents::Event.create spec_version:      "1.0",
                                      id:                "1234-1234-1234",
                                      source:            "/mycontext",
                                      type:              "com.example.someevent",
                                      data_content_type: "application/json",
                                      data:              data

    cloud_events_http = CloudEvents::HttpBinding.default
    headers, body = cloud_events_http.encode_event event
    Net::HTTP.post URI(ENV['K_SINK']), body, headers
  end

  # GET /?message=Hello sends the message on as an event.
  get '/' do
    if params[:message]
      if ENV['K_SINK']
        send_message( {message: params[:message] })
        return "sent"
      else
        return "K_SINK not defined"
      end
    else
      return 'Try passing in a message'
    end
  end


  # Debugging aid: dump the environment to see what the binding injected.
  get '/keys' do
    ENV.keys.collect { |key| "#{key}=#{ENV[key]}" }.join( "\n" )
  end
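
For comparison, here is a sketch of the sending side using only the standard library, building a binary-mode request by hand and generating a fresh id for each event. It is an alternative illustration, not what the cloud_events gem does internally. The method name binary_event_request is made up, and the type and source defaults are simply copied from ruby-send.

```ruby
require "json"
require "securerandom"

# Build the headers and body for a binary-mode CloudEvent POST.
# In binary mode the attributes go in ce-* headers and the body is
# just the data, here serialized as JSON.
def binary_event_request(data, type: "com.example.someevent", source: "/mycontext")
  headers = {
    "ce-specversion" => "1.0",
    "ce-id"          => SecureRandom.uuid, # unique per event
    "ce-source"      => source,
    "ce-type"        => type,
    "content-type"   => "application/json"
  }
  [headers, JSON.generate(data)]
end

headers, body = binary_event_request({ message: "Hello" })
puts body # {"message":"Hello"}

# The actual send would then look the same as in ruby-send:
#   Net::HTTP.post URI(ENV["K_SINK"]), body, headers
```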

Previously

NextJS with Prisma on Kubernetes deploy as a knative service

2021-12-08

Next

Receiving CloudEvents with NextJS One file to rule them all

2021-12-15

howto
