Whamcloud - gitweb
make CATALOG processing more safe.
authorshadow <shadow>
Wed, 1 Oct 2008 04:46:03 +0000 (04:46 +0000)
committershadow <shadow>
Wed, 1 Oct 2008 04:46:03 +0000 (04:46 +0000)
Branch HEAD
b=17157
i=johann
i=wangdi

lustre/include/lustre_log.h
lustre/include/obd.h
lustre/mds/mds_lov.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_ioctl.c
lustre/obdclass/llog_lvfs.c
lustre/obdclass/llog_obd.c
lustre/ptlrpc/llog_server.c

index d1b8e82..9f6972a 100644 (file)
@@ -250,6 +250,9 @@ int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
                       char *name, int idx, int count,
                       struct llog_catid *idarray);
 
+int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
+                      char *name, int idx, int count, struct llog_catid *idarray);
+
 struct llog_ctxt {
         int                      loc_idx; /* my index the obd array of ctxt's */
         struct llog_gen          loc_gen;
@@ -410,6 +413,7 @@ static inline void llog_group_init(struct obd_llog_group *olg, int group)
 {
         cfs_waitq_init(&olg->olg_waitq);
         spin_lock_init(&olg->olg_lock);
+        sema_init(&olg->olg_cat_processing, 1);
         olg->olg_group = group;
 }
 
index 01d8c8f..b02fd6b 100644 (file)
@@ -946,6 +946,7 @@ struct obd_llog_group {
         spinlock_t         olg_lock;
         struct obd_export *olg_exp;
         int                olg_initializing;
+        struct semaphore   olg_cat_processing;
 };
 
 /* corresponds to one of the obd's */
index 9073cc8..01fcd1a 100644 (file)
@@ -362,8 +362,8 @@ static int mds_lov_update_desc(struct obd_device *obd, int idx,
         /* Don't change the mds_lov_desc until the objids size matches the
            count (paranoia) */
         mds->mds_lov_desc = *ld;
-        CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
-               mds->mds_lov_desc.ld_tgt_count);
+        CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
+               mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
 
         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
                                mds->mds_lov_desc.ld_tgt_count);
index 01089b4..b32d343 100644 (file)
@@ -63,7 +63,8 @@
  *
  * Assumes caller has already pushed us into the kernel context and is locking.
  */
-static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
+static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle,
+                                            struct llog_logid *lid)
 {
         struct llog_handle *loghandle;
         struct llog_log_hdr *llh;
@@ -84,11 +85,14 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
                 RETURN(ERR_PTR(-ENOSPC));
-        rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
-        if (rc) 
+
+        if (lid != NULL && lid->lgl_oid == 0)
+                lid = NULL;
+
+        rc = llog_create(cathandle->lgh_ctxt, &loghandle, lid, NULL);
+        if (rc)
                 RETURN(ERR_PTR(rc));
-        
+
         rc = llog_init_handle(loghandle,
                               LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
                               &cathandle->lgh_hdr->llh_tgtuuid);
@@ -129,13 +133,12 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
 
- out_destroy:
+out_destroy:
         if (rc < 0)
                 llog_destroy(loghandle);
 
         RETURN(loghandle);
 }
-EXPORT_SYMBOL(llog_cat_new_log);
 
 /* Open an existent log handle and add it to the open list.
  * This log handle will be closed when all of the records in it are removed.
@@ -219,6 +222,7 @@ EXPORT_SYMBOL(llog_cat_put);
  * NOTE: loghandle is write-locked upon successful return
  */
 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
+                                                struct llog_logid *lid,
                                                 int create)
 {
         struct llog_handle *loghandle = NULL;
@@ -233,6 +237,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
                         up_read(&cathandle->lgh_lock);
                         RETURN(loghandle);
                 } else {
+                        lid = NULL;
                         up_write(&loghandle->lgh_lock);
                 }
         }
@@ -256,12 +261,13 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
                         up_write(&cathandle->lgh_lock);
                         RETURN(loghandle);
                 } else {
+                        lid = NULL;
                         up_write(&loghandle->lgh_lock);
                 }
         }
 
         CDEBUG(D_INODE, "creating new log\n");
-        loghandle = llog_cat_new_log(cathandle);
+        loghandle = llog_cat_new_log(cathandle, lid);
         if (!IS_ERR(loghandle))
                 down_write(&loghandle->lgh_lock);
         up_write(&cathandle->lgh_lock);
@@ -281,7 +287,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
         ENTRY;
 
         LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
-        loghandle = llog_cat_current_log(cathandle, 1);
+        loghandle = llog_cat_current_log(cathandle, &reccookie->lgc_lgl, 1);
         if (IS_ERR(loghandle))
                 RETURN(PTR_ERR(loghandle));
         /* loghandle is already locked by llog_cat_current_log() for us */
@@ -289,7 +295,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
         up_write(&loghandle->lgh_lock);
         if (rc == -ENOSPC) {
                 /* to create a new plain log */
-                loghandle = llog_cat_current_log(cathandle, 1);
+                loghandle = llog_cat_current_log(cathandle, &reccookie->lgc_lgl, 1);
                 if (IS_ERR(loghandle))
                         RETURN(PTR_ERR(loghandle));
                 rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
index 48b9cc1..f6b4b17 100644 (file)
@@ -436,11 +436,10 @@ int llog_catalog_list(struct obd_device *obd, int count,
         if (!idarray)
                 RETURN(-ENOMEM);
 
+        mutex_down(&obd->obd_olg.olg_cat_processing);
         rc = llog_get_cat_list(obd, obd, name, 0, count, idarray);
-        if (rc) {
-                OBD_FREE(idarray, size);
-                RETURN(rc);
-        }
+        if (rc)
+                GOTO(out, rc);
 
         out = data->ioc_bulk;
         remains = data->ioc_inllen1;
@@ -456,8 +455,12 @@ int llog_catalog_list(struct obd_device *obd, int count,
                         break;
                 }
         }
+out:
+        /* release semaphore */
+        mutex_up(&obd->obd_olg.olg_cat_processing);
+
         OBD_VFREE(idarray, size);
-        RETURN(0);
+        RETURN(rc);
 
 }
 EXPORT_SYMBOL(llog_catalog_list);
index f2a5cd8..eb07423 100644 (file)
@@ -831,7 +831,7 @@ int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
         loff_t off = idx * sizeof(*idarray);
 
         if (!count)
-                return (0);
+                GOTO(out1, rc = 0);
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
@@ -855,15 +855,17 @@ int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
                 GOTO(out, rc);
         }
 
- out:
+out:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (file && !IS_ERR(file))
                 rc1 = filp_close(file, 0);
 
         if (rc == 0)
                 rc = rc1;
+out1:
         RETURN(rc);
 }
+EXPORT_SYMBOL(llog_put_cat_list);
 
 struct llog_operations llog_lvfs_ops = {
         lop_write_rec:   llog_lvfs_write_rec,
index 0542aa0..2a9a4c4 100644 (file)
@@ -414,12 +414,17 @@ int llog_cat_initialize(struct obd_device *obd, struct obd_llog_group *olg,
         int rc;
         ENTRY;
 
+        mutex_down(&olg->olg_cat_processing);
         rc = llog_get_cat_list(obd, obd, name, idx, 1, &idarray);
         if (rc) {
                 CERROR("rc: %d\n", rc);
                 GOTO(out, rc);
         }
 
+        CDEBUG(D_INFO, "init llog for %s/%d - catid "LPX64"/"LPX64"/%x\n",
+               uuid->uuid, idx, idarray.lci_logid.lgl_oid,
+               idarray.lci_logid.lgl_ogr, idarray.lci_logid.lgl_ogen);
+
         rc = obd_llog_init(obd, olg, obd, 1, &idarray, uuid);
         if (rc) {
                 CERROR("rc: %d\n", rc);
@@ -433,6 +438,8 @@ int llog_cat_initialize(struct obd_device *obd, struct obd_llog_group *olg,
         }
 
  out:
+        mutex_up(&olg->olg_cat_processing);
+
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cat_initialize);
index 93c03ec..974b8b8 100644 (file)
@@ -575,6 +575,7 @@ static int llog_catinfo_deletions(struct obd_device *obd, char *buf,
         if (!idarray)
                 GOTO(release_ctxt, rc = -ENOMEM);
 
+        mutex_down(&obd->obd_olg.olg_cat_processing);
         rc = llog_get_cat_list(obd, obd, name, 0, count, idarray);
         if (rc)
                 GOTO(out_free, rc);
@@ -620,6 +621,9 @@ static int llog_catinfo_deletions(struct obd_device *obd, char *buf,
 out_pop:
         pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
 out_free:
+        /* release semphore */
+        mutex_up(&obd->obd_olg.olg_cat_processing);
+
         OBD_VFREE(idarray, size);
 release_ctxt:
         llog_ctxt_put(ctxt);