* Unset cookies should be all-zero (which will never occur naturally). */
static int lov_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
void *buf, struct llog_cookie *logcookies,
- int numcookies, void *data)
+ int numcookies, void *data,
+ struct rw_semaphore **lock, int *lock_count)
{
struct obd_device *obd = ctxt->loc_obd;
struct lov_obd *lov = &obd->u.lov;
LASSERT(logcookies && numcookies >= lsm->lsm_stripe_count);
+ /* We need this to serialize llog records between parallel unlinks so
+ * we can replay llog records in strict transno and llog order. If
+ * and when we want to make this more scalable we need to lock and
+ * write records in strictly ost_idx order not lsm order. Consider
+ * file 1 on ost_idx [1, 2, 3, 4] and file 2 on ost_idx [3, 4, 1, 2] */
+ down(&lov->lov_llog_sem);
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
struct obd_device *child =
- lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd;
+ lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd;
struct llog_ctxt *cctxt;
cctxt = llog_get_context(&child->obd_llogs, ctxt->loc_idx);
lur->lur_ogen = loi->loi_gr;
LASSERT(lsm->lsm_object_gr == loi->loi_gr);
rc += llog_add(cctxt, &lur->lur_hdr, NULL, logcookies + rc,
- numcookies - rc, NULL);
+ numcookies - rc, NULL,
+ lock != NULL ? lock + rc : NULL, lock_count);
}
+ up(&lov->lov_llog_sem);
OBD_FREE(lur, sizeof(*lur));
RETURN(rc);