
Introduction

In some instances, the results of a pipeline may need to be posted to an external web service. For example, you could have a processing pipeline that sends messages to Slack through its REST endpoint, or you may want to send notifications to a third-party website. This sink sends the messages from the pipeline to an external HTTP endpoint.

Use case(s)

  • I would like to post a notification to Slack every time a user of my website sees a 500 error. I would like to set up a real-time Spark Streaming pipeline that reads my weblog data, filters for messages with a 500 error, and posts a custom message to Slack with details from the message, such as the URL.
  • I am using a third-party reporting tool to update metrics in a dashboard. I would like to create a real-time pipeline that generates those metrics and posts them to the third-party reporting service. I would like to use a real-time Spark Streaming pipeline, configure windows for aggregations, and then send the aggregated stats to the third party using this HTTP Sink.

User Stories

  • As a pipeline developer, I would like to post data to an external web service by providing the request method (GET, POST, PUT, DELETE), URL, payload (if POST or PUT), request headers, and timeouts.
  • As a pipeline developer, I would like to be able to define a custom POST payload using fields from the message.
  • As a pipeline developer, I would like to batch my updates if required, so that the plugin posts to the external service only after n messages have been collected.
  • As a pipeline developer, I would like the plugin to retry a configurable number of times before failing the pipeline.
  • As a pipeline developer, I would like to be able to send basic auth credentials by providing a username and password in the config.
  • As a pipeline developer, I would like to be able to send to HTTP and HTTPS endpoints.

Plugin Type

  • BatchSink

Configurables

This section defines properties that are configurable for this plugin. 

| User Facing Name | Type | Description | Constraints | Macro Enabled? |
|---|---|---|---|---|
| URL | String | Required. The URL to post data to. | | Yes |
| Request Method | Select | The HTTP request method. | GET, POST, PUT, DELETE | |
| Batch Size | String | The number of messages to batch before sending. | > 0, default 1 (no batching) | Yes |
| Format | Select | The format to send the message in. JSON formats the entire input record as JSON and sends it as the payload. Form converts the input message to a query string and sends it as the payload. Custom sends the contents of the Request Body field. | JSON, Form, Custom | |
| Request Body | String | Optional request body. Only required if the Custom format is specified. | | Yes |
| Content Type | String | Used to specify the Content-Type header. | | Yes |
| Request Headers | KeyValue | An optional string of header values to send in each request, where the keys and values are delimited by a colon (":") and each pair is delimited by a newline ("\n"). | | Yes |
| Should Follow Redirects? | Select | Whether to automatically follow redirects. Defaults to true. | true, false | |
| Number of Retries | Select | The number of times the request should be retried if it fails. Defaults to 3. | 0 to 10 | |
| Connect Timeout | String | The time in milliseconds to wait for a connection. Set to 0 to wait indefinitely. Defaults to 60000 (1 minute). | | |
| Read Timeout | String | The time in milliseconds to wait for a read. Set to 0 to wait indefinitely. Defaults to 60000 (1 minute). | | |
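The Request Headers format above can be parsed in a few lines. The Java sketch below is illustrative only: `RequestHeaders` and `parse` are hypothetical names, it assumes the property value contains real newline characters (not a literal backslash followed by "n"), and it splits each line on the first colon only so that header values containing colons survive intact.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: parses the "Request Headers" property, where each
// header is on its own line and the key and value are separated by a colon.
final class RequestHeaders {
  private RequestHeaders() {}

  static Map<String, String> parse(String raw) {
    Map<String, String> headers = new LinkedHashMap<>(); // keeps the user's order
    if (raw == null || raw.isEmpty()) {
      return headers;
    }
    for (String line : raw.split("\n")) {
      if (line.trim().isEmpty()) {
        continue; // tolerate blank lines between pairs
      }
      // Split on the first colon only, so values such as "abc:123" keep their colons.
      int idx = line.indexOf(':');
      if (idx <= 0) {
        throw new IllegalArgumentException("Invalid header (expected key:value): " + line);
      }
      headers.put(line.substring(0, idx).trim(), line.substring(idx + 1).trim());
    }
    return headers;
  }
}
```

Failing fast on a malformed line, rather than silently skipping it, is a design choice of this sketch; it surfaces configuration mistakes at pipeline deployment instead of at request time.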

Design / Implementation Tips

  • Use the HTTPPoller and HTTPCallback Hydrator plugins as a reference.
  • If the user selects JSON, the Content-Type header should be set to application/json. If the user selects Form, it should be set to application/x-www-form-urlencoded.
  • When formatting the message as a query string, don't forget to URL-encode the values.
  • We will need to define a macro syntax so that users can reference message fields in their POST payload. For example, I might define my payload as { "messageType" : "update", "name" : "%{firstName}" }, where %{firstName} is replaced with the value of the firstName field in the incoming message.
  • For batching, messages will be separated by a newline (\n) character.
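A minimal Java sketch of the Form and Custom formatting tips above, assuming the input record is available as a `Map<String, Object>` (a stand-in for CDAP's `StructuredRecord`, used here only to keep the example self-contained). `MessageFormatter`, `toForm`, and `substitute` are hypothetical names, and the `%{field}` syntax is the one proposed in these tips, not a finalized macro language.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical formatter for the Form and Custom message formats.
final class MessageFormatter {
  // Matches %{fieldName}; the field name is captured in group 1.
  private static final Pattern FIELD_REF = Pattern.compile("%\\{([^}]+)\\}");

  private MessageFormatter() {}

  // Form format: field1=value1&field2=value2, with keys and values URL-encoded.
  static String toForm(Map<String, Object> record) {
    StringJoiner joiner = new StringJoiner("&");
    for (Map.Entry<String, Object> e : record.entrySet()) {
      String value = e.getValue() == null ? "" : e.getValue().toString();
      joiner.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
          + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8));
    }
    return joiner.toString();
  }

  // Custom format: replaces each %{field} with that field's value from the record.
  static String substitute(String template, Map<String, Object> record) {
    Matcher m = FIELD_REF.matcher(template);
    StringBuilder out = new StringBuilder();
    while (m.find()) {
      Object value = record.get(m.group(1));
      // Unknown fields become an empty string; quoteReplacement protects '$' and '\'.
      m.appendReplacement(out, Matcher.quoteReplacement(value == null ? "" : value.toString()));
    }
    m.appendTail(out);
    return out.toString();
  }
}
```

For example, a record with `name = "Jane Doe"` and `url = "/a?b=c&d"` becomes `name=Jane+Doe&url=%2Fa%3Fb%3Dc%26d`. Note that `substitute` inserts values verbatim; for a JSON payload, a real implementation would also need to JSON-escape them. The `URLEncoder.encode(String, Charset)` overload requires Java 10 or later.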

Design

{
  "name": "HTTP Sink",
  "plugin": {
    "name": "HTTP",
    "type": "batchsink",
    "label": "HTTP Sink",
    "artifact": {
      "name": "http-sink-plugin",
      "version": "1.6.0",
      "scope": "SYSTEM"
    },
    "properties": {
      "referenceName": "HTTP Sink Plugin",
      "url": "http://example.com/data",
      "method": "POST",
      "batchSize": "1",
      "messageFormat": "JSON",
      "body": "{\"text\" : \"Hello Slack\"}",
      "delimiterForMessages": "\n",
      "charset": "UTF-8",
      "followRedirects": "true",
      "disableSSLValidation": "true",
      "numRetries": "3",
      "connectTimeout": "60000",
      "readTimeout": "60000",
      "failOnNon200Response": "true"
    }
  }
}
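Most request-level properties in this configuration map directly onto `java.net.HttpURLConnection`. The sketch below assumes the plugin uses `HttpURLConnection` (the reference plugins may use a different client), and `ConnectionFactory.open` is a hypothetical helper. It only configures the connection; nothing is sent until the caller writes the body or reads the response. `disableSSLValidation` and `charset` are not handled here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.Map;

// Hypothetical factory that applies the sink properties to a connection.
final class ConnectionFactory {
  private ConnectionFactory() {}

  static HttpURLConnection open(String url, String method, int connectTimeoutMs,
                                int readTimeoutMs, boolean followRedirects,
                                Map<String, String> headers) {
    HttpURLConnection conn;
    try {
      // openConnection() creates the connection object but does not contact the server.
      conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
      conn.setRequestMethod(method); // GET, POST, PUT or DELETE
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    conn.setConnectTimeout(connectTimeoutMs); // 0 waits indefinitely
    conn.setReadTimeout(readTimeoutMs);       // 0 waits indefinitely
    conn.setInstanceFollowRedirects(followRedirects);
    for (Map.Entry<String, String> h : headers.entrySet()) {
      conn.setRequestProperty(h.getKey(), h.getValue());
    }
    // Only POST and PUT carry a payload in this design.
    conn.setDoOutput("POST".equals(method) || "PUT".equals(method));
    return conn;
  }
}
```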

 

Approach(es)

1. JSON is the default message format.

2. If batchSize > 1, delimiterForMessages is used to join the messages into a batch payload.

3. If the request headers do not contain "Content-Type", its value is set according to the configured message format.

4. If the user wants a Custom message that uses some of the input record fields as variables, the user prefixes each variable in the message with #, and it is replaced by the corresponding value from the input record.

5. All messages built for a batch are sent to the HTTP endpoint as a single payload, in one HTTP request per batch.
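Approaches 2, 3, and 5 can be sketched together: messages are buffered until batchSize is reached, joined with delimiterForMessages, and emitted as one payload, while an explicit Content-Type header takes precedence over the format-derived default. `MessageBatcher` is a hypothetical class, and the `text/plain` fallback for the Custom format is an assumption, since the design does not name one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper that buffers formatted messages into batches.
final class MessageBatcher {
  private final int batchSize;
  private final String delimiter;
  private final List<String> pending = new ArrayList<>();

  MessageBatcher(int batchSize, String delimiter) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be > 0");
    }
    this.batchSize = batchSize;
    this.delimiter = delimiter;
  }

  // Buffers one message; returns the joined payload once batchSize messages are pending.
  Optional<String> add(String message) {
    pending.add(message);
    return pending.size() >= batchSize ? flush() : Optional.empty();
  }

  // Joins and clears whatever is pending, e.g. a final partial batch at the end of a run.
  Optional<String> flush() {
    if (pending.isEmpty()) {
      return Optional.empty();
    }
    String payload = String.join(delimiter, pending);
    pending.clear();
    return Optional.of(payload);
  }

  // An explicit Content-Type header wins; otherwise derive it from the message format.
  static String contentType(String messageFormat, Map<String, String> headers) {
    for (Map.Entry<String, String> h : headers.entrySet()) {
      if ("Content-Type".equalsIgnoreCase(h.getKey())) {
        return h.getValue();
      }
    }
    switch (messageFormat) {
      case "JSON":
        return "application/json";
      case "Form":
        return "application/x-www-form-urlencoded";
      default:
        return "text/plain"; // assumption: the design names no default for Custom
    }
  }
}
```

The explicit `flush()` matters because the last batch of a run is usually smaller than batchSize; without it, those records would never be sent.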

Properties

url: The URL to post data to.
method: The HTTP request method. Defaults to POST.
batchSize: The number of messages to batch before sending. Defaults to 1.
delimiterForMessages: The delimiter between messages when batchSize > 1. Defaults to "\n".
messageFormat: The format to send messages in. Defaults to JSON.
body: The custom message body, used with the Custom format.
requestHeaders: An optional string of header values to send in each request, where the keys and values are delimited by a colon (":") and each pair is delimited by a newline ("\n").
charset: The character set. Defaults to UTF-8.
followRedirects: Whether to automatically follow redirects. Defaults to true.
disableSSLValidation: Whether to disable SSL certificate validation. If SSL validation is enabled, the certificate must be added to the truststore on each machine. Defaults to true.
numRetries: The number of times the request should be retried if it fails. Defaults to 3.
connectTimeout: The time in milliseconds to wait for a connection. Set to 0 to wait indefinitely. Defaults to 60000 (1 minute).
readTimeout: The time in milliseconds to wait for a read. Set to 0 to wait indefinitely. Defaults to 60000 (1 minute).
failOnNon200Response: Whether to fail the pipeline on a non-200 response from the HTTP endpoint. Defaults to true.
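How numRetries and failOnNon200Response might interact, as a hedged sketch: `RetryingSender` and its `HttpAttempt` interface are hypothetical, and the sketch assumes that both I/O errors and non-200 responses count as failures worth retrying, which the design does not specify. It makes one initial attempt plus up to numRetries retries, with no backoff between them, and treats only status 200 as success, matching the property name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical retry wrapper around a single HTTP call.
final class RetryingSender {
  // One HTTP call that returns the response status code.
  interface HttpAttempt {
    int execute() throws IOException;
  }

  private RetryingSender() {}

  static int send(HttpAttempt attempt, int numRetries, boolean failOnNon200Response) {
    IOException lastError = null;
    int status = -1;
    // One initial attempt plus up to numRetries retries.
    for (int i = 0; i <= numRetries; i++) {
      try {
        status = attempt.execute();
        lastError = null;
        if (status == 200) {
          return status;
        }
      } catch (IOException e) {
        lastError = e;
      }
    }
    if (lastError != null) {
      // The final attempt failed at the I/O level: always fail the pipeline.
      throw new UncheckedIOException("Request failed after " + (numRetries + 1) + " attempts", lastError);
    }
    if (failOnNon200Response) {
      throw new IllegalStateException("Received non-200 response: " + status);
    }
    return status; // non-200 tolerated when failOnNon200Response is false
  }
}
```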

 

NFR

1. If the user enables SSL validation, they must add the certificate to the truststore of each machine.

Limitation(s)

Future Work

  • Some future work – HYDRATOR-99999
  • Another future work – HYDRATOR-99999

Test Case(s)

1. Send a Slack message with the Custom format
2. Send a Slack message with the JSON format
3. Batch (1*10000) PUT request
4. Batch (2*5000) PUT request
5. POST custom message
6. PUT form batch message
7. Non-200 response

Sample Pipeline

Slack_Custom_Multi_batch1.json

Slack_JSON.json

PUT_10000Batch_Custom.json

PUT_2_5000_batch_Custom.json

POST_Custom.json

PUT_Form_batch.json

Non-200-reponse.json


Checklist

  • User stories documented 
  • User stories reviewed 
  • Design documented 
  • Design reviewed 
  • Feature merged 
  • Examples and guides 
  • Integration tests 
  • Documentation for feature 
  • Short video demonstrating the feature

4 Comments

  1. Russ Savage

    Please correct/confirm:

    1. Should we default the request method to POST?
    2. Do we need a default value for the message format (JSON, Custom, or Form)?
    3. Can the Content-Type value be set based on the selected message format instead of having a separate configuration for it?

    Message formats:
    4. For JSON, the input StructuredRecord will be converted to JSON and sent as the payload to the HTTP endpoint.
    5. For Form, the input message will be converted to a query string and sent as the payload. Can we assume that one of the fields from the input StructuredRecord should be treated as the message? If yes, we can add a configuration that lets the user specify which field to use as the message.
    6. For Custom, might multiple fields from the input StructuredRecord be needed to build a message? If yes, we may need a CSV widget for the user to configure the fields used to build the custom message.

    For batching:
    7. We can buffer the message from each input record up to the batch size (n). On the nth execution of transform(), we can execute the HTTP call with all the messages, separated by \n, as the request payload.

  2. Please correct/confirm:

    1. Should we default the request method to POST?
         - Yes, default to POST.

    2. Do we need a default value for the message format (JSON, Custom, or Form)?
        - Default to JSON.
    3. Can the Content-Type value be set based on the selected message format instead of having a separate configuration for it?
        - The content type can be set automatically based on the message format. We don't need a separate option, but if someone enters a Content-Type in the headers section, it should override the default chosen by the message format.

    Message formats:
    4. For JSON, the input StructuredRecord will be converted to JSON and sent as the payload to the HTTP endpoint.
         - This is correct.
    5. For Form, the input message will be converted to a query string and sent as the payload. Can we assume that one of the fields from the input StructuredRecord should be treated as the message? If yes, we can add a configuration that lets the user specify which field to use as the message.
         - All the fields from the input structured record should be sent, in the format field1=value1&field2=value2&field3=value3. The values should be URL-encoded. If the user wants to send only a few fields, they can specify a custom message or filter the fields before the sink.

    6. For Custom, might multiple fields from the input StructuredRecord be needed to build a message? If yes, we may need a CSV widget for the user to configure the fields used to build the custom message.
        - The input widget should be a text box where the user can enter free-form text. Multiple fields from the input message can be referenced in this field using a macro syntax such as %{fieldname}.

    For batching:
    7. We can buffer the message from each input record up to the batch size (n). On the nth execution of transform(), we can execute the HTTP call with all the messages, separated by \n, as the request payload.
       - This is correct. It might be a good idea to let the user enter the delimiter between messages in a batch, defaulting to \n.

  3. Russ Savage Do we need to include the response headers in the output token?

  4. Shashank I think it would be useful to store the response body, headers, and status code in the output token in case the user wants to use them in another action downstream.