VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/aiomgr.cpp@ 62566

最後變更 在這個檔案從62566是 62566,由 vboxsync 提交於 9 年 前

IPRT: More unused parameters.

  • 屬性 svn:eol-style 設為 native
  • 屬性 svn:keywords 設為 Author Date Id Revision
檔案大小: 42.1 KB
 
1/* $Id: aiomgr.cpp 62566 2016-07-26 15:16:41Z vboxsync $ */
2/** @file
3 * IPRT - Async I/O manager.
4 */
5
6/*
7 * Copyright (C) 2013-2016 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.virtualbox.org. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*********************************************************************************************************************************
29* Header Files *
30*********************************************************************************************************************************/
31
32#include <iprt/aiomgr.h>
33#include <iprt/err.h>
34#include <iprt/asm.h>
35#include <iprt/mem.h>
36#include <iprt/file.h>
37#include <iprt/list.h>
38#include <iprt/thread.h>
39#include <iprt/assert.h>
40#include <iprt/string.h>
41#include <iprt/critsect.h>
42#include <iprt/memcache.h>
43#include <iprt/semaphore.h>
44#include <iprt/queueatomic.h>
45
46#include "internal/magics.h"
47
48
49/*********************************************************************************************************************************
50* Structures and Typedefs *
51*********************************************************************************************************************************/
52
/** Pointer to the internal data of a file handled by an async I/O manager
 *  (declared ahead of the structure because the manager refers to it). */
typedef struct RTAIOMGRFILEINT *PRTAIOMGRFILEINT;
55
/**
 * Events another thread can post to the async I/O manager worker while it
 * blocks until the worker has handled them.
 */
typedef enum RTAIOMGREVENT
{
    RTAIOMGREVENT_INVALID    = 0,           /**< Invalid type. */
    RTAIOMGREVENT_NO_EVENT   = 1,           /**< Nothing is pending. */
    RTAIOMGREVENT_FILE_ADD   = 2,           /**< A file is being added to the manager. */
    RTAIOMGREVENT_FILE_CLOSE = 3,           /**< A file is about to be closed. */
    RTAIOMGREVENT_SHUTDOWN   = 4,           /**< The manager is shutting down. */
    RTAIOMGREVENT_32BIT_HACK = 0x7fffffff   /**< Forces the enum to 32 bits. */
} RTAIOMGREVENT;
74
75/**
76 * Async I/O manager instance data.
77 */
78typedef struct RTAIOMGRINT
79{
80 /** Magic value. */
81 uint32_t u32Magic;
82 /** Reference count. */
83 volatile uint32_t cRefs;
84 /** Async I/O context handle. */
85 RTFILEAIOCTX hAioCtx;
86 /** async I/O thread. */
87 RTTHREAD hThread;
88 /** List of files assigned to this manager. */
89 RTLISTANCHOR ListFiles;
90 /** Number of requests active currently. */
91 unsigned cReqsActive;
92 /** Number of maximum requests active. */
93 uint32_t cReqsActiveMax;
94 /** Memory cache for requests. */
95 RTMEMCACHE hMemCacheReqs;
96 /** Critical section protecting the blocking event handling. */
97 RTCRITSECT CritSectBlockingEvent;
98 /** Event semaphore for blocking external events.
99 * The caller waits on it until the async I/O manager
100 * finished processing the event. */
101 RTSEMEVENT hEventSemBlock;
102 /** Blocking event type */
103 volatile RTAIOMGREVENT enmBlockingEvent;
104 /** Event type data */
105 union
106 {
107 /** The file to be added */
108 volatile PRTAIOMGRFILEINT pFileAdd;
109 /** The file to be closed */
110 volatile PRTAIOMGRFILEINT pFileClose;
111 } BlockingEventData;
112} RTAIOMGRINT;
113/** Pointer to an internal async I/O manager instance. */
114typedef RTAIOMGRINT *PRTAIOMGRINT;
115
116/**
117 * Async I/O manager file instance data.
118 */
119typedef struct RTAIOMGRFILEINT
120{
121 /** Magic value. */
122 uint32_t u32Magic;
123 /** Reference count. */
124 volatile uint32_t cRefs;
125 /** Flags. */
126 uint32_t fFlags;
127 /** Opaque user data passed on creation. */
128 void *pvUser;
129 /** File handle. */
130 RTFILE hFile;
131 /** async I/O manager this file belongs to. */
132 PRTAIOMGRINT pAioMgr;
133 /** Work queue for new requests. */
134 RTQUEUEATOMIC QueueReqs;
135 /** Completion callback for this file. */
136 PFNRTAIOMGRREQCOMPLETE pfnReqCompleted;
137 /** Data for exclusive use by the assigned async I/O manager. */
138 struct
139 {
140 /** List node of assigned files for a async I/O manager. */
141 RTLISTNODE NodeAioMgrFiles;
142 /** List of requests waiting for submission. */
143 RTLISTANCHOR ListWaitingReqs;
144 /** Number of requests currently being processed for this endpoint
145 * (excluded flush requests). */
146 unsigned cReqsActive;
147 } AioMgr;
148} RTAIOMGRFILEINT;
149
/** File flag: a close of the file was started; set by the worker when it
 *  processes the close event. */
#define RTAIOMGRFILE_FLAGS_CLOSING      RT_BIT_32(1)
152
/**
 * The kinds of request the manager processes.
 */
typedef enum RTAIOMGRREQTYPE
{
    RTAIOMGRREQTYPE_INVALID    = 0,             /**< Invalid request type. */
    RTAIOMGRREQTYPE_READ       = 1,             /**< Read request. */
    RTAIOMGRREQTYPE_WRITE      = 2,             /**< Write request. */
    RTAIOMGRREQTYPE_FLUSH      = 3,             /**< Flush request. */
    RTAIOMGRREQTYPE_PREFETCH   = 4,             /**< Prefetch request: a read into the bounce buffer that
                                                 *   is turned into a write once it completes. */
    RTAIOMGRREQTYPE_32BIT_HACK = 0x7fffffff     /**< Forces the enum to 32 bits. */
} RTAIOMGRREQTYPE;
/** Pointer to a request type. */
typedef RTAIOMGRREQTYPE *PRTAIOMGRREQTYPE;
173
174/**
175 * Async I/O manager request.
176 */
177typedef struct RTAIOMGRREQ
178{
179 /** Atomic queue work item. */
180 RTQUEUEATOMICITEM WorkItem;
181 /** Node for a waiting list. */
182 RTLISTNODE NodeWaitingList;
183 /** Request flags. */
184 uint32_t fFlags;
185 /** Transfer type. */
186 RTAIOMGRREQTYPE enmType;
187 /** Assigned file request. */
188 RTFILEAIOREQ hReqIo;
189 /** File the request belongs to. */
190 PRTAIOMGRFILEINT pFile;
191 /** Opaque user data. */
192 void *pvUser;
193 /** Start offset */
194 RTFOFF off;
195 /** Data segment. */
196 RTSGSEG DataSeg;
197 /** When non-zero the segment uses a bounce buffer because the provided buffer
198 * doesn't meet host requirements. */
199 size_t cbBounceBuffer;
200 /** Pointer to the used bounce buffer if any. */
201 void *pvBounceBuffer;
202 /** Start offset in the bounce buffer to copy from. */
203 uint32_t offBounceBuffer;
204} RTAIOMGRREQ;
205/** Pointer to a I/O manager request. */
206typedef RTAIOMGRREQ *PRTAIOMGRREQ;
207
/** Request flag: the native I/O request has been prepared already and can be
 *  submitted without preparing it again. */
#define RTAIOMGRREQ_FLAGS_PREPARED      RT_BIT_32(0)
210
211
212/*********************************************************************************************************************************
213* Defined Constants And Macros *
214*********************************************************************************************************************************/
215
/** Validates an async I/O manager handle and returns @a a_rc from the calling
 *  function if the pointer or its magic value is not valid. */
#define RTAIOMGR_VALID_RETURN_RC(a_hAioMgr, a_rc) \
    do { \
        AssertPtrReturn((a_hAioMgr), (a_rc)); \
        AssertReturn((a_hAioMgr)->u32Magic == RTAIOMGR_MAGIC, (a_rc)); \
    } while (0)

/** Validates an async I/O manager handle and returns VERR_INVALID_HANDLE from
 *  the calling function if it is not valid.
 *  The macro argument is forwarded as is, so it works for any variable name. */
#define RTAIOMGR_VALID_RETURN(a_hAioMgr)    RTAIOMGR_VALID_RETURN_RC((a_hAioMgr), VERR_INVALID_HANDLE)

/** Validates an async I/O manager handle and returns (void) from the calling
 *  function if it is not valid. */
#define RTAIOMGR_VALID_RETURN_VOID(a_hAioMgr) \
    do { \
        AssertPtrReturnVoid(a_hAioMgr); \
        AssertReturnVoid((a_hAioMgr)->u32Magic == RTAIOMGR_MAGIC); \
    } while (0)
232
233
234/*********************************************************************************************************************************
235* Internal Functions *
236*********************************************************************************************************************************/
237
238static int rtAioMgrReqsEnqueue(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile,
239 PRTFILEAIOREQ pahReqs, unsigned cReqs);
240
241/**
242 * Removes an endpoint from the currently assigned manager.
243 *
244 * @returns TRUE if there are still requests pending on the current manager for this endpoint.
245 * FALSE otherwise.
246 * @param pFile The endpoint to remove.
247 */
248static bool rtAioMgrFileRemove(PRTAIOMGRFILEINT pFile)
249{
250 /* Make sure that there is no request pending on this manager for the endpoint. */
251 if (!pFile->AioMgr.cReqsActive)
252 {
253 RTListNodeRemove(&pFile->AioMgr.NodeAioMgrFiles);
254 return false;
255 }
256
257 return true;
258}
259
260/**
261 * Allocate a new I/O request.
262 *
263 * @returns Pointer to the allocated request or NULL if out of memory.
264 * @param pThis The async I/O manager instance.
265 */
266static PRTAIOMGRREQ rtAioMgrReqAlloc(PRTAIOMGRINT pThis)
267{
268 return (PRTAIOMGRREQ)RTMemCacheAlloc(pThis->hMemCacheReqs);
269}
270
271/**
272 * Frees an I/O request.
273 *
274 * @returns nothing.
275 * @param pThis The async I/O manager instance.
276 * @param pReq The request to free.
277 */
278static void rtAioMgrReqFree(PRTAIOMGRINT pThis, PRTAIOMGRREQ pReq)
279{
280 if (pReq->cbBounceBuffer)
281 {
282 AssertPtr(pReq->pvBounceBuffer);
283 RTMemPageFree(pReq->pvBounceBuffer, pReq->cbBounceBuffer);
284 pReq->pvBounceBuffer = NULL;
285 pReq->cbBounceBuffer = 0;
286 }
287 pReq->fFlags = 0;
288 RTAioMgrFileRelease(pReq->pFile);
289 RTMemCacheFree(pThis->hMemCacheReqs, pReq);
290}
291
292static void rtAioMgrReqCompleteRc(PRTAIOMGRINT pThis, PRTAIOMGRREQ pReq,
293 int rcReq, size_t cbTransfered)
294{
295 int rc = VINF_SUCCESS;
296 PRTAIOMGRFILEINT pFile;
297
298 pFile = pReq->pFile;
299 pThis->cReqsActive--;
300 pFile->AioMgr.cReqsActive--;
301
302 /*
303 * It is possible that the request failed on Linux with kernels < 2.6.23
304 * if the passed buffer was allocated with remap_pfn_range or if the file
305 * is on an NFS endpoint which does not support async and direct I/O at the same time.
306 * The endpoint will be migrated to a failsafe manager in case a request fails.
307 */
308 if (RT_FAILURE(rcReq))
309 {
310 pFile->pfnReqCompleted(pFile, rcReq, pReq->pvUser);
311 rtAioMgrReqFree(pThis, pReq);
312 }
313 else
314 {
315 /*
316 * Restart an incomplete transfer.
317 * This usually means that the request will return an error now
318 * but to get the cause of the error (disk full, file too big, I/O error, ...)
319 * the transfer needs to be continued.
320 */
321 if (RT_UNLIKELY( cbTransfered < pReq->DataSeg.cbSeg
322 || ( pReq->cbBounceBuffer
323 && cbTransfered < pReq->cbBounceBuffer)))
324 {
325 RTFOFF offStart;
326 size_t cbToTransfer;
327 uint8_t *pbBuf = NULL;
328
329 Assert(cbTransfered % 512 == 0);
330
331 if (pReq->cbBounceBuffer)
332 {
333 AssertPtr(pReq->pvBounceBuffer);
334 offStart = (pReq->off & ~((RTFOFF)512-1)) + cbTransfered;
335 cbToTransfer = pReq->cbBounceBuffer - cbTransfered;
336 pbBuf = (uint8_t *)pReq->pvBounceBuffer + cbTransfered;
337 }
338 else
339 {
340 Assert(!pReq->pvBounceBuffer);
341 offStart = pReq->off + cbTransfered;
342 cbToTransfer = pReq->DataSeg.cbSeg - cbTransfered;
343 pbBuf = (uint8_t *)pReq->DataSeg.pvSeg + cbTransfered;
344 }
345
346 if ( pReq->enmType == RTAIOMGRREQTYPE_PREFETCH
347 || pReq->enmType == RTAIOMGRREQTYPE_READ)
348 {
349 rc = RTFileAioReqPrepareRead(pReq->hReqIo, pFile->hFile, offStart,
350 pbBuf, cbToTransfer, pReq);
351 }
352 else
353 {
354 AssertMsg(pReq->enmType == RTAIOMGRREQTYPE_WRITE,
355 ("Invalid transfer type\n"));
356 rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile, offStart,
357 pbBuf, cbToTransfer, pReq);
358 }
359 AssertRC(rc);
360
361 rc = rtAioMgrReqsEnqueue(pThis, pFile, &pReq->hReqIo, 1);
362 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
363 ("Unexpected return code rc=%Rrc\n", rc));
364 }
365 else if (pReq->enmType == RTAIOMGRREQTYPE_PREFETCH)
366 {
367 Assert(pReq->cbBounceBuffer);
368 pReq->enmType = RTAIOMGRREQTYPE_WRITE;
369
370 memcpy(((uint8_t *)pReq->pvBounceBuffer) + pReq->offBounceBuffer,
371 pReq->DataSeg.pvSeg,
372 pReq->DataSeg.cbSeg);
373
374 /* Write it now. */
375 RTFOFF offStart = pReq->off & ~(RTFOFF)(512-1);
376 size_t cbToTransfer = RT_ALIGN_Z(pReq->DataSeg.cbSeg + (pReq->off - offStart), 512);
377
378 rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile,
379 offStart, pReq->pvBounceBuffer, cbToTransfer, pReq);
380 AssertRC(rc);
381 rc = rtAioMgrReqsEnqueue(pThis, pFile, &pReq->hReqIo, 1);
382 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
383 ("Unexpected return code rc=%Rrc\n", rc));
384 }
385 else
386 {
387 if (RT_SUCCESS(rc) && pReq->cbBounceBuffer)
388 {
389 if (pReq->enmType == RTAIOMGRREQTYPE_READ)
390 memcpy(pReq->DataSeg.pvSeg,
391 ((uint8_t *)pReq->pvBounceBuffer) + pReq->offBounceBuffer,
392 pReq->DataSeg.cbSeg);
393 }
394
395 /* Call completion callback */
396 pFile->pfnReqCompleted(pFile, rcReq, pReq->pvUser);
397 rtAioMgrReqFree(pThis, pReq);
398 }
399 } /* request completed successfully */
400}
401
402/**
403 * Wrapper around rtAioMgrReqCompleteRc().
404 */
405static void rtAioMgrReqComplete(PRTAIOMGRINT pThis, RTFILEAIOREQ hReq)
406{
407 size_t cbTransfered = 0;
408 int rcReq = RTFileAioReqGetRC(hReq, &cbTransfered);
409 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(hReq);
410
411 rtAioMgrReqCompleteRc(pThis, pReq, rcReq, cbTransfered);
412}
413
414/**
415 * Wrapper around RTFIleAioCtxSubmit() which is also doing error handling.
416 */
417static int rtAioMgrReqsEnqueue(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile,
418 PRTFILEAIOREQ pahReqs, unsigned cReqs)
419{
420 pThis->cReqsActive += cReqs;
421 pFile->AioMgr.cReqsActive += cReqs;
422
423 int rc = RTFileAioCtxSubmit(pThis->hAioCtx, pahReqs, cReqs);
424 if (RT_FAILURE(rc))
425 {
426 if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES)
427 {
428 /* Append any not submitted task to the waiting list. */
429 for (size_t i = 0; i < cReqs; i++)
430 {
431 int rcReq = RTFileAioReqGetRC(pahReqs[i], NULL);
432
433 if (rcReq != VERR_FILE_AIO_IN_PROGRESS)
434 {
435 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(pahReqs[i]);
436
437 Assert(pReq->hReqIo == pahReqs[i]);
438 RTListAppend(&pFile->AioMgr.ListWaitingReqs, &pReq->NodeWaitingList);
439 pThis->cReqsActive--;
440 pFile->AioMgr.cReqsActive--;
441 }
442 }
443
444 pThis->cReqsActiveMax = pThis->cReqsActive;
445 rc = VINF_SUCCESS;
446 }
447 else /* Another kind of error happened (full disk, ...) */
448 {
449 /* An error happened. Find out which one caused the error and resubmit all other tasks. */
450 for (size_t i = 0; i < cReqs; i++)
451 {
452 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(pahReqs[i]);
453 int rcReq = RTFileAioReqGetRC(pahReqs[i], NULL);
454
455 if (rcReq == VERR_FILE_AIO_NOT_SUBMITTED)
456 {
457 /* We call ourself again to do any error handling which might come up now. */
458 rc = rtAioMgrReqsEnqueue(pThis, pFile, &pahReqs[i], 1);
459 AssertRC(rc);
460 }
461 else if (rcReq != VERR_FILE_AIO_IN_PROGRESS)
462 rtAioMgrReqCompleteRc(pThis, pReq, rcReq, 0);
463 }
464 }
465 }
466
467 return VINF_SUCCESS;
468}
469
470/**
471 * Adds a list of requests to the waiting list.
472 *
473 * @returns nothing.
474 * @param pFile The file instance to add the requests to.
475 * @param pReqsHead The head of the request list to add.
476 */
477static void rtAioMgrFileAddReqsToWaitingList(PRTAIOMGRFILEINT pFile, PRTAIOMGRREQ pReqsHead)
478{
479 while (pReqsHead)
480 {
481 PRTAIOMGRREQ pReqCur = pReqsHead;
482
483 pReqsHead = (PRTAIOMGRREQ)pReqsHead->WorkItem.pNext;
484 pReqCur->WorkItem.pNext = NULL;
485 RTListAppend(&pFile->AioMgr.ListWaitingReqs, &pReqCur->NodeWaitingList);
486 }
487}
488
489/**
490 * Prepare the native I/o request ensuring that all alignment prerequisites of
491 * the host are met.
492 *
493 * @returns IPRT statuse code.
494 * @param pFile The file instance data.
495 * @param pReq The request to prepare.
496 */
497static int rtAioMgrReqPrepareNonBuffered(PRTAIOMGRFILEINT pFile, PRTAIOMGRREQ pReq)
498{
499 int rc = VINF_SUCCESS;
500 RTFOFF offStart = pReq->off & ~(RTFOFF)(512-1);
501 size_t cbToTransfer = RT_ALIGN_Z(pReq->DataSeg.cbSeg + (pReq->off - offStart), 512);
502 void *pvBuf = pReq->DataSeg.pvSeg;
503 bool fAlignedReq = cbToTransfer == pReq->DataSeg.cbSeg
504 && offStart == pReq->off;
505
506 /*
507 * Check if the alignment requirements are met.
508 * Offset, transfer size and buffer address
509 * need to be on a 512 boundary.
510 */
511 if ( !fAlignedReq
512 /** @todo: || ((pEpClassFile->uBitmaskAlignment & (RTR3UINTPTR)pvBuf) != (RTR3UINTPTR)pvBuf) */)
513 {
514 /* Create bounce buffer. */
515 pReq->cbBounceBuffer = cbToTransfer;
516
517 AssertMsg(pReq->off >= offStart, ("Overflow in calculation off=%llu offStart=%llu\n",
518 pReq->off, offStart));
519 pReq->offBounceBuffer = pReq->off - offStart;
520
521 /** @todo: I think we need something like a RTMemAllocAligned method here.
522 * Current assumption is that the maximum alignment is 4096byte
523 * (GPT disk on Windows)
524 * so we can use RTMemPageAlloc here.
525 */
526 pReq->pvBounceBuffer = RTMemPageAlloc(cbToTransfer);
527 if (RT_LIKELY(pReq->pvBounceBuffer))
528 {
529 pvBuf = pReq->pvBounceBuffer;
530
531 if (pReq->enmType == RTAIOMGRREQTYPE_WRITE)
532 {
533 if ( RT_UNLIKELY(cbToTransfer != pReq->DataSeg.cbSeg)
534 || RT_UNLIKELY(offStart != pReq->off))
535 {
536 /* We have to fill the buffer first before we can update the data. */
537 pReq->enmType = RTAIOMGRREQTYPE_WRITE;
538 }
539 else
540 memcpy(pvBuf, pReq->DataSeg.pvSeg, pReq->DataSeg.cbSeg);
541 }
542 }
543 else
544 rc = VERR_NO_MEMORY;
545 }
546 else
547 pReq->cbBounceBuffer = 0;
548
549 if (RT_SUCCESS(rc))
550 {
551 if (pReq->enmType == RTAIOMGRREQTYPE_WRITE)
552 {
553 rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile,
554 offStart, pvBuf, cbToTransfer, pReq);
555 }
556 else /* Read or prefetch request. */
557 rc = RTFileAioReqPrepareRead(pReq->hReqIo, pFile->hFile,
558 offStart, pvBuf, cbToTransfer, pReq);
559 AssertRC(rc);
560 pReq->fFlags |= RTAIOMGRREQ_FLAGS_PREPARED;
561 }
562
563 return rc;
564}
565
566/**
567 * Prepare a new request for enqueuing.
568 *
569 * @returns IPRT status code.
570 * @param pReq The request to prepare.
571 * @param phReqIo Where to store the handle to the native I/O request on success.
572 */
573static int rtAioMgrPrepareReq(PRTAIOMGRREQ pReq, PRTFILEAIOREQ phReqIo)
574{
575 int rc = VINF_SUCCESS;
576 PRTAIOMGRFILEINT pFile = pReq->pFile;
577
578 switch (pReq->enmType)
579 {
580 case RTAIOMGRREQTYPE_FLUSH:
581 {
582 rc = RTFileAioReqPrepareFlush(pReq->hReqIo, pFile->hFile, pReq);
583 break;
584 }
585 case RTAIOMGRREQTYPE_READ:
586 case RTAIOMGRREQTYPE_WRITE:
587 {
588 rc = rtAioMgrReqPrepareNonBuffered(pFile, pReq);
589 break;
590 }
591 default:
592 AssertMsgFailed(("Invalid transfer type %d\n", pReq->enmType));
593 } /* switch transfer type */
594
595 if (RT_SUCCESS(rc))
596 *phReqIo = pReq->hReqIo;
597
598 return rc;
599}
600
601/**
602 * Prepare newly submitted requests for processing.
603 *
604 * @returns IPRT status code
605 * @param pThis The async I/O manager instance data.
606 * @param pFile The file instance.
607 * @param pReqsNew The list of new requests to prepare.
608 */
609static int rtAioMgrPrepareNewReqs(PRTAIOMGRINT pThis,
610 PRTAIOMGRFILEINT pFile,
611 PRTAIOMGRREQ pReqsNew)
612{
613 RTFILEAIOREQ apReqs[20];
614 unsigned cRequests = 0;
615 int rc = VINF_SUCCESS;
616
617 /* Go through the list and queue the requests. */
618 while ( pReqsNew
619 && (pThis->cReqsActive + cRequests < pThis->cReqsActiveMax)
620 && RT_SUCCESS(rc))
621 {
622 PRTAIOMGRREQ pCurr = pReqsNew;
623 pReqsNew = (PRTAIOMGRREQ)pReqsNew->WorkItem.pNext;
624
625 pCurr->WorkItem.pNext = NULL;
626 AssertMsg(VALID_PTR(pCurr->pFile) && (pCurr->pFile == pFile),
627 ("Files do not match\n"));
628 AssertMsg(!(pCurr->fFlags & RTAIOMGRREQ_FLAGS_PREPARED),
629 ("Request on the new list is already prepared\n"));
630
631 rc = rtAioMgrPrepareReq(pCurr, &apReqs[cRequests]);
632 if (RT_FAILURE(rc))
633 rtAioMgrReqCompleteRc(pThis, pCurr, rc, 0);
634 else
635 cRequests++;
636
637 /* Queue the requests if the array is full. */
638 if (cRequests == RT_ELEMENTS(apReqs))
639 {
640 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests);
641 cRequests = 0;
642 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
643 ("Unexpected return code\n"));
644 }
645 }
646
647 if (cRequests)
648 {
649 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests);
650 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
651 ("Unexpected return code rc=%Rrc\n", rc));
652 }
653
654 if (pReqsNew)
655 {
656 /* Add the rest of the tasks to the pending list */
657 rtAioMgrFileAddReqsToWaitingList(pFile, pReqsNew);
658 }
659
660 /* Insufficient resources are not fatal. */
661 if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES)
662 rc = VINF_SUCCESS;
663
664 return rc;
665}
666
667/**
668 * Queues waiting requests.
669 *
670 * @returns IPRT status code.
671 * @param pThis The async I/O manager instance data.
672 * @param pFile The file to get the requests from.
673 */
674static int rtAioMgrQueueWaitingReqs(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile)
675{
676 RTFILEAIOREQ apReqs[20];
677 unsigned cRequests = 0;
678 int rc = VINF_SUCCESS;
679 PRTAIOMGRREQ pReqIt;
680 PRTAIOMGRREQ pReqItNext;
681
682 /* Go through the list and queue the requests. */
683 RTListForEachSafe(&pFile->AioMgr.ListWaitingReqs, pReqIt, pReqItNext, RTAIOMGRREQ, NodeWaitingList)
684 {
685 RTListNodeRemove(&pReqIt->NodeWaitingList);
686 AssertMsg(VALID_PTR(pReqIt->pFile) && (pReqIt->pFile == pFile),
687 ("Files do not match\n"));
688
689 if (!(pReqIt->fFlags & RTAIOMGRREQ_FLAGS_PREPARED))
690 {
691 rc = rtAioMgrPrepareReq(pReqIt, &apReqs[cRequests]);
692 if (RT_FAILURE(rc))
693 rtAioMgrReqCompleteRc(pThis, pReqIt, rc, 0);
694 else
695 cRequests++;
696 }
697 else
698 {
699 apReqs[cRequests] = pReqIt->hReqIo;
700 cRequests++;
701 }
702
703 /* Queue the requests if the array is full. */
704 if (cRequests == RT_ELEMENTS(apReqs))
705 {
706 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests);
707 cRequests = 0;
708 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
709 ("Unexpected return code\n"));
710 }
711 }
712
713 if (cRequests)
714 {
715 rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests);
716 AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES),
717 ("Unexpected return code rc=%Rrc\n", rc));
718 }
719
720 /* Insufficient resources are not fatal. */
721 if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES)
722 rc = VINF_SUCCESS;
723
724 return rc;
725}
726
727/**
728 * Adds all pending requests for the given file.
729 *
730 * @returns IPRT status code.
731 * @param pThis The async I/O manager instance data.
732 * @param pFile The file to get the requests from.
733 */
734static int rtAioMgrQueueReqs(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile)
735{
736 int rc = VINF_SUCCESS;
737 PRTAIOMGRFILEINT pReqsHead = NULL;
738
739 /* Check the pending list first */
740 if (!RTListIsEmpty(&pFile->AioMgr.ListWaitingReqs))
741 rc = rtAioMgrQueueWaitingReqs(pThis, pFile);
742
743 if ( RT_SUCCESS(rc)
744 && RTListIsEmpty(&pFile->AioMgr.ListWaitingReqs))
745 {
746 PRTAIOMGRREQ pReqsNew = (PRTAIOMGRREQ)RTQueueAtomicRemoveAll(&pFile->QueueReqs);
747
748 if (pReqsNew)
749 {
750 rc = rtAioMgrPrepareNewReqs(pThis, pFile, pReqsNew);
751 AssertRC(rc);
752 }
753 }
754
755 return rc;
756}
757
758/**
759 * Checks all files for new requests.
760 *
761 * @returns IPRT status code.
762 * @param pThis The I/O manager instance data.
763 */
764static int rtAioMgrCheckFiles(PRTAIOMGRINT pThis)
765{
766 int rc = VINF_SUCCESS;
767 PRTAIOMGRFILEINT pIt;
768
769 RTListForEach(&pThis->ListFiles, pIt, RTAIOMGRFILEINT, AioMgr.NodeAioMgrFiles)
770 {
771 rc = rtAioMgrQueueReqs(pThis, pIt);
772 if (RT_FAILURE(rc))
773 return rc;
774 }
775
776 return rc;
777}
778
779/**
780 * Process a blocking event from the outside.
781 *
782 * @returns IPRT status code.
783 * @param pThis The async I/O manager instance data.
784 */
785static int rtAioMgrProcessBlockingEvent(PRTAIOMGRINT pThis)
786{
787 int rc = VINF_SUCCESS;
788 bool fNotifyWaiter = false;
789
790 switch (pThis->enmBlockingEvent)
791 {
792 case RTAIOMGREVENT_NO_EVENT:
793 /* Nothing to do. */
794 break;
795 case RTAIOMGREVENT_FILE_ADD:
796 {
797 PRTAIOMGRFILEINT pFile = ASMAtomicReadPtrT(&pThis->BlockingEventData.pFileAdd, PRTAIOMGRFILEINT);
798 AssertMsg(VALID_PTR(pFile), ("Adding file event without a file to add\n"));
799
800 RTListAppend(&pThis->ListFiles, &pFile->AioMgr.NodeAioMgrFiles);
801 fNotifyWaiter = true;
802 break;
803 }
804 case RTAIOMGREVENT_FILE_CLOSE:
805 {
806 PRTAIOMGRFILEINT pFile = ASMAtomicReadPtrT(&pThis->BlockingEventData.pFileClose, PRTAIOMGRFILEINT);
807 AssertMsg(VALID_PTR(pFile), ("Close file event without a file to close\n"));
808
809 if (!(pFile->fFlags & RTAIOMGRFILE_FLAGS_CLOSING))
810 {
811 /* Make sure all requests finished. Process the queues a last time first. */
812 rc = rtAioMgrQueueReqs(pThis, pFile);
813 AssertRC(rc);
814
815 pFile->fFlags |= RTAIOMGRFILE_FLAGS_CLOSING;
816 fNotifyWaiter = !rtAioMgrFileRemove(pFile);
817 }
818 else if (!pFile->AioMgr.cReqsActive)
819 fNotifyWaiter = true;
820 break;
821 }
822 case RTAIOMGREVENT_SHUTDOWN:
823 {
824 if (!pThis->cReqsActive)
825 fNotifyWaiter = true;
826 break;
827 }
828 default:
829 AssertReleaseMsgFailed(("Invalid event type %d\n", pThis->enmBlockingEvent));
830 }
831
832 if (fNotifyWaiter)
833 {
834 /* Release the waiting thread. */
835 rc = RTSemEventSignal(pThis->hEventSemBlock);
836 AssertRC(rc);
837 }
838
839 return rc;
840}
841
842/**
843 * async I/O manager worker loop.
844 *
845 * @returns IPRT status code.
846 * @param hThreadSelf The thread handle this worker belongs to.
847 * @param pvUser Opaque user data (Pointer to async I/O manager instance).
848 */
849static DECLCALLBACK(int) rtAioMgrWorker(RTTHREAD hThreadSelf, void *pvUser)
850{
851 PRTAIOMGRINT pThis = (PRTAIOMGRINT)pvUser;
852 /*bool fRunning = true;*/
853 int rc = VINF_SUCCESS;
854
855 do
856 {
857 uint32_t cReqsCompleted = 0;
858 RTFILEAIOREQ ahReqsCompleted[32];
859 rc = RTFileAioCtxWait(pThis->hAioCtx, 1, RT_INDEFINITE_WAIT, &ahReqsCompleted[0],
860 RT_ELEMENTS(ahReqsCompleted), &cReqsCompleted);
861 if (rc == VERR_INTERRUPTED)
862 {
863 /* Process external event. */
864 rtAioMgrProcessBlockingEvent(pThis);
865 rc = rtAioMgrCheckFiles(pThis);
866 }
867 else if (RT_FAILURE(rc))
868 {
869 /* Something bad happened. */
870 /** @todo: */
871 }
872 else
873 {
874 /* Requests completed. */
875 for (uint32_t i = 0; i < cReqsCompleted; i++)
876 rtAioMgrReqComplete(pThis, ahReqsCompleted[i]);
877
878 /* Check files for new requests and queue waiting requests. */
879 rc = rtAioMgrCheckFiles(pThis);
880 }
881 } while ( /*fRunning - never modified
882 && */ RT_SUCCESS(rc));
883
884 RT_NOREF_PV(hThreadSelf);
885 return rc;
886}
887
888/**
889 * Wakes up the async I/O manager.
890 *
891 * @returns IPRT status code.
892 * @param pThis The async I/O manager.
893 */
894static int rtAioMgrWakeup(PRTAIOMGRINT pThis)
895{
896 return RTFileAioCtxWakeup(pThis->hAioCtx);
897}
898
899/**
900 * Waits until the async I/O manager handled the given event.
901 *
902 * @returns IPRT status code.
903 * @param pThis The async I/O manager.
904 * @param enmEvent The event to pass to the manager.
905 */
906static int rtAioMgrWaitForBlockingEvent(PRTAIOMGRINT pThis, RTAIOMGREVENT enmEvent)
907{
908 Assert(pThis->enmBlockingEvent == RTAIOMGREVENT_NO_EVENT);
909 ASMAtomicWriteU32((volatile uint32_t *)&pThis->enmBlockingEvent, enmEvent);
910
911 /* Wakeup the async I/O manager */
912 int rc = rtAioMgrWakeup(pThis);
913 if (RT_FAILURE(rc))
914 return rc;
915
916 /* Wait for completion. */
917 rc = RTSemEventWait(pThis->hEventSemBlock, RT_INDEFINITE_WAIT);
918 AssertRC(rc);
919
920 ASMAtomicWriteU32((volatile uint32_t *)&pThis->enmBlockingEvent, RTAIOMGREVENT_NO_EVENT);
921
922 return rc;
923}
924
925/**
926 * Add a given file to the given I/O manager.
927 *
928 * @returns IPRT status code.
929 * @param pThis The async I/O manager.
930 * @param pFile The file to add.
931 */
932static int rtAioMgrAddFile(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile)
933{
934 /* Update the assigned I/O manager. */
935 ASMAtomicWritePtr(&pFile->pAioMgr, pThis);
936
937 int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent);
938 AssertRCReturn(rc, rc);
939
940 ASMAtomicWritePtr(&pThis->BlockingEventData.pFileAdd, pFile);
941 rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_FILE_ADD);
942 ASMAtomicWriteNullPtr(&pThis->BlockingEventData.pFileAdd);
943
944 RTCritSectLeave(&pThis->CritSectBlockingEvent);
945 return rc;
946}
947
948/**
949 * Removes a given file from the given I/O manager.
950 *
951 * @returns IPRT status code.
952 * @param pThis The async I/O manager.
953 * @param pFile The file to remove.
954 */
955static int rtAioMgrCloseFile(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile)
956{
957 int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent);
958 AssertRCReturn(rc, rc);
959
960 ASMAtomicWritePtr(&pThis->BlockingEventData.pFileClose, pFile);
961 rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_FILE_CLOSE);
962 ASMAtomicWriteNullPtr(&pThis->BlockingEventData.pFileClose);
963
964 RTCritSectLeave(&pThis->CritSectBlockingEvent);
965
966 return rc;
967}
968
969/**
970 * Process a shutdown event.
971 *
972 * @returns IPRT status code.
973 * @param pThis The async I/O manager to shut down.
974 */
975static int rtAioMgrShutdown(PRTAIOMGRINT pThis)
976{
977 int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent);
978 AssertRCReturn(rc, rc);
979
980 rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_SHUTDOWN);
981 RTCritSectLeave(&pThis->CritSectBlockingEvent);
982
983 return rc;
984}
985
986/**
987 * Destroys an async I/O manager.
988 *
989 * @returns nothing.
990 * @param pThis The async I/O manager instance to destroy.
991 */
992static void rtAioMgrDestroy(PRTAIOMGRINT pThis)
993{
994 int rc;
995
996 rc = rtAioMgrShutdown(pThis);
997 AssertRC(rc);
998
999 rc = RTThreadWait(pThis->hThread, RT_INDEFINITE_WAIT, NULL);
1000 AssertRC(rc);
1001
1002 rc = RTFileAioCtxDestroy(pThis->hAioCtx);
1003 AssertRC(rc);
1004
1005 rc = RTMemCacheDestroy(pThis->hMemCacheReqs);
1006 AssertRC(rc);
1007
1008 pThis->hThread = NIL_RTTHREAD;
1009 pThis->hAioCtx = NIL_RTFILEAIOCTX;
1010 pThis->hMemCacheReqs = NIL_RTMEMCACHE;
1011 pThis->u32Magic = ~RTAIOMGR_MAGIC;
1012 RTCritSectDelete(&pThis->CritSectBlockingEvent);
1013 RTSemEventDestroy(pThis->hEventSemBlock);
1014 RTMemFree(pThis);
1015}
1016
1017/**
1018 * Queues a new request for processing.
1019 */
1020static void rtAioMgrFileQueueReq(PRTAIOMGRFILEINT pThis, PRTAIOMGRREQ pReq)
1021{
1022 RTAioMgrFileRetain(pThis);
1023 RTQueueAtomicInsert(&pThis->QueueReqs, &pReq->WorkItem);
1024 rtAioMgrWakeup(pThis->pAioMgr);
1025}
1026
1027/**
1028 * Destroys an async I/O manager file.
1029 *
1030 * @returns nothing.
1031 * @param pThis The async I/O manager file.
1032 */
1033static void rtAioMgrFileDestroy(PRTAIOMGRFILEINT pThis)
1034{
1035 pThis->u32Magic = ~RTAIOMGRFILE_MAGIC;
1036 rtAioMgrCloseFile(pThis->pAioMgr, pThis);
1037 RTAioMgrRelease(pThis->pAioMgr);
1038 RTMemFree(pThis);
1039}
1040
1041/**
1042 * Queues a new I/O request.
1043 *
1044 * @returns IPRT status code.
1045 * @param hAioMgrFile The I/O manager file handle.
1046 * @param off Start offset of the I/o request.
1047 * @param pSgBuf Data S/G buffer.
1048 * @param cbIo How much to transfer.
1049 * @param pvUser Opaque user data.
1050 * @param enmType I/O direction type (read/write).
1051 */
1052static int rtAioMgrFileIoReqCreate(RTAIOMGRFILE hAioMgrFile, RTFOFF off, PRTSGBUF pSgBuf,
1053 size_t cbIo, void *pvUser, RTAIOMGRREQTYPE enmType)
1054{
1055 int rc;
1056 PRTAIOMGRFILEINT pFile = hAioMgrFile;
1057 PRTAIOMGRINT pAioMgr;
1058
1059 AssertPtrReturn(pFile, VERR_INVALID_HANDLE);
1060 pAioMgr = pFile->pAioMgr;
1061
1062 PRTAIOMGRREQ pReq = rtAioMgrReqAlloc(pAioMgr);
1063 if (RT_LIKELY(pReq))
1064 {
1065 unsigned cSeg = 1;
1066 size_t cbSeg = RTSgBufSegArrayCreate(pSgBuf, &pReq->DataSeg, &cSeg, cbIo);
1067
1068 if (cbSeg == cbIo)
1069 {
1070 pReq->enmType = enmType;
1071 pReq->pFile = pFile;
1072 pReq->pvUser = pvUser;
1073 pReq->off = off;
1074 rtAioMgrFileQueueReq(pFile, pReq);
1075 rc = VERR_FILE_AIO_IN_PROGRESS;
1076 }
1077 else
1078 {
1079 /** @todo: Real S/G buffer support. */
1080 rtAioMgrReqFree(pAioMgr, pReq);
1081 rc = VERR_NOT_SUPPORTED;
1082 }
1083 }
1084 else
1085 rc = VERR_NO_MEMORY;
1086
1087 return rc;
1088}
1089
1090/**
1091 * Request constructor for the memory cache.
1092 *
1093 * @returns IPRT status code.
1094 * @param hMemCache The cache handle.
1095 * @param pvObj The memory object that should be initialized.
1096 * @param pvUser The user argument.
1097 */
1098static DECLCALLBACK(int) rtAioMgrReqCtor(RTMEMCACHE hMemCache, void *pvObj, void *pvUser)
1099{
1100 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)pvObj;
1101 RT_NOREF_PV(hMemCache); RT_NOREF_PV(pvUser);
1102
1103 memset(pReq, 0, sizeof(RTAIOMGRREQ));
1104 return RTFileAioReqCreate(&pReq->hReqIo);
1105}
1106
1107/**
1108 * Request destructor for the memory cache.
1109 *
1110 * @param hMemCache The cache handle.
1111 * @param pvObj The memory object that should be destroyed.
1112 * @param pvUser The user argument.
1113 */
1114static DECLCALLBACK(void) rtAioMgrReqDtor(RTMEMCACHE hMemCache, void *pvObj, void *pvUser)
1115{
1116 PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)pvObj;
1117 int rc = RTFileAioReqDestroy(pReq->hReqIo);
1118 AssertRC(rc);
1119
1120 RT_NOREF_PV(hMemCache); RT_NOREF_PV(pvUser);
1121}
1122
1123RTDECL(int) RTAioMgrCreate(PRTAIOMGR phAioMgr, uint32_t cReqsMax)
1124{
1125 int rc = VINF_SUCCESS;
1126 PRTAIOMGRINT pThis;
1127
1128 AssertPtrReturn(phAioMgr, VERR_INVALID_POINTER);
1129 AssertReturn(cReqsMax > 0, VERR_INVALID_PARAMETER);
1130
1131 pThis = (PRTAIOMGRINT)RTMemAllocZ(sizeof(RTAIOMGRINT));
1132 if (pThis)
1133 {
1134 pThis->u32Magic = RTAIOMGR_MAGIC;
1135 pThis->cRefs = 1;
1136 pThis->enmBlockingEvent = RTAIOMGREVENT_NO_EVENT;
1137 RTListInit(&pThis->ListFiles);
1138 rc = RTCritSectInit(&pThis->CritSectBlockingEvent);
1139 if (RT_SUCCESS(rc))
1140 {
1141 rc = RTSemEventCreate(&pThis->hEventSemBlock);
1142 if (RT_SUCCESS(rc))
1143 {
1144 rc = RTMemCacheCreate(&pThis->hMemCacheReqs, sizeof(RTAIOMGRREQ),
1145 0, UINT32_MAX, rtAioMgrReqCtor, rtAioMgrReqDtor, NULL, 0);
1146 if (RT_SUCCESS(rc))
1147 {
1148 rc = RTFileAioCtxCreate(&pThis->hAioCtx, cReqsMax == UINT32_MAX
1149 ? RTFILEAIO_UNLIMITED_REQS
1150 : cReqsMax,
1151 RTFILEAIOCTX_FLAGS_WAIT_WITHOUT_PENDING_REQUESTS);
1152 if (RT_SUCCESS(rc))
1153 {
1154 rc = RTThreadCreateF(&pThis->hThread, rtAioMgrWorker, pThis, 0, RTTHREADTYPE_IO,
1155 RTTHREADFLAGS_WAITABLE, "AioMgr-%u", cReqsMax);
1156 if (RT_FAILURE(rc))
1157 {
1158 rc = RTFileAioCtxDestroy(pThis->hAioCtx);
1159 AssertRC(rc);
1160 }
1161 }
1162
1163 if (RT_FAILURE(rc))
1164 RTMemCacheDestroy(pThis->hMemCacheReqs);
1165 }
1166
1167 if (RT_FAILURE(rc))
1168 RTSemEventDestroy(pThis->hEventSemBlock);
1169 }
1170
1171 if (RT_FAILURE(rc))
1172 RTCritSectDelete(&pThis->CritSectBlockingEvent);
1173 }
1174
1175 if (RT_FAILURE(rc))
1176 RTMemFree(pThis);
1177 }
1178 else
1179 rc = VERR_NO_MEMORY;
1180
1181 if (RT_SUCCESS(rc))
1182 *phAioMgr = pThis;
1183
1184 return rc;
1185}
1186
1187RTDECL(uint32_t) RTAioMgrRetain(RTAIOMGR hAioMgr)
1188{
1189 PRTAIOMGRINT pThis = hAioMgr;
1190 AssertReturn(hAioMgr != NIL_RTAIOMGR, UINT32_MAX);
1191 AssertPtrReturn(pThis, UINT32_MAX);
1192 AssertReturn(pThis->u32Magic == RTAIOMGR_MAGIC, UINT32_MAX);
1193
1194 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
1195 AssertMsg(cRefs > 1 && cRefs < _1G, ("%#x %p\n", cRefs, pThis));
1196 return cRefs;
1197}
1198
1199RTDECL(uint32_t) RTAioMgrRelease(RTAIOMGR hAioMgr)
1200{
1201 PRTAIOMGRINT pThis = hAioMgr;
1202 if (pThis == NIL_RTAIOMGR)
1203 return 0;
1204 AssertPtrReturn(pThis, UINT32_MAX);
1205 AssertReturn(pThis->u32Magic == RTAIOMGR_MAGIC, UINT32_MAX);
1206
1207 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
1208 AssertMsg(cRefs < _1G, ("%#x %p\n", cRefs, pThis));
1209 if (cRefs == 0)
1210 rtAioMgrDestroy(pThis);
1211 return cRefs;
1212}
1213
1214RTDECL(int) RTAioMgrFileCreate(RTAIOMGR hAioMgr, RTFILE hFile, PFNRTAIOMGRREQCOMPLETE pfnReqComplete,
1215 void *pvUser, PRTAIOMGRFILE phAioMgrFile)
1216{
1217 int rc = VINF_SUCCESS;
1218 PRTAIOMGRFILEINT pThis;
1219
1220 AssertReturn(hAioMgr != NIL_RTAIOMGR, VERR_INVALID_HANDLE);
1221 AssertReturn(hFile != NIL_RTFILE, VERR_INVALID_HANDLE);
1222 AssertPtrReturn(pfnReqComplete, VERR_INVALID_POINTER);
1223 AssertPtrReturn(phAioMgrFile, VERR_INVALID_POINTER);
1224
1225 pThis = (PRTAIOMGRFILEINT)RTMemAllocZ(sizeof(RTAIOMGRFILEINT));
1226 if (pThis)
1227 {
1228 pThis->u32Magic = RTAIOMGRFILE_MAGIC;
1229 pThis->cRefs = 1;
1230 pThis->hFile = hFile;
1231 pThis->pAioMgr = hAioMgr;
1232 pThis->pvUser = pvUser;
1233 pThis->pfnReqCompleted = pfnReqComplete;
1234 RTQueueAtomicInit(&pThis->QueueReqs);
1235 RTListInit(&pThis->AioMgr.ListWaitingReqs);
1236 RTAioMgrRetain(hAioMgr);
1237 rc = RTFileAioCtxAssociateWithFile(pThis->pAioMgr->hAioCtx, hFile);
1238 if (RT_FAILURE(rc))
1239 rtAioMgrFileDestroy(pThis);
1240 else
1241 rtAioMgrAddFile(pThis->pAioMgr, pThis);
1242 }
1243 else
1244 rc = VERR_NO_MEMORY;
1245
1246 if (RT_SUCCESS(rc))
1247 *phAioMgrFile = pThis;
1248
1249 return rc;
1250}
1251
1252RTDECL(uint32_t) RTAioMgrFileRetain(RTAIOMGRFILE hAioMgrFile)
1253{
1254 PRTAIOMGRFILEINT pThis = hAioMgrFile;
1255 AssertReturn(hAioMgrFile != NIL_RTAIOMGRFILE, UINT32_MAX);
1256 AssertPtrReturn(pThis, UINT32_MAX);
1257 AssertReturn(pThis->u32Magic == RTAIOMGRFILE_MAGIC, UINT32_MAX);
1258
1259 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
1260 AssertMsg(cRefs > 1 && cRefs < _1G, ("%#x %p\n", cRefs, pThis));
1261 return cRefs;
1262}
1263
1264RTDECL(uint32_t) RTAioMgrFileRelease(RTAIOMGRFILE hAioMgrFile)
1265{
1266 PRTAIOMGRFILEINT pThis = hAioMgrFile;
1267 if (pThis == NIL_RTAIOMGRFILE)
1268 return 0;
1269 AssertPtrReturn(pThis, UINT32_MAX);
1270 AssertReturn(pThis->u32Magic == RTAIOMGRFILE_MAGIC, UINT32_MAX);
1271
1272 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
1273 AssertMsg(cRefs < _1G, ("%#x %p\n", cRefs, pThis));
1274 if (cRefs == 0)
1275 rtAioMgrFileDestroy(pThis);
1276 return cRefs;
1277}
1278
1279RTDECL(void *) RTAioMgrFileGetUser(RTAIOMGRFILE hAioMgrFile)
1280{
1281 PRTAIOMGRFILEINT pThis = hAioMgrFile;
1282
1283 AssertPtrReturn(pThis, NULL);
1284 return pThis->pvUser;
1285}
1286
1287RTDECL(int) RTAioMgrFileRead(RTAIOMGRFILE hAioMgrFile, RTFOFF off,
1288 PRTSGBUF pSgBuf, size_t cbRead, void *pvUser)
1289{
1290 return rtAioMgrFileIoReqCreate(hAioMgrFile, off, pSgBuf, cbRead, pvUser,
1291 RTAIOMGRREQTYPE_READ);
1292}
1293
1294RTDECL(int) RTAioMgrFileWrite(RTAIOMGRFILE hAioMgrFile, RTFOFF off,
1295 PRTSGBUF pSgBuf, size_t cbWrite, void *pvUser)
1296{
1297 return rtAioMgrFileIoReqCreate(hAioMgrFile, off, pSgBuf, cbWrite, pvUser,
1298 RTAIOMGRREQTYPE_WRITE);
1299}
1300
1301RTDECL(int) RTAioMgrFileFlush(RTAIOMGRFILE hAioMgrFile, void *pvUser)
1302{
1303 PRTAIOMGRFILEINT pFile = hAioMgrFile;
1304 PRTAIOMGRINT pAioMgr;
1305
1306 AssertPtrReturn(pFile, VERR_INVALID_HANDLE);
1307
1308 pAioMgr = pFile->pAioMgr;
1309
1310 PRTAIOMGRREQ pReq = rtAioMgrReqAlloc(pAioMgr);
1311 if (RT_UNLIKELY(!pReq))
1312 return VERR_NO_MEMORY;
1313
1314 pReq->pFile = pFile;
1315 pReq->enmType = RTAIOMGRREQTYPE_FLUSH;
1316 pReq->pvUser = pvUser;
1317 rtAioMgrFileQueueReq(pFile, pReq);
1318
1319 return VERR_FILE_AIO_IN_PROGRESS;
1320}
1321
注意: 瀏覽 TracBrowser 來幫助您使用儲存庫瀏覽器

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette