Skip to content

Commit 303cd2d

Browse files
committed
Add save matches to separate files
1 parent a7e3e76 commit 303cd2d

8 files changed

Lines changed: 183 additions & 112 deletions

File tree

.golangci.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ run:
55
linters:
66
default: all
77
disable:
8+
- funlen
9+
- gocyclo
10+
- gocognit
11+
- err113
812
- embeddedstructfieldcheck
913
- testpackage
1014
- noinlineerr

cmd/catp/README.md

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ catp [OPTIONS] PATH ...
2929
write first 10 seconds of CPU profile to file
3030
-dbg-mem-prof string
3131
write heap profile to file after 10 seconds
32+
-end-line int
33+
stop printing lines at this line (exclusive),
34+
default is 0 (no limit), each input file is counted separately
3235
-l count lines
3336
-no-progress
3437
disable progress printing
@@ -57,20 +60,25 @@ catp [OPTIONS] PATH ...
5760
write current progress to a file
5861
-rate-limit float
5962
output rate limit lines per second
63+
-save-matches value
64+
save matches of previous filter group to file
6065
-skip value
6166
filter matching, may contain multiple AND patterns separated by ^,
6267
if filter matches, line is removed from the output (may be kept if it passed preceding -pass)
6368
for example, you can use "-skip quux^baz -skip fooO" to skip lines that have (quux AND baz) OR fooO
6469
-skip-csv value
6570
filter matching, loads skip params from CSV file,
6671
each line is treated as -skip, each column value is AND condition.
72+
-start-line int
73+
start printing lines from this line (inclusive),
74+
default is 0 (first line), each input file is counted separately
6775
-version
6876
print version and exit
6977
```
7078

7179
## Examples
7280

73-
Feed a file into `jq` field extractor with progress printing.
81+
### Feed a file into `jq` field extractor with progress printing
7482

7583
```
7684
catp get-key.log | jq .context.callback.Data.Nonce > get-key.jq
@@ -84,11 +92,13 @@ get-key.log: 96.8% bytes read, 967819 lines processed, 8064.9 l/s, 41.8 MB/s, el
8492
get-key.log: 100.0% bytes read, 1000000 lines processed, 8065.7 l/s, 41.8 MB/s, elapsed 2m3.98s, remaining 0s
8593
```
8694

95+
### Parallel scan of multiple files
96+
8797
Run log filtering (lines containing `foo bar` or `baz`) on multiple files in background (with `screen`) and output to a
8898
new compressed file.
8999

90100
```
91-
screen -dmS foo12 ./catp -output ~/foo-2023-07-12.log.zst -pass "foo bar" -pass "baz" /home/logs/server-2023-07-12*
101+
screen -dmS foo12 ./catp -parallel 20 -output ~/foo-2023-07-12.log.zst -pass "foo bar" -pass "baz" /home/logs/server-2023-07-12*
92102
```
93103

94104
```
@@ -108,14 +118,31 @@ all: 32.3% bytes read, /home/logs/server-2023-07-12-09-00.log_6.zst: 5.1% bytes
108118
# detaching from screen with ctrl+a+d
109119
```
110120

111-
Filter based on large list of needles. Values from allow and block lists are loaded into high-performance
112-
[Aho Corasick](https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm) indexes.
121+
### Filter based on large list of needles
122+
123+
Values from allow and block lists are loaded into high-performance
124+
[Aho Corasick](https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm) indexes.
113125

114126
```
115127
catp -pass-csv allowlist.csv -skip-csv blocklist.csv -pass-any -output filtered.log.zst source.log.zst
116128
```
117129

118130
Each source line would follow the filtering pipeline:
119-
* if `allowlist.csv` has at least one row, all cells of which are present in the source line, source line gets into output
120-
* if not, but if `blocklist.csv` has at least one row, all cells of which are present in the source line, source line is skipped
131+
132+
* if `allowlist.csv` has at least one row, all cells of which are present in the source line, source line gets into
133+
output
134+
* if not, but if `blocklist.csv` has at least one row, all cells of which are present in the source line, source line is
135+
skipped
121136
* if not, source line gets into output because of `-pass-any`
137+
138+
### Split matches into separate files
139+
140+
```
141+
catp -pass foo -save-matches foo.log.zst -pass bar^baz -save-matches 2.gz -pass qux -pass quux -output other.log input.log
142+
```
143+
144+
Pipeline:
145+
* each line from `input.log` is being read
146+
* lines that contain `foo` are stored to `foo.log.zst`
147+
* lines that contain `bar` and `baz` (but not `foo` that was already matched) are stored to `2.gz`
148+
* lines that contain `qux` or `quux` are stored to `other.log`

cmd/catp/catp/app.go

Lines changed: 39 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"errors"
88
"flag"
99
"fmt"
10-
"io"
1110
"log"
1211
"os"
1312
"os/signal"
@@ -21,13 +20,21 @@ import (
2120

2221
"github.com/bool64/dev/version"
2322
"github.com/bool64/progress"
24-
gzip "github.com/klauspost/pgzip"
2523
)
2624

2725
// Main is the entry point for catp CLI tool.
2826
func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,gocyclo,maintidx
2927
r := &runner{}
3028

29+
var closers []func() error
30+
defer func() {
31+
for _, closer := range closers {
32+
if err := closer(); err != nil {
33+
log.Printf("failed to close: %s\n", err.Error())
34+
}
35+
}
36+
}()
37+
3138
flag.Var(flagFunc(func(v string) error {
3239
r.filters.addFilter(true, bytes.Split([]byte(v), []byte("^"))...)
3340

@@ -62,6 +69,17 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
6269
"if filter matches, line is removed from the output (may be kept if it passed preceding -pass)\n"+
6370
"for example, you can use \"-skip quux^baz -skip fooO\" to skip lines that have (quux AND baz) OR fooO")
6471

72+
flag.Var(flagFunc(func(v string) error {
73+
w, closer, err := makeWriter(v)
74+
if err != nil {
75+
return err
76+
}
77+
78+
closers = append(closers, closer)
79+
80+
return r.filters.saveTo(w)
81+
}), "save-matches", "save matches of previous filter group to file")
82+
6583
flag.IntVar(&r.parallel, "parallel", 1, "number of parallel readers if multiple files are provided\n"+
6684
"lines from different files will go to output simultaneously (out of order of files, but in order of lines in each file)\n"+
6785
"use 0 for multi-threaded zst decoder (slightly faster at cost of more CPU)")
@@ -85,7 +103,7 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
85103
"default is 0 (no limit), each input file is counted separately")
86104

87105
flag.Usage = func() {
88-
fmt.Println("catp", version.Module("github.com/bool64/progress").Version+",",
106+
fmt.Println("catp", version.Module("github.com/bool64/progress").Version+r.options.VersionLabel+",",
89107
version.Info().GoVersion, strings.Join(versionExtra, " "))
90108
fmt.Println()
91109
fmt.Println("catp prints contents of files to STDOUT or dir/file output, \n" +
@@ -99,20 +117,6 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
99117
}
100118
flag.Parse()
101119

102-
r.filters.buildIndex()
103-
104-
if *ver {
105-
fmt.Println(version.Module("github.com/bool64/progress").Version)
106-
107-
return nil
108-
}
109-
110-
if flag.NArg() == 0 {
111-
flag.Usage()
112-
113-
return nil
114-
}
115-
116120
if *cpuProfile != "" {
117121
startProfiling(*cpuProfile, *memProfile)
118122

@@ -127,6 +131,20 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
127131
}
128132
}
129133

134+
r.filters.buildIndex()
135+
136+
if *ver {
137+
fmt.Println(version.Module("github.com/bool64/progress").Version + r.options.VersionLabel)
138+
139+
return nil
140+
}
141+
142+
if flag.NArg() == 0 {
143+
flag.Usage()
144+
145+
return nil
146+
}
147+
130148
var files []string
131149

132150
args := flag.Args()
@@ -163,61 +181,21 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
163181
sort.Strings(files)
164182

165183
if *output != "" && r.outDir == "" {
166-
fn := *output
167-
168-
out, err := os.Create(fn) //nolint:gosec
184+
w, closer, err := makeWriter(*output)
169185
if err != nil {
170-
return fmt.Errorf("failed to create output file %s: %w", fn, err)
171-
}
172-
173-
r.output = out
174-
compCloser := io.Closer(io.NopCloser(nil))
175-
176-
switch {
177-
case strings.HasSuffix(fn, ".gz"):
178-
gw := gzip.NewWriter(r.output)
179-
compCloser = gw
180-
181-
r.output = gw
182-
case strings.HasSuffix(fn, ".zst"):
183-
zw, err := zstdWriter(r.output)
184-
if err != nil {
185-
return fmt.Errorf("zstd new writer: %w", err)
186-
}
187-
188-
compCloser = zw
189-
190-
r.output = zw
186+
return err
191187
}
192188

193-
w := bufio.NewWriterSize(r.output, 64*1024)
194189
r.output = w
195190

196-
defer func() {
197-
if err := w.Flush(); err != nil {
198-
log.Fatalf("failed to flush STDOUT buffer: %s", err)
199-
}
200-
201-
if err := compCloser.Close(); err != nil {
202-
log.Fatalf("failed to close compressor: %s", err)
203-
}
204-
205-
if err := out.Close(); err != nil {
206-
log.Fatalf("failed to close output file %s: %s", *output, err)
207-
}
208-
}()
191+
closers = append(closers, closer)
209192
} else {
210193
if isStdin {
211194
r.output = os.Stdout
212195
} else {
213196
w := bufio.NewWriterSize(os.Stdout, 64*1024)
214197
r.output = w
215-
216-
defer func() {
217-
if err := w.Flush(); err != nil {
218-
log.Fatalf("failed to flush STDOUT buffer: %s", err)
219-
}
220-
}()
198+
closers = append(closers, w.Flush)
221199
}
222200
}
223201

0 commit comments

Comments
 (0)