Whamcloud - gitweb
LU-1897 test: replay-single test_70b dbench not found
[fs/lustre-release.git] / lustre / obdclass / llog_lvfs.c
index 763649b..1debe27 100644 (file)
@@ -26,6 +26,8 @@
 /*
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -216,7 +218,7 @@ static int llog_lvfs_write_rec(const struct lu_env *env,
                               struct llog_handle *loghandle,
                               struct llog_rec_hdr *rec,
                               struct llog_cookie *reccookie, int cookiecount,
-                              void *buf, int idx)
+                              void *buf, int idx, struct thandle *th)
 {
         struct llog_log_hdr *llh;
         int reclen = rec->lrh_len, index, rc;
@@ -266,12 +268,7 @@ static int llog_lvfs_write_rec(const struct lu_env *env,
                 if (rc || idx == 0)
                         RETURN(rc);
 
-                /* Assumes constant lrh_len */
-                saved_offset = sizeof(*llh) + (idx - 1) * reclen;
-
                 if (buf) {
-                        struct llog_rec_hdr check;
-
                         /* We assume that caller has set lgh_cur_* */
                         saved_offset = loghandle->lgh_cur_offset;
                         CDEBUG(D_OTHER,
@@ -285,19 +282,10 @@ static int llog_lvfs_write_rec(const struct lu_env *env,
                                        idx, loghandle->lgh_cur_idx);
                                 RETURN(-EFAULT);
                         }
-#if 1  /* FIXME remove this safety check at some point */
-                        /* Verify that the record we're modifying is the
-                           right one. */
-                        rc = llog_lvfs_read_blob(obd, file, &check,
-                                                 sizeof(check), saved_offset);
-                        if (check.lrh_index != idx || check.lrh_len != reclen) {
-                                CERROR("Bad modify idx %u/%u size %u/%u (%d)\n",
-                                       idx, check.lrh_index, reclen,
-                                       check.lrh_len, rc);
-                                RETURN(-EFAULT);
-                        }
-#endif
-                }
+               } else {
+                       /* Assumes constant lrh_len */
+                       saved_offset = sizeof(*llh) + (idx - 1) * reclen;
+               }
 
                 rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
                 if (rc == 0 && reccookie) {
@@ -342,11 +330,14 @@ static int llog_lvfs_write_rec(const struct lu_env *env,
         /*The caller should make sure only 1 process access the lgh_last_idx,
          *Otherwise it might hit the assert.*/
         LASSERT(index < LLOG_BITMAP_SIZE(llh));
-        if (ext2_set_bit(index, llh->llh_bitmap)) {
-                CERROR("argh, index %u already set in log bitmap?\n", index);
-                LBUG(); /* should never happen */
-        }
-        llh->llh_count++;
+       spin_lock(&loghandle->lgh_hdr_lock);
+       if (ext2_set_bit(index, llh->llh_bitmap)) {
+               CERROR("argh, index %u already set in log bitmap?\n", index);
+               spin_unlock(&loghandle->lgh_hdr_lock);
+               LBUG(); /* should never happen */
+       }
+       llh->llh_count++;
+       spin_unlock(&loghandle->lgh_hdr_lock);
         llh->llh_tail.lrt_index = index;
 
         rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
@@ -412,17 +403,20 @@ static int llog_lvfs_next_block(const struct lu_env *env,
                next_idx, *cur_idx, *cur_offset);
 
         while (*cur_offset < i_size_read(loghandle->lgh_file->f_dentry->d_inode)) {
-                struct llog_rec_hdr *rec;
-                struct llog_rec_tail *tail;
-                loff_t ppos;
-
-                llog_skip_over(cur_offset, *cur_idx, next_idx);
-
-                ppos = *cur_offset;
-                rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
-                                        loghandle->lgh_file, buf, len,
-                                        &ppos);
-                if (rc) {
+               struct llog_rec_hdr *rec, *last_rec;
+               struct llog_rec_tail *tail;
+               loff_t ppos;
+               int llen;
+
+               llog_skip_over(cur_offset, *cur_idx, next_idx);
+
+               /* read up to next LLOG_CHUNK_SIZE block */
+               ppos = *cur_offset;
+               llen = LLOG_CHUNK_SIZE - (*cur_offset & (LLOG_CHUNK_SIZE - 1));
+               rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
+                                       loghandle->lgh_file, buf, llen,
+                                       cur_offset);
+               if (rc < 0) {
                         CERROR("Cant read llog block at log id "LPU64
                                "/%u offset "LPU64"\n",
                                loghandle->lgh_id.lgl_oid,
@@ -432,9 +426,7 @@ static int llog_lvfs_next_block(const struct lu_env *env,
                 }
 
                 /* put number of bytes read into rc to make code simpler */
-                rc = ppos - *cur_offset;
-                *cur_offset = ppos;
-
+               rc = *cur_offset - ppos;
                 if (rc < len) {
                         /* signal the end of the valid buffer to llog_process */
                         memset(buf + rc, 0, len - rc);
@@ -451,12 +443,20 @@ static int llog_lvfs_next_block(const struct lu_env *env,
                 }
 
                 rec = buf;
-                tail = (struct llog_rec_tail *)((char *)buf + rc -
-                                                sizeof(struct llog_rec_tail));
-
                if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
                        lustre_swab_llog_rec(rec);
 
+               tail = (struct llog_rec_tail *)(buf + rc -
+                                               sizeof(struct llog_rec_tail));
+
+               /* get the last record in block */
+               last_rec = (struct llog_rec_hdr *)(buf + rc -
+                                                  le32_to_cpu(tail->lrt_len));
+
+               if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
+                       lustre_swab_llog_rec(last_rec);
+               LASSERT(last_rec->lrh_index == tail->lrt_index);
+
                 *cur_idx = tail->lrt_index;
 
                 /* this shouldn't happen */
@@ -498,16 +498,14 @@ static int llog_lvfs_prev_block(const struct lu_env *env,
         llog_skip_over(&cur_offset, 0, prev_idx);
 
         while (cur_offset < i_size_read(loghandle->lgh_file->f_dentry->d_inode)) {
-                struct llog_rec_hdr *rec;
-                struct llog_rec_tail *tail;
-                loff_t ppos;
-
-                ppos = cur_offset;
-
-                rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
-                                        loghandle->lgh_file, buf, len,
-                                        &ppos);
-                if (rc) {
+               struct llog_rec_hdr *rec, *last_rec;
+               struct llog_rec_tail *tail;
+               loff_t ppos = cur_offset;
+
+               rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
+                                       loghandle->lgh_file, buf, len,
+                                       &cur_offset);
+               if (rc < 0) {
                         CERROR("Cant read llog block at log id "LPU64
                                "/%u offset "LPU64"\n",
                                loghandle->lgh_id.lgl_oid,
@@ -517,8 +515,7 @@ static int llog_lvfs_prev_block(const struct lu_env *env,
                 }
 
                 /* put number of bytes read into rc to make code simpler */
-                rc = ppos - cur_offset;
-                cur_offset = ppos;
+               rc = cur_offset - ppos;
 
                 if (rc == 0) /* end of file, nothing to do */
                         RETURN(0);
@@ -530,7 +527,20 @@ static int llog_lvfs_prev_block(const struct lu_env *env,
                         RETURN(-EINVAL);
                 }
 
-                tail = buf + rc - sizeof(struct llog_rec_tail);
+               rec = buf;
+               if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
+                       lustre_swab_llog_rec(rec);
+
+               tail = (struct llog_rec_tail *)(buf + rc -
+                                               sizeof(struct llog_rec_tail));
+
+               /* get the last record in block */
+               last_rec = (struct llog_rec_hdr *)(buf + rc -
+                                                  le32_to_cpu(tail->lrt_len));
+
+               if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
+                       lustre_swab_llog_rec(last_rec);
+               LASSERT(last_rec->lrh_index == tail->lrt_index);
 
                 /* this shouldn't happen */
                 if (tail->lrt_index == 0) {
@@ -539,15 +549,14 @@ static int llog_lvfs_prev_block(const struct lu_env *env,
                                loghandle->lgh_id.lgl_ogen, cur_offset);
                         RETURN(-EINVAL);
                 }
-                if (le32_to_cpu(tail->lrt_index) < prev_idx)
+               if (tail->lrt_index < prev_idx)
                         continue;
 
                 /* sanity check that the start of the new buffer is no farther
                  * than the record that we wanted.  This shouldn't happen. */
-                rec = buf;
-                if (le32_to_cpu(rec->lrh_index) > prev_idx) {
-                        CERROR("missed desired record? %u > %u\n",
-                               le32_to_cpu(rec->lrh_index), prev_idx);
+               if (rec->lrh_index > prev_idx) {
+                       CERROR("missed desired record? %u > %u\n",
+                              rec->lrh_index, prev_idx);
                         RETURN(-ENOENT);
                 }
                 RETURN(0);
@@ -627,7 +636,6 @@ static int llog_lvfs_open(const struct lu_env *env,  struct llog_handle *handle,
                               logid->lgl_ogen, rc);
                        GOTO(out, rc);
                }
-
                handle->lgh_id = *logid;
        } else if (name) {
                handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR, name,
@@ -683,6 +691,7 @@ static int llog_lvfs_create(const struct lu_env *env,
        struct llog_ctxt        *ctxt = handle->lgh_ctxt;
        struct obd_device       *obd;
        struct l_dentry         *dchild = NULL;
+       struct file             *file;
        struct obdo             *oa = NULL;
        int                      rc = 0;
        int                      open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
@@ -695,17 +704,16 @@ static int llog_lvfs_create(const struct lu_env *env,
        LASSERT(handle->lgh_file == NULL);
 
        if (handle->lgh_name) {
-               handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR,
-                                                 handle->lgh_name,
-                                                 open_flags, 0644);
-               if (IS_ERR(handle->lgh_file))
-                       RETURN(PTR_ERR(handle->lgh_file));
+               file = llog_filp_open(MOUNT_CONFIGS_DIR, handle->lgh_name,
+                                     open_flags, 0644);
+               if (IS_ERR(file))
+                       RETURN(PTR_ERR(file));
 
                handle->lgh_id.lgl_oseq = FID_SEQ_LLOG;
-               handle->lgh_id.lgl_oid =
-                       handle->lgh_file->f_dentry->d_inode->i_ino;
+               handle->lgh_id.lgl_oid = file->f_dentry->d_inode->i_ino;
                handle->lgh_id.lgl_ogen =
-                       handle->lgh_file->f_dentry->d_inode->i_generation;
+                               file->f_dentry->d_inode->i_generation;
+               handle->lgh_file = file;
        } else {
                OBDO_ALLOC(oa);
                if (oa == NULL)
@@ -727,14 +735,13 @@ static int llog_lvfs_create(const struct lu_env *env,
                if (IS_ERR(dchild))
                        GOTO(out, rc = PTR_ERR(dchild));
 
-               handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
-                                                open_flags);
-               if (IS_ERR(handle->lgh_file))
-                       GOTO(out, rc = PTR_ERR(handle->lgh_file));
-
+               file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags);
+               if (IS_ERR(file))
+                       GOTO(out, rc = PTR_ERR(file));
                handle->lgh_id.lgl_oseq = oa->o_seq;
                handle->lgh_id.lgl_oid = oa->o_id;
                handle->lgh_id.lgl_ogen = oa->o_generation;
+               handle->lgh_file = file;
 out:
                OBDO_FREE(oa);
        }
@@ -757,8 +764,10 @@ static int llog_lvfs_close(const struct lu_env *env,
                       handle->lgh_id.lgl_oid, handle->lgh_id.lgl_oseq,
                       handle->lgh_id.lgl_ogen, rc);
        handle->lgh_file = NULL;
-       if (handle->lgh_name)
+       if (handle->lgh_name) {
                OBD_FREE(handle->lgh_name, strlen(handle->lgh_name) + 1);
+               handle->lgh_name = NULL;
+       }
        RETURN(rc);
 }
 
@@ -930,6 +939,21 @@ out1:
 }
 EXPORT_SYMBOL(llog_put_cat_list);
 
+static int llog_lvfs_declare_create(const struct lu_env *env,
+                                   struct llog_handle *res,
+                                   struct thandle *th)
+{
+       return 0;
+}
+
+static int llog_lvfs_declare_write_rec(const struct lu_env *env,
+                                      struct llog_handle *loghandle,
+                                      struct llog_rec_hdr *rec,
+                                      int idx, struct thandle *th)
+{
+       return 0;
+}
+
 struct llog_operations llog_lvfs_ops = {
        .lop_write_rec          = llog_lvfs_write_rec,
        .lop_next_block         = llog_lvfs_next_block,
@@ -940,6 +964,8 @@ struct llog_operations llog_lvfs_ops = {
        .lop_close              = llog_lvfs_close,
        .lop_open               = llog_lvfs_open,
        .lop_exist              = llog_lvfs_exist,
+       .lop_declare_create     = llog_lvfs_declare_create,
+       .lop_declare_write_rec  = llog_lvfs_declare_write_rec,
 };
 EXPORT_SYMBOL(llog_lvfs_ops);
 #else /* !__KERNEL__ */