1# Function Flow Runtime开发样例(Kotlin) 2 3本文档旨在介绍如何使用Kotlin语言以及Kotlin与Native层互操作的技术来为FFRT编写包装函数,并提供一些使用这些包装函数的示例。 4 5## Kotlin语言简介 6 7Kotlin是一种现代的、静态类型的编程语言,由JetBrains开发。它设计目标是成为“更好的Java”,但也可以编译成JavaScript或Native代码。Kotlin具有以下主要特点: 8 9- **简洁性:** Kotlin提供了简洁的语法,可以减少样板代码。 10- **空安全:** Kotlin的类型系统可以帮助避免空指针异常。 11- **互操作性:** Kotlin可以与Java代码无缝互操作。 12- **协程:** Kotlin支持协程,这是一种轻量级的并发机制。 13- **多平台:** Kotlin可以编译成JVM字节码、JavaScript或Native代码。 14 15## Kotlin/Native与Native层互操作 16 17Kotlin/Native允许Kotlin代码直接与Native层C/C++代码进行交互。这是通过`cinterop`工具实现的,该工具可以生成Kotlin代码来调用C/C++库。 18 19**互操作原理:** 20 211. **定义C接口:** 首先,你需要有一个C的头文件,其中声明了你想要调用的C函数和数据结构。 222. **`cinterop`工具:** 使用Kotlin/Native提供的`cinterop`工具,根据C头文件生成Kotlin代码。这个生成的代码包含了C函数的Kotlin包装器和C数据结构的Kotlin表示。 233. **Kotlin代码调用:** Kotlin代码就可以像调用普通的Kotlin函数一样调用生成的包装器函数,从而间接地调用C函数。 24 25**使用`cinterop`的步骤:** 26 271. **配置`build.gradle.kts`:** 在Gradle构建文件中配置 `cinterop`。 282. **创建`.def`文件:** 创建一个`.def`文件,其中指定要包含的C头文件、库和其他编译/链接选项。 293. **编写Kotlin代码:** 在Kotlin代码中导入并使用`cinterop`生成的API。 30 31更多的Kotlin与C语言互操作性的指导可以参阅官方文档:[Interoperability with C](https://kotlinlang.org/docs/native-c-interop.html) 32 33## 为什么要为FFRT编写Kotlin包装函数 34 35FFRT主要提供C和C++的接口。虽然可以直接在Kotlin/Native中使用这些接口,但这样做通常比较繁琐,且容易出错。为FFRT编写Kotlin包装函数存在如下收益: 36 37- **简化API使用:** Kotlin包装函数可以提供更简洁、更符合Kotlin语言习惯的API,隐藏底层的C/C++细节。 38- **提高类型安全性:** Kotlin的强类型系统可以帮助避免因类型错误导致的错误。 39- **资源管理:** Kotlin包装函数可以更好地管理FFRT资源的生命周期,例如内存分配和释放。 40- **提供更高级的抽象:** Kotlin包装函数可以提供更高级的抽象,例如使用Kotlin的集合类来代替C数组。 41- **提高开发效率:** 使用Kotlin包装函数可以提高开发效率,减少代码量。 42 43## FFRT的Kotlin封装 (Task.kt和Queue.kt) 44 45以下是FFRT的`Task`和`Queue`接口的一种Kotlin封装实现,**请注意,这只是一个示例,用于抛砖引玉,可能存在其他更好的封装方式。** 46 47包装库的文件目录为: 48 49```plain 50ffrt-wrapper/ 51├── build.gradle.kts 52├── settings.gradle.kts (需要包含 ':ffrt-wrapper') 53├── src/ 54│ ├── nativeInterop/ 55│ │ └── cinterop/ 56│ │ └── ffrt.def <-- cinterop 定义文件 57│ └── nativeMain/ 58│ └── kotlin/ 59│ ├── Queue.kt <-- 队列封装 60│ └── Task.kt <-- 任务封装 61``` 62 63其中`build.gradle.kts`配置文件如下所示: 64 65```kotlin 66// build.gradle.kts 67plugins { 68 kotlin("multiplatform") version "1.9.22" // 修改成项目中实际使用的版本 69} 70 71kotlin { 72 linuxX64("native") { 73 binaries { 74 sharedLib { 75 baseName = "ffrt-wrapper" 76 } 77 } 78 compilations.getByName("main") { 79 cinterops { 80 val ffrt by creating { 81 defFile = file("src/nativeInterop/cinterop/ffrt.def") 82 packageName = "com.huawei.ffrt" 83 } 84 } 85 } 86 } 87 88 sourceSets { 89 val nativeMain by getting { 90 dependencies { 91 // 依赖配置(如果有) 92 } 93 kotlin.srcDirs("src/nativeMain/kotlin") 94 } 95 } 96} 97``` 98 99其中`ffrt.def`配置文件如下所示: 100 101```ini 102// ffrt.def 103headers = path/to/queue.h, path/to/task.h 104package = com.huawei.ffrt 105compilerOpts = -I/path/to/ffrt/include 106linkerOpts = -L/path/to/ffrt/lib -lffrt 107``` 108 109> **说明:** 110> 111> - `Task.kt`和`Queue.kt`中的封装函数仅适配了不带参数的任务闭包,如果业务需要带参数的任务闭包可以修改`createFunctionWrapper`函数。 112> - 用户也可以在现有封装的基础上,再进一步封装更高级、更Kotlin风格的DSL。 113 114### Task.kt 115 116`Task.kt`文件封装了FFRT中与任务相关的接口。 117 118- **`TaskAttr`类:** 封装了`ffrt_task_attr_t`结构体,用于设置任务的属性(例如,任务名称、QoS)。实现了`Closeable`接口来管理资源。 119- **`TaskHandle`类:** 封装了`ffrt_task_handle_t`,用于表示一个任务句柄。实现了`Closeable`接口来管理任务句柄的生命周期。 120- **`Dependence`类:** 封装了`ffrt_dependence_t`,用于表示任务之间的依赖关系。 121- **`Task`object:** 提供了静态函数来提交任务 (`submit`、`submitWithHandle`)、等待任务完成 (`wait`、`waitDeps`) 等。 122 123封装代码如下所示: 124 125```kotlin 126import kotlinx.cinterop.* 127import kotlin.io.Closeable 128import com.huawei.ffrt.* 129 130// 封装 ffrt_queue_priority_t 131enum class QueuePriority(val value: Int) { 132 IMMEDIATE(ffrt_queue_priority_t.ffrt_queue_priority_immediate), 133 HIGH(ffrt_queue_priority_t.ffrt_queue_priority_high), 134 LOW(ffrt_queue_priority_t.ffrt_queue_priority_low), 135 IDLE(ffrt_queue_priority_t.ffrt_queue_priority_idle); 136 137 override fun toString(): String = name 138 139 companion object { 140 fun fromInt(value: Int): QueuePriority? = values().find { it.value == value } 141 } 142} 143 144// 封装 ffrt_task_attr_t 145class TaskAttr : Closeable { 146 private val struct = nativeHeap.alloc<ffrt_task_attr_t>() 147 internal val nativeAttr: CPointer<ffrt_task_attr_t> = struct.ptr 148 private var namePtr: CPointer<ByteVar>? = null 149 150 init { ffrt_task_attr_init(nativeAttr) } 151 152 constructor(name: String? = null) : this() { 153 this.name = name 154 } 155 156 var name: String? 157 get() = ffrt_task_attr_get_name(nativeAttr)?.toKString() 158 set(value) { 159 namePtr?.let { 160 nativeHeap.free(it) 161 namePtr = null 162 } 163 namePtr = value?.cstr?.getPointer(nativeHeap) 164 ffrt_task_attr_set_name(nativeAttr, namePtr) 165 } 166 167 fun name(name: String?): TaskAttr { 168 this.name = name 169 return this 170 } 171 172 var qos: Int 173 get() = ffrt_task_attr_get_qos(nativeAttr) 174 set(value) { ffrt_task_attr_set_qos(nativeAttr, value) } 175 176 fun qos(qos: Int): TaskAttr { 177 this.qos = qos 178 return this 179 } 180 181 var timeout: Long 182 get() = ffrt_task_attr_get_timeout(nativeAttr).toLong() 183 set(value) { ffrt_task_attr_set_timeout(nativeAttr, value.convert()) } 184 185 fun timeout(timeout: Long): TaskAttr { 186 this.timeout = timeout 187 return this 188 } 189 190 var delay: Long 191 get() = ffrt_task_attr_get_delay(nativeAttr).toLong() 192 set(value) { ffrt_task_attr_set_delay(nativeAttr, value.convert()) } 193 194 fun delay(delay: Long): TaskAttr { 195 this.delay = delay 196 return this 197 } 198 199 var queuePriority: QueuePriority 200 get() = QueuePriority.values().first { it.value == ffrt_task_attr_get_queue_priority(nativeAttr).value } 201 set(value) { 202 ffrt_task_attr_set_queue_priority(nativeAttr, value.value) 203 } 204 205 fun queuePriority(priority: QueuePriority): TaskAttr { 206 this.queuePriority = priority 207 return this 208 } 209 210 override fun close() { 211 ffrt_task_attr_destroy(nativeAttr) 212 namePtr?.let { 213 nativeHeap.free(it) 214 namePtr = null 215 } 216 nativeHeap.free(struct) 217 } 218} 219 220// 封装 ffrt_task_handle_t 221class TaskHandle(internal val nativeHandle: ffrt_task_handle_t) : Closeable { 222 override fun close() { ffrt_task_handle_destroy(nativeHandle) } 223} 224 225// 封装 ffrt_dependence_t 226class Dependence : Closeable { 227 internal val nativeDependence: ffrt_dependence_t 228 private var isTaskDependence: Boolean = false 229 230 // 数据依赖 231 constructor(data: COpaquePointer?) { 232 nativeDependence = ffrt_dependence_t().apply { 233 type = ffrt_dependence_type_t.ffrt_dependence_data 234 ptr = data 235 } 236 } 237 238 // 任务依赖 239 constructor(taskHandle: TaskHandle) { 240 nativeDependence = ffrt_dependence_t().apply { 241 type = ffrt_dependence_type_t.ffrt_dependence_task 242 ptr = taskHandle.nativeHandle 243 } 244 ffrt_task_handle_inc_ref(taskHandle.nativeHandle) 245 isTaskDependence = true 246 } 247 248 override fun close() { 249 if (isTaskDependence && nativeDependence.ptr != null) { 250 ffrt_task_handle_dec_ref(nativeDependence.ptr.reinterpret()) 251 } 252 } 253} 254 255fun createFunctionWrapper( 256 func: () -> Unit, 257 kind: ffrt_function_kind_t 258): CPointer<ffrt_function_header_t> { 259 val stableRef = StableRef.create(func) 260 val functionStorage = ffrt_alloc_auto_managed_function_storage_base(kind) 261 ?: throw Error("Failed to allocate function storage") 262 263 /* 264 functionStorage: 265 +-----------------------------+ <-- arg 266 | ffrt_function_header_t | <-- header fields (exec, destroy, ...) 267 | exec = execFunction | 268 | destroy = destroyFunction | 269 | reserve[2] = 0 | 270 +-----------------------------+ 271 | StableRef<() -> Unit> | <-- closure ptr (offset = sizeof(header)) 272 +-----------------------------+ 273 */ 274 val header = functionStorage.reinterpret<ffrt_function_header_t>() 275 header.pointed.exec = staticCFunction { arg -> 276 extractClosurePtr(arg).asStableRef<() -> Unit>().get().invoke() 277 } 278 header.pointed.destroy = staticCFunction { arg -> 279 extractClosurePtr(arg).asStableRef<() -> Unit>().dispose() 280 } 281 header.pointed.reserve[0] = 0uL 282 header.pointed.reserve[1] = 0uL 283 284 // 将 StableRef 存储在 header 后面,用于 C++ 风格封装 285 val headerSize = sizeOf<ffrt_function_header_t>() 286 val closureStorageOffset = functionStorage.rawValue + headerSize 287 closureStorageOffset.reinterpret<ULongVar>().pointed.value = stableRef.asCPointer().rawValue.toULong() 288 289 return header 290} 291 292private fun extractClosurePtr(arg: COpaquePointer?): COpaquePointer { 293 requireNotNull(arg) 294 val headerSize = sizeOf<ffrt_function_header_t>() 295 val closureOffset = arg.rawValue + headerSize 296 return closureOffset.toCPointer<COpaquePointerVar>()!!.pointed.value!! 297} 298 299object Task { 300 // 封装 ffrt_submit_base 301 fun submit( 302 func: () -> Unit, 303 inDeps: List<Dependence> = emptyList(), 304 outDeps: List<Dependence> = emptyList(), 305 attr: TaskAttr = TaskAttr() 306 ) { 307 memScoped { 308 val inDepsNative = inDeps.toNativeDeps(this) 309 val outDepsNative = outDeps.toNativeDeps(this) 310 val funcWrapper = createFunctionWrapper(func, ffrt_function_kind_t.ffrt_function_kind_general) 311 ffrt_submit_base(funcWrapper, inDepsNative.ptr, outDepsNative.ptr, attr.nativeAttr) 312 } 313 inDeps.forEach { it.close() } 314 outDeps.forEach { it.close() } 315 attr.close() 316 } 317 318 // 封装 ffrt_submit_h_base 319 fun submitWithHandle( 320 func: () -> Unit, 321 inDeps: List<Dependence> = emptyList(), 322 outDeps: List<Dependence> = emptyList(), 323 attr: TaskAttr = TaskAttr() 324 ): TaskHandle { 325 return memScoped { 326 val inDepsNative = inDeps.toNativeDeps(this) 327 val outDepsNative = outDeps.toNativeDeps(this) 328 val funcWrapper = createFunctionWrapper(func, ffrt_function_kind_t.ffrt_function_kind_general) 329 val handle = ffrt_submit_h_base(funcWrapper, inDepsNative.ptr, outDepsNative.ptr, attr.nativeAttr) 330 TaskHandle(handle!!) 331 }.also { 332 inDeps.forEach { it.close() } 333 outDeps.forEach { it.close() } 334 attr.close() 335 } 336 } 337 338 // 封装 ffrt_wait 339 fun wait() { 340 ffrt_wait() 341 } 342 343 // 封装 ffrt_wait_deps 344 fun waitDeps(deps: List<Dependence>) { 345 memScoped { 346 val nativeDeps = deps.toNativeDeps(this) 347 ffrt_wait_deps(nativeDeps.ptr) 348 } 349 deps.forEach { it.close() } 350 } 351 352 private fun List<Dependence>.toNativeDeps(scope: MemScope): ffrt_deps_t { 353 if (this.isEmpty()) { return ffrt_deps_t() } 354 val nativeDeps = scope.allocArray<ffrt_dependence_t>(this.size) 355 for (i in this.indices) { 356 nativeDeps[i] = this[i].nativeDependence 357 } 358 return ffrt_deps_t().apply { 359 len = this@toNativeDeps.size.convert() 360 items = nativeDeps 361 } 362 } 363} 364``` 365 366### Queue.kt 367 368`Queue.kt`文件封装了FFRT中与队列相关的接口。 369 370- **`QueueAttr`类:** 封装了 `ffrt_queue_attr_t` 结构体,用于设置队列的属性(例如,队列类型、QoS)。实现了`Closeable`接口来管理资源。 371- **`QueueType`enum class:** 封装了FFRT中定义的队列类型。 372- **`Queue`类:** 封装了`ffrt_queue_t`,用于表示一个FFRT队列。提供了函数来提交任务 (`submit`、`submitWithHandle`)、取消任务 (`cancel`)、等待任务完成 (`wait`) 等。 373 374封装代码如下所示: 375 376```kotlin 377import kotlinx.cinterop.* 378import kotlin.io.Closeable 379import com.huawei.ffrt.* 380 381// 封装 ffrt_queue_type_t 382enum class QueueType(val value: Int) { 383 SERIAL(ffrt_queue_type_t.ffrt_queue_serial), 384 CONCURRENT(ffrt_queue_type_t.ffrt_queue_concurrent); 385 386 override fun toString(): String = name 387 388 companion object { 389 fun fromInt(value: Int): QueueType? = values().find { it.value == value } 390 } 391} 392 393// 封装 ffrt_queue_attr_t 394class QueueAttr : Closeable { 395 private val struct = nativeHeap.alloc<ffrt_queue_attr_t>() 396 internal val nativeAttr: CPointer<ffrt_queue_attr_t> = struct.ptr 397 398 init { 399 ffrt_queue_attr_init(nativeAttr) 400 } 401 402 var qos: Int 403 get() = ffrt_queue_attr_get_qos(nativeAttr) 404 set(value) { ffrt_queue_attr_set_qos(nativeAttr, value) } 405 406 fun qos(qos: Int): QueueAttr { 407 this.qos = qos 408 return this 409 } 410 411 var maxConcurrency: Int 412 get() = ffrt_queue_attr_get_max_concurrency(nativeAttr) 413 set(value) { ffrt_queue_attr_set_max_concurrency(nativeAttr, value) } 414 415 fun maxConcurrency(value: Int): QueueAttr { 416 this.maxConcurrency = value 417 return this 418 } 419 420 override fun close() { 421 ffrt_queue_attr_destroy(nativeAttr) 422 nativeHeap.free(struct) 423 } 424} 425 426class Queue internal constructor( 427 private val queueHandle: ffrt_queue_t, 428 private val ownsHandle: Boolean = true 429) : Closeable { 430 constructor(type: QueueType, name: String, attr: QueueAttr = QueueAttr()) : this( 431 ffrt_queue_create(type.value, name.cstr.ptr, attr.nativeAttr) ?: error("Queue creation failed") 432 ) 433 434 constructor(name: String, attr: QueueAttr = QueueAttr()) : this( 435 ffrt_queue_create(QueueType.SERIAL.value, name.cstr.ptr, attr.nativeAttr) ?: error("Queue creation failed") 436 ) 437 438 // 封装 ffrt_queue_submit 439 fun submit(func: () -> Unit, attr: TaskAttr = TaskAttr()) { 440 val f = createFunctionWrapper(func, ffrt_function_kind_t.ffrt_function_kind_queue) 441 ffrt_queue_submit(queueHandle, f, attr.nativeAttr) 442 attr.close() 443 } 444 445 // 封装 ffrt_queue_submit_h 446 fun submitWithHandle(func: () -> Unit, attr: TaskAttr = TaskAttr()): TaskHandle { 447 val f = createFunctionWrapper(func, ffrt_function_kind_t.ffrt_function_kind_queue) 448 val handle = ffrt_queue_submit_h(queueHandle, f, attr.nativeAttr) 449 attr.close() 450 return TaskHandle(handle ?: error("Queue submit_h failed")) 451 } 452 453 // 封装 ffrt_queue_cancel 454 fun cancel(handle: TaskHandle): Int = ffrt_queue_cancel(handle.nativeHandle) 455 456 // 封装 ffrt_queue_wait 457 fun wait(handle: TaskHandle) = ffrt_queue_wait(handle.nativeHandle) 458 459 override fun close() { 460 if (ownsHandle) { 461 ffrt_queue_destroy(queueHandle) 462 } 463 } 464 465 companion object { 466 fun getMainQueue(): Queue? { 467 val q = ffrt_get_main_queue() 468 return if (q != null) Queue(q, ownsHandle = false) else null 469 } 470 } 471} 472``` 473 474## 使用Kotlin封装的示例 475 476以下是一些使用上面介绍的Kotlin封装来编写的FFRT示例。 477 478### Fibonacci数列 479 480这个示例展示了如何使用FFRT并行计算Fibonacci数列。具体用例场景可以参考:[图依赖并发(C++)](ffrt-concurrency-graph-cpp.md)。 481 482```kotlin 483import kotlinx.cinterop.* 484import com.huawei.ffrt.* 485 486data class FibArgs(val x: Int, var y: Int) 487 488fun fibonacci(x: Int): Int { 489 return if (x <= 1) { 490 x 491 } else { 492 fibonacci(x - 1) + fibonacci(x - 2) 493 } 494} 495 496fun main() { 497 val result = FibArgs(5, 0) 498 499 Task.submit { 500 result.y = fibonacci(result.x) 501 } 502 503 Task.wait() // 等待任务完成 504 505 println("Fibonacci(${result.x}) is ${result.y}") 506} 507``` 508 509### 任务图依赖 510 511这个示例展示了如何使用FFRT创建和执行具有依赖关系的多个任务。具体用例场景可以参考:[图依赖并发(C++)](ffrt-concurrency-graph-cpp.md)。 512 513```kotlin 514import kotlinx.cinterop.* 515import com.huawei.ffrt.* 516 517fun main() { 518 // 提交任务A 519 val handleA = Task.submitWithHandle { 520 println("视频解析") 521 } 522 523 // 提交任务B和C 524 val depsA = listOf(Dependence(handleA)) 525 val handleB = Task.submitWithHandle( 526 { println("视频转码") }, 527 inDeps = depsA 528 ) 529 val handleC = Task.submitWithHandle( 530 { println("视频生成缩略图") }, 531 inDeps = depsA 532 ) 533 534 // 提交任务D 535 val depsBC = listOf(Dependence(handleB), Dependence(handleC)) 536 val handleD = Task.submitWithHandle( 537 { println("视频添加水印") }, 538 inDeps = depsBC 539 ) 540 541 // 提交任务E 542 val depsD = listOf(Dependence(handleD)) 543 Task.submit( 544 { println("视频发布") }, 545 inDeps = depsD 546 ) 547 548 // 等待所有任务完成 549 Task.wait() 550} 551``` 552 553### 串行队列 554 555这个示例展示了如何使用FFRT队列来实现一个简单的日志系统。具体用例场景可以参考:[串行队列(C++)](ffrt-concurrency-serial-queue-cpp.md)。 556 557```kotlin 558import kotlinx.cinterop.* 559import kotlin.io.Closeable 560import com.huawei.ffrt.* 561import java.io.File 562import java.io.FileOutputStream 563import java.io.IOException 564 565// 封装日志系统 566class Logger(filename: String) : Closeable { 567 568 private val queue: Queue = Queue("logger_queue") // 创建 FFRT 串行队列 569 private val logFile = File(filename) 570 private val outputStream: FileOutputStream 571 572 init { 573 try { 574 outputStream = FileOutputStream(logFile, true) // 以追加模式打开文件 575 println("Log file opened: $filename") 576 } catch (e: IOException) { 577 throw IOException("Failed to open log file: $filename", e) 578 } 579 } 580 581 fun log(message: String) { 582 queue.submit { 583 try { 584 outputStream.write("$message\n".toByteArray()) 585 outputStream.flush() 586 } catch (e: IOException) { 587 System.err.println("Error writing to log: ${e.message}") 588 } 589 } 590 } 591 592 override fun close() { 593 queue.close() // 销毁 FFRT 队列 594 try { 595 outputStream.close() 596 println("Log file closed") 597 } catch (e: IOException) { 598 System.err.println("Error closing log file: ${e.message}") 599 } 600 } 601} 602 603fun main() { 604 val logger = Logger("log.txt") 605 606 // 主线程添加日志任务 607 logger.log("Log message 1") 608 logger.log("Log message 2") 609 logger.log("Log message 3") 610 611 // 模拟主线程继续执行其他任务 612 Thread.sleep(1000) // Kotlin/JVM 的 sleep (使用 Long) 613 614 logger.close() 615} 616``` 617 618### 并发队列 619 620这个示例展示了如何使用FFRT队列来实现一个简单的银行服务系统。具体用例场景可以参考:[并发队列(C++)](ffrt-concurrency-concurrent-queue-cpp.md)。 621 622```kotlin 623import kotlinx.cinterop.* 624import kotlin.io.Closeable 625import com.huawei.ffrt.* 626 627// 封装并发队列系统 628class ConcurrentQueueSystem(name: String, concurrency: Int) : Closeable { 629 630 private val queue: Queue 631 632 init { 633 val queueAttr = QueueAttr().maxConcurrency(concurrency) 634 queue = Queue(QueueType.CONCURRENT, name, queueAttr) 635 println("Concurrent queue system '$name' initialized with concurrency $concurrency") 636 } 637 638 // 提交任务到队列 639 fun enqueue( 640 taskName: String, 641 qos: Int, 642 delayMillis: Long = 0, 643 priority: QueuePriority, 644 block: () -> Unit 645 ): TaskHandle { 646 val taskAttr = TaskAttr() 647 .name(taskName) 648 .qos(qos) 649 .delay(delayMillis.toInt()) 650 .queuePriority(priority) 651 return queue.submitWithHandle(block, taskAttr) 652 } 653 654 // 取消任务 655 fun cancel(taskHandle: TaskHandle): Int { 656 return queue.cancel(taskHandle) 657 } 658 659 // 等待任务完成 660 fun wait(taskHandle: TaskHandle) { 661 queue.wait(taskHandle) 662 } 663 664 override fun close() { 665 queue.close() 666 println("Concurrent queue system closed") 667 } 668} 669 670fun bankBusiness(customerName: String) { 671 Thread.sleep(100) // 模拟耗时操作 672 println("Serving customer: $customerName in thread ${Thread.currentThread().name}") 673} 674 675fun bankBusinessVIP(customerName: String) { 676 Thread.sleep(50) // 模拟VIP服务更快 677 println("Serving VIP customer: $customerName in thread ${Thread.currentThread().name}") 678} 679 680fun main() { 681 val bankQueueSystem = ConcurrentQueueSystem("Bank", 2) // 银行有2个窗口 682 683 val task1 = bankQueueSystem.enqueue("Customer1", QueuePriority.LOW, 0) { 684 bankBusiness("Customer1") 685 } 686 687 val task2 = bankQueueSystem.enqueue("Customer2", QueuePriority.LOW, 0) { 688 bankBusiness("Customer2") 689 } 690 691 val task3 = bankQueueSystem.enqueue("Customer3 VIP", QueuePriority.HIGH, 0) { 692 bankBusinessVIP("Customer3 VIP") 693 } 694 695 val task4 = bankQueueSystem.enqueue("Customer4", QueuePriority.LOW, 0) { 696 bankBusiness("Customer4") 697 } 698 699 val task5 = bankQueueSystem.enqueue("Customer5", QueuePriority.LOW, 0) { 700 bankBusiness("Customer5") 701 } 702 703 // 模拟取消一个任务 704 bankQueueSystem.cancel(task4) 705 706 // 模拟等待任务完成 707 bankQueueSystem.wait(task5) 708 709 bankQueueSystem.close() 710} 711``` 712