• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1package sql
2
3import (
4	"database/sql"
5	"fmt"
6	"runtime"
7	"sync"
8
9	"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/dialers/mysql"
10	"github.com/pkg/errors"
11
12	"repodiff/constants"
13)
14
15var mux sync.Mutex
16var db *sql.DB
17
18type handleRowFn func(*sql.Rows)
19
20func newDBConnectionPool() (*sql.DB, error) {
21	cfg := mysql.Cfg(
22		constants.GetConfigVar("GCP_DB_INSTANCE_CONNECTION_NAME"),
23		constants.GetConfigVar("GCP_DB_USER"),
24		constants.GetConfigVar("GCP_DB_PASSWORD"),
25	)
26	cfg.DBName = constants.GetConfigVar("GCP_DB_NAME")
27	return mysql.DialCfg(cfg)
28}
29
30func maxParallelism() int {
31	maxProcs := runtime.GOMAXPROCS(0)
32	numCPU := runtime.NumCPU()
33	if maxProcs < numCPU {
34		return maxProcs
35	}
36	return numCPU
37}
38
39func GetDBConnectionPool() (*sql.DB, error) {
40	if db != nil {
41		return db, nil
42	}
43	mux.Lock()
44	defer mux.Unlock()
45
46	// check, lock, check; redundant check for thread safety
47	if db != nil {
48		return db, nil
49	}
50	var err error
51	db, err = newDBConnectionPool()
52	if err != nil {
53		return nil, err
54	}
55	connections := maxParallelism()
56
57	// unless explicitly specified, the default connection pool size is unlimited
58	db.SetMaxOpenConns(connections)
59
60	// unless explicitly specified, the default is 0 where idle connections are immediately closed
61	db.SetMaxIdleConns(connections)
62	return db, nil
63}
64
65func SingleTransactionInsert(db *sql.DB, insertQuery string, rowsOfCols [][]interface{}) error {
66	tx, err := db.Begin()
67	if err != nil {
68		return errors.Wrap(err, "Error starting transaction")
69	}
70	stmt, err := tx.Prepare(insertQuery)
71	if err != nil {
72		return errors.Wrap(err, "Error preparing statement")
73	}
74	defer stmt.Close()
75
76	for _, cols := range rowsOfCols {
77		_, err = stmt.Exec(
78			cols...,
79		)
80		if err != nil {
81			tx.Rollback()
82			return errors.Wrap(err, "Error inserting values")
83		}
84	}
85	err = tx.Commit()
86	if err != nil {
87		tx.Rollback()
88		return errors.Wrap(
89			err,
90			"Error committing transaction",
91		)
92	}
93	return nil
94}
95
96func Select(db *sql.DB, rowHandler handleRowFn, query string, args ...interface{}) error {
97	rows, err := db.Query(
98		query,
99		args...,
100	)
101	if err != nil {
102		return err
103	}
104	defer rows.Close()
105
106	for rows.Next() {
107		rowHandler(rows)
108	}
109	if err = rows.Err(); err != nil {
110		return err
111	}
112	return nil
113}
114
115func TruncateTable(db *sql.DB, tableName string) error {
116	_, err := db.Exec(
117		fmt.Sprintf("TRUNCATE TABLE %s", tableName),
118	)
119	return err
120}
121