Whamcloud - gitweb
This patch is to solve OSS hangs after "All ost request buffers busy"
authortianzy <tianzy>
Thu, 5 Apr 2007 09:20:12 +0000 (09:20 +0000)
committertianzy <tianzy>
Thu, 5 Apr 2007 09:20:12 +0000 (09:20 +0000)
It is a deadlock between quota and journal.
b=11693
r=green
r=adilger

lustre/include/lustre/lustre_idl.h
lustre/include/lustre_quota.h
lustre/include/obd_support.h
lustre/obdfilter/filter_io_26.c
lustre/quota/quota_context.c
lustre/quota/quota_interface.c
lustre/quota/quota_internal.h
lustre/tests/sanity-quota.sh
lustre/tests/test-framework.sh

index 149610b..38acc4d 100644 (file)
@@ -1416,4 +1416,10 @@ typedef enum {
 } quota_cmd_t;
 
 #define JOIN_FILE_ALIGN 4096
+
+/* Return codes of the quota check function (quota_chkquota) */
+#define QUOTA_RET_OK           0 /* check completed successfully */
+#define QUOTA_RET_NOQUOTA      1 /* quota is not enabled/supported */
+#define QUOTA_RET_NOLIMIT      2 /* no quota limit is set */
+#define QUOTA_RET_ACQUOTA      3 /* need to acquire extra quota */
 #endif
index 05b08d1..cf81f60 100644 (file)
@@ -203,6 +203,11 @@ typedef struct {
         
         /* For quota slave, acquire/release quota from master if needed */
         int (*quota_acquire) (struct obd_device *, unsigned int, unsigned int);
+
+        /* For quota slave, check whether specified uid/gid's remaining quota
+         * can finish a write rpc */
+        int (*quota_chkquota) (struct obd_device *, unsigned int, unsigned int,
+                               int);
         
         /* For quota client, poll if the quota check done */
         int (*quota_poll_check) (struct obd_export *, struct if_quotacheck *);
@@ -392,6 +397,19 @@ static inline int lquota_acquire(quota_interface_t *interface,
         RETURN(rc);
 }
 
+/*
+ * Ask the quota interface whether the remaining quota of uid/gid is enough
+ * to finish a write rpc of npage pages.
+ *
+ * The call is forwarded with the same arguments to the interface's chkquota
+ * operation and its return value is passed back unchanged.
+ * NOTE(review): QUOTA_CHECK_OP and QUOTA_OP are defined outside this hunk;
+ * presumably the former validates that the operation is provided and the
+ * latter resolves to interface->quota_chkquota -- confirm.
+ */
+static inline int lquota_chkquota(quota_interface_t *interface,
+                                  struct obd_device *obd,
+                                  unsigned int uid, unsigned int gid,
+                                  int npage)
+{
+        int rc;
+        ENTRY;
+        
+        QUOTA_CHECK_OP(interface, chkquota);
+        rc = QUOTA_OP(interface, chkquota)(obd, uid, gid, npage);
+        RETURN(rc);
+}
+
 int lprocfs_rd_bunit(char *page, char **start, off_t off, int count, 
                      int *eof, void *data);
 int lprocfs_rd_iunit(char *page, char **start, off_t off, int count, 
index 1b31dd4..5fc474a 100644 (file)
@@ -132,6 +132,7 @@ extern int obd_race_state;
 #define OBD_FAIL_OST_BRW_SIZE            0x21c
 #define OBD_FAIL_OST_DROP_REQ            0x21d
 #define OBD_FAIL_OST_SETATTR_CREDITS     0x21e
+#define OBD_FAIL_OST_HOLD_WRITE_RPC      0x21f
 
 #define OBD_FAIL_LDLM                    0x300
 #define OBD_FAIL_LDLM_NAMESPACE_NEW      0x301
index f5ca9b8..73bb316 100644 (file)
@@ -514,24 +514,11 @@ int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *iobuf,
                 lquota_enforce(filter_quota_interface_ref, obd,
                                iobuf->dr_ignore_quota);
         }
-remap:
+
         rc = fsfilt_map_inode_pages(obd, inode, iobuf->dr_pages,
                                     iobuf->dr_npages, iobuf->dr_blocks,
                                     obdfilter_created_scratchpad, create, sem);
 
-        if (rc == -EDQUOT) {
-                LASSERT(rw == OBD_BRW_WRITE &&
-                        !cap_raised(current->cap_effective, CAP_SYS_RESOURCE));
-
-                /* Unfortunately, if quota master is too busy to handle the
-                 * pre-dqacq in time or this user has exceeded quota limit, we
-                 * have to wait for the completion of in flight dqacq/dqrel,
-                 * then try again */
-                if (lquota_acquire(filter_quota_interface_ref, obd,
-                                   inode->i_uid, inode->i_gid))
-                        goto remap;
-        }
-
         if (rw == OBD_BRW_WRITE) {
                 if (rc == 0) {
                         filter_tally_write(exp, iobuf->dr_pages,
@@ -606,7 +593,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         int i, err, cleanup_phase = 0;
         struct obd_device *obd = exp->exp_obd;
         void *wait_handle;
-        int   total_size = 0;
+        int   total_size = 0, rc2;
         unsigned int qcids[MAXQUOTAS] = {0, 0};
         ENTRY;
 
@@ -617,6 +604,22 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         if (rc != 0)
                 GOTO(cleanup, rc);
 
+        /* Unfortunately, if quota master is too busy to handle the
+         * pre-dqacq in time and quota hash on ost is used up, we
+         * have to wait for the completion of in flight dqacq/dqrel,
+         * then try again */
+        if ((rc2 = lquota_chkquota(filter_quota_interface_ref, obd, oa->o_uid,
+                                   oa->o_gid, niocount)) == QUOTA_RET_ACQUOTA) {
+                OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
+                lquota_acquire(filter_quota_interface_ref, obd, oa->o_uid,
+                               oa->o_gid);
+        }
+
+        if (rc2 < 0) {
+                rc = rc2;
+                GOTO(cleanup, rc);
+        }
+
         iobuf = filter_iobuf_get(&obd->u.filter, oti);
         if (IS_ERR(iobuf))
                 GOTO(cleanup, rc = PTR_ERR(iobuf));
index e601840..f6340b3 100644 (file)
@@ -221,6 +221,59 @@ out:
         return ret;
 }
 
+/* Compute the remaining block quota for the uid or gid described by qdata
+ * (b=11693).
+ *
+ * The id is taken from qdata->qd_id and the quota type from the
+ * QUOTA_IS_GRP bit of qdata->qd_flags.  On QUOTA_RET_OK, qdata->qd_count is
+ * set to the space left below the block hard limit (0 if the current usage
+ * already exceeds the limit); the caller compares it against a byte count.
+ *
+ * Returns:
+ *   QUOTA_RET_OK       remaining quota stored in qdata->qd_count
+ *   QUOTA_RET_NOQUOTA  quota is not enabled on the super block
+ *   QUOTA_RET_NOLIMIT  root user, or no block hard limit is set for the id
+ *   -ENOMEM            the quotactl request could not be allocated
+ *   other non-zero     error returned by fsfilt_quotactl()
+ */
+int compute_remquota(struct obd_device *obd,
+                     struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata)
+{
+        struct super_block *sb = qctxt->lqc_sb;
+        __u64 usage, limit;
+        struct obd_quotactl *qctl;
+        int ret = QUOTA_RET_OK;
+        __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP;
+        ENTRY;
+
+        if (!sb_any_quota_enabled(sb))
+                RETURN(QUOTA_RET_NOQUOTA);
+
+        /* ignore root user */
+        if (qdata->qd_id == 0 && qdata_type == USRQUOTA)
+                RETURN(QUOTA_RET_NOLIMIT);
+
+        OBD_ALLOC_PTR(qctl);
+        if (qctl == NULL)
+                RETURN(-ENOMEM);
+
+        /* get fs quota usage & limit */
+        qctl->qc_cmd = Q_GETQUOTA;
+        qctl->qc_id = qdata->qd_id;
+        qctl->qc_type = qdata_type;
+        ret = fsfilt_quotactl(obd, sb, qctl);
+        if (ret) {
+                if (ret == -ESRCH)      /* no limit */
+                        ret = QUOTA_RET_NOLIMIT;
+                else
+                        /* terminate the message so that it does not run
+                         * into the next debug log entry */
+                        CDEBUG(D_QUOTA, "can't get fs quota usage! (rc:%d)\n",
+                               ret);
+                GOTO(out, ret);
+        }
+
+        usage = qctl->qc_dqblk.dqb_curspace;
+        /* the hard limit is kept in quota blocks, convert it before
+         * comparing it with the usage */
+        limit = qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS;
+        if (!limit) {           /* no limit */
+                ret = QUOTA_RET_NOLIMIT;
+                GOTO(out, ret);
+        }
+
+        /* clamp at 0 instead of wrapping when the id is already over quota */
+        if (limit >= usage)
+                qdata->qd_count = limit - usage;
+        else
+                qdata->qd_count = 0;
+        EXIT;
+out:
+        OBD_FREE_PTR(qctl);
+        return ret;
+}
+
 /* caller must hold qunit_hash_lock */
 static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
                                             struct qunit_data *qdata)
index 8765119..5c915f4 100644 (file)
@@ -393,6 +393,38 @@ static int filter_quota_acquire(struct obd_device *obd, unsigned int uid,
         RETURN(rc == -EAGAIN);
 }
 
+/* Check whether the remaining quota of the given uid and gid can satisfy a
+ * write rpc of npage pages.
+ *
+ * Returns:
+ *   QUOTA_RET_ACQUOTA  as soon as one of the ids has less than
+ *                      npage * CFS_PAGE_SIZE left, i.e. extra quota has to
+ *                      be acquired before writing
+ *   negative errno     as soon as the remaining quota of an id cannot be
+ *                      computed
+ *   0                  quota is not enabled on the super block
+ *   otherwise          the compute_remquota() result for the last id checked
+ */
+static int filter_quota_check(struct obd_device *obd, unsigned int uid,
+                              unsigned int gid, int npage)
+{
+        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        int i;
+        __u32 id[MAXQUOTAS] = { uid, gid };
+        struct qunit_data qdata[MAXQUOTAS];
+        int rc = QUOTA_RET_OK;
+        ENTRY;
+
+        CLASSERT(MAXQUOTAS < 4);
+        if (!sb_any_quota_enabled(qctxt->lqc_sb))
+                RETURN(0);
+
+        for (i = 0; i < MAXQUOTAS; i++) {
+                /* i doubles as the quota type: id[0] is the uid, id[1] is
+                 * the gid */
+                qdata[i].qd_id = id[i];
+                qdata[i].qd_flags = i;
+                qdata[i].qd_flags |= QUOTA_IS_BLOCK;
+                qdata[i].qd_count = 0;
+
+                /* NOTE(review): helper defined elsewhere; presumably waits
+                 * for dqacq/dqrel requests in flight for this id */
+                qctxt_wait_pending_dqacq(qctxt, id[i], i, 1);
+                rc = compute_remquota(obd, qctxt, &qdata[i]);
+                /* report the failure now, otherwise the check of the next
+                 * id would overwrite rc and hide it from the caller */
+                if (rc < 0)
+                        RETURN(rc);
+                if (rc == QUOTA_RET_OK &&
+                    qdata[i].qd_count < npage * CFS_PAGE_SIZE)
+                        RETURN(QUOTA_RET_ACQUOTA);
+        }
+
+        RETURN(rc);
+}
+
 static int mds_quota_init(void)
 {
         return lustre_dquot_init();
@@ -680,6 +712,7 @@ quota_interface_t filter_quota_interface = {
         .quota_getflag  = filter_quota_getflag,
         .quota_acquire  = filter_quota_acquire,
         .quota_adjust   = filter_quota_adjust,
+        .quota_chkquota = filter_quota_check,
 };
 #endif /* __KERNEL__ */
 
index 5756e88..9550f52 100644 (file)
@@ -63,6 +63,8 @@ int qctxt_init(struct lustre_quota_ctxt *qctxt, struct super_block *sb,
 void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force);
 void qslave_start_recovery(struct obd_device *obd, 
                            struct lustre_quota_ctxt *qctxt);
+int compute_remquota(struct obd_device *obd,
+                     struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata);
 /* quota_master.c */
 int lustre_dquot_init(void);
 void lustre_dquot_exit(void);
index 209c365..37b3721 100644 (file)
@@ -35,8 +35,11 @@ LFS=${LFS:-lfs}
 LCTL=${LCTL:-lctl}
 SETSTRIPE=${SETSTRIPE:-"$LFS setstripe"}
 TSTID=${TSTID:-60000}
+TSTID2=${TSTID2:-60001}
 RUNAS=${RUNAS:-"runas -u $TSTID"}
+RUNAS2=${RUNAS2:-"runas -u $TSTID2"}
 TSTUSR=${TSTUSR:-"quota_usr"}
+TSTUSR2=${TSTUSR2:-"quota_2usr"}
 BLK_SZ=1024
 BUNIT_SZ=${BUNIT_SZ:-1000}     # default 1000 quota blocks
 BTUNE_SZ=${BTUNE_SZ:-500}      # default 50% of BUNIT_SZ
@@ -136,14 +139,30 @@ pass() {
 }
 
 mounted_lustre_filesystems() {
-       awk '($3 ~ "lustre" && $1 ~ ":") { print $2 }' /proc/mounts
+       awk '($3 ~ "lustre" && $1 ~ ":") { print $2 }' /proc/mounts | sed -n $1p
 }
-MOUNT="`mounted_lustre_filesystems`"
-if [ -z "$MOUNT" ]; then
+
+# Remember where our caller has hinted that we should mount lustre
+MOUNT_HINT=$MOUNT
+MOUNT_HINT2=$MOUNT2
+MOUNT="`mounted_lustre_filesystems 1`"
+MOUNT2="`mounted_lustre_filesystems 2`"
+if [ -n "$MOUNT" -a -z "$MOUNT2" ]; then
+        error "this test needs two mount point!"
+fi
+if [ -z "$MOUNT" -a -n "$MOUNT2" ]; then
+        error "this test needs two mount point!"
+fi
+if [ -z "$MOUNT" -a -z "$MOUNT2" ]; then
        export QUOTA_OPTS="quotaon=ug"
-       sh llmount.sh
-       MOUNT="`mounted_lustre_filesystems`"
-       [ -z "$MOUNT" ] && error "NAME=$NAME not mounted"
+       export MOUNT=$MOUNT_HINT 
+       export MOUNT2=$MOUNT_HINT2
+       MOUNT2=${MOUNT2:-/mnt/lustre2}
+       sh llmount.sh 
+       MOUNT="`mounted_lustre_filesystems 1`"
+       MOUNT2="`mounted_lustre_filesystems 2`"
+       [ -z "$MOUNT" ] && error "NAME=$MOUNT not mounted"
+       [ -z "$MOUNT2" ] && error "NAME=$MOUNT2 not mounted"
        I_MOUNTED=yes
 fi
 
@@ -161,6 +180,7 @@ ORIGFREE=`cat $LPROC/lov/$LOVNAME/kbytesavail`
 MAXFREE=${MAXFREE:-$((200000 * $OSTCOUNT))}
 MDS=$(\ls $LPROC/mds 2> /dev/null | grep -v num_refs | tail -n 1)
 TSTDIR="$MOUNT/quota_dir"
+TSTDIR2="$MOUNT2/quota_dir"
 
 build_test_filter
 
@@ -244,6 +264,12 @@ setup() {
        fi
        TSTID="`cat /etc/group | grep "$TSTUSR" | awk -F: '{print $3}'`"
 
+        GRP2="`cat /etc/group | grep "$TSTUSR2" | awk -F: '{print $1}'`"
+        if [ -z "$GRP2" ]; then
+                groupadd -g $TSTID2 "$TSTUSR2"
+        fi
+        TSTID2="`cat /etc/group | grep "$TSTUSR2" | awk -F: '{print $3}'`"
+
        # create test user
        USR="`cat /etc/passwd | grep "$TSTUSR" | awk -F: '{print $1}'`"
        if [ -z "$USR" ]; then
@@ -251,10 +277,20 @@ setup() {
        fi
        
        RUNAS="runas -u $TSTID"
+
+       USR2="`cat /etc/passwd | grep "$TSTUSR2" | awk -F: '{print $1}'`"
+        if [ -z "$USR2" ]; then
+                useradd -u $TSTID2 -g $TSTID2 -d /tmp "$TSTUSR2"
+        fi
+
+        RUNAS2="runas -u $TSTID2"
        
        # create test directory
        [ -d $TSTDIR ] || mkdir $TSTDIR 
        chmod 777 $TSTDIR
+
+        [ -d $TSTDIR2 ] || mkdir $TSTDIR2
+        chmod 777 $TSTDIR2
 }
 setup
 
@@ -654,7 +690,7 @@ run_test 8 "Run dbench with quota enabled ==========="
 
 # run for fixing bug10707, it needs a big room. test for 64bit
 test_9() {
-        lustrefs_size=`df | grep $MOUNT | awk '{print $(NF - 2)}'`
+        lustrefs_size=`df | grep $MOUNT | awk '{print $(NF - 2)}' | sed -n 1p`
         size_file=$((1024 * 1024 * 9 / 2 * $OSTCOUNT))
         echo "lustrefs_size:$lustrefs_size  size_file:$size_file"
         if [ $lustrefs_size -lt $size_file ]; then
@@ -702,7 +738,7 @@ run_test 9 "run for fixing bug10707(64bit) ==========="
 
 # run for fixing bug10707, it need a big room. test for 32bit
 test_10() {
-       lustrefs_size=`df | grep $MOUNT | awk '{print $(NF - 2)}'`
+       lustrefs_size=`df | grep $MOUNT | awk '{print $(NF - 2)}' | sed -n 1p`
        size_file=$((1024 * 1024 * 9 / 2 * $OSTCOUNT))
        echo "lustrefs_size:$lustrefs_size  size_file:$size_file"
        if [ $lustrefs_size -lt $size_file ]; then
@@ -764,7 +800,7 @@ run_test 10 "run for fixing bug10707(32bit) ==========="
 
 test_11() {
        #prepare the test
-       block_limit=`df | grep $MOUNT | awk '{print $(NF - 4)}'`
+       block_limit=`df | grep $MOUNT | awk '{print $(NF - 4)}'| sed -n 1p`
        echo $block_limit
        orig_dbr=`cat /proc/sys/vm/dirty_background_ratio`
        orig_dec=`cat /proc/sys/vm/dirty_expire_centisecs`
@@ -822,6 +858,122 @@ test_11() {
 }
 run_test 11 "run for fixing bug10912 ==========="
 
+# test a deadlock between quota and journal b=11693
+test_12() {
+       LIMIT=$(( $BUNIT_SZ * $(($OSTCOUNT + 1)) * 10)) # 10 bunits each sever
+       TESTFILE="$TSTDIR/quota_tst120"
+       TESTFILE2="$TSTDIR2/quota_tst121"
+       
+       echo "   User quota (limit: $LIMIT kilobytes)"
+       $LFS setquota -u $TSTUSR 0 $LIMIT 0 0 $MOUNT
+       
+       $LFS setstripe $TESTFILE 65536 0 1
+       chown $TSTUSR.$TSTUSR $TESTFILE
+       $LFS setstripe $TESTFILE2 65536 0 1
+        chown $TSTUSR2.$TSTUSR2 $TESTFILE2
+
+       #define OBD_FAIL_OST_HOLD_WRITE_RPC      0x21f
+       sysctl -w lustre.fail_loc=0x0000021f        
+
+       echo "   step1: write out of block quota ..."
+       $RUNAS dd if=/dev/zero of=$TESTFILE bs=$BLK_SZ count=$(($LIMIT*2)) & > /dev/null 2>&1
+       DDPID=$!
+       sleep 5
+       $RUNAS2 dd if=/dev/zero of=$TESTFILE2 bs=$BLK_SZ count=102400 & > /dev/null 2>&1
+       DDPID1=$!
+
+       echo  "   step2: testing ......"
+       count=0
+       while [ true ]; do
+           if [ -z `ps -ef | awk '$2 == '${DDPID1}' { print $8 }'` ]; then break; fi
+           count=$[count+1]
+           if [ $count -gt 64 ]; then
+               sysctl -w lustre.fail_loc=0
+               error "dd should be finished!"
+           fi
+           sleep 1
+       done    
+       echo "(dd_pid=$DDPID1, time=$count)successful"
+
+       #Recover fail_loc and dd will finish soon
+       sysctl -w lustre.fail_loc=0
+
+       echo  "   step3: testing ......"
+       count=0
+       while [ true ]; do
+           if [ -z `ps -ef | awk '$2 == '${DDPID}' { print $8 }'` ]; then break; fi
+           count=$[count+1]
+           if [ $count -gt 100 ]; then
+               error "dd should be finished!"
+           fi
+           sleep 1
+       done    
+       echo "(dd_pid=$DDPID, time=$count)successful"
+
+       rm -f $TESTFILE $TESTFILE2
+       
+       $LFS setquota -u $TSTUSR 0 0 0 0 $MOUNT         # clear user limit
+}
+run_test 12 "test a deadlock between quota and journal ==="
+
+# test multiple clients write block quota b=11693
+test_13() {
+       LIMIT=$(( $BUNIT_SZ * $(($OSTCOUNT + 1)) * 8 + $BUNIT_SZ ))
+       TESTFILE="$TSTDIR/quota_tst130"
+       TESTFILE2="$TSTDIR2/quota_tst131"
+       
+       echo "   User quota (limit: $LIMIT kilobytes)"
+       $LFS setquota -u $TSTUSR 0 $LIMIT 0 0 $MOUNT
+       
+       $LFS setstripe $TESTFILE 65536 0 1
+       chown $TSTUSR.$TSTUSR $TESTFILE
+       $LFS setstripe $TESTFILE2 65536 0 1
+        chown $TSTUSR.$TSTUSR $TESTFILE2
+
+       echo "   step1: write out of block quota ..."
+       # one bunit will give mds
+       $RUNAS dd if=/dev/zero of=$TESTFILE bs=$BLK_SZ count=$[($LIMIT - $BUNIT_SZ) / 2] & > /dev/null 2>&1
+       DDPID=$!
+       $RUNAS dd if=/dev/zero of=$TESTFILE2 bs=$BLK_SZ count=$[($LIMIT - $BUNIT_SZ) / 2] & > /dev/null 2>&1
+       DDPID1=$!
+
+       echo  "   step2: testing ......"
+       count=0
+       while [ true ]; do
+           if [ -z `ps -ef | awk '$2 == '${DDPID}' { print $8 }'` ]; then break; fi
+           count=$[count+1]
+           if [ $count -gt 64 ]; then
+               error "dd should be finished!"
+           fi
+           sleep 1
+       done    
+       echo "(dd_pid=$DDPID, time=$count)successful"
+
+       count=0
+       while [ true ]; do
+           if [ -z `ps -ef | awk '$2 == '${DDPID1}' { print $8 }'` ]; then break; fi
+           count=$[count+1]
+           if [ $count -gt 64 ]; then
+               error "dd should be finished!"
+           fi
+           sleep 1
+       done    
+       echo "(dd_pid=$DDPID1, time=$count)successful"
+
+       sync; sleep 5; sync;
+
+       echo  "   step3: checking ......"
+       fz=`stat -t $TESTFILE | awk '{print $2}'`
+       fz2=`stat -t $TESTFILE2 | awk '{print $2}'`
+       [ $fz  -ne $[($LIMIT - $BUNIT_SZ) / 2 * $BLK_SZ] ] && error "test13 failed!"
+       [ $fz2 -ne $[($LIMIT - $BUNIT_SZ) / 2 * $BLK_SZ] ] && error "test13 failed!"
+
+       rm -f $TESTFILE $TESTFILE2
+       
+       $LFS setquota -u $TSTUSR 0 0 0 0 $MOUNT         # clear user limit
+}
+run_test 13 "test multiple clients write block quota ==="
+
 # turn off quota
 test_99()
 {
@@ -837,6 +989,7 @@ if [ "`mount | grep ^$NAME`" ]; then
        post_test
        # delete test user and group
        userdel "$TSTUSR"
+       userdel "$TSTUSR2"
        if [ "$I_MOUNTED" = "yes" ]; then
                cd $ORIG_PWD && (sh llmountcleanup.sh || error "llmountcleanup failed")
        fi
index 17bea48..11d1a9b 100644 (file)
@@ -635,6 +635,9 @@ setupall() {
     done
     [ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE
     mount_client $MOUNT
+    if [ "$MOUNT2" ]; then
+       mount_client $MOUNT2
+    fi
     sleep 5
 }