Whamcloud - gitweb
git://git.whamcloud.com
/
fs
/
lustre-release.git
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
| inline |
side by side
(parent:
e24895b
)
mdt: new portion of cleanup. Mostly switch to req-layout.
author
nikita
<nikita>
Wed, 24 May 2006 22:21:27 +0000
(22:21 +0000)
committer
nikita
<nikita>
Wed, 24 May 2006 22:21:27 +0000
(22:21 +0000)
Tested by: mount && cd /mnt/lustre && mkdir foo && ls -ldi foo
lustre/include/lustre/lustre_idl.h
patch
|
blob
|
history
lustre/include/lustre_dlm.h
patch
|
blob
|
history
lustre/include/lustre_req_layout.h
patch
|
blob
|
history
lustre/ldlm/ldlm_lockd.c
patch
|
blob
|
history
lustre/lmv/lmv_fld.c
patch
|
blob
|
history
lustre/mdt/mdt_handler.c
patch
|
blob
|
history
lustre/mdt/mdt_internal.h
patch
|
blob
|
history
lustre/mdt/mdt_reint.c
patch
|
blob
|
history
lustre/ptlrpc/layout.c
patch
|
blob
|
history
diff --git
a/lustre/include/lustre/lustre_idl.h
b/lustre/include/lustre/lustre_idl.h
index
7f591fe
..
e1266d6
100644
(file)
--- a/
lustre/include/lustre/lustre_idl.h
+++ b/
lustre/include/lustre/lustre_idl.h
@@
-177,9
+177,16
@@
static inline __u64 fid_num(const struct lu_fid *fid)
return f_ver | fid_oid(fid);
}
+static inline int fid_seq_is_sane(__u64 seq)
+{
+ return seq != 0;
+}
+
static inline int fid_is_sane(const struct lu_fid *fid)
{
- return fid != NULL && fid_seq(fid) != 0 && fid_oid(fid) != 0;
+ return
+ fid != NULL &&
+ fid_seq_is_sane(fid_seq(fid)) && fid_oid(fid) != 0;
}
#define DFID3 "["LPU64"/%u:%u]"
diff --git
a/lustre/include/lustre_dlm.h
b/lustre/include/lustre_dlm.h
index
4b74c90
..
f68fca4
100644
(file)
--- a/
lustre/include/lustre_dlm.h
+++ b/
lustre/include/lustre_dlm.h
@@
-570,13
+570,13
@@
int ldlm_cli_enqueue(struct obd_export *exp,
void *lvb_swabber,
struct lustre_handle *lockh);
int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
- struct ldlm_request *dlm_req,
- struct ldlm_callback_suite *cbs);
+
const
struct ldlm_request *dlm_req,
+
const
struct ldlm_callback_suite *cbs);
int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
void *data, __u32 data_len);
int ldlm_cli_convert(struct lustre_handle *, int new_mode, int *flags);
int ldlm_handle_convert0(struct ptlrpc_request *req,
- struct ldlm_request *dlm_req);
+
const
struct ldlm_request *dlm_req);
int ldlm_cli_cancel(struct lustre_handle *lockh);
int ldlm_cli_cancel_unused(struct ldlm_namespace *, struct ldlm_res_id *,
int flags, void *opaque);
diff --git
a/lustre/include/lustre_req_layout.h
b/lustre/include/lustre_req_layout.h
index
89660f4
..
8774a06
100644
(file)
--- a/
lustre/include/lustre_req_layout.h
+++ b/
lustre/include/lustre_req_layout.h
@@
-46,27
+46,30
@@
enum req_location {
struct req_capsule {
struct ptlrpc_request *rc_req;
- const struct req_format *rc_fmt
[RCL_NR]
;
+ const struct req_format *rc_fmt;
__u32 rc_swabbed;
enum req_location rc_loc;
+ int *rc_area;
};
-void req_capsule_init(struct req_capsule *pill,
- struct ptlrpc_request *req,
- enum req_location location);
+void req_capsule_init(struct req_capsule *pill, struct ptlrpc_request *req,
+ enum req_location location, int *area);
void req_capsule_fini(struct req_capsule *pill);
-void req_capsule_client_init(struct req_capsule *pill,
- const struct req_format *fmt);
-void req_capsule_server_init(struct req_capsule *pill,
- const struct req_format *fmt);
-int req_capsule_start(struct req_capsule *pill,
- const struct req_format *fmt, int *area);
+void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt);
+int req_capsule_pack(struct req_capsule *pill);
-
const
void *req_capsule_client_get(const struct req_capsule *pill,
-
const struct req_msg_field *field);
+void *req_capsule_client_get(const struct req_capsule *pill,
+ const struct req_msg_field *field);
void *req_capsule_server_get(const struct req_capsule *pill,
const struct req_msg_field *field);
+const void *req_capsule_other_get(const struct req_capsule *pill,
+ const struct req_msg_field *field);
+
+void req_capsule_set_size(const struct req_capsule *pill,
+ const struct req_msg_field *field,
+ enum req_location loc, int size);
+void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt);
int req_layout_init(void);
void req_layout_fini(void);
@@
-74,13
+77,32
@@
void req_layout_fini(void);
extern const struct req_format RQF_MDS_GETSTATUS;
extern const struct req_format RQF_MDS_STATFS;
extern const struct req_format RQF_MDS_GETATTR;
+extern const struct req_format RQF_MDS_CONNECT;
+extern const struct req_format RQF_MDS_DISCONNECT;
+
+/*
+ * This is format of direct (non-intent) MDS_GETATTR_NAME request.
+ */
extern const struct req_format RQF_MDS_GETATTR_NAME;
extern const struct req_format RQF_MDS_REINT_CREATE;
+extern const struct req_format RQF_LDLM_ENQUEUE;
+extern const struct req_format RQF_LDLM_INTENT;
+extern const struct req_format RQF_LDLM_INTENT_GETATTR;
extern const struct req_msg_field RMF_MDT_BODY;
extern const struct req_msg_field RMF_OBD_STATFS;
extern const struct req_msg_field RMF_NAME;
extern const struct req_msg_field RMF_REC_CREATE;
-
+extern const struct req_msg_field RMF_TGTUUID;
+extern const struct req_msg_field RMF_CLUUID;
+/*
+ * connection handle received in MDS_CONNECT request.
+ */
+extern const struct req_msg_field RMF_CONN;
+extern const struct req_msg_field RMF_CONNECT_DATA;
+extern const struct req_msg_field RMF_DLM_REQ;
+extern const struct req_msg_field RMF_DLM_REP;
+extern const struct req_msg_field RMF_LDLM_INTENT;
+extern const struct req_msg_field RMF_MDT_MD;
#endif /* _LUSTRE_REQ_LAYOUT_H__ */
diff --git
a/lustre/ldlm/ldlm_lockd.c
b/lustre/ldlm/ldlm_lockd.c
index
0ec457e
..
274fe33
100644
(file)
--- a/
lustre/ldlm/ldlm_lockd.c
+++ b/
lustre/ldlm/ldlm_lockd.c
@@
-659,8
+659,8
@@
find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl)
*/
int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
struct ptlrpc_request *req,
- struct ldlm_request *dlm_req,
- struct ldlm_callback_suite *cbs)
+
const
struct ldlm_request *dlm_req,
+
const
struct ldlm_callback_suite *cbs)
{
struct ldlm_reply *dlm_rep;
int rc = 0, size[2] = {sizeof(*dlm_rep)};
@@
-704,7
+704,7
@@
int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
}
#if 0
- /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
+ /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
against server's _CONNECT_SUPPORTED flags? (I don't want to use
ibits for mgc/mgs) */
@@
-921,7
+921,7
@@
int ldlm_handle_enqueue(struct ptlrpc_request *req,
}
int ldlm_handle_convert0(struct ptlrpc_request *req,
- struct ldlm_request *dlm_req)
+
const
struct ldlm_request *dlm_req)
{
struct ldlm_reply *dlm_rep;
struct ldlm_lock *lock;
@@
-1182,7
+1182,7
@@
static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
l_unlock(&ns->ns_lock);
if (lock->l_granted_mode == LCK_PW &&
!lock->l_readers && !lock->l_writers &&
- cfs_time_after(cfs_time_current(),
+ cfs_time_after(cfs_time_current(),
cfs_time_add(lock->l_last_used, cfs_time_seconds(10)))) {
if (ldlm_bl_to_thread(ns, NULL, lock))
ldlm_handle_bl_callback(ns, NULL, lock);
@@
-1610,7
+1610,7
@@
static int ldlm_setup(void)
spin_lock_init(&waiting_locks_spinlock);
cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
- /* Using CLONE_FILES instead of CLONE_FS here causes failures in
+ /* Using CLONE_FILES instead of CLONE_FS here causes failures in
conf-sanity test 21. But using CLONE_FS can cause problems
if the daemonize happens between push/pop_ctxt... */
rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS);
diff --git
a/lustre/lmv/lmv_fld.c
b/lustre/lmv/lmv_fld.c
index
106dd44
..
99a5cab
100644
(file)
--- a/
lustre/lmv/lmv_fld.c
+++ b/
lustre/lmv/lmv_fld.c
@@
-49,6
+49,9
@@
int lmv_fld_lookup(struct obd_device *obd, struct lu_fid *fid)
{
int rc;
ENTRY;
+
+ LASSERT(fid_is_sane(fid));
+
/* temporary hack until fld will works */
rc = (unsigned long)fid_seq(fid) / LUSTRE_SEQ_RANGE;
CWARN("LMV: got MDS %d for sequence: "LPU64"\n", rc, fid_seq(fid));
diff --git
a/lustre/mdt/mdt_handler.c
b/lustre/mdt/mdt_handler.c
index
90a8540
..
093c350
100644
(file)
--- a/
lustre/mdt/mdt_handler.c
+++ b/
lustre/mdt/mdt_handler.c
@@
-73,23
+73,18
@@
static int mdt_getstatus(struct mdt_thread_info *info,
{
struct md_device *next = info->mti_mdt->mdt_child;
int result;
+ struct mdt_body *body;
ENTRY;
- result = req_capsule_start(&info->mti_pill, &RQF_MDS_GETSTATUS,
- info->mti_rep_buf_size);
- if (result)
- ;
- else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
result = -ENOMEM;
else {
- info->mti_body = req_capsule_server_get(&info->mti_pill,
- &RMF_MDT_BODY);
+ body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
result = next->md_ops->mdo_root_get(info->mti_ctxt,
- next,
- &info->mti_body->fid1);
+ next, &body->fid1);
if (result == 0)
-
info->mti_
body->valid |= OBD_MD_FLID;
+ body->valid |= OBD_MD_FLID;
}
/* the last_committed and last_xid fields are filled in for all
@@
-103,29
+98,19
@@
static int mdt_statfs(struct mdt_thread_info *info,
{
struct md_device *next = info->mti_mdt->mdt_child;
struct obd_statfs *osfs;
- struct kstatfs *sfs;
- int result;
+ int result;
ENTRY;
- info->mti_rep_buf_size[0] = sizeof(struct obd_statfs);
- result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
- if (result)
- CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
- sizeof(struct obd_statfs));
- else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
result = -ENOMEM;
} else {
- osfs = lustre_msg_buf(req->rq_repmsg, 0,
- sizeof(struct obd_statfs));
- OBD_ALLOC_PTR(sfs);
- if(sfs == NULL)
- RETURN(-ENOMEM);
+ osfs = req_capsule_server_get(&info->mti_pill, &RMF_OBD_STATFS);
/* XXX max_age optimisation is needed here. See mds_statfs */
- result = next->md_ops->mdo_statfs(info->mti_ctxt,
next, sfs);
-
statfs_pack(osfs,
sfs);
-
OBD_FREE_PTR(
sfs);
+ result = next->md_ops->mdo_statfs(info->mti_ctxt,
+
next, &info->mti_
sfs);
+
statfs_pack(osfs, &info->mti_
sfs);
}
RETURN(result);
@@
-157,19
+142,14
@@
static int mdt_getattr(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
int result;
+ struct mdt_body *body;
LASSERT(info->mti_object != NULL);
LASSERT(lu_object_exists(info->mti_ctxt,
&info->mti_object->mot_obj.mo_lu));
-
ENTRY;
- info->mti_rep_buf_size[0] = sizeof(struct mdt_body);
- result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
- if (result)
- CERROR(LUSTRE_MDT0_NAME" cannot pack size=%d, rc=%d\n",
- sizeof(struct mdt_body), result);
- else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
result = -ENOMEM;
} else {
@@
-177,11
+157,11
@@
static int mdt_getattr(struct mdt_thread_info *info,
result = mo_attr_get(info->mti_ctxt, next, &info->mti_attr);
if (result == 0) {
-
info->mti_body = lustre_msg_buf(req->rq_repmsg, 0
,
-
sizeof(struct mdt_body)
);
- mdt_pack_attr2body(
info->mti_
body, &info->mti_attr);
-
info->mti_
body->fid1 = *mdt_object_fid(info->mti_object);
-
info->mti_
body->valid |= OBD_MD_FLID;
+
body = req_capsule_server_get(&info->mti_pill
,
+
&RMF_MDT_BODY
);
+ mdt_pack_attr2body(body, &info->mti_attr);
+ body->fid1 = *mdt_object_fid(info->mti_object);
+ body->valid |= OBD_MD_FLID;
}
}
RETURN(result);
@@
-193,23
+173,17
@@
static int mdt_getattr_name(struct mdt_thread_info *info,
struct md_object *next = mdt_object_child(info->mti_object);
struct mdt_object *child;
struct mdt_body *body;
- char *name;
- int namesize;
+ const char *name;
int result;
LASSERT(info->mti_object != NULL);
ENTRY;
- body = lustre_msg_buf(req->rq_repmsg, 1, sizeof *body);
- LASSERT(body != NULL);
-
- name = lustre_msg_string(req->rq_reqmsg, offset, 0);
- if (name == NULL) {
- CERROR("Can't unpack name\n");
+ body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
+ if (name == NULL)
RETURN(-EFAULT);
- }
- namesize = lustre_msg_buflen(req->rq_reqmsg, offset);
result = mdo_lookup(info->mti_ctxt, next, name, &body->fid1);
if (result == 0) {
@@
-303,11
+277,11
@@
static int mdt_reint_internal(struct mdt_thread_info *info,
CERROR("invalid record\n");
RETURN(rc = -EINVAL);
}
-
+
/* XXX: this should be cleanup up */
rep_off = offset == MDS_REQ_INTENT_REC_OFF ? 1 : 0;
-
- /* init body for handlers by rep's body, they may want to
+
+ /* init body for handlers by rep's body, they may want to
* store there something. */
info->mti_body = lustre_msg_buf(req->rq_repmsg, rep_off,
sizeof(struct mdt_body));
@@
-351,7
+325,7
@@
static int mdt_reint(struct mdt_thread_info *info,
info->mti_rep_buf_size, NULL);
if (rc)
RETURN(rc);
-
+
/* init body by rep body, needed by handlers to return data to client */
info->mti_body = lustre_msg_buf(req->rq_repmsg, 0,
sizeof(struct mdt_body));
@@
-512,7
+486,7
@@
static struct mdt_object *mdt_obj(struct lu_object *o)
struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
struct mdt_device *d,
- struct lu_fid *f)
+
const
struct lu_fid *f)
{
struct lu_object *o;
@@
-584,23
+558,29
@@
struct mdt_handler {
__u32 mh_flags;
int (*mh_act)(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset);
+
+ const struct req_format *mh_fmt;
};
enum mdt_handler_flags {
/*
- * struct mdt_body is passed in the
0-th incoming buffer
.
+ * struct mdt_body is passed in the
incoming message
.
*/
HABEO_CORPUS = (1 << 0),
/*
- * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
- * incoming buffer.
+ * struct ldlm_request is passed in the incoming message.
*/
HABEO_CLAVIS = (1 << 1),
/*
- * object, identified by fid passed in mdt_body already exists (on
- * disk).
.
+ * object, identified by fid passed in mdt_body
,
already exists (on
+ * disk).
*/
- HABEO_DISCUS = HABEO_CORPUS | (1 << 2)
+ HABEO_DISCUS = HABEO_CORPUS | (1 << 2),
+ /*
+ * this request has fixed reply format, so that reply message can be
+ * packed by generic code.
+ */
+ HABEO_REFERO = (1 << 3)
};
struct mdt_opc_slice {
@@
-649,6
+629,58
@@
static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
}
/*
+ * Generic code handling requests that have struct mdt_body passed in:
+ *
+ * - extract mdt_body from request and save it in @info;
+ *
+ * - create lu_object, corresponding to the fid in mdt_body, and save it in
+ * @info;
+ *
+ * - if HABEO_DISCUS flag is set for this request type in addition to
+ * HABEO_CORPUS, check whether object actually exists on storage
+ * (lu_object_exists()).
+ *
+ */
+static int mdt_habeo_corpus(struct mdt_thread_info *info, __u32 flags)
+{
+ const struct mdt_body *body;
+ struct mdt_object *obj;
+ const struct lu_context *ctx;
+
+ int result;
+
+ ctx = info->mti_ctxt;
+
+ body = info->mti_body = req_capsule_client_get(&info->mti_pill,
+ &RMF_MDT_BODY);
+ if (body != NULL) {
+ if (fid_is_sane(&body->fid1)) {
+ obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
+
+ if (!IS_ERR(obj)) {
+ if ((flags & HABEO_DISCUS) == HABEO_DISCUS &&
+ !lu_object_exists(ctx,
+ &obj->mot_obj.mo_lu)) {
+ mdt_object_put(ctx, obj);
+ result = -ENOENT;
+ } else {
+ info->mti_object = obj;
+ result = 0;
+ }
+ } else
+ result = PTR_ERR(obj);
+ } else {
+ CERROR("Invalid fid: "DFID3"\n", PFID3(&body->fid1));
+ result = -EINVAL;
+ }
+ } else {
+ CERROR("Can't unpack body\n");
+ result = -EFAULT;
+ }
+ return result;
+}
+
+/*
* Invoke handler for this request opc. Also do necessary preprocessing
* (according to handler ->mh_flags), and post-processing (setting of
* ->last_{xid,committed}).
@@
-677,49
+709,38
@@
static int mdt_req_handle(struct mdt_thread_info *info,
result = 0;
flags = h->mh_flags;
- req_capsule_init(&info->mti_pill, req, RCL_SERVER);
- if (flags & HABEO_CORPUS) {
- struct mdt_body *body;
- const struct lu_context *ctx;
- struct mdt_object *obj;
-
- ctx = info->mti_ctxt;
- body = info->mti_body =
- lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
- lustre_swab_mdt_body);
- if (body != NULL) {
- obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
- if (!IS_ERR(obj)) {
- if ((flags & HABEO_DISCUS) == HABEO_DISCUS &&
- !lu_object_exists(ctx,
- &obj->mot_obj.mo_lu)) {
- mdt_object_put(ctx, obj);
- result = -ENOENT;
- } else
- info->mti_object = obj;
- } else
- result = PTR_ERR(obj);
- } else {
- CERROR("Can't unpack body\n");
- result = -EFAULT;
- }
- } else if (flags & HABEO_CLAVIS) {
+ LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO), h->mh_fmt != NULL));
+
+ req_capsule_init(&info->mti_pill,
+ req, RCL_SERVER, info->mti_rep_buf_size);
+
+ if (h->mh_fmt != NULL) {
+ req_capsule_set(&info->mti_pill, h->mh_fmt);
+ if (flags & HABEO_REFERO)
+ result = req_capsule_pack(&info->mti_pill);
+ }
+
+ if (result == 0 && flags & HABEO_CORPUS)
+ result = mdt_habeo_corpus(info, flags);
+
+ if (result == 0 && flags & HABEO_CLAVIS) {
struct ldlm_request *dlm;
LASSERT(shift == 0);
- dlm = info->mti_dlm_req =
- lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
- sizeof *dlm,
- lustre_swab_ldlm_request);
+ LASSERT(h->mh_fmt != NULL);
+
+ dlm = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
if (dlm != NULL) {
if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
result = mdt_lock_resname_compat(info->mti_mdt,
dlm);
+ info->mti_dlm_req = dlm;
} else {
CERROR("Can't unpack dlm request\n");
result = -EFAULT;
}
}
+
if (result == 0)
/*
* Process request.
@@
-736,7
+757,7
@@
static int mdt_req_handle(struct mdt_thread_info *info,
LASSERT(current->journal_info == NULL);
- if (flags & HABEO_CLAVIS &&
+ if (
result == 0 &&
flags & HABEO_CLAVIS &&
info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
struct ldlm_reply *rep;
@@
-770,11
+791,8
@@
static void mdt_thread_info_init(struct mdt_thread_info *info)
int i;
info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
- /*
- * Poison size array.
- */
for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
- info->mti_rep_buf_size[i] =
~
0;
+ info->mti_rep_buf_size[i] = 0;
info->mti_rep_buf_nr = i;
for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
mdt_lock_handle_init(&info->mti_lh[i]);
@@
-1051,6
+1069,77
@@
void intent_set_disposition(struct ldlm_reply *rep, int flag)
rep->lock_policy_res1 |= flag;
}
+enum mdt_it_code {
+ MDT_IT_OPEN,
+ MDT_IT_OCREAT,
+ MDT_IT_CREATE,
+ MDT_IT_GETATTR,
+ MDT_IT_READDIR,
+ MDT_IT_LOOKUP,
+ MDT_IT_UNLINK,
+ MDT_IT_TRUNC,
+ MDT_IT_GETXATTR,
+ MDT_IT_NR
+};
+
+static struct {
+ const struct req_format *it_fmt;
+ __u32 it_flags;
+ int (*it_act)(struct mdt_thread_info *,
+ struct ptlrpc_request *, int);
+} mdt_it_flavor[] = {
+ [MDT_IT_OPEN] = { NULL, 0, mdt_reint_internal },
+ [MDT_IT_OCREAT] = { NULL, 0, mdt_reint_internal },
+ [MDT_IT_CREATE] = { NULL, 0, NULL },
+ [MDT_IT_GETATTR] = { &RQF_LDLM_INTENT_GETATTR,
+ HABEO_DISCUS, mdt_getattr_name },
+ [MDT_IT_READDIR] = { NULL, 0, NULL },
+ [MDT_IT_LOOKUP] = { &RQF_LDLM_INTENT_GETATTR,
+ HABEO_DISCUS, mdt_getattr_name },
+ [MDT_IT_UNLINK] = { NULL, 0, NULL },
+ [MDT_IT_TRUNC] = { NULL, 0, NULL },
+ [MDT_IT_GETXATTR] = { NULL, 0, NULL }
+};
+
+static int mdt_intent_code(long itcode)
+{
+ int result;
+
+ switch(itcode) {
+ case IT_OPEN:
+ result = MDT_IT_OPEN;
+ break;
+ case IT_OPEN|IT_CREAT:
+ result = MDT_IT_OCREAT;
+ break;
+ case IT_CREAT:
+ result = MDT_IT_CREATE;
+ break;
+ case IT_READDIR:
+ result = MDT_IT_READDIR;
+ break;
+ case IT_GETATTR:
+ result = MDT_IT_GETATTR;
+ break;
+ case IT_LOOKUP:
+ result = MDT_IT_LOOKUP;
+ break;
+ case IT_UNLINK:
+ result = MDT_IT_UNLINK;
+ break;
+ case IT_TRUNC:
+ result = MDT_IT_TRUNC;
+ break;
+ case IT_GETXATTR:
+ result = MDT_IT_GETXATTR;
+ break;
+ default:
+ CERROR("Unknown intent opcode: %ld\n", itcode);
+ result = -EINVAL;
+ break;
+ }
+ return result;
+}
static int mdt_intent_policy(struct ldlm_namespace *ns,
struct ldlm_lock **lockp, void *req_cookie,
@@
-1064,99
+1153,53
@@
static int mdt_intent_policy(struct ldlm_namespace *ns,
int rc;
int gflags = 0;
struct mdt_thread_info *info;
- struct mdt_body *body;
+ struct req_capsule *pill;
+ int opcode;
ENTRY;
LASSERT(req != NULL);
info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
LASSERT(info != NULL);
-
+ pill = &info->mti_pill;
+ LASSERT(pill->rc_req == req);
if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
/* No intent was provided */
- info->mti_rep_buf_size[0] = sizeof(struct ldlm_reply);
- rc = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
- LASSERT(rc == 0);
- RETURN(0);
+ LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
+ RETURN(req_capsule_pack(pill));
}
- it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it),
-
lustre_swab_ldlm_intent
);
+ req_capsule_extend(pill, &RQF_LDLM_INTENT);
+
it = req_capsule_client_get(pill, &RMF_LDLM_INTENT
);
if (it == NULL) {
-
CERROR("Intent missing\n")
;
- RETURN(r
eq->rq_status = -EFAULT
);
+
rc = -EFAULT
;
+ RETURN(r
c
);
}
- if (it->opc == IT_GETATTR || it->opc == IT_LOOKUP) {
- body = lustre_swab_reqbuf(req, MDS_REQ_INTENT_REC_OFF,
- sizeof *body,
- lustre_swab_mdt_body);
- info->mti_body = body;
- if (body != NULL) {
- struct mdt_object *obj;
- const struct lu_context *ctx = info->mti_ctxt;
-
- obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
- if (!IS_ERR(obj)) {
- if (!lu_object_exists(ctx,
- &obj->mot_obj.mo_lu)) {
- CERROR("Object doesn't exist\n");
- mdt_object_put(ctx, obj);
- rc = -ENOENT;
- } else {
- info->mti_object = obj;
- rc = 0;
- }
- } else
- rc = PTR_ERR(obj);
- } else {
- CERROR("Can't unpack body\n");
- rc = -EFAULT;
- }
-
- //TODO: if rc then pack reply according to it
- if (rc) {
- CERROR("Cannot prepare intent info, rc=%u\n", rc);
- RETURN(req->rq_status = rc);
- }
- }
LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
- info->mti_rep_buf_nr = 3;
- info->mti_rep_buf_size[0] = sizeof(*rep);
- info->mti_rep_buf_size[1] = sizeof(struct mdt_body);
- info->mti_rep_buf_size[2] = sizeof(struct lov_mds_md);/*FIXME:See mds*/
+ opcode = mdt_intent_code(it->opc);
+ if (opcode < 0)
+ RETURN(-EINVAL);
- rc = lustre_pack_reply(req, info->mti_rep_buf_nr,
- info->mti_rep_buf_size, NULL);
- if (rc) {
- RETURN(req->rq_status = rc);
+ req_capsule_extend(pill, mdt_it_flavor[opcode].it_fmt);
+
+ if (mdt_it_flavor[opcode].it_flags & HABEO_CORPUS) {
+ rc = mdt_habeo_corpus(info, mdt_it_flavor[opcode].it_flags);
+ if (rc != 0)
+ RETURN(rc);
}
- rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
- intent_set_disposition(rep, DISP_IT_EXECD);
+ rc = req_capsule_pack(pill);
+ if (rc != 0)
+ RETURN(rc);
+ rep = req_capsule_server_get(pill, &RMF_DLM_REP);
+ intent_set_disposition(rep, DISP_IT_EXECD);
/* execute policy */
- switch ((long)it->opc) {
- case IT_OPEN:
- case IT_OPEN|IT_CREAT:
- rep->lock_policy_res2 = mdt_reint_internal(info, req, offset);
- RETURN(ELDLM_LOCK_ABORTED);
- break;
- case IT_GETATTR:
- gflags = MDS_INODELOCK_UPDATE;
- case IT_LOOKUP:
- gflags |= MDS_INODELOCK_LOOKUP;
- //hmm.. something should be done with gflags..
- rep->lock_policy_res2 = mdt_getattr_name(info, req, offset + 1);
-
- break;
- default:
- CERROR("Unhandled intent till now "LPD64"\n", it->opc);
- LBUG();
- }
+ rep->lock_policy_res2 = mdt_it_flavor[opcode].it_act(info, req, offset);
RETURN(ELDLM_OK);
}
@@
-1792,11
+1835,18
@@
static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
return l;
}
+/*
+ * context key constructor/destructor
+ */
static void *mdt_thread_init(const struct lu_context *ctx)
{
struct mdt_thread_info *info;
+ /*
+ * check that no high order allocations are incurred.
+ */
+ CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
OBD_ALLOC_PTR(info);
if (info != NULL)
info->mti_ctxt = ctx;
@@
-1867,48
+1917,61
@@
static void __exit mdt_mod_exit(void)
}
-#define DEF_HNDL(prefix, base, suffix, flags, opc, fn
)
\
+#define DEF_HNDL(prefix, base, suffix, flags, opc, fn
, fmt)
\
[prefix ## _ ## opc - prefix ## _ ## base] = { \
.mh_name = #opc, \
.mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
.mh_opc = prefix ## _ ## opc, \
.mh_flags = flags, \
- .mh_act = fn \
+ .mh_act = fn, \
+ .mh_fmt = fmt \
}
-#define DEF_MDT_HNDL(flags, name, fn) \
- DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
+#define DEF_MDT_HNDL(flags, name, fn, fmt) \
+ DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
+/*
+ * Request with a format known in advance
+ */
+#define DEF_MDT_HNDL_F(flags, name, fn) \
+ DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
+/*
+ * Request with a format we do not yet know
+ */
+#define DEF_MDT_HNDL_0(flags, name, fn) \
+ DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
static struct mdt_handler mdt_mds_ops[] = {
-
DEF_MDT_HNDL(0, CONNECT,
mdt_connect),
-
DEF_MDT_HNDL(0, DISCONNECT,
mdt_disconnect),
-
DEF_MDT_HNDL(0, GETSTATUS,
mdt_getstatus),
-
DEF_MDT_HNDL(HABEO_DISCUS, GETATTR,
mdt_getattr),
-
DEF_MDT_HNDL(HABEO_DISCUS, GETATTR_NAME,
mdt_getattr_name),
-
DEF_MDT_HNDL(HABEO_DISCUS, SETXATTR,
mdt_setxattr),
-
DEF_MDT_HNDL(HABEO_DISCUS, GETXATTR,
mdt_getxattr),
-
DEF_MDT_HNDL(0, STATFS,
mdt_statfs),
-
DEF_MDT_HNDL(HABEO_DISCUS, READPAGE,
mdt_readpage),
-
DEF_MDT_HNDL(0, REINT,
mdt_reint),
-
DEF_MDT_HNDL(HABEO_DISCUS, CLOSE,
mdt_close),
-
DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,
mdt_done_writing),
-
DEF_MDT_HNDL(0, PIN,
mdt_pin),
-
DEF_MDT_HNDL(HABEO_DISCUS, SYNC,
mdt_sync),
-
DEF_MDT_HNDL(0, QUOTACHECK,
mdt_handle_quotacheck),
-
DEF_MDT_HNDL(0, QUOTACTL,
mdt_handle_quotactl)
+
DEF_MDT_HNDL_F(0, CONNECT,
mdt_connect),
+
DEF_MDT_HNDL_F(0, DISCONNECT,
mdt_disconnect),
+
DEF_MDT_HNDL_F(0 |HABEO_REFERO, GETSTATUS,
mdt_getstatus),
+
DEF_MDT_HNDL_F(HABEO_DISCUS|HABEO_REFERO, GETATTR,
mdt_getattr),
+
DEF_MDT_HNDL_F(HABEO_DISCUS|HABEO_REFERO, GETATTR_NAME,
mdt_getattr_name),
+
DEF_MDT_HNDL_0(HABEO_DISCUS, SETXATTR,
mdt_setxattr),
+
DEF_MDT_HNDL_0(HABEO_DISCUS, GETXATTR,
mdt_getxattr),
+
DEF_MDT_HNDL_F(0 |HABEO_REFERO, STATFS,
mdt_statfs),
+
DEF_MDT_HNDL_0(HABEO_DISCUS, READPAGE,
mdt_readpage),
+
DEF_MDT_HNDL_0(0, REINT,
mdt_reint),
+
DEF_MDT_HNDL_0(HABEO_DISCUS, CLOSE,
mdt_close),
+
DEF_MDT_HNDL_0(HABEO_CORPUS, DONE_WRITING,
mdt_done_writing),
+
DEF_MDT_HNDL_0(0, PIN,
mdt_pin),
+
DEF_MDT_HNDL_0(HABEO_DISCUS, SYNC,
mdt_sync),
+
DEF_MDT_HNDL_0(0, QUOTACHECK,
mdt_handle_quotacheck),
+
DEF_MDT_HNDL_0(0, QUOTACTL,
mdt_handle_quotactl)
};
static struct mdt_handler mdt_obd_ops[] = {
};
-#define DEF_DLM_HNDL(flags, name, fn) \
- DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
+#define DEF_DLM_HNDL_0(flags, name, fn) \
+ DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
+#define DEF_DLM_HNDL_F(flags, name, fn) \
+ DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
static struct mdt_handler mdt_dlm_ops[] = {
- DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
- DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT, mdt_convert),
- DEF_DLM_HNDL(0, BL_CALLBACK, mdt_bl_callback),
- DEF_DLM_HNDL(0, CP_CALLBACK, mdt_cp_callback)
+ DEF_DLM_HNDL
_F
(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
+ DEF_DLM_HNDL
_0
(HABEO_CLAVIS, CONVERT, mdt_convert),
+ DEF_DLM_HNDL
_0
(0, BL_CALLBACK, mdt_bl_callback),
+ DEF_DLM_HNDL
_0
(0, CP_CALLBACK, mdt_cp_callback)
};
static struct mdt_handler mdt_llog_ops[] = {
diff --git
a/lustre/mdt/mdt_internal.h
b/lustre/mdt/mdt_internal.h
index
c6820cb
..
aff0ccc
100644
(file)
--- a/
lustre/mdt/mdt_internal.h
+++ b/
lustre/mdt/mdt_internal.h
@@
-137,49
+137,57
@@
struct mdt_reint_record {
* reduce stack consumption.
*/
struct mdt_thread_info {
- const struct lu_context *mti_ctxt;
- struct mdt_device *mti_mdt;
+ const struct lu_context
*mti_ctxt;
+ struct mdt_device
*mti_mdt;
/*
* number of buffers in reply message.
*/
- int mti_rep_buf_nr;
+ int
mti_rep_buf_nr;
/*
* sizes of reply buffers.
*/
- int mti_rep_buf_size[MDT_REP_BUF_NR_MAX];
+ int
mti_rep_buf_size[MDT_REP_BUF_NR_MAX];
/*
* Body for "habeo corpus" operations.
*/
-
struct mdt_body
*mti_body;
+
const struct mdt_body
*mti_body;
/*
* Lock request for "habeo clavis" operations.
*/
-
struct ldlm_request
*mti_dlm_req;
+
const struct ldlm_request
*mti_dlm_req;
/*
* Host object. This is released at the end of mdt_handler().
*/
- struct mdt_object *mti_object;
+ struct mdt_object
*mti_object;
/*
* Object attributes.
*/
- struct lu_attr mti_attr;
+ struct lu_attr
mti_attr;
/*
* reint record. Containing information for reint operations.
*/
- struct mdt_reint_record mti_rr;
+ struct mdt_reint_record
mti_rr;
/*
* Additional fail id that can be set by handler. Passed to
* target_send_reply().
*/
- int mti_fail_id;
+ int
mti_fail_id;
/*
* A couple of lock handles.
*/
- struct mdt_lock_handle mti_lh[MDT_LH_NR];
+ struct mdt_lock_handle
mti_lh[MDT_LH_NR];
/*
* for req-layout interface.
*/
- struct req_capsule mti_pill;
+ struct req_capsule mti_pill;
+ /*
+ * buffer for mdt_statfs().
+ *
+ * XXX this is probably huge overkill, because statfs is not that
+ * frequent.
+ */
+ struct kstatfs mti_sfs;
+
};
int fid_lock(struct ldlm_namespace *, const struct lu_fid *,
@@
-190,7
+198,7
@@
void fid_unlock(struct ldlm_namespace *, const struct lu_fid *,
struct lustre_handle *, ldlm_mode_t);
struct mdt_object *mdt_object_find(const struct lu_context *,
- struct mdt_device *, struct lu_fid *);
+ struct mdt_device *,
const
struct lu_fid *);
void mdt_object_put(const struct lu_context *ctxt, struct mdt_object *);
int mdt_object_lock(struct ldlm_namespace *, struct mdt_object *,
diff --git
a/lustre/mdt/mdt_reint.c
b/lustre/mdt/mdt_reint.c
index
6640657
..
1524ee3
100644
(file)
--- a/
lustre/mdt/mdt_reint.c
+++ b/
lustre/mdt/mdt_reint.c
@@
-130,7
+130,7
@@
static int mdt_md_mkobj(struct mdt_thread_info *info)
mdt_object_put(info->mti_ctxt, o);
} else
result = PTR_ERR(o);
-
+
RETURN(result);
}
@@
-157,11
+157,12
@@
static int mdt_reint_create(struct mdt_thread_info *info)
rc = mdt_md_mkdir(info);
else
rc = mdt_md_mkobj(info);
-
- /* return fid to client. mti_body should point to
+
+ /* return fid to client. mti_body should point to
* rep's body. */
- info->mti_body->fid1 = *info->mti_rr.rr_fid2;
- info->mti_body->valid |= OBD_MD_FLID;
+ /* XXX these fields are not used by the callers. */
+ /* info->mti_body->fid1 = *info->mti_rr.rr_fid2; */
+ /* info->mti_body->valid |= OBD_MD_FLID; */
break;
}
case S_IFLNK:{
@@
-226,9
+227,9
@@
static int mdt_reint_open(struct mdt_thread_info *info)
if (result && result != -ENOENT) {
GOTO(out_parent, result);
}
-
+
child = mdt_object_find(info->mti_ctxt, mdt, info->mti_rr.rr_fid2);
- if (IS_ERR(child))
+ if (IS_ERR(child))
GOTO(out_parent, PTR_ERR(child));
if (info->mti_rr.rr_flags & MDS_OPEN_CREAT) {
@@
-243,10
+244,10
@@
static int mdt_reint_open(struct mdt_thread_info *info)
result = -EEXIST;
}
}
-
+
if (result == 0)
result = mdt_md_open(info, child);
-
+
mdt_object_put(info->mti_ctxt, child);
out_parent:
diff --git
a/lustre/ptlrpc/layout.c
b/lustre/ptlrpc/layout.c
index
1a65430
..
06ee214
100644
(file)
--- a/
lustre/ptlrpc/layout.c
+++ b/
lustre/ptlrpc/layout.c
@@
-69,12
+69,54
@@
static const struct req_msg_field *mds_reint_create_client[] = {
&RMF_NAME
};
+static const struct req_msg_field *mds_connect_client[] = {
+ &RMF_TGTUUID,
+ &RMF_CLUUID,
+ &RMF_CONN,
+ &RMF_CONNECT_DATA
+};
+
+static const struct req_msg_field *mds_connect_server[] = {
+ &RMF_CONNECT_DATA
+};
+
+static const struct req_msg_field *ldlm_enqueue_client[] = {
+ &RMF_DLM_REQ
+};
+
+static const struct req_msg_field *ldlm_enqueue_server[] = {
+ &RMF_DLM_REP
+};
+
+static const struct req_msg_field *ldlm_intent_client[] = {
+ &RMF_DLM_REQ,
+ &RMF_LDLM_INTENT
+};
+
+static const struct req_msg_field *ldlm_intent_server[] = {
+ &RMF_DLM_REP,
+ &RMF_MDT_BODY,
+ &RMF_MDT_MD
+};
+
+static const struct req_msg_field *ldlm_intent_getattr_client[] = {
+ &RMF_DLM_REQ,
+ &RMF_LDLM_INTENT,
+ &RMF_MDT_BODY, /* coincides with mds_getattr_name_client[] */
+ &RMF_NAME
+};
+
static const struct req_format *req_formats[] = {
+ &RQF_MDS_CONNECT,
+ &RQF_MDS_DISCONNECT,
&RQF_MDS_GETSTATUS,
&RQF_MDS_STATFS,
&RQF_MDS_GETATTR,
&RQF_MDS_GETATTR_NAME,
- &RQF_MDS_REINT_CREATE
+ &RQF_MDS_REINT_CREATE,
+ &RQF_LDLM_ENQUEUE,
+ &RQF_LDLM_INTENT,
+ &RQF_LDLM_INTENT_GETATTR
};
struct req_msg_field {
@@
-120,6
+162,48
@@
const struct req_msg_field RMF_REC_CREATE =
sizeof(struct mdt_rec_create), lustre_swab_mdt_rec_create);
EXPORT_SYMBOL(RMF_REC_CREATE);
+const struct req_msg_field RMF_TGTUUID =
+ DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL);
+EXPORT_SYMBOL(RMF_TGTUUID);
+
+const struct req_msg_field RMF_CLUUID =
+ DEFINE_MSGF("cluuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL);
+EXPORT_SYMBOL(RMF_CLUUID);
+
+/*
+ * connection handle received in MDS_CONNECT request.
+ *
+ * XXX no swabbing?
+ */
+const struct req_msg_field RMF_CONN =
+ DEFINE_MSGF("conn", 0, sizeof(struct lustre_handle), NULL);
+EXPORT_SYMBOL(RMF_CONN);
+
+const struct req_msg_field RMF_CONNECT_DATA =
+ DEFINE_MSGF("cdata", 0,
+ sizeof(struct obd_connect_data), lustre_swab_connect);
+EXPORT_SYMBOL(RMF_CONNECT_DATA);
+
+const struct req_msg_field RMF_DLM_REQ =
+ DEFINE_MSGF("dlm_req", 0,
+ sizeof(struct ldlm_request), lustre_swab_ldlm_request);
+EXPORT_SYMBOL(RMF_DLM_REQ);
+
+const struct req_msg_field RMF_DLM_REP =
+ DEFINE_MSGF("dlm_rep", 0,
+ sizeof(struct ldlm_reply), lustre_swab_ldlm_reply);
+EXPORT_SYMBOL(RMF_DLM_REP);
+
+const struct req_msg_field RMF_LDLM_INTENT =
+ DEFINE_MSGF("ldlm_intent", 0,
+ sizeof(struct ldlm_intent), lustre_swab_ldlm_intent);
+EXPORT_SYMBOL(RMF_LDLM_INTENT);
+
+const struct req_msg_field RMF_MDT_MD =
+ DEFINE_MSGF("mdt_md",
+ 0, sizeof(struct lov_mds_md) /* FIXME: See mds */, NULL);
+EXPORT_SYMBOL(RMF_MDT_MD);
+
/*
* Request formats.
*/
@@
-172,6
+256,30
@@
const struct req_format RQF_MDS_REINT_CREATE =
mds_reint_create_client, mdt_body_only);
EXPORT_SYMBOL(RQF_MDS_REINT_CREATE);
+const struct req_format RQF_MDS_CONNECT =
+ DEFINE_REQ_FMT0("MDS_CONNECT",
+ mds_connect_client, mds_connect_server);
+EXPORT_SYMBOL(RQF_MDS_CONNECT);
+
+const struct req_format RQF_MDS_DISCONNECT =
+ DEFINE_REQ_FMT0("MDS_DISCONNECT", empty, empty);
+EXPORT_SYMBOL(RQF_MDS_DISCONNECT);
+
+const struct req_format RQF_LDLM_ENQUEUE =
+ DEFINE_REQ_FMT0("LDLM_ENQUEUE",
+ ldlm_enqueue_client, ldlm_enqueue_server);
+EXPORT_SYMBOL(RQF_LDLM_ENQUEUE);
+
+const struct req_format RQF_LDLM_INTENT =
+ DEFINE_REQ_FMT0("LDLM_INTENT",
+ ldlm_intent_client, ldlm_intent_server);
+EXPORT_SYMBOL(RQF_LDLM_INTENT);
+
+const struct req_format RQF_LDLM_INTENT_GETATTR =
+ DEFINE_REQ_FMT0("LDLM_INTENT",
+ ldlm_intent_getattr_client, ldlm_intent_server);
+EXPORT_SYMBOL(RQF_LDLM_INTENT_GETATTR);
+
int req_layout_init(void)
{
int i;
@@
-207,13
+315,15
@@
void req_layout_fini(void)
EXPORT_SYMBOL(req_layout_fini);
void req_capsule_init(struct req_capsule *pill,
- struct ptlrpc_request *req, enum req_location location)
+ struct ptlrpc_request *req, enum req_location location,
+ int *area)
{
LASSERT(location == RCL_SERVER || location == RCL_CLIENT);
memset(pill, 0, sizeof *pill);
pill->rc_req = req;
pill->rc_loc = location;
+ pill->rc_area = area;
}
EXPORT_SYMBOL(req_capsule_init);
@@
-229,54
+339,61
@@
static int __req_format_is_sane(const struct req_format *fmt)
req_formats[fmt->rf_idx] == fmt;
}
-static void __req_capsule_init(struct req_capsule *pill,
- const struct req_format *fmt,
- enum req_location loc)
+void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt)
{
- LASSERT(0 <= loc && loc < ARRAY_SIZE(pill->rc_fmt));
- LASSERT(pill->rc_fmt[loc] == NULL);
- LASSERT(ergo(loc != pill->rc_loc, pill->rc_swabbed == 0));
+ LASSERT(pill->rc_fmt == NULL);
LASSERT(__req_format_is_sane(fmt));
- pill->rc_fmt[loc] = fmt;
-}
-
-void req_capsule_server_init(struct req_capsule *pill,
- const struct req_format *fmt)
-{
- __req_capsule_init(pill, fmt, RCL_SERVER);
+ pill->rc_fmt = fmt;
}
-EXPORT_SYMBOL(req_capsule_se
rver_ini
t);
+EXPORT_SYMBOL(req_capsule_set);
-void req_capsule_client_init(struct req_capsule *pill,
- const struct req_format *fmt)
-{
- __req_capsule_init(pill, fmt, RCL_CLIENT);
-}
-EXPORT_SYMBOL(req_capsule_client_init);
-
-int req_capsule_start(struct req_capsule *pill,
- const struct req_format *fmt, int *area)
+int req_capsule_pack(struct req_capsule *pill)
{
int i;
int nr;
int result;
+ int total;
+
+ const struct req_format *fmt;
LASSERT(pill->rc_loc == RCL_SERVER);
+ fmt = pill->rc_fmt;
+ LASSERT(fmt != NULL);
nr = fmt->rf_fields[RCL_SERVER].nr;
- for (i = 0; i < nr; ++i)
- area[i] = fmt->rf_fields[RCL_SERVER].d[i]->rmf_size;
- req_capsule_server_init(pill, fmt);
- result = lustre_pack_reply(pill->rc_req, nr, area, NULL);
+ for (total = 0, i = 0; i < nr; ++i) {
+ int *size;
+
+ size = &pill->rc_area[i];
+ if (*size == 0) {
+ *size = fmt->rf_fields[RCL_SERVER].d[i]->rmf_size;
+ LASSERT(*size != 0);
+ }
+ total += *size;
+ }
+ result = lustre_pack_reply(pill->rc_req, nr, pill->rc_area, NULL);
if (result != 0) {
DEBUG_REQ(D_ERROR, pill->rc_req,
- "
Failed to pack %d fields
in format `%s': ",
- nr, fmt->rf_name);
+ "
Cannot pack %d fields (%d bytes)
in format `%s': ",
+ nr,
total,
fmt->rf_name);
}
return result;
}
-EXPORT_SYMBOL(req_capsule_start);
+EXPORT_SYMBOL(req_capsule_pack);
+
+static int __req_capsule_offset(const struct req_capsule *pill,
+ const struct req_msg_field *field,
+ enum req_location loc)
+{
+ int offset;
+
+ offset = field->rmf_offset[pill->rc_fmt->rf_idx][loc];
+ LASSERT(offset > 0);
+ offset --;
+ LASSERT(0 <= offset && offset < (sizeof(pill->rc_swabbed) << 3));
+ return offset;
+}
static void *__req_capsule_get(const struct req_capsule *pill,
const struct req_msg_field *field,
@@
-295,17
+412,11
@@
static void *__req_capsule_get(const struct req_capsule *pill,
[RCL_SERVER] = "server"
};
- LASSERT(0 <= loc && loc < ARRAY_SIZE(pill->rc_fmt));
- CLASSERT(ARRAY_SIZE(pill->rc_fmt) == ARRAY_SIZE(field->rmf_offset[0]));
-
- fmt = pill->rc_fmt[loc];
+ fmt = pill->rc_fmt;
LASSERT(fmt != NULL);
LASSERT(__req_format_is_sane(fmt));
- offset = field->rmf_offset[fmt->rf_idx][loc];
- LASSERT(offset > 0);
- -- offset;
- LASSERT(0 <= offset && offset < (sizeof(pill->rc_swabbed) << 3));
+ offset = __req_capsule_offset(pill, field, loc);
req = pill->rc_req;
msg = loc == RCL_CLIENT ? req->rq_reqmsg : req->rq_repmsg;
@@
-329,10
+440,10
@@
static void *__req_capsule_get(const struct req_capsule *pill,
return value;
}
-
const
void *req_capsule_client_get(const struct req_capsule *pill,
-
const struct req_msg_field *field)
+void *req_capsule_client_get(const struct req_capsule *pill,
+ const struct req_msg_field *field)
{
- return
(const void *)
__req_capsule_get(pill, field, RCL_CLIENT);
+ return __req_capsule_get(pill, field, RCL_CLIENT);
}
EXPORT_SYMBOL(req_capsule_client_get);
@@
-343,3
+454,47
@@
void *req_capsule_server_get(const struct req_capsule *pill,
}
EXPORT_SYMBOL(req_capsule_server_get);
+const void *req_capsule_other_get(const struct req_capsule *pill,
+ const struct req_msg_field *field)
+{
+ return __req_capsule_get(pill, field, pill->rc_loc ^ 1);
+}
+EXPORT_SYMBOL(req_capsule_other_get);
+
+void req_capsule_set_size(const struct req_capsule *pill,
+ const struct req_msg_field *field,
+ enum req_location loc, int size)
+{
+ pill->rc_area[__req_capsule_offset(pill, field, loc)] = size;
+}
+EXPORT_SYMBOL(req_capsule_set_size);
+
+void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
+{
+ int i;
+ int j;
+
+ const struct req_format *old;
+
+ LASSERT(pill->rc_fmt != NULL);
+ LASSERT(__req_format_is_sane(fmt));
+
+ old = pill->rc_fmt;
+ /*
+ * Sanity checking...
+ */
+ for (i = 0; i < RCL_NR; ++i) {
+ LASSERT(fmt->rf_fields[i].nr >= old->rf_fields[i].nr);
+ for (j = 0; j < old->rf_fields[i].nr - 1; ++j) {
+ LASSERT(fmt->rf_fields[i].d[j]->rmf_size ==
+ old->rf_fields[i].d[j]->rmf_size);
+ }
+ /*
+ * Last field in old format can be shorter than in new.
+ */
+ LASSERT(fmt->rf_fields[i].d[j]->rmf_size >=
+ old->rf_fields[i].d[j]->rmf_size);
+ }
+ pill->rc_fmt = fmt;
+}
+EXPORT_SYMBOL(req_capsule_extend);