Whamcloud - gitweb
LU-6634 llog: destroy plain llog if init fails 27/16427/15
authorAlex Zhuravlev <alexey.zhuravlev@intel.com>
Tue, 15 Sep 2015 09:36:35 +0000 (12:36 +0300)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 3 Nov 2015 19:53:24 +0000 (19:53 +0000)
llog_cat_add_rec() should destroy the plain llog
in the same transaction if initialization of that
failed. also, llog_osd_write_rec() should check
the object still exists as it's possible that
another thread failed to initialize and destroyed
the llog.

Change-Id: I7b823d34b32b5caaf0cc17b4cfe278a07a78ec15
Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-on: http://review.whamcloud.com/16427
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_log.h
lustre/include/obd_support.h
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_osd.c
lustre/osd-zfs/osd_object.c
lustre/tests/sanity.sh

index 6922954..7d26af2 100644 (file)
@@ -497,6 +497,11 @@ static inline int llog_connect(struct llog_ctxt *ctxt,
        RETURN(rc);
 }
 
+static inline int llog_is_full(struct llog_handle *llh)
+{
+       return llh->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1;
+}
+
 struct llog_cfg_rec {
        struct llog_rec_hdr     lcr_hdr;
        struct lustre_cfg       lcr_cfg;
index 5d897fd..0f58fb0 100644 (file)
@@ -247,6 +247,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_MDS_STALE_DIR_LAYOUT   0x158
 #define OBD_FAIL_MDS_REINT_MULTI_NET     0x159
 #define OBD_FAIL_MDS_REINT_MULTI_NET_REP 0x15a
+#define OBD_FAIL_MDS_LLOG_CREATE_FAILED2 0x15b
 
 /* layout lock */
 #define OBD_FAIL_MDS_NO_LL_GETATTR      0x170
index afa7653..65b2d0b 100644 (file)
@@ -167,7 +167,7 @@ static int llog_cat_new_log(const struct lu_env *env,
        rc = llog_write_rec(env, cathandle, &rec->lid_hdr,
                            &loghandle->u.phd.phd_cookie, LLOG_NEXT_IDX, th);
        if (rc < 0)
-               GOTO(out, rc);
+               GOTO(out_destroy, rc);
 
        CDEBUG(D_OTHER, "new plain log "DOSTID":%x for index %u of catalog"
               DOSTID"\n", POSTID(&loghandle->lgh_id.lgl_oi),
@@ -180,6 +180,17 @@ out:
                dt_trans_stop(env, dt, handle);
 
        RETURN(0);
+
+out_destroy:
+       /* to signal llog_cat_close() it shouldn't try to destroy the llog,
+        * we want to destroy it in this transaction, otherwise the object
+        * becomes an orphan */
+       loghandle->lgh_hdr->llh_flags &= ~LLOG_F_ZAP_WHEN_EMPTY;
+       /* this is to mimic full log, so another llog_cat_current_log()
+        * can skip it and ask for another onet */
+       loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) + 1;
+       llog_trans_destroy(env, loghandle, th);
+       RETURN(rc);
 }
 
 /* Open an existent log handle and add it to the open list.
@@ -318,6 +329,12 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
         struct llog_handle *loghandle = NULL;
         ENTRY;
 
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2)) {
+               down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
+               GOTO(next, loghandle);
+       }
+
        down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
         loghandle = cathandle->u.chd.chd_current_log;
         if (loghandle) {
@@ -325,8 +342,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
 
                down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
                llh = loghandle->lgh_hdr;
-               if (llh == NULL ||
-                   loghandle->lgh_last_idx < LLOG_HDR_BITMAP_SIZE(llh) - 1) {
+               if (llh == NULL || !llog_is_full(loghandle)) {
                        up_read(&cathandle->lgh_lock);
                         RETURN(loghandle);
                 } else {
@@ -346,7 +362,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
                down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
                llh = loghandle->lgh_hdr;
                LASSERT(llh);
-               if (loghandle->lgh_last_idx < LLOG_HDR_BITMAP_SIZE(llh) - 1) {
+               if (!llog_is_full(loghandle)) {
                        up_write(&cathandle->lgh_lock);
                        RETURN(loghandle);
                } else {
@@ -354,6 +370,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
                }
         }
 
+next:
        CDEBUG(D_INODE, "use next log\n");
 
        loghandle = cathandle->u.chd.chd_next_log;
@@ -375,10 +392,12 @@ int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
                     struct thandle *th)
 {
         struct llog_handle *loghandle;
-        int rc;
-        ENTRY;
+       int rc, retried = 0;
+       ENTRY;
 
        LASSERT(rec->lrh_len <= cathandle->lgh_ctxt->loc_chunk_size);
+
+retry:
        loghandle = llog_cat_current_log(cathandle, th);
        LASSERT(!IS_ERR(loghandle));
 
@@ -387,33 +406,35 @@ int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
                rc = llog_cat_new_log(env, cathandle, loghandle, th);
                if (rc < 0) {
                        up_write(&loghandle->lgh_lock);
+                       /* nobody should be trying to use this llog */
+                       down_write(&cathandle->lgh_lock);
+                       if (cathandle->u.chd.chd_current_log == loghandle)
+                               cathandle->u.chd.chd_current_log = NULL;
+                       up_write(&cathandle->lgh_lock);
                        RETURN(rc);
                }
        }
        /* now let's try to add the record */
        rc = llog_write_rec(env, loghandle, rec, reccookie, LLOG_NEXT_IDX, th);
-       if (rc < 0)
+       if (rc < 0) {
                CDEBUG_LIMIT(rc == -ENOSPC ? D_HA : D_ERROR,
                             "llog_write_rec %d: lh=%p\n", rc, loghandle);
+               /* -ENOSPC is returned if no empty records left
+                * and when it's lack of space on the stogage.
+                * there is no point to try again if it's the second
+                * case. many callers (like llog test) expect ENOSPC,
+                * so we preserve this error code, but look for the
+                * actual cause here */
+               if (rc == -ENOSPC && llog_is_full(loghandle))
+                       rc = -ENOBUFS;
+       }
        up_write(&loghandle->lgh_lock);
-        if (rc == -ENOSPC) {
-               /* try to use next log */
-               loghandle = llog_cat_current_log(cathandle, th);
-               LASSERT(!IS_ERR(loghandle));
-               /* new llog can be created concurrently */
-               if (!llog_exist(loghandle)) {
-                       rc = llog_cat_new_log(env, cathandle, loghandle, th);
-                       if (rc < 0) {
-                               up_write(&loghandle->lgh_lock);
-                               RETURN(rc);
-                       }
-               }
-               /* now let's try to add the record */
-               rc = llog_write_rec(env, loghandle, rec, reccookie,
-                                   LLOG_NEXT_IDX, th);
-               if (rc < 0)
-                       CERROR("llog_write_rec %d: lh=%p\n", rc, loghandle);
-               up_write(&loghandle->lgh_lock);
+
+       if (rc == -ENOBUFS) {
+               if (retried++ == 0)
+                       GOTO(retry, rc);
+               CERROR("%s: error on 2nd llog: rc = %d\n",
+                      cathandle->lgh_ctxt->loc_obd->obd_name, rc);
        }
 
        RETURN(rc);
index fb8fba4..e7ae395 100644 (file)
@@ -111,6 +111,23 @@ static int llog_osd_create_new_object(const struct lu_env *env,
 }
 
 /**
+ * Implementation of the llog_operations::lop_exist
+ *
+ * This function checks that llog exists on storage.
+ *
+ * \param[in] handle   llog handle of the current llog
+ *
+ * \retval             true if llog object exists and is not just destroyed
+ * \retval             false if llog doesn't exist or just destroyed
+ */
+static int llog_osd_exist(struct llog_handle *handle)
+{
+       LASSERT(handle->lgh_obj);
+       return dt_object_exists(handle->lgh_obj) &&
+               !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header);
+}
+
+/**
  * Write a padding record to the llog
  *
  * This function writes a padding record to the end of llog. That may
@@ -378,6 +395,9 @@ static int llog_osd_write_rec(const struct lu_env *env,
        CDEBUG(D_OTHER, "new record %x to "DFID"\n",
               rec->lrh_type, PFID(lu_object_fid(&o->do_lu)));
 
+       if (!llog_osd_exist(loghandle))
+               RETURN(-ENOENT);
+
        /* record length should not bigger than  */
        if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len)
                RETURN(-E2BIG);
@@ -521,6 +541,14 @@ static int llog_osd_write_rec(const struct lu_env *env,
         * process them page-at-a-time if needed.  If it will cross a chunk
         * boundary, write in a fake (but referenced) entry to pad the chunk.
         */
+
+
+       /* simulate ENOSPC when new plain llog is being added to the
+        * catalog */
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2) &&
+           llh->llh_flags & LLOG_F_IS_CAT)
+               RETURN(-ENOSPC);
+
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
        lgi->lgi_off = lgi->lgi_attr.la_size;
        left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
@@ -535,7 +563,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
        }
        /* if it's the last idx in log file, then return -ENOSPC
         * or wrap around if a catalog */
-       if ((loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) ||
+       if (llog_is_full(loghandle) ||
            unlikely(llh->llh_flags & LLOG_F_IS_CAT &&
                     OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) &&
                     loghandle->lgh_last_idx >= cfs_fail_val)) {
@@ -1223,23 +1251,6 @@ out:
 }
 
 /**
- * Implementation of the llog_operations::lop_exist
- *
- * This function checks that llog exists on storage.
- *
- * \param[in] handle   llog handle of the current llog
- *
- * \retval             true if llog object exists and is not just destroyed
- * \retval             false if llog doesn't exist or just destroyed
- */
-static int llog_osd_exist(struct llog_handle *handle)
-{
-       LASSERT(handle->lgh_obj);
-       return (dt_object_exists(handle->lgh_obj) &&
-               !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header));
-}
-
-/**
  * Get dir for regular fid log object
  *
  * Get directory for regular fid log object, and these regular fid log
index 39b31ba..8ef92c0 100644 (file)
@@ -583,7 +583,17 @@ static int osd_object_destroy(const struct lu_env *env,
                       obj->oo_attr.la_gid, rc);
 
        oid = obj->oo_db->db_object;
-       if (obj->oo_destroy == OSD_DESTROY_SYNC) {
+       if (unlikely(obj->oo_destroy == OSD_DESTROY_NONE)) {
+               /* this may happen if the destroy wasn't declared
+                * e.g. when the object is created and then destroyed
+                * in the same transaction - we don't need additional
+                * space for destroy specifically */
+               LASSERT(obj->oo_attr.la_size <= osd_sync_destroy_max_size);
+               rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
+               if (rc)
+                       CERROR("%s: failed to free %s "LPU64": rc = %d\n",
+                              osd->od_svname, buf, oid, rc);
+       } else if (obj->oo_destroy == OSD_DESTROY_SYNC) {
                rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
                if (rc)
                        CERROR("%s: failed to free %s "LPU64": rc = %d\n",
index 043a801..f43e434 100755 (executable)
@@ -5277,6 +5277,15 @@ test_60d() {
 }
 run_test 60d "test printk console message masking"
 
+test_60e() {
+       [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+       touch $DIR/$tfile
+#define OBD_FAIL_MDS_LLOG_CREATE_FAILED2  0x15b
+       do_facet mds1 lctl set_param fail_loc=0x15b
+       rm $DIR/$tfile
+}
+run_test 60e "no space while new llog is being created"
+
 test_61() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
        f="$DIR/f61"