Whamcloud - gitweb
LU-6846 llog: create remote llog synchronously 33/16333/5
authorwang di <di.wang@intel.com>
Mon, 7 Sep 2015 15:27:34 +0000 (08:27 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Sat, 19 Sep 2015 03:25:04 +0000 (03:25 +0000)
Create remote llog synchronously, because llog_create
for remote object only pack the RPC in the buffer,
instead the real llog object will be created until
transaction stop. If other thread happens to use
this llog object and send RPC before the creation,
which might cause the failure.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I6c806174381b87836b1f6dd833cda50f0ab2d168
Reviewed-on: http://review.whamcloud.com/16333
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_osd.c

index 6d1e07c..95c93f5 100644 (file)
@@ -202,6 +202,8 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
        struct llog_log_hdr     *llh = loghandle->lgh_hdr;
        struct thandle          *th;
        int                      rc;
        struct llog_log_hdr     *llh = loghandle->lgh_hdr;
        struct thandle          *th;
        int                      rc;
+       int rc1;
+       bool subtract_count = false;
 
        ENTRY;
 
 
        ENTRY;
 
@@ -244,17 +246,15 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
        }
 
        loghandle->lgh_hdr->llh_count--;
        }
 
        loghandle->lgh_hdr->llh_count--;
+       subtract_count = true;
        /* Pass this index to llog_osd_write_rec(), which will use the index
         * to only update the necesary bitmap. */
        lgi->lgi_cookie.lgc_index = index;
        /* update header */
        rc = llog_write_rec(env, loghandle, &llh->llh_hdr, &lgi->lgi_cookie,
                            LLOG_HEADER_IDX, th);
        /* Pass this index to llog_osd_write_rec(), which will use the index
         * to only update the necesary bitmap. */
        lgi->lgi_cookie.lgc_index = index;
        /* update header */
        rc = llog_write_rec(env, loghandle, &llh->llh_hdr, &lgi->lgi_cookie,
                            LLOG_HEADER_IDX, th);
-       if (rc != 0) {
-               loghandle->lgh_hdr->llh_count++;
-               ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
+       if (rc != 0)
                GOTO(out_unlock, rc);
                GOTO(out_unlock, rc);
-       }
 
        if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
            (llh->llh_count == 1) &&
 
        if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
            (llh->llh_count == 1) &&
@@ -270,7 +270,7 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
                               loghandle->lgh_ctxt->loc_obd->obd_name,
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, rc);
                               loghandle->lgh_ctxt->loc_obd->obd_name,
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, rc);
-                       GOTO(out_unlock, rc = 0);
+                       GOTO(out_unlock, rc);
                }
                rc = LLOG_DEL_PLAIN;
        }
                }
                rc = LLOG_DEL_PLAIN;
        }
@@ -279,7 +279,15 @@ out_unlock:
        mutex_unlock(&loghandle->lgh_hdr_mutex);
        up_write(&loghandle->lgh_lock);
 out_trans:
        mutex_unlock(&loghandle->lgh_hdr_mutex);
        up_write(&loghandle->lgh_lock);
 out_trans:
-       dt_trans_stop(env, dt, th);
+       rc1 = dt_trans_stop(env, dt, th);
+       if (rc == 0)
+               rc = rc1;
+       if (rc < 0 && subtract_count) {
+               mutex_lock(&loghandle->lgh_hdr_mutex);
+               loghandle->lgh_hdr->llh_count++;
+               ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
+               mutex_unlock(&loghandle->lgh_hdr_mutex);
+       }
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
index 981af6f..c6b4122 100644 (file)
@@ -66,6 +66,8 @@ static int llog_cat_new_log(const struct lu_env *env,
        struct llog_thread_info *lgi = llog_info(env);
        struct llog_logid_rec   *rec = &lgi->lgi_logid;
        int                      rc;
        struct llog_thread_info *lgi = llog_info(env);
        struct llog_logid_rec   *rec = &lgi->lgi_logid;
        int                      rc;
+       struct thandle *handle = NULL;
+       struct dt_device *dt = NULL;
 
        ENTRY;
 
 
        ENTRY;
 
@@ -80,21 +82,57 @@ static int llog_cat_new_log(const struct lu_env *env,
                loghandle->lgh_hdr = NULL;
        }
 
                loghandle->lgh_hdr = NULL;
        }
 
+       if (th == NULL) {
+               dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
+
+               handle = dt_trans_create(env, dt);
+               if (IS_ERR(handle))
+                       RETURN(PTR_ERR(handle));
+
+               /* Create update llog object synchronously, which
+                * happens during inialization process see
+                * lod_sub_prep_llog(), to make sure the update
+                * llog object is created before corss-MDT writing
+                * updates into the llog object */
+               if (cathandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
+                       handle->th_sync = 1;
+
+               handle->th_wait_submit = 1;
+
+               rc = llog_declare_create(env, loghandle, handle);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rec->lid_hdr.lrh_len = sizeof(*rec);
+               rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
+               rec->lid_id = loghandle->lgh_id;
+               rc = llog_declare_write_rec(env, cathandle, &rec->lid_hdr, -1,
+                                           handle);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rc = dt_trans_start_local(env, dt, handle);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               th = handle;
+       }
+
        rc = llog_create(env, loghandle, th);
        /* if llog is already created, no need to initialize it */
        if (rc == -EEXIST) {
        rc = llog_create(env, loghandle, th);
        /* if llog is already created, no need to initialize it */
        if (rc == -EEXIST) {
-               RETURN(0);
+               GOTO(out, rc = 0);
        } else if (rc != 0) {
                CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
                       loghandle->lgh_ctxt->loc_obd->obd_name, rc);
        } else if (rc != 0) {
                CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
                       loghandle->lgh_ctxt->loc_obd->obd_name, rc);
-               RETURN(rc);
+               GOTO(out, rc);
        }
 
        rc = llog_init_handle(env, loghandle,
                              LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
                              &cathandle->lgh_hdr->llh_tgtuuid);
        }
 
        rc = llog_init_handle(env, loghandle,
                              LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
                              &cathandle->lgh_hdr->llh_tgtuuid);
-       if (rc)
-               GOTO(out_destroy, rc);
+       if (rc < 0)
+               GOTO(out, rc);
 
        /* build the record for this log in the catalog */
        rec->lid_hdr.lrh_len = sizeof(*rec);
 
        /* build the record for this log in the catalog */
        rec->lid_hdr.lrh_len = sizeof(*rec);
@@ -106,7 +144,7 @@ static int llog_cat_new_log(const struct lu_env *env,
        rc = llog_write_rec(env, cathandle, &rec->lid_hdr,
                            &loghandle->u.phd.phd_cookie, LLOG_NEXT_IDX, th);
        if (rc < 0)
        rc = llog_write_rec(env, cathandle, &rec->lid_hdr,
                            &loghandle->u.phd.phd_cookie, LLOG_NEXT_IDX, th);
        if (rc < 0)
-               GOTO(out_destroy, rc);
+               GOTO(out, rc);
 
        CDEBUG(D_OTHER, "new recovery log "DOSTID":%x for index %u of catalog"
               DOSTID"\n", POSTID(&loghandle->lgh_id.lgl_oi),
 
        CDEBUG(D_OTHER, "new recovery log "DOSTID":%x for index %u of catalog"
               DOSTID"\n", POSTID(&loghandle->lgh_id.lgl_oi),
@@ -114,10 +152,11 @@ static int llog_cat_new_log(const struct lu_env *env,
               POSTID(&cathandle->lgh_id.lgl_oi));
 
        loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index;
               POSTID(&cathandle->lgh_id.lgl_oi));
 
        loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index;
+out:
+       if (handle != NULL)
+               dt_trans_stop(env, dt, handle);
+
        RETURN(0);
        RETURN(0);
-out_destroy:
-       llog_destroy(env, loghandle);
-       RETURN(rc);
 }
 
 /* Open an existent log handle and add it to the open list.
 }
 
 /* Open an existent log handle and add it to the open list.
@@ -402,11 +441,20 @@ int llog_cat_declare_add_rec(const struct lu_env *env,
        lirec->lid_hdr.lrh_len = sizeof(*lirec);
 
        if (!llog_exist(cathandle->u.chd.chd_current_log)) {
        lirec->lid_hdr.lrh_len = sizeof(*lirec);
 
        if (!llog_exist(cathandle->u.chd.chd_current_log)) {
-               rc = llog_declare_create(env, cathandle->u.chd.chd_current_log,
-                                        th);
-               if (rc)
-                       GOTO(out, rc);
-               llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1, th);
+               if (dt_object_remote(cathandle->lgh_obj)) {
+                       /* If it is remote cat-llog here, let's create the
+                        * remote llog object synchronously, so other threads
+                        * can use it correctly. */
+                       rc = llog_cat_new_log(env, cathandle,
+                                       cathandle->u.chd.chd_current_log, NULL);
+               } else {
+                       rc = llog_declare_create(env,
+                                       cathandle->u.chd.chd_current_log, th);
+                       if (rc)
+                               GOTO(out, rc);
+                       llog_declare_write_rec(env, cathandle,
+                                              &lirec->lid_hdr, -1, th);
+               }
        }
        /* declare records in the llogs */
        rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
        }
        /* declare records in the llogs */
        rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
@@ -417,9 +465,17 @@ int llog_cat_declare_add_rec(const struct lu_env *env,
        next = cathandle->u.chd.chd_next_log;
        if (next) {
                if (!llog_exist(next)) {
        next = cathandle->u.chd.chd_next_log;
        if (next) {
                if (!llog_exist(next)) {
-                       rc = llog_declare_create(env, next, th);
-                       llog_declare_write_rec(env, cathandle, &lirec->lid_hdr,
-                                              -1, th);
+                       if (dt_object_remote(cathandle->lgh_obj)) {
+                               /* If it is remote cat-llog here, let's create
+                                * the remote remote llog object synchronously,
+                                * so other threads can use it correctly. */
+                               rc = llog_cat_new_log(env, cathandle, next,
+                                                     NULL);
+                       } else {
+                               rc = llog_declare_create(env, next, th);
+                               llog_declare_write_rec(env, cathandle,
+                                               &lirec->lid_hdr, -1, th);
+                       }
                }
                /* XXX: we hope for declarations made for existing llog
                 *      this might be not correct with some backends
                }
                /* XXX: we hope for declarations made for existing llog
                 *      this might be not correct with some backends
index 7913294..53c9d3b 100644 (file)
@@ -1112,6 +1112,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
        LASSERT(dt);
        if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
        dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
        LASSERT(dt);
        if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+               struct lu_object_conf conf = { 0 };
                if (logid != NULL) {
                        logid_to_fid(logid, &lgi->lgi_fid);
                } else {
                if (logid != NULL) {
                        logid_to_fid(logid, &lgi->lgi_fid);
                } else {
@@ -1122,9 +1123,11 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
                        if (rc < 0)
                                RETURN(rc);
                        rc = 0;
                        if (rc < 0)
                                RETURN(rc);
                        rc = 0;
+                       conf.loc_flags = LOC_F_NEW;
                }
 
                }
 
-               o = dt_locate(env, dt, &lgi->lgi_fid);
+               o = dt_locate_at(env, dt, &lgi->lgi_fid,
+                                dt->dd_lu_dev.ld_site->ls_top_dev, &conf);
                if (IS_ERR(o))
                        RETURN(PTR_ERR(o));
 
                if (IS_ERR(o))
                        RETURN(PTR_ERR(o));