From 6fb209991e80cf71d97ad566d1a757e4a31466e0 Mon Sep 17 00:00:00 2001 From: Mikhail Pershin Date: Tue, 14 Aug 2012 12:04:57 +0400 Subject: [PATCH] LU-1302 llog: structures changes, llog_thread_info Add changes in structures, introduce llog_thread_info, protect llog header data with new lock. Signed-off-by: Mikhail Pershin Change-Id: Id1438e69d8771880481e1a335d38d3e2d1635085 Reviewed-on: http://review.whamcloud.com/3631 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Jinshan Xiong Reviewed-by: Andreas Dilger --- lustre/include/lustre_log.h | 64 ++++++++++++++++++------- lustre/obdclass/class_obd.c | 5 ++ lustre/obdclass/llog.c | 100 ++++++++++++++++++++++++---------------- lustre/obdclass/llog_cat.c | 7 ++- lustre/obdclass/llog_internal.h | 32 +++++++++++++ lustre/obdclass/llog_obd.c | 19 ++++++++ 6 files changed, 169 insertions(+), 58 deletions(-) diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h index e64ef1e..c3d2e67 100644 --- a/lustre/include/lustre_log.h +++ b/lustre/include/lustre_log.h @@ -69,6 +69,7 @@ #include #include #include +#include #define LOG_NAME_LIMIT(logname, name) \ snprintf(logname, sizeof(logname), "LOGS/%s", name) @@ -78,29 +79,36 @@ struct plain_handle_data { cfs_list_t phd_entry; struct llog_handle *phd_cat_handle; struct llog_cookie phd_cookie; /* cookie of this log in its cat */ - int phd_last_idx; }; struct cat_handle_data { cfs_list_t chd_head; struct llog_handle *chd_current_log; /* currently open log */ + struct llog_handle *chd_next_log; /* llog to be used next */ }; -/* In-memory descriptor for a log object or log catalog */ -struct llog_handle { - cfs_rw_semaphore_t lgh_lock; - struct llog_logid lgh_id; /* id of this log */ - struct llog_log_hdr *lgh_hdr; - struct file *lgh_file; - int lgh_last_idx; - int lgh_cur_idx; /* used during llog_process */ - __u64 lgh_cur_offset; /* used during llog_process */ - struct llog_ctxt *lgh_ctxt; - union { - struct plain_handle_data phd; - struct cat_handle_data chd; - } u; -}; +static inline void logid_to_fid(struct llog_logid *id, struct lu_fid *fid) +{ + /* For compatibility purposes we identify pre-OSD (~< 2.3.51 MDS) + * logid's by non-zero ogen (inode generation) and convert them + * into IGIF */ + if (id->lgl_ogen == 0) { + fid->f_seq = id->lgl_oseq; + fid->f_oid = id->lgl_oid; + fid->f_ver = 0; + } else { + lu_igif_build(fid, id->lgl_oid, id->lgl_ogen); + } +} + +static inline void fid_to_logid(struct lu_fid *fid, struct llog_logid *id) +{ + id->lgl_oseq = fid->f_seq; + id->lgl_oid = fid->f_oid; + id->lgl_ogen = 0; +} + +struct llog_handle; /* llog.c - general API */ typedef int (*llog_cb_t)(struct llog_handle *, struct llog_rec_hdr *, void *); @@ -266,6 +274,29 @@ struct llog_operations { /* XXX add 2 more: commit callbacks and llog recovery functions */ }; +/* In-memory descriptor for a log object or log catalog */ +struct llog_handle { + cfs_rw_semaphore_t lgh_lock; + struct llog_logid lgh_id; /* id of this log */ + struct llog_log_hdr *lgh_hdr; + cfs_spinlock_t lgh_hdr_lock; /* protect lgh_hdr data */ + union { + struct file *lgh_file; + struct dt_object *lgh_obj; + }; + int lgh_last_idx; + int lgh_cur_idx; /* used during llog_process */ + __u64 lgh_cur_offset; /* used during llog_process */ + struct llog_ctxt *lgh_ctxt; + union { + struct plain_handle_data phd; + struct cat_handle_data chd; + } u; + char *lgh_name; + void *private_data; + struct llog_operations *lgh_logops; +}; + /* llog_lvfs.c */ extern struct llog_operations llog_lvfs_ops; int llog_get_cat_list(struct obd_device *disk_obd, @@ -294,6 +325,7 @@ struct llog_ctxt { cfs_atomic_t loc_refcount; void *llog_proc_cb; long loc_flags; /* flags, see above defines */ + struct dt_object *loc_dir; }; #define LCM_NAME_SIZE 64 diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index 01996a0..1e935f4 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -566,6 +566,10 @@ int init_obdclass(void) if (err) return err; + err = llog_info_init(); + if (err) + return err; + #ifdef __KERNEL__ err = lustre_register_fs(); #endif @@ -596,6 +600,7 @@ static void cleanup_obdclass(void) OBP(obd, detach)(obd); } } + llog_info_fini(); lu_global_fini(); obd_cleanup_caches(); diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index f08d54b..658dbee 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -39,6 +39,8 @@ * if an OST or MDS fails it need only look at log(s) relevant to itself * * Author: Andreas Dilger + * Author: Alex Zhuravlev + * Author: Mikhail Pershin */ #define DEBUG_SUBSYSTEM S_LOG @@ -49,7 +51,6 @@ #include #include -#include #include "llog_internal.h" /* Allocate a new log or catalog handle */ @@ -58,11 +59,13 @@ struct llog_handle *llog_alloc_handle(void) struct llog_handle *loghandle; ENTRY; - OBD_ALLOC(loghandle, sizeof(*loghandle)); + OBD_ALLOC_PTR(loghandle); if (loghandle == NULL) RETURN(ERR_PTR(-ENOMEM)); cfs_init_rwsem(&loghandle->lgh_lock); + cfs_spin_lock_init(&loghandle->lgh_hdr_lock); + CFS_INIT_LIST_HEAD(&loghandle->u.phd.phd_entry); RETURN(loghandle); } @@ -80,10 +83,11 @@ void llog_free_handle(struct llog_handle *loghandle) cfs_list_del_init(&loghandle->u.phd.phd_entry); if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) LASSERT(cfs_list_empty(&loghandle->u.chd.chd_head)); - OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE); + LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE); + OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE); - out: - OBD_FREE(loghandle, sizeof(*loghandle)); +out: + OBD_FREE_PTR(loghandle); } EXPORT_SYMBOL(llog_free_handle); @@ -102,7 +106,9 @@ int llog_cancel_rec(struct llog_handle *loghandle, int index) RETURN(-EINVAL); } - if (!ext2_clear_bit(index, llh->llh_bitmap)) { + cfs_spin_lock(&loghandle->lgh_hdr_lock); + if (!ext2_clear_bit(index, llh->llh_bitmap)) { + cfs_spin_unlock(&loghandle->lgh_hdr_lock); CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index); RETURN(-ENOENT); } @@ -112,25 +118,38 @@ int llog_cancel_rec(struct llog_handle *loghandle, int index) if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) && (llh->llh_count == 1) && (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) { - rc = llog_destroy(loghandle); - if (rc) { - CERROR("Failure destroying log after last cancel: %d\n", - rc); - ext2_set_bit(index, llh->llh_bitmap); - llh->llh_count++; - } else { - rc = 1; - } - RETURN(rc); - } - - rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0); - if (rc) { - CERROR("Failure re-writing header %d\n", rc); - ext2_set_bit(index, llh->llh_bitmap); - llh->llh_count++; - } - RETURN(rc); + cfs_spin_unlock(&loghandle->lgh_hdr_lock); + rc = llog_destroy(loghandle); + if (rc < 0) { + CERROR("%s: can't destroy empty llog #"LPX64"#"LPX64 + "#%08x: rc = %d\n", + loghandle->lgh_ctxt->loc_obd->obd_name, + loghandle->lgh_id.lgl_oid, + loghandle->lgh_id.lgl_oseq, + loghandle->lgh_id.lgl_ogen, rc); + GOTO(out_err, rc); + } + RETURN(1); + } + cfs_spin_unlock(&loghandle->lgh_hdr_lock); + + rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0); + if (rc < 0) { + CERROR("%s: fail to write header for llog #"LPX64"#"LPX64 + "#%08x: rc = %d\n", + loghandle->lgh_ctxt->loc_obd->obd_name, + loghandle->lgh_id.lgl_oid, + loghandle->lgh_id.lgl_oseq, + loghandle->lgh_id.lgl_ogen, rc); + GOTO(out_err, rc); + } + RETURN(0); +out_err: + cfs_spin_lock(&loghandle->lgh_hdr_lock); + ext2_set_bit(index, llh->llh_bitmap); + llh->llh_count++; + cfs_spin_unlock(&loghandle->lgh_hdr_lock); + return rc; } EXPORT_SYMBOL(llog_cancel_rec); @@ -142,7 +161,7 @@ int llog_init_handle(struct llog_handle *handle, int flags, ENTRY; LASSERT(handle->lgh_hdr == NULL); - OBD_ALLOC(llh, sizeof(*llh)); + OBD_ALLOC_PTR(llh); if (llh == NULL) RETURN(-ENOMEM); handle->lgh_hdr = llh; @@ -177,21 +196,20 @@ int llog_init_handle(struct llog_handle *handle, int flags, out: if (flags & LLOG_F_IS_CAT) { - CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head); - llh->llh_size = sizeof(struct llog_logid_rec); - } else if (flags & LLOG_F_IS_PLAIN) { - CFS_INIT_LIST_HEAD(&handle->u.phd.phd_entry); - } else { - CERROR("Unknown flags: %#x (Expected %#x or %#x\n", - flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN); - LBUG(); - } - - if (rc) { - OBD_FREE(llh, sizeof(*llh)); - handle->lgh_hdr = NULL; - } - RETURN(rc); + LASSERT(cfs_list_empty(&handle->u.chd.chd_head)); + CFS_INIT_LIST_HEAD(&handle->u.chd.chd_head); + llh->llh_size = sizeof(struct llog_logid_rec); + } else if (!(flags & LLOG_F_IS_PLAIN)) { + CERROR("Unknown flags: %#x (Expected %#x or %#x\n", + flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN); + rc = -EINVAL; + } + + if (rc) { + OBD_FREE_PTR(llh); + handle->lgh_hdr = NULL; + } + RETURN(rc); } EXPORT_SYMBOL(llog_init_handle); diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index f0fa468..54e911f 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -91,13 +91,18 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) if (index == 0) index = 1; + + cfs_spin_lock(&loghandle->lgh_hdr_lock); + llh->llh_count++; if (ext2_set_bit(index, llh->llh_bitmap)) { CERROR("argh, index %u already set in log bitmap?\n", index); + cfs_spin_unlock(&loghandle->lgh_hdr_lock); LBUG(); /* should never happen */ } + cfs_spin_unlock(&loghandle->lgh_hdr_lock); + cathandle->lgh_last_idx = index; - llh->llh_count++; llh->llh_tail.lrt_index = index; CDEBUG(D_RPCTRACE,"new recovery log "LPX64":%x for index %u of catalog " diff --git a/lustre/obdclass/llog_internal.h b/lustre/obdclass/llog_internal.h index 6d2ac53..0fd1936 100644 --- a/lustre/obdclass/llog_internal.h +++ b/lustre/obdclass/llog_internal.h @@ -45,8 +45,40 @@ struct llog_process_info { int lpi_rc; int lpi_flags; cfs_completion_t lpi_completion; + const struct lu_env *lpi_env; + +}; + +struct llog_thread_info { + struct lu_attr lgi_attr; + struct lu_fid lgi_fid; + struct llog_logid lgi_logid; + struct dt_object_format lgi_dof; + struct llog_process_data lgi_lpd; + struct lustre_mdt_attrs lgi_lma_attr; + + struct lu_buf lgi_buf; + loff_t lgi_off; + + struct llog_rec_hdr lgi_lrh; + struct llog_rec_tail lgi_tail; + struct llog_logid_rec lgi_lid; }; +extern struct lu_context_key llog_thread_key; + +static inline struct llog_thread_info *llog_info(const struct lu_env *env) +{ + struct llog_thread_info *lgi; + + lgi = lu_context_key_get(&env->le_ctx, &llog_thread_key); + LASSERT(lgi); + return lgi; +} + +int llog_info_init(void); +void llog_info_fini(void); + int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res, struct llog_logid *logid); int class_config_dump_handler(struct llog_handle * handle, diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c index 8bf6b41..aa9c3c0 100644 --- a/lustre/obdclass/llog_obd.c +++ b/lustre/obdclass/llog_obd.c @@ -476,3 +476,22 @@ int obd_llog_finish(struct obd_device *obd, int count) RETURN(rc); } EXPORT_SYMBOL(obd_llog_finish); + +/* context key constructor/destructor: llog_key_init, llog_key_fini */ +LU_KEY_INIT_FINI(llog, struct llog_thread_info); +/* context key: llog_thread_key */ +LU_CONTEXT_KEY_DEFINE(llog, LCT_MD_THREAD | LCT_MG_THREAD | LCT_LOCAL); +LU_KEY_INIT_GENERIC(llog); +EXPORT_SYMBOL(llog_thread_key); + +int llog_info_init(void) +{ + llog_key_init_generic(&llog_thread_key, NULL); + lu_context_key_register(&llog_thread_key); + return 0; +} + +void llog_info_fini(void) +{ + lu_context_key_degister(&llog_thread_key); +} -- 1.8.3.1