package repositories import ( "database/sql" "fmt" "github.com/pkg/errors" ent "repodiff/entities" "repodiff/mappers" repoSQL "repodiff/persistence/sql" ) type GlobalDenormalizer struct { db *sql.DB } type ScopedDenormalizer struct { db *sql.DB target ent.DiffTarget mappedTarget ent.MappedDiffTarget } func (g GlobalDenormalizer) DenormalizeToTopCommitter() error { table := "denormalized_view_top_committer" if _, err := g.db.Exec( fmt.Sprintf( "TRUNCATE TABLE %s", table, ), ); err != nil { return err } _, err := g.db.Exec( fmt.Sprintf( `INSERT INTO %s ( upstream_target_id, downstream_target_id, surrogate_id, committer, commits, line_changes, tech_area, upstream_url, upstream_branch, downstream_url, downstream_branch ) ( SELECT upstream_target_id, downstream_target_id, @rn:=@rn+1 AS surrogate_id, committer, commits, line_changes, tech_area, upstream_url, upstream_branch, downstream_url, downstream_branch FROM ( SELECT upstream_target_id, downstream_target_id, author as committer, tech_area, COUNT(*) AS commits, SUM(0) AS line_changes, upstream_url, upstream_branch, downstream_url, downstream_branch FROM denormalized_view_recent_commit GROUP BY author, tech_area, upstream_target_id, downstream_target_id, upstream_url, upstream_branch, downstream_url, downstream_branch ORDER BY upstream_target_id, downstream_target_id ) t1, (SELECT @rn:=0) t2 )`, table, ), ) return err } func (g GlobalDenormalizer) DenormalizeToTopTechArea() error { table := "denormalized_view_top_tech_area" if _, err := g.db.Exec( fmt.Sprintf( "TRUNCATE TABLE %s", table, ), ); err != nil { return err } _, err := g.db.Exec( fmt.Sprintf( `INSERT INTO %s ( upstream_target_id, downstream_target_id, surrogate_id, tech_area, commits, line_changes, upstream_url, upstream_branch, downstream_url, downstream_branch ) ( SELECT upstream_target_id, downstream_target_id, @rn:=@rn+1 AS surrogate_id, tech_area, commits, line_changes, upstream_url, upstream_branch, downstream_url, downstream_branch FROM ( SELECT upstream_target_id, downstream_target_id, tech_area, COUNT(*) AS commits, SUM(0) AS line_changes, upstream_url, upstream_branch, downstream_url, downstream_branch FROM denormalized_view_recent_commit GROUP BY tech_area, upstream_target_id, downstream_target_id, upstream_url, upstream_branch, downstream_url, downstream_branch ORDER BY upstream_target_id, downstream_target_id ) t1, (SELECT @rn:=0) t2 )`, table, ), ) return err } func (s ScopedDenormalizer) DenormalizeToRecentView(diffRows []ent.AnalyzedDiffRow) error { table := "denormalized_view_recent_project" if err := s.deleteExistingView(table); err != nil { return err } return errors.Wrap( repoSQL.SingleTransactionInsert( s.db, fmt.Sprintf( `INSERT INTO %s ( upstream_target_id, downstream_target_id, row_index, date, downstream_project, upstream_project, status, files_changed, line_insertions, line_deletions, line_changes, commits_not_upstreamed, project_type, upstream_url, upstream_branch, downstream_url, downstream_branch ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, table, ), s.rowsWithScopedIndices( mappers.DiffRowsToDenormalizedCols(diffRows), ), ), errorMessageForTable(table), ) } func NewGlobalDenormalizerRepository() (GlobalDenormalizer, error) { db, err := repoSQL.GetDBConnectionPool() return GlobalDenormalizer{ db: db, }, errors.Wrap(err, "Could not establish a database connection") } func NewScopedDenormalizerRepository(target ent.DiffTarget, mappedTarget ent.MappedDiffTarget) (ScopedDenormalizer, error) { db, err := repoSQL.GetDBConnectionPool() return ScopedDenormalizer{ db: db, target: cleanedDiffTarget(target), mappedTarget: mappedTarget, }, errors.Wrap(err, "Could not establish a database connection") } func (s ScopedDenormalizer) DenormalizeToChangesOverTime(diffRows []ent.AnalyzedDiffRow) error { // This query only inserts a single row into the database. If it becomes problematic, this // could become more efficient without the prepared statement embedded in the SingleTransactionInsert // function table := "denormalized_view_changes_over_time" return errors.Wrap( repoSQL.SingleTransactionInsert( s.db, fmt.Sprintf( `INSERT IGNORE INTO %s ( upstream_target_id, downstream_target_id, datastudio_datetime, modified_projects, line_changes, files_changed, upstream_url, upstream_branch, downstream_url, downstream_branch ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, table, ), s.rowsWithScopedIndices( mappers.DiffRowsToAggregateChangesOverTime(diffRows), ), ), errorMessageForTable(table), ) } func (s ScopedDenormalizer) DenormalizeToRecentCommits(commitRows []ent.AnalyzedCommitRow, commitToTimestamp map[string]ent.RepoTimestamp) error { table := "denormalized_view_recent_commit" if err := s.deleteExistingView(table); err != nil { return err } return errors.Wrap( repoSQL.SingleTransactionInsert( s.db, fmt.Sprintf( `INSERT INTO %s ( upstream_target_id, downstream_target_id, row_index, commit_, downstream_project, author, subject, tech_area, project_type, first_seen_datastudio_datetime, upstream_url, upstream_branch, downstream_url, downstream_branch ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, table, ), s.rowsWithScopedIndices( mappers.CommitRowsToDenormalizedCols(commitRows, commitToTimestamp), ), ), errorMessageForTable(table), ) } func (s ScopedDenormalizer) deleteExistingView(tableName string) error { _, err := s.db.Exec( fmt.Sprintf( `DELETE FROM %s WHERE upstream_target_id = ? AND downstream_target_id = ?`, tableName, ), s.mappedTarget.UpstreamTarget, s.mappedTarget.DownstreamTarget, ) return err } func (s ScopedDenormalizer) rowsWithScopedIndices(rowsOfCols [][]interface{}) [][]interface{} { return mappers.PrependMappedDiffTarget( s.mappedTarget, mappers.AppendDiffTarget( s.target, rowsOfCols, ), ) } func errorMessageForTable(tableName string) string { return fmt.Sprintf("Error inserting rows into %s", tableName) }