Whamcloud - gitweb
b=3284
authorrread <rread>
Mon, 26 Apr 2004 22:04:10 +0000 (22:04 +0000)
committerrread <rread>
Mon, 26 Apr 2004 22:04:10 +0000 (22:04 +0000)
Add more checks to ensure the OSCC is not removing orphans before
doing precreate.  A new test added, and also some improved debugging
for precreate.

lustre/ChangeLog
lustre/include/linux/lustre_fsfilt.h
lustre/obdfilter/filter.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c
lustre/tests/cfg/insanity-local.sh
lustre/tests/insanity.sh
lustre/tests/replay-single.sh
lustre/tests/test-framework.sh

index 41d8357..78faf6a 100644 (file)
@@ -21,6 +21,7 @@ tbd  Cluster File Systems, Inc. <info@clusterfs.com>
        - initialize RPC timeout timer earlier for 2.6 (3219)
        - don't dereference NULL reply buffer if mdc_close was never sent (2410)
        - print nal/nid for unknown nid (3258)
+       - additional checks for oscc recovery before doing precreate (3284)
        * miscellania
        - allow default OST striping configuration per directory (1414)
        - increase maximum number of MDS request buffers for large systems
index 40e9914..72f3817 100644 (file)
@@ -100,7 +100,7 @@ static inline void *fsfilt_start_log(struct obd_device *obd,
         unsigned long now = jiffies;
         void *parent_handle = oti ? oti->oti_handle : NULL;
         void *handle = obd->obd_fsops->fs_start(inode, op, parent_handle, logs);
-        CDEBUG(D_HA, "started handle %p (%p)\n", handle, parent_handle);
+        CDEBUG(D_INFO, "started handle %p (%p)\n", handle, parent_handle);
 
         if (oti != NULL) {
                 if (parent_handle == NULL) {
@@ -162,7 +162,7 @@ static inline int fsfilt_commit(struct obd_device *obd, struct inode *inode,
 {
         unsigned long now = jiffies;
         int rc = obd->obd_fsops->fs_commit(inode, handle, force_sync);
-        CDEBUG(D_HA, "committing handle %p\n", handle);
+        CDEBUG(D_INFO, "committing handle %p\n", handle);
 
         if (time_after(jiffies, now + 15 * HZ))
                 CERROR("long journal start time %lus\n", (jiffies - now) / HZ);
index 4809e22..eb888cc 100644 (file)
@@ -320,8 +320,8 @@ int filter_update_last_objid(struct obd_device *obd, obd_gr group,
         int rc;
         ENTRY;
 
-        CDEBUG(D_INODE, "server last_objid for group "LPU64": "LPU64"\n",
-               group, filter->fo_last_objids[group]);
+        CDEBUG(D_INODE, "%s: server last_objid for group "LPU64": "LPU64"\n",
+               obd->obd_name, group, filter->fo_last_objids[group]);
 
         tmp = cpu_to_le64(filter->fo_last_objids[group]);
         rc = fsfilt_write_record(obd, filter->fo_last_objid_files[group],
@@ -696,7 +696,7 @@ static int filter_prep_groups(struct obd_device *obd)
                 }
                 filter->fo_last_objids[i] =
                         le64_to_cpu(filter->fo_last_objids[i]);
-                CDEBUG(D_INODE, "%s: server last_objid group %d: "LPU64"\n",
+                CDEBUG(D_HA, "%s: server last_objid group %d: "LPU64"\n",
                        obd->obd_name, i, filter->fo_last_objids[i]);
         }
 
@@ -1755,12 +1755,16 @@ static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
         doa.o_mode = S_IFREG;
 
         last = filter_last_id(filter, &doa);
-        CWARN("deleting orphan objects from "LPU64" to "LPU64"\n",
-               oa->o_id + 1, last);
+        CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
+               exp->exp_obd->obd_name, oa->o_id + 1, last);
         for (id = oa->o_id + 1; id <= last; id++) {
                 doa.o_id = id;
                 filter_destroy(exp, &doa, NULL, NULL);
         }
+
+        CDEBUG(D_HA, "%s: after destroy: set last_objids[%d] = "LPU64"\n", 
+               exp->exp_obd->obd_name, doa.o_gr, oa->o_id);
+
         spin_lock(&filter->fo_objidlock);
         filter->fo_last_objids[doa.o_gr] = oa->o_id;
         spin_unlock(&filter->fo_objidlock);
@@ -1836,6 +1840,8 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                 recreate_obj = 1;
         }
 
+        CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num); 
+
         for (i = 0; i < *num && err == 0; i++) {
                 int cleanup_phase = 0;
 
@@ -1870,13 +1876,17 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                          * already exists
                          */
                         if (recreate_obj) {
-                                CERROR("Serious error: recreating obj %*s but "
-                                       "obj already exists \n",
-                                       dchild->d_name.len, dchild->d_name.name);
+                                CERROR("%s: Serious error: recreating obj %*s "
+                                       "but obj already exists \n",
+                                       obd->obd_name, dchild->d_name.len, 
+                                       dchild->d_name.name);
+                                LBUG();
                         } else {
-                                CERROR("Serious error: objid %*s already "
+                                CERROR("%s: Serious error: objid %*s already "
                                        "exists; is this filesystem corrupt?\n",
-                                       dchild->d_name.len, dchild->d_name.name);
+                                       obd->obd_name, dchild->d_name.len, 
+                                       dchild->d_name.name);
+                                LBUG();
                         }
                         GOTO(cleanup, rc = -EEXIST);
                 }
@@ -1923,7 +1933,11 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
         }
         *num = i;
 
-        CDEBUG(D_INFO, "filter_precreate() created %d objects\n", i);
+        CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
+               obd->obd_name, group, filter->fo_last_objids[group]);
+
+        CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n", 
+               obd->obd_name, i);
         RETURN(rc);
 }
 
index d4db2c7..033b490 100644 (file)
@@ -87,7 +87,7 @@ static int osc_interpret_create(struct ptlrpc_request *req, void *data,
         oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
         spin_unlock(&oscc->oscc_lock);
 
-        CDEBUG(D_INFO, "preallocated through id "LPU64" (last used "LPU64")\n",
+        CDEBUG(D_HA, "preallocated through id "LPU64" (last used "LPU64")\n",
                oscc->oscc_last_id, oscc->oscc_next_id);
 
         wake_up(&oscc->oscc_waitq);
@@ -102,7 +102,8 @@ static int oscc_internal_create(struct osc_creator *oscc)
         ENTRY;
 
         spin_lock(&oscc->oscc_lock);
-        if (oscc->oscc_flags & OSCC_FLAG_CREATING) {
+        if (oscc->oscc_flags & OSCC_FLAG_CREATING ||
+            oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
                 spin_unlock(&oscc->oscc_lock);
                 RETURN(0);
         }
@@ -124,7 +125,7 @@ static int oscc_internal_create(struct osc_creator *oscc)
         spin_lock(&oscc->oscc_lock);
         body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
         body->oa.o_valid |= OBD_MD_FLID;
-        CDEBUG(D_INFO, "preallocating through id "LPU64" (last used "LPU64")\n",
+        CDEBUG(D_HA, "preallocating through id "LPU64" (last used "LPU64")\n",
                body->oa.o_id, oscc->oscc_next_id);
         spin_unlock(&oscc->oscc_lock);
 
@@ -235,6 +236,10 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 oa->o_valid |= OBD_MD_FLID;
                 oa->o_id = oscc->oscc_next_id - 1;
 
+                CDEBUG(D_HA, "%s: deleting to next_id: "LPU64"\n", 
+                       oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid, 
+                       oa->o_id);
+
                 rc = osc_real_create(exp, oa, ea, NULL);
 
                 spin_lock(&oscc->oscc_lock);
@@ -250,26 +255,29 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                RETURN(rc);
        }
 
-        /* If orphans are being recovered, then we must wait until it is 
-           finished before we can continue with create. */
-        if (oscc_recovering(oscc)) {
-                struct l_wait_info lwi;
-
-                CDEBUG(D_HA, "%p: oscc recovery in progress, waiting\n", oscc);
-
-                lwi = LWI_TIMEOUT(MAX(obd_timeout * HZ, 1), NULL, NULL);
-                rc = l_wait_event(oscc->oscc_waitq, !oscc_recovering(oscc),
-                                  &lwi);
-                LASSERT(rc == 0 || rc == -ETIMEDOUT);
-                if (rc == -ETIMEDOUT) {
-                        CDEBUG(D_HA, "%p: timed out waiting for recovery\n", oscc);
-                        RETURN(rc);
-                }
-                CDEBUG(D_HA, "%p: oscc recovery over, waking up\n", oscc);
-        }
-        
-        
         while (try_again) {
+                /* If orphans are being recovered, then we must wait until 
+                   it is finished before we can continue with create. */
+                if (oscc_recovering(oscc)) {
+                        struct l_wait_info lwi;
+                        
+                        CDEBUG(D_HA,"%p: oscc recovery in progress, waiting\n", 
+                               oscc);
+                        
+                        lwi = LWI_TIMEOUT(MAX(obd_timeout * HZ, 1), NULL, NULL);
+                        rc = l_wait_event(oscc->oscc_waitq, 
+                                          !oscc_recovering(oscc), &lwi);
+                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
+                        if (rc == -ETIMEDOUT) {
+                                CDEBUG(D_HA, "%p: timed out waiting for "
+                                       "recovery\n", 
+                                       oscc);
+                                RETURN(rc);
+                        }
+                        CDEBUG(D_HA, "%p: oscc recovery over, waking up\n", 
+                               oscc);
+                }
+                
                 spin_lock(&oscc->oscc_lock);
                 if (oscc->oscc_last_id >= oscc->oscc_next_id) {
                         memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
@@ -290,7 +298,9 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
         }
 
         if (rc == 0)
-                CDEBUG(D_INFO, "returning objid "LPU64"\n", lsm->lsm_object_id);
+                CDEBUG(D_HA, "%s: returning objid "LPU64"\n", 
+                       oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid, 
+                       lsm->lsm_object_id);
         else if (*ea == NULL)
                 obd_free_memmd(exp, &lsm);
         RETURN(rc);
index 8e65721..bc1312b 100644 (file)
@@ -2707,7 +2707,7 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
                 if (vallen != sizeof(obd_id))
                         RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
-                CDEBUG(D_INODE, "%s: set oscc_next_id = "LPU64"\n",
+                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                        exp->exp_obd->obd_name,
                        obd->u.cli.cl_oscc.oscc_next_id);
 
index c6de54c..2ca1485 100644 (file)
@@ -10,8 +10,8 @@ FAIL_CLIENTS=${FAIL_CLIENTS:-""}
 
 NETTYPE=${NETTYPE:-tcp}
 TIMEOUT=${TIMEOUT:-30}
-PTLDEBUG=${PTLDEBUG:-0}
-SUBSYSTEM=${SUBSYSTEM:-0}
+PTLDEBUG=${PTLDEBUG:-0x3f0400}
+SUBSYSTEM=${SUBSYSTEM:- 0xffb7e3ff}
 MOUNT=${MOUNT:-"/mnt/lustre"}
 #CLIENT_UPCALL=${CLIENT_UPCALL:-`pwd`/client-upcall-mdev.sh}
 UPCALL=${CLIENT_UPCALL:-`pwd`/replay-single-upcall.sh}
index 7ad2c1c..93a6e56 100755 (executable)
@@ -156,7 +156,7 @@ client_touch() {
     file=$1
     for c in $LIVE_CLIENT $FAIL_CLIENTS;  do
        if echo $DOWN_CLIENTS | grep -q $c; then continue; fi
-       $PDSH $c touch $MOUNT/${c}_$file
+       $PDSH $c touch $MOUNT/${c}_$file || return 1
     done
 }
 
index 3425372..5a1dd22 100755 (executable)
@@ -79,6 +79,15 @@ test_0() {
 }
 run_test 0 "empty replay"
 
+test_0b() {
+    # this test attempts to trigger a race in the precreation code, 
+    # and must run before any other objects are created on the filesystem
+    fail ost
+    createmany -o $DIR/$tfile 20 || return 1
+    unlinkmany $DIR/$tfile 20 || return 2
+}
+run_test 0b "ensure object created after recover exists. (3284)"
+
 test_1() {
     replay_barrier mds
     mcreate $DIR/$tfile
index 791b523..33f9786 100644 (file)
@@ -27,7 +27,7 @@ usage() {
 init_test_env() {
     export LUSTRE=`absolute_path $LUSTRE`
     export TESTSUITE=`basename $0 .sh`
-    export XMLCONFIG="${TESTSUITE}.xml"
+    export XMLCONFIG=${XMLCONFIG:-${TESTSUITE}.xml}
     export LTESTDIR=${LTESTDIR:-$LUSTRE/../ltest}
 
     [ -d /r ] && export ROOT=/r