From: nikita Date: Wed, 24 May 2006 22:21:27 +0000 (+0000) Subject: mdt: new portion of cleanup. Mostly switch to req-layout. X-Git-Tag: v1_8_0_110~486^2~1770 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=72145fcd3121288c85f2a3a3d3db075be27347a7;p=fs%2Flustre-release.git mdt: new portion of cleanup. Mostly switch to req-layout. Tested by: mount && cd /mnt/lustre && mkdir foo && ls -ldi foo --- diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 7f591fe..e1266d6 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -177,9 +177,16 @@ static inline __u64 fid_num(const struct lu_fid *fid) return f_ver | fid_oid(fid); } +static inline int fid_seq_is_sane(__u64 seq) +{ + return seq != 0; +} + static inline int fid_is_sane(const struct lu_fid *fid) { - return fid != NULL && fid_seq(fid) != 0 && fid_oid(fid) != 0; + return + fid != NULL && + fid_seq_is_sane(fid_seq(fid)) && fid_oid(fid) != 0; } #define DFID3 "["LPU64"/%u:%u]" diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 4b74c90..f68fca4 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -570,13 +570,13 @@ int ldlm_cli_enqueue(struct obd_export *exp, void *lvb_swabber, struct lustre_handle *lockh); int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req, - struct ldlm_request *dlm_req, - struct ldlm_callback_suite *cbs); + const struct ldlm_request *dlm_req, + const struct ldlm_callback_suite *cbs); int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new, void *data, __u32 data_len); int ldlm_cli_convert(struct lustre_handle *, int new_mode, int *flags); int ldlm_handle_convert0(struct ptlrpc_request *req, - struct ldlm_request *dlm_req); + const struct ldlm_request *dlm_req); int ldlm_cli_cancel(struct lustre_handle *lockh); int ldlm_cli_cancel_unused(struct ldlm_namespace *, struct ldlm_res_id *, int flags, void *opaque); diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index 89660f4..8774a06 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -46,27 +46,30 @@ enum req_location { struct req_capsule { struct ptlrpc_request *rc_req; - const struct req_format *rc_fmt[RCL_NR]; + const struct req_format *rc_fmt; __u32 rc_swabbed; enum req_location rc_loc; + int *rc_area; }; -void req_capsule_init(struct req_capsule *pill, - struct ptlrpc_request *req, - enum req_location location); +void req_capsule_init(struct req_capsule *pill, struct ptlrpc_request *req, + enum req_location location, int *area); void req_capsule_fini(struct req_capsule *pill); -void req_capsule_client_init(struct req_capsule *pill, - const struct req_format *fmt); -void req_capsule_server_init(struct req_capsule *pill, - const struct req_format *fmt); -int req_capsule_start(struct req_capsule *pill, - const struct req_format *fmt, int *area); +void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt); +int req_capsule_pack(struct req_capsule *pill); -const void *req_capsule_client_get(const struct req_capsule *pill, - const struct req_msg_field *field); +void *req_capsule_client_get(const struct req_capsule *pill, + const struct req_msg_field *field); void *req_capsule_server_get(const struct req_capsule *pill, const struct req_msg_field *field); +const void *req_capsule_other_get(const struct req_capsule *pill, + const struct req_msg_field *field); + +void req_capsule_set_size(const struct req_capsule *pill, + const struct req_msg_field *field, + enum req_location loc, int size); +void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt); int req_layout_init(void); void req_layout_fini(void); @@ -74,13 +77,32 @@ void req_layout_fini(void); extern const struct req_format RQF_MDS_GETSTATUS; extern const struct req_format RQF_MDS_STATFS; extern const struct req_format RQF_MDS_GETATTR; +extern const struct req_format RQF_MDS_CONNECT; +extern const struct req_format RQF_MDS_DISCONNECT; + +/* + * This is format of direct (non-intent) MDS_GETATTR_NAME request. + */ extern const struct req_format RQF_MDS_GETATTR_NAME; extern const struct req_format RQF_MDS_REINT_CREATE; +extern const struct req_format RQF_LDLM_ENQUEUE; +extern const struct req_format RQF_LDLM_INTENT; +extern const struct req_format RQF_LDLM_INTENT_GETATTR; extern const struct req_msg_field RMF_MDT_BODY; extern const struct req_msg_field RMF_OBD_STATFS; extern const struct req_msg_field RMF_NAME; extern const struct req_msg_field RMF_REC_CREATE; - +extern const struct req_msg_field RMF_TGTUUID; +extern const struct req_msg_field RMF_CLUUID; +/* + * connection handle received in MDS_CONNECT request. + */ +extern const struct req_msg_field RMF_CONN; +extern const struct req_msg_field RMF_CONNECT_DATA; +extern const struct req_msg_field RMF_DLM_REQ; +extern const struct req_msg_field RMF_DLM_REP; +extern const struct req_msg_field RMF_LDLM_INTENT; +extern const struct req_msg_field RMF_MDT_MD; #endif /* _LUSTRE_REQ_LAYOUT_H__ */ diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 0ec457e..274fe33 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -659,8 +659,8 @@ find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl) */ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req, - struct ldlm_request *dlm_req, - struct ldlm_callback_suite *cbs) + const struct ldlm_request *dlm_req, + const struct ldlm_callback_suite *cbs) { struct ldlm_reply *dlm_rep; int rc = 0, size[2] = {sizeof(*dlm_rep)}; @@ -704,7 +704,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, } #if 0 - /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check + /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check against server's _CONNECT_SUPPORTED flags? (I don't want to use ibits for mgc/mgs) */ @@ -921,7 +921,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, } int ldlm_handle_convert0(struct ptlrpc_request *req, - struct ldlm_request *dlm_req) + const struct ldlm_request *dlm_req) { struct ldlm_reply *dlm_rep; struct ldlm_lock *lock; @@ -1182,7 +1182,7 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req, l_unlock(&ns->ns_lock); if (lock->l_granted_mode == LCK_PW && !lock->l_readers && !lock->l_writers && - cfs_time_after(cfs_time_current(), + cfs_time_after(cfs_time_current(), cfs_time_add(lock->l_last_used, cfs_time_seconds(10)))) { if (ldlm_bl_to_thread(ns, NULL, lock)) ldlm_handle_bl_callback(ns, NULL, lock); @@ -1610,7 +1610,7 @@ static int ldlm_setup(void) spin_lock_init(&waiting_locks_spinlock); cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0); - /* Using CLONE_FILES instead of CLONE_FS here causes failures in + /* Using CLONE_FILES instead of CLONE_FS here causes failures in conf-sanity test 21. But using CLONE_FS can cause problems if the daemonize happens between push/pop_ctxt... */ rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS); diff --git a/lustre/lmv/lmv_fld.c b/lustre/lmv/lmv_fld.c index 106dd44..99a5cab 100644 --- a/lustre/lmv/lmv_fld.c +++ b/lustre/lmv/lmv_fld.c @@ -49,6 +49,9 @@ int lmv_fld_lookup(struct obd_device *obd, struct lu_fid *fid) { int rc; ENTRY; + + LASSERT(fid_is_sane(fid)); + /* temporary hack until fld will works */ rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE; CWARN("LMV: got MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid)); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 90a8540..093c350 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -73,23 +73,18 @@ static int mdt_getstatus(struct mdt_thread_info *info, { struct md_device *next = info->mti_mdt->mdt_child; int result; + struct mdt_body *body; ENTRY; - result = req_capsule_start(&info->mti_pill, &RQF_MDS_GETSTATUS, - info->mti_rep_buf_size); - if (result) - ; - else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) result = -ENOMEM; else { - info->mti_body = req_capsule_server_get(&info->mti_pill, - &RMF_MDT_BODY); + body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); result = next->md_ops->mdo_root_get(info->mti_ctxt, - next, - &info->mti_body->fid1); + next, &body->fid1); if (result == 0) - info->mti_body->valid |= OBD_MD_FLID; + body->valid |= OBD_MD_FLID; } /* the last_committed and last_xid fields are filled in for all @@ -103,29 +98,19 @@ static int mdt_statfs(struct mdt_thread_info *info, { struct md_device *next = info->mti_mdt->mdt_child; struct obd_statfs *osfs; - struct kstatfs *sfs; - int result; + int result; ENTRY; - info->mti_rep_buf_size[0] = sizeof(struct obd_statfs); - result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL); - if (result) - CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n", - sizeof(struct obd_statfs)); - else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) { + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) { CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n"); result = -ENOMEM; } else { - osfs = lustre_msg_buf(req->rq_repmsg, 0, - sizeof(struct obd_statfs)); - OBD_ALLOC_PTR(sfs); - if(sfs == NULL) - RETURN(-ENOMEM); + osfs = req_capsule_server_get(&info->mti_pill, &RMF_OBD_STATFS); /* XXX max_age optimisation is needed here. See mds_statfs */ - result = next->md_ops->mdo_statfs(info->mti_ctxt, next, sfs); - statfs_pack(osfs, sfs); - OBD_FREE_PTR(sfs); + result = next->md_ops->mdo_statfs(info->mti_ctxt, + next, &info->mti_sfs); + statfs_pack(osfs, &info->mti_sfs); } RETURN(result); @@ -157,19 +142,14 @@ static int mdt_getattr(struct mdt_thread_info *info, struct ptlrpc_request *req, int offset) { int result; + struct mdt_body *body; LASSERT(info->mti_object != NULL); LASSERT(lu_object_exists(info->mti_ctxt, &info->mti_object->mot_obj.mo_lu)); - ENTRY; - info->mti_rep_buf_size[0] = sizeof(struct mdt_body); - result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL); - if (result) - CERROR(LUSTRE_MDT0_NAME" cannot pack size=%d, rc=%d\n", - sizeof(struct mdt_body), result); - else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n"); result = -ENOMEM; } else { @@ -177,11 +157,11 @@ static int mdt_getattr(struct mdt_thread_info *info, result = mo_attr_get(info->mti_ctxt, next, &info->mti_attr); if (result == 0) { - info->mti_body = lustre_msg_buf(req->rq_repmsg, 0, - sizeof(struct mdt_body)); - mdt_pack_attr2body(info->mti_body, &info->mti_attr); - info->mti_body->fid1 = *mdt_object_fid(info->mti_object); - info->mti_body->valid |= OBD_MD_FLID; + body = req_capsule_server_get(&info->mti_pill, + &RMF_MDT_BODY); + mdt_pack_attr2body(body, &info->mti_attr); + body->fid1 = *mdt_object_fid(info->mti_object); + body->valid |= OBD_MD_FLID; } } RETURN(result); @@ -193,23 +173,17 @@ static int mdt_getattr_name(struct mdt_thread_info *info, struct md_object *next = mdt_object_child(info->mti_object); struct mdt_object *child; struct mdt_body *body; - char *name; - int namesize; + const char *name; int result; LASSERT(info->mti_object != NULL); ENTRY; - body = lustre_msg_buf(req->rq_repmsg, 1, sizeof *body); - LASSERT(body != NULL); - - name = lustre_msg_string(req->rq_reqmsg, offset, 0); - if (name == NULL) { - CERROR("Can't unpack name\n"); + body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); + name = req_capsule_client_get(&info->mti_pill, &RMF_NAME); + if (name == NULL) RETURN(-EFAULT); - } - namesize = lustre_msg_buflen(req->rq_reqmsg, offset); result = mdo_lookup(info->mti_ctxt, next, name, &body->fid1); if (result == 0) { @@ -303,11 +277,11 @@ static int mdt_reint_internal(struct mdt_thread_info *info, CERROR("invalid record\n"); RETURN(rc = -EINVAL); } - + /* XXX: this should be cleanup up */ rep_off = offset == MDS_REQ_INTENT_REC_OFF ? 1 : 0; - - /* init body for handlers by rep's body, they may want to + + /* init body for handlers by rep's body, they may want to * store there something. */ info->mti_body = lustre_msg_buf(req->rq_repmsg, rep_off, sizeof(struct mdt_body)); @@ -351,7 +325,7 @@ static int mdt_reint(struct mdt_thread_info *info, info->mti_rep_buf_size, NULL); if (rc) RETURN(rc); - + /* init body by rep body, needed by handlers to return data to client */ info->mti_body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(struct mdt_body)); @@ -512,7 +486,7 @@ static struct mdt_object *mdt_obj(struct lu_object *o) struct mdt_object *mdt_object_find(const struct lu_context *ctxt, struct mdt_device *d, - struct lu_fid *f) + const struct lu_fid *f) { struct lu_object *o; @@ -584,23 +558,29 @@ struct mdt_handler { __u32 mh_flags; int (*mh_act)(struct mdt_thread_info *info, struct ptlrpc_request *req, int offset); + + const struct req_format *mh_fmt; }; enum mdt_handler_flags { /* - * struct mdt_body is passed in the 0-th incoming buffer. + * struct mdt_body is passed in the incoming message. */ HABEO_CORPUS = (1 << 0), /* - * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th - * incoming buffer. + * struct ldlm_request is passed in the incoming message. */ HABEO_CLAVIS = (1 << 1), /* - * object, identified by fid passed in mdt_body already exists (on - * disk).. + * object, identified by fid passed in mdt_body, already exists (on + * disk). */ - HABEO_DISCUS = HABEO_CORPUS | (1 << 2) + HABEO_DISCUS = HABEO_CORPUS | (1 << 2), + /* + * this request has fixed reply format, so that reply message can be + * packed by generic code. + */ + HABEO_REFERO = (1 << 3) }; struct mdt_opc_slice { @@ -649,6 +629,58 @@ static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep) } /* + * Generic code handling requests that have struct mdt_body passed in: + * + * - extract mdt_body from request and save it in @info; + * + * - create lu_object, corresponding to the fid in mdt_body, and save it in + * @info; + * + * - if HABEO_DISCUS flag is set for this request type in addition to + * HABEO_CORPUS, check whether object actually exists on storage + * (lu_object_exists()). + * + */ +static int mdt_habeo_corpus(struct mdt_thread_info *info, __u32 flags) +{ + const struct mdt_body *body; + struct mdt_object *obj; + const struct lu_context *ctx; + + int result; + + ctx = info->mti_ctxt; + + body = info->mti_body = req_capsule_client_get(&info->mti_pill, + &RMF_MDT_BODY); + if (body != NULL) { + if (fid_is_sane(&body->fid1)) { + obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1); + + if (!IS_ERR(obj)) { + if ((flags & HABEO_DISCUS) == HABEO_DISCUS && + !lu_object_exists(ctx, + &obj->mot_obj.mo_lu)) { + mdt_object_put(ctx, obj); + result = -ENOENT; + } else { + info->mti_object = obj; + result = 0; + } + } else + result = PTR_ERR(obj); + } else { + CERROR("Invalid fid: "DFID3"\n", PFID3(&body->fid1)); + result = -EINVAL; + } + } else { + CERROR("Can't unpack body\n"); + result = -EFAULT; + } + return result; +} + +/* * Invoke handler for this request opc. Also do necessary preprocessing * (according to handler ->mh_flags), and post-processing (setting of * ->last_{xid,committed}). @@ -677,49 +709,38 @@ static int mdt_req_handle(struct mdt_thread_info *info, result = 0; flags = h->mh_flags; - req_capsule_init(&info->mti_pill, req, RCL_SERVER); - if (flags & HABEO_CORPUS) { - struct mdt_body *body; - const struct lu_context *ctx; - struct mdt_object *obj; - - ctx = info->mti_ctxt; - body = info->mti_body = - lustre_swab_reqbuf(req, off, sizeof *info->mti_body, - lustre_swab_mdt_body); - if (body != NULL) { - obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1); - if (!IS_ERR(obj)) { - if ((flags & HABEO_DISCUS) == HABEO_DISCUS && - !lu_object_exists(ctx, - &obj->mot_obj.mo_lu)) { - mdt_object_put(ctx, obj); - result = -ENOENT; - } else - info->mti_object = obj; - } else - result = PTR_ERR(obj); - } else { - CERROR("Can't unpack body\n"); - result = -EFAULT; - } - } else if (flags & HABEO_CLAVIS) { + LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO), h->mh_fmt != NULL)); + + req_capsule_init(&info->mti_pill, + req, RCL_SERVER, info->mti_rep_buf_size); + + if (h->mh_fmt != NULL) { + req_capsule_set(&info->mti_pill, h->mh_fmt); + if (flags & HABEO_REFERO) + result = req_capsule_pack(&info->mti_pill); + } + + if (result == 0 && flags & HABEO_CORPUS) + result = mdt_habeo_corpus(info, flags); + + if (result == 0 && flags & HABEO_CLAVIS) { struct ldlm_request *dlm; LASSERT(shift == 0); - dlm = info->mti_dlm_req = - lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF, - sizeof *dlm, - lustre_swab_ldlm_request); + LASSERT(h->mh_fmt != NULL); + + dlm = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ); if (dlm != NULL) { if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) result = mdt_lock_resname_compat(info->mti_mdt, dlm); + info->mti_dlm_req = dlm; } else { CERROR("Can't unpack dlm request\n"); result = -EFAULT; } } + if (result == 0) /* * Process request. @@ -736,7 +757,7 @@ static int mdt_req_handle(struct mdt_thread_info *info, LASSERT(current->journal_info == NULL); - if (flags & HABEO_CLAVIS && + if (result == 0 && flags & HABEO_CLAVIS && info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) { struct ldlm_reply *rep; @@ -770,11 +791,8 @@ static void mdt_thread_info_init(struct mdt_thread_info *info) int i; info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET; - /* - * Poison size array. - */ for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++) - info->mti_rep_buf_size[i] = ~0; + info->mti_rep_buf_size[i] = 0; info->mti_rep_buf_nr = i; for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++) mdt_lock_handle_init(&info->mti_lh[i]); @@ -1051,6 +1069,77 @@ void intent_set_disposition(struct ldlm_reply *rep, int flag) rep->lock_policy_res1 |= flag; } +enum mdt_it_code { + MDT_IT_OPEN, + MDT_IT_OCREAT, + MDT_IT_CREATE, + MDT_IT_GETATTR, + MDT_IT_READDIR, + MDT_IT_LOOKUP, + MDT_IT_UNLINK, + MDT_IT_TRUNC, + MDT_IT_GETXATTR, + MDT_IT_NR +}; + +static struct { + const struct req_format *it_fmt; + __u32 it_flags; + int (*it_act)(struct mdt_thread_info *, + struct ptlrpc_request *, int); +} mdt_it_flavor[] = { + [MDT_IT_OPEN] = { NULL, 0, mdt_reint_internal }, + [MDT_IT_OCREAT] = { NULL, 0, mdt_reint_internal }, + [MDT_IT_CREATE] = { NULL, 0, NULL }, + [MDT_IT_GETATTR] = { &RQF_LDLM_INTENT_GETATTR, + HABEO_DISCUS, mdt_getattr_name }, + [MDT_IT_READDIR] = { NULL, 0, NULL }, + [MDT_IT_LOOKUP] = { &RQF_LDLM_INTENT_GETATTR, + HABEO_DISCUS, mdt_getattr_name }, + [MDT_IT_UNLINK] = { NULL, 0, NULL }, + [MDT_IT_TRUNC] = { NULL, 0, NULL }, + [MDT_IT_GETXATTR] = { NULL, 0, NULL } +}; + +static int mdt_intent_code(long itcode) +{ + int result; + + switch(itcode) { + case IT_OPEN: + result = MDT_IT_OPEN; + break; + case IT_OPEN|IT_CREAT: + result = MDT_IT_OCREAT; + break; + case IT_CREAT: + result = MDT_IT_CREATE; + break; + case IT_READDIR: + result = MDT_IT_READDIR; + break; + case IT_GETATTR: + result = MDT_IT_GETATTR; + break; + case IT_LOOKUP: + result = MDT_IT_LOOKUP; + break; + case IT_UNLINK: + result = MDT_IT_UNLINK; + break; + case IT_TRUNC: + result = MDT_IT_TRUNC; + break; + case IT_GETXATTR: + result = MDT_IT_GETXATTR; + break; + default: + CERROR("Unknown intent opcode: %ld\n", itcode); + result = -EINVAL; + break; + } + return result; +} static int mdt_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, void *req_cookie, @@ -1064,99 +1153,53 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, int rc; int gflags = 0; struct mdt_thread_info *info; - struct mdt_body *body; + struct req_capsule *pill; + int opcode; ENTRY; LASSERT(req != NULL); info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key); LASSERT(info != NULL); - + pill = &info->mti_pill; + LASSERT(pill->rc_req == req); if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) { /* No intent was provided */ - info->mti_rep_buf_size[0] = sizeof(struct ldlm_reply); - rc = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL); - LASSERT(rc == 0); - RETURN(0); + LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE); + RETURN(req_capsule_pack(pill)); } - it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it), - lustre_swab_ldlm_intent); + req_capsule_extend(pill, &RQF_LDLM_INTENT); + it = req_capsule_client_get(pill, &RMF_LDLM_INTENT); if (it == NULL) { - CERROR("Intent missing\n"); - RETURN(req->rq_status = -EFAULT); + rc = -EFAULT; + RETURN(rc); } - if (it->opc == IT_GETATTR || it->opc == IT_LOOKUP) { - body = lustre_swab_reqbuf(req, MDS_REQ_INTENT_REC_OFF, - sizeof *body, - lustre_swab_mdt_body); - info->mti_body = body; - if (body != NULL) { - struct mdt_object *obj; - const struct lu_context *ctx = info->mti_ctxt; - - obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1); - if (!IS_ERR(obj)) { - if (!lu_object_exists(ctx, - &obj->mot_obj.mo_lu)) { - CERROR("Object doesn't exist\n"); - mdt_object_put(ctx, obj); - rc = -ENOENT; - } else { - info->mti_object = obj; - rc = 0; - } - } else - rc = PTR_ERR(obj); - } else { - CERROR("Can't unpack body\n"); - rc = -EFAULT; - } - - //TODO: if rc then pack reply according to it - if (rc) { - CERROR("Cannot prepare intent info, rc=%u\n", rc); - RETURN(req->rq_status = rc); - } - } LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc)); - info->mti_rep_buf_nr = 3; - info->mti_rep_buf_size[0] = sizeof(*rep); - info->mti_rep_buf_size[1] = sizeof(struct mdt_body); - info->mti_rep_buf_size[2] = sizeof(struct lov_mds_md);/*FIXME:See mds*/ + opcode = mdt_intent_code(it->opc); + if (opcode < 0) + RETURN(-EINVAL); - rc = lustre_pack_reply(req, info->mti_rep_buf_nr, - info->mti_rep_buf_size, NULL); - if (rc) { - RETURN(req->rq_status = rc); + req_capsule_extend(pill, mdt_it_flavor[opcode].it_fmt); + + if (mdt_it_flavor[opcode].it_flags & HABEO_CORPUS) { + rc = mdt_habeo_corpus(info, mdt_it_flavor[opcode].it_flags); + if (rc != 0) + RETURN(rc); } - rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep)); - intent_set_disposition(rep, DISP_IT_EXECD); + rc = req_capsule_pack(pill); + if (rc != 0) + RETURN(rc); + rep = req_capsule_server_get(pill, &RMF_DLM_REP); + intent_set_disposition(rep, DISP_IT_EXECD); /* execute policy */ - switch ((long)it->opc) { - case IT_OPEN: - case IT_OPEN|IT_CREAT: - rep->lock_policy_res2 = mdt_reint_internal(info, req, offset); - RETURN(ELDLM_LOCK_ABORTED); - break; - case IT_GETATTR: - gflags = MDS_INODELOCK_UPDATE; - case IT_LOOKUP: - gflags |= MDS_INODELOCK_LOOKUP; - //hmm.. something should be done with gflags.. - rep->lock_policy_res2 = mdt_getattr_name(info, req, offset + 1); - - break; - default: - CERROR("Unhandled intent till now "LPD64"\n", it->opc); - LBUG(); - } + rep->lock_policy_res2 = mdt_it_flavor[opcode].it_act(info, req, offset); RETURN(ELDLM_OK); } @@ -1792,11 +1835,18 @@ static struct lu_device *mdt_device_alloc(const struct lu_context *ctx, return l; } +/* + * context key constructor/destructor + */ static void *mdt_thread_init(const struct lu_context *ctx) { struct mdt_thread_info *info; + /* + * check that no high order allocations are incurred. + */ + CLASSERT(CFS_PAGE_SIZE >= sizeof *info); OBD_ALLOC_PTR(info); if (info != NULL) info->mti_ctxt = ctx; @@ -1867,48 +1917,61 @@ static void __exit mdt_mod_exit(void) } -#define DEF_HNDL(prefix, base, suffix, flags, opc, fn) \ +#define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt) \ [prefix ## _ ## opc - prefix ## _ ## base] = { \ .mh_name = #opc, \ .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \ .mh_opc = prefix ## _ ## opc, \ .mh_flags = flags, \ - .mh_act = fn \ + .mh_act = fn, \ + .mh_fmt = fmt \ } -#define DEF_MDT_HNDL(flags, name, fn) \ - DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn) +#define DEF_MDT_HNDL(flags, name, fn, fmt) \ + DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt) +/* + * Request with a format known in advance + */ +#define DEF_MDT_HNDL_F(flags, name, fn) \ + DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name) +/* + * Request with a format we do not yet know + */ +#define DEF_MDT_HNDL_0(flags, name, fn) \ + DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL) static struct mdt_handler mdt_mds_ops[] = { - DEF_MDT_HNDL(0, CONNECT, mdt_connect), - DEF_MDT_HNDL(0, DISCONNECT, mdt_disconnect), - DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus), - DEF_MDT_HNDL(HABEO_DISCUS, GETATTR, mdt_getattr), - DEF_MDT_HNDL(HABEO_DISCUS, GETATTR_NAME, mdt_getattr_name), - DEF_MDT_HNDL(HABEO_DISCUS, SETXATTR, mdt_setxattr), - DEF_MDT_HNDL(HABEO_DISCUS, GETXATTR, mdt_getxattr), - DEF_MDT_HNDL(0, STATFS, mdt_statfs), - DEF_MDT_HNDL(HABEO_DISCUS, READPAGE, mdt_readpage), - DEF_MDT_HNDL(0, REINT, mdt_reint), - DEF_MDT_HNDL(HABEO_DISCUS, CLOSE, mdt_close), - DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing), - DEF_MDT_HNDL(0, PIN, mdt_pin), - DEF_MDT_HNDL(HABEO_DISCUS, SYNC, mdt_sync), - DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck), - DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl) +DEF_MDT_HNDL_F(0, CONNECT, mdt_connect), +DEF_MDT_HNDL_F(0, DISCONNECT, mdt_disconnect), +DEF_MDT_HNDL_F(0 |HABEO_REFERO, GETSTATUS, mdt_getstatus), +DEF_MDT_HNDL_F(HABEO_DISCUS|HABEO_REFERO, GETATTR, mdt_getattr), +DEF_MDT_HNDL_F(HABEO_DISCUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name), +DEF_MDT_HNDL_0(HABEO_DISCUS, SETXATTR, mdt_setxattr), +DEF_MDT_HNDL_0(HABEO_DISCUS, GETXATTR, mdt_getxattr), +DEF_MDT_HNDL_F(0 |HABEO_REFERO, STATFS, mdt_statfs), +DEF_MDT_HNDL_0(HABEO_DISCUS, READPAGE, mdt_readpage), +DEF_MDT_HNDL_0(0, REINT, mdt_reint), +DEF_MDT_HNDL_0(HABEO_DISCUS, CLOSE, mdt_close), +DEF_MDT_HNDL_0(HABEO_CORPUS, DONE_WRITING, mdt_done_writing), +DEF_MDT_HNDL_0(0, PIN, mdt_pin), +DEF_MDT_HNDL_0(HABEO_DISCUS, SYNC, mdt_sync), +DEF_MDT_HNDL_0(0, QUOTACHECK, mdt_handle_quotacheck), +DEF_MDT_HNDL_0(0, QUOTACTL, mdt_handle_quotactl) }; static struct mdt_handler mdt_obd_ops[] = { }; -#define DEF_DLM_HNDL(flags, name, fn) \ - DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn) +#define DEF_DLM_HNDL_0(flags, name, fn) \ + DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL) +#define DEF_DLM_HNDL_F(flags, name, fn) \ + DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name) static struct mdt_handler mdt_dlm_ops[] = { - DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE, mdt_enqueue), - DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT, mdt_convert), - DEF_DLM_HNDL(0, BL_CALLBACK, mdt_bl_callback), - DEF_DLM_HNDL(0, CP_CALLBACK, mdt_cp_callback) + DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE, mdt_enqueue), + DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT, mdt_convert), + DEF_DLM_HNDL_0(0, BL_CALLBACK, mdt_bl_callback), + DEF_DLM_HNDL_0(0, CP_CALLBACK, mdt_cp_callback) }; static struct mdt_handler mdt_llog_ops[] = { diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index c6820cb..aff0ccc 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -137,49 +137,57 @@ struct mdt_reint_record { * reduce stack consumption. */ struct mdt_thread_info { - const struct lu_context *mti_ctxt; - struct mdt_device *mti_mdt; + const struct lu_context *mti_ctxt; + struct mdt_device *mti_mdt; /* * number of buffers in reply message. */ - int mti_rep_buf_nr; + int mti_rep_buf_nr; /* * sizes of reply buffers. */ - int mti_rep_buf_size[MDT_REP_BUF_NR_MAX]; + int mti_rep_buf_size[MDT_REP_BUF_NR_MAX]; /* * Body for "habeo corpus" operations. */ - struct mdt_body *mti_body; + const struct mdt_body *mti_body; /* * Lock request for "habeo clavis" operations. */ - struct ldlm_request *mti_dlm_req; + const struct ldlm_request *mti_dlm_req; /* * Host object. This is released at the end of mdt_handler(). */ - struct mdt_object *mti_object; + struct mdt_object *mti_object; /* * Object attributes. */ - struct lu_attr mti_attr; + struct lu_attr mti_attr; /* * reint record. Containing information for reint operations. */ - struct mdt_reint_record mti_rr; + struct mdt_reint_record mti_rr; /* * Additional fail id that can be set by handler. Passed to * target_send_reply(). */ - int mti_fail_id; + int mti_fail_id; /* * A couple of lock handles. */ - struct mdt_lock_handle mti_lh[MDT_LH_NR]; + struct mdt_lock_handle mti_lh[MDT_LH_NR]; /* * for req-layout interface. */ - struct req_capsule mti_pill; + struct req_capsule mti_pill; + /* + * buffer for mdt_statfs(). + * + * XXX this is probably huge overkill, because statfs is not that + * frequent. + */ + struct kstatfs mti_sfs; + }; int fid_lock(struct ldlm_namespace *, const struct lu_fid *, @@ -190,7 +198,7 @@ void fid_unlock(struct ldlm_namespace *, const struct lu_fid *, struct lustre_handle *, ldlm_mode_t); struct mdt_object *mdt_object_find(const struct lu_context *, - struct mdt_device *, struct lu_fid *); + struct mdt_device *, const struct lu_fid *); void mdt_object_put(const struct lu_context *ctxt, struct mdt_object *); int mdt_object_lock(struct ldlm_namespace *, struct mdt_object *, diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 6640657..1524ee3 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -130,7 +130,7 @@ static int mdt_md_mkobj(struct mdt_thread_info *info) mdt_object_put(info->mti_ctxt, o); } else result = PTR_ERR(o); - + RETURN(result); } @@ -157,11 +157,12 @@ static int mdt_reint_create(struct mdt_thread_info *info) rc = mdt_md_mkdir(info); else rc = mdt_md_mkobj(info); - - /* return fid to client. mti_body should point to + + /* return fid to client. mti_body should point to * rep's body. */ - info->mti_body->fid1 = *info->mti_rr.rr_fid2; - info->mti_body->valid |= OBD_MD_FLID; + /* XXX these fields are not used by the callers. */ + /* info->mti_body->fid1 = *info->mti_rr.rr_fid2; */ + /* info->mti_body->valid |= OBD_MD_FLID; */ break; } case S_IFLNK:{ @@ -226,9 +227,9 @@ static int mdt_reint_open(struct mdt_thread_info *info) if (result && result != -ENOENT) { GOTO(out_parent, result); } - + child = mdt_object_find(info->mti_ctxt, mdt, info->mti_rr.rr_fid2); - if (IS_ERR(child)) + if (IS_ERR(child)) GOTO(out_parent, PTR_ERR(child)); if (info->mti_rr.rr_flags & MDS_OPEN_CREAT) { @@ -243,10 +244,10 @@ static int mdt_reint_open(struct mdt_thread_info *info) result = -EEXIST; } } - + if (result == 0) result = mdt_md_open(info, child); - + mdt_object_put(info->mti_ctxt, child); out_parent: diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index 1a65430..06ee214 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -69,12 +69,54 @@ static const struct req_msg_field *mds_reint_create_client[] = { &RMF_NAME }; +static const struct req_msg_field *mds_connect_client[] = { + &RMF_TGTUUID, + &RMF_CLUUID, + &RMF_CONN, + &RMF_CONNECT_DATA +}; + +static const struct req_msg_field *mds_connect_server[] = { + &RMF_CONNECT_DATA +}; + +static const struct req_msg_field *ldlm_enqueue_client[] = { + &RMF_DLM_REQ +}; + +static const struct req_msg_field *ldlm_enqueue_server[] = { + &RMF_DLM_REP +}; + +static const struct req_msg_field *ldlm_intent_client[] = { + &RMF_DLM_REQ, + &RMF_LDLM_INTENT +}; + +static const struct req_msg_field *ldlm_intent_server[] = { + &RMF_DLM_REP, + &RMF_MDT_BODY, + &RMF_MDT_MD +}; + +static const struct req_msg_field *ldlm_intent_getattr_client[] = { + &RMF_DLM_REQ, + &RMF_LDLM_INTENT, + &RMF_MDT_BODY, /* coincides with mds_getattr_name_client[] */ + &RMF_NAME +}; + static const struct req_format *req_formats[] = { + &RQF_MDS_CONNECT, + &RQF_MDS_DISCONNECT, &RQF_MDS_GETSTATUS, &RQF_MDS_STATFS, &RQF_MDS_GETATTR, &RQF_MDS_GETATTR_NAME, - &RQF_MDS_REINT_CREATE + &RQF_MDS_REINT_CREATE, + &RQF_LDLM_ENQUEUE, + &RQF_LDLM_INTENT, + &RQF_LDLM_INTENT_GETATTR }; struct req_msg_field { @@ -120,6 +162,48 @@ const struct req_msg_field RMF_REC_CREATE = sizeof(struct mdt_rec_create), lustre_swab_mdt_rec_create); EXPORT_SYMBOL(RMF_REC_CREATE); +const struct req_msg_field RMF_TGTUUID = + DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL); +EXPORT_SYMBOL(RMF_TGTUUID); + +const struct req_msg_field RMF_CLUUID = + DEFINE_MSGF("cluuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL); +EXPORT_SYMBOL(RMF_CLUUID); + +/* + * connection handle received in MDS_CONNECT request. + * + * XXX no swabbing? + */ +const struct req_msg_field RMF_CONN = + DEFINE_MSGF("conn", 0, sizeof(struct lustre_handle), NULL); +EXPORT_SYMBOL(RMF_CONN); + +const struct req_msg_field RMF_CONNECT_DATA = + DEFINE_MSGF("cdata", 0, + sizeof(struct obd_connect_data), lustre_swab_connect); +EXPORT_SYMBOL(RMF_CONNECT_DATA); + +const struct req_msg_field RMF_DLM_REQ = + DEFINE_MSGF("dlm_req", 0, + sizeof(struct ldlm_request), lustre_swab_ldlm_request); +EXPORT_SYMBOL(RMF_DLM_REQ); + +const struct req_msg_field RMF_DLM_REP = + DEFINE_MSGF("dlm_rep", 0, + sizeof(struct ldlm_reply), lustre_swab_ldlm_reply); +EXPORT_SYMBOL(RMF_DLM_REP); + +const struct req_msg_field RMF_LDLM_INTENT = + DEFINE_MSGF("ldlm_intent", 0, + sizeof(struct ldlm_intent), lustre_swab_ldlm_intent); +EXPORT_SYMBOL(RMF_LDLM_INTENT); + +const struct req_msg_field RMF_MDT_MD = + DEFINE_MSGF("mdt_md", + 0, sizeof(struct lov_mds_md) /* FIXME: See mds */, NULL); +EXPORT_SYMBOL(RMF_MDT_MD); + /* * Request formats. */ @@ -172,6 +256,30 @@ const struct req_format RQF_MDS_REINT_CREATE = mds_reint_create_client, mdt_body_only); EXPORT_SYMBOL(RQF_MDS_REINT_CREATE); +const struct req_format RQF_MDS_CONNECT = + DEFINE_REQ_FMT0("MDS_CONNECT", + mds_connect_client, mds_connect_server); +EXPORT_SYMBOL(RQF_MDS_CONNECT); + +const struct req_format RQF_MDS_DISCONNECT = + DEFINE_REQ_FMT0("MDS_DISCONNECT", empty, empty); +EXPORT_SYMBOL(RQF_MDS_DISCONNECT); + +const struct req_format RQF_LDLM_ENQUEUE = + DEFINE_REQ_FMT0("LDLM_ENQUEUE", + ldlm_enqueue_client, ldlm_enqueue_server); +EXPORT_SYMBOL(RQF_LDLM_ENQUEUE); + +const struct req_format RQF_LDLM_INTENT = + DEFINE_REQ_FMT0("LDLM_INTENT", + ldlm_intent_client, ldlm_intent_server); +EXPORT_SYMBOL(RQF_LDLM_INTENT); + +const struct req_format RQF_LDLM_INTENT_GETATTR = + DEFINE_REQ_FMT0("LDLM_INTENT", + ldlm_intent_getattr_client, ldlm_intent_server); +EXPORT_SYMBOL(RQF_LDLM_INTENT_GETATTR); + int req_layout_init(void) { int i; @@ -207,13 +315,15 @@ void req_layout_fini(void) EXPORT_SYMBOL(req_layout_fini); void req_capsule_init(struct req_capsule *pill, - struct ptlrpc_request *req, enum req_location location) + struct ptlrpc_request *req, enum req_location location, + int *area) { LASSERT(location == RCL_SERVER || location == RCL_CLIENT); memset(pill, 0, sizeof *pill); pill->rc_req = req; pill->rc_loc = location; + pill->rc_area = area; } EXPORT_SYMBOL(req_capsule_init); @@ -229,54 +339,61 @@ static int __req_format_is_sane(const struct req_format *fmt) req_formats[fmt->rf_idx] == fmt; } -static void __req_capsule_init(struct req_capsule *pill, - const struct req_format *fmt, - enum req_location loc) +void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt) { - LASSERT(0 <= loc && loc < ARRAY_SIZE(pill->rc_fmt)); - LASSERT(pill->rc_fmt[loc] == NULL); - LASSERT(ergo(loc != pill->rc_loc, pill->rc_swabbed == 0)); + LASSERT(pill->rc_fmt == NULL); LASSERT(__req_format_is_sane(fmt)); - pill->rc_fmt[loc] = fmt; -} - -void req_capsule_server_init(struct req_capsule *pill, - const struct req_format *fmt) -{ - __req_capsule_init(pill, fmt, RCL_SERVER); + pill->rc_fmt = fmt; } -EXPORT_SYMBOL(req_capsule_server_init); +EXPORT_SYMBOL(req_capsule_set); -void req_capsule_client_init(struct req_capsule *pill, - const struct req_format *fmt) -{ - __req_capsule_init(pill, fmt, RCL_CLIENT); -} -EXPORT_SYMBOL(req_capsule_client_init); - -int req_capsule_start(struct req_capsule *pill, - const struct req_format *fmt, int *area) +int req_capsule_pack(struct req_capsule *pill) { int i; int nr; int result; + int total; + + const struct req_format *fmt; LASSERT(pill->rc_loc == RCL_SERVER); + fmt = pill->rc_fmt; + LASSERT(fmt != NULL); nr = fmt->rf_fields[RCL_SERVER].nr; - for (i = 0; i < nr; ++i) - area[i] = fmt->rf_fields[RCL_SERVER].d[i]->rmf_size; - req_capsule_server_init(pill, fmt); - result = lustre_pack_reply(pill->rc_req, nr, area, NULL); + for (total = 0, i = 0; i < nr; ++i) { + int *size; + + size = &pill->rc_area[i]; + if (*size == 0) { + *size = fmt->rf_fields[RCL_SERVER].d[i]->rmf_size; + LASSERT(*size != 0); + } + total += *size; + } + result = lustre_pack_reply(pill->rc_req, nr, pill->rc_area, NULL); if (result != 0) { DEBUG_REQ(D_ERROR, pill->rc_req, - "Failed to pack %d fields in format `%s': ", - nr, fmt->rf_name); + "Cannot pack %d fields (%d bytes) in format `%s': ", + nr, total, fmt->rf_name); } return result; } -EXPORT_SYMBOL(req_capsule_start); +EXPORT_SYMBOL(req_capsule_pack); + +static int __req_capsule_offset(const struct req_capsule *pill, + const struct req_msg_field *field, + enum req_location loc) +{ + int offset; + + offset = field->rmf_offset[pill->rc_fmt->rf_idx][loc]; + LASSERT(offset > 0); + offset --; + LASSERT(0 <= offset && offset < (sizeof(pill->rc_swabbed) << 3)); + return offset; +} static void *__req_capsule_get(const struct req_capsule *pill, const struct req_msg_field *field, @@ -295,17 +412,11 @@ static void *__req_capsule_get(const struct req_capsule *pill, [RCL_SERVER] = "server" }; - LASSERT(0 <= loc && loc < ARRAY_SIZE(pill->rc_fmt)); - CLASSERT(ARRAY_SIZE(pill->rc_fmt) == ARRAY_SIZE(field->rmf_offset[0])); - - fmt = pill->rc_fmt[loc]; + fmt = pill->rc_fmt; LASSERT(fmt != NULL); LASSERT(__req_format_is_sane(fmt)); - offset = field->rmf_offset[fmt->rf_idx][loc]; - LASSERT(offset > 0); - -- offset; - LASSERT(0 <= offset && offset < (sizeof(pill->rc_swabbed) << 3)); + offset = __req_capsule_offset(pill, field, loc); req = pill->rc_req; msg = loc == RCL_CLIENT ? req->rq_reqmsg : req->rq_repmsg; @@ -329,10 +440,10 @@ static void *__req_capsule_get(const struct req_capsule *pill, return value; } -const void *req_capsule_client_get(const struct req_capsule *pill, - const struct req_msg_field *field) +void *req_capsule_client_get(const struct req_capsule *pill, + const struct req_msg_field *field) { - return (const void *)__req_capsule_get(pill, field, RCL_CLIENT); + return __req_capsule_get(pill, field, RCL_CLIENT); } EXPORT_SYMBOL(req_capsule_client_get); @@ -343,3 +454,47 @@ void *req_capsule_server_get(const struct req_capsule *pill, } EXPORT_SYMBOL(req_capsule_server_get); +const void *req_capsule_other_get(const struct req_capsule *pill, + const struct req_msg_field *field) +{ + return __req_capsule_get(pill, field, pill->rc_loc ^ 1); +} +EXPORT_SYMBOL(req_capsule_other_get); + +void req_capsule_set_size(const struct req_capsule *pill, + const struct req_msg_field *field, + enum req_location loc, int size) +{ + pill->rc_area[__req_capsule_offset(pill, field, loc)] = size; +} +EXPORT_SYMBOL(req_capsule_set_size); + +void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt) +{ + int i; + int j; + + const struct req_format *old; + + LASSERT(pill->rc_fmt != NULL); + LASSERT(__req_format_is_sane(fmt)); + + old = pill->rc_fmt; + /* + * Sanity checking... + */ + for (i = 0; i < RCL_NR; ++i) { + LASSERT(fmt->rf_fields[i].nr >= old->rf_fields[i].nr); + for (j = 0; j < old->rf_fields[i].nr - 1; ++j) { + LASSERT(fmt->rf_fields[i].d[j]->rmf_size == + old->rf_fields[i].d[j]->rmf_size); + } + /* + * Last field in old format can be shorter than in new. + */ + LASSERT(fmt->rf_fields[i].d[j]->rmf_size >= + old->rf_fields[i].d[j]->rmf_size); + } + pill->rc_fmt = fmt; +} +EXPORT_SYMBOL(req_capsule_extend);