Whamcloud - gitweb
LU-1302 llog: structures changes, llog_thread_info
authorMikhail Pershin <tappro@whamcloud.com>
Tue, 14 Aug 2012 08:04:57 +0000 (12:04 +0400)
committerOleg Drokin <green@whamcloud.com>
Fri, 31 Aug 2012 22:51:40 +0000 (18:51 -0400)
Add changes in structures, introduce llog_thread_info,
protect llog header data with new lock.

Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Change-Id: Id1438e69d8771880481e1a335d38d3e2d1635085
Reviewed-on: http://review.whamcloud.com/3631
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lustre/include/lustre_log.h
lustre/obdclass/class_obd.c
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_internal.h
lustre/obdclass/llog_obd.c

index e64ef1e..c3d2e67 100644 (file)
@@ -69,6 +69,7 @@
 #include <obd_class.h>
 #include <obd_ost.h>
 #include <lustre/lustre_idl.h>
 #include <obd_class.h>
 #include <obd_ost.h>
 #include <lustre/lustre_idl.h>
+#include <dt_object.h>
 
 #define LOG_NAME_LIMIT(logname, name)                   \
         snprintf(logname, sizeof(logname), "LOGS/%s", name)
 
 #define LOG_NAME_LIMIT(logname, name)                   \
         snprintf(logname, sizeof(logname), "LOGS/%s", name)
@@ -78,29 +79,36 @@ struct plain_handle_data {
         cfs_list_t          phd_entry;
         struct llog_handle *phd_cat_handle;
         struct llog_cookie  phd_cookie; /* cookie of this log in its cat */
         cfs_list_t          phd_entry;
         struct llog_handle *phd_cat_handle;
         struct llog_cookie  phd_cookie; /* cookie of this log in its cat */
-        int                 phd_last_idx;
 };
 
 struct cat_handle_data {
         cfs_list_t              chd_head;
         struct llog_handle     *chd_current_log; /* currently open log */
 };
 
 struct cat_handle_data {
         cfs_list_t              chd_head;
         struct llog_handle     *chd_current_log; /* currently open log */
+       struct llog_handle      *chd_next_log; /* llog to be used next */
 };
 
 };
 
-/* In-memory descriptor for a log object or log catalog */
-struct llog_handle {
-        cfs_rw_semaphore_t      lgh_lock;
-        struct llog_logid       lgh_id;              /* id of this log */
-        struct llog_log_hdr    *lgh_hdr;
-        struct file            *lgh_file;
-        int                     lgh_last_idx;
-        int                     lgh_cur_idx;    /* used during llog_process */
-        __u64                   lgh_cur_offset; /* used during llog_process */
-        struct llog_ctxt       *lgh_ctxt;
-        union {
-                struct plain_handle_data phd;
-                struct cat_handle_data   chd;
-        } u;
-};
+static inline void logid_to_fid(struct llog_logid *id, struct lu_fid *fid)
+{
+       /* For compatibility purposes we identify pre-OSD (~< 2.3.51 MDS)
+        * logid's by non-zero ogen (inode generation) and convert them
+        * into IGIF */
+       if (id->lgl_ogen == 0) {
+               fid->f_seq = id->lgl_oseq;
+               fid->f_oid = id->lgl_oid;
+               fid->f_ver = 0;
+       } else {
+               lu_igif_build(fid, id->lgl_oid, id->lgl_ogen);
+       }
+}
+
+static inline void fid_to_logid(struct lu_fid *fid, struct llog_logid *id)
+{
+       id->lgl_oseq = fid->f_seq;
+       id->lgl_oid = fid->f_oid;
+       id->lgl_ogen = 0;
+}
+
+struct llog_handle;
 
 /* llog.c  -  general API */
 typedef int (*llog_cb_t)(struct llog_handle *, struct llog_rec_hdr *, void *);
 
 /* llog.c  -  general API */
 typedef int (*llog_cb_t)(struct llog_handle *, struct llog_rec_hdr *, void *);
@@ -266,6 +274,29 @@ struct llog_operations {
         /* XXX add 2 more: commit callbacks and llog recovery functions */
 };
 
         /* XXX add 2 more: commit callbacks and llog recovery functions */
 };
 
+/* In-memory descriptor for a log object or log catalog */
+struct llog_handle {
+       cfs_rw_semaphore_t       lgh_lock;
+       struct llog_logid        lgh_id; /* id of this log */
+       struct llog_log_hdr     *lgh_hdr;
+       cfs_spinlock_t           lgh_hdr_lock; /* protect lgh_hdr data */
+       union {
+               struct file             *lgh_file;
+               struct dt_object        *lgh_obj;
+       };
+       int                      lgh_last_idx;
+       int                      lgh_cur_idx; /* used during llog_process */
+       __u64                    lgh_cur_offset; /* used during llog_process */
+       struct llog_ctxt        *lgh_ctxt;
+       union {
+               struct plain_handle_data         phd;
+               struct cat_handle_data           chd;
+       } u;
+       char                    *lgh_name;
+       void                    *private_data;
+       struct llog_operations  *lgh_logops;
+};
+
 /* llog_lvfs.c */
 extern struct llog_operations llog_lvfs_ops;
 int llog_get_cat_list(struct obd_device *disk_obd,
 /* llog_lvfs.c */
 extern struct llog_operations llog_lvfs_ops;
 int llog_get_cat_list(struct obd_device *disk_obd,
@@ -294,6 +325,7 @@ struct llog_ctxt {
         cfs_atomic_t             loc_refcount;
         void                    *llog_proc_cb;
         long                     loc_flags; /* flags, see above defines */
         cfs_atomic_t             loc_refcount;
         void                    *llog_proc_cb;
         long                     loc_flags; /* flags, see above defines */
+       struct dt_object        *loc_dir;
 };
 
 #define LCM_NAME_SIZE 64
 };
 
 #define LCM_NAME_SIZE 64
index 01996a0..1e935f4 100644 (file)
@@ -566,6 +566,10 @@ int init_obdclass(void)
         if (err)
                 return err;
 
         if (err)
                 return err;
 
+       err = llog_info_init();
+       if (err)
+               return err;
+
 #ifdef __KERNEL__
         err = lustre_register_fs();
 #endif
 #ifdef __KERNEL__
         err = lustre_register_fs();
 #endif
@@ -596,6 +600,7 @@ static void cleanup_obdclass(void)
                         OBP(obd, detach)(obd);
                 }
         }
                         OBP(obd, detach)(obd);
                 }
         }
+       llog_info_fini();
         lu_global_fini();
 
         obd_cleanup_caches();
         lu_global_fini();
 
         obd_cleanup_caches();
index f08d54b..658dbee 100644 (file)
@@ -39,6 +39,8 @@
  *   if an OST or MDS fails it need only look at log(s) relevant to itself
  *
  * Author: Andreas Dilger <adilger@clusterfs.com>
  *   if an OST or MDS fails it need only look at log(s) relevant to itself
  *
  * Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Alex Zhuravlev <bzzz@whamcloud.com>
+ * Author: Mikhail Pershin <tappro@whamcloud.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LOG
  */
 
 #define DEBUG_SUBSYSTEM S_LOG
@@ -49,7 +51,6 @@
 
 #include <obd_class.h>
 #include <lustre_log.h>
 
 #include <obd_class.h>
 #include <lustre_log.h>
-#include <libcfs/list.h>
 #include "llog_internal.h"
 
 /* Allocate a new log or catalog handle */
 #include "llog_internal.h"
 
 /* Allocate a new log or catalog handle */
@@ -58,11 +59,13 @@ struct llog_handle *llog_alloc_handle(void)
         struct llog_handle *loghandle;
         ENTRY;
 
         struct llog_handle *loghandle;
         ENTRY;
 
-        OBD_ALLOC(loghandle, sizeof(*loghandle));
+       OBD_ALLOC_PTR(loghandle);
         if (loghandle == NULL)
                 RETURN(ERR_PTR(-ENOMEM));
 
         cfs_init_rwsem(&loghandle->lgh_lock);
         if (loghandle == NULL)
                 RETURN(ERR_PTR(-ENOMEM));
 
         cfs_init_rwsem(&loghandle->lgh_lock);
+       cfs_spin_lock_init(&loghandle->lgh_hdr_lock);
+       CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
 
         RETURN(loghandle);
 }
 
         RETURN(loghandle);
 }
@@ -80,10 +83,11 @@ void llog_free_handle(struct llog_handle *loghandle)
                 cfs_list_del_init(&loghandle->u.phd.phd_entry);
         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
                 LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
                 cfs_list_del_init(&loghandle->u.phd.phd_entry);
         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
                 LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head));
-        OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+       LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
+       OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
 
 
- out:
-        OBD_FREE(loghandle, sizeof(*loghandle));
+out:
+       OBD_FREE_PTR(loghandle);
 }
 EXPORT_SYMBOL(llog_free_handle);
 
 }
 EXPORT_SYMBOL(llog_free_handle);
 
@@ -102,7 +106,9 @@ int llog_cancel_rec(struct llog_handle *loghandle, int index)
                 RETURN(-EINVAL);
         }
 
                 RETURN(-EINVAL);
         }
 
-        if (!ext2_clear_bit(index, llh->llh_bitmap)) {
+       cfs_spin_lock(&loghandle->lgh_hdr_lock);
+       if (!ext2_clear_bit(index, llh->llh_bitmap)) {
+               cfs_spin_unlock(&loghandle->lgh_hdr_lock);
                 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
                 RETURN(-ENOENT);
         }
                 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
                 RETURN(-ENOENT);
         }
@@ -112,25 +118,38 @@ int llog_cancel_rec(struct llog_handle *loghandle, int index)
         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
             (llh->llh_count == 1) &&
             (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
             (llh->llh_count == 1) &&
             (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
-                rc = llog_destroy(loghandle);
-                if (rc) {
-                        CERROR("Failure destroying log after last cancel: %d\n",
-                               rc);
-                        ext2_set_bit(index, llh->llh_bitmap);
-                        llh->llh_count++;
-                } else {
-                        rc = 1;
-                }
-                RETURN(rc);
-        }
-
-        rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
-        if (rc) {
-                CERROR("Failure re-writing header %d\n", rc);
-                ext2_set_bit(index, llh->llh_bitmap);
-                llh->llh_count++;
-        }
-        RETURN(rc);
+               cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+               rc = llog_destroy(loghandle);
+               if (rc < 0) {
+                       CERROR("%s: can't destroy empty llog #"LPX64"#"LPX64
+                              "#%08x: rc = %d\n",
+                              loghandle->lgh_ctxt->loc_obd->obd_name,
+                              loghandle->lgh_id.lgl_oid,
+                              loghandle->lgh_id.lgl_oseq,
+                              loghandle->lgh_id.lgl_ogen, rc);
+                       GOTO(out_err, rc);
+               }
+               RETURN(1);
+       }
+       cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+
+       rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
+       if (rc < 0) {
+               CERROR("%s: fail to write header for llog #"LPX64"#"LPX64
+                      "#%08x: rc = %d\n",
+                      loghandle->lgh_ctxt->loc_obd->obd_name,
+                      loghandle->lgh_id.lgl_oid,
+                      loghandle->lgh_id.lgl_oseq,
+                      loghandle->lgh_id.lgl_ogen, rc);
+               GOTO(out_err, rc);
+       }
+       RETURN(0);
+out_err:
+       cfs_spin_lock(&loghandle->lgh_hdr_lock);
+       ext2_set_bit(index, llh->llh_bitmap);
+       llh->llh_count++;
+       cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+       return rc;
 }
 EXPORT_SYMBOL(llog_cancel_rec);
 
 }
 EXPORT_SYMBOL(llog_cancel_rec);
 
@@ -142,7 +161,7 @@ int llog_init_handle(struct llog_handle *handle, int flags,
         ENTRY;
         LASSERT(handle->lgh_hdr == NULL);
 
         ENTRY;
         LASSERT(handle->lgh_hdr == NULL);
 
-        OBD_ALLOC(llh, sizeof(*llh));
+       OBD_ALLOC_PTR(llh);
         if (llh == NULL)
                 RETURN(-ENOMEM);
         handle->lgh_hdr = llh;
         if (llh == NULL)
                 RETURN(-ENOMEM);
         handle->lgh_hdr = llh;
@@ -177,21 +196,20 @@ int llog_init_handle(struct llog_handle *handle, int flags,
 
 out:
         if (flags & LLOG_F_IS_CAT) {
 
 out:
         if (flags & LLOG_F_IS_CAT) {
-                CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
-                llh->llh_size = sizeof(struct llog_logid_rec);
-        } else if (flags & LLOG_F_IS_PLAIN) {
-                CFS_INIT_LIST_HEAD(&handle->u.phd.phd_entry);
-        } else {
-                CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
-                       flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
-                LBUG();
-        }
-
-        if (rc) {
-                OBD_FREE(llh, sizeof(*llh));
-                handle->lgh_hdr = NULL;
-        }
-        RETURN(rc);
+               LASSERT(cfs_list_empty(&handle->u.chd.chd_head));
+               CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head);
+               llh->llh_size = sizeof(struct llog_logid_rec);
+       } else if (!(flags & LLOG_F_IS_PLAIN)) {
+               CERROR("Unknown flags: %#x (Expected %#x or %#x\n",
+                      flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
+               rc = -EINVAL;
+       }
+
+       if (rc) {
+               OBD_FREE_PTR(llh);
+               handle->lgh_hdr = NULL;
+       }
+       RETURN(rc);
 }
 EXPORT_SYMBOL(llog_init_handle);
 
 }
 EXPORT_SYMBOL(llog_init_handle);
 
index f0fa468..54e911f 100644 (file)
@@ -91,13 +91,18 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
 
         if (index == 0)
                 index = 1;
 
         if (index == 0)
                 index = 1;
+
+       cfs_spin_lock(&loghandle->lgh_hdr_lock);
+       llh->llh_count++;
         if (ext2_set_bit(index, llh->llh_bitmap)) {
                 CERROR("argh, index %u already set in log bitmap?\n",
                        index);
         if (ext2_set_bit(index, llh->llh_bitmap)) {
                 CERROR("argh, index %u already set in log bitmap?\n",
                        index);
+               cfs_spin_unlock(&loghandle->lgh_hdr_lock);
                 LBUG(); /* should never happen */
         }
                 LBUG(); /* should never happen */
         }
+       cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+
         cathandle->lgh_last_idx = index;
         cathandle->lgh_last_idx = index;
-        llh->llh_count++;
         llh->llh_tail.lrt_index = index;
 
         CDEBUG(D_RPCTRACE,"new recovery log "LPX64":%x for index %u of catalog "
         llh->llh_tail.lrt_index = index;
 
         CDEBUG(D_RPCTRACE,"new recovery log "LPX64":%x for index %u of catalog "
index 6d2ac53..0fd1936 100644 (file)
@@ -45,8 +45,40 @@ struct llog_process_info {
         int                 lpi_rc;
         int                 lpi_flags;
         cfs_completion_t    lpi_completion;
         int                 lpi_rc;
         int                 lpi_flags;
         cfs_completion_t    lpi_completion;
+       const struct lu_env     *lpi_env;
+
+};
+
+struct llog_thread_info {
+       struct lu_attr                   lgi_attr;
+       struct lu_fid                    lgi_fid;
+       struct llog_logid                lgi_logid;
+       struct dt_object_format          lgi_dof;
+       struct llog_process_data         lgi_lpd;
+       struct lustre_mdt_attrs          lgi_lma_attr;
+
+       struct lu_buf                    lgi_buf;
+       loff_t                           lgi_off;
+
+       struct llog_rec_hdr              lgi_lrh;
+       struct llog_rec_tail             lgi_tail;
+       struct llog_logid_rec            lgi_lid;
 };
 
 };
 
+extern struct lu_context_key llog_thread_key;
+
+static inline struct llog_thread_info *llog_info(const struct lu_env *env)
+{
+       struct llog_thread_info *lgi;
+
+       lgi = lu_context_key_get(&env->le_ctx, &llog_thread_key);
+       LASSERT(lgi);
+       return lgi;
+}
+
+int llog_info_init(void);
+void llog_info_fini(void);
+
 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
                        struct llog_logid *logid);
 int class_config_dump_handler(struct llog_handle * handle,
 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
                        struct llog_logid *logid);
 int class_config_dump_handler(struct llog_handle * handle,
index 8bf6b41..aa9c3c0 100644 (file)
@@ -476,3 +476,22 @@ int obd_llog_finish(struct obd_device *obd, int count)
         RETURN(rc);
 }
 EXPORT_SYMBOL(obd_llog_finish);
         RETURN(rc);
 }
 EXPORT_SYMBOL(obd_llog_finish);
+
+/* context key constructor/destructor: llog_key_init, llog_key_fini */
+LU_KEY_INIT_FINI(llog, struct llog_thread_info);
+/* context key: llog_thread_key */
+LU_CONTEXT_KEY_DEFINE(llog, LCT_MD_THREAD | LCT_MG_THREAD | LCT_LOCAL);
+LU_KEY_INIT_GENERIC(llog);
+EXPORT_SYMBOL(llog_thread_key);
+
+int llog_info_init(void)
+{
+       llog_key_init_generic(&llog_thread_key, NULL);
+       lu_context_key_register(&llog_thread_key);
+       return 0;
+}
+
+void llog_info_fini(void)
+{
+       lu_context_key_degister(&llog_thread_key);
+}