1package repositories 2 3import ( 4 "database/sql" 5 "fmt" 6 7 "github.com/pkg/errors" 8 9 ent "repodiff/entities" 10 "repodiff/mappers" 11 repoSQL "repodiff/persistence/sql" 12) 13 14type GlobalDenormalizer struct { 15 db *sql.DB 16} 17 18type ScopedDenormalizer struct { 19 db *sql.DB 20 target ent.DiffTarget 21 mappedTarget ent.MappedDiffTarget 22} 23 24func (g GlobalDenormalizer) DenormalizeToTopCommitter() error { 25 table := "denormalized_view_top_committer" 26 if _, err := g.db.Exec( 27 fmt.Sprintf( 28 "TRUNCATE TABLE %s", 29 table, 30 ), 31 ); err != nil { 32 return err 33 } 34 _, err := g.db.Exec( 35 fmt.Sprintf( 36 `INSERT INTO %s ( 37 upstream_target_id, 38 downstream_target_id, 39 surrogate_id, 40 committer, 41 commits, 42 line_changes, 43 tech_area, 44 upstream_url, 45 upstream_branch, 46 downstream_url, 47 downstream_branch 48 ) ( 49 SELECT 50 upstream_target_id, 51 downstream_target_id, 52 @rn:=@rn+1 AS surrogate_id, 53 committer, 54 commits, 55 line_changes, 56 tech_area, 57 upstream_url, 58 upstream_branch, 59 downstream_url, 60 downstream_branch 61 FROM ( 62 SELECT upstream_target_id, 63 downstream_target_id, 64 author as committer, 65 tech_area, 66 COUNT(*) AS commits, 67 SUM(0) AS line_changes, 68 upstream_url, 69 upstream_branch, 70 downstream_url, 71 downstream_branch 72 FROM denormalized_view_recent_commit GROUP BY 73 author, 74 tech_area, 75 upstream_target_id, 76 downstream_target_id, 77 upstream_url, 78 upstream_branch, 79 downstream_url, 80 downstream_branch ORDER BY upstream_target_id, 81 downstream_target_id 82 ) t1, 83 (SELECT @rn:=0) t2 84 )`, 85 table, 86 ), 87 ) 88 return err 89} 90 91func (g GlobalDenormalizer) DenormalizeToTopTechArea() error { 92 table := "denormalized_view_top_tech_area" 93 if _, err := g.db.Exec( 94 fmt.Sprintf( 95 "TRUNCATE TABLE %s", 96 table, 97 ), 98 ); err != nil { 99 return err 100 } 101 _, err := g.db.Exec( 102 fmt.Sprintf( 103 `INSERT INTO %s ( 104 upstream_target_id, 105 downstream_target_id, 106 surrogate_id, 107 tech_area, 108 commits, 109 line_changes, 110 upstream_url, 111 upstream_branch, 112 downstream_url, 113 downstream_branch 114 ) ( 115 SELECT 116 upstream_target_id, 117 downstream_target_id, 118 @rn:=@rn+1 AS surrogate_id, 119 tech_area, 120 commits, 121 line_changes, 122 upstream_url, 123 upstream_branch, 124 downstream_url, 125 downstream_branch FROM ( 126 SELECT 127 upstream_target_id, 128 downstream_target_id, 129 tech_area, 130 COUNT(*) AS commits, 131 SUM(0) AS line_changes, 132 upstream_url, 133 upstream_branch, 134 downstream_url, 135 downstream_branch 136 FROM denormalized_view_recent_commit GROUP BY 137 tech_area, 138 upstream_target_id, 139 downstream_target_id, 140 upstream_url, 141 upstream_branch, 142 downstream_url, 143 downstream_branch 144 ORDER BY 145 upstream_target_id, 146 downstream_target_id 147 ) t1, 148 (SELECT @rn:=0) t2 149 )`, 150 table, 151 ), 152 ) 153 return err 154} 155 156func (s ScopedDenormalizer) DenormalizeToRecentView(diffRows []ent.AnalyzedDiffRow) error { 157 table := "denormalized_view_recent_project" 158 if err := s.deleteExistingView(table); err != nil { 159 return err 160 } 161 return errors.Wrap( 162 repoSQL.SingleTransactionInsert( 163 s.db, 164 fmt.Sprintf( 165 `INSERT INTO %s ( 166 upstream_target_id, 167 downstream_target_id, 168 row_index, 169 date, 170 downstream_project, 171 upstream_project, 172 status, 173 files_changed, 174 line_insertions, 175 line_deletions, 176 line_changes, 177 commits_not_upstreamed, 178 project_type, 179 upstream_url, 180 upstream_branch, 181 downstream_url, 182 downstream_branch 183 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, 184 table, 185 ), 186 s.rowsWithScopedIndices( 187 mappers.DiffRowsToDenormalizedCols(diffRows), 188 ), 189 ), 190 errorMessageForTable(table), 191 ) 192} 193 194func NewGlobalDenormalizerRepository() (GlobalDenormalizer, error) { 195 db, err := repoSQL.GetDBConnectionPool() 196 return GlobalDenormalizer{ 197 db: db, 198 }, errors.Wrap(err, "Could not establish a database connection") 199} 200 201func NewScopedDenormalizerRepository(target ent.DiffTarget, mappedTarget ent.MappedDiffTarget) (ScopedDenormalizer, error) { 202 db, err := repoSQL.GetDBConnectionPool() 203 return ScopedDenormalizer{ 204 db: db, 205 target: cleanedDiffTarget(target), 206 mappedTarget: mappedTarget, 207 }, errors.Wrap(err, "Could not establish a database connection") 208} 209 210func (s ScopedDenormalizer) DenormalizeToChangesOverTime(diffRows []ent.AnalyzedDiffRow) error { 211 // This query only inserts a single row into the database. If it becomes problematic, this 212 // could become more efficient without the prepared statement embedded in the SingleTransactionInsert 213 // function 214 table := "denormalized_view_changes_over_time" 215 return errors.Wrap( 216 repoSQL.SingleTransactionInsert( 217 s.db, 218 fmt.Sprintf( 219 `INSERT IGNORE INTO %s ( 220 upstream_target_id, 221 downstream_target_id, 222 datastudio_datetime, 223 modified_projects, 224 line_changes, 225 files_changed, 226 upstream_url, 227 upstream_branch, 228 downstream_url, 229 downstream_branch 230 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, 231 table, 232 ), 233 s.rowsWithScopedIndices( 234 mappers.DiffRowsToAggregateChangesOverTime(diffRows), 235 ), 236 ), 237 errorMessageForTable(table), 238 ) 239} 240 241func (s ScopedDenormalizer) DenormalizeToRecentCommits(commitRows []ent.AnalyzedCommitRow, commitToTimestamp map[string]ent.RepoTimestamp) error { 242 table := "denormalized_view_recent_commit" 243 if err := s.deleteExistingView(table); err != nil { 244 return err 245 } 246 return errors.Wrap( 247 repoSQL.SingleTransactionInsert( 248 s.db, 249 fmt.Sprintf( 250 `INSERT INTO %s ( 251 upstream_target_id, 252 downstream_target_id, 253 row_index, 254 commit_, 255 downstream_project, 256 author, 257 subject, 258 tech_area, 259 project_type, 260 first_seen_datastudio_datetime, 261 upstream_url, 262 upstream_branch, 263 downstream_url, 264 downstream_branch 265 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, 266 table, 267 ), 268 s.rowsWithScopedIndices( 269 mappers.CommitRowsToDenormalizedCols(commitRows, commitToTimestamp), 270 ), 271 ), 272 errorMessageForTable(table), 273 ) 274} 275 276func (s ScopedDenormalizer) deleteExistingView(tableName string) error { 277 _, err := s.db.Exec( 278 fmt.Sprintf( 279 `DELETE FROM %s 280 WHERE 281 upstream_target_id = ? 282 AND downstream_target_id = ?`, 283 tableName, 284 ), 285 s.mappedTarget.UpstreamTarget, 286 s.mappedTarget.DownstreamTarget, 287 ) 288 return err 289} 290 291func (s ScopedDenormalizer) rowsWithScopedIndices(rowsOfCols [][]interface{}) [][]interface{} { 292 return mappers.PrependMappedDiffTarget( 293 s.mappedTarget, 294 mappers.AppendDiffTarget( 295 s.target, 296 rowsOfCols, 297 ), 298 ) 299} 300 301func errorMessageForTable(tableName string) string { 302 return fmt.Sprintf("Error inserting rows into %s", tableName) 303} 304