Whamcloud - gitweb
LU-14357 fid: simplify locking for fid updates 99/41299/2
authorMr NeilBrown <neilb@suse.de>
Fri, 22 Jan 2021 02:42:33 +0000 (13:42 +1100)
committerOleg Drokin <green@whamcloud.com>
Wed, 5 May 2021 02:49:56 +0000 (02:49 +0000)
'struct lu_client_seq' contains a mutex (lcs_mutex) and a second
open-coded mutex (lcs_waitq, lcs_update).  Both of these are using in
gettign a new fid, possibly from the server.

lcs_mutex is the main mutex which protects the local variables.  If an
RPC to the server is required, the extra mutex is held during that
RPC.

This was apparently intended to avoid some deadlock, presumably with
seq_client_flush().  However as seq_client_flush() now takes both
mutexes as well, it is still prone to any such deadlock, but does not
appear to actually suffer from one.
See:
  Commit 23e2a370c8a3 ("b=24255 move seq_client_alloc_seq out of
   lcs_sem")
  Commit d1feb5c774d4 ("LU-662 fix conflict between seq_client_flush
   and seq_client_alloc_fid")

for some of the history.

The extra open-coded mutex appears to provide no value, so let's
remove it.

As part of this, seq_fid_alloc_fini() is open-coded into the two call
sites, which adds further simplification.

Signed-off-by: Mr NeilBrown <neilb@suse.de>
Change-Id: Ia39eca7d925c9d49fbf942923de8af79dba4f6bf
Reviewed-on: https://review.whamcloud.com/41299
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Lai Siyao <lai.siyao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/fid/fid_request.c
lustre/include/lustre_fid.h

index add56f6..2fa8590 100644 (file)
@@ -250,59 +250,6 @@ static int seq_client_alloc_seq(const struct lu_env *env,
        RETURN(rc);
 }
 
-static int seq_fid_alloc_prep(struct lu_client_seq *seq,
-                             wait_queue_entry_t *link)
-{
-       if (seq->lcs_update) {
-               add_wait_queue(&seq->lcs_waitq, link);
-               set_current_state(TASK_UNINTERRUPTIBLE);
-               mutex_unlock(&seq->lcs_mutex);
-
-               schedule();
-
-               mutex_lock(&seq->lcs_mutex);
-               remove_wait_queue(&seq->lcs_waitq, link);
-               set_current_state(TASK_RUNNING);
-               return -EAGAIN;
-       }
-
-       ++seq->lcs_update;
-       mutex_unlock(&seq->lcs_mutex);
-
-       return 0;
-}
-
-static void seq_fid_alloc_fini(struct lu_client_seq *seq, __u64 seqnr,
-                              bool whole)
-{
-       LASSERT(seq->lcs_update == 1);
-
-       mutex_lock(&seq->lcs_mutex);
-       if (seqnr != 0) {
-               CDEBUG(D_INFO, "%s: New sequence [0x%16.16llx]\n",
-                      seq->lcs_name, seqnr);
-
-               seq->lcs_fid.f_seq = seqnr;
-               if (whole) {
-                       /*
-                        * Since the caller require the whole seq,
-                        * so marked this seq to be used
-                        */
-                       if (seq->lcs_type == LUSTRE_SEQ_METADATA)
-                               seq->lcs_fid.f_oid =
-                                       LUSTRE_METADATA_SEQ_MAX_WIDTH;
-                       else
-                               seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH;
-               } else {
-                       seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
-               }
-               seq->lcs_fid.f_ver = 0;
-       }
-
-       --seq->lcs_update;
-       wake_up(&seq->lcs_waitq);
-}
-
 /**
  * Allocate the whole non-used seq to the caller.
  *
@@ -316,23 +263,31 @@ static void seq_fid_alloc_fini(struct lu_client_seq *seq, __u64 seqnr,
 int seq_client_get_seq(const struct lu_env *env,
                       struct lu_client_seq *seq, u64 *seqnr)
 {
-       wait_queue_entry_t link;
        int rc;
 
        LASSERT(seqnr != NULL);
 
        mutex_lock(&seq->lcs_mutex);
-       init_wait(&link);
-
-       /* To guarantee that we can get a whole non-used sequence. */
-       while (seq_fid_alloc_prep(seq, &link) != 0)
-               ; /* do nothing */
 
        rc = seq_client_alloc_seq(env, seq, seqnr);
-       seq_fid_alloc_fini(seq, rc ? 0 : *seqnr, true);
-       if (rc)
+       if (rc) {
                CERROR("%s: Can't allocate new sequence: rc = %d\n",
                       seq->lcs_name, rc);
+       } else {
+               CDEBUG(D_INFO, "%s: New sequence [0x%16.16llx]\n",
+                      seq->lcs_name, *seqnr);
+               seq->lcs_fid.f_seq = *seqnr;
+               seq->lcs_fid.f_ver = 0;
+               /*
+                *  The caller require the whole seq,
+                * so marked this seq to be used
+                */
+               if (seq->lcs_type == LUSTRE_SEQ_METADATA)
+                       seq->lcs_fid.f_oid =
+                               LUSTRE_METADATA_SEQ_MAX_WIDTH;
+               else
+                       seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH;
+       }
        mutex_unlock(&seq->lcs_mutex);
 
        return rc;
@@ -354,59 +309,48 @@ EXPORT_SYMBOL(seq_client_get_seq);
 int seq_client_alloc_fid(const struct lu_env *env,
                         struct lu_client_seq *seq, struct lu_fid *fid)
 {
-       wait_queue_entry_t link;
        int rc;
        ENTRY;
 
        LASSERT(seq != NULL);
        LASSERT(fid != NULL);
 
-       init_wait(&link);
        mutex_lock(&seq->lcs_mutex);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST))
                seq->lcs_fid.f_oid = seq->lcs_width;
 
-       while (1) {
+       if (unlikely(!fid_is_zero(&seq->lcs_fid) &&
+                    fid_oid(&seq->lcs_fid) < seq->lcs_width)) {
+               /* Just bump last allocated fid and return to caller. */
+               seq->lcs_fid.f_oid++;
+               rc = 0;
+       } else {
                u64 seqnr;
 
-               if (unlikely(!fid_is_zero(&seq->lcs_fid) &&
-                            fid_oid(&seq->lcs_fid) < seq->lcs_width)) {
-                       /* Just bump last allocated fid and return to caller. */
-                       seq->lcs_fid.f_oid++;
-                       rc = 0;
-                       break;
-               }
-
-               /*
-                * Release seq::lcs_mutex via seq_fid_alloc_prep() to avoid
-                * deadlock during seq_client_alloc_seq().
-                */
-               rc = seq_fid_alloc_prep(seq, &link);
-               if (rc)
-                       continue;
-
                rc = seq_client_alloc_seq(env, seq, &seqnr);
-               /* Re-take seq::lcs_mutex via seq_fid_alloc_fini(). */
-               seq_fid_alloc_fini(seq, rc ? 0 : seqnr, false);
                if (rc) {
                        if (rc != -EINPROGRESS)
                                CERROR("%s: Can't allocate new sequence: rc = %d\n",
                                       seq->lcs_name, rc);
-                       mutex_unlock(&seq->lcs_mutex);
+               } else {
+                       CDEBUG(D_INFO, "%s: New sequence [0x%16.16llx]\n",
+                              seq->lcs_name, seqnr);
 
-                       RETURN(rc);
+                       seq->lcs_fid.f_seq = seqnr;
+                       seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
+                       seq->lcs_fid.f_ver = 0;
+                       rc = 1;
                }
-
-               rc = 1;
-               break;
        }
 
-       *fid = seq->lcs_fid;
+       if (rc >= 0) {
+               *fid = seq->lcs_fid;
+               CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,
+                      PFID(fid));
+       }
        mutex_unlock(&seq->lcs_mutex);
 
-       CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
-
        RETURN(rc);
 }
 EXPORT_SYMBOL(seq_client_alloc_fid);
@@ -417,24 +361,9 @@ EXPORT_SYMBOL(seq_client_alloc_fid);
  */
 void seq_client_flush(struct lu_client_seq *seq)
 {
-       wait_queue_entry_t link;
-
        LASSERT(seq != NULL);
-       init_wait(&link);
        mutex_lock(&seq->lcs_mutex);
 
-       while (seq->lcs_update) {
-               add_wait_queue(&seq->lcs_waitq, &link);
-               set_current_state(TASK_UNINTERRUPTIBLE);
-               mutex_unlock(&seq->lcs_mutex);
-
-               schedule();
-
-               mutex_lock(&seq->lcs_mutex);
-               remove_wait_queue(&seq->lcs_waitq, &link);
-               set_current_state(TASK_RUNNING);
-       }
-
        fid_zero(&seq->lcs_fid);
        /**
         * this id shld not be used for seq range allocation.
@@ -497,7 +426,6 @@ void seq_client_init(struct lu_client_seq *seq,
        else
                seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH;
 
-       init_waitqueue_head(&seq->lcs_waitq);
        /* Make sure that things are clear before work is started. */
        seq_client_flush(seq);
 
index d7388f0..fccb5ff 100644 (file)
@@ -409,25 +409,25 @@ struct lu_server_seq;
 
 /* Client sequence manager interface. */
 struct lu_client_seq {
-        /* Sequence-controller export. */
-        struct obd_export      *lcs_exp;
+       /* Sequence-controller export. */
+       struct obd_export       *lcs_exp;
        struct mutex            lcs_mutex;
 
-        /*
-         * Range of allowed for allocation sequeces. When using lu_client_seq on
-         * clients, this contains meta-sequence range. And for servers this
-         * contains super-sequence range.
-         */
-        struct lu_seq_range         lcs_space;
+       /*
+        * Range of allowed for allocation sequeces. When using lu_client_seq on
+        * clients, this contains meta-sequence range. And for servers this
+        * contains super-sequence range.
+        */
+       struct lu_seq_range     lcs_space;
 
        /* Seq related debugfs */
        struct dentry           *lcs_debugfs_entry;
 
-        /* This holds last allocated fid in last obtained seq */
-        struct lu_fid           lcs_fid;
+       /* This holds last allocated fid in last obtained seq */
+       struct lu_fid           lcs_fid;
 
-        /* LUSTRE_SEQ_METADATA or LUSTRE_SEQ_DATA */
-        enum lu_cli_type        lcs_type;
+       /* LUSTRE_SEQ_METADATA or LUSTRE_SEQ_DATA */
+       enum lu_cli_type        lcs_type;
 
        /*
         * Service uuid, passed from MDT + seq name to form unique seq name to
@@ -435,18 +435,14 @@ struct lu_client_seq {
         */
        char                    lcs_name[LUSTRE_MDT_MAXNAMELEN];
 
-        /*
-         * Sequence width, that is how many objects may be allocated in one
-         * sequence. Default value for it is LUSTRE_SEQ_MAX_WIDTH.
-         */
-        __u64                   lcs_width;
-
-        /* Seq-server for direct talking */
-        struct lu_server_seq   *lcs_srv;
+       /*
+        * Sequence width, that is how many objects may be allocated in one
+        * sequence. Default value for it is LUSTRE_SEQ_MAX_WIDTH.
+        */
+       __u64                   lcs_width;
 
-       /* wait queue for fid allocation and update indicator */
-       wait_queue_head_t       lcs_waitq;
-       int                     lcs_update;
+       /* Seq-server for direct talking */
+       struct lu_server_seq    *lcs_srv;
 };
 
 /* server sequence manager interface */