Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/ublksrv.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ struct ublk_io_data {
#define UBLKSRV_ZERO_COPY (1U << 4)
#define UBLKSRV_AUTO_ZC (1U << 5)
#define UBLKSRV_QUEUE_POLL (1U << 6)
#define UBLKSRV_QUEUE_BATCH_IO (1U << 7)

/**
* ublksrv_queue is 1:1 mapping with ublk driver's blk-mq queue, and
Expand Down
95 changes: 94 additions & 1 deletion include/ublksrv_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,54 @@
#include "ublksrv.h"
#include "ublksrv_aio.h"

/*
* Batch IO support structures
*
* When UBLK_F_BATCH_IO is set, the queue uses batch-based operations instead
* of per-tag FETCH/COMMIT_AND_FETCH:
* - UBLK_U_IO_PREP_IO_CMDS: One-time prep with buffer info
* - UBLK_U_IO_FETCH_IO_CMDS: Multishot fetch returning 2-byte tags
* - UBLK_U_IO_COMMIT_IO_CMDS: Batch commit of completed IOs
*/

/* Batch element - aligned with kernel's ublk_elem_header (8 or 16 bytes) */
struct ublk_batch_elem {
__u16 tag;
__u16 buf_index;
__s32 result;
__u64 buf_addr; /* Only used if !UBLK_F_USER_COPY && !UBLK_F_SUPPORT_ZERO_COPY */
};

/* Per-queue commit buffer state (2 per queue for double-buffering) */
struct batch_commit_buf {
void *buf; /* Points to commit buffer memory */
unsigned short done; /* Number of IOs added to this batch */
unsigned short count; /* Max capacity of this buffer */
};

/* Per-queue fetch buffer with io_uring buffer ring (2 per queue) */
struct batch_fetch_buf {
struct io_uring_buf_ring *br;
void *fetch_buf;
unsigned int fetch_buf_size;
unsigned int fetch_buf_off;
};

/* Number of fetch/commit buffers for double-buffering */
#define UBLK_BATCH_NR_FETCH_BUFS 1
#define UBLK_BATCH_NR_COMMIT_BUFS 2

/* Per-queue batch IO state - allocated only when UBLK_F_BATCH_IO is set */
struct ublksrv_queue_batch {
struct batch_fetch_buf fetch_bufs[UBLK_BATCH_NR_FETCH_BUFS];
struct batch_commit_buf commit_bufs[UBLK_BATCH_NR_COMMIT_BUFS];
void *commit_buf_mem; /* Allocated commit buffer memory */
unsigned int commit_buf_size; /* Size of each commit buffer */
unsigned char commit_buf_elem_size; /* Size of each element (8 or 16) */
unsigned char cur_commit_buf; /* Index of current active commit buffer (0 or 1) */
unsigned char prep_done; /* PREP_IO_CMDS has been issued */
__u16 cmd_flags; /* Flags for batch commands */
};

/* todo: relace the hardcode name with /dev/char/maj:min */
#ifdef UBLKC_PREFIX
Expand Down Expand Up @@ -154,7 +202,10 @@ struct _ublksrv_queue {
int nr_ctxs;
struct ublksrv_aio_ctx *ctxs[UBLKSRV_NR_CTX_BATCH];

unsigned long reserved[8];
/* Batch IO support - only used when UBLK_F_BATCH_IO is set */
struct ublksrv_queue_batch batch;

unsigned long reserved[4];

struct ublk_io ios[0];
};
Expand Down Expand Up @@ -233,6 +284,48 @@ int create_pid_file(const char *pid_file, int *pid_fd);

extern void ublksrv_build_cpu_str(char *buf, int len, const cpu_set_t *cpuset);

/* Check if queue needs to pass buffer addresses (not zero-copy or user-copy) */
static inline bool ublksrv_queue_use_buf(const struct _ublksrv_queue *q)
{
return !(q->state & (UBLKSRV_USER_COPY | UBLKSRV_ZERO_COPY));
}

/* Batch IO support - check if queue uses batch mode */
static inline bool ublksrv_queue_batch_io(const struct _ublksrv_queue *q)
{
return !!(q->state & UBLKSRV_QUEUE_BATCH_IO);
}

/* Batch IO functions (implemented in ublksrv_batch.c) */
int ublksrv_batch_alloc_bufs(struct _ublksrv_queue *q);
void ublksrv_batch_free_bufs(struct _ublksrv_queue *q);
void ublksrv_batch_start_fetch(struct _ublksrv_queue *q);
void ublksrv_batch_submit_commit(struct _ublksrv_queue *q);
bool ublksrv_batch_handle_cqe(struct _ublksrv_queue *q,
struct io_uring_cqe *cqe, unsigned cmd_op);

/* Add completed IO to current commit buffer (inline for fast path) */
static inline void ublksrv_batch_add_complete(struct _ublksrv_queue *q,
unsigned tag, int result)
{
struct ublksrv_queue_batch *b = &q->batch;
struct batch_commit_buf *cb = &b->commit_bufs[b->cur_commit_buf];
struct ublk_batch_elem *elem;

elem = (struct ublk_batch_elem *)
((char *)cb->buf + cb->done * b->commit_buf_elem_size);

elem->tag = tag;
elem->result = result;

if (q->state & UBLKSRV_AUTO_ZC)
elem->buf_index = tag;
else if (ublksrv_queue_use_buf(q))
elem->buf_addr = (__u64)q->ios[tag].buf_addr;

cb->done++;
}

/*
* bit63: target io, bit62: internal data.
*
Expand Down
1 change: 1 addition & 0 deletions lib/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ libublksrv_la_SOURCES = \
ublksrv_cmd.c \
ublksrv_json.cpp \
ublksrv.c \
ublksrv_batch.c \
utils.c \
ublksrv_aio.c
libublksrv_la_CFLAGS = \
Expand Down
47 changes: 40 additions & 7 deletions lib/ublksrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,6 @@ static void ublksrv_tgt_deinit(struct _ublksrv_dev *dev)
tgt->ops->deinit_tgt(local_to_tdev(dev));
}

static inline bool ublksrv_queue_use_buf(const struct _ublksrv_queue *q)
{
return !(q->state & (UBLKSRV_USER_COPY | UBLKSRV_ZERO_COPY));
}

static inline bool ublksrv_queue_alloc_buf(const struct _ublksrv_queue *q)
{
return !(q->state & UBLKSRV_ZERO_COPY);
Expand Down Expand Up @@ -225,9 +220,15 @@ static inline int ublksrv_queue_io_cmd(struct _ublksrv_queue *q,
int ublksrv_complete_io(const struct ublksrv_queue *tq, unsigned tag, int res)
{
struct _ublksrv_queue *q = tq_to_local(tq);

struct ublk_io *io = &q->ios[tag];

/* In batch mode, add to commit buffer instead of issuing individual cmd */
if (ublksrv_queue_batch_io(q)) {
ublksrv_batch_add_complete(q, tag, res);
io->flags = UBLKSRV_IO_FREE;
return 1;
}

ublksrv_mark_io_done(io, res);

return ublksrv_queue_io_cmd(q, io, tag);
Expand Down Expand Up @@ -327,12 +328,18 @@ int ublksrv_queue_send_event(const struct ublksrv_queue *tq)
* Issue all available commands to /dev/ublkcN and the exact cmd is figured
* out in queue_io_cmd with help of each io->status.
*
* todo: queue io commands with batching
* For batch mode, issue PREP_IO_CMDS once, then two multishot FETCH_IO_CMDS.
*/
static void ublksrv_submit_fetch_commands(struct _ublksrv_queue *q)
{
int i = 0;

if (ublksrv_queue_batch_io(q)) {
ublksrv_batch_start_fetch(q);
__ublksrv_queue_event(q);
return;
}

for (i = 0; i < q->q_depth; i++)
ublksrv_queue_io_cmd(q, &q->ios[i], i);

Expand Down Expand Up @@ -417,6 +424,10 @@ void ublksrv_queue_deinit(const struct ublksrv_queue *tq)
if (q->efd >= 0)
close(q->efd);

/* Free batch buffers before unregistering ring */
if (ublksrv_queue_batch_io(q))
ublksrv_batch_free_bufs(q);

io_uring_unregister_buffers(&q->ring);
io_uring_unregister_ring_fd(&q->ring);

Expand Down Expand Up @@ -737,6 +748,8 @@ const struct ublksrv_queue *ublksrv_queue_init_flags(const struct ublksrv_dev *t
/* polling logic should be implemented in ->handle_io_background() */
if (ctrl_dev->dev_info.ublksrv_flags & UBLKSRV_F_NEED_POLL)
q->state |= UBLKSRV_QUEUE_POLL;
if (ctrl_dev->dev_info.flags & UBLK_F_BATCH_IO)
q->state |= UBLKSRV_QUEUE_BATCH_IO;
q->q_id = q_id;
/* FIXME: depth has to be PO 2 */
q->q_depth = depth;
Expand Down Expand Up @@ -825,6 +838,18 @@ const struct ublksrv_queue *ublksrv_queue_init_flags(const struct ublksrv_dev *t

io_uring_register_ring_fd(&q->ring);

/* Allocate batch IO buffers if batch mode is enabled */
if (ublksrv_queue_batch_io(q)) {
ublk_dbg(UBLK_DBG_QUEUE, "ublk dev %d queue %d allocating batch bufs\n",
ctrl_dev->dev_info.dev_id, q->q_id);
ret = ublksrv_batch_alloc_bufs(q);
if (ret) {
ublk_err("ublk dev %d queue %d alloc batch bufs failed %d\n",
ctrl_dev->dev_info.dev_id, q->q_id, ret);
goto fail;
}
}

/*
* N.B. PR_SET_IO_FLUSHER was added with Linux 5.6+.
*/
Expand Down Expand Up @@ -1038,6 +1063,10 @@ static void ublksrv_handle_cqe(struct io_uring *r,
return;
}

/* Handle batch IO commands */
if (ublksrv_queue_batch_io(q) && ublksrv_batch_handle_cqe(q, cqe, cmd_op))
return;

io = &q->ios[tag];
q->cmd_inflight--;

Expand Down Expand Up @@ -1170,6 +1199,10 @@ int ublksrv_process_io(const struct ublksrv_queue *tq)
if (__ublksrv_queue_is_done(q))
return -ENODEV;

/* Submit any pending batch commits before io_uring submit */
if (ublksrv_queue_batch_io(q))
ublksrv_batch_submit_commit(q);

ret = io_uring_submit_and_wait_timeout(&q->ring, &cqe, wait_nr, tsp, NULL);

ublksrv_reset_aio_batch(q);
Expand Down
Loading
Loading