}
/** @} nrs */
+static inline bool req_capsule_ptlreq(struct req_capsule *pill)
+{
+ struct ptlrpc_request *req = pill->rc_req;
+
+ /* True only when this pill is the top-level pill embedded in its
+ * owning ptlrpc_request (i.e. not a batch sub-request pill).
+ */
+ return req != NULL && pill == &req->rq_pill;
+}
+
+static inline bool req_capsule_subreq(struct req_capsule *pill)
+{
+ struct ptlrpc_request *req = pill->rc_req;
+
+ /* Logical negation of req_capsule_ptlreq(): the pill is either
+ * detached from any request or is a sub-request pill of a batch RPC.
+ */
+ return req == NULL || pill != &req->rq_pill;
+}
+
+/**
+ * Returns true if the request needs to be swabbed into local cpu byteorder.
+ *
+ * The swabbed state is always tracked on the top-level request pill
+ * (&req->rq_pill), never on a sub-request pill, hence the indirection
+ * through pill->rc_req.
+ */
+static inline bool req_capsule_req_need_swab(struct req_capsule *pill)
+{
+ struct ptlrpc_request *req = pill->rc_req;
+
+ return req && req_capsule_req_swabbed(&req->rq_pill,
+ MSG_PTLRPC_HEADER_OFF);
+}
+
+/**
+ * Returns true if the request reply needs to be swabbed into local cpu
+ * byteorder.
+ *
+ * Like req_capsule_req_need_swab(), the swabbed state lives on the
+ * top-level pill of the owning request.
+ */
+static inline bool req_capsule_rep_need_swab(struct req_capsule *pill)
+{
+ struct ptlrpc_request *req = pill->rc_req;
+
+ return req && req_capsule_rep_swabbed(&req->rq_pill,
+ MSG_PTLRPC_HEADER_OFF);
+}
+
/**
* Convert numerical request phase value \a phase into text string description
*/
void req_capsule_fini(struct req_capsule *pill);
void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt);
+void req_capsule_subreq_init(struct req_capsule *pill,
+ const struct req_format *fmt,
+ struct ptlrpc_request *req,
+ struct lustre_msg *reqmsg,
+ struct lustre_msg *repmsg,
+ enum req_location loc);
+
void req_capsule_client_dump(struct req_capsule *pill);
void req_capsule_server_dump(struct req_capsule *pill);
void req_capsule_init_area(struct req_capsule *pill);
size_t req_capsule_filled_sizes(struct req_capsule *pill,
enum req_location loc);
int req_capsule_server_pack(struct req_capsule *pill);
+int req_capsule_client_pack(struct req_capsule *pill);
+void req_capsule_set_replen(struct req_capsule *pill);
void *req_capsule_client_get(struct req_capsule *pill,
const struct req_msg_field *field);
}
/**
- * Returns true if request needs to be swabbed into local cpu byteorder
- */
-static inline bool req_capsule_req_need_swab(struct req_capsule *pill)
-{
- return req_capsule_req_swabbed(pill, MSG_PTLRPC_HEADER_OFF);
-}
-
-/**
- * Returns true if request reply needs to be swabbed into local cpu byteorder
- */
-static inline bool req_capsule_rep_need_swab(struct req_capsule *pill)
-{
- return req_capsule_rep_swabbed(pill, MSG_PTLRPC_HEADER_OFF);
-}
-
-/**
* Mark request buffer at offset \a index that it was already swabbed
*/
static inline void req_capsule_set_req_swabbed(struct req_capsule *pill,
extern struct req_format RQF_LFSCK_NOTIFY;
extern struct req_format RQF_LFSCK_QUERY;
+/* Batch UpdaTe req_format */
+extern struct req_format RQF_MDS_BATCH;
+
extern struct req_msg_field RMF_GENERIC_DATA;
extern struct req_msg_field RMF_PTLRPC_BODY;
extern struct req_msg_field RMF_MDT_BODY;
extern struct req_msg_field RMF_OUT_UPDATE_HEADER;
extern struct req_msg_field RMF_OUT_UPDATE_BUF;
+/* Batch UpdaTe format */
+extern struct req_msg_field RMF_BUT_REPLY;
+extern struct req_msg_field RMF_BUT_HEADER;
+extern struct req_msg_field RMF_BUT_BUF;
+
/* LFSCK format */
extern struct req_msg_field RMF_LFSCK_REQUEST;
extern struct req_msg_field RMF_LFSCK_REPLY;
void lustre_swab_hsm_user_state(struct hsm_user_state *hus);
void lustre_swab_hsm_user_item(struct hsm_user_item *hui);
void lustre_swab_hsm_request(struct hsm_request *hr);
+void lustre_swab_batch_update_request(struct batch_update_request *bur);
+void lustre_swab_but_update_header(struct but_update_header *buh);
+void lustre_swab_but_update_buffer(struct but_update_buffer *bub);
+void lustre_swab_batch_update_reply(struct batch_update_reply *bur);
void lustre_swab_swap_layouts(struct mdc_swap_layouts *msl);
void lustre_swab_close_data(struct close_data *data);
void lustre_swab_close_data_resync_done(struct close_data_resync_done *resync);
struct md_op_item;
typedef int (*md_op_item_cb_t)(struct md_op_item *item, int rc);
+enum md_item_opcode {
+ MD_OP_NONE = 0,
+ MD_OP_GETATTR = 1,
+ MD_OP_MAX,
+};
+
struct md_op_item {
+ enum md_item_opcode mop_opc;
struct md_op_data mop_data;
struct lookup_intent mop_it;
struct lustre_handle mop_lockh;
struct work_struct mop_work;
};
+enum lu_batch_flags {
+ BATCH_FL_NONE = 0x0,
+ /* All requests in a batch are read-only. */
+ BATCH_FL_RDONLY = 0x1,
+ /* Will create PTLRPC request set for the batch. */
+ BATCH_FL_RQSET = 0x2,
+ /* Whether need sync commit. */
+ BATCH_FL_SYNC = 0x4,
+};
+
+struct lu_batch {
+ struct ptlrpc_request_set *lbt_rqset;
+ __s32 lbt_result;
+ __u32 lbt_flags;
+ /* Max batched SUB requests count in a batch. */
+ __u32 lbt_max_count;
+};
+
+/* Per-batch bookkeeping shared by the client-side batch machinery. */
+struct batch_update_head {
+ /* Export the batched RPC will be sent through. */
+ struct obd_export *buh_exp;
+ /* Owning lu_batch handle. */
+ struct lu_batch *buh_batch;
+ int buh_flags;
+ /* NOTE(review): counters below are presumably buffer/update/request
+ * accounting for packing — confirm exact semantics against batch.c.
+ */
+ __u32 buh_count;
+ __u32 buh_update_count;
+ __u32 buh_buf_count;
+ __u32 buh_reqsize;
+ __u32 buh_repsize;
+ __u32 buh_batchid;
+ /* List of update buffers queued for this batch. */
+ struct list_head buh_buf_list;
+ /* List of per-update callbacks (object_update_callback). */
+ struct list_head buh_cb_list;
+};
+
+struct object_update_callback;
+typedef int (*object_update_interpret_t)(struct ptlrpc_request *req,
+ struct lustre_msg *repmsg,
+ struct object_update_callback *ouc,
+ int rc);
+
+struct object_update_callback {
+ struct list_head ouc_item;
+ object_update_interpret_t ouc_interpret;
+ struct batch_update_head *ouc_head;
+ void *ouc_data;
+};
+
+typedef int (*md_update_pack_t)(struct batch_update_head *head,
+ struct lustre_msg *reqmsg,
+ size_t *max_pack_size,
+ struct md_op_item *item);
+
+struct cli_batch {
+ struct lu_batch cbh_super;
+ struct batch_update_head *cbh_head;
+};
+
+struct lu_batch *cli_batch_create(struct obd_export *exp,
+ enum lu_batch_flags flags, __u32 max_count);
+int cli_batch_stop(struct obd_export *exp, struct lu_batch *bh);
+int cli_batch_flush(struct obd_export *exp, struct lu_batch *bh, bool wait);
+int cli_batch_add(struct obd_export *exp, struct lu_batch *bh,
+ struct md_op_item *item, md_update_pack_t packer,
+ object_update_interpret_t interpreter);
+
struct obd_ops {
struct module *o_owner;
int (*o_iocontrol)(unsigned int cmd, struct obd_export *exp, int len,
const union lmv_mds_md *lmv, size_t lmv_size);
int (*m_rmfid)(struct obd_export *exp, struct fid_array *fa, int *rcs,
struct ptlrpc_request_set *set);
+ struct lu_batch *(*m_batch_create)(struct obd_export *exp,
+ enum lu_batch_flags flags,
+ __u32 max_count);
+ int (*m_batch_stop)(struct obd_export *exp, struct lu_batch *bh);
+ int (*m_batch_flush)(struct obd_export *exp, struct lu_batch *bh,
+ bool wait);
+ int (*m_batch_add)(struct obd_export *exp, struct lu_batch *bh,
+ struct md_op_item *item);
};
static inline struct md_open_data *obd_mod_alloc(void)
return MDP(exp->exp_obd, rmfid)(exp, fa, rcs, set);
}
+static inline struct lu_batch *
+md_batch_create(struct obd_export *exp, enum lu_batch_flags flags,
+ __u32 max_count)
+{
+ int rc;
+
+ /* Verify the export provides metadata ops before dispatching to the
+ * layer-specific m_batch_create implementation.
+ */
+ rc = exp_check_ops(exp);
+ if (rc)
+ return ERR_PTR(rc);
+
+ return MDP(exp->exp_obd, batch_create)(exp, flags, max_count);
+}
+
+/* Dispatch m_batch_stop to the underlying layer (LMV or MDC). */
+static inline int md_batch_stop(struct obd_export *exp, struct lu_batch *bh)
+{
+ int rc;
+
+ rc = exp_check_ops(exp);
+ if (rc)
+ return rc;
+
+ return MDP(exp->exp_obd, batch_stop)(exp, bh);
+}
+
+/* Dispatch m_batch_flush to the underlying layer (LMV or MDC). */
+static inline int md_batch_flush(struct obd_export *exp, struct lu_batch *bh,
+ bool wait)
+{
+ int rc;
+
+ rc = exp_check_ops(exp);
+ if (rc)
+ return rc;
+
+ return MDP(exp->exp_obd, batch_flush)(exp, bh, wait);
+}
+
+/* Dispatch m_batch_add to the underlying layer (LMV or MDC). */
+static inline int md_batch_add(struct obd_export *exp, struct lu_batch *bh,
+ struct md_op_item *item)
+{
+ int rc;
+
+ rc = exp_check_ops(exp);
+ if (rc)
+ return rc;
+
+ return MDP(exp->exp_obd, batch_add)(exp, bh, item);
+}
+
/* OBD Metadata Support */
extern int obd_init_caches(void);
#define OBD_FAIL_MDS_LINK_RENAME_RACE 0x18a
#define OBD_FAIL_MDS_HSM_RESTORE_RACE 0x18b
#define OBD_FAIL_MDS_CHANGELOG_ENOSPC 0x18c
+#define OBD_FAIL_MDS_BATCH_NET 0x18d
/* OI scrub */
#define OBD_FAIL_OSD_SCRUB_DELAY 0x190
return ptr;
}
+
+/*
+ * Iterate over the packed sub-request messages of a batch request.
+ * Pass NULL to get the first message; pass the previous message to get
+ * the one packed immediately after it (messages are laid out back to
+ * back, each lustre_packed_msg_size() bytes long).
+ */
+static inline struct lustre_msg *
+batch_update_reqmsg_next(struct batch_update_request *bur,
+ struct lustre_msg *reqmsg)
+{
+ if (reqmsg)
+ return (struct lustre_msg *)((char *)reqmsg +
+ lustre_packed_msg_size(reqmsg));
+ else
+ return &bur->burq_reqmsg[0];
+}
+
+/*
+ * Iterate over the packed sub-reply messages of a batch reply; same
+ * convention as batch_update_reqmsg_next() (NULL starts the walk).
+ */
+static inline struct lustre_msg *
+batch_update_repmsg_next(struct batch_update_reply *bur,
+ struct lustre_msg *repmsg)
+{
+ if (repmsg)
+ return (struct lustre_msg *)((char *)repmsg +
+ lustre_packed_msg_size(repmsg));
+ else
+ return &bur->burp_repmsg[0];
+}
#endif
__u32 lm_repsize; /* size of preallocated reply buffer */
__u32 lm_cksum; /* CRC32 of ptlrpc_body early reply messages */
__u32 lm_flags; /* enum lustre_msghdr MSGHDR_* flags */
- __u32 lm_padding_2; /* unused */
+ __u32 lm_opc; /* SUB request opcode in a batch request */
__u32 lm_padding_3; /* unused */
__u32 lm_buflens[0]; /* length of additional buffers in bytes,
* padded to a multiple of 8 bytes. */
*/
};
+/* The returned result of the SUB request in a batch request */
+#define lm_result lm_opc
+
/* ptlrpc_body packet pb_types */
#define PTL_RPC_MSG_REQUEST 4711 /* normal RPC request message */
#define PTL_RPC_MSG_ERR 4712 /* error reply if request unprocessed */
OBD_CONNECT2_LSEEK | OBD_CONNECT2_DOM_LVB |\
OBD_CONNECT2_REP_MBITS | \
OBD_CONNECT2_ATOMIC_OPEN_LOCK | \
+ OBD_CONNECT2_BATCH_RPC | \
OBD_CONNECT2_ENCRYPT_NAME)
#define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
MDS_HSM_CT_UNREGISTER = 60,
MDS_SWAP_LAYOUTS = 61,
MDS_RMFID = 62,
+ MDS_BATCH = 63,
MDS_LAST_OPC
};
char orr_data[0];
};
+#define BUT_REQUEST_MAGIC 0xBADE0001
+/* Hold batched updates sending to the remote target in a single RPC */
+struct batch_update_request {
+ /* Magic number: BUT_REQUEST_MAGIC. */
+ __u32 burq_magic;
+ /* Number of sub requests packed in this batched RPC: burq_reqmsg[]. */
+ __u16 burq_count;
+ /* Unused padding field. */
+ __u16 burq_padding;
+ /*
+ * Sub request message array. As message field buffers for each sub
+ * request are packed after the padded lustre_msg.lm_buflens[] array,
+ * the next request message can be located via the function
+ * @batch_update_reqmsg_next() in lustre/include/obj_update.h
+ */
+ struct lustre_msg burq_reqmsg[0];
+};
+
+#define BUT_HEADER_MAGIC 0xBADF0001
+/* Header for Batched UpdaTes request */
+struct but_update_header {
+ /* Magic number: BUT_HEADER_MAGIC */
+ __u32 buh_magic;
+ /*
+ * When the total request buffer length is less than MAX_INLINE_SIZE,
+ * @buh_count is set with 1 and the batched RPC request can be packed
+ * inline.
+ * Otherwise, @buh_count indicates the IO vector count transferring in
+ * bulk I/O.
+ */
+ __u32 buh_count;
+ /* inline buffer length when the batched RPC can be packed inline. */
+ __u32 buh_inline_length;
+ /* The reply buffer size the client prepared. */
+ __u32 buh_reply_size;
+ /* Sub request count in this batched RPC. */
+ __u32 buh_update_count;
+ /* Unused padding field. */
+ __u32 buh_padding;
+ /* Inline buffer used when the RPC request can be packed inline. */
+ __u32 buh_inline_data[0];
+};
+
+/* Per-buffer descriptor for a bulk-transferred batch update buffer. */
+struct but_update_buffer {
+ /* Size in bytes of the corresponding bulk IO vector. */
+ __u32 bub_size;
+ /* Unused padding field. */
+ __u32 bub_padding;
+};
+
+#define BUT_REPLY_MAGIC 0x00AD0001
+/* Batched reply received from a remote target in a batched RPC. */
+struct batch_update_reply {
+ /* Magic number: BUT_REPLY_MAGIC. */
+ __u32 burp_magic;
+ /* Successful returned sub requests. */
+ __u16 burp_count;
+ /* Unused padding field. */
+ __u16 burp_padding;
+ /*
+ * Sub reply message array.
+ * It can locate the next reply message buffer via the function
+ * @batch_update_repmsg_next() in lustre/include/obj_update.h
+ */
+ struct lustre_msg burp_repmsg[0];
+};
+
+/**
+ * Batch update opcode.
+ */
+enum batch_update_cmd {
+ BUT_GETATTR = 1,
+ BUT_LAST_OPC,
+ BUT_FIRST_OPC = BUT_GETATTR,
+};
+
/** layout swap request structure
* fid1 and fid2 are in mdt_body
*/
#define LL_IT2STR(it) \
((it) ? ldlm_it2str((it)->it_op) : "0")
+struct lmvsub_batch {
+ struct lu_batch *sbh_sub;
+ struct lmv_tgt_desc *sbh_tgt;
+ struct list_head sbh_sub_item;
+};
+
+struct lmv_batch {
+ struct lu_batch lbh_super;
+ struct ptlrpc_request_set *lbh_rqset;
+ struct list_head lbh_sub_batch_list;
+};
+
int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
struct lookup_intent *it, struct ptlrpc_request **reqp,
ldlm_blocking_callback cb_blocking,
return 0;
}
+/*
+ * Allocate an LMV-level batch handle.
+ *
+ * Returns the embedded lu_batch on success, ERR_PTR(-ENOMEM) on
+ * allocation failure. When BATCH_FL_RQSET is set a private request
+ * set is created and owned by the batch (destroyed in lmv_batch_stop).
+ */
+static struct lu_batch *lmv_batch_create(struct obd_export *exp,
+ enum lu_batch_flags flags,
+ __u32 max_count)
+{
+ struct lu_batch *bh;
+ struct lmv_batch *lbh;
+
+ ENTRY;
+ OBD_ALLOC_PTR(lbh);
+ if (!lbh)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ bh = &lbh->lbh_super;
+ bh->lbt_flags = flags;
+ bh->lbt_max_count = max_count;
+
+ if (flags & BATCH_FL_RQSET) {
+ bh->lbt_rqset = ptlrpc_prep_set();
+ if (bh->lbt_rqset == NULL) {
+ OBD_FREE_PTR(lbh);
+ RETURN(ERR_PTR(-ENOMEM));
+ }
+ }
+
+ INIT_LIST_HEAD(&lbh->lbh_sub_batch_list);
+ RETURN(bh);
+}
+
+/*
+ * Stop an LMV batch: stop and free every per-target sub batch, wait for
+ * the request set if one was created, then free the batch handle.
+ * Returns the last error from the final operation performed; earlier
+ * sub-batch failures are recorded in bh->lbt_result.
+ */
+static int lmv_batch_stop(struct obd_export *exp, struct lu_batch *bh)
+{
+ struct lmv_batch *lbh;
+ struct lmvsub_batch *sub;
+ struct lmvsub_batch *tmp;
+ int rc = 0;
+
+ ENTRY;
+
+ lbh = container_of(bh, struct lmv_batch, lbh_super);
+ /* _safe variant: each sub entry is unlinked and freed in the loop. */
+ list_for_each_entry_safe(sub, tmp, &lbh->lbh_sub_batch_list,
+ sbh_sub_item) {
+ list_del(&sub->sbh_sub_item);
+ rc = md_batch_stop(sub->sbh_tgt->ltd_exp, sub->sbh_sub);
+ if (rc < 0) {
+ CERROR("%s: stop batch processing failed: rc = %d\n",
+ exp->exp_obd->obd_name, rc);
+ if (bh->lbt_result == 0)
+ bh->lbt_result = rc;
+ }
+ OBD_FREE_PTR(sub);
+ }
+
+ if (bh->lbt_flags & BATCH_FL_RQSET) {
+ rc = ptlrpc_set_wait(NULL, bh->lbt_rqset);
+ ptlrpc_set_destroy(bh->lbt_rqset);
+ }
+
+ OBD_FREE_PTR(lbh);
+ RETURN(rc);
+}
+
+/*
+ * Flush an LMV batch: flush every per-target sub batch and, when @wait
+ * is set and the batch owns a request set, wait for it to drain.
+ * The first failure is remembered in both bh->lbt_result and the return
+ * value while the remaining sub batches are still flushed.
+ */
+static int lmv_batch_flush(struct obd_export *exp, struct lu_batch *bh,
+ bool wait)
+{
+ struct lmv_batch *lbh;
+ struct lmvsub_batch *sub;
+ int rc = 0;
+ int rc1;
+
+ ENTRY;
+
+ lbh = container_of(bh, struct lmv_batch, lbh_super);
+ list_for_each_entry(sub, &lbh->lbh_sub_batch_list, sbh_sub_item) {
+ rc1 = md_batch_flush(sub->sbh_tgt->ltd_exp, sub->sbh_sub, wait);
+ if (rc1 < 0) {
+ /* Report the actual sub-batch error (rc1), not the
+ * possibly-zero accumulated rc.
+ */
+ CERROR("%s: flush batch processing failed: rc = %d\n",
+ exp->exp_obd->obd_name, rc1);
+ if (bh->lbt_result == 0)
+ bh->lbt_result = rc1;
+
+ if (rc == 0)
+ rc = rc1;
+ }
+ }
+
+ if (wait && bh->lbt_flags & BATCH_FL_RQSET) {
+ rc1 = ptlrpc_set_wait(NULL, bh->lbt_rqset);
+ if (rc == 0)
+ rc = rc1;
+ }
+
+ RETURN(rc);
+}
+
+/*
+ * Locate the MDT target a batch item should be sent to, based on the
+ * item's opcode. No opcodes are wired up yet, so every item currently
+ * resolves to ERR_PTR(-ENOTSUPP); per-opcode cases are expected to be
+ * added as batchable operations are introduced.
+ */
+static inline struct lmv_tgt_desc *
+lmv_batch_locate_tgt(struct lmv_obd *lmv, struct md_op_item *item)
+{
+ struct lmv_tgt_desc *tgt;
+
+ switch (item->mop_opc) {
+ default:
+ tgt = ERR_PTR(-ENOTSUPP);
+ }
+
+ return tgt;
+}
+
+/*
+ * Find the existing sub batch for @tgt in @lbh's sub-batch list.
+ * Returns NULL when no sub batch has been created for that target yet.
+ */
+struct lu_batch *lmv_batch_lookup_sub(struct lmv_batch *lbh,
+ struct lmv_tgt_desc *tgt)
+{
+ struct lmvsub_batch *sub;
+
+ list_for_each_entry(sub, &lbh->lbh_sub_batch_list, sbh_sub_item) {
+ if (sub->sbh_tgt == tgt)
+ return sub->sbh_sub;
+ }
+
+ return NULL;
+}
+
+/*
+ * Get (or lazily create) the sub batch for target @tgt.
+ *
+ * The child batch inherits the parent's flags, max count and request
+ * set pointer. Returns the child lu_batch, or an ERR_PTR on allocation
+ * or md_batch_create() failure.
+ */
+struct lu_batch *lmv_batch_get_sub(struct lmv_batch *lbh,
+ struct lmv_tgt_desc *tgt)
+{
+ struct lmvsub_batch *sbh;
+ struct lu_batch *child_bh;
+ struct lu_batch *bh;
+
+ ENTRY;
+
+ child_bh = lmv_batch_lookup_sub(lbh, tgt);
+ if (child_bh != NULL)
+ RETURN(child_bh);
+
+ OBD_ALLOC_PTR(sbh);
+ if (sbh == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ INIT_LIST_HEAD(&sbh->sbh_sub_item);
+ sbh->sbh_tgt = tgt;
+
+ bh = &lbh->lbh_super;
+ child_bh = md_batch_create(tgt->ltd_exp, bh->lbt_flags,
+ bh->lbt_max_count);
+ if (IS_ERR(child_bh)) {
+ OBD_FREE_PTR(sbh);
+ RETURN(child_bh);
+ }
+
+ /* Share the parent's request set so all sub batches are waited on
+ * together.
+ */
+ child_bh->lbt_rqset = bh->lbt_rqset;
+ sbh->sbh_sub = child_bh;
+ list_add(&sbh->sbh_sub_item, &lbh->lbh_sub_batch_list);
+ RETURN(child_bh);
+}
+
+/*
+ * Add a metadata operation item to an LMV batch: resolve the target MDT
+ * for the item, get/create the per-target sub batch, and forward the
+ * item to the lower layer via md_batch_add().
+ */
+static int lmv_batch_add(struct obd_export *exp, struct lu_batch *bh,
+ struct md_op_item *item)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ struct lmv_batch *lbh;
+ struct lu_batch *child_bh;
+ int rc;
+
+ ENTRY;
+
+ tgt = lmv_batch_locate_tgt(lmv, item);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ lbh = container_of(bh, struct lmv_batch, lbh_super);
+ child_bh = lmv_batch_get_sub(lbh, tgt);
+ if (IS_ERR(child_bh))
+ RETURN(PTR_ERR(child_bh));
+
+ rc = md_batch_add(tgt->ltd_exp, child_bh, item);
+ RETURN(rc);
+}
+
static const struct obd_ops lmv_obd_ops = {
.o_owner = THIS_MODULE,
.o_setup = lmv_setup,
.m_get_fid_from_lsm = lmv_get_fid_from_lsm,
.m_unpackmd = lmv_unpackmd,
.m_rmfid = lmv_rmfid,
+ .m_batch_create = lmv_batch_create,
+ .m_batch_add = lmv_batch_add,
+ .m_batch_stop = lmv_batch_stop,
+ .m_batch_flush = lmv_batch_flush,
};
static int __init lmv_init(void)
mdc_lib.o \
mdc_locks.o \
mdc_changelog.o \
- mdc_dev.o
+ mdc_dev.o \
+ mdc_batch.o
mdc-objs-$(CONFIG_FS_POSIX_ACL) += mdc_acl.o
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2020, 2022, DDN Storage Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * lustre/mdc/mdc_batch.c
+ *
+ * Batch Metadata Updating on the client (MDC)
+ *
+ * Author: Qian Yingjin <qian@ddn.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_MDC
+
+#include <linux/module.h>
+#include <lustre_update.h>
+#include <lustre_acl.h>
+
+#include "mdc_internal.h"
+
+
+static md_update_pack_t mdc_update_packers[MD_OP_MAX];
+
+static object_update_interpret_t mdc_update_interpreters[MD_OP_MAX];
+
+/*
+ * MDC implementation of m_batch_add: validate the item's opcode against
+ * the registered packer/interpreter tables, then hand the item to the
+ * generic client batch machinery. Both tables are currently empty, so
+ * every opcode is rejected until handlers are registered.
+ */
+int mdc_batch_add(struct obd_export *exp, struct lu_batch *bh,
+ struct md_op_item *item)
+{
+ enum md_item_opcode opc = item->mop_opc;
+
+ ENTRY;
+
+ if (opc >= MD_OP_MAX || mdc_update_packers[opc] == NULL ||
+ mdc_update_interpreters[opc] == NULL) {
+ CERROR("%s: unexpected opcode %d\n",
+ exp->exp_obd->obd_name, opc);
+ RETURN(-EFAULT);
+ }
+
+ RETURN(cli_batch_add(exp, bh, item, mdc_update_packers[opc],
+ mdc_update_interpreters[opc]));
+}
int mdc_intent_getattr_async(struct obd_export *exp, struct md_op_item *item);
+int mdc_batch_add(struct obd_export *exp, struct lu_batch *bh,
+ struct md_op_item *item);
+
enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
const struct lu_fid *fid, enum ldlm_type type,
union ldlm_policy_data *policy,
.m_intent_getattr_async = mdc_intent_getattr_async,
.m_revalidate_lock = mdc_revalidate_lock,
.m_rmfid = mdc_rmfid,
+ .m_batch_create = cli_batch_create,
+ .m_batch_stop = cli_batch_stop,
+ .m_batch_flush = cli_batch_flush,
+ .m_batch_add = mdc_batch_add,
};
dev_t mdc_changelog_dev;
mdt-objs += mdt_hsm_cdt_client.o
mdt-objs += mdt_hsm_cdt_agent.o
mdt-objs += mdt_coordinator.o
+mdt-objs += mdt_batch.o
@INCLUDE_RULES@
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2020, DDN Storage Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * lustre/mdt/mdt_batch.c
+ *
+ * Batch Metadata Updating on the server (MDT)
+ *
+ * Author: Qian Yingjin <qian@ddn.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <linux/module.h>
+
+#include <lustre_mds.h>
+#include "mdt_internal.h"
+
+/*
+ * Unpack one batch sub request according to its opcode. No opcodes are
+ * handled yet; every opcode returns -EOPNOTSUPP until per-op unpack
+ * cases are added.
+ */
+static int mdt_batch_unpack(struct mdt_thread_info *info, __u32 opc)
+{
+ int rc = 0;
+
+ switch (opc) {
+ default:
+ rc = -EOPNOTSUPP;
+ CERROR("%s: Unexpected opcode %d: rc = %d\n",
+ mdt_obd_name(info->mti_mdt), opc, rc);
+ break;
+ }
+
+ RETURN(rc);
+}
+
+/* Pack the reply message for one sub request; placeholder for now. */
+static int mdt_batch_pack_repmsg(struct mdt_thread_info *info)
+{
+ return 0;
+}
+
+/* Batch UpdaTe Request with a format known in advance */
+#define TGT_BUT_HDL(flags, opc, fn) \
+[opc - BUT_FIRST_OPC] = { \
+ .th_name = #opc, \
+ .th_fail_id = 0, \
+ .th_opc = opc, \
+ .th_flags = flags, \
+ .th_act = fn, \
+ .th_fmt = &RQF_ ## opc, \
+ .th_version = LUSTRE_MDS_VERSION, \
+ .th_hp = NULL, \
+}
+
+static struct tgt_handler mdt_batch_handlers[BUT_LAST_OPC];
+
+/*
+ * Look up the tgt_handler for a batch sub-request opcode.
+ * Returns NULL for an opcode outside [BUT_FIRST_OPC, BUT_LAST_OPC).
+ */
+static struct tgt_handler *mdt_batch_handler_find(__u32 opc)
+{
+ struct tgt_handler *h = NULL;
+
+ if (opc >= BUT_FIRST_OPC && opc < BUT_LAST_OPC) {
+ h = &mdt_batch_handlers[opc - BUT_FIRST_OPC];
+ LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
+ h->th_opc, opc);
+ }
+
+ return h;
+}
+
+/*
+ * Top-level MDS_BATCH handler: unpack the batch header, receive the
+ * update buffers (inline or via bulk GET), then execute each packed sub
+ * request with its registered handler, packing one sub reply per sub
+ * request into the single RMF_BUT_REPLY buffer.
+ */
+int mdt_batch(struct tgt_session_info *tsi)
+{
+ struct mdt_thread_info *info = tsi2mdt_info(tsi);
+ struct req_capsule *pill = &info->mti_sub_pill;
+ struct ptlrpc_request *req = tgt_ses_req(tsi);
+ struct but_update_header *buh;
+ struct but_update_buffer *bub = NULL;
+ struct batch_update_reply *reply = NULL;
+ struct ptlrpc_bulk_desc *desc = NULL;
+ struct lustre_msg *repmsg = NULL;
+ __u32 handled_update_count = 0;
+ __u32 update_buf_count;
+ __u32 packed_replen;
+ void **update_bufs;
+ int buh_size;
+ int rc;
+ int i;
+
+ ENTRY;
+
+ /* Validate the batch header: present, right magic, non-zero count. */
+ buh_size = req_capsule_get_size(&req->rq_pill, &RMF_BUT_HEADER,
+ RCL_CLIENT);
+ if (buh_size <= 0)
+ RETURN(err_serious(-EPROTO));
+
+ buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
+ if (buh == NULL)
+ RETURN(err_serious(-EPROTO));
+
+ if (buh->buh_magic != BUT_HEADER_MAGIC) {
+ CERROR("%s: invalid update header magic %x expect %x: "
+ "rc = %d\n", tgt_name(tsi->tsi_tgt), buh->buh_magic,
+ BUT_HEADER_MAGIC, -EPROTO);
+ RETURN(err_serious(-EPROTO));
+ }
+
+ update_buf_count = buh->buh_count;
+ if (update_buf_count == 0)
+ RETURN(err_serious(-EPROTO));
+
+ OBD_ALLOC_PTR_ARRAY(update_bufs, update_buf_count);
+ if (update_bufs == NULL)
+ RETURN(err_serious(-ENOMEM));
+
+ if (buh->buh_inline_length > 0) {
+ /* Small batch: the single update buffer is carried inline in
+ * the header, no bulk transfer needed.
+ */
+ update_bufs[0] = buh->buh_inline_data;
+ } else {
+ struct but_update_buffer *tmp;
+ int page_count = 0;
+
+ bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
+ if (bub == NULL)
+ GOTO(out, rc = err_serious(-EPROTO));
+
+ for (i = 0; i < update_buf_count; i++)
+ /* First *and* last might be partial pages, hence +1 */
+ page_count += DIV_ROUND_UP(bub[i].bub_size,
+ PAGE_SIZE) + 1;
+
+ desc = ptlrpc_prep_bulk_exp(req, page_count,
+ PTLRPC_BULK_OPS_COUNT,
+ PTLRPC_BULK_GET_SINK,
+ MDS_BULK_PORTAL,
+ &ptlrpc_bulk_kiov_nopin_ops);
+ if (desc == NULL)
+ GOTO(out, rc = err_serious(-ENOMEM));
+
+ tmp = bub;
+ for (i = 0; i < update_buf_count; i++, tmp++) {
+ if (tmp->bub_size >= OUT_MAXREQSIZE)
+ GOTO(out, rc = err_serious(-EPROTO));
+
+ OBD_ALLOC_LARGE(update_bufs[i], tmp->bub_size);
+ if (update_bufs[i] == NULL)
+ GOTO(out, rc = err_serious(-ENOMEM));
+
+ desc->bd_frag_ops->add_iov_frag(desc, update_bufs[i],
+ tmp->bub_size);
+ }
+
+ req->rq_bulk_write = 1;
+ rc = sptlrpc_svc_prep_bulk(req, desc);
+ if (rc != 0)
+ GOTO(out, rc = err_serious(rc));
+
+ /* Pull the update buffers from the client via bulk GET. */
+ rc = target_bulk_io(req->rq_export, desc);
+ if (rc < 0)
+ GOTO(out, rc = err_serious(rc));
+ }
+
+ req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY, RCL_SERVER,
+ buh->buh_reply_size);
+ rc = req_capsule_server_pack(&req->rq_pill);
+ if (rc != 0) {
+ DEBUG_REQ(D_ERROR, req, "%s: Can't pack response: rc = %d\n",
+ tgt_name(tsi->tsi_tgt), rc);
+ GOTO(out, rc);
+ }
+
+ /* Prepare the update reply buffer */
+ reply = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
+ if (reply == NULL)
+ GOTO(out, rc = -EPROTO);
+
+ reply->burp_magic = BUT_REPLY_MAGIC;
+ packed_replen = sizeof(*reply);
+ info->mti_max_repsize = buh->buh_reply_size;
+ info->mti_batch_env = 1;
+ info->mti_pill = pill;
+
+ /* Walk through sub requests in the batch request to execute them. */
+ for (i = 0; i < update_buf_count; i++) {
+ struct batch_update_request *bur;
+ struct lustre_msg *reqmsg = NULL;
+ struct tgt_handler *h;
+ int update_count;
+ int j;
+
+ bur = update_bufs[i];
+ update_count = bur->burq_count;
+ for (j = 0; j < update_count; j++) {
+ __u32 replen;
+
+ reqmsg = batch_update_reqmsg_next(bur, reqmsg);
+ repmsg = batch_update_repmsg_next(reply, repmsg);
+
+ /* NOTE(review): '>' lets handled_update_count reach
+ * buh_update_count + 1 before tripping — confirm
+ * whether '>=' is the intended bound.
+ */
+ if (handled_update_count > buh->buh_update_count)
+ GOTO(out, rc = -EOVERFLOW);
+
+ LASSERT(reqmsg != NULL && repmsg != NULL);
+ LASSERTF(reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2,
+ "Invalid reqmsg magic %x expected %x\n",
+ reqmsg->lm_magic, LUSTRE_MSG_MAGIC_V2);
+
+ h = mdt_batch_handler_find(reqmsg->lm_opc);
+ if (unlikely(h == NULL)) {
+ CERROR("%s: unsupported opc: 0x%x\n",
+ tgt_name(tsi->tsi_tgt), reqmsg->lm_opc);
+ GOTO(out, rc = -ENOTSUPP);
+ }
+
+ /* TODO: Check resend case only for modifying RPC */
+
+ /* Point the sub pill at this sub request/reply pair
+ * before unpacking and executing the handler.
+ */
+ LASSERT(h->th_fmt != NULL);
+ req_capsule_subreq_init(pill, h->th_fmt, req,
+ reqmsg, repmsg, RCL_SERVER);
+
+ rc = mdt_batch_unpack(info, reqmsg->lm_opc);
+ if (rc) {
+ CERROR("%s: Can't unpack subreq, rc = %d\n",
+ mdt_obd_name(info->mti_mdt), rc);
+ GOTO(out, rc);
+ }
+
+ rc = mdt_batch_pack_repmsg(info);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = h->th_act(tsi);
+ if (rc)
+ GOTO(out, rc);
+
+ repmsg->lm_result = rc;
+ mdt_thread_info_reset(info);
+ /*
+ * TODO: Check whether overflow reply buffer.
+ * Fix reply, shrink and/or grow reply buffers.
+ */
+ replen = lustre_packed_msg_size(repmsg);
+ info->mti_max_repsize -= replen;
+ packed_replen += replen;
+ handled_update_count++;
+ }
+ }
+
+ /*
+ * TODO: Grow/shrink the reply buffer.
+ */
+ CDEBUG(D_INFO, "reply size %u packed replen %u\n",
+ buh->buh_reply_size, packed_replen);
+ if (buh->buh_reply_size > packed_replen)
+ req_capsule_shrink(&req->rq_pill, &RMF_BUT_REPLY,
+ packed_replen, RCL_SERVER);
+out:
+ if (reply != NULL)
+ reply->burp_count = handled_update_count;
+
+ /* bub != NULL means the buffers were OBD_ALLOC_LARGE'd for bulk;
+ * the inline case points into the request buffer and is not freed.
+ */
+ if (update_bufs != NULL) {
+ if (bub != NULL) {
+ for (i = 0; i < update_buf_count; i++, bub++) {
+ if (update_bufs[i] != NULL)
+ OBD_FREE_LARGE(update_bufs[i],
+ bub->bub_size);
+ }
+ }
+
+ OBD_FREE_PTR_ARRAY(update_bufs, update_buf_count);
+ }
+
+ if (desc != NULL)
+ ptlrpc_free_bulk(desc);
+
+ mdt_thread_info_fini(info);
+ RETURN(rc);
+}
+
if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
DEF_REP_MD_SIZE);
+
if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
req_capsule_set_size(pill, &RMF_LOGCOOKIES,
RCL_SERVER, 0);
LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
}
+/*
+ * Reset the per-request fields of mdt_thread_info between uses — called
+ * from mdt_thread_info_init() and between batch sub requests, so state
+ * from one sub request cannot leak into the next.
+ */
+void mdt_thread_info_reset(struct mdt_thread_info *info)
+{
+ memset(&info->mti_attr, 0, sizeof(info->mti_attr));
+ info->mti_body = NULL;
+ info->mti_dlm_req = NULL;
+ info->mti_cross_ref = 0;
+ info->mti_opdata = 0;
+ info->mti_big_lmm_used = 0;
+ info->mti_big_acl_used = 0;
+ info->mti_som_strict = 0;
+
+ info->mti_spec.no_create = 0;
+ info->mti_spec.sp_rm_entry = 0;
+ info->mti_spec.sp_permitted = 0;
+
+ info->mti_spec.u.sp_ea.eadata = NULL;
+ info->mti_spec.u.sp_ea.eadatalen = 0;
+
+ /* In the batch environment the object reference taken for the
+ * previous sub request must be dropped here.
+ */
+ if (info->mti_batch_env && info->mti_object != NULL) {
+ mdt_object_put(info->mti_env, info->mti_object);
+ info->mti_object = NULL;
+ }
+}
+
/*
* Initialize fields of struct mdt_thread_info. Other fields are left in
* uninitialized state, because it's too expensive to zero out whole
info->mti_mdt = NULL;
info->mti_env = req->rq_svc_thread->t_env;
info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
-
- memset(&info->mti_attr, 0, sizeof(info->mti_attr));
info->mti_big_buf = LU_BUF_NULL;
- info->mti_body = NULL;
- info->mti_object = NULL;
- info->mti_dlm_req = NULL;
- info->mti_cross_ref = 0;
- info->mti_opdata = 0;
- info->mti_big_lmm_used = 0;
- info->mti_big_acl_used = 0;
- info->mti_som_strict = 0;
-
- info->mti_spec.no_create = 0;
- info->mti_spec.sp_rm_entry = 0;
- info->mti_spec.sp_permitted = 0;
+ info->mti_max_repsize = 0;
+ info->mti_batch_env = 0;
+ info->mti_object = NULL;
- info->mti_spec.u.sp_ea.eadata = NULL;
- info->mti_spec.u.sp_ea.eadatalen = 0;
+ mdt_thread_info_reset(info);
}
void mdt_thread_info_fini(struct mdt_thread_info *info)
if (rc)
rc = err_serious(rc);
}
+
mdt_thread_info_fini(info);
RETURN(rc);
}
MDS_SWAP_LAYOUTS,
mdt_swap_layouts),
TGT_MDT_HDL(IS_MUTABLE, MDS_RMFID, mdt_rmfid),
+TGT_MDT_HDL(IS_MUTABLE, MDS_BATCH, mdt_batch),
};
static struct tgt_handler mdt_io_ops[] = {
*/
struct req_capsule *mti_pill;
+ /*
+ * SUB request pill in a batch request.
+ */
+ struct req_capsule mti_sub_pill;
+
+ /*
+ * Max left reply buffer size for the batch request.
+ */
+ __u32 mti_max_repsize;
+
/* although we have export in req, there are cases when it is not
* available, e.g. closing files upon export destroy */
struct obd_export *mti_exp;
/* big_lmm buffer was used and must be used in reply */
mti_big_lmm_used:1,
mti_big_acl_used:1,
- mti_som_strict:1;
+ mti_som_strict:1,
+ /* Batch processing environment */
+ mti_batch_env:1;
/* opdata for mdt_reint_open(), has the same as
* ldlm_reply:lock_policy_res1. mdt_update_last_rcvd() stores this
struct mdt_object *p,
const struct lu_name *lname,
struct lu_fid *fid, int idx);
+void mdt_thread_info_reset(struct mdt_thread_info *info);
void mdt_thread_info_init(struct ptlrpc_request *req,
struct mdt_thread_info *mti);
void mdt_thread_info_fini(struct mdt_thread_info *mti);
/* mdt/mdt_recovery.c */
__u64 mdt_req_from_lrd(struct ptlrpc_request *req, struct tg_reply_data *trd);
+/* mdt/mdt_batch.c */
+int mdt_batch(struct tgt_session_info *tsi);
+
/* mdt/mdt_hsm.c */
int mdt_hsm_state_get(struct tgt_session_info *tsi);
int mdt_hsm_state_set(struct tgt_session_info *tsi);
ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o
ptlrpc_objs += sec.o sec_ctx.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o
ptlrpc_objs += sec_null.o sec_plain.o nrs.o nrs_fifo.o nrs_delay.o heap.o
-ptlrpc_objs += errno.o
+ptlrpc_objs += errno.o batch.o
nrs_server_objs := nrs_crr.o nrs_orr.o nrs_tbf.o
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2020, 2022, DDN/Whamcloud Storage Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * lustre/ptlrpc/batch.c
+ *
+ * Batch Metadata Updating on the client
+ *
+ * Author: Qian Yingjin <qian@ddn.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_MDC
+
+#include <linux/module.h>
+#include <lustre_update.h>
+#include <obd.h>
+
+struct batch_update_buffer {
+ struct batch_update_request *bub_req;
+ size_t bub_size;
+ size_t bub_end;
+ struct list_head bub_item;
+};
+
+struct batch_update_args {
+ struct batch_update_head *ba_head;
+};
+
+/**
+ * Prepare inline update request
+ *
+ * Prepare BUT update ptlrpc inline request, and the request usually includes
+ * one update buffer, which does not need bulk transfer.
+ */
+static int batch_prep_inline_update_req(struct batch_update_head *head,
+ struct ptlrpc_request *req,
+ int repsize)
+{
+ struct batch_update_buffer *buf;
+ struct but_update_header *buh;
+ int rc;
+
+ buf = list_entry(head->buh_buf_list.next,
+ struct batch_update_buffer, bub_item);
+ req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
+ buf->bub_end + sizeof(*buh));
+
+ rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
+ if (rc != 0)
+ RETURN(rc);
+
+ buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
+ buh->buh_magic = BUT_HEADER_MAGIC;
+ buh->buh_count = 1;
+ buh->buh_inline_length = buf->bub_end;
+ buh->buh_reply_size = repsize;
+ buh->buh_update_count = head->buh_update_count;
+
+ memcpy(buh->buh_inline_data, buf->bub_req, buf->bub_end);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
+ RCL_SERVER, repsize);
+
+ ptlrpc_request_set_replen(req);
+ req->rq_request_portal = OUT_PORTAL;
+ req->rq_reply_portal = OSC_REPLY_PORTAL;
+
+ RETURN(rc);
+}
+
+static int batch_prep_update_req(struct batch_update_head *head,
+ struct ptlrpc_request **reqp)
+{
+ struct ptlrpc_request *req;
+ struct ptlrpc_bulk_desc *desc;
+ struct batch_update_buffer *buf;
+ struct but_update_header *buh;
+ struct but_update_buffer *bub;
+ int page_count = 0;
+ int total = 0;
+ int repsize;
+ int rc;
+
+ ENTRY;
+
+ repsize = head->buh_repsize +
+ cfs_size_round(offsetof(struct batch_update_reply,
+ burp_repmsg[0]));
+ if (repsize < OUT_UPDATE_REPLY_SIZE)
+ repsize = OUT_UPDATE_REPLY_SIZE;
+
+ LASSERT(head->buh_buf_count > 0);
+
+ req = ptlrpc_request_alloc(class_exp2cliimp(head->buh_exp),
+ &RQF_MDS_BATCH);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ if (head->buh_buf_count == 1) {
+ buf = list_entry(head->buh_buf_list.next,
+ struct batch_update_buffer, bub_item);
+
+ /* Check whether it can be packed inline */
+ if (buf->bub_end + sizeof(struct but_update_header) <
+ OUT_UPDATE_MAX_INLINE_SIZE) {
+ rc = batch_prep_inline_update_req(head, req, repsize);
+ if (rc == 0)
+ *reqp = req;
+ GOTO(out_req, rc);
+ }
+ }
+
+ req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
+ sizeof(struct but_update_header));
+ req_capsule_set_size(&req->rq_pill, &RMF_BUT_BUF, RCL_CLIENT,
+ head->buh_buf_count * sizeof(*bub));
+
+ rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
+ if (rc != 0)
+ GOTO(out_req, rc);
+
+ buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
+ buh->buh_magic = BUT_HEADER_MAGIC;
+ buh->buh_count = head->buh_buf_count;
+ buh->buh_inline_length = 0;
+ buh->buh_reply_size = repsize;
+ buh->buh_update_count = head->buh_update_count;
+ bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
+ list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
+ bub->bub_size = buf->bub_size;
+ bub++;
+ /* First *and* last might be partial pages, hence +1 */
+ page_count += DIV_ROUND_UP(buf->bub_size, PAGE_SIZE) + 1;
+ }
+
+ req->rq_bulk_write = 1;
+ desc = ptlrpc_prep_bulk_imp(req, page_count,
+ MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
+ PTLRPC_BULK_GET_SOURCE,
+ MDS_BULK_PORTAL,
+ &ptlrpc_bulk_kiov_nopin_ops);
+ if (desc == NULL)
+ GOTO(out_req, rc = -ENOMEM);
+
+ list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
+ desc->bd_frag_ops->add_iov_frag(desc, buf->bub_req,
+ buf->bub_size);
+ total += buf->bub_size;
+ }
+ CDEBUG(D_OTHER, "Total %d in %u\n", total, head->buh_update_count);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
+ RCL_SERVER, repsize);
+
+ ptlrpc_request_set_replen(req);
+ req->rq_request_portal = OUT_PORTAL;
+ req->rq_reply_portal = OSC_REPLY_PORTAL;
+ *reqp = req;
+
+out_req:
+ if (rc < 0)
+ ptlrpc_req_finished(req);
+
+ RETURN(rc);
+}
+
+static struct batch_update_buffer *
+current_batch_update_buffer(struct batch_update_head *head)
+{
+ if (list_empty(&head->buh_buf_list))
+ return NULL;
+
+ return list_entry(head->buh_buf_list.prev, struct batch_update_buffer,
+ bub_item);
+}
+
+static int batch_update_buffer_create(struct batch_update_head *head,
+ size_t size)
+{
+ struct batch_update_buffer *buf;
+ struct batch_update_request *bur;
+
+ OBD_ALLOC_PTR(buf);
+ if (buf == NULL)
+ return -ENOMEM;
+
+ LASSERT(size > 0);
+ size = round_up(size, PAGE_SIZE);
+ OBD_ALLOC_LARGE(bur, size);
+ if (bur == NULL) {
+ OBD_FREE_PTR(buf);
+ return -ENOMEM;
+ }
+
+ bur->burq_magic = BUT_REQUEST_MAGIC;
+ bur->burq_count = 0;
+ buf->bub_req = bur;
+ buf->bub_size = size;
+ buf->bub_end = sizeof(*bur);
+ INIT_LIST_HEAD(&buf->bub_item);
+ list_add_tail(&buf->bub_item, &head->buh_buf_list);
+ head->buh_buf_count++;
+
+ return 0;
+}
+
+/**
+ * Destroy an @object_update_callback.
+ */
+static void object_update_callback_fini(struct object_update_callback *ouc)
+{
+ LASSERT(list_empty(&ouc->ouc_item));
+
+ OBD_FREE_PTR(ouc);
+}
+
+/**
+ * Insert an @object_update_callback into the @batch_update_head.
+ *
+ * Usually each update in @batch_update_head will have one correspondent
+ * callback, and these callbacks will be called in ->rq_interpret_reply.
+ */
+static int
+batch_insert_update_callback(struct batch_update_head *head, void *data,
+ object_update_interpret_t interpret)
+{
+ struct object_update_callback *ouc;
+
+ OBD_ALLOC_PTR(ouc);
+ if (ouc == NULL)
+ return -ENOMEM;
+
+ INIT_LIST_HEAD(&ouc->ouc_item);
+ ouc->ouc_interpret = interpret;
+ ouc->ouc_head = head;
+ ouc->ouc_data = data;
+ list_add_tail(&ouc->ouc_item, &head->buh_cb_list);
+
+ return 0;
+}
+
+/**
+ * Allocate and initialize batch update request.
+ *
+ * @batch_update_head is being used to track updates being executed on
+ * this OBD device. The update buffer will be 4K initially, and increased
+ * if needed.
+ */
+static struct batch_update_head *
+batch_update_request_create(struct obd_export *exp, struct lu_batch *bh)
+{
+ struct batch_update_head *head;
+ int rc;
+
+ OBD_ALLOC_PTR(head);
+ if (head == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ INIT_LIST_HEAD(&head->buh_cb_list);
+ INIT_LIST_HEAD(&head->buh_buf_list);
+ head->buh_exp = exp;
+ head->buh_batch = bh;
+
+ rc = batch_update_buffer_create(head, PAGE_SIZE);
+ if (rc != 0) {
+ OBD_FREE_PTR(head);
+ RETURN(ERR_PTR(rc));
+ }
+
+ return head;
+}
+
+static void batch_update_request_destroy(struct batch_update_head *head)
+{
+ struct batch_update_buffer *bub, *tmp;
+
+ if (head == NULL)
+ return;
+
+ list_for_each_entry_safe(bub, tmp, &head->buh_buf_list, bub_item) {
+ list_del(&bub->bub_item);
+ if (bub->bub_req)
+ OBD_FREE_LARGE(bub->bub_req, bub->bub_size);
+ OBD_FREE_PTR(bub);
+ }
+
+ OBD_FREE_PTR(head);
+}
+
+static int batch_update_request_fini(struct batch_update_head *head,
+ struct ptlrpc_request *req,
+ struct batch_update_reply *reply, int rc)
+{
+ struct object_update_callback *ouc, *next;
+ struct lustre_msg *repmsg = NULL;
+ int count = 0;
+ int index = 0;
+
+ ENTRY;
+
+ if (reply)
+ count = reply->burp_count;
+
+ list_for_each_entry_safe(ouc, next, &head->buh_cb_list, ouc_item) {
+ int rc1 = 0;
+
+ list_del_init(&ouc->ouc_item);
+
+ /*
+ * The peer may only have handled some requests (indicated by
+ * @count) in the packaged OUT RPC, we can only get results
+ * for the handled part.
+ */
+ if (index < count) {
+ repmsg = batch_update_repmsg_next(reply, repmsg);
+ if (repmsg == NULL)
+ rc1 = -EPROTO;
+ else
+ rc1 = repmsg->lm_result;
+ } else {
+ /*
+ * The peer did not handle these requests, let us return
+ * -ECANCELED to the update interpreter for now.
+ */
+ repmsg = NULL;
+ rc1 = -ECANCELED;
+ }
+
+ if (ouc->ouc_interpret != NULL)
+ ouc->ouc_interpret(req, repmsg, ouc, rc1);
+
+ object_update_callback_fini(ouc);
+ if (rc == 0 && rc1 < 0)
+ rc = rc1;
+ }
+
+ batch_update_request_destroy(head);
+
+ RETURN(rc);
+}
+
+static int batch_update_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ void *args, int rc)
+{
+ struct batch_update_args *aa = (struct batch_update_args *)args;
+ struct batch_update_reply *reply = NULL;
+
+ ENTRY;
+
+ if (aa->ba_head == NULL)
+ RETURN(0);
+
+ /* Unpack the results from the reply message. */
+ if (req->rq_repmsg != NULL && req->rq_replied) {
+ reply = req_capsule_server_sized_get(&req->rq_pill,
+ &RMF_BUT_REPLY,
+ sizeof(*reply));
+ if ((reply == NULL ||
+ reply->burp_magic != BUT_REPLY_MAGIC) && rc == 0)
+ rc = -EPROTO;
+ }
+
+ rc = batch_update_request_fini(aa->ba_head, req, reply, rc);
+
+ RETURN(rc);
+}
+
+static int batch_send_update_req(const struct lu_env *env,
+ struct batch_update_head *head)
+{
+ struct lu_batch *bh;
+ struct ptlrpc_request *req = NULL;
+ struct batch_update_args *aa;
+ int rc;
+
+ ENTRY;
+
+ if (head == NULL)
+ RETURN(0);
+
+ bh = head->buh_batch;
+ rc = batch_prep_update_req(head, &req);
+ if (rc) {
+ rc = batch_update_request_fini(head, NULL, NULL, rc);
+ RETURN(rc);
+ }
+
+ aa = ptlrpc_req_async_args(aa, req);
+ aa->ba_head = head;
+ req->rq_interpret_reply = batch_update_interpret;
+
+ if (bh->lbt_flags & BATCH_FL_SYNC) {
+ rc = ptlrpc_queue_wait(req);
+ } else {
+ if ((bh->lbt_flags & (BATCH_FL_RDONLY | BATCH_FL_RQSET)) ==
+ BATCH_FL_RDONLY) {
+ ptlrpcd_add_req(req);
+ } else if (bh->lbt_flags & BATCH_FL_RQSET) {
+ ptlrpc_set_add_req(bh->lbt_rqset, req);
+ ptlrpc_check_set(env, bh->lbt_rqset);
+ } else {
+ ptlrpcd_add_req(req);
+ }
+ req = NULL;
+ }
+
+ if (req != NULL)
+ ptlrpc_req_finished(req);
+
+ RETURN(rc);
+}
+
+static int batch_update_request_add(struct batch_update_head **headp,
+ struct md_op_item *item,
+ md_update_pack_t packer,
+ object_update_interpret_t interpreter)
+{
+ struct batch_update_head *head = *headp;
+ struct lu_batch *bh = head->buh_batch;
+ struct batch_update_buffer *buf;
+ struct lustre_msg *reqmsg;
+ size_t max_len;
+ int rc;
+
+ ENTRY;
+
+ for (; ;) {
+ buf = current_batch_update_buffer(head);
+ LASSERT(buf != NULL);
+ max_len = buf->bub_size - buf->bub_end;
+ reqmsg = (struct lustre_msg *)((char *)buf->bub_req +
+ buf->bub_end);
+ rc = packer(head, reqmsg, &max_len, item);
+ if (rc == -E2BIG) {
+ int rc2;
+
+ /* Create new batch object update buffer */
+ rc2 = batch_update_buffer_create(head,
+ max_len + offsetof(struct batch_update_request,
+ burq_reqmsg[0]) + 1);
+ if (rc2 != 0) {
+ rc = rc2;
+ break;
+ }
+ } else {
+ if (rc == 0) {
+ buf->bub_end += max_len;
+ buf->bub_req->burq_count++;
+ head->buh_update_count++;
+ head->buh_repsize += reqmsg->lm_repsize;
+ }
+ break;
+ }
+ }
+
+ if (rc)
+ GOTO(out, rc);
+
+ rc = batch_insert_update_callback(head, item, interpreter);
+ if (rc)
+ GOTO(out, rc);
+
+ /* Unplug the batch queue if accumulated enough update requests. */
+ if (bh->lbt_max_count && head->buh_update_count >= bh->lbt_max_count) {
+ rc = batch_send_update_req(NULL, head);
+ *headp = NULL;
+ }
+out:
+ if (rc) {
+ batch_update_request_destroy(head);
+ *headp = NULL;
+ }
+
+ RETURN(rc);
+}
+
+struct lu_batch *cli_batch_create(struct obd_export *exp,
+ enum lu_batch_flags flags, __u32 max_count)
+{
+ struct cli_batch *cbh;
+ struct lu_batch *bh;
+
+ ENTRY;
+
+ OBD_ALLOC_PTR(cbh);
+ if (!cbh)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ bh = &cbh->cbh_super;
+ bh->lbt_result = 0;
+ bh->lbt_flags = flags;
+ bh->lbt_max_count = max_count;
+
+ cbh->cbh_head = batch_update_request_create(exp, bh);
+ if (IS_ERR(cbh->cbh_head)) {
+ bh = (struct lu_batch *)cbh->cbh_head;
+ OBD_FREE_PTR(cbh);
+ }
+
+ RETURN(bh);
+}
+EXPORT_SYMBOL(cli_batch_create);
+
+int cli_batch_stop(struct obd_export *exp, struct lu_batch *bh)
+{
+ struct cli_batch *cbh;
+ int rc;
+
+ ENTRY;
+
+ cbh = container_of(bh, struct cli_batch, cbh_super);
+ rc = batch_send_update_req(NULL, cbh->cbh_head);
+
+ OBD_FREE_PTR(cbh);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(cli_batch_stop);
+
+int cli_batch_flush(struct obd_export *exp, struct lu_batch *bh, bool wait)
+{
+ struct cli_batch *cbh;
+ int rc;
+
+ ENTRY;
+
+ cbh = container_of(bh, struct cli_batch, cbh_super);
+ if (cbh->cbh_head == NULL)
+ RETURN(0);
+
+ rc = batch_send_update_req(NULL, cbh->cbh_head);
+ cbh->cbh_head = NULL;
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(cli_batch_flush);
+
+int cli_batch_add(struct obd_export *exp, struct lu_batch *bh,
+ struct md_op_item *item, md_update_pack_t packer,
+ object_update_interpret_t interpreter)
+{
+ struct cli_batch *cbh;
+ int rc;
+
+ ENTRY;
+
+ cbh = container_of(bh, struct cli_batch, cbh_super);
+ if (cbh->cbh_head == NULL) {
+ cbh->cbh_head = batch_update_request_create(exp, bh);
+ if (IS_ERR(cbh->cbh_head))
+ RETURN(PTR_ERR(cbh->cbh_head));
+ }
+
+ rc = batch_update_request_add(&cbh->cbh_head, item,
+ packer, interpreter);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(cli_batch_add);
&RMF_CAPA2
};
+static const struct req_msg_field *mds_batch_client[] = {
+ &RMF_PTLRPC_BODY,
+ &RMF_BUT_HEADER,
+ &RMF_BUT_BUF,
+};
+
+static const struct req_msg_field *mds_batch_server[] = {
+ &RMF_PTLRPC_BODY,
+ &RMF_BUT_REPLY,
+};
+
static const struct req_msg_field *llog_origin_handle_create_client[] = {
&RMF_PTLRPC_BODY,
&RMF_LLOGD_BODY,
&RQF_CONNECT,
&RQF_LFSCK_NOTIFY,
&RQF_LFSCK_QUERY,
+ &RQF_MDS_BATCH,
};
struct req_msg_field {
lustre_swab_ladvise, NULL);
EXPORT_SYMBOL(RMF_OST_LADVISE);
+struct req_msg_field RMF_BUT_REPLY =
+ DEFINE_MSGF("batch_update_reply", 0, -1,
+ lustre_swab_batch_update_reply, NULL);
+EXPORT_SYMBOL(RMF_BUT_REPLY);
+
+struct req_msg_field RMF_BUT_HEADER = DEFINE_MSGF("but_update_header", 0,
+ -1, lustre_swab_but_update_header, NULL);
+EXPORT_SYMBOL(RMF_BUT_HEADER);
+
+struct req_msg_field RMF_BUT_BUF = DEFINE_MSGF("but_update_buf",
+ RMF_F_STRUCT_ARRAY, sizeof(struct but_update_buffer),
+ lustre_swab_but_update_buffer, NULL);
+EXPORT_SYMBOL(RMF_BUT_BUF);
+
/*
* Request formats.
*/
mds_getinfo_server);
EXPORT_SYMBOL(RQF_MDS_GET_INFO);
+struct req_format RQF_MDS_BATCH =
+ DEFINE_REQ_FMT0("MDS_BATCH", mds_batch_client,
+ mds_batch_server);
+EXPORT_SYMBOL(RQF_MDS_BATCH);
+
struct req_format RQF_LDLM_ENQUEUE =
DEFINE_REQ_FMT0("LDLM_ENQUEUE",
ldlm_enqueue_client, ldlm_enqueue_lvb_server);
LASSERT(fmt != NULL);
count = req_capsule_filled_sizes(pill, RCL_SERVER);
- rc = lustre_pack_reply(pill->rc_req, count,
- pill->rc_area[RCL_SERVER], NULL);
- if (rc != 0) {
- DEBUG_REQ(D_ERROR, pill->rc_req,
- "Cannot pack %d fields in format '%s'",
- count, fmt->rf_name);
+ if (req_capsule_ptlreq(pill)) {
+ rc = lustre_pack_reply(pill->rc_req, count,
+ pill->rc_area[RCL_SERVER], NULL);
+ if (rc != 0) {
+ DEBUG_REQ(D_ERROR, pill->rc_req,
+ "Cannot pack %d fields in format '%s'",
+ count, fmt->rf_name);
+ }
+ } else { /* SUB request */
+ __u32 msg_len;
+
+ msg_len = lustre_msg_size_v2(count, pill->rc_area[RCL_SERVER]);
+ if (msg_len > pill->rc_reqmsg->lm_repsize) {
+ /* TODO: Check whether there is enough buffer size */
+ CDEBUG(D_INFO,
+ "Overflow pack %d fields in format '%s' for "
+ "the SUB request with message len %u:%u\n",
+ count, fmt->rf_name, msg_len,
+ pill->rc_reqmsg->lm_repsize);
+ }
+
+ rc = 0;
+ lustre_init_msg_v2(pill->rc_repmsg, count,
+ pill->rc_area[RCL_SERVER], NULL);
}
+
return rc;
}
EXPORT_SYMBOL(req_capsule_server_pack);
+int req_capsule_client_pack(struct req_capsule *pill)
+{
+ const struct req_format *fmt;
+ int count;
+ int rc = 0;
+
+ LASSERT(pill->rc_loc == RCL_CLIENT);
+ fmt = pill->rc_fmt;
+ LASSERT(fmt != NULL);
+
+ count = req_capsule_filled_sizes(pill, RCL_CLIENT);
+ if (req_capsule_ptlreq(pill)) {
+ struct ptlrpc_request *req = pill->rc_req;
+
+ rc = lustre_pack_request(req, req->rq_import->imp_msg_magic,
+ count, pill->rc_area[RCL_CLIENT],
+ NULL);
+ } else {
+ /* Sub request in a batch PTLRPC request */
+ lustre_init_msg_v2(pill->rc_reqmsg, count,
+ pill->rc_area[RCL_CLIENT], NULL);
+ }
+ return rc;
+}
+EXPORT_SYMBOL(req_capsule_client_pack);
+
/**
* Returns the PTLRPC request or reply (\a loc) buffer offset of a \a pill
* corresponding to the given RMF (\a field).
value = getter(msg, offset, len);
if (value == NULL) {
+ LASSERT(pill->rc_req != NULL);
DEBUG_REQ(D_ERROR, pill->rc_req,
- "Wrong buffer for field '%s' (%u of %u) in format '%s', %u vs. %u (%s)",
- field->rmf_name, offset, lustre_msg_bufcount(msg),
- fmt->rf_name, lustre_msg_buflen(msg, offset), len,
- rcl_names[loc]);
+ "Wrong buffer for field '%s' (%u of %u) in format '%s', %u vs. %u (%s)",
+ field->rmf_name, offset, lustre_msg_bufcount(msg),
+ fmt->rf_name, lustre_msg_buflen(msg, offset), len,
+ rcl_names[loc]);
} else {
swabber_dumper_helper(pill, field, loc, offset, value, len,
dump, swabber);
*/
__u32 req_capsule_msg_size(struct req_capsule *pill, enum req_location loc)
{
- return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic,
- pill->rc_fmt->rf_fields[loc].nr,
- pill->rc_area[loc]);
+ if (req_capsule_ptlreq(pill)) {
+ return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic,
+ pill->rc_fmt->rf_fields[loc].nr,
+ pill->rc_area[loc]);
+ } else { /* SUB request in a batch request */
+ int count;
+
+ count = req_capsule_filled_sizes(pill, loc);
+ return lustre_msg_size_v2(count, pill->rc_area[loc]);
+ }
}
+EXPORT_SYMBOL(req_capsule_msg_size);
/**
* While req_capsule_msg_size() computes the size of a PTLRPC request or reply
LASSERTF(newlen <= len, "%s:%s, oldlen=%u, newlen=%u\n",
fmt->rf_name, field->rmf_name, len, newlen);
+ len = lustre_shrink_msg(msg, offset, newlen, 1);
if (loc == RCL_CLIENT) {
- pill->rc_req->rq_reqlen = lustre_shrink_msg(msg, offset, newlen,
- 1);
+ if (req_capsule_ptlreq(pill))
+ pill->rc_req->rq_reqlen = len;
} else {
- pill->rc_req->rq_replen = lustre_shrink_msg(msg, offset, newlen,
- 1);
/* update also field size in reply lenghts arrays for possible
* reply re-pack due to req_capsule_server_grow() call.
*/
req_capsule_set_size(pill, field, loc, newlen);
+ if (req_capsule_ptlreq(pill))
+ pill->rc_req->rq_replen = len;
}
}
EXPORT_SYMBOL(req_capsule_shrink);
const char *sepol = NULL;
const char *nm_sepol = NULL;
+ if (req_capsule_subreq(pill))
+ return 0;
+
if (!pill->rc_req)
return -EPROTO;
}
EXPORT_SYMBOL(req_check_sepol);
#endif
+
+void req_capsule_subreq_init(struct req_capsule *pill,
+ const struct req_format *fmt,
+ struct ptlrpc_request *req,
+ struct lustre_msg *reqmsg,
+ struct lustre_msg *repmsg,
+ enum req_location loc)
+{
+ req_capsule_init(pill, req, loc);
+ req_capsule_set(pill, fmt);
+ pill->rc_reqmsg = reqmsg;
+ pill->rc_repmsg = repmsg;
+}
+EXPORT_SYMBOL(req_capsule_subreq_init);
+
+void req_capsule_set_replen(struct req_capsule *pill)
+{
+ if (req_capsule_ptlreq(pill)) {
+ ptlrpc_request_set_replen(pill->rc_req);
+ } else { /* SUB request in a batch request */
+ int count;
+
+ count = req_capsule_filled_sizes(pill, RCL_SERVER);
+ pill->rc_reqmsg->lm_repsize =
+ lustre_msg_size_v2(count,
+ pill->rc_area[RCL_SERVER]);
+ }
+}
+EXPORT_SYMBOL(req_capsule_set_replen);
{ MDS_HSM_CT_UNREGISTER, "mds_hsm_ct_unregister" },
{ MDS_SWAP_LAYOUTS, "mds_swap_layouts" },
{ MDS_RMFID, "mds_rmfid" },
+ { MDS_BATCH, "mds_batch" },
{ LDLM_ENQUEUE, "ldlm_enqueue" },
{ LDLM_CONVERT, "ldlm_convert" },
{ LDLM_CANCEL, "ldlm_cancel" },
__swab32s(&m->lm_repsize);
__swab32s(&m->lm_cksum);
__swab32s(&m->lm_flags);
- BUILD_BUG_ON(offsetof(typeof(*m), lm_padding_2) == 0);
+ __swab32s(&m->lm_opc);
BUILD_BUG_ON(offsetof(typeof(*m), lm_padding_3) == 0);
}
__swab32s(&hr->hr_data_len);
}
+/* TODO: swab each sub request message */
+void lustre_swab_batch_update_request(struct batch_update_request *bur)
+{
+ __swab32s(&bur->burq_magic);
+ __swab16s(&bur->burq_count);
+ __swab16s(&bur->burq_padding);
+}
+
+/* TODO: swab each sub reply message. */
+void lustre_swab_batch_update_reply(struct batch_update_reply *bur)
+{
+ __swab32s(&bur->burp_magic);
+ __swab16s(&bur->burp_count);
+ __swab16s(&bur->burp_padding);
+}
+
+void lustre_swab_but_update_header(struct but_update_header *buh)
+{
+ __swab32s(&buh->buh_magic);
+ __swab32s(&buh->buh_count);
+ __swab32s(&buh->buh_inline_length);
+ __swab32s(&buh->buh_reply_size);
+ __swab32s(&buh->buh_update_count);
+}
+EXPORT_SYMBOL(lustre_swab_but_update_header);
+
+void lustre_swab_but_update_buffer(struct but_update_buffer *bub)
+{
+ __swab32s(&bub->bub_size);
+ __swab32s(&bub->bub_padding);
+}
+EXPORT_SYMBOL(lustre_swab_but_update_buffer);
+
void lustre_swab_swap_layouts(struct mdc_swap_layouts *msl)
{
__swab64s(&msl->msl_flags);
(long long)MDS_SWAP_LAYOUTS);
LASSERTF(MDS_RMFID == 62, "found %lld\n",
(long long)MDS_RMFID);
- LASSERTF(MDS_LAST_OPC == 63, "found %lld\n",
+ LASSERTF(MDS_BATCH == 63, "found %lld\n",
+ (long long)MDS_BATCH);
+ LASSERTF(MDS_LAST_OPC == 64, "found %lld\n",
(long long)MDS_LAST_OPC);
LASSERTF(REINT_SETATTR == 1, "found %lld\n",
(long long)REINT_SETATTR);
(long long)(int)offsetof(struct lustre_msg_v2, lm_flags));
LASSERTF((int)sizeof(((struct lustre_msg_v2 *)0)->lm_flags) == 4, "found %lld\n",
(long long)(int)sizeof(((struct lustre_msg_v2 *)0)->lm_flags));
- LASSERTF((int)offsetof(struct lustre_msg_v2, lm_padding_2) == 24, "found %lld\n",
- (long long)(int)offsetof(struct lustre_msg_v2, lm_padding_2));
- LASSERTF((int)sizeof(((struct lustre_msg_v2 *)0)->lm_padding_2) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct lustre_msg_v2 *)0)->lm_padding_2));
+ LASSERTF((int)offsetof(struct lustre_msg_v2, lm_opc) == 24, "found %lld\n",
+ (long long)(int)offsetof(struct lustre_msg_v2, lm_opc));
+ LASSERTF((int)sizeof(((struct lustre_msg_v2 *)0)->lm_opc) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct lustre_msg_v2 *)0)->lm_opc));
LASSERTF((int)offsetof(struct lustre_msg_v2, lm_padding_3) == 28, "found %lld\n",
(long long)(int)offsetof(struct lustre_msg_v2, lm_padding_3));
LASSERTF((int)sizeof(((struct lustre_msg_v2 *)0)->lm_padding_3) == 4, "found %lld\n",
CHECK_VALUE(MDS_HSM_CT_UNREGISTER);
CHECK_VALUE(MDS_SWAP_LAYOUTS);
CHECK_VALUE(MDS_RMFID);
+ CHECK_VALUE(MDS_BATCH);
CHECK_VALUE(MDS_LAST_OPC);
CHECK_VALUE(REINT_SETATTR);
(long long)MDS_SWAP_LAYOUTS);
LASSERTF(MDS_RMFID == 62, "found %lld\n",
(long long)MDS_RMFID);
- LASSERTF(MDS_LAST_OPC == 63, "found %lld\n",
+ LASSERTF(MDS_BATCH == 63, "found %lld\n",
+ (long long)MDS_BATCH);
+ LASSERTF(MDS_LAST_OPC == 64, "found %lld\n",
(long long)MDS_LAST_OPC);
LASSERTF(REINT_SETATTR == 1, "found %lld\n",
(long long)REINT_SETATTR);
(long long)(int)offsetof(struct lustre_msg_v2, lm_flags));
LASSERTF((int)sizeof(((struct lustre_msg_v2 *)0)->lm_flags) == 4, "found %lld\n",
(long long)(int)sizeof(((struct lustre_msg_v2 *)0)->lm_flags));
- LASSERTF((int)offsetof(struct lustre_msg_v2, lm_padding_2) == 24, "found %lld\n",
- (long long)(int)offsetof(struct lustre_msg_v2, lm_padding_2));
- LASSERTF((int)sizeof(((struct lustre_msg_v2 *)0)->lm_padding_2) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct lustre_msg_v2 *)0)->lm_padding_2));
+ LASSERTF((int)offsetof(struct lustre_msg_v2, lm_opc) == 24, "found %lld\n",
+ (long long)(int)offsetof(struct lustre_msg_v2, lm_opc));
+ LASSERTF((int)sizeof(((struct lustre_msg_v2 *)0)->lm_opc) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct lustre_msg_v2 *)0)->lm_opc));
LASSERTF((int)offsetof(struct lustre_msg_v2, lm_padding_3) == 28, "found %lld\n",
(long long)(int)offsetof(struct lustre_msg_v2, lm_padding_3));
LASSERTF((int)sizeof(((struct lustre_msg_v2 *)0)->lm_padding_3) == 4, "found %lld\n",