Whamcloud - gitweb
LU-2875 mdt: use lu_name for rr_name and rr_tgt
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 1d384e8..bd23719 100644 (file)
@@ -164,27 +164,28 @@ void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm)
         lh->mlh_type = MDT_REG_LOCK;
 }
 
-void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm,
-                       const char *name, int namelen)
+void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lock_mode,
+                      const struct lu_name *lname)
 {
-        lh->mlh_reg_mode = lm;
-       lh->mlh_rreg_mode = lm;
-        lh->mlh_type = MDT_PDO_LOCK;
+       lh->mlh_reg_mode = lock_mode;
+       lh->mlh_rreg_mode = lock_mode;
+       lh->mlh_type = MDT_PDO_LOCK;
 
-        if (name != NULL && (name[0] != '\0')) {
-                LASSERT(namelen > 0);
-                lh->mlh_pdo_hash = full_name_hash(name, namelen);
+       if (lu_name_is_valid(lname)) {
+               lh->mlh_pdo_hash = full_name_hash(lname->ln_name,
+                                                 lname->ln_namelen);
                /* XXX Workaround for LU-2856
-                * Zero is a valid return value of full_name_hash, but several
-                * users of mlh_pdo_hash assume a non-zero hash value. We
-                * therefore map zero onto an arbitrary, but consistent
-                * value (1) to avoid problems further down the road. */
-               if (unlikely(!lh->mlh_pdo_hash))
+                *
+                * Zero is a valid return value of full_name_hash, but
+                * several users of mlh_pdo_hash assume a non-zero
+                * hash value. We therefore map zero onto an
+                * arbitrary, but consistent value (1) to avoid
+                * problems further down the road. */
+               if (unlikely(lh->mlh_pdo_hash == 0))
                        lh->mlh_pdo_hash = 1;
-        } else {
-                LASSERT(namelen == 0);
-                lh->mlh_pdo_hash = 0ull;
-        }
+       } else {
+               lh->mlh_pdo_hash = 0;
+       }
 }
 
 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
@@ -1221,8 +1222,6 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         struct md_object       *next      = mdt_object_child(parent);
         struct lu_fid          *child_fid = &info->mti_tmp_fid1;
         struct lu_name         *lname     = NULL;
-        const char             *name      = NULL;
-        int                     namelen   = 0;
         struct mdt_lock_handle *lhp       = NULL;
         struct ldlm_lock       *lock;
         struct ldlm_res_id     *res_id;
@@ -1237,43 +1236,40 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                      lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT));
 
         LASSERT(parent != NULL);
-        name = req_capsule_client_get(info->mti_pill, &RMF_NAME);
-        if (name == NULL)
-                RETURN(err_serious(-EFAULT));
 
-        namelen = req_capsule_get_size(info->mti_pill, &RMF_NAME,
-                                       RCL_CLIENT) - 1;
-        if (!info->mti_cross_ref) {
-                /*
-                 * XXX: Check for "namelen == 0" is for getattr by fid
-                 * (OBD_CONNECT_ATTRFID), otherwise do not allow empty name,
-                 * that is the name must contain at least one character and
-                 * the terminating '\0'
-                 */
-                if (namelen == 0) {
-                        reqbody = req_capsule_client_get(info->mti_pill,
-                                                         &RMF_MDT_BODY);
-                        if (unlikely(reqbody == NULL))
-                                RETURN(err_serious(-EFAULT));
-
-                        if (unlikely(!fid_is_sane(&reqbody->fid2)))
-                                RETURN(err_serious(-EINVAL));
-
-                        name = NULL;
-                        CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
-                               "ldlm_rep = %p\n",
-                               PFID(mdt_object_fid(parent)),
-                               PFID(&reqbody->fid2), ldlm_rep);
-                } else {
-                        lname = mdt_name(info->mti_env, (char *)name, namelen);
-                        CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, "
-                               "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
-                               name, ldlm_rep);
-                }
-        }
+       lname = &info->mti_name;
+       mdt_name_unpack(info->mti_pill, &RMF_NAME, lname, MNF_FIX_ANON);
+
+       if (!info->mti_cross_ref) {
+               /*
+                * XXX: Check for anonymous name is for getattr by fid
+                * (OBD_CONNECT_ATTRFID), otherwise do not allow empty name,
+                * that is the name must contain at least one character and
+                * the terminating '\0'.
+                */
+               if (!lu_name_is_valid(lname)) {
+                       reqbody = req_capsule_client_get(info->mti_pill,
+                                                        &RMF_MDT_BODY);
+                       if (unlikely(reqbody == NULL))
+                               RETURN(err_serious(-EFAULT));
+
+                       if (unlikely(!fid_is_sane(&reqbody->fid2)))
+                               RETURN(err_serious(-EINVAL));
+
+                       CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
+                              "ldlm_rep = %p\n",
+                              PFID(mdt_object_fid(parent)),
+                              PFID(&reqbody->fid2), ldlm_rep);
+               } else {
+                       CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
+                              "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
+                              PNAME(lname), ldlm_rep);
+               }
+       }
+
         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
 
-       if (unlikely(!mdt_object_exists(parent)) && lname) {
+       if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) {
                LU_OBJECT_DEBUG(D_INODE, info->mti_env,
                                &parent->mot_obj,
                                "Parent doesn't exist!\n");
@@ -1283,7 +1279,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         "Parent "DFID" is on remote server\n",
                         PFID(mdt_object_fid(parent)));
        }
-        if (lname) {
+
+       if (lu_name_is_valid(lname)) {
                 rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
                 if (rc != 0) {
                         if (rc > 0)
@@ -1309,21 +1306,30 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         LDLM_LOCK_PUT(lock);
                         rc = 0;
                 } else {
-                        mdt_lock_handle_init(lhc);
-                        mdt_lock_reg_init(lhc, LCK_PR);
+                       mdt_lock_handle_init(lhc);
+                       mdt_lock_reg_init(lhc, LCK_PR);
 
-                        /*
-                         * Object's name is on another MDS, no lookup lock is
-                         * needed here but update is.
-                         */
-                        child_bits &= ~MDS_INODELOCK_LOOKUP;
+                       /*
+                        * Object's name is on another MDS, no lookup or layout
+                        * lock is needed here but update lock is.
+                        */
+                       child_bits &= ~(MDS_INODELOCK_LOOKUP |
+                                       MDS_INODELOCK_LAYOUT);
                        child_bits |= MDS_INODELOCK_PERM | MDS_INODELOCK_UPDATE;
 
                        rc = mdt_object_lock(info, child, lhc, child_bits,
-                                             MDT_LOCAL_LOCK);
-                }
+                                            MDT_LOCAL_LOCK);
+               }
                 if (rc == 0) {
                         /* Finally, we can get attr for child. */
+                       if (!mdt_object_exists(child)) {
+                               LU_OBJECT_DEBUG(D_INFO, info->mti_env,
+                                               &child->mot_obj,
+                                            "remote object doesn't exist.\n");
+                                mdt_object_unlock(info, child, lhc, 1);
+                               RETURN(-ENOENT);
+                       }
+
                         mdt_set_capainfo(info, 0, mdt_object_fid(child),
                                          BYPASS_CAPA);
                         rc = mdt_getattr_internal(info, child, 0);
@@ -1333,11 +1339,11 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 RETURN(rc);
         }
 
-        if (lname) {
-                /* step 1: lock parent only if parent is a directory */
+       if (lu_name_is_valid(lname)) {
+               /* step 1: lock parent only if parent is a directory */
                if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
-                        lhp = &info->mti_lh[MDT_LH_PARENT];
-                        mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
+                       lhp = &info->mti_lh[MDT_LH_PARENT];
+                       mdt_lock_pdo_init(lhp, LCK_PR, lname);
                         rc = mdt_object_lock(info, parent, lhp,
                                              MDS_INODELOCK_UPDATE,
                                              MDT_LOCAL_LOCK);
@@ -1448,7 +1454,7 @@ relock:
                if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
                    exp_connect_layout(info->mti_exp) &&
                    S_ISREG(lu_object_attr(&child->mot_obj)) &&
-                   ldlm_rep != NULL) {
+                   !mdt_object_remote(child) && ldlm_rep != NULL) {
                        /* try to grant layout lock for regular file. */
                        try_layout = true;
                }
@@ -2280,11 +2286,13 @@ static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
 
        if (mdt_object_remote(o)) {
                 if (locality == MDT_CROSS_LOCK) {
-                       ibits &= ~(MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM);
+                       ibits &= ~(MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM |
+                                  MDS_INODELOCK_LAYOUT);
                         ibits |= MDS_INODELOCK_LOOKUP;
                 } else {
                        LASSERTF(!(ibits &
-                                 (MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM)),
+                                (MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM |
+                                 MDS_INODELOCK_LAYOUT)),
                                "%s: wrong bit "LPX64" for remote obj "DFID"\n",
                                mdt_obd_name(info->mti_mdt), ibits,
                                PFID(mdt_object_fid(o)));
@@ -3390,97 +3398,201 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
        RETURN(rc);
 }
 
-static int mdt_seq_fini(const struct lu_env *env,
-                        struct mdt_device *m)
+static void mdt_deregister_seq_exp(struct mdt_device *mdt)
 {
-       return seq_site_fini(env, mdt_seq_site(m));
+       struct seq_server_site  *ss = mdt_seq_site(mdt);
+
+       if (ss->ss_node_id == 0)
+               return;
+
+       if (ss->ss_client_seq != NULL) {
+               lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
+               ss->ss_client_seq->lcs_exp = NULL;
+       }
+
+       if (ss->ss_server_fld != NULL) {
+               lustre_deregister_lwp_item(&ss->ss_server_fld->lsf_control_exp);
+               ss->ss_server_fld->lsf_control_exp = NULL;
+       }
 }
 
-static int mdt_seq_init(const struct lu_env *env,
-                        const char *uuid,
-                        struct mdt_device *m)
+static void mdt_seq_fini_cli(struct mdt_device *mdt)
 {
-       struct seq_server_site *ss;
-       char *prefix;
-       int rc;
+       struct seq_server_site *ss = mdt_seq_site(mdt);
+
+       if (ss == NULL)
+               return;
+
+       if (ss->ss_server_seq == NULL)
+               seq_server_set_cli(NULL, ss->ss_server_seq, NULL);
+
+       return;
+}
+
+static int mdt_seq_fini(const struct lu_env *env, struct mdt_device *mdt)
+{
+       mdt_seq_fini_cli(mdt);
+       mdt_deregister_seq_exp(mdt);
+
+       return seq_site_fini(env, mdt_seq_site(mdt));
+}
+
+/**
+ * It will retrieve its FLDB entries from MDT0, and it only happens
+ * when upgrading existent FS to 2.6 or when local FLDB is corrupted,
+ * and it needs to refresh FLDB from the MDT0.
+ **/
+static int mdt_register_lwp_callback(void *data)
+{
+       struct lu_env           env;
+       struct mdt_device       *mdt = data;
+       struct lu_server_fld    *fld = mdt_seq_site(mdt)->ss_server_fld;
+       int                     rc;
        ENTRY;
 
-       ss = mdt_seq_site(m);
+       LASSERT(mdt_seq_site(mdt)->ss_node_id != 0);
 
-       /*
-        * This is sequence-controller node. Init seq-controller server on local
-        * MDT.
-        */
-       if (ss->ss_node_id == 0) {
-               LASSERT(ss->ss_control_seq == NULL);
+       if (!likely(fld->lsf_new))
+               RETURN(0);
 
-               OBD_ALLOC_PTR(ss->ss_control_seq);
-               if (ss->ss_control_seq == NULL)
-                       RETURN(-ENOMEM);
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: cannot init env: rc = %d\n", mdt_obd_name(mdt), rc);
+               RETURN(rc);
+       }
 
-               rc = seq_server_init(ss->ss_control_seq,
-                                    m->mdt_bottom, uuid,
-                                    LUSTRE_SEQ_CONTROLLER,
-                                    ss,
-                                    env);
+       rc = fld_update_from_controller(&env, fld);
+       if (rc != 0) {
+               CERROR("%s: cannot update controller: rc = %d\n",
+                      mdt_obd_name(mdt), rc);
+               GOTO(out, rc);
+       }
+out:
+       lu_env_fini(&env);
+       RETURN(rc);
+}
 
-               if (rc)
-                       GOTO(out_seq_fini, rc);
+static int mdt_register_seq_exp(struct mdt_device *mdt)
+{
+       struct seq_server_site  *ss = mdt_seq_site(mdt);
+       char                    *lwp_name = NULL;
+       int                     rc;
 
-               OBD_ALLOC_PTR(ss->ss_client_seq);
-               if (ss->ss_client_seq == NULL)
-                       GOTO(out_seq_fini, rc = -ENOMEM);
+       if (ss->ss_node_id == 0)
+               return 0;
 
-               OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
-               if (prefix == NULL) {
-                       OBD_FREE_PTR(ss->ss_client_seq);
-                       GOTO(out_seq_fini, rc = -ENOMEM);
-               }
+       OBD_ALLOC(lwp_name, MAX_OBD_NAME);
+       if (lwp_name == NULL)
+               GOTO(out_free, rc = -ENOMEM);
 
-               snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
-                        uuid);
+       rc = tgt_name2lwpname(mdt_obd_name(mdt), lwp_name);
+       if (rc != 0)
+               GOTO(out_free, rc);
 
-               /*
-                * Init seq-controller client after seq-controller server is
-                * ready. Pass ss->ss_control_seq to it for direct talking.
-                */
-               rc = seq_client_init(ss->ss_client_seq, NULL,
-                                    LUSTRE_SEQ_METADATA, prefix,
-                                    ss->ss_control_seq);
-               OBD_FREE(prefix, MAX_OBD_NAME + 5);
+       rc = lustre_register_lwp_item(lwp_name, &ss->ss_client_seq->lcs_exp,
+                                     NULL, NULL);
+       if (rc != 0)
+               GOTO(out_free, rc);
+
+       rc = lustre_register_lwp_item(lwp_name,
+                                     &ss->ss_server_fld->lsf_control_exp,
+                                     mdt_register_lwp_callback, mdt);
+       if (rc != 0) {
+               lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
+               ss->ss_client_seq->lcs_exp = NULL;
+               GOTO(out_free, rc);
+       }
+out_free:
+       if (lwp_name != NULL)
+               OBD_FREE(lwp_name, MAX_OBD_NAME);
+
+       return rc;
+}
+
+/*
+ * Init client sequence manager which is used by local MDS to talk to sequence
+ * controller on remote node.
+ */
+static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt)
+{
+       struct seq_server_site  *ss = mdt_seq_site(mdt);
+       int                     rc;
+       char                    *prefix;
+       ENTRY;
+
+       /* check if this is adding the first MDC and controller is not yet
+        * initialized. */
+       OBD_ALLOC_PTR(ss->ss_client_seq);
+       if (ss->ss_client_seq == NULL)
+               RETURN(-ENOMEM);
+
+       OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+       if (prefix == NULL) {
+               OBD_FREE_PTR(ss->ss_client_seq);
+               ss->ss_client_seq = NULL;
+               RETURN(-ENOMEM);
+       }
+
+       /* Note: seq_client_fini will be called in seq_site_fini */
+       snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", mdt_obd_name(mdt));
+       rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA,
+                            prefix, ss->ss_node_id == 0 ?  ss->ss_control_seq :
+                                                           NULL);
+       OBD_FREE(prefix, MAX_OBD_NAME + 5);
+       if (rc != 0) {
+               OBD_FREE_PTR(ss->ss_client_seq);
+               ss->ss_client_seq = NULL;
+               RETURN(rc);
+       }
+
+       rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq);
+
+       RETURN(rc);
+}
 
+static int mdt_seq_init(const struct lu_env *env, struct mdt_device *mdt)
+{
+       struct seq_server_site  *ss;
+       int                     rc;
+       ENTRY;
+
+       ss = mdt_seq_site(mdt);
+       /* init sequence controller server(MDT0) */
+       if (ss->ss_node_id == 0) {
+               OBD_ALLOC_PTR(ss->ss_control_seq);
+               if (ss->ss_control_seq == NULL)
+                       RETURN(-ENOMEM);
+
+               rc = seq_server_init(env, ss->ss_control_seq, mdt->mdt_bottom,
+                                    mdt_obd_name(mdt), LUSTRE_SEQ_CONTROLLER,
+                                    ss);
                if (rc)
                        GOTO(out_seq_fini, rc);
        }
 
-       /* Init seq-server on local MDT */
-       LASSERT(ss->ss_server_seq == NULL);
-
+       /* Init normal sequence server */
        OBD_ALLOC_PTR(ss->ss_server_seq);
        if (ss->ss_server_seq == NULL)
                GOTO(out_seq_fini, rc = -ENOMEM);
 
-       rc = seq_server_init(ss->ss_server_seq,
-                            m->mdt_bottom, uuid,
-                            LUSTRE_SEQ_SERVER,
-                            ss,
-                            env);
+       rc = seq_server_init(env, ss->ss_server_seq, mdt->mdt_bottom,
+                            mdt_obd_name(mdt), LUSTRE_SEQ_SERVER, ss);
        if (rc)
-               GOTO(out_seq_fini, rc = -ENOMEM);
+               GOTO(out_seq_fini, rc);
 
-       /* Assign seq-controller client to local seq-server. */
-       if (ss->ss_node_id == 0) {
-               LASSERT(ss->ss_client_seq != NULL);
+       /* init seq client for seq server to talk to seq controller(MDT0) */
+       rc = mdt_seq_init_cli(env, mdt);
+       if (rc != 0)
+               GOTO(out_seq_fini, rc);
 
-               rc = seq_server_set_cli(ss->ss_server_seq,
-                                       ss->ss_client_seq,
-                                       env);
-       }
+       if (ss->ss_node_id != 0)
+               /* register controler export through lwp */
+               rc = mdt_register_seq_exp(mdt);
 
        EXIT;
 out_seq_fini:
        if (rc)
-               mdt_seq_fini(env, m);
+               mdt_seq_fini(env, mdt);
 
        return rc;
 }
@@ -3518,7 +3630,7 @@ static int mdt_fld_init(const struct lu_env *env,
                RETURN(rc = -ENOMEM);
 
        rc = fld_server_init(env, ss->ss_server_fld, m->mdt_bottom, uuid,
-                            ss->ss_node_id, LU_SEQ_RANGE_MDT);
+                            LU_SEQ_RANGE_MDT);
        if (rc) {
                OBD_FREE_PTR(ss->ss_server_fld);
                ss->ss_server_fld = NULL;
@@ -4239,7 +4351,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        if (rc)
                GOTO(err_fini_stack, rc);
 
-       rc = mdt_seq_init(env, mdt_obd_name(m), m);
+       rc = mdt_seq_init(env, m);
        if (rc)
                GOTO(err_fini_fld, rc);
 
@@ -4763,7 +4875,9 @@ static int mdt_obd_connect(const struct lu_env *env,
         */
        if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state) && data != NULL &&
            !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
-               rc = obd_health_check(env, mdt->mdt_child_exp->exp_obd);
+               rc = obd_get_info(env, mdt->mdt_child_exp,
+                                 sizeof(KEY_OSP_CONNECTED),
+                                 KEY_OSP_CONNECTED, NULL, NULL, NULL);
                if (rc)
                        RETURN(-EAGAIN);
                set_bit(MDT_FL_SYNCED, &mdt->mdt_state);
@@ -5632,9 +5746,11 @@ static int __init mdt_mod_init(void)
                GOTO(lu_fini, rc);
 
        lprocfs_mdt_init_vars(&lvars);
-       rc = class_register_type(&mdt_obd_device_ops, NULL,
-                                lvars.module_vars, LUSTRE_MDT_NAME,
-                                &mdt_device_type);
+       rc = class_register_type(&mdt_obd_device_ops, NULL, NULL,
+#ifndef HAVE_ONLY_PROCFS_SEQ
+                               lvars.module_vars,
+#endif
+                               LUSTRE_MDT_NAME, &mdt_device_type);
        if (rc)
                GOTO(mds_fini, rc);
 lu_fini: