1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Target (mdt) open/close file handling
7 * Copyright (C) 2002-2006 Cluster File Systems, Inc.
8 * Author: Huang Hua <huanghua@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/lustre_acl.h>
35 #include <lustre_mds.h>
36 #include "mdt_internal.h"
/* Callback passed to class_handle_hash() by mdt_mfd_new() for every new
 * mfd handle. */
38 /* We do nothing because mfds have no reference count for now. */
39 static void mdt_mfd_get(void *mfdp)
43 /* Create a new mdt_file_data struct, initialize it,
44 * and insert it into the global handle hash table. */
/* Visible steps: both list heads embedded in the mfd are initialised and the
 * handle is hashed with mdt_mfd_get as its callback.
 * NOTE(review): the allocation and the return statement are not visible
 * here; confirm what is returned when allocation fails. */
45 struct mdt_file_data *mdt_mfd_new(void)
47 struct mdt_file_data *mfd;
52 INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
53 INIT_LIST_HEAD(&mfd->mfd_list);
54 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
/* Map an open handle to its mdt_file_data.
 * @info    thread info; its request supplies the message flags and export
 * @handle  handle to resolve, must not be NULL (asserted)
 * The cookie is first resolved through class_handle2object(). For requests
 * flagged MSG_REPLAY the export's open list is scanned for an mfd whose
 * mfd_old_handle cookie matches.
 * NOTE(review): the first half of the replay condition and the return
 * statements are not visible here. */
60 * Find the mfd pointed to by handle in global hash table.
61 * In case of replay the handle is obsoleted
62 * but mfd can be found in mfd list by that handle
64 struct mdt_file_data *mdt_handle2mfd(struct mdt_thread_info *info,
65 const struct lustre_handle *handle)
67 struct ptlrpc_request *req = mdt_info_req(info);
68 struct mdt_file_data *mfd;
71 LASSERT(handle != NULL);
72 mfd = class_handle2object(handle->cookie);
73 /* During dw/setattr replay the mfd can be found by its old handle. */
75 lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
76 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
/* Walk every mfd on this export's open list, comparing saved cookies. */
77 list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
78 if (mfd->mfd_old_handle.cookie == handle->cookie)
/* Release an mfd. The mfd must already be off any open list (asserted);
 * the memory is handed to OBD_FREE_RCU() together with the embedded handle. */
87 void mdt_mfd_free(struct mdt_file_data *mfd)
89 LASSERT(list_empty(&mfd->mfd_list));
90 OBD_FREE_RCU(mfd, sizeof *mfd, &mfd->mfd_handle);
/* Ask the layer below to create data for object @o via mdo_create_data().
 * @info  thread info; supplies the env, the create spec and the attr buffer
 * @p     parent object, may be NULL (NULL is then passed down)
 * @o     object to create data for
 * Requests MA_INODE | MA_LOV attributes from the lower layer.
 * NOTE(review): the statement taken when md_should_create() rejects the
 * create flags, and the return, are not visible here. */
93 static int mdt_create_data(struct mdt_thread_info *info,
94 struct mdt_object *p, struct mdt_object *o)
96 struct md_op_spec *spec = &info->mti_spec;
97 struct md_attr *ma = &info->mti_attr;
101 if (!md_should_create(spec->sp_cr_flags))
104 ma->ma_need = MA_INODE | MA_LOV;
106 rc = mdo_create_data(info->mti_env,
107 p ? mdt_object_child(p) : NULL,
108 mdt_object_child(o), spec, ma);
/* Return the object's mot_epochcount; callers in this file treat a non-zero
 * result as "an epoch is currently open on @mo". */
112 static int mdt_epoch_opened(struct mdt_object *mo)
114 return mo->mot_epochcount;
/* Return non-zero when @mo has no I/O epoch recorded (mot_ioepoch == 0). */
117 int mdt_sizeonmds_enabled(struct mdt_object *mo)
119 return !mo->mot_ioepoch;
122 /* Re-enable Size-on-MDS. */
/* Runs under mdt_ioepoch_lock and acts only when the epoch carried by the
 * request (mti_epoch->ioepoch) equals the object's mot_ioepoch; in that case
 * no epoch may still be open on @mo (asserted).
 * NOTE(review): the statements that reset the object state inside the
 * branch are not visible here. */
123 void mdt_sizeonmds_enable(struct mdt_thread_info *info,
124 struct mdt_object *mo)
126 spin_lock(&info->mti_mdt->mdt_ioepoch_lock);
127 if (info->mti_epoch->ioepoch == mo->mot_ioepoch) {
128 LASSERT(!mdt_epoch_opened(mo));
132 spin_unlock(&info->mti_mdt->mdt_ioepoch_lock);
135 /* Open the epoch. Epoch open is allowed if @writecount is not negative.
136 * The epoch and writecount handling is performed under the mdt_ioepoch_lock. */
/* @info  thread info; supplies the device, the replay epoch and the reint
 *        record
 * @o     object whose I/O epoch is opened or continued
 * NOTE(review): the return statements and the code that sets "cancel" and
 * bumps the epoch counters are not visible here. */
137 int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o)
139 struct mdt_device *mdt = info->mti_mdt;
/* Checked first: the connection must have OBD_CONNECT_SOM and @o must be a
 * regular file (the statement taken otherwise is not visible here). */
144 if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
145 !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
148 spin_lock(&mdt->mdt_ioepoch_lock);
149 if (mdt_epoch_opened(o)) {
150 /* Epoch continues even if there are no writers yet. */
151 CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID"\n",
152 o->mot_ioepoch, PFID(mdt_object_fid(o)));
/* Keep the device-wide epoch at least as large as a replayed epoch. */
154 if (info->mti_replayepoch > mdt->mdt_ioepoch)
155 mdt->mdt_ioepoch = info->mti_replayepoch;
/* A non-zero replay epoch takes precedence over the device epoch. */
158 o->mot_ioepoch = info->mti_replayepoch ?
159 info->mti_replayepoch : mdt->mdt_ioepoch;
/* NOTE(review): this logs mdt->mdt_ioepoch, not the value just stored in
 * o->mot_ioepoch; confirm the two cannot differ at this point. */
160 CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID"\n",
161 mdt->mdt_ioepoch, PFID(mdt_object_fid(o)));
165 spin_unlock(&mdt->mdt_ioepoch_lock);
167 /* Cancel Size-on-MDS attributes on clients if not truncate.
168 * In the latter case, mdt_reint_setattr will do it. */
169 if (cancel && (info->mti_rr.rr_fid1 != NULL)) {
170 struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_CHILD];
/* Take an EX lock on the object and drop it again right away. */
171 mdt_lock_reg_init(lh, LCK_EX);
172 rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE,
175 mdt_object_unlock(info, o, lh, 1);
180 /* Update the on-disk attributes if needed and re-enable Size-on-MDS caching. */
/* When the request attributes carry LA_SIZE, they are written through
 * mdt_attr_set() with permission checks bypassed and la_valid restricted to
 * size, blocks and the three timestamps. On the other visible path only
 * mdt_sizeonmds_enable() is called. */
181 static int mdt_sizeonmds_update(struct mdt_thread_info *info,
182 struct mdt_object *o)
186 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
187 o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_epochcount);
189 if (info->mti_attr.ma_attr.la_valid & LA_SIZE) {
190 /* Do Size-on-MDS attribute update.
191 * Size-on-MDS is re-enabled inside. */
192 /* XXX: since we have opened the file, it is unnecessary
193 * to check permission when closing it. Between the "open"
194 * and "close", maybe someone has changed the file mode
195 * or flags, or the file creation mode does not permit write,
196 * and so on. Just set MDS_PERM_BYPASS for all the cases. */
197 info->mti_attr.ma_attr_flags |= MDS_PERM_BYPASS;
198 info->mti_attr.ma_attr.la_valid &= LA_SIZE | LA_BLOCKS |
199 LA_ATIME | LA_MTIME | LA_CTIME;
200 RETURN(mdt_attr_set(info, o, 0));
202 mdt_sizeonmds_enable(info, o);
/* Close (or try to close) the I/O epoch of @o on behalf of the current
 * request, or on behalf of an eviction when there is no request.
 * NOTE(review): the counter updates, the assignments to rc and the return
 * statements are not visible here; the return contract is the one listed
 * below. */
207 * Returns 1 if epoch does not close.
208 * Returns 0 if epoch closes.
209 * Returns -EAGAIN if epoch closes but an Size-on-MDS Update is still needed
210 * from the client. */
211 static int mdt_epoch_close(struct mdt_thread_info *info, struct mdt_object *o)
/* A thread info without a request is treated as an eviction. */
213 int eviction = (mdt_info_req(info) == NULL ? 1 : 0);
214 struct lu_attr *la = &info->mti_attr.ma_attr;
/* Checked first: the connection must have OBD_CONNECT_SOM and @o must be a
 * regular file (the statement taken otherwise is not visible here). */
220 if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
221 !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
224 spin_lock(&info->mti_mdt->mdt_ioepoch_lock);
226 /* Epoch closes only if the client asks for it or an eviction occurs. */
227 if (eviction || (info->mti_epoch->flags & MF_EPOCH_CLOSE)) {
228 LASSERT(o->mot_epochcount);
231 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
232 o->mot_ioepoch, PFID(mdt_object_fid(o)),
/* Did this client report that it changed the attributes? */
236 achange = (info->mti_epoch->flags & MF_SOM_CHANGE);
239 if (!eviction && !mdt_epoch_opened(o)) {
240 /* Epoch ends. Is a Size-on-MDS update needed? */
241 if (o->mot_flags & MF_SOM_CHANGE) {
242 /* Some previous writer changed the attribute.
243 * Do not trust the current Size-on-MDS
244 * update, re-ask client. */
246 } else if (!(la->la_valid & LA_SIZE) && achange) {
247 /* Attributes were changed by the last writer
248 * only but no Size-on-MDS update is received.*/
/* Remember on the object that attributes changed (or may have, on
 * eviction). */
253 if (achange || eviction)
254 o->mot_flags |= MF_SOM_CHANGE;
/* Sample the open state while still holding the lock. */
257 opened = mdt_epoch_opened(o);
258 spin_unlock(&info->mti_mdt->mdt_ioepoch_lock);
260 /* If eviction occurred, do nothing. */
261 if ((rc == 0) && !opened && !eviction) {
262 /* Epoch ends and wanted Size-on-MDS update is obtained. */
263 rc = mdt_sizeonmds_update(info, o);
264 /* Avoid the following setattrs of these attributes, e.g.
265 * for atime update. */
266 info->mti_attr.ma_valid = 0;
/* Read the object's mot_writecount while holding mdt_ioepoch_lock.
 * (The return statement is not visible here; presumably the sampled value
 * is returned -- confirm.) */
271 int mdt_write_read(struct mdt_device *mdt, struct mdt_object *o)
275 spin_lock(&mdt->mdt_ioepoch_lock);
276 rc = o->mot_writecount;
277 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Called by mdt_mfd_open() for FMODE_WRITE opens. Under mdt_ioepoch_lock,
 * tests whether mot_writecount is negative.
 * NOTE(review): both outcomes of that test and the return value are not
 * visible here. */
281 int mdt_write_get(struct mdt_device *mdt, struct mdt_object *o)
285 spin_lock(&mdt->mdt_ioepoch_lock);
286 if (o->mot_writecount < 0)
290 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Called by mdt_mfd_close() when a write (or FMODE_EPOCHLCK) mfd goes away.
 * NOTE(review): the statement executed under mdt_ioepoch_lock is not
 * visible here. */
294 static void mdt_write_put(struct mdt_device *mdt, struct mdt_object *o)
297 spin_lock(&mdt->mdt_ioepoch_lock);
299 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Called by mdt_mfd_open() for MDS_FMODE_EXEC opens. Under
 * mdt_ioepoch_lock, tests whether mot_writecount is positive.
 * NOTE(review): both outcomes of that test and the return value are not
 * visible here. */
303 static int mdt_write_deny(struct mdt_device *mdt, struct mdt_object *o)
307 spin_lock(&mdt->mdt_ioepoch_lock);
308 if (o->mot_writecount > 0)
312 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Called by mdt_mfd_close() when an MDS_FMODE_EXEC mfd goes away.
 * NOTE(review): the statement executed under mdt_ioepoch_lock is not
 * visible here. */
316 static void mdt_write_allow(struct mdt_device *mdt, struct mdt_object *o)
319 spin_lock(&mdt->mdt_ioepoch_lock);
321 spin_unlock(&mdt->mdt_ioepoch_lock);
325 /* There may be no real transaction, so prepare a fake one. */
/* Stamps the request and its reply with a transaction number: a fresh one
 * taken from mdt_last_transno when mti_transno is still 0, otherwise the
 * number already present in @info (pushing mdt_last_transno forward when
 * that number is larger). The reply's last_xid is set to the request xid. */
326 static void mdt_empty_transno(struct mdt_thread_info* info)
328 struct mdt_device *mdt = info->mti_mdt;
329 struct ptlrpc_request *req = mdt_info_req(info);
332 /* A transaction has already occurred. */
333 if (lustre_msg_get_transno(req->rq_repmsg) != 0) {
338 spin_lock(&mdt->mdt_transno_lock);
339 if (info->mti_transno == 0) {
340 info->mti_transno = ++ mdt->mdt_last_transno;
342 /* should be replay */
343 if (info->mti_transno > mdt->mdt_last_transno)
344 mdt->mdt_last_transno = info->mti_transno;
346 spin_unlock(&mdt->mdt_transno_lock);
348 CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
350 req->rq_export->exp_obd->obd_last_committed);
/* Publish the chosen transno in both the request and the reply message. */
352 req->rq_transno = info->mti_transno;
353 lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
354 lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
/* Set the mode of @mfd to @mode, logging the old and new values at D_HA.
 * @mfd must not be NULL (asserted). */
358 void mdt_mfd_set_mode(struct mdt_file_data *mfd, int mode)
360 LASSERT(mfd != NULL);
362 CDEBUG(D_HA, "Change mfd %p mode 0x%x->0x%x\n",
363 mfd, (unsigned int)mfd->mfd_mode, (unsigned int)mode);
365 mfd->mfd_mode = mode;
/* Perform the open of @o and register an open-file descriptor (mfd) for it
 * on the export's open list.
 * @info     thread info; supplies the request, the reply capsule and attrs
 * @p        parent object; NULL in the replay case
 * @o        object being opened
 * @flags    open flags; FMODE_WRITE and MDS_FMODE_EXEC select the
 *           write-count handling below
 * @created  non-zero adds MDS_OPEN_CREATED to the flags given to mo_open()
 * On the visible success path the reply body receives the new handle cookie
 * and mdt_empty_transno() is called.
 * NOTE(review): the mfd allocation, the error handling and the return
 * statements are not visible here. */
368 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
369 struct mdt_object *o, int flags, int created)
371 struct ptlrpc_request *req = mdt_info_req(info);
372 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
373 struct mdt_file_data *mfd;
374 struct md_attr *ma = &info->mti_attr;
375 struct lu_attr *la = &ma->ma_attr;
376 struct mdt_body *repbody;
377 int rc = 0, isdir, isreg;
380 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
382 isreg = S_ISREG(la->la_mode);
383 isdir = S_ISDIR(la->la_mode);
/* A regular file whose attributes came back without a LOV EA. */
384 if ((isreg && !(ma->ma_valid & MA_LOV))) {
386 * No EA, check whether it is will set regEA and dirEA since in
387 * above attr get, these size might be zero, so reset it, to
388 * retrieve the MD after create obj.
390 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
393 /* in replay case, p == NULL */
394 rc = mdt_create_data(info, p, o);
399 CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
400 ma->ma_valid, ma->ma_lmm_size);
/* Report the EA size to the client; the flag set depends on a test that is
 * not visible here (both OBD_MD_FLDIREA and OBD_MD_FLEASIZE appear). */
402 if (ma->ma_valid & MA_LOV) {
403 LASSERT(ma->ma_lmm_size != 0);
404 repbody->eadatasize = ma->ma_lmm_size;
406 repbody->valid |= OBD_MD_FLDIREA;
408 repbody->valid |= OBD_MD_FLEASIZE;
/* Write opens go through mdt_write_get() and open the I/O epoch, which is
 * returned in the reply; exec opens go through mdt_write_deny(). */
411 if (flags & FMODE_WRITE) {
412 rc = mdt_write_get(info->mti_mdt, o);
414 mdt_epoch_open(info, o);
415 repbody->ioepoch = o->mot_ioepoch;
417 } else if (flags & MDS_FMODE_EXEC) {
418 rc = mdt_write_deny(info->mti_mdt, o);
423 rc = mo_open(info->mti_env, mdt_object_child(o),
424 created ? flags | MDS_OPEN_CREATED : flags);
431 * Keep a reference on this object for this open, and is
432 * released by mdt_mfd_close().
434 mdt_object_get(info->mti_env, o);
437 * @flags is always not zero. At least it should be FMODE_READ,
438 * FMODE_WRITE or FMODE_EXEC.
443 mdt_mfd_set_mode(mfd, flags);
/* The xid is what mdt_finish_open() matches on for MSG_RESENT requests. */
446 mfd->mfd_xid = req->rq_xid;
449 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
450 struct mdt_file_data *old_mfd;
451 /* Check whether the old cookie already exists in
452 * the list, because during recovery the client
453 * might be disconnected from the server and
454 * restart replay, so there may be some orphan
455 * mfds here; we should remove them. */
456 LASSERT(info->mti_rr.rr_handle != NULL);
457 old_mfd = mdt_handle2mfd(info, info->mti_rr.rr_handle);
/* NOTE(review): the message below prints @mfd (the new descriptor) although
 * the entry being removed is @old_mfd; confirm which was intended. */
459 CDEBUG(D_HA, "del orph mfd %p cookie" LPX64"\n",
460 mfd, info->mti_rr.rr_handle->cookie);
461 spin_lock(&med->med_open_lock);
462 class_handle_unhash(&old_mfd->mfd_handle);
463 list_del_init(&old_mfd->mfd_list);
464 spin_unlock(&med->med_open_lock);
465 mdt_mfd_free(old_mfd);
467 CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
468 info->mti_rr.rr_handle->cookie);
469 mfd->mfd_old_handle.cookie =
470 info->mti_rr.rr_handle->cookie;
/* Publish the mfd on the export's open list. */
472 spin_lock(&med->med_open_lock);
473 list_add(&mfd->mfd_list, &med->med_open_head);
474 spin_unlock(&med->med_open_lock);
/* Hand the new handle cookie back to the client. */
476 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
477 mdt_empty_transno(info);
/* Fill the open reply for @o and, where an open handle is wanted, hand over
 * to mdt_mfd_open().
 * @info     thread info; ma_valid must already include MA_INODE (asserted)
 * @p        parent object, passed through to mdt_mfd_open()
 * @o        object being opened
 * @flags    open flags
 * @created  non-zero when the object was created by this request
 * @rep      LDLM reply in which DISP_OPEN_OPEN is recorded
 * Visible steps: pack attributes into the reply body, pack remote
 * permission or the access ACL, fetch MDS/OSS capabilities when enabled,
 * then either return early (symlinks, and special nodes for
 * OBD_CONNECT_NODEVOH clients), reuse an existing mfd for a resent request,
 * or open a new one.
 * NOTE(review): error checks, error codes and return statements are not
 * visible here. */
485 static int mdt_finish_open(struct mdt_thread_info *info,
486 struct mdt_object *p, struct mdt_object *o,
487 int flags, int created, struct ldlm_reply *rep)
489 struct ptlrpc_request *req = mdt_info_req(info);
490 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
491 struct mdt_device *mdt = info->mti_mdt;
492 struct md_attr *ma = &info->mti_attr;
493 struct lu_attr *la = &ma->ma_attr;
494 struct mdt_file_data *mfd;
495 struct mdt_body *repbody;
497 int isreg, isdir, islnk;
501 LASSERT(ma->ma_valid & MA_INODE);
503 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
505 isreg = S_ISREG(la->la_mode);
506 isdir = S_ISDIR(la->la_mode);
507 islnk = S_ISLNK(la->la_mode);
508 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
/* Remote clients get a remote-permission record in the ACL buffer. */
510 if (med->med_rmtclient) {
511 void *buf = req_capsule_server_get(&info->mti_pill, &RMF_ACL);
513 rc = mdt_pack_remote_perm(info, o, buf);
515 repbody->valid &= ~OBD_MD_FLRMTPERM;
516 repbody->aclsize = 0;
518 repbody->valid |= OBD_MD_FLRMTPERM;
519 repbody->aclsize = sizeof(struct mdt_remote_perm);
522 #ifdef CONFIG_FS_POSIX_ACL
/* Other clients that negotiated OBD_CONNECT_ACL get the access ACL xattr. */
523 else if (req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) {
524 const struct lu_env *env = info->mti_env;
525 struct md_object *next = mdt_object_child(o);
526 struct lu_buf *buf = &info->mti_buf;
528 buf->lb_buf = req_capsule_server_get(&info->mti_pill, &RMF_ACL);
529 buf->lb_len = req_capsule_get_size(&info->mti_pill, &RMF_ACL,
531 if (buf->lb_len > 0) {
532 rc = mo_xattr_get(env, next, buf,
533 XATTR_NAME_ACL_ACCESS);
/* -ENODATA is reported to the client as a valid ACL of size 0. */
535 if (rc == -ENODATA) {
536 repbody->aclsize = 0;
537 repbody->valid |= OBD_MD_FLACL;
539 } else if (rc == -EOPNOTSUPP) {
542 CERROR("got acl size: %d\n", rc);
/* Here rc is stored as the ACL size in the reply. */
545 repbody->aclsize = rc;
546 repbody->valid |= OBD_MD_FLACL;
/* MDS capability, when enabled on this device. */
553 if (mdt->mdt_opts.mo_mds_capa) {
554 struct lustre_capa *capa;
556 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
558 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
560 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
563 repbody->valid |= OBD_MD_FLMDSCAPA;
/* OSS capability, only for regular files. */
565 if (mdt->mdt_opts.mo_oss_capa &&
566 S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) {
567 struct lustre_capa *capa;
569 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA2);
571 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | capa_open_opc(flags);
573 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
576 repbody->valid |= OBD_MD_FLOSSCAPA;
580 * If we are following a symlink, don't open; and do not return open
581 * handle for special nodes as client required.
583 if (islnk || (!isreg && !isdir &&
584 (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
585 lustre_msg_set_transno(req->rq_repmsg, 0);
589 mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
592 * We need to return the existing object's fid back, so it is done here,
593 * after preparing the reply.
595 if (!created && (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT))
598 /* This can't be done earlier, we need to return reply body */
600 if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
601 /* We are trying to create or write an existing dir. */
604 } else if (flags & MDS_OPEN_DIRECTORY)
607 if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_OPEN_CREATE,
608 OBD_FAIL_LDLM_REPLY | OBD_FAIL_ONCE)) {
/* Resent request: look for an mfd already created for this xid. */
613 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
614 spin_lock(&med->med_open_lock);
615 list_for_each(t, &med->med_open_head) {
616 mfd = list_entry(t, struct mdt_file_data, mfd_list);
617 if (mfd->mfd_xid == req->rq_xid) {
622 spin_unlock(&med->med_open_lock);
/* Reuse the existing handle instead of opening again. */
625 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
626 /* Set repbody->eadatasize for the resent case. */
627 if (ma->ma_valid & MA_LOV) {
628 LASSERT(ma->ma_lmm_size != 0);
629 repbody->eadatasize = ma->ma_lmm_size;
631 repbody->valid |= OBD_MD_FLDIREA;
633 repbody->valid |= OBD_MD_FLEASIZE;
639 rc = mdt_mfd_open(info, p, o, flags, created);
/* Declared locally with extern; used by mdt_reconstruct_open() below with
 * the export's med_mcd. */
643 extern void mdt_req_from_mcd(struct ptlrpc_request *req,
644 struct mdt_client_data *mcd);
/* Rebuild the reply of an open request from the per-client data stored in
 * the export (med_mcd).
 * @info  thread info of the request; the capsule format must be
 *        RQF_LDLM_INTENT_OPEN (asserted)
 * @lhc   lock handle, passed on to mdt_reint_open() for a pure open
 * Visible cases: a failed create returns the saved status; a create whose
 * child now exists is finished with mdt_finish_open(); a create whose child
 * does not exist falls back to a regular open; everything else is treated
 * as a pure open and re-executed through mdt_reint_open().
 * NOTE(review): several branch conditions and the labels are not visible
 * here. */
646 void mdt_reconstruct_open(struct mdt_thread_info *info,
647 struct mdt_lock_handle *lhc)
649 const struct lu_env *env = info->mti_env;
650 struct mdt_device *mdt = info->mti_mdt;
651 struct req_capsule *pill = &info->mti_pill;
652 struct ptlrpc_request *req = mdt_info_req(info);
653 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
654 struct mdt_client_data *mcd = med->med_mcd;
655 struct md_attr *ma = &info->mti_attr;
656 struct mdt_reint_record *rr = &info->mti_rr;
657 __u32 flags = info->mti_spec.sp_cr_flags;
658 struct ldlm_reply *ldlm_rep;
659 struct mdt_object *parent;
660 struct mdt_object *child;
661 struct mdt_body *repbody;
665 LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
666 ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
667 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
669 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
670 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
672 ma->ma_need = MA_INODE | MA_LOV;
/* Take the request state from the client data and reuse the saved
 * disposition bits (mcd_last_data). */
675 mdt_req_from_mcd(req, med->med_mcd);
676 mdt_set_disposition(info, ldlm_rep, mcd->mcd_last_data);
/* NOTE(review): logged with CERROR although the text is informational;
 * confirm the intended log level. */
678 CERROR("This is reconstruct open: disp="LPX64", result=%d\n",
679 ldlm_rep->lock_policy_res1, req->rq_status);
681 if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) &&
682 req->rq_status != 0) {
683 /* We did not create successfully, return error to client. */
684 mdt_shrink_reply(info);
685 GOTO(out, rc = req->rq_status);
688 if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE)) {
690 * We failed after creation, but we do not know in which step
691 * we failed. So try to check the child object.
/* NOTE(review): a failed object lookup trips an assertion here instead of
 * returning an error; confirm this is acceptable. */
693 parent = mdt_object_find(env, mdt, rr->rr_fid1);
694 LASSERT(!IS_ERR(parent));
696 child = mdt_object_find(env, mdt, rr->rr_fid2);
697 LASSERT(!IS_ERR(child));
699 rc = mdt_object_exists(child);
701 struct md_object *next;
703 mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
704 next = mdt_object_child(child);
705 rc = mo_attr_get(env, next, ma);
707 rc = mdt_finish_open(info, parent, child,
710 /* the child object was created on remote server */
711 repbody->fid1 = *rr->rr_fid2;
712 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
714 } else if (rc == 0) {
715 /* the child does not exist, we should do regular open */
716 mdt_object_put(env, parent);
717 mdt_object_put(env, child);
718 GOTO(regular_open, 0);
/* Drop the references taken by the two lookups above. */
720 mdt_object_put(env, parent);
721 mdt_object_put(env, child);
722 mdt_shrink_reply(info);
726 /* We did not try to create, so we are a pure open */
727 rc = mdt_reint_open(info, lhc);
/* The reply carries the request status; a negative rc must not come with a
 * transno in the reply (asserted). */
733 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
734 LASSERT(ergo(rc < 0, lustre_msg_get_transno(req->rq_repmsg) == 0));
/* Open the object named by rr_fid2 of the reint record directly, without a
 * name lookup. Used by mdt_reint_open() for replayed requests and for
 * liblustre opens that carry an EA.
 * @info  thread info; supplies the env, device, create flags and fids
 * @rep   LDLM reply in which the disposition bits are recorded
 * The object reference obtained here is dropped before leaving.
 * NOTE(review): the branch conditions following mdt_object_exists() and the
 * final return are only partly visible here. */
737 static int mdt_open_by_fid(struct mdt_thread_info* info,
738 struct ldlm_reply *rep)
740 const struct lu_env *env = info->mti_env;
741 __u32 flags = info->mti_spec.sp_cr_flags;
742 struct mdt_reint_record *rr = &info->mti_rr;
743 struct md_attr *ma = &info->mti_attr;
744 struct mdt_object *o;
748 o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
750 RETURN(rc = PTR_ERR(o));
752 rc = mdt_object_exists(o);
754 mdt_set_disposition(info, rep, (DISP_IT_EXECD |
/* Fetch attributes, then finish the open with no parent and created == 0. */
758 rc = mo_attr_get(env, mdt_object_child(o), ma);
760 rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
761 } else if (rc == 0) {
764 /* the child object was created on remote server */
765 struct mdt_body *repbody;
766 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
767 repbody->fid1 = *rr->rr_fid2;
768 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
772 mdt_object_put(info->mti_env, o);
/* Pin handler: the only statement visible here returns
 * err_serious(-EOPNOTSUPP). */
776 int mdt_pin(struct mdt_thread_info* info)
779 RETURN(err_serious(-EOPNOTSUPP));
782 /* Cross-ref request. Currently it can only be a pure open (w/o create) */
/* @info   thread info; supplies the env, device and attr buffer
 * @fid    fid of the object to open
 * @rep    LDLM reply passed on to mdt_finish_open()
 * @flags  open flags; MDS_OPEN_CROSS is added for the permission check
 * The object reference obtained here is dropped before leaving.
 * NOTE(review): the branch conditions following mdt_object_exists(), the
 * error codes and the final return are only partly visible here. */
783 static int mdt_cross_open(struct mdt_thread_info* info,
784 const struct lu_fid *fid,
785 struct ldlm_reply *rep, __u32 flags)
787 struct md_attr *ma = &info->mti_attr;
788 struct mdt_object *o;
792 o = mdt_object_find(info->mti_env, info->mti_mdt, fid);
794 RETURN(rc = PTR_ERR(o));
796 rc = mdt_object_exists(o);
798 /* Do permission check for cross-open. */
799 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o),
800 NULL, flags | MDS_OPEN_CROSS);
/* Fetch attributes, then finish the open with no parent and created == 0. */
804 mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
805 rc = mo_attr_get(info->mti_env, mdt_object_child(o), ma);
807 rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
808 } else if (rc == 0) {
810 * Something is wrong here. lookup was positive but there is
813 CERROR("Cross-ref object doesn't exist!\n");
816 /* Something is wrong here, the object is on another MDS! */
817 CERROR("The object isn't on this server! FLD error?\n");
818 LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
820 "Object isn't on this server! FLD error?\n");
826 mdt_object_put(info->mti_env, o);
/* Handle an intent-open reint request.
 * @info  thread info; the capsule format must be RQF_LDLM_INTENT_OPEN
 *        (asserted)
 * @lhc   lock handle used for the child lock on the remote-object path;
 *        must not be NULL on that path (asserted)
 * Visible flow:
 *   1. MDS_OPEN_JOIN_FILE is rejected with -EOPNOTSUPP.
 *   2. Replay requests and liblustre opens carrying an EA are tried through
 *      mdt_open_by_fid() first.
 *   3. Cross-ref opens go to mdt_cross_open().
 *   4. Otherwise the parent is locked (PW when creating, PR otherwise), the
 *      name is looked up, the child is created when it is missing and
 *      MDS_OPEN_CREAT is set, and the open is completed by
 *      mdt_finish_open(); if that fails after a create, the new entry is
 *      removed again with mdo_unlink().
 * NOTE(review): many short lines (labels, simple error checks, some call
 * arguments) are not visible here. */
830 int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
832 struct mdt_device *mdt = info->mti_mdt;
833 struct ptlrpc_request *req = mdt_info_req(info);
834 struct mdt_object *parent;
835 struct mdt_object *child;
836 struct mdt_lock_handle *lh;
837 struct ldlm_reply *ldlm_rep;
838 struct mdt_body *repbody;
839 struct lu_fid *child_fid = &info->mti_tmp_fid1;
840 struct md_attr *ma = &info->mti_attr;
841 __u32 create_flags = info->mti_spec.sp_cr_flags;
842 struct mdt_reint_record *rr = &info->mti_rr;
843 struct lu_name *lname;
848 OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE,
849 (obd_timeout + 1) / 4);
851 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
853 ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
854 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill, &RMF_MDT_MD,
856 ma->ma_need = MA_INODE | MA_LOV;
859 LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
860 ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
862 /* TODO: JOIN file */
863 if (create_flags & MDS_OPEN_JOIN_FILE) {
864 CERROR("JOIN file will be supported soon\n");
865 GOTO(out, result = err_serious(-EOPNOTSUPP));
868 CDEBUG(D_INODE, "I am going to open "DFID"/(%s->"DFID") "
869 "cr_flag=0%o mode=0%06o msg_flag=0x%x\n",
870 PFID(rr->rr_fid1), rr->rr_name,
871 PFID(rr->rr_fid2), create_flags,
872 ma->ma_attr.la_mode, lustre_msg_get_flags(req->rq_reqmsg));
874 if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) ||
875 (req->rq_export->exp_libclient && create_flags&MDS_OPEN_HAS_EA)) {
876 /* This is a replay request or from liblustre with ea. */
877 result = mdt_open_by_fid(info, ldlm_rep);
/* Anything but -ENOENT ends the request here; for liblustre-with-EA the
 * result is forced to 0. */
879 if (result != -ENOENT) {
880 if (req->rq_export->exp_libclient &&
881 create_flags&MDS_OPEN_HAS_EA)
882 GOTO(out, result = 0);
886 * We didn't find the correct object, so we need to re-create it
887 * via a regular replay.
889 if (!(create_flags & MDS_OPEN_CREAT)) {
890 DEBUG_REQ(D_ERROR, req,"OPEN & CREAT not in open replay.");
891 GOTO(out, result = -EFAULT);
/* NOTE(review): the text below says "did find object" while the comment
 * above says the object was not found; confirm the message wording. */
893 CDEBUG(D_INFO, "Open replay did find object, continue as "
897 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
898 GOTO(out, result = err_serious(-ENOMEM));
900 mdt_set_disposition(info, ldlm_rep,
901 (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
903 if (info->mti_cross_ref) {
904 /* This is cross-ref open */
905 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
906 result = mdt_cross_open(info, rr->rr_fid1, ldlm_rep,
/* Lock the parent: PW when the open may create, PR otherwise. */
911 lh = &info->mti_lh[MDT_LH_PARENT];
912 mdt_lock_pdo_init(lh, (create_flags & MDS_OPEN_CREAT) ?
913 LCK_PW : LCK_PR, rr->rr_name, rr->rr_namelen);
915 parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
916 MDS_INODELOCK_UPDATE);
918 GOTO(out, result = PTR_ERR(parent));
922 lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
924 result = mdo_lookup(info->mti_env, mdt_object_child(parent),
925 lname, child_fid, &info->mti_spec);
926 LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
927 "looking for "DFID"/%s, result fid="DFID"\n",
928 PFID(mdt_object_fid(parent)), rr->rr_name, PFID(child_fid));
/* Only 0, -ENOENT and -ESTALE continue past this point. */
930 if (result != 0 && result != -ENOENT && result != -ESTALE)
931 GOTO(out_parent, result);
933 if (result == -ENOENT || result == -ESTALE) {
934 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
935 if (result == -ESTALE) {
937 * -ESTALE means the parent is a dead(unlinked) dir, so
938 * it should return -ENOENT to in accordance with the
939 * original mds implementaion.
941 GOTO(out_parent, result = -ENOENT);
943 if (!(create_flags & MDS_OPEN_CREAT))
944 GOTO(out_parent, result);
/* Creating: use the fid supplied by the client in the reint record. */
945 *child_fid = *info->mti_rr.rr_fid2;
946 LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
950 * Check for O_EXCL is moved to the mdt_finish_open(), we need to
951 * return FID back in that case.
953 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
956 child = mdt_object_find(info->mti_env, mdt, child_fid);
958 GOTO(out_parent, result = PTR_ERR(child));
960 mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
961 if (result == -ENOENT) {
962 /* Not found and with MDS_OPEN_CREAT: let's create it. */
963 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
965 /* Let lower layers know what is lock mode on directory. */
966 info->mti_spec.sp_cr_mode =
967 mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
970 * Do not perform lookup sanity check. We know that name does
973 info->mti_spec.sp_cr_lookup = 0;
975 result = mdo_create(info->mti_env,
976 mdt_object_child(parent),
978 mdt_object_child(child),
/* On -ERESTART the create disposition is withdrawn again. */
981 if (result == -ERESTART) {
982 mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
983 GOTO(out_child, result);
986 GOTO(out_child, result);
990 /* We have to get attr & lov ea for this object */
991 result = mo_attr_get(info->mti_env, mdt_object_child(child),
994 * The object is on remote node, return its FID for remote open.
996 if (result == -EREMOTE) {
1000 * Check if this lock already was sent to client and
1001 * this is resent case. For resent case do not take lock
1002 * again, use what is already granted.
1004 LASSERT(lhc != NULL);
1006 if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
1007 struct ldlm_lock *lock;
1009 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1012 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
1014 CERROR("Invalid lock handle "LPX64"\n",
1015 lhc->mlh_reg_lh.cookie);
/* The granted lock must be on the child's resource. */
1018 LASSERT(fid_res_name_eq(mdt_object_fid(child),
1019 &lock->l_resource->lr_name));
1020 LDLM_LOCK_PUT(lock);
/* Not already granted: take a PR lookup lock on the child. */
1023 mdt_lock_handle_init(lhc);
1024 mdt_lock_reg_init(lhc, LCK_PR);
1026 rc = mdt_object_lock(info, child, lhc,
1027 MDS_INODELOCK_LOOKUP,
1030 repbody->fid1 = *mdt_object_fid(child);
1031 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
1034 GOTO(out_child, result);
1038 /* Try to open it now. */
1039 result = mdt_finish_open(info, parent, child, create_flags,
/* Undo the create if the open failed afterwards. */
1042 if (result != 0 && created) {
1046 ma->ma_cookie_size = 0;
1047 info->mti_no_need_trans = 1;
1048 rc2 = mdo_unlink(info->mti_env,
1049 mdt_object_child(parent),
1050 mdt_object_child(child),
1054 CERROR("Error in cleanup of open\n");
/* Cleanup: drop the child reference, unlock and drop the parent, then
 * shrink the reply. */
1058 mdt_object_put(info->mti_env, child);
1060 mdt_object_unlock_put(info, parent, lh, result);
1062 mdt_shrink_reply(info);
1064 lustre_msg_set_transno(req->rq_repmsg, 0);
/* True when @mode, with the FMODE_EPOCH, FMODE_SOM and FMODE_EPOCHLCK bits
 * masked off, equals FMODE_CLOSED. */
1068 #define MFD_CLOSED(mode) (((mode) & ~(FMODE_EPOCH | FMODE_SOM | \
1069 FMODE_EPOCHLCK)) == FMODE_CLOSED)
/* Return non-zero when @mfd is NULL or its mode satisfies MFD_CLOSED(). */
1071 static int mdt_mfd_closed(struct mdt_file_data *mfd)
1073 return ((mfd == NULL) || MFD_CLOSED(mfd->mfd_mode));
/* Close one open-file descriptor.
 * @info  thread info; mti_attr is used both as input (atime) and as the
 *        attribute buffer for the lower-layer calls
 * @mfd   descriptor to close; callers in this file unhash it and remove it
 *        from the open list before calling
 * Visible steps: undo the write-count/epoch state implied by the mfd mode,
 * optionally store atime, call mo_close() (or mo_attr_get() when only a
 * Size-on-MDS update is pending), and either put the mfd back on the export
 * list (epoch still open, or an update still needed) or drop the object
 * reference. Returns rc when it is non-zero, otherwise the epoch-close
 * result.
 * NOTE(review): the freeing of the mfd and a few short statements are not
 * visible here. */
1076 int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
1078 struct mdt_object *o = mfd->mfd_object;
1079 struct md_object *next = mdt_object_child(o);
1080 struct md_attr *ma = &info->mti_attr;
1081 int rc = 0, ret = 0;
1085 mode = mfd->mfd_mode;
/* ret keeps the mdt_epoch_close() result (1, 0 or -EAGAIN per its header). */
1087 if ((mode & FMODE_WRITE) || (mode & FMODE_EPOCHLCK)) {
1088 mdt_write_put(info->mti_mdt, o);
1089 ret = mdt_epoch_close(info, o);
1090 } else if (mode & MDS_FMODE_EXEC) {
1091 mdt_write_allow(info->mti_mdt, o);
1092 } else if (mode & FMODE_EPOCH) {
1093 ret = mdt_epoch_close(info, o);
1096 /* Update atime on close only. */
1097 if ((mode & MDS_FMODE_EXEC || mode & FMODE_READ || mode & FMODE_WRITE)
1098 && (ma->ma_valid & MA_INODE) && (ma->ma_attr.la_valid & LA_ATIME)) {
1099 /* Set the atime only. */
1100 ma->ma_attr.la_valid = LA_ATIME;
1101 rc = mo_attr_set(info->mti_env, next, ma);
1104 ma->ma_need |= MA_INODE;
/* An mfd that was still really open is closed in the lower layer; one kept
 * only for epoch/SOM bookkeeping just refreshes attributes when needed. */
1107 if (!MFD_CLOSED(mode))
1108 rc = mo_close(info->mti_env, next, ma);
1109 else if (ret == -EAGAIN)
1110 rc = mo_attr_get(info->mti_env, next, ma);
1112 /* If the object is unlinked, do not try to re-enable SIZEONMDS */
1113 if ((ret == -EAGAIN) && (ma->ma_valid & MA_INODE) &&
1114 (ma->ma_attr.la_nlink == 0)) {
1118 if ((ret == -EAGAIN) || (ret == 1)) {
1119 struct mdt_export_data *med;
1121 /* The epoch has not closed or Size-on-MDS update is needed.
1122 * Put mfd back into the list. */
1123 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
/* ret == 1: epoch still open; otherwise a Size-on-MDS update is awaited. */
1124 mdt_mfd_set_mode(mfd, (ret == 1 ? FMODE_EPOCH : FMODE_SOM));
1126 LASSERT(mdt_info_req(info));
1127 med = &mdt_info_req(info)->rq_export->exp_mdt_data;
1128 spin_lock(&med->med_open_lock);
1129 list_add(&mfd->mfd_list, &med->med_open_head);
1130 class_handle_hash_back(&mfd->mfd_handle);
1131 spin_unlock(&med->med_open_lock);
1136 CDEBUG(D_INODE, "Size-on-MDS attribute update is "
1137 "needed on "DFID"\n", PFID(mdt_object_fid(o)));
1141 mdt_object_put(info->mti_env, o);
1144 RETURN(rc ? rc : ret);
/* Handle a close request.
 * @info  thread info of the request
 * Visible steps: unpack the close (and optional Size-on-MDS) data, size and
 * pack the reply, answer resent requests from the reconstructed reply, look
 * the mfd up by the handle in mti_epoch, remove it from the hash and the
 * export's open list, close it through mdt_mfd_close() and let
 * mdt_handle_last_unlink() fill the reply. Returns rc when it is non-zero,
 * otherwise the mdt_mfd_close() result.
 * NOTE(review): several short conditions (for example the check of the
 * pack result) are not visible here. */
1147 int mdt_close(struct mdt_thread_info *info)
1149 struct mdt_export_data *med;
1150 struct mdt_file_data *mfd;
1151 struct mdt_object *o;
1152 struct md_attr *ma = &info->mti_attr;
1153 struct mdt_body *repbody = NULL;
1154 struct ptlrpc_request *req = mdt_info_req(info);
1158 /* Close may come with the Size-on-MDS update. Unpack it. */
1159 rc = mdt_close_unpack(info);
1161 RETURN(err_serious(rc));
1163 LASSERT(info->mti_epoch);
/* Size the MD and log-cookie reply buffers to the device maxima. */
1165 req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
1166 info->mti_mdt->mdt_max_mdsize);
1167 req_capsule_set_size(&info->mti_pill, &RMF_LOGCOOKIES, RCL_SERVER,
1168 info->mti_mdt->mdt_max_cookiesize);
1169 rc = req_capsule_pack(&info->mti_pill);
1170 if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
1171 RETURN(lustre_msg_get_status(req->rq_repmsg));
1173 /* Continue to close handle even if we can not pack reply */
1175 repbody = req_capsule_server_get(&info->mti_pill,
1177 ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
1179 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
1182 ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
1184 ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
1187 ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
1188 repbody->eadatasize = 0;
1189 repbody->aclsize = 0;
1191 rc = err_serious(rc);
/* Resolve the handle and detach the mfd while holding med_open_lock. */
1193 med = &req->rq_export->exp_mdt_data;
1194 spin_lock(&med->med_open_lock);
1195 mfd = mdt_handle2mfd(info, &info->mti_epoch->handle);
1196 if (mdt_mfd_closed(mfd)) {
1197 spin_unlock(&med->med_open_lock);
1198 CDEBUG(D_INODE, "no handle for file close: fid = "DFID
1199 ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
1200 info->mti_epoch->handle.cookie);
1201 rc = err_serious(-ESTALE);
1203 class_handle_unhash(&mfd->mfd_handle);
1204 list_del_init(&mfd->mfd_list);
1205 spin_unlock(&med->med_open_lock);
1207 /* Do not lose object before last unlink. */
1208 o = mfd->mfd_object;
1209 mdt_object_get(info->mti_env, o);
1210 ret = mdt_mfd_close(info, mfd);
/* Last-unlink handling is skipped when repbody is NULL. */
1211 if (repbody != NULL)
1212 rc = mdt_handle_last_unlink(info, o, ma);
1213 mdt_empty_transno(info);
1214 mdt_object_put(info->mti_env, o);
1216 if (repbody != NULL)
1217 mdt_shrink_reply(info);
1219 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
1220 RETURN(err_serious(-ENOMEM));
1222 RETURN(rc ? rc : ret);
/* Handle a done-writing request.
 * @info  thread info of the request
 * Visible steps: pack the reply, unpack the close/Size-on-MDS data, answer
 * resent requests from the reconstructed reply, look the mfd up by the
 * handle in mti_epoch, detach it from the hash and the export's open list,
 * force MF_EPOCH_CLOSE and close it through mdt_mfd_close().
 * NOTE(review): the "mfd not found" test, its error code and the end of the
 * function are not visible here. */
1225 int mdt_done_writing(struct mdt_thread_info *info)
1227 struct mdt_body *repbody = NULL;
1228 struct mdt_export_data *med;
1229 struct mdt_file_data *mfd;
1233 rc = req_capsule_pack(&info->mti_pill);
1235 RETURN(err_serious(rc));
1237 repbody = req_capsule_server_get(&info->mti_pill,
1239 repbody->eadatasize = 0;
1240 repbody->aclsize = 0;
1242 /* Done Writing may come with the Size-on-MDS update. Unpack it. */
1243 rc = mdt_close_unpack(info);
1245 RETURN(err_serious(rc));
1247 if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
1248 RETURN(lustre_msg_get_status(mdt_info_req(info)->rq_repmsg));
/* Resolve the handle while holding med_open_lock. */
1250 med = &info->mti_exp->exp_mdt_data;
1251 spin_lock(&med->med_open_lock);
1252 mfd = mdt_handle2mfd(info, &info->mti_epoch->handle);
1254 spin_unlock(&med->med_open_lock);
1255 CDEBUG(D_INODE, "no handle for done write: fid = "DFID
1256 ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
1257 info->mti_epoch->handle.cookie);
/* NOTE(review): the mfd is found through a client-supplied handle, so an
 * mfd in any other mode trips this assertion; confirm that is intended
 * rather than returning an error. */
1261 LASSERT(mfd->mfd_mode == FMODE_EPOCH ||
1262 mfd->mfd_mode == FMODE_EPOCHLCK);
1263 class_handle_unhash(&mfd->mfd_handle);
1264 list_del_init(&mfd->mfd_list);
1265 spin_unlock(&med->med_open_lock);
1267 /* Set EPOCH CLOSE flag if not set by client. */
1268 info->mti_epoch->flags |= MF_EPOCH_CLOSE;
/* Clearing ma_valid keeps mdt_mfd_close() from taking its atime branch,
 * which requires MA_INODE in ma_valid. */
1269 info->mti_attr.ma_valid = 0;
1270 rc = mdt_mfd_close(info, mfd);
1271 mdt_empty_transno(info);