Using OpenTelemetry Collector for Log Deduplication, Logs-to-Metrics, and Pattern-Based Filtering

By Peter Simkins

The OpenTelemetry Collector is a vendor-agnostic component of the OpenTelemetry observability framework, offering extensive capabilities to receive, process, and export telemetry data such as logs, metrics, and traces. In this article, we'll dive into three specific operations: deduplicating logs, converting logs to metrics, and filtering logs based on patterns.

1. Deduplicating Logs

Logs often contain repetitive data, especially when monitoring services or applications that generate similar messages at high frequencies. Deduplicating logs can reduce storage costs, increase clarity, and optimize performance.

Steps to Deduplicate Logs:

  1. Set Up the OpenTelemetry Collector: Ensure that the OpenTelemetry Collector is installed and configured to receive logs from your services or applications.

  2. Processor Configuration: Utilize a custom processor or create a script that hashes incoming logs. By comparing these hashes, you can determine if a log is a duplicate and then decide on a retention strategy.

  3. Hash and Store: Create a cache to store these hashes, and determine a timeframe for which the hash should be retained.

  4. Handle Duplicates: When an incoming log's hash matches one in the cache, either drop the log or modify it based on your requirements (a minimal sketch of this approach follows this list).
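
To make the hash-and-cache idea concrete, here is a minimal, self-contained Go sketch of the core logic, independent of any Collector APIs. The Deduper type and its fields are assumptions made up for this illustration; a full processor built on the same idea appears at the end of this article.

    package main
    
    import (
    	"crypto/sha256"
    	"encoding/hex"
    	"fmt"
    	"time"
    )
    
    // Deduper is a hypothetical in-memory TTL cache of message hashes.
    type Deduper struct {
    	retention time.Duration
    	seen      map[string]time.Time
    }
    
    func NewDeduper(retention time.Duration) *Deduper {
    	return &Deduper{retention: retention, seen: make(map[string]time.Time)}
    }
    
    // IsDuplicate reports whether msg was already seen within the retention
    // window, recording it as newly seen otherwise.
    func (d *Deduper) IsDuplicate(msg string) bool {
    	sum := sha256.Sum256([]byte(msg))
    	key := hex.EncodeToString(sum[:])
    	now := time.Now()
    	if seenAt, ok := d.seen[key]; ok && now.Sub(seenAt) <= d.retention {
    		return true
    	}
    	d.seen[key] = now
    	return false
    }
    
    func main() {
    	d := NewDeduper(10 * time.Minute)
    	for _, msg := range []string{"disk full", "disk full", "disk ok"} {
    		if d.IsDuplicate(msg) {
    			fmt.Println("dropped duplicate:", msg)
    			continue
    		}
    		fmt.Println("kept:", msg)
    	}
    }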

2. Converting Logs to Metrics

Logs-to-metrics conversion helps in deriving performance or status metrics from logs. This conversion is valuable when logs contain quantifiable data.

Steps to Convert Logs to Metrics:

  1. Define the Metric: Understand what kind of metric you want to extract from the logs. For example, it could be error rates, request counts, or latency measures.

  2. Processor Configuration: Use processors in the OpenTelemetry Collector that can parse log data to extract the desired information.

  3. Parse and Convert: Extract the relevant data from the log and transform it into a metric format. Depending on the use-case, aggregate this data over a specific interval.

  4. Forward Metrics: Once converted, route these metrics to your monitoring solution or time-series database for visualization and alerting (a small extraction sketch follows this list).
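
As a sketch of steps 2 and 3, the short Go program below parses quantifiable values out of raw log lines and aggregates them. The log formats and metric names are assumptions for this illustration, not part of any Collector API.

    package main
    
    import (
    	"fmt"
    	"regexp"
    	"strconv"
    )
    
    var (
    	// The line formats below are assumptions made up for this example.
    	latencyRe = regexp.MustCompile(`completed in (\d+)ms`)
    	errorRe   = regexp.MustCompile(`^Error:`)
    )
    
    func main() {
    	lines := []string{
    		"GET /api/users completed in 120ms",
    		"GET /api/orders completed in 340ms",
    		"Error: connection refused",
    	}
    
    	var requests, totalMs, errors int
    	for _, line := range lines {
    		if m := latencyRe.FindStringSubmatch(line); m != nil {
    			ms, _ := strconv.Atoi(m[1])
    			requests++
    			totalMs += ms
    		}
    		if errorRe.MatchString(line) {
    			errors++
    		}
    	}
    
    	// In a real pipeline these aggregates would be emitted as metric data
    	// points (e.g. a counter for errors, a gauge or histogram for latency).
    	if requests > 0 {
    		fmt.Printf("request_count=%d avg_latency_ms=%d error_count=%d\n",
    			requests, totalMs/requests, errors)
    	}
    }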

3. Filtering Logs Based on Patterns

Pattern-based filtering lets you focus on the logs that are meaningful while dropping noise and unneeded log data.

Steps for Pattern-Based Filtering:

  1. Understand Your Log Structure: Before you filter logs, understand their structure and the information they carry. Determine which parts of the log are essential and which can be ignored.

  2. Configure Log Filtering: The OpenTelemetry Collector offers processors such as the filter processor, which can drop records that match patterns, and the attributes processor, which can modify or remove attributes. Define rules based on the patterns you want to filter (see the snippet after this list).

  3. Test Your Filters: Before deploying filtering in a production environment, test your configurations in a staging or development environment. This ensures that you don't miss out on any critical logs.

  4. Deploy and Monitor: Once satisfied, deploy the configurations to your production environment. Monitor the Collector's performance to ensure it's efficiently handling and filtering logs.
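
For a concrete starting point, newer Collector releases support OTTL conditions in the filter processor for dropping log records. The pattern below is a hypothetical stand-in, and the exact syntax may differ between Collector versions, so verify it against your release before deploying:

    processors:
      filter/noise:
        error_mode: ignore
        logs:
          log_record:
            # Drop any record whose body matches the unwanted pattern.
            - 'IsMatch(body, ".*Unwanted Pattern.*")'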

The combined configuration below brings the three techniques together: log deduplication using a custom deduplicator processor (implemented in Go later in this article), converting logs to metrics using a fictional log_to_metric processor, and filtering logs based on a pattern using the filter processor. Note that deduplicator and log_to_metric are illustrative custom components, not part of the stock Collector distribution.

    receivers:
      otlp:
        protocols:
          grpc:
          http:
    
    processors:
      # Pattern-based filtering: drop log records whose body matches an unwanted pattern
      filter:
        logs:
          exclude:
            match_type: regexp
            bodies:
              - ".*Unwanted Pattern.*"
    
      # Deduplication processor involves creating a cache of previously seen log entries and checking new entries against this cache
      deduplicator:
        hash_keys:
          - log_message
        retention_time: 10m
    
      # Fictional Logs-to-Metrics Processor (this is an illustrative example)
      log_to_metric:
        rules:
          - name: error_count
            log_pattern: "Error:"
            aggregation_type: count
            metric_name: "service.errors"
    
    exporters:
      logging:
      prometheus:
        endpoint: "0.0.0.0:8889"
    
    service:
      pipelines:
        logs:
          receivers: [otlp]
          processors: [filter, deduplicator, log_to_metric]
          exporters: [logging]
    
        metrics:
          receivers: [otlp]
          # In a real deployment, a connector (such as the count connector)
          # would bridge logs to metrics; the fictional processor stands in here.
          exporters: [prometheus]

The deduplication processor maintains a cache of previously seen log entries and checks new entries against that cache. The actual implementation depends on the language in which the OpenTelemetry Collector is extended; the example below is written in Go, the Collector's primary language.

To integrate the above processor into the OpenTelemetry Collector, you'd also need to:

  1. Create a configuration for this processor.
  2. Add the necessary logic to register this processor during the collector's startup.
  3. Add tests to validate its behavior.
  4. Integrate with the main OpenTelemetry Collector builder (a manifest sketch follows the code below).

Remember, this code serves as a basic example. In a real-world scenario, you'd need to consider various edge cases, handle errors more gracefully, and potentially optimize the deduplication process for large volumes of logs. Always thoroughly test any new processor in a non-production environment before deploying to production.

    package deduplicator
    
    import (
    	"context"
    	"crypto/sha256"
    	"encoding/hex"
    	"sync"
    	"time"
    
    	"go.opentelemetry.io/collector/pdata/plog"
    )
    
    // DeduplicationProcessor drops log records whose hash has already been
    // seen within the retention window.
    type DeduplicationProcessor struct {
    	hashKeys      []string
    	retentionTime time.Duration
    
    	// mu guards seenHashes, since the Collector may invoke the
    	// processor concurrently.
    	mu         sync.Mutex
    	seenHashes map[string]time.Time
    }
    
    func NewDeduplicationProcessor(hashKeys []string, retentionTime time.Duration) *DeduplicationProcessor {
    	return &DeduplicationProcessor{
    		hashKeys:      hashKeys,
    		retentionTime: retentionTime,
    		seenHashes:    make(map[string]time.Time),
    	}
    }
    
    func (dp *DeduplicationProcessor) ProcessLogs(ctx context.Context, logs plog.Logs) (plog.Logs, error) {
    	now := time.Now()
    
    	dp.mu.Lock()
    	defer dp.mu.Unlock()
    
    	for i := 0; i < logs.ResourceLogs().Len(); i++ {
    		rl := logs.ResourceLogs().At(i)
    		for j := 0; j < rl.ScopeLogs().Len(); j++ {
    			sl := rl.ScopeLogs().At(j)
    
    			// Filter out the duplicates: drop any record whose hash was
    			// seen within the retention window, otherwise mark it as seen.
    			sl.LogRecords().RemoveIf(func(record plog.LogRecord) bool {
    				hashValue := dp.computeHash(record)
    				if seenTime, ok := dp.seenHashes[hashValue]; ok && now.Sub(seenTime) <= dp.retentionTime {
    					return true
    				}
    				dp.seenHashes[hashValue] = now
    				return false
    			})
    		}
    	}
    
    	// Evict cache entries older than the retention window.
    	for hash, timestamp := range dp.seenHashes {
    		if now.Sub(timestamp) > dp.retentionTime {
    			delete(dp.seenHashes, hash)
    		}
    	}
    
    	return logs, nil
    }
    
    // computeHash builds a SHA-256 digest from the values of the configured
    // hash-key attributes.
    func (dp *DeduplicationProcessor) computeHash(record plog.LogRecord) string {
    	hash := sha256.New()
    
    	for _, key := range dp.hashKeys {
    		if val, exists := record.Attributes().Get(key); exists {
    			hash.Write([]byte(val.AsString()))
    		}
    	}
    
    	return hex.EncodeToString(hash.Sum(nil))
    }
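
For step 4 of the integration list above, the OpenTelemetry Collector Builder (ocb) assembles a custom distribution from a manifest. A minimal sketch follows; the module path and version are hypothetical placeholders for wherever the deduplicator processor is actually published:

    dist:
      name: otelcol-custom
      description: Custom Collector distribution including the deduplicator processor
      output_path: ./otelcol-custom
    
    processors:
      # Hypothetical module path; point this at where the processor actually lives.
      - gomod: github.com/example/deduplicatorprocessor v0.1.0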

Conclusion

OpenTelemetry Collector offers a comprehensive platform for telemetry data management. By leveraging its capabilities, organizations can optimize their observability stack, ensuring only meaningful data is processed and stored. Whether it's deduplicating logs, converting logs to metrics, or filtering logs based on patterns, the Collector's processor pipeline, extended with custom components where needed, offers a streamlined solution for modern observability needs.
