Whamcloud - gitweb
LU-15283 quota: deadlock between reint & lquota_wb
[fs/lustre-release.git] / lustre / osd-zfs / osd_handler.c
index 4e23ba1..f4f3ea8 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/osd-zfs/osd_handler.c
  * Top-level entry points into osd module
@@ -97,8 +96,6 @@ static void arc_prune_func(int64_t bytes, void *private)
        struct lu_env      env;
        int rc;
 
-       LASSERT(site->ls_obj_hash);
-
        rc = lu_env_init(&env, LCT_SHRINKER);
        if (rc) {
                CERROR("%s: can't initialize shrinker env: rc = %d\n",
@@ -175,8 +172,8 @@ static void osd_trans_commit_cb(void *cb_data, int error)
 
 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
 {
-       struct osd_thandle *oh = container_of0(th, struct osd_thandle,
-                                              ot_super);
+       struct osd_thandle *oh = container_of(th, struct osd_thandle,
+                                             ot_super);
 
        LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
        LASSERT(&dcb->dcb_func != NULL);
@@ -200,7 +197,7 @@ static int osd_trans_start(const struct lu_env *env, struct dt_device *d,
 
        ENTRY;
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh);
        LASSERT(oh->ot_tx);
 
@@ -280,13 +277,12 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        struct osd_device       *osd = osd_dt_dev(th->th_dev);
        bool                     sync = (th->th_sync != 0);
        struct osd_thandle      *oh;
-       struct list_head         unlinked;
+       LIST_HEAD(unlinked);
        uint64_t                 txg;
        int                      rc;
        ENTRY;
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
-       INIT_LIST_HEAD(&unlinked);
+       oh = container_of(th, struct osd_thandle, ot_super);
        list_splice_init(&oh->ot_unlinked_list, &unlinked);
 
        osd_oti_get(env)->oti_ins_cache_depth--;
@@ -543,7 +539,7 @@ static int osd_objset_statfs(struct osd_device *osd, struct obd_statfs *osfs)
 
        if (!spa_writeable(dmu_objset_spa(os)) ||
            osd->od_dev_set_rdonly || osd->od_prop_rdonly)
-               osfs->os_state |= OS_STATE_READONLY;
+               osfs->os_state |= OS_STATFS_READONLY;
 
        return 0;
 }
@@ -569,7 +565,7 @@ int osd_statfs(const struct lu_env *env, struct dt_device *d,
        /* ZFS does not support reporting nonrotional status yet, so return
         * flag only if user has set nonrotational.
         */
-       osfs->os_state |= osd->od_nonrotational ? OS_STATE_NONROT : 0;
+       osfs->os_state |= osd->od_nonrotational ? OS_STATFS_NONROT : 0;
 
        RETURN(0);
 }
@@ -645,6 +641,12 @@ static void osd_conf_get(const struct lu_env *env,
                param->ddp_brw_size = osd->od_max_blksz;
        else
                param->ddp_brw_size = ONE_MB_BRW_SIZE;
+
+#ifdef HAVE_DMU_OFFSET_NEXT
+       param->ddp_has_lseek_data_hole = osd->od_sync_on_lseek;
+#else
+       param->ddp_has_lseek_data_hole = false;
+#endif
 }
 
 /*
@@ -696,17 +698,39 @@ static int osd_ro(const struct lu_env *env, struct dt_device *d)
        RETURN(0);
 }
 
-static struct dt_device_operations osd_dt_ops = {
-       .dt_root_get            = osd_root_get,
-       .dt_statfs              = osd_statfs,
-       .dt_trans_create        = osd_trans_create,
-       .dt_trans_start         = osd_trans_start,
-       .dt_trans_stop          = osd_trans_stop,
-       .dt_trans_cb_add        = osd_trans_cb_add,
-       .dt_conf_get            = osd_conf_get,
-       .dt_sync                = osd_sync,
-       .dt_commit_async        = osd_commit_async,
-       .dt_ro                  = osd_ro,
+/* reserve or free quota for some operation */
+static int osd_reserve_or_free_quota(const struct lu_env *env,
+                                    struct dt_device *dev,
+                                    enum quota_type type, __u64 uid,
+                                    __u64 gid, __s64 count, bool is_md)
+{
+       int rc;
+       struct osd_device       *osd = osd_dt_dev(dev);
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct lquota_id_info   *qi = &info->oti_qi;
+       struct qsd_instance     *qsd = NULL;
+
+       if (is_md)
+               qsd = osd->od_quota_slave_md;
+       else
+               qsd = osd->od_quota_slave_dt;
+
+       rc = quota_reserve_or_free(env, qsd, qi, type, uid, gid, count, is_md);
+       RETURN(rc);
+}
+
+static const struct dt_device_operations osd_dt_ops = {
+       .dt_root_get              = osd_root_get,
+       .dt_statfs                = osd_statfs,
+       .dt_trans_create          = osd_trans_create,
+       .dt_trans_start           = osd_trans_start,
+       .dt_trans_stop            = osd_trans_stop,
+       .dt_trans_cb_add          = osd_trans_cb_add,
+       .dt_conf_get              = osd_conf_get,
+       .dt_sync                  = osd_sync,
+       .dt_commit_async          = osd_commit_async,
+       .dt_ro                    = osd_ro,
+       .dt_reserve_or_free_quota = osd_reserve_or_free_quota,
 };
 
 /*
@@ -744,7 +768,7 @@ static void osd_key_fini(const struct lu_context *ctx,
 
        if (idc != NULL) {
                LASSERT(info->oti_ins_cache_size > 0);
-               OBD_FREE(idc, sizeof(*idc) * info->oti_ins_cache_size);
+               OBD_FREE_PTR_ARRAY_LARGE(idc, info->oti_ins_cache_size);
                info->oti_ins_cache = NULL;
                info->oti_ins_cache_size = 0;
        }
@@ -1067,12 +1091,15 @@ osd_unlinked_drain(const struct lu_env *env, struct osd_device *osd)
 static int osd_mount(const struct lu_env *env,
                     struct osd_device *o, struct lustre_cfg *cfg)
 {
-       char                    *mntdev = lustre_cfg_string(cfg, 1);
-       char                    *str    = lustre_cfg_string(cfg, 2);
-       char                    *svname = lustre_cfg_string(cfg, 4);
+       char *mntdev = lustre_cfg_string(cfg, 1);
+       char *str = lustre_cfg_string(cfg, 2);
+       char *svname = lustre_cfg_string(cfg, 4);
+       time64_t interval = AS_DEFAULT;
        dnode_t *rootdn;
-       const char              *opts;
-       int                      rc;
+       const char *opts;
+       bool resetoi = false;
+       int rc;
+
        ENTRY;
 
        if (o->od_os != NULL)
@@ -1089,6 +1116,8 @@ static int osd_mount(const struct lu_env *env,
        if (rc >= sizeof(o->od_svname))
                RETURN(-E2BIG);
 
+       opts = lustre_cfg_string(cfg, 3);
+
        o->od_index_backup_stop = 0;
        o->od_index = -1; /* -1 means index is invalid */
        rc = server_name2index(o->od_svname, &o->od_index, NULL);
@@ -1107,7 +1136,7 @@ static int osd_mount(const struct lu_env *env,
                }
 
                if (flags & LMD_FLG_NOSCRUB)
-                       o->od_auto_scrub_interval = AS_NEVER;
+                       interval = AS_NEVER;
        }
 
        if (server_name_is_ost(o->od_svname))
@@ -1159,8 +1188,15 @@ static int osd_mount(const struct lu_env *env,
        if (rc)
                GOTO(err, rc);
 
+       if (opts && strstr(opts, "resetoi"))
+               resetoi = true;
+
+       rc = lprocfs_init_brw_stats(&o->od_brw_stats);
+       if (rc)
+               GOTO(err, rc);
+
        o->od_in_init = 1;
-       rc = osd_scrub_setup(env, o);
+       rc = osd_scrub_setup(env, o, interval, resetoi);
        o->od_in_init = 0;
        if (rc)
                GOTO(err, rc);
@@ -1172,8 +1208,8 @@ static int osd_mount(const struct lu_env *env,
        /* currently it's no need to prepare qsd_instance_md for OST */
        if (!o->od_is_ost) {
                o->od_quota_slave_md = qsd_init(env, o->od_svname,
-                                               &o->od_dt_dev,
-                                               o->od_proc_entry, true);
+                                               &o->od_dt_dev, o->od_proc_entry,
+                                               true, false);
                if (IS_ERR(o->od_quota_slave_md)) {
                        rc = PTR_ERR(o->od_quota_slave_md);
                        o->od_quota_slave_md = NULL;
@@ -1182,7 +1218,7 @@ static int osd_mount(const struct lu_env *env,
        }
 
        o->od_quota_slave_dt = qsd_init(env, o->od_svname, &o->od_dt_dev,
-                                    o->od_proc_entry, false);
+                                       o->od_proc_entry, false, false);
 
        if (IS_ERR(o->od_quota_slave_dt)) {
                if (o->od_quota_slave_md != NULL) {
@@ -1203,7 +1239,6 @@ static int osd_mount(const struct lu_env *env,
 #endif
 
        /* parse mount option "noacl", and enable ACL by default */
-       opts = lustre_cfg_string(cfg, 3);
        if (opts == NULL || strstr(opts, "noacl") == NULL)
                o->od_posix_acl = 1;
 
@@ -1280,7 +1315,7 @@ static int osd_device_init0(const struct lu_env *env,
        o->od_dt_dev.dd_ops = &osd_dt_ops;
        sema_init(&o->od_otable_sem, 1);
        INIT_LIST_HEAD(&o->od_ios_list);
-       o->od_auto_scrub_interval = AS_DEFAULT;
+       o->od_sync_on_lseek = B_TRUE;
 
        /* ZFS does not support reporting nonrotional status yet, so this flag
         * is only set if explicitly set by the user.
@@ -1342,10 +1377,8 @@ static struct lu_device *osd_device_free(const struct lu_env *env,
        /* XXX: make osd top device in order to release reference */
        d->ld_site->ls_top_dev = d;
        lu_site_purge(env, d->ld_site, -1);
-       if (!cfs_hash_is_empty(d->ld_site->ls_obj_hash)) {
-               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
-               lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer);
-       }
+       lu_site_print(env, d->ld_site, &d->ld_site->ls_obj_hash.nelems,
+                     D_ERROR, lu_cdebug_printer);
        lu_site_fini(&o->od_site);
        dt_device_fini(&o->od_dt_dev);
        OBD_FREE_PTR(o);
@@ -1360,7 +1393,7 @@ static struct lu_device *osd_device_fini(const struct lu_env *env,
        int                rc;
        ENTRY;
 
-
+       osd_index_backup(env, o, false);
        if (o->od_os) {
                osd_objset_unregister_callbacks(o);
                if (!o->od_dt_dev.dd_rdonly) {
@@ -1508,8 +1541,8 @@ static int osd_obd_disconnect(struct obd_export *exp)
 
 static int osd_fid_init(const struct lu_env *env, struct osd_device *osd)
 {
-       struct seq_server_site  *ss = osd_seq_site(osd);
-       int                     rc;
+       struct seq_server_site *ss = osd_seq_site(osd);
+       int rc = 0;
        ENTRY;
 
        if (osd->od_is_ost || osd->od_cl_seq != NULL)
@@ -1522,13 +1555,8 @@ static int osd_fid_init(const struct lu_env *env, struct osd_device *osd)
        if (osd->od_cl_seq == NULL)
                RETURN(-ENOMEM);
 
-       rc = seq_client_init(osd->od_cl_seq, NULL, LUSTRE_SEQ_METADATA,
-                            osd->od_svname, ss->ss_server_seq);
-
-       if (rc != 0) {
-               OBD_FREE_PTR(osd->od_cl_seq);
-               osd->od_cl_seq = NULL;
-       }
+       seq_client_init(osd->od_cl_seq, NULL, LUSTRE_SEQ_METADATA,
+                       osd->od_svname, ss->ss_server_seq);
 
        if (ss->ss_node_id == 0) {
                /*
@@ -1569,11 +1597,28 @@ static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
        RETURN(rc);
 }
 
-struct lu_device_operations osd_lu_ops = {
+/**
+ * Implementation of lu_device_operations::ldo_fid_alloc() for OSD
+ *
+ * Allocate FID.
+ *
+ * see include/lu_object.h for the details.
+ */
+static int osd_fid_alloc(const struct lu_env *env, struct lu_device *d,
+                        struct lu_fid *fid, struct lu_object *parent,
+                        const struct lu_name *name)
+{
+       struct osd_device *osd = osd_dev(d);
+
+       return seq_client_alloc_fid(env, osd->od_cl_seq, fid);
+}
+
+const struct lu_device_operations osd_lu_ops = {
        .ldo_object_alloc       = osd_object_alloc,
        .ldo_process_config     = osd_process_config,
        .ldo_recovery_complete  = osd_recovery_complete,
        .ldo_prepare            = osd_prepare,
+       .ldo_fid_alloc          = osd_fid_alloc,
 };
 
 static void osd_type_start(struct lu_device_type *t)
@@ -1584,15 +1629,7 @@ static void osd_type_stop(struct lu_device_type *t)
 {
 }
 
-int osd_fid_alloc(const struct lu_env *env, struct obd_export *exp,
-                 struct lu_fid *fid, struct md_op_data *op_data)
-{
-       struct osd_device *osd = osd_dev(exp->exp_obd->obd_lu_dev);
-
-       return seq_client_alloc_fid(env, osd->od_cl_seq, fid);
-}
-
-static struct lu_device_type_operations osd_device_type_ops = {
+static const struct lu_device_type_operations osd_device_type_ops = {
        .ldto_init              = osd_type_init,
        .ldto_fini              = osd_type_fini,
 
@@ -1614,11 +1651,10 @@ static struct lu_device_type osd_device_type = {
 };
 
 
-static struct obd_ops osd_obd_device_ops = {
+static const struct obd_ops osd_obd_device_ops = {
        .o_owner       = THIS_MODULE,
        .o_connect      = osd_obd_connect,
        .o_disconnect   = osd_obd_disconnect,
-       .o_fid_alloc    = osd_fid_alloc
 };
 
 static int __init osd_init(void)
@@ -1633,7 +1669,7 @@ static int __init osd_init(void)
        if (rc)
                return rc;
 
-       rc = class_register_type(&osd_obd_device_ops, NULL, true, NULL,
+       rc = class_register_type(&osd_obd_device_ops, NULL, true,
                                 LUSTRE_OSD_ZFS_NAME, &osd_device_type);
        if (rc)
                lu_kmem_fini(osd_caches);