Whamcloud - gitweb
LU-2730 mdt: fix erroneous LASSERT in mdt_reint_opcode
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 3ed1784..02dcdb8 100644 (file)
@@ -427,7 +427,7 @@ void mdt_client_compatibility(struct mdt_thread_info *info)
         struct lu_attr        *la = &ma->ma_attr;
         ENTRY;
 
-       if (exp_connect_flags(exp) & OBD_CONNECT_LAYOUTLOCK)
+       if (exp_connect_layout(exp))
                /* the client can deal with 16-bit lmm_stripe_count */
                RETURN_EXIT;
 
@@ -686,6 +686,10 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 
        if (mdt_object_remote(o)) {
                /* This object is located on remote node.*/
+               /* Return -EIO for old client */
+               if (!mdt_is_dne_client(req->rq_export))
+                       GOTO(out, rc = -EIO);
+
                repbody->fid1 = *mdt_object_fid(o);
                repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
                GOTO(out, rc = 0);
@@ -1006,6 +1010,95 @@ int mdt_is_subdir(struct mdt_thread_info *info)
        RETURN(rc);
 }
 
+int mdt_swap_layouts(struct mdt_thread_info *info)
+{
+       struct ptlrpc_request   *req = mdt_info_req(info);
+       struct obd_export       *exp = req->rq_export;
+       struct mdt_object       *o1, *o2, *o;
+       struct mdt_lock_handle  *lh1, *lh2;
+       struct mdc_swap_layouts *msl;
+       int                      rc;
+       ENTRY;
+
+       /* client does not support layout lock, so layout swaping
+        * is disabled.
+        * FIXME: there is a problem for old clients which don't support
+        * layout lock yet. If those clients have already opened the file
+        * they won't be notified at all so that old layout may still be
+        * used to do IO. This can be fixed after file release is landed by
+        * doing exclusive open and taking full EX ibits lock. - Jinshan */
+       if (!exp_connect_layout(exp))
+               RETURN(-EOPNOTSUPP);
+
+       if (req_capsule_get_size(info->mti_pill, &RMF_CAPA1, RCL_CLIENT))
+               mdt_set_capainfo(info, 0, &info->mti_body->fid1,
+                                req_capsule_client_get(info->mti_pill,
+                                                       &RMF_CAPA1));
+
+       if (req_capsule_get_size(info->mti_pill, &RMF_CAPA2, RCL_CLIENT))
+               mdt_set_capainfo(info, 1, &info->mti_body->fid2,
+                                req_capsule_client_get(info->mti_pill,
+                                                       &RMF_CAPA2));
+
+       o1 = info->mti_object;
+       o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
+                               &info->mti_body->fid2);
+       if (IS_ERR(o))
+               GOTO(out, rc = PTR_ERR(o));
+
+       if (mdt_object_remote(o) || !mdt_object_exists(o)) /* remote object */
+               GOTO(put, rc = -ENOENT);
+
+       rc = lu_fid_cmp(&info->mti_body->fid1, &info->mti_body->fid2);
+       if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
+               GOTO(put, rc);
+
+       if (rc < 0)
+               swap(o1, o2);
+
+       /* permission check. Make sure the calling process having permission
+        * to write both files. */
+       rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
+                               MAY_WRITE);
+       if (rc < 0)
+               GOTO(put, rc);
+
+       rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
+                               MAY_WRITE);
+       if (rc < 0)
+               GOTO(put, rc);
+
+       msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
+       LASSERT(msl != NULL);
+
+       lh1 = &info->mti_lh[MDT_LH_NEW];
+       mdt_lock_reg_init(lh1, LCK_EX);
+       lh2 = &info->mti_lh[MDT_LH_OLD];
+       mdt_lock_reg_init(lh2, LCK_EX);
+
+       rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT,
+                            MDT_LOCAL_LOCK);
+       if (rc < 0)
+               GOTO(put, rc);
+
+       rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT,
+                            MDT_LOCAL_LOCK);
+       if (rc < 0)
+               GOTO(unlock1, rc);
+
+       rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
+                            mdt_object_child(o2), msl->msl_flags);
+       GOTO(unlock2, rc);
+unlock2:
+       mdt_object_unlock(info, o2, lh2, rc);
+unlock1:
+       mdt_object_unlock(info, o1, lh1, rc);
+put:
+       mdt_object_put(info->mti_env, o);
+out:
+       RETURN(rc);
+}
+
 static int mdt_raw_lookup(struct mdt_thread_info *info,
                           struct mdt_object *parent,
                           const struct lu_name *lname,
@@ -1516,10 +1609,10 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         int                      rc;
         ENTRY;
 
-        desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
-                                    MDS_BULK_PORTAL);
-        if (desc == NULL)
-                RETURN(-ENOMEM);
+       desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
+                                   MDS_BULK_PORTAL);
+       if (desc == NULL)
+               RETURN(-ENOMEM);
 
        if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE))
                /* old client requires reply size in it's PAGE_SIZE,
@@ -1572,7 +1665,7 @@ int mdt_readpage(struct mdt_thread_info *info)
        if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_64BITHASH)
                rdpg->rp_attrs |= LUDA_64BITHASH;
        rdpg->rp_count  = min_t(unsigned int, reqbody->nlink,
-                               exp_brw_size(info->mti_exp));
+                               exp_max_brw_size(info->mti_exp));
         rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1) >>
                           CFS_PAGE_SHIFT;
         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
@@ -1677,24 +1770,28 @@ out_shrink:
 }
 
 static long mdt_reint_opcode(struct mdt_thread_info *info,
-                             const struct req_format **fmt)
-{
-        struct mdt_rec_reint *rec;
-        long opc;
-
-        opc = err_serious(-EFAULT);
-        rec = req_capsule_client_get(info->mti_pill, &RMF_REC_REINT);
-        if (rec != NULL) {
-                opc = rec->rr_opcode;
-                DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
-                if (opc < REINT_MAX && fmt[opc] != NULL)
-                        req_capsule_extend(info->mti_pill, fmt[opc]);
-                else {
-                        CERROR("Unsupported opc: %ld\n", opc);
-                        opc = err_serious(opc);
-                }
-        }
-        return opc;
+                            const struct req_format **fmt)
+{
+       struct mdt_rec_reint *rec;
+       long opc;
+
+       rec = req_capsule_client_get(info->mti_pill, &RMF_REC_REINT);
+       if (rec != NULL) {
+               opc = rec->rr_opcode;
+               DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
+               if (opc < REINT_MAX && fmt[opc] != NULL)
+                       req_capsule_extend(info->mti_pill, fmt[opc]);
+               else {
+                       CERROR("%s: Unsupported opcode '%ld' from client '%s': "
+                              "rc = %d\n", mdt_obd_name(info->mti_mdt), opc,
+                              info->mti_mdt->mdt_ldlm_client->cli_name,
+                              -EFAULT);
+                       opc = err_serious(-EFAULT);
+               }
+       } else {
+               opc = err_serious(-EFAULT);
+       }
+       return opc;
 }
 
 int mdt_reint(struct mdt_thread_info *info)
@@ -2006,7 +2103,7 @@ int mdt_obd_idx_read(struct mdt_thread_info *info)
        if (req_ii->ii_count <= 0)
                GOTO(out, rc = -EFAULT);
        rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT,
-                              exp_brw_size(info->mti_exp));
+                              exp_max_brw_size(info->mti_exp));
        rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE -1) >> CFS_PAGE_SHIFT;
 
        /* allocate pages to store the containers */
@@ -2991,6 +3088,7 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
 
         /* To not check for split by default. */
         info->mti_spec.no_create = 0;
+       info->mti_spec.sp_rm_entry = 0;
 }
 
 static void mdt_thread_info_fini(struct mdt_thread_info *info)
@@ -3167,6 +3265,7 @@ static int mdt_msg_check_version(struct lustre_msg *msg)
         case MDS_QUOTACHECK:
         case MDS_QUOTACTL:
        case UPDATE_OBJ:
+       case MDS_SWAP_LAYOUTS:
         case QUOTA_DQACQ:
         case QUOTA_DQREL:
         case SEQ_QUERY:
@@ -4467,6 +4566,8 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
                 m->mdt_nosquash_strlen = 0;
         }
 
+       next->md_ops->mdo_iocontrol(env, next, OBD_IOC_PAUSE_LFSCK,
+                                   0, NULL);
         mdt_seq_fini(env, m);
         mdt_fld_fini(env, m);
         sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset);
@@ -4586,6 +4687,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        spin_lock_init(&m->mdt_osfs_lock);
        m->mdt_osfs_age = cfs_time_shift_64(-1000);
        m->mdt_enable_remote_dir = 0;
+       m->mdt_enable_remote_dir_gid = 0;
 
         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
@@ -4926,6 +5028,12 @@ static int mdt_prepare(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
+       rc = mdt->mdt_child->md_ops->mdo_iocontrol(env, mdt->mdt_child,
+                                                  OBD_IOC_START_LFSCK,
+                                                  0, NULL);
+       if (rc != 0)
+               CWARN("Fail to auto trigger paused LFSCK.\n");
+
        rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child,
                                                  &mdt->mdt_md_root_fid);
        if (rc)
@@ -5420,12 +5528,12 @@ static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
                RETURN(-EINVAL);
        }
 
-       rc = lu_object_exists(&obj->mot_obj.mo_lu);
-       if (rc <= 0) {
-               if (rc == -1)
-                       rc = -EREMOTE;
-               else
-                       rc = -ENOENT;
+       if (mdt_object_remote(obj))
+               rc = -EREMOTE;
+       else if (!mdt_object_exists(obj))
+               rc = -ENOENT;
+
+       if (rc < 0) {
                mdt_object_put(env, obj);
                CDEBUG(D_IOCTL, "nonlocal object "DFID": %d\n",
                        PFID(&fp->gf_fid), rc);