Whamcloud - gitweb
b=17353
authoryury <yury>
Tue, 21 Oct 2008 13:53:47 +0000 (13:53 +0000)
committeryury <yury>
Tue, 21 Oct 2008 13:53:47 +0000 (13:53 +0000)
r=shadow,wangdi

- fixes wrong work with llog in catalog that led to removing alive object on ost in recovery time;
- cleanups.

lustre/ldlm/ldlm_lockd.c
lustre/lov/lov_log.c
lustre/mdd/mdd_lov.c
lustre/mdd/mdd_object.c
lustre/mds/mds_log.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_obd.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_log.c
lustre/obdfilter/filter_lvb.c

index c12c3c6..08f841e 100644 (file)
@@ -713,10 +713,10 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 LDLM_ERROR(lock, "enqueue wait took %luus from "CFS_TIME_T,
                            total_enqueue_wait, lock->l_enqueued_time.tv_sec);
 
                 LDLM_ERROR(lock, "enqueue wait took %luus from "CFS_TIME_T,
                            total_enqueue_wait, lock->l_enqueued_time.tv_sec);
 
-         req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
+        req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
                                     &RQF_LDLM_CP_CALLBACK);
                                     &RQF_LDLM_CP_CALLBACK);
-         if (req == NULL)
-                 RETURN(-ENOMEM);
+        if (req == NULL)
+                RETURN(-ENOMEM);
 
         lock_res_and_lock(lock);
         if (lock->l_resource->lr_lvb_len)
 
         lock_res_and_lock(lock);
         if (lock->l_resource->lr_lvb_len)
index e7210af..1c47d99 100644 (file)
@@ -68,9 +68,9 @@
  * we need to keep cookies in stripe order, even if some are NULL, so that
  * the right cookies are passed back to the right OSTs at the client side.
  * Unset cookies should be all-zero (which will never occur naturally). */
  * we need to keep cookies in stripe order, even if some are NULL, so that
  * the right cookies are passed back to the right OSTs at the client side.
  * Unset cookies should be all-zero (which will never occur naturally). */
-static int lov_llog_origin_add(struct llog_ctxt *ctxt,
-                        struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
-                        struct llog_cookie *logcookies, int numcookies)
+static int lov_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
+                               struct lov_stripe_md *lsm, 
+                               struct llog_cookie *logcookies, int numcookies)
 {
         struct obd_device *obd = ctxt->loc_obd;
         struct lov_obd *lov = &obd->u.lov;
 {
         struct obd_device *obd = ctxt->loc_obd;
         struct lov_obd *lov = &obd->u.lov;
index 23c68c9..0cd36e1 100644 (file)
@@ -597,9 +597,10 @@ int mdd_log_op_unlink(struct obd_device *obd,
         llog_ctxt_put(ctxt);
 
         OBD_FREE(lur, sizeof(*lur));
         llog_ctxt_put(ctxt);
 
         OBD_FREE(lur, sizeof(*lur));
+        GOTO(out, rc);
 out:
         obd_free_memmd(mds->mds_osc_exp, &lsm);
 out:
         obd_free_memmd(mds->mds_osc_exp, &lsm);
-        RETURN(rc);
+        return rc;
 }
 
 int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd,
 }
 
 int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd,
index 2503006..96425cd 100644 (file)
@@ -106,9 +106,11 @@ struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
         }
         if (unlikely(mti->mti_max_cookie == NULL)) {
                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
         }
         if (unlikely(mti->mti_max_cookie == NULL)) {
                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
-                if (unlikely(mti->mti_max_cookie != NULL))
+                if (likely(mti->mti_max_cookie != NULL))
                         mti->mti_max_cookie_size = max_cookie_size;
         }
                         mti->mti_max_cookie_size = max_cookie_size;
         }
+        if (likely(mti->mti_max_cookie != NULL))
+                memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
         return mti->mti_max_cookie;
 }
 
         return mti->mti_max_cookie;
 }
 
index 60bd5d4..0e8447f 100644 (file)
@@ -55,9 +55,9 @@
 #include <lustre_log.h>
 #include "mds_internal.h"
 
 #include <lustre_log.h>
 #include "mds_internal.h"
 
-static int mds_llog_origin_add(struct llog_ctxt *ctxt,
-                        struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
-                        struct llog_cookie *logcookies, int numcookies)
+static int mds_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
+                               struct lov_stripe_md *lsm,
+                               struct llog_cookie *logcookies, int numcookies)
 {
         struct obd_device *obd = ctxt->loc_obd;
         struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
 {
         struct obd_device *obd = ctxt->loc_obd;
         struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
index 1d2101c..dafcb23 100644 (file)
@@ -63,8 +63,7 @@
  *
  * Assumes caller has already pushed us into the kernel context and is locking.
  */
  *
  * Assumes caller has already pushed us into the kernel context and is locking.
  */
-static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle,
-                                            struct llog_logid *lid)
+static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
 {
         struct llog_handle *loghandle;
         struct llog_log_hdr *llh;
 {
         struct llog_handle *loghandle;
         struct llog_log_hdr *llh;
@@ -86,10 +85,7 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
                 RETURN(ERR_PTR(-ENOSPC));
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
                 RETURN(ERR_PTR(-ENOSPC));
 
-        if (lid != NULL && lid->lgl_oid == 0)
-                lid = NULL;
-
-        rc = llog_create(cathandle->lgh_ctxt, &loghandle, lid, NULL);
+        rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
         if (rc)
                 RETURN(ERR_PTR(rc));
 
         if (rc)
                 RETURN(ERR_PTR(rc));
 
@@ -230,7 +226,6 @@ enum {
  * NOTE: loghandle is write-locked upon successful return
  */
 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
  * NOTE: loghandle is write-locked upon successful return
  */
 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
-                                                struct llog_logid *lid,
                                                 int create)
 {
         struct llog_handle *loghandle = NULL;
                                                 int create)
 {
         struct llog_handle *loghandle = NULL;
@@ -245,7 +240,6 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
                         up_read(&cathandle->lgh_lock);
                         RETURN(loghandle);
                 } else {
                         up_read(&cathandle->lgh_lock);
                         RETURN(loghandle);
                 } else {
-                        lid = NULL;
                         up_write(&loghandle->lgh_lock);
                 }
         }
                         up_write(&loghandle->lgh_lock);
                 }
         }
@@ -269,13 +263,12 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
                         up_write(&cathandle->lgh_lock);
                         RETURN(loghandle);
                 } else {
                         up_write(&cathandle->lgh_lock);
                         RETURN(loghandle);
                 } else {
-                        lid = NULL;
                         up_write(&loghandle->lgh_lock);
                 }
         }
 
         CDEBUG(D_INODE, "creating new log\n");
                         up_write(&loghandle->lgh_lock);
                 }
         }
 
         CDEBUG(D_INODE, "creating new log\n");
-        loghandle = llog_cat_new_log(cathandle, lid);
+        loghandle = llog_cat_new_log(cathandle);
         if (!IS_ERR(loghandle))
                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
         up_write(&cathandle->lgh_lock);
         if (!IS_ERR(loghandle))
                 down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
         up_write(&cathandle->lgh_lock);
@@ -295,7 +288,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
         ENTRY;
 
         LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
         ENTRY;
 
         LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
-        loghandle = llog_cat_current_log(cathandle, &reccookie->lgc_lgl, 1);
+        loghandle = llog_cat_current_log(cathandle, 1);
         if (IS_ERR(loghandle))
                 RETURN(PTR_ERR(loghandle));
         /* loghandle is already locked by llog_cat_current_log() for us */
         if (IS_ERR(loghandle))
                 RETURN(PTR_ERR(loghandle));
         /* loghandle is already locked by llog_cat_current_log() for us */
@@ -303,7 +296,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
         up_write(&loghandle->lgh_lock);
         if (rc == -ENOSPC) {
                 /* to create a new plain log */
         up_write(&loghandle->lgh_lock);
         if (rc == -ENOSPC) {
                 /* to create a new plain log */
-                loghandle = llog_cat_current_log(cathandle, &reccookie->lgc_lgl, 1);
+                loghandle = llog_cat_current_log(cathandle, 1);
                 if (IS_ERR(loghandle))
                         RETURN(PTR_ERR(loghandle));
                 rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
                 if (IS_ERR(loghandle))
                         RETURN(PTR_ERR(loghandle));
                 rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
@@ -463,14 +456,14 @@ int llog_cat_process_thread(void *data)
          * Make sure that all cached data is sent. 
          */
         llog_sync(ctxt, NULL);
          * Make sure that all cached data is sent. 
          */
         llog_sync(ctxt, NULL);
-        EXIT;
+        GOTO(release_llh, rc);
 release_llh:
         rc = llog_cat_put(llh);
         if (rc)
                 CERROR("llog_cat_put() failed %d\n", rc);
 out:
         llog_ctxt_put(ctxt);
 release_llh:
         rc = llog_cat_put(llh);
         if (rc)
                 CERROR("llog_cat_put() failed %d\n", rc);
 out:
         llog_ctxt_put(ctxt);
-        OBD_FREE(args, sizeof(*args));
+        OBD_FREE_PTR(args);
         return rc;
 }
 EXPORT_SYMBOL(llog_cat_process_thread);
         return rc;
 }
 EXPORT_SYMBOL(llog_cat_process_thread);
@@ -571,49 +564,3 @@ out:
 
         RETURN(0);
 }
 
         RETURN(0);
 }
-
-#if 0
-/* Assumes caller has already pushed us into the kernel context. */
-int llog_cat_init(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
-{
-        struct llog_log_hdr *llh;
-        loff_t offset = 0;
-        int rc = 0;
-        ENTRY;
-
-        LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE);
-
-        down(&cathandle->lgh_lock);
-        llh = cathandle->lgh_hdr;
-
-        if (i_size_read(cathandle->lgh_file->f_dentry->d_inode) == 0) {
-                llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0);
-
-write_hdr:
-                rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
-                                   &offset);
-                if (rc != LLOG_CHUNK_SIZE) {
-                        CERROR("error writing catalog header: rc %d\n", rc);
-                        OBD_FREE(llh, sizeof(*llh));
-                        if (rc >= 0)
-                                rc = -ENOSPC;
-                } else
-                        rc = 0;
-        } else {
-                rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
-                                  &offset);
-                if (rc != LLOG_CHUNK_SIZE) {
-                        CERROR("error reading catalog header: rc %d\n", rc);
-                        /* Can we do much else if the header is bad? */
-                        goto write_hdr;
-                } else
-                        rc = 0;
-        }
-
-        cathandle->lgh_tgtuuid = &llh->llh_tgtuuid;
-        up(&cathandle->lgh_lock);
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_cat_init);
-
-#endif
index 2a9a4c4..77c28eb 100644 (file)
@@ -210,8 +210,8 @@ int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
 EXPORT_SYMBOL(llog_sync);
 
 int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
 EXPORT_SYMBOL(llog_sync);
 
 int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
-                struct lov_stripe_md *lsm, struct llog_cookie *logcookies,
-                int numcookies)
+             struct lov_stripe_md *lsm, struct llog_cookie *logcookies,
+             int numcookies)
 {
         int raised, rc;
         ENTRY;
 {
         int raised, rc;
         ENTRY;
@@ -421,8 +421,8 @@ int llog_cat_initialize(struct obd_device *obd, struct obd_llog_group *olg,
                 GOTO(out, rc);
         }
 
                 GOTO(out, rc);
         }
 
-        CDEBUG(D_INFO, "init llog for %s/%d - catid "LPX64"/"LPX64"/%x\n",
-               uuid->uuid, idx, idarray.lci_logid.lgl_oid,
+        CDEBUG(D_INFO, "%s: Init llog for %s/%d - catid "LPX64"/"LPX64":%x\n",
+               obd->obd_name, uuid->uuid, idx, idarray.lci_logid.lgl_oid,
                idarray.lci_logid.lgl_ogr, idarray.lci_logid.lgl_ogen);
 
         rc = obd_llog_init(obd, olg, obd, 1, &idarray, uuid);
                idarray.lci_logid.lgl_ogr, idarray.lci_logid.lgl_ogen);
 
         rc = obd_llog_init(obd, olg, obd, 1, &idarray, uuid);
index 8c7fb1c..03cd85a 100644 (file)
@@ -2438,10 +2438,9 @@ static int filter_llog_connect(struct obd_export *exp,
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        CDEBUG(D_OTHER, "handle connect for %s: %u/%u/%u\n", obd->obd_name,
-                (unsigned) body->lgdc_logid.lgl_ogr,
-                (unsigned) body->lgdc_logid.lgl_oid,
-                (unsigned) body->lgdc_logid.lgl_ogen);
+        CDEBUG(D_OTHER, "%s: LLog connect for: "LPX64"/"LPX64":%x\n", 
+               obd->obd_name, body->lgdc_logid.lgl_oid,
+               body->lgdc_logid.lgl_ogr, body->lgdc_logid.lgl_ogen);
 
         olg = filter_find_olg(obd, body->lgdc_logid.lgl_ogr);
         if (!olg) {
 
         olg = filter_find_olg(obd, body->lgdc_logid.lgl_ogr);
         if (!olg) {
@@ -2454,6 +2453,11 @@ static int filter_llog_connect(struct obd_export *exp,
         ctxt = llog_group_get_ctxt(olg, body->lgdc_ctxt_idx);
         LASSERTF(ctxt != NULL, "ctxt is not null, ctxt idx %d \n",
                  body->lgdc_ctxt_idx);
         ctxt = llog_group_get_ctxt(olg, body->lgdc_ctxt_idx);
         LASSERTF(ctxt != NULL, "ctxt is not null, ctxt idx %d \n",
                  body->lgdc_ctxt_idx);
+
+        CWARN("%s: Recovery from log "LPX64"/"LPX64":%x\n",
+              obd->obd_name, body->lgdc_logid.lgl_oid, 
+              body->lgdc_logid.lgl_ogr, body->lgdc_logid.lgl_ogen);
+
         rc = llog_connect(ctxt, 1, &body->lgdc_logid,
                           &body->lgdc_gen, NULL);
         llog_ctxt_put(ctxt);
         rc = llog_connect(ctxt, 1, &body->lgdc_logid,
                           &body->lgdc_gen, NULL);
         llog_ctxt_put(ctxt);
@@ -3663,8 +3667,6 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                 } else
                         next_id = filter_last_id(filter, group) + 1;
 
                 } else
                         next_id = filter_last_id(filter, group) + 1;
 
-                CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
-
                 dparent = filter_parent_lock(obd, group, next_id);
                 if (IS_ERR(dparent))
                         GOTO(cleanup, rc = PTR_ERR(dparent));
                 dparent = filter_parent_lock(obd, group, next_id);
                 if (IS_ERR(dparent))
                         GOTO(cleanup, rc = PTR_ERR(dparent));
@@ -3710,6 +3712,10 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                         GOTO(cleanup, rc = PTR_ERR(handle));
                 cleanup_phase = 3;
 
                         GOTO(cleanup, rc = PTR_ERR(handle));
                 cleanup_phase = 3;
 
+                CDEBUG(D_INODE, "%s: filter_precreate(od->o_gr="LPU64
+                       ",od->o_id="LPU64")\n", obd->obd_name, group, 
+                       next_id);
+
                 /* We mark object SUID+SGID to flag it for accepting UID+GID
                  * from client on first write.  Currently the permission bits
                  * on the OST are never used, so this is OK. */
                 /* We mark object SUID+SGID to flag it for accepting UID+GID
                  * from client on first write.  Currently the permission bits
                  * on the OST are never used, so this is OK. */
@@ -3767,21 +3773,23 @@ set_last_id:
 static int filter_create(struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
 static int filter_create(struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
+        struct obd_device *obd = exp->exp_obd;
         struct filter_export_data *fed;
         struct filter_export_data *fed;
-        struct obd_device *obd = NULL;
         struct filter_obd *filter;
         struct lvfs_run_ctxt saved;
         struct lov_stripe_md *lsm = NULL;
         int rc = 0, diff, group = oa->o_gr;
         ENTRY;
 
         struct filter_obd *filter;
         struct lvfs_run_ctxt saved;
         struct lov_stripe_md *lsm = NULL;
         int rc = 0, diff, group = oa->o_gr;
         ENTRY;
 
+        CDEBUG(D_INODE, "%s: filter_create(od->o_gr="LPU64",od->o_id="
+               LPU64")\n", obd->obd_name, oa->o_gr, oa->o_id);
+
         if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) {
                 CERROR("!!! nid %s sent invalid object group %d\n",
                         obd_export_nid2str(exp), group);
                 RETURN(-EINVAL);
         }
 
         if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) {
                 CERROR("!!! nid %s sent invalid object group %d\n",
                         obd_export_nid2str(exp), group);
                 RETURN(-EINVAL);
         }
 
-        obd = exp->exp_obd;
         fed = &exp->exp_filter_data;
         filter = &obd->u.filter;
 
         fed = &exp->exp_filter_data;
         filter = &obd->u.filter;
 
@@ -3794,8 +3802,6 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
                 RETURN(-ENOTUNIQ);
         }
 
                 RETURN(-ENOTUNIQ);
         }
 
-        CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n",
-               oa->o_gr, oa->o_id);
         if (ea != NULL) {
                 lsm = *ea;
                 if (lsm == NULL) {
         if (ea != NULL) {
                 lsm = *ea;
                 if (lsm == NULL) {
@@ -3863,6 +3869,9 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa,
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         cleanup_phase = 1;
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         cleanup_phase = 1;
 
+        CDEBUG(D_INODE, "%s: filter_destroy(od->o_gr="LPU64",od->o_id="
+               LPU64")\n", obd->obd_name, oa->o_gr, oa->o_id);
+
         dchild = filter_fid2dentry(obd, NULL, oa->o_gr, oa->o_id);
         if (IS_ERR(dchild))
                 GOTO(cleanup, rc = PTR_ERR(dchild));
         dchild = filter_fid2dentry(obd, NULL, oa->o_gr, oa->o_id);
         if (IS_ERR(dchild))
                 GOTO(cleanup, rc = PTR_ERR(dchild));
index 26c8399..2e1faf0 100644 (file)
@@ -232,7 +232,7 @@ static int filter_recov_log_setattr_cb(struct llog_ctxt *ctxt,
 }
 
 int filter_recov_log_mds_ost_cb(struct llog_handle *llh,
 }
 
 int filter_recov_log_mds_ost_cb(struct llog_handle *llh,
-                               struct llog_rec_hdr *rec, void *data)
+                                struct llog_rec_hdr *rec, void *data)
 {
         struct llog_ctxt *ctxt = llh->lgh_ctxt;
         struct llog_cookie cookie;
 {
         struct llog_ctxt *ctxt = llh->lgh_ctxt;
         struct llog_cookie cookie;
index f9450a9..eed2a34 100644 (file)
@@ -79,6 +79,10 @@ static int filter_lvbo_init(struct ldlm_resource *res)
         obd = res->lr_namespace->ns_lvbp;
         LASSERT(obd != NULL);
 
         obd = res->lr_namespace->ns_lvbp;
         LASSERT(obd != NULL);
 
+        CDEBUG(D_INODE, "%s: filter_lvbo_init(o_gr="LPU64", o_id="
+               LPU64")\n", obd->obd_name, res->lr_name.name[1],
+               res->lr_name.name[0]);
+
         dentry = filter_fid2dentry(obd, NULL, res->lr_name.name[1], 
                                               res->lr_name.name[0]);
         if (IS_ERR(dentry)) {
         dentry = filter_fid2dentry(obd, NULL, res->lr_name.name[1], 
                                               res->lr_name.name[0]);
         if (IS_ERR(dentry)) {