Whamcloud - gitweb
LU-354 test: Change dev_set_rdonly() check to warning
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 001ef1d..55ab803 100644 (file)
@@ -28,9 +28,8 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
- */
-/*
- * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -213,7 +212,7 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm,
         lh->mlh_reg_mode = lm;
         lh->mlh_type = MDT_PDO_LOCK;
 
-        if (name != NULL) {
+        if (name != NULL && (name[0] != '\0')) {
                 LASSERT(namelen > 0);
                 lh->mlh_pdo_hash = full_name_hash(name, namelen);
         } else {
@@ -469,6 +468,32 @@ static inline int mdt_body_has_lov(const struct lu_attr *la,
                 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
 }
 
+void mdt_client_compatibility(struct mdt_thread_info *info)
+{
+        struct mdt_body       *body;
+        struct ptlrpc_request *req = mdt_info_req(info);
+        struct obd_export     *exp = req->rq_export;
+        struct md_attr        *ma = &info->mti_attr;
+        struct lu_attr        *la = &ma->ma_attr;
+        ENTRY;
+        /* Clear lmm_layout_gen in replies to clients lacking LAYOUTLOCK. */
+        if (exp->exp_connect_flags & OBD_CONNECT_LAYOUTLOCK)
+                /* such a client can deal with 16-bit lmm_stripe_count */
+                RETURN_EXIT;
+
+        body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+        /* NOTE(review): body is used unchecked below; confirm non-NULL */
+        if (!mdt_body_has_lov(la, body))
+                RETURN_EXIT;
+
+        /* The reply carries a LOV for a client that is not layout-lock
+         * aware, so clear its layout generation (regular files only). */
+        if (S_ISREG(la->la_mode))
+                ma->ma_lmm->lmm_layout_gen = 0;
+        EXIT;
+}
+
+
 static int mdt_getattr_internal(struct mdt_thread_info *info,
                                 struct mdt_object *o, int ma_need)
 {
@@ -740,6 +765,7 @@ out_shrink:
         if (rc == 0)
                 mdt_counter_incr(req->rq_export, LPROC_MDT_GETATTR);
 
+        mdt_client_compatibility(info);
         mdt_shrink_reply(info);
         return rc;
 }
@@ -789,6 +815,7 @@ static int mdt_raw_lookup(struct mdt_thread_info *info,
         LASSERT(!info->mti_cross_ref);
 
         /* Only got the fid of this obj by name */
+        fid_zero(child_fid);
         rc = mdo_lookup(info->mti_env, next, lname, child_fid,
                         &info->mti_spec);
 #if 0
@@ -869,8 +896,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         name = NULL;
                         CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
                                "ldlm_rep = %p\n",
-                               PFID(mdt_object_fid(parent)), PFID(&reqbody->fid2),
-                               ldlm_rep);
+                               PFID(mdt_object_fid(parent)),
+                               PFID(&reqbody->fid2), ldlm_rep);
                 } else {
                         lname = mdt_name(info->mti_env, (char *)name, namelen);
                         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, "
@@ -941,21 +968,26 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         }
 
         if (lname) {
-                /* step 1: lock parent */
-                lhp = &info->mti_lh[MDT_LH_PARENT];
-                mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
-                rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
-                                     MDT_LOCAL_LOCK);
-                if (unlikely(rc != 0))
-                        RETURN(rc);
+                /* step 1: lock parent only if parent is a directory */
+                if (S_ISDIR(lu_object_attr(&parent->mot_obj.mo_lu))) {
+                        lhp = &info->mti_lh[MDT_LH_PARENT];
+                        mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
+                        rc = mdt_object_lock(info, parent, lhp,
+                                             MDS_INODELOCK_UPDATE,
+                                             MDT_LOCAL_LOCK);
+                        if (unlikely(rc != 0))
+                                RETURN(rc);
+                }
 
                 /* step 2: lookup child's fid by name */
+                fid_zero(child_fid);
                 rc = mdo_lookup(info->mti_env, next, lname, child_fid,
                                 &info->mti_spec);
 
                 if (rc != 0) {
                         if (rc == -ENOENT)
-                                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+                                mdt_set_disposition(info, ldlm_rep,
+                                                    DISP_LOOKUP_NEG);
                         GOTO(out_parent, rc);
                 } else
                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
@@ -1002,10 +1034,13 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 relock:
                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
                 mdt_lock_handle_init(lhc);
-                mdt_lock_reg_init(lhc, LCK_PR);
+                if (child_bits == MDS_INODELOCK_LAYOUT)
+                        mdt_lock_reg_init(lhc, LCK_CR);
+                else
+                        mdt_lock_reg_init(lhc, LCK_PR);
 
                 if (mdt_object_exists(child) == 0) {
-                        LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
+                        LU_OBJECT_DEBUG(D_INODE, info->mti_env,
                                         &child->mot_obj.mo_lu,
                                         "Object doesn't exist!\n");
                         GOTO(out_child, rc = -ENOENT);
@@ -1021,6 +1056,12 @@ relock:
                         if (unlikely(rc != 0))
                                 GOTO(out_child, rc);
 
+                        /* layout lock is used only on regular files */
+                        if ((ma->ma_valid & MA_INODE) &&
+                            (ma->ma_attr.la_valid & LA_MODE) &&
+                            !S_ISREG(ma->ma_attr.la_mode))
+                                child_bits &= ~MDS_INODELOCK_LAYOUT;
+
                         /* If the file has not been changed for some time, we
                          * return not only a LOOKUP lock, but also an UPDATE
                          * lock and this might save us RPC on later STAT. For
@@ -1108,6 +1149,7 @@ static int mdt_getattr_name(struct mdt_thread_info *info)
         mdt_exit_ucred(info);
         EXIT;
 out_shrink:
+        mdt_client_compatibility(info);
         mdt_shrink_reply(info);
         return rc;
 }
@@ -1562,6 +1604,7 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
 out_ucred:
         mdt_exit_ucred(info);
 out_shrink:
+        mdt_client_compatibility(info);
         mdt_shrink_reply(info);
         return rc;
 }
@@ -3048,6 +3091,7 @@ enum mdt_it_code {
         MDT_IT_UNLINK,
         MDT_IT_TRUNC,
         MDT_IT_GETXATTR,
+        MDT_IT_LAYOUT,
         MDT_IT_NR
 };
 
@@ -3118,6 +3162,11 @@ static struct mdt_it_flavor {
                 .it_fmt   = NULL,
                 .it_flags = 0,
                 .it_act   = NULL
+        },
+        [MDT_IT_LAYOUT] = {
+                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
+                .it_flags = HABEO_REFERO,
+                .it_act   = mdt_intent_getattr
         }
 };
 
@@ -3293,8 +3342,18 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         case MDT_IT_GETATTR:
                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                 break;
+        case MDT_IT_LAYOUT: {
+                static int printed = 0;
+
+                if (!printed) {
+                        CERROR("layout lock not supported by this version\n");
+                        printed = 1;
+                }
+                GOTO(out_shrink, rc = -EINVAL);
+                break;
+        }
         default:
-                CERROR("Unhandled till now");
+                CERROR("Unsupported intent (%d)\n", opcode);
                 GOTO(out_shrink, rc = -EINVAL);
         }
 
@@ -3325,6 +3384,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
 out_ucred:
         mdt_exit_ucred(info);
 out_shrink:
+        mdt_client_compatibility(info);
         mdt_shrink_reply(info);
         return rc;
 }
@@ -3448,6 +3508,9 @@ static int mdt_intent_code(long itcode)
         case IT_GETXATTR:
                 rc = MDT_IT_GETXATTR;
                 break;
+        case IT_LAYOUT:
+                rc = MDT_IT_LAYOUT;
+                break;
         default:
                 CERROR("Unknown intent opcode: %ld\n", itcode);
                 rc = -EINVAL;
@@ -3912,6 +3975,10 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
 
         /*
          * setattr service configuration.
+         *
+         * XXX To stay compatible with old clients (< 2.2), we need to
+         * preserve this portal for a certain time; it should be removed
+         * eventually. See LU-617.
          */
         conf = (typeof(conf)) {
                 .psc_nbufs           = MDS_NBUFS,
@@ -4181,8 +4248,11 @@ static struct lu_device *mdt_layer_setup(struct lu_env *env,
         lu_device_get(d);
         lu_ref_add(&d->ld_reference, "lu-stack", &lu_site_init);
 
-        RETURN(d);
+        cfs_spin_lock(&d->ld_site->ls_ld_lock);
+        cfs_list_add_tail(&d->ld_linkage, &d->ld_site->ls_ld_linkage);
+        cfs_spin_unlock(&d->ld_site->ls_ld_lock);
 
+        RETURN(d);
 out_alloc:
         ldt->ldt_ops->ldto_device_free(env, d);
         type->typ_refcnt--;
@@ -4319,6 +4389,9 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
         mdt_obd_llog_cleanup(obd);
         obd_exports_barrier(obd);
         obd_zombie_barrier();
+
+        mdt_procfs_fini(m);
+
 #ifdef HAVE_QUOTA_SUPPORT
         next->md_ops->mdo_quota.mqo_cleanup(env, next);
 #endif
@@ -4354,10 +4427,6 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
          */
         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 
-        lprocfs_free_per_client_stats(obd);
-        lprocfs_free_obd_stats(obd);
-        mdt_procfs_fini(m);
-
         if (ls) {
                 struct md_site *mite;
 
@@ -4492,12 +4561,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                 GOTO(err_free_site, rc);
         }
 
-        rc = mdt_procfs_init(m, dev);
-        if (rc) {
-                CERROR("Can't init MDT lprocfs, rc %d\n", rc);
-                GOTO(err_fini_proc, rc);
-        }
-
         /* set server index */
         lu_site2md(s)->ms_node_id = node_id;
 
@@ -4521,7 +4584,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         rc = mdt_stack_init((struct lu_env *)env, m, cfg, lmi);
         if (rc) {
                 CERROR("Can't init device stack, rc %d\n", rc);
-                GOTO(err_fini_proc, rc);
+                GOTO(err_lu_site, rc);
         }
 
         rc = lut_init(env, &m->mdt_lut, obd, m->mdt_bottom);
@@ -4609,9 +4672,15 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
         target_recovery_init(&m->mdt_lut, mdt_recovery_handle);
 
+        rc = mdt_procfs_init(m, dev);
+        if (rc) {
+                CERROR("Can't init MDT lprocfs, rc %d\n", rc);
+                GOTO(err_recovery, rc);
+        }
+
         rc = mdt_start_ptlrpc_service(m);
         if (rc)
-                GOTO(err_recovery, rc);
+                GOTO(err_procfs, rc);
 
         ping_evictor_start();
 
@@ -4635,6 +4704,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 err_stop_service:
         ping_evictor_stop();
         mdt_stop_ptlrpc_service(m);
+err_procfs:
+        mdt_procfs_fini(m);
 err_recovery:
         target_recovery_fini(obd);
         upcall_cache_cleanup(m->mdt_identity_cache);
@@ -4662,8 +4733,7 @@ err_lut:
         lut_fini(env, &m->mdt_lut);
 err_fini_stack:
         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
-err_fini_proc:
-        mdt_procfs_fini(m);
+err_lu_site:
         lu_site_fini(s);
 err_free_site:
         OBD_FREE_PTR(mite);
@@ -4753,8 +4823,8 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                 lu_object_init(o, h, d);
                 lu_object_add_top(h, o);
                 o->lo_ops = &mdt_obj_ops;
-                cfs_sema_init(&mo->mot_ioepoch_sem, 1);
-                cfs_sema_init(&mo->mot_lov_sem, 1);
+                cfs_mutex_init(&mo->mot_ioepoch_mutex);
+                cfs_mutex_init(&mo->mot_lov_mutex);
                 RETURN(o);
         } else
                 RETURN(NULL);
@@ -5159,7 +5229,7 @@ static int mdt_init_export(struct obd_export *exp)
 
         CFS_INIT_LIST_HEAD(&med->med_open_head);
         cfs_spin_lock_init(&med->med_open_lock);
-        cfs_sema_init(&med->med_idmap_sem, 1);
+        cfs_mutex_init(&med->med_idmap_mutex);
         med->med_idmap = NULL;
         cfs_spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
@@ -5331,7 +5401,7 @@ static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
 
         obj = mdt_object_find(env, mdt, &fp->gf_fid);
         if (obj == NULL || IS_ERR(obj)) {
-                CDEBUG(D_IOCTL, "no object "DFID": %ld\n",PFID(&fp->gf_fid),
+                CDEBUG(D_IOCTL, "no object "DFID": %ld\n", PFID(&fp->gf_fid),
                        PTR_ERR(obj));
                 RETURN(-EINVAL);
         }
@@ -5452,10 +5522,10 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
                  */
                 CERROR("nonlocal object "DFID"\n", PFID(fid));
         } else if (rc == 0) {
-                 *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
+                *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
                 rc = -ENOENT;
         } else {
-                version = mo_version_get(mti->mti_env, mdt_object_child(obj));
+                version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
                *(__u64 *)data->ioc_inlbuf2 = version;
                 rc = 0;
         }
@@ -5484,7 +5554,7 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 rc = mdt_device_sync(&env, mdt);
                 break;
         case OBD_IOC_SET_READONLY:
-                dt->dd_ops->dt_ro(&env, dt);
+                rc = dt->dd_ops->dt_ro(&env, dt);
                 break;
         case OBD_IOC_ABORT_RECOVERY:
                 CERROR("Aborting recovery for device %s\n", obd->obd_name);