/* the lgh_hdr_lock protects llog header data from concurrent
* update/cancel, the llh_count and llh_bitmap are protected */
- spin_lock(&loghandle->lgh_hdr_lock);
+ down_write(&loghandle->lgh_hdr_lock);
if (ext2_set_bit(index, llh->llh_bitmap)) {
CERROR("%s: index %u already set in log bitmap\n",
o->do_lu.lo_dev->ld_obd->obd_name, index);
- spin_unlock(&loghandle->lgh_hdr_lock);
+ up_write(&loghandle->lgh_hdr_lock);
LBUG(); /* should never happen */
}
llh->llh_count++;
- spin_unlock(&loghandle->lgh_hdr_lock);
+
+ /* XXX It is a bit tricky here, if the log object is local,
+ * we do not need lock during write here, because if there is
+ * race, the transaction(jbd2, what about ZFS?) will make sure the
+ * conflicts will all committed in the same transaction group.
+ * But for remote object, we need lock the whole process, so to
+ * set the version of the remote transaction to make sure they
+ * are being sent in order. (see osp_md_write()) */
+ if (!dt_object_remote(o))
+ up_write(&loghandle->lgh_hdr_lock);
if (lgi->lgi_attr.la_size == 0) {
lgi->lgi_off = 0;
lgi->lgi_buf.lb_buf = &llh->llh_hdr;
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
- GOTO(out, rc);
+ GOTO(out_remote_unlock, rc);
} else {
/* Note: If this is not initialization (size == 0), then do not
* write the whole header (8k bytes), only update header/tail
lgi->lgi_buf.lb_buf = &llh->llh_count;
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
- GOTO(out, rc);
+ GOTO(out_remote_unlock, rc);
lgi->lgi_off = offsetof(typeof(*llh),
llh_bitmap[index / (sizeof(*llh->llh_bitmap) * 8)]);
&llh->llh_bitmap[index/(sizeof(*llh->llh_bitmap)*8)];
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
- GOTO(out, rc);
+ GOTO(out_remote_unlock, rc);
lgi->lgi_off = offsetof(typeof(*llh), llh_tail);
lgi->lgi_buf.lb_len = sizeof(llh->llh_tail);
lgi->lgi_buf.lb_buf = &llh->llh_tail;
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
- GOTO(out, rc);
+ GOTO(out_remote_unlock, rc);
}
+out_remote_unlock:
+ /* unlock here for remote object */
+ if (dt_object_remote(o))
+ up_write(&loghandle->lgh_hdr_lock);
+ if (rc)
+ GOTO(out, rc);
+
rc = dt_attr_get(env, o, &lgi->lgi_attr);
if (rc)
GOTO(out, rc);
RETURN(rc);
out:
/* cleanup llog for error case */
- spin_lock(&loghandle->lgh_hdr_lock);
+ down_write(&loghandle->lgh_hdr_lock);
ext2_clear_bit(index, llh->llh_bitmap);
llh->llh_count--;
- spin_unlock(&loghandle->lgh_hdr_lock);
+ up_write(&loghandle->lgh_hdr_lock);
/* restore llog last_idx */
loghandle->lgh_last_idx--;