
Building the Ultimate Cyber Intelligence AI Using OpenSearch and Airflow

A large database packed with up-to-date cyber threat intelligence, including reports, news, and more, that can be queried with an LLM? Sounds expensive, but apart from the LLM API usage it can be built entirely from free, open-source components.

The goal of this mini-project is to set up a data pipeline that automatically pulls cybersecurity RSS feeds, stores the articles in an OpenSearch database, and generates vector embeddings so the content can be queried with an LLM.

Architecture of this project.

This post shows the ingestion of RSS feeds, but there’s more data that can be ingested into the database:

  • Threat advisories: CISA, CERT-EU, UK NCSC, NSA Cybersecurity advisories/guidance, Red Hat Security Data API (CSAF/CVE/OVAL), Cisco PSIRT openVuln API, Microsoft Security Update Guide (CVRF/APIs)
  • Vulnerability data: NIST NVD, cve.org, OSV.dev, GitHub Advisory Database
  • Security knowledge bases: MITRE ATT&CK & D3FEND, CAPEC (attack patterns) and CWE (weaknesses), NIST CSF 2.0 Reference Tool
  • Other: Google Project Zero, ENISA Threat Landscape & sector reports

Prerequisites

For my demo setup, I’m using a Debian 12 VM running on Proxmox. The instructions should also work on other Linux distributions or even locally on your machine with some minor adjustments.

  • Debian 12 Server
  • Python 3.11 with the following packages installed: python3.11-venv python3.11-distutils python3-pip
  • Docker and Docker Compose

Install OpenSearch

I’ll begin by installing a single-node OpenSearch stack with Docker Compose. In a production environment you might want a multi-node setup installed on bare metal or on Kubernetes, but for my homelab a single-node Docker stack does the job just fine.

Preparing the server

Before I can spin up the Docker containers, the server needs a small adjustment.

Disable memory paging and swapping and increase the max map count:

sudo swapoff -a
echo "vm.max_map_count=262144" | sudo tee /etc/sysctl.d/99-max_map_count.conf
sudo sysctl --system
sysctl vm.max_map_count

swapoff -a disables swap only until the next reboot. To disable it permanently, comment out any swap entries in /etc/fstab and reboot.
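To double-check both settings before starting the containers, a small standard-library Python sketch can read the values back from /proc. The 262144 threshold is the value set above; the helper names are made up for this example.

```python
from pathlib import Path

# Minimum vm.max_map_count value, matching the sysctl file written above.
REQUIRED_MAX_MAP_COUNT = 262144


def max_map_count_ok(raw_value: str, minimum: int = REQUIRED_MAX_MAP_COUNT) -> bool:
    """Return True if the value read from /proc meets the minimum."""
    return int(raw_value.strip()) >= minimum


def swap_disabled(proc_swaps_text: str) -> bool:
    """/proc/swaps starts with a header line; every further line is an active swap area."""
    lines = [line for line in proc_swaps_text.splitlines() if line.strip()]
    return len(lines) <= 1


if __name__ == "__main__":
    map_count_file = Path("/proc/sys/vm/max_map_count")
    swaps_file = Path("/proc/swaps")
    if map_count_file.exists():
        print("vm.max_map_count OK:", max_map_count_ok(map_count_file.read_text()))
    if swaps_file.exists():
        print("swap disabled:", swap_disabled(swaps_file.read_text()))
```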

Install OpenSearch and OpenSearch Dashboards using Docker Compose

Now OpenSearch can be installed using Docker Compose.

  1. Create a new directory for the OpenSearch stack:
sudo mkdir -p /opt/docker/opensearch && cd /opt/docker/opensearch
  2. Download the Compose file: sudo wget https://raw.githubusercontent.com/opensearch-project/documentation-website/3.2/assets/examples/docker-compose.yml

  3. Create a .env file for storing credentials and variables:

sudo nano .env
OPENSEARCH_INITIAL_ADMIN_PASSWORD=<custom-admin-password>
Make sure to use a strong password: OpenSearch rejects weak initial admin passwords, and the stack won’t start if the password doesn’t meet its requirements.
  4. Start the Compose stack:
docker compose up -d
  5. Open the OpenSearch Dashboards UI in the browser: https://<docker-host>:5601/
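Before logging in, you can confirm from the Docker host that the REST API on port 9200 answers. Here is a stdlib-only Python sketch. It assumes the admin password is exported as OPENSEARCH_PASSWORD (a variable name chosen for this example, not part of the setup) and skips certificate verification because the demo certificates are self-signed.

```python
import base64
import json
import os
import ssl
import urllib.request


def basic_auth_header(user: str, password: str) -> str:
    """Value for the HTTP Authorization header (Basic auth)."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def cluster_info(base_url: str, user: str, password: str) -> dict:
    """GET / on the OpenSearch REST API and return the parsed JSON."""
    ctx = ssl.create_default_context()
    # Dev only: the demo certificates are self-signed. Load your CA instead in production.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    req = urllib.request.Request(base_url, headers={"Authorization": basic_auth_header(user, password)})
    with urllib.request.urlopen(req, context=ctx, timeout=10) as resp:
        return json.load(resp)


if __name__ == "__main__":
    password = os.environ.get("OPENSEARCH_PASSWORD")
    if password is None:
        print("Set OPENSEARCH_PASSWORD to the value from .env to run the check.")
    else:
        info = cluster_info("https://localhost:9200/", "admin", password)
        print(info["cluster_name"], info["version"]["number"])
```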

Log in to Dashboards

The default username is admin and the password is the one defined in the .env file.


After logging in we can see an empty landing page:

After logging in you can see the landing page of OpenSearch Dashboards.

Create a new index in OpenSearch

Now I’ll create an index dedicated to my news feeds. This keeps them organized and separate from any other future datasets.

I’ll call the index news-articles and define:

  • Fields
    • title, summary, content as text: full-text search with BM25, good for keyword queries and highlighting.
    • link as keyword: exact matching and de-duplication, no analysis needed.
    • source as keyword: clean filtering and aggregations by feed or publisher.
    • published_at as date: proper sorting and range queries over time.
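  • an idempotent document ID (doc_id): a stable SHA-256 hash of the article URL, so re-ingesting an article overwrites the existing document instead of creating a duplicate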
  • a 384-dimensional k-NN vector (embedding) compatible with all-MiniLM-L6-v2
    • Matches the output size of all-MiniLM-L6-v2, which gives solid relevance at low cost. 384 floats keep memory use modest while remaining accurate enough for news Q&A.
  • Index settings for a single-node dev box
    • number_of_shards: 1: avoids overhead from unnecessary shard splitting on small data.
    • number_of_replicas: 0: no replica on a single node, saves disk and RAM.
    • refresh_interval: 30s during ingestion: better throughput by reducing segment churn. You can switch back to 1s after the initial load.
    • index.knn: true: enables the k-NN plugin so vector search works out of the box.
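To put “modest” into numbers: each 384-dimensional float32 vector takes 384 × 4 = 1,536 bytes. The OpenSearch k-NN documentation gives a rule of thumb of roughly 1.1 × (4 × dimension + 8 × M) bytes per vector for an HNSW graph, where M (default 16) is the number of graph neighbours. That estimate is written for the native (Faiss/NMSLIB) engines, and this index uses the Lucene engine, so treat the numbers from this sketch as a ballpark only.

```python
# Back-of-the-envelope memory estimate for the 384-dim embedding field.
FLOAT32_BYTES = 4


def raw_vector_bytes(num_vectors: int, dimension: int = 384) -> int:
    """Bytes needed just for the float32 vector values."""
    return FLOAT32_BYTES * dimension * num_vectors


def hnsw_memory_estimate(num_vectors: int, dimension: int = 384, m: int = 16) -> float:
    """Rule of thumb from the OpenSearch k-NN docs: 1.1 * (4 * d + 8 * M) bytes per vector."""
    return 1.1 * (FLOAT32_BYTES * dimension + 8 * m) * num_vectors


if __name__ == "__main__":
    for n in (10_000, 100_000, 1_000_000):
        raw_mb = raw_vector_bytes(n) / 1e6
        hnsw_mb = hnsw_memory_estimate(n) / 1e6
        print(f"{n:>9,} articles: raw vectors {raw_mb:8.1f} MB, HNSW estimate {hnsw_mb:8.1f} MB")
```

For 100,000 articles that works out to about 154 MB of raw vectors, or roughly 183 MB by the HNSW estimate, which fits comfortably on a homelab VM.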

The index can simply be created by using the OpenSearch Dev Console in the web interface:

Open Dev Tools → Console and run:

PUT news-articles
{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 0,
      "refresh_interval": "30s",
      "knn": true
    }
  },
  "mappings": {
    "properties": {
      "doc_id":      { "type": "keyword" },
      "title":       { "type": "text" },
      "link":        { "type": "keyword" },
      "source":      { "type": "keyword" },
      "summary":     { "type": "text" },
      "content":     { "type": "text" },
      "published_at":{ "type": "date"  },
      "embedding":   { "type": "knn_vector", "dimension": 384, "method": {"name":"hnsw","space_type":"cosinesimil","engine":"lucene"} }
    }
  }
}
Alternative: Use curl instead of the web UI
curl -k -u admin:<custom-admin-password> \
  -H "Content-Type: application/json" \
  -X PUT "https://localhost:9200/news-articles" \
  -d @- <<'JSON'
{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 0,
      "refresh_interval": "30s",
      "knn": true
    }
  },
  "mappings": {
    "properties": {
      "doc_id":      { "type": "keyword" },
      "title":       { "type": "text" },
      "link":        { "type": "keyword" },
      "source":      { "type": "keyword" },
      "summary":     { "type": "text" },
      "content":     { "type": "text" },
      "published_at":{ "type": "date"  },
      "embedding":   { "type": "knn_vector", "dimension": 384, "method": {"name":"hnsw","space_type":"cosinesimil","engine":"lucene"} }
    }
  }
}
JSON

You can send API calls to the cluster using the UI in the Dev Tools

Next, create an index pattern so the contents of the index show up in the Discover view of OpenSearch Dashboards:

Before the newsfeed data is visible in the Discover section, an index pattern needs to be created.

After the indexes for the index pattern are selected, the field for the time filter needs to be selected.
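If you prefer scripting this step, OpenSearch Dashboards exposes a saved objects API. The following is a hedged Python sketch that creates the same news-* pattern with published_at as the time field. It assumes the endpoint /api/saved_objects/index-pattern, the osd-xsrf header, and Basic auth with the admin user. DASHBOARDS_URL and OPENSEARCH_PASSWORD are variable names chosen here. Adjust the URL scheme to however you reach port 5601, and expect to add tenant handling if multi-tenancy is enabled.

```python
import base64
import json
import os
import ssl
import urllib.request


def index_pattern_request(title: str = "news-*", time_field: str = "published_at"):
    """Path, headers and body for creating an index pattern via the saved objects API."""
    path = "/api/saved_objects/index-pattern"
    # Dashboards rejects non-GET API calls that lack this anti-CSRF header.
    headers = {"Content-Type": "application/json", "osd-xsrf": "true"}
    body = {"attributes": {"title": title, "timeFieldName": time_field}}
    return path, headers, body


def create_index_pattern(base_url: str, user: str, password: str) -> dict:
    path, headers, body = index_pattern_request()
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    headers["Authorization"] = f"Basic {token}"
    ctx = ssl.create_default_context()
    ctx.check_hostname = False  # dev only: self-signed certificates
    ctx.verify_mode = ssl.CERT_NONE
    req = urllib.request.Request(base_url + path, data=json.dumps(body).encode("utf-8"),
                                 headers=headers, method="POST")
    with urllib.request.urlopen(req, context=ctx, timeout=10) as resp:
        return json.load(resp)


if __name__ == "__main__":
    password = os.environ.get("OPENSEARCH_PASSWORD")
    dashboards_url = os.environ.get("DASHBOARDS_URL", "https://localhost:5601")
    if password is None:
        print("Set OPENSEARCH_PASSWORD (and optionally DASHBOARDS_URL) to create the pattern.")
    else:
        saved = create_index_pattern(dashboards_url, "admin", password)
        print("created index pattern", saved.get("id"))
```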

Install Apache Airflow

Next, I’ll add Apache Airflow, which will orchestrate and monitor the ingestion pipeline. Airflow will fetch the feeds, process the articles, and insert them into OpenSearch on a schedule.

Prepare

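Create a new directory for the Docker files, including the dags, logs, plugins and config folders that the Compose file mounts, and make your user the owner:

sudo mkdir -p /opt/docker/airflow/{dags,logs,plugins,config} && sudo chown -R $(id -u):$(id -g) /opt/docker/airflow && cd /opt/docker/airflow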

Install

  1. Set the Airflow UID and GID. Airflow runs as your local user so that files in the mounted folders don’t end up with the wrong owner:
echo "AIRFLOW_UID=$(id -u)" | sudo tee .env
echo "AIRFLOW_GID=$(id -g)" | sudo tee -a .env
  2. Add the dependencies to Airflow’s runtime containers. Airflow needs your Python dependencies in every runtime container, not just the web server: the worker runs the tasks, and the DAG processor imports the DAG file. The official Compose file installs whatever is listed in _PIP_ADDITIONAL_REQUIREMENTS when the containers start:
echo '_PIP_ADDITIONAL_REQUIREMENTS=feedparser opensearch-py sentence-transformers<=3.0.1' | sudo tee -a /opt/docker/airflow/.env
# (Optional) pin the Hugging Face cache location; mount it as a volume if the model should survive container re-creation:
echo 'HF_HOME=/opt/airflow/.cache/huggingface' | sudo tee -a /opt/docker/airflow/.env

Alternatively, once the stack is running, you can install the dependencies into the running worker by hand. This only affects that one container and is lost when it is recreated:

docker compose exec --user airflow airflow-worker \
  bash -lc "python -m pip install feedparser opensearch-py 'sentence-transformers<=3.0.1'"
  3. Download the Compose file:
sudo wget https://airflow.apache.org/docs/apache-airflow/3.0.6/docker-compose.yaml
  4. Initialize Airflow and the Flower UI:
docker compose --profile flower up airflow-init
  5. Start everything in the background:
docker compose --profile flower up -d
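To confirm that the packages actually landed in the containers that need them, you can use a tiny stdlib script. It checks the import names: opensearch-py installs as opensearchpy and sentence-transformers as sentence_transformers. The file name check_deps.py is only an example.

```python
import importlib.util

# Import names of the packages listed in _PIP_ADDITIONAL_REQUIREMENTS.
REQUIRED_MODULES = ("feedparser", "opensearchpy", "sentence_transformers")


def missing_modules(names=REQUIRED_MODULES) -> list[str]:
    """Return every module name that the current interpreter cannot find."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules()
    if missing:
        print("missing:", ", ".join(missing))
    else:
        print("all DAG dependencies are installed")
```

Saved on the host, it can be piped into a container with docker compose exec -T airflow-worker python - < check_deps.py. Repeat the command for airflow-dag-processor. These service names are assumed from the Airflow 3 Compose file.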

Now the Flower UI is available at http://<docker-host>:5555, and the Airflow UI and REST API at http://<docker-host>:8080.

The Flower UI dashboard and the default worker.

Default credentials: Username: airflow Password: airflow

The Airflow dashboard without any DAGs configured.

The ingestion DAG

The core of this project is the DAG (Directed Acyclic Graph) that ties everything together.

It runs every 30 minutes and does the following:

  • Reads RSS URLs from Airflow Variable RSS_FEEDS
  • Parses entries via feedparser
  • Leaves the content field empty for now; fetching full article bodies (e.g. with Readability) is left out to keep the dependencies small
  • Dedupes by hashing the link → doc_id
  • Embeds title + summary/content with all-MiniLM-L6-v2 from sentence-transformers
  • Upserts into OpenSearch (index: news-articles) with k-NN vector
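Hashing the link is what makes the pipeline idempotent. The bulk upsert uses doc_id as the document _id, so a later run that sees the same article overwrites the existing document instead of adding a copy. Below is a minimal sketch of that idea. It also includes an optional in-batch de-duplication (dedupe_by_link is a hypothetical helper that the DAG below does not contain), which avoids embedding the same article twice when two feeds carry it.

```python
import hashlib


def doc_id_for(link: str) -> str:
    """Stable ID: the same URL always yields the same 64-character SHA-256 hex digest."""
    return hashlib.sha256(link.encode("utf-8")).hexdigest()


def dedupe_by_link(entries: list[dict]) -> list[dict]:
    """Drop entries without a link and keep only the first entry per doc_id."""
    seen: set[str] = set()
    unique: list[dict] = []
    for entry in entries:
        link = entry.get("link")
        if not link:
            continue
        doc_id = doc_id_for(link)
        if doc_id in seen:
            continue
        seen.add(doc_id)
        unique.append({**entry, "doc_id": doc_id})
    return unique


if __name__ == "__main__":
    batch = [
        {"title": "Advisory A", "link": "https://example.com/a"},
        {"title": "Advisory A (syndicated)", "link": "https://example.com/a"},
        {"title": "No link", "link": ""},
    ]
    print([e["title"] for e in dedupe_by_link(batch)])
```

Matching is on the exact URL string, so the same article published under different tracking parameters still gets two IDs.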
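  1. Connect Airflow to OpenSearch:

The code uses BaseHook.get_connection("opensearch_default"), so the DAG fails without this connection. If the admin password contains characters such as @, :, / or #, percent-encode them in the URI.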

docker compose exec airflow-apiserver \
airflow connections add opensearch_default \
  --conn-uri 'https://admin:<custom-admin-password>@<host-or-ip>:9200'
  2. Provide the Airflow Variable with a list of RSS feeds

My DAG reads Variable.get("RSS_FEEDS"), so it needs to be set before the first run:

docker compose exec airflow-apiserver \
airflow variables set RSS_FEEDS \
'[
  "https://www.cisa.gov/news-rss.xml",
  "https://feeds.feedburner.com/TheHackersNews",
  "https://www.bleepingcomputer.com/feed/",
  "https://krebsonsecurity.com/feed/",
  "https://www.ncsc.gov.uk/api/1/services/v1/news-rss-feed.xml"
]'
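Airflow unquotes the login and password when it parses a connection URI, so a strong password containing characters like @, / or # has to be percent-encoded first. A small stdlib sketch can build both CLI arguments. The function names are hypothetical, and the host and password in the example are placeholders.

```python
import json
from urllib.parse import quote


def opensearch_conn_uri(user: str, password: str, host: str, port: int = 9200) -> str:
    """Build the --conn-uri value; credentials are percent-encoded so @ : / # stay literal."""
    return f"https://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}"


def rss_feeds_value(urls: list[str]) -> str:
    """Serialise the feed list as the JSON array that get_feeds() parses with json.loads."""
    for url in urls:
        if not url.startswith(("http://", "https://")):
            raise ValueError(f"not an http(s) URL: {url}")
    return json.dumps(urls, indent=2)


if __name__ == "__main__":
    print(opensearch_conn_uri("admin", "S3cure#Pass/word!", "192.0.2.10"))
    print(rss_feeds_value(["https://www.cisa.gov/news-rss.xml", "https://krebsonsecurity.com/feed/"]))
```

Paste the printed values into the two commands above, keeping the surrounding single quotes.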

Create DAG file

The DAG file will be stored in: /opt/docker/airflow/dags/rss_to_opensearch.py

from __future__ import annotations
import hashlib
import os
from datetime import datetime, timedelta, timezone

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.hooks.base import BaseHook

import feedparser
from opensearchpy import OpenSearch, RequestsHttpConnection
from sentence_transformers import SentenceTransformer

INDEX_NAME = "news-articles"
EMBED_MODEL_NAME = os.environ.get("EMBED_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2")

default_args = {
    "owner": "airflow",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="rss_to_opensearch",
    default_args=default_args,
    schedule=timedelta(minutes=30),
    start_date=datetime(2025, 1, 1, tzinfo=timezone.utc),
    catchup=False,
    tags=["rss", "opensearch", "rag"],
) as dag:

    @task
    def get_feeds() -> list[str]:
        feeds = Variable.get("RSS_FEEDS", default_var="[]")
        import json
        return json.loads(feeds)

    @task
    def fetch_entries(feed_urls: list[str]) -> list[dict]:
        out = []
        for url in feed_urls:
            parsed = feedparser.parse(url)
            for e in parsed.entries:
                out.append({
                    "title": e.get("title", ""),
                    "link": e.get("link", ""),
                    "summary": e.get("summary", "") or e.get("description", ""),
                    # feedparser normalises published_parsed to UTC; mark the offset explicitly
                    "published_at": e.get("published_parsed") and datetime(*e.published_parsed[:6], tzinfo=timezone.utc).isoformat(),
                    "source": parsed.feed.get("title", url),
                })
        return out

    @task
    def enrich_and_embed(entries: list[dict]) -> list[dict]:
        # single global model load per task run
        model = SentenceTransformer(EMBED_MODEL_NAME)
        payloads = []
        for e in entries:
            if not e.get("link"):
                continue
            doc_id = hashlib.sha256(e["link"].encode("utf-8")).hexdigest()
            text = " ".join([e.get("title",""), e.get("summary","")]).strip()
            emb = model.encode(text or e.get("title",""), normalize_embeddings=True).tolist()
            payloads.append({
                "doc_id": doc_id,
                "title": e.get("title",""),
                "link": e["link"],
                "summary": e.get("summary",""),
                "content": "",  # Full-text can be added later
                "source": e.get("source",""),
                "published_at": e.get("published_at"),
                "embedding": emb,
            })
        return payloads

    @task
    def upsert_to_opensearch(docs: list[dict]) -> int:
        conn = BaseHook.get_connection("opensearch_default")

        client = OpenSearch(
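            hosts=[{"host": conn.host, "port": conn.port or 9200}],
            http_auth=(conn.login, conn.password),
            # With --conn-uri 'https://...', Airflow stores "https" as conn_type (the URI path becomes schema)
            use_ssl=(conn.conn_type == "https" or conn.schema == "https"),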
            verify_certs=False,  # dev only; use a CA in prod
            connection_class=RequestsHttpConnection,
            timeout=30,
            max_retries=2,
            retry_on_timeout=True,
        )

        from opensearchpy.helpers import bulk
        def gen_actions():
            for d in docs:
                yield {
                    "_op_type": "index",
                    "_index": INDEX_NAME,
                    "_id": d["doc_id"],
                    "_source": d,
                }

        success, _ = bulk(client, gen_actions(), request_timeout=60, chunk_size=200)
        return success

    feed_urls = get_feeds()
    entries = fetch_entries(feed_urls)
    docs = enrich_and_embed(entries)
    _ = upsert_to_opensearch(docs)

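Save the file, and Airflow will pick up the DAG automatically once the DAG processor rescans the dags folder, which can take a few minutes. New DAGs start paused in the official Compose setup, so unpause rss_to_opensearch in the Airflow UI (or with airflow dags unpause rss_to_opensearch) to let it run on schedule. It can also be triggered manually from the Airflow UI.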

Test the pipeline

Manually trigger the DAG in Airflow UI:

The DAG can be triggered manually by the ‘Trigger’ button in the top right corner.

Or using the CLI:

docker compose exec airflow-apiserver airflow dags trigger rss_to_opensearch

Observe the logs:

Airflow UI → DAGs → rss_to_opensearch → Graph → click a task → Logs

Or using Docker: docker compose logs -f airflow-worker

Now the ingested news articles should appear in the Discover page in the OpenSearch Dashboards UI:

Now the newly ingested news articles can be seen and queried in the Discover page.
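With the backlog ingested, you can switch refresh_interval back from 30s to 1s, as suggested in the index settings, so new articles become searchable sooner. Here is a stdlib-only sketch against the index settings API (PUT /news-articles/_settings). It reads the admin password from OPENSEARCH_PASSWORD, a variable name chosen for this example.

```python
import base64
import json
import os
import ssl
import urllib.request

INDEX_NAME = "news-articles"


def refresh_settings_body(interval: str = "1s") -> dict:
    """Body for PUT /<index>/_settings that changes only the refresh interval."""
    return {"index": {"refresh_interval": interval}}


def put_index_settings(base_url: str, index: str, body: dict, user: str, password: str) -> dict:
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    ctx = ssl.create_default_context()
    ctx.check_hostname = False  # dev only: the demo setup uses self-signed certificates
    ctx.verify_mode = ssl.CERT_NONE
    req = urllib.request.Request(
        f"{base_url}/{index}/_settings",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="PUT",
    )
    with urllib.request.urlopen(req, context=ctx, timeout=10) as resp:
        return json.load(resp)  # {"acknowledged": true} on success


if __name__ == "__main__":
    password = os.environ.get("OPENSEARCH_PASSWORD")
    if password is None:
        print("Set OPENSEARCH_PASSWORD to update the index settings.")
    else:
        print(put_index_settings("https://localhost:9200", INDEX_NAME, refresh_settings_body("1s"), "admin", password))
```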

It is also possible to confirm the ingestion by running a command on the cli:

curl -k -u admin:<custom-admin-password> "https://localhost:9200/news-articles/_count"
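The count only shows that documents arrived. To check that the embeddings are usable, you can run a k-NN query directly against the embedding field, bypassing the LLM. The following is a minimal sketch. It assumes sentence-transformers is installed where you run it, so the question is encoded with the same model and normalisation as in the DAG. It also assumes the password is exported as OPENSEARCH_PASSWORD, a name chosen here. The question text is only an example.

```python
import base64
import json
import os
import ssl
import urllib.request

EMBEDDING_DIM = 384  # must match the "dimension" of the embedding field


def knn_query(vector: list[float], k: int = 5) -> dict:
    """k-NN search body for the embedding field of news-articles."""
    if len(vector) != EMBEDDING_DIM:
        raise ValueError(f"expected {EMBEDDING_DIM} dimensions, got {len(vector)}")
    return {
        "size": k,
        "_source": ["title", "link", "source", "published_at"],
        "query": {"knn": {"embedding": {"vector": vector, "k": k}}},
    }


def search(base_url: str, body: dict, user: str, password: str) -> list[dict]:
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    ctx = ssl.create_default_context()
    ctx.check_hostname = False  # dev only: self-signed demo certificates
    ctx.verify_mode = ssl.CERT_NONE
    req = urllib.request.Request(
        f"{base_url}/news-articles/_search",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="POST",
    )
    with urllib.request.urlopen(req, context=ctx, timeout=30) as resp:
        return json.load(resp)["hits"]["hits"]


if __name__ == "__main__":
    password = os.environ.get("OPENSEARCH_PASSWORD")
    if password is None:
        print("Set OPENSEARCH_PASSWORD to run the search.")
    else:
        # Same model and normalisation as the DAG, so query and documents share one vector space.
        from sentence_transformers import SentenceTransformer

        model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
        vector = model.encode("Which vulnerabilities are being actively exploited?",
                              normalize_embeddings=True).tolist()
        for hit in search("https://localhost:9200", knn_query(vector), "admin", password):
            print(round(hit["_score"], 3), hit["_source"]["title"])
```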

More data

This prototype will be limited to RSS feeds, as they’re a straightforward way to ingest large volumes of frequently changing data while keeping this post concise.

In addition to RSS, the following data sources could be added to give the LLM a broader base to answer from:

  • Threat Advisories
    • CISA
    • CERT-EU
    • UK NCSC
    • NSA Cybersecurity advisories/guidance
    • Red Hat Security Data API (CSAF/CVE/OVAL)
    • Cisco PSIRT openVuln API
    • Microsoft Security Update Guide (CVRF/APIs)
  • Vulnerability Data
    • NIST NVD
    • cve.org
    • OSV.dev
    • GitHub Advisory Database
  • Security knowledge bases
    • MITRE ATT&CK & D3FEND
    • CAPEC (attack patterns) and CWE (weaknesses)
    • NIST CSF 2.0 Reference Tool
  • Other
    • Google Project Zero
    • ENISA Threat Landscape & sector reports

Query the data using an LLM

Now an LLM can be connected to the database. I’ll use OpenAI’s GPT-5, but a self-hosted model would also do the trick. My home server doesn’t have enough compute to run a model locally, so I’ll stick with GPT-5.

Install the OpenSearch Dashboards Assistant Plugin

OpenSearch Dashboards offers an Assistant plugin that lets us query the data with an LLM directly in the web interface. This makes it convenient to explore the data without writing a custom script or UI first.

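  1. Enable the Assistant by adding the following lines to the .env file and restarting the stack. Compose only uses .env values for substitution inside docker-compose.yml, so make sure the variables are also passed to the containers (for example via environment: or env_file:).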
PLUGINS_ML_COMMONS_AGENT_FRAMEWORK_ENABLED=true
PLUGINS_ML_COMMONS_RAG_PIPELINE_FEATURE_ENABLED=true
ASSISTANT_CHAT_ENABLED=true
OBSERVABILITY_QUERY_ASSIST_ENABLED=true
  2. Apply the cluster settings (trust the OpenAI endpoint and allow ML tasks on the data node, since this dev setup has no dedicated ML node):
PUT /_cluster/settings
{
  "persistent": {
    "plugins.ml_commons.trusted_connector_endpoints_regex": ["^https://api\\.openai\\.com/.*$"],
    "plugins.ml_commons.allow_registering_model_via_url": true,
    "plugins.ml_commons.only_run_on_ml_node": false
  }
}
  3. Register a 384-dimensional embedding model (used by the VectorDBTool):
POST /_plugins/_ml/models/_register?deploy=true
{
  "name": "all-MiniLM-L6-v2",
  "version": "1.0.0",
  "description": "Sentence-Transformers all-MiniLM-L6-v2 (384-dim)",
  "model_format": "TORCH_SCRIPT",
  "function_name": "TEXT_EMBEDDING",
  "model_content_hash_value": "c15f0d2e62d872be5b5bc6c84d2e0f4921541e29fefbef51d59cc10a8ae30e0f",
  "model_config": {
    "model_type": "bert",
    "embedding_dimension": 384,
    "framework_type": "sentence_transformers",
    "all_config": "{\"hidden_size\":384,\"num_hidden_layers\":6,\"num_attention_heads\":12,\"vocab_size\":30522}"
  },
  "url": "https://artifacts.opensearch.org/models/ml-models/huggingface/sentence-transformers/all-MiniLM-L6-v2/1.0.1/torch_script/sentence-transformers_all-MiniLM-L6-v2-1.0.1-torch_script.zip"
}

Get its ID (you’ll need it as <EMBEDDING_MODEL_ID>):

POST /_plugins/_ml/models/_search
{ "query": { "term": { "name.keyword": "all-MiniLM-L6-v2" } } }
  4. Create an OpenAI Chat Completions connector (works with the Assistant) and note the returned connector_id:
POST /_plugins/_ml/connectors/_create
{
  "name": "OpenAI Chat Completions",
  "description": "Chat model for Assistant",
  "version": 1,
  "protocol": "http",
  "parameters": {
    "endpoint": "api.openai.com",
    "model": "gpt-5",
    "response_filter": "$.choices[0].message.content"
  },
  "credential": { "openAI_key": "<OPENAI_API_KEY>" },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "url": "https://${parameters.endpoint}/v1/chat/completions",
      "headers": {
        "Authorization": "Bearer ${credential.openAI_key}",
        "Content-Type": "application/json"
      },
      "request_body": "{ \"model\": \"${parameters.model}\", \"messages\": [{\"role\":\"user\",\"content\":\"${parameters.prompt}\"}] }"
    }
  ]
}
  5. Register the chat model with that connector_id and test it:
POST /_plugins/_ml/models/_register?deploy=true
{
  "name": "OpenAI Chat (for Assistant)",
  "function_name": "remote",
  "description": "Chat model for os_chat",
  "connector_id": "<CONNECTOR_ID>"
}

After registering the model, use the returned model_id as <LLM_MODEL_ID> and send a test prompt:

POST /_plugins/_ml/models/<LLM_MODEL_ID>/_predict
{ "parameters": { "prompt": "Say hello in 5 words." } }

Test prompt to see if querying the LLM works.

  6. Create the conversational agent (RAG over news-articles):
POST /_plugins/_ml/agents/_register
{
  "name": "Cyber News RAG (Conversational)",
  "type": "conversational",
  "description": "Chat over the news-articles index.",
  "app_type": "os_chat",
  "llm": {
    "model_id": "<LLM_MODEL_ID>",
    "parameters": {
      "max_iteration": 5,
      "response_filter": "$.choices[0].message.content",
      "message_history_limit": 5,
      "disable_trace": false
    }
  },
  "memory": { "type": "conversation_index" },
  "tools": [
    {
      "type": "VectorDBTool",
      "name": "cyber_news_kb",
      "description": "RAG over cybersecurity RSS articles.",
      "parameters": {
        "input": "${parameters.question}",
        "index": "news-articles",
        "source_field": ["title","summary","link","source","published_at"],
        "model_id": "<EMBEDDING_MODEL_ID>",
        "embedding_field": "embedding",
        "doc_size": 5
      },
      "include_output_in_agent_response": true
    }
  ]
}

Save the returned agent_id as CONVERSATIONAL_AGENT_ID.

  7. Create the root agent for Dashboards:
POST /_plugins/_ml/agents/_register
{
  "name": "Cyber News Chatbot (Root)",
  "type": "flow",
  "description": "Root agent for OpenSearch Assistant in Dashboards",
  "tools": [
    {
      "type": "AgentTool",
      "name": "LLMResponseGenerator",
      "parameters": { "agent_id": "<CONVERSATIONAL_AGENT_ID>" },
      "include_output_in_agent_response": true
    }
  ],
  "memory": { "type": "conversation_index" }
}

Save the returned agent_id as ROOT_AGENT_ID.

  8. Point the UI to the root agent (a super-admin call to a system index):

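Run this from the host using the demo admin client certificate. The files kirk.pem and kirk-key.pem belong to the demo security configuration in /usr/share/opensearch/config/ inside the OpenSearch container, so copy them to the current directory first (for example with docker compose cp):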

curl -k --cert ./kirk.pem --key ./kirk-key.pem \
  -H 'Content-Type: application/json' \
  -X PUT https://<host-or-ip>:9200/.plugins-ml-config/_doc/os_chat -d '{
    "type":"os_chat_root_agent",
    "configuration":{ "agent_id":"<ROOT_AGENT_ID>" }
  }'
  9. Restart Dashboards and start chatting: docker compose restart opensearch-dashboards
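Registering the embedding model from a URL runs as a background task. The _register response contains a task_id, and the model_id only becomes available once that task has completed. Instead of searching for the model by name, you can poll GET /_plugins/_ml/tasks/<task_id> until its state is COMPLETED. Here is a hedged stdlib sketch of that. The ML_TASK_ID and OPENSEARCH_PASSWORD environment variables are names chosen for this example.

```python
import base64
import json
import os
import ssl
import time
import urllib.request
from typing import Callable

FAILED_STATES = {"FAILED", "COMPLETED_WITH_ERROR", "CANCELLED"}


def wait_for_model_id(task_id: str, get_task: Callable[[str], dict],
                      interval: float = 5.0, timeout: float = 600.0) -> str:
    """Poll an ML Commons task until it completes, then return its model_id."""
    deadline = time.monotonic() + timeout
    while True:
        task = get_task(task_id)
        state = task.get("state")
        if state == "COMPLETED":
            return task["model_id"]
        if state in FAILED_STATES:
            raise RuntimeError(f"task {task_id} ended as {state}: {task.get('error')}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still {state} after {timeout} s")
        time.sleep(interval)


def http_get_task(base_url: str, user: str, password: str) -> Callable[[str], dict]:
    """Return a get_task function that calls GET /_plugins/_ml/tasks/<task_id>."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    ctx = ssl.create_default_context()
    ctx.check_hostname = False  # dev only: self-signed demo certificates
    ctx.verify_mode = ssl.CERT_NONE

    def get_task(task_id: str) -> dict:
        req = urllib.request.Request(f"{base_url}/_plugins/_ml/tasks/{task_id}",
                                     headers={"Authorization": f"Basic {token}"})
        with urllib.request.urlopen(req, context=ctx, timeout=10) as resp:
            return json.load(resp)

    return get_task


if __name__ == "__main__":
    password = os.environ.get("OPENSEARCH_PASSWORD")
    task_id = os.environ.get("ML_TASK_ID")  # task_id returned by the _register call
    if password and task_id:
        print(wait_for_model_id(task_id, http_get_task("https://localhost:9200", "admin", password)))
```

Passing get_task in as a function keeps the polling loop independent of the HTTP client, which also makes it easy to test without a cluster.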

Run a Query

Open Dashboards → chat icon → ask:

Why is the FBI warning about UNC6395?
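The same question can be sent to the conversational agent without the UI, which helps when debugging the retrieval step. The following is a hedged sketch using the agent execute API (POST /_plugins/_ml/agents/<agent_id>/_execute) with the question parameter that the VectorDBTool reads. The response parsing assumes outputs are returned as named entries under inference_results (such as memory_id and response). If your version lays them out differently, it falls back to printing the raw JSON. OPENSEARCH_PASSWORD and CONVERSATIONAL_AGENT_ID are environment variable names chosen here.

```python
import base64
import json
import os
import ssl
import urllib.request


def agent_execute_request(agent_id: str, question: str) -> tuple[str, dict]:
    """Path and body for POST /_plugins/_ml/agents/<agent_id>/_execute."""
    return f"/_plugins/_ml/agents/{agent_id}/_execute", {"parameters": {"question": question}}


def outputs_by_name(result: dict) -> dict:
    """Flatten inference_results[*].output[*] into {name: result or dataAsMap}."""
    named = {}
    for inference in result.get("inference_results", []):
        for item in inference.get("output", []):
            named[item.get("name")] = item.get("result", item.get("dataAsMap"))
    return named


def execute_agent(base_url: str, agent_id: str, question: str, user: str, password: str) -> dict:
    path, body = agent_execute_request(agent_id, question)
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    ctx = ssl.create_default_context()
    ctx.check_hostname = False  # dev only: self-signed demo certificates
    ctx.verify_mode = ssl.CERT_NONE
    req = urllib.request.Request(
        base_url + path,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="POST",
    )
    # LLM calls can take a while, hence the generous timeout.
    with urllib.request.urlopen(req, context=ctx, timeout=120) as resp:
        return json.load(resp)


if __name__ == "__main__":
    password = os.environ.get("OPENSEARCH_PASSWORD")
    agent_id = os.environ.get("CONVERSATIONAL_AGENT_ID")
    if password and agent_id:
        raw = execute_agent("https://localhost:9200", agent_id,
                            "Why is the FBI warning about UNC6395?", "admin", password)
        print(outputs_by_name(raw).get("response", json.dumps(raw, indent=2)))
    else:
        print("Set OPENSEARCH_PASSWORD and CONVERSATIONAL_AGENT_ID to run the query.")
```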

The final result of this project: A database full of data that can be queried using an LLM of our choice.

We now have an automated cyber threat intelligence pipeline: feeds flow into OpenSearch, Airflow manages ingestion, and an LLM enables natural language queries. Next steps could include integrating vulnerability databases, building a custom dashboard, or experimenting with different embedding models.

Lars Ursprung
Author