• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2018 syzkaller project authors. All rights reserved.
2// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
3
4package main
5
6import (
7	"time"
8
9	"github.com/google/syzkaller/pkg/hash"
10	"github.com/google/syzkaller/pkg/log"
11	"github.com/google/syzkaller/pkg/mgrconfig"
12	"github.com/google/syzkaller/pkg/report"
13	"github.com/google/syzkaller/pkg/rpctype"
14	"github.com/google/syzkaller/prog"
15)
16
17func (mgr *Manager) hubSyncLoop() {
18	hc := &HubConnector{
19		mgr:           mgr,
20		cfg:           mgr.cfg,
21		target:        mgr.target,
22		stats:         mgr.stats,
23		enabledCalls:  mgr.checkResult.EnabledCalls[mgr.cfg.Sandbox],
24		fresh:         mgr.fresh,
25		hubReproQueue: mgr.hubReproQueue,
26	}
27	if mgr.cfg.Reproduce && mgr.dash != nil {
28		hc.needMoreRepros = mgr.needMoreRepros
29	}
30	hc.loop()
31}
32
33type HubConnector struct {
34	mgr            HubManagerView
35	cfg            *mgrconfig.Config
36	target         *prog.Target
37	stats          *Stats
38	enabledCalls   []int
39	fresh          bool
40	hubCorpus      map[hash.Sig]bool
41	newRepros      [][]byte
42	hubReproQueue  chan *Crash
43	needMoreRepros chan chan bool
44}
45
46// HubManagerView restricts interface between HubConnector and Manager.
47type HubManagerView interface {
48	getMinimizedCorpus() (corpus, repros [][]byte)
49	addNewCandidates(progs [][]byte)
50}
51
52func (hc *HubConnector) loop() {
53	var hub *rpctype.RPCClient
54	for {
55		time.Sleep(time.Minute)
56		corpus, repros := hc.mgr.getMinimizedCorpus()
57		hc.newRepros = append(hc.newRepros, repros...)
58		if hub == nil {
59			var err error
60			if hub, err = hc.connect(corpus); err != nil {
61				log.Logf(0, "failed to connect to hub at %v: %v", hc.cfg.HubAddr, err)
62				continue
63			}
64			log.Logf(0, "connected to hub at %v, corpus %v", hc.cfg.HubAddr, len(corpus))
65		}
66		if err := hc.sync(hub, corpus); err != nil {
67			log.Logf(0, "hub sync failed: %v", err)
68			hub.Close()
69			hub = nil
70		}
71	}
72}
73
74func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) {
75	a := &rpctype.HubConnectArgs{
76		Client:  hc.cfg.HubClient,
77		Key:     hc.cfg.HubKey,
78		Manager: hc.cfg.Name,
79		Fresh:   hc.fresh,
80	}
81	for _, id := range hc.enabledCalls {
82		a.Calls = append(a.Calls, hc.target.Syscalls[id].Name)
83	}
84	hubCorpus := make(map[hash.Sig]bool)
85	for _, inp := range corpus {
86		hubCorpus[hash.Hash(inp)] = true
87		a.Corpus = append(a.Corpus, inp)
88	}
89	// Hub.Connect request can be very large, so do it on a transient connection
90	// (rpc connection buffers never shrink).
91	if err := rpctype.RPCCall(hc.cfg.HubAddr, "Hub.Connect", a, nil); err != nil {
92		return nil, err
93	}
94	hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr)
95	if err != nil {
96		return nil, err
97	}
98	hc.hubCorpus = hubCorpus
99	hc.fresh = false
100	return hub, nil
101}
102
103func (hc *HubConnector) sync(hub *rpctype.RPCClient, corpus [][]byte) error {
104	a := &rpctype.HubSyncArgs{
105		Client:  hc.cfg.HubClient,
106		Key:     hc.cfg.HubKey,
107		Manager: hc.cfg.Name,
108	}
109	sigs := make(map[hash.Sig]bool)
110	for _, inp := range corpus {
111		sig := hash.Hash(inp)
112		sigs[sig] = true
113		if hc.hubCorpus[sig] {
114			continue
115		}
116		hc.hubCorpus[sig] = true
117		a.Add = append(a.Add, inp)
118	}
119	for sig := range hc.hubCorpus {
120		if sigs[sig] {
121			continue
122		}
123		delete(hc.hubCorpus, sig)
124		a.Del = append(a.Del, sig.String())
125	}
126	if hc.needMoreRepros != nil {
127		needReproReply := make(chan bool)
128		hc.needMoreRepros <- needReproReply
129		a.NeedRepros = <-needReproReply
130	}
131	a.Repros = hc.newRepros
132	for {
133		r := new(rpctype.HubSyncRes)
134		if err := hub.Call("Hub.Sync", a, r); err != nil {
135			return err
136		}
137		progDropped := hc.processProgs(r.Progs)
138		reproDropped := hc.processRepros(r.Repros)
139		hc.stats.hubSendProgAdd.add(len(a.Add))
140		hc.stats.hubSendProgDel.add(len(a.Del))
141		hc.stats.hubSendRepro.add(len(a.Repros))
142		hc.stats.hubRecvProg.add(len(r.Progs) - progDropped)
143		hc.stats.hubRecvProgDrop.add(progDropped)
144		hc.stats.hubRecvRepro.add(len(r.Repros) - reproDropped)
145		hc.stats.hubRecvReproDrop.add(reproDropped)
146		log.Logf(0, "hub sync: send: add %v, del %v, repros %v;"+
147			" recv: progs %v, repros %v; more %v",
148			len(a.Add), len(a.Del), len(a.Repros),
149			len(r.Progs)-progDropped, len(r.Repros)-reproDropped, r.More)
150		a.Add = nil
151		a.Del = nil
152		a.Repros = nil
153		a.NeedRepros = false
154		hc.newRepros = nil
155		if len(r.Progs)+r.More == 0 {
156			return nil
157		}
158	}
159}
160
161func (hc *HubConnector) processProgs(progs [][]byte) int {
162	dropped := 0
163	candidates := make([][]byte, 0, len(progs))
164	for _, inp := range progs {
165		if _, err := hc.target.Deserialize(inp); err != nil {
166			dropped++
167			continue
168		}
169		candidates = append(candidates, inp)
170	}
171	hc.mgr.addNewCandidates(candidates)
172	return dropped
173}
174
175func (hc *HubConnector) processRepros(repros [][]byte) int {
176	dropped := 0
177	for _, repro := range repros {
178		if _, err := hc.target.Deserialize(repro); err != nil {
179			dropped++
180			continue
181		}
182		hc.hubReproQueue <- &Crash{
183			vmIndex: -1,
184			hub:     true,
185			Report: &report.Report{
186				Title:  "external repro",
187				Output: repro,
188			},
189		}
190	}
191	return dropped
192}
193