Whamcloud - gitweb
LU-11827 llog: protect cathandle in llog_cat_declare_add_rec 55/34455/3
authorVladimir Saveliev <c17830@cray.com>
Sat, 22 Dec 2018 00:31:45 +0000 (03:31 +0300)
committerOleg Drokin <green@whamcloud.com>
Mon, 1 Apr 2019 06:20:14 +0000 (06:20 +0000)
llog_cat_declare_add_rec() calls llog_cat_prep_log() passing
&cathandle->u.chd.chd_current_log and
&cathandle->u.chd.chd_next_log. Then it has to protect cathandle in
order to avoid race with llog_cat_current_log() when it decides to
change cathandle->u.chd.chd_current_log and
cathandle->u.chd.chd_next_log.

Lustre-change: https://review.whamcloud.com/33914
Lustre-commit: 59a62ada2e18174e5611730e8bcf5ba3165ca2b9

Signed-off-by: Vladimir Saveliev <c17830@cray.com>
Cray-bug-id: LUS-6804
Change-Id: I689efb40452af180f137aff35ccabe132a24180a
Reviewed-by: Alexandr Boyko <c17825@cray.com>
Reviewed-by: Lai Siyao <lai.siyao@whamcloud.com>
Signed-off-by: Minh Diep <mdiep@whamcloud.com>
Reviewed-on: https://review.whamcloud.com/34455
Tested-by: Jenkins
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/obdclass/llog_cat.c

index e489f23..55ffdf0 100644 (file)
@@ -247,18 +247,28 @@ unlock:
 /*
  * prepare current/next log for catalog.
  *
- * if log is NULL, open it, and declare create, NB, if log is remote, create it
- * synchronously here, see comments below.
+ * if \a *ploghandle is NULL, open it, and declare create, NB, if \a
+ * *ploghandle is remote, create it synchronously here, see comments
+ * below.
+ *
+ * \a cathandle->lgh_lock is down_read-ed, it gets down_write-ed if \a
+ * *ploghandle has to be opened.
  */
 static int llog_cat_prep_log(const struct lu_env *env,
                             struct llog_handle *cathandle,
                             struct llog_handle **ploghandle,
                             struct thandle *th)
 {
-       int rc = 0;
+       int rc;
+       int sem_upgraded;
 
+start:
+       rc = 0;
+       sem_upgraded = 0;
        if (IS_ERR_OR_NULL(*ploghandle)) {
+               up_read(&cathandle->lgh_lock);
                down_write(&cathandle->lgh_lock);
+               sem_upgraded = 1;
                if (IS_ERR_OR_NULL(*ploghandle)) {
                        struct llog_handle *loghandle;
 
@@ -270,18 +280,17 @@ static int llog_cat_prep_log(const struct lu_env *env,
                                              &cathandle->u.chd.chd_head);
                        }
                }
-               up_write(&cathandle->lgh_lock);
-
                if (rc)
-                       return rc;
+                       GOTO(out, rc);
        }
 
-       if (llog_exist(*ploghandle))
-               return 0;
+       rc = llog_exist(*ploghandle);
+       if (rc < 0)
+               GOTO(out, rc);
+       if (rc)
+               GOTO(out, rc = 0);
 
        if (dt_object_remote(cathandle->lgh_obj)) {
-create_remote_log:
-               down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
                down_write_nested(&(*ploghandle)->lgh_lock, LLOGH_LOG);
                if (!llog_exist(*ploghandle)) {
                        /* For remote operation, if we put the llog object
@@ -297,30 +306,41 @@ create_remote_log:
                                              NULL);
                        if (rc == -ESTALE) {
                                up_write(&(*ploghandle)->lgh_lock);
-                               up_read(&cathandle->lgh_lock);
+                               if (sem_upgraded)
+                                       up_write(&cathandle->lgh_lock);
+                               else
+                                       up_read(&cathandle->lgh_lock);
 
                                rc = llog_cat_refresh(env, cathandle);
-                               if (!rc)
-                                       goto create_remote_log;
-
-                               return rc;
+                               down_read_nested(&cathandle->lgh_lock,
+                                                LLOGH_CAT);
+                               if (rc)
+                                       return rc;
+                               /* *ploghandle might become NULL, restart */
+                               goto start;
                        }
                }
                up_write(&(*ploghandle)->lgh_lock);
-               up_read(&cathandle->lgh_lock);
        } else {
                struct llog_thread_info *lgi = llog_info(env);
                struct llog_logid_rec *lirec = &lgi->lgi_logid;
 
                rc = llog_declare_create(env, *ploghandle, th);
                if (rc)
-                       return rc;
+                       GOTO(out, rc);
 
                lirec->lid_hdr.lrh_len = sizeof(*lirec);
                rc = llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1,
                                            th);
        }
 
+out:
+       if (sem_upgraded) {
+               up_write(&cathandle->lgh_lock);
+               down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
+               if (rc == 0)
+                       goto start;
+       }
        return rc;
 }
 
@@ -604,23 +624,26 @@ int llog_cat_declare_add_rec(const struct lu_env *env,
 
        ENTRY;
 
+start:
+       down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
        rc = llog_cat_prep_log(env, cathandle,
                               &cathandle->u.chd.chd_current_log, th);
        if (rc)
-               RETURN(rc);
+               GOTO(unlock, rc);
 
        rc = llog_cat_prep_log(env, cathandle, &cathandle->u.chd.chd_next_log,
                               th);
        if (rc)
-               RETURN(rc);
+               GOTO(unlock, rc);
 
-declare_write:
        rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
                                    rec, -1, th);
        if (rc == -ESTALE && dt_object_remote(cathandle->lgh_obj)) {
+               up_read(&cathandle->lgh_lock);
                rc = llog_cat_refresh(env, cathandle);
-               if (!rc)
-                       goto declare_write;
+               if (rc)
+                       RETURN(rc);
+               goto start;
        }
 
 #if 0
@@ -632,7 +655,8 @@ declare_write:
        rc = llog_declare_write_rec(env, cathandle->u.chd.chd_next_log, rec, -1,
                                    th);
 #endif
-
+unlock:
+       up_read(&cathandle->lgh_lock);
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cat_declare_add_rec);