4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/mdt/mdt_open.c
34 * Lustre Metadata Target (mdt) open/close file handling
36 * Author: Huang Hua <huanghua@clusterfs.com>
39 #define DEBUG_SUBSYSTEM S_MDS
41 #include <lustre_acl.h>
42 #include <lustre_mds.h>
43 #include "mdt_internal.h"
44 #include <lustre_nodemap.h>
46 /* we do nothing because we do not have refcount now */
47 static void mdt_mfd_get(void *mfdp)
51 static struct portals_handle_ops mfd_handle_ops = {
52 .hop_addref = mdt_mfd_get,
56 /* Create a new mdt_file_data struct, initialize it,
57 * and insert it to global hash table */
58 struct mdt_file_data *mdt_mfd_new(const struct mdt_export_data *med)
60 struct mdt_file_data *mfd;
65 INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
66 mfd->mfd_handle.h_owner = med;
67 INIT_LIST_HEAD(&mfd->mfd_list);
68 class_handle_hash(&mfd->mfd_handle, &mfd_handle_ops);
75 * Find the mfd pointed to by handle in global hash table.
76 * In case of replay the handle is obsoleted
77 * but mfd can be found in mfd list by that handle.
78 * Callers need to be holding med_open_lock.
80 struct mdt_file_data *mdt_handle2mfd(struct mdt_export_data *med,
81 const struct lustre_handle *handle,
82 bool is_replay_or_resent)
84 struct mdt_file_data *mfd;
87 LASSERT(handle != NULL);
88 mfd = class_handle2object(handle->cookie, med);
89 /* during dw/setattr replay the mfd can be found by old handle */
90 if (mfd == NULL && is_replay_or_resent) {
91 list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
92 if (mfd->mfd_old_handle.cookie == handle->cookie)
102 void mdt_mfd_free(struct mdt_file_data *mfd)
104 LASSERT(list_empty(&mfd->mfd_list));
105 OBD_FREE_RCU(mfd, sizeof *mfd, &mfd->mfd_handle);
108 static int mdt_create_data(struct mdt_thread_info *info,
109 struct mdt_object *p, struct mdt_object *o)
111 struct md_op_spec *spec = &info->mti_spec;
112 struct md_attr *ma = &info->mti_attr;
116 if (!md_should_create(spec->sp_cr_flags))
119 ma->ma_need = MA_INODE | MA_LOV;
121 mutex_lock(&o->mot_lov_mutex);
122 if (!o->mot_lov_created) {
123 rc = mdo_create_data(info->mti_env,
124 p ? mdt_object_child(p) : NULL,
125 mdt_object_child(o), spec, ma);
127 rc = mdt_attr_get_complex(info, o, ma);
129 if (rc == 0 && ma->ma_valid & MA_LOV)
130 o->mot_lov_created = 1;
133 mutex_unlock(&o->mot_lov_mutex);
137 int mdt_write_read(struct mdt_object *o)
141 spin_lock(&o->mot_write_lock);
142 rc = o->mot_write_count;
143 spin_unlock(&o->mot_write_lock);
147 int mdt_write_get(struct mdt_object *o)
151 spin_lock(&o->mot_write_lock);
152 if (o->mot_write_count < 0)
155 o->mot_write_count++;
156 spin_unlock(&o->mot_write_lock);
161 void mdt_write_put(struct mdt_object *o)
164 spin_lock(&o->mot_write_lock);
165 o->mot_write_count--;
166 spin_unlock(&o->mot_write_lock);
170 static int mdt_write_deny(struct mdt_object *o)
174 spin_lock(&o->mot_write_lock);
175 if (o->mot_write_count > 0)
178 o->mot_write_count--;
179 spin_unlock(&o->mot_write_lock);
183 static void mdt_write_allow(struct mdt_object *o)
186 spin_lock(&o->mot_write_lock);
187 o->mot_write_count++;
188 spin_unlock(&o->mot_write_lock);
192 /* there can be no real transaction so prepare the fake one */
193 static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
195 struct mdt_device *mdt = info->mti_mdt;
196 struct ptlrpc_request *req = mdt_info_req(info);
197 struct tg_export_data *ted;
198 struct lsd_client_data *lcd;
201 /* transaction has occurred already */
202 if (lustre_msg_get_transno(req->rq_repmsg) != 0)
205 if (tgt_is_multimodrpcs_client(req->rq_export)) {
208 /* generate an empty transaction to get a transno
210 th = dt_trans_create(info->mti_env, mdt->mdt_bottom);
212 rc = dt_trans_start(info->mti_env, mdt->mdt_bottom, th);
213 dt_trans_stop(info->mti_env, mdt->mdt_bottom, th);
218 spin_lock(&mdt->mdt_lut.lut_translock);
220 if (info->mti_transno != 0) {
221 struct obd_export *exp = req->rq_export;
223 CERROR("%s: replay trans %llu NID %s: rc = %d\n",
224 mdt_obd_name(mdt), info->mti_transno,
225 libcfs_nid2str(exp->exp_connection->c_peer.nid),
227 spin_unlock(&mdt->mdt_lut.lut_translock);
230 } else if (info->mti_transno == 0) {
231 info->mti_transno = ++mdt->mdt_lut.lut_last_transno;
233 /* should be replay */
234 if (info->mti_transno > mdt->mdt_lut.lut_last_transno)
235 mdt->mdt_lut.lut_last_transno = info->mti_transno;
237 spin_unlock(&mdt->mdt_lut.lut_translock);
239 CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
241 req->rq_export->exp_obd->obd_last_committed);
243 req->rq_transno = info->mti_transno;
244 lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
246 /* update lcd in memory only for resent cases */
247 ted = &req->rq_export->exp_target_data;
249 mutex_lock(&ted->ted_lcd_lock);
251 if (info->mti_transno < lcd->lcd_last_transno &&
252 info->mti_transno != 0) {
253 /* This should happen during replay. Do not update
254 * last rcvd info if replay req transno < last transno,
255 * otherwise the following resend(after replay) can not
256 * be checked correctly by xid */
257 mutex_unlock(&ted->ted_lcd_lock);
258 CDEBUG(D_HA, "%s: transno = %llu < last_transno = %llu\n",
259 mdt_obd_name(mdt), info->mti_transno,
260 lcd->lcd_last_transno);
264 if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
265 if (info->mti_transno != 0)
266 lcd->lcd_last_close_transno = info->mti_transno;
267 lcd->lcd_last_close_xid = req->rq_xid;
268 lcd->lcd_last_close_result = rc;
270 /* VBR: save versions in last_rcvd for reconstruct. */
271 __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
273 lcd->lcd_pre_versions[0] = pre_versions[0];
274 lcd->lcd_pre_versions[1] = pre_versions[1];
275 lcd->lcd_pre_versions[2] = pre_versions[2];
276 lcd->lcd_pre_versions[3] = pre_versions[3];
278 if (info->mti_transno != 0)
279 lcd->lcd_last_transno = info->mti_transno;
281 lcd->lcd_last_xid = req->rq_xid;
282 lcd->lcd_last_result = rc;
283 lcd->lcd_last_data = info->mti_opdata;
285 mutex_unlock(&ted->ted_lcd_lock);
290 void mdt_mfd_set_mode(struct mdt_file_data *mfd, __u64 mode)
292 LASSERT(mfd != NULL);
294 CDEBUG(D_HA, DFID " Change mfd mode %#llo -> %#llo.\n",
295 PFID(mdt_object_fid(mfd->mfd_object)), mfd->mfd_mode, mode);
297 mfd->mfd_mode = mode;
301 * prep ma_lmm/ma_lmv for md_attr from reply
303 static void mdt_prep_ma_buf_from_rep(struct mdt_thread_info *info,
304 struct mdt_object *obj,
307 LASSERT(ma->ma_lmv == NULL && ma->ma_lmm == NULL);
308 if (S_ISDIR(obj->mot_header.loh_attr)) {
309 ma->ma_lmv = req_capsule_server_get(info->mti_pill,
311 ma->ma_lmv_size = req_capsule_get_size(info->mti_pill,
314 if (ma->ma_lmv_size > 0)
315 ma->ma_need |= MA_LMV;
317 ma->ma_lmm = req_capsule_server_get(info->mti_pill,
319 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
322 if (ma->ma_lmm_size > 0)
323 ma->ma_need |= MA_LOV;
327 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
328 struct mdt_object *o, __u64 flags, int created,
329 struct ldlm_reply *rep)
331 struct ptlrpc_request *req = mdt_info_req(info);
332 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
333 struct mdt_file_data *mfd;
334 struct md_attr *ma = &info->mti_attr;
335 struct lu_attr *la = &ma->ma_attr;
336 struct mdt_body *repbody;
337 int rc = 0, isdir, isreg;
340 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
342 isreg = S_ISREG(la->la_mode);
343 isdir = S_ISDIR(la->la_mode);
344 if (isreg && !(ma->ma_valid & MA_LOV) && !(flags & MDS_OPEN_RELEASE)) {
346 * No EA, check whether it is will set regEA and dirEA since in
347 * above attr get, these size might be zero, so reset it, to
348 * retrieve the MD after create obj.
350 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
353 /* in replay case, p == NULL */
354 rc = mdt_create_data(info, p, o);
358 if (exp_connect_flags(req->rq_export) & OBD_CONNECT_DISP_STRIPE)
359 mdt_set_disposition(info, rep, DISP_OPEN_STRIPE);
362 CDEBUG(D_INODE, "after open, ma_valid bit = %#llx lmm_size = %d\n",
363 ma->ma_valid, ma->ma_lmm_size);
365 if (ma->ma_valid & MA_LOV) {
366 LASSERT(ma->ma_lmm_size != 0);
367 repbody->mbo_eadatasize = ma->ma_lmm_size;
369 repbody->mbo_valid |= OBD_MD_FLDIREA;
371 repbody->mbo_valid |= OBD_MD_FLEASIZE;
374 if (ma->ma_valid & MA_LMV) {
375 LASSERT(ma->ma_lmv_size != 0);
376 repbody->mbo_eadatasize = ma->ma_lmv_size;
378 repbody->mbo_valid |= OBD_MD_FLDIREA | OBD_MD_MEA;
381 if (flags & FMODE_WRITE)
382 rc = mdt_write_get(o);
383 else if (flags & MDS_FMODE_EXEC)
384 rc = mdt_write_deny(o);
389 rc = mo_open(info->mti_env, mdt_object_child(o),
390 created ? flags | MDS_OPEN_CREATED : flags);
392 /* If we allow the client to chgrp (CFS_SETGRP_PERM), but the
393 * client does not know which suppgid should be sent to the MDS,
394 * or some other(s) changed the target file's GID after this RPC
395 * sent to the MDS with the suppgid as the original GID, then we
396 * give the client another chance to send the right suppgid. */
398 allow_client_chgrp(info, lu_ucred(info->mti_env)))
399 mdt_set_disposition(info, rep, DISP_OPEN_DENY);
404 mfd = mdt_mfd_new(med);
406 GOTO(err_out, rc = -ENOMEM);
409 * Keep a reference on this object for this open, and is
410 * released by mdt_mfd_close().
412 mdt_object_get(info->mti_env, o);
414 mfd->mfd_xid = req->rq_xid;
417 * @flags is always not zero. At least it should be FMODE_READ,
418 * FMODE_WRITE or MDS_FMODE_EXEC.
423 mdt_mfd_set_mode(mfd, flags);
425 atomic_inc(&o->mot_open_count);
426 if (flags & MDS_OPEN_LEASE)
427 atomic_inc(&o->mot_lease_count);
430 if (req_is_replay(req)) {
431 struct mdt_file_data *old_mfd;
432 /* Check wheather old cookie already exist in
433 * the list, becasue when do recovery, client
434 * might be disconnected from server, and
435 * restart replay, so there maybe some orphan
436 * mfd here, we should remove them */
437 LASSERT(info->mti_rr.rr_handle != NULL);
438 spin_lock(&med->med_open_lock);
439 old_mfd = mdt_handle2mfd(med, info->mti_rr.rr_handle, true);
440 if (old_mfd != NULL) {
441 CDEBUG(D_HA, "delete orphan mfd = %p, fid = "DFID", "
442 "cookie = %#llx\n", mfd,
443 PFID(mdt_object_fid(mfd->mfd_object)),
444 info->mti_rr.rr_handle->cookie);
445 class_handle_unhash(&old_mfd->mfd_handle);
446 list_del_init(&old_mfd->mfd_list);
447 spin_unlock(&med->med_open_lock);
448 /* no attr update for that close */
450 ma->ma_valid |= MA_FLAGS;
451 ma->ma_attr_flags |= MDS_RECOV_OPEN;
452 mdt_mfd_close(info, old_mfd);
453 ma->ma_attr_flags &= ~MDS_RECOV_OPEN;
454 ma->ma_valid &= ~MA_FLAGS;
456 spin_unlock(&med->med_open_lock);
457 CDEBUG(D_HA, "orphan mfd not found, fid = "DFID", "
459 PFID(mdt_object_fid(mfd->mfd_object)),
460 info->mti_rr.rr_handle->cookie);
463 CDEBUG(D_HA, "Store old cookie %#llx in new mfd\n",
464 info->mti_rr.rr_handle->cookie);
466 mfd->mfd_old_handle.cookie = info->mti_rr.rr_handle->cookie;
469 repbody->mbo_handle.cookie = mfd->mfd_handle.h_cookie;
471 if (req->rq_export->exp_disconnected) {
472 spin_lock(&med->med_open_lock);
473 class_handle_unhash(&mfd->mfd_handle);
474 list_del_init(&mfd->mfd_list);
475 spin_unlock(&med->med_open_lock);
476 mdt_mfd_close(info, mfd);
478 spin_lock(&med->med_open_lock);
479 list_add(&mfd->mfd_list, &med->med_open_head);
480 spin_unlock(&med->med_open_lock);
483 mdt_empty_transno(info, rc);
488 if (flags & FMODE_WRITE)
490 else if (flags & MDS_FMODE_EXEC)
496 static int mdt_finish_open(struct mdt_thread_info *info,
497 struct mdt_object *p, struct mdt_object *o,
498 __u64 flags, int created, struct ldlm_reply *rep)
500 struct ptlrpc_request *req = mdt_info_req(info);
501 struct obd_export *exp = req->rq_export;
502 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
503 struct md_attr *ma = &info->mti_attr;
504 struct lu_attr *la = &ma->ma_attr;
505 struct mdt_file_data *mfd;
506 struct mdt_body *repbody;
508 int isreg, isdir, islnk;
512 LASSERT(ma->ma_valid & MA_INODE);
514 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
516 isreg = S_ISREG(la->la_mode);
517 isdir = S_ISDIR(la->la_mode);
518 islnk = S_ISLNK(la->la_mode);
519 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
521 /* LU-2275, simulate broken behaviour (esp. prevalent in
522 * pre-2.4 servers where a very strange reply is sent on error
523 * that looks like it was actually almost successful and a
524 * failure at the same time.) */
525 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_NEGATIVE_POSITIVE)) {
526 mdt_set_disposition(info, rep, DISP_OPEN_OPEN |
530 if (flags & MDS_OPEN_LOCK)
531 mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
536 #ifdef CONFIG_FS_POSIX_ACL
537 if (exp_connect_flags(exp) & OBD_CONNECT_ACL) {
538 struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
540 RETURN(PTR_ERR(nodemap));
542 rc = mdt_pack_acl2body(info, repbody, o, nodemap);
543 nodemap_putref(nodemap);
550 * If we are following a symlink, don't open; and do not return open
551 * handle for special nodes as client required.
553 if (islnk || (!isreg && !isdir &&
554 (exp_connect_flags(req->rq_export) & OBD_CONNECT_NODEVOH))) {
555 lustre_msg_set_transno(req->rq_repmsg, 0);
560 * We need to return the existing object's fid back, so it is done here,
561 * after preparing the reply.
563 if (!created && (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT))
566 /* This can't be done earlier, we need to return reply body */
568 if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
569 /* We are trying to create or write an existing dir. */
572 } else if (flags & MDS_OPEN_DIRECTORY)
575 if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_OPEN_CREATE,
576 OBD_FAIL_LDLM_REPLY | OBD_FAIL_ONCE)) {
581 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
582 spin_lock(&med->med_open_lock);
583 list_for_each(t, &med->med_open_head) {
584 mfd = list_entry(t, struct mdt_file_data, mfd_list);
585 if (mfd->mfd_xid == req->rq_xid)
589 spin_unlock(&med->med_open_lock);
592 repbody->mbo_handle.cookie = mfd->mfd_handle.h_cookie;
593 /* set repbody->ea_size for resent case */
594 if (ma->ma_valid & MA_LOV) {
595 LASSERT(ma->ma_lmm_size != 0);
596 repbody->mbo_eadatasize = ma->ma_lmm_size;
598 repbody->mbo_valid |= OBD_MD_FLDIREA;
600 repbody->mbo_valid |= OBD_MD_FLEASIZE;
602 mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
607 rc = mdt_mfd_open(info, p, o, flags, created, rep);
609 mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
614 void mdt_reconstruct_open(struct mdt_thread_info *info,
615 struct mdt_lock_handle *lhc)
617 const struct lu_env *env = info->mti_env;
618 struct mdt_device *mdt = info->mti_mdt;
619 struct req_capsule *pill = info->mti_pill;
620 struct ptlrpc_request *req = mdt_info_req(info);
621 struct md_attr *ma = &info->mti_attr;
622 struct mdt_reint_record *rr = &info->mti_rr;
623 __u64 flags = info->mti_spec.sp_cr_flags;
624 struct ldlm_reply *ldlm_rep;
625 struct mdt_object *parent;
626 struct mdt_object *child;
627 struct mdt_body *repbody;
632 LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
633 ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
634 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
636 ma->ma_need = MA_INODE | MA_HSM;
639 opdata = mdt_req_from_lrd(req, info->mti_reply_data);
640 mdt_set_disposition(info, ldlm_rep, opdata);
642 CDEBUG(D_INODE, "This is reconstruct open: disp=%#llx, result=%d\n",
643 ldlm_rep->lock_policy_res1, req->rq_status);
645 if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) &&
647 /* We did not create successfully, return error to client. */
648 GOTO(out, rc = req->rq_status);
650 if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE)) {
651 struct obd_export *exp = req->rq_export;
653 * We failed after creation, but we do not know in which step
654 * we failed. So try to check the child object.
656 parent = mdt_object_find(env, mdt, rr->rr_fid1);
657 if (IS_ERR(parent)) {
658 rc = PTR_ERR(parent);
659 LCONSOLE_WARN("Parent "DFID" lookup error %d."
660 " Evicting client %s with export %s.\n",
661 PFID(rr->rr_fid1), rc,
662 obd_uuid2str(&exp->exp_client_uuid),
663 obd_export_nid2str(exp));
664 mdt_export_evict(exp);
668 child = mdt_object_find(env, mdt, rr->rr_fid2);
671 LCONSOLE_WARN("cannot lookup child "DFID": rc = %d; "
672 "evicting client %s with export %s\n",
673 PFID(rr->rr_fid2), rc,
674 obd_uuid2str(&exp->exp_client_uuid),
675 obd_export_nid2str(exp));
676 mdt_object_put(env, parent);
677 mdt_export_evict(exp);
681 if (unlikely(mdt_object_remote(child))) {
682 /* the child object was created on remote server */
683 if (!mdt_is_dne_client(exp)) {
684 /* Return -EIO for old client */
685 mdt_object_put(env, parent);
686 mdt_object_put(env, child);
687 GOTO(out, rc = -EIO);
689 repbody->mbo_fid1 = *rr->rr_fid2;
690 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
693 if (mdt_object_exists(child)) {
694 mdt_prep_ma_buf_from_rep(info, child, ma);
695 rc = mdt_attr_get_complex(info, child, ma);
697 rc = mdt_finish_open(info, parent,
701 /* the child does not exist, we should do
703 mdt_object_put(env, parent);
704 mdt_object_put(env, child);
705 GOTO(regular_open, 0);
708 mdt_object_put(env, parent);
709 mdt_object_put(env, child);
713 /* We did not try to create, so we are a pure open */
714 rc = mdt_reint_open(info, lhc);
720 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
721 LASSERT(ergo(rc < 0, lustre_msg_get_transno(req->rq_repmsg) == 0));
724 static int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep)
726 __u64 flags = info->mti_spec.sp_cr_flags;
727 struct mdt_reint_record *rr = &info->mti_rr;
728 struct md_attr *ma = &info->mti_attr;
729 struct mdt_object *o;
733 o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
735 RETURN(rc = PTR_ERR(o));
737 if (unlikely(mdt_object_remote(o))) {
738 /* the child object was created on remote server */
739 struct mdt_body *repbody;
741 mdt_set_disposition(info, rep, (DISP_IT_EXECD |
744 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
745 repbody->mbo_fid1 = *rr->rr_fid2;
746 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
749 if (mdt_object_exists(o)) {
750 mdt_set_disposition(info, rep, (DISP_IT_EXECD |
753 mdt_prep_ma_buf_from_rep(info, o, ma);
754 rc = mdt_attr_get_complex(info, o, ma);
756 rc = mdt_finish_open(info, NULL, o, flags, 0,
763 mdt_object_put(info->mti_env, o);
767 /* lock object for open */
768 static int mdt_object_open_lock(struct mdt_thread_info *info,
769 struct mdt_object *obj,
770 struct mdt_lock_handle *lhc,
773 struct md_attr *ma = &info->mti_attr;
774 __u64 open_flags = info->mti_spec.sp_cr_flags;
775 enum ldlm_mode lm = LCK_CR;
776 bool acq_lease = !!(open_flags & MDS_OPEN_LEASE);
777 bool try_layout = false;
778 bool create_layout = false;
783 mdt_lock_handle_init(lhc);
785 if (req_is_replay(mdt_info_req(info)))
788 if (S_ISREG(lu_object_attr(&obj->mot_obj))) {
789 if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV) &&
790 md_should_create(open_flags))
791 create_layout = true;
792 if (exp_connect_layout(info->mti_exp) && !create_layout &&
793 ma->ma_need & MA_LOV)
798 /* lease open, acquire write mode of open sem */
799 down_write(&obj->mot_open_sem);
801 /* Lease exists and ask for new lease */
802 if (atomic_read(&obj->mot_lease_count) > 0) {
803 /* only exclusive open is supported, so lease
804 * are conflicted to each other */
805 GOTO(out, rc = -EBUSY);
808 /* Lease must be with open lock */
809 if (!(open_flags & MDS_OPEN_LOCK)) {
810 CERROR("%s: Request lease for file:"DFID ", but open lock "
811 "is missed, open_flags = %#llo : rc = %d\n",
812 mdt_obd_name(info->mti_mdt),
813 PFID(mdt_object_fid(obj)), open_flags, -EPROTO);
814 GOTO(out, rc = -EPROTO);
817 /* XXX: only exclusive open is supported. */
819 *ibits = MDS_INODELOCK_OPEN;
821 /* never grant LCK_EX layout lock to client */
823 } else { /* normal open */
824 /* normal open holds read mode of open sem */
825 down_read(&obj->mot_open_sem);
827 if (open_flags & MDS_OPEN_LOCK) {
828 if (open_flags & FMODE_WRITE)
830 else if (open_flags & MDS_FMODE_EXEC)
835 *ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN;
836 } else if (atomic_read(&obj->mot_lease_count) > 0) {
837 if (open_flags & FMODE_WRITE)
843 *ibits = MDS_INODELOCK_OPEN;
846 lhc = &info->mti_lh[MDT_LH_LOCAL];
848 CDEBUG(D_INODE, "normal open:"DFID" lease count: %d, lm: %d\n",
849 PFID(mdt_object_fid(obj)),
850 atomic_read(&obj->mot_open_count), lm);
853 mdt_lock_reg_init(lhc, lm);
855 /* one problem to return layout lock on open is that it may result
856 * in too many layout locks cached on the client side. */
857 if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_OPEN) && try_layout) {
858 /* return lookup lock to validate inode at the client side,
859 * this is pretty important otherwise mdt will return layout
860 * lock for each open.
861 * However this is a double-edged sword because changing
862 * permission will revoke huge # of LOOKUP locks. */
863 *ibits |= MDS_INODELOCK_LAYOUT | MDS_INODELOCK_LOOKUP;
864 if (!mdt_object_lock_try(info, obj, lhc, *ibits)) {
865 *ibits &= ~(MDS_INODELOCK_LAYOUT|MDS_INODELOCK_LOOKUP);
867 rc = mdt_object_lock(info, obj, lhc, *ibits);
869 } else if (*ibits != 0) {
870 rc = mdt_object_lock(info, obj, lhc, *ibits);
873 CDEBUG(D_INODE, "%s: Requested bits lock:"DFID ", ibits = %#llx"
874 ", open_flags = %#llo, try_layout = %d : rc = %d\n",
875 mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(obj)),
876 *ibits, open_flags, try_layout, rc);
878 /* will change layout, revoke layout locks by enqueuing EX lock. */
879 if (rc == 0 && create_layout) {
880 struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
882 CDEBUG(D_INODE, "Will create layout, get EX layout lock:"DFID
883 ", open_flags = %#llo\n",
884 PFID(mdt_object_fid(obj)), open_flags);
886 /* We cannot enqueue another lock for the same resource we
887 * already have a lock for, due to mechanics of waiting list
888 * iterating in ldlm, see LU-3601.
889 * As such we'll drop the open lock we just got above here,
890 * it's ok not to have this open lock as it's main purpose is to
891 * flush unused cached client open handles. */
892 if (lustre_handle_is_used(&lhc->mlh_reg_lh))
893 mdt_object_unlock(info, obj, lhc, 1);
895 LASSERT(!try_layout);
896 mdt_lock_handle_init(ll);
897 mdt_lock_reg_init(ll, LCK_EX);
898 rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT);
900 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2);
903 /* Check if there is any other open handles after acquiring
904 * open lock. At this point, caching open handles have been revoked
906 * XXX: Now only exclusive open is supported. Need to check the
907 * type of open for generic lease support. */
908 if (rc == 0 && acq_lease) {
909 struct ptlrpc_request *req = mdt_info_req(info);
910 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
911 struct mdt_file_data *mfd;
912 bool is_replay_or_resent;
915 /* For lease: application can open a file and then apply lease,
916 * @handle contains original open handle in that case.
917 * In recovery, open REQ will be replayed and the lease REQ may
918 * be resent that means the open handle is already stale, so we
919 * need to fix it up here by finding new handle. */
920 is_replay_or_resent = req_is_replay(req) ||
921 lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT;
923 /* if the request is _not_ a replay request, rr_handle
924 * may be used to hold an openhandle which is issuing the
925 * lease request, so that this openhandle doesn't count. */
926 mfd = mdt_handle2mfd(med, info->mti_rr.rr_handle,
927 is_replay_or_resent);
931 CDEBUG(D_INODE, "acq_lease "DFID": openers: %d, want: %d\n",
932 PFID(mdt_object_fid(obj)),
933 atomic_read(&obj->mot_open_count), open_count);
935 if (atomic_read(&obj->mot_open_count) > open_count)
936 GOTO(out, rc = -EBUSY);
944 static void mdt_object_open_unlock(struct mdt_thread_info *info,
945 struct mdt_object *obj,
946 struct mdt_lock_handle *lhc,
949 __u64 open_flags = info->mti_spec.sp_cr_flags;
950 struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LOCAL];
953 if (req_is_replay(mdt_info_req(info)))
956 /* Release local lock - the lock put in MDT_LH_LOCAL will never
957 * return to client side. */
958 if (lustre_handle_is_used(&ll->mlh_reg_lh))
959 mdt_object_unlock(info, obj, ll, 1);
961 ll = &info->mti_lh[MDT_LH_LAYOUT];
962 /* Release local layout lock, layout was created */
963 if (lustre_handle_is_used(&ll->mlh_reg_lh)) {
964 LASSERT(!(ibits & MDS_INODELOCK_LAYOUT));
965 mdt_object_unlock(info, obj, ll, 1);
968 if (open_flags & MDS_OPEN_LEASE)
969 up_write(&obj->mot_open_sem);
971 up_read(&obj->mot_open_sem);
973 /* Cross-ref case, the lock should be returned to the client */
974 if (ibits == 0 || rc == -MDT_EREMOTE_OPEN)
977 if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT)) {
978 /* for the open request, the lock will only return to client
979 * if open or layout lock is granted. */
983 if (rc != 0 || !lustre_handle_is_used(&lhc->mlh_reg_lh)) {
984 struct ldlm_reply *ldlm_rep;
986 ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
987 mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
988 if (lustre_handle_is_used(&lhc->mlh_reg_lh))
989 mdt_object_unlock(info, obj, lhc, 1);
995 * Check release is permitted for the current HSM flags.
997 static bool mdt_hsm_release_allow(const struct md_attr *ma)
999 if (!(ma->ma_valid & MA_HSM))
1002 if (ma->ma_hsm.mh_flags & (HS_DIRTY|HS_NORELEASE|HS_LOST))
1005 if (!(ma->ma_hsm.mh_flags & HS_ARCHIVED))
1011 static int mdt_open_by_fid_lock(struct mdt_thread_info *info,
1012 struct ldlm_reply *rep,
1013 struct mdt_lock_handle *lhc)
1015 const struct lu_env *env = info->mti_env;
1016 struct mdt_device *mdt = info->mti_mdt;
1017 __u64 flags = info->mti_spec.sp_cr_flags;
1018 struct mdt_reint_record *rr = &info->mti_rr;
1019 struct md_attr *ma = &info->mti_attr;
1020 struct mdt_object *parent= NULL;
1021 struct mdt_object *o;
1023 bool object_locked = false;
1027 if (md_should_create(flags) && !(flags & MDS_OPEN_HAS_EA)) {
1028 if (!lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
1029 parent = mdt_object_find(env, mdt, rr->rr_fid1);
1030 if (IS_ERR(parent)) {
1031 CDEBUG(D_INODE, "Fail to find parent "DFID
1032 " for anonymous created %ld, try to"
1033 " use server-side parent.\n",
1034 PFID(rr->rr_fid1), PTR_ERR(parent));
1039 ma->ma_need |= MA_PFID;
1042 o = mdt_object_find(env, mdt, rr->rr_fid2);
1044 GOTO(out_parent_put, rc = PTR_ERR(o));
1046 if (mdt_object_remote(o)) {
1047 CDEBUG(D_INFO, "%s: "DFID" is on remote MDT.\n",
1048 mdt_obd_name(info->mti_mdt),
1050 GOTO(out, rc = -EREMOTE);
1051 } else if (!mdt_object_exists(o)) {
1052 mdt_set_disposition(info, rep,
1056 GOTO(out, rc = -ENOENT);
1059 mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
1061 mdt_prep_ma_buf_from_rep(info, o, ma);
1062 if (flags & MDS_OPEN_RELEASE)
1063 ma->ma_need |= MA_HSM;
1064 rc = mdt_attr_get_complex(info, o, ma);
1068 /* We should not change file's existing LOV EA */
1069 if (S_ISREG(lu_object_attr(&o->mot_obj)) &&
1070 flags & MDS_OPEN_HAS_EA && ma->ma_valid & MA_LOV)
1071 GOTO(out, rc = -EEXIST);
1073 /* If a release request, check file flags are fine and ask for an
1074 * exclusive open access. */
1075 if (flags & MDS_OPEN_RELEASE && !mdt_hsm_release_allow(ma))
1076 GOTO(out, rc = -EPERM);
1078 rc = mdt_check_resent_lock(info, o, lhc);
1081 } else if (rc > 0) {
1082 rc = mdt_object_open_lock(info, o, lhc, &ibits);
1083 object_locked = true;
1085 GOTO(out_unlock, rc);
1088 if (ma->ma_valid & MA_PFID) {
1089 parent = mdt_object_find(env, mdt, &ma->ma_pfid);
1090 if (IS_ERR(parent)) {
1091 CDEBUG(D_INODE, "Fail to find parent "DFID
1092 " for anonymous created %ld, try to"
1093 " use system default.\n",
1094 PFID(&ma->ma_pfid), PTR_ERR(parent));
1099 rc = mdt_finish_open(info, parent, o, flags, 0, rep);
1101 mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
1102 if (flags & MDS_OPEN_LOCK)
1103 mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
1104 if (flags & MDS_OPEN_LEASE)
1105 mdt_set_disposition(info, rep, DISP_OPEN_LEASE);
1107 GOTO(out_unlock, rc);
1111 mdt_object_open_unlock(info, o, lhc, ibits, rc);
1113 mdt_object_put(env, o);
1116 mdt_object_put(env, parent);
1120 /* Cross-ref request. Currently it can only be a pure open (w/o create) */
1121 static int mdt_cross_open(struct mdt_thread_info *info,
1122 const struct lu_fid *parent_fid,
1123 const struct lu_fid *fid,
1124 struct ldlm_reply *rep, __u32 flags)
1126 struct md_attr *ma = &info->mti_attr;
1127 struct mdt_object *o;
1131 o = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1133 RETURN(rc = PTR_ERR(o));
1135 if (mdt_object_remote(o)) {
1136 /* Something is wrong here, the object is on another MDS! */
1137 CERROR("%s: "DFID" isn't on this server!: rc = %d\n",
1138 mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
1139 LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
1141 "Object isn't on this server! FLD error?");
1144 if (mdt_object_exists(o)) {
1145 /* Do permission check for cross-open. */
1146 rc = mo_permission(info->mti_env, NULL,
1147 mdt_object_child(o),
1148 NULL, flags | MDS_OPEN_CROSS);
1152 mdt_prep_ma_buf_from_rep(info, o, ma);
1153 rc = mdt_attr_get_complex(info, o, ma);
1157 rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
1160 * Something is wrong here. lookup was positive but
1161 * there is no object!
1163 CERROR("%s: "DFID" doesn't exist!: rc = %d\n",
1164 mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
1169 mdt_object_put(info->mti_env, o);
1174 * find root object and take its xattr lock if it's on remote MDT, later create
1175 * may use fs default striping (which is stored in root xattr).
1177 static int mdt_lock_root_xattr(struct mdt_thread_info *info,
1178 struct mdt_device *mdt)
1180 struct mdt_object *md_root = mdt->mdt_md_root;
1181 struct lustre_handle lhroot;
1184 if (md_root == NULL) {
1185 lu_root_fid(&info->mti_tmp_fid1);
1186 md_root = mdt_object_find(info->mti_env, mdt,
1187 &info->mti_tmp_fid1);
1188 if (IS_ERR(md_root))
1189 return PTR_ERR(md_root);
1191 spin_lock(&mdt->mdt_lock);
1192 if (mdt->mdt_md_root != NULL) {
1193 spin_unlock(&mdt->mdt_lock);
1195 LASSERTF(mdt->mdt_md_root == md_root,
1196 "Different root object ("
1197 DFID") instances, %p, %p\n",
1198 PFID(&info->mti_tmp_fid1),
1199 mdt->mdt_md_root, md_root);
1200 LASSERT(atomic_read(
1201 &md_root->mot_obj.lo_header->loh_ref) > 1);
1203 mdt_object_put(info->mti_env, md_root);
1205 mdt->mdt_md_root = md_root;
1206 spin_unlock(&mdt->mdt_lock);
1210 if (md_root->mot_cache_attr || !mdt_object_remote(md_root))
1213 rc = mdt_remote_object_lock(info, md_root, mdt_object_fid(md_root),
1214 &lhroot, LCK_PR, MDS_INODELOCK_XATTR,
1219 md_root->mot_cache_attr = 1;
1221 /* don't cancel this lock, so that we know the cached xattr is valid. */
1222 ldlm_lock_decref(&lhroot, LCK_PR);
1227 int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
1229 struct mdt_device *mdt = info->mti_mdt;
1230 struct ptlrpc_request *req = mdt_info_req(info);
1231 struct mdt_object *parent;
1232 struct mdt_object *child;
1233 struct mdt_lock_handle *lh;
1234 struct ldlm_reply *ldlm_rep;
1235 struct mdt_body *repbody;
1236 struct lu_fid *child_fid = &info->mti_tmp_fid1;
1237 struct md_attr *ma = &info->mti_attr;
1238 __u64 create_flags = info->mti_spec.sp_cr_flags;
1240 struct mdt_reint_record *rr = &info->mti_rr;
1243 int object_locked = 0;
1247 OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE,
1248 (obd_timeout + 1) / 4);
1250 mdt_counter_incr(req, LPROC_MDT_OPEN);
1251 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1253 ma->ma_need = MA_INODE;
1256 LASSERT(info->mti_pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
1257 ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
1259 if (unlikely(create_flags & MDS_OPEN_JOIN_FILE)) {
1260 CERROR("file join is not supported anymore.\n");
1261 GOTO(out, result = err_serious(-EOPNOTSUPP));
1263 msg_flags = lustre_msg_get_flags(req->rq_reqmsg);
1265 if ((create_flags & (MDS_OPEN_HAS_EA | MDS_OPEN_HAS_OBJS)) &&
1266 info->mti_spec.u.sp_ea.eadata == NULL)
1267 GOTO(out, result = err_serious(-EINVAL));
1269 if (create_flags & FMODE_WRITE &&
1270 exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
1271 GOTO(out, result = -EROFS);
1273 CDEBUG(D_INODE, "I am going to open "DFID"/("DNAME"->"DFID") "
1274 "cr_flag=%#llo mode=0%06o msg_flag=0x%x\n",
1275 PFID(rr->rr_fid1), PNAME(&rr->rr_name),
1276 PFID(rr->rr_fid2), create_flags,
1277 ma->ma_attr.la_mode, msg_flags);
1279 if (info->mti_cross_ref) {
1280 /* This is cross-ref open */
1281 mdt_set_disposition(info, ldlm_rep,
1282 (DISP_IT_EXECD | DISP_LOOKUP_EXECD |
1284 result = mdt_cross_open(info, rr->rr_fid2, rr->rr_fid1,
1285 ldlm_rep, create_flags);
1287 } else if (req_is_replay(req) ||
1288 (req->rq_export->exp_libclient && create_flags & MDS_OPEN_HAS_EA)) {
1289 /* This is a replay request or from liblustre with ea. */
1290 result = mdt_open_by_fid(info, ldlm_rep);
1292 if (result != -ENOENT) {
1293 if (req->rq_export->exp_libclient &&
1294 create_flags & MDS_OPEN_HAS_EA)
1295 GOTO(out, result = 0);
1298 /* We didn't find the correct object, so we need to re-create it
1299 * via a regular replay. */
1300 if (!(create_flags & MDS_OPEN_CREAT)) {
1301 DEBUG_REQ(D_ERROR, req,
1302 "OPEN & CREAT not in open replay/by_fid.");
1303 GOTO(out, result = -EFAULT);
1305 CDEBUG(D_INFO, "No object(1), continue as regular open.\n");
1306 } else if (create_flags & (MDS_OPEN_BY_FID | MDS_OPEN_LOCK)) {
1308 * MDS_OPEN_LOCK is checked for backward compatibility with 2.1
1311 result = mdt_open_by_fid_lock(info, ldlm_rep, lhc);
1313 CDEBUG(D_INFO, "no object for "DFID": %d\n",
1314 PFID(rr->rr_fid2), result);
1318 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
1319 GOTO(out, result = err_serious(-ENOMEM));
1321 mdt_set_disposition(info, ldlm_rep,
1322 (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
1324 if (!lu_name_is_valid(&rr->rr_name))
1325 GOTO(out, result = -EPROTO);
1327 result = mdt_lock_root_xattr(info, mdt);
1332 lh = &info->mti_lh[MDT_LH_PARENT];
1333 mdt_lock_pdo_init(lh,
1334 (create_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR,
1337 parent = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
1339 GOTO(out, result = PTR_ERR(parent));
1341 result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
1343 mdt_object_put(info->mti_env, parent);
1347 /* get and check version of parent */
1348 result = mdt_version_get_check(info, parent, 0);
1350 GOTO(out_parent, result);
1352 fid_zero(child_fid);
1354 result = mdo_lookup(info->mti_env, mdt_object_child(parent),
1355 &rr->rr_name, child_fid, &info->mti_spec);
1357 LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
1358 "looking for "DFID"/"DNAME", found FID = "DFID"\n",
1359 PFID(mdt_object_fid(parent)), PNAME(&rr->rr_name),
1362 if (result != 0 && result != -ENOENT && result != -ESTALE)
1363 GOTO(out_parent, result);
1365 if (result == -ENOENT || result == -ESTALE) {
1366 /* If the object is dead, let's check if the object
1367 * is being migrated to a new object */
1368 if (result == -ESTALE) {
1369 struct lu_buf lmv_buf;
1371 lmv_buf.lb_buf = info->mti_xattr_buf;
1372 lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
1373 rc = mo_xattr_get(info->mti_env,
1374 mdt_object_child(parent),
1375 &lmv_buf, XATTR_NAME_LMV);
1377 struct lmv_mds_md_v1 *lmv;
1379 lmv = lmv_buf.lb_buf;
1380 if (le32_to_cpu(lmv->lmv_hash_type) &
1381 LMV_HASH_FLAG_MIGRATION) {
1382 /* Get the new parent FID and retry */
1383 mdt_object_unlock_put(info, parent,
1385 mdt_lock_handle_init(lh);
1387 (struct lu_fid *)rr->rr_fid1,
1388 &lmv->lmv_stripe_fids[1]);
1394 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
1395 if (result == -ESTALE) {
1397 * -ESTALE means the parent is a dead(unlinked) dir, so
1398 * it should return -ENOENT to in accordance with the
1399 * original mds implementaion.
1401 GOTO(out_parent, result = -ENOENT);
1404 if (!(create_flags & MDS_OPEN_CREAT))
1405 GOTO(out_parent, result);
1406 if (exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
1407 GOTO(out_parent, result = -EROFS);
1408 *child_fid = *info->mti_rr.rr_fid2;
1409 LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
1411 /* In the function below, .hs_keycmp resolves to
1412 * lu_obj_hop_keycmp() */
1413 /* coverity[overrun-buffer-val] */
1414 child = mdt_object_new(info->mti_env, mdt, child_fid);
1417 * Check for O_EXCL is moved to the mdt_finish_open(), we need to
1418 * return FID back in that case.
1420 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
1421 child = mdt_object_find(info->mti_env, mdt, child_fid);
1424 GOTO(out_parent, result = PTR_ERR(child));
1426 /** check version of child */
1427 rc = mdt_version_get_check(info, child, 1);
1429 GOTO(out_child, result = rc);
1431 if (result == -ENOENT) {
1432 /* Create under OBF and .lustre is not permitted */
1433 if (!fid_is_md_operative(rr->rr_fid1))
1434 GOTO(out_child, result = -EPERM);
1436 /* save versions in reply */
1437 mdt_version_get_save(info, parent, 0);
1438 mdt_version_get_save(info, child, 1);
1440 /* version of child will be changed */
1441 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
1443 /* Not found and with MDS_OPEN_CREAT: let's create it. */
1444 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
1446 /* Let lower layers know what is lock mode on directory. */
1447 info->mti_spec.sp_cr_mode =
1448 mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
1451 * Do not perform lookup sanity check. We know that name does
1454 info->mti_spec.sp_cr_lookup = 0;
1455 info->mti_spec.sp_feat = &dt_directory_features;
1457 result = mdo_create(info->mti_env,
1458 mdt_object_child(parent),
1460 mdt_object_child(child),
1463 if (result == -ERESTART) {
1464 mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
1465 GOTO(out_child, result);
1467 mdt_prep_ma_buf_from_rep(info, child, ma);
1468 /* XXX: we should call this once, see few lines below */
1470 result = mdt_attr_get_complex(info, child, ma);
1473 GOTO(out_child, result);
1476 mdt_counter_incr(req, LPROC_MDT_MKNOD);
1479 * The object is on remote node, return its FID for remote open.
1481 if (mdt_object_remote(child)) {
1483 * Check if this lock already was sent to client and
1484 * this is resent case. For resent case do not take lock
1485 * again, use what is already granted.
1487 LASSERT(lhc != NULL);
1489 rc = mdt_check_resent_lock(info, child, lhc);
1491 GOTO(out_child, result = rc);
1492 } else if (rc > 0) {
1493 mdt_lock_handle_init(lhc);
1494 mdt_lock_reg_init(lhc, LCK_PR);
1496 rc = mdt_object_lock(info, child, lhc,
1497 MDS_INODELOCK_LOOKUP);
1499 repbody->mbo_fid1 = *mdt_object_fid(child);
1500 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1504 result = -MDT_EREMOTE_OPEN;
1505 GOTO(out_child, result);
1506 } else if (mdt_object_exists(child)) {
1507 /* We have to get attr & LOV EA & HSM for this
1509 mdt_prep_ma_buf_from_rep(info, child, ma);
1510 ma->ma_need |= MA_HSM;
1511 result = mdt_attr_get_complex(info, child, ma);
1513 GOTO(out_child, result);
1515 /* Object does not exist. Likely FS corruption. */
1516 CERROR("%s: name '"DNAME"' present, but FID "
1517 DFID" is invalid\n", mdt_obd_name(info->mti_mdt),
1518 PNAME(&rr->rr_name), PFID(child_fid));
1519 GOTO(out_child, result = -EIO);
1523 rc = mdt_check_resent_lock(info, child, lhc);
1525 GOTO(out_child, result = rc);
1526 } else if (rc == 0) {
1527 /* the open lock might already be gotten in
1528 * ldlm_handle_enqueue() */
1529 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
1530 if (create_flags & MDS_OPEN_LOCK)
1531 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
1533 /* get openlock if this isn't replay and client requested it */
1534 if (!req_is_replay(req)) {
1535 rc = mdt_object_open_lock(info, child, lhc, &ibits);
1538 GOTO(out_child_unlock, result = rc);
1539 else if (create_flags & MDS_OPEN_LOCK)
1540 mdt_set_disposition(info, ldlm_rep,
1544 /* Try to open it now. */
1545 rc = mdt_finish_open(info, parent, child, create_flags,
1549 /* openlock will be released if mdt_finish_open failed */
1550 mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
1552 if (created && create_flags & MDS_OPEN_VOLATILE) {
1553 CERROR("%s: cannot open volatile file "DFID", orphan "
1554 "file will be left in PENDING directory until "
1555 "next reboot, rc = %d\n", mdt_obd_name(mdt),
1556 PFID(mdt_object_fid(child)), rc);
1557 GOTO(out_child_unlock, result);
1563 rc = mdo_unlink(info->mti_env,
1564 mdt_object_child(parent),
1565 mdt_object_child(child),
1567 &info->mti_attr, 0);
1569 CERROR("%s: "DFID" cleanup of open: rc = %d\n",
1570 mdt_obd_name(info->mti_mdt),
1571 PFID(mdt_object_fid(child)), rc);
1572 mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
1578 mdt_object_open_unlock(info, child, lhc, ibits, result);
1580 mdt_object_put(info->mti_env, child);
1582 mdt_object_unlock_put(info, parent, lh, result || !created);
1585 lustre_msg_set_transno(req->rq_repmsg, 0);
1590 * Create an orphan object use local root.
1592 static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
1593 struct mdt_device *mdt,
1594 const struct lu_fid *fid,
1595 struct md_attr *attr, fmode_t fmode)
1597 const struct lu_env *env = info->mti_env;
1598 struct md_op_spec *spec = &info->mti_spec;
1599 struct lu_fid *local_root_fid = &info->mti_tmp_fid1;
1600 struct mdt_object *obj = NULL;
1601 struct mdt_object *local_root;
1602 static const struct lu_name lname = {
1603 .ln_name = "i_am_nobody",
1604 .ln_namelen = sizeof("i_am_nobody") - 1,
1606 struct lu_ucred *uc;
1607 cfs_cap_t uc_cap_save;
1611 rc = dt_root_get(env, mdt->mdt_bottom, local_root_fid);
1613 RETURN(ERR_PTR(rc));
1615 local_root = mdt_object_find(env, mdt, local_root_fid);
1616 if (IS_ERR(local_root))
1619 obj = mdt_object_new(env, mdt, fid);
1621 GOTO(out, rc = PTR_ERR(obj));
1623 spec->sp_cr_lookup = 0;
1624 spec->sp_feat = &dt_directory_features;
1625 spec->sp_cr_mode = MDL_MINMODE; /* no lock */
1626 spec->sp_cr_flags = MDS_OPEN_VOLATILE | fmode;
1627 if (attr->ma_valid & MA_LOV) {
1628 spec->u.sp_ea.eadata = attr->ma_lmm;
1629 spec->u.sp_ea.eadatalen = attr->ma_lmm_size;
1630 spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
1632 spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
1636 uc_cap_save = uc->uc_cap;
1637 uc->uc_cap |= 1 << CFS_CAP_DAC_OVERRIDE;
1638 rc = mdo_create(env, mdt_object_child(local_root), &lname,
1639 mdt_object_child(obj), spec, attr);
1640 uc->uc_cap = uc_cap_save;
1642 CERROR("%s: cannot create volatile file "DFID": rc = %d\n",
1643 mdt_obd_name(mdt), PFID(fid), rc);
1647 rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED);
1649 CERROR("%s: cannot open volatile file "DFID", orphan "
1650 "file will be left in PENDING directory until "
1651 "next reboot, rc = %d\n", mdt_obd_name(mdt),
1658 mdt_object_put(env, obj);
1661 mdt_object_put(env, local_root);
1665 static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
1668 struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LAYOUT];
1669 struct close_data *data;
1670 struct ldlm_lock *lease;
1671 struct mdt_object *orphan;
1672 struct md_attr *orp_ma;
1679 if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY)
1682 data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
1686 lease = ldlm_handle2lock(&data->cd_handle);
1690 /* try to hold open_sem so that nobody else can open the file */
1691 if (!down_write_trylock(&o->mot_open_sem)) {
1692 ldlm_lock_cancel(lease);
1693 GOTO(out_reprocess, rc = -EBUSY);
1696 /* Check if the lease open lease has already canceled */
1697 lock_res_and_lock(lease);
1698 lease_broken = ldlm_is_cancel(lease);
1699 unlock_res_and_lock(lease);
1701 LDLM_DEBUG(lease, DFID " lease broken? %d",
1702 PFID(mdt_object_fid(o)), lease_broken);
1704 /* Cancel server side lease. Client side counterpart should
1705 * have been cancelled. It's okay to cancel it now as we've
1706 * held mot_open_sem. */
1707 ldlm_lock_cancel(lease);
1709 if (lease_broken) /* don't perform release task */
1710 GOTO(out_unlock, rc = -ESTALE);
1712 if (fid_is_zero(&data->cd_fid) || !fid_is_sane(&data->cd_fid))
1713 GOTO(out_unlock, rc = -EINVAL);
1715 /* ma_need was set before but it seems fine to change it in order to
1716 * avoid modifying the one from RPC */
1717 ma->ma_need = MA_HSM;
1718 rc = mdt_attr_get_complex(info, o, ma);
1720 GOTO(out_unlock, rc);
1722 if (!mdt_hsm_release_allow(ma))
1723 GOTO(out_unlock, rc = -EPERM);
1725 /* already released? */
1726 if (ma->ma_hsm.mh_flags & HS_RELEASED)
1727 GOTO(out_unlock, rc = 0);
1729 /* Compare on-disk and packed data_version */
1730 if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
1731 CDEBUG(D_HSM, DFID" data_version mismatches: packed=%llu"
1732 " and on-disk=%llu\n", PFID(mdt_object_fid(o)),
1733 data->cd_data_version, ma->ma_hsm.mh_arch_ver);
1734 GOTO(out_unlock, rc = -EPERM);
1737 ma->ma_valid = MA_INODE;
1738 ma->ma_attr.la_valid &= LA_ATIME | LA_MTIME | LA_CTIME | LA_SIZE;
1739 rc = mo_attr_set(info->mti_env, mdt_object_child(o), ma);
1741 GOTO(out_unlock, rc);
1743 ma->ma_need = MA_INODE | MA_LOV;
1744 rc = mdt_attr_get_complex(info, o, ma);
1746 GOTO(out_unlock, rc);
1748 if (!(ma->ma_valid & MA_LOV)) {
1749 /* Even empty file are released */
1750 memset(ma->ma_lmm, 0, sizeof(*ma->ma_lmm));
1751 ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
1752 ma->ma_lmm->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
1753 ma->ma_lmm->lmm_stripe_size = cpu_to_le32(LOV_MIN_STRIPE_SIZE);
1754 ma->ma_lmm_size = sizeof(*ma->ma_lmm);
1756 /* Magic must be LOV_MAGIC_Vx_DEF otherwise LOD will interpret
1757 * ma_lmm as lov_user_md, then it will be confused by union of
1758 * layout_gen and stripe_offset. */
1759 if (le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V1)
1760 ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
1761 else if (le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V3)
1762 ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V3_DEF);
1764 GOTO(out_unlock, rc = -EINVAL);
1767 /* Set file as released */
1768 ma->ma_lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_RELEASED);
1770 orp_ma = &info->mti_u.hsm.attr;
1771 orp_ma->ma_attr.la_mode = S_IFREG | S_IWUSR;
1772 orp_ma->ma_attr.la_uid = ma->ma_attr.la_uid;
1773 orp_ma->ma_attr.la_gid = ma->ma_attr.la_gid;
1774 orp_ma->ma_attr.la_valid = LA_MODE | LA_UID | LA_GID;
1775 orp_ma->ma_lmm = ma->ma_lmm;
1776 orp_ma->ma_lmm_size = ma->ma_lmm_size;
1777 orp_ma->ma_valid = MA_INODE | MA_LOV;
1778 orphan = mdt_orphan_open(info, info->mti_mdt, &data->cd_fid, orp_ma,
1780 if (IS_ERR(orphan)) {
1781 CERROR("%s: cannot open orphan file "DFID": rc = %ld\n",
1782 mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid),
1784 GOTO(out_unlock, rc = PTR_ERR(orphan));
1787 /* Set up HSM attribute for orphan object */
1788 CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
1789 buf = &info->mti_buf;
1790 buf->lb_buf = info->mti_xattr_buf;
1791 buf->lb_len = sizeof(struct hsm_attrs);
1792 ma->ma_hsm.mh_flags |= HS_RELEASED;
1793 lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
1794 ma->ma_hsm.mh_flags &= ~HS_RELEASED;
1796 mdt_lock_reg_init(lh, LCK_EX);
1797 rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT |
1798 MDS_INODELOCK_XATTR);
1800 GOTO(out_close, rc);
1802 rc = mo_xattr_set(info->mti_env, mdt_object_child(orphan), buf,
1806 /* Swap layout with orphan object */
1807 rc = mo_swap_layouts(info->mti_env, mdt_object_child(o),
1808 mdt_object_child(orphan),
1809 SWAP_LAYOUTS_MDS_HSM);
1811 /* Release exclusive LL */
1812 mdt_object_unlock(info, o, lh, 1);
1817 /* Close orphan object anyway */
1818 rc2 = mo_close(info->mti_env, mdt_object_child(orphan), orp_ma,
1821 CERROR("%s: error closing volatile file "DFID": rc = %d\n",
1822 mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid), rc2);
1823 LU_OBJECT_DEBUG(D_HSM, info->mti_env, &orphan->mot_obj,
1825 mdt_object_put(info->mti_env, orphan);
1828 up_write(&o->mot_open_sem);
1830 /* already released */
1832 struct mdt_body *repbody;
1834 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1835 LASSERT(repbody != NULL);
1836 repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
1840 ldlm_reprocess_all(lease->l_resource);
1841 LDLM_LOCK_PUT(lease);
1849 int mdt_close_swap_layouts(struct mdt_thread_info *info,
1850 struct mdt_object *o, struct md_attr *ma)
1852 struct mdt_lock_handle *lh1 = &info->mti_lh[MDT_LH_NEW];
1853 struct mdt_lock_handle *lh2 = &info->mti_lh[MDT_LH_OLD];
1854 struct close_data *data;
1855 struct ldlm_lock *lease;
1856 struct mdt_object *o1 = o, *o2;
1862 if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY)
1865 if (!S_ISREG(lu_object_attr(&o1->mot_obj)))
1868 data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
1872 if (fid_is_zero(&data->cd_fid) || !fid_is_sane(&data->cd_fid))
1875 rc = lu_fid_cmp(&data->cd_fid, mdt_object_fid(o));
1876 if (unlikely(rc == 0))
1879 /* Exchange o1 and o2, to enforce locking order */
1880 swap_objects = (rc < 0);
1882 lease = ldlm_handle2lock(&data->cd_handle);
1886 o2 = mdt_object_find(info->mti_env, info->mti_mdt, &data->cd_fid);
1888 GOTO(out_lease, rc = PTR_ERR(o2));
1890 if (!S_ISREG(lu_object_attr(&o2->mot_obj))) {
1891 swap_objects = false; /* not swapped yet */
1892 GOTO(out_obj, rc = -EINVAL);
1898 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
1903 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
1908 /* try to hold open_sem so that nobody else can open the file */
1909 if (!down_write_trylock(&o->mot_open_sem)) {
1910 ldlm_lock_cancel(lease);
1911 GOTO(out_obj, rc = -EBUSY);
1914 /* Check if the lease open lease has already canceled */
1915 lock_res_and_lock(lease);
1916 lease_broken = ldlm_is_cancel(lease);
1917 unlock_res_and_lock(lease);
1919 LDLM_DEBUG(lease, DFID " lease broken? %d",
1920 PFID(mdt_object_fid(o)), lease_broken);
1922 /* Cancel server side lease. Client side counterpart should
1923 * have been cancelled. It's okay to cancel it now as we've
1924 * held mot_open_sem. */
1925 ldlm_lock_cancel(lease);
1928 GOTO(out_unlock_sem, rc = -ESTALE);
1930 mdt_lock_reg_init(lh1, LCK_EX);
1931 rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
1932 MDS_INODELOCK_XATTR);
1934 GOTO(out_unlock_sem, rc);
1936 mdt_lock_reg_init(lh2, LCK_EX);
1937 rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
1938 MDS_INODELOCK_XATTR);
1940 GOTO(out_unlock1, rc);
1942 /* Swap layout with orphan object */
1943 rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
1944 mdt_object_child(o2), 0);
1946 GOTO(out_unlock2, rc);
1951 /* Release exclusive LL */
1952 mdt_object_unlock(info, o2, lh2, 1);
1955 mdt_object_unlock(info, o1, lh1, 1);
1958 up_write(&o->mot_open_sem);
1960 /* already swapped */
1962 struct mdt_body *repbody;
1964 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1965 LASSERT(repbody != NULL);
1966 repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
1970 mdt_object_put(info->mti_env, swap_objects ? o1 : o2);
1972 ldlm_reprocess_all(lease->l_resource);
1975 LDLM_LOCK_PUT(lease);
1985 #define MFD_CLOSED(mode) ((mode) == MDS_FMODE_CLOSED)
1986 static int mdt_mfd_closed(struct mdt_file_data *mfd)
1988 return ((mfd == NULL) || MFD_CLOSED(mfd->mfd_mode));
1991 int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
1993 struct mdt_object *o = mfd->mfd_object;
1994 struct md_object *next = mdt_object_child(o);
1995 struct md_attr *ma = &info->mti_attr;
2000 mode = mfd->mfd_mode;
2002 if (ma->ma_attr_flags & MDS_HSM_RELEASE) {
2003 rc = mdt_hsm_release(info, o, ma);
2005 CDEBUG(D_HSM, "%s: File " DFID " release failed: %d\n",
2006 mdt_obd_name(info->mti_mdt),
2007 PFID(mdt_object_fid(o)), rc);
2008 /* continue to close even error occurred. */
2012 if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SWAP) {
2013 rc = mdt_close_swap_layouts(info, o, ma);
2016 "%s: cannot swap layout of "DFID": rc=%d\n",
2017 mdt_obd_name(info->mti_mdt),
2018 PFID(mdt_object_fid(o)), rc);
2019 /* continue to close even if error occurred. */
2023 if (mode & FMODE_WRITE)
2025 else if (mode & MDS_FMODE_EXEC)
2028 /* Update atime on close only. */
2029 if ((mode & MDS_FMODE_EXEC || mode & FMODE_READ || mode & FMODE_WRITE)
2030 && (ma->ma_valid & MA_INODE) && (ma->ma_attr.la_valid & LA_ATIME)) {
2031 /* Set the atime only. */
2032 ma->ma_valid = MA_INODE;
2033 ma->ma_attr.la_valid = LA_ATIME;
2034 rc = mo_attr_set(info->mti_env, next, ma);
2037 /* If file data is modified, add the dirty flag. */
2038 if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
2039 rc = mdt_add_dirty_flag(info, o, ma);
2041 ma->ma_need |= MA_INODE;
2042 ma->ma_valid &= ~MA_INODE;
2044 if (!MFD_CLOSED(mode))
2045 rc = mo_close(info->mti_env, next, ma, mode);
2047 /* adjust open and lease count */
2048 if (mode & MDS_OPEN_LEASE) {
2049 LASSERT(atomic_read(&o->mot_lease_count) > 0);
2050 atomic_dec(&o->mot_lease_count);
2053 LASSERT(atomic_read(&o->mot_open_count) > 0);
2054 atomic_dec(&o->mot_open_count);
2056 mdt_object_put(info->mti_env, o);
2061 int mdt_close_internal(struct mdt_thread_info *info, struct ptlrpc_request *req,
2062 struct mdt_body *repbody)
2064 struct mdt_export_data *med;
2065 struct mdt_file_data *mfd;
2066 struct mdt_object *o;
2067 struct md_attr *ma = &info->mti_attr;
2072 med = &req->rq_export->exp_mdt_data;
2073 spin_lock(&med->med_open_lock);
2074 mfd = mdt_handle2mfd(med, &info->mti_close_handle, req_is_replay(req));
2075 if (mdt_mfd_closed(mfd)) {
2076 spin_unlock(&med->med_open_lock);
2077 CDEBUG(D_INODE, "no handle for file close: fid = "DFID
2078 ": cookie = %#llx\n", PFID(info->mti_rr.rr_fid1),
2079 info->mti_close_handle.cookie);
2080 /** not serious error since bug 3633 */
2083 class_handle_unhash(&mfd->mfd_handle);
2084 list_del_init(&mfd->mfd_list);
2085 spin_unlock(&med->med_open_lock);
2087 /* Do not lose object before last unlink. */
2088 o = mfd->mfd_object;
2089 mdt_object_get(info->mti_env, o);
2090 ret = mdt_mfd_close(info, mfd);
2091 if (repbody != NULL)
2092 rc = mdt_handle_last_unlink(info, o, ma);
2093 mdt_object_put(info->mti_env, o);
2096 RETURN(rc ? rc : ret);
2099 int mdt_close(struct tgt_session_info *tsi)
2101 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2102 struct ptlrpc_request *req = tgt_ses_req(tsi);
2103 struct md_attr *ma = &info->mti_attr;
2104 struct mdt_body *repbody = NULL;
2108 mdt_counter_incr(req, LPROC_MDT_CLOSE);
2109 /* Close may come with the Size-on-MDS update. Unpack it. */
2110 rc = mdt_close_unpack(info);
2112 GOTO(out, rc = err_serious(rc));
2114 /* These fields are no longer used and are left for compatibility.
2115 * size is always zero */
2116 req_capsule_set_size(info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
2118 req_capsule_set_size(info->mti_pill, &RMF_LOGCOOKIES, RCL_SERVER,
2120 rc = req_capsule_server_pack(info->mti_pill);
2121 if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) {
2122 mdt_client_compatibility(info);
2124 mdt_fix_reply(info);
2125 mdt_exit_ucred(info);
2126 GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
2129 /* Continue to close handle even if we can not pack reply */
2131 repbody = req_capsule_server_get(info->mti_pill,
2133 ma->ma_lmm = req_capsule_server_get(info->mti_pill,
2135 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
2138 ma->ma_need = MA_INODE | MA_LOV;
2139 repbody->mbo_eadatasize = 0;
2140 repbody->mbo_aclsize = 0;
2142 rc = err_serious(rc);
2145 rc = mdt_close_internal(info, req, repbody);
2147 mdt_empty_transno(info, rc);
2149 if (repbody != NULL) {
2150 mdt_client_compatibility(info);
2151 rc = mdt_fix_reply(info);
2154 mdt_exit_ucred(info);
2155 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
2156 GOTO(out, rc = err_serious(-ENOMEM));
2158 if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
2159 OBD_FAIL_MDS_CLOSE_NET_REP))
2160 tsi->tsi_reply_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
2162 mdt_thread_info_fini(info);
2163 RETURN(rc ? rc : ret);