Whamcloud - gitweb
LU-10188 osd-zfs: handle non 4K aligned block size 41/29241/17
author Fan Yong <fan.yong@intel.com>
Wed, 6 Dec 2017 13:45:57 +0000 (21:45 +0800)
committer Oleg Drokin <oleg.drokin@intel.com>
Sun, 17 Dec 2017 06:19:21 +0000 (06:19 +0000)
After being restored from a server-side file-level backup, files
created via ZPL may use a block size that is not 4KB-aligned. When
the device is mounted as Lustre, osd-zfs adjusts the block size of
client-visible OST objects so that it is at least 4KB-aligned. For
objects whose block size cannot be reset, the OSD logic needs to
handle the non-aligned case properly. Otherwise, the Lustre I/O
handler may trigger an osd-zfs or ZFS panic in osd_bufs_get_read().

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Ic07aeec3fc774508cedd6d24cca54b76171d143b
Reviewed-on: https://review.whamcloud.com/29241
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/osd-zfs/osd_handler.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_io.c
lustre/osd-zfs/osd_object.c

index 55ef2fe..bee965a 100644 (file)
@@ -223,6 +223,7 @@ static int osd_trans_start(const struct lu_env *env, struct dt_device *d,
                /* add commit callback */
                dmu_tx_callback_register(oh->ot_tx, osd_trans_commit_cb, oh);
                oh->ot_assigned = 1;
                /* add commit callback */
                dmu_tx_callback_register(oh->ot_tx, osd_trans_commit_cb, oh);
                oh->ot_assigned = 1;
+               osd_oti_get(env)->oti_in_trans = 1;
                lu_device_get(&d->dd_lu_dev);
        }
 
                lu_device_get(&d->dd_lu_dev);
        }
 
@@ -310,6 +311,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        /* XXX: Once dmu_tx_commit() called, oh/th could have been freed
         * by osd_trans_commit_cb already. */
        dmu_tx_commit(oh->ot_tx);
        /* XXX: Once dmu_tx_commit() called, oh/th could have been freed
         * by osd_trans_commit_cb already. */
        dmu_tx_commit(oh->ot_tx);
+       osd_oti_get(env)->oti_in_trans = 0;
 
        osd_unlinked_list_emptify(env, osd, &unlinked, true);
 
 
        osd_unlinked_list_emptify(env, osd, &unlinked, true);
 
index 4e19b20..abe740c 100644 (file)
@@ -201,7 +201,8 @@ struct osd_thread_info {
                __u64            oti_key64[(MAXNAMELEN + 1)/sizeof(__u64)];
                sa_bulk_attr_t   oti_attr_bulk[OSD_MAX_IN_BULK];
        };
                __u64            oti_key64[(MAXNAMELEN + 1)/sizeof(__u64)];
                sa_bulk_attr_t   oti_attr_bulk[OSD_MAX_IN_BULK];
        };
-       struct lustre_mdt_attrs oti_mdt_attrs;
+       struct lustre_mdt_attrs  oti_mdt_attrs;
+       unsigned int             oti_in_trans:1;
 
        struct lu_attr           oti_la;
        struct osa_attr          oti_osa;
 
        struct lu_attr           oti_la;
        struct osa_attr          oti_osa;
index afe987c..8573dbf 100644 (file)
@@ -329,7 +329,13 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj,
         * If we discover this is a vital for good performance we
         * can get own replacement for dmu_buf_hold_array_by_bonus().
         */
         * If we discover this is a vital for good performance we
         * can get own replacement for dmu_buf_hold_array_by_bonus().
         */
-       while (len > 0) {
+       while (len > 0 &&
+              (obj->oo_dn->dn_datablkshift != 0 ||
+               off < obj->oo_dn->dn_datablksz)) {
+               if (obj->oo_dn->dn_datablkshift == 0 &&
+                   off + len > obj->oo_dn->dn_datablksz)
+                       len = obj->oo_dn->dn_datablksz - off;
+
                rc = -dmu_buf_hold_array_by_bonus(&obj->oo_dn->dn_bonus->db,
                                                  off, len, TRUE, osd_0copy_tag,
                                                  &numbufs, &dbp);
                rc = -dmu_buf_hold_array_by_bonus(&obj->oo_dn->dn_bonus->db,
                                                  off, len, TRUE, osd_0copy_tag,
                                                  &numbufs, &dbp);
index 8c495ef..e9a8dbe 100644 (file)
@@ -325,6 +325,66 @@ struct lu_object *osd_object_alloc(const struct lu_env *env,
        }
 }
 
        }
 }
 
+/*
+ * Try to raise the data block size of \a obj so that it is a power of two
+ * no smaller than PAGE_SIZE and no larger than osd->od_max_blksz.
+ *
+ * The change is made in a private DMU transaction, so the caller must not
+ * already be inside a transaction (asserted below). This is a best-effort
+ * operation: nothing is returned to the caller, failures are only logged.
+ *
+ * \param[in] env      execution environment of the calling thread
+ * \param[in] osd      OSD device owning the object
+ * \param[in] obj      object whose block size should be adjusted
+ */
+static void osd_obj_set_blksize(const struct lu_env *env,
+                               struct osd_device *osd, struct osd_object *obj)
+{
+       const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
+       dmu_tx_t *tx;
+       dnode_t *dn = obj->oo_dn;
+       uint32_t blksz;
+       int rc = 0;
+       ENTRY;
+
+       LASSERT(!osd_oti_get(env)->oti_in_trans);
+
+       tx = dmu_tx_create(osd->od_os);
+       if (!tx) {
+               CERROR("%s: fail to create tx to set blksize for "DFID"\n",
+                      osd->od_svname, PFID(fid));
+               RETURN_EXIT;
+       }
+
+       dmu_tx_hold_bonus(tx, dn->dn_object);
+       rc = -dmu_tx_assign(tx, TXG_WAIT);
+       if (rc) {
+               /* The tx was never assigned, so it has to be aborted. */
+               dmu_tx_abort(tx);
+               CERROR("%s: fail to assign tx to set blksize for "DFID
+                      ": rc = %d\n", osd->od_svname, PFID(fid), rc);
+               RETURN_EXIT;
+       }
+
+       down_write(&obj->oo_guard);
+       /* Re-check under the lock: the block size may already be fine. */
+       if (unlikely((1 << dn->dn_datablkshift) >= PAGE_SIZE))
+               GOTO(out, rc = 1);
+
+       blksz = dn->dn_datablksz;
+       if (!is_power_of_2(blksz))
+               blksz = size_roundup_power2(blksz);
+
+       /* Clamp the new block size into [PAGE_SIZE, od_max_blksz]. */
+       if (blksz > osd->od_max_blksz)
+               blksz = osd->od_max_blksz;
+       else if (blksz < PAGE_SIZE)
+               blksz = PAGE_SIZE;
+       rc = -dmu_object_set_blocksize(osd->od_os, dn->dn_object, blksz, 0, tx);
+
+       GOTO(out, rc);
+
+out:
+       up_write(&obj->oo_guard);
+       /*
+        * From here on the tx has been assigned to a txg. An assigned tx
+        * must be released with dmu_tx_commit(), dmu_tx_abort() is only
+        * valid before a successful dmu_tx_assign(). Committing a tx that
+        * dirtied nothing is harmless.
+        */
+       dmu_tx_commit(tx);
+       if (rc) {
+               /*
+                * If the object has more than one block by now, the block
+                * size cannot be changed any more; do not report that as
+                * an error.
+                */
+               if (unlikely(obj->oo_dn->dn_maxblkid > 0))
+                       rc = 1;
+               if (rc < 0)
+                       CERROR("%s: fail to set blksize for "DFID": rc = %d\n",
+                              osd->od_svname, PFID(fid), rc);
+       } else {
+               CDEBUG(D_INODE, "%s: set blksize as %u for "DFID"\n",
+                      osd->od_svname, blksz, PFID(fid));
+       }
+
+       EXIT;
+}
+
 /*
  * Concurrency: shouldn't matter.
  */
 /*
  * Concurrency: shouldn't matter.
  */
@@ -335,10 +395,7 @@ int osd_object_init0(const struct lu_env *env, struct osd_object *obj)
        int                      rc = 0;
        ENTRY;
 
        int                      rc = 0;
        ENTRY;
 
-       if (obj->oo_dn == NULL)
-               RETURN(0);
-
-       /* object exist */
+       LASSERT(obj->oo_dn);
 
        rc = osd_object_sa_init(obj, osd);
        if (rc)
 
        rc = osd_object_sa_init(obj, osd);
        if (rc)
@@ -349,10 +406,19 @@ int osd_object_init0(const struct lu_env *env, struct osd_object *obj)
        if (rc)
                RETURN(rc);
 
        if (rc)
                RETURN(rc);
 
-       if (likely(!fid_is_acct(fid)))
+       if (likely(!fid_is_acct(fid))) {
                /* no body operations for accounting objects */
                obj->oo_dt.do_body_ops = &osd_body_ops;
 
                /* no body operations for accounting objects */
                obj->oo_dt.do_body_ops = &osd_body_ops;
 
+               if (S_ISREG(obj->oo_attr.la_mode) &&
+                   obj->oo_dn->dn_maxblkid == 0 &&
+                   (1 << obj->oo_dn->dn_datablkshift) < PAGE_SIZE &&
+                   (fid_is_idif(fid) || fid_is_norm(fid) ||
+                    fid_is_echo(fid)) &&
+                   osd->od_is_ost && !osd->od_dt_dev.dd_rdonly)
+                       osd_obj_set_blksize(env, osd, obj);
+       }
+
        /*
         * initialize object before marking it existing
         */
        /*
         * initialize object before marking it existing
         */
@@ -475,7 +541,6 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l,
                               osd->od_svname, PFID(lu_object_fid(l)), oid, rc);
                        GOTO(out, rc);
                }
                               osd->od_svname, PFID(lu_object_fid(l)), oid, rc);
                        GOTO(out, rc);
                }
-               LASSERT(obj->oo_dn);
                rc = osd_object_init0(env, obj);
                if (rc != 0)
                        GOTO(out, rc);
                rc = osd_object_init0(env, obj);
                if (rc != 0)
                        GOTO(out, rc);