/* In-memory descriptor for a log object or log catalog */
struct llog_handle {
struct rw_semaphore lgh_lock;
- struct rw_semaphore lgh_hdr_lock; /* protect lgh_hdr data */
+ struct mutex lgh_hdr_mutex; /* protect lgh_hdr data */
struct llog_logid lgh_id; /* id of this log */
struct llog_log_hdr *lgh_hdr;
size_t lgh_hdr_size;
return NULL;
init_rwsem(&loghandle->lgh_lock);
- init_rwsem(&loghandle->lgh_hdr_lock);
+ mutex_init(&loghandle->lgh_hdr_mutex);
INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
atomic_set(&loghandle->lgh_refcount, 1);
llog_free_handle(loghandle);
}
+static int llog_cancel_rec_internal(const struct lu_env *env,
+ struct llog_handle *loghandle, int index)
+{
+ struct dt_device *dt;
+ struct llog_log_hdr *llh = loghandle->lgh_hdr;
+ struct thandle *th;
+ int rc;
+
+ ENTRY;
+
+ LASSERT(loghandle);
+ LASSERT(loghandle->lgh_ctxt);
+ LASSERT(loghandle->lgh_obj != NULL);
+
+ dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
+
+ th = dt_trans_create(env, dt);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = llog_declare_write_rec(env, loghandle, &llh->llh_hdr, index, th);
+ if (rc < 0)
+ GOTO(out_trans, rc);
+
+ th->th_wait_submit = 1;
+ rc = dt_trans_start_local(env, dt, th);
+ if (rc < 0)
+ GOTO(out_trans, rc);
+
+ down_write(&loghandle->lgh_lock);
+ /* clear bitmap */
+ mutex_lock(&loghandle->lgh_hdr_mutex);
+ if (!ext2_clear_bit(index, LLOG_HDR_BITMAP(llh))) {
+ CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
+ GOTO(out_unlock, rc);
+ }
+ /* update header */
+ rc = llog_write_rec(env, loghandle, &llh->llh_hdr, NULL,
+ LLOG_HEADER_IDX, th);
+ if (rc == 0)
+ loghandle->lgh_hdr->llh_count--;
+ else
+ ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
+out_unlock:
+ mutex_unlock(&loghandle->lgh_hdr_mutex);
+ up_write(&loghandle->lgh_lock);
+out_trans:
+ dt_trans_stop(env, dt, th);
+ RETURN(rc);
+}
+
/* returns negative on error; 0 if success; 1 if success & log destroyed */
int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
int index)
RETURN(-EINVAL);
}
- down_write(&loghandle->lgh_hdr_lock);
- if (!ext2_clear_bit(index, LLOG_HDR_BITMAP(llh))) {
- up_write(&loghandle->lgh_hdr_lock);
- CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
- RETURN(-ENOENT);
+ rc = llog_cancel_rec_internal(env, loghandle, index);
+ if (rc < 0) {
+ CERROR("%s: fail to write header for llog #"DOSTID
+ "#%08x: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ POSTID(&loghandle->lgh_id.lgl_oi),
+ loghandle->lgh_id.lgl_ogen, rc);
+ RETURN(rc);
}
- llh->llh_count--;
-
if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
(llh->llh_count == 1) &&
(loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1)) {
- up_write(&loghandle->lgh_hdr_lock);
rc = llog_destroy(env, loghandle);
if (rc < 0) {
+ /* Sigh, can not destroy the final plain llog, but
+ * the bitmap has been clearly, so the record can not
+ * be accessed anymore, let's return 0 for now, and
+ * the orphan will be handled by LFSCK. */
CERROR("%s: can't destroy empty llog #"DOSTID
"#%08x: rc = %d\n",
loghandle->lgh_ctxt->loc_obd->obd_name,
POSTID(&loghandle->lgh_id.lgl_oi),
loghandle->lgh_id.lgl_ogen, rc);
- GOTO(out_err, rc);
+ RETURN(0);
}
RETURN(LLOG_DEL_PLAIN);
}
- up_write(&loghandle->lgh_hdr_lock);
- rc = llog_write(env, loghandle, &llh->llh_hdr, LLOG_HEADER_IDX);
- if (rc < 0) {
- CERROR("%s: fail to write header for llog #"DOSTID
- "#%08x: rc = %d\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
- POSTID(&loghandle->lgh_id.lgl_oi),
- loghandle->lgh_id.lgl_ogen, rc);
- GOTO(out_err, rc);
- }
RETURN(0);
-out_err:
- down_write(&loghandle->lgh_hdr_lock);
- ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
- llh->llh_count++;
- up_write(&loghandle->lgh_hdr_lock);
- return rc;
}
static int llog_read_header(const struct lu_env *env,
lrt->lrt_len = rec->lrh_len;
lrt->lrt_index = rec->lrh_index;
- /* the lgh_hdr_lock protects llog header data from concurrent
+ /* the lgh_hdr_mutex protects llog header data from concurrent
* update/cancel, the llh_count and llh_bitmap are protected */
- down_write(&loghandle->lgh_hdr_lock);
+ mutex_lock(&loghandle->lgh_hdr_mutex);
if (ext2_set_bit(index, LLOG_HDR_BITMAP(llh))) {
CERROR("%s: index %u already set in log bitmap\n",
o->do_lu.lo_dev->ld_obd->obd_name, index);
- up_write(&loghandle->lgh_hdr_lock);
+ mutex_unlock(&loghandle->lgh_hdr_mutex);
LBUG(); /* should never happen */
}
llh->llh_count++;
- /* XXX It is a bit tricky here, if the log object is local,
- * we do not need lock during write here, because if there is
- * race, the transaction(jbd2, what about ZFS?) will make sure the
- * conflicts will all committed in the same transaction group.
- * But for remote object, we need lock the whole process, so to
- * set the version of the remote transaction to make sure they
- * are being sent in order. (see osp_md_write()) */
- if (!dt_object_remote(o))
- up_write(&loghandle->lgh_hdr_lock);
-
if (lgi->lgi_attr.la_size == 0) {
lgi->lgi_off = 0;
lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len;
lgi->lgi_buf.lb_buf = &llh->llh_hdr;
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
- GOTO(out_remote_unlock, rc);
+ GOTO(out_unlock, rc);
} else {
+ __u32 *bitmap = LLOG_HDR_BITMAP(llh);
+
/* Note: If this is not initialization (size == 0), then do not
* write the whole header (8k bytes), only update header/tail
* and bits needs to be updated. Because this update might be
lgi->lgi_buf.lb_buf = &llh->llh_count;
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
- GOTO(out_remote_unlock, rc);
+ GOTO(out_unlock, rc);
- lgi->lgi_off = offsetof(typeof(*llh),
- llh_bitmap[index / (sizeof(*llh->llh_bitmap) * 8)]);
- lgi->lgi_buf.lb_len = sizeof(*llh->llh_bitmap);
- lgi->lgi_buf.lb_buf =
- &llh->llh_bitmap[index/(sizeof(*llh->llh_bitmap)*8)];
+ lgi->lgi_off = llh->llh_bitmap_offset +
+ (index / (sizeof(*bitmap) * 8)) * sizeof(*bitmap);
+ lgi->lgi_buf.lb_len = sizeof(*bitmap);
+ lgi->lgi_buf.lb_buf = &bitmap[index/(sizeof(*bitmap)*8)];
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
- GOTO(out_remote_unlock, rc);
+ GOTO(out_unlock, rc);
lgi->lgi_off = (unsigned long)LLOG_HDR_TAIL(llh) -
(unsigned long)llh;
lgi->lgi_buf.lb_buf = LLOG_HDR_TAIL(llh);
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
- GOTO(out_remote_unlock, rc);
+ GOTO(out_unlock, rc);
}
-out_remote_unlock:
+out_unlock:
/* unlock here for remote object */
- if (dt_object_remote(o))
- up_write(&loghandle->lgh_hdr_lock);
+ mutex_unlock(&loghandle->lgh_hdr_mutex);
if (rc)
GOTO(out, rc);
RETURN(rc);
out:
/* cleanup llog for error case */
- down_write(&loghandle->lgh_hdr_lock);
+ mutex_lock(&loghandle->lgh_hdr_mutex);
ext2_clear_bit(index, LLOG_HDR_BITMAP(llh));
llh->llh_count--;
- up_write(&loghandle->lgh_hdr_lock);
+ mutex_unlock(&loghandle->lgh_hdr_mutex);
/* restore llog last_idx */
loghandle->lgh_last_idx--;