Elastic Security Integration

This guide explains how to ingest Axur Platform feeds into Elastic Security using Push delivery (Webhook 2.0) without external infrastructure. We will use only native Elastic Stack features: indices, ingest pipelines, and Watcher to split data into item-level documents.

Important: This is just one possible integration approach. Index design and data transformations can be customized to match your environment and preferences. The approach below was designed to ingest data directly from the Axur Feed and split it into specific indices per entity (for example, tickets and credentials).

Note: This tutorial assumes you already have access to the Axur Platform and the required API Key.


Prerequisites

  • Elastic Stack (Elasticsearch + Kibana) version 8.x

  • Admin access to Kibana

  • Network access from Axur infrastructure to your Elastic cluster

  • Elastic API Key with write permissions to target indices


Cost and billing considerations

  • Important: This integration uses Elastic’s built-in REST API and Watcher. There is no Axur-side billing for this.

  • Main cost drivers:

    • Elastic licensing/features used (Watcher requires a Gold license or higher)

    • Storage for ingested events and derived indices

    • Network egress/ingress depending on your hosting

  • Tips to control cost:

    • Start with tighter feed filters to limit volume initially

    • Apply ILM policies to the indices created here (see the sketch after this list)

    • Monitor ingestion and adjust feed filters as needed
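
A minimal ILM sketch, assuming a 30-day retention requirement (the policy name axur-feed-30d is a placeholder). Attach the policy after you create the indices in Steps 2–5. Note that without rollover or data streams, ILM ages the whole index from its creation date, so in production you would typically pair a delete phase like this with rollover:

PUT _ilm/policy/axur-feed-30d
{
  "policy": {
    "phases": {
      "delete": {
        "min_age": "30d",
        "actions": { "delete": {} }
      }
    }
  }
}

PUT axur-feed-input/_settings
{ "index.lifecycle.name": "axur-feed-30d" }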

Tip: A glossary of Elastic terms used in this tutorial is available at the end of the document.


1) Create the Axur API Key

Create an API Key. Keep the value at hand for later steps.

Generate an API Key

  1. In the Axur Platform, go to API Keys configuration.

  2. Create a new API key and copy the API Key value securely.

Note: The feed only returns data that the API key user has permission to access.

From this point onward (Steps 2–6), all configuration is performed in the Elastic/Kibana console.


2) In Elastic: Create the input index

Create an index to receive Axur data directly via HTTP POST. Use the flattened field type so the dynamic fields under collectionData do not cause mapping explosion.

PUT axur-feed-input
{
  "mappings": {
    "dynamic": true,
    "dynamic_templates": [
      {
        "collectionData_flattened": {
          "path_match": "collectionData.*",
          "mapping": { "type": "flattened" }
        }
      }
    ]
  }
}
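
To confirm the dynamic template behaves as expected, you can index a disposable probe document and inspect the resulting mapping (the document id mapping-probe is arbitrary; delete the probe afterwards):

PUT axur-feed-input/_doc/mapping-probe
{ "collectionData": { "tickets": [ { "ticket": { "id": "probe" } } ] } }

GET axur-feed-input/_mapping

DELETE axur-feed-input/_doc/mapping-probe

The mapping should show collectionData.tickets with "type": "flattened".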


3) In Elastic: Create the ingest pipeline

Add an ingestion timestamp to each document received in the input index.

PUT _ingest/pipeline/axur-feed-input-pipeline
{
  "processors": [
    { "set": { "field": "ingested_at", "value": "{{_ingest.timestamp}}" } }
  ]
}
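
You can dry-run the pipeline with the _simulate API before wiring anything up; the sample document below is arbitrary:

POST _ingest/pipeline/axur-feed-input-pipeline/_simulate
{
  "docs": [
    { "_source": { "feedData": { "feedType": "ticket" } } }
  ]
}

The response should show an ingested_at field added to the document.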

Important: The Axur Feed Push URL must include ?pipeline=axur-feed-input-pipeline to attach this pipeline.


4) In Elastic: Create the output indices

These indices will store item-level documents, already split by type.

Tickets index:

PUT axur-feed-ticket_splitted
{
  "mappings": {
    "dynamic": true,
    "properties": {
      "item": { "type": "flattened" },
      "feedData": { "type": "flattened" }
    }
  }
}

Credentials (detections) index:

PUT axur-feed-credential_splitted
{
  "mappings": {
    "dynamic": true,
    "properties": {
      "item": { "type": "flattened" },
      "feedData": { "type": "flattened" }
    }
  }
}
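
Optionally, if you expect to split more entity types later, a composable index template keeps all *_splitted indices consistent (the template name below is a suggestion). Templates apply only at index creation time, so this does not change the two indices created above:

PUT _index_template/axur-feed-splitted
{
  "index_patterns": ["axur-feed-*_splitted"],
  "template": {
    "mappings": {
      "dynamic": true,
      "properties": {
        "item": { "type": "flattened" },
        "feedData": { "type": "flattened" }
      }
    }
  }
}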


5) In Elastic: Create the checkpoint index

Store the last processed timestamp to avoid reprocessing.

PUT axur-feed-checkpoint
{
  "mappings": {
    "properties": {
      "last_ts": { "type": "date" }
    }
  }
}
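
Seeding the checkpoint document is optional (the Watcher in the next step falls back to 1970-01-01T00:00:00Z when it is missing), but creating it up front makes the GET in the Validation section succeed immediately:

PUT axur-feed-checkpoint/_doc/checkpoint
{ "last_ts": "1970-01-01T00:00:00Z" }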


6) In Elastic: Create the Watcher to split items

This Watcher runs every minute, picks up only input documents newer than the checkpoint, indexes the split items into the output indices, and then advances the checkpoint.

PUT _watcher/watch/axur_split_batch
{
  "trigger": { "schedule": { "interval": "1m" } },
  "input": {
    "chain": {
      "inputs": [
        {
          "cp": {
            "search": {
              "request": {
                "indices": ["axur-feed-checkpoint"],
                "body": {
                  "size": 1,
                  "query": { "ids": { "values": ["checkpoint"] } }
                }
              }
            }
          }
        },
        {
          "src": {
            "search": {
              "request": {
                "indices": ["axur-feed-input"],
                "body": {
                  "size": 200,
                  "query": {
                    "range": {
                      "ingested_at": {
                        "gt": "{{#ctx.payload.cp.hits.hits}}{{ctx.payload.cp.hits.hits.0._source.last_ts}}{{/ctx.payload.cp.hits.hits}}{{^ctx.payload.cp.hits.hits}}1970-01-01T00:00:00Z{{/ctx.payload.cp.hits.hits}}"
                      }
                    }
                  },
                  "sort": [{ "ingested_at": "asc" }]
                }
              }
            }
          }
        }
      ]
    }
  },
  "condition": {
    "script": {
      "lang": "painless",
      "source": "def h = ctx?.payload?.src?.hits; if (h == null) return false; def t = h.total; if (t == null) return false; if (t instanceof Map) return (t.value != null && t.value > 0); return (t > 0);"
    }
  },
  "transform": {
    "script": {
      "lang": "painless",
      "source": """
      // Split each input document into ticket and credential items,
      // tracking the newest ingested_at as the next checkpoint value.
      def tics = new ArrayList();
      def creds = new ArrayList();
      def maxTs = null;

      for (def hit : ctx.payload.src.hits.hits) {
        def src = hit._source;
        if (src == null) continue;

        // Track the maximum ingested_at across the batch.
        def ts = src.containsKey('ingested_at') ? src.ingested_at : null;
        if (ts != null) {
          if (maxTs == null) { maxTs = ts; }
          else if (java.time.ZonedDateTime.parse(ts).isAfter(java.time.ZonedDateTime.parse(maxTs))) { maxTs = ts; }
        }

        def feed = src.containsKey('feedData') ? src.feedData : null;
        if (!src.containsKey('collectionData')) continue;
        def cd = src.collectionData;

        // Each ticket becomes one document in the tickets output index.
        if (cd != null && cd.containsKey('tickets') && cd.tickets != null) {
          for (def it : cd.tickets) {
            if (it == null) continue;
            tics.add(['feedData': feed, 'item': it]);
          }
        }
        // Each detection becomes one document in the credentials output index.
        if (cd != null && cd.containsKey('detections') && cd.detections != null) {
          for (def it2 : cd.detections) {
            if (it2 == null) continue;
            creds.add(['feedData': feed, 'item': it2]);
          }
        }
      }

      return ['tics': tics, 'creds': creds, 'next_checkpoint': maxTs];
      """
    }
  },
  "actions": {
    "index_tics": {
      "foreach": "ctx.payload.tics",
      "max_iterations": 500,
      "transform": { "script": { "lang": "painless", "source": "return ctx.payload._value;" } },
      "index": { "index": "axur-feed-ticket_splitted", "execution_time_field": "indexed_at", "refresh": "wait_for" }
    },
    "index_creds": {
      "foreach": "ctx.payload.creds",
      "max_iterations": 500,
      "transform": { "script": { "lang": "painless", "source": "return ctx.payload._value;" } },
      "index": { "index": "axur-feed-credential_splitted", "execution_time_field": "indexed_at", "refresh": "wait_for" }
    },
    "update_checkpoint": {
      "condition": { "script": { "lang": "painless", "source": "return ctx.payload.next_checkpoint != null;" } },
      "transform": { "script": { "lang": "painless", "source": "return ['last_ts': ctx.payload.next_checkpoint];" } },
      "index": { "index": "axur-feed-checkpoint", "doc_id": "checkpoint", "execution_time_field": "updated_at", "refresh": "wait_for" }
    }
  }
}
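
To test the watch without waiting for the 1-minute schedule, you can execute it manually; setting record_execution to true also writes the run to the Watcher history:

POST _watcher/watch/axur_split_batch/_execute
{ "record_execution": true }

The response includes the condition result and the status of each action.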


7) Configure the Axur Push Feed endpoint

Point the Axur Feed to your Elastic input index and attach the pipeline.

  • Endpoint URL: https://YOUR_ELASTIC_CLUSTER/axur-feed-input/_doc?pipeline=axur-feed-input-pipeline

  • Headers to add:

    • Authorization: ApiKey <YOUR_ENCODED_ELASTIC_API_KEY> (copy the Encoded value from the Elastic API Keys UI)

  • Secret Key: configure a secure secret in the feed; Axur uses it to sign each request with an HMAC, which you can optionally verify on your side.
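
Before saving the feed configuration, you can verify reachability and authentication with a plain HTTP call from outside your network; replace the placeholders with your values (the request body here is a throwaway sample):

curl -X POST "https://YOUR_ELASTIC_CLUSTER/axur-feed-input/_doc?pipeline=axur-feed-input-pipeline" \
  -H "Authorization: ApiKey <YOUR_ENCODED_ELASTIC_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{"feedData": {"feedType": "ticket", "customerKey": "TEST"}}'

A 201 Created response confirms the endpoint, credentials, and pipeline parameter are all working.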


Validation

After configuration, send a test document to the input index (Kibana → Dev Tools):

POST axur-feed-input/_doc?pipeline=axur-feed-input-pipeline
{
  "feedData": { "feedType": "ticket", "customerKey": "TEST" },
  "collectionData": {
    "tickets": [ { "ticket": { "id": "TCK123", "type": "leak" } } ],
    "detections": [ { "credential": { "user": "alice" } } ]
  }
}

Wait ~1 minute and verify:

GET axur-feed-ticket_splitted/_search

GET axur-feed-credential_splitted/_search

GET axur-feed-checkpoint/_doc/checkpoint

You should see item-level documents in each output index and an updated last_ts in the checkpoint. Subsequent runs should not reprocess the same input documents.


Troubleshooting

  • No documents in output indices: ensure Watcher is active and not failing; check Watcher > History.

  • 401/403 indexing errors: validate Elastic API Key permissions (write, create_index).

  • Duplicates: confirm the checkpoint doc updates; validate cluster time sync.

  • Nothing ingested: verify that the Push URL is reachable from Axur and that the ?pipeline= query parameter is present in the URL.
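
To inspect recent runs of the watch from Dev Tools (the same information is available in Kibana under Stack Management → Watcher), query the Watcher history indices:

GET .watcher-history-*/_search
{
  "size": 5,
  "query": { "term": { "watch_id": "axur_split_batch" } },
  "sort": [{ "trigger_event.triggered_time": { "order": "desc" } }]
}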


You’re done: where to find data and an alert example

  • Tickets are indexed in: axur-feed-ticket_splitted

  • Credentials (detections) are indexed in: axur-feed-credential_splitted

Example queries (Kibana → Dev Tools):

GET axur-feed-ticket_splitted/_search
{
  "query": { "term": { "item.detection.status": "open" } },
  "size": 20,
  "sort": [{ "indexed_at": { "order": "desc" } }]
}

GET axur-feed-credential_splitted/_search
{
  "query": { "exists": { "field": "item.credential.user" } },
  "size": 20,
  "sort": [{ "indexed_at": { "order": "desc" } }]
}

Create a detection rule (Elastic Security → Rules → Create rule → Custom query):

  • Index pattern: axur-feed-credential_splitted

  • Custom query (KQL): item.credential.user:* and item.access.domain:"example.com"

  • Schedule: Every 5 minutes

  • Severity: Medium

  • Actions: add your notifications (email, Slack, etc.)


How to generate the Elastic API Key

Create an API Key in Elastic to authenticate the Axur Push requests.

In Kibana:

  1. Go to Stack Management → Security → API Keys → Create API key.

  2. Name: axur-feed-ingest.

  3. Privileges:

    1. Index privileges: add axur-feed-* (or the specific indices you created) with write and create_index.

  4. Create and copy the key in the Encoded format. Keep it secure.
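
Alternatively, the same key can be created from Dev Tools with the security API; the role descriptor name below is arbitrary, and the encoded field in the response is the value to paste into the Axur Authorization header:

POST /_security/api_key
{
  "name": "axur-feed-ingest",
  "role_descriptors": {
    "axur_feed_writer": {
      "indices": [
        { "names": ["axur-feed-*"], "privileges": ["write", "create_index"] }
      ]
    }
  }
}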


Glossary

  • Elasticsearch: Distributed search and analytics engine of the Elastic Stack.

  • Kibana: Web UI for managing and visualizing Elasticsearch data.

  • Index: A logical namespace for documents in Elasticsearch.

  • Ingest Pipeline: Processors executed before indexing a document.

  • Watcher: Alerting/automation feature used here to implement periodic splitting.

  • ILM (Index Lifecycle Management): Policy to manage data retention and performance.

  • API Key: Authentication mechanism for Elasticsearch.

  • Push (Webhook 2.0): Axur delivery mode that sends events directly to your endpoint.


If you have any questions, feel free to reach out at [email protected] 😊
