• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2006 Google Inc. All Rights Reserved.
2 
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 // worker.h : worker thread interface
16 
// This file contains the Worker Thread class interface
// for the SAT test. Worker Threads implement a repetitive
// task used to test or stress the system.
20 
21 #ifndef STRESSAPPTEST_WORKER_H_
22 #define STRESSAPPTEST_WORKER_H_
23 
24 #include <pthread.h>
25 
26 #include <sys/time.h>
27 #include <sys/types.h>
28 
29 #ifdef HAVE_LIBAIO_H
30 #include <libaio.h>
31 #endif
32 
33 #include <queue>
34 #include <set>
35 #include <string>
36 #include <vector>
37 
38 // This file must work with autoconf on its public version,
39 // so these includes are correct.
40 #include "disk_blocks.h"
41 #include "queue.h"
42 #include "sattypes.h"
43 
44 
// Global data structure shared by the cache coherency worker threads.
struct cc_cacheline_data {
  // Per-cacheline counter storage.  Presumably indexed by the thread id held
  // in CpuCacheCoherencyThread::cc_thread_num_ -- confirm against worker.cc.
  char *num;
};
49 
50 // Typical usage:
51 // (Other workflows may be possible, see function comments for details.)
52 // - Control thread creates object.
53 // - Control thread calls AddWorkers(1) for each worker thread.
54 // - Control thread calls Initialize().
55 // - Control thread launches worker threads.
56 // - Every worker thread frequently calls ContinueRunning().
57 // - Control thread periodically calls PauseWorkers(), effectively sleeps, and
58 //     then calls ResumeWorkers().
59 // - Some worker threads may exit early, before StopWorkers() is called.  They
60 //     call RemoveSelf() after their last call to ContinueRunning().
61 // - Control thread eventually calls StopWorkers().
62 // - Worker threads exit.
63 // - Control thread joins worker threads.
64 // - Control thread calls Destroy().
65 // - Control thread destroys object.
66 //
67 // Threadsafety:
68 // - ContinueRunning() may be called concurrently by different workers, but not
69 //     by a single worker.
70 // - No other methods may ever be called concurrently, with themselves or
71 //     eachother.
72 // - This object may be used by multiple threads only between Initialize() and
73 //     Destroy().
74 //
75 // TODO(matthewb): Move this class and its unittest to their own files.
76 class WorkerStatus {
77  public:
78   //--------------------------------
79   // Methods for the control thread.
80   //--------------------------------
81 
WorkerStatus()82   WorkerStatus() : num_workers_(0), status_(RUN) {}
83 
84   // Called by the control thread to increase the worker count.  Must be called
85   // before Initialize().  The worker count is 0 upon object initialization.
AddWorkers(int num_new_workers)86   void AddWorkers(int num_new_workers) {
87     // No need to lock num_workers_mutex_ because this is before Initialize().
88     num_workers_ += num_new_workers;
89   }
90 
91   // Called by the control thread.  May not be called multiple times.  If
92   // called, Destroy() must be called before destruction.
93   void Initialize();
94 
95   // Called by the control thread after joining all worker threads.  Must be
96   // called iff Initialize() was called.  No methods may be called after calling
97   // this.
98   void Destroy();
99 
100   // Called by the control thread to tell the workers to pause.  Does not return
101   // until all workers have called ContinueRunning() or RemoveSelf().  May only
102   // be called between Initialize() and Stop().  Must not be called multiple
103   // times without ResumeWorkers() having been called inbetween.
104   void PauseWorkers();
105 
106   // Called by the control thread to tell the workers to resume from a pause.
107   // May only be called between Initialize() and Stop().  May only be called
108   // directly after PauseWorkers().
109   void ResumeWorkers();
110 
111   // Called by the control thread to tell the workers to stop.  May only be
112   // called between Initialize() and Destroy().  May only be called once.
113   void StopWorkers();
114 
115   //--------------------------------
116   // Methods for the worker threads.
117   //--------------------------------
118 
119   // Called by worker threads to decrease the worker count by one.  May only be
120   // called between Initialize() and Destroy().  May wait for ResumeWorkers()
121   // when called after PauseWorkers().
122   void RemoveSelf();
123 
124   // Called by worker threads between Initialize() and Destroy().  May be called
125   // any number of times.  Return value is whether or not the worker should
126   // continue running.  When called after PauseWorkers(), does not return until
127   // ResumeWorkers() or StopWorkers() has been called.  Number of distinct
128   // calling threads must match the worker count (see AddWorkers() and
129   // RemoveSelf()).
130   bool ContinueRunning(bool *paused);
131 
132   // This is a hack!  It's like ContinueRunning(), except it won't pause.  If
133   // any worker threads use this exclusively in place of ContinueRunning() then
134   // PauseWorkers() should never be used!
135   bool ContinueRunningNoPause();
136 
137  private:
138   enum Status { RUN, PAUSE, STOP };
139 
WaitOnPauseBarrier()140   void WaitOnPauseBarrier() {
141 #ifdef HAVE_PTHREAD_BARRIERS
142     int error = pthread_barrier_wait(&pause_barrier_);
143     if (error != PTHREAD_BARRIER_SERIAL_THREAD)
144       sat_assert(error == 0);
145 #endif
146   }
147 
AcquireNumWorkersLock()148   void AcquireNumWorkersLock() {
149     sat_assert(0 == pthread_mutex_lock(&num_workers_mutex_));
150   }
151 
ReleaseNumWorkersLock()152   void ReleaseNumWorkersLock() {
153     sat_assert(0 == pthread_mutex_unlock(&num_workers_mutex_));
154   }
155 
AcquireStatusReadLock()156   void AcquireStatusReadLock() {
157     sat_assert(0 == pthread_rwlock_rdlock(&status_rwlock_));
158   }
159 
AcquireStatusWriteLock()160   void AcquireStatusWriteLock() {
161     sat_assert(0 == pthread_rwlock_wrlock(&status_rwlock_));
162   }
163 
ReleaseStatusLock()164   void ReleaseStatusLock() {
165     sat_assert(0 == pthread_rwlock_unlock(&status_rwlock_));
166   }
167 
GetStatus()168   Status GetStatus() {
169     AcquireStatusReadLock();
170     Status status = status_;
171     ReleaseStatusLock();
172     return status;
173   }
174 
175   // Returns the previous status.
SetStatus(Status status)176   Status SetStatus(Status status) {
177     AcquireStatusWriteLock();
178     Status prev_status = status_;
179     status_ = status;
180     ReleaseStatusLock();
181     return prev_status;
182   }
183 
184   pthread_mutex_t num_workers_mutex_;
185   int num_workers_;
186 
187   pthread_rwlock_t status_rwlock_;
188   Status status_;
189 
190 #ifdef HAVE_PTHREAD_BARRIERS
191   // Guaranteed to not be in use when (status_ != PAUSE).
192   pthread_barrier_t pause_barrier_;
193 #endif
194 
195   DISALLOW_COPY_AND_ASSIGN(WorkerStatus);
196 };
197 
198 
199 // This is a base class for worker threads.
200 // Each thread repeats a specific
201 // task on various blocks of memory.
202 class WorkerThread {
203  public:
204   // Enum to mark a thread as low/med/high priority.
205   enum Priority {
206     Low,
207     Normal,
208     High,
209   };
210   WorkerThread();
211   virtual ~WorkerThread();
212 
213   // Initialize values and thread ID number.
214   virtual void InitThread(int thread_num_init,
215                           class Sat *sat_init,
216                           class OsLayer *os_init,
217                           class PatternList *patternlist_init,
218                           WorkerStatus *worker_status);
219 
220   // This function is DEPRECATED, it does nothing.
SetPriority(Priority priority)221   void SetPriority(Priority priority) { priority_ = priority; }
222   // Spawn the worker thread, by running Work().
223   int SpawnThread();
224   // Only for ThreadSpawnerGeneric().
225   void StartRoutine();
226   bool InitPriority();
227 
228   // Wait for the thread to complete its cleanup.
229   virtual bool JoinThread();
230   // Kill worker thread with SIGINT.
231   virtual bool KillThread();
232 
233   // This is the task function that the thread executes.
234   // This is implemented per subclass.
235   virtual bool Work();
236 
237   // Starts per-WorkerThread timer.
StartThreadTimer()238   void StartThreadTimer() {gettimeofday(&start_time_, NULL);}
239   // Reads current timer value and returns run duration without recording it.
ReadThreadTimer()240   int64 ReadThreadTimer() {
241     struct timeval end_time_;
242     gettimeofday(&end_time_, NULL);
243     return (end_time_.tv_sec - start_time_.tv_sec)*1000000ULL +
244       (end_time_.tv_usec - start_time_.tv_usec);
245   }
246   // Stops per-WorkerThread timer and records thread run duration.
247   // Start/Stop ThreadTimer repetitively has cumulative effect, ie the timer
248   // is effectively paused and restarted, so runduration_usec accumulates on.
StopThreadTimer()249   void StopThreadTimer() {
250     runduration_usec_ += ReadThreadTimer();
251   }
252 
253   // Acccess member variables.
GetStatus()254   bool GetStatus() {return status_;}
GetErrorCount()255   int64 GetErrorCount() {return errorcount_;}
GetPageCount()256   int64 GetPageCount() {return pages_copied_;}
GetRunDurationUSec()257   int64 GetRunDurationUSec() {return runduration_usec_;}
258 
259   // Returns bandwidth defined as pages_copied / thread_run_durations.
260   virtual float GetCopiedData();
261   // Calculate worker thread specific copied data.
GetMemoryCopiedData()262   virtual float GetMemoryCopiedData() {return 0;}
GetDeviceCopiedData()263   virtual float GetDeviceCopiedData() {return 0;}
264   // Calculate worker thread specific bandwidth.
GetMemoryBandwidth()265   virtual float GetMemoryBandwidth()
266     {return GetMemoryCopiedData() / (
267         runduration_usec_ * 1.0 / 1000000.);}
GetDeviceBandwidth()268   virtual float GetDeviceBandwidth()
269     {return GetDeviceCopiedData() / (
270         runduration_usec_ * 1.0 / 1000000.);}
271 
set_cpu_mask(cpu_set_t * mask)272   void set_cpu_mask(cpu_set_t *mask) {
273     memcpy(&cpu_mask_, mask, sizeof(*mask));
274   }
275 
set_cpu_mask_to_cpu(int cpu_num)276   void set_cpu_mask_to_cpu(int cpu_num) {
277     cpuset_set_ab(&cpu_mask_, cpu_num, cpu_num + 1);
278   }
279 
set_tag(int32 tag)280   void set_tag(int32 tag) {tag_ = tag;}
281 
282   // Returns CPU mask, where each bit represents a logical cpu.
283   bool AvailableCpus(cpu_set_t *cpuset);
284   // Returns CPU mask of CPUs this thread is bound to,
285   bool CurrentCpus(cpu_set_t *cpuset);
286   // Returns Current Cpus mask as string.
CurrentCpusFormat()287   string CurrentCpusFormat() {
288     cpu_set_t current_cpus;
289     CurrentCpus(&current_cpus);
290     return cpuset_format(&current_cpus);
291   }
292 
ThreadID()293   int ThreadID() {return thread_num_;}
294 
295   // Bind worker thread to specified CPU(s)
296   bool BindToCpus(const cpu_set_t *cpuset);
297 
298  protected:
299   // This function dictates whether the main work loop
300   // continues, waits, or terminates.
301   // All work loops should be of the form:
302   //   do {
303   //     // work.
304   //   } while (IsReadyToRun());
305   virtual bool IsReadyToRun(bool *paused = NULL) {
306     return worker_status_->ContinueRunning(paused);
307   }
308 
309   // Like IsReadyToRun(), except it won't pause.
IsReadyToRunNoPause()310   virtual bool IsReadyToRunNoPause() {
311     return worker_status_->ContinueRunningNoPause();
312   }
313 
314   // These are functions used by the various work loops.
315   // Pretty print and log a data miscompare.
316   virtual void ProcessError(struct ErrorRecord *er,
317                             int priority,
318                             const char *message);
319 
320   // Compare a region of memory with a known data patter, and report errors.
321   virtual int CheckRegion(void *addr,
322                           class Pattern *pat,
323                           int64 length,
324                           int offset,
325                           int64 patternoffset);
326 
327   // Fast compare a block of memory.
328   virtual int CrcCheckPage(struct page_entry *srcpe);
329 
330   // Fast copy a block of memory, while verifying correctness.
331   virtual int CrcCopyPage(struct page_entry *dstpe,
332                           struct page_entry *srcpe);
333 
334   // Fast copy a block of memory, while verifying correctness, and heating CPU.
335   virtual int CrcWarmCopyPage(struct page_entry *dstpe,
336                               struct page_entry *srcpe);
337 
338   // Fill a page with its specified pattern.
339   virtual bool FillPage(struct page_entry *pe);
340 
341   // Copy with address tagging.
342   virtual bool AdlerAddrMemcpyC(uint64 *dstmem64,
343                                 uint64 *srcmem64,
344                                 unsigned int size_in_bytes,
345                                 AdlerChecksum *checksum,
346                                 struct page_entry *pe);
347   // SSE copy with address tagging.
348   virtual bool AdlerAddrMemcpyWarm(uint64 *dstmem64,
349                                    uint64 *srcmem64,
350                                    unsigned int size_in_bytes,
351                                    AdlerChecksum *checksum,
352                                    struct page_entry *pe);
353   // Crc data with address tagging.
354   virtual bool AdlerAddrCrcC(uint64 *srcmem64,
355                              unsigned int size_in_bytes,
356                              AdlerChecksum *checksum,
357                              struct page_entry *pe);
358   // Setup tagging on an existing page.
359   virtual bool TagAddrC(uint64 *memwords,
360                         unsigned int size_in_bytes);
361   // Report a mistagged cacheline.
362   virtual bool ReportTagError(uint64 *mem64,
363                       uint64 actual,
364                       uint64 tag);
365   // Print out the error record of the tag mismatch.
366   virtual void ProcessTagError(struct ErrorRecord *error,
367                        int priority,
368                        const char *message);
369 
370   // A worker thread can yield itself to give up CPU until it's scheduled again
371   bool YieldSelf();
372 
373  protected:
374   // General state variables that all subclasses need.
375   int thread_num_;                  // Thread ID.
376   volatile bool status_;            // Error status.
377   volatile int64 pages_copied_;     // Recorded for memory bandwidth calc.
378   volatile int64 errorcount_;       // Miscompares seen by this thread.
379 
380   cpu_set_t cpu_mask_;              // Cores this thread is allowed to run on.
381   volatile uint32 tag_;             // Tag hint for memory this thread can use.
382 
383   bool tag_mode_;                   // Tag cachelines with vaddr.
384 
385   // Thread timing variables.
386   struct timeval start_time_;        // Worker thread start time.
387   volatile int64 runduration_usec_;  // Worker run duration in u-seconds.
388 
389   // Function passed to pthread_create.
390   void *(*thread_spawner_)(void *args);
391   pthread_t thread_;                // Pthread thread ID.
392   Priority priority_;               // Worker thread priority.
393   class Sat *sat_;                  // Reference to parent stest object.
394   class OsLayer *os_;               // Os abstraction: put hacks here.
395   class PatternList *patternlist_;  // Reference to data patterns.
396 
397   // Work around style guide ban on sizeof(int).
398   static const uint64 iamint_ = 0;
399   static const int wordsize_ = sizeof(iamint_);
400 
401  private:
402   WorkerStatus *worker_status_;
403 
404   DISALLOW_COPY_AND_ASSIGN(WorkerThread);
405 };
406 
407 // Worker thread to perform File IO.
408 class FileThread : public WorkerThread {
409  public:
410   FileThread();
411   // Set filename to use for file IO.
412   virtual void SetFile(const char *filename_init);
413   virtual bool Work();
414 
415   // Calculate worker thread specific bandwidth.
GetDeviceCopiedData()416   virtual float GetDeviceCopiedData()
417     {return GetCopiedData()*2;}
418   virtual float GetMemoryCopiedData();
419 
420  protected:
421   // Record of where these pages were sourced from, and what
422   // potentially broken components they passed through.
423   struct PageRec {
424      class Pattern *pattern;  // This is the data it should contain.
425      void *src;  // This is the memory location the data was sourced from.
426      void *dst;  // This is where it ended up.
427   };
428 
429   // These are functions used by the various work loops.
430   // Pretty print and log a data miscompare. Disks require
431   // slightly different error handling.
432   virtual void ProcessError(struct ErrorRecord *er,
433                             int priority,
434                             const char *message);
435 
436   virtual bool OpenFile(int *pfile);
437   virtual bool CloseFile(int fd);
438 
439   // Read and write whole file to disk.
440   virtual bool WritePages(int fd);
441   virtual bool ReadPages(int fd);
442 
443   // Read and write pages to disk.
444   virtual bool WritePageToFile(int fd, struct page_entry *src);
445   virtual bool ReadPageFromFile(int fd, struct page_entry *dst);
446 
447   // Sector tagging support.
448   virtual bool SectorTagPage(struct page_entry *src, int block);
449   virtual bool SectorValidatePage(const struct PageRec &page,
450                                   struct page_entry *dst,
451                                   int block);
452 
453   // Get memory for an incoming data transfer..
454   virtual bool PagePrepare();
455   // Remove memory allocated for data transfer.
456   virtual bool PageTeardown();
457 
458   // Get memory for an incoming data transfer..
459   virtual bool GetEmptyPage(struct page_entry *dst);
460   // Get memory for an outgoing data transfer..
461   virtual bool GetValidPage(struct page_entry *dst);
462   // Throw out a used empty page.
463   virtual bool PutEmptyPage(struct page_entry *src);
464   // Throw out a used, filled page.
465   virtual bool PutValidPage(struct page_entry *src);
466 
467 
468   struct PageRec *page_recs_;          // Array of page records.
469   int crc_page_;                        // Page currently being CRC checked.
470   string filename_;                     // Name of file to access.
471   string devicename_;                   // Name of device file is on.
472 
473   bool page_io_;                        // Use page pool for IO.
474   void *local_page_;                   // malloc'd page fon non-pool IO.
475   int pass_;                            // Number of writes to the file so far.
476 
477   // Tag to detect file corruption.
478   struct SectorTag {
479     volatile uint8 magic;
480     volatile uint8 block;
481     volatile uint8 sector;
482     volatile uint8 pass;
483     char pad[512-4];
484   };
485 
486   DISALLOW_COPY_AND_ASSIGN(FileThread);
487 };
488 
489 
490 // Worker thread to perform Network IO.
491 class NetworkThread : public WorkerThread {
492  public:
493   NetworkThread();
494   // Set hostname to use for net IO.
495   virtual void SetIP(const char *ipaddr_init);
496   virtual bool Work();
497 
498   // Calculate worker thread specific bandwidth.
GetDeviceCopiedData()499   virtual float GetDeviceCopiedData()
500     {return GetCopiedData()*2;}
501 
502  protected:
503   // IsReadyToRunNoPause() wrapper, for NetworkSlaveThread to override.
504   virtual bool IsNetworkStopSet();
505   virtual bool CreateSocket(int *psocket);
506   virtual bool CloseSocket(int sock);
507   virtual bool Connect(int sock);
508   virtual bool SendPage(int sock, struct page_entry *src);
509   virtual bool ReceivePage(int sock, struct page_entry *dst);
510   char ipaddr_[256];
511   int sock_;
512 
513  private:
514   DISALLOW_COPY_AND_ASSIGN(NetworkThread);
515 };
516 
517 // Worker thread to reflect Network IO.
518 class NetworkSlaveThread : public NetworkThread {
519  public:
520   NetworkSlaveThread();
521   // Set socket for IO.
522   virtual void SetSock(int sock);
523   virtual bool Work();
524 
525  protected:
526   virtual bool IsNetworkStopSet();
527 
528  private:
529   DISALLOW_COPY_AND_ASSIGN(NetworkSlaveThread);
530 };
531 
532 // Worker thread to detect incoming Network IO.
533 class NetworkListenThread : public NetworkThread {
534  public:
535   NetworkListenThread();
536   virtual bool Work();
537 
538  private:
539   virtual bool Listen();
540   virtual bool Wait();
541   virtual bool GetConnection(int *pnewsock);
542   virtual bool SpawnSlave(int newsock, int threadid);
543   virtual bool ReapSlaves();
544 
545   // For serviced incoming connections.
546   struct ChildWorker {
547     WorkerStatus status;
548     NetworkSlaveThread thread;
549   };
550   typedef vector<ChildWorker*> ChildVector;
551   ChildVector child_workers_;
552 
553   DISALLOW_COPY_AND_ASSIGN(NetworkListenThread);
554 };
555 
556 // Worker thread to perform Memory Copy.
557 class CopyThread : public WorkerThread {
558  public:
CopyThread()559   CopyThread() {}
560   virtual bool Work();
561   // Calculate worker thread specific bandwidth.
GetMemoryCopiedData()562   virtual float GetMemoryCopiedData()
563     {return GetCopiedData()*2;}
564 
565  private:
566   DISALLOW_COPY_AND_ASSIGN(CopyThread);
567 };
568 
569 // Worker thread to perform Memory Invert.
570 class InvertThread : public WorkerThread {
571  public:
InvertThread()572   InvertThread() {}
573   virtual bool Work();
574   // Calculate worker thread specific bandwidth.
GetMemoryCopiedData()575   virtual float GetMemoryCopiedData()
576     {return GetCopiedData()*4;}
577 
578  private:
579   virtual int InvertPageUp(struct page_entry *srcpe);
580   virtual int InvertPageDown(struct page_entry *srcpe);
581   DISALLOW_COPY_AND_ASSIGN(InvertThread);
582 };
583 
584 // Worker thread to fill blank pages on startup.
585 class FillThread : public WorkerThread {
586  public:
587   FillThread();
588   // Set how many pages this thread should fill before exiting.
589   virtual void SetFillPages(int64 num_pages_to_fill_init);
590   virtual bool Work();
591 
592  private:
593   // Fill a page with the data pattern in pe->pattern.
594   virtual bool FillPageRandom(struct page_entry *pe);
595   int64 num_pages_to_fill_;
596   DISALLOW_COPY_AND_ASSIGN(FillThread);
597 };
598 
599 // Worker thread to verify page data matches pattern data.
600 // Thread will check and replace pages until "done" flag is set,
601 // then it will check and discard pages until no more remain.
602 class CheckThread : public WorkerThread {
603  public:
CheckThread()604   CheckThread() {}
605   virtual bool Work();
606   // Calculate worker thread specific bandwidth.
GetMemoryCopiedData()607   virtual float GetMemoryCopiedData()
608     {return GetCopiedData();}
609 
610  private:
611   DISALLOW_COPY_AND_ASSIGN(CheckThread);
612 };
613 
614 
615 // Worker thread to poll for system error messages.
616 // Thread will check for messages until "done" flag is set.
617 class ErrorPollThread : public WorkerThread {
618  public:
ErrorPollThread()619   ErrorPollThread() {}
620   virtual bool Work();
621 
622  private:
623   DISALLOW_COPY_AND_ASSIGN(ErrorPollThread);
624 };
625 
626 // Computation intensive worker thread to stress CPU.
627 class CpuStressThread : public WorkerThread {
628  public:
CpuStressThread()629   CpuStressThread() {}
630   virtual bool Work();
631 
632  private:
633   DISALLOW_COPY_AND_ASSIGN(CpuStressThread);
634 };
635 
636 // Worker thread that tests the correctness of the
637 // CPU Cache Coherency Protocol.
638 class CpuCacheCoherencyThread : public WorkerThread {
639  public:
640   CpuCacheCoherencyThread(cc_cacheline_data *cc_data,
641                           int cc_cacheline_count_,
642                           int cc_thread_num_,
643                           int cc_thread_count_,
644                           int cc_inc_count_);
645   virtual bool Work();
646 
647  protected:
648   // Used by the simple random number generator as a shift feedback;
649   // this polynomial (x^64 + x^63 + x^61 + x^60 + 1) will produce a
650   // psuedorandom cycle of period 2^64-1.
651   static const uint64 kRandomPolynomial = 0xD800000000000000ULL;
652   // A very simple psuedorandom generator that can be inlined and use
653   // registers, to keep the CC test loop tight and focused.
654   static uint64 SimpleRandom(uint64 seed);
655 
656   cc_cacheline_data *cc_cacheline_data_;  // Datstructure for each cacheline.
657   int cc_local_num_;        // Local counter for each thread.
658   int cc_cacheline_count_;  // Number of cache lines to operate on.
659   int cc_thread_num_;       // The integer id of the thread which is
660                             // used as an index into the integer array
661                             // of the cacheline datastructure.
662   int cc_thread_count_;     // Total number of threads being run, for
663                             // calculations mixing up cache line access.
664   int cc_inc_count_;        // Number of times to increment the counter.
665 
666  private:
667   DISALLOW_COPY_AND_ASSIGN(CpuCacheCoherencyThread);
668 };
669 
670 // Worker thread to perform disk test.
671 class DiskThread : public WorkerThread {
672  public:
673   explicit DiskThread(DiskBlockTable *block_table);
674   virtual ~DiskThread();
675   // Calculate disk thread specific bandwidth.
GetDeviceCopiedData()676   virtual float GetDeviceCopiedData() {
677     return (blocks_written_ * write_block_size_ +
678             blocks_read_ * read_block_size_) / kMegabyte;}
679 
680   // Set filename for device file (in /dev).
681   virtual void SetDevice(const char *device_name);
682   // Set various parameters that control the behaviour of the test.
683   virtual bool SetParameters(int read_block_size,
684                              int write_block_size,
685                              int64 segment_size,
686                              int64 cache_size,
687                              int blocks_per_segment,
688                              int64 read_threshold,
689                              int64 write_threshold,
690                              int non_destructive);
691 
692   virtual bool Work();
693 
GetMemoryCopiedData()694   virtual float GetMemoryCopiedData() {return 0;}
695 
696  protected:
697   static const int kSectorSize = 512;       // Size of sector on disk.
698   static const int kBufferAlignment = 512;  // Buffer alignment required by the
699                                             // kernel.
700   static const int kBlockRetry = 100;       // Number of retries to allocate
701                                             // sectors.
702 
703   enum IoOp {
704     ASYNC_IO_READ   = 0,
705     ASYNC_IO_WRITE  = 1
706   };
707 
708   virtual bool OpenDevice(int *pfile);
709   virtual bool CloseDevice(int fd);
710 
711   // Retrieves the size (in bytes) of the disk/file.
712   virtual bool GetDiskSize(int fd);
713 
714   // Retrieves the current time in microseconds.
715   virtual int64 GetTime();
716 
717   // Do an asynchronous disk I/O operation.
718   virtual bool AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
719                            int64 offset, int64 timeout);
720 
721   // Write a block to disk.
722   virtual bool WriteBlockToDisk(int fd, BlockData *block);
723 
724   // Verify a block on disk.
725   virtual bool ValidateBlockOnDisk(int fd, BlockData *block);
726 
727   // Main work loop.
728   virtual bool DoWork(int fd);
729 
730   int read_block_size_;       // Size of blocks read from disk, in bytes.
731   int write_block_size_;      // Size of blocks written to disk, in bytes.
732   int64 blocks_read_;         // Number of blocks read in work loop.
733   int64 blocks_written_;      // Number of blocks written in work loop.
734   int64 segment_size_;        // Size of disk segments (in bytes) that the disk
735                               // will be split into where testing can be
736                               // confined to a particular segment.
737                               // Allows for control of how evenly the disk will
738                               // be tested.  Smaller segments imply more even
739                               // testing (less random).
740   int blocks_per_segment_;    // Number of blocks that will be tested per
741                               // segment.
742   int cache_size_;            // Size of disk cache, in bytes.
743   int queue_size_;            // Length of in-flight-blocks queue, in blocks.
744   int non_destructive_;       // Use non-destructive mode or not.
745   int update_block_table_;    // If true, assume this is the thread
746                               // responsible for writing the data in the disk
747                               // for this block device and, therefore,
748                               // update the block table. If false, just use
749                               // the block table to get data.
750 
751   // read/write times threshold for reporting a problem
752   int64 read_threshold_;      // Maximum time a read should take (in us) before
753                               // a warning is given.
754   int64 write_threshold_;     // Maximum time a write should take (in us) before
755                               // a warning is given.
756   int64 read_timeout_;        // Maximum time a read can take before a timeout
757                               // and the aborting of the read operation.
758   int64 write_timeout_;       // Maximum time a write can take before a timeout
759                               // and the aborting of the write operation.
760 
761   string device_name_;        // Name of device file to access.
762   int64 device_sectors_;      // Number of sectors on the device.
763 
764   std::queue<BlockData*> in_flight_sectors_;   // Queue of sectors written but
765                                                 // not verified.
766   void *block_buffer_;        // Pointer to aligned block buffer.
767 
768 #ifdef HAVE_LIBAIO_H
769   io_context_t aio_ctx_;     // Asynchronous I/O context for Linux native AIO.
770 #endif
771 
772   DiskBlockTable *block_table_;  // Disk Block Table, shared by all disk
773                                  // threads that read / write at the same
774                                  // device
775 
776   DISALLOW_COPY_AND_ASSIGN(DiskThread);
777 };
778 
779 class RandomDiskThread : public DiskThread {
780  public:
781   explicit RandomDiskThread(DiskBlockTable *block_table);
782   virtual ~RandomDiskThread();
783   // Main work loop.
784   virtual bool DoWork(int fd);
785  protected:
786   DISALLOW_COPY_AND_ASSIGN(RandomDiskThread);
787 };
788 
789 // Worker thread to perform checks in a specific memory region.
790 class MemoryRegionThread : public WorkerThread {
791  public:
792   MemoryRegionThread();
793   ~MemoryRegionThread();
794   virtual bool Work();
795   void ProcessError(struct ErrorRecord *error, int priority,
796                     const char *message);
797   bool SetRegion(void *region, int64 size);
798   // Calculate worker thread specific bandwidth.
GetMemoryCopiedData()799   virtual float GetMemoryCopiedData()
800     {return GetCopiedData();}
GetDeviceCopiedData()801   virtual float GetDeviceCopiedData()
802     {return GetCopiedData() * 2;}
SetIdentifier(string identifier)803   void SetIdentifier(string identifier) {
804     identifier_ = identifier;
805   }
806 
807  protected:
808   // Page queue for this particular memory region.
809   char *region_;
810   PageEntryQueue *pages_;
811   bool error_injection_;
812   int phase_;
813   string identifier_;
814   static const int kPhaseNoPhase = 0;
815   static const int kPhaseCopy = 1;
816   static const int kPhaseCheck = 2;
817 
818  private:
819   DISALLOW_COPY_AND_ASSIGN(MemoryRegionThread);
820 };
821 
822 // Worker thread to check that the frequency of every cpu does not go below a
823 // certain threshold.
824 class CpuFreqThread : public WorkerThread {
825  public:
826   CpuFreqThread(int num_cpus, int freq_threshold, int round);
827   ~CpuFreqThread();
828 
829   // This is the task function that the thread executes.
830   virtual bool Work();
831 
832   // Returns true if this test can run on the current machine. Otherwise,
833   // returns false.
834   static bool CanRun();
835 
836  private:
837   static const int kIntervalPause = 10;   // The number of seconds to pause
838                                           // between acquiring the MSR data.
839   static const int kStartupDelay = 5;     // The number of seconds to wait
840                                           // before acquiring MSR data.
841   static const int kMsrTscAddr = 0x10;    // The address of the TSC MSR.
842   static const int kMsrAperfAddr = 0xE8;  // The address of the APERF MSR.
843   static const int kMsrMperfAddr = 0xE7;  // The address of the MPERF MSR.
844 
845   // The index values into the CpuDataType.msr[] array.
846   enum MsrValues {
847     kMsrTsc = 0,           // MSR index 0 = TSC.
848     kMsrAperf = 1,         // MSR index 1 = APERF.
849     kMsrMperf = 2,         // MSR index 2 = MPERF.
850     kMsrLast,              // Last MSR index.
851   };
852 
853   typedef struct {
854     uint32 msr;         // The address of the MSR.
855     const char *name;   // A human readable string for the MSR.
856   } CpuRegisterType;
857 
858   typedef struct {
859     uint64 msrs[kMsrLast];  // The values of the MSRs.
860     struct timeval tv;      // The time at which the MSRs were read.
861   } CpuDataType;
862 
863   // The set of MSR addresses and register names.
864   static const CpuRegisterType kCpuRegisters[kMsrLast];
865 
866   // Compute the change in values of the MSRs between current and previous,
867   // set the frequency in MHz of the cpu. If there is an error computing
868   // the delta, return false. Othewise, return true.
869   bool ComputeFrequency(CpuDataType *current, CpuDataType *previous,
870                         int *frequency);
871 
872   // Get the MSR values for this particular cpu and save them in data. If
873   // any error is encountered, returns false. Otherwise, returns true.
874   bool GetMsrs(int cpu, CpuDataType *data);
875 
876   // Compute the difference between the currently read MSR values and the
877   // previously read values and store the results in delta. If any of the
878   // values did not increase, or the TSC value is too small, returns false.
879   // Otherwise, returns true.
880   bool ComputeDelta(CpuDataType *current, CpuDataType *previous,
881                     CpuDataType *delta);
882 
883   // The total number of cpus on the system.
884   int num_cpus_;
885 
886   // The minimum frequency that each cpu must operate at (in MHz).
887   int freq_threshold_;
888 
889   // The value to round the computed frequency to.
890   int round_;
891 
892   // Precomputed value to add to the frequency to do the rounding.
893   double round_value_;
894 
895   DISALLOW_COPY_AND_ASSIGN(CpuFreqThread);
896 };
897 
898 #endif  // STRESSAPPTEST_WORKER_H_
899