Whamcloud - gitweb
LU-6602 osp: change lgh_hdr_lock to mutex 74/15274/8
authorwang di <di.wang@intel.com>
Tue, 7 Jul 2015 18:11:16 +0000 (14:11 -0400)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 16 Jul 2015 03:32:50 +0000 (03:32 +0000)
Change lgh_hdr_lock from spinlock to mutex, because in
osp_md_write_obj(), it might be blocked by memory
allocation.

Reorganize the llog_cache_rec process, so lgh_hdr_mutex
will protect both setbit and update llog header, to avoid
conflicts between add records and cancel llog records.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: Ic12d7d768546c67a858cb235929de8ae6aeaa1aa
Reviewed-on: http://review.whamcloud.com/15274
Tested-by: Jenkins
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_log.h
lustre/obdclass/llog.c
lustre/obdclass/llog_osd.c

index 27573c8..458cc2e 100644 (file)
@@ -292,7 +292,7 @@ struct llog_operations {
 /* In-memory descriptor for a log object or log catalog */
 struct llog_handle {
        struct rw_semaphore      lgh_lock;
-       struct rw_semaphore      lgh_hdr_lock; /* protect lgh_hdr data */
+       struct mutex             lgh_hdr_mutex; /* protect lgh_hdr data */
        struct llog_logid        lgh_id; /* id of this log */
        struct llog_log_hdr     *lgh_hdr;
        size_t                  lgh_hdr_size;
index 56413ef..489e851 100644 (file)
@@ -65,7 +65,7 @@ static struct llog_handle *llog_alloc_handle(void)
                return NULL;
 
        init_rwsem(&loghandle->lgh_lock);
-       init_rwsem(&loghandle->lgh_hdr_lock);
+       mutex_init(&loghandle->lgh_hdr_mutex);
        INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
        atomic_set(&loghandle->lgh_refcount, 1);
 
@@ -104,6 +104,57 @@ void llog_handle_put(struct llog_handle *loghandle)
                llog_free_handle(loghandle);
 }
 
+static int llog_cancel_rec_internal(const struct lu_env *env,
+                                   struct llog_handle *loghandle, int index)
+{
+       struct dt_device        *dt;
+       struct llog_log_hdr     *llh = loghandle->lgh_hdr;
+       struct thandle          *th;
+       int                      rc;
+
+       ENTRY;
+
+       LASSERT(loghandle);
+       LASSERT(loghandle->lgh_ctxt);
+       LASSERT(loghandle->lgh_obj != NULL);
+
+       dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
+
+       th = dt_trans_create(env, dt);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+
+       rc = llog_declare_write_rec(env, loghandle, &llh->llh_hdr, index, th);
+       if (rc < 0)
+               GOTO(out_trans, rc);
+
+       th->th_wait_submit = 1;
+       rc = dt_trans_start_local(env, dt, th);
+       if (rc < 0)
+               GOTO(out_trans, rc);
+
+       down_write(&loghandle->lgh_lock);
+       /* clear bitmap */
+       mutex_lock(&loghandle->lgh_hdr_mutex);
+       if (!ext2_clear_bit(index, LLOG_HDR_BITMAP(llh))) {
+               CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
+               GOTO(out_unlock, rc);
+       }
+       /* update header */
+       rc = llog_write_rec(env, loghandle, &llh->llh_hdr, NULL,
+                           LLOG_HEADER_IDX, th);
+       if (rc == 0)
+               loghandle->lgh_hdr->llh_count--;
+       else
+               ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
+out_unlock:
+       mutex_unlock(&loghandle->lgh_hdr_mutex);
+       up_write(&loghandle->lgh_lock);
+out_trans:
+       dt_trans_stop(env, dt, th);
+       RETURN(rc);
+}
+
 /* returns negative on error; 0 if success; 1 if success & log destroyed */
 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
                    int index)
@@ -120,48 +171,36 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
                 RETURN(-EINVAL);
         }
 
-       down_write(&loghandle->lgh_hdr_lock);
-       if (!ext2_clear_bit(index, LLOG_HDR_BITMAP(llh))) {
-               up_write(&loghandle->lgh_hdr_lock);
-               CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
-               RETURN(-ENOENT);
+       rc = llog_cancel_rec_internal(env, loghandle, index);
+       if (rc < 0) {
+               CERROR("%s: fail to write header for llog #"DOSTID
+                      "#%08x: rc = %d\n",
+                      loghandle->lgh_ctxt->loc_obd->obd_name,
+                      POSTID(&loghandle->lgh_id.lgl_oi),
+                      loghandle->lgh_id.lgl_ogen, rc);
+               RETURN(rc);
        }
 
-       llh->llh_count--;
-
        if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
            (llh->llh_count == 1) &&
            (loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1)) {
-               up_write(&loghandle->lgh_hdr_lock);
                rc = llog_destroy(env, loghandle);
                if (rc < 0) {
+                       /* Sigh, can not destroy the final plain llog, but
+                        * the bitmap has been clearly, so the record can not
+                        * be accessed anymore, let's return 0 for now, and
+                        * the orphan will be handled by LFSCK. */
                        CERROR("%s: can't destroy empty llog #"DOSTID
                               "#%08x: rc = %d\n",
                               loghandle->lgh_ctxt->loc_obd->obd_name,
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, rc);
-                       GOTO(out_err, rc);
+                       RETURN(0);
                }
                RETURN(LLOG_DEL_PLAIN);
        }
-       up_write(&loghandle->lgh_hdr_lock);
 
-       rc = llog_write(env, loghandle, &llh->llh_hdr, LLOG_HEADER_IDX);
-       if (rc < 0) {
-               CERROR("%s: fail to write header for llog #"DOSTID
-                      "#%08x: rc = %d\n",
-                      loghandle->lgh_ctxt->loc_obd->obd_name,
-                      POSTID(&loghandle->lgh_id.lgl_oi),
-                      loghandle->lgh_id.lgl_ogen, rc);
-               GOTO(out_err, rc);
-       }
        RETURN(0);
-out_err:
-       down_write(&loghandle->lgh_hdr_lock);
-       ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
-       llh->llh_count++;
-       up_write(&loghandle->lgh_hdr_lock);
-       return rc;
 }
 
 static int llog_read_header(const struct lu_env *env,
index a79b1ec..b823185 100644 (file)
@@ -523,35 +523,27 @@ static int llog_osd_write_rec(const struct lu_env *env,
        lrt->lrt_len = rec->lrh_len;
        lrt->lrt_index = rec->lrh_index;
 
-       /* the lgh_hdr_lock protects llog header data from concurrent
+       /* the lgh_hdr_mutex protects llog header data from concurrent
         * update/cancel, the llh_count and llh_bitmap are protected */
-       down_write(&loghandle->lgh_hdr_lock);
+       mutex_lock(&loghandle->lgh_hdr_mutex);
        if (ext2_set_bit(index, LLOG_HDR_BITMAP(llh))) {
                CERROR("%s: index %u already set in log bitmap\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, index);
-               up_write(&loghandle->lgh_hdr_lock);
+               mutex_unlock(&loghandle->lgh_hdr_mutex);
                LBUG(); /* should never happen */
        }
        llh->llh_count++;
 
-       /* XXX It is a bit tricky here, if the log object is local,
-        * we do not need lock during write here, because if there is
-        * race, the transaction(jbd2, what about ZFS?) will make sure the
-        * conflicts will all committed in the same transaction group.
-        * But for remote object, we need lock the whole process, so to
-        * set the version of the remote transaction to make sure they
-        * are being sent in order. (see osp_md_write()) */
-       if (!dt_object_remote(o))
-               up_write(&loghandle->lgh_hdr_lock);
-
        if (lgi->lgi_attr.la_size == 0) {
                lgi->lgi_off = 0;
                lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len;
                lgi->lgi_buf.lb_buf = &llh->llh_hdr;
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
-                       GOTO(out_remote_unlock, rc);
+                       GOTO(out_unlock, rc);
        } else {
+               __u32   *bitmap = LLOG_HDR_BITMAP(llh);
+
                /* Note: If this is not initialization (size == 0), then do not
                 * write the whole header (8k bytes), only update header/tail
                 * and bits needs to be updated. Because this update might be
@@ -565,16 +557,15 @@ static int llog_osd_write_rec(const struct lu_env *env,
                lgi->lgi_buf.lb_buf = &llh->llh_count;
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
-                       GOTO(out_remote_unlock, rc);
+                       GOTO(out_unlock, rc);
 
-               lgi->lgi_off = offsetof(typeof(*llh),
-                       llh_bitmap[index / (sizeof(*llh->llh_bitmap) * 8)]);
-               lgi->lgi_buf.lb_len = sizeof(*llh->llh_bitmap);
-               lgi->lgi_buf.lb_buf =
-                       &llh->llh_bitmap[index/(sizeof(*llh->llh_bitmap)*8)];
+               lgi->lgi_off = llh->llh_bitmap_offset +
+                             (index / (sizeof(*bitmap) * 8)) * sizeof(*bitmap);
+               lgi->lgi_buf.lb_len = sizeof(*bitmap);
+               lgi->lgi_buf.lb_buf = &bitmap[index/(sizeof(*bitmap)*8)];
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
-                       GOTO(out_remote_unlock, rc);
+                       GOTO(out_unlock, rc);
 
                lgi->lgi_off =  (unsigned long)LLOG_HDR_TAIL(llh) -
                                (unsigned long)llh;
@@ -582,13 +573,12 @@ static int llog_osd_write_rec(const struct lu_env *env,
                lgi->lgi_buf.lb_buf = LLOG_HDR_TAIL(llh);
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
-                       GOTO(out_remote_unlock, rc);
+                       GOTO(out_unlock, rc);
        }
 
-out_remote_unlock:
+out_unlock:
        /* unlock here for remote object */
-       if (dt_object_remote(o))
-               up_write(&loghandle->lgh_hdr_lock);
+       mutex_unlock(&loghandle->lgh_hdr_mutex);
        if (rc)
                GOTO(out, rc);
 
@@ -622,10 +612,10 @@ out_remote_unlock:
        RETURN(rc);
 out:
        /* cleanup llog for error case */
-       down_write(&loghandle->lgh_hdr_lock);
+       mutex_lock(&loghandle->lgh_hdr_mutex);
        ext2_clear_bit(index, LLOG_HDR_BITMAP(llh));
        llh->llh_count--;
-       up_write(&loghandle->lgh_hdr_lock);
+       mutex_unlock(&loghandle->lgh_hdr_mutex);
 
        /* restore llog last_idx */
        loghandle->lgh_last_idx--;