Whamcloud - gitweb
LU-8319 fid: fix race in fid allocation 39/20939/3
authorFan Yong <fan.yong@intel.com>
Sun, 15 May 2016 02:35:24 +0000 (10:35 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 11 Jul 2016 23:52:52 +0000 (23:52 +0000)
There is race condition when allocating fid/seq in parallel
as following:

The thread1 will release the lcs_mutex via seq_fid_alloc_prep(),
then another fid allocation thread2 can obtain the lcs_mutex and
allocate FID in the new sequence that has just been allocated by
the thread1 via seq_client_alloc_seq(); and then after thread2
released the lcs_mutex, the thread1 will re-allocate the current
FID in the new sequence without checking whether some others have
already taken such FID in the new sequence during it re-obtaining
the lcs_mutex.

Such race will cause two objects to use the same FID, and trigger
OI conflict and LMA verification failures.

This patch makes the fid allocation and lu_client_seq modification
to be protected by the lcs_mutex.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I79c250f57b73d0abd7d039eea02424ae438f1b56
Reviewed-on: http://review.whamcloud.com/20939
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
lustre/fid/fid_request.c

index f935f8e..7f96185 100644 (file)
@@ -252,22 +252,52 @@ static int seq_fid_alloc_prep(struct lu_client_seq *seq,
                set_current_state(TASK_RUNNING);
                return -EAGAIN;
        }
+
        ++seq->lcs_update;
        mutex_unlock(&seq->lcs_mutex);
+
        return 0;
 }
 
-static void seq_fid_alloc_fini(struct lu_client_seq *seq)
+static void seq_fid_alloc_fini(struct lu_client_seq *seq, __u64 seqnr,
+                              bool whole)
 {
        LASSERT(seq->lcs_update == 1);
+
        mutex_lock(&seq->lcs_mutex);
+       if (seqnr != 0) {
+               CDEBUG(D_INFO, "%s: New sequence [0x%16.16"LPF64"x]\n",
+                      seq->lcs_name, seqnr);
+
+               seq->lcs_fid.f_seq = seqnr;
+               if (whole) {
+                       /* Since the caller require the whole seq,
+                        * so marked this seq to be used */
+                       if (seq->lcs_type == LUSTRE_SEQ_METADATA)
+                               seq->lcs_fid.f_oid =
+                                       LUSTRE_METADATA_SEQ_MAX_WIDTH;
+                       else
+                               seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH;
+               } else {
+                       seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
+               }
+               seq->lcs_fid.f_ver = 0;
+       }
+
        --seq->lcs_update;
-       wake_up(&seq->lcs_waitq);
+       wake_up_all(&seq->lcs_waitq);
 }
 
 /**
- * Allocate the whole seq to the caller.
- **/
+ * Allocate the whole non-used seq to the caller.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in,out] seq  pointer to the client sequence manager
+ * \param[out] seqnr   to hold the new allocated sequence
+ *
+ * \retval             0 for new sequence allocated.
+ * \retval             Negative error number on failure.
+ */
 int seq_client_get_seq(const struct lu_env *env,
                       struct lu_client_seq *seq, u64 *seqnr)
 {
@@ -275,48 +305,36 @@ int seq_client_get_seq(const struct lu_env *env,
        int rc;
 
        LASSERT(seqnr != NULL);
+
        mutex_lock(&seq->lcs_mutex);
        init_waitqueue_entry(&link, current);
 
-        while (1) {
-                rc = seq_fid_alloc_prep(seq, &link);
-                if (rc == 0)
-                        break;
-        }
-
-        rc = seq_client_alloc_seq(env, seq, seqnr);
-        if (rc) {
-                CERROR("%s: Can't allocate new sequence, "
-                       "rc %d\n", seq->lcs_name, rc);
-                seq_fid_alloc_fini(seq);
-               mutex_unlock(&seq->lcs_mutex);
-                return rc;
-        }
-
-        CDEBUG(D_INFO, "%s: allocate sequence "
-               "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr);
+       /* To guarantee that we can get a whole non-used sequence. */
+       while (seq_fid_alloc_prep(seq, &link) != 0);
 
-       /* Since the caller require the whole seq,
-        * so marked this seq to be used */
-       if (seq->lcs_type == LUSTRE_SEQ_METADATA)
-               seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH;
-       else
-               seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH;
-
-       seq->lcs_fid.f_seq = *seqnr;
-       seq->lcs_fid.f_ver = 0;
-        /*
-         * Inform caller that sequence switch is performed to allow it
-         * to setup FLD for it.
-         */
-        seq_fid_alloc_fini(seq);
+       rc = seq_client_alloc_seq(env, seq, seqnr);
+       seq_fid_alloc_fini(seq, rc ? 0 : *seqnr, true);
+       if (rc)
+               CERROR("%s: Can't allocate new sequence: rc = %d\n",
+                      seq->lcs_name, rc);
        mutex_unlock(&seq->lcs_mutex);
 
-        return rc;
+       return rc;
 }
 EXPORT_SYMBOL(seq_client_get_seq);
 
-/* Allocate new fid on passed client @seq and save it to @fid. */
+/**
+ * Allocate new fid on passed client @seq and save it to @fid.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in,out] seq  pointer to the client sequence manager
+ * \param[out] fid     to hold the new allocated fid
+ *
+ * \retval             1 for notify the caller that sequence switch
+ *                     is performed to allow it to setup FLD for it.
+ * \retval             0 for new FID allocated in current sequence.
+ * \retval             Negative error number on failure.
+ */
 int seq_client_alloc_fid(const struct lu_env *env,
                         struct lu_client_seq *seq, struct lu_fid *fid)
 {
@@ -333,52 +351,44 @@ int seq_client_alloc_fid(const struct lu_env *env,
        if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST))
                seq->lcs_fid.f_oid = seq->lcs_width;
 
-        while (1) {
+       while (1) {
                u64 seqnr;
 
-                if (!fid_is_zero(&seq->lcs_fid) &&
-                    fid_oid(&seq->lcs_fid) < seq->lcs_width) {
-                        /* Just bump last allocated fid and return to caller. */
-                        seq->lcs_fid.f_oid += 1;
-                        rc = 0;
-                        break;
-                }
-
-                rc = seq_fid_alloc_prep(seq, &link);
-                if (rc)
-                        continue;
+               if (unlikely(!fid_is_zero(&seq->lcs_fid) &&
+                            fid_oid(&seq->lcs_fid) < seq->lcs_width)) {
+                       /* Just bump last allocated fid and return to caller. */
+                       seq->lcs_fid.f_oid++;
+                       rc = 0;
+                       break;
+               }
 
-                rc = seq_client_alloc_seq(env, seq, &seqnr);
-                if (rc) {
-                        CERROR("%s: Can't allocate new sequence, "
-                               "rc %d\n", seq->lcs_name, rc);
-                        seq_fid_alloc_fini(seq);
+               /* Release seq::lcs_mutex via seq_fid_alloc_prep() to avoid
+                * deadlock during seq_client_alloc_seq(). */
+               rc = seq_fid_alloc_prep(seq, &link);
+               if (rc)
+                       continue;
+
+               rc = seq_client_alloc_seq(env, seq, &seqnr);
+               /* Re-take seq::lcs_mutex via seq_fid_alloc_fini(). */
+               seq_fid_alloc_fini(seq, rc ? 0 : seqnr, false);
+               if (rc) {
+                       CERROR("%s: Can't allocate new sequence: rc = %d\n",
+                              seq->lcs_name, rc);
                        mutex_unlock(&seq->lcs_mutex);
-                        RETURN(rc);
-                }
-
-                CDEBUG(D_INFO, "%s: Switch to sequence "
-                       "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
 
-                seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
-                seq->lcs_fid.f_seq = seqnr;
-                seq->lcs_fid.f_ver = 0;
-
-                /*
-                 * Inform caller that sequence switch is performed to allow it
-                 * to setup FLD for it.
-                 */
-                rc = 1;
+                       RETURN(rc);
+               }
 
-                seq_fid_alloc_fini(seq);
-                break;
-        }
+               rc = 1;
+               break;
+       }
 
-        *fid = seq->lcs_fid;
+       *fid = seq->lcs_fid;
        mutex_unlock(&seq->lcs_mutex);
 
-        CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
-        RETURN(rc);
+       CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
+
+       RETURN(rc);
 }
 EXPORT_SYMBOL(seq_client_alloc_fid);