1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Target (mdt) open/close file handling
7 * Copyright (C) 2002-2006 Cluster File Systems, Inc.
8 * Author: Huang Hua <huanghua@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/lustre_acl.h>
35 #include <lustre_mds.h>
36 #include "mdt_internal.h"
38 /* we do nothing because we do not have refcount now */
/* Handle "get" callback: mdt_mfd_new() passes this function to
 * class_handle_hash() when it registers a new mfd handle. */
39 static void mdt_mfd_get(void *mfdp)
43 /* Create a new mdt_file_data struct, initialize it,
44 * and insert it to global hash table */
/* Both list heads (mfd_handle.h_link and mfd_list) are initialized empty
 * before the handle is hashed with mdt_mfd_get as its callback.
 * NOTE(review): the allocation and the return statement are not visible
 * in this view; confirm how allocation failure is reported. */
45 struct mdt_file_data *mdt_mfd_new(void)
47 struct mdt_file_data *mfd;
52 INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
53 INIT_LIST_HEAD(&mfd->mfd_list);
54 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
60 * Find the mfd pointed to by handle in global hash table.
61 * In case of replay the handle is obsoleted
62 * but mfd can be found in mfd list by that handle
/* @handle must not be NULL (asserted). The primary lookup is
 * class_handle2object() on handle->cookie. For a request flagged
 * MSG_REPLAY the export's med_open_head list is also walked, matching
 * handle->cookie against each entry's mfd_old_handle.cookie.
 * NOTE(review): the full replay condition and the not-found handling are
 * not visible here; confirm what is returned when no entry matches. */
64 struct mdt_file_data *mdt_handle2mfd(struct mdt_thread_info *info,
65 const struct lustre_handle *handle)
67 struct ptlrpc_request *req = mdt_info_req(info);
68 struct mdt_file_data *mfd;
71 LASSERT(handle != NULL);
72 mfd = class_handle2object(handle->cookie);
73 /* during dw/setattr replay the mfd can be found by old handle */
75 lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
76 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
77 list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
78 if (mfd->mfd_old_handle.cookie == handle->cookie)
/* Free an mfd. The caller must already have removed it from its mfd_list
 * (asserted); the memory is released through OBD_FREE_RCU using the
 * embedded mfd_handle. */
87 void mdt_mfd_free(struct mdt_file_data *mfd)
89 LASSERT(list_empty(&mfd->mfd_list));
90 OBD_FREE_RCU(mfd, sizeof *mfd, &mfd->mfd_handle);
/* Ask the lower layer to create data for object @o via mdo_create_data(),
 * requesting MA_INODE | MA_LOV attributes back in info->mti_attr.
 * @p is the parent object; it may be NULL, in which case NULL is passed
 * down as the parent. md_should_create() is consulted on the create flags
 * first (the action taken when it says "no" is not visible here). */
93 static int mdt_create_data(struct mdt_thread_info *info,
94 struct mdt_object *p, struct mdt_object *o)
96 struct md_op_spec *spec = &info->mti_spec;
97 struct md_attr *ma = &info->mti_attr;
101 if (!md_should_create(spec->sp_cr_flags))
104 ma->ma_need = MA_INODE | MA_LOV;
106 rc = mdo_create_data(info->mti_env,
107 p ? mdt_object_child(p) : NULL,
108 mdt_object_child(o), spec, ma);
/* Return the object's epoch open count (mot_epochcount). Callers in this
 * file use the value as a boolean: non-zero means an epoch is open. */
112 static int mdt_epoch_opened(struct mdt_object *mo)
114 return mo->mot_epochcount;
/* Return non-zero when the object has no I/O epoch recorded
 * (mot_ioepoch is zero), zero otherwise. */
117 int mdt_sizeonmds_enabled(struct mdt_object *mo)
119 return !mo->mot_ioepoch;
122 /* Re-enable Size-on-MDS. */
/* Runs under mdt_ioepoch_lock. Acts only when the epoch carried by the
 * request (info->mti_epoch->ioepoch) equals the object's mot_ioepoch, and
 * asserts that no epoch is currently opened on the object.
 * NOTE(review): the statements that perform the re-enable are not visible
 * in this view. */
123 void mdt_sizeonmds_enable(struct mdt_thread_info *info,
124 struct mdt_object *mo)
126 spin_lock(&info->mti_mdt->mdt_ioepoch_lock);
127 if (info->mti_epoch->ioepoch == mo->mot_ioepoch) {
128 LASSERT(!mdt_epoch_opened(mo));
132 spin_unlock(&info->mti_mdt->mdt_ioepoch_lock);
135 /* Open the epoch. Epoch open is allowed if @writecount is not negative.
136 * The epoch and writecount handling is performed under the mdt_ioepoch_lock. */
/* Only relevant for clients connected with OBD_CONNECT_SOM and for regular
 * files; other cases are filtered by the first check (its early-exit
 * statement is not visible here). If an epoch is already open on @o it is
 * continued; otherwise a new one is started, taking info->mti_replayepoch
 * when it is non-zero and mdt->mdt_ioepoch otherwise. After the lock is
 * dropped, an LCK_EX lock on @o is taken and released when @cancel is set
 * and rr_fid1 is present. */
137 int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o)
139 struct mdt_device *mdt = info->mti_mdt;
144 if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
145 !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
148 spin_lock(&mdt->mdt_ioepoch_lock);
149 if (mdt_epoch_opened(o)) {
150 /* Epoch continues even if there are no writers yet. */
151 CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID"\n",
152 o->mot_ioepoch, PFID(mdt_object_fid(o)));
/* A replayed epoch newer than the device epoch moves the device forward. */
154 if (info->mti_replayepoch > mdt->mdt_ioepoch)
155 mdt->mdt_ioepoch = info->mti_replayepoch;
158 o->mot_ioepoch = info->mti_replayepoch ?
159 info->mti_replayepoch : mdt->mdt_ioepoch;
/* NOTE(review): this logs mdt->mdt_ioepoch, which can differ from the
 * o->mot_ioepoch just assigned when a smaller non-zero replay epoch was
 * used - confirm which value is meant to be reported. */
160 CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID"\n",
161 mdt->mdt_ioepoch, PFID(mdt_object_fid(o)));
165 spin_unlock(&mdt->mdt_ioepoch_lock);
167 /* Cancel Size-on-MDS attributes on clients if not truncate.
168 * In the latter case, mdt_reint_setattr will do it. */
169 if (cancel && (info->mti_rr.rr_fid1 != NULL)) {
170 struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_CHILD];
171 mdt_lock_reg_init(lh, LCK_EX);
172 rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE,
175 mdt_object_unlock(info, o, lh, 1);
180 /* Update the on-disk attributes if needed and re-enable Size-on-MDS caching. */
/* When the request attributes carry LA_SIZE, the update goes through
 * mdt_attr_set() with MDS_PERM_BYPASS set and la_valid masked down to
 * size, blocks and the three timestamps; its result is returned directly.
 * Without LA_SIZE only mdt_sizeonmds_enable() is called. */
181 static int mdt_sizeonmds_update(struct mdt_thread_info *info,
182 struct mdt_object *o)
186 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
187 o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_epochcount);
189 if (info->mti_attr.ma_attr.la_valid & LA_SIZE) {
190 /* Do Size-on-MDS attribute update.
191 * Size-on-MDS is re-enabled inside. */
192 /* XXX: since we have opened the file, it is unnecessary
193 * to check permission when closing it. Between the "open"
194 * and "close", maybe someone has changed the file mode
195 * or flags, or the file's creation mode does not permit write,
196 * and so on. Just set MDS_PERM_BYPASS for all the cases. */
197 info->mti_attr.ma_attr_flags |= MDS_PERM_BYPASS;
198 info->mti_attr.ma_attr.la_valid &= LA_SIZE | LA_BLOCKS |
199 LA_ATIME | LA_MTIME | LA_CTIME;
200 RETURN(mdt_attr_set(info, o, 0));
202 mdt_sizeonmds_enable(info, o);
207 * Returns 1 if epoch does not close.
208 * Returns 0 if epoch closes.
209 * Returns -EAGAIN if epoch closes but an Size-on-MDS Update is still needed
210 * from the client. */
/* A thread info without a request (mdt_info_req() == NULL) is treated as
 * an eviction. Clients without OBD_CONNECT_SOM and non-regular files are
 * filtered by the first check (the early exit itself is not visible here).
 * The epoch bookkeeping is done under mdt_ioepoch_lock; once the lock is
 * dropped, mdt_sizeonmds_update() runs only if rc is 0, no epoch remains
 * open on @o and this is not an eviction, after which ma_valid is
 * cleared. */
211 static int mdt_epoch_close(struct mdt_thread_info *info, struct mdt_object *o)
213 int eviction = (mdt_info_req(info) == NULL ? 1 : 0);
214 struct lu_attr *la = &info->mti_attr.ma_attr;
220 if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
221 !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
224 spin_lock(&info->mti_mdt->mdt_ioepoch_lock);
226 /* Epoch closes only if client tells about it or eviction occurs. */
227 if (eviction || (info->mti_epoch->flags & MF_EPOCH_CLOSE)) {
228 LASSERT(o->mot_epochcount);
231 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
232 o->mot_ioepoch, PFID(mdt_object_fid(o)),
236 achange = (info->mti_epoch->flags & MF_SOM_CHANGE);
239 if (!eviction && !mdt_epoch_opened(o)) {
240 /* Epoch ends. Is a Size-on-MDS update needed? */
241 if (o->mot_flags & MF_SOM_CHANGE) {
242 /* Some previous writer changed the attribute.
243 * Do not trust the current Size-on-MDS
244 * update, re-ask client. */
246 } else if (!(la->la_valid & LA_SIZE) && achange) {
247 /* Attributes were changed by the last writer
248 * only but no Size-on-MDS update is received.*/
/* Remember on the object that attributes changed (or that an evicted
 * client may have changed them). */
253 if (achange || eviction)
254 o->mot_flags |= MF_SOM_CHANGE;
257 opened = mdt_epoch_opened(o);
258 spin_unlock(&info->mti_mdt->mdt_ioepoch_lock);
260 /* If eviction occurred, do nothing. */
261 if ((rc == 0) && !opened && !eviction) {
262 /* Epoch ends and wanted Size-on-MDS update is obtained. */
263 rc = mdt_sizeonmds_update(info, o);
264 /* Avoid the following setattrs of these attributes, e.g.
265 * for atime update. */
266 info->mti_attr.ma_valid = 0;
/* Read the object's write count (mot_writecount) while holding
 * mdt_ioepoch_lock. */
271 int mdt_write_read(struct mdt_device *mdt, struct mdt_object *o)
275 spin_lock(&mdt->mdt_ioepoch_lock);
276 rc = o->mot_writecount;
277 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Acquire write access on @o; called by mdt_mfd_open() for FMODE_WRITE
 * opens. Under mdt_ioepoch_lock a negative mot_writecount is tested.
 * NOTE(review): both branches are not visible here; presumably a negative
 * count means writes are denied and the call fails, otherwise the count
 * is incremented - confirm. */
281 int mdt_write_get(struct mdt_device *mdt, struct mdt_object *o)
285 spin_lock(&mdt->mdt_ioepoch_lock);
286 if (o->mot_writecount < 0)
290 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Release write access on @o; called by mdt_mfd_close() for handles opened
 * with FMODE_WRITE or FMODE_EPOCHLCK. The update happens under
 * mdt_ioepoch_lock (the statement itself is not visible here). */
294 static void mdt_write_put(struct mdt_device *mdt, struct mdt_object *o)
297 spin_lock(&mdt->mdt_ioepoch_lock);
299 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Deny writes on @o; called by mdt_mfd_open() for MDS_FMODE_EXEC opens.
 * Under mdt_ioepoch_lock a positive mot_writecount is tested.
 * NOTE(review): both branches are not visible here; presumably a positive
 * count (active writers) makes the call fail, otherwise the count is
 * decremented - confirm. */
303 static int mdt_write_deny(struct mdt_device *mdt, struct mdt_object *o)
307 spin_lock(&mdt->mdt_ioepoch_lock);
308 if (o->mot_writecount > 0)
312 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Re-allow writes on @o; called by mdt_mfd_close() for handles opened with
 * MDS_FMODE_EXEC. The update happens under mdt_ioepoch_lock (the statement
 * itself is not visible here). */
316 static void mdt_write_allow(struct mdt_device *mdt, struct mdt_object *o)
319 spin_lock(&mdt->mdt_ioepoch_lock);
321 spin_unlock(&mdt->mdt_ioepoch_lock);
325 /* there can be no real transaction so prepare the fake one */
/* If the reply already carries a non-zero transno nothing new is assigned
 * (the statements of that branch are not visible here). Otherwise, under
 * mdt_transno_lock: a zero info->mti_transno gets the next value of
 * mdt_last_transno; a non-zero one (replay) raises mdt_last_transno if it
 * is larger. The result is stored in req->rq_transno and in the reply
 * message, and the reply's last_xid is set to the request xid. */
326 static void mdt_empty_transno(struct mdt_thread_info* info)
328 struct mdt_device *mdt = info->mti_mdt;
329 struct ptlrpc_request *req = mdt_info_req(info);
332 /* transaction has already occurred */
333 if (lustre_msg_get_transno(req->rq_repmsg) != 0) {
338 spin_lock(&mdt->mdt_transno_lock);
339 if (info->mti_transno == 0) {
340 info->mti_transno = ++ mdt->mdt_last_transno;
342 /* should be replay */
343 if (info->mti_transno > mdt->mdt_last_transno)
344 mdt->mdt_last_transno = info->mti_transno;
346 spin_unlock(&mdt->mdt_transno_lock);
348 CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
350 req->rq_export->exp_obd->obd_last_committed);
352 req->rq_transno = info->mti_transno;
353 lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
354 lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
/* Set mfd->mfd_mode to @mode, logging the old and new values at D_HA.
 * @mfd must not be NULL (asserted). */
358 void mdt_mfd_set_mode(struct mdt_file_data *mfd, int mode)
360 LASSERT(mfd != NULL);
362 CDEBUG(D_HA, "Change mfd %p mode 0x%x->0x%x\n",
363 mfd, (unsigned int)mfd->mfd_mode, (unsigned int)mode);
365 mfd->mfd_mode = mode;
/* Perform the open of object @o and publish an open handle (mfd) for it.
 * @p is the parent (NULL in the replay case), @flags the open flags and
 * @created tells whether @o was just created (MDS_OPEN_CREATED is then
 * added to the flags given to mo_open()).
 * Visible steps: create data for a regular file with no LOV EA; report the
 * EA size in the reply body; take write access and open an epoch for
 * FMODE_WRITE, or deny writes for MDS_FMODE_EXEC; call mo_open(); take an
 * object reference for this open; record mode and xid in the mfd; on
 * replay, drop any orphan mfd found under the old cookie and remember that
 * cookie in the new mfd; add the mfd to the export's open list; return the
 * handle cookie in the reply and assign a transno.
 * NOTE(review): error paths and the mfd allocation are not visible here. */
368 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
369 struct mdt_object *o, int flags, int created)
371 struct ptlrpc_request *req = mdt_info_req(info);
372 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
373 struct mdt_file_data *mfd;
374 struct md_attr *ma = &info->mti_attr;
375 struct lu_attr *la = &ma->ma_attr;
376 struct mdt_body *repbody;
377 int rc = 0, isdir, isreg;
380 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
382 isreg = S_ISREG(la->la_mode);
383 isdir = S_ISDIR(la->la_mode);
384 if ((isreg && !(ma->ma_valid & MA_LOV))) {
386 * No EA, check whether it is will set regEA and dirEA since in
387 * above attr get, these size might be zero, so reset it, to
388 * retrieve the MD after create obj.
390 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
393 /* in replay case, p == NULL */
394 rc = mdt_create_data(info, p, o);
399 CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
400 ma->ma_valid, ma->ma_lmm_size);
402 if (ma->ma_valid & MA_LOV) {
403 LASSERT(ma->ma_lmm_size != 0);
404 repbody->eadatasize = ma->ma_lmm_size;
406 repbody->valid |= OBD_MD_FLDIREA;
408 repbody->valid |= OBD_MD_FLEASIZE;
411 if (flags & FMODE_WRITE) {
412 rc = mdt_write_get(info->mti_mdt, o);
414 mdt_epoch_open(info, o);
415 repbody->ioepoch = o->mot_ioepoch;
417 } else if (flags & MDS_FMODE_EXEC) {
418 rc = mdt_write_deny(info->mti_mdt, o);
423 rc = mo_open(info->mti_env, mdt_object_child(o),
424 created ? flags | MDS_OPEN_CREATED : flags);
431 * Keep a reference on this object for this open, and is
432 * released by mdt_mfd_close().
434 mdt_object_get(info->mti_env, o);
437 * @flags is always not zero. At least it should be FMODE_READ,
438 * FMODE_WRITE or FMODE_EXEC.
443 mdt_mfd_set_mode(mfd, flags);
446 mfd->mfd_xid = req->rq_xid;
449 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
450 struct mdt_file_data *old_mfd;
451 /* Check whether old cookie already exists in
452 * the list, because when doing recovery, client
453 * might be disconnected from server, and
454 * restart replay, so there may be some orphan
455 * mfd here, we should remove them */
456 LASSERT(info->mti_rr.rr_handle != NULL);
457 old_mfd = mdt_handle2mfd(info, info->mti_rr.rr_handle);
/* NOTE(review): the message below prints "mfd" while the entry being
 * removed is "old_mfd" - confirm which pointer should be logged. */
459 CDEBUG(D_HA, "del orph mfd %p cookie" LPX64"\n",
460 mfd, info->mti_rr.rr_handle->cookie);
461 spin_lock(&med->med_open_lock);
462 class_handle_unhash(&old_mfd->mfd_handle);
463 list_del_init(&old_mfd->mfd_list);
464 spin_unlock(&med->med_open_lock);
465 mdt_mfd_free(old_mfd);
467 CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
468 info->mti_rr.rr_handle->cookie);
469 mfd->mfd_old_handle.cookie =
470 info->mti_rr.rr_handle->cookie;
472 spin_lock(&med->med_open_lock);
473 list_add(&mfd->mfd_list, &med->med_open_head);
474 spin_unlock(&med->med_open_lock);
476 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
477 mdt_empty_transno(info);
/* Fill in the open reply for object @o and, where appropriate, open it.
 * @p is the parent (callers in this file pass NULL for open-by-fid and
 * cross-ref opens), @flags are the open flags, @created says whether @o
 * was created by this request, and @rep receives disposition bits.
 * Requires valid inode attributes in info->mti_attr (MA_INODE asserted).
 * Visible steps, in order:
 *  - pack attributes into the reply body;
 *  - for remote clients pack remote permissions, otherwise (with
 *    CONFIG_FS_POSIX_ACL and OBD_CONNECT_ACL) fetch the access ACL xattr;
 *  - fill MDS and, for regular files, OSS capabilities when enabled;
 *  - for symlinks, and for special nodes when the client set
 *    OBD_CONNECT_NODEVOH, reset the reply transno to 0;
 *  - set DISP_OPEN_OPEN, then check O_EXCL|O_CREAT on an existing object
 *    and directory-specific flag combinations;
 *  - for MSG_RESENT requests look for an mfd with the same xid in the
 *    export's open list and reuse its handle cookie;
 *  - call mdt_mfd_open().
 * NOTE(review): the return statements and several branch bodies are not
 * visible in this view, so exact error codes are not documented here. */
485 static int mdt_finish_open(struct mdt_thread_info *info,
486 struct mdt_object *p, struct mdt_object *o,
487 int flags, int created, struct ldlm_reply *rep)
489 struct ptlrpc_request *req = mdt_info_req(info);
490 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
491 struct mdt_device *mdt = info->mti_mdt;
492 struct md_attr *ma = &info->mti_attr;
493 struct lu_attr *la = &ma->ma_attr;
494 struct mdt_file_data *mfd;
495 struct mdt_body *repbody;
497 int isreg, isdir, islnk;
501 LASSERT(ma->ma_valid & MA_INODE);
503 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
505 isreg = S_ISREG(la->la_mode);
506 isdir = S_ISDIR(la->la_mode);
507 islnk = S_ISLNK(la->la_mode);
508 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
510 if (med->med_rmtclient) {
511 void *buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
513 rc = mdt_pack_remote_perm(info, o, buf);
515 repbody->valid &= ~OBD_MD_FLRMTPERM;
516 repbody->aclsize = 0;
518 repbody->valid |= OBD_MD_FLRMTPERM;
519 repbody->aclsize = sizeof(struct mdt_remote_perm);
522 #ifdef CONFIG_FS_POSIX_ACL
523 else if (req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) {
524 const struct lu_env *env = info->mti_env;
525 struct md_object *next = mdt_object_child(o);
526 struct lu_buf *buf = &info->mti_buf;
528 buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
529 buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL,
531 if (buf->lb_len > 0) {
532 rc = mo_xattr_get(env, next, buf,
533 XATTR_NAME_ACL_ACCESS);
535 if (rc == -ENODATA) {
536 repbody->aclsize = 0;
537 repbody->valid |= OBD_MD_FLACL;
539 } else if (rc == -EOPNOTSUPP) {
542 CERROR("got acl size: %d\n", rc);
545 repbody->aclsize = rc;
546 repbody->valid |= OBD_MD_FLACL;
553 if (mdt->mdt_opts.mo_mds_capa) {
554 struct lustre_capa *capa;
556 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
558 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
560 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
563 repbody->valid |= OBD_MD_FLMDSCAPA;
565 if (mdt->mdt_opts.mo_oss_capa &&
566 S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) {
567 struct lustre_capa *capa;
569 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
571 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | capa_open_opc(flags);
573 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
576 repbody->valid |= OBD_MD_FLOSSCAPA;
580 * If we are following a symlink, don't open; and do not return open
581 * handle for special nodes as client required.
583 if (islnk || (!isreg && !isdir &&
584 (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
585 lustre_msg_set_transno(req->rq_repmsg, 0);
589 mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
592 * We need to return the existing object's fid back, so it is done here,
593 * after preparing the reply.
595 if (!created && (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT))
598 /* This can't be done earlier, we need to return reply body */
600 if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
601 /* We are trying to create or write an existing dir. */
604 } else if (flags & MDS_OPEN_DIRECTORY)
607 if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_OPEN_CREATE,
608 OBD_FAIL_LDLM_REPLY | OBD_FAIL_ONCE)) {
613 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
614 spin_lock(&med->med_open_lock);
615 list_for_each(t, &med->med_open_head) {
616 mfd = list_entry(t, struct mdt_file_data, mfd_list);
617 if (mfd->mfd_xid == req->rq_xid) {
622 spin_unlock(&med->med_open_lock);
625 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
626 /*set repbody->ea_size for resent case*/
627 if (ma->ma_valid & MA_LOV) {
628 LASSERT(ma->ma_lmm_size != 0);
629 repbody->eadatasize = ma->ma_lmm_size;
631 repbody->valid |= OBD_MD_FLDIREA;
633 repbody->valid |= OBD_MD_FLEASIZE;
639 rc = mdt_mfd_open(info, p, o, flags, created);
643 extern void mdt_req_from_mcd(struct ptlrpc_request *req,
644 struct mdt_client_data *mcd);
/* Rebuild the reply for an open intent request that was already handled.
 * The request must use the RQF_LDLM_INTENT_OPEN format (asserted).
 * The saved state is restored from the export's client data
 * (mdt_req_from_mcd(), disposition from mcd_last_data). Then:
 *  - DISP_OPEN_CREATE set together with a further condition that is not
 *    visible here: the saved status is returned (per the comment, the
 *    create itself failed);
 *  - DISP_OPEN_CREATE set otherwise: parent and child are looked up; if
 *    the child exists its attributes are fetched and mdt_finish_open() is
 *    run, if it does not exist control goes to regular_open, and in the
 *    remaining case the child's fid is returned with
 *    OBD_MD_FLID | OBD_MD_MDS;
 *  - no create attempted: mdt_reint_open() is simply run again.
 * On exit the reply status is set from req->rq_status and a negative rc
 * must go together with a zero reply transno (asserted). */
646 void mdt_reconstruct_open(struct mdt_thread_info *info,
647 struct mdt_lock_handle *lhc)
649 const struct lu_env *env = info->mti_env;
650 struct mdt_device *mdt = info->mti_mdt;
651 struct req_capsule *pill = info->mti_pill;
652 struct ptlrpc_request *req = mdt_info_req(info);
653 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
654 struct mdt_client_data *mcd = med->med_mcd;
655 struct md_attr *ma = &info->mti_attr;
656 struct mdt_reint_record *rr = &info->mti_rr;
657 __u32 flags = info->mti_spec.sp_cr_flags;
658 struct ldlm_reply *ldlm_rep;
659 struct mdt_object *parent;
660 struct mdt_object *child;
661 struct mdt_body *repbody;
665 LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
666 ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
667 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
669 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
670 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
672 ma->ma_need = MA_INODE | MA_LOV;
675 mdt_req_from_mcd(req, med->med_mcd);
676 mdt_set_disposition(info, ldlm_rep, mcd->mcd_last_data);
/* NOTE(review): this informational message is emitted through CERROR,
 * i.e. at error level - confirm that is intended. */
678 CERROR("This is reconstruct open: disp="LPX64", result=%d\n",
679 ldlm_rep->lock_policy_res1, req->rq_status);
681 if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) &&
683 /* We did not create successfully, return error to client. */
684 GOTO(out, rc = req->rq_status);
686 if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE)) {
688 * We failed after creation, but we do not know in which step
689 * we failed. So try to check the child object.
691 parent = mdt_object_find(env, mdt, rr->rr_fid1);
692 LASSERT(!IS_ERR(parent));
694 child = mdt_object_find(env, mdt, rr->rr_fid2);
695 LASSERT(!IS_ERR(child));
697 rc = mdt_object_exists(child);
699 struct md_object *next;
701 mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
702 next = mdt_object_child(child);
703 rc = mo_attr_get(env, next, ma);
705 rc = mdt_finish_open(info, parent, child,
708 /* the child object was created on remote server */
709 repbody->fid1 = *rr->rr_fid2;
710 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
712 } else if (rc == 0) {
713 /* the child does not exist, we should do regular open */
714 mdt_object_put(env, parent);
715 mdt_object_put(env, child);
716 GOTO(regular_open, 0);
718 mdt_object_put(env, parent);
719 mdt_object_put(env, child);
723 /* We did not try to create, so we are a pure open */
724 rc = mdt_reint_open(info, lhc);
730 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
731 LASSERT(ergo(rc < 0, lustre_msg_get_transno(req->rq_repmsg) == 0));
/* Open the object named by rr_fid2 directly, without a name lookup; used
 * by mdt_reint_open() for replayed requests and liblustre opens with EA.
 * If the object cannot be found the PTR_ERR() value is returned. When the
 * object exists, disposition bits are set in @rep, its attributes are
 * fetched and mdt_finish_open() is called with no parent and created == 0.
 * In the last branch the fid is returned to the client with
 * OBD_MD_FLID | OBD_MD_MDS (object created on a remote server, per the
 * comment). The object reference is dropped before returning. */
734 static int mdt_open_by_fid(struct mdt_thread_info* info,
735 struct ldlm_reply *rep)
737 const struct lu_env *env = info->mti_env;
738 __u32 flags = info->mti_spec.sp_cr_flags;
739 struct mdt_reint_record *rr = &info->mti_rr;
740 struct md_attr *ma = &info->mti_attr;
741 struct mdt_object *o;
745 o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
747 RETURN(rc = PTR_ERR(o));
749 rc = mdt_object_exists(o);
751 mdt_set_disposition(info, rep, (DISP_IT_EXECD |
755 rc = mo_attr_get(env, mdt_object_child(o), ma);
757 rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
758 } else if (rc == 0) {
761 /* the child object was created on remote server */
762 struct mdt_body *repbody;
763 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
764 repbody->fid1 = *rr->rr_fid2;
765 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
769 mdt_object_put(info->mti_env, o);
/* Pin is not supported: always returns err_serious(-EOPNOTSUPP). */
773 int mdt_pin(struct mdt_thread_info* info)
776 RETURN(err_serious(-EOPNOTSUPP));
779 /* Cross-ref request. Currently it can only be a pure open (w/o create) */
/* Looks up the object for @fid; a lookup failure returns the PTR_ERR()
 * value. When the object exists locally, permission is checked with
 * mo_permission() (flags | MDS_OPEN_CROSS), attributes are fetched and
 * mdt_finish_open() is called with no parent and created == 0. The two
 * other outcomes of mdt_object_exists() are logged as errors: the object
 * does not exist, or it lives on another server. The object reference is
 * dropped before returning.
 * NOTE(review): the error codes returned by the two failure branches are
 * not visible here. */
780 static int mdt_cross_open(struct mdt_thread_info* info,
781 const struct lu_fid *fid,
782 struct ldlm_reply *rep, __u32 flags)
784 struct md_attr *ma = &info->mti_attr;
785 struct mdt_object *o;
789 o = mdt_object_find(info->mti_env, info->mti_mdt, fid);
791 RETURN(rc = PTR_ERR(o));
793 rc = mdt_object_exists(o);
795 /* Do permission check for cross-open. */
796 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o),
797 NULL, flags | MDS_OPEN_CROSS);
801 mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
802 rc = mo_attr_get(info->mti_env, mdt_object_child(o), ma);
804 rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
805 } else if (rc == 0) {
807 * Something is wrong here. lookup was positive but there is
810 CERROR("Cross-ref object doesn't exist!\n");
813 /* Something is wrong here, the object is on another MDS! */
814 CERROR("The object isn't on this server! FLD error?\n");
815 LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
817 "Object isn't on this server! FLD error?\n");
823 mdt_object_put(info->mti_env, o);
/* Handle an open intent (RQF_LDLM_INTENT_OPEN, asserted), optionally
 * creating the file. @lhc is the lock handle used when a lock on the
 * child has to be returned to the client; it must not be NULL on the
 * -EREMOTE path (asserted there).
 * Visible flow:
 *  - MDS_OPEN_JOIN_FILE is rejected with err_serious(-EOPNOTSUPP);
 *  - replayed requests, and liblustre requests with MDS_OPEN_HAS_EA, are
 *    first tried through mdt_open_by_fid(); on -ENOENT a replay without
 *    MDS_OPEN_CREAT fails with -EFAULT, otherwise the regular path below
 *    re-creates the object;
 *  - cross-ref requests are handed to mdt_cross_open();
 *  - otherwise the parent is locked (LCK_PW when MDS_OPEN_CREAT is set,
 *    LCK_PR if not) and the name is looked up; -ESTALE from the lookup is
 *    reported as -ENOENT; a missing name without MDS_OPEN_CREAT returns
 *    the lookup result;
 *  - a missing child is created with mdo_create() (DISP_OPEN_CREATE is
 *    cleared again if that returns -ERESTART);
 *  - attributes are fetched; on -EREMOTE the child fid is returned with
 *    OBD_MD_FLID | OBD_MD_MDS, reusing an already granted lock for a
 *    resent request or taking a LCK_PR lookup lock otherwise;
 *  - mdt_finish_open() performs the open; if it fails after a create,
 *    the new object is unlinked again.
 * NOTE(review): the CDEBUG text "Open replay did find object" contradicts
 * the comment just above it ("We didn't find the correct object") - it
 * looks like "did not find" was intended; confirm before changing the
 * runtime string. */
827 int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
829 struct mdt_device *mdt = info->mti_mdt;
830 struct ptlrpc_request *req = mdt_info_req(info);
831 struct mdt_object *parent;
832 struct mdt_object *child;
833 struct mdt_lock_handle *lh;
834 struct ldlm_reply *ldlm_rep;
835 struct mdt_body *repbody;
836 struct lu_fid *child_fid = &info->mti_tmp_fid1;
837 struct md_attr *ma = &info->mti_attr;
838 __u32 create_flags = info->mti_spec.sp_cr_flags;
839 struct mdt_reint_record *rr = &info->mti_rr;
840 struct lu_name *lname;
845 OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE,
846 (obd_timeout + 1) / 4);
848 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
850 ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
851 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill, &RMF_MDT_MD,
853 ma->ma_need = MA_INODE | MA_LOV;
856 LASSERT(info->mti_pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
857 ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
859 /* TODO: JOIN file */
860 if (create_flags & MDS_OPEN_JOIN_FILE) {
861 CERROR("JOIN file will be supported soon\n");
862 GOTO(out, result = err_serious(-EOPNOTSUPP));
865 CDEBUG(D_INODE, "I am going to open "DFID"/(%s->"DFID") "
866 "cr_flag=0%o mode=0%06o msg_flag=0x%x\n",
867 PFID(rr->rr_fid1), rr->rr_name,
868 PFID(rr->rr_fid2), create_flags,
869 ma->ma_attr.la_mode, lustre_msg_get_flags(req->rq_reqmsg));
871 if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) ||
872 (req->rq_export->exp_libclient && create_flags&MDS_OPEN_HAS_EA)) {
873 /* This is a replay request or from liblustre with ea. */
874 result = mdt_open_by_fid(info, ldlm_rep);
876 if (result != -ENOENT) {
877 if (req->rq_export->exp_libclient &&
878 create_flags&MDS_OPEN_HAS_EA)
879 GOTO(out, result = 0);
883 * We didn't find the correct object, so we need to re-create it
884 * via a regular replay.
886 if (!(create_flags & MDS_OPEN_CREAT)) {
887 DEBUG_REQ(D_ERROR, req,"OPEN & CREAT not in open replay.");
888 GOTO(out, result = -EFAULT);
890 CDEBUG(D_INFO, "Open replay did find object, continue as "
894 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
895 GOTO(out, result = err_serious(-ENOMEM));
897 mdt_set_disposition(info, ldlm_rep,
898 (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
900 if (info->mti_cross_ref) {
901 /* This is cross-ref open */
902 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
903 result = mdt_cross_open(info, rr->rr_fid1, ldlm_rep,
908 lh = &info->mti_lh[MDT_LH_PARENT];
909 mdt_lock_pdo_init(lh, (create_flags & MDS_OPEN_CREAT) ?
910 LCK_PW : LCK_PR, rr->rr_name, rr->rr_namelen);
912 parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
913 MDS_INODELOCK_UPDATE);
915 GOTO(out, result = PTR_ERR(parent));
919 lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
921 result = mdo_lookup(info->mti_env, mdt_object_child(parent),
922 lname, child_fid, &info->mti_spec);
923 LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
924 "looking for "DFID"/%s, result fid="DFID"\n",
925 PFID(mdt_object_fid(parent)), rr->rr_name, PFID(child_fid));
927 if (result != 0 && result != -ENOENT && result != -ESTALE)
928 GOTO(out_parent, result);
930 if (result == -ENOENT || result == -ESTALE) {
931 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
932 if (result == -ESTALE) {
934 * -ESTALE means the parent is a dead(unlinked) dir, so
935 * it should return -ENOENT to in accordance with the
936 * original mds implementaion.
938 GOTO(out_parent, result = -ENOENT);
940 if (!(create_flags & MDS_OPEN_CREAT))
941 GOTO(out_parent, result);
942 *child_fid = *info->mti_rr.rr_fid2;
943 LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
947 * Check for O_EXCL is moved to the mdt_finish_open(), we need to
948 * return FID back in that case.
950 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
953 child = mdt_object_find(info->mti_env, mdt, child_fid);
955 GOTO(out_parent, result = PTR_ERR(child));
957 mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
958 if (result == -ENOENT) {
959 /* Not found and with MDS_OPEN_CREAT: let's create it. */
960 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
962 /* Let lower layers know what is lock mode on directory. */
963 info->mti_spec.sp_cr_mode =
964 mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
967 * Do not perform lookup sanity check. We know that name does
970 info->mti_spec.sp_cr_lookup = 0;
972 result = mdo_create(info->mti_env,
973 mdt_object_child(parent),
975 mdt_object_child(child),
978 if (result == -ERESTART) {
979 mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
980 GOTO(out_child, result);
983 GOTO(out_child, result);
987 /* We have to get attr & lov ea for this object */
988 result = mo_attr_get(info->mti_env, mdt_object_child(child),
991 * The object is on remote node, return its FID for remote open.
993 if (result == -EREMOTE) {
997 * Check if this lock already was sent to client and
998 * this is resent case. For resent case do not take lock
999 * again, use what is already granted.
1001 LASSERT(lhc != NULL);
1003 if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
1004 struct ldlm_lock *lock;
1006 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1009 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
1011 CERROR("Invalid lock handle "LPX64"\n",
1012 lhc->mlh_reg_lh.cookie);
1015 LASSERT(fid_res_name_eq(mdt_object_fid(child),
1016 &lock->l_resource->lr_name));
1017 LDLM_LOCK_PUT(lock);
1020 mdt_lock_handle_init(lhc);
1021 mdt_lock_reg_init(lhc, LCK_PR);
1023 rc = mdt_object_lock(info, child, lhc,
1024 MDS_INODELOCK_LOOKUP,
1027 repbody->fid1 = *mdt_object_fid(child);
1028 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
1031 GOTO(out_child, result);
1035 /* Try to open it now. */
1036 result = mdt_finish_open(info, parent, child, create_flags,
1039 if (result != 0 && created) {
1043 ma->ma_cookie_size = 0;
1044 info->mti_no_need_trans = 1;
1045 rc2 = mdo_unlink(info->mti_env,
1046 mdt_object_child(parent),
1047 mdt_object_child(child),
1051 CERROR("Error in cleanup of open\n");
1055 mdt_object_put(info->mti_env, child);
1057 mdt_object_unlock_put(info, parent, lh, result);
1060 lustre_msg_set_transno(req->rq_repmsg, 0);
/* True when @mode, with the FMODE_EPOCH, FMODE_SOM and FMODE_EPOCHLCK bits
 * masked off, equals FMODE_CLOSED. */
1064 #define MFD_CLOSED(mode) (((mode) & ~(FMODE_EPOCH | FMODE_SOM | \
1065 FMODE_EPOCHLCK)) == FMODE_CLOSED)
/* Return non-zero if @mfd is NULL or its mode satisfies MFD_CLOSED(). */
1067 static int mdt_mfd_closed(struct mdt_file_data *mfd)
1069 return ((mfd == NULL) || MFD_CLOSED(mfd->mfd_mode));
/* Close one open handle. Callers in this file unhash @mfd and remove it
 * from the export's open list before calling.
 * Depending on the mode stored in the mfd:
 *  - FMODE_WRITE or FMODE_EPOCHLCK: write access is released and the
 *    epoch is closed via mdt_epoch_close();
 *  - MDS_FMODE_EXEC: writes are allowed again;
 *  - FMODE_EPOCH: only the epoch is closed.
 * atime is then written back if the request supplied it, and mo_close()
 * is called unless the mode already counts as closed (in that case, with
 * ret == -EAGAIN, attributes are re-read instead).
 * If mdt_epoch_close() returned 1 or -EAGAIN the mfd is kept: its mode
 * becomes FMODE_EPOCH (ret == 1) or FMODE_SOM, and it is put back on the
 * export's open list and re-hashed. This path requires a request and an
 * OBD_CONNECT_SOM client (both asserted).
 * Returns rc if it is non-zero, otherwise ret from mdt_epoch_close().
 * NOTE(review): the branch that frees the mfd is not visible here. */
1072 int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
1074 struct mdt_object *o = mfd->mfd_object;
1075 struct md_object *next = mdt_object_child(o);
1076 struct md_attr *ma = &info->mti_attr;
1077 int rc = 0, ret = 0;
1081 mode = mfd->mfd_mode;
1083 if ((mode & FMODE_WRITE) || (mode & FMODE_EPOCHLCK)) {
1084 mdt_write_put(info->mti_mdt, o);
1085 ret = mdt_epoch_close(info, o);
1086 } else if (mode & MDS_FMODE_EXEC) {
1087 mdt_write_allow(info->mti_mdt, o);
1088 } else if (mode & FMODE_EPOCH) {
1089 ret = mdt_epoch_close(info, o);
1092 /* Update atime on close only. */
1093 if ((mode & MDS_FMODE_EXEC || mode & FMODE_READ || mode & FMODE_WRITE)
1094 && (ma->ma_valid & MA_INODE) && (ma->ma_attr.la_valid & LA_ATIME)) {
1095 /* Set the atime only. */
1096 ma->ma_attr.la_valid = LA_ATIME;
1097 rc = mo_attr_set(info->mti_env, next, ma);
1100 ma->ma_need |= MA_INODE;
1103 if (!MFD_CLOSED(mode))
1104 rc = mo_close(info->mti_env, next, ma);
1105 else if (ret == -EAGAIN)
1106 rc = mo_attr_get(info->mti_env, next, ma);
1108 /* If the object is unlinked, do not try to re-enable SIZEONMDS */
1109 if ((ret == -EAGAIN) && (ma->ma_valid & MA_INODE) &&
1110 (ma->ma_attr.la_nlink == 0)) {
1114 if ((ret == -EAGAIN) || (ret == 1)) {
1115 struct mdt_export_data *med;
1117 /* The epoch has not closed or Size-on-MDS update is needed.
1118 * Put mfd back into the list. */
1119 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
1120 mdt_mfd_set_mode(mfd, (ret == 1 ? FMODE_EPOCH : FMODE_SOM));
1122 LASSERT(mdt_info_req(info));
1123 med = &mdt_info_req(info)->rq_export->exp_mdt_data;
1124 spin_lock(&med->med_open_lock);
1125 list_add(&mfd->mfd_list, &med->med_open_head);
1126 class_handle_hash_back(&mfd->mfd_handle);
1127 spin_unlock(&med->med_open_lock);
1132 CDEBUG(D_INODE, "Size-on-MDS attribute update is "
1133 "needed on "DFID"\n", PFID(mdt_object_fid(o)));
1137 mdt_object_put(info->mti_env, o);
1140 RETURN(rc ? rc : ret);
/* Handle a close request.
 * Visible flow: unpack the close body (a failure is returned through
 * err_serious()); size and pack the reply buffers; for a resent request
 * return the reconstructed status; set up the reply body and the LOV and
 * cookie buffers in info->mti_attr. The close continues even if the reply
 * could not be packed; repbody is initialised to NULL and tested before
 * the reply is touched later on.
 * Under med_open_lock the mfd is looked up by the handle from the request;
 * a missing or already closed mfd gives err_serious(-ESTALE). Otherwise
 * the mfd is unhashed and unlinked, an extra object reference is held
 * across mdt_mfd_close() and mdt_handle_last_unlink(), and a transno is
 * assigned.
 * Returns rc if it is non-zero, otherwise ret from mdt_mfd_close(). */
1143 int mdt_close(struct mdt_thread_info *info)
1145 struct mdt_export_data *med;
1146 struct mdt_file_data *mfd;
1147 struct mdt_object *o;
1148 struct md_attr *ma = &info->mti_attr;
1149 struct mdt_body *repbody = NULL;
1150 struct ptlrpc_request *req = mdt_info_req(info);
1154 if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET,
1155 OBD_FAIL_MDS_CLOSE_NET)) {
1156 info->mti_fail_id = OBD_FAIL_MDS_CLOSE_NET;
1160 /* Close may come with the Size-on-MDS update. Unpack it. */
1161 rc = mdt_close_unpack(info);
1163 RETURN(err_serious(rc));
1165 LASSERT(info->mti_epoch);
1167 req_capsule_set_size(info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
1168 info->mti_mdt->mdt_max_mdsize);
1169 req_capsule_set_size(info->mti_pill, &RMF_LOGCOOKIES, RCL_SERVER,
1170 info->mti_mdt->mdt_max_cookiesize);
1171 rc = req_capsule_server_pack(info->mti_pill);
1172 if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
1173 RETURN(lustre_msg_get_status(req->rq_repmsg));
1175 /* Continue to close handle even if we can not pack reply */
1177 repbody = req_capsule_server_get(info->mti_pill,
1179 ma->ma_lmm = req_capsule_server_get(info->mti_pill,
1181 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
1184 ma->ma_cookie = req_capsule_server_get(info->mti_pill,
1186 ma->ma_cookie_size = req_capsule_get_size(info->mti_pill,
1189 ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
1190 repbody->eadatasize = 0;
1191 repbody->aclsize = 0;
1193 rc = err_serious(rc);
1195 med = &req->rq_export->exp_mdt_data;
1196 spin_lock(&med->med_open_lock);
1197 mfd = mdt_handle2mfd(info, &info->mti_epoch->handle);
1198 if (mdt_mfd_closed(mfd)) {
1199 spin_unlock(&med->med_open_lock);
1200 CDEBUG(D_INODE, "no handle for file close: fid = "DFID
1201 ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
1202 info->mti_epoch->handle.cookie);
1203 rc = err_serious(-ESTALE);
1205 class_handle_unhash(&mfd->mfd_handle);
1206 list_del_init(&mfd->mfd_list);
1207 spin_unlock(&med->med_open_lock);
1209 /* Do not lose object before last unlink. */
1210 o = mfd->mfd_object;
1211 mdt_object_get(info->mti_env, o);
1212 ret = mdt_mfd_close(info, mfd);
1213 if (repbody != NULL)
1214 rc = mdt_handle_last_unlink(info, o, ma);
1215 mdt_empty_transno(info);
1216 mdt_object_put(info->mti_env, o);
1218 if (repbody != NULL)
1219 mdt_shrink_reply(info);
1221 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
1222 RETURN(err_serious(-ENOMEM));
1224 if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
1225 OBD_FAIL_MDS_CLOSE_NET_REP))
1226 info->mti_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
1227 RETURN(rc ? rc : ret);
/* Handle a DONE_WRITING request.
 * The reply is packed and the request body unpacked first; a failure of
 * either step is returned through err_serious(). A resent request gets
 * its reconstructed status back. The mfd is then looked up under
 * med_open_lock by the handle in the request; it is expected to be in
 * mode FMODE_EPOCH or FMODE_EPOCHLCK (asserted). It is unhashed and
 * unlinked, MF_EPOCH_CLOSE is forced into the epoch flags, ma_valid is
 * cleared, and the handle is closed through mdt_mfd_close() before a
 * transno is assigned.
 * NOTE(review): the check that leads to the "no handle" branch and the
 * final return are not visible in this view. */
1230 int mdt_done_writing(struct mdt_thread_info *info)
1232 struct mdt_body *repbody = NULL;
1233 struct mdt_export_data *med;
1234 struct mdt_file_data *mfd;
1238 rc = req_capsule_server_pack(info->mti_pill);
1240 RETURN(err_serious(rc));
1242 repbody = req_capsule_server_get(info->mti_pill,
1244 repbody->eadatasize = 0;
1245 repbody->aclsize = 0;
1247 /* Done Writing may come with the Size-on-MDS update. Unpack it. */
1248 rc = mdt_close_unpack(info);
1250 RETURN(err_serious(rc));
1252 if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
1253 RETURN(lustre_msg_get_status(mdt_info_req(info)->rq_repmsg));
1255 med = &info->mti_exp->exp_mdt_data;
1256 spin_lock(&med->med_open_lock);
1257 mfd = mdt_handle2mfd(info, &info->mti_epoch->handle);
1259 spin_unlock(&med->med_open_lock);
1260 CDEBUG(D_INODE, "no handle for done write: fid = "DFID
1261 ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
1262 info->mti_epoch->handle.cookie);
1266 LASSERT(mfd->mfd_mode == FMODE_EPOCH ||
1267 mfd->mfd_mode == FMODE_EPOCHLCK);
1268 class_handle_unhash(&mfd->mfd_handle);
1269 list_del_init(&mfd->mfd_list);
1270 spin_unlock(&med->med_open_lock);
1272 /* Set EPOCH CLOSE flag if not set by client. */
1273 info->mti_epoch->flags |= MF_EPOCH_CLOSE;
1274 info->mti_attr.ma_valid = 0;
1275 rc = mdt_mfd_close(info, mfd);
1276 mdt_empty_transno(info);