jh = journal_start(osd_journal(dev), oh->ot_credits >> 1);
if (!IS_ERR(jh)) {
- oh->ot_handle = jh;
- jh->h_sync = th->th_sync;
- /* add commit callback */
lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
lu_context_enter(&th->th_ctx);
- journal_callback_set(jh, osd_trans_commit_cb,
- (struct journal_callback *)&oh->ot_jcb);
+ oh->ot_handle = jh;
LASSERT(oti->oti_txns == 0);
LASSERT(oti->oti_r_locks == 0);
LASSERT(oti->oti_w_locks == 0);
ENTRY;
oh = container_of0(th, struct osd_thandle, ot_super);
+
+ /* see comments in osd_declare_punch() */
+ if (oh->ot_alloc_sem_obj) {
+ /* XXX: we don't grab reference on the object - hope it's OK */
+ up_write(&oh->ot_alloc_sem_obj->oo_inode->i_alloc_sem);
+ oh->ot_alloc_sem_obj = NULL;
+ }
+
if (oh->ot_handle != NULL) {
handle_t *hdl = oh->ot_handle;
+ hdl->h_sync = th->th_sync;
+
+ /*
+ * add commit callback
+ * notice we don't do this in osd_trans_start()
+ * as underlying transaction can change during truncate
+ */
+ journal_callback_set(hdl, osd_trans_commit_cb,
+ (struct journal_callback *)&oh->ot_jcb);
+
LASSERT(oti->oti_txns == 1);
oti->oti_txns--;
LASSERT(oti->oti_r_locks == 0);
return 0;
}
+static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
+ __u64 start, __u64 end, struct thandle *th)
+{
+ struct osd_object *oo = osd_dt_obj(dt);
+ struct osd_thandle *oh;
+ ENTRY;
+
+ LASSERT(th);
+ oh = container_of(th, struct osd_thandle, ot_super);
+
+ OSD_DECLARE_OP(oh, punch);
+
+ /*
+ * we don't need to reserve credits for whole truncate
+ * it's not possible as truncate may need to free too many
+ * blocks and that won't fit a single trunsaction. instead
+ * we reserve credits to change i_size and put inode onto
+ * orphan list. if needed truncate will extend or restart
+ * transaction
+ */
+ oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+ oh->ot_credits += 3;
+
+ /*
+ * write path uses the following locking order:
+ * i_alloc_sem, lockpage, journal_start
+ * we have to do same. IOW, we have to grab i_alloc_sem before
+ * transaction is started, thus we grab it here
+ */
+ LASSERT(oh->ot_alloc_sem_obj == NULL);
+ oh->ot_alloc_sem_obj = oo;
+ down_write(&oo->oo_inode->i_alloc_sem);
+
+ RETURN(0);
+}
+
+
+static int osd_punch(const struct lu_env *env, struct dt_object *dt,
+ __u64 start, __u64 end, struct thandle *th,
+ struct lustre_capa *capa)
+{
+ struct osd_thandle *oh;
+ struct osd_object *obj = osd_dt_obj(dt);
+ handle_t *h;
+ tid_t tid;
+ int rc, rc2 = 0;
+ ENTRY;
+
+ LASSERT(end == OBD_OBJECT_EOF);
+ LASSERT(dt_object_exists(dt));
+ LASSERT(osd_invariant(obj));
+
+ LASSERT(th);
+ oh = container_of(th, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle->h_transaction != NULL);
+
+ OSD_EXEC_OP(th, punch);
+
+ tid = oh->ot_handle->h_transaction->t_tid;
+
+ rc = vmtruncate(obj->oo_inode, start);
+
+ h = journal_current_handle();
+ LASSERT(h != NULL);
+ LASSERT(h == oh->ot_handle);
+
+ if (tid != h->h_transaction->t_tid) {
+ /*
+ * transaction has changed during truncate
+ * we need to restart the handle with our credits
+ */
+ CERROR("transaction has changed: %lu -> %lu\n",
+ (unsigned long) tid,
+ (unsigned long) h->h_transaction->t_tid);
+ if (h->h_buffer_credits > oh->ot_credits) {
+ if (journal_extend(h, oh->ot_credits))
+ rc2 = journal_restart(h, oh->ot_credits);
+ }
+ }
+
+ RETURN(rc == 0 ? rc2 : 0);
+}
+
/*
* Object creation.
*
.do_attr_get = osd_attr_get,
.do_declare_attr_set = osd_declare_attr_set,
.do_attr_set = osd_attr_set,
+ .do_declare_punch = osd_declare_punch,
+ .do_punch = osd_punch,
.do_ah_init = osd_ah_init,
.do_declare_create = osd_declare_object_create,
.do_create = osd_object_create,
.do_attr_get = osd_attr_get,
.do_declare_attr_set = osd_declare_attr_set,
.do_attr_set = osd_attr_set,
+ .do_declare_punch = osd_declare_punch,
+ .do_punch = osd_punch,
.do_ah_init = osd_ah_init,
.do_declare_create = osd_declare_object_create,
.do_create = osd_object_ea_create,
}
rc = i;
+ /* Filter truncate first locks i_mutex then partally truncated
+ * page, filter write code first locks pages then take
+ * i_mutex. To avoid a deadlock in case of concurrent
+ * punch/write requests from one client, filter writes and
+ * filter truncates are serialized by i_alloc_sem, allowing
+ * multiple writes or single truncate. */
+ down_read(&obj->oo_inode->i_alloc_sem);
+
cleanup:
RETURN(rc);
}
static int osd_put_bufs(const struct lu_env *env, struct dt_object *dt,
struct niobuf_local *lb, int npages)
{
- int i;
+ struct osd_object *obj = osd_dt_obj(dt);
+ int i;
+
+ up_read(&obj->oo_inode->i_alloc_sem);
for (i = 0; i < npages; i++) {
if (lb[i].page == NULL)
}
static int osd_declare_write_commit(const struct lu_env *env, struct dt_object *dt,
- struct niobuf_local *lb, int npages, struct thandle *thandle)
+ struct niobuf_local *lb, int npages, struct thandle *handle)
{
+ struct osd_thandle *oh;
+ int extents = 1;
+ int i;
+
+ LASSERT(handle != NULL);
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ /* allocate each block in different group (bitmap + gd) */
+ oh->ot_credits += npages * 2;
+
+ /* calculate number of extents (probably better to pass nb) */
+ for (i = 1; i < npages; i++)
+ if (lb[i].file_offset != lb[i-1].file_offset + lb[i-1].len)
+ extents++;
+
+ /* each extent can go into new leaf causing a split */
+ /* 5 is max tree depth, 2 is bitmap + gd */
+ oh->ot_credits += (5 * (2 + 1)) * extents;
+
RETURN(0);
}