Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 6e582c1..061e22c 100644 (file)
@@ -72,6 +72,7 @@
 #endif
 #include <lustre_acl.h>
 #include <lustre_param.h>
+#include <lustre_fsfilt.h>
 
 mdl_mode_t mdt_mdl_lock_modes[] = {
         [LCK_MINMODE] = MDL_MINMODE,
@@ -4097,6 +4098,54 @@ out:
         return rc;
 }
 
+/**
+ * setup CONFIG_ORIG context, used to access local config log.
+ * this may need to be rewrite as part of llog rewrite for lu-api.
+ */
+static int mdt_obd_llog_setup(struct obd_device *obd,
+                              struct lustre_sb_info *lsi)
+{
+        int     rc;
+
+        LASSERT(obd->obd_fsops == NULL);
+
+        obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
+        if (IS_ERR(obd->obd_fsops))
+                return (int) PTR_ERR(obd->obd_fsops);
+
+        rc = fsfilt_setup(obd, lsi->lsi_srv_mnt->mnt_sb);
+        if (rc) {
+                fsfilt_put_ops(obd->obd_fsops);
+                return rc;
+        }
+
+        OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
+        obd->obd_lvfs_ctxt.pwdmnt = lsi->lsi_srv_mnt;
+        obd->obd_lvfs_ctxt.pwd = lsi->lsi_srv_mnt->mnt_root;
+        obd->obd_lvfs_ctxt.fs = get_ds();
+
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd,
+                        0, NULL, &llog_lvfs_ops);
+        if (rc) {
+                CERROR("llog setup failed: %d\n", rc);
+                fsfilt_put_ops(obd->obd_fsops);
+        }
+
+        return rc;
+}
+
+static void mdt_obd_llog_cleanup(struct obd_device *obd)
+{
+        struct llog_ctxt *ctxt;
+
+        ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+        if (ctxt)
+                llog_cleanup(ctxt);
+
+        if (obd->obd_fsops)
+                fsfilt_put_ops(obd->obd_fsops);
+}
+
 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 {
         struct md_device *next = m->mdt_child;
@@ -4131,6 +4180,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 
         target_recovery_fini(obd);
         mdt_stop_ptlrpc_service(m);
+        mdt_obd_llog_cleanup(obd);
         obd_zombie_barrier();
 #ifdef HAVE_QUOTA_SUPPORT
         next->md_ops->mdo_quota.mqo_cleanup(env, next);
@@ -4176,6 +4226,30 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
         EXIT;
 }
 
+static int mdt_adapt_sptlrpc_conf(struct obd_device *obd, int initial)
+{
+        struct mdt_device       *m = mdt_dev(obd->obd_lu_dev);
+        struct sptlrpc_rule_set  tmp_rset;
+        int                      rc;
+
+        sptlrpc_rule_set_init(&tmp_rset);
+        rc = sptlrpc_conf_target_get_rules(obd, &tmp_rset, initial);
+        if (rc) {
+                CERROR("mdt %s: failed get sptlrpc rules: %d\n",
+                       obd->obd_name, rc);
+                return rc;
+        }
+
+        sptlrpc_target_update_exp_flavor(obd, &tmp_rset);
+
+        write_lock(&m->mdt_sptlrpc_lock);
+        sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset);
+        m->mdt_sptlrpc_rset = tmp_rset;
+        write_unlock(&m->mdt_sptlrpc_lock);
+
+        return 0;
+}
+
 static void fsoptions_to_mdt_flags(struct mdt_device *m, char *options)
 {
         char *p = options;
@@ -4404,11 +4478,17 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         if (rc)
                 GOTO(err_capa, rc);
 
+        rc = mdt_obd_llog_setup(obd, lsi);
+        if (rc)
+                GOTO(err_fs_cleanup, rc);
+
+        mdt_adapt_sptlrpc_conf(obd, 1);
+
 #ifdef HAVE_QUOTA_SUPPORT
         next = m->mdt_child;
         rc = next->md_ops->mdo_quota.mqo_setup(env, next, lmi->lmi_mnt);
         if (rc)
-                GOTO(err_fs_cleanup, rc);
+                GOTO(err_llog_cleanup, rc);
 #endif
 
         server_put_mount_2(dev, lmi->lmi_mnt);
@@ -4446,8 +4526,10 @@ err_recovery:
         target_recovery_fini(obd);
 #ifdef HAVE_QUOTA_SUPPORT
         next->md_ops->mdo_quota.mqo_cleanup(env, next);
-err_fs_cleanup:
+err_llog_cleanup:
 #endif
+        mdt_obd_llog_cleanup(obd);
+err_fs_cleanup:
         mdt_fs_cleanup(env, m);
 err_capa:
         cfs_timer_disarm(&m->mdt_ck_timer);
@@ -4490,34 +4572,6 @@ static int mdt_process_config(const struct lu_env *env,
         ENTRY;
 
         switch (cfg->lcfg_command) {
-        case LCFG_SPTLRPC_CONF: {
-                struct sptlrpc_conf_log *log;
-                struct sptlrpc_rule_set  tmp_rset;
-
-                log = sptlrpc_conf_log_extract(cfg);
-                if (IS_ERR(log)) {
-                        rc = PTR_ERR(log);
-                        break;
-                }
-
-                sptlrpc_rule_set_init(&tmp_rset);
-
-                rc = sptlrpc_rule_set_from_log(&tmp_rset, log);
-                if (rc) {
-                        CERROR("mdt %p: failed get sptlrpc rules: %d\n", m, rc);
-                        break;
-                }
-
-                write_lock(&m->mdt_sptlrpc_lock);
-                sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset);
-                m->mdt_sptlrpc_rset = tmp_rset;
-                write_unlock(&m->mdt_sptlrpc_lock);
-
-                sptlrpc_target_update_exp_flavor(
-                                md2lu_dev(&m->mdt_md_dev)->ld_obd, &tmp_rset);
-
-                break;
-        }
         case LCFG_PARAM: {
                 struct lprocfs_static_vars lvars;
                 struct obd_device *obd = d->ld_obd;
@@ -4634,6 +4688,25 @@ static const struct lu_object_operations mdt_obj_ops = {
         .loo_object_free    = mdt_object_free
 };
 
+static int mdt_obd_set_info_async(struct obd_export *exp,
+                                  __u32 keylen, void *key,
+                                  __u32 vallen, void *val,
+                                  struct ptlrpc_request_set *set)
+{
+        struct obd_device     *obd = exp->exp_obd;
+        int                    rc;
+        ENTRY;
+
+        LASSERT(obd);
+
+        if (KEY_IS(KEY_SPTLRPC_CONF)) {
+                rc = mdt_adapt_sptlrpc_conf(obd, 0);
+                RETURN(rc);
+        }
+
+        RETURN(0);
+}
+
 /* mds_connect_internal */
 static int mdt_connect_internal(struct obd_export *exp,
                                 struct mdt_device *mdt,
@@ -4681,6 +4754,49 @@ static int mdt_connect_internal(struct obd_export *exp,
         return 0;
 }
 
+static int mdt_connect_check_sptlrpc(struct mdt_device *mdt,
+                                     struct obd_export *exp,
+                                     struct ptlrpc_request *req)
+{
+        struct sptlrpc_flavor   flvr;
+        int                     rc = 0;
+
+        if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
+                read_lock(&mdt->mdt_sptlrpc_lock);
+                sptlrpc_target_choose_flavor(&mdt->mdt_sptlrpc_rset,
+                                             req->rq_sp_from,
+                                             req->rq_peer.nid,
+                                             &flvr);
+                read_unlock(&mdt->mdt_sptlrpc_lock);
+
+                spin_lock(&exp->exp_lock);
+
+                exp->exp_sp_peer = req->rq_sp_from;
+                exp->exp_flvr = flvr;
+
+                if (exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
+                        CERROR("unauthorized rpc flavor %x from %s, "
+                               "expect %x\n", req->rq_flvr.sf_rpc,
+                               libcfs_nid2str(req->rq_peer.nid),
+                               exp->exp_flvr.sf_rpc);
+                        rc = -EACCES;
+                }
+
+                spin_unlock(&exp->exp_lock);
+        } else {
+                if (exp->exp_sp_peer != req->rq_sp_from) {
+                        CERROR("RPC source %s doesn't match %s\n",
+                               sptlrpc_part2name(req->rq_sp_from),
+                               sptlrpc_part2name(exp->exp_sp_peer));
+                        rc = -EACCES;
+                } else {
+                        rc = sptlrpc_target_export_check(exp, req);
+                }
+        }
+
+        return rc;
+}
+
 /* mds_connect copy */
 static int mdt_obd_connect(const struct lu_env *env,
                            struct lustre_handle *conn, struct obd_device *obd,
@@ -4711,25 +4827,9 @@ static int mdt_obd_connect(const struct lu_env *env,
         exp = class_conn2export(conn);
         LASSERT(exp != NULL);
 
-        CDEBUG(D_SEC, "from %s\n", sptlrpc_part2name(req->rq_sp_from));
-
-        spin_lock(&exp->exp_lock);
-        exp->exp_sp_peer = req->rq_sp_from;
-
-        read_lock(&mdt->mdt_sptlrpc_lock);
-        sptlrpc_rule_set_choose(&mdt->mdt_sptlrpc_rset, exp->exp_sp_peer,
-                                req->rq_peer.nid, &exp->exp_flvr);
-        read_unlock(&mdt->mdt_sptlrpc_lock);
-
-        if (exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
-                CERROR("invalid rpc flavor %x, expect %x, from %s\n",
-                       req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
-                       libcfs_nid2str(req->rq_peer.nid));
-                exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
-                spin_unlock(&exp->exp_lock);
-                RETURN(-EACCES);
-        }
-        spin_unlock(&exp->exp_lock);
+        rc = mdt_connect_check_sptlrpc(mdt, exp, req);
+        if (rc)
+                GOTO(out, rc);
 
         rc = mdt_connect_internal(exp, mdt, data);
         if (rc == 0) {
@@ -4753,6 +4853,7 @@ static int mdt_obd_connect(const struct lu_env *env,
                         rc = -ENOMEM;
         }
 
+out:
         if (rc != 0)
                 class_disconnect(exp);
         else
@@ -4780,28 +4881,9 @@ static int mdt_obd_reconnect(const struct lu_env *env,
         req = info->mti_pill->rc_req;
         mdt = mdt_dev(obd->obd_lu_dev);
 
-        CDEBUG(D_SEC, "from %s\n", sptlrpc_part2name(req->rq_sp_from));
-
-        spin_lock(&exp->exp_lock);
-        if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
-                exp->exp_sp_peer = req->rq_sp_from;
-
-                read_lock(&mdt->mdt_sptlrpc_lock);
-                sptlrpc_rule_set_choose(&mdt->mdt_sptlrpc_rset,
-                                        exp->exp_sp_peer,
-                                        req->rq_peer.nid, &exp->exp_flvr);
-                read_unlock(&mdt->mdt_sptlrpc_lock);
-
-                if (exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
-                        CERROR("invalid rpc flavor %x, expect %x, from %s\n",
-                               req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
-                               libcfs_nid2str(req->rq_peer.nid));
-                        exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
-                        spin_unlock(&exp->exp_lock);
-                        RETURN(-EACCES);
-                }
-        }
-        spin_unlock(&exp->exp_lock);
+        rc = mdt_connect_check_sptlrpc(mdt, exp, req);
+        if (rc)
+                RETURN(rc);
 
         rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
         if (rc == 0)
@@ -5112,6 +5194,7 @@ int mdt_obd_postrecov(struct obd_device *obd)
 
 static struct obd_ops mdt_obd_device_ops = {
         .o_owner          = THIS_MODULE,
+        .o_set_info_async = mdt_obd_set_info_async,
         .o_connect        = mdt_obd_connect,
         .o_reconnect      = mdt_obd_reconnect,
         .o_disconnect     = mdt_obd_disconnect,