Whamcloud - gitweb
LU-3534 osp: send updates by separate thread
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index e81fb9e..ad843e9 100644 (file)
@@ -507,15 +507,24 @@ static int llog_osd_write_rec(const struct lu_env *env,
 
        /* the lgh_hdr_lock protects llog header data from concurrent
         * update/cancel, the llh_count and llh_bitmap are protected */
-       spin_lock(&loghandle->lgh_hdr_lock);
+       down_write(&loghandle->lgh_hdr_lock);
        if (ext2_set_bit(index, llh->llh_bitmap)) {
                CERROR("%s: index %u already set in log bitmap\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, index);
-               spin_unlock(&loghandle->lgh_hdr_lock);
+               up_write(&loghandle->lgh_hdr_lock);
                LBUG(); /* should never happen */
        }
        llh->llh_count++;
-       spin_unlock(&loghandle->lgh_hdr_lock);
+
+       /* XXX It is a bit tricky here, if the log object is local,
+        * we do not need lock during write here, because if there is
+        * race, the transaction(jbd2, what about ZFS?) will make sure the
+        * conflicts will all committed in the same transaction group.
+        * But for remote object, we need lock the whole process, so to
+        * set the version of the remote transaction to make sure they
+        * are being sent in order. (see osp_md_write()) */
+       if (!dt_object_remote(o))
+               up_write(&loghandle->lgh_hdr_lock);
 
        if (lgi->lgi_attr.la_size == 0) {
                lgi->lgi_off = 0;
@@ -523,7 +532,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
                lgi->lgi_buf.lb_buf = &llh->llh_hdr;
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
-                       GOTO(out, rc);
+                       GOTO(out_remote_unlock, rc);
        } else {
                /* Note: If this is not initialization (size == 0), then do not
                 * write the whole header (8k bytes), only update header/tail
@@ -538,7 +547,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
                lgi->lgi_buf.lb_buf = &llh->llh_count;
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
-                       GOTO(out, rc);
+                       GOTO(out_remote_unlock, rc);
 
                lgi->lgi_off = offsetof(typeof(*llh),
                        llh_bitmap[index / (sizeof(*llh->llh_bitmap) * 8)]);
@@ -547,16 +556,23 @@ static int llog_osd_write_rec(const struct lu_env *env,
                        &llh->llh_bitmap[index/(sizeof(*llh->llh_bitmap)*8)];
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
-                       GOTO(out, rc);
+                       GOTO(out_remote_unlock, rc);
 
                lgi->lgi_off = offsetof(typeof(*llh), llh_tail);
                lgi->lgi_buf.lb_len = sizeof(llh->llh_tail);
                lgi->lgi_buf.lb_buf = &llh->llh_tail;
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
-                       GOTO(out, rc);
+                       GOTO(out_remote_unlock, rc);
        }
 
+out_remote_unlock:
+       /* unlock here for remote object */
+       if (dt_object_remote(o))
+               up_write(&loghandle->lgh_hdr_lock);
+       if (rc)
+               GOTO(out, rc);
+
        rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
                GOTO(out, rc);
@@ -586,10 +602,10 @@ static int llog_osd_write_rec(const struct lu_env *env,
        RETURN(rc);
 out:
        /* cleanup llog for error case */
-       spin_lock(&loghandle->lgh_hdr_lock);
+       down_write(&loghandle->lgh_hdr_lock);
        ext2_clear_bit(index, llh->llh_bitmap);
        llh->llh_count--;
-       spin_unlock(&loghandle->lgh_hdr_lock);
+       up_write(&loghandle->lgh_hdr_lock);
 
        /* restore llog last_idx */
        loghandle->lgh_last_idx--;