Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

Commit afd3156

Browse files
authored
Code monitors: fix a bunch of stuff (#57546)
- Fixes an issue where creating a monitor will fail due to concurrent transaction usage - Fixes an issue where a timeout will cause a monitor to continually get re-run - Cleans up some old feature flags - Cleans up some dead code
1 parent 9632aa9 commit afd3156

File tree

7 files changed

+98
-105
lines changed

7 files changed

+98
-105
lines changed

cmd/frontend/internal/codemonitors/resolvers/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ go_library(
1313
"//internal/codemonitors",
1414
"//internal/codemonitors/background",
1515
"//internal/database",
16-
"//internal/featureflag",
1716
"//internal/gqlutil",
1817
"//internal/httpcli",
1918
"//lib/errors",

cmd/frontend/internal/codemonitors/resolvers/resolvers.go

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/sourcegraph/sourcegraph/internal/codemonitors"
1616
"github.com/sourcegraph/sourcegraph/internal/codemonitors/background"
1717
"github.com/sourcegraph/sourcegraph/internal/database"
18-
"github.com/sourcegraph/sourcegraph/internal/featureflag"
1918
"github.com/sourcegraph/sourcegraph/internal/gqlutil"
2019
"github.com/sourcegraph/sourcegraph/internal/httpcli"
2120
"github.com/sourcegraph/sourcegraph/lib/errors"
@@ -122,14 +121,22 @@ func (r *Resolver) CreateCodeMonitor(ctx context.Context, args *graphqlbackend.C
122121
return nil, err
123122
}
124123

124+
userID, orgID, err := graphqlbackend.UnmarshalNamespaceToIDs(args.Monitor.Namespace)
125+
if err != nil {
126+
return nil, err
127+
}
128+
129+
// Snapshot the state of the searched repos when the monitor is created so that
130+
// we can distinguish new repos. We run the snapshot outside the transaction because
131+
// search requires that the DB handle is not a transaction.
132+
resolvedRevisions, err := codemonitors.Snapshot(ctx, r.logger, r.db, args.Trigger.Query)
133+
if err != nil {
134+
return nil, err
135+
}
136+
125137
// Start transaction.
126138
var newMonitor *database.Monitor
127139
err = r.withTransact(ctx, func(tx *Resolver) error {
128-
userID, orgID, err := graphqlbackend.UnmarshalNamespaceToIDs(args.Monitor.Namespace)
129-
if err != nil {
130-
return err
131-
}
132-
133140
// Create monitor.
134141
m, err := tx.db.CodeMonitors().CreateMonitor(ctx, database.MonitorArgs{
135142
Description: args.Monitor.Description,
@@ -147,10 +154,9 @@ func (r *Resolver) CreateCodeMonitor(ctx context.Context, args *graphqlbackend.C
147154
return err
148155
}
149156

150-
if featureflag.FromContext(ctx).GetBoolOr("cc-repo-aware-monitors", true) {
151-
// Snapshot the state of the searched repos when the monitor is created so that
152-
// we can distinguish new repos.
153-
err = codemonitors.Snapshot(ctx, r.logger, tx.db, args.Trigger.Query, m.ID)
157+
// Save the snapshotted commit IDs
158+
for repoID, commitIDs := range resolvedRevisions {
159+
err = tx.db.CodeMonitors().UpsertLastSearched(ctx, m.ID, repoID, commitIDs)
154160
if err != nil {
155161
return err
156162
}
@@ -245,7 +251,7 @@ func (r *Resolver) UpdateCodeMonitor(ctx context.Context, args *graphqlbackend.U
245251
if err = tx.createActions(ctx, monitorID, toCreate); err != nil {
246252
return err
247253
}
248-
m, err := tx.updateCodeMonitor(ctx, args)
254+
m, err := tx.updateCodeMonitor(ctx, r.db, args)
249255
if err != nil {
250256
return err
251257
}
@@ -507,7 +513,9 @@ func splitActionIDs(args *graphqlbackend.UpdateCodeMonitorArgs, actionIDs []grap
507513
return toCreate, toDelete, nil
508514
}
509515

510-
func (r *Resolver) updateCodeMonitor(ctx context.Context, args *graphqlbackend.UpdateCodeMonitorArgs) (*monitor, error) {
516+
// updateCodeMonitor updates the code monitor in the database. We pass in "rawDB" because Snapshot requires that the
517+
// database being used is not in a transaction, and updateCodeMonitor is run with a transacted resolver.
518+
func (r *Resolver) updateCodeMonitor(ctx context.Context, rawDB database.DB, args *graphqlbackend.UpdateCodeMonitorArgs) (*monitor, error) {
511519
// Update monitor.
512520
monitorID, err := unmarshalMonitorID(args.Monitor.Id)
513521
if err != nil {
@@ -534,18 +542,23 @@ func (r *Resolver) updateCodeMonitor(ctx context.Context, args *graphqlbackend.U
534542
return nil, err
535543
}
536544

537-
if featureflag.FromContext(ctx).GetBoolOr("cc-repo-aware-monitors", true) {
538-
currentTrigger, err := r.db.CodeMonitors().GetQueryTriggerForMonitor(ctx, monitorID)
545+
currentTrigger, err := r.db.CodeMonitors().GetQueryTriggerForMonitor(ctx, monitorID)
546+
if err != nil {
547+
return nil, err
548+
}
549+
550+
// When the query is changed, take a new snapshot of the commits that currently
551+
// exist so we know where to start.
552+
if currentTrigger.QueryString != args.Trigger.Update.Query {
553+
// Snapshot the state of the searched repos when the monitor is created so that
554+
// we can distinguish new repos.
555+
// NOTE: we use rawDB here because Snapshot requires that the db conn is not a transaction.
556+
resolvedRevisions, err := codemonitors.Snapshot(ctx, r.logger, rawDB, args.Trigger.Update.Query)
539557
if err != nil {
540558
return nil, err
541559
}
542-
543-
// When the query is changed, take a new snapshot of the commits that currently
544-
// exist so we know where to start.
545-
if currentTrigger.QueryString != args.Trigger.Update.Query {
546-
// Snapshot the state of the searched repos when the monitor is created so that
547-
// we can distinguish new repos.
548-
err = codemonitors.Snapshot(ctx, r.logger, r.db, args.Trigger.Update.Query, monitorID)
560+
for repoID, commitIDs := range resolvedRevisions {
561+
err = r.db.CodeMonitors().UpsertLastSearched(ctx, monitorID, repoID, commitIDs)
549562
if err != nil {
550563
return nil, err
551564
}

internal/codemonitors/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ go_library(
88
visibility = ["//:__subpackages__"],
99
deps = [
1010
"//internal/api",
11-
"//internal/api/internalapi",
1211
"//internal/database",
1312
"//internal/errcode",
1413
"//internal/gitserver/protocol",
@@ -48,7 +47,6 @@ go_test(
4847
"//internal/search/searcher",
4948
"//internal/types",
5049
"//schema",
51-
"@com_github_sourcegraph_log//:log",
5250
"@com_github_sourcegraph_log//logtest",
5351
"@com_github_stretchr_testify//require",
5452
],

internal/codemonitors/search.go

Lines changed: 45 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@ package codemonitors
22

33
import (
44
"context"
5-
"net/url"
65
"sort"
6+
"sync"
77

88
"github.com/sourcegraph/log"
99

1010
"github.com/sourcegraph/sourcegraph/internal/api"
11-
"github.com/sourcegraph/sourcegraph/internal/api/internalapi"
1211
"github.com/sourcegraph/sourcegraph/internal/database"
1312
"github.com/sourcegraph/sourcegraph/internal/errcode"
1413
gitprotocol "github.com/sourcegraph/sourcegraph/internal/gitserver/protocol"
@@ -45,7 +44,7 @@ func Search(ctx context.Context, logger log.Logger, db database.DB, query string
4544
}
4645

4746
hook := func(ctx context.Context, db database.DB, gs commit.GitserverClient, args *gitprotocol.SearchRequest, repoID api.RepoID, doSearch commit.DoSearchFunc) error {
48-
return hookWithID(ctx, db, logger, gs, monitorID, repoID, args, doSearch)
47+
return hookWithID(ctx, db, gs, monitorID, repoID, args, doSearch)
4948
}
5049
planJob, err = addCodeMonitorHook(planJob, hook)
5150
if err != nil {
@@ -74,7 +73,11 @@ func Search(ctx context.Context, logger log.Logger, db database.DB, query string
7473
// Snapshot runs a dummy search that just saves the current state of the searched repos in the database.
7574
// On subsequent runs, this allows us to treat all new repos or sets of args as something new that should
7675
// be searched from the beginning.
77-
func Snapshot(ctx context.Context, logger log.Logger, db database.DB, query string, monitorID int64) error {
76+
func Snapshot(ctx context.Context, logger log.Logger, db database.DB, query string) (map[api.RepoID][]string, error) {
77+
if db.Handle().InTransaction() {
78+
return nil, errors.New("Snapshot cannot be run in a transaction")
79+
}
80+
7881
searchClient := client.New(logger, db)
7982
inputs, err := searchClient.Plan(
8083
ctx,
@@ -85,48 +88,49 @@ func Snapshot(ctx context.Context, logger log.Logger, db database.DB, query stri
8588
search.Streaming,
8689
)
8790
if err != nil {
88-
return err
91+
return nil, err
8992
}
9093

9194
clients := searchClient.JobClients()
9295
planJob, err := jobutil.NewPlanJob(inputs, inputs.Plan)
9396
if err != nil {
94-
return err
97+
return nil, err
9598
}
9699

100+
var (
101+
mu sync.Mutex
102+
resolvedRevisions = make(map[api.RepoID][]string)
103+
)
104+
97105
hook := func(ctx context.Context, db database.DB, gs commit.GitserverClient, args *gitprotocol.SearchRequest, repoID api.RepoID, _ commit.DoSearchFunc) error {
98-
return snapshotHook(ctx, db, gs, args, monitorID, repoID)
106+
// Resolve the requested revisions into a static set of commit hashes
107+
commitHashes, err := gs.ResolveRevisions(ctx, args.Repo, args.Revisions)
108+
if err != nil {
109+
return err
110+
}
111+
112+
mu.Lock()
113+
resolvedRevisions[repoID] = commitHashes
114+
mu.Unlock()
115+
116+
return nil
99117
}
100118

101119
planJob, err = addCodeMonitorHook(planJob, hook)
102120
if err != nil {
103-
return err
121+
return nil, err
104122
}
105123

106-
// HACK(camdencheek): limit the concurrency of the commit search job
107-
// because the db passed into this function might actually be a transaction
108-
// and transactions cannot be used concurrently.
109-
planJob = limitConcurrency(planJob)
110-
111124
_, err = planJob.Run(ctx, clients, streaming.NewNullStream())
112-
return err
125+
if err != nil {
126+
return nil, err
127+
}
128+
129+
return resolvedRevisions, nil
113130
}
114131

115132
var ErrInvalidMonitorQuery = errors.New("code monitor cannot use different patterns for different repos")
116133

117-
func limitConcurrency(in job.Job) job.Job {
118-
return job.Map(in, func(j job.Job) job.Job {
119-
switch v := j.(type) {
120-
case *commit.SearchJob:
121-
cp := *v
122-
cp.Concurrency = 1
123-
return &cp
124-
default:
125-
return j
126-
}
127-
})
128-
}
129-
130134
func addCodeMonitorHook(in job.Job, hook commit.CodeMonitorHook) (_ job.Job, err error) {
131135
commitSearchJobCount := 0
132136
return job.Map(in, func(j job.Job) job.Job {
@@ -157,7 +161,6 @@ func addCodeMonitorHook(in job.Job, hook commit.CodeMonitorHook) (_ job.Job, err
157161
func hookWithID(
158162
ctx context.Context,
159163
db database.DB,
160-
logger log.Logger,
161164
gs commit.GitserverClient,
162165
monitorID int64,
163166
repoID api.RepoID,
@@ -196,53 +199,24 @@ func hookWithID(
196199
argsCopy.Revisions = newRevs
197200

198201
// Execute the search
199-
err = doSearch(&argsCopy)
200-
if err != nil {
201-
if errors.IsContextError(err) {
202-
logger.Warn(
203-
"commit search timed out, some commits may have been skipped",
204-
log.Error(err),
205-
log.String("repo", string(args.Repo)),
206-
log.Strings("include", commitHashes),
207-
log.Strings("exlcude", lastSearched),
208-
)
209-
} else {
210-
return err
211-
}
212-
}
213-
214-
// If the search was successful, store the resolved hashes
215-
// as the new "last searched" hashes
216-
return cm.UpsertLastSearched(ctx, monitorID, repoID, commitHashes)
217-
}
218-
219-
func snapshotHook(
220-
ctx context.Context,
221-
db database.DB,
222-
gs commit.GitserverClient,
223-
args *gitprotocol.SearchRequest,
224-
monitorID int64,
225-
repoID api.RepoID,
226-
) error {
227-
cm := db.CodeMonitors()
202+
searchErr := doSearch(&argsCopy)
228203

229-
// Resolve the requested revisions into a static set of commit hashes
230-
commitHashes, err := gs.ResolveRevisions(ctx, args.Repo, args.Revisions)
231-
if err != nil {
232-
return err
204+
// NOTE(camdencheek): we want to always save the "last searched" commits
205+
// because if we stream results, the user will get a notification for them
206+
// whether or not there was an error and forcing a re-search will cause the
207+
// user to get repeated notifications for the same commits. This makes code
208+
// monitors look very broken, and should be avoided.
209+
upsertErr := cm.UpsertLastSearched(ctx, monitorID, repoID, commitHashes)
210+
if upsertErr != nil {
211+
return upsertErr
233212
}
234213

235-
return cm.UpsertLastSearched(ctx, monitorID, repoID, commitHashes)
236-
}
237-
238-
func gqlURL(queryName string) (string, error) {
239-
u, err := url.Parse(internalapi.Client.URL)
240-
if err != nil {
241-
return "", err
214+
// Still return the error so it can be displayed to the user
215+
if searchErr != nil {
216+
return errors.Wrap(searchErr, "search failed, some commits may be skipped")
242217
}
243-
u.Path = "/.internal/graphql"
244-
u.RawQuery = queryName
245-
return u.String(), nil
218+
219+
return nil
246220
}
247221

248222
func stringsEqual(left, right []string) bool {

internal/codemonitors/search_test.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66

77
"github.com/stretchr/testify/require"
88

9-
"github.com/sourcegraph/log"
109
"github.com/sourcegraph/log/logtest"
1110

1211
"github.com/sourcegraph/sourcegraph/internal/actor"
@@ -24,6 +23,19 @@ import (
2423
"github.com/sourcegraph/sourcegraph/schema"
2524
)
2625

26+
func TestSnapshot(t *testing.T) {
27+
t.Run("fails with transaction", func(t *testing.T) {
28+
ctx := context.Background()
29+
logger := logtest.Scoped(t)
30+
db := database.NewDB(logger, dbtest.NewDB(logger, t))
31+
err := db.WithTransact(ctx, func(tx database.DB) error {
32+
_, err := Snapshot(ctx, logtest.Scoped(t), tx, "type:commit")
33+
return err
34+
})
35+
require.Error(t, err)
36+
})
37+
}
38+
2739
func TestAddCodeMonitorHook(t *testing.T) {
2840
t.Parallel()
2941

@@ -139,7 +151,7 @@ func TestCodeMonitorHook(t *testing.T) {
139151
}})
140152
return nil
141153
}
142-
err := hookWithID(ctx, db, logger, gs, fixtures.Monitor.ID, fixtures.Repo.ID, &gitprotocol.SearchRequest{}, doSearch)
154+
err := hookWithID(ctx, db, gs, fixtures.Monitor.ID, fixtures.Repo.ID, &gitprotocol.SearchRequest{}, doSearch)
143155
require.NoError(t, err)
144156

145157
// The next time, doSearch should receive the new resolved hashes plus the
@@ -156,16 +168,14 @@ func TestCodeMonitorHook(t *testing.T) {
156168
}})
157169
return nil
158170
}
159-
err = hookWithID(ctx, db, logger, gs, fixtures.Monitor.ID, fixtures.Repo.ID, &gitprotocol.SearchRequest{}, doSearch)
171+
err = hookWithID(ctx, db, gs, fixtures.Monitor.ID, fixtures.Repo.ID, &gitprotocol.SearchRequest{}, doSearch)
160172
require.NoError(t, err)
161173

162-
t.Run("deadline exceeded is not propagated", func(t *testing.T) {
163-
logger, getLogs := logtest.Captured(t)
174+
t.Run("deadline exceeded is propagated", func(t *testing.T) {
164175
doSearch = func(args *gitprotocol.SearchRequest) error {
165176
return context.DeadlineExceeded
166177
}
167-
err := hookWithID(ctx, db, logger, gs, fixtures.Monitor.ID, fixtures.Repo.ID, &gitprotocol.SearchRequest{}, doSearch)
168-
require.NoError(t, err)
169-
require.Equal(t, getLogs()[0].Level, log.LevelWarn)
178+
err := hookWithID(ctx, db, gs, fixtures.Monitor.ID, fixtures.Repo.ID, &gitprotocol.SearchRequest{}, doSearch)
179+
require.ErrorContains(t, err, "some commits may be skipped")
170180
})
171181
}

internal/search/commit/commit.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (j *SearchJob) Run(ctx context.Context, clients job.RuntimeClients, stream
9999
repos := searchrepos.NewResolver(clients.Logger, clients.DB, clients.Gitserver, clients.SearcherURLs, clients.Zoekt)
100100
it := repos.Iterator(ctx, j.RepoOpts)
101101

102-
p := pool.New().WithContext(ctx).WithMaxGoroutines(j.Concurrency).WithFirstError()
102+
p := pool.New().WithContext(ctx).WithMaxGoroutines(4).WithFirstError()
103103

104104
for it.Next() {
105105
page := it.Current()

internal/search/job/jobutil/job.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ func NewBasicJob(inputs *search.Inputs, b query.Basic) (job.Job, error) {
165165
Diff: diff,
166166
Limit: int(fileMatchLimit),
167167
IncludeModifiedFiles: authz.SubRepoEnabled(authz.DefaultSubRepoPermsChecker) || own,
168-
Concurrency: 4,
169168
})
170169
}
171170

0 commit comments

Comments
 (0)