diff --git a/.gitignore b/.gitignore index f0d247e..51af806 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,6 @@ out/ # End of https://www.gitignore.io/api/go -vendor/ \ No newline at end of file +vendor/ + +majestic_million*.csv \ No newline at end of file diff --git a/README.md b/README.md index 8fc2a75..5bcea77 100644 --- a/README.md +++ b/README.md @@ -24,11 +24,10 @@ Csvdiff is a difftool to compute changes between two csv files. ## Usage ```bash -$ csvdiff run --base base.csv --delta delta.csv +$ csvdiff base.csv delta.csv # Additions: 1 -... - # Modifications: 20 +# Rows: ... ``` @@ -37,29 +36,29 @@ $ csvdiff run --base base.csv --delta delta.csv - For MacOS ```bash -curl -sL https://github.com/aswinkarthik93/csvdiff/releases/download/v0.1.2/csvdiff_0.1.2_darwin_amd64.tar.gz | tar xfz - +curl -sL https://github.com/aswinkarthik93/csvdiff/releases/download/v1.0.0/csvdiff_1.0.0_darwin_amd64.tar.gz | tar xfz - ``` - For centos ```bash -yum install https://github.com/aswinkarthik93/csvdiff/releases/download/v0.1.2/csvdiff_0.1.2_linux_64-bit.rpm +yum install https://github.com/aswinkarthik93/csvdiff/releases/download/v1.0.0/csvdiff_1.0.0_linux_64-bit.rpm ``` - For debian ``` -curl -sL https://github.com/aswinkarthik93/csvdiff/releases/download/v0.1.2/csvdiff_0.1.2_linux_64-bit.deb -O +curl -sL https://github.com/aswinkarthik93/csvdiff/releases/download/v1.0.0/csvdiff_1.0.0_linux_64-bit.deb -O dpkg --install csvdiff_*_linux_64-bit.deb ``` - For Linux ```bash -curl -sL https://github.com/aswinkarthik93/csvdiff/releases/download/v0.1.2/csvdiff_0.1.2_linux_amd64.tar.gz | tar xfz - +curl -sL https://github.com/aswinkarthik93/csvdiff/releases/download/v1.0.0/csvdiff_1.0.0_linux_amd64.tar.gz | tar xfz - ``` -- For [Windows](https://github.com/aswinkarthik93/csvdiff/releases/download/v0.1.2/csvdiff_0.1.2_windows_amd64.tar.gz) +- For [Windows](https://github.com/aswinkarthik93/csvdiff/releases/download/v1.0.0/csvdiff_1.0.0_windows_amd64.tar.gz) - Build using Go @@ -85,22 +84,41 @@ go get -u github.com/aswinkarthik93/csvdiff ## Miscellaneous features +- By default, it marks the row as ADDED or MODIFIED by introducing a new column at last. + +```bash +% csvdiff examples/base-small.csv examples/delta-small.csv +Additions 1 +Modifications 1 +Rows: +24564,907,completely-newsite.com,com,19827,32902,completely-newsite.com,com,1621,909,19787,32822,ADDED +69,1048,aol.com,com,97543,225532,aol.com,com,70,49,97328,224491,MODIFIED +``` + - The `--primary-key` in an integer array. Specify comma separated positions if the table has a compound key. Using this primary key, it can figure out modifications. If the primary key changes, it is an addition. ```bash -% csvdiff run --base base.csv --delta delta.csv --primary-key 0,1 +% csvdiff base.csv delta.csv --primary-key 0,1 ``` - If you want to compare only few columns in the csv when computing hash, ```bash -% csvdiff run --base base.csv --delta delta.csv --primary-key 0,1 --value-columns 2 +% csvdiff base.csv delta.csv --primary-key 0,1 --columns 2 ``` -- **Additions** and **Modifications** can be written to files directly instead of STDOUT. +- Supports JSON format for post processing ```bash -% csvdiff run --base base.csv --delta delta.csv --additions additions.csv --modifications modifications.csv +% csvdiff examples/base-small.csv examples/delta-small.csv --format json +{ + "Additions": [ + "24564,907,completely-newsite.com,com,19827,32902,completely-newsite.com,com,1621,909,19787,32822" + ], + "Modifications": [ + "69,1048,aol.com,com,97543,225532,aol.com,com,70,49,97328,224491" + ] +} ``` ## Build locally diff --git a/benchmark/README.md b/benchmark/README.md index e4d4dcd..e38aa9e 100644 --- a/benchmark/README.md +++ b/benchmark/README.md @@ -1,26 +1,29 @@ -## Comparison with other tools +# Comparison with other tools - -### Setup +## Setup * Using the majestic million data. (Source in credits section) * Both files have 998390 rows and 12 columns. * Only one modification between both files. * Ran on Processor: Intel Core i7 2.5 GHz 4 cores 16 GB RAM -0. csvdiff (this tool) : *0m2.085s* - -```bash -time csvdiff run -b majestic_million.csv -d majestic_million_diff.csv +## Baseline -# Additions: 0 -# Modifications: 1 +0. csvdiff (this tool) : *0m1.159s* -real 0m2.085s -user 0m3.861s -sys 0m0.340s +```bash + time csvdiff majestic_million.csv majestic_million_diff.csv +Additions 0 +Modifications 1 +... + +real 0m1.159s +user 0m2.167s +sys 0m0.222s ``` +## Other tools + 1. [data.table](https://github.com/Rdatatable/data.table) : *0m4.284s* * Join both csvs using `id` column. @@ -71,13 +74,13 @@ $ cd ./pkg/digest $ go test -bench=. -v -benchmem -benchtime=5s -cover ``` -| | | | | | -| ---------------------------- | ---------- | ----------------------- | -------------------- | ------------------- | -| BenchmarkCreate1-8 | 2000000 | 5967 ns/op | 5474 B/op | 21 allocs/op | -| BenchmarkCreate10-8 | 500000 | 16251 ns/op | 10889 B/op | 94 allocs/op | -| BenchmarkCreate100-8 | 100000 | 114219 ns/op | 67139 B/op | 829 allocs/op | -| BenchmarkCreate1000-8 | 10000 | 1042723 ns/op | 674239 B/op | 8078 allocs/op | -| BenchmarkCreate10000-8 | 1000 | 10386850 ns/op | 6533806 B/op | 80306 allocs/op | -| BenchmarkCreate100000-8 | 100 | 108740944 ns/op | 64206718 B/op | 804208 allocs/op | -| BenchmarkCreate1000000-8 | 5 | 1161730558 ns/op | 672048142 B/op | 8039026 allocs/op | -| BenchmarkCreate10000000-8 | 1 | 12721982424 ns/op | 6549111872 B/op| 80308455 allocs/op | +``` +BenchmarkCreate1-8 200000 31794 ns/op 116163 B/op 24 allocs/op +BenchmarkCreate10-8 200000 43351 ns/op 119993 B/op 79 allocs/op +BenchmarkCreate100-8 50000 142645 ns/op 160577 B/op 634 allocs/op +BenchmarkCreate1000-8 10000 907308 ns/op 621694 B/op 6085 allocs/op +BenchmarkCreate10000-8 1000 7998083 ns/op 5117977 B/op 60345 allocs/op +BenchmarkCreate100000-8 100 81260585 ns/op 49106849 B/op 604563 allocs/op +BenchmarkCreate1000000-8 10 788485738 ns/op 520115434 B/op 6042650 allocs/op +BenchmarkCreate10000000-8 1 7878009695 ns/op 5029061632 B/op 60346535 allocs/op +``` \ No newline at end of file diff --git a/cmd/config.go b/cmd/config.go index 76c5ac3..d5cf9da 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -1,9 +1,8 @@ package cmd import ( - "io" - "log" - "os" + "errors" + "strings" "github.com/aswinkarthik93/csvdiff/pkg/digest" ) @@ -18,10 +17,7 @@ func init() { type Config struct { PrimaryKeyPositions []int ValueColumnPositions []int - Base string - Delta string - Additions string - Modifications string + Format string } // GetPrimaryKeys is to return the --primary-key flags as digest.Positions array. @@ -40,45 +36,38 @@ func (c *Config) GetValueColumns() digest.Positions { return []int{} } -// GetBaseReader returns an io.Reader for the base file. -func (c *Config) GetBaseReader() io.Reader { - return getReader(c.Base) -} - -// GetDeltaReader returns an io.Reader for the delta file. -func (c *Config) GetDeltaReader() io.Reader { - return getReader(c.Delta) -} - -// AdditionsWriter gives the output stream for the additions in delta csv. -func (c *Config) AdditionsWriter() io.WriteCloser { - return getWriter(c.Additions) -} - -// ModificationsWriter gives the output stream for the modifications in delta csv. -func (c *Config) ModificationsWriter() io.WriteCloser { - return getWriter(c.Modifications) -} +// Validate validates the config object +// and returns error if not valid. +func (c *Config) Validate() error { + allFormats := []string{rowmark, jsonFormat} -func getReader(filename string) io.Reader { - file, err := os.Open(filename) + formatValid := false + for _, format := range allFormats { + if strings.ToLower(c.Format) == format { + formatValid = true + } + } - if err != nil { - log.Fatal(err) + if !formatValid { + return errors.New("Specified format is not valid") } - return file + return nil } -func getWriter(outputStream string) io.WriteCloser { - if outputStream != "STDOUT" { - file, err := os.Create(outputStream) - - if err != nil { - log.Fatal(err) - } +const ( + rowmark = "rowmark" + jsonFormat = "json" +) - return file +// Formatter instantiates a new formatted +// based on config.Format +func (c *Config) Formatter() Formatter { + format := strings.ToLower(c.Format) + if format == rowmark { + return &RowMarkFormatter{} + } else if format == jsonFormat { + return &JSONFormatter{} } - return os.Stdout + return &RowMarkFormatter{} } diff --git a/cmd/config_test.go b/cmd/config_test.go index 30338d6..705a1c0 100644 --- a/cmd/config_test.go +++ b/cmd/config_test.go @@ -1,30 +1,72 @@ -package cmd +package cmd_test import ( "testing" + "github.com/aswinkarthik93/csvdiff/cmd" "github.com/aswinkarthik93/csvdiff/pkg/digest" "github.com/stretchr/testify/assert" ) func TestPrimaryKeyPositions(t *testing.T) { - config := Config{PrimaryKeyPositions: []int{0, 1}} + config := cmd.Config{PrimaryKeyPositions: []int{0, 1}} assert.Equal(t, digest.Positions([]int{0, 1}), config.GetPrimaryKeys()) - config = Config{PrimaryKeyPositions: []int{}} + config = cmd.Config{PrimaryKeyPositions: []int{}} assert.Equal(t, digest.Positions([]int{0}), config.GetPrimaryKeys()) - config = Config{} + config = cmd.Config{} assert.Equal(t, digest.Positions([]int{0}), config.GetPrimaryKeys()) } func TestValueColumnPositions(t *testing.T) { - config := Config{ValueColumnPositions: []int{0, 1}} + config := cmd.Config{ValueColumnPositions: []int{0, 1}} assert.Equal(t, digest.Positions([]int{0, 1}), config.GetValueColumns()) - config = Config{ValueColumnPositions: []int{}} + config = cmd.Config{ValueColumnPositions: []int{}} assert.Equal(t, digest.Positions([]int{}), config.GetValueColumns()) - config = Config{} + config = cmd.Config{} assert.Equal(t, digest.Positions([]int{}), config.GetValueColumns()) } + +func TestConfigValidate(t *testing.T) { + var config *cmd.Config + + config = &cmd.Config{} + assert.Error(t, config.Validate()) + + config = &cmd.Config{Format: "rowmark"} + assert.NoError(t, config.Validate()) + + config = &cmd.Config{Format: "rowMARK"} + assert.NoError(t, config.Validate()) + + config = &cmd.Config{Format: "json"} + assert.NoError(t, config.Validate()) +} + +func TestDefaultConfigFormatter(t *testing.T) { + config := &cmd.Config{} + + formatter, ok := config.Formatter().(*cmd.RowMarkFormatter) + + assert.True(t, ok) + assert.NotNil(t, formatter) +} + +func TestConfigFormatter(t *testing.T) { + var config *cmd.Config + var formatter cmd.Formatter + var ok bool + + config = &cmd.Config{Format: "rowmark"} + formatter, ok = config.Formatter().(*cmd.RowMarkFormatter) + assert.True(t, ok) + assert.NotNil(t, formatter) + + config = &cmd.Config{Format: "json"} + formatter, ok = config.Formatter().(*cmd.JSONFormatter) + assert.True(t, ok) + assert.NotNil(t, formatter) +} diff --git a/cmd/formatter.go b/cmd/formatter.go new file mode 100644 index 0000000..b83cc30 --- /dev/null +++ b/cmd/formatter.go @@ -0,0 +1,48 @@ +package cmd + +import ( + "encoding/json" + "fmt" + "io" + + "github.com/aswinkarthik93/csvdiff/pkg/digest" +) + +// Formatter defines the interface through which differences +// can be formatted and displayed +type Formatter interface { + Format(digest.Difference, io.Writer) +} + +// RowMarkFormatter formats diff by marking each row as +// ADDED/MODIFIED. It mutates the row and adds as a new column. +type RowMarkFormatter struct{} + +// Format prints the diff to os.Stdout +func (f *RowMarkFormatter) Format(diff digest.Difference, w io.Writer) { + fmt.Fprintf(w, "Additions %d\n", len(diff.Additions)) + fmt.Fprintf(w, "Modifications %d\n", len(diff.Modifications)) + fmt.Fprintf(w, "Rows:\n") + + for _, added := range diff.Additions { + fmt.Fprintf(w, "%s,%s\n", added, "ADDED") + } + + for _, modified := range diff.Modifications { + fmt.Fprintf(w, "%s,%s\n", modified, "MODIFIED") + } +} + +// JSONFormatter formats diff to as a JSON Object +type JSONFormatter struct{} + +// Format prints the diff as a JSON +func (f *JSONFormatter) Format(diff digest.Difference, w io.Writer) { + data, err := json.MarshalIndent(diff, "", " ") + + if err != nil { + panic(err) + } + + w.Write(data) +} diff --git a/cmd/formatter_test.go b/cmd/formatter_test.go new file mode 100644 index 0000000..da00383 --- /dev/null +++ b/cmd/formatter_test.go @@ -0,0 +1,55 @@ +package cmd_test + +import ( + "bytes" + "testing" + + "github.com/aswinkarthik93/csvdiff/cmd" + "github.com/aswinkarthik93/csvdiff/pkg/digest" + + "github.com/stretchr/testify/assert" +) + +func TestJSONFormat(t *testing.T) { + var formatter cmd.Formatter + diff := digest.Difference{ + Additions: []string{"additions"}, + Modifications: []string{"modification"}, + } + expected := `{ + "Additions": [ + "additions" + ], + "Modifications": [ + "modification" + ] +}` + + var buffer bytes.Buffer + + formatter = &cmd.JSONFormatter{} + + formatter.Format(diff, &buffer) + assert.Equal(t, expected, buffer.String()) +} + +func TestRowMarkFormatter(t *testing.T) { + var formatter cmd.Formatter + diff := digest.Difference{ + Additions: []string{"additions"}, + Modifications: []string{"modification"}, + } + expected := `Additions 1 +Modifications 1 +Rows: +additions,ADDED +modification,MODIFIED +` + + var buffer bytes.Buffer + + formatter = &cmd.RowMarkFormatter{} + + formatter.Format(diff, &buffer) + assert.Equal(t, expected, buffer.String()) +} diff --git a/cmd/root.go b/cmd/root.go index d854c9a..089398f 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -21,26 +21,73 @@ package cmd import ( + "errors" "fmt" + "io" "os" + "time" + "github.com/aswinkarthik93/csvdiff/pkg/digest" homedir "github.com/mitchellh/go-homedir" "github.com/spf13/cobra" "github.com/spf13/viper" ) -var cfgFile string +var ( + cfgFile string + timed bool + version bool +) // rootCmd represents the base command when called without any subcommands var rootCmd = &cobra.Command{ - Use: "csvdiff", + Use: "csvdiff ", Short: "A diff tool for database tables dumped as csv files", Long: `Differentiates two csv files and finds out the additions and modifications. Most suitable for csv files created from database tables`, + PreRunE: func(cmd *cobra.Command, args []string) error { + // If its --version flag, dont thrown error + if version { + return nil + } + + // Validate args + if len(args) != 2 { + return errors.New("Pass 2 files. Usage: csvdiff ") + } + + // Validate flags + if err := config.Validate(); err != nil { + return err + } + + return nil + }, // Uncomment the following line if your bare application // has an action associated with it: - // Run: func(cmd *cobra.Command, args []string) { - // }, + Run: func(cmd *cobra.Command, args []string) { + // Print version and exit program + if version { + fmt.Println(VersionString) + return + } + + if timed { + defer timeTrack(time.Now(), "csvdiff") + } + + baseFile := newReadCloser(args[0]) + defer baseFile.Close() + deltaFile := newReadCloser(args[1]) + defer deltaFile.Close() + + baseConfig := digest.NewConfig(baseFile, config.GetPrimaryKeys(), config.GetValueColumns()) + deltaConfig := digest.NewConfig(deltaFile, config.GetPrimaryKeys(), config.GetValueColumns()) + + diff := digest.Diff(baseConfig, deltaConfig) + + config.Formatter().Format(diff, os.Stdout) + }, } // Execute adds all child commands to the root command and sets flags appropriately. @@ -63,6 +110,13 @@ func init() { // Cobra also supports local flags, which will only run // when this action is called directly. rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") + + rootCmd.Flags().IntSliceVarP(&config.PrimaryKeyPositions, "primary-key", "p", []int{0}, "Primary key positions of the Input CSV as comma separated values Eg: 1,2") + rootCmd.Flags().IntSliceVarP(&config.ValueColumnPositions, "columns", "", []int{}, "Selectively compare positions in CSV Eg: 1,2. Default is entire row") + rootCmd.Flags().StringVarP(&config.Format, "format", "", "rowmark", "Available (rowmark|json)") + + rootCmd.Flags().BoolVarP(&timed, "time", "", false, "Measure time") + rootCmd.Flags().BoolVarP(&version, "version", "", false, "Display version") } // initConfig reads in config file and ENV variables if set. @@ -90,3 +144,17 @@ func initConfig() { fmt.Println("Using config file:", viper.ConfigFileUsed()) } } + +func newReadCloser(filename string) io.ReadCloser { + file, err := os.Open(filename) + if err != nil { + panic(err) + } + + return file +} + +func timeTrack(start time.Time, name string) { + elapsed := time.Since(start) + fmt.Fprintln(os.Stderr, fmt.Sprintf("%s took %s", name, elapsed)) +} diff --git a/cmd/run.go b/cmd/run.go deleted file mode 100644 index 8b1c0ac..0000000 --- a/cmd/run.go +++ /dev/null @@ -1,163 +0,0 @@ -// Copyright © 2018 NAME HERE -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd - -import ( - "encoding/json" - "fmt" - "io" - "log" - "os" - "sync" - "time" - - "github.com/aswinkarthik93/csvdiff/pkg/digest" - spinner "github.com/briandowns/spinner" - "github.com/spf13/cobra" -) - -// digestCmd represents the digest command -var digestCmd = &cobra.Command{ - Use: "run", - Short: "run diff between base-csv and delta-csv", - Run: func(cmd *cobra.Command, args []string) { - run() - }, -} - -var ( - debug bool - noTime bool - noSpinner bool - - loader *spinner.Spinner - startTime time.Time -) - -var newLine []byte - -func init() { - rootCmd.AddCommand(digestCmd) - newLine = []byte{'\n'} - - digestCmd.Flags().StringVarP(&config.Base, "base", "b", "", "The base csv file") - digestCmd.Flags().StringVarP(&config.Delta, "delta", "d", "", "The delta csv file") - digestCmd.Flags().IntSliceVarP(&config.PrimaryKeyPositions, "primary-key", "p", []int{0}, "Primary key positions of the Input CSV as comma separated values Eg: 1,2") - digestCmd.Flags().IntSliceVarP(&config.ValueColumnPositions, "value-columns", "", []int{}, "Value key positions of the Input CSV as comma separated values Eg: 1,2. Default is entire row") - digestCmd.Flags().StringVarP(&config.Additions, "additions", "a", "STDOUT", "Output stream for the additions in delta file") - digestCmd.Flags().StringVarP(&config.Modifications, "modifications", "m", "STDOUT", "Output stream for the modifications in delta file") - - digestCmd.Flags().BoolVarP(&debug, "debug", "", false, "Debug mode") - digestCmd.Flags().BoolVarP(&noTime, "no-time", "", false, "Do not measure time") - digestCmd.Flags().BoolVarP(&noSpinner, "no-spinner", "", false, "Do not display spinner") - - digestCmd.MarkFlagRequired("base") - digestCmd.MarkFlagRequired("delta") -} - -func run() { - if !noTime { - defer timeTrack(time.Now(), "csvdiff") - } - if str, err := json.Marshal(config); err == nil && debug { - fmt.Println(string(str)) - } else if err != nil { - log.Fatal(err) - } - - baseConfig := digest.NewConfig(config.GetBaseReader(), false, config.GetPrimaryKeys(), config.GetValueColumns()) - - deltaConfig := digest.NewConfig(config.GetDeltaReader(), true, config.GetPrimaryKeys(), config.GetValueColumns()) - - var wg sync.WaitGroup - baseChannel := make(chan message) - deltaChannel := make(chan message) - - if !noSpinner { - loader = spinner.New(spinner.CharSets[14], 100*time.Millisecond) - loader.Writer = os.Stderr - loader.Color("cyan") - loader.Start() - loader.Suffix = " Computing hashes" - } - - wg.Add(1) - go generateInBackground("base", baseConfig, &wg, baseChannel) - - wg.Add(1) - go generateInBackground("delta", deltaConfig, &wg, deltaChannel) - - wg.Add(1) - go compareInBackgroud(baseChannel, deltaChannel, &wg) - - wg.Wait() -} - -func timeTrack(start time.Time, name string) { - elapsed := time.Since(start) - log.Printf("%s took %s", name, elapsed) -} - -type message struct { - digestMap map[uint64]uint64 - sourceMap map[uint64]string -} - -func generateInBackground(name string, config *digest.Config, wg *sync.WaitGroup, channel chan<- message) { - digest, sourceMap, err := digest.Create(config) - if err != nil { - panic(err) - } - - if debug { - log.Println("Generated Digest for " + name) - } - channel <- message{digestMap: digest, sourceMap: sourceMap} - close(channel) - wg.Done() -} - -func compareInBackgroud(baseChannel, deltaChannel <-chan message, wg *sync.WaitGroup) { - baseMessage := <-baseChannel - deltaMessage := <-deltaChannel - if !noSpinner { - loader.Suffix = " Comparing hashes" - } - - additions, modifications := digest.Compare(baseMessage.digestMap, deltaMessage.digestMap) - - aWriter := config.AdditionsWriter() - mWriter := config.ModificationsWriter() - defer aWriter.Close() - defer mWriter.Close() - - if !noSpinner { - loader.Stop() - } - print("Additions", aWriter, additions, deltaMessage.sourceMap) - print("Modifications", mWriter, modifications, deltaMessage.sourceMap) - wg.Done() -} - -func print(recordType string, w io.Writer, positions []uint64, content map[uint64]string) { - fmt.Println() - fmt.Println(fmt.Sprintf("# %s: %d", recordType, len(positions))) - fmt.Println() - - for _, i := range positions { - w.Write([]byte(content[i])) - w.Write(newLine) - } -} diff --git a/cmd/version.go b/cmd/version.go new file mode 100644 index 0000000..1386969 --- /dev/null +++ b/cmd/version.go @@ -0,0 +1,4 @@ +package cmd + +// VersionString to display on --version call +const VersionString = "csvdiff v1.0.0" diff --git a/demo/csvdiff.gif b/demo/csvdiff.gif index 65eba7b..76fca8c 100644 Binary files a/demo/csvdiff.gif and b/demo/csvdiff.gif differ diff --git a/examples/README.md b/examples/README.md deleted file mode 100644 index 5cf292a..0000000 --- a/examples/README.md +++ /dev/null @@ -1,40 +0,0 @@ -## Example - -Dataset is used from this [blog](https://blog.majestic.com/development/majestic-million-csv-daily/) - -- Base csv file - -```bash -% cat ./examples/base-small.csv -15,12,wordpress.com,com,207790,792348,wordpress.com,com,15,12,207589,791634 -43,1,europa.eu,eu,116613,353412,europa.eu,eu,41,1,119129,359818 -69,48,aol.com,com,97543,225532,aol.com,com,70,49,97328,224491 -1615,905,proboards.com,com,19833,33110,proboards.com,com,1613,902,19835,33135 -1616,906,ccleaner.com,com,19831,32507,ccleaner.com,com,1614,903,19834,32463 -1617,907,doodle.com,com,19827,32902,doodle.com,com,1621,909,19787,32822 -``` - -- Delta csv file - -```bash -% cat ./examples/delta-small.csv -15,12,wordpress.com,com,207790,792348,wordpress.com,com,15,12,207589,791634 -43,1,europa.eu,eu,116613,353412,europa.eu,eu,41,1,119129,359818 -69,1048,aol.com,com,97543,225532,aol.com,com,70,49,97328,224491 -24564,907,completely-newsite.com,com,19827,32902,completely-newsite.com,com,1621,909,19787,32822 -``` - -- On run of csvdiff - -```bash -% csvdiff run --base ./examples/base-small.csv --delta ./examples/delta-small.csv --primary-key 0 - -# Additions: 1 - -24564,907,completely-newsite.com,com,19827,32902,completely-newsite.com,com,1621,909,19787,32822 - -# Modifications: 1 - -69,1048,aol.com,com,97543,225532,aol.com,com,70,49,97328,224491 -2018/04/23 21:43:30 csvdiff took 1.361831ms -``` \ No newline at end of file diff --git a/glide.lock b/glide.lock index 50b8ccf..282569f 100644 --- a/glide.lock +++ b/glide.lock @@ -1,12 +1,8 @@ -hash: a42b65bfb8f09e42a12971ff7d5c5b0106a86382a5dd90fc3e5b29d58c913f03 -updated: 2018-04-24T14:50:45.370899074+05:30 +hash: 9e942e84862c4047de33d9cbf763351d1baa5cd7e8f4c2745af2856a9570e0a8 +updated: 2018-04-29T14:34:53.907789+05:30 imports: -- name: github.com/briandowns/spinner - version: 48dbb65d7bd5c74ab50d53d04c949f20e3d14944 - name: github.com/cespare/xxhash version: 5c37fe3735342a2e0d01c87a907579987c8936cc -- name: github.com/fatih/color - version: 507f6050b8568533fb3f5504de8e5205fa62a114 - name: github.com/fsnotify/fsnotify version: c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9 - name: github.com/hashicorp/hcl @@ -25,10 +21,6 @@ imports: version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 - name: github.com/magiconair/properties version: 2c9e9502788518c97fe44e8955cd069417ee89df -- name: github.com/mattn/go-colorable - version: 7dc3415be66d7cc68bf0182f35c8d31f8d2ad8a7 -- name: github.com/mattn/go-isatty - version: 6ca4dbf54d38eea1a992b3c722a76a5d1c4cb25c - name: github.com/mitchellh/go-homedir version: b8bc1bf767474819792c23f32d8286a45736f1c6 - name: github.com/mitchellh/mapstructure diff --git a/glide.yaml b/glide.yaml index 51ed9d1..8439936 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,7 +1,5 @@ package: github.com/aswinkarthik93/csvdiff import: -- package: github.com/briandowns/spinner - version: ^1.0.0 - package: github.com/cespare/xxhash version: ^1.0.0 - package: github.com/mitchellh/go-homedir diff --git a/pkg/digest/compare.go b/pkg/digest/compare.go index 22f09ff..1c466e3 100644 --- a/pkg/digest/compare.go +++ b/pkg/digest/compare.go @@ -1,26 +1,102 @@ package digest -// Compare compares two Digest maps and returns the additions and modification -// keys as arrays. -func Compare(baseDigest, newDigest map[uint64]uint64) (additions []uint64, modifications []uint64) { - maxSize := len(newDigest) - additions = make([]uint64, maxSize) - modifications = make([]uint64, maxSize) - - additionCounter := 0 - modificationCounter := 0 - for k, newVal := range newDigest { - if oldVal, present := baseDigest[k]; present { - if newVal != oldVal { - //Modifications - modifications[modificationCounter] = k - modificationCounter++ +import ( + "encoding/csv" + "runtime" + "strings" + "sync" +) + +// Difference represents the additions and modifications +// between the two Configs +type Difference struct { + Additions []string + Modifications []string +} + +type messageType int + +const ( + addition messageType = iota + modification messageType = iota +) + +type diffMessage struct { + _type messageType + value string +} + +// Diff will differentiate between two given configs +func Diff(baseConfig, deltaConfig *Config) Difference { + maxProcs := runtime.NumCPU() + base := Create(baseConfig) + + additions := make([]string, 0, len(base)) + modifications := make([]string, 0, len(base)) + + messageChan := make(chan []diffMessage, bufferSize*maxProcs) + + go readAndCompare(base, deltaConfig, messageChan) + + for msgs := range messageChan { + for _, msg := range msgs { + if msg._type == addition { + additions = append(additions, msg.value) + } else if msg._type == modification { + modifications = append(modifications, msg.value) + } + } + } + + return Difference{Additions: additions, Modifications: modifications} +} + +func readAndCompare(base map[uint64]uint64, config *Config, msgChannel chan<- []diffMessage) { + reader := csv.NewReader(config.Reader) + var wg sync.WaitGroup + for { + lines, eofReached := getNextNLines(reader) + wg.Add(1) + go compareDigestForNLines(base, lines, config, msgChannel, &wg) + + if eofReached { + break + } + } + wg.Wait() + close(msgChannel) +} + +func compareDigestForNLines(base map[uint64]uint64, + lines [][]string, + config *Config, + msgChannel chan<- []diffMessage, + wg *sync.WaitGroup, +) { + output := make([]diffMessage, len(lines)) + diffCounter := 0 + for _, line := range lines { + digest := CreateDigest(line, config.Key, config.Value) + if baseValue, present := base[digest.Key]; present { + // Present in both base and delta + if baseValue != digest.Value { + // Modification + output[diffCounter] = diffMessage{ + value: strings.Join(line, Separator), + _type: modification, + } + diffCounter++ } } else { - //Additions - additions[additionCounter] = k - additionCounter++ + // Not present in base. So Addition. + output[diffCounter] = diffMessage{ + value: strings.Join(line, Separator), + _type: addition, + } + diffCounter++ } } - return additions[:additionCounter], modifications[:modificationCounter] + + msgChannel <- output[:diffCounter] + wg.Done() } diff --git a/pkg/digest/compare_test.go b/pkg/digest/compare_test.go index 2441fda..3f94e29 100644 --- a/pkg/digest/compare_test.go +++ b/pkg/digest/compare_test.go @@ -1,30 +1,50 @@ -package digest +package digest_test import ( + "strings" "testing" + "github.com/aswinkarthik93/csvdiff/pkg/digest" "github.com/stretchr/testify/assert" ) -func TestCompare(t *testing.T) { - baseDigest := map[uint64]uint64{ - 10000106069522789940: 11608188164212916000, - 10000305084889337335: 11796412213504516000, - 10024909476616779194: 14500526491611670000, - 1004896778135186857: 15778011848259830000, +func TestDiff(t *testing.T) { + base := `1,col-1,col-2,col-3,one-value +2,col-1,col-2,col-3,two-value +3,col-1,col-2,col-3,three-value +100,col-1,col-2,col-3,hundred-value +` + + delta := `1,col-1,col-2,col-3,one-value +2,col-1,col-2,col-3,two-value-modified +4,col-1,col-2,col-3,four-value-added +100,col-1,col-2,col-3,hundred-value-modified +5,col-1,col-2,col-3,five-value-added +` + + baseConfig := &digest.Config{ + Reader: strings.NewReader(base), + Key: []int{0}, } - newDigest := map[uint64]uint64{ - 10000106069522789940: 11608188164212916000, - 10000305084889337335: 11796412213504516001, - 10049141081086325814: 12259600610026582000, + deltaConfig := &digest.Config{ + Reader: strings.NewReader(delta), + Key: []int{0}, } - additions, modifications := Compare(baseDigest, newDigest) + expected := digest.Difference{ + Additions: []string{ + "4,col-1,col-2,col-3,four-value-added", + "5,col-1,col-2,col-3,five-value-added", + }, + Modifications: []string{ + "2,col-1,col-2,col-3,two-value-modified", + "100,col-1,col-2,col-3,hundred-value-modified", + }, + } - expectedAdditions := []uint64{10049141081086325814} - expectedModifications := []uint64{10000305084889337335} + actual := digest.Diff(baseConfig, deltaConfig) - assert.Equal(t, expectedAdditions, additions) - assert.Equal(t, expectedModifications, modifications) + assert.ElementsMatch(t, expected.Modifications, actual.Modifications) + assert.ElementsMatch(t, expected.Additions, actual.Additions) } diff --git a/pkg/digest/digest.go b/pkg/digest/digest.go index 90a20d1..7adab19 100644 --- a/pkg/digest/digest.go +++ b/pkg/digest/digest.go @@ -3,7 +3,8 @@ package digest import ( "encoding/csv" "io" - "strings" + "runtime" + "sync" "github.com/cespare/xxhash" ) @@ -16,63 +17,85 @@ const Separator = "," type Digest struct { Key uint64 Value uint64 - Row string } // CreateDigest creates a Digest for each line of csv. // There will be one Digest per line func CreateDigest(csv []string, pKey Positions, pRow Positions) Digest { - row := strings.Join(csv, Separator) key := xxhash.Sum64String(pKey.MapToValue(csv)) digest := xxhash.Sum64String(pRow.MapToValue(csv)) - return Digest{Key: key, Value: digest, Row: row} + return Digest{Key: key, Value: digest} } // Config represents configurations that can be passed // to create a Digest. type Config struct { - KeyPositions []int - Key Positions - Value Positions - Reader io.Reader - Writer io.Writer - SourceMap bool + Key Positions + Value Positions + Reader io.Reader } // NewConfig creates an instance of Config struct. -func NewConfig(r io.Reader, createSourceMap bool, primaryKey Positions, valueColumns Positions) *Config { +func NewConfig(r io.Reader, primaryKey Positions, valueColumns Positions) *Config { return &Config{ - Reader: r, - SourceMap: createSourceMap, - Key: primaryKey, - Value: valueColumns, + Reader: r, + Key: primaryKey, + Value: valueColumns, } } +const bufferSize = 512 + // Create can create a Digest using the Configurations passed. // It returns the digest as a map[uint64]uint64. // It can also keep track of the Source line. -func Create(config *Config) (map[uint64]uint64, map[uint64]string, error) { +func Create(config *Config) map[uint64]uint64 { + maxProcs := runtime.NumCPU() reader := csv.NewReader(config.Reader) output := make(map[uint64]uint64) - sourceMap := make(map[uint64]string) - for { - line, err := reader.Read() - if err != nil { - if err == io.EOF { - break - } - return nil, nil, err + + digestChannel := make(chan []Digest, bufferSize*maxProcs) + + go readAndProcess(config, reader, digestChannel) + + for digests := range digestChannel { + for _, digest := range digests { + output[digest.Key] = digest.Value } - digest := CreateDigest(line, config.Key, config.Value) - output[digest.Key] = digest.Value - if config.SourceMap { - sourceMap[digest.Key] = digest.Row + } + + return output +} + +func readAndProcess(config *Config, reader *csv.Reader, digestChannel chan<- []Digest) { + var wg sync.WaitGroup + for { + lines, eofReached := getNextNLines(reader) + wg.Add(1) + go createDigestForNLines(lines, config, digestChannel, &wg) + + if eofReached { + break } } + wg.Wait() + close(digestChannel) +} + +func createDigestForNLines(lines [][]string, + config *Config, + digestChannel chan<- []Digest, + wg *sync.WaitGroup, +) { + output := make([]Digest, len(lines)) + + for i, line := range lines { + output[i] = CreateDigest(line, config.Key, config.Value) + } - return output, sourceMap, nil + digestChannel <- output + wg.Done() } diff --git a/pkg/digest/digest_test.go b/pkg/digest/digest_test.go index cc4604c..b456bfa 100644 --- a/pkg/digest/digest_test.go +++ b/pkg/digest/digest_test.go @@ -1,10 +1,10 @@ -package digest +package digest_test import ( - "bytes" "strings" "testing" + "github.com/aswinkarthik93/csvdiff/pkg/digest" "github.com/cespare/xxhash" "github.com/stretchr/testify/assert" ) @@ -14,9 +14,9 @@ func TestCreateDigest(t *testing.T) { firstKey := xxhash.Sum64String("1") firstLineDigest := xxhash.Sum64String(firstLine) - expectedDigest := Digest{Key: firstKey, Value: firstLineDigest, Row: firstLine} + expectedDigest := digest.Digest{Key: firstKey, Value: firstLineDigest} - actualDigest := CreateDigest(strings.Split(firstLine, Separator), []int{0}, []int{}) + actualDigest := digest.CreateDigest(strings.Split(firstLine, digest.Separator), []int{0}, []int{}) assert.Equal(t, expectedDigest, actualDigest) } @@ -25,42 +25,32 @@ func TestDigestForFile(t *testing.T) { firstLine := "1,first-line,some-columne,friday" firstKey := xxhash.Sum64String("1") firstDigest := xxhash.Sum64String(firstLine) + fridayDigest := xxhash.Sum64String("friday") secondLine := "2,second-line,nobody-needs-this,saturday" secondKey := xxhash.Sum64String("2") secondDigest := xxhash.Sum64String(secondLine) + saturdayDigest := xxhash.Sum64String("saturday") - var outputBuffer bytes.Buffer - - testConfig := &Config{ - Reader: strings.NewReader(firstLine + "\n" + secondLine), - Writer: &outputBuffer, - KeyPositions: []int{0}, - Key: []int{0}, - SourceMap: true, + testConfig := &digest.Config{ + Reader: strings.NewReader(firstLine + "\n" + secondLine), + Key: []int{0}, } - actualDigest, sourceMap, err := Create(testConfig) + actualDigest := digest.Create(testConfig) expectedDigest := map[uint64]uint64{firstKey: firstDigest, secondKey: secondDigest} - expectedSourceMap := map[uint64]string{firstKey: firstLine, secondKey: secondLine} - assert.Nil(t, err, "error at DigestForFile") assert.Equal(t, expectedDigest, actualDigest) - assert.Equal(t, expectedSourceMap, sourceMap) - // No source map - testConfigWithoutSourceMap := &Config{ - Reader: strings.NewReader(firstLine + "\n" + secondLine), - Writer: &outputBuffer, - KeyPositions: []int{0}, - Key: []int{0}, - SourceMap: false, + testConfig = &digest.Config{ + Reader: strings.NewReader(firstLine + "\n" + secondLine), + Key: []int{0}, + Value: []int{3}, } - actualDigest, sourceMap, err = Create(testConfigWithoutSourceMap) + actualDigest = digest.Create(testConfig) + expectedDigest = map[uint64]uint64{firstKey: fridayDigest, secondKey: saturdayDigest} - assert.Nil(t, err, "error at DigestForFile") assert.Equal(t, expectedDigest, actualDigest) - assert.Equal(t, map[uint64]string{}, sourceMap) } diff --git a/pkg/digest/positions.go b/pkg/digest/positions.go index e69b2cf..03a8581 100644 --- a/pkg/digest/positions.go +++ b/pkg/digest/positions.go @@ -9,23 +9,12 @@ type Positions []int // their respective positions and concatenates // them using Separator as a string. func (p Positions) MapToValue(csv []string) string { - if p.Length() == 0 { + if len(p) == 0 { return strings.Join(csv, Separator) } - output := make([]string, p.Length()) - for i, pos := range p.Items() { + output := make([]string, len(p)) + for i, pos := range p { output[i] = csv[pos] } return strings.Join(output, Separator) } - -// Length returns the size of the Positions array. -func (p Positions) Length() int { - return len([]int(p)) -} - -// Items returns the elements of the Positions array -// as an array of int -func (p Positions) Items() []int { - return []int(p) -} diff --git a/pkg/digest/positions_test.go b/pkg/digest/positions_test.go index 90b0c2f..2749d5e 100644 --- a/pkg/digest/positions_test.go +++ b/pkg/digest/positions_test.go @@ -1,14 +1,15 @@ -package digest +package digest_test import ( "strings" "testing" + "github.com/aswinkarthik93/csvdiff/pkg/digest" "github.com/stretchr/testify/assert" ) func TestPositionsMapValues(t *testing.T) { - positions := Positions([]int{0, 3}) + positions := digest.Positions([]int{0, 3}) csv := []string{"zero", "one", "two", "three"} actual := positions.MapToValue(csv) @@ -18,24 +19,11 @@ func TestPositionsMapValues(t *testing.T) { } func TestPositionsMapValuesReturnsCompleteStringCsvIfEmpty(t *testing.T) { - positions := Positions([]int{}) + positions := digest.Positions([]int{}) csv := []string{"zero", "one", "two", "three"} actual := positions.MapToValue(csv) - expected := strings.Join(csv, Separator) + expected := strings.Join(csv, digest.Separator) assert.Equal(t, expected, actual) } - -func TestPositionsLength(t *testing.T) { - positions := Positions([]int{0, 3}) - - assert.Equal(t, 2, positions.Length()) -} - -func TestPositionsItems(t *testing.T) { - items := []int{0, 3} - positions := Positions(items) - - assert.Equal(t, items, positions.Items()) -} diff --git a/pkg/digest/utils.go b/pkg/digest/utils.go new file mode 100644 index 0000000..52048f3 --- /dev/null +++ b/pkg/digest/utils.go @@ -0,0 +1,26 @@ +package digest + +import ( + "encoding/csv" + "io" +) + +func getNextNLines(reader *csv.Reader) ([][]string, bool) { + lines := make([][]string, bufferSize) + + lineCount := 0 + eofReached := false + for ; lineCount < bufferSize; lineCount++ { + line, err := reader.Read() + lines[lineCount] = line + if err != nil { + if err == io.EOF { + eofReached = true + break + } + panic(err) + } + } + + return lines[:lineCount], eofReached +}