Whamcloud - gitweb
Land b1_4_merge_ptlrpc onto b_release_1_4_6 (20051119_2035)
authorlsy <lsy>
Sat, 19 Nov 2005 15:32:38 +0000 (15:32 +0000)
committerlsy <lsy>
Sat, 19 Nov 2005 15:32:38 +0000 (15:32 +0000)
39 files changed:
lustre/include/linux/lustre_fsfilt.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_log.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/llite/dir.c
lustre/llite/namei.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_lvfs.c
lustre/obdclass/llog_test.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/import.c
lustre/ptlrpc/llog_client.c
lustre/ptlrpc/llog_net.c
lustre/ptlrpc/llog_server.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/ptlrpcd.c
lustre/ptlrpc/recov_thread.c
lustre/ptlrpc/recover.c
lustre/quota/quota_check.c
lustre/quota/quota_context.c
lustre/quota/quota_ctl.c

index ebb798b..a5e1d4f 100644 (file)
@@ -113,6 +113,7 @@ extern void fsfilt_put_ops(struct fsfilt_operations *fs_ops);
 #define FSFILT_OP_SETATTR        8
 #define FSFILT_OP_LINK           9
 #define FSFILT_OP_CANCEL_UNLINK 10
+#define FSFILT_OP_NOOP          15
 
 #define fsfilt_check_slow(start, timeout, msg)                          \
 do {                                                                    \
index 596a682..56760fa 100644 (file)
@@ -503,6 +503,14 @@ extern void lustre_swab_ost_lvb(struct ost_lvb *);
  *   MDS REQ RECORDS
  */
 
+/* FIXME: this is different from HEAD, adjust it
+ * while merge GSS */
+#define MDS_REQ_REC_OFF                 0
+
+#define MDS_REQ_INTENT_LOCKREQ_OFF      0
+#define MDS_REQ_INTENT_IT_OFF           1
+#define MDS_REQ_INTENT_REC_OFF          2
+
 /* opcodes */
 typedef enum {
         MDS_GETATTR      = 33,
@@ -1143,6 +1151,7 @@ enum llogd_rpc_ops {
         LLOG_ORIGIN_HANDLE_CLOSE        = 505,
         LLOG_ORIGIN_CONNECT             = 506,
         LLOG_CATINFO                    = 507,  /* for lfs catinfo */
+        LLOG_ORIGIN_HANDLE_PREV_BLOCK   = 508,
 };
 
 struct llogd_body {
index d53a259..54e791f 100644 (file)
@@ -77,6 +77,8 @@ int llog_init_handle(struct llog_handle *handle, int flags,
 extern void llog_free_handle(struct llog_handle *handle);
 int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
                  void *data, void *catdata);
+int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
+                         void *data, void *catdata);
 extern int llog_cancel_rec(struct llog_handle *loghandle, int index);
 extern int llog_close(struct llog_handle *cathandle);
 
@@ -98,6 +100,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
                             struct llog_cookie *cookies);
 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
+int llog_cat_reverse_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index);
 
 /* llog_obd.c */
@@ -155,6 +158,8 @@ struct llog_operations {
         int (*lop_destroy)(struct llog_handle *handle);
         int (*lop_next_block)(struct llog_handle *h, int *curr_idx,
                               int next_idx, __u64 *offset, void *buf, int len);
+        int (*lop_prev_block)(struct llog_handle *h,
+                              int prev_idx, void *buf, int len);
         int (*lop_create)(struct llog_ctxt *ctxt, struct llog_handle **,
                           struct llog_logid *logid, char *name);
         int (*lop_close)(struct llog_handle *handle);
@@ -352,6 +357,23 @@ static inline int llog_next_block(struct llog_handle *loghandle, int *cur_idx,
         RETURN(rc);
 }
 
+static inline int llog_prev_block(struct llog_handle *loghandle,
+                                  int prev_idx, void *buf, int len)
+{
+        struct llog_operations *lop;
+        int rc;
+        ENTRY;
+
+        rc = llog_handle2ops(loghandle, &lop);
+        if (rc)
+                RETURN(rc);
+        if (lop->lop_prev_block == NULL)
+                RETURN(-EOPNOTSUPP);
+
+        rc = lop->lop_prev_block(loghandle, prev_idx, buf, len);
+        RETURN(rc);
+}
+
 static inline int llog_create(struct llog_ctxt *ctxt, struct llog_handle **res,
                               struct llog_logid *logid, char *name)
 {
index cc06517..9cdae9a 100644 (file)
@@ -189,7 +189,7 @@ int mdc_getattr(struct obd_export *exp, struct ll_fid *fid,
                 obd_valid valid, unsigned int ea_size,
                 struct ptlrpc_request **request);
 int mdc_getattr_name(struct obd_export *exp, struct ll_fid *fid,
-                     char *filename, int namelen, unsigned long valid,
+                     const char *filename, int namelen, unsigned long valid,
                      unsigned int ea_size, struct ptlrpc_request **request);
 int mdc_setattr(struct obd_export *exp, struct mdc_op_data *data,
                 struct iattr *iattr, void *ea, int ealen, void *ea2, int ea2len,
index 00347ab..bb31f68 100644 (file)
@@ -654,11 +654,10 @@ void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
 void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
 struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int, int,
                                                 void (*populate_pool)(struct ptlrpc_request_pool *, int));
-struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
+struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode,
                                        int count, int *lengths, char **bufs);
-struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, int opcode,
-                                            int count, int *lengths,
-                                            char **bufs,
+struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
+                                            int count, int *lengths, char **bufs,
                                             struct ptlrpc_request_pool *pool);
 void ptlrpc_free_req(struct ptlrpc_request *request);
 void ptlrpc_req_finished(struct ptlrpc_request *request);
@@ -716,6 +715,7 @@ int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
 
 /* ptlrpc/pack_generic.c */
 int lustre_msg_swabbed(struct lustre_msg *msg);
+int lustre_msg_check_version(struct lustre_msg *msg, __u32 version);
 int lustre_pack_request(struct ptlrpc_request *, int count, int *lens,
                         char **bufs);
 int lustre_pack_reply(struct ptlrpc_request *, int count, int *lens,
@@ -781,6 +781,7 @@ static inline void ptlrpc_lprocfs_unregister_obd(struct obd_device *obd) {}
 
 /* ptlrpc/llog_server.c */
 int llog_origin_handle_create(struct ptlrpc_request *req);
+int llog_origin_handle_prev_block(struct ptlrpc_request *req);
 int llog_origin_handle_next_block(struct ptlrpc_request *req);
 int llog_origin_handle_read_header(struct ptlrpc_request *req);
 int llog_origin_handle_close(struct ptlrpc_request *req);
index c00fa37..770c91b 100644 (file)
@@ -463,7 +463,8 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 instant_cancel = 1;
 
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
-                              LDLM_BL_CALLBACK, 1, &size, NULL);
+                              LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK,
+                              1, &size, NULL);
         if (req == NULL) {
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 RETURN(-ENOMEM);
@@ -533,7 +534,8 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         up(&lock->l_resource->lr_lvb_sem);
 
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
-                              LDLM_CP_CALLBACK, buffers, size, NULL);
+                              LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK,
+                              buffers, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -590,7 +592,8 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         LASSERT(lock != NULL);
 
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
-                              LDLM_GL_CALLBACK, 1, &size, NULL);
+                              LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK,
+                              1, &size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -657,7 +660,8 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
 
         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
 
-        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+        dlm_req = lustre_swab_reqbuf (req, MDS_REQ_INTENT_LOCKREQ_OFF,
+                                      sizeof (*dlm_req),
                                       lustre_swab_ldlm_request);
         if (dlm_req == NULL) {
                 CERROR ("Can't unpack dlm_req\n");
index 90eed82..1629314 100644 (file)
@@ -324,7 +324,7 @@ int ldlm_cli_enqueue(struct obd_export *exp,
         struct ldlm_lock *lock;
         struct ldlm_request *body;
         struct ldlm_reply *reply;
-        int rc, size[2] = {sizeof(*body), lvb_len}, req_passed_in = 1;
+        int rc, size[] = {sizeof(*body), lvb_len}, req_passed_in = 1;
         int is_replay = *flags & LDLM_FL_REPLAY;
         int cleanup_phase = 0;
         ENTRY;
@@ -365,16 +365,22 @@ int ldlm_cli_enqueue(struct obd_export *exp,
         cleanup_phase = 2;
 
         if (req == NULL) {
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
-                                      size, NULL);
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+                                      LDLM_ENQUEUE, 1, size, NULL);
                 if (req == NULL)
                         GOTO(cleanup, rc = -ENOMEM);
                 req_passed_in = 0;
-        } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
-                LBUG();
+        } else {
+                LASSERTF(req->rq_reqmsg->buflens[MDS_REQ_INTENT_LOCKREQ_OFF] ==
+                         sizeof(*body), "buflen[%d] = %d, not %d\n",
+                         MDS_REQ_INTENT_LOCKREQ_OFF,
+                         req->rq_reqmsg->buflens[MDS_REQ_INTENT_LOCKREQ_OFF],
+                         sizeof(*body));
+        }
 
         /* Dump lock data into the request buffer */
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+        body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_INTENT_LOCKREQ_OFF,
+                              sizeof(*body));
         ldlm_lock2desc(lock, &body->lock_desc);
         body->lock_flags = *flags;
 
@@ -382,11 +388,8 @@ int ldlm_cli_enqueue(struct obd_export *exp,
 
         /* Continue as normal. */
         if (!req_passed_in) {
-                int buffers = 1;
-                if (lvb_len > 0)
-                        buffers = 2;
                 size[0] = sizeof(*reply);
-                req->rq_replen = lustre_msg_size(buffers, size);
+                req->rq_replen = lustre_msg_size(1 + (lvb_len > 0), size);
         }
         lock->l_conn_export = exp;
         lock->l_export = NULL;
@@ -586,7 +589,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
         LDLM_DEBUG(lock, "client-side convert");
 
         req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export),
-                              LDLM_CONVERT, 1, &size, NULL);
+                              LUSTRE_DLM_VERSION, LDLM_CONVERT, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -675,7 +678,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                         goto local_cancel;
                 }
 
-                req = ptlrpc_prep_req(imp, LDLM_CANCEL, 1, &size, NULL);
+                req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL,
+                                      1, &size, NULL);
                 if (!req)
                         GOTO(out, rc = -ENOMEM);
                 req->rq_no_resend = 1;
@@ -1141,7 +1145,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
                 flags = LDLM_FL_REPLAY;
 
         size[0] = sizeof(*body);
-        req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, size, NULL);
+        req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
+                              1, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
index 48838dd..6af30ec 100644 (file)
@@ -643,7 +643,8 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file,
                         bufs[1] = NULL;
                 }
                 size = data->ioc_plen1;
-                req = ptlrpc_prep_req(sbi2mdc(sbi)->cl_import, LLOG_CATINFO,
+                req = ptlrpc_prep_req(sbi2mdc(sbi)->cl_import,
+                                      LUSTRE_LOG_VERSION, LLOG_CATINFO,
                                       2, lens, bufs);
                 if (!req)
                         GOTO(out_catinfo, rc = -ENOMEM);
index 9b440dc..aaac23a 100644 (file)
@@ -514,7 +514,7 @@ static int ll_create_it(struct inode *dir, struct dentry *dentry, int mode,
         if (rc)
                 RETURN(rc);
 
-        mdc_store_inode_generation(request, 2, 1);
+        mdc_store_inode_generation(request, MDS_REQ_INTENT_REC_OFF, 1);
         inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
                                NULL, 0, mode, 0, it);
         if (IS_ERR(inode)) {
index 0d05129..c15566b 100644 (file)
@@ -1,11 +1,12 @@
 #include <linux/lustre_mds.h>
-void mdc_pack_req_body(struct ptlrpc_request *);
+void mdc_pack_req_body(struct ptlrpc_request *req, int offset,
+                       __u64 valid, struct ll_fid *fid, int ea_size);
 void mdc_pack_rep_body(struct ptlrpc_request *);
-void mdc_readdir_pack(struct ptlrpc_request *req, __u64 offset, __u32 size,
-                      struct ll_fid *mdc_fid);
+void mdc_readdir_pack(struct ptlrpc_request *req, int pos, __u64 offset,
+                     __u32 size, struct ll_fid *mdc_fid);
 void mdc_getattr_pack(struct ptlrpc_request *req, int valid, int offset,
                       int flags, struct mdc_op_data *data);
-void mdc_setattr_pack(struct ptlrpc_request *req,
+void mdc_setattr_pack(struct ptlrpc_request *req, int offset,
                       struct mdc_op_data *data,
                       struct iattr *iattr, void *ea, int ealen,
                      void *ea2, int ea2len);
index b17b798..928d48c 100644 (file)
 #endif
 #endif
 
-void mdc_readdir_pack(struct ptlrpc_request *req, __u64 offset, __u32 size,
-                      struct ll_fid *mdc_fid)
+void mdc_readdir_pack(struct ptlrpc_request *req, int pos, __u64 offset,
+                      __u32 size, struct ll_fid *mdc_fid)
 {
         struct mds_body *b;
 
-        b = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*b));
+        b = lustre_msg_buf(req->rq_reqmsg, pos, sizeof (*b));
         b->fsuid = current->fsuid;
         b->fsgid = current->fsgid;
         b->capability = current->cap_effective;
@@ -63,9 +63,15 @@ static void mdc_pack_body(struct mds_body *b)
         b->capability = current->cap_effective;
 }
 
-void mdc_pack_req_body(struct ptlrpc_request *req)
+void mdc_pack_req_body(struct ptlrpc_request *req, int offset,
+                       __u64 valid, struct ll_fid *fid, int ea_size)
 {
-        struct mds_body *b = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*b));
+        struct mds_body *b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b));
+
+        if (fid)
+                b->fid1 = *fid;
+        b->valid = valid;
+        b->eadatasize = ea_size;
         mdc_pack_body(b);
 }
 
@@ -150,11 +156,11 @@ void mdc_open_pack(struct ptlrpc_request *req, int offset,
         }
 }
 
-void mdc_setattr_pack(struct ptlrpc_request *req, struct mdc_op_data *data,
-                      struct iattr *iattr, void *ea, int ealen,
-                      void *ea2, int ea2len)
+void mdc_setattr_pack(struct ptlrpc_request *req, int offset,
+                      struct mdc_op_data *data, struct iattr *iattr,
+                      void *ea, int ealen, void *ea2, int ea2len)
 {
-        struct mds_rec_setattr *rec = lustre_msg_buf(req->rq_reqmsg, 0,
+        struct mds_rec_setattr *rec = lustre_msg_buf(req->rq_reqmsg, offset,
                                                      sizeof (*rec));
         rec->sa_opcode = REINT_SETATTR;
         rec->sa_fsuid = current->fsuid;
@@ -182,12 +188,12 @@ void mdc_setattr_pack(struct ptlrpc_request *req, struct mdc_op_data *data,
         if (ealen == 0)
                 return;
 
-        memcpy(lustre_msg_buf(req->rq_reqmsg, 1, ealen), ea, ealen);
+        memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 1, ealen), ea, ealen);
 
         if (ea2len == 0)
                 return;
 
-        memcpy(lustre_msg_buf(req->rq_reqmsg, 2, ea2len), ea2, ea2len);
+        memcpy(lustre_msg_buf(req->rq_reqmsg, offset + 2, ea2len), ea2, ea2len);
 }
 
 void mdc_unlink_pack(struct ptlrpc_request *req, int offset,
@@ -265,7 +271,7 @@ void mdc_rename_pack(struct ptlrpc_request *req, int offset,
         }
 }
 
-void mdc_getattr_pack(struct ptlrpc_request *req, int valid, int offset,
+void mdc_getattr_pack(struct ptlrpc_request *req, int offset, int valid,
                       int flags, struct mdc_op_data *data)
 {
         struct mds_body *b;
index 08f0fd9..bf6684a 100644 (file)
@@ -242,16 +242,18 @@ int mdc_enqueue(struct obd_export *exp,
         struct ldlm_res_id res_id =
                 { .name = {data->fid1.id, data->fid1.generation} };
         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
-        int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
-        int rc, flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
-        int repbufcnt = 3, repsize[4] = {sizeof(struct ldlm_reply),
-                                         sizeof(struct mds_body),
-                                         obddev->u.cli.cl_max_mds_easize};
-        struct ldlm_reply *dlm_rep;
-        struct ldlm_intent *lit;
         struct ldlm_request *lockreq;
+        struct ldlm_intent *lit;
+        int size[5] = {[MDS_REQ_INTENT_LOCKREQ_OFF] = sizeof(*lockreq),
+                       [MDS_REQ_INTENT_IT_OFF] = sizeof(*lit) };
+        struct ldlm_reply *dlm_rep;
+        int repsize[4] = {sizeof(*dlm_rep),
+                          sizeof(struct mds_body),
+                          obddev->u.cli.cl_max_mds_easize};
         void *eadata;
         unsigned long irqflags;
+        int repbufcnt = 3, req_buffers = 2;
+        int rc, flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
         ENTRY;
 
 //        LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
@@ -260,21 +262,23 @@ int mdc_enqueue(struct obd_export *exp,
         if (it->it_op & IT_OPEN) {
                 it->it_create_mode |= S_IFREG;
 
-                size[2] = sizeof(struct mds_rec_create);
-                size[3] = data->namelen + 1;
+                size[req_buffers++] = sizeof(struct mds_rec_create);
+                size[req_buffers++] = data->namelen + 1;
                 /* As an optimization, we allocate an RPC request buffer for
                  * at least a default-sized LOV EA even if we aren't sending
                  * one.  We grow the whole request to the next power-of-two
                  * size since we get that much from a slab allocation anyways.
                  * This avoids an allocation below in the common case where
                  * we need to save a default-sized LOV EA for open replay. */
-                size[4] = max(lmmsize, obddev->u.cli.cl_default_mds_easize);
-                rc = lustre_msg_size(5, size);
+                size[req_buffers++] = max(lmmsize,
+                                          obddev->u.cli.cl_default_mds_easize);
+                rc = lustre_msg_size(req_buffers, size);
                 if (rc & (rc - 1))
-                        size[4] = min(size[4] + round_up(rc) - rc,
-                                      obddev->u.cli.cl_max_mds_easize);
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE,
-                                      5, size, NULL);
+                        size[req_buffers - 1] = min(size[req_buffers - 1] +
+                                                    round_up(rc) - rc,
+                                        obddev->u.cli.cl_max_mds_easize);
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+                                      LDLM_ENQUEUE, req_buffers, size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -283,60 +287,66 @@ int mdc_enqueue(struct obd_export *exp,
                 spin_unlock_irqrestore (&req->rq_lock, irqflags);
 
                 /* pack the intent */
-                lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
+                lit = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_INTENT_IT_OFF,
+                                     sizeof (*lit));
                 lit->opc = (__u64)it->it_op;
 
                 /* pack the intended request */
-                mdc_open_pack(req, 2, data, it->it_create_mode, 0,
+                mdc_open_pack(req, MDS_REQ_INTENT_REC_OFF, data,
+                              it->it_create_mode, 0,
                               it->it_flags, lmm, lmmsize);
 
                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
         } else if (it->it_op & IT_UNLINK) {
-                size[2] = sizeof(struct mds_rec_unlink);
-                size[3] = data->namelen + 1;
+                size[req_buffers++] = sizeof(struct mds_rec_unlink);
+                size[req_buffers++] = data->namelen + 1;
                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4,
-                                      size, NULL);
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+                                      LDLM_ENQUEUE, req_buffers, size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
                 /* pack the intent */
-                lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
+                lit = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_INTENT_IT_OFF,
+                                     sizeof (*lit));
                 lit->opc = (__u64)it->it_op;
 
                 /* pack the intended request */
-                mdc_unlink_pack(req, 2, data);
-
+                mdc_unlink_pack(req, MDS_REQ_INTENT_REC_OFF, data);
+                /* get ready for the reply */
                 repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
                 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
                                   OBD_MD_FLACL;
-                size[2] = sizeof(struct mds_body);
-                size[3] = data->namelen + 1;
+                size[req_buffers++] = sizeof(struct mds_body);
+                size[req_buffers++] = data->namelen + 1;
 
                 if (it->it_op & IT_GETATTR)
                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
 
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4,
-                                      size, NULL);
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+                                      LDLM_ENQUEUE, req_buffers, size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
                 /* pack the intent */
-                lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
+                lit = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_INTENT_IT_OFF,
+                                     sizeof (*lit));
                 lit->opc = (__u64)it->it_op;
 
                 /* pack the intended request */
-                mdc_getattr_pack(req, valid, 2, it->it_flags, data);
-
+                mdc_getattr_pack(req, MDS_REQ_INTENT_REC_OFF, valid,
+                                 it->it_flags, data);
+                /* get ready for the reply */
                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
         } else if (it->it_op == IT_READDIR) {
                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
-                                      size, NULL);
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+                                      LDLM_ENQUEUE, 1, size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
+                /* get ready for the reply */
                 repbufcnt = 1;
         } else {
                 LBUG();
@@ -348,7 +358,7 @@ int mdc_enqueue(struct obd_export *exp,
 
         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
         rc = ldlm_cli_enqueue(exp, req, obddev->obd_namespace, res_id,
-                              lock_type,&policy,lock_mode, &flags,cb_blocking,
+                              lock_type, &policy,lock_mode, &flags, cb_blocking,
                               cb_completion, NULL, cb_data, NULL, 0, NULL,
                               lockh);
         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
@@ -356,7 +366,9 @@ int mdc_enqueue(struct obd_export *exp,
         /* Similarly, if we're going to replay this request, we don't want to
          * actually get a lock, just perform the intent. */
         if (req->rq_transno || req->rq_replay) {
-                lockreq = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*lockreq));
+                lockreq = lustre_msg_buf(req->rq_reqmsg,
+                                         MDS_REQ_INTENT_LOCKREQ_OFF,
+                                         sizeof(*lockreq));
                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
         }
 
@@ -429,11 +441,13 @@ int mdc_enqueue(struct obd_export *exp,
                          * large enough request buffer above we need to
                          * reallocate it here to hold the actual LOV EA. */
                         if (it->it_op & IT_OPEN) {
-                                if (req->rq_reqmsg->buflens[4] <
+                                int pos = MDS_REQ_INTENT_REC_OFF + 2;
+
+                                if (req->rq_reqmsg->buflens[pos] <
                                     body->eadatasize)
                                         mdc_realloc_openmsg(req, body, size);
 
-                                lmm = lustre_msg_buf(req->rq_reqmsg, 4,
+                                lmm = lustre_msg_buf(req->rq_reqmsg, pos,
                                                      body->eadatasize);
                                 if (lmm)
                                         memcpy(lmm, eadata, body->eadatasize);
index f282a69..c586b64 100644 (file)
@@ -74,19 +74,20 @@ int mdc_setattr(struct obd_export *exp, struct mdc_op_data *data,
         struct mds_rec_setattr *rec;
         struct mdc_rpc_lock *rpc_lock;
         struct obd_device *obd = exp->exp_obd;
-        int rc, bufcount = 1, size[3] = {sizeof(*rec), ealen, ea2len};
+        int size[] = { sizeof(*rec), ealen, ea2len};
+        int rc, bufcount = 1;
         ENTRY;
 
         LASSERT(iattr != NULL);
 
         if (ealen > 0) {
-                bufcount = 2;
+                bufcount++;
                 if (ea2len > 0)
-                        bufcount = 3;
+                        bufcount++;
         }
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, bufcount,
-                              size, NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_REINT, bufcount, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -100,7 +101,7 @@ int mdc_setattr(struct obd_export *exp, struct mdc_op_data *data,
         if (iattr->ia_valid & (ATTR_MTIME | ATTR_CTIME))
                 CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n",
                        LTIME_S(iattr->ia_mtime), LTIME_S(iattr->ia_ctime));
-        mdc_setattr_pack(req, data, iattr, ea, ealen, ea2, ea2len);
+        mdc_setattr_pack(req, MDS_REQ_REC_OFF, data, iattr, ea, ealen, ea2, ea2len);
 
         size[0] = sizeof(struct mds_body);
         req->rq_replen = lustre_msg_size(1, size);
@@ -119,8 +120,8 @@ int mdc_create(struct obd_export *exp, struct mdc_op_data *op_data,
 {
         struct obd_device *obd = exp->exp_obd;
         struct ptlrpc_request *req;
-        int rc, size[3] = {sizeof(struct mds_rec_create), op_data->namelen + 1};
-        int level, bufcount = 2;
+        int size[] = { sizeof(struct mds_rec_create), op_data->namelen + 1, 0};
+        int rc, level, bufcount = 2;
         ENTRY;
 
         if (data && datalen) {
@@ -128,14 +129,14 @@ int mdc_create(struct obd_export *exp, struct mdc_op_data *op_data,
                 bufcount++;
         }
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, bufcount,
-                              size, NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_REINT, bufcount, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
         /* mdc_create_pack fills msg->bufs[1] with name
          * and msg->bufs[2] with tgt, for symlinks or lov MD data */
-        mdc_create_pack(req, 0, op_data, data, datalen, mode,
+        mdc_create_pack(req, MDS_REQ_REC_OFF, op_data, data, datalen, mode,
                         uid, gid, cap_effective, rdev);
 
         size[0] = sizeof(struct mds_body);
@@ -162,12 +163,12 @@ int mdc_unlink(struct obd_export *exp, struct mdc_op_data *data,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct ptlrpc_request *req = *request;
-        int rc, size[2] = {sizeof(struct mds_rec_unlink), data->namelen + 1};
+        int rc, size[] = { sizeof(struct mds_rec_unlink), data->namelen + 1};
         ENTRY;
 
         LASSERT(req == NULL);
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 2, size,
-                              NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_REINT, 2, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
         *request = req;
@@ -177,7 +178,7 @@ int mdc_unlink(struct obd_export *exp, struct mdc_op_data *data,
         size[2] = obd->u.cli.cl_max_mds_cookiesize;
         req->rq_replen = lustre_msg_size(3, size);
 
-        mdc_unlink_pack(req, 0, data);
+        mdc_unlink_pack(req, MDS_REQ_REC_OFF, data);
 
         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
         if (rc == -ERESTARTSYS)
@@ -190,15 +191,15 @@ int mdc_link(struct obd_export *exp, struct mdc_op_data *data,
 {
         struct obd_device *obd = exp->exp_obd;
         struct ptlrpc_request *req;
-        int rc, size[2] = {sizeof(struct mds_rec_link), data->namelen + 1};
+        int rc, size[] = { sizeof(struct mds_rec_link), data->namelen + 1};
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 2, size,
-                              NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_REINT, 2, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_link_pack(req, 0, data);
+        mdc_link_pack(req, MDS_REQ_REC_OFF, data);
 
         size[0] = sizeof(struct mds_body);
         req->rq_replen = lustre_msg_size(1, size);
@@ -217,16 +218,15 @@ int mdc_rename(struct obd_export *exp, struct mdc_op_data *data,
 {
         struct obd_device *obd = exp->exp_obd;
         struct ptlrpc_request *req;
-        int rc, size[3] = {sizeof(struct mds_rec_rename), oldlen + 1,
-                           newlen + 1};
+        int rc, size[] = { sizeof(struct mds_rec_rename), oldlen +1, newlen +1};
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 3, size,
-                              NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_REINT, 3, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_rename_pack(req, 0, data, old, oldlen, new, newlen);
+        mdc_rename_pack(req, MDS_REQ_REC_OFF, data, old, oldlen, new, newlen);
 
         size[0] = sizeof(struct mds_body);
         size[1] = obd->u.cli.cl_max_mds_easize;
index 0fbdbc9..6ef0685 100644 (file)
@@ -53,25 +53,26 @@ static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid,
                           int level, int msg_flags)
 {
         struct ptlrpc_request *req;
-        struct mds_body *body;
-        int rc, size = sizeof(*body);
+        int rc, size[] = { [MDS_REQ_REC_OFF] = sizeof(struct mds_body) };
         ENTRY;
 
-        req = ptlrpc_prep_req(imp, MDS_GETSTATUS, 1, &size, NULL);
+        req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETSTATUS,
+                              1, size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
         req->rq_send_state = level;
-        req->rq_replen = lustre_msg_size(1, &size);
+        req->rq_replen = lustre_msg_size(1, size);
 
-        mdc_pack_req_body(req);
+        mdc_pack_req_body(req, MDS_REQ_REC_OFF, 0, NULL, 0);
         req->rq_reqmsg->flags |= msg_flags;
         rc = ptlrpc_queue_wait(req);
 
         if (!rc) {
-                body = lustre_swab_repbuf (req, 0, sizeof (*body),
-                                           lustre_swab_mds_body);
+                struct mds_body *body;
+
+                body = lustre_swab_repbuf(req, 0, sizeof(*body),
+                                          lustre_swab_mds_body);
                 if (body == NULL) {
                         CERROR ("Can't extract mds_body\n");
                         GOTO (out, rc = -EPROTO);
@@ -164,16 +165,12 @@ int mdc_getattr(struct obd_export *exp, struct ll_fid *fid,
         /* XXX do we need to make another request here?  We just did a getattr
          *     to do the lookup in the first place.
          */
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_GETATTR, 1, &size,
-                              NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_GETATTR, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
-        memcpy(&body->fid1, fid, sizeof(*fid));
-        body->valid = valid;
-        body->eadatasize = ea_size;
-        mdc_pack_req_body(req);
+        mdc_pack_req_body(req, MDS_REQ_REC_OFF, valid, fid, ea_size);
 
         /* currently only root inode will call us with FLACL */
         if (valid & OBD_MD_FLACL)
@@ -190,29 +187,24 @@ int mdc_getattr(struct obd_export *exp, struct ll_fid *fid,
 }
 
 int mdc_getattr_name(struct obd_export *exp, struct ll_fid *fid,
-                     char *filename, int namelen, unsigned long valid,
-                     unsigned int ea_size, struct ptlrpc_request **request)
+                     const char *filename, int namelen, unsigned long valid,
+                     unsigned int ea_len, struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
-        struct mds_body *body;
-        int rc, size[2] = {sizeof(*body), namelen};
+        int rc, size[] = { sizeof(struct mds_body), namelen };
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_GETATTR_NAME, 2,
-                              size, NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_GETATTR_NAME, 2, size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
-        memcpy(&body->fid1, fid, sizeof(*fid));
-        body->valid = valid;
-        body->eadatasize = ea_size;
-        mdc_pack_req_body(req);
-
+        mdc_pack_req_body(req, MDS_REQ_REC_OFF, valid, fid, ea_len);
         LASSERT (strnlen (filename, namelen) == namelen - 1);
         memcpy(lustre_msg_buf(req->rq_reqmsg, 1, namelen), filename, namelen);
 
-        rc = mdc_getattr_common(exp, ea_size, 0, req);
+        rc = mdc_getattr_common(exp, ea_len, 0, req);
         if (rc != 0) {
                 ptlrpc_req_finished (req);
                 req = NULL;
@@ -244,18 +236,15 @@ int mdc_xattr_common(struct obd_export *exp, struct ll_fid *fid,
                 size[bufcnt++] = input_size;
         }
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), opcode,
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, opcode,
                               bufcnt, size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
         /* request data */
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
-        memcpy(&body->fid1, fid, sizeof(*fid));
-        body->valid = valid;
-        body->eadatasize = output_size;
+        body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof(*body));
+        mdc_pack_req_body(req, MDS_REQ_REC_OFF, valid, fid, output_size);
         body->flags = flags;
-        mdc_pack_req_body(req);
 
         if (xattr_name) {
                 tmp = lustre_msg_buf(req->rq_reqmsg, 1, xattr_namelen);
@@ -330,10 +319,10 @@ int mdc_getxattr(struct obd_export *exp, struct ll_fid *fid,
 void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
                                 int repoff)
 {
-        struct mds_rec_create *rec =
-                lustre_msg_buf(req->rq_reqmsg, reqoff, sizeof(*rec));
-        struct mds_body *body =
-                lustre_msg_buf(req->rq_repmsg, repoff, sizeof(*body));
+        struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff,
+                                                    sizeof(*rec));
+        struct mds_body *body = lustre_msg_buf(req->rq_repmsg, repoff,
+                                               sizeof(*body));
 
         LASSERT (rec != NULL);
         LASSERT (body != NULL);
@@ -508,7 +497,8 @@ static void mdc_replay_open(struct ptlrpc_request *req)
         if (close_req != NULL) {
                 struct mds_body *close_body;
                 LASSERT(close_req->rq_reqmsg->opc == MDS_CLOSE);
-                close_body = lustre_msg_buf(close_req->rq_reqmsg, 0,
+                close_body = lustre_msg_buf(close_req->rq_reqmsg,
+                                            MDS_REQ_REC_OFF,
                                             sizeof(*close_body));
                 if (och != NULL)
                         LASSERT(!memcmp(&old, &close_body->handle, sizeof old));
@@ -524,15 +514,16 @@ void mdc_set_open_replay_data(struct obd_client_handle *och,
                               struct ptlrpc_request *open_req)
 {
         struct mdc_open_data *mod;
-        struct mds_rec_create *rec =
-                lustre_msg_buf(open_req->rq_reqmsg, 2, sizeof(*rec));
-        struct mds_body *body =
-                lustre_msg_buf(open_req->rq_repmsg, 1, sizeof(*body));
+        struct mds_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg,
+                                                    MDS_REQ_INTENT_REC_OFF,
+                                                    sizeof(*rec));
+        struct mds_body *body = lustre_msg_buf(open_req->rq_repmsg, 1,
+                                               sizeof(*body));
 
-        LASSERT(rec != NULL);
-        /* outgoing messages always in my byte order */
         LASSERT(body != NULL);
         /* incoming message in my byte order (it's been swabbed) */
+        LASSERT(rec != NULL);
+        /* outgoing messages always in my byte order */
         LASSERT_REPSWABBED(open_req, 1);
 
         OBD_ALLOC(mod, sizeof(*mod));
@@ -643,8 +634,8 @@ int mdc_close(struct obd_export *exp, struct obdo *oa,
               struct obd_client_handle *och, struct ptlrpc_request **request)
 {
         struct obd_device *obd = class_exp2obd(exp);
-        int reqsize = sizeof(struct mds_body);
-        int rc, repsize[3] = {sizeof(struct mds_body),
+        int size[] = { sizeof(struct mds_body) };
+        int rc, repsize[] = { sizeof(struct mds_body),
                               obd->u.cli.cl_max_mds_easize,
                               obd->u.cli.cl_max_mds_cookiesize};
         struct ptlrpc_request *req;
@@ -652,8 +643,8 @@ int mdc_close(struct obd_export *exp, struct obdo *oa,
         struct l_wait_info lwi;
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &reqsize,
-                              NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_CLOSE, 1, size, NULL);
         if (req == NULL)
                 GOTO(out, rc = -ENOMEM);
 
@@ -676,7 +667,7 @@ int mdc_close(struct obd_export *exp, struct obdo *oa,
                 CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
         }
 
-        mdc_close_pack(req, 0, oa, oa->o_valid, och);
+        mdc_close_pack(req, MDS_REQ_REC_OFF, oa, oa->o_valid, och);
 
         req->rq_replen = lustre_msg_size(3, repsize);
         req->rq_commit_cb = mdc_commit_close;
@@ -731,15 +722,15 @@ int mdc_done_writing(struct obd_export *exp, struct obdo *obdo)
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
-        int rc, size = sizeof(*body);
+        int rc, size[] = { [MDS_REQ_REC_OFF] = sizeof(*body) };
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_DONE_WRITING, 1,
-                              &size, NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_DONE_WRITING, 1, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+        body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof(*body));
         mdc_pack_fid(&body->fid1, obdo->o_id, 0, obdo->o_mode);
         body->size = obdo->o_size;
         body->blocks = obdo->o_blocks;
@@ -747,28 +738,30 @@ int mdc_done_writing(struct obd_export *exp, struct obdo *obdo)
         body->valid = obdo->o_valid;
 //        memcpy(&body->handle, &och->och_fh, sizeof(body->handle));
 
-        req->rq_replen = lustre_msg_size(1, &size);
+        req->rq_replen = lustre_msg_size(1, size);
 
         rc = ptlrpc_queue_wait(req);
         ptlrpc_req_finished(req);
         RETURN(rc);
 }
 
-int mdc_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, __u64 offset,
+int mdc_readpage(struct obd_export *exp, struct ll_fid *fid, __u64 offset,
                  struct page *page, struct ptlrpc_request **request)
 {
         struct obd_import *imp = class_exp2cliimp(exp);
         struct ptlrpc_request *req = NULL;
         struct ptlrpc_bulk_desc *desc = NULL;
         struct mds_body *body;
-        int rc, size = sizeof(*body);
+        int rc, size[] = { sizeof(*body) };
         ENTRY;
 
-        CDEBUG(D_INODE, "inode: %ld\n", (long)mdc_fid->id);
+        CDEBUG(D_INODE, "inode: "LPU64"\n", fid->id);
 
-        req = ptlrpc_prep_req(imp, MDS_READPAGE, 1, &size, NULL);
+        req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_READPAGE,
+                              1, size, NULL);
         if (req == NULL)
                 GOTO(out, rc = -ENOMEM);
+
         /* XXX FIXME bug 249 */
         req->rq_request_portal = MDS_READPAGE_PORTAL;
 
@@ -779,9 +772,9 @@ int mdc_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, __u64 offset,
 
         ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
 
-        mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE, mdc_fid);
+        mdc_readdir_pack(req, MDS_REQ_REC_OFF, offset, PAGE_CACHE_SIZE, fid);
 
-        req->rq_replen = lustre_msg_size(1, &size);
+        req->rq_replen = lustre_msg_size(1, size);
         rc = ptlrpc_queue_wait(req);
 
         if (rc == 0) {
@@ -899,7 +892,8 @@ int mdc_set_info(struct obd_export *exp, obd_count keylen,
                                 ~OBD_CONNECT_RDONLY;
                 }
 
-                req = ptlrpc_prep_req(imp, MDS_SET_INFO, 2, size, bufs);
+                req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION,
+                                      MDS_SET_INFO, 2, size, bufs);
                 if (req == NULL)
                         RETURN(-ENOMEM);
 
@@ -926,7 +920,8 @@ static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
          * during mount that would help a bit).  Having relative timestamps
          * is not so great if request processing is slow, while absolute
          * timestamps are not ideal because they need time synchronization. */
-        req = ptlrpc_prep_req(obd->u.cli.cl_import, MDS_STATFS, 0, NULL, NULL);
+        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_MDS_VERSION,
+                              MDS_STATFS, 0, NULL, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -958,18 +953,19 @@ static int mdc_pin(struct obd_export *exp, obd_id ino, __u32 gen, int type,
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
-        int rc, size = sizeof(*body);
+        int rc, size[] = { [MDS_REQ_REC_OFF] = sizeof(struct mds_body) };
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_PIN, 1, &size, NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_PIN, 1, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+        body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body));
         mdc_pack_fid(&body->fid1, ino, gen, type);
         body->flags = flag;
 
-        req->rq_replen = lustre_msg_size(1, &size);
+        req->rq_replen = lustre_msg_size(1, size);
 
         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
         rc = ptlrpc_queue_wait(req);
@@ -1004,17 +1000,18 @@ static int mdc_unpin(struct obd_export *exp,
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
-        int rc, size = sizeof(*body);
+        int rc, size[] = { [MDS_REQ_REC_OFF] = sizeof(struct mds_body) };
         ENTRY;
 
         if (handle->och_magic != OBD_CLIENT_HANDLE_MAGIC)
                 RETURN(0);
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &size, NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_CLOSE, 1, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+        body = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*body));
         memcpy(&body->handle, &handle->och_fh, sizeof(body->handle));
         body->flags = flag;
 
@@ -1036,22 +1033,17 @@ int mdc_sync(struct obd_export *exp, struct ll_fid *fid,
              struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
-        struct mds_body *body;
-        int size = sizeof(*body);
-        int rc;
+        int rc, size[] = { [MDS_REQ_REC_OFF] = sizeof(struct mds_body) };
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_SYNC, 1,&size,NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_SYNC, 1, size, NULL);
         if (!req)
                 RETURN(rc = -ENOMEM);
 
-        if (fid) {
-                body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
-                memcpy(&body->fid1, fid, sizeof(*fid));
-                mdc_pack_req_body(req);
-        }
+        mdc_pack_req_body(req, MDS_REQ_REC_OFF, 0, fid, 0);
 
-        req->rq_replen = lustre_msg_size(1, &size);
+        req->rq_replen = lustre_msg_size(1, size);
 
         rc = ptlrpc_queue_wait(req);
         if (rc || request == NULL)
index 0c331e2..2964d6f 100644 (file)
@@ -379,7 +379,7 @@ static int mds_destroy_export(struct obd_export *export)
                 /* child orphan sem protects orphan_dec_test and
                  * is_orphan race, mds_mfd_close drops it */
                 MDS_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode);
-                rc = mds_mfd_close(NULL, obd, mfd,
+                rc = mds_mfd_close(NULL, MDS_REQ_REC_OFF, obd, mfd,
                                    !(export->exp_flags & OBD_OPT_FAILOVER));
 
                 if (rc)
@@ -763,9 +763,9 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
         if (rc)
                 GOTO(cleanup, rc);
 
-        LASSERT (offset == 0 || offset == 2);
+        LASSERT (offset == MDS_REQ_REC_OFF || offset == MDS_REQ_INTENT_REC_OFF);
         /* if requests were at offset 2, the getattr reply goes back at 1 */
-        if (offset) {
+        if (offset == MDS_REQ_INTENT_REC_OFF) {
                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
                 offset = 1;
         }
@@ -886,7 +886,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
         return rc;
 }
 
-static int mds_getattr(int offset, struct ptlrpc_request *req)
+static int mds_getattr(struct ptlrpc_request *req, int offset)
 {
         struct mds_obd *mds = mds_req2mds(req);
         struct obd_device *obd = req->rq_export->exp_obd;
@@ -979,7 +979,7 @@ out:
         return 0;
 }
 
-static int mds_sync(struct ptlrpc_request *req)
+static int mds_sync(struct ptlrpc_request *req, int offset)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_obd *mds = &obd->u.mds;
@@ -1030,7 +1030,7 @@ out:
  *
  * If we were to take another one here, a deadlock will result, if another
  * thread is already waiting for a PW lock. */
-static int mds_readpage(struct ptlrpc_request *req)
+static int mds_readpage(struct ptlrpc_request *req, int offset)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_obd *mds = &obd->u.mds;
@@ -1052,7 +1052,8 @@ static int mds_readpage(struct ptlrpc_request *req)
                 GOTO(out, rc);
         }
 
-        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
+        body = lustre_swab_reqbuf(req, offset, sizeof(*body),
+                                  lustre_swab_mds_body);
         if (body == NULL)
                 GOTO (out, rc = -EFAULT);
 
@@ -1246,6 +1247,74 @@ static int mds_handle_quotactl(struct ptlrpc_request *req)
         RETURN(0);
 }
 
+static int mds_msg_check_version(struct lustre_msg *msg)
+{
+        int rc;
+
+        /* TODO: enable the below check while really introducing msg version.
+         * it's disabled because it will break compatibility with b1_4.
+         */
+        return (0);
+
+        switch (msg->opc) {
+        case MDS_CONNECT:
+        case MDS_DISCONNECT:
+        case OBD_PING:
+                rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
+                if (rc)
+                        CERROR("bad opc %u version %08x, expecting %08x\n",
+                               msg->opc, msg->version, LUSTRE_OBD_VERSION);
+                break;
+        case MDS_GETSTATUS:
+        case MDS_GETATTR:
+        case MDS_GETATTR_NAME:
+        case MDS_STATFS:
+        case MDS_READPAGE:
+        case MDS_REINT:
+        case MDS_CLOSE:
+        case MDS_DONE_WRITING:
+        case MDS_PIN:
+        case MDS_SYNC:
+        case MDS_GETXATTR:
+        case MDS_SETXATTR:
+        case MDS_SET_INFO:
+        case MDS_QUOTACHECK:
+        case MDS_QUOTACTL:
+        case QUOTA_DQACQ:
+        case QUOTA_DQREL:
+                rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
+                if (rc)
+                        CERROR("bad opc %u version %08x, expecting %08x\n",
+                               msg->opc, msg->version, LUSTRE_MDS_VERSION);
+                break;
+        case LDLM_ENQUEUE:
+        case LDLM_CONVERT:
+        case LDLM_BL_CALLBACK:
+        case LDLM_CP_CALLBACK:
+                rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
+                if (rc)
+                        CERROR("bad opc %u version %08x, expecting %08x\n",
+                               msg->opc, msg->version, LUSTRE_DLM_VERSION);
+                break;
+        case OBD_LOG_CANCEL:
+        case LLOG_ORIGIN_HANDLE_CREATE:
+        case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
+        case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
+        case LLOG_ORIGIN_HANDLE_READ_HEADER:
+        case LLOG_ORIGIN_HANDLE_CLOSE:
+        case LLOG_CATINFO:
+                rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
+                if (rc)
+                        CERROR("bad opc %u version %08x, expecting %08x\n",
+                               msg->opc, msg->version, LUSTRE_LOG_VERSION);
+                break;
+        default:
+                CERROR("MDS unknown opcode %d\n", msg->opc);
+                rc = -ENOTSUPP;
+        }
+        return rc;
+}
+
 int mds_handle(struct ptlrpc_request *req)
 {
         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
@@ -1257,6 +1326,13 @@ int mds_handle(struct ptlrpc_request *req)
         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
 
         LASSERT(current->journal_info == NULL);
+
+        rc = mds_msg_check_version(req->rq_reqmsg);
+        if (rc) {
+                CERROR("MDS drop mal-formed request\n");
+                RETURN(rc);
+        }
+
         /* XXX identical to OST */
         if (req->rq_reqmsg->opc != MDS_CONNECT) {
                 struct mds_export_data *med;
@@ -1332,7 +1408,7 @@ int mds_handle(struct ptlrpc_request *req)
         case MDS_GETATTR:
                 DEBUG_REQ(D_INODE, req, "getattr");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
-                rc = mds_getattr(0, req);
+                rc = mds_getattr(req, MDS_REQ_REC_OFF);
                 break;
 
         case MDS_SETXATTR:
@@ -1357,7 +1433,8 @@ int mds_handle(struct ptlrpc_request *req)
                  * want to cancel.
                  */
                 lockh.cookie = 0;
-                rc = mds_getattr_name(0, req, MDS_INODELOCK_UPDATE, &lockh);
+                rc = mds_getattr_name(MDS_REQ_REC_OFF, req,
+                                      MDS_INODELOCK_UPDATE, &lockh);
                 /* this non-intent call (from an ioctl) is special */
                 req->rq_status = rc;
                 if (rc == 0 && lockh.cookie)
@@ -1373,7 +1450,7 @@ int mds_handle(struct ptlrpc_request *req)
         case MDS_READPAGE:
                 DEBUG_REQ(D_INODE, req, "readpage");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
-                rc = mds_readpage(req);
+                rc = mds_readpage(req, MDS_REQ_REC_OFF);
 
                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
                         RETURN(0);
@@ -1382,9 +1459,10 @@ int mds_handle(struct ptlrpc_request *req)
                 break;
 
         case MDS_REINT: {
-                __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*opcp));
+                __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
+                                             sizeof (*opcp));
                 __u32  opc;
-                int size[3] = {sizeof(struct mds_body), mds->mds_max_mdsize,
+                int size[] = { sizeof(struct mds_body), mds->mds_max_mdsize,
                                mds->mds_max_cookiesize};
                 int bufcount;
 
@@ -1416,7 +1494,7 @@ int mds_handle(struct ptlrpc_request *req)
                 if (rc)
                         break;
 
-                rc = mds_reint(req, 0, NULL);
+                rc = mds_reint(req, MDS_REQ_REC_OFF, NULL);
                 fail = OBD_FAIL_MDS_REINT_NET_REP;
                 break;
         }
@@ -1424,25 +1502,25 @@ int mds_handle(struct ptlrpc_request *req)
         case MDS_CLOSE:
                 DEBUG_REQ(D_INODE, req, "close");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
-                rc = mds_close(req);
+                rc = mds_close(req, MDS_REQ_REC_OFF);
                 break;
 
         case MDS_DONE_WRITING:
                 DEBUG_REQ(D_INODE, req, "done_writing");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
-                rc = mds_done_writing(req);
+                rc = mds_done_writing(req, MDS_REQ_REC_OFF);
                 break;
 
         case MDS_PIN:
                 DEBUG_REQ(D_INODE, req, "pin");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
-                rc = mds_pin(req);
+                rc = mds_pin(req, MDS_REQ_REC_OFF);
                 break;
 
         case MDS_SYNC:
                 DEBUG_REQ(D_INODE, req, "sync");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
-                rc = mds_sync(req);
+                rc = mds_sync(req, MDS_REQ_REC_OFF);
                 break;
 
         case MDS_SET_INFO:
@@ -1502,6 +1580,11 @@ int mds_handle(struct ptlrpc_request *req)
                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                 rc = llog_origin_handle_next_block(req);
                 break;
+        case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
+                DEBUG_REQ(D_INODE, req, "llog prev block");
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_prev_block(req);
+                break;
         case LLOG_ORIGIN_HANDLE_READ_HEADER:
                 DEBUG_REQ(D_INODE, req, "llog read header");
                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
@@ -1980,7 +2063,7 @@ static int mds_cleanup(struct obd_device *obd)
         RETURN(0);
 }
 
-static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
+static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset,
                                         struct ldlm_lock *new_lock,
                                         struct ldlm_lock **old_lock,
                                         struct lustre_handle *lockh)
@@ -1988,7 +2071,7 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
         struct obd_export *exp = req->rq_export;
         struct obd_device *obd = exp->exp_obd;
         struct ldlm_request *dlmreq =
-                lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*dlmreq));
+                lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq));
         struct lustre_handle remote_hdl = dlmreq->lock_handle1;
         struct list_head *iter;
 
@@ -2057,15 +2140,16 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         struct lustre_handle lockh = { 0 };
         struct ldlm_lock *new_lock = NULL;
         int getattr_part = MDS_INODELOCK_UPDATE;
-        int rc, offset = 2;
-        int repbufcnt = 3, repsize[4] = {sizeof(struct ldlm_reply),
-                                         sizeof(struct mds_body),
-                                         mds->mds_max_mdsize};
+        int repsize[4] = {sizeof(*rep),
+                          sizeof(struct mds_body),
+                          mds->mds_max_mdsize};
+        int repbufcnt = 3, offset = MDS_REQ_INTENT_REC_OFF;
+        int rc;
         ENTRY;
 
         LASSERT(req != NULL);
 
-        if (req->rq_reqmsg->bufcount <= 1) {
+        if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
                 /* No intent was provided */
                 int size = sizeof(struct ldlm_reply);
                 rc = lustre_pack_reply(req, 1, &size, NULL);
@@ -2073,7 +2157,8 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
                 RETURN(0);
         }
 
-        it = lustre_swab_reqbuf(req, 1, sizeof(*it), lustre_swab_ldlm_intent);
+        it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it),
+                                lustre_swab_ldlm_intent);
         if (it == NULL) {
                 CERROR("Intent missing\n");
                 RETURN(req->rq_status = -EFAULT);
@@ -2099,7 +2184,8 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         switch ((long)it->opc) {
         case IT_OPEN:
         case IT_CREAT|IT_OPEN:
-                fixup_handle_for_resent_req(req, lock, NULL, &lockh);
+                fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
+                                            lock, NULL, &lockh);
                 /* XXX swab here to assert that an mds_open reint
                  * packet is following */
                 rep->lock_policy_res2 = mds_reint(req, offset, &lockh);
@@ -2118,7 +2204,8 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         case IT_GETATTR:
                         getattr_part |= MDS_INODELOCK_LOOKUP;
         case IT_READDIR:
-                fixup_handle_for_resent_req(req, lock, &new_lock, &lockh);
+                fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
+                                            lock, &new_lock, &lockh);
                 rep->lock_policy_res2 = mds_getattr_name(offset, req,
                                                          getattr_part, &lockh);
 
index 4f25fa4..2bb8868 100644 (file)
@@ -170,12 +170,12 @@ void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
 int mds_query_write_access(struct inode *inode);
 int mds_open(struct mds_update_record *rec, int offset,
              struct ptlrpc_request *req, struct lustre_handle *);
-int mds_pin(struct ptlrpc_request *req);
+int mds_pin(struct ptlrpc_request *req, int offset);
 void mds_mfd_unlink(struct mds_file_data *mfd, int decref);
-int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
+int mds_mfd_close(struct ptlrpc_request *req, int offset, struct obd_device *obd,
                   struct mds_file_data *mfd, int unlink_orphan);
-int mds_close(struct ptlrpc_request *req);
-int mds_done_writing(struct ptlrpc_request *req);
+int mds_close(struct ptlrpc_request *req, int offset);
+int mds_done_writing(struct ptlrpc_request *req, int offset);
 
 
 /* mds/mds_fs.c */
index c7e40e6..f6fa43b 100644 (file)
@@ -798,7 +798,7 @@ static int mds_open_by_fid(struct ptlrpc_request *req, struct ll_fid *fid,
         RETURN(rc);
 }
 
-int mds_pin(struct ptlrpc_request *req)
+int mds_pin(struct ptlrpc_request *req, int offset)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_body *request_body, *reply_body;
@@ -806,7 +806,8 @@ int mds_pin(struct ptlrpc_request *req)
         int rc, size = sizeof(*reply_body);
         ENTRY;
 
-        request_body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*request_body));
+        request_body = lustre_msg_buf(req->rq_reqmsg, offset,
+                                      sizeof(*request_body));
 
         rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc)
@@ -870,7 +871,7 @@ int mds_open(struct mds_update_record *rec, int offset,
         if (offset == 2) { /* intent */
                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
                 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
-        } else if (offset == 0) { /* non-intent reint */
+        } else if (offset == MDS_REQ_REC_OFF) { /* non-intent reint */
                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
         } else {
                 body = NULL;
@@ -1144,7 +1145,7 @@ int mds_open(struct mds_update_record *rec, int offset,
  * (it will not even _have_ an entry in last_rcvd anymore).
  *
  * Returns EAGAIN if the client needs to get more data and re-close. */
-int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
+int mds_mfd_close(struct ptlrpc_request *req, int offset,struct obd_device *obd,
                   struct mds_file_data *mfd, int unlink_orphan)
 {
         struct inode *inode = mfd->mfd_dentry->d_inode;
@@ -1160,7 +1161,7 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
         ENTRY;
 
         if (req && req->rq_reqmsg != NULL)
-                request_body = lustre_msg_buf(req->rq_reqmsg, 0,
+                request_body = lustre_msg_buf(req->rq_reqmsg, offset,
                                               sizeof(*request_body));
         if (req && req->rq_repmsg != NULL)
                 reply_body = lustre_msg_buf(req->rq_repmsg, 0,
@@ -1325,7 +1326,7 @@ out:
         RETURN(rc);
 }
 
-int mds_close(struct ptlrpc_request *req)
+int mds_close(struct ptlrpc_request *req, int offset)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
         struct obd_device *obd = req->rq_export->exp_obd;
@@ -1347,7 +1348,8 @@ int mds_close(struct ptlrpc_request *req)
                 MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
         }
 
-        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
+        body = lustre_swab_reqbuf(req, offset, sizeof(*body),
+                                  lustre_swab_mds_body);
         if (body == NULL) {
                 CERROR("Can't unpack body\n");
                 req->rq_status = -EFAULT;
@@ -1384,7 +1386,7 @@ int mds_close(struct ptlrpc_request *req)
         }
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        req->rq_status = mds_mfd_close(req, obd, mfd, 1);
+        req->rq_status = mds_mfd_close(req, offset, obd, mfd, 1);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
@@ -1396,7 +1398,7 @@ int mds_close(struct ptlrpc_request *req)
         RETURN(rc);
 }
 
-int mds_done_writing(struct ptlrpc_request *req)
+int mds_done_writing(struct ptlrpc_request *req, int offset)
 {
         struct mds_body *body;
         int rc, size = sizeof(struct mds_body);
@@ -1404,7 +1406,8 @@ int mds_done_writing(struct ptlrpc_request *req)
 
         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
 
-        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
+        body = lustre_swab_reqbuf(req, offset, sizeof(*body),
+                                  lustre_swab_mds_body);
         if (body == NULL) {
                 CERROR("Can't unpack body\n");
                 req->rq_status = -EFAULT;
index 9fb6b3d..a06e886 100644 (file)
@@ -457,7 +457,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                                          rec->ur_iattr.ia_gid};
         ENTRY;
 
-        LASSERT(offset == 0);
+        LASSERT(offset == MDS_REQ_REC_OFF);
 
         DEBUG_REQ(D_INODE, req, "setattr "LPU64"/%u %x", rec->ur_fid1->id,
                   rec->ur_fid1->generation, rec->ur_iattr.ia_valid);
@@ -705,7 +705,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         struct dentry_params dp;
         ENTRY;
 
-        LASSERT(offset == 0);
+        LASSERT(offset == MDS_REQ_REC_OFF);
         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
 
         DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u name %s mode %o",
@@ -1415,7 +1415,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         unsigned int qpids [MAXQUOTAS] = {0, 0};
         ENTRY;
 
-        LASSERT(offset == 0 || offset == 2);
+        LASSERT(offset == MDS_REQ_REC_OFF || offset == 2);
 
         DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s",
                   rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name);
@@ -1644,7 +1644,7 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         int rc = 0, cleanup_phase = 0;
         ENTRY;
 
-        LASSERT(offset == 0);
+        LASSERT(offset == MDS_REQ_REC_OFF);
 
         DEBUG_REQ(D_INODE, req, "original "LPU64"/%u to "LPU64"/%u %s",
                   rec->ur_fid1->id, rec->ur_fid1->generation,
@@ -1984,7 +1984,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         unsigned int qpids[4] = {0, 0, 0, 0};
         ENTRY;
 
-        LASSERT(offset == 0);
+        LASSERT(offset == MDS_REQ_REC_OFF);
 
         DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u %s to "LPU64"/%u %s",
                   rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name,
index 17f7a45..8e18703 100644 (file)
@@ -313,3 +313,86 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_process);
+
+int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
+                         void *data, void *catdata)
+{
+        struct llog_log_hdr *llh = loghandle->lgh_hdr;
+        struct llog_process_cat_data *cd = catdata;
+        void *buf;
+        int rc = 0, first_index = 1, index, idx;
+        struct llog_rec_tail *tail;
+        ENTRY;
+
+        OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
+        if (!buf)
+                RETURN(-ENOMEM);
+
+        if (cd != NULL)
+                first_index = cd->first_idx + 1;
+        if (cd != NULL && cd->last_idx)
+                index = cd->last_idx;
+        else
+                index = LLOG_BITMAP_BYTES * 8 - 1;
+
+        while (rc == 0) {
+                struct llog_rec_hdr *rec;
+
+                /* skip records not set in bitmap */
+                while (index >= first_index &&
+                       !ext2_test_bit(index, llh->llh_bitmap))
+                        --index;
+
+                LASSERT(index >= first_index - 1);
+                if (index == first_index - 1)
+                        break;
+
+                /* get the buf with our target record; avoid old garbage */
+                memset(buf, 0, LLOG_CHUNK_SIZE);
+                rc = llog_prev_block(loghandle, index, buf, LLOG_CHUNK_SIZE);
+                if (rc)
+                        GOTO(out, rc);
+
+                rec = buf;
+                idx = le32_to_cpu(rec->lrh_index);
+                if (idx < index)
+                        CDEBUG(D_HA, "index %u : idx %u\n", index, idx);
+                while (idx < index) {
+                        rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+                        idx ++;
+                }
+
+                /* process records in buffer, starting where we found one */
+                while ((void *)rec >= buf) {
+                        if (rec->lrh_index == 0)
+                                GOTO(out, 0); /* no more records */
+
+                        /* if set, process the callback on this record */
+                        if (ext2_test_bit(index, llh->llh_bitmap)) {
+                                rc = cb(loghandle, rec, data);
+                                if (rc == LLOG_PROC_BREAK) {
+                                        CWARN("recovery from log: "LPX64":%x"
+                                              " stopped\n",
+                                              loghandle->lgh_id.lgl_oid,
+                                              loghandle->lgh_id.lgl_ogen);
+                                        GOTO(out, rc);
+                                }
+                                if (rc)
+                                        GOTO(out, rc);
+                        }
+
+                        /* previous record, still in buffer? */
+                        --index;
+                        if (index < first_index)
+                                GOTO(out, rc = 0);
+                        tail = (void *)rec - sizeof(struct llog_rec_tail);
+                        rec = ((void *)rec - le32_to_cpu(tail->lrt_len));
+                }
+        }
+
+out:
+        if (buf)
+                OBD_FREE(buf, LLOG_CHUNK_SIZE);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_reverse_process);
index fd2c198..396c2ca 100644 (file)
@@ -395,6 +395,70 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
 }
 EXPORT_SYMBOL(llog_cat_process);
 
+static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh,
+                                       struct llog_rec_hdr *rec, void *data)
+{
+        struct llog_process_data *d = data;
+        struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
+        struct llog_handle *llh;
+        int rc;
+
+        if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+                CERROR("invalid record in catalog\n");
+                RETURN(-EINVAL);
+        }
+        CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
+               lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
+               le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
+
+        rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
+        if (rc) {
+                CERROR("Cannot find handle for log "LPX64"\n",
+                       lir->lid_id.lgl_oid);
+                RETURN(rc);
+        }
+
+        rc = llog_reverse_process(llh, d->lpd_cb, d->lpd_data, NULL);
+        RETURN(rc);
+}
+
+int llog_cat_reverse_process(struct llog_handle *cat_llh,
+                             llog_cb_t cb, void *data)
+{
+        struct llog_process_data d;
+        struct llog_process_cat_data cd;
+        struct llog_log_hdr *llh = cat_llh->lgh_hdr;
+        int rc;
+        ENTRY;
+
+        LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
+        d.lpd_data = data;
+        d.lpd_cb = cb;
+
+        if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
+                CWARN("catalog "LPX64" crosses index zero\n",
+                      cat_llh->lgh_id.lgl_oid);
+
+                cd.first_idx = 0;
+                cd.last_idx = cat_llh->lgh_last_idx;
+                rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
+                                          &d, &cd);
+                if (rc != 0)
+                        RETURN(rc);
+
+                cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
+                cd.last_idx = 0;
+                rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
+                                          &d, &cd);
+        } else {
+                rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
+                                          &d, NULL);
+        }
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_cat_reverse_process);
+
 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
 {
         struct llog_log_hdr *llh = cathandle->lgh_hdr;
index d98fdee..8bbaa62 100644 (file)
@@ -368,7 +368,6 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
                 rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
                                         loghandle->lgh_file, buf, len,
                                         &ppos);
-
                 if (rc) {
                         CERROR("Cant read llog block at log id "LPU64
                                "/%u offset "LPU64"\n",
@@ -424,6 +423,79 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
         RETURN(-EIO);
 }
 
+static int llog_lvfs_prev_block(struct llog_handle *loghandle,
+                                int prev_idx, void *buf, int len)
+{
+        __u64 cur_offset;
+        int rc;
+        ENTRY;
+
+        if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
+                RETURN(-EINVAL);
+
+        CDEBUG(D_OTHER, "looking for log index %u n", prev_idx);
+
+        cur_offset = LLOG_CHUNK_SIZE;
+        llog_skip_over(&cur_offset, 0, prev_idx);
+
+        while (cur_offset < loghandle->lgh_file->f_dentry->d_inode->i_size) {
+                struct llog_rec_hdr *rec;
+                struct llog_rec_tail *tail;
+                loff_t ppos;
+
+                ppos = cur_offset;
+
+                rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
+                                        loghandle->lgh_file, buf, len,
+                                        &ppos);
+                if (rc) {
+                        CERROR("Cant read llog block at log id "LPU64
+                               "/%u offset "LPU64"\n",
+                               loghandle->lgh_id.lgl_oid,
+                               loghandle->lgh_id.lgl_ogen,
+                               cur_offset);
+                        RETURN(rc);
+                }
+
+                /* put number of bytes read into rc to make code simpler */
+                rc = ppos - cur_offset;
+                cur_offset = ppos;
+
+                if (rc == 0) /* end of file, nothing to do */
+                        RETURN(0);
+
+                if (rc < sizeof(*tail)) {
+                        CERROR("Invalid llog block at log id "LPU64"/%u offset "
+                               LPU64"\n", loghandle->lgh_id.lgl_oid,
+                               loghandle->lgh_id.lgl_ogen, cur_offset);
+                        RETURN(-EINVAL);
+                }
+
+                tail = buf + rc - sizeof(struct llog_rec_tail);
+
+                /* this shouldn't happen */
+                if (tail->lrt_index == 0) {
+                        CERROR("Invalid llog tail at log id "LPU64"/%u offset "
+                               LPU64"\n", loghandle->lgh_id.lgl_oid,
+                               loghandle->lgh_id.lgl_ogen, cur_offset);
+                        RETURN(-EINVAL);
+                }
+                if (le32_to_cpu(tail->lrt_index) < prev_idx)
+                        continue;
+
+                /* sanity check that the start of the new buffer is no farther
+                 * than the record that we wanted.  This shouldn't happen. */
+                rec = buf;
+                if (le32_to_cpu(rec->lrh_index) > prev_idx) {
+                        CERROR("missed desired record? %u > %u\n",
+                               le32_to_cpu(rec->lrh_index), prev_idx);
+                        RETURN(-ENOENT);
+                }
+                RETURN(0);
+        }
+        RETURN(-EIO);
+}
+
 static struct file *llog_filp_open(char *name, int flags, int mode)
 {
         char *logname;
@@ -698,6 +770,7 @@ int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
 struct llog_operations llog_lvfs_ops = {
         lop_write_rec:   llog_lvfs_write_rec,
         lop_next_block:  llog_lvfs_next_block,
+        lop_prev_block:  llog_lvfs_prev_block,
         lop_read_header: llog_lvfs_read_header,
         lop_create:      llog_lvfs_create,
         lop_destroy:     llog_lvfs_destroy,
@@ -732,6 +805,13 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
         return 0;
 }
 
+static int llog_lvfs_prev_block(struct llog_handle *loghandle,
+                                int prev_idx, void *buf, int len)
+{
+        LBUG();
+        return 0;
+}
+
 static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
                             struct llog_logid *logid, char *name)
 {
@@ -768,6 +848,7 @@ int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
 struct llog_operations llog_lvfs_ops = {
         lop_write_rec:   llog_lvfs_write_rec,
         lop_next_block:  llog_lvfs_next_block,
+        lop_prev_block:  llog_lvfs_prev_block,
         lop_read_header: llog_lvfs_read_header,
         lop_create:      llog_lvfs_create,
         lop_destroy:     llog_lvfs_destroy,
index 7a2f82b..0e2018a 100644 (file)
@@ -425,6 +425,13 @@ static int llog_test_5(struct obd_device *obd)
                 GOTO(out, rc);
         }
 
+        CWARN("5f: print plain log entries reversely.. expect 6\n");
+        rc = llog_cat_reverse_process(llh, plain_print_cb, "foobar");
+        if (rc) {
+                CERROR("5f: reversely process with plain_print_cb failed: %d\n", rc);
+                GOTO(out, rc);
+        }
+
  out:
         CWARN("5: close re-opened catalog\n");
         if (llh)
@@ -479,6 +486,10 @@ static int llog_test_6(struct obd_device *obd, char *name)
         if (rc)
                 CERROR("6: llog_process failed %d\n", rc);
 
+        rc = llog_reverse_process(llh, (llog_cb_t)plain_print_cb, NULL, NULL);
+        if (rc)
+                CERROR("6: llog_reverse_process failed %d\n", rc);
+
 parse_out:
         rc = llog_close(llh);
         if (rc) {
index db8b42d..489fed8 100644 (file)
@@ -184,8 +184,8 @@ static int osc_getattr_async(struct obd_export *exp, struct obdo *oa,
         struct osc_getattr_async_args *aa;
         ENTRY;
 
-        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GETATTR, 1,
-                                  &size, NULL);
+        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+                                  OST_GETATTR, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -211,8 +211,8 @@ static int osc_getattr(struct obd_export *exp, struct obdo *oa,
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GETATTR, 1,
-                                  &size, NULL);
+        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+                                  OST_GETATTR, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -255,8 +255,8 @@ static int osc_setattr(struct obd_export *exp, struct obdo *oa,
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SETATTR, 1, &size,
-                                  NULL);
+        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+                                  OST_SETATTR, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -293,8 +293,8 @@ static int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
 
         LASSERT(oti);
 
-        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SETATTR, 1,
-                                  &size, NULL);
+        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+                                  OST_SETATTR, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -332,8 +332,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
                         RETURN(rc);
         }
 
-        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_CREATE,
-                                  1, &size, NULL);
+        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+                                  OST_CREATE, 1, &size, NULL);
         if (!request)
                 GOTO(out, rc = -ENOMEM);
 
@@ -419,8 +419,8 @@ static int osc_punch(struct obd_export *exp, struct obdo *oa,
                 RETURN(-EINVAL);
         }
 
-        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_PUNCH, 1, &size,
-                                  NULL);
+        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+                                  OST_PUNCH, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -466,8 +466,8 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa,
                 RETURN(-EINVAL);
         }
 
-        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SYNC, 1, &size,
-                                  NULL);
+        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+                                  OST_SYNC, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -513,8 +513,8 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                 RETURN(-EINVAL);
         }
 
-        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_DESTROY, 1,
-                                  &size, NULL);
+        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+                                  OST_DESTROY, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -789,7 +789,8 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
         size[2] = niocount * sizeof(*niobuf);
 
         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
-        req = ptlrpc_prep_req_pool(imp, opc, 3, size, NULL, pool);
+        req = ptlrpc_prep_req_pool(imp, LUSTRE_OST_VERSION, opc, 3,
+                                   size, NULL, pool);
         if (req == NULL)
                 return (-ENOMEM);
 
@@ -2318,8 +2319,8 @@ static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa,
         size[1] = sizeof(struct obd_ioobj);
         size[2] = page_count * sizeof(*nioptr);
 
-        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SAN_READ, 3,
-                                  size, NULL);
+        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+                                  OST_SAN_READ, 3, size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -2447,7 +2448,8 @@ static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
         size[1] = sizeof(struct obd_ioobj);
         size[2] = page_count * sizeof(*nioptr);
 
-        request = ptlrpc_prep_req_pool(class_exp2cliimp(exp), OST_SAN_WRITE,
+        request = ptlrpc_prep_req_pool(class_exp2cliimp(exp),
+                                       LUSTRE_OST_VERSION, OST_SAN_WRITE,
                                        3, size, NULL, cli->cl_rq_pool);
         if (!request)
                 RETURN(-ENOMEM);
@@ -2681,7 +2683,8 @@ static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
         if (*flags & LDLM_FL_HAS_INTENT) {
                 int size[2] = {sizeof(struct ldlm_request), sizeof(lvb)};
 
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
+                req = ptlrpc_prep_req(class_exp2cliimp(exp),
+                                      LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 1,
                                       size, NULL);
                 if (req == NULL)
                         RETURN(-ENOMEM);
@@ -2805,8 +2808,8 @@ static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
          * during mount that would help a bit).  Having relative timestamps
          * is not so great if request processing is slow, while absolute
          * timestamps are not ideal because they need time synchronization. */
-        request = ptlrpc_prep_req(obd->u.cli.cl_import, OST_STATFS, 0,
-                                  NULL, NULL);
+        request = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
+                                  OST_STATFS,0,NULL,NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -2988,8 +2991,8 @@ static int osc_get_info(struct obd_export *exp, obd_count keylen,
                 obd_id *reply;
                 char *bufs[1] = {key};
                 int rc;
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GET_INFO, 1,
-                                      &keylen, bufs);
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+                                      OST_GET_INFO, 1, &keylen, bufs);
                 if (req == NULL)
                         RETURN(-ENOMEM);
 
@@ -3056,7 +3059,8 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(-EINVAL);
 
 
-        req = ptlrpc_prep_req(imp, OST_SET_INFO, 2, size, bufs);
+        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO,
+                              2, size, bufs);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
index 3c534d0..4bd848b 100644 (file)
@@ -1149,6 +1149,66 @@ static int ost_filter_recovery_request(struct ptlrpc_request *req,
         }
 }
 
+int ost_msg_check_version(struct lustre_msg *msg)
+{
+        int rc;
+
+        /* TODO: enable the below check while really introducing msg version.
+         * it's disabled because it will break compatibility with b1_4.
+         */
+        return (0);
+        switch(msg->opc) {
+        case OST_CONNECT:
+        case OST_DISCONNECT:
+        case OBD_PING:
+                rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
+                if (rc)
+                        CERROR("bad opc %u version %08x, expecting %08x\n",
+                               msg->opc, msg->version, LUSTRE_OBD_VERSION);
+                break;
+        case OST_CREATE:
+        case OST_DESTROY:
+        case OST_GETATTR:
+        case OST_SETATTR:
+        case OST_WRITE:
+        case OST_READ:
+        case OST_SAN_READ:
+        case OST_SAN_WRITE:
+        case OST_PUNCH:
+        case OST_STATFS:
+        case OST_SYNC:
+        case OST_SET_INFO:
+        case OST_GET_INFO:
+        case OST_QUOTACHECK:
+        case OST_QUOTACTL:
+                rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
+                if (rc)
+                        CERROR("bad opc %u version %08x, expecting %08x\n",
+                               msg->opc, msg->version, LUSTRE_OST_VERSION);
+                break;
+        case LDLM_ENQUEUE:
+        case LDLM_CONVERT:
+        case LDLM_CANCEL:
+        case LDLM_BL_CALLBACK:
+        case LDLM_CP_CALLBACK:
+                rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
+                if (rc)
+                        CERROR("bad opc %u version %08x, expecting %08x\n",
+                               msg->opc, msg->version, LUSTRE_DLM_VERSION);
+                break;
+        case LLOG_ORIGIN_CONNECT:
+        case OBD_LOG_CANCEL:
+                rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
+                if (rc)
+                        CERROR("bad opc %u version %08x, expecting %08x\n",
+                               msg->opc, msg->version, LUSTRE_LOG_VERSION);
+        default:
+                CERROR("Unexpected opcode %d\n", msg->opc);
+                rc = -ENOTSUPP;
+        }
+        return rc;
+}
+
 static int ost_handle(struct ptlrpc_request *req)
 {
         struct obd_trans_info trans_info = { 0, };
@@ -1187,6 +1247,9 @@ static int ost_handle(struct ptlrpc_request *req)
         }
 
         oti_init(oti, req);
+        rc = ost_msg_check_version(req->rq_reqmsg);
+        if (rc)
+                RETURN(rc);
 
         switch (req->rq_reqmsg->opc) {
         case OST_CONNECT: {
index a99cb48..c6dc70b 100644 (file)
@@ -301,10 +301,10 @@ static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_po
         return request;
 }
 
-struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, int opcode,
-                                            int count, int *lengths,
-                                            char **bufs,
-                                            struct ptlrpc_request_pool *pool)
+struct ptlrpc_request *
+ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
+                     int count, int *lengths, char **bufs,
+                     struct ptlrpc_request_pool *pool)
 {
         struct ptlrpc_request *request = NULL;
         int rc;
@@ -333,6 +333,11 @@ struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, int opcode,
                 RETURN(NULL);
         }
 
+#if 0   /* TODO: enable this while really introducing msg version.
+         * it's disabled because it will break compatibility with b1_4.
+         */        
+        request->rq_reqmsg->version |= version;
+#endif
         if (imp->imp_server_timeout)
                 request->rq_timeout = obd_timeout / 2;
         else
@@ -368,13 +373,14 @@ struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp, int opcode,
         RETURN(request);
 }
 
-struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
-                                       int count, int *lengths, char **bufs)
+struct ptlrpc_request *
+ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode,
+                int count, int *lengths, char **bufs)
 {
-        return ptlrpc_prep_req_pool(imp, opcode, count, lengths, bufs, NULL);
+        return ptlrpc_prep_req_pool(imp, version, opcode, count, lengths,
+                                    bufs, NULL);
 }
 
-
 struct ptlrpc_request_set *ptlrpc_prep_set(void)
 {
         struct ptlrpc_request_set *set;
index 19e24f7..fd7bc1e 100644 (file)
@@ -354,7 +354,8 @@ int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
         if (rc)
                 GOTO(out, rc);
 
-        request = ptlrpc_prep_req(imp, imp->imp_connect_op, 4, size, tmp);
+        request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
+                                  4, size, tmp);
         if (!request)
                 GOTO(out, rc = -ENOMEM);
 
@@ -621,7 +622,8 @@ static int signal_completed_replay(struct obd_import *imp)
         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
         atomic_inc(&imp->imp_replay_inflight);
 
-        req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
+        req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING,
+                              0, NULL, NULL);
         if (!req) {
                 atomic_dec(&imp->imp_replay_inflight);
                 RETURN(-ENOMEM);
@@ -802,7 +804,8 @@ int ptlrpc_disconnect_import(struct obd_import *imp)
 
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
-        request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
+        request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc,
+                                  0, NULL, NULL);
         if (request) {
                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
                  * it fails.  We can get through the above with a down server
index 83ce394..2d5b744 100644 (file)
@@ -85,8 +85,8 @@ static int llog_client_create(struct llog_ctxt *ctxt, struct llog_handle **res,
                 bufcount++;
         }
 
-        req = ptlrpc_prep_req(imp, LLOG_ORIGIN_HANDLE_CREATE, 
-                              bufcount, size, tmp);
+        req = ptlrpc_prep_req(imp, LUSTRE_LOG_VERSION,
+                              LLOG_ORIGIN_HANDLE_CREATE, bufcount, size, tmp);
         if (!req)
                 GOTO(err_free, rc = -ENOMEM);
 
@@ -129,7 +129,8 @@ static int llog_client_next_block(struct llog_handle *loghandle,
         int rc;
         ENTRY;
 
-        req = ptlrpc_prep_req(imp, LLOG_ORIGIN_HANDLE_NEXT_BLOCK, 1,&size,NULL);
+        req = ptlrpc_prep_req(imp, LUSTRE_LOG_VERSION,
+                              LLOG_ORIGIN_HANDLE_NEXT_BLOCK, 1,&size,NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -173,6 +174,56 @@ out:
         RETURN(rc);
 }
 
+static int llog_client_prev_block(struct llog_handle *loghandle,
+                                  int prev_idx, void *buf, int len)
+{
+        struct obd_import *imp = loghandle->lgh_ctxt->loc_imp;
+        struct ptlrpc_request *req = NULL;
+        struct llogd_body *body;
+        void * ptr;
+        int size = sizeof(*body);
+        int repsize[2] = {sizeof (*body)};
+        int rc;
+        ENTRY;
+
+        req = ptlrpc_prep_req(imp, LUSTRE_LOG_VERSION,
+                              LLOG_ORIGIN_HANDLE_PREV_BLOCK, 1,&size,NULL);
+        if (!req)
+                GOTO(out, rc = -ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+        body->lgd_logid = loghandle->lgh_id;
+        body->lgd_ctxt_idx = loghandle->lgh_ctxt->loc_idx - 1;
+        body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
+        body->lgd_index = prev_idx;
+        body->lgd_len = len;
+        repsize[1] = len;
+
+        req->rq_replen = lustre_msg_size(2, repsize);
+        rc = ptlrpc_queue_wait(req);
+        if (rc)
+                GOTO(out, rc);
+
+        body = lustre_swab_repbuf(req, 0, sizeof(*body),
+                                 lustre_swab_llogd_body);
+        if (body == NULL) {
+                CERROR ("Can't unpack llogd_body\n");
+                GOTO(out, rc =-EFAULT);
+        }
+
+        ptr = lustre_msg_buf(req->rq_repmsg, 1, len);
+        if (ptr == NULL) {
+                CERROR ("Can't unpack bitmap\n");
+                GOTO(out, rc =-EFAULT);
+        }
+
+        memcpy(buf, ptr, len);
+
+out:
+        if (req)
+                ptlrpc_req_finished(req);
+        RETURN(rc);
+}
 
 static int llog_client_read_header(struct llog_handle *handle)
 {
@@ -186,8 +237,8 @@ static int llog_client_read_header(struct llog_handle *handle)
         int rc;
         ENTRY;
 
-        req = ptlrpc_prep_req(imp, LLOG_ORIGIN_HANDLE_READ_HEADER,
-                              1, &size, NULL);
+        req = ptlrpc_prep_req(imp, LUSTRE_LOG_VERSION,
+                              LLOG_ORIGIN_HANDLE_READ_HEADER, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -240,6 +291,7 @@ static int llog_client_close(struct llog_handle *handle)
 
 struct llog_operations llog_client_ops = {
         lop_next_block:  llog_client_next_block,
+        lop_prev_block:  llog_client_prev_block,
         lop_read_header: llog_client_read_header,
         lop_create:      llog_client_create,
         lop_close:       llog_client_close,
index bdd8dbb..877c0c7 100644 (file)
@@ -82,7 +82,8 @@ int llog_origin_connect(struct llog_ctxt *ctxt, int count,
         LASSERT(ctxt->loc_imp);
         imp = ctxt->loc_imp;
 
-        request = ptlrpc_prep_req(imp, LLOG_ORIGIN_CONNECT, 1, &size, NULL);
+        request = ptlrpc_prep_req(imp, LUSTRE_LOG_VERSION,
+                                  LLOG_ORIGIN_CONNECT, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
index b875002..2007f2d 100644 (file)
@@ -180,6 +180,77 @@ out:
         RETURN(rc);
 }
 
+int llog_origin_handle_prev_block(struct ptlrpc_request *req)
+{
+        struct obd_export *exp = req->rq_export;
+        struct obd_device *obd = exp->exp_obd;
+        struct llog_handle  *loghandle;
+        struct llogd_body *body;
+        struct obd_device *disk_obd;
+        struct lvfs_run_ctxt saved;
+        struct llog_ctxt *ctxt;
+        __u32 flags;
+        __u8 *buf;
+        void * ptr;
+        int size[] = {sizeof (*body),
+                      LLOG_CHUNK_SIZE};
+        int rc, rc2;
+        ENTRY;
+
+        body = lustre_swab_reqbuf(req, 0, sizeof(*body),
+                                  lustre_swab_llogd_body);
+        if (body == NULL) {
+                CERROR ("Can't unpack llogd_body\n");
+                GOTO(out, rc =-EFAULT);
+        }
+
+        OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
+        if (!buf)
+                GOTO(out, rc = -ENOMEM);
+
+        ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
+        LASSERT(ctxt != NULL);
+        disk_obd = ctxt->loc_exp->exp_obd;
+        push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+
+        rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL);
+        if (rc)
+                GOTO(out_pop, rc);
+
+        flags = body->lgd_llh_flags;
+        rc = llog_init_handle(loghandle, flags, NULL);
+        if (rc)
+                GOTO(out_close, rc);
+
+        memset(buf, 0, LLOG_CHUNK_SIZE);
+        rc = llog_prev_block(loghandle, body->lgd_index,
+                             buf, LLOG_CHUNK_SIZE);
+        if (rc)
+                GOTO(out_close, rc);
+
+
+        rc = lustre_pack_reply(req, 2, size, NULL);
+        if (rc)
+                GOTO(out_close, rc = -ENOMEM);
+
+        ptr = lustre_msg_buf(req->rq_repmsg, 0, sizeof (body));
+        memcpy(ptr, body, sizeof(*body));
+
+        ptr = lustre_msg_buf(req->rq_repmsg, 1, LLOG_CHUNK_SIZE);
+        memcpy(ptr, buf, LLOG_CHUNK_SIZE);
+
+out_close:
+        rc2 = llog_close(loghandle);
+        if (!rc)
+                rc = rc2;
+
+out_pop:
+        pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+        OBD_FREE(buf, LLOG_CHUNK_SIZE);
+out:
+        RETURN(rc);
+}
+
 int llog_origin_handle_read_header(struct ptlrpc_request *req)
 {
         struct obd_export *exp = req->rq_export;
@@ -545,6 +616,11 @@ int llog_origin_handle_next_block(struct ptlrpc_request *req)
         LBUG();
         return 0;
 }
+int llog_origin_handle_prev_block(struct ptlrpc_request *req)
+{
+        LBUG();
+        return 0;
+}
 int llog_origin_handle_read_header(struct ptlrpc_request *req)
 {
         LBUG();
index d2a410e..b19a309 100644 (file)
@@ -483,7 +483,8 @@ int lprocfs_wr_ping(struct file *file, const char *buffer,
         int rc;
         ENTRY;
 
-        req = ptlrpc_prep_req(obd->u.cli.cl_import, OBD_PING, 0, NULL, NULL);
+        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OBD_VERSION,
+                              OBD_PING, 0, NULL, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
index db308d3..6fd3eea 100644 (file)
@@ -46,6 +46,14 @@ int lustre_msg_swabbed(struct lustre_msg *msg)
         return (msg->magic == __swab32(PTLRPC_MSG_MAGIC));
 }
 
+int lustre_msg_check_version(struct lustre_msg *msg, __u32 version)
+{
+        if (lustre_msg_swabbed(msg))
+                 return (__swab32(msg->version) & LUSTRE_VERSION_MASK) != version;
+
+        return (msg->version & LUSTRE_VERSION_MASK) != version;
+}
+
 static void
 lustre_init_msg (struct lustre_msg *msg, int count, int *lens, char **bufs)
 {
@@ -321,7 +329,7 @@ int lustre_unpack_msg(struct lustre_msg *m, int len)
                 RETURN (-EINVAL);
         }
 
-        if (m->version != PTLRPC_MSG_VERSION) {
+        if ((m->version & ~LUSTRE_VERSION_MASK) != PTLRPC_MSG_VERSION) {
                 CERROR("wrong lustre_msg version %#08x\n", m->version);
                 RETURN (-EINVAL);
         }
index 291e8a5..21bb6f3 100644 (file)
@@ -47,8 +47,7 @@ int ptlrpc_ping(struct obd_import *imp)
         int rc = 0;
         ENTRY;
 
-        req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL,
-                              NULL);
+        req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 0, NULL, NULL);
         if (req) {
                 DEBUG_REQ(D_INFO, req, "pinging %s->%s",
                           imp->imp_obd->obd_uuid.uuid,
@@ -100,7 +99,7 @@ int ptlrpc_statfs(struct obd_import *imp)
         struct ptlrpc_request *req;
         ENTRY;
 
-        req = ptlrpc_prep_req(imp, OST_STATFS, 0,
+        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_STATFS, 0,
                               NULL, NULL);
         if (!req) {
                 CERROR("OOM trying to ping %s->%s\n",
@@ -461,8 +460,8 @@ static int pinger_check_rpcs(void *arg)
                                 continue;
                         }
 
-                        req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL,
-                                              NULL);
+                        req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING,
+                                              0, NULL, NULL);
                         if (!req) {
                                 CERROR("out of memory\n");
                                 break;
index 5333e94..3c059c4 100644 (file)
@@ -147,6 +147,7 @@ EXPORT_SYMBOL(ptlrpc_service_health_check);
 
 /* pack_generic.c */
 EXPORT_SYMBOL(lustre_msg_swabbed);
+EXPORT_SYMBOL(lustre_msg_check_version);
 EXPORT_SYMBOL(lustre_pack_request);
 EXPORT_SYMBOL(lustre_pack_reply);
 EXPORT_SYMBOL(lustre_shrink_reply);
@@ -215,6 +216,7 @@ EXPORT_SYMBOL(ptlrpcd_wake);
 /* llogd.c */
 EXPORT_SYMBOL(llog_origin_handle_create);
 EXPORT_SYMBOL(llog_origin_handle_next_block);
+EXPORT_SYMBOL(llog_origin_handle_prev_block);
 EXPORT_SYMBOL(llog_origin_handle_read_header);
 EXPORT_SYMBOL(llog_origin_handle_close);
 EXPORT_SYMBOL(llog_client_ops);
index 38c2258..c42fa90 100644 (file)
@@ -202,8 +202,11 @@ int ptlrpcd_check_async_rpcs(void *arg)
         /* single threaded!! */
         pc->pc_recurred++;
 
-        if (pc->pc_recurred == 1)
+        if (pc->pc_recurred == 1) {
                 rc = ptlrpcd_check(pc);
+                if (!rc)
+                        ptlrpc_expired_set(pc->pc_set);
+        }
 
         pc->pc_recurred--;
         return rc;
index 1c6ada7..df7f97b 100644 (file)
@@ -355,7 +355,8 @@ static int log_commit_thread(void *arg)
                                 continue;
                         }
 
-                        request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
+                        request = ptlrpc_prep_req(import, LUSTRE_LOG_VERSION,
+                                                  OBD_LOG_CANCEL, 1,
                                                   &llcd->llcd_cookiebytes,
                                                   bufs);
                         if (request == NULL) {
index f25ca69..54555bb 100644 (file)
@@ -289,7 +289,9 @@ void ptlrpc_request_handle_notconn(struct ptlrpc_request *failed_req)
                                imp->imp_obd->obd_name);
                         ptlrpc_deactivate_import(imp);
                 }
-                ptlrpc_connect_import(imp, NULL);
+                /* to control recovery via lctl {disable|enable}_recovery */
+                if (imp->imp_deactive == 0)
+                        ptlrpc_connect_import(imp, NULL);
         }
 
         /* Wait for recovery to complete and resend. If evicted, then
index 55a33a6..7c5d749 100644 (file)
@@ -52,8 +52,8 @@ static int target_quotacheck_callback(struct obd_export *exp,
         int rc, size = sizeof(*oqctl);
         ENTRY;
 
-        req = ptlrpc_prep_req(exp->exp_imp_reverse, OBD_QC_CALLBACK,
-                              1, &size, NULL);
+        req = ptlrpc_prep_req(exp->exp_imp_reverse, LUSTRE_OBD_VERSION,
+                              OBD_QC_CALLBACK, 1, &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -164,18 +164,21 @@ int client_quota_check(struct obd_export *exp, struct obd_quotactl *oqctl)
         struct client_obd *cli = &exp->exp_obd->u.cli;
         struct ptlrpc_request *req;
         struct obd_quotactl *body;
-        int size = sizeof(*body), opc;
+        int size = sizeof(*body), opc, version;
         int rc;
         ENTRY;
 
-        if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDC_NAME))
+        if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDC_NAME)) {
+                version = LUSTRE_MDS_VERSION;
                 opc = MDS_QUOTACHECK;
-        else if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME))
+        } else if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME)) {
+                version = LUSTRE_OST_VERSION;
                 opc = OST_QUOTACHECK;
-        else
+        } else {
                 RETURN(-EINVAL);
+        }
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), opc, 1, &size,
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), version, opc, 1, &size,
                               NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
index ed1ff48..6d4123f 100644 (file)
@@ -512,7 +512,8 @@ schedule_dqacq(struct obd_device *obd,
 
         /* build dqacq/dqrel request */
         LASSERT(qctxt->lqc_import);
-        req = ptlrpc_prep_req(qctxt->lqc_import, opc, 1, &size, NULL);
+        req = ptlrpc_prep_req(qctxt->lqc_import, LUSTRE_MDS_VERSION, opc, 1,
+                              &size, NULL);
         if (!req) {
                 dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
                 RETURN(-ENOMEM);
index 3414763..b30dfc0 100644 (file)
@@ -168,18 +168,22 @@ int client_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
 {
         struct ptlrpc_request *req;
         struct obd_quotactl *oqc;
-        int size = sizeof(*oqctl), opc;
+        int size = sizeof(*oqctl), opc, version;
         int rc;
         ENTRY;
 
-        if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDC_NAME))
+        if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDC_NAME)) {
                 opc = MDS_QUOTACTL;
-        else if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME))
+                version = LUSTRE_MDS_VERSION;
+        } else if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME)) {
                 opc = OST_QUOTACTL;
-        else
+                version = LUSTRE_OST_VERSION;
+        } else {
                 RETURN(-EINVAL);
+        }
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), opc, 1, &size, NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), version, opc, 1, &size,
+                              NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);