From: shadow Date: Wed, 1 Oct 2008 04:46:03 +0000 (+0000) Subject: make CATALOG processing more safe. X-Git-Tag: v1_9_80~39 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=3cfab6a72cd2488414d8f06a9ca139ce25a04894 make CATALOG processing more safe. Branch HEAD b=17157 i=johann i=wangdi --- diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h index d1b8e82..9f6972a 100644 --- a/lustre/include/lustre_log.h +++ b/lustre/include/lustre_log.h @@ -250,6 +250,9 @@ int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd, char *name, int idx, int count, struct llog_catid *idarray); +int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd, + char *name, int idx, int count, struct llog_catid *idarray); + struct llog_ctxt { int loc_idx; /* my index the obd array of ctxt's */ struct llog_gen loc_gen; @@ -410,6 +413,7 @@ static inline void llog_group_init(struct obd_llog_group *olg, int group) { cfs_waitq_init(&olg->olg_waitq); spin_lock_init(&olg->olg_lock); + sema_init(&olg->olg_cat_processing, 1); olg->olg_group = group; } diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 01d8c8f..b02fd6b 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -946,6 +946,7 @@ struct obd_llog_group { spinlock_t olg_lock; struct obd_export *olg_exp; int olg_initializing; + struct semaphore olg_cat_processing; }; /* corresponds to one of the obd's */ diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index 9073cc8..01fcd1a 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -362,8 +362,8 @@ static int mds_lov_update_desc(struct obd_device *obd, int idx, /* Don't change the mds_lov_desc until the objids size matches the count (paranoia) */ mds->mds_lov_desc = *ld; - CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n", - mds->mds_lov_desc.ld_tgt_count); + CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n", + mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid); stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT, mds->mds_lov_desc.ld_tgt_count); diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index 01089b4..b32d343 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -63,7 +63,8 @@ * * Assumes caller has already pushed us into the kernel context and is locking. */ -static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) +static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle, + struct llog_logid *lid) { struct llog_handle *loghandle; struct llog_log_hdr *llh; @@ -84,11 +85,14 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED)) RETURN(ERR_PTR(-ENOSPC)); - - rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL); - if (rc) + + if (lid != NULL && lid->lgl_oid == 0) + lid = NULL; + + rc = llog_create(cathandle->lgh_ctxt, &loghandle, lid, NULL); + if (rc) RETURN(ERR_PTR(rc)); - + rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY, &cathandle->lgh_hdr->llh_tgtuuid); @@ -129,13 +133,12 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) LASSERT(list_empty(&loghandle->u.phd.phd_entry)); list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head); - out_destroy: +out_destroy: if (rc < 0) llog_destroy(loghandle); RETURN(loghandle); } -EXPORT_SYMBOL(llog_cat_new_log); /* Open an existent log handle and add it to the open list. * This log handle will be closed when all of the records in it are removed. @@ -219,6 +222,7 @@ EXPORT_SYMBOL(llog_cat_put); * NOTE: loghandle is write-locked upon successful return */ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, + struct llog_logid *lid, int create) { struct llog_handle *loghandle = NULL; @@ -233,6 +237,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, up_read(&cathandle->lgh_lock); RETURN(loghandle); } else { + lid = NULL; up_write(&loghandle->lgh_lock); } } @@ -256,12 +261,13 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, up_write(&cathandle->lgh_lock); RETURN(loghandle); } else { + lid = NULL; up_write(&loghandle->lgh_lock); } } CDEBUG(D_INODE, "creating new log\n"); - loghandle = llog_cat_new_log(cathandle); + loghandle = llog_cat_new_log(cathandle, lid); if (!IS_ERR(loghandle)) down_write(&loghandle->lgh_lock); up_write(&cathandle->lgh_lock); @@ -281,7 +287,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec, ENTRY; LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE); - loghandle = llog_cat_current_log(cathandle, 1); + loghandle = llog_cat_current_log(cathandle, &reccookie->lgc_lgl, 1); if (IS_ERR(loghandle)) RETURN(PTR_ERR(loghandle)); /* loghandle is already locked by llog_cat_current_log() for us */ @@ -289,7 +295,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec, up_write(&loghandle->lgh_lock); if (rc == -ENOSPC) { /* to create a new plain log */ - loghandle = llog_cat_current_log(cathandle, 1); + loghandle = llog_cat_current_log(cathandle, &reccookie->lgc_lgl, 1); if (IS_ERR(loghandle)) RETURN(PTR_ERR(loghandle)); rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1); diff --git a/lustre/obdclass/llog_ioctl.c b/lustre/obdclass/llog_ioctl.c index 48b9cc1..f6b4b17 100644 --- a/lustre/obdclass/llog_ioctl.c +++ b/lustre/obdclass/llog_ioctl.c @@ -436,11 +436,10 @@ int llog_catalog_list(struct obd_device *obd, int count, if (!idarray) RETURN(-ENOMEM); + mutex_down(&obd->obd_olg.olg_cat_processing); rc = llog_get_cat_list(obd, obd, name, 0, count, idarray); - if (rc) { - OBD_FREE(idarray, size); - RETURN(rc); - } + if (rc) + GOTO(out, rc); out = data->ioc_bulk; remains = data->ioc_inllen1; @@ -456,8 +455,12 @@ int llog_catalog_list(struct obd_device *obd, int count, break; } } +out: + /* release semaphore */ + mutex_up(&obd->obd_olg.olg_cat_processing); + OBD_VFREE(idarray, size); - RETURN(0); + RETURN(rc); } EXPORT_SYMBOL(llog_catalog_list); diff --git a/lustre/obdclass/llog_lvfs.c b/lustre/obdclass/llog_lvfs.c index f2a5cd8..eb07423 100644 --- a/lustre/obdclass/llog_lvfs.c +++ b/lustre/obdclass/llog_lvfs.c @@ -831,7 +831,7 @@ int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd, loff_t off = idx * sizeof(*idarray); if (!count) - return (0); + GOTO(out1, rc = 0); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700); @@ -855,15 +855,17 @@ int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd, GOTO(out, rc); } - out: +out: pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); if (file && !IS_ERR(file)) rc1 = filp_close(file, 0); if (rc == 0) rc = rc1; +out1: RETURN(rc); } +EXPORT_SYMBOL(llog_put_cat_list); struct llog_operations llog_lvfs_ops = { lop_write_rec: llog_lvfs_write_rec, diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c index 0542aa0..2a9a4c4 100644 --- a/lustre/obdclass/llog_obd.c +++ b/lustre/obdclass/llog_obd.c @@ -414,12 +414,17 @@ int llog_cat_initialize(struct obd_device *obd, struct obd_llog_group *olg, int rc; ENTRY; + mutex_down(&olg->olg_cat_processing); rc = llog_get_cat_list(obd, obd, name, idx, 1, &idarray); if (rc) { CERROR("rc: %d\n", rc); GOTO(out, rc); } + CDEBUG(D_INFO, "init llog for %s/%d - catid "LPX64"/"LPX64"/%x\n", + uuid->uuid, idx, idarray.lci_logid.lgl_oid, + idarray.lci_logid.lgl_ogr, idarray.lci_logid.lgl_ogen); + rc = obd_llog_init(obd, olg, obd, 1, &idarray, uuid); if (rc) { CERROR("rc: %d\n", rc); @@ -433,6 +438,8 @@ int llog_cat_initialize(struct obd_device *obd, struct obd_llog_group *olg, } out: + mutex_up(&olg->olg_cat_processing); + RETURN(rc); } EXPORT_SYMBOL(llog_cat_initialize); diff --git a/lustre/ptlrpc/llog_server.c b/lustre/ptlrpc/llog_server.c index 93c03ec..974b8b8 100644 --- a/lustre/ptlrpc/llog_server.c +++ b/lustre/ptlrpc/llog_server.c @@ -575,6 +575,7 @@ static int llog_catinfo_deletions(struct obd_device *obd, char *buf, if (!idarray) GOTO(release_ctxt, rc = -ENOMEM); + mutex_down(&obd->obd_olg.olg_cat_processing); rc = llog_get_cat_list(obd, obd, name, 0, count, idarray); if (rc) GOTO(out_free, rc); @@ -620,6 +621,9 @@ static int llog_catinfo_deletions(struct obd_device *obd, char *buf, out_pop: pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL); out_free: + /* release semphore */ + mutex_up(&obd->obd_olg.olg_cat_processing); + OBD_VFREE(idarray, size); release_ctxt: llog_ctxt_put(ctxt);