You can use the `aggregate` filter to do this. It aggregates several log lines into a single event based on a common field value. In your case, the common field is `job_id`.
You also need a second field to tell the first event apart from the second one. In your case, this is the `state` field.
So you simply need to add another filter to your existing Logstash configuration, like this:
```
filter {
  # ...your other filters
  if [state] == "processing" {
    aggregate {
      task_id => "%{job_id}"
    }
  } else if [state] == "failed" {
    aggregate {
      task_id => "%{job_id}"
      end_of_task => true
      timeout => 120
    }
  }
}
```
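As written, these filters only group the two events by `job_id`. They do not copy any data from the `processing` event to the `failed` event. To carry data across, you would normally add a `code` option to each filter. The sketch below follows the start/end pattern from the aggregate filter documentation. The `processing_at` field name is hypothetical, so substitute whatever you actually need to copy across.

```
filter {
  if [state] == "processing" {
    aggregate {
      task_id => "%{job_id}"
      # Hypothetical field: remember when the job entered "processing"
      code => "map['processing_at'] = event.get('@timestamp')"
      # Only create a new map on the first event of a job
      map_action => "create"
    }
  } else if [state] == "failed" {
    aggregate {
      task_id => "%{job_id}"
      # Copy the remembered value onto the "failed" event
      code => "event.set('processing_at', map['processing_at'])"
      # Only run if a "processing" event was already seen for this job_id
      map_action => "update"
      end_of_task => true
      timeout => 120
    }
  }
}
```

The aggregate filter depends on events reaching it in order, so the plugin documentation recommends running the pipeline with a single worker (`-w 1` on the command line, or `pipeline.workers: 1`).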
You are free to adjust the `timeout` (in seconds, counted from the first event of a job) depending on how long your jobs run.
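You may also want to find jobs that never reach the `failed` state before the timeout. The aggregate filter can push the stored map as a new event when the timeout expires, using `push_map_as_event_on_timeout`.

The plugin only allows timeout-related options in one aggregate filter per `task_id` pattern. In this minimal sketch, they therefore move to the `processing` filter and are removed from the `failed` one, including `timeout`. The `processing_at` field and the `_job_timeout` tag are hypothetical names.

```
filter {
  if [state] == "processing" {
    aggregate {
      task_id => "%{job_id}"
      # Hypothetical field stored in the map for this job
      code => "map['processing_at'] = event.get('@timestamp')"
      map_action => "create"
      # All timeout options are defined in this one filter
      timeout => 120
      # When the timeout expires, emit the map as a new event...
      push_map_as_event_on_timeout => true
      # ...with the task id written to its job_id field
      timeout_task_id_field => "job_id"
      # Hypothetical tag to make timed-out jobs easy to find
      timeout_tags => ["_job_timeout"]
    }
  } else if [state] == "failed" {
    aggregate {
      task_id => "%{job_id}"
      code => "event.set('processing_at', map['processing_at'])"
      map_action => "update"
      # No timeout options here: they are defined in the filter above
      end_of_task => true
    }
  }
}
```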