Whamcloud - gitweb
- add commit callback in osd_trans_stop() as underlying transaction may
authoralex <alex>
Sat, 25 Apr 2009 05:03:42 +0000 (05:03 +0000)
committeralex <alex>
Sat, 25 Apr 2009 05:03:42 +0000 (05:03 +0000)
   change in truncate
 - osd_declare_punch() grabs i_alloc_sem to avoid deadlock with write
 - osd_declare_punch() reserves credits
 - osd_punch() calls vmtruncate() directly
 - osd_get_bufs() grabs i_alloc_sem to avoid deadlock with truncate

lustre/osd/osd_handler.c
lustre/osd/osd_internal.h
lustre/osd/osd_io.c

index 42d5401..4677930 100644 (file)
@@ -753,13 +753,9 @@ int osd_trans_start(const struct lu_env *env,
 
         jh = journal_start(osd_journal(dev), oh->ot_credits >> 1);
         if (!IS_ERR(jh)) {
-                oh->ot_handle = jh;
-                jh->h_sync = th->th_sync;
-                /* add commit callback */
                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
                 lu_context_enter(&th->th_ctx);
-                journal_callback_set(jh, osd_trans_commit_cb,
-                                (struct journal_callback *)&oh->ot_jcb);
+                oh->ot_handle = jh;
                 LASSERT(oti->oti_txns == 0);
                 LASSERT(oti->oti_r_locks == 0);
                 LASSERT(oti->oti_w_locks == 0);
@@ -791,9 +787,27 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
         ENTRY;
 
         oh = container_of0(th, struct osd_thandle, ot_super);
+
+        /* see comments in osd_declare_punch() */
+        if (oh->ot_alloc_sem_obj) {
+                /* XXX: we don't grab reference on the object - hope it's OK */
+                up_write(&oh->ot_alloc_sem_obj->oo_inode->i_alloc_sem);
+                oh->ot_alloc_sem_obj = NULL;
+        }
+
         if (oh->ot_handle != NULL) {
                 handle_t *hdl = oh->ot_handle;
 
+                hdl->h_sync = th->th_sync;
+
+                /*
+                 * add commit callback
+                 * notice we don't do this in osd_trans_start()
+                 * as underlying transaction can change during truncate
+                 */
+                journal_callback_set(hdl, osd_trans_commit_cb,
+                                (struct journal_callback *)&oh->ot_jcb);
+
                 LASSERT(oti->oti_txns == 1);
                 oti->oti_txns--;
                 LASSERT(oti->oti_r_locks == 0);
@@ -1378,6 +1392,89 @@ static int osd_inode_setattr(const struct lu_env *env,
         return 0;
 }
 
+static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
+                             __u64 start, __u64 end, struct thandle *th)
+{
+        struct osd_object  *oo = osd_dt_obj(dt);
+        struct osd_thandle *oh;
+        ENTRY;
+
+        LASSERT(th);       
+        oh = container_of(th, struct osd_thandle, ot_super);
+
+        OSD_DECLARE_OP(oh, punch);
+
+        /*
+         * Declare phase of punch (truncate): this function only adds
+         * credits to oh->ot_credits and takes i_alloc_sem; start and
+         * end are not used here and it always returns 0.
+         *
+         * we don't need to reserve credits for whole truncate
+         * it's not possible as truncate may need to free too many
+         * blocks and that won't fit a single transaction. instead
+         * we reserve credits to change i_size and put inode onto
+         * orphan list. if needed truncate will extend or restart
+         * transaction
+         */
+        oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+        oh->ot_credits += 3;
+
+        /*
+         * write path uses the following locking order:
+         *   i_alloc_sem, lockpage, journal_start
+         * we have to do same. IOW, we have to grab i_alloc_sem before
+         * transaction is started, thus we grab it here.
+         *
+         * The object is remembered in ot_alloc_sem_obj so that
+         * osd_trans_stop() can release the semaphore; the assertion
+         * below means at most one punch can be declared per handle.
+         */
+        LASSERT(oh->ot_alloc_sem_obj == NULL);
+        oh->ot_alloc_sem_obj = oo;
+        down_write(&oo->oo_inode->i_alloc_sem);
+
+        RETURN(0);
+}
+
+
+/*
+ * Truncate the object's inode to 'start' bytes inside transaction 'th'.
+ *
+ * Only truncate-to-end is supported: 'end' must be OBD_OBJECT_EOF.
+ * 'capa' is not used by this implementation.
+ *
+ * The journal handle may end up attached to a different transaction
+ * while vmtruncate() runs. This is detected by comparing the transaction
+ * id before and after the call; if the handle then holds fewer credits
+ * than were declared (oh->ot_credits) it is extended, or restarted when
+ * extending fails.
+ *
+ * Returns 0 on success, the vmtruncate() error if truncation failed,
+ * otherwise the journal_restart() error.
+ */
+static int osd_punch(const struct lu_env *env, struct dt_object *dt,
+                     __u64 start, __u64 end, struct thandle *th,
+                     struct lustre_capa *capa)
+{
+        struct osd_thandle *oh;
+        struct osd_object  *obj = osd_dt_obj(dt);
+        handle_t           *h;
+        tid_t               tid;
+        int                 rc, rc2 = 0;
+        ENTRY;
+
+        /* partial punch (hole in the middle of a file) is not implemented */
+        LASSERT(end == OBD_OBJECT_EOF);
+        LASSERT(dt_object_exists(dt));
+        LASSERT(osd_invariant(obj));
+
+        LASSERT(th);
+        oh = container_of(th, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle->h_transaction != NULL);
+
+        OSD_EXEC_OP(th, punch);
+
+        /* remember which transaction the handle belongs to right now */
+        tid = oh->ot_handle->h_transaction->t_tid;
+
+        rc = vmtruncate(obj->oo_inode, start);
+
+        h = journal_current_handle();
+        LASSERT(h != NULL);
+        LASSERT(h == oh->ot_handle);
+
+        if (tid != h->h_transaction->t_tid) {
+                /*
+                 * transaction has changed during truncate
+                 * we need to restart the handle with our credits
+                 */
+                CERROR("transaction has changed: %lu -> %lu\n",
+                       (unsigned long) tid,
+                       (unsigned long) h->h_transaction->t_tid);
+                /* top up only when the handle has fewer credits than declared */
+                if (h->h_buffer_credits < oh->ot_credits) {
+                        if (journal_extend(h, oh->ot_credits))
+                                rc2 = journal_restart(h, oh->ot_credits);
+                }
+        }
+
+        /* a truncate failure takes precedence; never report it as success */
+        RETURN(rc == 0 ? rc2 : rc);
+}
+
 /*
  * Object creation.
  *
@@ -2316,6 +2413,8 @@ static const struct dt_object_operations osd_obj_ops = {
         .do_attr_get          = osd_attr_get,
         .do_declare_attr_set  = osd_declare_attr_set,
         .do_attr_set          = osd_attr_set,
+        .do_declare_punch     = osd_declare_punch,
+        .do_punch             = osd_punch,
         .do_ah_init           = osd_ah_init,
         .do_declare_create    = osd_declare_object_create,
         .do_create            = osd_object_create,
@@ -2349,6 +2448,8 @@ static const struct dt_object_operations osd_obj_ea_ops = {
         .do_attr_get          = osd_attr_get,
         .do_declare_attr_set  = osd_declare_attr_set,
         .do_attr_set          = osd_attr_set,
+        .do_declare_punch     = osd_declare_punch,
+        .do_punch             = osd_punch,
         .do_ah_init           = osd_ah_init,
         .do_declare_create    = osd_declare_object_create,
         .do_create            = osd_object_ea_create,
index 6db4fdf..11276dc 100644 (file)
@@ -103,6 +103,7 @@ struct osd_thandle {
         /* Link to the device, for debugging. */
         struct lu_ref_link     *ot_dev_link;
         int                     ot_credits;
+        struct osd_object      *ot_alloc_sem_obj;
 #ifdef OSD_TRACK_DECLARES
         int                     ot_declare_attr_set;
         int                     ot_declare_punch;
index 14bcd1d..79df0b7 100644 (file)
@@ -410,6 +410,14 @@ int osd_get_bufs(const struct lu_env *env, struct dt_object *d, loff_t pos,
         }
         rc = i;
 
+        /* Filter truncate first locks i_mutex then the partially truncated
+         * page, filter write code first locks pages then takes
+         * i_mutex.  To avoid a deadlock in case of concurrent
+         * punch/write requests from one client, filter writes and
+         * filter truncates are serialized by i_alloc_sem, allowing
+         * multiple writes or single truncate. */
+        down_read(&obj->oo_inode->i_alloc_sem);
+
 cleanup:
         RETURN(rc);
 }
@@ -417,7 +425,10 @@ cleanup:
 static int osd_put_bufs(const struct lu_env *env, struct dt_object *dt,
                 struct niobuf_local *lb, int npages)
 {
-        int i;
+        struct osd_object *obj    = osd_dt_obj(dt);
+        int                i;
+
+        up_read(&obj->oo_inode->i_alloc_sem);
 
         for (i = 0; i < npages; i++) {
                 if (lb[i].page == NULL)
@@ -475,8 +486,28 @@ static int osd_write_prep(const struct lu_env *env, struct dt_object *dt,
 }
 
 static int osd_declare_write_commit(const struct lu_env *env, struct dt_object *dt,
-                struct niobuf_local *lb, int npages, struct thandle *thandle)
+                struct niobuf_local *lb, int npages, struct thandle *handle)
 {
+        struct osd_thandle  *oh;
+        int                  extents = 1;
+        int                  i;
+
+        LASSERT(handle != NULL);
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        /*
+         * Declare-time credit estimate for writing the npages buffers
+         * described by lb. The assertion above requires that no journal
+         * handle is attached to the transaction yet. Credits are only
+         * accumulated in oh->ot_credits; the function always returns 0.
+         *
+         * allocate each block in different group (bitmap + gd)
+         */
+        oh->ot_credits += npages * 2;
+
+        /* calculate number of extents (probably better to pass nb):
+         * a new extent is counted whenever a buffer does not start
+         * where the previous one ended */
+        for (i = 1; i < npages; i++)
+                if (lb[i].file_offset != lb[i-1].file_offset + lb[i-1].len)
+                        extents++;
+
+        /* each extent can go into new leaf causing a split */
+        /* 5 is max tree depth, 2 is bitmap + gd
+         * NOTE(review): the extra 1 in (2 + 1) is not explained here -
+         * presumably the tree block itself; confirm */
+        oh->ot_credits += (5 * (2 + 1)) * extents;
+        
         RETURN(0);
 }