From 23fad25a5b6b1e468d05dc7e81dfb9ad00c86a16 Mon Sep 17 00:00:00 2001 From: "nicolas.williams" Date: Thu, 5 Nov 2009 20:56:12 +0000 Subject: [PATCH] b=18631 i=robert.read@sun.com i=tom.wang@sun.com --- lustre/ChangeLog | 6 + lustre/include/lustre/lustre_idl.h | 9 +- lustre/include/lustre_capa.h | 16 -- lustre/include/lustre_debug.h | 6 +- lustre/include/lustre_dlm.h | 17 +- lustre/include/lustre_net.h | 7 +- lustre/include/lustre_req_layout.h | 3 + lustre/ldlm/ldlm_lock.c | 2 +- lustre/ldlm/ldlm_lockd.c | 12 +- lustre/ldlm/ldlm_request.c | 22 +- lustre/liblustre/super.c | 2 +- lustre/mdc/mdc_locks.c | 6 +- lustre/mdt/mdt_internal.h | 4 +- lustre/mdt/mdt_reint.c | 6 +- lustre/mgc/mgc_request.c | 18 +- lustre/mgs/mgs_handler.c | 2 +- lustre/obdclass/debug.c | 84 +----- lustre/obdecho/echo.c | 4 +- lustre/obdfilter/filter.c | 8 +- lustre/obdfilter/filter_io.c | 2 +- lustre/obdfilter/filter_lvb.c | 6 +- lustre/osc/osc_create.c | 14 +- lustre/osc/osc_request.c | 40 ++- lustre/ost/ost_handler.c | 461 ++++++++++++++++++------------ lustre/ptlrpc/layout.c | 558 +++++++++++++++++++++++++++++++------ lustre/ptlrpc/pack_generic.c | 125 ++++++--- lustre/ptlrpc/ptlrpc_module.c | 7 +- lustre/utils/req-layout.c | 7 + 28 files changed, 943 insertions(+), 511 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index f1fff09..da9379b 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -14,6 +14,12 @@ tbd Sun Microsystems, Inc. * RHEL 4 and RHEL 5/SLES 10 clients behaves differently on 'cd' to a removed cwd "./" (refer to Bugzilla 14399). +Severity : normal$ +Bugzilla : 18631 +Description: Unify req format on client/servers +Details : Use new req_capsule API [almost] everywhere instead of old PTLRPC + buffers and swabbers approach.. + Severity : normal Frequency : cleanup Bugzilla : 19200 diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 5e205b8..7f76ca0 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -2465,6 +2465,13 @@ extern void lustre_swab_llog_rec(struct llog_rec_hdr *rec, struct lustre_cfg; extern void lustre_swab_lustre_cfg(struct lustre_cfg *lcfg); +/* Functions for dumping PTLRPC fields */ +void dump_rniobuf(struct niobuf_remote *rnb); +void dump_ioo(struct obd_ioobj *nb); +void dump_obdo(struct obdo *oa); +void dump_ost_body(struct ost_body *ob); +void dump_rcs(__u32 *rc); + /* this will be used when OBD_CONNECT_CHANGE_QS is set */ struct qunit_data { /** @@ -2503,7 +2510,7 @@ struct qunit_data { #define QDATA_CLR_CHANGE_QS(qdata) ((qdata)->qd_flags &= ~LQUOTA_FLAGS_CHG_QS) extern void lustre_swab_qdata(struct qunit_data *d); -extern struct qunit_data *quota_get_qdata(void*req, int is_req, int is_exp); +extern struct qunit_data *quota_get_qdata(void *req, int is_req, int is_exp); extern int quota_copy_qdata(void *request, struct qunit_data *qdata, int is_req, int is_exp); diff --git a/lustre/include/lustre_capa.h b/lustre/include/lustre_capa.h index 9b9b21b..44a06b7d 100644 --- a/lustre/include/lustre_capa.h +++ b/lustre/include/lustre_capa.h @@ -284,22 +284,6 @@ static inline int capa_opc_supported(struct lustre_capa *capa, __u64 opc) return (capa_opc(capa) & opc) == opc; } -static inline struct lustre_capa * -lustre_unpack_incoming_capa(struct ptlrpc_request *req, unsigned int offset) -{ - struct lustre_capa *capa; - - capa = lustre_swab_reqbuf(req, offset, sizeof(*capa), - lustre_swab_lustre_capa); - if (capa == NULL) - CERROR("bufcount %u, bufsize %u\n", - lustre_msg_bufcount(req->rq_reqmsg), - (lustre_msg_bufcount(req->rq_reqmsg) <= offset) ? - -1 : lustre_msg_buflen(req->rq_reqmsg, offset)); - - return capa; -} - struct filter_capa_key { struct list_head k_list; struct lustre_capa_key k_key; diff --git a/lustre/include/lustre_debug.h b/lustre/include/lustre_debug.h index 5ded0ff..73c22bb 100644 --- a/lustre/include/lustre_debug.h +++ b/lustre/include/lustre_debug.h @@ -38,6 +38,7 @@ #define _LUSTRE_DEBUG_H #include +#include #if defined(__linux__) #include @@ -67,11 +68,8 @@ do { if (offset > ASSERT_MAX_SIZE_MB << 20) { \ }} while(0) /* lib/debug.c */ -int dump_lniobuf(struct niobuf_local *lnb); -int dump_rniobuf(struct niobuf_remote *rnb); -int dump_ioo(struct obd_ioobj *nb); +void dump_lniobuf(struct niobuf_local *lnb); int dump_req(struct ptlrpc_request *req); -int dump_obdo(struct obdo *oa); void dump_lsm(int level, struct lov_stripe_md *lsm); int block_debug_setup(void *addr, int len, __u64 off, __u64 id); int block_debug_check(char *who, void *addr, int len, __u64 off, __u64 id); diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index c83a841..710f921 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -368,7 +368,7 @@ struct ldlm_valblock_ops { int (*lvbo_init)(struct ldlm_resource *res); int (*lvbo_update)(struct ldlm_resource *res, struct ptlrpc_request *r, - int buf_idx, int increase); + int increase); }; typedef enum { @@ -667,7 +667,6 @@ struct ldlm_lock { */ __u32 l_lvb_len; void *l_lvb_data; - void *l_lvb_swabber; void *l_ast_data; spinlock_t l_extents_list_lock; @@ -915,12 +914,11 @@ ldlm_handle2lock_long(const struct lustre_handle *h, int flags) } static inline int ldlm_res_lvbo_update(struct ldlm_resource *res, - struct ptlrpc_request *r, int buf_idx, - int increase) + struct ptlrpc_request *r, int increase) { if (res->lr_namespace->ns_lvbo && res->lr_namespace->ns_lvbo->lvbo_update) { - return res->lr_namespace->ns_lvbo->lvbo_update(res, r, buf_idx, + return res->lr_namespace->ns_lvbo->lvbo_update(res, r, increase); } return 0; @@ -1064,8 +1062,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, struct ldlm_enqueue_info *einfo, const struct ldlm_res_id *res_id, ldlm_policy_data_t *policy, int *flags, - void *lvb, __u32 lvb_len, void *lvb_swabber, - struct lustre_handle *lockh, int async); + void *lvb, __u32 lvb_len, struct lustre_handle *lockh, + int async); int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req, struct list_head *cancels, @@ -1080,8 +1078,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req, int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode, int *flags, void *lvb, __u32 lvb_len, - void *lvb_swabber, struct lustre_handle *lockh, - int rc); + struct lustre_handle *lockh, int rc); int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, const struct ldlm_res_id *res_id, ldlm_type_t type, ldlm_policy_data_t *policy, @@ -1089,7 +1086,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, ldlm_blocking_callback blocking, ldlm_completion_callback completion, ldlm_glimpse_callback glimpse, - void *data, __u32 lvb_len, void *lvb_swabber, + void *data, __u32 lvb_len, const __u64 *client_cookie, struct lustre_handle *lockh); int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new, diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 2433ea8..af283c5 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -415,7 +415,8 @@ struct ptlrpc_request { rq_pack_udesc:1, rq_pack_bulk:1, /* doesn't expect reply FIXME */ - rq_no_reply:1; + rq_no_reply:1, + rq_pill_init:1; /* pill initialized */ uid_t rq_auth_uid; /* authed uid */ uid_t rq_auth_mapped_uid; /* authed uid mapped to */ @@ -1133,10 +1134,6 @@ int lustre_msg_buflen(struct lustre_msg *m, int n); void lustre_msg_set_buflen(struct lustre_msg *m, int n, int len); int lustre_msg_bufcount(struct lustre_msg *m); char *lustre_msg_string (struct lustre_msg *m, int n, int max_len); -void *lustre_swab_reqbuf(struct ptlrpc_request *req, int n, int minlen, - void *swabber); -void *lustre_swab_repbuf(struct ptlrpc_request *req, int n, int minlen, - void *swabber); __u32 lustre_msghdr_get_flags(struct lustre_msg *msg); void lustre_msghdr_set_flags(struct lustre_msg *msg, __u32 flags); __u32 lustre_msg_get_flags(struct lustre_msg *msg); diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index 8c650e7..0cd88b4 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -75,6 +75,8 @@ void req_capsule_init(struct req_capsule *pill, struct ptlrpc_request *req, void req_capsule_fini(struct req_capsule *pill); void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt); +void req_capsule_client_dump(struct req_capsule *pill); +void req_capsule_server_dump(struct req_capsule *pill); void req_capsule_init_area(struct req_capsule *pill); int req_capsule_filled_sizes(struct req_capsule *pill, enum req_location loc); int req_capsule_server_pack(struct req_capsule *pill); @@ -271,6 +273,7 @@ extern const struct req_msg_field RMF_OST_BODY; extern const struct req_msg_field RMF_OBD_IOOBJ; extern const struct req_msg_field RMF_OBD_ID; extern const struct req_msg_field RMF_NIOBUF_REMOTE; +extern const struct req_msg_field RMF_RCS; extern const struct req_msg_field RMF_FIEMAP_KEY; extern const struct req_msg_field RMF_FIEMAP_VAL; diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index dd57661..eda190b 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1673,7 +1673,7 @@ void ldlm_cancel_locks_for_export_cb(void *obj, void *data) LDLM_LOCK_GET(lock); LDLM_DEBUG(lock, "export %p", exp); - ldlm_res_lvbo_update(res, NULL, 0, 1); + ldlm_res_lvbo_update(res, NULL, 1); ldlm_lock_cancel(lock); ldlm_reprocess_all(res); ldlm_resource_putref(res); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index c59fd3e..a3fc916 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -637,8 +637,7 @@ static int ldlm_cb_interpret(const struct lu_env *env, * been received yet, we need to update lvbo to have the * proper attributes cached. */ if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK) - ldlm_res_lvbo_update(lock->l_resource, NULL, - 0, 1); + ldlm_res_lvbo_update(lock->l_resource, NULL, 1); rc = ldlm_handle_ast_error(lock, req, rc, arg->type == LDLM_BL_CALLBACK ? "blocking" : "completion"); @@ -949,7 +948,7 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) else if (rc != 0) rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); else - rc = ldlm_res_lvbo_update(res, req, REPLY_REC_OFF, 1); + rc = ldlm_res_lvbo_update(res, req, 1); ptlrpc_req_finished(req); if (rc == -ERESTART) @@ -1391,7 +1390,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req, if (res != NULL) { ldlm_resource_getref(res); LDLM_RESOURCE_ADDREF(res); - ldlm_res_lvbo_update(res, NULL, 0, 1); + ldlm_res_lvbo_update(res, NULL, 1); } pres = res; } @@ -1544,9 +1543,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, LDLM_ERROR(lock, "completion AST did not contain " "expected LVB!"); } else { - void *lvb = req_capsule_client_swab_get(&req->rq_pill, - &RMF_DLM_LVB, - (void *)lock->l_lvb_swabber); + void *lvb = req_capsule_client_get(&req->rq_pill, + &RMF_DLM_LVB); memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len); } } diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 1eaa0e1..5402c50 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -377,7 +377,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, ldlm_blocking_callback blocking, ldlm_completion_callback completion, ldlm_glimpse_callback glimpse, - void *data, __u32 lvb_len, void *lvb_swabber, + void *data, __u32 lvb_len, const __u64 *client_cookie, struct lustre_handle *lockh) { @@ -406,7 +406,6 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, lock->l_flags |= LDLM_FL_LOCAL; if (*flags & LDLM_FL_ATOMIC_CB) lock->l_flags |= LDLM_FL_ATOMIC_CB; - lock->l_lvb_swabber = lvb_swabber; unlock_res_and_lock(lock); if (policy != NULL) lock->l_policy_data = *policy; @@ -476,7 +475,7 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns, int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode, int *flags, void *lvb, __u32 lvb_len, - void *lvb_swabber, struct lustre_handle *lockh,int rc) + struct lustre_handle *lockh,int rc) { struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; int is_replay = *flags & LDLM_FL_REPLAY; @@ -509,9 +508,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len); - tmplvb = req_capsule_server_swab_get(&req->rq_pill, - &RMF_DLM_LVB, - lvb_swabber); + tmplvb = req_capsule_server_get(&req->rq_pill, + &RMF_DLM_LVB); if (tmplvb == NULL) GOTO(cleanup, rc = -EPROTO); if (lvb != NULL) @@ -606,9 +604,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len); - tmplvb = req_capsule_server_swab_get(&req->rq_pill, - &RMF_DLM_LVB, - lvb_swabber); + tmplvb = req_capsule_server_get(&req->rq_pill, + &RMF_DLM_LVB); if (tmplvb == NULL) GOTO(cleanup, rc = -EPROTO); memcpy(lock->l_lvb_data, tmplvb, lvb_len); @@ -758,8 +755,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, struct ldlm_enqueue_info *einfo, const struct ldlm_res_id *res_id, ldlm_policy_data_t *policy, int *flags, - void *lvb, __u32 lvb_len, void *lvb_swabber, - struct lustre_handle *lockh, int async) + void *lvb, __u32 lvb_len, struct lustre_handle *lockh, + int async) { struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; struct ldlm_lock *lock; @@ -794,7 +791,6 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, /* for the local lock, add the reference */ ldlm_lock_addref_internal(lock, einfo->ei_mode); ldlm_lock2handle(lock, lockh); - lock->l_lvb_swabber = lvb_swabber; if (policy != NULL) { /* INODEBITS_INTEROP: If the server does not support * inodebits, we will request a plain lock in the @@ -880,7 +876,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0, einfo->ei_mode, flags, lvb, lvb_len, - lvb_swabber, lockh, rc); + lockh, rc); /* If ldlm_cli_enqueue_fini did not find the lock, we need to free * one reference that we took */ diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 0b2088d..bbb2dfb 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -1410,7 +1410,7 @@ static int llu_file_flock(struct inode *ino, if (lmv->tgts[0].ltd_exp != NULL) rc = ldlm_cli_enqueue(lmv->tgts[0].ltd_exp, NULL, &einfo, &res_id, - &flock, &flags, NULL, 0, NULL, &lockh, 0); + &flock, &flags, NULL, 0, &lockh, 0); else rc = -ENODEV; } diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 9b1443f..4fb239f 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -661,7 +661,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, mdc_enter_request(&obddev->u.cli); } rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, - 0, NULL, lockh, 0); + 0, lockh, 0); if (reqp) *reqp = req; @@ -943,7 +943,7 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env, rc = -ETIMEDOUT; rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode, - &flags, NULL, 0, NULL, lockh, rc); + &flags, NULL, 0, lockh, rc); if (rc < 0) { CERROR("ldlm_cli_enqueue_fini: %d\n", rc); mdc_clear_replay_flag(req, rc); @@ -990,7 +990,7 @@ int mdc_intent_getattr_async(struct obd_export *exp, mdc_enter_request(&obddev->u.cli); rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, - 0, NULL, &minfo->mi_lockh, 1); + 0, &minfo->mi_lockh, 1); if (rc < 0) { mdc_exit_request(&obddev->u.cli); RETURN(rc); diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index fa2205b..c32f9ad 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -712,8 +712,8 @@ static inline int mdt_fid_lock(struct ldlm_namespace *ns, rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy, mode, &flags, mdt_blocking_ast, - ldlm_completion_ast, - NULL, NULL, 0, NULL, client_cookie, lh); + ldlm_completion_ast, NULL, NULL, 0, + client_cookie, lh); return rc == ELDLM_OK ? 0 : -EIO; } diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 38502ef..6548035 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -837,7 +837,6 @@ static int mdt_rename_lock(struct mdt_thread_info *info, rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy, LCK_EX, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, - NULL, &info->mti_exp->exp_handle.h_cookie, lh); } else { @@ -849,9 +848,8 @@ static int mdt_rename_lock(struct mdt_thread_info *info, * This is the case mdt0 is remote node, issue DLM lock like * other clients. */ - rc = ldlm_cli_enqueue(ms->ms_control_exp, - NULL, &einfo, res_id, - policy, &flags, NULL, 0, NULL, lh, 0); + rc = ldlm_cli_enqueue(ms->ms_control_exp, NULL, &einfo, res_id, + policy, &flags, NULL, 0, lh, 0); } RETURN(rc); diff --git a/lustre/mgc/mgc_request.c b/lustre/mgc/mgc_request.c index 339487f..b7eb9b0 100644 --- a/lustre/mgc/mgc_request.c +++ b/lustre/mgc/mgc_request.c @@ -786,29 +786,27 @@ static int mgc_set_mgs_param(struct obd_export *exp, { struct ptlrpc_request *req; struct mgs_send_param *req_msp, *rep_msp; - int size[] = { sizeof(struct ptlrpc_body), sizeof(*req_msp) }; - __u32 rep_size[] = { sizeof(struct ptlrpc_body), sizeof(*msp) }; int rc; ENTRY; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MGS_VERSION, - MGS_SET_INFO, 2, size, NULL); + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), + &RQF_MGS_SET_INFO, LUSTRE_MGS_VERSION, + MGS_SET_INFO); if (!req) RETURN(-ENOMEM); - req_msp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*req_msp)); + req_msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM); if (!req_msp) { ptlrpc_req_finished(req); RETURN(-ENOMEM); } memcpy(req_msp, msp, sizeof(*req_msp)); - ptlrpc_req_set_repsize(req, 2, rep_size); + ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); if (!rc) { - rep_msp = lustre_swab_repbuf(req, REPLY_REC_OFF, - sizeof(*rep_msp), NULL); + rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM); memcpy(msp, rep_msp, sizeof(*rep_msp)); } @@ -841,8 +839,8 @@ static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, /* We need a callback for every lockholder, so don't try to ldlm_lock_match (see rev 1.1.2.11.2.47) */ - rc = ldlm_cli_enqueue(exp, NULL, &einfo, &cld->cld_resid, - NULL, flags, NULL, 0, NULL, lockh, 0); + rc = ldlm_cli_enqueue(exp, NULL, &einfo, &cld->cld_resid, NULL, flags, + NULL, 0, lockh, 0); /* A failed enqueue should still call the mgc_blocking_ast, where it will be requeued if needed ("grant failed"). */ diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index d45f690..3c5262e 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -332,7 +332,7 @@ static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname, LDLM_PLAIN, NULL, LCK_EX, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, - fsname, 0, NULL, NULL, lockh); + fsname, 0, NULL, lockh); if (rc) CERROR("can't take cfg lock for %s (%d)\n", fsname, rc); diff --git a/lustre/obdclass/debug.c b/lustre/obdclass/debug.c index a80cd2c..1a62d9d 100644 --- a/lustre/obdclass/debug.c +++ b/lustre/obdclass/debug.c @@ -52,67 +52,15 @@ #include #include -int dump_ioo(struct obd_ioobj *ioo) +void dump_lniobuf(struct niobuf_local *nb) { - CERROR("obd_ioobj: ioo_id="LPD64", ioo_gr="LPD64", ioo_type=%d, " - "ioo_bufct=%d\n", - ioo->ioo_id, ioo->ioo_gr, ioo->ioo_type, ioo->ioo_bufcnt); - return -EINVAL; -} - -int dump_lniobuf(struct niobuf_local *nb) -{ - CERROR("niobuf_local: offset="LPD64", len=%d, page=%p, rc=%d\n", + CDEBUG(D_RPCTRACE, + "niobuf_local: offset="LPD64", len=%d, page=%p, rc=%d\n", nb->offset, nb->len, nb->page, nb->rc); - CERROR("nb->page: index = %ld\n", nb->page ? cfs_page_index(nb->page) : -1); - - return -EINVAL; -} - -int dump_rniobuf(struct niobuf_remote *nb) -{ - CERROR("niobuf_remote: offset="LPU64", len=%d, flags=%x\n", - nb->offset, nb->len, nb->flags); - - return -EINVAL; -} - -int dump_obdo(struct obdo *oa) -{ - __u32 valid = oa->o_valid; - - CERROR("obdo: o_valid = %08x\n", valid); - if (valid & OBD_MD_FLID) - CERROR("obdo: o_id = "LPD64"\n", oa->o_id); - if (valid & OBD_MD_FLATIME) - CERROR("obdo: o_atime = "LPD64"\n", oa->o_atime); - if (valid & OBD_MD_FLMTIME) - CERROR("obdo: o_mtime = "LPD64"\n", oa->o_mtime); - if (valid & OBD_MD_FLCTIME) - CERROR("obdo: o_ctime = "LPD64"\n", oa->o_ctime); - if (valid & OBD_MD_FLSIZE) - CERROR("obdo: o_size = "LPD64"\n", oa->o_size); - if (valid & OBD_MD_FLBLOCKS) /* allocation of space */ - CERROR("obdo: o_blocks = "LPD64"\n", oa->o_blocks); - if (valid & OBD_MD_FLBLKSZ) - CERROR("obdo: o_blksize = %d\n", oa->o_blksize); - if (valid & (OBD_MD_FLTYPE | OBD_MD_FLMODE)) - CERROR("obdo: o_mode = %o\n", - oa->o_mode & ((valid & OBD_MD_FLTYPE ? S_IFMT : 0) | - (valid & OBD_MD_FLMODE ? ~S_IFMT : 0))); - if (valid & OBD_MD_FLUID) - CERROR("obdo: o_uid = %u\n", oa->o_uid); - if (valid & OBD_MD_FLGID) - CERROR("obdo: o_gid = %u\n", oa->o_gid); - if (valid & OBD_MD_FLFLAGS) - CERROR("obdo: o_flags = %x\n", oa->o_flags); - if (valid & OBD_MD_FLNLINK) - CERROR("obdo: o_nlink = %u\n", oa->o_nlink); - if (valid & OBD_MD_FLGENER) - CERROR("obdo: o_generation = %u\n", oa->o_generation); - - return -EINVAL; + CDEBUG(D_RPCTRACE, "nb->page: index = %ld\n", + nb->page ? cfs_page_index(nb->page) : -1); } +EXPORT_SYMBOL(dump_lniobuf); void dump_lsm(int level, struct lov_stripe_md *lsm) { @@ -123,22 +71,6 @@ void dump_lsm(int level, struct lov_stripe_md *lsm) lsm->lsm_pool_name); } -/* XXX assumes only a single page in request */ -/* -int dump_req(struct ptlrpc_request *req) -{ - struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0); - struct obd_ioobj *ioo = lustre_msg_buf(req->rq_reqmsg, 1); - //struct niobuf *nb = lustre_msg_buf(req->rq_reqmsg, 2); - - dump_obdo(&body->oa); - //dump_niobuf(nb); - dump_ioo(ioo); - - return -EINVAL; -} -*/ - #define LPDS sizeof(__u64) int block_debug_setup(void *addr, int len, __u64 off, __u64 id) { @@ -192,11 +124,7 @@ int block_debug_check(char *who, void *addr, int end, __u64 off, __u64 id) } #undef LPDS -EXPORT_SYMBOL(dump_lniobuf); -EXPORT_SYMBOL(dump_rniobuf); -EXPORT_SYMBOL(dump_ioo); //EXPORT_SYMBOL(dump_req); -EXPORT_SYMBOL(dump_obdo); EXPORT_SYMBOL(dump_lsm); EXPORT_SYMBOL(block_debug_setup); EXPORT_SYMBOL(block_debug_check); diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index 67f9a0b..58b308c 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -566,8 +566,8 @@ static int echo_setup(struct obd_device *obd, struct lustre_cfg *lcfg) rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_PLAIN, NULL, LCK_NL, &lock_flags, NULL, - ldlm_completion_ast, NULL, NULL, - 0, NULL, NULL, &obd->u.echo.eo_nl_lock); + ldlm_completion_ast, NULL, NULL, 0, NULL, + &obd->u.echo.eo_nl_lock); LASSERT (rc == ELDLM_OK); lprocfs_echo_init_vars(&lvars); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index c126570..7a64adc 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1551,7 +1551,7 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid, rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_EXTENT, &policy, LCK_PW, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, NULL, - NULL, lockh); + lockh); if (rc != ELDLM_OK) lockh->cookie = 0; RETURN(rc); @@ -1809,7 +1809,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * * Of course, this will all disappear when we switch to * taking liblustre locks on the OST. */ - ldlm_res_lvbo_update(res, NULL, 0, 1); + ldlm_res_lvbo_update(res, NULL, 1); } RETURN(ELDLM_LOCK_ABORTED); } @@ -1836,7 +1836,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * sending ast is not handled. This can result in lost client writes. */ if (rc != 0) - ldlm_res_lvbo_update(res, NULL, 0, 1); + ldlm_res_lvbo_update(res, NULL, 1); lock_res(res); *reply_lvb = *res_lvb; @@ -3445,7 +3445,7 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, if (res != NULL) { LDLM_RESOURCE_ADDREF(res); - rc = ldlm_res_lvbo_update(res, NULL, 0, 0); + rc = ldlm_res_lvbo_update(res, NULL, 0); LDLM_RESOURCE_DELREF(res); ldlm_resource_putref(res); } diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index d195627..368842e 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -898,7 +898,7 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa, if (resource != NULL) { LDLM_RESOURCE_ADDREF(resource); - ns->ns_lvbo->lvbo_update(resource, NULL, 0, 1); + ns->ns_lvbo->lvbo_update(resource, NULL, 1); LDLM_RESOURCE_DELREF(resource); ldlm_resource_putref(resource); } diff --git a/lustre/obdfilter/filter_lvb.c b/lustre/obdfilter/filter_lvb.c index 1f627b2..c66055e 100644 --- a/lustre/obdfilter/filter_lvb.c +++ b/lustre/obdfilter/filter_lvb.c @@ -121,8 +121,7 @@ out_dentry: * If 'increase_only' is true, don't allow values to move backwards. */ static int filter_lvbo_update(struct ldlm_resource *res, - struct ptlrpc_request *r, - int buf_idx, int increase_only) + struct ptlrpc_request *r, int increase_only) { int rc = 0; struct ost_lvb *lvb; @@ -144,8 +143,7 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct ost_lvb *new; /* XXX update always from reply buffer */ - new = lustre_swab_repbuf(r, buf_idx, sizeof(*new), - lustre_swab_ost_lvb); + new = req_capsule_server_get(&r->rq_pill, &RMF_DLM_LVB); if (new == NULL) { CERROR("lustre_swab_buf failed\n"); diff --git a/lustre/osc/osc_create.c b/lustre/osc/osc_create.c index 2bd2311..17fcbd8 100644 --- a/lustre/osc/osc_create.c +++ b/lustre/osc/osc_create.c @@ -82,8 +82,7 @@ static int osc_interpret_create(const struct lu_env *env, ENTRY; if (req->rq_repmsg) { - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), - lustre_swab_ost_body); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); if (body == NULL && rc == 0) rc = -EPROTO; } @@ -189,7 +188,6 @@ static int oscc_internal_create(struct osc_creator *oscc) { struct ptlrpc_request *request; struct ost_body *body; - __u32 size[] = { sizeof(struct ptlrpc_body), sizeof(*body) }; ENTRY; LASSERT_SPIN_LOCKED(&oscc->oscc_lock); @@ -221,9 +219,9 @@ static int oscc_internal_create(struct osc_creator *oscc) oscc->oscc_flags |= OSCC_FLAG_CREATING; spin_unlock(&oscc->oscc_lock); - request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import, - LUSTRE_OST_VERSION, OST_CREATE, 2, - size, NULL); + request = ptlrpc_request_alloc_pack(oscc->oscc_obd->u.cli.cl_import, + &RQF_OST_CREATE, + LUSTRE_OST_VERSION, OST_CREATE); if (request == NULL) { spin_lock(&oscc->oscc_lock); oscc->oscc_flags &= ~OSCC_FLAG_CREATING; @@ -233,7 +231,7 @@ static int oscc_internal_create(struct osc_creator *oscc) request->rq_request_portal = OST_CREATE_PORTAL; ptlrpc_at_set_req_timeout(request); - body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + body = req_capsule_client_get(&request->rq_pill, &RMF_OST_BODY); spin_lock(&oscc->oscc_lock); body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count; @@ -248,7 +246,7 @@ static int oscc_internal_create(struct osc_creator *oscc) /* we should not resend create request - anyway we will have delorphan * and kill these objects */ request->rq_no_delay = request->rq_no_resend = 1; - ptlrpc_req_set_repsize(request, 2, size); + ptlrpc_request_set_replen(request); request->rq_async_args.pointer_arg[0] = oscc; request->rq_interpret_reply = osc_interpret_create; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index ba12a1a..a2f2784 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -210,8 +210,7 @@ static int osc_getattr_interpret(const struct lu_env *env, if (rc != 0) GOTO(out, rc); - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), - lustre_swab_ost_body); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); if (body) { CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa); @@ -1138,19 +1137,18 @@ static int check_write_rcs(struct ptlrpc_request *req, int requested_nob, int niocount, obd_count page_count, struct brw_page **pga) { - int *remote_rcs, i; + int i; + __u32 *remote_rcs; - /* return error if any niobuf was in error */ - remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1, - sizeof(*remote_rcs) * niocount, NULL); + remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS, + sizeof(*remote_rcs) * + niocount); if (remote_rcs == NULL) { CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n"); return(-EPROTO); } - if (ptlrpc_rep_need_swab(req)) - for (i = 0; i < niocount; i++) - __swab32s(&remote_rcs[i]); + /* return error if any niobuf was in error */ for (i = 0; i < niocount; i++) { if (remote_rcs[i] < 0) return(remote_rcs[i]); @@ -1264,6 +1262,8 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, } pill = &req->rq_pill; + req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT, + sizeof(*ioobj)); req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT, niocount * sizeof(*niobuf)); osc_set_capa_size(req, &RMF_CAPA1, ocapa); @@ -1290,7 +1290,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, body = req_capsule_client_get(pill, &RMF_OST_BODY); ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ); niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); - LASSERT(body && ioobj && niobuf); + LASSERT(body != NULL && ioobj != NULL && niobuf != NULL); lustre_set_wire_obdo(&body->oa, oa); @@ -1337,11 +1337,9 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, } LASSERTF((void *)(niobuf - niocount) == - lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, - niocount * sizeof(*niobuf)), - "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg, - REQ_REC_OFF + 2, niocount * sizeof(*niobuf)), - (void *)(niobuf - niocount)); + req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE), + "want %p - real %p\n", req_capsule_client_get(&req->rq_pill, + &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount)); osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0); if (osc_should_shrink_grant(cli)) @@ -1377,7 +1375,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, } oa->o_cksum = body->oa.o_cksum; /* 1 RC per niobuf */ - req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, + req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER, sizeof(__u32) * niocount); } else { if (unlikely(cli->cl_checksum) && @@ -1387,7 +1385,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type); body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; } - req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0); + req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER, 0); /* 1 RC for the whole I/O */ } ptlrpc_request_set_replen(req); @@ -1479,8 +1477,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) RETURN(rc); LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc); - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), - lustre_swab_ost_body); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); if (body == NULL) { CDEBUG(D_INFO, "Can't unpack body\n"); RETURN(-EPROTO); @@ -3157,8 +3154,7 @@ static int osc_enqueue_interpret(const struct lu_env *env, /* Complete obtaining the lock procedure. */ rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1, mode, aa->oa_flags, aa->oa_lvb, - sizeof(*aa->oa_lvb), lustre_swab_ost_lvb, - &handle, rc); + sizeof(*aa->oa_lvb), &handle, rc); /* Complete osc stuff. */ rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc); @@ -3325,7 +3321,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, *flags &= ~LDLM_FL_BLOCK_GRANTED; rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb, - sizeof(*lvb), lustre_swab_ost_lvb, lockh, async); + sizeof(*lvb), lockh, async); if (rqset) { if (!rc) { struct osc_enqueue_args *aa; diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 2ee5f2b..b979851 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -106,40 +106,51 @@ static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req, struct obd_trans_info *oti) { struct ost_body *body, *repbody; - __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; struct lustre_capa *capa = NULL; int rc; ENTRY; - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); + /* Get the request body */ + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); if (body == NULL) RETURN(-EFAULT); if (body->oa.o_id == 0) RETURN(-EPROTO); - if (lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1)) { + /* If there's a DLM request, cancel the locks mentioned in it*/ + if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) { struct ldlm_request *dlm; - dlm = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*dlm), - lustre_swab_ldlm_request); + + dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); if (dlm == NULL) RETURN (-EFAULT); ldlm_request_cancel(req, dlm, 0); } - if (body->oa.o_valid & OBD_MD_FLOSSCAPA) - capa = lustre_unpack_incoming_capa(req, REQ_REC_OFF + 2); + /* If there's a capability, get it */ + if (body->oa.o_valid & OBD_MD_FLOSSCAPA) { + capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); + if (capa == NULL) { + CERROR("Missing capability for OST DESTROY"); + RETURN (-EFAULT); + } + } - rc = lustre_pack_reply(req, 2, size, NULL); + /* Prepare the reply */ + rc = req_capsule_server_pack(&req->rq_pill); if (rc) RETURN(rc); + /* Get the log cancellation cookie */ if (body->oa.o_valid & OBD_MD_FLCOOKIE) oti->oti_logcookies = &body->oa.o_lcookie; - repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*repbody)); + + /* Finish the reply */ + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); + + /* Do the destroy and set the reply status accordingly */ req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL, capa); RETURN(0); } @@ -148,27 +159,30 @@ static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req) { struct ost_body *body, *repbody; struct obd_info oinfo = { { { 0 } } }; - __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; int rc; ENTRY; - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); if (body == NULL) RETURN(-EFAULT); - rc = lustre_pack_reply(req, 2, size, NULL); + rc = req_capsule_server_pack(&req->rq_pill); if (rc) RETURN(rc); - repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*repbody)); + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); repbody->oa = body->oa; oinfo.oi_oa = &repbody->oa; - if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) - oinfo.oi_capa = lustre_unpack_incoming_capa(req, - REQ_REC_OFF + 1); + if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) { + oinfo.oi_capa = req_capsule_client_get(&req->rq_pill, + &RMF_CAPA1); + if (oinfo.oi_capa == NULL) { + CERROR("Missing capability for OST GETATTR"); + RETURN (-EFAULT); + } + } + req->rq_status = obd_getattr(exp, &oinfo); ost_drop_id(exp, &repbody->oa); RETURN(0); @@ -177,15 +191,14 @@ static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req) static int ost_statfs(struct ptlrpc_request *req) { struct obd_statfs *osfs; - __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) }; int rc; ENTRY; - rc = lustre_pack_reply(req, 2, size, NULL); + rc = req_capsule_server_pack(&req->rq_pill); if (rc) RETURN(rc); - osfs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*osfs)); + osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, cfs_time_current_64() - HZ, 0); @@ -201,21 +214,18 @@ static int ost_create(struct obd_export *exp, struct ptlrpc_request *req, struct obd_trans_info *oti) { struct ost_body *body, *repbody; - __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) }; int rc; ENTRY; - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); if (body == NULL) RETURN(-EFAULT); - rc = lustre_pack_reply(req, 2, size, NULL); + rc = req_capsule_server_pack(&req->rq_pill); if (rc) RETURN(rc); - repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*repbody)); + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); oti->oti_logcookies = &repbody->oa.o_lcookie; @@ -224,7 +234,7 @@ static int ost_create(struct obd_export *exp, struct ptlrpc_request *req, RETURN(0); } -/* +/** * Helper function for ost_punch(): if asked by client, acquire [size, EOF] * lock on the file being truncated. */ @@ -271,11 +281,10 @@ static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa, RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id, LDLM_EXTENT, &policy, LCK_PW, &flags, ldlm_blocking_ast, ldlm_completion_ast, - ldlm_glimpse_ast, NULL, 0, NULL, - NULL, lh)); + ldlm_glimpse_ast, NULL, 0, NULL, lh)); } -/* +/** * Helper function for ost_punch(): release lock acquired by * ost_punch_lock_get(), if any. */ @@ -293,7 +302,6 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, { struct obd_info oinfo = { { { 0 } } }; struct ost_body *body, *repbody; - __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) }; int rc; struct lustre_handle lh = {0,}; ENTRY; @@ -301,9 +309,9 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, /* check that we do support OBD_CONNECT_TRUNCLOCK. */ CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK); - /* ost_body is varified and swabbed in ost_hpreq_handler() */ - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); - LASSERT(body != NULL); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + RETURN(-EFAULT); oinfo.oi_oa = &body->oa; oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size; @@ -313,12 +321,11 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) RETURN(-EPROTO); - rc = lustre_pack_reply(req, 2, size, NULL); + rc = req_capsule_server_pack(&req->rq_pill); if (rc) RETURN(rc); - repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*repbody)); + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); rc = ost_punch_lock_get(exp, oinfo.oi_oa, &lh); if (rc == 0) { if (oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS && @@ -330,9 +337,14 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, */ oinfo.oi_oa->o_valid &= ~OBD_MD_FLFLAGS; - if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) - oinfo.oi_capa = lustre_unpack_incoming_capa(req, - REQ_REC_OFF + 1); + if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) { + oinfo.oi_capa = req_capsule_client_get(&req->rq_pill, + &RMF_CAPA1); + if (oinfo.oi_capa == NULL) { + CERROR("Missing capability for OST PUNCH"); + RETURN (-EFAULT); + } + } req->rq_status = obd_punch(exp, &oinfo, oti, NULL); ost_punch_lock_put(exp, oinfo.oi_oa, &lh); } @@ -345,24 +357,26 @@ static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req) { struct ost_body *body, *repbody; struct lustre_capa *capa = NULL; - __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) }; int rc; ENTRY; - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); if (body == NULL) RETURN(-EFAULT); - if (body->oa.o_valid & OBD_MD_FLOSSCAPA) - capa = lustre_unpack_incoming_capa(req, REQ_REC_OFF + 1); + if (body->oa.o_valid & OBD_MD_FLOSSCAPA) { + capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); + if (capa == NULL) { + CERROR("Missing capability for OST SYNC"); + RETURN (-EFAULT); + } + } - rc = lustre_pack_reply(req, 2, size, NULL); + rc = req_capsule_server_pack(&req->rq_pill); if (rc) RETURN(rc); - repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*repbody)); + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size, repbody->oa.o_blocks, capa); @@ -374,28 +388,30 @@ static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req, struct obd_trans_info *oti) { struct ost_body *body, *repbody; - __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) }; int rc; struct obd_info oinfo = { { { 0 } } }; ENTRY; - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); if (body == NULL) RETURN(-EFAULT); - rc = lustre_pack_reply(req, 2, size, NULL); + rc = req_capsule_server_pack(&req->rq_pill); if (rc) RETURN(rc); - repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*repbody)); + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); repbody->oa = body->oa; oinfo.oi_oa = &repbody->oa; - if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) - oinfo.oi_capa = lustre_unpack_incoming_capa(req, - REQ_REC_OFF + 1); + if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) { + oinfo.oi_capa = req_capsule_client_get(&req->rq_pill, + &RMF_CAPA1); + if (oinfo.oi_capa == NULL) { + CERROR("Missing capability for OST SETATTR"); + RETURN (-EFAULT); + } + } req->rq_status = obd_setattr(exp, &oinfo, oti); ost_drop_id(exp, &repbody->oa); RETURN(0); @@ -461,10 +477,10 @@ static int ost_brw_lock_get(int mode, struct obd_export *exp, if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK)) RETURN(0); - /* EXPENSIVE ASSERTION */ for (i = 1; i < nrbufs; i ++) - LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) == - (nb[i].flags & OBD_BRW_SRVLOCK)); + if ((nb[0].flags & OBD_BRW_SRVLOCK) != + (nb[i].flags & OBD_BRW_SRVLOCK)) + RETURN(-EFAULT); policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK; policy.l_extent.end = (nb[nrbufs - 1].offset + @@ -473,8 +489,7 @@ static int ost_brw_lock_get(int mode, struct obd_export *exp, RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id, LDLM_EXTENT, &policy, mode, &flags, ldlm_blocking_ast, ldlm_completion_ast, - ldlm_glimpse_ast, NULL, 0, NULL, - NULL, lh)); + ldlm_glimpse_ast, NULL, 0, NULL, lh)); } static void ost_brw_lock_put(int mode, @@ -619,7 +634,6 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) struct lustre_capa *capa = NULL; struct l_wait_info lwi; struct lustre_handle lockh = { 0 }; - __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; int niocount, npages, nob = 0, rc, i; int no_reply = 0; ENTRY; @@ -644,21 +658,34 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) /* ost_body, ioobj & noibuf_remote are verified and swabbed in * ost_rw_hpreq_check(). */ - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); - LASSERT(body != NULL); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EFAULT); - ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo)); - LASSERT(ioo != NULL); + /* + * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function + * would be useful here and wherever we get &RMF_OBD_IOOBJ and + * &RMF_NIOBUF_REMOTE. + */ + ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ); + if (ioo == NULL) + GOTO(out, rc = -EFAULT); niocount = ioo->ioo_bufcnt; - remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, - niocount * sizeof(*remote_nb)); - LASSERT(remote_nb != NULL); + remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE); + if (remote_nb == NULL) + GOTO(out, rc = -EFAULT); - if (body->oa.o_valid & OBD_MD_FLOSSCAPA) - capa = lustre_unpack_incoming_capa(req, REQ_REC_OFF + 3); + if (body->oa.o_valid & OBD_MD_FLOSSCAPA) { + capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); + if (capa == NULL) { + CERROR("Missing capability for OST BRW READ"); + GOTO(out, rc = -EFAULT); + } + } - rc = lustre_pack_reply(req, 2, size, NULL); + req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER, 0); + rc = req_capsule_server_pack(&req->rq_pill); if (rc) GOTO(out, rc); @@ -824,8 +851,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) remote_nb, npages, local_nb, oti, rc); if (rc == 0) { - repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*repbody)); + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa)); ost_drop_id(exp, &repbody->oa); } @@ -873,7 +899,6 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) struct lustre_handle lockh = {0}; struct lustre_capa *capa = NULL; __u32 *rcs; - __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) }; int objcount, niocount, npages; int rc, i, j; obd_count client_cksum = 0, server_cksum = 0; @@ -905,35 +930,47 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) /* ost_body, ioobj & noibuf_remote are verified and swabbed in * ost_rw_hpreq_check(). */ - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); - LASSERT(body != NULL); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EFAULT); if ((body->oa.o_flags & OBD_BRW_MEMALLOC) && (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self)) libcfs_memory_pressure_set(); - objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / - sizeof(*ioo); - ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, - objcount * sizeof(*ioo)); - LASSERT(ioo != NULL); + objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ, + RCL_CLIENT) / sizeof(*ioo); + ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ); + if (ioo == NULL) + GOTO(out, rc = -EFAULT); for (niocount = i = 0; i < objcount; i++) niocount += ioo[i].ioo_bufcnt; - remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, - niocount * sizeof(*remote_nb)); - LASSERT(remote_nb != NULL); + /* + * It'd be nice to have a capsule function to indicate how many elements + * there were in a buffer for an RMF that's declared to be an array. + * It's easy enough to compute the number of elements here though. + */ + remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE); + if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill, + &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb))) + GOTO(out, rc = -EFAULT); - if (body->oa.o_valid & OBD_MD_FLOSSCAPA) - capa = lustre_unpack_incoming_capa(req, REQ_REC_OFF + 3); + if (body->oa.o_valid & OBD_MD_FLOSSCAPA) { + capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); + if (capa == NULL) { + CERROR("Missing capability for OST BRW WRITE"); + GOTO(out, rc = -EFAULT); + } + } - size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs); - rc = lustre_pack_reply(req, 3, size, NULL); + req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER, + niocount * sizeof(*rcs)); + rc = req_capsule_server_pack(&req->rq_pill); if (rc != 0) GOTO(out, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val); - rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, - niocount * sizeof(*rcs)); + rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS); /* * Per-thread array of struct niobuf_{local,remote}'s was allocated by @@ -1060,8 +1097,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) } no_reply = rc != 0; - repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*repbody)); + repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa)); if (unlikely(client_cksum != 0 && rc == 0)) { @@ -1189,47 +1225,64 @@ out: RETURN(rc); } +/** + * Implementation of OST_SET_INFO. + * + * OST_SET_INFO is like ioctl(): heavily overloaded. Specifically, it takes a + * "key" and a value RPC buffers as arguments, with the value's contents + * interpreted according to the key. + * + * Value types that need swabbing have swabbing done explicitly, either here or + * in functions called from here. This should be corrected: all swabbing should + * be done in the capsule abstraction, as that will then allow us to move + * swabbing exclusively to the client without having to modify server code + * outside the capsule abstraction's implementation itself. To correct this + * will require minor changes to the capsule abstraction; see the comments for + * req_capsule_extend() in layout.c. + */ static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req) { struct ost_body *body = NULL, *repbody; - __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; char *key, *val = NULL; int keylen, vallen, rc = 0; + int is_grant_shrink = 0; ENTRY; - key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1); + key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); if (key == NULL) { DEBUG_REQ(D_HA, req, "no set_info key"); RETURN(-EFAULT); } - keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF); + keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY, + RCL_CLIENT); - if (KEY_IS(KEY_GRANT_SHRINK)) { - rc = lustre_pack_reply(req, 2, size, NULL); - if (rc) - RETURN(rc); - } else { - rc = lustre_pack_reply(req, 1, NULL, NULL); - if (rc) - RETURN(rc); - } + vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL, + RCL_CLIENT); + + if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK))) + /* In this case the value is actually an RMF_OST_BODY, so we + * transmutate the type of this PTLRPC */ + req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO); + + rc = req_capsule_server_pack(&req->rq_pill); + if (rc) + RETURN(rc); - vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1); if (vallen) { - if (KEY_IS(KEY_GRANT_SHRINK)) { - body = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, - sizeof(*body), - lustre_swab_ost_body); + if (is_grant_shrink) { + body = req_capsule_client_get(&req->rq_pill, + &RMF_OST_BODY); if (!body) RETURN(-EFAULT); - repbody = lustre_msg_buf(req->rq_repmsg, - REPLY_REC_OFF, - sizeof(*repbody)); + repbody = req_capsule_server_get(&req->rq_pill, + &RMF_OST_BODY); memcpy(repbody, body, sizeof(*body)); val = (char*)repbody; - } else - val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,0); + } else { + val = req_capsule_client_get(&req->rq_pill, + &RMF_SETINFO_VAL); + } } if (KEY_IS(KEY_EVICT_BY_NID)) { @@ -1237,10 +1290,13 @@ static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req) obd_export_evict_by_nid(exp->exp_obd, val); GOTO(out, rc = 0); } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) { - /* Val's are not swabbed automatically */ + if (vallen < sizeof(__u32)) + RETURN(-EFAULT); __swab32s((__u32 *)val); } + /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to + * a struct ost_body * value */ rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL); out: lustre_msg_set_status(req->rq_repmsg, 0); @@ -1254,8 +1310,6 @@ static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req) struct req_capsule *pill = &req->rq_pill; ENTRY; - req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC); - /* this common part for get_info rpc */ key = req_capsule_client_get(pill, &RMF_SETINFO_KEY); if (key == NULL) { @@ -1359,7 +1413,7 @@ static int ost_llog_handle_connect(struct obd_export *exp, int rc; ENTRY; - body = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*body)); + body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY); rc = obd_llog_connect(exp, body); RETURN(rc); } @@ -1636,6 +1690,10 @@ int ost_msg_check_version(struct lustre_msg *msg) return rc; } +/** + * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does + * not. + */ static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req, struct ldlm_lock *lock) { @@ -1652,19 +1710,24 @@ static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req, /* As the request may be covered by several locks, do not look at * o_handle, look at the RPC IO region. */ - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_obdo); - objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / - sizeof(*ioo); - ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, - objcount * sizeof(*ioo)); - LASSERT(ioo != NULL); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + RETURN(0); + + objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ, + RCL_CLIENT) / sizeof(*ioo); + ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ); + if (ioo == NULL) + RETURN(0); + for (niocount = i = 0; i < objcount; i++) niocount += ioo[i].ioo_bufcnt; - nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, - niocount * sizeof(*nb)); - LASSERT(nb != NULL); + nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE); + if (nb == NULL || + niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE, + RCL_CLIENT) / sizeof(*nb))) + RETURN(0); mode = LCK_PW; if (opc == OST_READ) @@ -1690,9 +1753,14 @@ static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req, } /** - * Swab buffers needed to call ost_rw_prolong_locks() and call it. - * Return the value from ost_rw_prolong_locks() which is non-zero if - * there is a cancelled lock which is waiting for this IO request. + * High-priority queue request check for whether the given PTLRPC request (\a + * req) is blocking an LDLM lock cancel. + * + * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock + * cancel, 0 if it is not, and -EFAULT if the request is malformed. + * + * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue. This + * function looks only at OST_READs and OST_WRITEs. */ static int ost_rw_hpreq_check(struct ptlrpc_request *req) { @@ -1706,21 +1774,25 @@ static int ost_rw_hpreq_check(struct ptlrpc_request *req) opc = lustre_msg_get_opc(req->rq_reqmsg); LASSERT(opc == OST_READ || opc == OST_WRITE); - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); - LASSERT(body != NULL); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + RETURN(-EFAULT); - objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / - sizeof(*ioo); - ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, - objcount * sizeof(*ioo)); - LASSERT(ioo != NULL); + objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ, + RCL_CLIENT) / sizeof(*ioo); + ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ); + if (ioo == NULL) + RETURN(-EFAULT); for (niocount = i = 0; i < objcount; i++) niocount += ioo[i].ioo_bufcnt; - nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, - niocount * sizeof(*nb)); - LASSERT(nb != NULL); - LASSERT(niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK)); + nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE); + if (nb == NULL || + niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE, + RCL_CLIENT) / sizeof(*nb))) + RETURN(-EFAULT); + if (niocount != 0 && (nb[0].flags & OBD_BRW_SRVLOCK)) + RETURN(-EFAULT); mode = LCK_PW; if (opc == OST_READ) @@ -1762,15 +1834,18 @@ static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa) RETURN(opd.opd_lock_match); } +/** + * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs. + */ static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req, struct ldlm_lock *lock) { struct ost_body *body; ENTRY; - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_obdo); - LASSERT(body != NULL); + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + RETURN(0); /* can't return -EFAULT here */ if (body->oa.o_valid & OBD_MD_FLHANDLE && body->oa.o_handle.cookie == lock->l_handle.h_cookie) @@ -1778,11 +1853,17 @@ static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req, RETURN(0); } +/** + * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs. + */ static int ost_punch_hpreq_check(struct ptlrpc_request *req) { - struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, - REQ_REC_OFF, sizeof(*body)); - LASSERT(body != NULL); + struct ost_body *body; + + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + RETURN(-EFAULT); + LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) || !(body->oa.o_flags & OBD_FL_TRUNCLOCK)); @@ -1811,18 +1892,27 @@ static int ost_hpreq_handler(struct ptlrpc_request *req) struct niobuf_remote *nb; struct obd_ioobj *ioo; int objcount, niocount; - int swab, i; + int i; + + /* RPCs on the H-P queue can be inspected before + * ost_handler() initializes their pills, so we + * initialize that here. Capsule initialization is + * idempotent, as is setting the pill's format (provided + * it doesn't change). + */ + req_capsule_init(&req->rq_pill, req, RCL_SERVER); + req_capsule_set(&req->rq_pill, &RQF_OST_BRW); - body = lustre_swab_reqbuf(req, REQ_REC_OFF, - sizeof(*body), - lustre_swab_obdo); - if (!body) { + body = req_capsule_client_get(&req->rq_pill, + &RMF_OST_BODY); + if (body == NULL) { CERROR("Missing/short ost_body\n"); RETURN(-EFAULT); } - objcount = lustre_msg_buflen(req->rq_reqmsg, - REQ_REC_OFF + 1) / - sizeof(*ioo); + objcount = req_capsule_get_size(&req->rq_pill, + &RMF_OBD_IOOBJ, + RCL_CLIENT) / + sizeof(*ioo); if (objcount == 0) { CERROR("Missing/short ioobj\n"); RETURN(-EFAULT); @@ -1832,18 +1922,14 @@ static int ost_hpreq_handler(struct ptlrpc_request *req) RETURN(-EFAULT); } - ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, - objcount * sizeof(*ioo), - lustre_swab_obd_ioobj); - if (!ioo) { + ioo = req_capsule_client_get(&req->rq_pill, + &RMF_OBD_IOOBJ); + if (ioo == NULL) { CERROR("Missing/short ioobj\n"); RETURN(-EFAULT); } - swab = ptlrpc_req_need_swab(req); for (niocount = i = 0; i < objcount; i++) { - if (i > 0 && swab) - lustre_swab_obd_ioobj(&ioo[i]); if (ioo[i].ioo_bufcnt == 0) { CERROR("ioo[%d] has zero bufcnt\n", i); RETURN(-EFAULT); @@ -1851,33 +1937,28 @@ static int ost_hpreq_handler(struct ptlrpc_request *req) niocount += ioo[i].ioo_bufcnt; } if (niocount > PTLRPC_MAX_BRW_PAGES) { - DEBUG_REQ(D_ERROR, req, "bulk has too many " - "pages (%d)", niocount); + DEBUG_REQ(D_RPCTRACE, req, + "bulk has too many pages (%d)", + niocount); RETURN(-EFAULT); } - nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2, - niocount * sizeof(*nb), - lustre_swab_niobuf_remote); - if (!nb) { + nb = req_capsule_client_get(&req->rq_pill, + &RMF_NIOBUF_REMOTE); + if (nb == NULL) { CERROR("Missing/short niobuf\n"); RETURN(-EFAULT); } - swab = ptlrpc_req_need_swab(req); - if (swab) { - /* swab remaining niobufs */ - for (i = 1; i < niocount; i++) - lustre_swab_niobuf_remote(&nb[i]); - } - if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK)) req->rq_ops = &ost_hpreq_rw; } else if (opc == OST_PUNCH) { - body = lustre_swab_reqbuf(req, REQ_REC_OFF, - sizeof(*body), - lustre_swab_obdo); - if (!body) { + req_capsule_init(&req->rq_pill, req, RCL_SERVER); + req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH); + + body = req_capsule_client_get(&req->rq_pill, + &RMF_OST_BODY); + if (body == NULL) { CERROR("Missing/short ost_body\n"); RETURN(-EFAULT); } @@ -1973,6 +2054,7 @@ int ost_handle(struct ptlrpc_request *req) break; case OST_CREATE: CDEBUG(D_INODE, "create\n"); + req_capsule_set(&req->rq_pill, &RQF_OST_CREATE); if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET)) RETURN(0); if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC)) @@ -1983,6 +2065,7 @@ int ost_handle(struct ptlrpc_request *req) break; case OST_DESTROY: CDEBUG(D_INODE, "destroy\n"); + req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY); if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET)) RETURN(0); if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS)) @@ -1991,17 +2074,20 @@ int ost_handle(struct ptlrpc_request *req) break; case OST_GETATTR: CDEBUG(D_INODE, "getattr\n"); + req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR); if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET)) RETURN(0); rc = ost_getattr(req->rq_export, req); break; case OST_SETATTR: CDEBUG(D_INODE, "setattr\n"); + req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR); if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET)) RETURN(0); rc = ost_setattr(req->rq_export, req, oti); break; case OST_WRITE: + req_capsule_set(&req->rq_pill, &RQF_OST_BRW); CDEBUG(D_INODE, "write\n"); /* req->rq_request_portal would be nice, if it was set */ if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){ @@ -2022,6 +2108,7 @@ int ost_handle(struct ptlrpc_request *req) /* ost_brw_write sends its own replies */ RETURN(rc); case OST_READ: + req_capsule_set(&req->rq_pill, &RQF_OST_BRW); CDEBUG(D_INODE, "read\n"); /* req->rq_request_portal would be nice, if it was set */ if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){ @@ -2039,6 +2126,7 @@ int ost_handle(struct ptlrpc_request *req) RETURN(rc); case OST_PUNCH: CDEBUG(D_INODE, "punch\n"); + req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH); if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET)) RETURN(0); if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS)) @@ -2047,12 +2135,14 @@ int ost_handle(struct ptlrpc_request *req) break; case OST_STATFS: CDEBUG(D_INODE, "statfs\n"); + req_capsule_set(&req->rq_pill, &RQF_OST_STATFS); if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET)) RETURN(0); rc = ost_statfs(req); break; case OST_SYNC: CDEBUG(D_INODE, "sync\n"); + req_capsule_set(&req->rq_pill, &RQF_OST_SYNC); if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET)) RETURN(0); rc = ost_sync(req->rq_export, req); @@ -2064,6 +2154,7 @@ int ost_handle(struct ptlrpc_request *req) break; case OST_GET_INFO: DEBUG_REQ(D_INODE, req, "get_info"); + req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC); rc = ost_get_info(req->rq_export, req); break; #ifdef HAVE_QUOTA_SUPPORT diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index cdc9ce6..4cddc07 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -39,6 +39,12 @@ * * Author: Nikita Danilov */ +/* + * This file contains the "capsule/pill" abstraction layered above PTLRPC. + * + * Every struct ptlrpc_request contains a "pill", which points to a description + * of the format that the request conforms to. + */ #if !defined(__REQ_LAYOUT_USER__) @@ -67,10 +73,13 @@ /* struct ptlrpc_request, lustre_msg* */ #include #include +#include /* - * empty set of fields... for suitable definition of emptiness. + * RQFs (see below) refer to two struct req_msg_field arrays describing the + * client request and server reply, respectively. */ +/* empty set of fields... for suitable definition of emptiness. */ static const struct req_msg_field *empty[] = { &RMF_PTLRPC_BODY }; @@ -286,6 +295,12 @@ static const struct req_msg_field *obd_set_info_client[] = { &RMF_SETINFO_VAL }; +static const struct req_msg_field *ost_grant_shrink_client[] = { + &RMF_PTLRPC_BODY, + &RMF_SETINFO_KEY, + &RMF_OST_BODY +}; + static const struct req_msg_field *mds_getinfo_client[] = { &RMF_PTLRPC_BODY, &RMF_GETINFO_KEY, @@ -497,7 +512,7 @@ static const struct req_msg_field *ost_brw_client[] = { static const struct req_msg_field *ost_brw_server[] = { &RMF_PTLRPC_BODY, &RMF_OST_BODY, - &RMF_NIOBUF_REMOTE + &RMF_RCS }; static const struct req_msg_field *ost_get_info_generic_server[] = { @@ -609,16 +624,31 @@ struct req_msg_field { __u32 rmf_flags; const char *rmf_name; /** - * Field length. (-1) means "variable length". + * Field length. (-1) means "variable length". If the + * \a RMF_F_STRUCT_ARRAY flag is set the field is also variable-length, + * but the actual size must be a whole multiple of \a rmf_size. */ int rmf_size; void (*rmf_swabber)(void *); + void (*rmf_dumper)(void *); int rmf_offset[ARRAY_SIZE(req_formats)][RCL_NR]; }; enum rmf_flags { + /** + * The field is a string, must be NUL-terminated. + */ RMF_F_STRING = 1 << 0, - RMF_F_NO_SIZE_CHECK = 1 << 1 + /** + * The field's buffer size need not match the declared \a rmf_size. + */ + RMF_F_NO_SIZE_CHECK = 1 << 1, + /** + * The field's buffer size must be a whole multiple of the declared \a + * rmf_size and the \a rmf_swabber function must work on the declared \a + * rmf_size worth of bytes. + */ + RMF_F_STRUCT_ARRAY = 1 << 2 }; struct req_capsule; @@ -626,208 +656,219 @@ struct req_capsule; /* * Request fields. */ -#define DEFINE_MSGF(name, flags, size, swabber) { \ - .rmf_name = (name), \ - .rmf_flags = (flags), \ - .rmf_size = (size), \ - .rmf_swabber = (void (*)(void*))(swabber) \ +#define DEFINE_MSGF(name, flags, size, swabber, dumper) { \ + .rmf_name = (name), \ + .rmf_flags = (flags), \ + .rmf_size = (size), \ + .rmf_swabber = (void (*)(void*))(swabber), \ + .rmf_dumper = (void (*)(void*))(dumper) \ } const struct req_msg_field RMF_GENERIC_DATA = DEFINE_MSGF("generic_data", 0, - -1, NULL); + -1, NULL, NULL); EXPORT_SYMBOL(RMF_GENERIC_DATA); const struct req_msg_field RMF_MGS_TARGET_INFO = DEFINE_MSGF("mgs_target_info", 0, sizeof(struct mgs_target_info), - lustre_swab_mgs_target_info); + lustre_swab_mgs_target_info, NULL); EXPORT_SYMBOL(RMF_MGS_TARGET_INFO); const struct req_msg_field RMF_MGS_SEND_PARAM = DEFINE_MSGF("mgs_send_param", 0, sizeof(struct mgs_send_param), - NULL); + NULL, NULL); EXPORT_SYMBOL(RMF_MGS_SEND_PARAM); const struct req_msg_field RMF_SETINFO_VAL = - DEFINE_MSGF("setinfo_val", 0, -1, NULL); + DEFINE_MSGF("setinfo_val", 0, -1, NULL, NULL); EXPORT_SYMBOL(RMF_SETINFO_VAL); const struct req_msg_field RMF_GETINFO_KEY = - DEFINE_MSGF("getinfo_key", 0, -1, NULL); + DEFINE_MSGF("getinfo_key", 0, -1, NULL, NULL); EXPORT_SYMBOL(RMF_GETINFO_KEY); const struct req_msg_field RMF_GETINFO_VALLEN = DEFINE_MSGF("getinfo_vallen", 0, - sizeof(__u32), lustre_swab_generic_32s); + sizeof(__u32), lustre_swab_generic_32s, NULL); EXPORT_SYMBOL(RMF_GETINFO_VALLEN); const struct req_msg_field RMF_GETINFO_VAL = - DEFINE_MSGF("getinfo_val", 0, -1, NULL); + DEFINE_MSGF("getinfo_val", 0, -1, NULL, NULL); EXPORT_SYMBOL(RMF_GETINFO_VAL); const struct req_msg_field RMF_SEQ_OPC = DEFINE_MSGF("seq_query_opc", 0, - sizeof(__u32), lustre_swab_generic_32s); + sizeof(__u32), lustre_swab_generic_32s, NULL); EXPORT_SYMBOL(RMF_SEQ_OPC); const struct req_msg_field RMF_SEQ_RANGE = DEFINE_MSGF("seq_query_range", 0, - sizeof(struct lu_seq_range), lustre_swab_lu_seq_range); + sizeof(struct lu_seq_range), + lustre_swab_lu_seq_range, NULL); EXPORT_SYMBOL(RMF_SEQ_RANGE); const struct req_msg_field RMF_FLD_OPC = DEFINE_MSGF("fld_query_opc", 0, - sizeof(__u32), lustre_swab_generic_32s); + sizeof(__u32), lustre_swab_generic_32s, NULL); EXPORT_SYMBOL(RMF_FLD_OPC); const struct req_msg_field RMF_FLD_MDFLD = DEFINE_MSGF("fld_query_mdfld", 0, - sizeof(struct lu_seq_range), lustre_swab_lu_seq_range); + sizeof(struct lu_seq_range), + lustre_swab_lu_seq_range, NULL); EXPORT_SYMBOL(RMF_FLD_MDFLD); const struct req_msg_field RMF_MDT_BODY = DEFINE_MSGF("mdt_body", 0, - sizeof(struct mdt_body), lustre_swab_mdt_body); + sizeof(struct mdt_body), lustre_swab_mdt_body, NULL); EXPORT_SYMBOL(RMF_MDT_BODY); const struct req_msg_field RMF_OBD_QUOTACTL = DEFINE_MSGF("obd_quotactl", 0, - sizeof(struct obd_quotactl), lustre_swab_obd_quotactl); + sizeof(struct obd_quotactl), + lustre_swab_obd_quotactl, NULL); EXPORT_SYMBOL(RMF_OBD_QUOTACTL); const struct req_msg_field RMF_QUOTA_ADJUST_QUNIT = DEFINE_MSGF("quota_adjust_qunit", 0, sizeof(struct quota_adjust_qunit), - lustre_swab_quota_adjust_qunit); + lustre_swab_quota_adjust_qunit, NULL); EXPORT_SYMBOL(RMF_QUOTA_ADJUST_QUNIT); const struct req_msg_field RMF_QUNIT_DATA = DEFINE_MSGF("qunit_data", 0, - sizeof(struct qunit_data), NULL); + sizeof(struct qunit_data), lustre_swab_qdata, NULL); EXPORT_SYMBOL(RMF_QUNIT_DATA); const struct req_msg_field RMF_MDT_EPOCH = DEFINE_MSGF("mdt_epoch", 0, - sizeof(struct mdt_epoch), lustre_swab_mdt_epoch); + sizeof(struct mdt_epoch), lustre_swab_mdt_epoch, NULL); EXPORT_SYMBOL(RMF_MDT_EPOCH); const struct req_msg_field RMF_PTLRPC_BODY = DEFINE_MSGF("ptlrpc_body", 0, - sizeof(struct ptlrpc_body), lustre_swab_ptlrpc_body); + sizeof(struct ptlrpc_body), lustre_swab_ptlrpc_body, NULL); EXPORT_SYMBOL(RMF_PTLRPC_BODY); const struct req_msg_field RMF_OBD_STATFS = DEFINE_MSGF("obd_statfs", 0, - sizeof(struct obd_statfs), lustre_swab_obd_statfs); + sizeof(struct obd_statfs), lustre_swab_obd_statfs, NULL); EXPORT_SYMBOL(RMF_OBD_STATFS); const struct req_msg_field RMF_SETINFO_KEY = - DEFINE_MSGF("setinfo_key", 0, -1, NULL); + DEFINE_MSGF("setinfo_key", 0, -1, NULL, NULL); EXPORT_SYMBOL(RMF_SETINFO_KEY); const struct req_msg_field RMF_NAME = - DEFINE_MSGF("name", RMF_F_STRING, -1, NULL); + DEFINE_MSGF("name", RMF_F_STRING, -1, NULL, NULL); EXPORT_SYMBOL(RMF_NAME); const struct req_msg_field RMF_SYMTGT = - DEFINE_MSGF("symtgt", RMF_F_STRING, -1, NULL); + DEFINE_MSGF("symtgt", RMF_F_STRING, -1, NULL, NULL); EXPORT_SYMBOL(RMF_SYMTGT); const struct req_msg_field RMF_TGTUUID = - DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL); + DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL, + NULL); EXPORT_SYMBOL(RMF_TGTUUID); const struct req_msg_field RMF_CLUUID = - DEFINE_MSGF("cluuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL); + DEFINE_MSGF("cluuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL, + NULL); EXPORT_SYMBOL(RMF_CLUUID); const struct req_msg_field RMF_STRING = - DEFINE_MSGF("string", RMF_F_STRING, -1, NULL); + DEFINE_MSGF("string", RMF_F_STRING, -1, NULL, NULL); EXPORT_SYMBOL(RMF_STRING); const struct req_msg_field RMF_LLOGD_BODY = DEFINE_MSGF("llogd_body", 0, - sizeof(struct llogd_body), lustre_swab_llogd_body); + sizeof(struct llogd_body), lustre_swab_llogd_body, NULL); EXPORT_SYMBOL(RMF_LLOGD_BODY); const struct req_msg_field RMF_LLOG_LOG_HDR = DEFINE_MSGF("llog_log_hdr", 0, - sizeof(struct llog_log_hdr), lustre_swab_llog_hdr); + sizeof(struct llog_log_hdr), lustre_swab_llog_hdr, NULL); EXPORT_SYMBOL(RMF_LLOG_LOG_HDR); const struct req_msg_field RMF_LLOGD_CONN_BODY = DEFINE_MSGF("llogd_conn_body", 0, sizeof(struct llogd_conn_body), - lustre_swab_llogd_conn_body); + lustre_swab_llogd_conn_body, NULL); EXPORT_SYMBOL(RMF_LLOGD_CONN_BODY); /* * connection handle received in MDS_CONNECT request. * - * XXX no swabbing? + * No swabbing needed because struct lustre_handle contains only a 64-bit cookie + * that the client does not interpret at all. */ const struct req_msg_field RMF_CONN = - DEFINE_MSGF("conn", 0, sizeof(struct lustre_handle), NULL); + DEFINE_MSGF("conn", 0, sizeof(struct lustre_handle), NULL, NULL); EXPORT_SYMBOL(RMF_CONN); const struct req_msg_field RMF_CONNECT_DATA = DEFINE_MSGF("cdata", RMF_F_NO_SIZE_CHECK /* we allow extra space for interop */, - sizeof(struct obd_connect_data), lustre_swab_connect); + sizeof(struct obd_connect_data), lustre_swab_connect, NULL); EXPORT_SYMBOL(RMF_CONNECT_DATA); const struct req_msg_field RMF_DLM_REQ = DEFINE_MSGF("dlm_req", RMF_F_NO_SIZE_CHECK /* ldlm_request_bufsize */, - sizeof(struct ldlm_request), lustre_swab_ldlm_request); + sizeof(struct ldlm_request), + lustre_swab_ldlm_request, NULL); EXPORT_SYMBOL(RMF_DLM_REQ); const struct req_msg_field RMF_DLM_REP = DEFINE_MSGF("dlm_rep", 0, - sizeof(struct ldlm_reply), lustre_swab_ldlm_reply); + sizeof(struct ldlm_reply), lustre_swab_ldlm_reply, NULL); EXPORT_SYMBOL(RMF_DLM_REP); const struct req_msg_field RMF_LDLM_INTENT = DEFINE_MSGF("ldlm_intent", 0, - sizeof(struct ldlm_intent), lustre_swab_ldlm_intent); + sizeof(struct ldlm_intent), lustre_swab_ldlm_intent, NULL); EXPORT_SYMBOL(RMF_LDLM_INTENT); const struct req_msg_field RMF_DLM_LVB = - DEFINE_MSGF("dlm_lvb", 0, sizeof(struct ost_lvb), NULL); + DEFINE_MSGF("dlm_lvb", 0, sizeof(struct ost_lvb), lustre_swab_ost_lvb, + NULL); EXPORT_SYMBOL(RMF_DLM_LVB); const struct req_msg_field RMF_MDT_MD = - DEFINE_MSGF("mdt_md", RMF_F_NO_SIZE_CHECK, MIN_MD_SIZE, NULL); + DEFINE_MSGF("mdt_md", RMF_F_NO_SIZE_CHECK, MIN_MD_SIZE, NULL, NULL); EXPORT_SYMBOL(RMF_MDT_MD); const struct req_msg_field RMF_REC_REINT = DEFINE_MSGF("rec_reint", 0, sizeof(struct mdt_rec_reint), - lustre_swab_mdt_rec_reint); + lustre_swab_mdt_rec_reint, NULL); EXPORT_SYMBOL(RMF_REC_REINT); /* FIXME: this length should be defined as a macro */ -const struct req_msg_field RMF_EADATA = DEFINE_MSGF("eadata", 0, -1, NULL); +const struct req_msg_field RMF_EADATA = DEFINE_MSGF("eadata", 0, -1, + NULL, NULL); EXPORT_SYMBOL(RMF_EADATA); const struct req_msg_field RMF_ACL = DEFINE_MSGF("acl", RMF_F_NO_SIZE_CHECK, - LUSTRE_POSIX_ACL_MAX_SIZE, NULL); + LUSTRE_POSIX_ACL_MAX_SIZE, NULL, NULL); EXPORT_SYMBOL(RMF_ACL); +/* FIXME: this should be made to use RMF_F_STRUCT_ARRAY */ const struct req_msg_field RMF_LOGCOOKIES = DEFINE_MSGF("logcookies", RMF_F_NO_SIZE_CHECK /* multiple cookies */, - sizeof(struct llog_cookie), NULL); + sizeof(struct llog_cookie), NULL, NULL); EXPORT_SYMBOL(RMF_LOGCOOKIES); const struct req_msg_field RMF_CAPA1 = DEFINE_MSGF("capa", 0, sizeof(struct lustre_capa), - lustre_swab_lustre_capa); + lustre_swab_lustre_capa, NULL); EXPORT_SYMBOL(RMF_CAPA1); const struct req_msg_field RMF_CAPA2 = DEFINE_MSGF("capa", 0, sizeof(struct lustre_capa), - lustre_swab_lustre_capa); + lustre_swab_lustre_capa, NULL); EXPORT_SYMBOL(RMF_CAPA2); /* @@ -835,30 +876,37 @@ EXPORT_SYMBOL(RMF_CAPA2); */ const struct req_msg_field RMF_OST_BODY = DEFINE_MSGF("ost_body", 0, - sizeof(struct ost_body), lustre_swab_ost_body); + sizeof(struct ost_body), lustre_swab_ost_body, dump_ost_body); EXPORT_SYMBOL(RMF_OST_BODY); const struct req_msg_field RMF_OBD_IOOBJ = - DEFINE_MSGF("obd_ioobj", 0, - sizeof(struct obd_ioobj), lustre_swab_obd_ioobj); + DEFINE_MSGF("obd_ioobj", RMF_F_STRUCT_ARRAY, + sizeof(struct obd_ioobj), lustre_swab_obd_ioobj, dump_ioo); EXPORT_SYMBOL(RMF_OBD_IOOBJ); const struct req_msg_field RMF_NIOBUF_REMOTE = - DEFINE_MSGF("niobuf_remote", 0, -1, lustre_swab_niobuf_remote); + DEFINE_MSGF("niobuf_remote", RMF_F_STRUCT_ARRAY, + sizeof(struct niobuf_remote), lustre_swab_niobuf_remote, + dump_rniobuf); EXPORT_SYMBOL(RMF_NIOBUF_REMOTE); +const struct req_msg_field RMF_RCS = + DEFINE_MSGF("niobuf_remote", RMF_F_STRUCT_ARRAY, sizeof(__u32), + lustre_swab_generic_32s, dump_rcs); +EXPORT_SYMBOL(RMF_RCS); + const struct req_msg_field RMF_OBD_ID = DEFINE_MSGF("obd_id", 0, - sizeof(obd_id), lustre_swab_ost_last_id); + sizeof(obd_id), lustre_swab_ost_last_id, NULL); EXPORT_SYMBOL(RMF_OBD_ID); const struct req_msg_field RMF_FIEMAP_KEY = DEFINE_MSGF("fiemap", 0, sizeof(struct ll_fiemap_info_key), - lustre_swab_fiemap); + lustre_swab_fiemap, NULL); EXPORT_SYMBOL(RMF_FIEMAP_KEY); const struct req_msg_field RMF_FIEMAP_VAL = - DEFINE_MSGF("fiemap", 0, -1, lustre_swab_fiemap); + DEFINE_MSGF("fiemap", 0, -1, lustre_swab_fiemap, NULL); EXPORT_SYMBOL(RMF_FIEMAP_VAL); /* @@ -1220,7 +1268,7 @@ const struct req_format RQF_OST_STATFS = EXPORT_SYMBOL(RQF_OST_STATFS); const struct req_format RQF_OST_SET_GRANT_INFO = - DEFINE_REQ_FMT0("OST_SET_GRANT_INFO", obd_set_info_client, + DEFINE_REQ_FMT0("OST_SET_GRANT_INFO", ost_grant_shrink_client, ost_body_only); EXPORT_SYMBOL(RQF_OST_SET_GRANT_INFO); @@ -1242,6 +1290,13 @@ EXPORT_SYMBOL(RQF_OST_GET_INFO_FIEMAP); #if !defined(__REQ_LAYOUT_USER__) +/* Convenience macro */ +#define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)] + +/** + * Initializes the capsule abstraction by computing and setting the \a rf_idx + * field of RQFs and the \a rmf_offset field of RMFs. + */ int req_layout_init(void) { int i; @@ -1258,6 +1313,8 @@ int req_layout_init(void) struct req_msg_field *field; field = (typeof(field))rf->rf_fields[j].d[k]; + LASSERT(!(field->rmf_flags & RMF_F_STRUCT_ARRAY) + || field->rmf_size > 0); LASSERT(field->rmf_offset[i][j] == 0); /* * k + 1 to detect unused format/field @@ -1276,6 +1333,14 @@ void req_layout_fini(void) } EXPORT_SYMBOL(req_layout_fini); +/** + * Initializes the expected sizes of each RMF in a \a pill (\a rc_area) to -1. + * + * Actual/expected field sizes are set elsewhere in functions in this file: + * req_capsule_init(), req_capsule_server_pack(), req_capsule_set_size() and + * req_capsule_msg_size(). The \a rc_area information is used by. + * ptlrpc_request_set_replen(). + */ void req_capsule_init_area(struct req_capsule *pill) { int i; @@ -1287,11 +1352,11 @@ void req_capsule_init_area(struct req_capsule *pill) } EXPORT_SYMBOL(req_capsule_init_area); -/* - * Initialize capsule. +/** + * Initialize a pill. * - * @area is an array of REQ_MAX_FIELD_NR elements, used to store sizes of - * variable-sized fields. + * The \a location indicates whether the caller is executing on the client side + * (RCL_CLIENT) or server side (RCL_SERVER).. */ void req_capsule_init(struct req_capsule *pill, struct ptlrpc_request *req, @@ -1299,10 +1364,27 @@ void req_capsule_init(struct req_capsule *pill, { LASSERT(location == RCL_SERVER || location == RCL_CLIENT); + /* + * Today all capsules are embedded in ptlrpc_request structs, + * but just in case that ever isn't the case, we don't reach + * into req unless req != NULL and pill is the one embedded in + * the req. + * + * The req->rq_pill_init flag makes it safe to initialize a pill + * twice, which might happen in the OST paths as a result of the + * high-priority RPC queue getting peeked at before ost_handle() + * handles an OST RPC. + */ + if (req != NULL && pill == &req->rq_pill && req->rq_pill_init) + return; + memset(pill, 0, sizeof *pill); pill->rc_req = req; pill->rc_loc = location; req_capsule_init_area(pill); + + if (req != NULL && pill == &req->rq_pill) + req->rq_pill_init = 1; } EXPORT_SYMBOL(req_capsule_init); @@ -1327,15 +1409,27 @@ static struct lustre_msg *__req_msg(const struct req_capsule *pill, return loc == RCL_CLIENT ? req->rq_reqmsg : req->rq_repmsg; } +/** + * Set the format (\a fmt) of a \a pill; format changes are not allowed here + * (see req_capsule_extend()). + */ void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt) { - LASSERT(pill->rc_fmt == NULL); + LASSERT(pill->rc_fmt == NULL || pill->rc_fmt == fmt); LASSERT(__req_format_is_sane(fmt)); pill->rc_fmt = fmt; } EXPORT_SYMBOL(req_capsule_set); +/** + * Fills in any parts of the \a rc_area of a \a pill that haven't been filled in + * yet. + + * \a rc_area is an array of REQ_MAX_FIELD_NR elements, used to store sizes of + * variable-sized fields. The field sizes come from the declared \a rmf_size + * field of a \a pill's \a rc_fmt's RMF's. + */ int req_capsule_filled_sizes(struct req_capsule *pill, enum req_location loc) { @@ -1349,7 +1443,12 @@ int req_capsule_filled_sizes(struct req_capsule *pill, pill->rc_area[loc][i] = fmt->rf_fields[loc].d[i]->rmf_size; if (pill->rc_area[loc][i] == -1) { - /* skip the following fields */ + /* + * Skip the following fields. + * + * If this LASSERT() trips then you're missing a + * call to req_capsule_set_size(). + */ LASSERT(loc != RCL_SERVER); break; } @@ -1359,6 +1458,13 @@ int req_capsule_filled_sizes(struct req_capsule *pill, } EXPORT_SYMBOL(req_capsule_filled_sizes); +/** + * Capsule equivalent of lustre_pack_request() and lustre_pack_reply(). + * + * This function uses the \a pill's \a rc_area as filled in by + * req_capsule_set_size() or req_capsule_filled_sizes() (the latter is called by + * this function). + */ int req_capsule_server_pack(struct req_capsule *pill) { const struct req_format *fmt; @@ -1374,13 +1480,17 @@ int req_capsule_server_pack(struct req_capsule *pill) pill->rc_area[RCL_SERVER], NULL); if (rc != 0) { DEBUG_REQ(D_ERROR, pill->rc_req, - "Cannot pack %d fields in format `%s': ", - count, fmt->rf_name); + "Cannot pack %d fields in format `%s': ", + count, fmt->rf_name); } return rc; } EXPORT_SYMBOL(req_capsule_server_pack); +/** + * Returns the PTLRPC request or reply (\a loc) buffer offset of a \a pill + * corresponding to the given RMF (\a field). + */ static int __req_capsule_offset(const struct req_capsule *pill, const struct req_msg_field *field, enum req_location loc) @@ -1397,17 +1507,99 @@ static int __req_capsule_offset(const struct req_capsule *pill, return offset; } +/** + * Helper for __req_capsule_get(); swabs value / array of values and/or dumps + * them if desired. + */ +static +void +swabber_dumper_helper(struct req_capsule *pill, + const struct req_msg_field *field, + enum req_location loc, + int offset, + void *value, int len, int dump, void (*swabber)( void *)) +{ + void *p; + int i; + int n; + int do_swab; + int inout = loc == RCL_CLIENT; + + swabber = swabber ?: field->rmf_swabber; + + if (ptlrpc_buf_need_swab(pill->rc_req, inout, offset) && + swabber != NULL && value != NULL) + do_swab = 1; + else + do_swab = 0; + + if (!(field->rmf_flags & RMF_F_STRUCT_ARRAY)) { + if (dump && field->rmf_dumper) { + CDEBUG(D_RPCTRACE, "Dump of %sfield %s follows\n", + do_swab ? "unswabbed " : "", field->rmf_name); + field->rmf_dumper(value); + } + if (!do_swab) + return; + swabber(value); + ptlrpc_buf_set_swabbed(pill->rc_req, inout, offset); + if (dump) { + CDEBUG(D_RPCTRACE, "Dump of swabbed field %s " + "follows\n", field->rmf_name); + field->rmf_dumper(value); + } + + return; + } + + /* + * We're swabbing an array; swabber() swabs a single array element, so + * swab every element. + */ + LASSERT((len % field->rmf_size) == 0); + for (p = value, i = 0, n = len / field->rmf_size; + i < n; + i++, p += field->rmf_size) { + if (dump && field->rmf_dumper) { + CDEBUG(D_RPCTRACE, "Dump of %sarray field %s, " + "element %d follows\n", + do_swab ? "unswabbed " : "", field->rmf_name, i); + field->rmf_dumper(p); + } + if (!do_swab) + continue; + swabber(p); + if (dump && field->rmf_dumper) { + CDEBUG(D_RPCTRACE, "Dump of swabbed array field %s, " + "element %d follows\n", field->rmf_name, i); + field->rmf_dumper(value); + } + } + if (do_swab) + ptlrpc_buf_set_swabbed(pill->rc_req, inout, offset); +} + +/** + * Returns the pointer to a PTLRPC request or reply (\a loc) buffer of a \a pill + * corresponding to the given RMF (\a field). + * + * The buffer will be swabbed using the given \a swabber. If \a swabber == NULL + * then the \a rmf_swabber from the RMF will be used. Soon there will be no + * calls to __req_capsule_get() with a non-NULL \a swabber; \a swabber will then + * be removed. Fields with the \a RMF_F_STRUCT_ARRAY flag set will have each + * element of the array swabbed. + */ static void *__req_capsule_get(struct req_capsule *pill, const struct req_msg_field *field, enum req_location loc, - void (*swabber)( void *)) + void (*swabber)( void *), + int dump) { const struct req_format *fmt; struct lustre_msg *msg; void *value; int len; int offset; - int inout = loc == RCL_CLIENT; void *(*getter)(struct lustre_msg *m, int n, int minlen); @@ -1431,86 +1623,193 @@ static void *__req_capsule_get(struct req_capsule *pill, getter = (field->rmf_flags & RMF_F_STRING) ? (typeof(getter))lustre_msg_string : lustre_msg_buf; - if (pill->rc_area[loc][offset] != -1) + if (field->rmf_flags & RMF_F_STRUCT_ARRAY) { + /* + * We've already asserted that field->rmf_size > 0 in + * req_layout_init(). + */ + len = lustre_msg_buflen(msg, offset); + if ((len % field->rmf_size) != 0) { + CERROR("%s: array field size mismatch " + "%d modulo %d != 0 (%d)\n", + field->rmf_name, len, field->rmf_size, loc); + return NULL; + } + } else if (pill->rc_area[loc][offset] != -1) { len = pill->rc_area[loc][offset]; - else + } else { len = max(field->rmf_size, 0); + } value = getter(msg, offset, len); - swabber = swabber ?: field->rmf_swabber; - if (ptlrpc_buf_need_swab(pill->rc_req, inout, offset) && - swabber != NULL && value != NULL) { - swabber(value); - ptlrpc_buf_set_swabbed(pill->rc_req, inout, offset); - } if (value == NULL) { DEBUG_REQ(D_ERROR, pill->rc_req, "Wrong buffer for field `%s' (%d of %d) " "in format `%s': %d vs. %d (%s)\n", - field->rmf_name, offset, lustre_msg_bufcount(msg), fmt->rf_name, - lustre_msg_buflen(msg, offset), len, + field->rmf_name, offset, lustre_msg_bufcount(msg), + fmt->rf_name, lustre_msg_buflen(msg, offset), len, rcl_names[loc]); + } else { + swabber_dumper_helper(pill, field, loc, offset, value, len, + dump, swabber); } return value; } +/** + * Dump a request and/or reply + */ +void __req_capsule_dump(struct req_capsule *pill, enum req_location loc) +{ + const struct req_format *fmt; + const struct req_msg_field *field; + int len; + int i; + + fmt = pill->rc_fmt; + + DEBUG_REQ(D_RPCTRACE, pill->rc_req, "BEGIN REQ CAPSULE DUMP\n"); + for (i = 0; i < fmt->rf_fields[loc].nr; ++i) { + field = FMT_FIELD(fmt, loc, i); + if (field->rmf_dumper == NULL) { + /* + * FIXME Add a default hex dumper for fields that don't + * have a specific dumper + */ + len = req_capsule_get_size(pill, field, loc); + CDEBUG(D_RPCTRACE, "Field %s has no dumper function;" + "field size is %d\n", field->rmf_name, len); + } else { + /* It's the dumping side-effect that we're interested in */ + (void) __req_capsule_get(pill, field, loc, NULL, 1); + } + } + CDEBUG(D_RPCTRACE, "END REQ CAPSULE DUMP\n"); +} + +/** + * Dump a request. + */ +void req_capsule_client_dump(struct req_capsule *pill) +{ + __req_capsule_dump(pill, RCL_CLIENT); +} +EXPORT_SYMBOL(req_capsule_client_dump); + +/** + * Dump a reply + */ +void req_capsule_server_dump(struct req_capsule *pill) +{ + __req_capsule_dump(pill, RCL_SERVER); +} +EXPORT_SYMBOL(req_capsule_server_dump); + +/** + * Trivial wrapper around __req_capsule_get(), that returns the PTLRPC request + * buffer corresponding to the given RMF (\a field) of a \a pill. + */ void *req_capsule_client_get(struct req_capsule *pill, const struct req_msg_field *field) { - return __req_capsule_get(pill, field, RCL_CLIENT, NULL); + return __req_capsule_get(pill, field, RCL_CLIENT, NULL, 0); } EXPORT_SYMBOL(req_capsule_client_get); +/** + * Same as req_capsule_client_get(), but with a \a swabber argument. + * + * Currently unused; will be removed when req_capsule_server_swab_get() is + * unused too. + */ void *req_capsule_client_swab_get(struct req_capsule *pill, const struct req_msg_field *field, void (*swabber)(void* )) { - return __req_capsule_get(pill, field, RCL_CLIENT, swabber); + return __req_capsule_get(pill, field, RCL_CLIENT, swabber, 0); } EXPORT_SYMBOL(req_capsule_client_swab_get); +/** + * Utility that combines req_capsule_set_size() and req_capsule_client_get(). + * + * First the \a pill's request \a field's size is set (\a rc_area) using + * req_capsule_set_size() with the given \a len. Then the actual buffer is + * returned. + */ void *req_capsule_client_sized_get(struct req_capsule *pill, const struct req_msg_field *field, int len) { req_capsule_set_size(pill, field, RCL_CLIENT, len); - return __req_capsule_get(pill, field, RCL_CLIENT, NULL); + return __req_capsule_get(pill, field, RCL_CLIENT, NULL, 0); } EXPORT_SYMBOL(req_capsule_client_sized_get); +/** + * Trivial wrapper around __req_capsule_get(), that returns the PTLRPC reply + * buffer corresponding to the given RMF (\a field) of a \a pill. + */ void *req_capsule_server_get(struct req_capsule *pill, const struct req_msg_field *field) { - return __req_capsule_get(pill, field, RCL_SERVER, NULL); + return __req_capsule_get(pill, field, RCL_SERVER, NULL, 0); } EXPORT_SYMBOL(req_capsule_server_get); +/** + * Same as req_capsule_server_get(), but with a \a swabber argument. + * + * Ideally all swabbing should be done pursuant to RMF definitions, with no + * swabbing done outside this capsule abstraction. + */ void *req_capsule_server_swab_get(struct req_capsule *pill, const struct req_msg_field *field, void *swabber) { - return __req_capsule_get(pill, field, RCL_SERVER, swabber); + return __req_capsule_get(pill, field, RCL_SERVER, swabber, 0); } EXPORT_SYMBOL(req_capsule_server_swab_get); - +/** + * Utility that combines req_capsule_set_size() and req_capsule_server_get(). + * + * First the \a pill's request \a field's size is set (\a rc_area) using + * req_capsule_set_size() with the given \a len. Then the actual buffer is + * returned. + */ void *req_capsule_server_sized_get(struct req_capsule *pill, const struct req_msg_field *field, int len) { req_capsule_set_size(pill, field, RCL_SERVER, len); - return __req_capsule_get(pill, field, RCL_SERVER, NULL); + return __req_capsule_get(pill, field, RCL_SERVER, NULL, 0); } EXPORT_SYMBOL(req_capsule_server_sized_get); +/** + * Returns the buffer of a \a pill corresponding to the given \a field from the + * request (if the caller is executing on the server-side) or reply (if the + * caller is executing on the client-side). + * + * This function convienient for use is code that could be executed on the + * client and server alike. + */ const void *req_capsule_other_get(struct req_capsule *pill, const struct req_msg_field *field) { - return __req_capsule_get(pill, field, pill->rc_loc ^ 1, NULL); + return __req_capsule_get(pill, field, pill->rc_loc ^ 1, NULL, 0); } EXPORT_SYMBOL(req_capsule_other_get); +/** + * Set the size of the PTLRPC request/reply (\a loc) buffer for the given \a + * field of the given \a pill. + * + * This function must be used when constructing variable sized fields of a + * request or reply. + */ void req_capsule_set_size(struct req_capsule *pill, const struct req_msg_field *field, enum req_location loc, int size) @@ -1521,16 +1820,29 @@ void req_capsule_set_size(struct req_capsule *pill, (field->rmf_size != -1) && !(field->rmf_flags & RMF_F_NO_SIZE_CHECK) && (size > 0)) { - CERROR("%s: field size mismatch %d != %d (%d)\n", - field->rmf_name, size, field->rmf_size, loc); - LBUG(); + if ((field->rmf_flags & RMF_F_STRUCT_ARRAY) && + (size % field->rmf_size != 0)) { + CERROR("%s: array field size mismatch " + "%d %% %d != 0 (%d)\n", + field->rmf_name, size, field->rmf_size, loc); + LBUG(); + } else if (!(field->rmf_flags & RMF_F_STRUCT_ARRAY) && + size < field->rmf_size) { + CERROR("%s: field size mismatch %d != %d (%d)\n", + field->rmf_name, size, field->rmf_size, loc); + LBUG(); + } } pill->rc_area[loc][__req_capsule_offset(pill, field, loc)] = size; } EXPORT_SYMBOL(req_capsule_set_size); -/* NB: this function doesn't correspond with req_capsule_set_size(), which +/** + * Return the actual PTLRPC buffer length of a request or reply (\a loc) + * for the given \a pill's given \a field. + * + * NB: this function doesn't correspond with req_capsule_set_size(), which * actually sets the size in pill.rc_area[loc][offset], but this function * returns the message buflen[offset], maybe we should use another name. */ @@ -1545,6 +1857,13 @@ int req_capsule_get_size(const struct req_capsule *pill, } EXPORT_SYMBOL(req_capsule_get_size); +/** + * Wrapper around lustre_msg_size() that returns the PTLRPC size needed for the + * given \a pill's request or reply (\a loc) given the field size recorded in + * the \a pill's rc_area. + * + * See also req_capsule_set_size(). + */ int req_capsule_msg_size(struct req_capsule *pill, enum req_location loc) { return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic, @@ -1552,11 +1871,26 @@ int req_capsule_msg_size(struct req_capsule *pill, enum req_location loc) pill->rc_area[loc]); } +/** + * While req_capsule_msg_size() computes the size of a PTLRPC request or reply + * (\a loc) given a \a pill's \a rc_area, this function computes the size of a + * PTLRPC request or reply given only an RQF (\a fmt). + * + * This function should not be used for formats which contain variable size + * fields. + */ int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt, enum req_location loc) { int size, i = 0; + /* + * This function should probably LASSERT() that fmt has no fields with + * RMF_F_STRUCT_ARRAY in rmf_flags, since we can't know here how many + * elements in the array there will ultimately be, but then, we could + * assume that there will be at least one element, and that's just what + * we do. + */ size = lustre_msg_hdr_size(magic, fmt->rf_fields[loc].nr); if (size < 0) return size; @@ -1567,8 +1901,25 @@ int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt, return size; } -#define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)] - +/** + * Changes the format of an RPC. + * + * The pill must already have been initialized, which means that it already has + * a request format. The new format \a fmt must be an extension of the pill's + * old format. Specifically: the new format must have as many request and reply + * fields as the old one, and all fields shared by the old and new format must + * be at least as large in the new format. + * + * The new format's fields may be of different "type" than the old format, but + * only for fields that are "opaque" blobs: fields which have a) have no + * \a rmf_swabber, b) \a rmf_flags == 0 or RMF_F_NO_SIZE_CHECK, and c) \a + * rmf_size == -1 or \a rmf_flags == RMF_F_NO_SIZE_CHECK. For example, + * OBD_SET_INFO has a key field and an opaque value field that gets interpreted + * according to the key field. When the value, according to the key, contains a + * structure (or array thereof) to be swabbed, the format should be changed to + * one where the value field has \a rmf_size/rmf_flags/rmf_swabber set + * accordingly. + */ void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt) { int i; @@ -1586,6 +1937,14 @@ void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt) for (i = 0; i < RCL_NR; ++i) { LASSERT(fmt->rf_fields[i].nr >= old->rf_fields[i].nr); for (j = 0; j < old->rf_fields[i].nr - 1; ++j) { + const struct req_msg_field *ofield = FMT_FIELD(old, i, j); + + /* "opaque" fields can be transmogrified */ + if (ofield->rmf_swabber == NULL && + (ofield->rmf_flags & ~RMF_F_NO_SIZE_CHECK) == 0 && + (ofield->rmf_size == -1 || + ofield->rmf_flags == RMF_F_NO_SIZE_CHECK)) + continue; LASSERT(FMT_FIELD(fmt, i, j) == FMT_FIELD(old, i, j)); } /* @@ -1599,6 +1958,11 @@ void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt) } EXPORT_SYMBOL(req_capsule_extend); +/** + * This function returns a non-zero value if the given \a field is present in + * the format (\a rc_fmt) of \a pill's PTLRPC request or reply (\a loc), else it + * returns 0. + */ int req_capsule_has_field(const struct req_capsule *pill, const struct req_msg_field *field, enum req_location loc) @@ -1609,6 +1973,10 @@ int req_capsule_has_field(const struct req_capsule *pill, } EXPORT_SYMBOL(req_capsule_has_field); +/** + * Returns a non-zero value if the given \a field is present in the given \a + * pill's PTLRPC request or reply (\a loc), else it returns 0. + */ int req_capsule_field_present(const struct req_capsule *pill, const struct req_msg_field *field, enum req_location loc) @@ -1623,6 +1991,12 @@ int req_capsule_field_present(const struct req_capsule *pill, } EXPORT_SYMBOL(req_capsule_field_present); +/** + * This function shrinks the size of the _buffer_ of the \a pill's PTLRPC + * request or reply (\a loc). + * + * This is not the opposite of req_capsule_extend(). + */ void req_capsule_shrink(struct req_capsule *pill, const struct req_msg_field *field, unsigned int newlen, diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index b5e3e53..6ec0e5b 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -773,26 +773,6 @@ static inline void *__lustre_swab_buf(struct lustre_msg *msg, int index, return ptr; } -void *lustre_swab_reqbuf(struct ptlrpc_request *req, int index, int min_size, - void *swabber) -{ - if (!ptlrpc_buf_need_swab(req, 1, index)) - return lustre_msg_buf(req->rq_reqmsg, index, min_size); - - lustre_set_req_swabbed(req, index); - return __lustre_swab_buf(req->rq_reqmsg, index, min_size, swabber); -} - -void *lustre_swab_repbuf(struct ptlrpc_request *req, int index, - int min_size, void *swabber) -{ - if (!ptlrpc_buf_need_swab(req, 0, index)) - return lustre_msg_buf(req->rq_repmsg, index, min_size); - - lustre_set_rep_swabbed(req, index); - return __lustre_swab_buf(req->rq_repmsg, index, min_size, swabber); -} - static inline struct ptlrpc_body *lustre_msg_ptlrpc_body(struct lustre_msg *msg) { return lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, @@ -2084,14 +2064,94 @@ void lustre_swab_qdata(struct qunit_data *d) __swab64s (&d->padding); } +/* Dump functions */ +void dump_ioo(struct obd_ioobj *ioo) +{ + CDEBUG(D_RPCTRACE, + "obd_ioobj: ioo_id="LPD64", ioo_gr="LPD64", ioo_type=%d, " + "ioo_bufct=%d\n", ioo->ioo_id, ioo->ioo_gr, ioo->ioo_type, + ioo->ioo_bufcnt); +} + +void dump_rniobuf(struct niobuf_remote *nb) +{ + CDEBUG(D_RPCTRACE, "niobuf_remote: offset="LPU64", len=%d, flags=%x\n", + nb->offset, nb->len, nb->flags); +} + +void dump_obdo(struct obdo *oa) +{ + __u32 valid = oa->o_valid; + + CDEBUG(D_RPCTRACE, "obdo: o_valid = %08x\n", valid); + if (valid & OBD_MD_FLID) + CDEBUG(D_RPCTRACE, "obdo: o_id = "LPD64"\n", oa->o_id); + if (valid & OBD_MD_FLGROUP) + CDEBUG(D_RPCTRACE, "obdo: o_gr = "LPD64"\n", oa->o_gr); + if (valid & OBD_MD_FLFID) + CDEBUG(D_RPCTRACE, "obdo: o_fid = "LPD64"\n", oa->o_fid); + if (valid & OBD_MD_FLSIZE) + CDEBUG(D_RPCTRACE, "obdo: o_size = "LPD64"\n", oa->o_size); + if (valid & OBD_MD_FLMTIME) + CDEBUG(D_RPCTRACE, "obdo: o_mtime = "LPD64"\n", oa->o_mtime); + if (valid & OBD_MD_FLATIME) + CDEBUG(D_RPCTRACE, "obdo: o_atime = "LPD64"\n", oa->o_atime); + if (valid & OBD_MD_FLCTIME) + CDEBUG(D_RPCTRACE, "obdo: o_ctime = "LPD64"\n", oa->o_ctime); + if (valid & OBD_MD_FLBLOCKS) /* allocation of space */ + CDEBUG(D_RPCTRACE, "obdo: o_blocks = "LPD64"\n", oa->o_blocks); + if (valid & OBD_MD_FLGRANT) + CDEBUG(D_RPCTRACE, "obdo: o_grant = "LPD64"\n", oa->o_grant); + if (valid & OBD_MD_FLBLKSZ) + CDEBUG(D_RPCTRACE, "obdo: o_blksize = %d\n", oa->o_blksize); + if (valid & (OBD_MD_FLTYPE | OBD_MD_FLMODE)) + CDEBUG(D_RPCTRACE, "obdo: o_mode = %o\n", + oa->o_mode & ((valid & OBD_MD_FLTYPE ? S_IFMT : 0) | + (valid & OBD_MD_FLMODE ? ~S_IFMT : 0))); + if (valid & OBD_MD_FLUID) + CDEBUG(D_RPCTRACE, "obdo: o_uid = %u\n", oa->o_uid); + if (valid & OBD_MD_FLGID) + CDEBUG(D_RPCTRACE, "obdo: o_gid = %u\n", oa->o_gid); + if (valid & OBD_MD_FLFLAGS) + CDEBUG(D_RPCTRACE, "obdo: o_flags = %x\n", oa->o_flags); + if (valid & OBD_MD_FLNLINK) + CDEBUG(D_RPCTRACE, "obdo: o_nlink = %u\n", oa->o_nlink); + else if (valid & OBD_MD_FLCKSUM) + CDEBUG(D_RPCTRACE, "obdo: o_checksum (o_nlink) = %u\n", oa->o_nlink); + if (valid & OBD_MD_FLGENER) + CDEBUG(D_RPCTRACE, "obdo: o_generation = %u\n", + oa->o_generation); + if (valid & OBD_MD_FLEASIZE) + CDEBUG(D_RPCTRACE, "obdo: o_easize = %u\n", oa->o_easize); + else if (valid & OBD_MD_FLEPOCH) + CDEBUG(D_RPCTRACE, "obdo: o_epoc (o_easize) = %u\n", oa->o_easize); + if (valid & OBD_MD_FLID) + CDEBUG(D_RPCTRACE, "obdo: o_stripe_idx = %u\n", oa->o_stripe_idx); + if (valid & OBD_MD_FLHANDLE) + CDEBUG(D_RPCTRACE, "obdo: o_handle = "LPD64"\n", oa->o_handle.cookie); + if (valid & OBD_MD_FLCOOKIE) + CDEBUG(D_RPCTRACE, "obdo: o_lcookie = " + "(llog_cookie dumping not yet implemented)\n"); +} + +void dump_ost_body(struct ost_body *ob) +{ + dump_obdo(&ob->oa); +} + +void dump_rcs(__u32 *rc) +{ + CDEBUG(D_RPCTRACE, "rmf_rcs: %d\n", *rc); +} + #ifdef __KERNEL__ /** * got qdata from request(req/rep) */ -struct qunit_data *quota_get_qdata(void *request, int is_req, int is_exp) +struct qunit_data *quota_get_qdata(void *r, int is_req, int is_exp) { - struct ptlrpc_request *req = (struct ptlrpc_request *)request; + struct ptlrpc_request *req = (struct ptlrpc_request *)r; struct qunit_data *qdata; __u64 flags = is_exp ? req->rq_export->exp_connect_flags : req->rq_import->imp_connect_data.ocd_connect_flags; @@ -2103,13 +2163,9 @@ struct qunit_data *quota_get_qdata(void *request, int is_req, int is_exp) LASSERT(flags & OBD_CONNECT_CHANGE_QS); if (is_req == QUOTA_REQUEST) - qdata = lustre_swab_reqbuf(req, REQ_REC_OFF, - sizeof(struct qunit_data), - lustre_swab_qdata); + qdata = req_capsule_client_get(&req->rq_pill, &RMF_QUNIT_DATA); else - qdata = lustre_swab_repbuf(req, REPLY_REC_OFF, - sizeof(struct qunit_data), - lustre_swab_qdata); + qdata = req_capsule_server_get(&req->rq_pill, &RMF_QUNIT_DATA); if (qdata == NULL) return ERR_PTR(-EPROTO); @@ -2121,10 +2177,10 @@ EXPORT_SYMBOL(quota_get_qdata); /** * copy qdata to request(req/rep) */ -int quota_copy_qdata(void *request, struct qunit_data *qdata, - int is_req, int is_exp) +int quota_copy_qdata(void *r, struct qunit_data *qdata, int is_req, + int is_exp) { - struct ptlrpc_request *req = (struct ptlrpc_request *)request; + struct ptlrpc_request *req = (struct ptlrpc_request *)r; void *target; __u64 flags = is_exp ? req->rq_export->exp_connect_flags : req->rq_import->imp_connect_data.ocd_connect_flags; @@ -2137,14 +2193,13 @@ int quota_copy_qdata(void *request, struct qunit_data *qdata, LASSERT(flags & OBD_CONNECT_CHANGE_QS); if (is_req == QUOTA_REQUEST) - target = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, - sizeof(struct qunit_data)); + target = req_capsule_client_get(&req->rq_pill, &RMF_QUNIT_DATA); else - target = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(struct qunit_data)); + target = req_capsule_server_get(&req->rq_pill, &RMF_QUNIT_DATA); if (target == NULL) return -EPROTO; + LASSERT(target != qdata); memcpy(target, qdata, sizeof(*qdata)); return 0; } diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index cab4334..a9012e7 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -238,8 +238,6 @@ EXPORT_SYMBOL(lustre_msg_string); EXPORT_SYMBOL(ptlrpc_buf_set_swabbed); EXPORT_SYMBOL(ptlrpc_buf_need_swab); EXPORT_SYMBOL(lustre_swab_ptlrpc_body); -EXPORT_SYMBOL(lustre_swab_reqbuf); -EXPORT_SYMBOL(lustre_swab_repbuf); EXPORT_SYMBOL(lustre_swab_obdo); EXPORT_SYMBOL(lustre_swab_obd_statfs); EXPORT_SYMBOL(lustre_swab_obd_ioobj); @@ -272,6 +270,11 @@ EXPORT_SYMBOL(lustre_swab_ldlm_resource_desc); EXPORT_SYMBOL(lustre_swab_ldlm_lock_desc); EXPORT_SYMBOL(lustre_swab_ldlm_request); EXPORT_SYMBOL(lustre_swab_ldlm_reply); +EXPORT_SYMBOL(dump_ioo); +EXPORT_SYMBOL(dump_rniobuf); +EXPORT_SYMBOL(dump_obdo); +EXPORT_SYMBOL(dump_ost_body); +EXPORT_SYMBOL(dump_rcs); EXPORT_SYMBOL(lustre_swab_qdata); EXPORT_SYMBOL(lustre_swab_quota_adjust_qunit); EXPORT_SYMBOL(lustre_msg_get_flags); diff --git a/lustre/utils/req-layout.c b/lustre/utils/req-layout.c index 74bdb0b..4b7c685 100644 --- a/lustre/utils/req-layout.c +++ b/lustre/utils/req-layout.c @@ -75,6 +75,13 @@ #define lustre_swab_ost_body NULL #define lustre_swab_ost_last_id NULL #define lustre_swab_fiemap NULL +#define lustre_swab_qdata NULL +#define lustre_swab_ost_lvb NULL +#define dump_rniobuf NULL +#define dump_ioo NULL +#define dump_obdo NULL +#define dump_ost_body NULL +#define dump_rcs NULL /* * Yes, include .c file. -- 1.8.3.1