1// Copyright 2018 syzkaller project authors. All rights reserved. 2// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. 3 4package main 5 6import ( 7 "time" 8 9 "github.com/google/syzkaller/pkg/hash" 10 "github.com/google/syzkaller/pkg/log" 11 "github.com/google/syzkaller/pkg/mgrconfig" 12 "github.com/google/syzkaller/pkg/report" 13 "github.com/google/syzkaller/pkg/rpctype" 14 "github.com/google/syzkaller/prog" 15) 16 17func (mgr *Manager) hubSyncLoop() { 18 hc := &HubConnector{ 19 mgr: mgr, 20 cfg: mgr.cfg, 21 target: mgr.target, 22 stats: mgr.stats, 23 enabledCalls: mgr.checkResult.EnabledCalls[mgr.cfg.Sandbox], 24 fresh: mgr.fresh, 25 hubReproQueue: mgr.hubReproQueue, 26 } 27 if mgr.cfg.Reproduce && mgr.dash != nil { 28 hc.needMoreRepros = mgr.needMoreRepros 29 } 30 hc.loop() 31} 32 33type HubConnector struct { 34 mgr HubManagerView 35 cfg *mgrconfig.Config 36 target *prog.Target 37 stats *Stats 38 enabledCalls []int 39 fresh bool 40 hubCorpus map[hash.Sig]bool 41 newRepros [][]byte 42 hubReproQueue chan *Crash 43 needMoreRepros chan chan bool 44} 45 46// HubManagerView restricts interface between HubConnector and Manager. 47type HubManagerView interface { 48 getMinimizedCorpus() (corpus, repros [][]byte) 49 addNewCandidates(progs [][]byte) 50} 51 52func (hc *HubConnector) loop() { 53 var hub *rpctype.RPCClient 54 for { 55 time.Sleep(time.Minute) 56 corpus, repros := hc.mgr.getMinimizedCorpus() 57 hc.newRepros = append(hc.newRepros, repros...) 58 if hub == nil { 59 var err error 60 if hub, err = hc.connect(corpus); err != nil { 61 log.Logf(0, "failed to connect to hub at %v: %v", hc.cfg.HubAddr, err) 62 continue 63 } 64 log.Logf(0, "connected to hub at %v, corpus %v", hc.cfg.HubAddr, len(corpus)) 65 } 66 if err := hc.sync(hub, corpus); err != nil { 67 log.Logf(0, "hub sync failed: %v", err) 68 hub.Close() 69 hub = nil 70 } 71 } 72} 73 74func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) { 75 a := &rpctype.HubConnectArgs{ 76 Client: hc.cfg.HubClient, 77 Key: hc.cfg.HubKey, 78 Manager: hc.cfg.Name, 79 Fresh: hc.fresh, 80 } 81 for _, id := range hc.enabledCalls { 82 a.Calls = append(a.Calls, hc.target.Syscalls[id].Name) 83 } 84 hubCorpus := make(map[hash.Sig]bool) 85 for _, inp := range corpus { 86 hubCorpus[hash.Hash(inp)] = true 87 a.Corpus = append(a.Corpus, inp) 88 } 89 // Hub.Connect request can be very large, so do it on a transient connection 90 // (rpc connection buffers never shrink). 91 if err := rpctype.RPCCall(hc.cfg.HubAddr, "Hub.Connect", a, nil); err != nil { 92 return nil, err 93 } 94 hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr) 95 if err != nil { 96 return nil, err 97 } 98 hc.hubCorpus = hubCorpus 99 hc.fresh = false 100 return hub, nil 101} 102 103func (hc *HubConnector) sync(hub *rpctype.RPCClient, corpus [][]byte) error { 104 a := &rpctype.HubSyncArgs{ 105 Client: hc.cfg.HubClient, 106 Key: hc.cfg.HubKey, 107 Manager: hc.cfg.Name, 108 } 109 sigs := make(map[hash.Sig]bool) 110 for _, inp := range corpus { 111 sig := hash.Hash(inp) 112 sigs[sig] = true 113 if hc.hubCorpus[sig] { 114 continue 115 } 116 hc.hubCorpus[sig] = true 117 a.Add = append(a.Add, inp) 118 } 119 for sig := range hc.hubCorpus { 120 if sigs[sig] { 121 continue 122 } 123 delete(hc.hubCorpus, sig) 124 a.Del = append(a.Del, sig.String()) 125 } 126 if hc.needMoreRepros != nil { 127 needReproReply := make(chan bool) 128 hc.needMoreRepros <- needReproReply 129 a.NeedRepros = <-needReproReply 130 } 131 a.Repros = hc.newRepros 132 for { 133 r := new(rpctype.HubSyncRes) 134 if err := hub.Call("Hub.Sync", a, r); err != nil { 135 return err 136 } 137 progDropped := hc.processProgs(r.Progs) 138 reproDropped := hc.processRepros(r.Repros) 139 hc.stats.hubSendProgAdd.add(len(a.Add)) 140 hc.stats.hubSendProgDel.add(len(a.Del)) 141 hc.stats.hubSendRepro.add(len(a.Repros)) 142 hc.stats.hubRecvProg.add(len(r.Progs) - progDropped) 143 hc.stats.hubRecvProgDrop.add(progDropped) 144 hc.stats.hubRecvRepro.add(len(r.Repros) - reproDropped) 145 hc.stats.hubRecvReproDrop.add(reproDropped) 146 log.Logf(0, "hub sync: send: add %v, del %v, repros %v;"+ 147 " recv: progs %v, repros %v; more %v", 148 len(a.Add), len(a.Del), len(a.Repros), 149 len(r.Progs)-progDropped, len(r.Repros)-reproDropped, r.More) 150 a.Add = nil 151 a.Del = nil 152 a.Repros = nil 153 a.NeedRepros = false 154 hc.newRepros = nil 155 if len(r.Progs)+r.More == 0 { 156 return nil 157 } 158 } 159} 160 161func (hc *HubConnector) processProgs(progs [][]byte) int { 162 dropped := 0 163 candidates := make([][]byte, 0, len(progs)) 164 for _, inp := range progs { 165 if _, err := hc.target.Deserialize(inp); err != nil { 166 dropped++ 167 continue 168 } 169 candidates = append(candidates, inp) 170 } 171 hc.mgr.addNewCandidates(candidates) 172 return dropped 173} 174 175func (hc *HubConnector) processRepros(repros [][]byte) int { 176 dropped := 0 177 for _, repro := range repros { 178 if _, err := hc.target.Deserialize(repro); err != nil { 179 dropped++ 180 continue 181 } 182 hc.hubReproQueue <- &Crash{ 183 vmIndex: -1, 184 hub: true, 185 Report: &report.Report{ 186 Title: "external repro", 187 Output: repro, 188 }, 189 } 190 } 191 return dropped 192} 193