Whamcloud - gitweb
LU-2901 ldlm: fix resource/fid check, use DLDLMRES
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index d13acbb..3c558ae 100644 (file)
@@ -396,7 +396,12 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                 b->blocks = 0;
                 /* if no object is allocated on osts, the size on mds is valid. b=22272 */
                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
-        }
+       } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm &&
+                  (ma->ma_lmm->lmm_pattern & LOV_PATTERN_F_RELEASED)) {
+               /* A released file stores its size on MDS. */
+               b->blocks = 0;
+               b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+       }
 
         if (fid) {
                 b->fid1 = *fid;
@@ -734,6 +739,13 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                RETURN(rc);
        }
 
+       /* if file is released, check if a restore is running */
+       if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_RELEASED) &&
+           mdt_hsm_restore_is_running(info, mdt_object_fid(o))) {
+               repbody->t_state = MS_RESTORE;
+               repbody->valid |= OBD_MD_TSTATE;
+       }
+
        is_root = lu_fid_eq(mdt_object_fid(o), &info->mti_mdt->mdt_md_root_fid);
 
        /* the Lustre protocol supposes to return default striping
@@ -1317,35 +1329,33 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
          */
         child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
 
-        if (unlikely(IS_ERR(child)))
-                GOTO(out_parent, rc = PTR_ERR(child));
-        if (is_resent) {
-                /* Do not take lock for resent case. */
-                lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
-                LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
-                         lhc->mlh_reg_lh.cookie);
-
-                res_id = &lock->l_resource->lr_name;
-                if (!fid_res_name_eq(mdt_object_fid(child),
-                                    &lock->l_resource->lr_name)) {
-                         LASSERTF(fid_res_name_eq(mdt_object_fid(parent),
-                                                 &lock->l_resource->lr_name),
-                                 "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
-                                 (unsigned long)res_id->name[0],
-                                 (unsigned long)res_id->name[1],
-                                 (unsigned long)res_id->name[2],
-                                 PFID(mdt_object_fid(parent)));
-                          CWARN("Although resent, but still not get child lock"
-                                "parent:"DFID" child:"DFID"\n",
-                                PFID(mdt_object_fid(parent)),
-                                PFID(mdt_object_fid(child)));
-                          lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
-                          LDLM_LOCK_PUT(lock);
-                          GOTO(relock, 0);
-                }
-                LDLM_LOCK_PUT(lock);
-                rc = 0;
-        } else {
+       if (unlikely(IS_ERR(child)))
+               GOTO(out_parent, rc = PTR_ERR(child));
+       if (is_resent) {
+               /* Do not take lock for resent case. */
+               lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
+               LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
+                        lhc->mlh_reg_lh.cookie);
+
+               res_id = &lock->l_resource->lr_name;
+               if (!fid_res_name_eq(mdt_object_fid(child),
+                                    &lock->l_resource->lr_name)) {
+                       LASSERTF(fid_res_name_eq(mdt_object_fid(parent),
+                                                &lock->l_resource->lr_name),
+                                "Lock res_id: "DLDLMRES", fid: "DFID"\n",
+                                PLDLMRES(lock->l_resource),
+                                PFID(mdt_object_fid(parent)));
+                       CWARN("Although resent, but still not get child lock"
+                             "parent:"DFID" child:"DFID"\n",
+                             PFID(mdt_object_fid(parent)),
+                             PFID(mdt_object_fid(child)));
+                       lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
+                       LDLM_LOCK_PUT(lock);
+                       GOTO(relock, 0);
+               }
+               LDLM_LOCK_PUT(lock);
+               rc = 0;
+       } else {
                bool try_layout = false;
 
 relock:
@@ -1427,17 +1437,15 @@ relock:
         rc = mdt_getattr_internal(info, child, ma_need);
         if (unlikely(rc != 0)) {
                 mdt_object_unlock(info, child, lhc, 1);
-        } else if (lock) {
-                /* Debugging code. */
-                res_id = &lock->l_resource->lr_name;
-                LDLM_DEBUG(lock, "Returning lock to client");
-                LASSERTF(fid_res_name_eq(mdt_object_fid(child),
-                                         &lock->l_resource->lr_name),
-                         "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
-                         (unsigned long)res_id->name[0],
-                         (unsigned long)res_id->name[1],
-                         (unsigned long)res_id->name[2],
-                         PFID(mdt_object_fid(child)));
+       } else if (lock) {
+               /* Debugging code. */
+               res_id = &lock->l_resource->lr_name;
+               LDLM_DEBUG(lock, "Returning lock to client");
+               LASSERTF(fid_res_name_eq(mdt_object_fid(child),
+                                        &lock->l_resource->lr_name),
+                        "Lock res_id: "DLDLMRES", fid: "DFID"\n",
+                        PLDLMRES(lock->l_resource),
+                        PFID(mdt_object_fid(child)));
                if (mdt_object_exists(child) && !mdt_object_remote(child))
                        mdt_pack_size2body(info, child);
         }
@@ -4599,8 +4607,9 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 
         ping_evictor_stop();
 
-
        mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
+
+       mdt_llog_ctxt_unclone(env, m, LLOG_AGENT_ORIG_CTXT);
         mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
         obd_exports_barrier(obd);
         obd_zombie_barrier();
@@ -4717,6 +4726,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         m->mdt_som_conf = 0;
 
         m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
+
        lmi = server_get_mount(dev);
         if (lmi == NULL) {
                 CERROR("Cannot get mount info for %s!\n", dev);
@@ -4888,20 +4898,21 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         RETURN(0);
 
 err_procfs:
-        mdt_procfs_fini(m);
+       mdt_procfs_fini(m);
 err_recovery:
-        target_recovery_fini(obd);
-        upcall_cache_cleanup(m->mdt_identity_cache);
-        m->mdt_identity_cache = NULL;
+       target_recovery_fini(obd);
+       upcall_cache_cleanup(m->mdt_identity_cache);
+       m->mdt_identity_cache = NULL;
 err_llog_cleanup:
-        mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
-        mdt_fs_cleanup(env, m);
+       mdt_llog_ctxt_unclone(env, m, LLOG_AGENT_ORIG_CTXT);
+       mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
+       mdt_fs_cleanup(env, m);
 err_capa:
-        cfs_timer_disarm(&m->mdt_ck_timer);
-        mdt_ck_thread_stop(m);
+       cfs_timer_disarm(&m->mdt_ck_timer);
+       mdt_ck_thread_stop(m);
 err_free_ns:
-        ldlm_namespace_free(m->mdt_namespace, NULL, 0);
-        obd->obd_namespace = m->mdt_namespace = NULL;
+       ldlm_namespace_free(m->mdt_namespace, NULL, 0);
+       obd->obd_namespace = m->mdt_namespace = NULL;
 err_fini_seq:
        mdt_seq_fini(env, m);
 err_fini_fld:
@@ -4909,11 +4920,11 @@ err_fini_fld:
 err_lut:
        tgt_fini(env, &m->mdt_lut);
 err_fini_stack:
-        mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
+       mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 err_lmi:
        if (lmi)
                server_put_mount(dev, lmi->lmi_mnt);
-        return (rc);
+       return(rc);
 }
 
 /* For interoperability, the left element is old parameter, the right one
@@ -5016,9 +5027,10 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                 o->lo_ops = &mdt_obj_ops;
                mutex_init(&mo->mot_ioepoch_mutex);
                mutex_init(&mo->mot_lov_mutex);
-                RETURN(o);
-        } else
-                RETURN(NULL);
+               init_rwsem(&mo->mot_open_sem);
+               RETURN(o);
+       }
+       RETURN(NULL);
 }
 
 static int mdt_object_init(const struct lu_env *env, struct lu_object *o,
@@ -5053,11 +5065,14 @@ static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
         CDEBUG(D_INFO, "object free, fid = "DFID"\n",
                PFID(lu_object_fid(o)));
 
-        lu_object_fini(o);
-        lu_object_header_fini(h);
+       LASSERT(atomic_read(&mo->mot_open_count) == 0);
+       LASSERT(atomic_read(&mo->mot_lease_count) == 0);
+
+       lu_object_fini(o);
+       lu_object_header_fini(h);
        OBD_SLAB_FREE_PTR(mo, mdt_object_kmem);
 
-        EXIT;
+       EXIT;
 }
 
 static int mdt_object_print(const struct lu_env *env, void *cookie,
@@ -5092,6 +5107,10 @@ static int mdt_prepare(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
+       rc = mdt_llog_ctxt_clone(env, mdt, LLOG_AGENT_ORIG_CTXT);
+       if (rc)
+               RETURN(rc);
+
        lsp.lsp_start = NULL;
        lsp.lsp_namespace = mdt->mdt_namespace;
        rc = mdt->mdt_child->md_ops->mdo_iocontrol(env, mdt->mdt_child,
@@ -5109,6 +5128,7 @@ static int mdt_prepare(const struct lu_env *env,
                if (rc)
                        RETURN(rc);
        }
+
        LASSERT(!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state));
        target_recovery_init(&mdt->mdt_lut, mdt_recovery_handle);
        set_bit(MDT_FL_CFGLOG, &mdt->mdt_state);