Whamcloud - gitweb
Branch b1_8
authorhuanghua <huanghua>
Thu, 5 Feb 2009 12:30:29 +0000 (12:30 +0000)
committerhuanghua <huanghua>
Thu, 5 Feb 2009 12:30:29 +0000 (12:30 +0000)
b=11824
i=rahul.deshmukh
i=yong.fan

interop - enable recovery between 1.8 client and 2.0 server.

12 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/obd.h
lustre/ldlm/ldlm_request.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/namei.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c
lustre/obdfilter/filter.c
lustre/ptlrpc/import.c
lustre/tests/replay-single.sh
lustre/tests/test-framework.sh

index ef61a12..7e97469 100644 (file)
@@ -961,6 +961,29 @@ static inline int fid_is_idif(const struct lu_fid *fid)
 }
 
 /**
+ * Check if a fid sequence is sane or not
+ * \param seq the sequence to be tested.
+ * \return true if the sequence is a sane sequence; otherwise false.
+ */
+static inline int fid_seq_is_sane(__u64 seq)
+{
+        return seq != 0; /* only sequence 0 is treated as insane */
+}
+
+/**
+ * Check if a fid is sane or not
+ * \param fid the fid to be tested.
+ * \return true if the fid is sane; otherwise false.
+ */
+static inline int fid_is_sane(const struct lu_fid *fid)
+{
+        if (fid == NULL)
+                return 0;
+        /* an IGIF is accepted as is; else need sane seq, oid != 0, ver == 0 */
+        return fid_is_igif(fid) || (fid_seq_is_sane(fid_seq(fid)) &&
+                                    fid_oid(fid) != 0 && fid_ver(fid) == 0);
+}
+/**
  * Check if a fid is zero.
  * \param fid the fid to be tested.
  * \return true if the fid is zero; otherwise false. 
@@ -994,7 +1017,7 @@ static inline __u32 lu_igif_gen(const struct lu_fid *fid)
  * Check if two fids are equal or not.
  * \param f0 the first fid
  * \param f1 the second fid
- * \return true if the two fids are equal; otherwise false. 
+ * \return true if the two fids are equal; otherwise false.
  */
 static inline int lu_fid_eq(const struct lu_fid *f0,
                             const struct lu_fid *f1)
index c050d37..60a5d29 100644 (file)
@@ -1033,6 +1033,7 @@ enum obd_cleanup_stage {
 #define KEY_FIEMAP              "fiemap"
 /* XXX unused */
 #define KEY_ASYNC               "async"
+#define KEY_CAPA_KEY            "capa_key"
 
 struct obd_ops {
         struct module *o_owner;
index 5e77ff9..d2b2985 100644 (file)
@@ -1922,10 +1922,11 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 
         ldlm_lock2handle(lock, &body->lock_handle[0]);
         size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
+        buffers = 3;
         if (lock->l_lvb_len != 0) {
-                buffers = 3;
                 size[DLM_REPLY_REC_OFF] = lock->l_lvb_len;
-        }
+        } else
+                size[DLM_REPLY_REC_OFF] = sizeof (struct ost_lvb);
         ptlrpc_req_set_repsize(req, buffers, size);
 
         LDLM_DEBUG(lock, "replaying lock:");
index 522f9ce..f50c804 100644 (file)
@@ -1173,6 +1173,8 @@ void ll_iocontrol_unregister(void *magic);
 
 ino_t ll_fid_build_ino(struct ll_sb_info *sbi,
                        struct ll_fid *fid);
+__u32 ll_fid_build_gen(struct ll_sb_info *sbi,
+                       struct ll_fid *fid);
 
 #endif
 
index 8c1de03..85503dd 100644 (file)
@@ -1958,8 +1958,8 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
 #endif
 
         inode->i_ino = ll_fid_build_ino(sbi, &body->fid1);
-        if (body->valid & OBD_MD_FLGENER)
-                inode->i_generation = body->generation;
+        inode->i_generation = ll_fid_build_gen(sbi, &body->fid1);
+        *ll_inode_lu_fid(inode) = *((struct lu_fid*)&md->body->fid1);
 
         if (body->valid & OBD_MD_FLATIME) {
                 if (body->atime > LTIME_S(inode->i_atime))
index 0a16d65..5b0c8bc 100644 (file)
@@ -95,18 +95,30 @@ ino_t ll_fid_build_ino(struct ll_sb_info *sbi,
 
 }
 
+__u32 ll_fid_build_gen(struct ll_sb_info *sbi, struct ll_fid *fid)
+{
+        struct lu_fid *lfid = (struct lu_fid *)fid;
+        ENTRY;
+
+        /* the generation is taken from the fid only if it is an IGIF */
+        if (!fid_is_igif(lfid))
+                RETURN(0);
+        RETURN(lu_igif_gen(lfid));
+}
+
 /* called from iget5_locked->find_inode() under inode_lock spinlock */
 static int fid_test_inode(struct inode *inode, void *opaque)
 {
         struct lustre_md     *md = opaque;
+        struct lu_fid        *fid = (struct lu_fid*)&md->body->fid1;
 
         if (unlikely(!(md->body->valid & OBD_MD_FLID))) {
                 CERROR("MDS body missing FID\n");
                 return 0;
         }
 
-        return lu_fid_eq(ll_inode_lu_fid(inode),
-                         (struct lu_fid*)&md->body->fid1);
+        return fid_seq(ll_inode_lu_fid(inode)) == fid_seq(fid) &&
+               fid_oid(ll_inode_lu_fid(inode)) == fid_oid(fid);
 }
 
 static int fid_set_inode(struct inode *inode, void *opaque)
index 605ec48..02c350b 100644 (file)
@@ -198,10 +198,16 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req,
         int old_len, new_size, old_size;
         struct lustre_msg *old_msg = req->rq_reqmsg;
         struct lustre_msg *new_msg;
+        int offset;
 
-        old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
+        if (mdc_req_is_2_0_server(req))
+                offset = 4;
+        else
+                offset = 2;
+
+        old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + offset);
         old_size = lustre_packed_msg_size(old_msg);
-        lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
+        lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + offset,
                               body->eadatasize);
         new_size = lustre_packed_msg_size(old_msg);
 
@@ -218,7 +224,8 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req,
 
                 OBD_FREE(old_msg, old_size);
         } else {
-                lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
+                lustre_msg_set_buflen(old_msg,
+                                      DLM_INTENT_REC_OFF + offset, old_len);
                 body->valid &= ~OBD_MD_FLEASIZE;
                 body->eadatasize = 0;
         }
@@ -270,7 +277,7 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
                 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
                 bufcount = 8;
                 repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
-                repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa); 
+                repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
                 repbufcount = 7;
         }
         rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
@@ -541,9 +548,14 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                          * large enough request buffer above we need to
                          * reallocate it here to hold the actual LOV EA. */
                         if (it->it_op & IT_OPEN) {
-                                int offset = DLM_INTENT_REC_OFF + 2;
+                                int offset = DLM_INTENT_REC_OFF;
                                 void *lmm;
 
+                                if (mdc_req_is_2_0_server(req))
+                                        offset += 4;
+                                else
+                                        offset += 2;
+
                                 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
                                     body->eadatasize)
                                         mdc_realloc_openmsg(req, body);
index 462b2b6..61671bd 100644 (file)
@@ -396,9 +396,9 @@ int mdc_getxattr(struct obd_export *exp, struct ll_fid *fid,
                                 input, input_size, output_size, 0, request);
 }
 
-/* This should be called with both the request and the reply still packed. */
-void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
-                                int repoff)
+/* For the fid-less server */
+static void mdc_store_inode_generation_18(struct ptlrpc_request *req,
+                                          int reqoff, int repoff)
 {
         struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff,
                                                     sizeof(*rec));
@@ -419,6 +419,41 @@ void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
                   rec->cr_replayfid.generation, rec->cr_replayfid.id);
 }
 
+/* For the 2.0 server: save reply fid/ioepoch/handle in the create record */
+static void mdc_store_inode_generation_20(struct ptlrpc_request *req,
+                                          int reqoff, int repoff)
+{
+        struct mdt_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff,
+                                                    sizeof(*rec));
+        struct mdt_body *body = lustre_msg_buf(req->rq_repmsg, repoff,
+                                               sizeof(*body));
+
+        LASSERT (rec != NULL);
+        LASSERT (body != NULL);
+
+        rec->cr_fid2 = body->fid1;
+        rec->cr_ioepoch = body->ioepoch;
+        rec->cr_old_handle.cookie = body->handle.cookie;
+
+        if (!fid_is_sane(&body->fid1)) {
+                DEBUG_REQ(D_ERROR, req, "saving replay request with "
+                          "insane fid");
+                LBUG();
+        }
+        DEBUG_REQ(D_INODE, req, "storing generation %u for ino "LPU64,
+                  rec->cr_fid1.f_oid, rec->cr_fid2.f_seq);
+}
+
+/* This should be called with both the request and the reply still packed. */
+void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
+                                int repoff)
+{
+        if (mdc_req_is_2_0_server(req)) /* 2.0 server: mdt_* message structs */
+                mdc_store_inode_generation_20(req, reqoff, repoff);
+        else                            /* otherwise: mds_* message structs */
+                mdc_store_inode_generation_18(req, reqoff, repoff);
+}
+
 #ifdef CONFIG_FS_POSIX_ACL
 static
 int mdc_unpack_acl(struct obd_export *exp, struct ptlrpc_request *req,
@@ -569,7 +604,7 @@ static void mdc_commit_open(struct ptlrpc_request *req)
         if (mod->mod_och != NULL)
                 mod->mod_och->och_mod = NULL;
 
-        OBD_FREE(mod, sizeof(*mod));
+        OBD_FREE_PTR(mod);
         req->rq_cb_data = NULL;
 }
 
@@ -601,28 +636,99 @@ static void mdc_replay_open(struct ptlrpc_request *req)
                 file_fh = &och->och_fh;
                 CDEBUG(D_RPCTRACE, "updating handle from "LPX64" to "LPX64"\n",
                        file_fh->cookie, body->handle.cookie);
-                memcpy(&old, file_fh, sizeof(old));
-                memcpy(file_fh, &body->handle, sizeof(*file_fh));
+                old = *file_fh;
+                *file_fh = body->handle;
         }
 
         close_req = mod->mod_close_req;
+
         if (close_req != NULL) {
-                struct mds_body *close_body;
                 LASSERT(lustre_msg_get_opc(close_req->rq_reqmsg) == MDS_CLOSE);
-                close_body = lustre_msg_buf(close_req->rq_reqmsg, REQ_REC_OFF,
-                                            sizeof(*close_body));
-                if (och != NULL)
-                        LASSERT(!memcmp(&old, &close_body->handle, sizeof old));
-                DEBUG_REQ(D_RPCTRACE, close_req, "updating close with new fh");
-                memcpy(&close_body->handle, &body->handle,
-                       sizeof(close_body->handle));
+                if (mdc_req_is_2_0_server(close_req)) {
+                        struct mdt_epoch *epoch = NULL;
+
+                        epoch = lustre_msg_buf(close_req->rq_reqmsg,
+                                               REQ_REC_OFF, sizeof(*epoch));
+                        LASSERT(epoch);
+                        if (och != NULL)
+                                LASSERT(!memcmp(&old, &epoch->handle,
+                                        sizeof(old)));
+                        DEBUG_REQ(D_RPCTRACE, close_req,
+                                  "updating close with new fh");
+                        epoch->handle = body->handle;
+                 } else {
+                        struct mds_body *close_body = NULL;
+
+                        close_body = lustre_msg_buf(close_req->rq_reqmsg,
+                                                    REQ_REC_OFF,
+                                                    sizeof(*close_body));
+                        if (och != NULL)
+                                LASSERT(!memcmp(&old, &close_body->handle,
+                                        sizeof(old)));
+                        DEBUG_REQ(D_RPCTRACE, close_req,
+                                  "updating close with new fh");
+                        close_body->handle = body->handle;
+                 }
         }
 
         EXIT;
 }
 
-void mdc_set_open_replay_data(struct obd_client_handle *och,
-                              struct ptlrpc_request *open_req)
+/* For the 2.0 server: mdt_rec_create request and mdt_body reply */
+static void mdc_set_open_replay_data_20(struct obd_client_handle *och,
+                                        struct ptlrpc_request *open_req)
+{
+        struct mdc_open_data  *mod;
+        struct obd_import     *imp = open_req->rq_import;
+        struct mdt_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg,
+                                                    DLM_INTENT_REC_OFF,
+                                                    sizeof(*rec));
+        struct mdt_body       *body = lustre_msg_buf(open_req->rq_repmsg,
+                                                     DLM_REPLY_REC_OFF,
+                                                     sizeof(*body));
+
+        /* If request is not eligible for replay, just bail out */
+        if (!open_req->rq_replay)
+                return;
+
+        /* rec is in the outgoing request, which is always in my byte order */
+        LASSERT(rec != NULL);
+        LASSERT(lustre_rep_swabbed(open_req, DLM_REPLY_REC_OFF));
+        /* body is in the incoming reply, already swabbed to my byte order */
+        LASSERT(body != NULL);
+
+        /* Only if the import is replayable, we set replay_open data */
+        if (och && imp->imp_replayable) {
+                OBD_ALLOC_PTR(mod);
+                if (mod == NULL) {
+                        DEBUG_REQ(D_ERROR, open_req,
+                                  "can't allocate mdc_open_data");
+                        return;
+                }
+
+                spin_lock(&open_req->rq_lock);
+                och->och_mod = mod;
+                mod->mod_och = och;
+                mod->mod_open_req = open_req;
+                open_req->rq_cb_data = mod;
+                open_req->rq_commit_cb = mdc_commit_open;
+                spin_unlock(&open_req->rq_lock);
+        }
+
+        rec->cr_fid2 = body->fid1;
+        rec->cr_ioepoch = body->ioepoch;
+        rec->cr_old_handle.cookie = body->handle.cookie;
+        open_req->rq_replay_cb = mdc_replay_open;
+        if (!fid_is_sane(&body->fid1)) {
+                DEBUG_REQ(D_ERROR, open_req, "saving replay request with "
+                          "insane fid");
+                LBUG();
+        }
+        DEBUG_REQ(D_RPCTRACE, open_req, "set up replay data");
+}
+
+static void mdc_set_open_replay_data_18(struct obd_client_handle *och,
+                                        struct ptlrpc_request *open_req)
 {
         struct mdc_open_data *mod;
         struct mds_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg,
@@ -669,6 +775,15 @@ void mdc_set_open_replay_data(struct obd_client_handle *och,
         DEBUG_REQ(D_RPCTRACE, open_req, "set up replay data");
 }
 
+void mdc_set_open_replay_data(struct obd_client_handle *och,
+                              struct ptlrpc_request *open_req)
+{
+        if (mdc_req_is_2_0_server(open_req)) /* 2.0 server: mdt_* structs */
+                mdc_set_open_replay_data_20(och, open_req);
+        else                                 /* otherwise: mds_* structs */
+                mdc_set_open_replay_data_18(och, open_req);
+}
+
 void mdc_clear_open_replay_data(struct obd_client_handle *och)
 {
         struct mdc_open_data *mod = och->och_mod;
index c4599bf..b0f4883 100644 (file)
@@ -3812,6 +3812,10 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
                 RETURN(-EINVAL);
         }
 
+        if (KEY_IS(KEY_CAPA_KEY)) {
+                RETURN(0);
+        }
+
         if (!KEY_IS(KEY_MDS_CONN))
                 RETURN(-EINVAL);
 
index 7a272fb..f1017dc 100644 (file)
@@ -529,9 +529,31 @@ static int import_select_connection(struct obd_import *imp)
         RETURN(0);
 }
 
+/**
+ * Report the transno of the first request on the import's replay list.
+ * Returns 0 if the list is empty, else 1; must be called under imp lock
+ */
+static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
+{
+        struct ptlrpc_request *req;
+
+        if (list_empty(&imp->imp_replay_list))
+                return 0;
+        req = list_entry(imp->imp_replay_list.next, struct ptlrpc_request,
+                         rq_replay_list);
+        *transno = req->rq_transno;
+        if (req->rq_transno == 0) {
+                DEBUG_REQ(D_ERROR, req, "zero transno in replay");
+                LBUG();
+        }
+
+        return 1;
+}
+
 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
 {
         struct obd_device *obd = imp->imp_obd;
+        int set_transno = 0;
         int initial_connect = 0;
         int rc;
         __u64 committed_before_reconnect = 0;
@@ -574,6 +596,9 @@ int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
         else
                 committed_before_reconnect = imp->imp_peer_committed_transno;
 
+        set_transno = ptlrpc_first_transno(imp,
+                                           &imp->imp_connect_data.ocd_transno);
+
         spin_unlock(&imp->imp_lock);
 
         if (new_uuid) {
@@ -667,6 +692,10 @@ int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
                                         MSG_CONNECT_INITIAL);
         }
 
+        if (set_transno)
+                lustre_msg_add_op_flags(request->rq_reqmsg,
+                                        MSG_CONNECT_TRANSNO);
+
         DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
                   aa->pcaa_initial_connect ? "initial " : "re",
                   imp->imp_conn_cnt);
@@ -1160,7 +1189,11 @@ static int signal_completed_replay(struct obd_import *imp)
 
         ptlrpc_req_set_repsize(req, 1, NULL);
         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
-        lustre_msg_add_flags(req->rq_reqmsg, MSG_LAST_REPLAY);
+        lustre_msg_add_flags(req->rq_reqmsg,
+                             MSG_LOCK_REPLAY_DONE |
+                             MSG_REQ_REPLAY_DONE |
+                             MSG_LAST_REPLAY);
+
         if (imp->imp_delayed_recovery)
                 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
         req->rq_timeout *= 3;
index bcd1817..40e5531 100755 (executable)
@@ -412,6 +412,11 @@ test_20b() { # bug 10480
     df -P $DIR || df -P $DIR || true    # reconnect
     wait_mds_recovery_done || error "MDS recovery not done"
 
+    # For interop with 2.0 only:
+    # FIXME just because recovery is done doesn't mean we've finished
+    # orphan cleanup.  Fake it with a sleep for now...
+    sleep 10
+
     AFTERUSED=`df -P $DIR | tail -1 | awk '{ print $3 }'`
     log "before $BEFOREUSED, after $AFTERUSED"
     [ $AFTERUSED -gt $((BEFOREUSED + 20)) ] && \
index 98d7dc4..51059f7 100644 (file)
@@ -663,13 +663,14 @@ wait_for() {
 
 wait_mds_recovery_done () {
     local timeout=`do_facet mds lctl get_param  -n timeout`
+    local mdtdevice=$(get_mds_mdt_device_proc_path)
 #define OBD_RECOVERY_TIMEOUT (obd_timeout * 5 / 2)
 # as we are in process of changing obd_timeout in different ways
 # let's set MAX longer than that
     local MAX=$(( timeout * 4 ))
     local WAIT=0
     while [ $WAIT -lt $MAX ]; do
-        STATUS=`do_facet mds "lctl get_param -n mds.*-MDT*.recovery_status | grep status"`
+        STATUS=`do_facet mds "lctl get_param -n ${mdtdevice}.*-MDT*.recovery_status | grep status"`
         echo $STATUS | grep COMPLETE && return 0
         sleep 5
         WAIT=$((WAIT + 5))
@@ -799,7 +800,8 @@ replay_barrier_nodf() {
 
 mds_evict_client() {
     UUID=`lctl get_param -n mdc.${mds_svc}-mdc-*.uuid`
-    do_facet mds "lctl set_param -n mds.${mds_svc}.evict_client $UUID"
+    local mdtdevice=$(get_mds_mdt_device_proc_path)
+    do_facet mds "lctl set_param -n ${mdtdevice}.${mds_svc}.evict_client $UUID"
 }
 
 ost_evict_client() {
@@ -2152,3 +2154,15 @@ get_mdtosc_proc_path() {
         echo "${ost}-osc-MDT0000"
     fi
 }
+
+# Echo "mds" when the MDS version has major <= 1 and minor <= 8, else "mdt"
+get_mds_mdt_device_proc_path() {
+    local major=$(get_mds_version_major)
+    local minor=$(get_mds_version_minor)
+    local prefix="mdt"
+
+    [ $major -le 1 -a $minor -le 8 ] && prefix="mds"
+    echo "$prefix"
+}
+
+