
Objective

Publish audit logs for changes to CDAP entities so that other apps and tools such as Cask Tracker, MDM, etc. can use them as a source of audit information.

For the 3.4 release, we'll limit the scope to publishing changes for Datasets and Streams.

Use Cases

Use cases and user stories are documented at Cask Tracker.

Design Choices

We chose Kafka as the system to which CDAP publishes audit information. Other tools can subscribe to the Kafka feed to get the audit information; a minimal subscriber sketch follows the limitations below. Using Kafka should also make it easier to integrate external tools with CDAP.

However, publishing to Kafka has certain limitations today that will need to be addressed later:

  • Kafka publishing does not happen in a transaction, so the audit log feed from Kafka may be inconsistent with what actually happened. CDAP-5109 has more discussion on this.
  • There is no access control on who can publish audit information to Kafka (CDAP-5130).
  • Messages in Kafka are transient. They will be deleted after a few days in most setups. Subscribers will have to consume the messages before they are deleted.
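
Despite these limitations, the integration point for external tools is simple: subscribe to the Kafka topic that carries the audit feed and handle each message as JSON. Below is a minimal subscriber sketch using the standard kafka-clients consumer API; the broker address, consumer group, and topic name ("audit") are placeholders, not confirmed CDAP configuration values.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AuditFeedSubscriber {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
    props.put("group.id", "audit-subscriber");         // placeholder consumer group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("audit"));  // placeholder topic name
      while (true) {
        // Each record value is one JSON audit message in the format defined in the next section.
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.value());
        }
      }
    }
  }
}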

Audit Message Format

The audit feed will be a stream of audit messages, as defined below.

Types of Audit Message

The following types of audit messages are published for an entity:

  • CREATE
  • UPDATE
  • TRUNCATE
  • DELETE
  • ACCESS (sub types: READ, WRITE, UNKNOWN)
  • METADATA_CHANGE

 

[
  /** Stream access operation **/
  {
    "time": 1456956659468,
    "entityId": {
      "namespace": "ns1",
      "stream": "stream1",
      "entity": "STREAM"
    },
    "user": "user1",
    "type": "ACCESS",
    "payload": {
      "accessType": "WRITE",
      "accessor": {
        "namespace": "ns1",
        "application": "app1",
        "type": "Flow",
        "program": "flow1",
        "run": "run1",
        "entity": "PROGRAM_RUN"
      }
    }
  },
  
  /** Explore stream access **/
  {
    "time": 1456956659469,
    "entityId": {
      "namespace": "ns1",
      "stream": "stream1",
      "entity": "STREAM"
    },
    "user": "user1",
    "type": "ACCESS",
    "payload": {
      "accessType": "UNKNOWN",
      "accessor": {
        "service": "explore",
        "entity": "SYSTEM_SERVICE"
      }
    }
  },
  
  /** Metadata change **/
  {
    "time": 1456956659470,
    "entityId": {
      "namespace": "ns1",
      "application": "app1",
      "entity": "APPLICATION"
    },
    "user": "user1",
    "type": "METADATA_CHANGE",
    "payload": {
      "previous": {
        "USER": {
          "properties": {
            "uk": "uv",
            "uk1": "uv2"
          },
          "tags": [
            "ut1",
            "ut2"
          ]
        },
        "SYSTEM": {
          "properties": {
            "sk": "sv"
          },
          "tags": []
        }
      },
      "additions": {
        "SYSTEM": {
          "properties": {
            "sk": "sv"
          },
          "tags": [
            "t1",
            "t2"
          ]
        }
      },
      "deletions": {
        "USER": {
          "properties": {
            "uk": "uv"
          },
          "tags": [
            "ut1"
          ]
        }
      }
    }
  },
  
   /** Dataset admin operation **/
  {
    "time": 1456956659471,
    "entityId": {
      "namespace": "ns1",
      "dataset": "ds1",
      "entity": "DATASET"
    },
    "user": "user1",
    "type": "CREATE",
    "payload": {}
  }
]
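
The examples above share a common envelope: time, entityId, user, type, and a type-specific payload. As a rough illustration (not CDAP's actual message class), a subscriber could deserialize that envelope with Gson as sketched below; the AuditRecord class and its field types are assumptions based only on the JSON examples.

import com.google.gson.Gson;
import com.google.gson.JsonObject;

/** Illustrative envelope for one audit message, mirroring the JSON examples above. */
public class AuditRecord {
  long time;            // epoch millis at which the operation happened
  JsonObject entityId;  // entity the operation was performed on (DATASET, STREAM, APPLICATION, ...)
  String user;          // user that performed the operation
  String type;          // CREATE, UPDATE, TRUNCATE, DELETE, ACCESS, or METADATA_CHANGE
  JsonObject payload;   // type-specific details, e.g. accessType/accessor for ACCESS

  public static AuditRecord fromJson(String json) {
    return new Gson().fromJson(json, AuditRecord.class);
  }
}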

Implementation

Audit log information will be published to the CDAP Kafka server when the `audit.publish.enabled` configuration parameter is set to true; a sketch of such a guarded publish appears after the note below.

  • Dataset admin operations can be published by the DatasetOpExecutor service.
  • Stream admin operations can be published by the StreamAdmin class.
  • Dataset and stream access information can be published by piggybacking on the lineage-capturing code.
  • Metadata changes can be published by the DefaultMetadataStore class.

Note: Publishing of metadata updates to Kafka, introduced by CDAP-3518 for Navigator integration, will be deprecated in 3.4 and removed in 3.5. We will need to move the Navigator app to use the audit log instead of the metadata change updates.
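
As a rough sketch of how such a guarded publish might look (illustrative only, not the actual CDAP implementation), the class below sends a serialized audit message to Kafka only when `audit.publish.enabled` is true; the topic name and the way the configuration is read are assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Illustrative publisher guarded by the audit.publish.enabled flag; not the actual CDAP class. */
public final class AuditPublisher {
  private final boolean enabled;
  private final KafkaProducer<String, String> producer;

  public AuditPublisher(Properties cdapConf, Properties kafkaProps) {
    this.enabled = Boolean.parseBoolean(cdapConf.getProperty("audit.publish.enabled", "false"));
    this.producer = enabled ? new KafkaProducer<>(kafkaProps) : null;
  }

  public void publish(String auditMessageJson) {
    if (!enabled) {
      return;  // audit publishing is off; the hook points above become no-ops
    }
    // Fire-and-forget: as noted in Design Choices, this is not transactional with the operation itself.
    producer.send(new ProducerRecord<>("audit", auditMessageJson));  // placeholder topic name
  }
}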

 


20 Comments

  1.  

    • Dataset and stream access information can be published by LineageWriterDatasetFramework (can be renamed to AuditingDatasetFramework).

    Just Dataset access information, right?

    1. Right, will fix it.

  2. Will this cover reads/writes using Hive?

    1. In this case we need to adapt the JSON structure so that it covers both the program and the Hive query.

       

      "access": {
            "accessType""READ",
            "programRun": {
              "namespace""ns1",
              "application""app1",
              "type""Flow",
              "program""flow1",
              "run""xxx-yyy-zzz",
              "entity""PROGRAM_RUN"
            }
      1. What would we like to capture for a hive query?

        Hive queries can be run in two ways -

        • Directly by users - in this case, is it sufficient to capture the userid of the user who ran the query?
        • Programmatically, like workflow actions - in this case we could log the runId of the app that ran the query.

        Anything else you can think of?

        1. We don't have workflow actions yet. So, we should capture the userid of the person running the query to start with.

          1. Done, updated the JSON.

  3. Deleting a namespace should send delete messages for all the datasets/streams in the namespace. Wanted to highlight that so that we cover this use case.

    1. Good point. Namespace deletion deletes dataset instances one by one. So we should receive notifications for deletes in this case.

  4. Also, perhaps we should capture access info from the MapReduce and Spark runtimes, so we can tell if a dataset access is READ, WRITE, or BOTH?

    1. Yes, we can capture extra information in these cases.

  5. +1, this looks good for v1 of this implementation.

  6. /** Explore dataset access **/

    This is not a dataset access. This just logs that the user has used the explore service.

  7. /** Dataset access operation **/
      {
        "time": 1456956659468,
        "entityId": {
          "namespace": "ns1",
          "stream": "stream1",
          "entity": "STREAM"
      

    Looks like this is a stream access, not a dataset access.

  8.       "accessor": {
            "namespace": "ns1",
            "application": "app1",
            "type": "Flow",

    If it is a MapReduce inside a workflow, will this have both the workflow and the MapReduce name? And both their run ids?

    What if it is a custom action in a workflow?

    1. I think a MapReduce inside a workflow will only have the MapReduce run id. We will have to derive the workflow that ran the MapReduce job by looking into the app meta table.

      Custom actions do not have a run id yet. Here is the JIRA for it: CDAP-2751.

  9.    "payload": {}

    Should this contain the type and dataset properties?

    1. Good point. Not sure if this will be part of the first cut though. Filed JIRA: CDAP-5220.