From 7c243a561ffe8503a6abf5c4cafef0c3566192bc Mon Sep 17 00:00:00 2001 From: Mikhail Pershin Date: Fri, 25 Apr 2014 23:01:51 +0400 Subject: [PATCH] LU-4528 llog: don't write llog in 3-steps The llog record my be written by passing just buffer and writting its header and tail separately. If any write fails then that cause partially written buffer and corrupted llog. There is special undo procedure to clean things up after that but that demands twice more credits for normal operations and for undo case which is critical in case of wide striping. Such 3-steps write can be avoided by preparing correct llog record before calling llog_write(). Patch does the following: - remove all cases with passing separate buffer for llog_write(), llog_add(), llog_cat_add() and change API according with that. - special allocation case for lustre_cfg if it is going to be written to the llog, it allocates llog header/tail and initialize it. - uniform error checks after lustre_cfg_new() call to check for NULL result always instead of ERR_PTR() - remove 3-steps write in llog_osd code completely - write llog padding record in single step too - remove duplicated code from llog_cat_new_log(). Previously it assigned the next index for the record, changed bitmap and called llog_write with new index, so that was considered as 'modification' of record but not append. Meanwhile the llog_osd_write_rec() does the same intenally for append request. Now llog_cat_new_log() just calls llog_write() with -1 index (append), avoiding all header changes outside that call. - remove numcookie parameter from llog_write_rec() because it is always 1 anyway - remove cookie parameter from llog_write() because it is not used, note, the cookie remains as parameter of llog_write_rec() for llog catalog purposes. Signed-off-by: Mikhail Pershin Change-Id: Ibf231021ece9d7cb8a3c4288e12077e11f691661 Reviewed-on: http://review.whamcloud.com/10108 Tested-by: Jenkins Reviewed-by: John L. Hammond Tested-by: Maloo Reviewed-by: Alex Zhuravlev Reviewed-by: Andreas Dilger --- lustre/include/lustre/lustre_idl.h | 6 + lustre/include/lustre_cfg.h | 43 ++- lustre/include/lustre_log.h | 32 +- lustre/mdd/mdd_device.c | 9 +- lustre/mdd/mdd_dir.c | 4 +- lustre/mdt/mdt_hsm_cdt_actions.c | 4 +- lustre/mgs/lproc_mgs.c | 38 +- lustre/mgs/mgs_internal.h | 20 +- lustre/mgs/mgs_llog.c | 396 ++++++++++----------- lustre/obdclass/llog.c | 25 +- lustre/obdclass/llog_cat.c | 73 ++-- lustre/obdclass/llog_internal.h | 2 - lustre/obdclass/llog_osd.c | 699 ++++++++++++++++++++++++------------- lustre/obdclass/llog_test.c | 79 ++--- lustre/obdclass/obd_config.c | 38 ++ lustre/obdclass/obd_mount.c | 12 +- lustre/obdclass/obd_mount_server.c | 10 +- lustre/osd-ldiskfs/osd_internal.h | 4 +- lustre/osp/osp_sync.c | 5 +- lustre/ptlrpc/sec_config.c | 46 ++- lustre/utils/lustre_cfg.c | 124 ++++--- lustre/utils/obd.c | 13 +- 22 files changed, 957 insertions(+), 725 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index cbf72c9..91a3d1d 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -3276,6 +3276,12 @@ struct llog_rec_tail { (rec->lrh_len - sizeof(struct llog_rec_hdr) - \ sizeof(struct llog_rec_tail)) +static inline void *rec_tail(struct llog_rec_hdr *rec) +{ + return (void *)((char *)rec + rec->lrh_len - + sizeof(struct llog_rec_tail)); +} + struct llog_logid_rec { struct llog_rec_hdr lid_hdr; struct llog_logid lid_id; diff --git a/lustre/include/lustre_cfg.h b/lustre/include/lustre_cfg.h index b1a7561..0cb0a1e 100644 --- a/lustre/include/lustre_cfg.h +++ b/lustre/include/lustre_cfg.h @@ -242,30 +242,35 @@ static inline int lustre_cfg_len(__u32 bufcount, __u32 *buflens) #include -static inline struct lustre_cfg *lustre_cfg_new(int cmd, - struct lustre_cfg_bufs *bufs) +static inline void lustre_cfg_init(struct lustre_cfg *lcfg, int cmd, + struct lustre_cfg_bufs *bufs) { - struct lustre_cfg *lcfg; - char *ptr; - int i; + char *ptr; + int i; - ENTRY; + lcfg->lcfg_version = LUSTRE_CFG_VERSION; + lcfg->lcfg_command = cmd; + lcfg->lcfg_bufcount = bufs->lcfg_bufcount; - OBD_ALLOC(lcfg, lustre_cfg_len(bufs->lcfg_bufcount, - bufs->lcfg_buflen)); - if (!lcfg) - RETURN(ERR_PTR(-ENOMEM)); + ptr = (char *)lcfg + LCFG_HDR_SIZE(lcfg->lcfg_bufcount); + for (i = 0; i < lcfg->lcfg_bufcount; i++) { + lcfg->lcfg_buflens[i] = bufs->lcfg_buflen[i]; + LOGL((char *)bufs->lcfg_buf[i], bufs->lcfg_buflen[i], ptr); + } +} - lcfg->lcfg_version = LUSTRE_CFG_VERSION; - lcfg->lcfg_command = cmd; - lcfg->lcfg_bufcount = bufs->lcfg_bufcount; +static inline struct lustre_cfg *lustre_cfg_new(int cmd, + struct lustre_cfg_bufs *bufs) +{ + struct lustre_cfg *lcfg; - ptr = (char *)lcfg + LCFG_HDR_SIZE(lcfg->lcfg_bufcount); - for (i = 0; i < lcfg->lcfg_bufcount; i++) { - lcfg->lcfg_buflens[i] = bufs->lcfg_buflen[i]; - LOGL((char *)bufs->lcfg_buf[i], bufs->lcfg_buflen[i], ptr); - } - RETURN(lcfg); + ENTRY; + + OBD_ALLOC(lcfg, lustre_cfg_len(bufs->lcfg_bufcount, + bufs->lcfg_buflen)); + if (lcfg != NULL) + lustre_cfg_init(lcfg, cmd, bufs); + RETURN(lcfg); } static inline void lustre_cfg_free(struct lustre_cfg *lcfg) diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h index 7b7a812..446e1ca 100644 --- a/lustre/include/lustre_log.h +++ b/lustre/include/lustre_log.h @@ -185,13 +185,12 @@ struct llog_process_cat_data { int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle); int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle, struct llog_rec_hdr *rec, struct llog_cookie *reccookie, - void *buf, struct thandle *th); + struct thandle *th); int llog_cat_declare_add_rec(const struct lu_env *env, struct llog_handle *cathandle, struct llog_rec_hdr *rec, struct thandle *th); int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle, - struct llog_rec_hdr *rec, struct llog_cookie *reccookie, - void *buf); + struct llog_rec_hdr *rec, struct llog_cookie *reccookie); int llog_cat_cancel_records(const struct lu_env *env, struct llog_handle *cathandle, int count, struct llog_cookie *cookies); @@ -290,8 +289,8 @@ struct llog_operations { int (*lop_write_rec)(const struct lu_env *env, struct llog_handle *loghandle, struct llog_rec_hdr *rec, - struct llog_cookie *cookie, int cookiecount, - void *buf, int idx, struct thandle *th); + struct llog_cookie *cookie, + int idx, struct thandle *th); /** * Add new record in llog catalog. Does the same as llog_write_rec() * but using llog catalog. @@ -301,7 +300,7 @@ struct llog_operations { struct llog_rec_hdr *rec, struct thandle *th); int (*lop_add)(const struct lu_env *env, struct llog_handle *lgh, struct llog_rec_hdr *rec, struct llog_cookie *cookie, - void *buf, struct thandle *th); + struct thandle *th); }; /* In-memory descriptor for a log object or log catalog */ @@ -551,6 +550,20 @@ static inline int llog_connect(struct llog_ctxt *ctxt, RETURN(rc); } +struct llog_cfg_rec { + struct llog_rec_hdr lcr_hdr; + struct lustre_cfg lcr_cfg; + struct llog_rec_tail lcr_tail; +}; + +struct llog_cfg_rec *lustre_cfg_rec_new(int cmd, struct lustre_cfg_bufs *bufs); +void lustre_cfg_rec_free(struct llog_cfg_rec *lcr); + +enum { + LLOG_NEXT_IDX = -1, + LLOG_HEADER_IDX = 0, +}; + /* llog.c */ int llog_exist(struct llog_handle *loghandle); int llog_declare_create(const struct lu_env *env, @@ -563,10 +576,10 @@ int llog_declare_write_rec(const struct lu_env *env, struct thandle *th); int llog_write_rec(const struct lu_env *env, struct llog_handle *handle, struct llog_rec_hdr *rec, struct llog_cookie *logcookies, - int numcookies, void *buf, int idx, struct thandle *th); + int idx, struct thandle *th); int llog_add(const struct lu_env *env, struct llog_handle *lgh, struct llog_rec_hdr *rec, struct llog_cookie *logcookies, - void *buf, struct thandle *th); + struct thandle *th); int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh, struct llog_rec_hdr *rec, struct thandle *th); int lustre_process_log(struct super_block *sb, char *logname, @@ -579,8 +592,7 @@ int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt, int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt, struct llog_logid *logid, char *name); int llog_write(const struct lu_env *env, struct llog_handle *loghandle, - struct llog_rec_hdr *rec, struct llog_cookie *reccookie, - int cookiecount, void *buf, int idx); + struct llog_rec_hdr *rec, int idx); /** @} log */ diff --git a/lustre/mdd/mdd_device.c b/lustre/mdd/mdd_device.c index 07b606d..4f0baa4 100644 --- a/lustre/mdd/mdd_device.c +++ b/lustre/mdd/mdd_device.c @@ -535,7 +535,7 @@ int mdd_changelog_write_header(const struct lu_env *env, ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT); LASSERT(ctxt); - rc = llog_cat_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL); + rc = llog_cat_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL); if (rc > 0) rc = 0; llog_ctxt_put(ctxt); @@ -1305,7 +1305,7 @@ static int mdd_changelog_user_register(const struct lu_env *env, rec->cur_endrec = mdd->mdd_cl.mc_index; spin_unlock(&mdd->mdd_cl.mc_user_lock); - rc = llog_cat_add(env, ctxt->loc_handle, &rec->cur_hdr, NULL, NULL); + rc = llog_cat_add(env, ctxt->loc_handle, &rec->cur_hdr, NULL); CDEBUG(D_IOCTL, "Registered changelog user %d\n", *id); out: @@ -1380,10 +1380,7 @@ static int mdd_changelog_user_purge_cb(const struct lu_env *env, CDEBUG(D_IOCTL, "Rewriting changelog user %d endrec to "LPU64"\n", mcud->mcud_id, rec->cur_endrec); - /* hdr+1 is loc of data */ - hdr->lrh_len -= sizeof(*hdr) + sizeof(struct llog_rec_tail); - rc = llog_write(env, llh, hdr, NULL, 0, (void *)(hdr + 1), - hdr->lrh_index); + rc = llog_write(env, llh, hdr, hdr->lrh_index); RETURN(rc); } diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index f593270..98e4e28 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -723,7 +723,7 @@ int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd, if (ctxt == NULL) return -ENXIO; - rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th); + rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, th); llog_ctxt_put(ctxt); if (rc > 0) rc = 0; @@ -762,7 +762,7 @@ mdd_changelog_ext_store(const struct lu_env *env, struct mdd_device *mdd, return -ENXIO; /* nested journal transaction */ - rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th); + rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, th); llog_ctxt_put(ctxt); if (rc > 0) rc = 0; diff --git a/lustre/mdt/mdt_hsm_cdt_actions.c b/lustre/mdt/mdt_hsm_cdt_actions.c index da7f5a9..0f5e0fe 100644 --- a/lustre/mdt/mdt_hsm_cdt_actions.c +++ b/lustre/mdt/mdt_hsm_cdt_actions.c @@ -161,7 +161,7 @@ int mdt_agent_record_add(const struct lu_env *env, hai->hai_cookie = cdt->cdt_last_cookie; } larr->arr_hai.hai_cookie = hai->hai_cookie; - rc = llog_cat_add(env, lctxt->loc_handle, &larr->arr_hdr, NULL, NULL); + rc = llog_cat_add(env, lctxt->loc_handle, &larr->arr_hdr, NULL); if (rc > 0) rc = 0; @@ -313,7 +313,7 @@ int mdt_agent_llog_update_rec(const struct lu_env *env, larr->arr_hdr.lrh_id = 0; larr->arr_hdr.lrh_index = 0; rc = llog_cat_add(env, llh->u.phd.phd_cat_handle, &larr->arr_hdr, - NULL, NULL); + NULL); larr->arr_hdr = saved_hdr; RETURN(rc); } diff --git a/lustre/mgs/lproc_mgs.c b/lustre/mgs/lproc_mgs.c index f0ea956..20289cf 100644 --- a/lustre/mgs/lproc_mgs.c +++ b/lustre/mgs/lproc_mgs.c @@ -47,15 +47,16 @@ static int mgs_fs_seq_show(struct seq_file *seq, void *v) { - struct obd_device *obd = seq->private; - struct mgs_device *mgs; - cfs_list_t list; - struct mgs_direntry *dirent, *n; - struct lu_env env; - int rc, len; - ENTRY; - - LASSERT(obd != NULL); + struct obd_device *obd = seq->private; + struct mgs_device *mgs; + struct list_head list; + struct mgs_direntry *dirent, *n; + struct lu_env env; + int rc, len; + + ENTRY; + + LASSERT(obd != NULL); LASSERT(obd->obd_lu_dev != NULL); mgs = lu2mgs_dev(obd->obd_lu_dev); @@ -64,22 +65,21 @@ static int mgs_fs_seq_show(struct seq_file *seq, void *v) RETURN(rc); rc = class_dentry_readdir(&env, mgs, &list); - if (rc) { - CERROR("Can't read config dir\n"); + if (rc) GOTO(out, rc); - } - cfs_list_for_each_entry_safe(dirent, n, &list, list) { - cfs_list_del(&dirent->list); - len = strlen(dirent->name); + + list_for_each_entry_safe(dirent, n, &list, mde_list) { + list_del_init(&dirent->mde_list); + len = strlen(dirent->mde_name); if (len > 7 && - strncmp(dirent->name + len - 7, "-client", len) == 0) - seq_printf(seq, "%.*s\n", len - 7, dirent->name); + strncmp(dirent->mde_name + len - 7, "-client", len) == 0) + seq_printf(seq, "%.*s\n", len - 7, dirent->mde_name); mgs_direntry_free(dirent); - } + } out: lu_env_fini(&env); - RETURN(0); + RETURN(0); } LPROC_SEQ_FOPS_RO(mgs_fs); diff --git a/lustre/mgs/mgs_internal.h b/lustre/mgs/mgs_internal.h index 0a33f81..f326a13 100644 --- a/lustre/mgs/mgs_internal.h +++ b/lustre/mgs/mgs_internal.h @@ -375,16 +375,17 @@ static inline struct dt_object *dt_object_child(struct dt_object *o) struct dt_object, do_lu); } struct mgs_direntry { - cfs_list_t list; - char *name; - int len; + struct list_head mde_list; + char *mde_name; + int mde_len; }; static inline void mgs_direntry_free(struct mgs_direntry *de) { + LASSERT(list_empty(&de->mde_list)); if (de) { - LASSERT(de->len); - OBD_FREE(de->name, de->len); + LASSERT(de->mde_len); + OBD_FREE(de->mde_name, de->mde_len); OBD_FREE_PTR(de); } } @@ -397,19 +398,20 @@ static inline struct mgs_direntry *mgs_direntry_alloc(int len) if (de == NULL) return NULL; - OBD_ALLOC(de->name, len); - if (de->name == NULL) { + OBD_ALLOC(de->mde_name, len); + if (de->mde_name == NULL) { OBD_FREE_PTR(de); return NULL; } - de->len = len; + de->mde_len = len; + INIT_LIST_HEAD(&de->mde_list); return de; } /* mgs_llog.c */ int class_dentry_readdir(const struct lu_env *env, struct mgs_device *mgs, - cfs_list_t *list); + struct list_head *list); #endif /* _MGS_INTERNAL_H */ diff --git a/lustre/mgs/mgs_llog.c b/lustre/mgs/mgs_llog.c index ae6124d..753dcfc 100644 --- a/lustre/mgs/mgs_llog.c +++ b/lustre/mgs/mgs_llog.c @@ -55,8 +55,9 @@ /********************** Class functions ********************/ +/* Find all logs in CONFIG directory and link then into list */ int class_dentry_readdir(const struct lu_env *env, - struct mgs_device *mgs, cfs_list_t *list) + struct mgs_device *mgs, struct list_head *log_list) { struct dt_object *dir = mgs->mgs_configs_dir; const struct dt_it_ops *iops; @@ -65,7 +66,7 @@ int class_dentry_readdir(const struct lu_env *env, char *key; int rc, key_sz; - CFS_INIT_LIST_HEAD(list); + INIT_LIST_HEAD(log_list); LASSERT(dir); LASSERT(dir->do_index_ops); @@ -105,10 +106,10 @@ int class_dentry_readdir(const struct lu_env *env, break; } - memcpy(de->name, key, key_sz); - de->name[key_sz] = 0; + memcpy(de->mde_name, key, key_sz); + de->mde_name[key_sz] = 0; - cfs_list_add(&de->list, list); + list_add(&de->mde_list, log_list); next: rc = iops->next(env, it); @@ -640,11 +641,7 @@ static int mgs_modify_handler(const struct lu_env *env, marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */ marker->cm_flags |= mml->mml_marker.cm_flags; marker->cm_canceltime = mml->mml_marker.cm_canceltime; - /* Header and tail are added back to lrh_len in - llog_lvfs_write_rec */ - rec->lrh_len = cfg_len; - rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg, - rec->lrh_index); + rc = llog_write(env, llh, rec, rec->lrh_index); if (!rc) mml->mml_modified++; } @@ -784,35 +781,12 @@ static int check_markers(struct lustre_cfg *lcfg, return 0; } -static int record_lcfg(const struct lu_env *env, struct llog_handle *llh, - struct lustre_cfg *lcfg) -{ - struct llog_rec_hdr rec; - int buflen, rc; - - if (!lcfg || !llh) - return -ENOMEM; - - LASSERT(llh->lgh_ctxt); - - buflen = lustre_cfg_len(lcfg->lcfg_bufcount, - lcfg->lcfg_buflens); - rec.lrh_len = llog_data_len(buflen); - rec.lrh_type = OBD_CFG_REC; - - /* idx = -1 means append */ - rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1); - if (rc) - CERROR("failed %d\n", rc); - return rc; -} - static int record_base(const struct lu_env *env, struct llog_handle *llh, char *cfgname, lnet_nid_t nid, int cmd, char *s1, char *s2, char *s3, char *s4) { - struct mgs_thread_info *mgi = mgs_env_info(env); - struct lustre_cfg *lcfg; + struct mgs_thread_info *mgi = mgs_env_info(env); + struct llog_cfg_rec *lcr; int rc; CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname, @@ -828,19 +802,19 @@ static int record_base(const struct lu_env *env, struct llog_handle *llh, if (s4) lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4); - lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs); - if (!lcfg) + lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs); + if (lcr == NULL) return -ENOMEM; - lcfg->lcfg_nid = nid; - rc = record_lcfg(env, llh, lcfg); + lcr->lcr_cfg.lcfg_nid = nid; + rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX); - lustre_cfg_free(lcfg); + lustre_cfg_rec_free(lcr); - if (rc) { - CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname, - cmd, s1, s2, s3, s4); - } + if (rc < 0) + CDEBUG(D_MGS, + "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n", + cfgname, cmd, s1, s2, s3, s4, rc); return rc; } @@ -997,7 +971,7 @@ static int mgs_replace_handler(const struct lu_env *env, RETURN(0); copy_out: /* Record is placed in temporary llog as is */ - rc = llog_write(env, mrul->temp_llh, rec, NULL, 0, NULL, -1); + rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX); CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n", rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command, @@ -1291,35 +1265,36 @@ out: static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh, char *devname, struct lov_desc *desc) { - struct mgs_thread_info *mgi = mgs_env_info(env); - struct lustre_cfg *lcfg; + struct mgs_thread_info *mgi = mgs_env_info(env); + struct llog_cfg_rec *lcr; int rc; lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname); lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc)); - lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs); - if (!lcfg) + lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs); + if (lcr == NULL) return -ENOMEM; - rc = record_lcfg(env, llh, lcfg); - lustre_cfg_free(lcfg); + rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX); + lustre_cfg_rec_free(lcr); return rc; } static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh, char *devname, struct lmv_desc *desc) { - struct mgs_thread_info *mgi = mgs_env_info(env); - struct lustre_cfg *lcfg; + struct mgs_thread_info *mgi = mgs_env_info(env); + struct llog_cfg_rec *lcr; int rc; lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname); lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc)); - lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs); - - rc = record_lcfg(env, llh, lcfg); + lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs); + if (lcr == NULL) + return -ENOMEM; - lustre_cfg_free(lcfg); + rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX); + lustre_cfg_rec_free(lcr); return rc; } @@ -1357,7 +1332,7 @@ static int record_marker(const struct lu_env *env, char *tgtname, char *comment) { struct mgs_thread_info *mgi = mgs_env_info(env); - struct lustre_cfg *lcfg; + struct llog_cfg_rec *lcr; int rc; int cplen = 0; @@ -1379,12 +1354,12 @@ static int record_marker(const struct lu_env *env, lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL); lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker, sizeof(mgi->mgi_marker)); - lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs); - if (!lcfg) + lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs); + if (lcr == NULL) return -ENOMEM; - rc = record_lcfg(env, llh, lcfg); - lustre_cfg_free(lcfg); + rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX); + lustre_cfg_rec_free(lcr); return rc; } @@ -1436,25 +1411,23 @@ static int record_end_log(const struct lu_env *env, struct llog_handle **llh) /* write an lcfg directly into a log (with markers) */ static int mgs_write_log_direct(const struct lu_env *env, struct mgs_device *mgs, struct fs_db *fsdb, - char *logname, struct lustre_cfg *lcfg, - char *devname, char *comment) + char *logname, struct llog_cfg_rec *lcr, + char *devname, char *comment) { - struct llog_handle *llh = NULL; - int rc; - ENTRY; + struct llog_handle *llh = NULL; + int rc; - if (!lcfg) - RETURN(-ENOMEM); + ENTRY; rc = record_start_log(env, mgs, &llh, logname); - if (rc) - RETURN(rc); + if (rc) + RETURN(rc); /* FIXME These should be a single journal transaction */ rc = record_marker(env, llh, fsdb, CM_START, devname, comment); if (rc) GOTO(out_end, rc); - rc = record_lcfg(env, llh, lcfg); + rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX); if (rc) GOTO(out_end, rc); rc = record_marker(env, llh, fsdb, CM_END, devname, comment); @@ -1466,79 +1439,76 @@ out_end: } /* write the lcfg in all logs for the given fs */ -int mgs_write_log_direct_all(const struct lu_env *env, - struct mgs_device *mgs, - struct fs_db *fsdb, - struct mgs_target_info *mti, - struct lustre_cfg *lcfg, - char *devname, char *comment, - int server_only) +int mgs_write_log_direct_all(const struct lu_env *env, struct mgs_device *mgs, + struct fs_db *fsdb, struct mgs_target_info *mti, + struct llog_cfg_rec *lcr, char *devname, + char *comment, int server_only) { - cfs_list_t list; - struct mgs_direntry *dirent, *n; - char *fsname = mti->mti_fsname; - char *logname; - int rc = 0, len = strlen(fsname); - ENTRY; + struct list_head log_list; + struct mgs_direntry *dirent, *n; + char *fsname = mti->mti_fsname; + char *logname; + int rc = 0, len = strlen(fsname); - /* We need to set params for any future logs - as well. FIXME Append this file to every new log. - Actually, we should store as params (text), not llogs. Or - in a database. */ + ENTRY; + /* We need to set params for any future logs + * as well. + * FIXME Append this file to every new log. + * Actually, we should store as params (text), not llogs, + * or in a database. */ rc = name_create(&logname, fsname, "-params"); if (rc) RETURN(rc); if (mgs_log_is_empty(env, mgs, logname)) { struct llog_handle *llh = NULL; + rc = record_start_log(env, mgs, &llh, logname); if (rc == 0) record_end_log(env, &llh); - } - name_destroy(&logname); - if (rc) - RETURN(rc); + } + name_destroy(&logname); + if (rc) + RETURN(rc); - /* Find all the logs in the CONFIGS directory */ - rc = class_dentry_readdir(env, mgs, &list); + /* Find all the logs in the CONFIGS directory */ + rc = class_dentry_readdir(env, mgs, &log_list); if (rc) - RETURN(rc); + RETURN(rc); - /* Could use fsdb index maps instead of directory listing */ - cfs_list_for_each_entry_safe(dirent, n, &list, list) { - cfs_list_del(&dirent->list); - /* don't write to sptlrpc rule log */ - if (strstr(dirent->name, "-sptlrpc") != NULL) + /* Could use fsdb index maps instead of directory listing */ + list_for_each_entry_safe(dirent, n, &log_list, mde_list) { + list_del_init(&dirent->mde_list); + /* don't write to sptlrpc rule log */ + if (strstr(dirent->mde_name, "-sptlrpc") != NULL) goto next; /* caller wants write server logs only */ - if (server_only && strstr(dirent->name, "-client") != NULL) + if (server_only && strstr(dirent->mde_name, "-client") != NULL) goto next; - if (strncmp(fsname, dirent->name, len) == 0) { - CDEBUG(D_MGS, "Changing log %s\n", dirent->name); - /* Erase any old settings of this same parameter */ - rc = mgs_modify(env, mgs, fsdb, mti, dirent->name, - devname, comment, CM_SKIP); - if (rc < 0) - CERROR("%s: Can't modify llog %s: rc = %d\n", - mgs->mgs_obd->obd_name, dirent->name,rc); - /* Write the new one */ - if (lcfg) { - rc = mgs_write_log_direct(env, mgs, fsdb, - dirent->name, - lcfg, devname, - comment); - if (rc) - CERROR("%s: writing log %s: rc = %d\n", - mgs->mgs_obd->obd_name, - dirent->name, rc); - } - } + if (strncmp(fsname, dirent->mde_name, len) != 0) + goto next; + + CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name); + /* Erase any old settings of this same parameter */ + rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name, + devname, comment, CM_SKIP); + if (rc < 0) + CERROR("%s: Can't modify llog %s: rc = %d\n", + mgs->mgs_obd->obd_name, dirent->mde_name, rc); + if (lcr == NULL) + goto next; + /* Write the new one */ + rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name, + lcr, devname, comment); + if (rc != 0) + CERROR("%s: writing log %s: rc = %d\n", + mgs->mgs_obd->obd_name, dirent->mde_name, rc); next: mgs_direntry_free(dirent); - } + } - RETURN(rc); + RETURN(rc); } static int mgs_write_log_osp_to_mdt(const struct lu_env *env, @@ -2721,7 +2691,7 @@ static int mgs_wlp_lcfg(const struct lu_env *env, { char comment[MTI_NAME_MAXLEN]; char *tmp; - struct lustre_cfg *lcfg; + struct llog_cfg_rec *lcr; int rc, del; /* Erase any old settings of this same parameter */ @@ -2729,7 +2699,7 @@ static int mgs_wlp_lcfg(const struct lu_env *env, comment[MTI_NAME_MAXLEN - 1] = 0; /* But don't try to match the value. */ tmp = strchr(comment, '='); - if (tmp) + if (tmp != NULL) *tmp = 0; /* FIXME we should skip settings that are the same as old values */ rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP); @@ -2751,13 +2721,14 @@ static int mgs_wlp_lcfg(const struct lu_env *env, if (mti->mti_flags & LDD_F_PARAM2) lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL); - lcfg = lustre_cfg_new((mti->mti_flags & LDD_F_PARAM2) ? - LCFG_SET_PARAM : LCFG_PARAM, bufs); - - if (!lcfg) + lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ? + LCFG_SET_PARAM : LCFG_PARAM, bufs); + if (lcr == NULL) return -ENOMEM; - rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment); - lustre_cfg_free(lcfg); + + rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname, + comment); + lustre_cfg_rec_free(lcr); return rc; } @@ -2782,8 +2753,9 @@ static int mgs_write_log_sys(const struct lu_env *env, struct mgs_device *mgs, struct fs_db *fsdb, struct mgs_target_info *mti, char *sys, char *ptr) { - struct mgs_thread_info *mgi = mgs_env_info(env); - struct lustre_cfg *lcfg; + struct mgs_thread_info *mgi = mgs_env_info(env); + struct lustre_cfg *lcfg; + struct llog_cfg_rec *lcr; char *tmp, sep; int rc, cmd, convert = 1; @@ -2814,7 +2786,11 @@ static int mgs_write_log_sys(const struct lu_env *env, lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys); if (!convert && *tmp != '\0') lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp); - lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs); + lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs); + if (lcr == NULL) + return -ENOMEM; + + lcfg = &lcr->lcr_cfg; lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0; /* truncate the comment to the parameter name */ ptr = tmp - 1; @@ -2822,7 +2798,7 @@ static int mgs_write_log_sys(const struct lu_env *env, *ptr = '\0'; /* modify all servers and clients */ rc = mgs_write_log_direct_all(env, mgs, fsdb, mti, - *tmp == '\0' ? NULL : lcfg, + *tmp == '\0' ? NULL : lcr, mti->mti_fsname, sys, 0); if (rc == 0 && *tmp != '\0') { switch (cmd) { @@ -2839,7 +2815,7 @@ static int mgs_write_log_sys(const struct lu_env *env, } } *ptr = sep; - lustre_cfg_free(lcfg); + lustre_cfg_rec_free(lcr); return rc; } @@ -2849,7 +2825,7 @@ static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs, char *quota, char *ptr) { struct mgs_thread_info *mgi = mgs_env_info(env); - struct lustre_cfg *lcfg; + struct llog_cfg_rec *lcr; char *tmp; char sep; int rc, cmd = LCFG_PARAM; @@ -2876,7 +2852,10 @@ static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs, lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname); lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota); - lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs); + lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs); + if (lcr == NULL) + return -ENOMEM; + /* truncate the comment to the parameter name */ ptr = tmp - 1; sep = *ptr; @@ -2887,10 +2866,10 @@ static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs, * log once we cleanup the config log for global param. */ /* modify all servers */ rc = mgs_write_log_direct_all(env, mgs, fsdb, mti, - *tmp == '\0' ? NULL : lcfg, + *tmp == '\0' ? NULL : lcr, mti->mti_fsname, quota, 1); *ptr = sep; - lustre_cfg_free(lcfg); + lustre_cfg_rec_free(lcr); return rc < 0 ? rc : 0; } @@ -2900,61 +2879,63 @@ static int mgs_srpc_set_param_disk(const struct lu_env *env, struct mgs_target_info *mti, char *param) { - struct mgs_thread_info *mgi = mgs_env_info(env); - struct llog_handle *llh = NULL; - char *logname; - char *comment, *ptr; - struct lustre_cfg *lcfg; - int rc, len; - ENTRY; + struct mgs_thread_info *mgi = mgs_env_info(env); + struct llog_cfg_rec *lcr; + struct llog_handle *llh = NULL; + char *logname; + char *comment, *ptr; + int rc, len; - /* get comment */ - ptr = strchr(param, '='); - LASSERT(ptr); - len = ptr - param; + ENTRY; - OBD_ALLOC(comment, len + 1); - if (comment == NULL) - RETURN(-ENOMEM); - strncpy(comment, param, len); - comment[len] = '\0'; + /* get comment */ + ptr = strchr(param, '='); + LASSERT(ptr != NULL); + len = ptr - param; - /* prepare lcfg */ + OBD_ALLOC(comment, len + 1); + if (comment == NULL) + RETURN(-ENOMEM); + strncpy(comment, param, len); + comment[len] = '\0'; + + /* prepare lcfg */ lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname); lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param); - lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs); - if (lcfg == NULL) - GOTO(out_comment, rc = -ENOMEM); + lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs); + if (lcr == NULL) + GOTO(out_comment, rc = -ENOMEM); - /* construct log name */ - rc = name_create(&logname, mti->mti_fsname, "-sptlrpc"); - if (rc) - GOTO(out_lcfg, rc); + /* construct log name */ + rc = name_create(&logname, mti->mti_fsname, "-sptlrpc"); + if (rc < 0) + GOTO(out_lcfg, rc); if (mgs_log_is_empty(env, mgs, logname)) { rc = record_start_log(env, mgs, &llh, logname); - if (rc) - GOTO(out, rc); + if (rc < 0) + GOTO(out, rc); record_end_log(env, &llh); - } + } - /* obsolete old one */ + /* obsolete old one */ rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname, comment, CM_SKIP); if (rc < 0) GOTO(out, rc); - /* write the new one */ - rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg, - mti->mti_svname, comment); + /* write the new one */ + rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, + mti->mti_svname, comment); if (rc) - CERROR("err %d writing log %s\n", rc, logname); + CERROR("%s: error writing log %s: rc = %d\n", + mgs->mgs_obd->obd_name, logname, rc); out: - name_destroy(&logname); + name_destroy(&logname); out_lcfg: - lustre_cfg_free(lcfg); + lustre_cfg_rec_free(lcr); out_comment: - OBD_FREE(comment, len + 1); - RETURN(rc); + OBD_FREE(comment, len + 1); + RETURN(rc); } static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb, @@ -3142,8 +3123,7 @@ static int mgs_srpc_read_handler(const struct lu_env *env, RETURN(-EINVAL); } - cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) - - sizeof(struct llog_rec_tail); + cfg_len = REC_DATA_LEN(rec); rc = lustre_cfg_sanity_check(lcfg, cfg_len); if (rc) { @@ -3569,27 +3549,26 @@ int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs, mutex_lock(&fsdb->fsdb_mutex); rc = mgs_write_log_add_failnid(obd, fsdb, mti); mutex_unlock(&fsdb->fsdb_mutex); + char *buf, *params; + int rc = -EINVAL; RETURN(rc); #endif return 0; } -int mgs_write_log_target(const struct lu_env *env, - struct mgs_device *mgs, - struct mgs_target_info *mti, - struct fs_db *fsdb) +int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs, + struct mgs_target_info *mti, struct fs_db *fsdb) { - int rc = -EINVAL; - char *buf, *params; - ENTRY; + char *buf, *params; + int rc = -EINVAL; - /* set/check the new target index */ + ENTRY; + + /* set/check the new target index */ rc = mgs_set_index(env, mgs, mti); - if (rc < 0) { - CERROR("Can't get index (%d)\n", rc); - RETURN(rc); - } + if (rc < 0) + RETURN(rc); if (rc == EALREADY) { LCONSOLE_WARN("Found index %d for %s, updating log\n", @@ -3685,14 +3664,14 @@ int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name) int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname) { struct fs_db *fsdb; - cfs_list_t list; + struct list_head log_list; struct mgs_direntry *dirent, *n; int rc, len = strlen(fsname); char *suffix; ENTRY; /* Find all the logs in the CONFIGS directory */ - rc = class_dentry_readdir(env, mgs, &list); + rc = class_dentry_readdir(env, mgs, &log_list); if (rc) RETURN(rc); @@ -3705,15 +3684,15 @@ int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsnam mutex_unlock(&mgs->mgs_mutex); - cfs_list_for_each_entry_safe(dirent, n, &list, list) { - cfs_list_del(&dirent->list); - suffix = strrchr(dirent->name, '-'); + list_for_each_entry_safe(dirent, n, &log_list, mde_list) { + list_del_init(&dirent->mde_list); + suffix = strrchr(dirent->mde_name, '-'); if (suffix != NULL) { - if ((len == suffix - dirent->name) && - (strncmp(fsname, dirent->name, len) == 0)) { + if ((len == suffix - dirent->mde_name) && + (strncmp(fsname, dirent->mde_name, len) == 0)) { CDEBUG(D_MGS, "Removing log %s\n", - dirent->name); - mgs_erase_log(env, mgs, dirent->name); + dirent->mde_name); + mgs_erase_log(env, mgs, dirent->mde_name); } } mgs_direntry_free(dirent); @@ -3726,7 +3705,7 @@ int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsnam int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs, struct obd_ioctl_data *data) { - cfs_list_t list; + struct list_head log_list; struct mgs_direntry *dirent, *n; char *out, *suffix; int l, remains, rc; @@ -3734,21 +3713,18 @@ int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs, ENTRY; /* Find all the logs in the CONFIGS directory */ - rc = class_dentry_readdir(env, mgs, &list); - if (rc) { - CERROR("%s: can't read %s dir = %d\n", - mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc); + rc = class_dentry_readdir(env, mgs, &log_list); + if (rc) RETURN(rc); - } out = data->ioc_bulk; remains = data->ioc_inllen1; - cfs_list_for_each_entry_safe(dirent, n, &list, list) { - cfs_list_del(&dirent->list); - suffix = strrchr(dirent->name, '-'); + list_for_each_entry_safe(dirent, n, &log_list, mde_list) { + list_del_init(&dirent->mde_list); + suffix = strrchr(dirent->mde_name, '-'); if (suffix != NULL) { l = snprintf(out, remains, "config log: $%s\n", - dirent->name); + dirent->mde_name); out += l; remains -= l; } diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index 3a3f430..7146001 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -150,7 +150,7 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle, } spin_unlock(&loghandle->lgh_hdr_lock); - rc = llog_write(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0); + rc = llog_write(env, loghandle, &llh->llh_hdr, LLOG_HEADER_IDX); if (rc < 0) { CERROR("%s: fail to write header for llog #"DOSTID "#%08x: rc = %d\n", @@ -703,7 +703,7 @@ EXPORT_SYMBOL(llog_declare_write_rec); int llog_write_rec(const struct lu_env *env, struct llog_handle *handle, struct llog_rec_hdr *rec, struct llog_cookie *logcookies, - int numcookies, void *buf, int idx, struct thandle *th) + int idx, struct thandle *th) { struct llog_operations *lop; int raised, rc, buflen; @@ -718,18 +718,13 @@ int llog_write_rec(const struct lu_env *env, struct llog_handle *handle, if (lop->lop_write_rec == NULL) RETURN(-EOPNOTSUPP); - if (buf) - buflen = rec->lrh_len + sizeof(struct llog_rec_hdr) + - sizeof(struct llog_rec_tail); - else - buflen = rec->lrh_len; + buflen = rec->lrh_len; LASSERT(cfs_size_round(buflen) == buflen); raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE); if (!raised) cfs_cap_raise(CFS_CAP_SYS_RESOURCE); - rc = lop->lop_write_rec(env, handle, rec, logcookies, numcookies, - buf, idx, th); + rc = lop->lop_write_rec(env, handle, rec, logcookies, idx, th); if (!raised) cfs_cap_lower(CFS_CAP_SYS_RESOURCE); RETURN(rc); @@ -738,7 +733,7 @@ EXPORT_SYMBOL(llog_write_rec); int llog_add(const struct lu_env *env, struct llog_handle *lgh, struct llog_rec_hdr *rec, struct llog_cookie *logcookies, - void *buf, struct thandle *th) + struct thandle *th) { int raised, rc; @@ -750,7 +745,7 @@ int llog_add(const struct lu_env *env, struct llog_handle *lgh, raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE); if (!raised) cfs_cap_raise(CFS_CAP_SYS_RESOURCE); - rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, buf, th); + rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, th); if (!raised) cfs_cap_lower(CFS_CAP_SYS_RESOURCE); RETURN(rc); @@ -856,8 +851,7 @@ EXPORT_SYMBOL(llog_erase); * Valid only with local llog. */ int llog_write(const struct lu_env *env, struct llog_handle *loghandle, - struct llog_rec_hdr *rec, struct llog_cookie *reccookie, - int cookiecount, void *buf, int idx) + struct llog_rec_hdr *rec, int idx) { struct dt_device *dt; struct thandle *th; @@ -884,8 +878,7 @@ int llog_write(const struct lu_env *env, struct llog_handle *loghandle, GOTO(out_trans, rc); down_write(&loghandle->lgh_lock); - rc = llog_write_rec(env, loghandle, rec, reccookie, - cookiecount, buf, idx, th); + rc = llog_write_rec(env, loghandle, rec, NULL, idx, th); up_write(&loghandle->lgh_lock); out_trans: dt_trans_stop(env, dt, th); @@ -981,7 +974,7 @@ int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh, struct llog_handle *copy_llh = data; /* Append all records */ - return llog_write(env, copy_llh, rec, NULL, 0, NULL, -1); + return llog_write(env, copy_llh, rec, LLOG_NEXT_IDX); } EXPORT_SYMBOL(llog_copy_handler); diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index 4ec7794..e5cafa7 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -67,21 +67,10 @@ static int llog_cat_new_log(const struct lu_env *env, struct thandle *th) { struct llog_thread_info *lgi = llog_info(env); - struct llog_logid_rec *rec = &lgi->lgi_logid; - struct llog_log_hdr *llh; - int rc, index, bitmap_size; - ENTRY; - - llh = cathandle->lgh_hdr; - bitmap_size = LLOG_BITMAP_SIZE(llh); - - index = (cathandle->lgh_last_idx + 1) % bitmap_size; + struct llog_logid_rec *rec = &lgi->lgi_logid; + int rc; - /* maximum number of available slots in catlog is bitmap_size - 2 */ - if (llh->llh_cat_idx == index) { - CERROR("no free catalog slots for log...\n"); - RETURN(-ENOSPC); - } + ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED)) RETURN(-ENOSPC); @@ -97,46 +86,29 @@ static int llog_cat_new_log(const struct lu_env *env, } rc = llog_init_handle(env, loghandle, - LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY, - &cathandle->lgh_hdr->llh_tgtuuid); - if (rc) - GOTO(out_destroy, rc); - - if (index == 0) - index = 1; - - spin_lock(&loghandle->lgh_hdr_lock); - llh->llh_count++; - if (ext2_set_bit(index, llh->llh_bitmap)) { - CERROR("argh, index %u already set in log bitmap?\n", - index); - spin_unlock(&loghandle->lgh_hdr_lock); - LBUG(); /* should never happen */ - } - spin_unlock(&loghandle->lgh_hdr_lock); - - cathandle->lgh_last_idx = index; - llh->llh_tail.lrt_index = index; + LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY, + &cathandle->lgh_hdr->llh_tgtuuid); + if (rc) + GOTO(out_destroy, rc); - CDEBUG(D_RPCTRACE,"new recovery log "DOSTID":%x for index %u of catalog" - DOSTID"\n", POSTID(&loghandle->lgh_id.lgl_oi), - loghandle->lgh_id.lgl_ogen, index, - POSTID(&cathandle->lgh_id.lgl_oi)); /* build the record for this log in the catalog */ rec->lid_hdr.lrh_len = sizeof(*rec); - rec->lid_hdr.lrh_index = index; rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC; rec->lid_id = loghandle->lgh_id; - rec->lid_tail.lrt_len = sizeof(*rec); - rec->lid_tail.lrt_index = index; - /* update the catalog: header and record */ + /* append the new record into catalog. The new index will be + * assigned to the record and updated in rec header */ rc = llog_write_rec(env, cathandle, &rec->lid_hdr, - &loghandle->u.phd.phd_cookie, 1, NULL, index, th); + &loghandle->u.phd.phd_cookie, LLOG_NEXT_IDX, th); if (rc < 0) GOTO(out_destroy, rc); - loghandle->lgh_hdr->llh_cat_idx = index; + CDEBUG(D_OTHER, "new recovery log "DOSTID":%x for index %u of catalog" + DOSTID"\n", POSTID(&loghandle->lgh_id.lgl_oi), + loghandle->lgh_id.lgl_ogen, rec->lid_hdr.lrh_index, + POSTID(&cathandle->lgh_id.lgl_oi)); + + loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index; RETURN(0); out_destroy: llog_destroy(env, loghandle); @@ -331,7 +303,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, */ int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle, struct llog_rec_hdr *rec, struct llog_cookie *reccookie, - void *buf, struct thandle *th) + struct thandle *th) { struct llog_handle *loghandle; int rc; @@ -350,7 +322,7 @@ int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle, } } /* now let's try to add the record */ - rc = llog_write_rec(env, loghandle, rec, reccookie, 1, buf, -1, th); + rc = llog_write_rec(env, loghandle, rec, reccookie, LLOG_NEXT_IDX, th); if (rc < 0) CDEBUG_LIMIT(rc == -ENOSPC ? D_HA : D_ERROR, "llog_write_rec %d: lh=%p\n", rc, loghandle); @@ -368,8 +340,8 @@ int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle, } } /* now let's try to add the record */ - rc = llog_write_rec(env, loghandle, rec, reccookie, 1, buf, - -1, th); + rc = llog_write_rec(env, loghandle, rec, reccookie, + LLOG_NEXT_IDX, th); if (rc < 0) CERROR("llog_write_rec %d: lh=%p\n", rc, loghandle); up_write(&loghandle->lgh_lock); @@ -454,8 +426,7 @@ out: EXPORT_SYMBOL(llog_cat_declare_add_rec); int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle, - struct llog_rec_hdr *rec, struct llog_cookie *reccookie, - void *buf) + struct llog_rec_hdr *rec, struct llog_cookie *reccookie) { struct llog_ctxt *ctxt; struct dt_device *dt; @@ -480,7 +451,7 @@ int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle, rc = dt_trans_start_local(env, dt, th); if (rc) GOTO(out_trans, rc); - rc = llog_cat_add_rec(env, cathandle, rec, reccookie, buf, th); + rc = llog_cat_add_rec(env, cathandle, rec, reccookie, th); out_trans: dt_trans_stop(env, dt, th); RETURN(rc); diff --git a/lustre/obdclass/llog_internal.h b/lustre/obdclass/llog_internal.h index f90be39..5bf04931 100644 --- a/lustre/obdclass/llog_internal.h +++ b/lustre/obdclass/llog_internal.h @@ -56,8 +56,6 @@ struct llog_thread_info { struct dt_object_format lgi_dof; struct lu_buf lgi_buf; loff_t lgi_off; - struct llog_rec_hdr lgi_lrh; - struct llog_rec_tail lgi_tail; struct llog_logid_rec lgi_logid; }; diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index fd9bfc36..6c80c73 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -15,25 +15,28 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ /* - * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2013, Intel Corporation. + * Copyright (c) 2012, 2014 Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ * Lustre is a trademark of Sun Microsystems, Inc. + */ +/* + * lustre/obdclass/llog_osd.c + * + * Low level llog routines on top of OSD API * - * lustre/obdclass/llog_osd.c - low level llog routines on top of OSD API + * This file provides set of methods for llog operations on top of + * dt_device. It contains all supported llog_operations interfaces and + * supplimental functions. * * Author: Alexey Zhuravlev * Author: Mikhail Pershin @@ -49,14 +52,20 @@ #include "llog_internal.h" #include "local_storage.h" -/* - * - multi-chunks or big-declaration approach - * - use unique sequence instead of llog sb tracking unique ids - * - re-use existing environment - * - named llog support (can be used for testing only at the present) - * - llog_origin_connect() work with OSD API +/** + * Implementation of the llog_operations::lop_declare_create + * + * This function is a wrapper over local_storage API function + * local_object_declare_create(). + * + * \param[in] env execution environment + * \param[in] los local_storage for bottom storage device + * \param[in] o dt_object to create + * \param[in] th current transaction handle + * + * \retval 0 on successful declaration of the new object + * \retval negative error if declaration was failed */ - static int llog_osd_declare_new_object(const struct lu_env *env, struct local_oid_storage *los, struct dt_object *o, @@ -72,6 +81,20 @@ static int llog_osd_declare_new_object(const struct lu_env *env, &lgi->lgi_dof, th); } +/** + * Implementation of the llog_operations::lop_create + * + * This function is a wrapper over local_storage API function + * local_object_create(). + * + * \param[in] env execution environment + * \param[in] los local_storage for bottom storage device + * \param[in] o dt_object to create + * \param[in] th current transaction handle + * + * \retval 0 on successful creation of the new object + * \retval negative error if creation was failed + */ static int llog_osd_create_new_object(const struct lu_env *env, struct local_oid_storage *los, struct dt_object *o, @@ -87,10 +110,34 @@ static int llog_osd_create_new_object(const struct lu_env *env, &lgi->lgi_dof, th); } +/** + * Write a padding record to the llog + * + * This function writes a padding record to the end of llog. That may + * be needed if llog contains records of variable size, e.g. config logs + * or changelogs. + * The padding record just aligns llog to the LLOG_CHUNK_SIZE boundary if + * the current record doesn't fit in the remaining space. + * + * It allocates full length to avoid two separate writes for header and tail. + * Such 2-steps scheme needs extra protection and complex error handling. + * + * \param[in] env execution environment + * \param[in] o dt_object to create + * \param[in,out] off pointer to the padding start offset + * \param[in] len padding length + * \param[in] index index of the padding record in a llog + * \param[in] th current transaction handle + * + * \retval 0 on successful padding write + * \retval negative error if write failed + */ static int llog_osd_pad(const struct lu_env *env, struct dt_object *o, loff_t *off, int len, int index, struct thandle *th) { struct llog_thread_info *lgi = llog_info(env); + struct llog_rec_hdr *rec; + struct llog_rec_tail *tail; int rc; ENTRY; @@ -99,109 +146,41 @@ static int llog_osd_pad(const struct lu_env *env, struct dt_object *o, LASSERT(off); LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0); - lgi->lgi_tail.lrt_len = lgi->lgi_lrh.lrh_len = len; - lgi->lgi_tail.lrt_index = lgi->lgi_lrh.lrh_index = index; - lgi->lgi_lrh.lrh_type = LLOG_PAD_MAGIC; + OBD_ALLOC(rec, len); + if (rec == NULL) + RETURN(-ENOMEM); - lgi->lgi_buf.lb_buf = &lgi->lgi_lrh; - lgi->lgi_buf.lb_len = sizeof(lgi->lgi_lrh); - dt_write_lock(env, o, 0); - rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); - if (rc) { - CERROR("%s: error writing padding record: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, rc); - GOTO(out, rc); - } - - lgi->lgi_buf.lb_buf = &lgi->lgi_tail; - lgi->lgi_buf.lb_len = sizeof(lgi->lgi_tail); - *off += len - sizeof(lgi->lgi_lrh) - sizeof(lgi->lgi_tail); - rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); - if (rc) - CERROR("%s: error writing padding record: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, rc); -out: - dt_write_unlock(env, o); - RETURN(rc); -} - -static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o, - struct llog_rec_hdr *rec, void *buf, - loff_t *off, struct thandle *th) -{ - struct llog_thread_info *lgi = llog_info(env); - int buflen = rec->lrh_len; - int rc; - - ENTRY; - - LASSERT(env); - LASSERT(o); - - if (buflen == 0) - CWARN("0-length record\n"); - - CDEBUG(D_OTHER, "write blob with type %x, buf %p/%u at off %llu\n", - rec->lrh_type, buf, buflen, *off); - - lgi->lgi_attr.la_valid = LA_SIZE; - lgi->lgi_attr.la_size = *off; + rec->lrh_len = len; + rec->lrh_index = index; + rec->lrh_type = LLOG_PAD_MAGIC; - if (!buf) { - lgi->lgi_buf.lb_len = buflen; - lgi->lgi_buf.lb_buf = rec; - rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); - if (rc) - CERROR("%s: error writing log record: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, rc); - GOTO(out, rc); - } + tail = rec_tail(rec); + tail->lrt_len = len; + tail->lrt_index = index; - /* the buf case */ - /* protect the following 3 writes from concurrent read */ - dt_write_lock(env, o, 0); - rec->lrh_len = sizeof(*rec) + buflen + sizeof(lgi->lgi_tail); - lgi->lgi_buf.lb_len = sizeof(*rec); lgi->lgi_buf.lb_buf = rec; - rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); - if (rc) { - CERROR("%s: error writing log hdr: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, rc); - GOTO(out_unlock, rc); - } - - lgi->lgi_buf.lb_len = buflen; - lgi->lgi_buf.lb_buf = buf; - rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); - if (rc) { - CERROR("%s: error writing log buffer: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, rc); - GOTO(out_unlock, rc); - } - - lgi->lgi_tail.lrt_len = rec->lrh_len; - lgi->lgi_tail.lrt_index = rec->lrh_index; - lgi->lgi_buf.lb_len = sizeof(lgi->lgi_tail); - lgi->lgi_buf.lb_buf = &lgi->lgi_tail; + lgi->lgi_buf.lb_len = len; rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); if (rc) - CERROR("%s: error writing log tail: rc = %d\n", + CERROR("%s: error writing padding record: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, rc); -out_unlock: - dt_write_unlock(env, o); - -out: - /* cleanup the content written above */ - if (rc) { - dt_punch(env, o, lgi->lgi_attr.la_size, OBD_OBJECT_EOF, th, - BYPASS_CAPA); - dt_attr_set(env, o, &lgi->lgi_attr, th, BYPASS_CAPA); - } - + OBD_FREE(rec, len); RETURN(rc); } +/** + * Implementation of the llog_operations::lop_read_header + * + * This function reads the current llog header from the bottom storage + * device. + * + * \param[in] env execution environment + * \param[in] handle llog handle of the current llog + * + * \retval 0 on successful header read + * \retval negative error if read failed + */ static int llog_osd_read_header(const struct lu_env *env, struct llog_handle *handle) { @@ -269,6 +248,25 @@ static int llog_osd_read_header(const struct lu_env *env, RETURN(0); } +/** + * Implementation of the llog_operations::lop_declare_write + * + * This function declares the new record write. + * + * \param[in] env execution environment + * \param[in] loghandle llog handle of the current llog + * \param[in] rec llog record header. This is a real header of the full + * llog record to write. This is the beginning of buffer + * to write, the length of buffer is stored in + * \a rec::lrh_len + * \param[in] idx index of the llog record. If \a idx == -1 then this is + * append case, otherwise \a idx is the index of record + * to modify + * \param[in] th current transaction handle + * + * \retval 0 on successful declaration + * \retval negative error if declaration failed + */ static int llog_osd_declare_write_rec(const struct lu_env *env, struct llog_handle *loghandle, struct llog_rec_hdr *rec, @@ -297,21 +295,11 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, if (rc || idx == 0) /* if error or just header */ RETURN(rc); - if (dt_object_exists(o)) { - rc = dt_attr_get(env, o, &lgi->lgi_attr, BYPASS_CAPA); - lgi->lgi_off = lgi->lgi_attr.la_size; - LASSERT(ergo(rc == 0, lgi->lgi_attr.la_valid & LA_SIZE)); - if (rc) - RETURN(rc); - - rc = dt_declare_punch(env, o, lgi->lgi_off, OBD_OBJECT_EOF, th); - if (rc) - RETURN(rc); - } else { - lgi->lgi_off = 0; - } - - lgi->lgi_buf.lb_len = rec->lrh_len; + /** + * the pad record can be inserted so take into account double + * record size + */ + lgi->lgi_buf.lb_len = rec->lrh_len * 2; lgi->lgi_buf.lb_buf = NULL; /* XXX: implement declared window or multi-chunks approach */ rc = dt_declare_record_write(env, o, &lgi->lgi_buf, -1, th); @@ -319,21 +307,43 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, RETURN(rc); } -/* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */ -/* appends if idx == -1, otherwise overwrites record idx. */ +/** + * Implementation of the llog_operations::lop_write + * + * This function writes the new record in the llog or modify the existed one. + * + * \param[in] env execution environment + * \param[in] loghandle llog handle of the current llog + * \param[in] rec llog record header. This is a real header of + * the full llog record to write. This is + * the beginning of buffer to write, the length + * of buffer is stored in \a rec::lrh_len + * \param[out] reccookie pointer to the cookie to return back if needed. + * It is used for further cancel of this llog + * record. + * \param[in] idx index of the llog record. If \a idx == -1 then + * this is append case, otherwise \a idx is + * the index of record to modify + * \param[in] th current transaction handle + * + * \retval 0 on successful write && \a reccookie == NULL + * 1 on successful write && \a reccookie != NULL + * \retval negative error if write failed + */ static int llog_osd_write_rec(const struct lu_env *env, struct llog_handle *loghandle, struct llog_rec_hdr *rec, - struct llog_cookie *reccookie, int cookiecount, - void *buf, int idx, struct thandle *th) + struct llog_cookie *reccookie, + int idx, struct thandle *th) { struct llog_thread_info *lgi = llog_info(env); struct llog_log_hdr *llh; int reclen = rec->lrh_len; - int index, rc, old_tail_idx; + int index, rc; struct llog_rec_tail *lrt; struct dt_object *o; size_t left; + bool header_is_updated = false; ENTRY; @@ -348,68 +358,100 @@ static int llog_osd_write_rec(const struct lu_env *env, rec->lrh_type, PFID(lu_object_fid(&o->do_lu))); /* record length should not bigger than LLOG_CHUNK_SIZE */ - if (buf) - rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) - - sizeof(struct llog_rec_tail)) ? -E2BIG : 0; - else - rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0; - if (rc) - RETURN(rc); + if (reclen > LLOG_CHUNK_SIZE) + RETURN(-E2BIG); rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL); if (rc) RETURN(rc); - if (buf) - /* write_blob adds header and tail to lrh_len. */ - reclen = sizeof(*rec) + rec->lrh_len + - sizeof(struct llog_rec_tail); - - if (idx != -1) { - /* no header: only allowed to insert record 1 */ - if (idx != 1 && lgi->lgi_attr.la_size == 0) - LBUG(); - - if (idx && llh->llh_size && llh->llh_size != rec->lrh_len) - RETURN(-EINVAL); + /** + * The modification case. + * If idx set then the record with that index must be modified. + * There are three cases possible: + * 1) the common case is the llog header update (idx == 0) + * 2) the llog record modification during llog process. + * This is indicated by the \a loghandle::lgh_cur_idx > 0. + * In that case the \a loghandle::lgh_cur_offset + * 3) otherwise this is assumed that llog consist of records of + * fixed size, i.e. catalog. The llog header must has llh_size + * field equal to record size. The record offset is calculated + * just by /a idx value + * + * During modification we don't need extra header update because + * the bitmap and record count are not changed. The record header + * and tail remains the same too. + */ + if (idx != LLOG_NEXT_IDX) { + /* llog can be empty only when first record is being written */ + LASSERT(ergo(idx > 0, lgi->lgi_attr.la_size > 0)); - if (!ext2_test_bit(idx, llh->llh_bitmap)) + if (!ext2_test_bit(idx, llh->llh_bitmap)) { CERROR("%s: modify unset record %u\n", o->do_lu.lo_dev->ld_obd->obd_name, idx); - if (idx != rec->lrh_index) - CERROR("%s: index mismatch %d %u\n", + RETURN(-ENOENT); + } + + if (idx != rec->lrh_index) { + CERROR("%s: modify index mismatch %d %u\n", o->do_lu.lo_dev->ld_obd->obd_name, idx, rec->lrh_index); + RETURN(-EFAULT); + } - lgi->lgi_off = 0; - rc = llog_osd_write_blob(env, o, &llh->llh_hdr, NULL, - &lgi->lgi_off, th); - /* we are done if we only write the header or on error */ - if (rc || idx == 0) - RETURN(rc); + if (idx == LLOG_HEADER_IDX) { + /* llog header update */ + LASSERT(reclen == sizeof(struct llog_log_hdr)); + LASSERT(rec == &llh->llh_hdr); - if (buf) { - /* We assume that caller has set lgh_cur_* */ - lgi->lgi_off = loghandle->lgh_cur_offset; - CDEBUG(D_OTHER, - "modify record "DOSTID": idx:%d/%u/%d, len:%u " - "offset %llu\n", - POSTID(&loghandle->lgh_id.lgl_oi), idx, - rec->lrh_index, - loghandle->lgh_cur_idx, rec->lrh_len, - (long long)(lgi->lgi_off - sizeof(*llh))); - if (rec->lrh_index != loghandle->lgh_cur_idx) { - CERROR("%s: modify idx mismatch %u/%d\n", + lgi->lgi_off = 0; + lgi->lgi_buf.lb_len = reclen; + lgi->lgi_buf.lb_buf = rec; + rc = dt_record_write(env, o, &lgi->lgi_buf, + &lgi->lgi_off, th); + RETURN(rc); + } else if (loghandle->lgh_cur_idx > 0) { + /** + * The lgh_cur_offset can be used only if index is + * the same. + */ + if (idx != loghandle->lgh_cur_idx) { + CERROR("%s: modify index mismatch %d %d\n", o->do_lu.lo_dev->ld_obd->obd_name, idx, loghandle->lgh_cur_idx); RETURN(-EFAULT); } - } else { - /* Assumes constant lrh_len */ + + lgi->lgi_off = loghandle->lgh_cur_offset; + CDEBUG(D_OTHER, "modify record "DOSTID": idx:%d, " + "len:%u offset %llu\n", + POSTID(&loghandle->lgh_id.lgl_oi), idx, + rec->lrh_len, (long long)lgi->lgi_off); + } else if (llh->llh_size > 0) { + if (llh->llh_size != rec->lrh_len) { + CERROR("%s: wrong record size, llh_size is %u" + " but record size is %u\n", + o->do_lu.lo_dev->ld_obd->obd_name, + llh->llh_size, rec->lrh_len); + RETURN(-EINVAL); + } lgi->lgi_off = sizeof(*llh) + (idx - 1) * reclen; + } else { + /* This can be result of lgh_cur_idx is not set during + * llog processing or llh_size is not set to proper + * record size for fixed records llog. Therefore it is + * impossible to get record offset. */ + CERROR("%s: can't get record offset, idx:%d, " + "len:%u.\n", o->do_lu.lo_dev->ld_obd->obd_name, + idx, rec->lrh_len); + RETURN(-EFAULT); } - rc = llog_osd_write_blob(env, o, rec, buf, &lgi->lgi_off, th); + /* update only data, header and tail remain the same */ + lgi->lgi_off += sizeof(struct llog_rec_hdr); + lgi->lgi_buf.lb_len = REC_DATA_LEN(rec); + lgi->lgi_buf.lb_buf = REC_DATA(rec); + rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc == 0 && reccookie) { reccookie->lgc_lgl = loghandle->lgh_id; reccookie->lgc_index = idx; @@ -418,12 +460,15 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(rc); } - /* Make sure that records don't cross a chunk boundary, so we can + /** + * The append case. + * The most common case of using llog. The new index is assigned to + * the new record, new bit is set in llog bitmap and llog count is + * incremented. + * + * Make sure that records don't cross a chunk boundary, so we can * process them page-at-a-time if needed. If it will cross a chunk * boundary, write in a fake (but referenced) entry to pad the chunk. - * - * We know that llog_current_log() will return a loghandle that is - * big enough to hold reclen, so all we care about is padding here. */ LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); lgi->lgi_off = lgi->lgi_attr.la_size; @@ -435,25 +480,30 @@ static int llog_osd_write_rec(const struct lu_env *env, rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th); if (rc) RETURN(rc); - loghandle->lgh_last_idx++; /*for pad rec*/ + loghandle->lgh_last_idx++; /* for pad rec */ } /* if it's the last idx in log file, then return -ENOSPC */ if (loghandle->lgh_last_idx >= LLOG_BITMAP_SIZE(llh) - 1) RETURN(-ENOSPC); + /* increment the last_idx along with llh_tail index, they should + * be equal for a llog lifetime */ loghandle->lgh_last_idx++; index = loghandle->lgh_last_idx; + llh->llh_tail.lrt_index = index; + /** + * NB: the caller should make sure only 1 process access + * the lgh_last_idx, e.g. append should be exclusive. + * Otherwise it might hit the assert. + */ LASSERT(index < LLOG_BITMAP_SIZE(llh)); rec->lrh_index = index; - if (buf == NULL) { - lrt = (struct llog_rec_tail *)((char *)rec + rec->lrh_len - - sizeof(*lrt)); - lrt->lrt_len = rec->lrh_len; - lrt->lrt_index = rec->lrh_index; - } - /* The caller should make sure only 1 process access the lgh_last_idx, - * Otherwise it might hit the assert.*/ - LASSERT(index < LLOG_BITMAP_SIZE(llh)); + lrt = rec_tail(rec); + lrt->lrt_len = rec->lrh_len; + lrt->lrt_index = rec->lrh_index; + + /* the lgh_hdr_lock protects llog header data from concurrent + * update/cancel, the llh_count and llh_bitmap are protected */ spin_lock(&loghandle->lgh_hdr_lock); if (ext2_set_bit(index, llh->llh_bitmap)) { CERROR("%s: index %u already set in log bitmap\n", @@ -463,43 +513,30 @@ static int llog_osd_write_rec(const struct lu_env *env, } llh->llh_count++; spin_unlock(&loghandle->lgh_hdr_lock); - old_tail_idx = llh->llh_tail.lrt_index; - llh->llh_tail.lrt_index = index; lgi->lgi_off = 0; - rc = llog_osd_write_blob(env, o, &llh->llh_hdr, NULL, &lgi->lgi_off, - th); + lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; + lgi->lgi_buf.lb_buf = &llh->llh_hdr; + rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc) GOTO(out, rc); + header_is_updated = true; rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL); if (rc) GOTO(out, rc); LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); lgi->lgi_off = lgi->lgi_attr.la_size; + lgi->lgi_buf.lb_len = reclen; + lgi->lgi_buf.lb_buf = rec; + rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + if (rc < 0) + GOTO(out, rc); - rc = llog_osd_write_blob(env, o, rec, buf, &lgi->lgi_off, th); - -out: - /* cleanup llog for error case */ - if (rc) { - spin_lock(&loghandle->lgh_hdr_lock); - ext2_clear_bit(index, llh->llh_bitmap); - llh->llh_count--; - spin_unlock(&loghandle->lgh_hdr_lock); - - /* restore the header */ - loghandle->lgh_last_idx--; - llh->llh_tail.lrt_index = old_tail_idx; - lgi->lgi_off = 0; - llog_osd_write_blob(env, o, &llh->llh_hdr, NULL, - &lgi->lgi_off, th); - } - - CDEBUG(D_RPCTRACE, "added record "DOSTID": idx: %u, %u\n", + CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u\n", POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len); - if (rc == 0 && reccookie) { + if (reccookie != NULL) { reccookie->lgc_lgl = loghandle->lgh_id; reccookie->lgc_index = index; if ((rec->lrh_type == MDS_UNLINK_REC) || @@ -512,15 +549,36 @@ out: rc = 1; } RETURN(rc); +out: + /* cleanup llog for error case */ + spin_lock(&loghandle->lgh_hdr_lock); + ext2_clear_bit(index, llh->llh_bitmap); + llh->llh_count--; + spin_unlock(&loghandle->lgh_hdr_lock); + + /* restore llog last_idx */ + loghandle->lgh_last_idx--; + llh->llh_tail.lrt_index = loghandle->lgh_last_idx; + + /* restore the header on disk if it was written */ + if (header_is_updated) { + lgi->lgi_off = 0; + lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; + lgi->lgi_buf.lb_buf = &llh->llh_hdr; + dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + } + + RETURN(rc); } -/* We can skip reading at least as many log blocks as the number of +/** + * We can skip reading at least as many log blocks as the number of * minimum sized log records we are skipping. If it turns out * that we are not far enough along the log (because the * actual records are larger than minimum size) we just skip * some more records. */ -static void llog_skip_over(__u64 *off, int curr, int goal) +static inline void llog_skip_over(__u64 *off, int curr, int goal) { if (goal <= curr) return; @@ -528,10 +586,23 @@ static void llog_skip_over(__u64 *off, int curr, int goal) ~(LLOG_CHUNK_SIZE - 1); } -/* sets: - * - cur_offset to the furthest point read in the log file - * - cur_idx to the log index preceeding cur_offset - * returns -EIO/-EINVAL on error +/** + * Implementation of the llog_operations::lop_next_block + * + * This function finds the the next llog block to return which contains + * record with required index. It is main part of llog processing. + * + * \param[in] env execution environment + * \param[in] loghandle llog handle of the current llog + * \param[in,out] cur_idx index preceeding cur_offset + * \param[in] next_idx target index to find + * \param[in,out] cur_offset furtherst point read in the file + * \param[in] buf pointer to data buffer to fill + * \param[in] len required len to read, it is + * LLOG_CHUNK_SIZE usually. + * + * \retval 0 on successful buffer read + * \retval negative value on error */ static int llog_osd_next_block(const struct lu_env *env, struct llog_handle *loghandle, int *cur_idx, @@ -578,18 +649,7 @@ static int llog_osd_next_block(const struct lu_env *env, (*cur_offset & (LLOG_CHUNK_SIZE - 1)); lgi->lgi_buf.lb_buf = buf; - /* Note: read lock is not needed around la_size get above at - * the time of dt_attr_get(). There are only two cases that - * matter. Either la_size == cur_offset, in which case the - * entire read is skipped, or la_size > cur_offset and the loop - * is entered and this thread is blocked at dt_read_lock() - * until the write is completed. When the write completes, then - * the dt_read() will be done with the full length, and will - * get the full data. - */ - dt_read_lock(env, o, 0); rc = dt_read(env, o, &lgi->lgi_buf, cur_offset); - dt_read_unlock(env, o); if (rc < 0) { CERROR("%s: can't read llog block from log "DFID " offset "LPU64": rc = %d\n", @@ -660,6 +720,23 @@ out: return rc; } +/** + * Implementation of the llog_operations::lop_prev_block + * + * This function finds the llog block to return which contains + * record with required index but in reverse order - from end of llog + * to the beginning. + * It is main part of reverse llog processing. + * + * \param[in] env execution environment + * \param[in] loghandle llog handle of the current llog + * \param[in] prev_idx target index to find + * \param[in] buf pointer to data buffer to fill + * \param[in] len required len to read, it is LLOG_CHUNK_SIZE usually. + * + * \retval 0 on successful buffer read + * \retval negative value on error + */ static int llog_osd_prev_block(const struct lu_env *env, struct llog_handle *loghandle, int prev_idx, void *buf, int len) @@ -699,12 +776,7 @@ static int llog_osd_prev_block(const struct lu_env *env, lgi->lgi_buf.lb_len = len; lgi->lgi_buf.lb_buf = buf; - /* It is OK to have locking around dt_read() only, see - * comment in llog_osd_next_block for details - */ - dt_read_lock(env, o, 0); rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset); - dt_read_unlock(env, o); if (rc < 0) { CERROR("%s: can't read llog block from log "DFID " offset "LPU64": rc = %d\n", @@ -766,6 +838,16 @@ out: return rc; } +/** + * This is helper function to get llog directory object. It is used by named + * llog operations to find/insert/delete llog entry from llog directory. + * + * \param[in] env execution environment + * \param[in] ctxt llog context + * + * \retval dt_object of llog directory + * \retval ERR_PTR of negative value on error + */ struct dt_object *llog_osd_dir_get(const struct lu_env *env, struct llog_ctxt *ctxt) { @@ -793,6 +875,27 @@ struct dt_object *llog_osd_dir_get(const struct lu_env *env, return dir; } +/** + * Implementation of the llog_operations::lop_open + * + * This function opens the llog by its logid or by name, it may open also + * non existent llog and assing then new id to it. + * The llog_open/llog_close pair works similar to lu_object_find/put, + * the object may not exist prior open. The result of open is just dt_object + * in the llog header. + * + * \param[in] env execution environment + * \param[in] handle llog handle of the current llog + * \param[in] logid logid of llog to open (nameless llog) + * \param[in] name name of llog to open (named llog) + * \param[in] open_param + * LLOG_OPEN_NEW - new llog, may not exist + * LLOG_OPEN_EXIST - old llog, must exist + * + * \retval 0 on successful open, llog_handle::lgh_obj + * contains the dt_object of the llog. + * \retval negative value on error + */ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, struct llog_logid *logid, char *name, enum llog_open_param open_param) @@ -883,6 +986,16 @@ out: RETURN(rc); } +/** + * Implementation of the llog_operations::lop_exist + * + * This function checks that llog exists on storage. + * + * \param[in] handle llog handle of the current llog + * + * \retval true if llog object exists and is not just destroyed + * \retval false if llog doesn't exist or just destroyed + */ static int llog_osd_exist(struct llog_handle *handle) { LASSERT(handle->lgh_obj); @@ -890,6 +1003,19 @@ static int llog_osd_exist(struct llog_handle *handle) !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header)); } +/** + * Implementation of the llog_operations::lop_declare_create + * + * This function declares the llog create. It declares also name insert + * into llog directory in case of named llog. + * + * \param[in] env execution environment + * \param[in] res llog handle of the current llog + * \param[in] th current transaction handle + * + * \retval 0 on successful create declaration + * \retval negative value on error + */ static int llog_osd_declare_create(const struct lu_env *env, struct llog_handle *res, struct thandle *th) { @@ -937,8 +1063,19 @@ static int llog_osd_declare_create(const struct lu_env *env, RETURN(rc); } -/* This is a callback from the llog_* functions. - * Assumes caller has already pushed us into the kernel context. */ +/** + * Implementation of the llog_operations::lop_create + * + * This function creates the llog according with llog_handle::lgh_obj + * and llog_handle::lgh_name. + * + * \param[in] env execution environment + * \param[in] res llog handle of the current llog + * \param[in] th current transaction handle + * + * \retval 0 on successful create + * \retval negative value on error + */ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res, struct thandle *th) { @@ -993,6 +1130,18 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res, RETURN(rc); } +/** + * Implementation of the llog_operations::lop_close + * + * This function closes the llog. It just put llog object and referenced + * local storage. + * + * \param[in] env execution environment + * \param[in] handle llog handle of the current llog + * + * \retval 0 on successful llog close + * \retval negative value on error + */ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle) { struct local_oid_storage *los; @@ -1014,6 +1163,20 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle) RETURN(rc); } +/** + * Implementation of the llog_operations::lop_destroy + * + * This function destroys the llog and deletes also entry in the + * llog directory in case of named llog. Llog should be opened prior that. + * Destroy method is not part of external transaction and does everything + * inside. + * + * \param[in] env execution environment + * \param[in] loghandle llog handle of the current llog + * + * \retval 0 on successful destroy + * \retval negative value on error + */ static int llog_osd_destroy(const struct lu_env *env, struct llog_handle *loghandle) { @@ -1091,6 +1254,21 @@ out_trans: RETURN(rc); } +/** + * Implementation of the llog_operations::lop_setup + * + * This function setup the llog on local storage. + * + * \param[in] env execution environment + * \param[in] obd obd device the llog belongs to + * \param[in] olg the llog group, it is always zero group now. + * \param[in] ctxt_idx the llog index, it defines the purpose of this llog. + * Every new llog type have to use own index. + * \param[in] disk_obd the storage obd, where llog is stored. + * + * \retval 0 on successful llog setup + * \retval negative value on error + */ static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd, struct obd_llog_group *olg, int ctxt_idx, struct obd_device *disk_obd) @@ -1135,6 +1313,16 @@ out: return rc; } +/** + * Implementation of the llog_operations::lop_cleanup + * + * This function cleanups the llog on local storage. + * + * \param[in] env execution environment + * \param[in] ctxt the llog context + * + * \retval 0 + */ static int llog_osd_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt) { if (ctxt->loc_los_nameless != NULL) { @@ -1167,7 +1355,27 @@ struct llog_operations llog_osd_ops = { }; EXPORT_SYMBOL(llog_osd_ops); -/* reads the catalog list */ +/** + * Read the special file which contains the list of llog catalogs IDs + * + * This function reads the CATALOGS file which contains the array of llog + * catalogs IDs. The main purpose of this file is to store OSP llogs indexed + * by OST/MDT number. + * + * \param[in] env execution environment + * \param[in] d corresponding storage device + * \param[in] idx position to start from, usually OST/MDT index + * \param[in] count how many catalog IDs to read + * \param[out] idarray the buffer for the data. If it is NULL then + * function returns just number of catalog IDs + * in the file. + * \param[in] fid LLOG_CATALOGS_OID for CATALOG object + * + * \retval 0 on successful read of catalog IDs + * \retval negative value on error + * \retval positive value which is number of records in + * the file if \a idarray is NULL + */ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d, int idx, int count, struct llog_catid *idarray, const struct lu_fid *fid) @@ -1240,7 +1448,7 @@ out_trans: /* read for new ost index or for empty file */ memset(idarray, 0, size); - if (lgi->lgi_attr.la_size < lgi->lgi_off + size) + if (lgi->lgi_attr.la_size <= lgi->lgi_off) GOTO(out, rc = 0); if (lgi->lgi_attr.la_size < lgi->lgi_off + size) size = lgi->lgi_attr.la_size - lgi->lgi_off; @@ -1261,7 +1469,23 @@ out: } EXPORT_SYMBOL(llog_osd_get_cat_list); -/* writes the cat list */ +/** + * Write the special file which contains the list of llog catalogs IDs + * + * This function writes the CATALOG file which contains the array of llog + * catalogs IDs. It is used mostly to store OSP llogs indexed by OST/MDT + * number. + * + * \param[in] env execution environment + * \param[in] d corresponding storage device + * \param[in] idx position to start from, usually OST/MDT index + * \param[in] count how many catalog IDs to write + * \param[out] idarray the buffer with the data to write. + * \param[in] fid LLOG_CATALOGS_OID for CATALOG object + * + * \retval 0 on successful write of catalog IDs + * \retval negative value on error + */ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, int idx, int count, struct llog_catid *idarray, const struct lu_fid *fid) @@ -1271,7 +1495,7 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, struct thandle *th; int rc, size; - if (!count) + if (count == 0) RETURN(0); LASSERT(d); @@ -1314,7 +1538,8 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc) - CDEBUG(D_INODE, "error writeing CATALOGS: rc = %d\n", rc); + CDEBUG(D_INODE, "can't write CATALOGS at index %d: rc = %d\n", + idx, rc); out_trans: dt_trans_stop(env, d, th); out: diff --git a/lustre/obdclass/llog_test.c b/lustre/obdclass/llog_test.c index 98a3880..52e7c33 100644 --- a/lustre/obdclass/llog_test.c +++ b/lustre/obdclass/llog_test.c @@ -230,7 +230,7 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd, lgr.lgr_hdr.lrh_type = LLOG_GEN_REC; CWARN("3a: write one create_rec\n"); - rc = llog_write(env, llh, &lgr.lgr_hdr, NULL, 0, NULL, -1); + rc = llog_write(env, llh, &lgr.lgr_hdr, LLOG_NEXT_IDX); num_recs++; if (rc < 0) { CERROR("3a: write one log record failed: %d\n", rc); @@ -241,30 +241,9 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd, if (rc) RETURN(rc); - CWARN("3b: write 10 cfg log records with 8 bytes bufs\n"); - for (i = 0; i < 10; i++) { - struct llog_rec_hdr hdr; - char buf[8]; - - hdr.lrh_len = 8; - hdr.lrh_type = OBD_CFG_REC; - memset(buf, 0, sizeof buf); - rc = llog_write(env, llh, &hdr, NULL, 0, buf, -1); - if (rc < 0) { - CERROR("3b: write 10 records failed at #%d: %d\n", - i + 1, rc); - RETURN(rc); - } - num_recs++; - } - - rc = verify_handle("3b", llh, num_recs); - if (rc) - RETURN(rc); - CWARN("3c: write 1000 more log records\n"); for (i = 0; i < 1000; i++) { - rc = llog_write(env, llh, &lgr.lgr_hdr, NULL, 0, NULL, -1); + rc = llog_write(env, llh, &lgr.lgr_hdr, LLOG_NEXT_IDX); if (rc < 0) { CERROR("3c: write 1000 records failed at #%d: %d\n", i + 1, rc); @@ -277,23 +256,20 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd, if (rc) RETURN(rc); - CWARN("3d: write log more than BITMAP_SIZE, return -ENOSPC\n"); + CWARN("3d: write records with variable size until BITMAP_SIZE, " + "return -ENOSPC\n"); for (i = 0; i < LLOG_BITMAP_SIZE(llh->lgh_hdr) + 1; i++) { - struct llog_rec_hdr hdr; - char buf_even[24]; - char buf_odd[32]; - - memset(buf_odd, 0, sizeof buf_odd); - memset(buf_even, 0, sizeof buf_even); - if ((i % 2) == 0) { - hdr.lrh_len = 24; - hdr.lrh_type = OBD_CFG_REC; - rc = llog_write(env, llh, &hdr, NULL, 0, buf_even, -1); - } else { - hdr.lrh_len = 32; - hdr.lrh_type = OBD_CFG_REC; - rc = llog_write(env, llh, &hdr, NULL, 0, buf_odd, -1); - } + char buf[64]; + struct llog_rec_hdr *hdr = (void *)&buf; + + memset(buf, 0, sizeof buf); + if ((i % 2) == 0) + hdr->lrh_len = 40; + else + hdr->lrh_len = 64; + hdr->lrh_type = OBD_CFG_REC; + rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX); + if (rc == -ENOSPC) { break; } else if (rc < 0) { @@ -326,7 +302,7 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd) struct llog_ctxt *ctxt; int num_recs = 0; char *buf; - struct llog_rec_hdr rec; + struct llog_rec_hdr *rec; ENTRY; @@ -353,7 +329,7 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd) cat_logid = cath->lgh_id; CWARN("4b: write 1 record into the catalog\n"); - rc = llog_cat_add(env, cath, &lmr.lmr_hdr, &cookie, NULL); + rc = llog_cat_add(env, cath, &lmr.lmr_hdr, &cookie); if (rc != 1) { CERROR("4b: write 1 catalog record failed at: %d\n", rc); GOTO(out, rc); @@ -381,7 +357,7 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd) CWARN("4d: write %d more log records\n", LLOG_TEST_RECNUM); for (i = 0; i < LLOG_TEST_RECNUM; i++) { - rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL, NULL); + rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); if (rc) { CERROR("4d: write %d records failed at #%d: %d\n", LLOG_TEST_RECNUM, i + 1, rc); @@ -396,15 +372,15 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd) GOTO(out, rc); CWARN("4e: add 5 large records, one record per block\n"); - buflen = LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) - - sizeof(struct llog_rec_tail); + buflen = LLOG_CHUNK_SIZE; OBD_ALLOC(buf, buflen); if (buf == NULL) GOTO(out, rc = -ENOMEM); for (i = 0; i < 5; i++) { - rec.lrh_len = buflen; - rec.lrh_type = OBD_CFG_REC; - rc = llog_cat_add(env, cath, &rec, NULL, buf); + rec = (void *)buf; + rec->lrh_len = buflen; + rec->lrh_type = OBD_CFG_REC; + rc = llog_cat_add(env, cath, rec, NULL); if (rc) { CERROR("4e: write 5 records failed at #%d: %d\n", i + 1, rc); @@ -559,7 +535,7 @@ static int llog_test_5(const struct lu_env *env, struct obd_device *obd) } CWARN("5d: add 1 record to the log with many canceled empty pages\n"); - rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL, NULL); + rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL); if (rc) { CERROR("5d: add record to the log with many canceled empty " "pages failed\n"); @@ -744,8 +720,7 @@ static int llog_test_7_sub(const struct lu_env *env, struct llog_ctxt *ctxt) GOTO(out_close, rc); } for (i = 0; i < LLOG_BITMAP_SIZE(llh->lgh_hdr); i++) { - rc = llog_write(env, llh, &llog_records.lrh, NULL, 0, - NULL, -1); + rc = llog_write(env, llh, &llog_records.lrh, LLOG_NEXT_IDX); if (rc == -ENOSPC) { break; } else if (rc < 0) { @@ -1000,7 +975,7 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd) orig_counter = plain_counter; for (i = 0; i < 100; i++) { - rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL, NULL); + rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL); if (rc) { CERROR("5a: add record failed\n"); GOTO(out, rc); @@ -1035,7 +1010,7 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd) } for (i = 0; i < 100; i++) { - rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL, NULL); + rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL); if (rc) { CERROR("8b: add record failed\n"); GOTO(out, rc); diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index d81a9a7..03730a3 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -1472,6 +1472,42 @@ extern int lustre_check_exclusion(struct super_block *sb, char *svname); #define lustre_check_exclusion(a,b) 0 #endif +/* + * Supplemental functions for config logs, it allocates lustre_cfg + * buffers plus initialized llog record header at the beginning. + */ +struct llog_cfg_rec *lustre_cfg_rec_new(int cmd, struct lustre_cfg_bufs *bufs) +{ + struct llog_cfg_rec *lcr; + int reclen; + + ENTRY; + + reclen = lustre_cfg_len(bufs->lcfg_bufcount, bufs->lcfg_buflen); + reclen = llog_data_len(reclen) + sizeof(struct llog_rec_hdr) + + sizeof(struct llog_rec_tail); + + OBD_ALLOC(lcr, reclen); + if (lcr == NULL) + RETURN(NULL); + + lustre_cfg_init(&lcr->lcr_cfg, cmd, bufs); + + lcr->lcr_hdr.lrh_len = reclen; + lcr->lcr_hdr.lrh_type = OBD_CFG_REC; + + RETURN(lcr); +} +EXPORT_SYMBOL(lustre_cfg_rec_new); + +void lustre_cfg_rec_free(struct llog_cfg_rec *lcr) +{ + ENTRY; + OBD_FREE(lcr, lcr->lcr_hdr.lrh_len); + EXIT; +} +EXPORT_SYMBOL(lustre_cfg_rec_free); + /** Parse a configuration llog, doing various manipulations on them * for various reasons, (modifications for compatibility, skip obsolete * records, change uuids, etc), then class_process_config() resulting @@ -1654,6 +1690,8 @@ int class_config_llog_handler(const struct lu_env *env, } lcfg_new = lustre_cfg_new(lcfg->lcfg_command, &bufs); + if (lcfg_new == NULL) + GOTO(out, rc = -ENOMEM); lcfg_new->lcfg_num = lcfg->lcfg_num; lcfg_new->lcfg_flags = lcfg->lcfg_flags; diff --git a/lustre/obdclass/obd_mount.c b/lustre/obdclass/obd_mount.c index 2289b8d..f8ddf0b 100644 --- a/lustre/obdclass/obd_mount.c +++ b/lustre/obdclass/obd_mount.c @@ -95,9 +95,11 @@ int lustre_process_log(struct super_block *sb, char *logname, lustre_cfg_bufs_set(bufs, 2, cfg, sizeof(*cfg)); lustre_cfg_bufs_set(bufs, 3, &sb, sizeof(sb)); lcfg = lustre_cfg_new(LCFG_LOG_START, bufs); - rc = obd_process_config(mgc, sizeof(*lcfg), lcfg); - lustre_cfg_free(lcfg); - + if (lcfg == NULL) + GOTO(out, rc = -ENOMEM); + rc = obd_process_config(mgc, sizeof(*lcfg), lcfg); + lustre_cfg_free(lcfg); +out: OBD_FREE_PTR(bufs); if (rc == -EINVAL) @@ -140,6 +142,8 @@ int lustre_end_log(struct super_block *sb, char *logname, if (cfg) lustre_cfg_bufs_set(&bufs, 2, cfg, sizeof(*cfg)); lcfg = lustre_cfg_new(LCFG_LOG_END, &bufs); + if (lcfg == NULL) + RETURN(-ENOMEM); rc = obd_process_config(mgc, sizeof(*lcfg), lcfg); lustre_cfg_free(lcfg); RETURN(rc); @@ -172,6 +176,8 @@ int do_lcfg(char *cfgname, lnet_nid_t nid, int cmd, lustre_cfg_bufs_set_string(&bufs, 4, s4); lcfg = lustre_cfg_new(cmd, &bufs); + if (lcfg == NULL) + return -ENOMEM; lcfg->lcfg_nid = nid; rc = class_process_config(lcfg); lustre_cfg_free(lcfg); diff --git a/lustre/obdclass/obd_mount_server.c b/lustre/obdclass/obd_mount_server.c index 02d2cb0..86ae55c 100644 --- a/lustre/obdclass/obd_mount_server.c +++ b/lustre/obdclass/obd_mount_server.c @@ -698,16 +698,18 @@ static int lustre_lwp_add_conn(struct lustre_cfg *cfg, lustre_cfg_string(cfg, 1)); lcfg = lustre_cfg_new(LCFG_ADD_CONN, bufs); - + if (lcfg == NULL) + GOTO(out_cfg, rc = -ENOMEM); rc = class_add_conn(lwp, lcfg); if (rc) CERROR("%s: can't add conn: rc = %d\n", lwpname, rc); -out: - if (bufs != NULL) - OBD_FREE_PTR(bufs); if (lcfg != NULL) lustre_cfg_free(lcfg); +out_cfg: + if (bufs != NULL) + OBD_FREE_PTR(bufs); +out: if (lwpname != NULL) OBD_FREE(lwpname, MTI_NAME_MAXLEN); RETURN(rc); diff --git a/lustre/osd-ldiskfs/osd_internal.h b/lustre/osd-ldiskfs/osd_internal.h index a5f092e..3925b7a 100644 --- a/lustre/osd-ldiskfs/osd_internal.h +++ b/lustre/osd-ldiskfs/osd_internal.h @@ -910,9 +910,9 @@ static inline void osd_trans_declare_op(const struct lu_env *env, LASSERT(oh->ot_handle == NULL); if (unlikely(op >= OSD_OT_MAX)) { - if (unlikely(ldiskfs_track_declares_assert)) + if (unlikely(ldiskfs_track_declares_assert)) { LASSERT(op < OSD_OT_MAX); - else { + } else { CWARN("%s: Invalid operation index %d\n", osd_name(oti->oti_dev), op); libcfs_debug_dumpstack(NULL); diff --git a/lustre/osp/osp_sync.c b/lustre/osp/osp_sync.c index 07e3a41..d0805f3 100644 --- a/lustre/osp/osp_sync.c +++ b/lustre/osp/osp_sync.c @@ -250,7 +250,7 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d, if (ctxt == NULL) RETURN(-ENOMEM); rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie, - NULL, th); + th); llog_ctxt_put(ctxt); CDEBUG(D_OTHER, "%s: new record "DOSTID":%lu/%lu: %d\n", @@ -1077,8 +1077,7 @@ static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d) memcpy(&osi->osi_gen.lgr_gen, &d->opd_syn_generation, sizeof(osi->osi_gen.lgr_gen)); - rc = llog_cat_add(env, lgh, &osi->osi_gen.lgr_hdr, &osi->osi_cookie, - NULL); + rc = llog_cat_add(env, lgh, &osi->osi_gen.lgr_hdr, &osi->osi_cookie); if (rc < 0) GOTO(out_close, rc); llog_ctxt_put(ctxt); diff --git a/lustre/ptlrpc/sec_config.c b/lustre/ptlrpc/sec_config.c index d0e8ff9..fad999f 100644 --- a/lustre/ptlrpc/sec_config.c +++ b/lustre/ptlrpc/sec_config.c @@ -969,32 +969,26 @@ static int sptlrpc_record_rule_set(struct llog_handle *llh, char *target, struct sptlrpc_rule_set *rset) { - struct lustre_cfg_bufs bufs; - struct lustre_cfg *lcfg; - struct llog_rec_hdr rec; - int buflen; - char param[48]; - int i, rc; - - for (i = 0; i < rset->srs_nrule; i++) { - rule2string(&rset->srs_rules[i], param, sizeof(param)); - - lustre_cfg_bufs_reset(&bufs, NULL); - lustre_cfg_bufs_set_string(&bufs, 1, target); - lustre_cfg_bufs_set_string(&bufs, 2, param); - lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &bufs); - LASSERT(lcfg); - - buflen = lustre_cfg_len(lcfg->lcfg_bufcount, - lcfg->lcfg_buflens); - rec.lrh_len = llog_data_len(buflen); - rec.lrh_type = OBD_CFG_REC; - rc = llog_write(NULL, llh, &rec, NULL, 0, (void *)lcfg, -1); - if (rc) - CERROR("failed to write a rec: rc = %d\n", rc); - lustre_cfg_free(lcfg); - } - return 0; + struct llog_cfg_rec *lcr; + struct lustre_cfg_bufs bufs; + char param[48]; + int i, rc; + + for (i = 0; i < rset->srs_nrule; i++) { + rule2string(&rset->srs_rules[i], param, sizeof(param)); + + lustre_cfg_bufs_reset(&bufs, NULL); + lustre_cfg_bufs_set_string(&bufs, 1, target); + lustre_cfg_bufs_set_string(&bufs, 2, param); + lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &bufs); + if (lcr == NULL) + return -ENOMEM; + rc = llog_write(NULL, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX); + lustre_cfg_rec_free(lcr); + if (rc) + return rc; + } + return 0; } static int sptlrpc_record_rules(struct llog_handle *llh, diff --git a/lustre/utils/lustre_cfg.c b/lustre/utils/lustre_cfg.c index 6c8355d..03b1dc5 100644 --- a/lustre/utils/lustre_cfg.c +++ b/lustre/utils/lustre_cfg.c @@ -128,8 +128,12 @@ int jt_lcfg_attach(int argc, char **argv) lustre_cfg_bufs_set_string(&bufs, 2, argv[3]); lcfg = lustre_cfg_new(LCFG_ATTACH, &bufs); - rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); - lustre_cfg_free(lcfg); + if (lcfg == NULL) { + rc = -ENOMEM; + } else { + rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); + lustre_cfg_free(lcfg); + } if (rc < 0) { fprintf(stderr, "error: %s: LCFG_ATTACH %s\n", jt_cmdname(argv[0]), strerror(rc = errno)); @@ -164,8 +168,12 @@ int jt_lcfg_setup(int argc, char **argv) } lcfg = lustre_cfg_new(LCFG_SETUP, &bufs); - rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); - lustre_cfg_free(lcfg); + if (lcfg == NULL) { + rc = -ENOMEM; + } else { + rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); + lustre_cfg_free(lcfg); + } if (rc < 0) fprintf(stderr, "error: %s: %s\n", jt_cmdname(argv[0]), strerror(rc = errno)); @@ -192,8 +200,12 @@ int jt_obd_detach(int argc, char **argv) return CMD_HELP; lcfg = lustre_cfg_new(LCFG_DETACH, &bufs); - rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); - lustre_cfg_free(lcfg); + if (lcfg == NULL) { + rc = -ENOMEM; + } else { + rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); + lustre_cfg_free(lcfg); + } if (rc < 0) fprintf(stderr, "error: %s: %s\n", jt_cmdname(argv[0]), strerror(rc = errno)); @@ -242,8 +254,12 @@ int jt_obd_cleanup(int argc, char **argv) } lcfg = lustre_cfg_new(LCFG_CLEANUP, &bufs); - rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); - lustre_cfg_free(lcfg); + if (lcfg == NULL) { + rc = -ENOMEM; + } else { + rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); + lustre_cfg_free(lcfg); + } if (rc < 0) fprintf(stderr, "error: %s: %s\n", jt_cmdname(argv[0]), strerror(rc = errno)); @@ -263,17 +279,16 @@ int do_add_uuid(char * func, char *uuid, lnet_nid_t nid) lustre_cfg_bufs_set_string(&bufs, 1, uuid); lcfg = lustre_cfg_new(LCFG_ADD_UUID, &bufs); - lcfg->lcfg_nid = nid; - /* Poison NAL -- pre 1.4.6 will LASSERT on 0 NAL, this way it - doesn't work without crashing (bz 10130) */ - lcfg->lcfg_nal = 0x5a; - -#if 0 - fprintf(stderr, "adding\tnid: %d\tuuid: %s\n", - lcfg->lcfg_nid, uuid); -#endif - rc = lcfg_ioctl(func, OBD_DEV_ID, lcfg); - lustre_cfg_free(lcfg); + if (lcfg == NULL) { + rc = -ENOMEM; + } else { + lcfg->lcfg_nid = nid; + /* Poison NAL -- pre 1.4.6 will LASSERT on 0 NAL, this way it + * doesn't work without crashing (bz 10130) */ + lcfg->lcfg_nal = 0x5a; + rc = lcfg_ioctl(func, OBD_DEV_ID, lcfg); + lustre_cfg_free(lcfg); + } if (rc) { fprintf(stderr, "IOC_PORTAL_ADD_UUID failed: %s\n", strerror(errno)); @@ -317,8 +332,12 @@ int jt_lcfg_del_uuid(int argc, char **argv) lustre_cfg_bufs_set_string(&bufs, 1, argv[1]); lcfg = lustre_cfg_new(LCFG_DEL_UUID, &bufs); - rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); - lustre_cfg_free(lcfg); + if (lcfg == NULL) { + rc = -ENOMEM; + } else { + rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); + lustre_cfg_free(lcfg); + } if (rc) { fprintf(stderr, "IOC_PORTAL_DEL_UUID failed: %s\n", strerror(errno)); @@ -342,8 +361,12 @@ int jt_lcfg_del_mount_option(int argc, char **argv) lustre_cfg_bufs_set_string(&bufs, 1, argv[1]); lcfg = lustre_cfg_new(LCFG_DEL_MOUNTOPT, &bufs); - rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); - lustre_cfg_free(lcfg); + if (lcfg == NULL) { + rc = -ENOMEM; + } else { + rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); + lustre_cfg_free(lcfg); + } if (rc < 0) { fprintf(stderr, "error: %s: %s\n", jt_cmdname(argv[0]), strerror(rc = errno)); @@ -368,12 +391,14 @@ int jt_lcfg_set_timeout(int argc, char **argv) lustre_cfg_bufs_reset(&bufs, lcfg_devname); lcfg = lustre_cfg_new(LCFG_SET_TIMEOUT, &bufs); - lcfg->lcfg_num = atoi(argv[1]); - - rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); - //rc = lcfg_mgs_ioctl(argv[0], OBD_DEV_ID, lcfg); + if (lcfg == NULL) { + rc = -ENOMEM; + } else { + lcfg->lcfg_num = atoi(argv[1]); - lustre_cfg_free(lcfg); + rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); + lustre_cfg_free(lcfg); + } if (rc < 0) { fprintf(stderr, "error: %s: %s\n", jt_cmdname(argv[0]), strerror(rc = errno)); @@ -407,10 +432,14 @@ int jt_lcfg_add_conn(int argc, char **argv) lustre_cfg_bufs_set_string(&bufs, 1, argv[1]); lcfg = lustre_cfg_new(LCFG_ADD_CONN, &bufs); - lcfg->lcfg_num = priority; + if (lcfg == NULL) { + rc = -ENOMEM; + } else { + lcfg->lcfg_num = priority; - rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); - lustre_cfg_free (lcfg); + rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); + lustre_cfg_free(lcfg); + } if (rc < 0) { fprintf(stderr, "error: %s: %s\n", jt_cmdname(argv[0]), strerror(rc = errno)); @@ -441,9 +470,12 @@ int jt_lcfg_del_conn(int argc, char **argv) lustre_cfg_bufs_set_string(&bufs, 1, argv[1]); lcfg = lustre_cfg_new(LCFG_DEL_MOUNTOPT, &bufs); - - rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); - lustre_cfg_free(lcfg); + if (lcfg == NULL) { + rc = -ENOMEM; + } else { + rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); + lustre_cfg_free(lcfg); + } if (rc < 0) { fprintf(stderr, "error: %s: %s\n", jt_cmdname(argv[0]), strerror(rc = errno)); @@ -469,9 +501,12 @@ int jt_lcfg_param(int argc, char **argv) } lcfg = lustre_cfg_new(LCFG_PARAM, &bufs); - - rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); - lustre_cfg_free(lcfg); + if (lcfg == NULL) { + rc = -ENOMEM; + } else { + rc = lcfg_ioctl(argv[0], OBD_DEV_ID, lcfg); + lustre_cfg_free(lcfg); + } if (rc < 0) { fprintf(stderr, "error: %s: %s\n", jt_cmdname(argv[0]), strerror(rc = errno)); @@ -534,11 +569,11 @@ static int jt_lcfg_mgsparam2(int argc, char **argv, struct param_opts *popt) lustre_cfg_bufs_set_string(&bufs, 1, buf); lcfg = lustre_cfg_new(LCFG_SET_PARAM, &bufs); - if (IS_ERR(lcfg)) { + if (lcfg == NULL) { fprintf(stderr, "error: allocating lcfg for %s: %s\n", - jt_cmdname(argv[0]), strerror(PTR_ERR(lcfg))); + jt_cmdname(argv[0]), strerror(-ENOMEM)); if (rc == 0) - rc = PTR_ERR(lcfg); + rc = -ENOMEM; } else { int rc2 = lcfg_mgs_ioctl(argv[0], OBD_DEV_ID, lcfg); if (rc2 != 0) { @@ -608,9 +643,12 @@ int jt_lcfg_mgsparam(int argc, char **argv) /* We could put other opcodes here. */ lcfg = lustre_cfg_new(LCFG_PARAM, &bufs); - - rc = lcfg_mgs_ioctl(argv[0], OBD_DEV_ID, lcfg); - lustre_cfg_free(lcfg); + if (lcfg == NULL) { + rc = -ENOMEM; + } else { + rc = lcfg_mgs_ioctl(argv[0], OBD_DEV_ID, lcfg); + lustre_cfg_free(lcfg); + } if (buf) free(buf); if (rc < 0) { diff --git a/lustre/utils/obd.c b/lustre/utils/obd.c index 4774ac7..75a4422 100644 --- a/lustre/utils/obd.c +++ b/lustre/utils/obd.c @@ -3210,10 +3210,8 @@ static int pool_cmd(enum lcfg_command_type cmd, lustre_cfg_bufs_set_string(&bufs, 2, ostname); lcfg = lustre_cfg_new(cmd, &bufs); - if (IS_ERR(lcfg)) { - rc = PTR_ERR(lcfg); - return rc; - } + if (lcfg == NULL) + return rc; memset(&data, 0, sizeof(data)); rc = data.ioc_dev = get_mgs_device(); @@ -3277,11 +3275,8 @@ static int nodemap_cmd(enum lcfg_command_type cmd, void *ret_data, va_end(ap); lcfg = lustre_cfg_new(cmd, &bufs); - - if (IS_ERR(lcfg)) { - rc = PTR_ERR(lcfg); - return rc; - } + if (lcfg == NULL) + return -ENOMEM; memset(&data, 0, sizeof(data)); rc = data.ioc_dev = get_mgs_device(); -- 1.8.3.1