* RHEL 4 and RHEL 5/SLES 10 clients behaves differently on 'cd' to a
removed cwd "./" (refer to Bugzilla 14399).
+Severity : normal$
+Bugzilla : 18631
+Description: Unify req format on client/servers
+Details : Use new req_capsule API [almost] everywhere instead of old PTLRPC
+ buffers and swabbers approach..
+
Severity : normal
Frequency : cleanup
Bugzilla : 19200
struct lustre_cfg;
extern void lustre_swab_lustre_cfg(struct lustre_cfg *lcfg);
+/* Functions for dumping PTLRPC fields */
+void dump_rniobuf(struct niobuf_remote *rnb);
+void dump_ioo(struct obd_ioobj *nb);
+void dump_obdo(struct obdo *oa);
+void dump_ost_body(struct ost_body *ob);
+void dump_rcs(__u32 *rc);
+
/* this will be used when OBD_CONNECT_CHANGE_QS is set */
struct qunit_data {
/**
#define QDATA_CLR_CHANGE_QS(qdata) ((qdata)->qd_flags &= ~LQUOTA_FLAGS_CHG_QS)
extern void lustre_swab_qdata(struct qunit_data *d);
-extern struct qunit_data *quota_get_qdata(void*req, int is_req, int is_exp);
+extern struct qunit_data *quota_get_qdata(void *req, int is_req, int is_exp);
extern int quota_copy_qdata(void *request, struct qunit_data *qdata,
int is_req, int is_exp);
return (capa_opc(capa) & opc) == opc;
}
-static inline struct lustre_capa *
-lustre_unpack_incoming_capa(struct ptlrpc_request *req, unsigned int offset)
-{
- struct lustre_capa *capa;
-
- capa = lustre_swab_reqbuf(req, offset, sizeof(*capa),
- lustre_swab_lustre_capa);
- if (capa == NULL)
- CERROR("bufcount %u, bufsize %u\n",
- lustre_msg_bufcount(req->rq_reqmsg),
- (lustre_msg_bufcount(req->rq_reqmsg) <= offset) ?
- -1 : lustre_msg_buflen(req->rq_reqmsg, offset));
-
- return capa;
-}
-
struct filter_capa_key {
struct list_head k_list;
struct lustre_capa_key k_key;
#define _LUSTRE_DEBUG_H
#include <lustre_net.h>
+#include <obd.h>
#if defined(__linux__)
#include <linux/lustre_debug.h>
}} while(0)
/* lib/debug.c */
-int dump_lniobuf(struct niobuf_local *lnb);
-int dump_rniobuf(struct niobuf_remote *rnb);
-int dump_ioo(struct obd_ioobj *nb);
+void dump_lniobuf(struct niobuf_local *lnb);
int dump_req(struct ptlrpc_request *req);
-int dump_obdo(struct obdo *oa);
void dump_lsm(int level, struct lov_stripe_md *lsm);
int block_debug_setup(void *addr, int len, __u64 off, __u64 id);
int block_debug_check(char *who, void *addr, int len, __u64 off, __u64 id);
int (*lvbo_init)(struct ldlm_resource *res);
int (*lvbo_update)(struct ldlm_resource *res,
struct ptlrpc_request *r,
- int buf_idx, int increase);
+ int increase);
};
typedef enum {
*/
__u32 l_lvb_len;
void *l_lvb_data;
- void *l_lvb_swabber;
void *l_ast_data;
spinlock_t l_extents_list_lock;
}
static inline int ldlm_res_lvbo_update(struct ldlm_resource *res,
- struct ptlrpc_request *r, int buf_idx,
- int increase)
+ struct ptlrpc_request *r, int increase)
{
if (res->lr_namespace->ns_lvbo &&
res->lr_namespace->ns_lvbo->lvbo_update) {
- return res->lr_namespace->ns_lvbo->lvbo_update(res, r, buf_idx,
+ return res->lr_namespace->ns_lvbo->lvbo_update(res, r,
increase);
}
return 0;
struct ldlm_enqueue_info *einfo,
const struct ldlm_res_id *res_id,
ldlm_policy_data_t *policy, int *flags,
- void *lvb, __u32 lvb_len, void *lvb_swabber,
- struct lustre_handle *lockh, int async);
+ void *lvb, __u32 lvb_len, struct lustre_handle *lockh,
+ int async);
int ldlm_prep_enqueue_req(struct obd_export *exp,
struct ptlrpc_request *req,
struct list_head *cancels,
int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
int *flags, void *lvb, __u32 lvb_len,
- void *lvb_swabber, struct lustre_handle *lockh,
- int rc);
+ struct lustre_handle *lockh, int rc);
int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id,
ldlm_type_t type, ldlm_policy_data_t *policy,
ldlm_blocking_callback blocking,
ldlm_completion_callback completion,
ldlm_glimpse_callback glimpse,
- void *data, __u32 lvb_len, void *lvb_swabber,
+ void *data, __u32 lvb_len,
const __u64 *client_cookie,
struct lustre_handle *lockh);
int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
rq_pack_udesc:1,
rq_pack_bulk:1,
/* doesn't expect reply FIXME */
- rq_no_reply:1;
+ rq_no_reply:1,
+ rq_pill_init:1; /* pill initialized */
uid_t rq_auth_uid; /* authed uid */
uid_t rq_auth_mapped_uid; /* authed uid mapped to */
void lustre_msg_set_buflen(struct lustre_msg *m, int n, int len);
int lustre_msg_bufcount(struct lustre_msg *m);
char *lustre_msg_string (struct lustre_msg *m, int n, int max_len);
-void *lustre_swab_reqbuf(struct ptlrpc_request *req, int n, int minlen,
- void *swabber);
-void *lustre_swab_repbuf(struct ptlrpc_request *req, int n, int minlen,
- void *swabber);
__u32 lustre_msghdr_get_flags(struct lustre_msg *msg);
void lustre_msghdr_set_flags(struct lustre_msg *msg, __u32 flags);
__u32 lustre_msg_get_flags(struct lustre_msg *msg);
void req_capsule_fini(struct req_capsule *pill);
void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt);
+void req_capsule_client_dump(struct req_capsule *pill);
+void req_capsule_server_dump(struct req_capsule *pill);
void req_capsule_init_area(struct req_capsule *pill);
int req_capsule_filled_sizes(struct req_capsule *pill, enum req_location loc);
int req_capsule_server_pack(struct req_capsule *pill);
extern const struct req_msg_field RMF_OBD_IOOBJ;
extern const struct req_msg_field RMF_OBD_ID;
extern const struct req_msg_field RMF_NIOBUF_REMOTE;
+extern const struct req_msg_field RMF_RCS;
extern const struct req_msg_field RMF_FIEMAP_KEY;
extern const struct req_msg_field RMF_FIEMAP_VAL;
LDLM_LOCK_GET(lock);
LDLM_DEBUG(lock, "export %p", exp);
- ldlm_res_lvbo_update(res, NULL, 0, 1);
+ ldlm_res_lvbo_update(res, NULL, 1);
ldlm_lock_cancel(lock);
ldlm_reprocess_all(res);
ldlm_resource_putref(res);
* been received yet, we need to update lvbo to have the
* proper attributes cached. */
if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
- ldlm_res_lvbo_update(lock->l_resource, NULL,
- 0, 1);
+ ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
rc = ldlm_handle_ast_error(lock, req, rc,
arg->type == LDLM_BL_CALLBACK
? "blocking" : "completion");
else if (rc != 0)
rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
else
- rc = ldlm_res_lvbo_update(res, req, REPLY_REC_OFF, 1);
+ rc = ldlm_res_lvbo_update(res, req, 1);
ptlrpc_req_finished(req);
if (rc == -ERESTART)
if (res != NULL) {
ldlm_resource_getref(res);
LDLM_RESOURCE_ADDREF(res);
- ldlm_res_lvbo_update(res, NULL, 0, 1);
+ ldlm_res_lvbo_update(res, NULL, 1);
}
pres = res;
}
LDLM_ERROR(lock, "completion AST did not contain "
"expected LVB!");
} else {
- void *lvb = req_capsule_client_swab_get(&req->rq_pill,
- &RMF_DLM_LVB,
- (void *)lock->l_lvb_swabber);
+ void *lvb = req_capsule_client_get(&req->rq_pill,
+ &RMF_DLM_LVB);
memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
}
}
ldlm_blocking_callback blocking,
ldlm_completion_callback completion,
ldlm_glimpse_callback glimpse,
- void *data, __u32 lvb_len, void *lvb_swabber,
+ void *data, __u32 lvb_len,
const __u64 *client_cookie,
struct lustre_handle *lockh)
{
lock->l_flags |= LDLM_FL_LOCAL;
if (*flags & LDLM_FL_ATOMIC_CB)
lock->l_flags |= LDLM_FL_ATOMIC_CB;
- lock->l_lvb_swabber = lvb_swabber;
unlock_res_and_lock(lock);
if (policy != NULL)
lock->l_policy_data = *policy;
int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
int *flags, void *lvb, __u32 lvb_len,
- void *lvb_swabber, struct lustre_handle *lockh,int rc)
+ struct lustre_handle *lockh,int rc)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
int is_replay = *flags & LDLM_FL_REPLAY;
req_capsule_set_size(&req->rq_pill,
&RMF_DLM_LVB, RCL_SERVER,
lvb_len);
- tmplvb = req_capsule_server_swab_get(&req->rq_pill,
- &RMF_DLM_LVB,
- lvb_swabber);
+ tmplvb = req_capsule_server_get(&req->rq_pill,
+ &RMF_DLM_LVB);
if (tmplvb == NULL)
GOTO(cleanup, rc = -EPROTO);
if (lvb != NULL)
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
lvb_len);
- tmplvb = req_capsule_server_swab_get(&req->rq_pill,
- &RMF_DLM_LVB,
- lvb_swabber);
+ tmplvb = req_capsule_server_get(&req->rq_pill,
+ &RMF_DLM_LVB);
if (tmplvb == NULL)
GOTO(cleanup, rc = -EPROTO);
memcpy(lock->l_lvb_data, tmplvb, lvb_len);
struct ldlm_enqueue_info *einfo,
const struct ldlm_res_id *res_id,
ldlm_policy_data_t *policy, int *flags,
- void *lvb, __u32 lvb_len, void *lvb_swabber,
- struct lustre_handle *lockh, int async)
+ void *lvb, __u32 lvb_len, struct lustre_handle *lockh,
+ int async)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
struct ldlm_lock *lock;
/* for the local lock, add the reference */
ldlm_lock_addref_internal(lock, einfo->ei_mode);
ldlm_lock2handle(lock, lockh);
- lock->l_lvb_swabber = lvb_swabber;
if (policy != NULL) {
/* INODEBITS_INTEROP: If the server does not support
* inodebits, we will request a plain lock in the
err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
einfo->ei_mode, flags, lvb, lvb_len,
- lvb_swabber, lockh, rc);
+ lockh, rc);
/* If ldlm_cli_enqueue_fini did not find the lock, we need to free
* one reference that we took */
if (lmv->tgts[0].ltd_exp != NULL)
rc = ldlm_cli_enqueue(lmv->tgts[0].ltd_exp, NULL, &einfo, &res_id,
- &flock, &flags, NULL, 0, NULL, &lockh, 0);
+ &flock, &flags, NULL, 0, &lockh, 0);
else
rc = -ENODEV;
}
mdc_enter_request(&obddev->u.cli);
}
rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
- 0, NULL, lockh, 0);
+ 0, lockh, 0);
if (reqp)
*reqp = req;
rc = -ETIMEDOUT;
rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
- &flags, NULL, 0, NULL, lockh, rc);
+ &flags, NULL, 0, lockh, rc);
if (rc < 0) {
CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
mdc_clear_replay_flag(req, rc);
mdc_enter_request(&obddev->u.cli);
rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
- 0, NULL, &minfo->mi_lockh, 1);
+ 0, &minfo->mi_lockh, 1);
if (rc < 0) {
mdc_exit_request(&obddev->u.cli);
RETURN(rc);
rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
mode, &flags, mdt_blocking_ast,
- ldlm_completion_ast,
- NULL, NULL, 0, NULL, client_cookie, lh);
+ ldlm_completion_ast, NULL, NULL, 0,
+ client_cookie, lh);
return rc == ELDLM_OK ? 0 : -EIO;
}
rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
LCK_EX, &flags, ldlm_blocking_ast,
ldlm_completion_ast, NULL, NULL, 0,
- NULL,
&info->mti_exp->exp_handle.h_cookie,
lh);
} else {
* This is the case mdt0 is remote node, issue DLM lock like
* other clients.
*/
- rc = ldlm_cli_enqueue(ms->ms_control_exp,
- NULL, &einfo, res_id,
- policy, &flags, NULL, 0, NULL, lh, 0);
+ rc = ldlm_cli_enqueue(ms->ms_control_exp, NULL, &einfo, res_id,
+ policy, &flags, NULL, 0, lh, 0);
}
RETURN(rc);
{
struct ptlrpc_request *req;
struct mgs_send_param *req_msp, *rep_msp;
- int size[] = { sizeof(struct ptlrpc_body), sizeof(*req_msp) };
- __u32 rep_size[] = { sizeof(struct ptlrpc_body), sizeof(*msp) };
int rc;
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MGS_VERSION,
- MGS_SET_INFO, 2, size, NULL);
+ req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+ &RQF_MGS_SET_INFO, LUSTRE_MGS_VERSION,
+ MGS_SET_INFO);
if (!req)
RETURN(-ENOMEM);
- req_msp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*req_msp));
+ req_msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
if (!req_msp) {
ptlrpc_req_finished(req);
RETURN(-ENOMEM);
}
memcpy(req_msp, msp, sizeof(*req_msp));
- ptlrpc_req_set_repsize(req, 2, rep_size);
+ ptlrpc_request_set_replen(req);
rc = ptlrpc_queue_wait(req);
if (!rc) {
- rep_msp = lustre_swab_repbuf(req, REPLY_REC_OFF,
- sizeof(*rep_msp), NULL);
+ rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
memcpy(msp, rep_msp, sizeof(*rep_msp));
}
/* We need a callback for every lockholder, so don't try to
ldlm_lock_match (see rev 1.1.2.11.2.47) */
- rc = ldlm_cli_enqueue(exp, NULL, &einfo, &cld->cld_resid,
- NULL, flags, NULL, 0, NULL, lockh, 0);
+ rc = ldlm_cli_enqueue(exp, NULL, &einfo, &cld->cld_resid, NULL, flags,
+ NULL, 0, lockh, 0);
/* A failed enqueue should still call the mgc_blocking_ast,
where it will be requeued if needed ("grant failed"). */
LDLM_PLAIN, NULL, LCK_EX,
&flags, ldlm_blocking_ast,
ldlm_completion_ast, NULL,
- fsname, 0, NULL, NULL, lockh);
+ fsname, 0, NULL, lockh);
if (rc)
CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
#include <lustre_debug.h>
#include <lustre_net.h>
-int dump_ioo(struct obd_ioobj *ioo)
+void dump_lniobuf(struct niobuf_local *nb)
{
- CERROR("obd_ioobj: ioo_id="LPD64", ioo_gr="LPD64", ioo_type=%d, "
- "ioo_bufct=%d\n",
- ioo->ioo_id, ioo->ioo_gr, ioo->ioo_type, ioo->ioo_bufcnt);
- return -EINVAL;
-}
-
-int dump_lniobuf(struct niobuf_local *nb)
-{
- CERROR("niobuf_local: offset="LPD64", len=%d, page=%p, rc=%d\n",
+ CDEBUG(D_RPCTRACE,
+ "niobuf_local: offset="LPD64", len=%d, page=%p, rc=%d\n",
nb->offset, nb->len, nb->page, nb->rc);
- CERROR("nb->page: index = %ld\n", nb->page ? cfs_page_index(nb->page) : -1);
-
- return -EINVAL;
-}
-
-int dump_rniobuf(struct niobuf_remote *nb)
-{
- CERROR("niobuf_remote: offset="LPU64", len=%d, flags=%x\n",
- nb->offset, nb->len, nb->flags);
-
- return -EINVAL;
-}
-
-int dump_obdo(struct obdo *oa)
-{
- __u32 valid = oa->o_valid;
-
- CERROR("obdo: o_valid = %08x\n", valid);
- if (valid & OBD_MD_FLID)
- CERROR("obdo: o_id = "LPD64"\n", oa->o_id);
- if (valid & OBD_MD_FLATIME)
- CERROR("obdo: o_atime = "LPD64"\n", oa->o_atime);
- if (valid & OBD_MD_FLMTIME)
- CERROR("obdo: o_mtime = "LPD64"\n", oa->o_mtime);
- if (valid & OBD_MD_FLCTIME)
- CERROR("obdo: o_ctime = "LPD64"\n", oa->o_ctime);
- if (valid & OBD_MD_FLSIZE)
- CERROR("obdo: o_size = "LPD64"\n", oa->o_size);
- if (valid & OBD_MD_FLBLOCKS) /* allocation of space */
- CERROR("obdo: o_blocks = "LPD64"\n", oa->o_blocks);
- if (valid & OBD_MD_FLBLKSZ)
- CERROR("obdo: o_blksize = %d\n", oa->o_blksize);
- if (valid & (OBD_MD_FLTYPE | OBD_MD_FLMODE))
- CERROR("obdo: o_mode = %o\n",
- oa->o_mode & ((valid & OBD_MD_FLTYPE ? S_IFMT : 0) |
- (valid & OBD_MD_FLMODE ? ~S_IFMT : 0)));
- if (valid & OBD_MD_FLUID)
- CERROR("obdo: o_uid = %u\n", oa->o_uid);
- if (valid & OBD_MD_FLGID)
- CERROR("obdo: o_gid = %u\n", oa->o_gid);
- if (valid & OBD_MD_FLFLAGS)
- CERROR("obdo: o_flags = %x\n", oa->o_flags);
- if (valid & OBD_MD_FLNLINK)
- CERROR("obdo: o_nlink = %u\n", oa->o_nlink);
- if (valid & OBD_MD_FLGENER)
- CERROR("obdo: o_generation = %u\n", oa->o_generation);
-
- return -EINVAL;
+ CDEBUG(D_RPCTRACE, "nb->page: index = %ld\n",
+ nb->page ? cfs_page_index(nb->page) : -1);
}
+EXPORT_SYMBOL(dump_lniobuf);
void dump_lsm(int level, struct lov_stripe_md *lsm)
{
lsm->lsm_pool_name);
}
-/* XXX assumes only a single page in request */
-/*
-int dump_req(struct ptlrpc_request *req)
-{
- struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0);
- struct obd_ioobj *ioo = lustre_msg_buf(req->rq_reqmsg, 1);
- //struct niobuf *nb = lustre_msg_buf(req->rq_reqmsg, 2);
-
- dump_obdo(&body->oa);
- //dump_niobuf(nb);
- dump_ioo(ioo);
-
- return -EINVAL;
-}
-*/
-
#define LPDS sizeof(__u64)
int block_debug_setup(void *addr, int len, __u64 off, __u64 id)
{
}
#undef LPDS
-EXPORT_SYMBOL(dump_lniobuf);
-EXPORT_SYMBOL(dump_rniobuf);
-EXPORT_SYMBOL(dump_ioo);
//EXPORT_SYMBOL(dump_req);
-EXPORT_SYMBOL(dump_obdo);
EXPORT_SYMBOL(dump_lsm);
EXPORT_SYMBOL(block_debug_setup);
EXPORT_SYMBOL(block_debug_check);
rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_PLAIN,
NULL, LCK_NL, &lock_flags, NULL,
- ldlm_completion_ast, NULL, NULL,
- 0, NULL, NULL, &obd->u.echo.eo_nl_lock);
+ ldlm_completion_ast, NULL, NULL, 0, NULL,
+ &obd->u.echo.eo_nl_lock);
LASSERT (rc == ELDLM_OK);
lprocfs_echo_init_vars(&lvars);
rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_EXTENT,
&policy, LCK_PW, &flags, ldlm_blocking_ast,
ldlm_completion_ast, NULL, NULL, 0, NULL,
- NULL, lockh);
+ lockh);
if (rc != ELDLM_OK)
lockh->cookie = 0;
RETURN(rc);
*
* Of course, this will all disappear when we switch to
* taking liblustre locks on the OST. */
- ldlm_res_lvbo_update(res, NULL, 0, 1);
+ ldlm_res_lvbo_update(res, NULL, 1);
}
RETURN(ELDLM_LOCK_ABORTED);
}
* sending ast is not handled. This can result in lost client writes.
*/
if (rc != 0)
- ldlm_res_lvbo_update(res, NULL, 0, 1);
+ ldlm_res_lvbo_update(res, NULL, 1);
lock_res(res);
*reply_lvb = *res_lvb;
if (res != NULL) {
LDLM_RESOURCE_ADDREF(res);
- rc = ldlm_res_lvbo_update(res, NULL, 0, 0);
+ rc = ldlm_res_lvbo_update(res, NULL, 0);
LDLM_RESOURCE_DELREF(res);
ldlm_resource_putref(res);
}
if (resource != NULL) {
LDLM_RESOURCE_ADDREF(resource);
- ns->ns_lvbo->lvbo_update(resource, NULL, 0, 1);
+ ns->ns_lvbo->lvbo_update(resource, NULL, 1);
LDLM_RESOURCE_DELREF(resource);
ldlm_resource_putref(resource);
}
* If 'increase_only' is true, don't allow values to move backwards.
*/
static int filter_lvbo_update(struct ldlm_resource *res,
- struct ptlrpc_request *r,
- int buf_idx, int increase_only)
+ struct ptlrpc_request *r, int increase_only)
{
int rc = 0;
struct ost_lvb *lvb;
struct ost_lvb *new;
/* XXX update always from reply buffer */
- new = lustre_swab_repbuf(r, buf_idx, sizeof(*new),
- lustre_swab_ost_lvb);
+ new = req_capsule_server_get(&r->rq_pill, &RMF_DLM_LVB);
if (new == NULL) {
CERROR("lustre_swab_buf failed\n");
ENTRY;
if (req->rq_repmsg) {
- body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL && rc == 0)
rc = -EPROTO;
}
{
struct ptlrpc_request *request;
struct ost_body *body;
- __u32 size[] = { sizeof(struct ptlrpc_body), sizeof(*body) };
ENTRY;
LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
oscc->oscc_flags |= OSCC_FLAG_CREATING;
spin_unlock(&oscc->oscc_lock);
- request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import,
- LUSTRE_OST_VERSION, OST_CREATE, 2,
- size, NULL);
+ request = ptlrpc_request_alloc_pack(oscc->oscc_obd->u.cli.cl_import,
+ &RQF_OST_CREATE,
+ LUSTRE_OST_VERSION, OST_CREATE);
if (request == NULL) {
spin_lock(&oscc->oscc_lock);
oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
request->rq_request_portal = OST_CREATE_PORTAL;
ptlrpc_at_set_req_timeout(request);
- body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ body = req_capsule_client_get(&request->rq_pill, &RMF_OST_BODY);
spin_lock(&oscc->oscc_lock);
body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
/* we should not resend create request - anyway we will have delorphan
* and kill these objects */
request->rq_no_delay = request->rq_no_resend = 1;
- ptlrpc_req_set_repsize(request, 2, size);
+ ptlrpc_request_set_replen(request);
request->rq_async_args.pointer_arg[0] = oscc;
request->rq_interpret_reply = osc_interpret_create;
if (rc != 0)
GOTO(out, rc);
- body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
if (body) {
CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
int requested_nob, int niocount,
obd_count page_count, struct brw_page **pga)
{
- int *remote_rcs, i;
+ int i;
+ __u32 *remote_rcs;
- /* return error if any niobuf was in error */
- remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
- sizeof(*remote_rcs) * niocount, NULL);
+ remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
+ sizeof(*remote_rcs) *
+ niocount);
if (remote_rcs == NULL) {
CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
return(-EPROTO);
}
- if (ptlrpc_rep_need_swab(req))
- for (i = 0; i < niocount; i++)
- __swab32s(&remote_rcs[i]);
+ /* return error if any niobuf was in error */
for (i = 0; i < niocount; i++) {
if (remote_rcs[i] < 0)
return(remote_rcs[i]);
}
pill = &req->rq_pill;
+ req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
+ sizeof(*ioobj));
req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
niocount * sizeof(*niobuf));
osc_set_capa_size(req, &RMF_CAPA1, ocapa);
body = req_capsule_client_get(pill, &RMF_OST_BODY);
ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
- LASSERT(body && ioobj && niobuf);
+ LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
lustre_set_wire_obdo(&body->oa, oa);
}
LASSERTF((void *)(niobuf - niocount) ==
- lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
- niocount * sizeof(*niobuf)),
- "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
- REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
- (void *)(niobuf - niocount));
+ req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
+ "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
+ &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
if (osc_should_shrink_grant(cli))
}
oa->o_cksum = body->oa.o_cksum;
/* 1 RC per niobuf */
- req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
+ req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
sizeof(__u32) * niocount);
} else {
if (unlikely(cli->cl_checksum) &&
body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
}
- req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
+ req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER, 0);
/* 1 RC for the whole I/O */
}
ptlrpc_request_set_replen(req);
RETURN(rc);
LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
- body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL) {
CDEBUG(D_INFO, "Can't unpack body\n");
RETURN(-EPROTO);
/* Complete obtaining the lock procedure. */
rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
mode, aa->oa_flags, aa->oa_lvb,
- sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
- &handle, rc);
+ sizeof(*aa->oa_lvb), &handle, rc);
/* Complete osc stuff. */
rc = osc_enqueue_fini(req, aa->oa_lvb,
aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
*flags &= ~LDLM_FL_BLOCK_GRANTED;
rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
- sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
+ sizeof(*lvb), lockh, async);
if (rqset) {
if (!rc) {
struct osc_enqueue_args *aa;
struct obd_trans_info *oti)
{
struct ost_body *body, *repbody;
- __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
struct lustre_capa *capa = NULL;
int rc;
ENTRY;
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
+ /* Get the request body */
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL)
RETURN(-EFAULT);
if (body->oa.o_id == 0)
RETURN(-EPROTO);
- if (lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1)) {
+ /* If there's a DLM request, cancel the locks mentioned in it*/
+ if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
struct ldlm_request *dlm;
- dlm = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*dlm),
- lustre_swab_ldlm_request);
+
+ dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
if (dlm == NULL)
RETURN (-EFAULT);
ldlm_request_cancel(req, dlm, 0);
}
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
- capa = lustre_unpack_incoming_capa(req, REQ_REC_OFF + 2);
+ /* If there's a capability, get it */
+ if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+ capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
+ if (capa == NULL) {
+ CERROR("Missing capability for OST DESTROY");
+ RETURN (-EFAULT);
+ }
+ }
- rc = lustre_pack_reply(req, 2, size, NULL);
+ /* Prepare the reply */
+ rc = req_capsule_server_pack(&req->rq_pill);
if (rc)
RETURN(rc);
+ /* Get the log cancellation cookie */
if (body->oa.o_valid & OBD_MD_FLCOOKIE)
oti->oti_logcookies = &body->oa.o_lcookie;
- repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
- sizeof(*repbody));
+
+ /* Finish the reply */
+ repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+
+ /* Do the destroy and set the reply status accordingly */
req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL, capa);
RETURN(0);
}
{
struct ost_body *body, *repbody;
struct obd_info oinfo = { { { 0 } } };
- __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
int rc;
ENTRY;
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL)
RETURN(-EFAULT);
- rc = lustre_pack_reply(req, 2, size, NULL);
+ rc = req_capsule_server_pack(&req->rq_pill);
if (rc)
RETURN(rc);
- repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
- sizeof(*repbody));
+ repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
repbody->oa = body->oa;
oinfo.oi_oa = &repbody->oa;
- if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
- oinfo.oi_capa = lustre_unpack_incoming_capa(req,
- REQ_REC_OFF + 1);
+ if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
+ oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
+ &RMF_CAPA1);
+ if (oinfo.oi_capa == NULL) {
+ CERROR("Missing capability for OST GETATTR");
+ RETURN (-EFAULT);
+ }
+ }
+
req->rq_status = obd_getattr(exp, &oinfo);
ost_drop_id(exp, &repbody->oa);
RETURN(0);
static int ost_statfs(struct ptlrpc_request *req)
{
struct obd_statfs *osfs;
- __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
int rc;
ENTRY;
- rc = lustre_pack_reply(req, 2, size, NULL);
+ rc = req_capsule_server_pack(&req->rq_pill);
if (rc)
RETURN(rc);
- osfs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*osfs));
+ osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
cfs_time_current_64() - HZ, 0);
struct obd_trans_info *oti)
{
struct ost_body *body, *repbody;
- __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
int rc;
ENTRY;
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL)
RETURN(-EFAULT);
- rc = lustre_pack_reply(req, 2, size, NULL);
+ rc = req_capsule_server_pack(&req->rq_pill);
if (rc)
RETURN(rc);
- repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
- sizeof(*repbody));
+ repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
oti->oti_logcookies = &repbody->oa.o_lcookie;
RETURN(0);
}
-/*
+/**
* Helper function for ost_punch(): if asked by client, acquire [size, EOF]
* lock on the file being truncated.
*/
RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
LDLM_EXTENT, &policy, LCK_PW, &flags,
ldlm_blocking_ast, ldlm_completion_ast,
- ldlm_glimpse_ast, NULL, 0, NULL,
- NULL, lh));
+ ldlm_glimpse_ast, NULL, 0, NULL, lh));
}
-/*
+/**
* Helper function for ost_punch(): release lock acquired by
* ost_punch_lock_get(), if any.
*/
{
struct obd_info oinfo = { { { 0 } } };
struct ost_body *body, *repbody;
- __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
int rc;
struct lustre_handle lh = {0,};
ENTRY;
/* check that we do support OBD_CONNECT_TRUNCLOCK. */
CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
- /* ost_body is varified and swabbed in ost_hpreq_handler() */
- body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
- LASSERT(body != NULL);
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ RETURN(-EFAULT);
oinfo.oi_oa = &body->oa;
oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
RETURN(-EPROTO);
- rc = lustre_pack_reply(req, 2, size, NULL);
+ rc = req_capsule_server_pack(&req->rq_pill);
if (rc)
RETURN(rc);
- repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
- sizeof(*repbody));
+ repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
rc = ost_punch_lock_get(exp, oinfo.oi_oa, &lh);
if (rc == 0) {
if (oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
*/
oinfo.oi_oa->o_valid &= ~OBD_MD_FLFLAGS;
- if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
- oinfo.oi_capa = lustre_unpack_incoming_capa(req,
- REQ_REC_OFF + 1);
+ if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
+ oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
+ &RMF_CAPA1);
+ if (oinfo.oi_capa == NULL) {
+ CERROR("Missing capability for OST PUNCH");
+ RETURN (-EFAULT);
+ }
+ }
req->rq_status = obd_punch(exp, &oinfo, oti, NULL);
ost_punch_lock_put(exp, oinfo.oi_oa, &lh);
}
{
struct ost_body *body, *repbody;
struct lustre_capa *capa = NULL;
- __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
int rc;
ENTRY;
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL)
RETURN(-EFAULT);
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
- capa = lustre_unpack_incoming_capa(req, REQ_REC_OFF + 1);
+ if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+ capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
+ if (capa == NULL) {
+ CERROR("Missing capability for OST SYNC");
+ RETURN (-EFAULT);
+ }
+ }
- rc = lustre_pack_reply(req, 2, size, NULL);
+ rc = req_capsule_server_pack(&req->rq_pill);
if (rc)
RETURN(rc);
- repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
- sizeof(*repbody));
+ repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
repbody->oa.o_blocks, capa);
struct obd_trans_info *oti)
{
struct ost_body *body, *repbody;
- __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
int rc;
struct obd_info oinfo = { { { 0 } } };
ENTRY;
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL)
RETURN(-EFAULT);
- rc = lustre_pack_reply(req, 2, size, NULL);
+ rc = req_capsule_server_pack(&req->rq_pill);
if (rc)
RETURN(rc);
- repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
- sizeof(*repbody));
+ repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
repbody->oa = body->oa;
oinfo.oi_oa = &repbody->oa;
- if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
- oinfo.oi_capa = lustre_unpack_incoming_capa(req,
- REQ_REC_OFF + 1);
+ if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
+ oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
+ &RMF_CAPA1);
+ if (oinfo.oi_capa == NULL) {
+ CERROR("Missing capability for OST SETATTR");
+ RETURN (-EFAULT);
+ }
+ }
req->rq_status = obd_setattr(exp, &oinfo, oti);
ost_drop_id(exp, &repbody->oa);
RETURN(0);
if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
RETURN(0);
- /* EXPENSIVE ASSERTION */
for (i = 1; i < nrbufs; i ++)
- LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) ==
- (nb[i].flags & OBD_BRW_SRVLOCK));
+ if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
+ (nb[i].flags & OBD_BRW_SRVLOCK))
+ RETURN(-EFAULT);
policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
policy.l_extent.end = (nb[nrbufs - 1].offset +
RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
LDLM_EXTENT, &policy, mode, &flags,
ldlm_blocking_ast, ldlm_completion_ast,
- ldlm_glimpse_ast, NULL, 0, NULL,
- NULL, lh));
+ ldlm_glimpse_ast, NULL, 0, NULL, lh));
}
static void ost_brw_lock_put(int mode,
struct lustre_capa *capa = NULL;
struct l_wait_info lwi;
struct lustre_handle lockh = { 0 };
- __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
int niocount, npages, nob = 0, rc, i;
int no_reply = 0;
ENTRY;
/* ost_body, ioobj & noibuf_remote are verified and swabbed in
* ost_rw_hpreq_check(). */
- body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
- LASSERT(body != NULL);
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ GOTO(out, rc = -EFAULT);
- ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo));
- LASSERT(ioo != NULL);
+ /*
+ * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
+ * would be useful here and wherever we get &RMF_OBD_IOOBJ and
+ * &RMF_NIOBUF_REMOTE.
+ */
+ ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+ if (ioo == NULL)
+ GOTO(out, rc = -EFAULT);
niocount = ioo->ioo_bufcnt;
- remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
- niocount * sizeof(*remote_nb));
- LASSERT(remote_nb != NULL);
+ remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+ if (remote_nb == NULL)
+ GOTO(out, rc = -EFAULT);
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
- capa = lustre_unpack_incoming_capa(req, REQ_REC_OFF + 3);
+ if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+ capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
+ if (capa == NULL) {
+ CERROR("Missing capability for OST BRW READ");
+ GOTO(out, rc = -EFAULT);
+ }
+ }
- rc = lustre_pack_reply(req, 2, size, NULL);
+ req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER, 0);
+ rc = req_capsule_server_pack(&req->rq_pill);
if (rc)
GOTO(out, rc);
remote_nb, npages, local_nb, oti, rc);
if (rc == 0) {
- repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
- sizeof(*repbody));
+ repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
ost_drop_id(exp, &repbody->oa);
}
struct lustre_handle lockh = {0};
struct lustre_capa *capa = NULL;
__u32 *rcs;
- __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
int objcount, niocount, npages;
int rc, i, j;
obd_count client_cksum = 0, server_cksum = 0;
/* ost_body, ioobj & noibuf_remote are verified and swabbed in
* ost_rw_hpreq_check(). */
- body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
- LASSERT(body != NULL);
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ GOTO(out, rc = -EFAULT);
if ((body->oa.o_flags & OBD_BRW_MEMALLOC) &&
(exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
libcfs_memory_pressure_set();
- objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
- sizeof(*ioo);
- ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
- objcount * sizeof(*ioo));
- LASSERT(ioo != NULL);
+ objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
+ RCL_CLIENT) / sizeof(*ioo);
+ ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+ if (ioo == NULL)
+ GOTO(out, rc = -EFAULT);
for (niocount = i = 0; i < objcount; i++)
niocount += ioo[i].ioo_bufcnt;
- remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
- niocount * sizeof(*remote_nb));
- LASSERT(remote_nb != NULL);
+ /*
+ * It'd be nice to have a capsule function to indicate how many elements
+ * there were in a buffer for an RMF that's declared to be an array.
+ * It's easy enough to compute the number of elements here though.
+ */
+ remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+ if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
+ &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
+ GOTO(out, rc = -EFAULT);
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
- capa = lustre_unpack_incoming_capa(req, REQ_REC_OFF + 3);
+ if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+ capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
+ if (capa == NULL) {
+ CERROR("Missing capability for OST BRW WRITE");
+ GOTO(out, rc = -EFAULT);
+ }
+ }
- size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs);
- rc = lustre_pack_reply(req, 3, size, NULL);
+ req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
+ niocount * sizeof(*rcs));
+ rc = req_capsule_server_pack(&req->rq_pill);
if (rc != 0)
GOTO(out, rc);
OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
- rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
- niocount * sizeof(*rcs));
+ rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
/*
* Per-thread array of struct niobuf_{local,remote}'s was allocated by
}
no_reply = rc != 0;
- repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
- sizeof(*repbody));
+ repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
if (unlikely(client_cksum != 0 && rc == 0)) {
RETURN(rc);
}
+/**
+ * Implementation of OST_SET_INFO.
+ *
+ * OST_SET_INFO is like ioctl(): heavily overloaded. Specifically, it takes a
+ * "key" and a value RPC buffers as arguments, with the value's contents
+ * interpreted according to the key.
+ *
+ * Value types that need swabbing have swabbing done explicitly, either here or
+ * in functions called from here. This should be corrected: all swabbing should
+ * be done in the capsule abstraction, as that will then allow us to move
+ * swabbing exclusively to the client without having to modify server code
+ * outside the capsule abstraction's implementation itself. To correct this
+ * will require minor changes to the capsule abstraction; see the comments for
+ * req_capsule_extend() in layout.c.
+ */
static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
{
struct ost_body *body = NULL, *repbody;
- __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
char *key, *val = NULL;
int keylen, vallen, rc = 0;
+ int is_grant_shrink = 0;
ENTRY;
- key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
+ key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
if (key == NULL) {
DEBUG_REQ(D_HA, req, "no set_info key");
RETURN(-EFAULT);
}
- keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
+ keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
+ RCL_CLIENT);
- if (KEY_IS(KEY_GRANT_SHRINK)) {
- rc = lustre_pack_reply(req, 2, size, NULL);
- if (rc)
- RETURN(rc);
- } else {
- rc = lustre_pack_reply(req, 1, NULL, NULL);
- if (rc)
- RETURN(rc);
- }
+ vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
+ RCL_CLIENT);
+
+ if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
+ /* In this case the value is actually an RMF_OST_BODY, so we
+ * transmutate the type of this PTLRPC */
+ req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
+
+ rc = req_capsule_server_pack(&req->rq_pill);
+ if (rc)
+ RETURN(rc);
- vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
if (vallen) {
- if (KEY_IS(KEY_GRANT_SHRINK)) {
- body = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
- sizeof(*body),
- lustre_swab_ost_body);
+ if (is_grant_shrink) {
+ body = req_capsule_client_get(&req->rq_pill,
+ &RMF_OST_BODY);
if (!body)
RETURN(-EFAULT);
- repbody = lustre_msg_buf(req->rq_repmsg,
- REPLY_REC_OFF,
- sizeof(*repbody));
+ repbody = req_capsule_server_get(&req->rq_pill,
+ &RMF_OST_BODY);
memcpy(repbody, body, sizeof(*body));
val = (char*)repbody;
- } else
- val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,0);
+ } else {
+ val = req_capsule_client_get(&req->rq_pill,
+ &RMF_SETINFO_VAL);
+ }
}
if (KEY_IS(KEY_EVICT_BY_NID)) {
obd_export_evict_by_nid(exp->exp_obd, val);
GOTO(out, rc = 0);
} else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
- /* Val's are not swabbed automatically */
+ if (vallen < sizeof(__u32))
+ RETURN(-EFAULT);
__swab32s((__u32 *)val);
}
+ /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
+ * a struct ost_body * value */
rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
out:
lustre_msg_set_status(req->rq_repmsg, 0);
struct req_capsule *pill = &req->rq_pill;
ENTRY;
- req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
-
/* this common part for get_info rpc */
key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
if (key == NULL) {
int rc;
ENTRY;
- body = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*body));
+ body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
rc = obd_llog_connect(exp, body);
RETURN(rc);
}
return rc;
}
+/**
+ * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
+ * not.
+ */
static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
struct ldlm_lock *lock)
{
/* As the request may be covered by several locks, do not look at
* o_handle, look at the RPC IO region. */
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_obdo);
- objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
- sizeof(*ioo);
- ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
- objcount * sizeof(*ioo));
- LASSERT(ioo != NULL);
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ RETURN(0);
+
+ objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
+ RCL_CLIENT) / sizeof(*ioo);
+ ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+ if (ioo == NULL)
+ RETURN(0);
+
for (niocount = i = 0; i < objcount; i++)
niocount += ioo[i].ioo_bufcnt;
- nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
- niocount * sizeof(*nb));
- LASSERT(nb != NULL);
+ nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+ if (nb == NULL ||
+ niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
+ RCL_CLIENT) / sizeof(*nb)))
+ RETURN(0);
mode = LCK_PW;
if (opc == OST_READ)
}
/**
- * Swab buffers needed to call ost_rw_prolong_locks() and call it.
- * Return the value from ost_rw_prolong_locks() which is non-zero if
- * there is a cancelled lock which is waiting for this IO request.
+ * High-priority queue request check for whether the given PTLRPC request (\a
+ * req) is blocking an LDLM lock cancel.
+ *
+ * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
+ * cancel, 0 if it is not, and -EFAULT if the request is malformed.
+ *
+ * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue. This
+ * function looks only at OST_READs and OST_WRITEs.
*/
static int ost_rw_hpreq_check(struct ptlrpc_request *req)
{
opc = lustre_msg_get_opc(req->rq_reqmsg);
LASSERT(opc == OST_READ || opc == OST_WRITE);
- body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
- LASSERT(body != NULL);
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ RETURN(-EFAULT);
- objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
- sizeof(*ioo);
- ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
- objcount * sizeof(*ioo));
- LASSERT(ioo != NULL);
+ objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
+ RCL_CLIENT) / sizeof(*ioo);
+ ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+ if (ioo == NULL)
+ RETURN(-EFAULT);
for (niocount = i = 0; i < objcount; i++)
niocount += ioo[i].ioo_bufcnt;
- nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
- niocount * sizeof(*nb));
- LASSERT(nb != NULL);
- LASSERT(niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK));
+ nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+ if (nb == NULL ||
+ niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
+ RCL_CLIENT) / sizeof(*nb)))
+ RETURN(-EFAULT);
+ if (niocount != 0 && (nb[0].flags & OBD_BRW_SRVLOCK))
+ RETURN(-EFAULT);
mode = LCK_PW;
if (opc == OST_READ)
RETURN(opd.opd_lock_match);
}
+/**
+ * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
+ */
static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
struct ldlm_lock *lock)
{
struct ost_body *body;
ENTRY;
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_obdo);
- LASSERT(body != NULL);
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ RETURN(0); /* can't return -EFAULT here */
if (body->oa.o_valid & OBD_MD_FLHANDLE &&
body->oa.o_handle.cookie == lock->l_handle.h_cookie)
RETURN(0);
}
+/**
+ * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
+ */
static int ost_punch_hpreq_check(struct ptlrpc_request *req)
{
- struct ost_body *body = lustre_msg_buf(req->rq_reqmsg,
- REQ_REC_OFF, sizeof(*body));
- LASSERT(body != NULL);
+ struct ost_body *body;
+
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ RETURN(-EFAULT);
+
LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
!(body->oa.o_flags & OBD_FL_TRUNCLOCK));
struct niobuf_remote *nb;
struct obd_ioobj *ioo;
int objcount, niocount;
- int swab, i;
+ int i;
+
+ /* RPCs on the H-P queue can be inspected before
+ * ost_handler() initializes their pills, so we
+ * initialize that here. Capsule initialization is
+ * idempotent, as is setting the pill's format (provided
+ * it doesn't change).
+ */
+ req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+ req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
- body = lustre_swab_reqbuf(req, REQ_REC_OFF,
- sizeof(*body),
- lustre_swab_obdo);
- if (!body) {
+ body = req_capsule_client_get(&req->rq_pill,
+ &RMF_OST_BODY);
+ if (body == NULL) {
CERROR("Missing/short ost_body\n");
RETURN(-EFAULT);
}
- objcount = lustre_msg_buflen(req->rq_reqmsg,
- REQ_REC_OFF + 1) /
- sizeof(*ioo);
+ objcount = req_capsule_get_size(&req->rq_pill,
+ &RMF_OBD_IOOBJ,
+ RCL_CLIENT) /
+ sizeof(*ioo);
if (objcount == 0) {
CERROR("Missing/short ioobj\n");
RETURN(-EFAULT);
RETURN(-EFAULT);
}
- ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
- objcount * sizeof(*ioo),
- lustre_swab_obd_ioobj);
- if (!ioo) {
+ ioo = req_capsule_client_get(&req->rq_pill,
+ &RMF_OBD_IOOBJ);
+ if (ioo == NULL) {
CERROR("Missing/short ioobj\n");
RETURN(-EFAULT);
}
- swab = ptlrpc_req_need_swab(req);
for (niocount = i = 0; i < objcount; i++) {
- if (i > 0 && swab)
- lustre_swab_obd_ioobj(&ioo[i]);
if (ioo[i].ioo_bufcnt == 0) {
CERROR("ioo[%d] has zero bufcnt\n", i);
RETURN(-EFAULT);
niocount += ioo[i].ioo_bufcnt;
}
if (niocount > PTLRPC_MAX_BRW_PAGES) {
- DEBUG_REQ(D_ERROR, req, "bulk has too many "
- "pages (%d)", niocount);
+ DEBUG_REQ(D_RPCTRACE, req,
+ "bulk has too many pages (%d)",
+ niocount);
RETURN(-EFAULT);
}
- nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
- niocount * sizeof(*nb),
- lustre_swab_niobuf_remote);
- if (!nb) {
+ nb = req_capsule_client_get(&req->rq_pill,
+ &RMF_NIOBUF_REMOTE);
+ if (nb == NULL) {
CERROR("Missing/short niobuf\n");
RETURN(-EFAULT);
}
- swab = ptlrpc_req_need_swab(req);
- if (swab) {
- /* swab remaining niobufs */
- for (i = 1; i < niocount; i++)
- lustre_swab_niobuf_remote(&nb[i]);
- }
-
if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
req->rq_ops = &ost_hpreq_rw;
} else if (opc == OST_PUNCH) {
- body = lustre_swab_reqbuf(req, REQ_REC_OFF,
- sizeof(*body),
- lustre_swab_obdo);
- if (!body) {
+ req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+ req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
+
+ body = req_capsule_client_get(&req->rq_pill,
+ &RMF_OST_BODY);
+ if (body == NULL) {
CERROR("Missing/short ost_body\n");
RETURN(-EFAULT);
}
break;
case OST_CREATE:
CDEBUG(D_INODE, "create\n");
+ req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
RETURN(0);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
break;
case OST_DESTROY:
CDEBUG(D_INODE, "destroy\n");
+ req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
RETURN(0);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
break;
case OST_GETATTR:
CDEBUG(D_INODE, "getattr\n");
+ req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
RETURN(0);
rc = ost_getattr(req->rq_export, req);
break;
case OST_SETATTR:
CDEBUG(D_INODE, "setattr\n");
+ req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
RETURN(0);
rc = ost_setattr(req->rq_export, req, oti);
break;
case OST_WRITE:
+ req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
CDEBUG(D_INODE, "write\n");
/* req->rq_request_portal would be nice, if it was set */
if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
/* ost_brw_write sends its own replies */
RETURN(rc);
case OST_READ:
+ req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
CDEBUG(D_INODE, "read\n");
/* req->rq_request_portal would be nice, if it was set */
if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
RETURN(rc);
case OST_PUNCH:
CDEBUG(D_INODE, "punch\n");
+ req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
RETURN(0);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
break;
case OST_STATFS:
CDEBUG(D_INODE, "statfs\n");
+ req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
RETURN(0);
rc = ost_statfs(req);
break;
case OST_SYNC:
CDEBUG(D_INODE, "sync\n");
+ req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
RETURN(0);
rc = ost_sync(req->rq_export, req);
break;
case OST_GET_INFO:
DEBUG_REQ(D_INODE, req, "get_info");
+ req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
rc = ost_get_info(req->rq_export, req);
break;
#ifdef HAVE_QUOTA_SUPPORT
*
* Author: Nikita Danilov <nikita@clusterfs.com>
*/
+/*
+ * This file contains the "capsule/pill" abstraction layered above PTLRPC.
+ *
+ * Every struct ptlrpc_request contains a "pill", which points to a description
+ * of the format that the request conforms to.
+ */
#if !defined(__REQ_LAYOUT_USER__)
/* struct ptlrpc_request, lustre_msg* */
#include <lustre_req_layout.h>
#include <lustre_acl.h>
+#include <lustre_debug.h>
/*
- * empty set of fields... for suitable definition of emptiness.
+ * RQFs (see below) refer to two struct req_msg_field arrays describing the
+ * client request and server reply, respectively.
*/
+/* empty set of fields... for suitable definition of emptiness. */
static const struct req_msg_field *empty[] = {
&RMF_PTLRPC_BODY
};
&RMF_SETINFO_VAL
};
+static const struct req_msg_field *ost_grant_shrink_client[] = {
+ &RMF_PTLRPC_BODY,
+ &RMF_SETINFO_KEY,
+ &RMF_OST_BODY
+};
+
static const struct req_msg_field *mds_getinfo_client[] = {
&RMF_PTLRPC_BODY,
&RMF_GETINFO_KEY,
static const struct req_msg_field *ost_brw_server[] = {
&RMF_PTLRPC_BODY,
&RMF_OST_BODY,
- &RMF_NIOBUF_REMOTE
+ &RMF_RCS
};
static const struct req_msg_field *ost_get_info_generic_server[] = {
__u32 rmf_flags;
const char *rmf_name;
/**
- * Field length. (-1) means "variable length".
+ * Field length. (-1) means "variable length". If the
+ * \a RMF_F_STRUCT_ARRAY flag is set the field is also variable-length,
+ * but the actual size must be a whole multiple of \a rmf_size.
*/
int rmf_size;
void (*rmf_swabber)(void *);
+ void (*rmf_dumper)(void *);
int rmf_offset[ARRAY_SIZE(req_formats)][RCL_NR];
};
enum rmf_flags {
+ /**
+ * The field is a string, must be NUL-terminated.
+ */
RMF_F_STRING = 1 << 0,
- RMF_F_NO_SIZE_CHECK = 1 << 1
+ /**
+ * The field's buffer size need not match the declared \a rmf_size.
+ */
+ RMF_F_NO_SIZE_CHECK = 1 << 1,
+ /**
+ * The field's buffer size must be a whole multiple of the declared \a
+ * rmf_size and the \a rmf_swabber function must work on the declared \a
+ * rmf_size worth of bytes.
+ */
+ RMF_F_STRUCT_ARRAY = 1 << 2
};
struct req_capsule;
/*
* Request fields.
*/
-#define DEFINE_MSGF(name, flags, size, swabber) { \
- .rmf_name = (name), \
- .rmf_flags = (flags), \
- .rmf_size = (size), \
- .rmf_swabber = (void (*)(void*))(swabber) \
+#define DEFINE_MSGF(name, flags, size, swabber, dumper) { \
+ .rmf_name = (name), \
+ .rmf_flags = (flags), \
+ .rmf_size = (size), \
+ .rmf_swabber = (void (*)(void*))(swabber), \
+ .rmf_dumper = (void (*)(void*))(dumper) \
}
const struct req_msg_field RMF_GENERIC_DATA =
DEFINE_MSGF("generic_data", 0,
- -1, NULL);
+ -1, NULL, NULL);
EXPORT_SYMBOL(RMF_GENERIC_DATA);
const struct req_msg_field RMF_MGS_TARGET_INFO =
DEFINE_MSGF("mgs_target_info", 0,
sizeof(struct mgs_target_info),
- lustre_swab_mgs_target_info);
+ lustre_swab_mgs_target_info, NULL);
EXPORT_SYMBOL(RMF_MGS_TARGET_INFO);
const struct req_msg_field RMF_MGS_SEND_PARAM =
DEFINE_MSGF("mgs_send_param", 0,
sizeof(struct mgs_send_param),
- NULL);
+ NULL, NULL);
EXPORT_SYMBOL(RMF_MGS_SEND_PARAM);
const struct req_msg_field RMF_SETINFO_VAL =
- DEFINE_MSGF("setinfo_val", 0, -1, NULL);
+ DEFINE_MSGF("setinfo_val", 0, -1, NULL, NULL);
EXPORT_SYMBOL(RMF_SETINFO_VAL);
const struct req_msg_field RMF_GETINFO_KEY =
- DEFINE_MSGF("getinfo_key", 0, -1, NULL);
+ DEFINE_MSGF("getinfo_key", 0, -1, NULL, NULL);
EXPORT_SYMBOL(RMF_GETINFO_KEY);
const struct req_msg_field RMF_GETINFO_VALLEN =
DEFINE_MSGF("getinfo_vallen", 0,
- sizeof(__u32), lustre_swab_generic_32s);
+ sizeof(__u32), lustre_swab_generic_32s, NULL);
EXPORT_SYMBOL(RMF_GETINFO_VALLEN);
const struct req_msg_field RMF_GETINFO_VAL =
- DEFINE_MSGF("getinfo_val", 0, -1, NULL);
+ DEFINE_MSGF("getinfo_val", 0, -1, NULL, NULL);
EXPORT_SYMBOL(RMF_GETINFO_VAL);
const struct req_msg_field RMF_SEQ_OPC =
DEFINE_MSGF("seq_query_opc", 0,
- sizeof(__u32), lustre_swab_generic_32s);
+ sizeof(__u32), lustre_swab_generic_32s, NULL);
EXPORT_SYMBOL(RMF_SEQ_OPC);
const struct req_msg_field RMF_SEQ_RANGE =
DEFINE_MSGF("seq_query_range", 0,
- sizeof(struct lu_seq_range), lustre_swab_lu_seq_range);
+ sizeof(struct lu_seq_range),
+ lustre_swab_lu_seq_range, NULL);
EXPORT_SYMBOL(RMF_SEQ_RANGE);
const struct req_msg_field RMF_FLD_OPC =
DEFINE_MSGF("fld_query_opc", 0,
- sizeof(__u32), lustre_swab_generic_32s);
+ sizeof(__u32), lustre_swab_generic_32s, NULL);
EXPORT_SYMBOL(RMF_FLD_OPC);
const struct req_msg_field RMF_FLD_MDFLD =
DEFINE_MSGF("fld_query_mdfld", 0,
- sizeof(struct lu_seq_range), lustre_swab_lu_seq_range);
+ sizeof(struct lu_seq_range),
+ lustre_swab_lu_seq_range, NULL);
EXPORT_SYMBOL(RMF_FLD_MDFLD);
const struct req_msg_field RMF_MDT_BODY =
DEFINE_MSGF("mdt_body", 0,
- sizeof(struct mdt_body), lustre_swab_mdt_body);
+ sizeof(struct mdt_body), lustre_swab_mdt_body, NULL);
EXPORT_SYMBOL(RMF_MDT_BODY);
const struct req_msg_field RMF_OBD_QUOTACTL =
DEFINE_MSGF("obd_quotactl", 0,
- sizeof(struct obd_quotactl), lustre_swab_obd_quotactl);
+ sizeof(struct obd_quotactl),
+ lustre_swab_obd_quotactl, NULL);
EXPORT_SYMBOL(RMF_OBD_QUOTACTL);
const struct req_msg_field RMF_QUOTA_ADJUST_QUNIT =
DEFINE_MSGF("quota_adjust_qunit", 0,
sizeof(struct quota_adjust_qunit),
- lustre_swab_quota_adjust_qunit);
+ lustre_swab_quota_adjust_qunit, NULL);
EXPORT_SYMBOL(RMF_QUOTA_ADJUST_QUNIT);
const struct req_msg_field RMF_QUNIT_DATA =
DEFINE_MSGF("qunit_data", 0,
- sizeof(struct qunit_data), NULL);
+ sizeof(struct qunit_data), lustre_swab_qdata, NULL);
EXPORT_SYMBOL(RMF_QUNIT_DATA);
const struct req_msg_field RMF_MDT_EPOCH =
DEFINE_MSGF("mdt_epoch", 0,
- sizeof(struct mdt_epoch), lustre_swab_mdt_epoch);
+ sizeof(struct mdt_epoch), lustre_swab_mdt_epoch, NULL);
EXPORT_SYMBOL(RMF_MDT_EPOCH);
const struct req_msg_field RMF_PTLRPC_BODY =
DEFINE_MSGF("ptlrpc_body", 0,
- sizeof(struct ptlrpc_body), lustre_swab_ptlrpc_body);
+ sizeof(struct ptlrpc_body), lustre_swab_ptlrpc_body, NULL);
EXPORT_SYMBOL(RMF_PTLRPC_BODY);
const struct req_msg_field RMF_OBD_STATFS =
DEFINE_MSGF("obd_statfs", 0,
- sizeof(struct obd_statfs), lustre_swab_obd_statfs);
+ sizeof(struct obd_statfs), lustre_swab_obd_statfs, NULL);
EXPORT_SYMBOL(RMF_OBD_STATFS);
const struct req_msg_field RMF_SETINFO_KEY =
- DEFINE_MSGF("setinfo_key", 0, -1, NULL);
+ DEFINE_MSGF("setinfo_key", 0, -1, NULL, NULL);
EXPORT_SYMBOL(RMF_SETINFO_KEY);
const struct req_msg_field RMF_NAME =
- DEFINE_MSGF("name", RMF_F_STRING, -1, NULL);
+ DEFINE_MSGF("name", RMF_F_STRING, -1, NULL, NULL);
EXPORT_SYMBOL(RMF_NAME);
const struct req_msg_field RMF_SYMTGT =
- DEFINE_MSGF("symtgt", RMF_F_STRING, -1, NULL);
+ DEFINE_MSGF("symtgt", RMF_F_STRING, -1, NULL, NULL);
EXPORT_SYMBOL(RMF_SYMTGT);
const struct req_msg_field RMF_TGTUUID =
- DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL);
+ DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL,
+ NULL);
EXPORT_SYMBOL(RMF_TGTUUID);
const struct req_msg_field RMF_CLUUID =
- DEFINE_MSGF("cluuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL);
+ DEFINE_MSGF("cluuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL,
+ NULL);
EXPORT_SYMBOL(RMF_CLUUID);
const struct req_msg_field RMF_STRING =
- DEFINE_MSGF("string", RMF_F_STRING, -1, NULL);
+ DEFINE_MSGF("string", RMF_F_STRING, -1, NULL, NULL);
EXPORT_SYMBOL(RMF_STRING);
const struct req_msg_field RMF_LLOGD_BODY =
DEFINE_MSGF("llogd_body", 0,
- sizeof(struct llogd_body), lustre_swab_llogd_body);
+ sizeof(struct llogd_body), lustre_swab_llogd_body, NULL);
EXPORT_SYMBOL(RMF_LLOGD_BODY);
const struct req_msg_field RMF_LLOG_LOG_HDR =
DEFINE_MSGF("llog_log_hdr", 0,
- sizeof(struct llog_log_hdr), lustre_swab_llog_hdr);
+ sizeof(struct llog_log_hdr), lustre_swab_llog_hdr, NULL);
EXPORT_SYMBOL(RMF_LLOG_LOG_HDR);
const struct req_msg_field RMF_LLOGD_CONN_BODY =
DEFINE_MSGF("llogd_conn_body", 0,
sizeof(struct llogd_conn_body),
- lustre_swab_llogd_conn_body);
+ lustre_swab_llogd_conn_body, NULL);
EXPORT_SYMBOL(RMF_LLOGD_CONN_BODY);
/*
* connection handle received in MDS_CONNECT request.
*
- * XXX no swabbing?
+ * No swabbing needed because struct lustre_handle contains only a 64-bit cookie
+ * that the client does not interpret at all.
*/
const struct req_msg_field RMF_CONN =
- DEFINE_MSGF("conn", 0, sizeof(struct lustre_handle), NULL);
+ DEFINE_MSGF("conn", 0, sizeof(struct lustre_handle), NULL, NULL);
EXPORT_SYMBOL(RMF_CONN);
const struct req_msg_field RMF_CONNECT_DATA =
DEFINE_MSGF("cdata",
RMF_F_NO_SIZE_CHECK /* we allow extra space for interop */,
- sizeof(struct obd_connect_data), lustre_swab_connect);
+ sizeof(struct obd_connect_data), lustre_swab_connect, NULL);
EXPORT_SYMBOL(RMF_CONNECT_DATA);
const struct req_msg_field RMF_DLM_REQ =
DEFINE_MSGF("dlm_req", RMF_F_NO_SIZE_CHECK /* ldlm_request_bufsize */,
- sizeof(struct ldlm_request), lustre_swab_ldlm_request);
+ sizeof(struct ldlm_request),
+ lustre_swab_ldlm_request, NULL);
EXPORT_SYMBOL(RMF_DLM_REQ);
const struct req_msg_field RMF_DLM_REP =
DEFINE_MSGF("dlm_rep", 0,
- sizeof(struct ldlm_reply), lustre_swab_ldlm_reply);
+ sizeof(struct ldlm_reply), lustre_swab_ldlm_reply, NULL);
EXPORT_SYMBOL(RMF_DLM_REP);
const struct req_msg_field RMF_LDLM_INTENT =
DEFINE_MSGF("ldlm_intent", 0,
- sizeof(struct ldlm_intent), lustre_swab_ldlm_intent);
+ sizeof(struct ldlm_intent), lustre_swab_ldlm_intent, NULL);
EXPORT_SYMBOL(RMF_LDLM_INTENT);
const struct req_msg_field RMF_DLM_LVB =
- DEFINE_MSGF("dlm_lvb", 0, sizeof(struct ost_lvb), NULL);
+ DEFINE_MSGF("dlm_lvb", 0, sizeof(struct ost_lvb), lustre_swab_ost_lvb,
+ NULL);
EXPORT_SYMBOL(RMF_DLM_LVB);
const struct req_msg_field RMF_MDT_MD =
- DEFINE_MSGF("mdt_md", RMF_F_NO_SIZE_CHECK, MIN_MD_SIZE, NULL);
+ DEFINE_MSGF("mdt_md", RMF_F_NO_SIZE_CHECK, MIN_MD_SIZE, NULL, NULL);
EXPORT_SYMBOL(RMF_MDT_MD);
const struct req_msg_field RMF_REC_REINT =
DEFINE_MSGF("rec_reint", 0, sizeof(struct mdt_rec_reint),
- lustre_swab_mdt_rec_reint);
+ lustre_swab_mdt_rec_reint, NULL);
EXPORT_SYMBOL(RMF_REC_REINT);
/* FIXME: this length should be defined as a macro */
-const struct req_msg_field RMF_EADATA = DEFINE_MSGF("eadata", 0, -1, NULL);
+const struct req_msg_field RMF_EADATA = DEFINE_MSGF("eadata", 0, -1,
+ NULL, NULL);
EXPORT_SYMBOL(RMF_EADATA);
const struct req_msg_field RMF_ACL =
DEFINE_MSGF("acl", RMF_F_NO_SIZE_CHECK,
- LUSTRE_POSIX_ACL_MAX_SIZE, NULL);
+ LUSTRE_POSIX_ACL_MAX_SIZE, NULL, NULL);
EXPORT_SYMBOL(RMF_ACL);
+/* FIXME: this should be made to use RMF_F_STRUCT_ARRAY */
const struct req_msg_field RMF_LOGCOOKIES =
DEFINE_MSGF("logcookies", RMF_F_NO_SIZE_CHECK /* multiple cookies */,
- sizeof(struct llog_cookie), NULL);
+ sizeof(struct llog_cookie), NULL, NULL);
EXPORT_SYMBOL(RMF_LOGCOOKIES);
const struct req_msg_field RMF_CAPA1 =
DEFINE_MSGF("capa", 0, sizeof(struct lustre_capa),
- lustre_swab_lustre_capa);
+ lustre_swab_lustre_capa, NULL);
EXPORT_SYMBOL(RMF_CAPA1);
const struct req_msg_field RMF_CAPA2 =
DEFINE_MSGF("capa", 0, sizeof(struct lustre_capa),
- lustre_swab_lustre_capa);
+ lustre_swab_lustre_capa, NULL);
EXPORT_SYMBOL(RMF_CAPA2);
/*
*/
const struct req_msg_field RMF_OST_BODY =
DEFINE_MSGF("ost_body", 0,
- sizeof(struct ost_body), lustre_swab_ost_body);
+ sizeof(struct ost_body), lustre_swab_ost_body, dump_ost_body);
EXPORT_SYMBOL(RMF_OST_BODY);
const struct req_msg_field RMF_OBD_IOOBJ =
- DEFINE_MSGF("obd_ioobj", 0,
- sizeof(struct obd_ioobj), lustre_swab_obd_ioobj);
+ DEFINE_MSGF("obd_ioobj", RMF_F_STRUCT_ARRAY,
+ sizeof(struct obd_ioobj), lustre_swab_obd_ioobj, dump_ioo);
EXPORT_SYMBOL(RMF_OBD_IOOBJ);
const struct req_msg_field RMF_NIOBUF_REMOTE =
- DEFINE_MSGF("niobuf_remote", 0, -1, lustre_swab_niobuf_remote);
+ DEFINE_MSGF("niobuf_remote", RMF_F_STRUCT_ARRAY,
+ sizeof(struct niobuf_remote), lustre_swab_niobuf_remote,
+ dump_rniobuf);
EXPORT_SYMBOL(RMF_NIOBUF_REMOTE);
+const struct req_msg_field RMF_RCS =
+ DEFINE_MSGF("niobuf_remote", RMF_F_STRUCT_ARRAY, sizeof(__u32),
+ lustre_swab_generic_32s, dump_rcs);
+EXPORT_SYMBOL(RMF_RCS);
+
const struct req_msg_field RMF_OBD_ID =
DEFINE_MSGF("obd_id", 0,
- sizeof(obd_id), lustre_swab_ost_last_id);
+ sizeof(obd_id), lustre_swab_ost_last_id, NULL);
EXPORT_SYMBOL(RMF_OBD_ID);
const struct req_msg_field RMF_FIEMAP_KEY =
DEFINE_MSGF("fiemap", 0, sizeof(struct ll_fiemap_info_key),
- lustre_swab_fiemap);
+ lustre_swab_fiemap, NULL);
EXPORT_SYMBOL(RMF_FIEMAP_KEY);
const struct req_msg_field RMF_FIEMAP_VAL =
- DEFINE_MSGF("fiemap", 0, -1, lustre_swab_fiemap);
+ DEFINE_MSGF("fiemap", 0, -1, lustre_swab_fiemap, NULL);
EXPORT_SYMBOL(RMF_FIEMAP_VAL);
/*
EXPORT_SYMBOL(RQF_OST_STATFS);
const struct req_format RQF_OST_SET_GRANT_INFO =
- DEFINE_REQ_FMT0("OST_SET_GRANT_INFO", obd_set_info_client,
+ DEFINE_REQ_FMT0("OST_SET_GRANT_INFO", ost_grant_shrink_client,
ost_body_only);
EXPORT_SYMBOL(RQF_OST_SET_GRANT_INFO);
#if !defined(__REQ_LAYOUT_USER__)
+/* Convenience macro */
+#define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)]
+
+/**
+ * Initializes the capsule abstraction by computing and setting the \a rf_idx
+ * field of RQFs and the \a rmf_offset field of RMFs.
+ */
int req_layout_init(void)
{
int i;
struct req_msg_field *field;
field = (typeof(field))rf->rf_fields[j].d[k];
+ LASSERT(!(field->rmf_flags & RMF_F_STRUCT_ARRAY)
+ || field->rmf_size > 0);
LASSERT(field->rmf_offset[i][j] == 0);
/*
* k + 1 to detect unused format/field
}
EXPORT_SYMBOL(req_layout_fini);
+/**
+ * Initializes the expected sizes of each RMF in a \a pill (\a rc_area) to -1.
+ *
+ * Actual/expected field sizes are set elsewhere in functions in this file:
+ * req_capsule_init(), req_capsule_server_pack(), req_capsule_set_size() and
+ * req_capsule_msg_size(). The \a rc_area information is used by.
+ * ptlrpc_request_set_replen().
+ */
void req_capsule_init_area(struct req_capsule *pill)
{
int i;
}
EXPORT_SYMBOL(req_capsule_init_area);
-/*
- * Initialize capsule.
+/**
+ * Initialize a pill.
*
- * @area is an array of REQ_MAX_FIELD_NR elements, used to store sizes of
- * variable-sized fields.
+ * The \a location indicates whether the caller is executing on the client side
+ * (RCL_CLIENT) or server side (RCL_SERVER)..
*/
void req_capsule_init(struct req_capsule *pill,
struct ptlrpc_request *req,
{
LASSERT(location == RCL_SERVER || location == RCL_CLIENT);
+ /*
+ * Today all capsules are embedded in ptlrpc_request structs,
+ * but just in case that ever isn't the case, we don't reach
+ * into req unless req != NULL and pill is the one embedded in
+ * the req.
+ *
+ * The req->rq_pill_init flag makes it safe to initialize a pill
+ * twice, which might happen in the OST paths as a result of the
+ * high-priority RPC queue getting peeked at before ost_handle()
+ * handles an OST RPC.
+ */
+ if (req != NULL && pill == &req->rq_pill && req->rq_pill_init)
+ return;
+
memset(pill, 0, sizeof *pill);
pill->rc_req = req;
pill->rc_loc = location;
req_capsule_init_area(pill);
+
+ if (req != NULL && pill == &req->rq_pill)
+ req->rq_pill_init = 1;
}
EXPORT_SYMBOL(req_capsule_init);
return loc == RCL_CLIENT ? req->rq_reqmsg : req->rq_repmsg;
}
+/**
+ * Set the format (\a fmt) of a \a pill; format changes are not allowed here
+ * (see req_capsule_extend()).
+ */
void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt)
{
- LASSERT(pill->rc_fmt == NULL);
+ LASSERT(pill->rc_fmt == NULL || pill->rc_fmt == fmt);
LASSERT(__req_format_is_sane(fmt));
pill->rc_fmt = fmt;
}
EXPORT_SYMBOL(req_capsule_set);
+/**
+ * Fills in any parts of the \a rc_area of a \a pill that haven't been filled in
+ * yet.
+
+ * \a rc_area is an array of REQ_MAX_FIELD_NR elements, used to store sizes of
+ * variable-sized fields. The field sizes come from the declared \a rmf_size
+ * field of a \a pill's \a rc_fmt's RMF's.
+ */
int req_capsule_filled_sizes(struct req_capsule *pill,
enum req_location loc)
{
pill->rc_area[loc][i] =
fmt->rf_fields[loc].d[i]->rmf_size;
if (pill->rc_area[loc][i] == -1) {
- /* skip the following fields */
+ /*
+ * Skip the following fields.
+ *
+ * If this LASSERT() trips then you're missing a
+ * call to req_capsule_set_size().
+ */
LASSERT(loc != RCL_SERVER);
break;
}
}
EXPORT_SYMBOL(req_capsule_filled_sizes);
+/**
+ * Capsule equivalent of lustre_pack_request() and lustre_pack_reply().
+ *
+ * This function uses the \a pill's \a rc_area as filled in by
+ * req_capsule_set_size() or req_capsule_filled_sizes() (the latter is called by
+ * this function).
+ */
int req_capsule_server_pack(struct req_capsule *pill)
{
const struct req_format *fmt;
pill->rc_area[RCL_SERVER], NULL);
if (rc != 0) {
DEBUG_REQ(D_ERROR, pill->rc_req,
- "Cannot pack %d fields in format `%s': ",
- count, fmt->rf_name);
+ "Cannot pack %d fields in format `%s': ",
+ count, fmt->rf_name);
}
return rc;
}
EXPORT_SYMBOL(req_capsule_server_pack);
+/**
+ * Returns the PTLRPC request or reply (\a loc) buffer offset of a \a pill
+ * corresponding to the given RMF (\a field).
+ */
static int __req_capsule_offset(const struct req_capsule *pill,
const struct req_msg_field *field,
enum req_location loc)
return offset;
}
+/**
+ * Helper for __req_capsule_get(); swabs value / array of values and/or dumps
+ * them if desired.
+ */
+static
+void
+swabber_dumper_helper(struct req_capsule *pill,
+ const struct req_msg_field *field,
+ enum req_location loc,
+ int offset,
+ void *value, int len, int dump, void (*swabber)( void *))
+{
+ void *p;
+ int i;
+ int n;
+ int do_swab;
+ int inout = loc == RCL_CLIENT;
+
+ swabber = swabber ?: field->rmf_swabber;
+
+ if (ptlrpc_buf_need_swab(pill->rc_req, inout, offset) &&
+ swabber != NULL && value != NULL)
+ do_swab = 1;
+ else
+ do_swab = 0;
+
+ if (!(field->rmf_flags & RMF_F_STRUCT_ARRAY)) {
+ if (dump && field->rmf_dumper) {
+ CDEBUG(D_RPCTRACE, "Dump of %sfield %s follows\n",
+ do_swab ? "unswabbed " : "", field->rmf_name);
+ field->rmf_dumper(value);
+ }
+ if (!do_swab)
+ return;
+ swabber(value);
+ ptlrpc_buf_set_swabbed(pill->rc_req, inout, offset);
+ if (dump) {
+ CDEBUG(D_RPCTRACE, "Dump of swabbed field %s "
+ "follows\n", field->rmf_name);
+ field->rmf_dumper(value);
+ }
+
+ return;
+ }
+
+ /*
+ * We're swabbing an array; swabber() swabs a single array element, so
+ * swab every element.
+ */
+ LASSERT((len % field->rmf_size) == 0);
+ for (p = value, i = 0, n = len / field->rmf_size;
+ i < n;
+ i++, p += field->rmf_size) {
+ if (dump && field->rmf_dumper) {
+ CDEBUG(D_RPCTRACE, "Dump of %sarray field %s, "
+ "element %d follows\n",
+ do_swab ? "unswabbed " : "", field->rmf_name, i);
+ field->rmf_dumper(p);
+ }
+ if (!do_swab)
+ continue;
+ swabber(p);
+ if (dump && field->rmf_dumper) {
+ CDEBUG(D_RPCTRACE, "Dump of swabbed array field %s, "
+ "element %d follows\n", field->rmf_name, i);
+ field->rmf_dumper(value);
+ }
+ }
+ if (do_swab)
+ ptlrpc_buf_set_swabbed(pill->rc_req, inout, offset);
+}
+
+/**
+ * Returns the pointer to a PTLRPC request or reply (\a loc) buffer of a \a pill
+ * corresponding to the given RMF (\a field).
+ *
+ * The buffer will be swabbed using the given \a swabber. If \a swabber == NULL
+ * then the \a rmf_swabber from the RMF will be used. Soon there will be no
+ * calls to __req_capsule_get() with a non-NULL \a swabber; \a swabber will then
+ * be removed. Fields with the \a RMF_F_STRUCT_ARRAY flag set will have each
+ * element of the array swabbed.
+ */
static void *__req_capsule_get(struct req_capsule *pill,
const struct req_msg_field *field,
enum req_location loc,
- void (*swabber)( void *))
+ void (*swabber)( void *),
+ int dump)
{
const struct req_format *fmt;
struct lustre_msg *msg;
void *value;
int len;
int offset;
- int inout = loc == RCL_CLIENT;
void *(*getter)(struct lustre_msg *m, int n, int minlen);
getter = (field->rmf_flags & RMF_F_STRING) ?
(typeof(getter))lustre_msg_string : lustre_msg_buf;
- if (pill->rc_area[loc][offset] != -1)
+ if (field->rmf_flags & RMF_F_STRUCT_ARRAY) {
+ /*
+ * We've already asserted that field->rmf_size > 0 in
+ * req_layout_init().
+ */
+ len = lustre_msg_buflen(msg, offset);
+ if ((len % field->rmf_size) != 0) {
+ CERROR("%s: array field size mismatch "
+ "%d modulo %d != 0 (%d)\n",
+ field->rmf_name, len, field->rmf_size, loc);
+ return NULL;
+ }
+ } else if (pill->rc_area[loc][offset] != -1) {
len = pill->rc_area[loc][offset];
- else
+ } else {
len = max(field->rmf_size, 0);
+ }
value = getter(msg, offset, len);
- swabber = swabber ?: field->rmf_swabber;
- if (ptlrpc_buf_need_swab(pill->rc_req, inout, offset) &&
- swabber != NULL && value != NULL) {
- swabber(value);
- ptlrpc_buf_set_swabbed(pill->rc_req, inout, offset);
- }
if (value == NULL) {
DEBUG_REQ(D_ERROR, pill->rc_req,
"Wrong buffer for field `%s' (%d of %d) "
"in format `%s': %d vs. %d (%s)\n",
- field->rmf_name, offset, lustre_msg_bufcount(msg), fmt->rf_name,
- lustre_msg_buflen(msg, offset), len,
+ field->rmf_name, offset, lustre_msg_bufcount(msg),
+ fmt->rf_name, lustre_msg_buflen(msg, offset), len,
rcl_names[loc]);
+ } else {
+ swabber_dumper_helper(pill, field, loc, offset, value, len,
+ dump, swabber);
}
return value;
}
+/**
+ * Dump a request and/or reply
+ */
+void __req_capsule_dump(struct req_capsule *pill, enum req_location loc)
+{
+ const struct req_format *fmt;
+ const struct req_msg_field *field;
+ int len;
+ int i;
+
+ fmt = pill->rc_fmt;
+
+ DEBUG_REQ(D_RPCTRACE, pill->rc_req, "BEGIN REQ CAPSULE DUMP\n");
+ for (i = 0; i < fmt->rf_fields[loc].nr; ++i) {
+ field = FMT_FIELD(fmt, loc, i);
+ if (field->rmf_dumper == NULL) {
+ /*
+ * FIXME Add a default hex dumper for fields that don't
+ * have a specific dumper
+ */
+ len = req_capsule_get_size(pill, field, loc);
+ CDEBUG(D_RPCTRACE, "Field %s has no dumper function;"
+ "field size is %d\n", field->rmf_name, len);
+ } else {
+ /* It's the dumping side-effect that we're interested in */
+ (void) __req_capsule_get(pill, field, loc, NULL, 1);
+ }
+ }
+ CDEBUG(D_RPCTRACE, "END REQ CAPSULE DUMP\n");
+}
+
+/**
+ * Dump a request.
+ */
+void req_capsule_client_dump(struct req_capsule *pill)
+{
+ __req_capsule_dump(pill, RCL_CLIENT);
+}
+EXPORT_SYMBOL(req_capsule_client_dump);
+
+/**
+ * Dump a reply
+ */
+void req_capsule_server_dump(struct req_capsule *pill)
+{
+ __req_capsule_dump(pill, RCL_SERVER);
+}
+EXPORT_SYMBOL(req_capsule_server_dump);
+
+/**
+ * Trivial wrapper around __req_capsule_get(), that returns the PTLRPC request
+ * buffer corresponding to the given RMF (\a field) of a \a pill.
+ */
void *req_capsule_client_get(struct req_capsule *pill,
const struct req_msg_field *field)
{
- return __req_capsule_get(pill, field, RCL_CLIENT, NULL);
+ return __req_capsule_get(pill, field, RCL_CLIENT, NULL, 0);
}
EXPORT_SYMBOL(req_capsule_client_get);
+/**
+ * Same as req_capsule_client_get(), but with a \a swabber argument.
+ *
+ * Currently unused; will be removed when req_capsule_server_swab_get() is
+ * unused too.
+ */
void *req_capsule_client_swab_get(struct req_capsule *pill,
const struct req_msg_field *field,
void (*swabber)(void* ))
{
- return __req_capsule_get(pill, field, RCL_CLIENT, swabber);
+ return __req_capsule_get(pill, field, RCL_CLIENT, swabber, 0);
}
EXPORT_SYMBOL(req_capsule_client_swab_get);
+/**
+ * Utility that combines req_capsule_set_size() and req_capsule_client_get().
+ *
+ * First the \a pill's request \a field's size is set (\a rc_area) using
+ * req_capsule_set_size() with the given \a len. Then the actual buffer is
+ * returned.
+ */
void *req_capsule_client_sized_get(struct req_capsule *pill,
const struct req_msg_field *field,
int len)
{
req_capsule_set_size(pill, field, RCL_CLIENT, len);
- return __req_capsule_get(pill, field, RCL_CLIENT, NULL);
+ return __req_capsule_get(pill, field, RCL_CLIENT, NULL, 0);
}
EXPORT_SYMBOL(req_capsule_client_sized_get);
+/**
+ * Trivial wrapper around __req_capsule_get(), that returns the PTLRPC reply
+ * buffer corresponding to the given RMF (\a field) of a \a pill.
+ */
void *req_capsule_server_get(struct req_capsule *pill,
const struct req_msg_field *field)
{
- return __req_capsule_get(pill, field, RCL_SERVER, NULL);
+ return __req_capsule_get(pill, field, RCL_SERVER, NULL, 0);
}
EXPORT_SYMBOL(req_capsule_server_get);
+/**
+ * Same as req_capsule_server_get(), but with a \a swabber argument.
+ *
+ * Ideally all swabbing should be done pursuant to RMF definitions, with no
+ * swabbing done outside this capsule abstraction.
+ */
void *req_capsule_server_swab_get(struct req_capsule *pill,
const struct req_msg_field *field,
void *swabber)
{
- return __req_capsule_get(pill, field, RCL_SERVER, swabber);
+ return __req_capsule_get(pill, field, RCL_SERVER, swabber, 0);
}
EXPORT_SYMBOL(req_capsule_server_swab_get);
-
+/**
+ * Utility that combines req_capsule_set_size() and req_capsule_server_get().
+ *
+ * First the \a pill's request \a field's size is set (\a rc_area) using
+ * req_capsule_set_size() with the given \a len. Then the actual buffer is
+ * returned.
+ */
void *req_capsule_server_sized_get(struct req_capsule *pill,
const struct req_msg_field *field,
int len)
{
req_capsule_set_size(pill, field, RCL_SERVER, len);
- return __req_capsule_get(pill, field, RCL_SERVER, NULL);
+ return __req_capsule_get(pill, field, RCL_SERVER, NULL, 0);
}
EXPORT_SYMBOL(req_capsule_server_sized_get);
+/**
+ * Returns the buffer of a \a pill corresponding to the given \a field from the
+ * request (if the caller is executing on the server-side) or reply (if the
+ * caller is executing on the client-side).
+ *
+ * This function convienient for use is code that could be executed on the
+ * client and server alike.
+ */
const void *req_capsule_other_get(struct req_capsule *pill,
const struct req_msg_field *field)
{
- return __req_capsule_get(pill, field, pill->rc_loc ^ 1, NULL);
+ return __req_capsule_get(pill, field, pill->rc_loc ^ 1, NULL, 0);
}
EXPORT_SYMBOL(req_capsule_other_get);
+/**
+ * Set the size of the PTLRPC request/reply (\a loc) buffer for the given \a
+ * field of the given \a pill.
+ *
+ * This function must be used when constructing variable sized fields of a
+ * request or reply.
+ */
void req_capsule_set_size(struct req_capsule *pill,
const struct req_msg_field *field,
enum req_location loc, int size)
(field->rmf_size != -1) &&
!(field->rmf_flags & RMF_F_NO_SIZE_CHECK) &&
(size > 0)) {
- CERROR("%s: field size mismatch %d != %d (%d)\n",
- field->rmf_name, size, field->rmf_size, loc);
- LBUG();
+ if ((field->rmf_flags & RMF_F_STRUCT_ARRAY) &&
+ (size % field->rmf_size != 0)) {
+ CERROR("%s: array field size mismatch "
+ "%d %% %d != 0 (%d)\n",
+ field->rmf_name, size, field->rmf_size, loc);
+ LBUG();
+ } else if (!(field->rmf_flags & RMF_F_STRUCT_ARRAY) &&
+ size < field->rmf_size) {
+ CERROR("%s: field size mismatch %d != %d (%d)\n",
+ field->rmf_name, size, field->rmf_size, loc);
+ LBUG();
+ }
}
pill->rc_area[loc][__req_capsule_offset(pill, field, loc)] = size;
}
EXPORT_SYMBOL(req_capsule_set_size);
-/* NB: this function doesn't correspond with req_capsule_set_size(), which
+/**
+ * Return the actual PTLRPC buffer length of a request or reply (\a loc)
+ * for the given \a pill's given \a field.
+ *
+ * NB: this function doesn't correspond with req_capsule_set_size(), which
* actually sets the size in pill.rc_area[loc][offset], but this function
* returns the message buflen[offset], maybe we should use another name.
*/
}
EXPORT_SYMBOL(req_capsule_get_size);
+/**
+ * Wrapper around lustre_msg_size() that returns the PTLRPC size needed for the
+ * given \a pill's request or reply (\a loc) given the field size recorded in
+ * the \a pill's rc_area.
+ *
+ * See also req_capsule_set_size().
+ */
int req_capsule_msg_size(struct req_capsule *pill, enum req_location loc)
{
return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic,
pill->rc_area[loc]);
}
+/**
+ * While req_capsule_msg_size() computes the size of a PTLRPC request or reply
+ * (\a loc) given a \a pill's \a rc_area, this function computes the size of a
+ * PTLRPC request or reply given only an RQF (\a fmt).
+ *
+ * This function should not be used for formats which contain variable size
+ * fields.
+ */
int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt,
enum req_location loc)
{
int size, i = 0;
+ /*
+ * This function should probably LASSERT() that fmt has no fields with
+ * RMF_F_STRUCT_ARRAY in rmf_flags, since we can't know here how many
+ * elements in the array there will ultimately be, but then, we could
+ * assume that there will be at least one element, and that's just what
+ * we do.
+ */
size = lustre_msg_hdr_size(magic, fmt->rf_fields[loc].nr);
if (size < 0)
return size;
return size;
}
-#define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)]
-
+/**
+ * Changes the format of an RPC.
+ *
+ * The pill must already have been initialized, which means that it already has
+ * a request format. The new format \a fmt must be an extension of the pill's
+ * old format. Specifically: the new format must have as many request and reply
+ * fields as the old one, and all fields shared by the old and new format must
+ * be at least as large in the new format.
+ *
+ * The new format's fields may be of different "type" than the old format, but
+ * only for fields that are "opaque" blobs: fields which have a) have no
+ * \a rmf_swabber, b) \a rmf_flags == 0 or RMF_F_NO_SIZE_CHECK, and c) \a
+ * rmf_size == -1 or \a rmf_flags == RMF_F_NO_SIZE_CHECK. For example,
+ * OBD_SET_INFO has a key field and an opaque value field that gets interpreted
+ * according to the key field. When the value, according to the key, contains a
+ * structure (or array thereof) to be swabbed, the format should be changed to
+ * one where the value field has \a rmf_size/rmf_flags/rmf_swabber set
+ * accordingly.
+ */
void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
{
int i;
for (i = 0; i < RCL_NR; ++i) {
LASSERT(fmt->rf_fields[i].nr >= old->rf_fields[i].nr);
for (j = 0; j < old->rf_fields[i].nr - 1; ++j) {
+ const struct req_msg_field *ofield = FMT_FIELD(old, i, j);
+
+ /* "opaque" fields can be transmogrified */
+ if (ofield->rmf_swabber == NULL &&
+ (ofield->rmf_flags & ~RMF_F_NO_SIZE_CHECK) == 0 &&
+ (ofield->rmf_size == -1 ||
+ ofield->rmf_flags == RMF_F_NO_SIZE_CHECK))
+ continue;
LASSERT(FMT_FIELD(fmt, i, j) == FMT_FIELD(old, i, j));
}
/*
}
EXPORT_SYMBOL(req_capsule_extend);
+/**
+ * This function returns a non-zero value if the given \a field is present in
+ * the format (\a rc_fmt) of \a pill's PTLRPC request or reply (\a loc), else it
+ * returns 0.
+ */
int req_capsule_has_field(const struct req_capsule *pill,
const struct req_msg_field *field,
enum req_location loc)
}
EXPORT_SYMBOL(req_capsule_has_field);
+/**
+ * Returns a non-zero value if the given \a field is present in the given \a
+ * pill's PTLRPC request or reply (\a loc), else it returns 0.
+ */
int req_capsule_field_present(const struct req_capsule *pill,
const struct req_msg_field *field,
enum req_location loc)
}
EXPORT_SYMBOL(req_capsule_field_present);
+/**
+ * This function shrinks the size of the _buffer_ of the \a pill's PTLRPC
+ * request or reply (\a loc).
+ *
+ * This is not the opposite of req_capsule_extend().
+ */
void req_capsule_shrink(struct req_capsule *pill,
const struct req_msg_field *field,
unsigned int newlen,
return ptr;
}
-void *lustre_swab_reqbuf(struct ptlrpc_request *req, int index, int min_size,
- void *swabber)
-{
- if (!ptlrpc_buf_need_swab(req, 1, index))
- return lustre_msg_buf(req->rq_reqmsg, index, min_size);
-
- lustre_set_req_swabbed(req, index);
- return __lustre_swab_buf(req->rq_reqmsg, index, min_size, swabber);
-}
-
-void *lustre_swab_repbuf(struct ptlrpc_request *req, int index,
- int min_size, void *swabber)
-{
- if (!ptlrpc_buf_need_swab(req, 0, index))
- return lustre_msg_buf(req->rq_repmsg, index, min_size);
-
- lustre_set_rep_swabbed(req, index);
- return __lustre_swab_buf(req->rq_repmsg, index, min_size, swabber);
-}
-
static inline struct ptlrpc_body *lustre_msg_ptlrpc_body(struct lustre_msg *msg)
{
return lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF,
__swab64s (&d->padding);
}
+/* Dump functions */
+void dump_ioo(struct obd_ioobj *ioo)
+{
+ CDEBUG(D_RPCTRACE,
+ "obd_ioobj: ioo_id="LPD64", ioo_gr="LPD64", ioo_type=%d, "
+ "ioo_bufct=%d\n", ioo->ioo_id, ioo->ioo_gr, ioo->ioo_type,
+ ioo->ioo_bufcnt);
+}
+
+void dump_rniobuf(struct niobuf_remote *nb)
+{
+ CDEBUG(D_RPCTRACE, "niobuf_remote: offset="LPU64", len=%d, flags=%x\n",
+ nb->offset, nb->len, nb->flags);
+}
+
+void dump_obdo(struct obdo *oa)
+{
+ __u32 valid = oa->o_valid;
+
+ CDEBUG(D_RPCTRACE, "obdo: o_valid = %08x\n", valid);
+ if (valid & OBD_MD_FLID)
+ CDEBUG(D_RPCTRACE, "obdo: o_id = "LPD64"\n", oa->o_id);
+ if (valid & OBD_MD_FLGROUP)
+ CDEBUG(D_RPCTRACE, "obdo: o_gr = "LPD64"\n", oa->o_gr);
+ if (valid & OBD_MD_FLFID)
+ CDEBUG(D_RPCTRACE, "obdo: o_fid = "LPD64"\n", oa->o_fid);
+ if (valid & OBD_MD_FLSIZE)
+ CDEBUG(D_RPCTRACE, "obdo: o_size = "LPD64"\n", oa->o_size);
+ if (valid & OBD_MD_FLMTIME)
+ CDEBUG(D_RPCTRACE, "obdo: o_mtime = "LPD64"\n", oa->o_mtime);
+ if (valid & OBD_MD_FLATIME)
+ CDEBUG(D_RPCTRACE, "obdo: o_atime = "LPD64"\n", oa->o_atime);
+ if (valid & OBD_MD_FLCTIME)
+ CDEBUG(D_RPCTRACE, "obdo: o_ctime = "LPD64"\n", oa->o_ctime);
+ if (valid & OBD_MD_FLBLOCKS) /* allocation of space */
+ CDEBUG(D_RPCTRACE, "obdo: o_blocks = "LPD64"\n", oa->o_blocks);
+ if (valid & OBD_MD_FLGRANT)
+ CDEBUG(D_RPCTRACE, "obdo: o_grant = "LPD64"\n", oa->o_grant);
+ if (valid & OBD_MD_FLBLKSZ)
+ CDEBUG(D_RPCTRACE, "obdo: o_blksize = %d\n", oa->o_blksize);
+ if (valid & (OBD_MD_FLTYPE | OBD_MD_FLMODE))
+ CDEBUG(D_RPCTRACE, "obdo: o_mode = %o\n",
+ oa->o_mode & ((valid & OBD_MD_FLTYPE ? S_IFMT : 0) |
+ (valid & OBD_MD_FLMODE ? ~S_IFMT : 0)));
+ if (valid & OBD_MD_FLUID)
+ CDEBUG(D_RPCTRACE, "obdo: o_uid = %u\n", oa->o_uid);
+ if (valid & OBD_MD_FLGID)
+ CDEBUG(D_RPCTRACE, "obdo: o_gid = %u\n", oa->o_gid);
+ if (valid & OBD_MD_FLFLAGS)
+ CDEBUG(D_RPCTRACE, "obdo: o_flags = %x\n", oa->o_flags);
+ if (valid & OBD_MD_FLNLINK)
+ CDEBUG(D_RPCTRACE, "obdo: o_nlink = %u\n", oa->o_nlink);
+ else if (valid & OBD_MD_FLCKSUM)
+ CDEBUG(D_RPCTRACE, "obdo: o_checksum (o_nlink) = %u\n", oa->o_nlink);
+ if (valid & OBD_MD_FLGENER)
+ CDEBUG(D_RPCTRACE, "obdo: o_generation = %u\n",
+ oa->o_generation);
+ if (valid & OBD_MD_FLEASIZE)
+ CDEBUG(D_RPCTRACE, "obdo: o_easize = %u\n", oa->o_easize);
+ else if (valid & OBD_MD_FLEPOCH)
+ CDEBUG(D_RPCTRACE, "obdo: o_epoc (o_easize) = %u\n", oa->o_easize);
+ if (valid & OBD_MD_FLID)
+ CDEBUG(D_RPCTRACE, "obdo: o_stripe_idx = %u\n", oa->o_stripe_idx);
+ if (valid & OBD_MD_FLHANDLE)
+ CDEBUG(D_RPCTRACE, "obdo: o_handle = "LPD64"\n", oa->o_handle.cookie);
+ if (valid & OBD_MD_FLCOOKIE)
+ CDEBUG(D_RPCTRACE, "obdo: o_lcookie = "
+ "(llog_cookie dumping not yet implemented)\n");
+}
+
+void dump_ost_body(struct ost_body *ob)
+{
+ dump_obdo(&ob->oa);
+}
+
+void dump_rcs(__u32 *rc)
+{
+ CDEBUG(D_RPCTRACE, "rmf_rcs: %d\n", *rc);
+}
+
#ifdef __KERNEL__
/**
* got qdata from request(req/rep)
*/
-struct qunit_data *quota_get_qdata(void *request, int is_req, int is_exp)
+struct qunit_data *quota_get_qdata(void *r, int is_req, int is_exp)
{
- struct ptlrpc_request *req = (struct ptlrpc_request *)request;
+ struct ptlrpc_request *req = (struct ptlrpc_request *)r;
struct qunit_data *qdata;
__u64 flags = is_exp ? req->rq_export->exp_connect_flags :
req->rq_import->imp_connect_data.ocd_connect_flags;
LASSERT(flags & OBD_CONNECT_CHANGE_QS);
if (is_req == QUOTA_REQUEST)
- qdata = lustre_swab_reqbuf(req, REQ_REC_OFF,
- sizeof(struct qunit_data),
- lustre_swab_qdata);
+ qdata = req_capsule_client_get(&req->rq_pill, &RMF_QUNIT_DATA);
else
- qdata = lustre_swab_repbuf(req, REPLY_REC_OFF,
- sizeof(struct qunit_data),
- lustre_swab_qdata);
+ qdata = req_capsule_server_get(&req->rq_pill, &RMF_QUNIT_DATA);
if (qdata == NULL)
return ERR_PTR(-EPROTO);
/**
* copy qdata to request(req/rep)
*/
-int quota_copy_qdata(void *request, struct qunit_data *qdata,
- int is_req, int is_exp)
+int quota_copy_qdata(void *r, struct qunit_data *qdata, int is_req,
+ int is_exp)
{
- struct ptlrpc_request *req = (struct ptlrpc_request *)request;
+ struct ptlrpc_request *req = (struct ptlrpc_request *)r;
void *target;
__u64 flags = is_exp ? req->rq_export->exp_connect_flags :
req->rq_import->imp_connect_data.ocd_connect_flags;
LASSERT(flags & OBD_CONNECT_CHANGE_QS);
if (is_req == QUOTA_REQUEST)
- target = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
- sizeof(struct qunit_data));
+ target = req_capsule_client_get(&req->rq_pill, &RMF_QUNIT_DATA);
else
- target = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
- sizeof(struct qunit_data));
+ target = req_capsule_server_get(&req->rq_pill, &RMF_QUNIT_DATA);
if (target == NULL)
return -EPROTO;
+ LASSERT(target != qdata);
memcpy(target, qdata, sizeof(*qdata));
return 0;
}
EXPORT_SYMBOL(ptlrpc_buf_set_swabbed);
EXPORT_SYMBOL(ptlrpc_buf_need_swab);
EXPORT_SYMBOL(lustre_swab_ptlrpc_body);
-EXPORT_SYMBOL(lustre_swab_reqbuf);
-EXPORT_SYMBOL(lustre_swab_repbuf);
EXPORT_SYMBOL(lustre_swab_obdo);
EXPORT_SYMBOL(lustre_swab_obd_statfs);
EXPORT_SYMBOL(lustre_swab_obd_ioobj);
EXPORT_SYMBOL(lustre_swab_ldlm_lock_desc);
EXPORT_SYMBOL(lustre_swab_ldlm_request);
EXPORT_SYMBOL(lustre_swab_ldlm_reply);
+EXPORT_SYMBOL(dump_ioo);
+EXPORT_SYMBOL(dump_rniobuf);
+EXPORT_SYMBOL(dump_obdo);
+EXPORT_SYMBOL(dump_ost_body);
+EXPORT_SYMBOL(dump_rcs);
EXPORT_SYMBOL(lustre_swab_qdata);
EXPORT_SYMBOL(lustre_swab_quota_adjust_qunit);
EXPORT_SYMBOL(lustre_msg_get_flags);
#define lustre_swab_ost_body NULL
#define lustre_swab_ost_last_id NULL
#define lustre_swab_fiemap NULL
+#define lustre_swab_qdata NULL
+#define lustre_swab_ost_lvb NULL
+#define dump_rniobuf NULL
+#define dump_ioo NULL
+#define dump_obdo NULL
+#define dump_ost_body NULL
+#define dump_rcs NULL
/*
* Yes, include .c file.