Whamcloud - gitweb
LU-7064 obd: detect errors from llog_declare_destroy()
[fs/lustre-release.git] / lustre / obdclass / llog.c
index 95c93f5..0ab7a34 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #define DEBUG_SUBSYSTEM S_LOG
 
 #include <linux/kthread.h>
-#include <obd_class.h>
+#include <llog_swab.h>
 #include <lustre_log.h>
+#include <obd_class.h>
 #include "llog_internal.h"
-
 /*
  * Allocate a new log or catalog handle
  * Used inside llog_open().
@@ -229,8 +229,11 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
        if (rc < 0)
                GOTO(out_trans, rc);
 
-       if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY))
+       if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY)) {
                rc = llog_declare_destroy(env, loghandle, th);
+               if (rc < 0)
+                       GOTO(out_trans, rc);
+       }
 
        th->th_wait_submit = 1;
        rc = dt_trans_start_local(env, dt, th);
@@ -258,7 +261,12 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
 
        if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
            (llh->llh_count == 1) &&
-           (loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1)) {
+           ((loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1) ||
+            (loghandle->u.phd.phd_cat_handle != NULL &&
+             loghandle->u.phd.phd_cat_handle->u.chd.chd_current_log !=
+               loghandle))) {
+               /* never try to destroy it again */
+               llh->llh_flags &= ~LLOG_F_ZAP_WHEN_EMPTY;
                rc = llog_trans_destroy(env, loghandle, th);
                if (rc < 0) {
                        /* Sigh, can not destroy the final plain llog, but
@@ -291,12 +299,12 @@ out_trans:
        RETURN(rc);
 }
 
-static int llog_read_header(const struct lu_env *env,
-                           struct llog_handle *handle,
-                           struct obd_uuid *uuid)
+int llog_read_header(const struct lu_env *env, struct llog_handle *handle,
+                    const struct obd_uuid *uuid)
 {
        struct llog_operations *lop;
        int rc;
+       ENTRY;
 
        rc = llog_handle2ops(handle, &lop);
        if (rc)
@@ -311,6 +319,7 @@ static int llog_read_header(const struct lu_env *env,
 
                /* lrh_len should be initialized in llog_init_handle */
                handle->lgh_last_idx = 0; /* header is record with index 0 */
+               handle->lgh_write_offset = 0;
                llh->llh_count = 1;         /* for the header record */
                llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
                LASSERT(handle->lgh_ctxt->loc_chunk_size >=
@@ -322,13 +331,19 @@ static int llog_read_header(const struct lu_env *env,
                        memcpy(&llh->llh_tgtuuid, uuid,
                               sizeof(llh->llh_tgtuuid));
                llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+               /* Since update llog header might also call this function,
+                * let's reset the bitmap to 0 here */
+               memset(LLOG_HDR_BITMAP(llh), 0, llh->llh_hdr.lrh_len -
+                                               llh->llh_bitmap_offset -
+                                               sizeof(llh->llh_tail));
                ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
                LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
                LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
                rc = 0;
        }
-       return rc;
+       RETURN(rc);
 }
+EXPORT_SYMBOL(llog_read_header);
 
 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
                     int flags, struct obd_uuid *uuid)
@@ -412,7 +427,7 @@ static int llog_process_thread(void *arg)
        struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
        char                            *buf;
        size_t                           chunk_size;
-       __u64                            cur_offset;
+       __u64                            cur_offset, tmp_offset;
        int                              rc = 0, index = 1, last_index;
        int                              saved_index = 0;
        int                              last_called_index = 0;
@@ -472,7 +487,8 @@ repeat:
                 * The absolute offset of the current chunk is calculated
                 * from cur_offset value and stored in chunk_offset variable.
                 */
-               if (cur_offset % chunk_size != 0) {
+               tmp_offset = cur_offset;
+               if (do_div(tmp_offset, chunk_size) != 0) {
                        partial_chunk = true;
                        chunk_offset = cur_offset & ~(chunk_size - 1);
                } else {
@@ -571,17 +587,30 @@ out:
                cd->lpcd_last_idx = last_called_index;
 
        if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) {
-               /* something bad happened to the processing of a local
-                * llog file, probably I/O error or the log got corrupted..
-                * to be able to finally release the log we discard any
-                * remaining bits in the header */
-               CERROR("Local llog found corrupted\n");
-               while (index <= last_index) {
-                       if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh)) != 0)
-                               llog_cancel_rec(lpi->lpi_env, loghandle, index);
-                       index++;
+               if (dt_object_remote(loghandle->lgh_obj)) {
+                       /* If this is a remote object, -EIO might mean
+                        * disconnection or eviction, so return -EAGAIN;
+                        * update recovery log processing will then
+                        * retry until umount or abort recovery, see
+                        * lod_sub_recovery_thread() */
+                       CERROR("%s retry remote llog process\n",
+                              loghandle->lgh_ctxt->loc_obd->obd_name);
+                       rc = -EAGAIN;
+               } else {
+                       /* Something bad happened while processing a local
+                        * llog file, probably an I/O error or the log got
+                        * corrupted. To be able to finally release the log,
+                        * we discard any remaining bits in the header */
+                       CERROR("Local llog found corrupted\n");
+                       while (index <= last_index) {
+                               if (ext2_test_bit(index,
+                                                 LLOG_HDR_BITMAP(llh)) != 0)
+                                       llog_cancel_rec(lpi->lpi_env, loghandle,
+                                                       index);
+                               index++;
+                       }
+                       rc = 0;
                }
-               rc = 0;
        }
 
        OBD_FREE_LARGE(buf, chunk_size);
@@ -1221,3 +1250,21 @@ out_close:
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_backup);
+
+/* Get size of llog: la_size from do_attr_get() on lgh_obj, or 0 on failure */
+__u64 llog_size(const struct lu_env *env, struct llog_handle *llh)
+{
+       int rc;
+       struct lu_attr la;
+
+       rc = llh->lgh_obj->do_ops->do_attr_get(env, llh->lgh_obj, &la);
+       if (rc) {
+               CERROR("%s: attr_get failed, rc = %d\n",
+                      llh->lgh_ctxt->loc_obd->obd_name, rc);
+               return 0; /* rc is only logged; the caller just sees size 0 */
+       }
+
+       return la.la_size;
+}
+EXPORT_SYMBOL(llog_size);
+