Whamcloud - gitweb
LU-11827 llog: protect cathandle in llog_cat_declare_add_rec
[fs/lustre-release.git] / lustre / obdclass / llog_cat.c
index 8448a83..dbb2c39 100644 (file)
@@ -256,18 +256,28 @@ unlock:
 /*
  * prepare current/next log for catalog.
  *
- * if log is NULL, open it, and declare create, NB, if log is remote, create it
- * synchronously here, see comments below.
+ * if \a *ploghandle is NULL, open it, and declare create, NB, if \a
+ * *ploghandle is remote, create it synchronously here, see comments
+ * below.
+ *
+ * \a cathandle->lgh_lock is down_read-ed, it gets down_write-ed if \a
+ * *ploghandle has to be opened.
  */
 static int llog_cat_prep_log(const struct lu_env *env,
                             struct llog_handle *cathandle,
                             struct llog_handle **ploghandle,
                             struct thandle *th)
 {
-       int rc = 0;
+       int rc;
+       int sem_upgraded;
 
+start:
+       rc = 0;
+       sem_upgraded = 0;
        if (IS_ERR_OR_NULL(*ploghandle)) {
+               up_read(&cathandle->lgh_lock);
                down_write(&cathandle->lgh_lock);
+               sem_upgraded = 1;
                if (IS_ERR_OR_NULL(*ploghandle)) {
                        struct llog_handle *loghandle;
 
@@ -279,18 +289,17 @@ static int llog_cat_prep_log(const struct lu_env *env,
                                              &cathandle->u.chd.chd_head);
                        }
                }
-               up_write(&cathandle->lgh_lock);
-
                if (rc)
-                       return rc;
+                       GOTO(out, rc);
        }
 
-       if (llog_exist(*ploghandle))
-               return 0;
+       rc = llog_exist(*ploghandle);
+       if (rc < 0)
+               GOTO(out, rc);
+       if (rc)
+               GOTO(out, rc = 0);
 
        if (dt_object_remote(cathandle->lgh_obj)) {
-create_remote_log:
-               down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
                down_write_nested(&(*ploghandle)->lgh_lock, LLOGH_LOG);
                if (!llog_exist(*ploghandle)) {
                        /* For remote operation, if we put the llog object
@@ -306,30 +315,41 @@ create_remote_log:
                                              NULL);
                        if (rc == -ESTALE) {
                                up_write(&(*ploghandle)->lgh_lock);
-                               up_read(&cathandle->lgh_lock);
+                               if (sem_upgraded)
+                                       up_write(&cathandle->lgh_lock);
+                               else
+                                       up_read(&cathandle->lgh_lock);
 
                                rc = llog_cat_refresh(env, cathandle);
-                               if (!rc)
-                                       goto create_remote_log;
-
-                               return rc;
+                               down_read_nested(&cathandle->lgh_lock,
+                                                LLOGH_CAT);
+                               if (rc)
+                                       return rc;
+                               /* *ploghandle might become NULL, restart */
+                               goto start;
                        }
                }
                up_write(&(*ploghandle)->lgh_lock);
-               up_read(&cathandle->lgh_lock);
        } else {
                struct llog_thread_info *lgi = llog_info(env);
                struct llog_logid_rec *lirec = &lgi->lgi_logid;
 
                rc = llog_declare_create(env, *ploghandle, th);
                if (rc)
-                       return rc;
+                       GOTO(out, rc);
 
                lirec->lid_hdr.lrh_len = sizeof(*lirec);
                rc = llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1,
                                            th);
        }
 
+out:
+       if (sem_upgraded) {
+               up_write(&cathandle->lgh_lock);
+               down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
+               if (rc == 0)
+                       goto start;
+       }
        return rc;
 }
 
@@ -605,23 +625,26 @@ int llog_cat_declare_add_rec(const struct lu_env *env,
 
        ENTRY;
 
+start:
+       down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
        rc = llog_cat_prep_log(env, cathandle,
                               &cathandle->u.chd.chd_current_log, th);
        if (rc)
-               RETURN(rc);
+               GOTO(unlock, rc);
 
        rc = llog_cat_prep_log(env, cathandle, &cathandle->u.chd.chd_next_log,
                               th);
        if (rc)
-               RETURN(rc);
+               GOTO(unlock, rc);
 
-declare_write:
        rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
                                    rec, -1, th);
        if (rc == -ESTALE && dt_object_remote(cathandle->lgh_obj)) {
+               up_read(&cathandle->lgh_lock);
                rc = llog_cat_refresh(env, cathandle);
-               if (!rc)
-                       goto declare_write;
+               if (rc)
+                       RETURN(rc);
+               goto start;
        }
 
 #if 0
@@ -633,7 +656,8 @@ declare_write:
        rc = llog_declare_write_rec(env, cathandle->u.chd.chd_next_log, rec, -1,
                                    th);
 #endif
-
+unlock:
+       up_read(&cathandle->lgh_lock);
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cat_declare_add_rec);