• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 import subprocess
2 import os
3 import pipes
4 import threading
5 from dataclasses import dataclass, asdict, field
6 import logging
7 import sqlite3
8 import time
9 
10 
@dataclass
class JobInfo:
    """
    Record describing one OTA generation job.

    The positional field order matches the column order selected from the
    Jobs table (ID, TargetPath, IncrementalPath, Verbose, Partial,
    OutputPath, Status, Downgrade, OtherFlags, STDOUT, STDERR, StartTime,
    FinishTime), so a database row can be expanded with JobInfo(*row).
    """
    id: str
    target: str
    incremental: str = ''
    verbose: bool = False
    partial: list[str] = field(default_factory=list)
    output: str = ''
    status: str = 'Running'
    downgrade: bool = False
    extra: str = ''
    stdout: str = ''
    stderr: str = ''
    start_time: int = 0
    finish_time: int = 0
    isPartial: bool = False
    isIncremental: bool = False

    def __post_init__(self):
        """
        Normalize the fields after construction.

        verbose/downgrade are coerced to bool (they come back from SQL as
        integers), partial is accepted either as a list or as a
        comma-separated string, and isIncremental/isPartial are switched on
        when the corresponding value is non-empty.
        """
        self.verbose = bool(self.verbose)
        self.downgrade = bool(self.downgrade)
        if self.incremental:
            self.isIncremental = True
        if isinstance(self.partial, str):
            # The SQL form stores partitions as one comma-joined string.
            self.partial = self.partial.split(',') if self.partial else []
        elif not self.partial:
            self.partial = []
        if self.partial:
            self.isPartial = True

    @staticmethod
    def _read_log(path, fallback):
        """Return the content of the log file at path, or fallback if it is missing."""
        try:
            with open(path, 'r') as log_file:
                return log_file.read()
        except FileNotFoundError:
            return fallback

    def to_sql_form_dict(self):
        """
        Convert this instance to a dict, which can be later used to insert into
        the SQL database.
        Format:
            id: string, target: string, incremental: string, verbose: int,
            partial: string, output:string, status:string,
            downgrade: int, extra: string, stdout: string, stderr:string,
            start_time:int, finish_time: int(not required)
        The dict also carries isPartial/isIncremental because it is built
        from every dataclass field.
        """
        sql_form_dict = asdict(self)
        sql_form_dict['partial'] = ','.join(sql_form_dict['partial'])
        for key in ('verbose', 'downgrade'):
            sql_form_dict[key] = 1 if sql_form_dict[key] else 0
        return sql_form_dict

    def to_dict_basic(self):
        """
        Convert the instance to a dict, which includes the file name of target
        (and of the incremental source when this is an incremental job).
        """
        basic_info = asdict(self)
        basic_info['target_name'] = self.target.split('/')[-1]
        if self.isIncremental:
            basic_info['incremental_name'] = self.incremental.split('/')[-1]
        return basic_info

    def to_dict_detail(self, target_lib, offset=0):
        """
        Convert this instance into a dict, which includes some detailed information
        of the target/source build, i.e. build version and file name.

        The 'stdout' and 'stderr' entries are replaced by the content of the
        log files they point to. Each file is read independently, so a
        missing stderr log no longer discards an existing stdout log (and
        vice versa); a missing file yields a placeholder message.

        Args:
            target_lib: object providing get_build_by_path(path), whose
                result exposes file_name and build_version
            offset: unused; kept for interface compatibility
        Return:
            dict
        """
        detail_info = asdict(self)
        detail_info['stdout'] = self._read_log(
            self.stdout, 'NO STD OUTPUT IS FOUND')
        detail_info['stderr'] = self._read_log(
            self.stderr, 'NO STD ERROR IS FOUND')
        target_info = target_lib.get_build_by_path(self.target)
        detail_info['target_name'] = target_info.file_name
        detail_info['target_build_version'] = target_info.build_version
        if self.incremental:
            incremental_info = target_lib.get_build_by_path(
                self.incremental)
            detail_info['incremental_name'] = incremental_info.file_name
            detail_info['incremental_build_version'] = incremental_info.build_version
        return detail_info
98 
99 
class DependencyError(Exception):
    """Signals that an external dependency required by this module is unavailable."""
102 
103 
class ProcessesManagement:
    """
    Manage OTA generation processes.

    Jobs are recorded in the Jobs table of a SQLite database; each job runs
    ota_from_target_files in a subprocess whose stdout/stderr are written to
    per-job log files under <working_dir>/logs.
    """

    @staticmethod
    def check_external_dependencies():
        """
        Check that `java --version` and `zip -v` can be executed, printing
        the version text of each.
        Raise:
            DependencyError: if either command cannot be run successfully
        """
        try:
            java_version = subprocess.check_output(["java", "--version"])
            print("Java version:", java_version.decode())
        except Exception as e:
            raise DependencyError(
                "java not found in PATH. Attempt to generate OTA might fail. " + str(e)) from e
        try:
            zip_version = subprocess.check_output(["zip", "-v"])
            print("Zip version:", zip_version.decode())
        except Exception as e:
            raise DependencyError(
                "zip command not found in PATH. Attempt to generate OTA might fail. " + str(e)) from e

    def __init__(self, *, working_dir='output', db_path=None, otatools_dir=None):
        """
        Check the external dependencies, create the working and log
        directories, and create the Jobs table if it does not exist.
        Args:
            working_dir: string, directory for generated packages and logs
            db_path: string, path of the SQLite database; defaults to
                <working_dir>/ota_database.db
            otatools_dir: string, optional otatools directory; when set its
                bin/ directory is put in front of PATH for the subprocess,
                which is also started with this directory as its cwd
        """
        ProcessesManagement.check_external_dependencies()
        self.working_dir = working_dir
        self.logs_dir = os.path.join(working_dir, 'logs')
        self.otatools_dir = otatools_dir
        os.makedirs(self.working_dir, exist_ok=True)
        os.makedirs(self.logs_dir, exist_ok=True)
        if not db_path:
            db_path = os.path.join(self.working_dir, "ota_database.db")
        self.path = db_path
        self._execute("""
                CREATE TABLE if not exists Jobs (
                ID TEXT,
                TargetPath TEXT,
                IncrementalPath TEXT,
                Verbose INTEGER,
                Partial TEXT,
                OutputPath TEXT,
                Status TEXT,
                Downgrade INTEGER,
                OtherFlags TEXT,
                STDOUT TEXT,
                STDERR TEXT,
                StartTime INTEGER,
                FinishTime INTEGER
            )
            """)

    def _execute(self, sql, params=()):
        """
        Run one SQL statement in its own connection and return all rows.

        The connection's context manager only commits or rolls back the
        transaction; it does not close the connection, so it is closed
        explicitly here.
        """
        connect = sqlite3.connect(self.path)
        try:
            with connect:
                return connect.execute(sql, params).fetchall()
        finally:
            connect.close()

    def insert_database(self, job_info):
        """
        Insert the job_info into the database
        Args:
            job_info: JobInfo
        """
        self._execute("""
                    INSERT INTO Jobs (ID, TargetPath, IncrementalPath, Verbose, Partial, OutputPath, Status, Downgrade, OtherFlags, STDOUT, STDERR, StartTime, FinishTime)
                    VALUES (:id, :target, :incremental, :verbose, :partial, :output, :status, :downgrade, :extra, :stdout, :stderr, :start_time, :finish_time)
                """, job_info.to_sql_form_dict())

    def get_status_by_ID(self, id):
        """
        Return the status of job <id> as a instance of JobInfo
        Args:
            id: string
        Return:
            JobInfo
        Raise:
            TypeError: if no job with this id is recorded (there is no row
                to expand into a JobInfo)
        """
        logging.info(id)
        rows = self._execute("""
            SELECT ID, TargetPath, IncrementalPath, Verbose, Partial, OutputPath, Status, Downgrade, OtherFlags, STDOUT, STDERR, StartTime, FinishTime
            FROM Jobs WHERE ID=(?)
            """, (str(id),))
        # ID is not declared unique, so take the first match.
        row = rows[0] if rows else None
        return JobInfo(*row)

    def get_status(self):
        """
        Return the status of all jobs as a list of JobInfo
        Return:
            List[JobInfo]
        """
        rows = self._execute("""
            SELECT ID, TargetPath, IncrementalPath, Verbose, Partial, OutputPath, Status, Downgrade, OtherFlags, STDOUT, STDERR, StartTime, FinishTime
            FROM Jobs
            """)
        return [JobInfo(*row) for row in rows]

    def update_status(self, id, status, finish_time):
        """
        Change the status and finish time of job <id> in the database
        Args:
            id: string
            status: string
            finish_time: int
        """
        self._execute("""
                UPDATE Jobs SET Status=(?), FinishTime=(?)
                WHERE ID=(?)
                """,
                      (status, finish_time, id))

    def ota_run(self, command, id, stdout_path, stderr_path):
        """
        Initiate a subprocess to run the ota generation. A background thread
        waits until it finishes and updates the record in the database
        ('Finished' on exit code 0, 'Error' otherwise).
        Args:
            command: List[string], the command line to execute
            id: string, id of the job whose record is updated
            stdout_path: string, file receiving the subprocess stdout
            stderr_path: string, file receiving the subprocess stderr
        Return:
            threading.Thread, the already started waiter thread
        Raise:
            Whatever subprocess.Popen raises when the process cannot be
            started; the job is marked 'Error' before re-raising.
        """
        # Inherit the parent environment so the tools validated by
        # check_external_dependencies stay reachable from the subprocess.
        env = os.environ.copy()
        if self.otatools_dir:
            env['PATH'] = os.pathsep.join(
                [os.path.join(self.otatools_dir, "bin"),
                 env.get('PATH', os.defpath)])
        # TODO(lishutong): Enable user to use self-defined stderr/stdout path
        # The child keeps its own copies of the log handles, so the parent's
        # handles are closed as soon as the process has been spawned.
        with open(stdout_path, 'w') as fout, open(stderr_path, 'w') as ferr:
            try:
                proc = subprocess.Popen(
                    command, stderr=ferr, stdout=fout, shell=False, env=env, cwd=self.otatools_dir)
            except FileNotFoundError as e:
                logging.error('ota_from_target_files is not set properly %s', e)
                self.update_status(id, 'Error', int(time.time()))
                raise
            except Exception as e:
                logging.error('Failed to execute ota_from_target_files %s', e)
                self.update_status(id, 'Error', int(time.time()))
                raise

        def wait_result():
            exit_code = proc.wait()
            status = 'Finished' if exit_code == 0 else 'Error'
            self.update_status(id, status, int(time.time()))

        waiter = threading.Thread(target=wait_result)
        waiter.start()
        return waiter

    def ota_generate(self, args, id):
        """
        Read in the arguments from the frontend and start running the OTA
        generation process, then update the records in database.
        Format of args:
            output: string, extra_keys: List[string], extra: string,
            isIncremental: bool, isPartial: bool, partial: List[string],
            incremental: string, target: string, verbose: bool
        Every key except 'output' is read directly and must be present
        ('incremental' and 'partial' only when the matching flag is set).
        args is updated in place: 'output' is filled in with
        <working_dir>/<id>.zip when absent and 'extra' is prefixed with the
        flags built from 'extra_keys'.
        args:
            args: dict
            id: string
        Raise:
            FileNotFoundError: if the target (or the incremental source of
                an incremental job) is not an existing file
        """
        command = ['ota_from_target_files']
        # Check essential configuration is properly set
        if not os.path.isfile(args['target']):
            raise FileNotFoundError(
                'Target file not found: {}'.format(args['target']))
        if 'output' not in args:
            args['output'] = os.path.join(self.working_dir, str(id) + '.zip')
        if args['verbose']:
            command.append('-v')
        if args['extra_keys']:
            args['extra'] = '--' + \
                ' --'.join(args['extra_keys']) + ' ' + args['extra']
        if args['extra']:
            # split() without a separator drops the empty arguments that
            # repeated or surrounding spaces would otherwise produce.
            command += args['extra'].split()
        if args['isIncremental']:
            if not os.path.isfile(args['incremental']):
                raise FileNotFoundError(
                    'Incremental source file not found: {}'.format(
                        args['incremental']))
            command.append('-i')
            command.append(os.path.realpath(args['incremental']))
        if args['isPartial']:
            command.append('--partial')
            command.append(' '.join(args['partial']))
        command.append(os.path.realpath(args['target']))
        command.append(os.path.realpath(args['output']))
        stdout = os.path.join(self.logs_dir, 'stdout.' + str(id))
        stderr = os.path.join(self.logs_dir, 'stderr.' + str(id))
        job_info = JobInfo(id,
                           target=args['target'],
                           incremental=args['incremental'] if args['isIncremental'] else '',
                           verbose=args['verbose'],
                           partial=args['partial'] if args['isPartial'] else [],
                           output=args['output'],
                           status='Running',
                           extra=args['extra'],
                           start_time=int(time.time()),
                           stdout=stdout,
                           stderr=stderr
                           )
        # Record the job before launching it, so that the status update made
        # when the process ends (or fails to start) finds an existing row.
        self.insert_database(job_info)
        self.ota_run(command, id, job_info.stdout, job_info.stderr)
        logging.info(
            'Starting generating OTA package with id %s: \n %s', id, command)
310