• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2006 Google Inc. All Rights Reserved.
2 
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 // worker.h : worker thread interface
16 
17 // This file contains the Worker Thread class interface
18 // for the SAT test. Worker Threads implement a repetitive
19 // task used to test or stress the system.
20 
21 #ifndef STRESSAPPTEST_WORKER_H_
22 #define STRESSAPPTEST_WORKER_H_
23 
24 #include <pthread.h>
25 
26 #include <sys/time.h>
27 #include <sys/types.h>
28 
29 #ifdef HAVE_LIBAIO_H
30 #include <libaio.h>
31 #endif
32 
33 #include <queue>
34 #include <set>
35 #include <string>
36 #include <vector>
37 
38 // This file must work with autoconf on its public version,
39 // so these includes are correct.
40 #include "disk_blocks.h"
41 #include "queue.h"
42 #include "sattypes.h"
43 
44 
// Global data structure shared by the cache coherency worker threads.
// One instance describes one cache line under test.
struct cc_cacheline_data {
  // Integer array for this cache line.  CpuCacheCoherencyThread documents its
  // thread id as the index used into this array.
  int *num;
};
49 
50 // Typical usage:
51 // (Other workflows may be possible, see function comments for details.)
52 // - Control thread creates object.
53 // - Control thread calls AddWorkers(1) for each worker thread.
54 // - Control thread calls Initialize().
55 // - Control thread launches worker threads.
56 // - Every worker thread frequently calls ContinueRunning().
57 // - Control thread periodically calls PauseWorkers(), effectively sleeps, and
58 //     then calls ResumeWorkers().
59 // - Some worker threads may exit early, before StopWorkers() is called.  They
60 //     call RemoveSelf() after their last call to ContinueRunning().
61 // - Control thread eventually calls StopWorkers().
62 // - Worker threads exit.
63 // - Control thread joins worker threads.
64 // - Control thread calls Destroy().
65 // - Control thread destroys object.
66 //
67 // Threadsafety:
68 // - ContinueRunning() may be called concurrently by different workers, but not
69 //     by a single worker.
70 // - No other methods may ever be called concurrently, with themselves or
71 //     eachother.
72 // - This object may be used by multiple threads only between Initialize() and
73 //     Destroy().
74 //
75 // TODO(matthewb): Move this class and its unittest to their own files.
76 class WorkerStatus {
77  public:
78   //--------------------------------
79   // Methods for the control thread.
80   //--------------------------------
81 
WorkerStatus()82   WorkerStatus() : num_workers_(0), status_(RUN) {}
83 
84   // Called by the control thread to increase the worker count.  Must be called
85   // before Initialize().  The worker count is 0 upon object initialization.
AddWorkers(int num_new_workers)86   void AddWorkers(int num_new_workers) {
87     // No need to lock num_workers_mutex_ because this is before Initialize().
88     num_workers_ += num_new_workers;
89   }
90 
91   // Called by the control thread.  May not be called multiple times.  If
92   // called, Destroy() must be called before destruction.
93   void Initialize();
94 
95   // Called by the control thread after joining all worker threads.  Must be
96   // called iff Initialize() was called.  No methods may be called after calling
97   // this.
98   void Destroy();
99 
100   // Called by the control thread to tell the workers to pause.  Does not return
101   // until all workers have called ContinueRunning() or RemoveSelf().  May only
102   // be called between Initialize() and Stop().  Must not be called multiple
103   // times without ResumeWorkers() having been called inbetween.
104   void PauseWorkers();
105 
106   // Called by the control thread to tell the workers to resume from a pause.
107   // May only be called between Initialize() and Stop().  May only be called
108   // directly after PauseWorkers().
109   void ResumeWorkers();
110 
111   // Called by the control thread to tell the workers to stop.  May only be
112   // called between Initialize() and Destroy().  May only be called once.
113   void StopWorkers();
114 
115   //--------------------------------
116   // Methods for the worker threads.
117   //--------------------------------
118 
119   // Called by worker threads to decrease the worker count by one.  May only be
120   // called between Initialize() and Destroy().  May wait for ResumeWorkers()
121   // when called after PauseWorkers().
122   void RemoveSelf();
123 
124   // Called by worker threads between Initialize() and Destroy().  May be called
125   // any number of times.  Return value is whether or not the worker should
126   // continue running.  When called after PauseWorkers(), does not return until
127   // ResumeWorkers() or StopWorkers() has been called.  Number of distinct
128   // calling threads must match the worker count (see AddWorkers() and
129   // RemoveSelf()).
130   bool ContinueRunning();
131 
132   // TODO(matthewb): Is this functionality really necessary?  Remove it if not.
133   //
134   // This is a hack!  It's like ContinueRunning(), except it won't pause.  If
135   // any worker threads use this exclusively in place of ContinueRunning() then
136   // PauseWorkers() should never be used!
137   bool ContinueRunningNoPause();
138 
139  private:
140   enum Status { RUN, PAUSE, STOP };
141 
WaitOnPauseBarrier()142   void WaitOnPauseBarrier() {
143 #ifdef _POSIX_BARRIERS
144     int error = pthread_barrier_wait(&pause_barrier_);
145     if (error != PTHREAD_BARRIER_SERIAL_THREAD)
146       sat_assert(error == 0);
147 #endif
148   }
149 
AcquireNumWorkersLock()150   void AcquireNumWorkersLock() {
151     sat_assert(0 == pthread_mutex_lock(&num_workers_mutex_));
152   }
153 
ReleaseNumWorkersLock()154   void ReleaseNumWorkersLock() {
155     sat_assert(0 == pthread_mutex_unlock(&num_workers_mutex_));
156   }
157 
AcquireStatusReadLock()158   void AcquireStatusReadLock() {
159     sat_assert(0 == pthread_rwlock_rdlock(&status_rwlock_));
160   }
161 
AcquireStatusWriteLock()162   void AcquireStatusWriteLock() {
163     sat_assert(0 == pthread_rwlock_wrlock(&status_rwlock_));
164   }
165 
ReleaseStatusLock()166   void ReleaseStatusLock() {
167     sat_assert(0 == pthread_rwlock_unlock(&status_rwlock_));
168   }
169 
GetStatus()170   Status GetStatus() {
171     AcquireStatusReadLock();
172     Status status = status_;
173     ReleaseStatusLock();
174     return status;
175   }
176 
177   // Returns the previous status.
SetStatus(Status status)178   Status SetStatus(Status status) {
179     AcquireStatusWriteLock();
180     Status prev_status = status_;
181     status_ = status;
182     ReleaseStatusLock();
183     return prev_status;
184   }
185 
186   pthread_mutex_t num_workers_mutex_;
187   int num_workers_;
188 
189   pthread_rwlock_t status_rwlock_;
190   Status status_;
191 
192 #ifdef _POSIX_BARRIERS
193   // Guaranteed to not be in use when (status_ != PAUSE).
194   pthread_barrier_t pause_barrier_;
195 #endif
196 
197   DISALLOW_COPY_AND_ASSIGN(WorkerStatus);
198 };
199 
200 
201 // This is a base class for worker threads.
202 // Each thread repeats a specific
203 // task on various blocks of memory.
204 class WorkerThread {
205  public:
206   // Enum to mark a thread as low/med/high priority.
207   enum Priority {
208     Low,
209     Normal,
210     High,
211   };
212   WorkerThread();
213   virtual ~WorkerThread();
214 
215   // Initialize values and thread ID number.
216   virtual void InitThread(int thread_num_init,
217                           class Sat *sat_init,
218                           class OsLayer *os_init,
219                           class PatternList *patternlist_init,
220                           WorkerStatus *worker_status);
221 
222   // This function is DEPRECATED, it does nothing.
SetPriority(Priority priority)223   void SetPriority(Priority priority) { priority_ = priority; }
224   // Spawn the worker thread, by running Work().
225   int SpawnThread();
226   // Only for ThreadSpawnerGeneric().
227   void StartRoutine();
228   bool InitPriority();
229 
230   // Wait for the thread to complete its cleanup.
231   virtual bool JoinThread();
232   // Kill worker thread with SIGINT.
233   virtual bool KillThread();
234 
235   // This is the task function that the thread executes.
236   // This is implemented per subclass.
237   virtual bool Work();
238 
239   // Starts per-WorkerThread timer.
StartThreadTimer()240   void StartThreadTimer() {gettimeofday(&start_time_, NULL);}
241   // Reads current timer value and returns run duration without recording it.
ReadThreadTimer()242   int64 ReadThreadTimer() {
243     struct timeval end_time_;
244     gettimeofday(&end_time_, NULL);
245     return (end_time_.tv_sec - start_time_.tv_sec)*1000000 +
246       (end_time_.tv_usec - start_time_.tv_usec);
247   }
248   // Stops per-WorkerThread timer and records thread run duration.
249   // Start/Stop ThreadTimer repetitively has cumulative effect, ie the timer
250   // is effectively paused and restarted, so runduration_usec accumulates on.
StopThreadTimer()251   void StopThreadTimer() {
252     runduration_usec_ += ReadThreadTimer();
253   }
254 
255   // Acccess member variables.
GetStatus()256   bool GetStatus() {return status_;}
GetErrorCount()257   int64 GetErrorCount() {return errorcount_;}
GetPageCount()258   int64 GetPageCount() {return pages_copied_;}
GetRunDurationUSec()259   int64 GetRunDurationUSec() {return runduration_usec_;}
260 
261   // Returns bandwidth defined as pages_copied / thread_run_durations.
262   virtual float GetCopiedData();
263   // Calculate worker thread specific copied data.
GetMemoryCopiedData()264   virtual float GetMemoryCopiedData() {return 0;}
GetDeviceCopiedData()265   virtual float GetDeviceCopiedData() {return 0;}
266   // Calculate worker thread specific bandwidth.
GetMemoryBandwidth()267   virtual float GetMemoryBandwidth()
268     {return GetMemoryCopiedData() / (
269         runduration_usec_ * 1.0 / 1000000);}
GetDeviceBandwidth()270   virtual float GetDeviceBandwidth()
271     {return GetDeviceCopiedData() / (
272         runduration_usec_ * 1.0 / 1000000);}
273 
set_cpu_mask(cpu_set_t * mask)274   void set_cpu_mask(cpu_set_t *mask) {
275     memcpy(&cpu_mask_, mask, sizeof(*mask));
276   }
277 
set_cpu_mask_to_cpu(int cpu_num)278   void set_cpu_mask_to_cpu(int cpu_num) {
279     cpuset_set_ab(&cpu_mask_, cpu_num, cpu_num + 1);
280   }
281 
set_tag(int32 tag)282   void set_tag(int32 tag) {tag_ = tag;}
283 
284   // Returns CPU mask, where each bit represents a logical cpu.
285   bool AvailableCpus(cpu_set_t *cpuset);
286   // Returns CPU mask of CPUs this thread is bound to,
287   bool CurrentCpus(cpu_set_t *cpuset);
288   // Returns Current Cpus mask as string.
CurrentCpusFormat()289   string CurrentCpusFormat() {
290     cpu_set_t current_cpus;
291     CurrentCpus(&current_cpus);
292     return cpuset_format(&current_cpus);
293   }
294 
ThreadID()295   int ThreadID() {return thread_num_;}
296 
297   // Bind worker thread to specified CPU(s)
298   bool BindToCpus(const cpu_set_t *cpuset);
299 
300  protected:
301   // This function dictates whether the main work loop
302   // continues, waits, or terminates.
303   // All work loops should be of the form:
304   //   do {
305   //     // work.
306   //   } while (IsReadyToRun());
IsReadyToRun()307   virtual bool IsReadyToRun() { return worker_status_->ContinueRunning(); }
308   // TODO(matthewb): Is this function really necessary? Remove it if not.
309   //
310   // Like IsReadyToRun(), except it won't pause.
IsReadyToRunNoPause()311   virtual bool IsReadyToRunNoPause() {
312     return worker_status_->ContinueRunningNoPause();
313   }
314 
315   // These are functions used by the various work loops.
316   // Pretty print and log a data miscompare.
317   virtual void ProcessError(struct ErrorRecord *er,
318                             int priority,
319                             const char *message);
320 
321   // Compare a region of memory with a known data patter, and report errors.
322   virtual int CheckRegion(void *addr,
323                           class Pattern *pat,
324                           int64 length,
325                           int offset,
326                           int64 patternoffset);
327 
328   // Fast compare a block of memory.
329   virtual int CrcCheckPage(struct page_entry *srcpe);
330 
331   // Fast copy a block of memory, while verifying correctness.
332   virtual int CrcCopyPage(struct page_entry *dstpe,
333                           struct page_entry *srcpe);
334 
335   // Fast copy a block of memory, while verifying correctness, and heating CPU.
336   virtual int CrcWarmCopyPage(struct page_entry *dstpe,
337                               struct page_entry *srcpe);
338 
339   // Fill a page with its specified pattern.
340   virtual bool FillPage(struct page_entry *pe);
341 
342   // Copy with address tagging.
343   virtual bool AdlerAddrMemcpyC(uint64 *dstmem64,
344                                 uint64 *srcmem64,
345                                 unsigned int size_in_bytes,
346                                 AdlerChecksum *checksum,
347                                 struct page_entry *pe);
348   // SSE copy with address tagging.
349   virtual bool AdlerAddrMemcpyWarm(uint64 *dstmem64,
350                                    uint64 *srcmem64,
351                                    unsigned int size_in_bytes,
352                                    AdlerChecksum *checksum,
353                                    struct page_entry *pe);
354   // Crc data with address tagging.
355   virtual bool AdlerAddrCrcC(uint64 *srcmem64,
356                              unsigned int size_in_bytes,
357                              AdlerChecksum *checksum,
358                              struct page_entry *pe);
359   // Setup tagging on an existing page.
360   virtual bool TagAddrC(uint64 *memwords,
361                         unsigned int size_in_bytes);
362   // Report a mistagged cacheline.
363   virtual bool ReportTagError(uint64 *mem64,
364                       uint64 actual,
365                       uint64 tag);
366   // Print out the error record of the tag mismatch.
367   virtual void ProcessTagError(struct ErrorRecord *error,
368                        int priority,
369                        const char *message);
370 
371   // A worker thread can yield itself to give up CPU until it's scheduled again
372   bool YieldSelf();
373 
374  protected:
375   // General state variables that all subclasses need.
376   int thread_num_;                  // Thread ID.
377   volatile bool status_;            // Error status.
378   volatile int64 pages_copied_;     // Recorded for memory bandwidth calc.
379   volatile int64 errorcount_;       // Miscompares seen by this thread.
380 
381   cpu_set_t cpu_mask_;              // Cores this thread is allowed to run on.
382   volatile uint32 tag_;             // Tag hint for memory this thread can use.
383 
384   bool tag_mode_;                   // Tag cachelines with vaddr.
385 
386   // Thread timing variables.
387   struct timeval start_time_;        // Worker thread start time.
388   volatile int64 runduration_usec_;  // Worker run duration in u-seconds.
389 
390   // Function passed to pthread_create.
391   void *(*thread_spawner_)(void *args);
392   pthread_t thread_;                // Pthread thread ID.
393   Priority priority_;               // Worker thread priority.
394   class Sat *sat_;                  // Reference to parent stest object.
395   class OsLayer *os_;               // Os abstraction: put hacks here.
396   class PatternList *patternlist_;  // Reference to data patterns.
397 
398   // Work around style guide ban on sizeof(int).
399   static const uint64 iamint_ = 0;
400   static const int wordsize_ = sizeof(iamint_);
401 
402  private:
403   WorkerStatus *worker_status_;
404 
405   DISALLOW_COPY_AND_ASSIGN(WorkerThread);
406 };
407 
408 // Worker thread to perform File IO.
409 class FileThread : public WorkerThread {
410  public:
411   FileThread();
412   // Set filename to use for file IO.
413   virtual void SetFile(const char *filename_init);
414   virtual bool Work();
415 
416   // Calculate worker thread specific bandwidth.
GetDeviceCopiedData()417   virtual float GetDeviceCopiedData()
418     {return GetCopiedData()*2;}
419   virtual float GetMemoryCopiedData();
420 
421  protected:
422   // Record of where these pages were sourced from, and what
423   // potentially broken components they passed through.
424   struct PageRec {
425      struct Pattern *pattern;  // This is the data it should contain.
426      void *src;  // This is the memory location the data was sourced from.
427      void *dst;  // This is where it ended up.
428   };
429 
430   // These are functions used by the various work loops.
431   // Pretty print and log a data miscompare. Disks require
432   // slightly different error handling.
433   virtual void ProcessError(struct ErrorRecord *er,
434                             int priority,
435                             const char *message);
436 
437   virtual bool OpenFile(int *pfile);
438   virtual bool CloseFile(int fd);
439 
440   // Read and write whole file to disk.
441   virtual bool WritePages(int fd);
442   virtual bool ReadPages(int fd);
443 
444   // Read and write pages to disk.
445   virtual bool WritePageToFile(int fd, struct page_entry *src);
446   virtual bool ReadPageFromFile(int fd, struct page_entry *dst);
447 
448   // Sector tagging support.
449   virtual bool SectorTagPage(struct page_entry *src, int block);
450   virtual bool SectorValidatePage(const struct PageRec &page,
451                                   struct page_entry *dst,
452                                   int block);
453 
454   // Get memory for an incoming data transfer..
455   virtual bool PagePrepare();
456   // Remove memory allocated for data transfer.
457   virtual bool PageTeardown();
458 
459   // Get memory for an incoming data transfer..
460   virtual bool GetEmptyPage(struct page_entry *dst);
461   // Get memory for an outgoing data transfer..
462   virtual bool GetValidPage(struct page_entry *dst);
463   // Throw out a used empty page.
464   virtual bool PutEmptyPage(struct page_entry *src);
465   // Throw out a used, filled page.
466   virtual bool PutValidPage(struct page_entry *src);
467 
468 
469   struct PageRec *page_recs_;          // Array of page records.
470   int crc_page_;                        // Page currently being CRC checked.
471   string filename_;                     // Name of file to access.
472   string devicename_;                   // Name of device file is on.
473 
474   bool page_io_;                        // Use page pool for IO.
475   void *local_page_;                   // malloc'd page fon non-pool IO.
476   int pass_;                            // Number of writes to the file so far.
477 
478   // Tag to detect file corruption.
479   struct SectorTag {
480     volatile uint8 magic;
481     volatile uint8 block;
482     volatile uint8 sector;
483     volatile uint8 pass;
484     char pad[512-4];
485   };
486 
487   DISALLOW_COPY_AND_ASSIGN(FileThread);
488 };
489 
490 
491 // Worker thread to perform Network IO.
492 class NetworkThread : public WorkerThread {
493  public:
494   NetworkThread();
495   // Set hostname to use for net IO.
496   virtual void SetIP(const char *ipaddr_init);
497   virtual bool Work();
498 
499   // Calculate worker thread specific bandwidth.
GetDeviceCopiedData()500   virtual float GetDeviceCopiedData()
501     {return GetCopiedData()*2;}
502 
503  protected:
504   // IsReadyToRunNoPause() wrapper, for NetworkSlaveThread to override.
505   virtual bool IsNetworkStopSet();
506   virtual bool CreateSocket(int *psocket);
507   virtual bool CloseSocket(int sock);
508   virtual bool Connect(int sock);
509   virtual bool SendPage(int sock, struct page_entry *src);
510   virtual bool ReceivePage(int sock, struct page_entry *dst);
511   char ipaddr_[256];
512   int sock_;
513 
514  private:
515   DISALLOW_COPY_AND_ASSIGN(NetworkThread);
516 };
517 
518 // Worker thread to reflect Network IO.
519 class NetworkSlaveThread : public NetworkThread {
520  public:
521   NetworkSlaveThread();
522   // Set socket for IO.
523   virtual void SetSock(int sock);
524   virtual bool Work();
525 
526  protected:
527   virtual bool IsNetworkStopSet();
528 
529  private:
530   DISALLOW_COPY_AND_ASSIGN(NetworkSlaveThread);
531 };
532 
533 // Worker thread to detect incoming Network IO.
534 class NetworkListenThread : public NetworkThread {
535  public:
536   NetworkListenThread();
537   virtual bool Work();
538 
539  private:
540   virtual bool Listen();
541   virtual bool Wait();
542   virtual bool GetConnection(int *pnewsock);
543   virtual bool SpawnSlave(int newsock, int threadid);
544   virtual bool ReapSlaves();
545 
546   // For serviced incoming connections.
547   struct ChildWorker {
548     WorkerStatus status;
549     NetworkSlaveThread thread;
550   };
551   typedef vector<ChildWorker*> ChildVector;
552   ChildVector child_workers_;
553 
554   DISALLOW_COPY_AND_ASSIGN(NetworkListenThread);
555 };
556 
557 // Worker thread to perform Memory Copy.
558 class CopyThread : public WorkerThread {
559  public:
CopyThread()560   CopyThread() {}
561   virtual bool Work();
562   // Calculate worker thread specific bandwidth.
GetMemoryCopiedData()563   virtual float GetMemoryCopiedData()
564     {return GetCopiedData()*2;}
565 
566  private:
567   DISALLOW_COPY_AND_ASSIGN(CopyThread);
568 };
569 
570 // Worker thread to perform Memory Invert.
571 class InvertThread : public WorkerThread {
572  public:
InvertThread()573   InvertThread() {}
574   virtual bool Work();
575   // Calculate worker thread specific bandwidth.
GetMemoryCopiedData()576   virtual float GetMemoryCopiedData()
577     {return GetCopiedData()*4;}
578 
579  private:
580   virtual int InvertPageUp(struct page_entry *srcpe);
581   virtual int InvertPageDown(struct page_entry *srcpe);
582   DISALLOW_COPY_AND_ASSIGN(InvertThread);
583 };
584 
585 // Worker thread to fill blank pages on startup.
586 class FillThread : public WorkerThread {
587  public:
588   FillThread();
589   // Set how many pages this thread should fill before exiting.
590   virtual void SetFillPages(int64 num_pages_to_fill_init);
591   virtual bool Work();
592 
593  private:
594   // Fill a page with the data pattern in pe->pattern.
595   virtual bool FillPageRandom(struct page_entry *pe);
596   int64 num_pages_to_fill_;
597   DISALLOW_COPY_AND_ASSIGN(FillThread);
598 };
599 
600 // Worker thread to verify page data matches pattern data.
601 // Thread will check and replace pages until "done" flag is set,
602 // then it will check and discard pages until no more remain.
603 class CheckThread : public WorkerThread {
604  public:
CheckThread()605   CheckThread() {}
606   virtual bool Work();
607   // Calculate worker thread specific bandwidth.
GetMemoryCopiedData()608   virtual float GetMemoryCopiedData()
609     {return GetCopiedData();}
610 
611  private:
612   DISALLOW_COPY_AND_ASSIGN(CheckThread);
613 };
614 
615 
616 // Worker thread to poll for system error messages.
617 // Thread will check for messages until "done" flag is set.
618 class ErrorPollThread : public WorkerThread {
619  public:
ErrorPollThread()620   ErrorPollThread() {}
621   virtual bool Work();
622 
623  private:
624   DISALLOW_COPY_AND_ASSIGN(ErrorPollThread);
625 };
626 
627 // Computation intensive worker thread to stress CPU.
628 class CpuStressThread : public WorkerThread {
629  public:
CpuStressThread()630   CpuStressThread() {}
631   virtual bool Work();
632 
633  private:
634   DISALLOW_COPY_AND_ASSIGN(CpuStressThread);
635 };
636 
637 // Worker thread that tests the correctness of the
638 // CPU Cache Coherency Protocol.
639 class CpuCacheCoherencyThread : public WorkerThread {
640  public:
641   CpuCacheCoherencyThread(cc_cacheline_data *cc_data,
642                           int cc_cacheline_count_,
643                           int cc_thread_num_,
644                           int cc_inc_count_);
645   virtual bool Work();
646 
647  protected:
648   cc_cacheline_data *cc_cacheline_data_;  // Datstructure for each cacheline.
649   int cc_local_num_;        // Local counter for each thread.
650   int cc_cacheline_count_;  // Number of cache lines to operate on.
651   int cc_thread_num_;       // The integer id of the thread which is
652                             // used as an index into the integer array
653                             // of the cacheline datastructure.
654   int cc_inc_count_;        // Number of times to increment the counter.
655 
656  private:
657   DISALLOW_COPY_AND_ASSIGN(CpuCacheCoherencyThread);
658 };
659 
660 // Worker thread to perform disk test.
661 class DiskThread : public WorkerThread {
662  public:
663   explicit DiskThread(DiskBlockTable *block_table);
664   virtual ~DiskThread();
665   // Calculate disk thread specific bandwidth.
GetDeviceCopiedData()666   virtual float GetDeviceCopiedData() {
667     return (blocks_written_ * write_block_size_ +
668             blocks_read_ * read_block_size_) / kMegabyte;}
669 
670   // Set filename for device file (in /dev).
671   virtual void SetDevice(const char *device_name);
672   // Set various parameters that control the behaviour of the test.
673   virtual bool SetParameters(int read_block_size,
674                              int write_block_size,
675                              int64 segment_size,
676                              int64 cache_size,
677                              int blocks_per_segment,
678                              int64 read_threshold,
679                              int64 write_threshold,
680                              int non_destructive);
681 
682   virtual bool Work();
683 
GetMemoryCopiedData()684   virtual float GetMemoryCopiedData() {return 0;}
685 
686  protected:
687   static const int kSectorSize = 512;       // Size of sector on disk.
688   static const int kBufferAlignment = 512;  // Buffer alignment required by the
689                                             // kernel.
690   static const int kBlockRetry = 100;       // Number of retries to allocate
691                                             // sectors.
692 
693   enum IoOp {
694     ASYNC_IO_READ   = 0,
695     ASYNC_IO_WRITE  = 1
696   };
697 
698   virtual bool OpenDevice(int *pfile);
699   virtual bool CloseDevice(int fd);
700 
701   // Retrieves the size (in bytes) of the disk/file.
702   virtual bool GetDiskSize(int fd);
703 
704   // Retrieves the current time in microseconds.
705   virtual int64 GetTime();
706 
707   // Do an asynchronous disk I/O operation.
708   virtual bool AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
709                            int64 offset, int64 timeout);
710 
711   // Write a block to disk.
712   virtual bool WriteBlockToDisk(int fd, BlockData *block);
713 
714   // Verify a block on disk.
715   virtual bool ValidateBlockOnDisk(int fd, BlockData *block);
716 
717   // Main work loop.
718   virtual bool DoWork(int fd);
719 
720   int read_block_size_;       // Size of blocks read from disk, in bytes.
721   int write_block_size_;      // Size of blocks written to disk, in bytes.
722   int64 blocks_read_;         // Number of blocks read in work loop.
723   int64 blocks_written_;      // Number of blocks written in work loop.
724   int64 segment_size_;        // Size of disk segments (in bytes) that the disk
725                               // will be split into where testing can be
726                               // confined to a particular segment.
727                               // Allows for control of how evenly the disk will
728                               // be tested.  Smaller segments imply more even
729                               // testing (less random).
730   int blocks_per_segment_;    // Number of blocks that will be tested per
731                               // segment.
732   int cache_size_;            // Size of disk cache, in bytes.
733   int queue_size_;            // Length of in-flight-blocks queue, in blocks.
734   int non_destructive_;       // Use non-destructive mode or not.
735   int update_block_table_;    // If true, assume this is the thread
736                               // responsible for writing the data in the disk
737                               // for this block device and, therefore,
738                               // update the block table. If false, just use
739                               // the block table to get data.
740 
741   // read/write times threshold for reporting a problem
742   int64 read_threshold_;      // Maximum time a read should take (in us) before
743                               // a warning is given.
744   int64 write_threshold_;     // Maximum time a write should take (in us) before
745                               // a warning is given.
746   int64 read_timeout_;        // Maximum time a read can take before a timeout
747                               // and the aborting of the read operation.
748   int64 write_timeout_;       // Maximum time a write can take before a timeout
749                               // and the aborting of the write operation.
750 
751   string device_name_;        // Name of device file to access.
752   int64 device_sectors_;      // Number of sectors on the device.
753 
754   std::queue<BlockData*> in_flight_sectors_;   // Queue of sectors written but
755                                                 // not verified.
756   void *block_buffer_;        // Pointer to aligned block buffer.
757 
758 #ifdef HAVE_LIBAIO_H
759   io_context_t aio_ctx_;     // Asynchronous I/O context for Linux native AIO.
760 #endif
761 
762   DiskBlockTable *block_table_;  // Disk Block Table, shared by all disk
763                                  // threads that read / write at the same
764                                  // device
765 
766   DISALLOW_COPY_AND_ASSIGN(DiskThread);
767 };
768 
769 class RandomDiskThread : public DiskThread {
770  public:
771   explicit RandomDiskThread(DiskBlockTable *block_table);
772   virtual ~RandomDiskThread();
773   // Main work loop.
774   virtual bool DoWork(int fd);
775  protected:
776   DISALLOW_COPY_AND_ASSIGN(RandomDiskThread);
777 };
778 
779 // Worker thread to perform checks in a specific memory region.
780 class MemoryRegionThread : public WorkerThread {
781  public:
782   MemoryRegionThread();
783   ~MemoryRegionThread();
784   virtual bool Work();
785   void ProcessError(struct ErrorRecord *error, int priority,
786                     const char *message);
787   bool SetRegion(void *region, int64 size);
788   // Calculate worker thread specific bandwidth.
GetMemoryCopiedData()789   virtual float GetMemoryCopiedData()
790     {return GetCopiedData();}
GetDeviceCopiedData()791   virtual float GetDeviceCopiedData()
792     {return GetCopiedData() * 2;}
SetIdentifier(string identifier)793   void SetIdentifier(string identifier) {
794     identifier_ = identifier;
795   }
796 
797  protected:
798   // Page queue for this particular memory region.
799   char *region_;
800   PageEntryQueue *pages_;
801   bool error_injection_;
802   int phase_;
803   string identifier_;
804   static const int kPhaseNoPhase = 0;
805   static const int kPhaseCopy = 1;
806   static const int kPhaseCheck = 2;
807 
808  private:
809   DISALLOW_COPY_AND_ASSIGN(MemoryRegionThread);
810 };
811 
812 #endif  // STRESSAPPTEST_WORKER_H_
813