diff --git a/include/ublksrv.h b/include/ublksrv.h
index ff233b6d..1d6cf738 100644
--- a/include/ublksrv.h
+++ b/include/ublksrv.h
@@ -99,6 +99,7 @@ struct ublk_io_data {
 #define UBLKSRV_ZERO_COPY	(1U << 4)
 #define UBLKSRV_AUTO_ZC		(1U << 5)
 #define UBLKSRV_QUEUE_POLL	(1U << 6)
+#define UBLKSRV_QUEUE_BATCH_IO	(1U << 7)
 
 /**
  * ublksrv_queue is 1:1 mapping with ublk driver's blk-mq queue, and
diff --git a/include/ublksrv_priv.h b/include/ublksrv_priv.h
index ef462e05..ec00c0c2 100644
--- a/include/ublksrv_priv.h
+++ b/include/ublksrv_priv.h
@@ -24,6 +24,54 @@
 #include "ublksrv.h"
 #include "ublksrv_aio.h"
 
+/*
+ * Batch IO support structures
+ *
+ * When UBLK_F_BATCH_IO is set, the queue uses batch-based operations instead
+ * of per-tag FETCH/COMMIT_AND_FETCH:
+ * - UBLK_U_IO_PREP_IO_CMDS: One-time prep with buffer info
+ * - UBLK_U_IO_FETCH_IO_CMDS: Multishot fetch returning 2-byte tags
+ * - UBLK_U_IO_COMMIT_IO_CMDS: Batch commit of completed IOs
+ */
+
+/* Batch element - aligned with kernel's ublk_elem_header (8 or 16 bytes) */
+struct ublk_batch_elem {
+	__u16 tag;
+	__u16 buf_index;
+	__s32 result;
+	__u64 buf_addr;	/* Only used if !UBLK_F_USER_COPY && !UBLK_F_SUPPORT_ZERO_COPY */
+};
+
+/* Per-queue commit buffer state (2 per queue for double-buffering) */
+struct batch_commit_buf {
+	void *buf;		/* Points to commit buffer memory */
+	unsigned short done;	/* Number of IOs added to this batch */
+	unsigned short count;	/* Max capacity of this buffer */
+};
+
+/* Per-queue fetch buffer with io_uring buffer ring (one per queue) */
+struct batch_fetch_buf {
+	struct io_uring_buf_ring *br;
+	void *fetch_buf;
+	unsigned int fetch_buf_size;
+	unsigned int fetch_buf_off;
+};
+
+/* One multishot fetch buffer; two commit buffers for double-buffering */
+#define UBLK_BATCH_NR_FETCH_BUFS	1
+#define UBLK_BATCH_NR_COMMIT_BUFS	2
+
+/* Per-queue batch IO state - allocated only when UBLK_F_BATCH_IO is set */
+struct ublksrv_queue_batch {
+	struct batch_fetch_buf fetch_bufs[UBLK_BATCH_NR_FETCH_BUFS];
+	struct batch_commit_buf commit_bufs[UBLK_BATCH_NR_COMMIT_BUFS];
+	void *commit_buf_mem;		/* Allocated commit buffer memory */
+	unsigned int commit_buf_size;	/* Size of each commit buffer */
+	unsigned char commit_buf_elem_size;	/* Size of each element (8 or 16) */
+	unsigned char cur_commit_buf;	/* Index of current active commit buffer (0 or 1) */
+	unsigned char prep_done;	/* PREP_IO_CMDS has been issued */
+	__u16 cmd_flags;		/* Flags for batch commands */
+};
 
 /* todo: relace the hardcode name with /dev/char/maj:min */
 #ifdef UBLKC_PREFIX
@@ -154,7 +202,10 @@ struct _ublksrv_queue {
 	int nr_ctxs;
 	struct ublksrv_aio_ctx *ctxs[UBLKSRV_NR_CTX_BATCH];
 
-	unsigned long reserved[8];
+	/* Batch IO support - only used when UBLK_F_BATCH_IO is set */
+	struct ublksrv_queue_batch batch;
+
+	unsigned long reserved[4];
 
 	struct ublk_io ios[0];
 };
@@ -233,6 +284,48 @@ int create_pid_file(const char *pid_file, int *pid_fd);
 
 extern void ublksrv_build_cpu_str(char *buf, int len, const cpu_set_t *cpuset);
 
+/* Check if queue needs to pass buffer addresses (not zero-copy or user-copy) */
+static inline bool ublksrv_queue_use_buf(const struct _ublksrv_queue *q)
+{
+	return !(q->state & (UBLKSRV_USER_COPY | UBLKSRV_ZERO_COPY));
+}
+
+/* Batch IO support - check if queue uses batch mode */
+static inline bool ublksrv_queue_batch_io(const struct _ublksrv_queue *q)
+{
+	return !!(q->state & UBLKSRV_QUEUE_BATCH_IO);
+}
+
+/* Batch IO functions (implemented in ublksrv_batch.c) */
+int ublksrv_batch_alloc_bufs(struct _ublksrv_queue *q);
+void ublksrv_batch_free_bufs(struct _ublksrv_queue *q);
+void ublksrv_batch_start_fetch(struct _ublksrv_queue *q);
+void ublksrv_batch_submit_commit(struct _ublksrv_queue *q);
+bool ublksrv_batch_handle_cqe(struct _ublksrv_queue *q,
+		struct io_uring_cqe *cqe, unsigned cmd_op);
+
+/* Add completed IO to current commit buffer (inline for fast path) */
+static inline void ublksrv_batch_add_complete(struct _ublksrv_queue *q,
+		unsigned tag, int result)
+{
+	struct ublksrv_queue_batch *b = &q->batch;
+	struct batch_commit_buf *cb = &b->commit_bufs[b->cur_commit_buf];
+	struct ublk_batch_elem *elem;
+
+	elem = (struct ublk_batch_elem *)
+		((char *)cb->buf + cb->done * b->commit_buf_elem_size);
+
+	elem->tag = tag;
+	elem->result = result;
+
+	if (q->state & UBLKSRV_AUTO_ZC)
+		elem->buf_index = tag;
+	else if (ublksrv_queue_use_buf(q))
+		elem->buf_addr = (__u64)q->ios[tag].buf_addr;
+
+	cb->done++;
+}
+
 /*
  * bit63: target io, bit62: internal data.
  *
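To make the commit-buffer sizing concrete: with 16-byte elements (the buf_addr case) a depth-128 queue needs 2 KB per commit buffer, which ublk_batch_commit_buf_size() below rounds up to one 4 KB page, so the double-buffered pair pins two pages per queue (the buffers get mlock()ed later, so whole pages are resident either way). A minimal sketch of that arithmetic, with depth and page size as stated assumptions:

    /* Commit-buffer sizing walk-through; depth and page size are assumed. */
    #include <assert.h>

    #define ROUND_UP(v, a)  (((v) + (a) - 1) / (a) * (a))

    int main(void)
    {
        unsigned q_depth = 128;     /* assumed queue depth */
        unsigned elem_size = 16;    /* element carries buf_addr */
        unsigned page_sz = 4096;    /* assumed page size */
        unsigned one_buf = ROUND_UP(elem_size * q_depth, page_sz);

        assert(one_buf == 4096);        /* 2048 rounded up to a page */
        assert(2 * one_buf == 8192);    /* two commit buffers per queue */
        return 0;
    }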
diff --git a/lib/Makefile.am b/lib/Makefile.am
index c52161cd..a4c3b769 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -6,6 +6,7 @@ libublksrv_la_SOURCES = \
 	ublksrv_cmd.c \
 	ublksrv_json.cpp \
 	ublksrv.c \
+	ublksrv_batch.c \
 	utils.c \
 	ublksrv_aio.c
 libublksrv_la_CFLAGS = \
diff --git a/lib/ublksrv.c b/lib/ublksrv.c
index 52d00d21..7c414468 100644
--- a/lib/ublksrv.c
+++ b/lib/ublksrv.c
@@ -131,11 +131,6 @@ static void ublksrv_tgt_deinit(struct _ublksrv_dev *dev)
 		tgt->ops->deinit_tgt(local_to_tdev(dev));
 }
 
-static inline bool ublksrv_queue_use_buf(const struct _ublksrv_queue *q)
-{
-	return !(q->state & (UBLKSRV_USER_COPY | UBLKSRV_ZERO_COPY));
-}
-
 static inline bool ublksrv_queue_alloc_buf(const struct _ublksrv_queue *q)
 {
 	return !(q->state & UBLKSRV_ZERO_COPY);
@@ -225,9 +220,16 @@ static inline int ublksrv_queue_io_cmd(struct _ublksrv_queue *q,
 int ublksrv_complete_io(const struct ublksrv_queue *tq, unsigned tag, int res)
 {
 	struct _ublksrv_queue *q = tq_to_local(tq);
 	struct ublk_io *io = &q->ios[tag];
 
+	/* In batch mode, add to commit buffer instead of issuing individual cmd */
+	if (ublksrv_queue_batch_io(q)) {
+		ublksrv_batch_add_complete(q, tag, res);
+		io->flags = UBLKSRV_IO_FREE;
+		return 1;
+	}
+
 	ublksrv_mark_io_done(io, res);
 
 	return ublksrv_queue_io_cmd(q, io, tag);
@@ -327,12 +329,18 @@ int ublksrv_queue_send_event(const struct ublksrv_queue *tq)
  * Issue all available commands to /dev/ublkcN and the exact cmd is figured
  * out in queue_io_cmd with help of each io->status.
  *
- * todo: queue io commands with batching
+ * For batch mode, issue PREP_IO_CMDS once, then the multishot FETCH_IO_CMDS.
  */
 static void ublksrv_submit_fetch_commands(struct _ublksrv_queue *q)
 {
 	int i = 0;
 
+	if (ublksrv_queue_batch_io(q)) {
+		ublksrv_batch_start_fetch(q);
+		__ublksrv_queue_event(q);
+		return;
+	}
+
 	for (i = 0; i < q->q_depth; i++)
 		ublksrv_queue_io_cmd(q, &q->ios[i], i);
 
@@ -417,6 +425,10 @@ void ublksrv_queue_deinit(const struct ublksrv_queue *tq)
 	if (q->efd >= 0)
 		close(q->efd);
 
+	/* Free batch buffers before unregistering ring */
+	if (ublksrv_queue_batch_io(q))
+		ublksrv_batch_free_bufs(q);
+
 	io_uring_unregister_buffers(&q->ring);
 	io_uring_unregister_ring_fd(&q->ring);
@@ -737,6 +749,8 @@ const struct ublksrv_queue *ublksrv_queue_init_flags(const struct ublksrv_dev *t
 	/* polling logic should be implemented in ->handle_io_background() */
 	if (ctrl_dev->dev_info.ublksrv_flags & UBLKSRV_F_NEED_POLL)
 		q->state |= UBLKSRV_QUEUE_POLL;
+	if (ctrl_dev->dev_info.flags & UBLK_F_BATCH_IO)
+		q->state |= UBLKSRV_QUEUE_BATCH_IO;
 	q->q_id = q_id;
 	/* FIXME: depth has to be PO 2 */
 	q->q_depth = depth;
@@ -825,6 +839,18 @@ const struct ublksrv_queue *ublksrv_queue_init_flags(const struct ublksrv_dev *t
 
 	io_uring_register_ring_fd(&q->ring);
 
+	/* Allocate batch IO buffers if batch mode is enabled */
+	if (ublksrv_queue_batch_io(q)) {
+		ublk_dbg(UBLK_DBG_QUEUE, "ublk dev %d queue %d allocating batch bufs\n",
+				ctrl_dev->dev_info.dev_id, q->q_id);
+		ret = ublksrv_batch_alloc_bufs(q);
+		if (ret) {
+			ublk_err("ublk dev %d queue %d alloc batch bufs failed %d\n",
+					ctrl_dev->dev_info.dev_id, q->q_id, ret);
+			goto fail;
+		}
+	}
+
 	/*
 	 * N.B. PR_SET_IO_FLUSHER was added with Linux 5.6+.
 	 */
@@ -1038,6 +1064,10 @@ static void ublksrv_handle_cqe(struct io_uring *r,
 		return;
 	}
 
+	/* Handle batch IO commands */
+	if (ublksrv_queue_batch_io(q) && ublksrv_batch_handle_cqe(q, cqe, cmd_op))
+		return;
+
 	io = &q->ios[tag];
 	q->cmd_inflight--;
@@ -1170,6 +1200,10 @@ int ublksrv_process_io(const struct ublksrv_queue *tq)
 	if (__ublksrv_queue_is_done(q))
 		return -ENODEV;
 
+	/* Submit any pending batch commits before io_uring submit */
+	if (ublksrv_queue_batch_io(q))
+		ublksrv_batch_submit_commit(q);
+
 	ret = io_uring_submit_and_wait_timeout(&q->ring, &cqe, wait_nr, tsp, NULL);
 
 	ublksrv_reset_aio_batch(q);
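In the batch branch of ublksrv_complete_io() above, a completion costs only an in-memory append (16 bytes for queues that pass buffer addresses); returning 1 mirrors the non-batch path's "command issued" convention, while the real COMMIT_IO_CMDS goes out on the next ublksrv_process_io() pass. A self-contained sketch of what one appended element carries, using a local stand-in for struct ublk_batch_elem and hypothetical tag/result values:

    /* One commit element for tag 7 finishing a 4 KB transfer. */
    #include <stdint.h>
    #include <stdio.h>

    struct elem {               /* mirrors struct ublk_batch_elem */
        uint16_t tag;
        uint16_t buf_index;
        int32_t  result;
        uint64_t buf_addr;
    };

    int main(void)
    {
        static char io_buf[4096];   /* hypothetical per-tag data buffer */
        struct elem e = {
            .tag = 7,               /* request that completed */
            .result = 4096,         /* bytes transferred, or -errno */
            .buf_addr = (uint64_t)(uintptr_t)io_buf,
        };

        printf("tag=%u result=%d\n", e.tag, e.result);
        return 0;
    }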
diff --git a/lib/ublksrv_batch.c b/lib/ublksrv_batch.c
new file mode 100644
index 00000000..3f713893
--- /dev/null
+++ b/lib/ublksrv_batch.c
@@ -0,0 +1,445 @@
+// SPDX-License-Identifier: MIT or LGPL-2.1-only
+
+/*
+ * UBLK_F_BATCH_IO support for libublksrv
+ *
+ * Implements batch-based operations instead of per-tag FETCH/COMMIT_AND_FETCH:
+ * - UBLK_U_IO_PREP_IO_CMDS: One-time prep with buffer info
+ * - UBLK_U_IO_FETCH_IO_CMDS: Multishot fetch returning 2-byte tags
+ * - UBLK_U_IO_COMMIT_IO_CMDS: Batch commit of completed IOs
+ */
+
+#include <stdlib.h>
+#include <sys/mman.h>
+
+#include "ublksrv_priv.h"
+
+/* Compatibility definitions for older liburing versions */
+#ifndef IORING_URING_CMD_MULTISHOT
+#define IORING_URING_CMD_MULTISHOT	(1U << 1)
+#endif
+
+#ifndef IOU_PBUF_RING_INC
+#define IOU_PBUF_RING_INC	(1U << 1)
+#endif
+
+/* Calculate element size based on device flags (8 or 16 bytes) */
+static unsigned char ublk_batch_elem_buf_size(const struct _ublksrv_queue *q)
+{
+	/* Zero-copy, user-copy, and auto-buf-reg only need 8 bytes */
+	if (q->state & (UBLKSRV_ZERO_COPY | UBLKSRV_USER_COPY | UBLKSRV_AUTO_ZC))
+		return 8;
+
+	/* Need extra 8 bytes for buffer address */
+	return 16;
+}
+
+/* Calculate commit buffer size (page-aligned) */
+static unsigned int ublk_batch_commit_buf_size(const struct _ublksrv_queue *q)
+{
+	unsigned elem_size = ublk_batch_elem_buf_size(q);
+	unsigned int total = elem_size * q->q_depth;
+	unsigned int page_sz = getpagesize();
+
+	return round_up(total, page_sz);
+}
+
+/* Build user_data for batch commands */
+static inline __u64 build_batch_user_data(unsigned short buf_idx, unsigned op,
+		unsigned short nr_elem)
+{
+	/* Encode: tag=buf_idx, op=cmd_op, tgt_data=nr_elem (full 16 bits) */
+	return build_user_data(buf_idx, op, nr_elem, 0);
+}
+
+static inline unsigned short batch_user_data_to_nr_elem(__u64 user_data)
+{
+	return user_data_to_tgt_data(user_data);
+}
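The helpers above reuse the library's user_data encoding, with the buffer index in the tag slot and nr_elem in the tgt_data slot. A worked example of the packing, assuming libublksrv's usual layout (tag in bits 0-15, op in bits 16-23, tgt_data in bits 24-39) and purely hypothetical field values:

    /* user_data round-trip under the assumed bit layout. */
    #include <assert.h>
    #include <stdint.h>

    static uint64_t pack(uint16_t buf_idx, uint8_t op, uint16_t nr_elem)
    {
        return buf_idx | ((uint64_t)op << 16) | ((uint64_t)nr_elem << 24);
    }

    int main(void)
    {
        /* hypothetical: commit buffer 1, op nr 0x23, 64 elements */
        uint64_t data = pack(1, 0x23, 64);

        assert((data & 0xffff) == 1);           /* user_data_to_tag() */
        assert(((data >> 16) & 0xff) == 0x23);  /* user_data_to_op() */
        assert(((data >> 24) & 0xffff) == 64);  /* batch_user_data_to_nr_elem() */
        return 0;
    }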
+/* Allocate batch IO buffers for a queue */
+int ublksrv_batch_alloc_bufs(struct _ublksrv_queue *q)
+{
+	struct ublksrv_queue_batch *b = &q->batch;
+	unsigned int page_sz = getpagesize();
+	unsigned int fetch_buf_size;
+	void *buf;
+	int i, ret;
+
+	b->commit_buf_elem_size = ublk_batch_elem_buf_size(q);
+	b->commit_buf_size = ublk_batch_commit_buf_size(q);
+
+	/* Allocate commit buffer memory */
+	ret = posix_memalign(&buf, page_sz,
+			b->commit_buf_size * UBLK_BATCH_NR_COMMIT_BUFS);
+	if (ret || !buf)
+		return -ENOMEM;
+
+	b->commit_buf_mem = buf;
+
+	/* Lock commit buffer pages for fast access */
+	if (mlock(b->commit_buf_mem,
+			b->commit_buf_size * UBLK_BATCH_NR_COMMIT_BUFS))
+		ublk_err("%s: can't lock commit buffer: %s\n", __func__,
+				strerror(errno));
+
+	/* Setup commit buffer state */
+	for (i = 0; i < UBLK_BATCH_NR_COMMIT_BUFS; i++) {
+		b->commit_bufs[i].buf = (char *)buf + i * b->commit_buf_size;
+		b->commit_bufs[i].done = 0;
+		b->commit_bufs[i].count = b->commit_buf_size / b->commit_buf_elem_size;
+	}
+
+	/* Allocate fetch buffers (page-aligned) */
+	fetch_buf_size = round_up(q->q_depth * 2, page_sz);
+	for (i = 0; i < UBLK_BATCH_NR_FETCH_BUFS; i++) {
+		b->fetch_bufs[i].fetch_buf_size = fetch_buf_size;
+
+		ret = posix_memalign((void **)&b->fetch_bufs[i].fetch_buf,
+				page_sz, fetch_buf_size);
+		if (ret || !b->fetch_bufs[i].fetch_buf)
+			goto fail_fetch;
+
+		/* Lock fetch buffer pages for fast fetching */
+		if (mlock(b->fetch_bufs[i].fetch_buf, fetch_buf_size))
+			ublk_err("%s: can't lock fetch buffer: %s\n", __func__,
+					strerror(errno));
+
+		/* Setup buffer ring for multishot fetch */
+		b->fetch_bufs[i].br = io_uring_setup_buf_ring(&q->ring, 1,
+				i, IOU_PBUF_RING_INC, &ret);
+		if (!b->fetch_bufs[i].br) {
+			ublk_err("%s: qid %d buffer ring %d setup failed: %d (%s)\n",
+					__func__, q->q_id, i, ret, strerror(-ret));
+			free(b->fetch_bufs[i].fetch_buf);
+			goto fail_fetch;
+		}
+		ublk_dbg(UBLK_DBG_QUEUE, "%s: qid %d buffer ring %d setup ok\n",
+				__func__, q->q_id, i);
+		b->fetch_bufs[i].fetch_buf_off = 0;
+	}
+
+	/* Determine batch command flags */
+	b->cmd_flags = 0;
+	if (ublksrv_queue_use_buf(q))
+		b->cmd_flags |= UBLK_BATCH_F_HAS_BUF_ADDR;
+
+	b->cur_commit_buf = 0;
+	b->prep_done = 0;
+
+	ublk_dbg(UBLK_DBG_QUEUE, "%s: q%d elem_size=%u buf_size=%u flags=%x\n",
+			__func__, q->q_id, b->commit_buf_elem_size,
+			b->commit_buf_size, b->cmd_flags);
+
+	return 0;
+
+fail_fetch:
+	while (--i >= 0) {
+		io_uring_free_buf_ring(&q->ring, b->fetch_bufs[i].br, 1, i);
+		munlock(b->fetch_bufs[i].fetch_buf, b->fetch_bufs[i].fetch_buf_size);
+		free(b->fetch_bufs[i].fetch_buf);
+	}
+	munlock(b->commit_buf_mem,
+			b->commit_buf_size * UBLK_BATCH_NR_COMMIT_BUFS);
+	free(b->commit_buf_mem);
+	b->commit_buf_mem = NULL;
+	return -ENOMEM;
+}
+
+/* Free batch IO buffers */
+void ublksrv_batch_free_bufs(struct _ublksrv_queue *q)
+{
+	struct ublksrv_queue_batch *b = &q->batch;
+	int i;
+
+	for (i = 0; i < UBLK_BATCH_NR_FETCH_BUFS; i++) {
+		if (b->fetch_bufs[i].br) {
+			io_uring_free_buf_ring(&q->ring, b->fetch_bufs[i].br, 1, i);
+			b->fetch_bufs[i].br = NULL;
+		}
+		if (b->fetch_bufs[i].fetch_buf) {
+			munlock(b->fetch_bufs[i].fetch_buf,
+					b->fetch_bufs[i].fetch_buf_size);
+			free(b->fetch_bufs[i].fetch_buf);
+			b->fetch_bufs[i].fetch_buf = NULL;
+		}
+	}
+
+	if (b->commit_buf_mem) {
+		munlock(b->commit_buf_mem,
+				b->commit_buf_size * UBLK_BATCH_NR_COMMIT_BUFS);
+		free(b->commit_buf_mem);
+		b->commit_buf_mem = NULL;
+	}
+}
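One consequence of page-aligning the fetch buffer: a single multishot FETCH_IO_CMDS can report far more tags than the queue depth, because nr_elem is later derived from the rounded-up buffer size (fetch_buf_size / 2) rather than from q_depth. Worked numbers, again assuming depth 128 and 4 KB pages:

    /* Fetch-buffer sizing; depth and page size are assumed. */
    #include <assert.h>

    #define ROUND_UP(v, a)  (((v) + (a) - 1) / (a) * (a))

    int main(void)
    {
        unsigned q_depth = 128;                          /* assumed */
        unsigned page_sz = 4096;                         /* assumed */
        unsigned buf = ROUND_UP(q_depth * 2, page_sz);   /* 256 -> 4096 */
        unsigned nr_elem = buf / 2;                      /* tags per fetch cmd */

        assert(buf == 4096);
        assert(nr_elem == 2048);    /* well above q_depth */
        return 0;
    }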
+/*
+ * Common helper for issuing PREP or COMMIT batch commands.
+ * Returns the allocated SQE, or NULL on failure.
+ */
+static struct io_uring_sqe *ublksrv_batch_io_cmd(struct _ublksrv_queue *q,
+		unsigned int cmd_op, void *buf,
+		unsigned short buf_idx, unsigned short nr_elem)
+{
+	struct ublksrv_queue_batch *b = &q->batch;
+	struct io_uring_sqe *sqe;
+	struct ublk_batch_io *cmd;
+
+	sqe = ublksrv_alloc_sqe(&q->ring);
+	if (!sqe)
+		return NULL;
+
+	cmd = (struct ublk_batch_io *)ublksrv_get_sqe_cmd(sqe);
+
+	ublksrv_set_sqe_cmd_op(sqe, cmd_op);
+	sqe->fd = 0;
+	sqe->opcode = IORING_OP_URING_CMD;
+	sqe->flags = IOSQE_FIXED_FILE;
+	sqe->addr = (__u64)buf;
+	sqe->len = b->commit_buf_elem_size * nr_elem;
+
+	cmd->q_id = q->q_id;
+	cmd->flags = b->cmd_flags;
+	cmd->nr_elem = nr_elem;
+	cmd->elem_bytes = b->commit_buf_elem_size;
+	cmd->reserved = 0;
+	cmd->reserved2 = 0;
+
+	io_uring_sqe_set_data64(sqe, build_batch_user_data(buf_idx,
+			_IOC_NR(cmd_op), nr_elem));
+
+	q->cmd_inflight++;
+
+	return sqe;
+}
+
+/* Issue UBLK_U_IO_PREP_IO_CMDS - one-time setup */
+static int ublksrv_batch_queue_prep_io_cmds(struct _ublksrv_queue *q)
+{
+	struct ublksrv_queue_batch *b = &q->batch;
+	struct batch_commit_buf *cb = &b->commit_bufs[0];
+	unsigned short nr_elem = q->q_depth;
+	int i;
+
+	/* Fill prep buffer with all tags and buffer info */
+	for (i = 0; i < nr_elem; i++) {
+		struct ublk_batch_elem *elem = (struct ublk_batch_elem *)
+			((char *)cb->buf + i * b->commit_buf_elem_size);
+
+		elem->tag = i;
+		elem->result = 0;
+
+		if (q->state & UBLKSRV_AUTO_ZC)
+			elem->buf_index = i;
+		else if (ublksrv_queue_use_buf(q))
+			elem->buf_addr = (__u64)q->ios[i].buf_addr;
+	}
+
+	if (!ublksrv_batch_io_cmd(q, UBLK_U_IO_PREP_IO_CMDS, cb->buf, 0, nr_elem)) {
+		ublk_err("%s: run out of sqe qid %d\n", __func__, q->q_id);
+		return -1;
+	}
+
+	ublk_dbg(UBLK_DBG_IO_CMD, "%s: qid %d nr_elem %u elem_bytes %u\n",
+			__func__, q->q_id, nr_elem, b->commit_buf_elem_size);
+
+	return 0;
+}
+
+/* Issue UBLK_U_IO_FETCH_IO_CMDS (multishot) */
+static void ublksrv_batch_queue_fetch(struct _ublksrv_queue *q, int buf_idx)
+{
+	struct batch_fetch_buf *fb = &q->batch.fetch_bufs[buf_idx];
+	struct io_uring_sqe *sqe;
+	struct ublk_batch_io *cmd;
+	unsigned short nr_elem = fb->fetch_buf_size / 2;
+
+	/* Add buffer to the buffer ring */
+	io_uring_buf_ring_add(fb->br, fb->fetch_buf, fb->fetch_buf_size, 0, 0, 0);
+	io_uring_buf_ring_advance(fb->br, 1);
+
+	sqe = ublksrv_alloc_sqe(&q->ring);
+	if (!sqe) {
+		ublk_err("%s: run out of sqe qid %d\n", __func__, q->q_id);
+		return;
+	}
+
+	cmd = (struct ublk_batch_io *)ublksrv_get_sqe_cmd(sqe);
+
+	ublksrv_set_sqe_cmd_op(sqe, UBLK_U_IO_FETCH_IO_CMDS);
+	sqe->fd = 0;
+	sqe->opcode = IORING_OP_URING_CMD;
+	sqe->flags = IOSQE_FIXED_FILE | IOSQE_BUFFER_SELECT;
+	sqe->rw_flags = IORING_URING_CMD_MULTISHOT;
+	sqe->buf_group = buf_idx;
+
+	cmd->q_id = q->q_id;
+	cmd->flags = 0;
+	cmd->nr_elem = nr_elem;
+	cmd->elem_bytes = 2;	/* Each tag is 2 bytes */
+	cmd->reserved = 0;
+	cmd->reserved2 = 0;
+
+	io_uring_sqe_set_data64(sqe, build_batch_user_data(buf_idx,
+			_IOC_NR(UBLK_U_IO_FETCH_IO_CMDS), nr_elem));
+
+	q->cmd_inflight++;
+	fb->fetch_buf_off = 0;
+
+	ublk_dbg(UBLK_DBG_IO_CMD, "%s: qid %d buf_idx %d nr_elem %u\n",
+			__func__, q->q_id, buf_idx, nr_elem);
+}
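FETCH completions deliver plain arrays of 2-byte tags: each CQE's res is a byte count, and with the incremental buffer ring the running offset (fetch_buf_off) continues where the previous CQE stopped. ublksrv_batch_handle_fetch_cqe() below walks the buffer essentially like this toy decoder (tag values hypothetical):

    /* Decode one fetch CQE worth of 2-byte tags from a provided buffer. */
    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    int main(void)
    {
        uint16_t tags[] = { 3, 0, 7 };      /* what the kernel delivered */
        char fetch_buf[64];
        unsigned off = 0;                   /* fetch_buf_off analogue */
        unsigned res = sizeof(tags);        /* cqe->res: bytes written */

        memcpy(fetch_buf, tags, sizeof(tags));

        for (unsigned i = off; i < off + res; i += 2) {
            uint16_t tag;

            memcpy(&tag, fetch_buf + i, sizeof(tag));
            printf("dispatch handle_io_async() for tag %u\n", tag);
        }
        off += res;                         /* next CQE resumes here */
        printf("next offset %u\n", off);
        return 0;
    }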
+/* Issue initial batch fetch commands */
+void ublksrv_batch_start_fetch(struct _ublksrv_queue *q)
+{
+	int i;
+
+	/* Issue one-time prep command */
+	ublksrv_batch_queue_prep_io_cmds(q);
+
+	/* Issue the multishot fetch command for each fetch buffer */
+	for (i = 0; i < UBLK_BATCH_NR_FETCH_BUFS; i++)
+		ublksrv_batch_queue_fetch(q, i);
+
+	q->batch.prep_done = 1;
+}
+
+/* Submit commit command and swap to other buffer */
+void ublksrv_batch_submit_commit(struct _ublksrv_queue *q)
+{
+	struct ublksrv_queue_batch *b = &q->batch;
+	struct batch_commit_buf *cb = &b->commit_bufs[b->cur_commit_buf];
+	unsigned short nr_elem = cb->done;
+
+	/* Nothing to commit */
+	if (nr_elem == 0)
+		return;
+
+	if (!ublksrv_batch_io_cmd(q, UBLK_U_IO_COMMIT_IO_CMDS, cb->buf,
+			b->cur_commit_buf, nr_elem)) {
+		ublk_err("%s: run out of sqe qid %d\n", __func__, q->q_id);
+		return;
+	}
+
+	ublk_dbg(UBLK_DBG_IO_CMD, "%s: qid %d buf %d nr_elem %u\n",
+			__func__, q->q_id, b->cur_commit_buf, nr_elem);
+
+	/* Swap to the other buffer */
+	b->cur_commit_buf = 1 - b->cur_commit_buf;
+	b->commit_bufs[b->cur_commit_buf].done = 0;
+}
+
+/* Handle fetch CQE - call handle_io_async for each tag */
+static void ublksrv_batch_handle_fetch_cqe(struct _ublksrv_queue *q,
+		struct io_uring_cqe *cqe)
+{
+	unsigned short buf_idx = user_data_to_tag(cqe->user_data);
+	struct batch_fetch_buf *fb = &q->batch.fetch_bufs[buf_idx];
+	unsigned start, end;
+	unsigned i;
+
+	if (cqe->res < 0) {
+		if (cqe->res != -ENOBUFS)
+			ublk_err("%s: qid %d fetch failed: %d\n",
+					__func__, q->q_id, cqe->res);
+		return;
+	}
+
+	start = fb->fetch_buf_off;
+	end = start + cqe->res;
+
+	/* Process each 2-byte tag in the buffer */
+	for (i = start; i < end; i += 2) {
+		unsigned short tag = *(unsigned short *)
+			((char *)fb->fetch_buf + i);
+
+		if (tag >= q->q_depth) {
+			ublk_err("%s: qid %d invalid tag %u\n",
+					__func__, q->q_id, tag);
+			continue;
+		}
+
+		q->tgt_ops->handle_io_async(local_to_tq(q), &q->ios[tag].data);
+	}
+
+	fb->fetch_buf_off = end;
+
+	ublk_dbg(UBLK_DBG_IO_CMD, "%s: qid %d buf %d processed %u tags\n",
+			__func__, q->q_id, buf_idx, (end - start) / 2);
+}
+
+/* Handle prep/commit CQE completion */
+static void ublksrv_batch_handle_commit_cqe(struct _ublksrv_queue *q,
+		struct io_uring_cqe *cqe, unsigned op)
+{
+	struct ublksrv_queue_batch *b = &q->batch;
+	unsigned short buf_idx = user_data_to_tag(cqe->user_data);
+
+	if (op == _IOC_NR(UBLK_U_IO_PREP_IO_CMDS)) {
+		if (cqe->res != 0)
+			ublk_err("%s: qid %d prep failed: %d\n",
+					__func__, q->q_id, cqe->res);
+		/* Prep buffer (commit_bufs[0]) is no longer needed for prep */
+	} else if (op == _IOC_NR(UBLK_U_IO_COMMIT_IO_CMDS)) {
+		unsigned short nr_elem = batch_user_data_to_nr_elem(cqe->user_data);
+		unsigned expected = b->commit_buf_elem_size * nr_elem;
+
+		if (cqe->res < 0) {
+			ublk_err("%s: qid %d commit failed: %d\n",
+					__func__, q->q_id, cqe->res);
+		} else if (cqe->res != (int)expected) {
+			ublk_err("%s: qid %d commit partial: %d/%u\n",
+					__func__, q->q_id, cqe->res, expected);
+		}
+
+		ublk_dbg(UBLK_DBG_IO_CMD, "%s: qid %d buf %d commit done\n",
+				__func__, q->q_id, buf_idx);
+	}
+}
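The swap in ublksrv_batch_submit_commit() above is what lets completions keep accumulating while the just-submitted buffer is still owned by the kernel: done is reset only on the buffer that becomes active, never on the one in flight. A stripped-down model of the hand-off:

    /* Double-buffered commit hand-off in miniature. */
    #include <stdio.h>

    struct cbuf { unsigned short done; };

    int main(void)
    {
        struct cbuf bufs[2] = { { 0 }, { 0 } };
        unsigned cur = 0;

        bufs[cur].done = 3;         /* three completions accumulated */

        /* submit: bufs[cur] belongs to the kernel until its CQE arrives */
        unsigned in_flight = cur;
        cur = 1 - cur;              /* swap */
        bufs[cur].done = 0;         /* new completions start clean here */

        printf("buf %u in flight, appending to buf %u\n", in_flight, cur);
        return 0;
    }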
+/*
+ * Handle batch IO CQE - returns true if handled, false to fall through
+ * to normal (non-batch) CQE handling.
+ */
+bool ublksrv_batch_handle_cqe(struct _ublksrv_queue *q,
+		struct io_uring_cqe *cqe, unsigned cmd_op)
+{
+	switch (cmd_op) {
+	case _IOC_NR(UBLK_U_IO_FETCH_IO_CMDS):
+		/*
+		 * For multishot FETCH, only decrement cmd_inflight when
+		 * the multishot actually ends (no MORE flag or error).
+		 * While multishot is active, each CQE doesn't consume
+		 * the command - it stays in flight.
+		 */
+		if (cqe->res < 0 && cqe->res != -ENOBUFS) {
+			/* Error - multishot ended */
+			q->cmd_inflight--;
+			q->state |= UBLKSRV_QUEUE_STOPPING;
+		} else {
+			ublksrv_batch_handle_fetch_cqe(q, cqe);
+
+			/* Rearm multishot if it stopped */
+			if (!(cqe->flags & IORING_CQE_F_MORE) ||
+					cqe->res == -ENOBUFS) {
+				unsigned short buf_idx = user_data_to_tag(cqe->user_data);
+
+				q->cmd_inflight--;
+				ublksrv_batch_queue_fetch(q, buf_idx);
+			}
+			/* If MORE flag is set, multishot still active - don't decrement */
+		}
+		return true;
+
+	case _IOC_NR(UBLK_U_IO_PREP_IO_CMDS):
+	case _IOC_NR(UBLK_U_IO_COMMIT_IO_CMDS):
+		q->cmd_inflight--;
+		ublksrv_batch_handle_commit_cqe(q, cqe, cmd_op);
+		return true;
+	}
+
+	/* Not a batch command - fall through to normal handling */
+	return false;
+}
diff --git a/targets/ublksrv_tgt.cpp b/targets/ublksrv_tgt.cpp
index 18691cd3..e8e1dad1 100644
--- a/targets/ublksrv_tgt.cpp
+++ b/targets/ublksrv_tgt.cpp
@@ -346,6 +346,7 @@ static int ublksrv_parse_add_opts(struct ublksrv_dev_data *data, int *efd, int a
 	int user_recovery_reissue = 0;
 	int unprivileged = 0;
 	int zero_copy = 0;
+	int batch_io = 0;
 	int option_index = 0;
 	unsigned int debug_mask = 0;
 	static const struct option longopts[] = {
@@ -364,6 +365,7 @@ static int ublksrv_parse_add_opts(struct ublksrv_dev_data *data, int *efd, int a
 		{ "eventfd",		1,	NULL, 0},
 		{ "max_io_buf_bytes",	1,	NULL, 0},
 		{ "zerocopy",		0,	NULL, 'z'},
+		{ "batch-io",		0,	NULL, 'b'},
 		{ NULL }
 	};
 
@@ -375,7 +377,7 @@ static int ublksrv_parse_add_opts(struct ublksrv_dev_data *data, int *efd, int a
 	mkpath(data->run_dir);
 
-	while ((opt = getopt_long(argc, argv, "-:t:n:d:q:u:g:r:e:i:z",
+	while ((opt = getopt_long(argc, argv, "-:t:n:d:q:u:g:r:e:i:zb",
 			longopts, &option_index)) != -1) {
 		switch (opt) {
 		case 'n':
@@ -387,6 +389,9 @@ static int ublksrv_parse_add_opts(struct ublksrv_dev_data *data, int *efd, int a
 		case 'z':
 			zero_copy = 1;
 			break;
+		case 'b':
+			batch_io = 1;
+			break;
 		case 'q':
 			data->nr_hw_queues = strtol(optarg, NULL, 10);
 			break;
@@ -441,6 +446,8 @@ static int ublksrv_parse_add_opts(struct ublksrv_dev_data *data, int *efd, int a
 		data->flags |= UBLK_F_UNPRIVILEGED_DEV;
 	if (zero_copy)
 		data->flags |= UBLK_F_SUPPORT_ZERO_COPY;
+	if (batch_io)
+		data->flags |= UBLK_F_BATCH_IO;
 
 	ublk_set_debug_mask(debug_mask);
 
@@ -452,6 +459,7 @@ static void ublksrv_print_std_opts(void)
 	printf("\t-n DEV_ID -q NR_HW_QUEUES -d QUEUE_DEPTH\n");
 	printf("\t-u URING_COMP -g NEED_GET_DATA -r USER_RECOVERY\n");
 	printf("\t-i USER_RECOVERY_REISSUE -e USER_RECOVERY_FAIL_IO\n");
+	printf("\t-b | --batch-io (enable batch IO mode)\n");
 	printf("\t--debug_mask=0x{DBG_MASK} --unprivileged\n");
 }
 
@@ -488,16 +496,26 @@ static int ublksrv_cmd_dev_add(const struct ublksrv_tgt_type *tgt_type, int argc
 		goto fail_send_event;
 	}
 
-	if (data.flags & UBLK_F_SUPPORT_ZERO_COPY) {
+	if (data.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_BATCH_IO)) {
 		__u64 features = 0;
 
 		ret = ublksrv_ctrl_get_features(dev, &features);
 		if (ret)
 			return ret;
-		if (!(features & UBLK_F_SUPPORT_ZERO_COPY))
+
+		if ((data.flags & UBLK_F_SUPPORT_ZERO_COPY) &&
+		    !(features & UBLK_F_SUPPORT_ZERO_COPY))
+			return -ENOTSUP;
+
+		if ((data.flags & UBLK_F_BATCH_IO) &&
+		    !(features & UBLK_F_BATCH_IO)) {
+			fprintf(stderr, "UBLK_F_BATCH_IO not supported by kernel\n");
 			return -ENOTSUP;
+		}
+
 		/* disable UBLK_F_AUTO_BUF_REG if it isn't supported yet */
-		if (!(features & UBLK_F_AUTO_BUF_REG)) {
+		if ((data.flags & UBLK_F_SUPPORT_ZERO_COPY) &&
+		    !(features & UBLK_F_AUTO_BUF_REG)) {
 			data.flags &= ~UBLK_F_AUTO_BUF_REG;
 			ublksrv_ctrl_deinit(dev);
 			dev = ublksrv_ctrl_init(&data);
diff --git a/tests/loop/012 b/tests/loop/012
new file mode 100755
index 00000000..64de5881
--- /dev/null
+++ b/tests/loop/012
@@ -0,0 +1,12 @@
+#!/bin/bash
+# SPDX-License-Identifier: MIT or GPL-2.0-only
+
+. common/fio_common
+. common/loop_common
+
+file=`_create_loop_image "data" $LO_IMG_SZ`
+export T_TYPE_PARAMS="-t loop -q 2 -b -f $file"
+
+__run_dev_perf 2
+
+_remove_loop_image $file
diff --git a/tests/nbd/024 b/tests/nbd/024
new file mode 100755
index 00000000..797225ff
--- /dev/null
+++ b/tests/nbd/024
@@ -0,0 +1,15 @@
+#!/bin/bash
+# SPDX-License-Identifier: MIT or GPL-2.0
+
+. common/fio_common
+. common/nbd_common
+
+echo "run perf test via ublk-nbd(nbd server: $NBDSRV:nbdkit memory $NBD_SIZE)"
+
+file=`_create_image "nbd" "none" $NBD_SIZE`
+
+export T_TYPE_PARAMS="-t nbd -q 2 -d 127 -b --host $NBDSRV"
+__run_dev_perf 1
+
+_remove_image "nbd" $file
+
diff --git a/tests/null/012 b/tests/null/012
new file mode 100755
index 00000000..b651458d
--- /dev/null
+++ b/tests/null/012
@@ -0,0 +1,8 @@
+#!/bin/bash
+# SPDX-License-Identifier: MIT or GPL-2.0-only
+
+. common/fio_common
+
+export T_TYPE_PARAMS="-t null -q 2 -b"
+
+__run_dev_perf 2
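With these tests in place, batch mode stays opt-in end to end: -b (or --batch-io) on the add command sets UBLK_F_BATCH_IO, device creation fails with -ENOTSUP when the running kernel does not advertise the feature, and the new loop/nbd/null cases simply add -b to T_TYPE_PARAMS. So an invocation along the lines of `ublk add -t null -q 2 -b` (binary name depending on how the tree is installed) is expected to either come up in batch mode or fail cleanly on older kernels.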