Whamcloud - gitweb
LU-2843 llog: restore the size of llog for -ENOSPC
authorHongchao Zhang <hongchao.zhang@intel.com>
Fri, 29 Mar 2013 10:36:55 +0000 (18:36 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 3 Apr 2013 16:27:31 +0000 (12:27 -0400)
the llog is NOT aware its valid size, and if there is some invalid
space in the tail of the llog file, which could be caused if the
free space in the last block(512 bytes) of the llog file can't hold
the next record but it failed to get an extra block for -ENOSPC,
then it will mistake the data written partially for normal llog data!

Change-Id: Ie2619843b538cbb64ae21f9f2a12ff85a5a3e8b4
Signed-off-by: Hongchao Zhang <hongchao.zhang@intel.com>
Reviewed-on: http://review.whamcloud.com/5604
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/obdclass/llog_osd.c

index 70aa4c4..e344f00 100644 (file)
@@ -148,6 +148,9 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o,
        CDEBUG(D_OTHER, "write blob with type %x, buf %p/%u at off %llu\n",
               rec->lrh_type, buf, buflen, *off);
 
        CDEBUG(D_OTHER, "write blob with type %x, buf %p/%u at off %llu\n",
               rec->lrh_type, buf, buflen, *off);
 
+       lgi->lgi_attr.la_valid = LA_SIZE;
+       lgi->lgi_attr.la_size = *off;
+
        if (!buf) {
                lgi->lgi_buf.lb_len = buflen;
                lgi->lgi_buf.lb_buf = rec;
        if (!buf) {
                lgi->lgi_buf.lb_len = buflen;
                lgi->lgi_buf.lb_buf = rec;
@@ -155,7 +158,7 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o,
                if (rc)
                        CERROR("%s: error writing log record: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name, rc);
                if (rc)
                        CERROR("%s: error writing log record: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name, rc);
-               RETURN(rc);
+               GOTO(out, rc);
        }
 
        /* the buf case */
        }
 
        /* the buf case */
@@ -168,7 +171,7 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o,
        if (rc) {
                CERROR("%s: error writing log hdr: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, rc);
        if (rc) {
                CERROR("%s: error writing log hdr: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, rc);
-               GOTO(out, rc);
+               GOTO(out_unlock, rc);
        }
 
        lgi->lgi_buf.lb_len = buflen;
        }
 
        lgi->lgi_buf.lb_len = buflen;
@@ -177,7 +180,7 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o,
        if (rc) {
                CERROR("%s: error writing log buffer: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name,  rc);
        if (rc) {
                CERROR("%s: error writing log buffer: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name,  rc);
-               GOTO(out, rc);
+               GOTO(out_unlock, rc);
        }
 
        lgi->lgi_tail.lrt_len = rec->lrh_len;
        }
 
        lgi->lgi_tail.lrt_len = rec->lrh_len;
@@ -188,8 +191,18 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o,
        if (rc)
                CERROR("%s: error writing log tail: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, rc);
        if (rc)
                CERROR("%s: error writing log tail: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, rc);
-out:
+
+out_unlock:
        dt_write_unlock(env, o);
        dt_write_unlock(env, o);
+
+out:
+       /* cleanup the content written above */
+       if (rc) {
+               dt_punch(env, o, lgi->lgi_attr.la_size, OBD_OBJECT_EOF, th,
+                        BYPASS_CAPA);
+               dt_attr_set(env, o, &lgi->lgi_attr, th, BYPASS_CAPA);
+       }
+
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -290,6 +303,10 @@ static int llog_osd_declare_write_rec(const struct lu_env *env,
                LASSERT(ergo(rc == 0, lgi->lgi_attr.la_valid & LA_SIZE));
                if (rc)
                        RETURN(rc);
                LASSERT(ergo(rc == 0, lgi->lgi_attr.la_valid & LA_SIZE));
                if (rc)
                        RETURN(rc);
+
+               rc = dt_declare_punch(env, o, lgi->lgi_off, OBD_OBJECT_EOF, th);
+               if (rc)
+                       RETURN(rc);
        } else {
                lgi->lgi_off = 0;
        }
        } else {
                lgi->lgi_off = 0;
        }
@@ -311,7 +328,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
        struct llog_thread_info *lgi = llog_info(env);
        struct llog_log_hdr     *llh;
        int                      reclen = rec->lrh_len;
        struct llog_thread_info *lgi = llog_info(env);
        struct llog_log_hdr     *llh;
        int                      reclen = rec->lrh_len;
-       int                      index, rc;
+       int                      index, rc, old_tail_idx;
        struct llog_rec_tail    *lrt;
        struct dt_object        *o;
        size_t                   left;
        struct llog_rec_tail    *lrt;
        struct dt_object        *o;
        size_t                   left;
@@ -443,23 +460,39 @@ static int llog_osd_write_rec(const struct lu_env *env,
        }
        llh->llh_count++;
        spin_unlock(&loghandle->lgh_hdr_lock);
        }
        llh->llh_count++;
        spin_unlock(&loghandle->lgh_hdr_lock);
+       old_tail_idx = llh->llh_tail.lrt_index;
        llh->llh_tail.lrt_index = index;
 
        lgi->lgi_off = 0;
        rc = llog_osd_write_blob(env, o, &llh->llh_hdr, NULL, &lgi->lgi_off,
                                 th);
        if (rc)
        llh->llh_tail.lrt_index = index;
 
        lgi->lgi_off = 0;
        rc = llog_osd_write_blob(env, o, &llh->llh_hdr, NULL, &lgi->lgi_off,
                                 th);
        if (rc)
-               RETURN(rc);
+               GOTO(out, rc);
 
        rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL);
        if (rc)
 
        rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL);
        if (rc)
-               RETURN(rc);
+               GOTO(out, rc);
+
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
        lgi->lgi_off = lgi->lgi_attr.la_size;
 
        rc = llog_osd_write_blob(env, o, rec, buf, &lgi->lgi_off, th);
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
        lgi->lgi_off = lgi->lgi_attr.la_size;
 
        rc = llog_osd_write_blob(env, o, rec, buf, &lgi->lgi_off, th);
-       if (rc)
-               RETURN(rc);
+
+out:
+       /* cleanup llog for error case */
+       if (rc) {
+               spin_lock(&loghandle->lgh_hdr_lock);
+               ext2_clear_bit(index, llh->llh_bitmap);
+               llh->llh_count--;
+               spin_unlock(&loghandle->lgh_hdr_lock);
+
+               /* restore the header */
+               loghandle->lgh_last_idx--;
+               llh->llh_tail.lrt_index = old_tail_idx;
+               lgi->lgi_off = 0;
+               llog_osd_write_blob(env, o, &llh->llh_hdr, NULL,
+                                   &lgi->lgi_off, th);
+       }
 
        CDEBUG(D_RPCTRACE, "added record "LPX64": idx: %u, %u\n",
               loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
 
        CDEBUG(D_RPCTRACE, "added record "LPX64": idx: %u, %u\n",
               loghandle->lgh_id.lgl_oid, index, rec->lrh_len);