1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Target (mdt) open/close file handling
7 * Copyright (C) 2002-2006 Cluster File Systems, Inc.
8 * Author: Huang Hua <huanghua@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/lustre_acl.h>
35 #include <lustre_mds.h>
36 #include "mdt_internal.h"
38 /* Deliberately empty: an mfd has no reference count yet, so there is
 * nothing to take. mdt_mfd_new() hands this function to
 * class_handle_hash() as the handle callback. */
39 static void mdt_mfd_get(void *mfdp)
43 /* Initialise the list heads of a new mdt_file_data (mfd_handle.h_link
44 * and mfd_list) and insert its handle into the global hash table via
 * class_handle_hash(), with mdt_mfd_get() as the callback.
 * NOTE(review): the allocation and the return statement are not visible
 * in this listing -- presumably the new mfd is returned, or NULL when
 * the allocation fails; confirm. */
45 struct mdt_file_data *mdt_mfd_new(void)
47 struct mdt_file_data *mfd;
52 CFS_INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
53 CFS_INIT_LIST_HEAD(&mfd->mfd_list);
54 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
/* Map a client-supplied open handle to its mdt_file_data.
 * The handle cookie is first resolved with class_handle2object(). For
 * requests flagged MSG_REPLAY (the rest of that condition is not visible
 * in this listing) the export's open list, med_open_head, is walked and
 * each entry's mfd_old_handle cookie is compared with the given one.
 * @handle must not be NULL (asserted).
 * NOTE(review): the return paths are not visible here -- confirm what is
 * returned when nothing matches. */
60 * Find the mfd pointed to by handle in global hash table.
61 * In case of replay the handle is obsoleted
62 * but mfd can be found in mfd list by that handle
64 struct mdt_file_data *mdt_handle2mfd(struct mdt_thread_info *info,
65 const struct lustre_handle *handle)
67 struct ptlrpc_request *req = mdt_info_req(info);
68 struct mdt_file_data *mfd;
71 LASSERT(handle != NULL);
72 mfd = class_handle2object(handle->cookie);
73 /* During dw/setattr replay the mfd can be found by its old handle. */
75 lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
76 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
77 list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
78 if (mfd->mfd_old_handle.cookie == handle->cookie)
/* Free an mfd. It must no longer be linked through mfd_list (asserted);
 * the memory is released with OBD_FREE_RCU, passing the embedded
 * mfd_handle. */
87 void mdt_mfd_free(struct mdt_file_data *mfd)
89 LASSERT(list_empty(&mfd->mfd_list));
90 OBD_FREE_RCU(mfd, sizeof *mfd, &mfd->mfd_handle);
/* Ask the lower layer, through mdo_create_data(), to create the data for
 * object @o. @p is the parent and may be NULL, in which case NULL is
 * passed down in its place. MA_INODE | MA_LOV is requested in
 * info->mti_attr before the call.
 * NOTE(review): the action taken when md_should_create() rejects
 * sp_cr_flags and the return statement are not visible in this listing --
 * presumably an early return of 0, otherwise the mdo_create_data()
 * result; confirm. */
93 static int mdt_create_data(struct mdt_thread_info *info,
94 struct mdt_object *p, struct mdt_object *o)
96 struct md_op_spec *spec = &info->mti_spec;
97 struct md_attr *ma = &info->mti_attr;
101 if (!md_should_create(spec->sp_cr_flags))
104 ma->ma_need = MA_INODE | MA_LOV;
106 rc = mdo_create_data(info->mti_env,
107 p ? mdt_object_child(p) : NULL,
108 mdt_object_child(o), spec, ma);
/* Return the object's epoch holder count (mot_epochcount). Callers in
 * this file treat a non-zero value as "an epoch is currently open". */
112 static int mdt_epoch_opened(struct mdt_object *mo)
114 return mo->mot_epochcount;
/* Return non-zero when no I/O epoch is recorded on @mo (mot_ioepoch is
 * zero), which this file uses as the "Size-on-MDS enabled" state. */
117 int mdt_sizeonmds_enabled(struct mdt_object *mo)
119 return !mo->mot_ioepoch;
122 /* Re-enable Size-on-MDS for @mo. Runs under mdt_ioepoch_lock and acts
 * only when the epoch carried in the request (info->mti_epoch->ioepoch)
 * equals the object's mot_ioepoch; no epoch holder may remain at that
 * point (asserted).
 * NOTE(review): the statements executed after the assertion are not
 * visible in this listing. */
123 void mdt_sizeonmds_enable(struct mdt_thread_info *info,
124 struct mdt_object *mo)
126 spin_lock(&info->mti_mdt->mdt_ioepoch_lock);
127 if (info->mti_epoch->ioepoch == mo->mot_ioepoch) {
128 LASSERT(!mdt_epoch_opened(mo));
132 spin_unlock(&info->mti_mdt->mdt_ioepoch_lock);
135 /* Open the epoch. Epoch open is allowed if @writecount is not negative.
136 * The epoch and writecount handling is performed under the mdt_ioepoch_lock.
 *
 * The first test looks at OBD_CONNECT_SOM in the connect flags and at
 * whether @o is a regular file (its body is not visible in this listing;
 * presumably an early return). Under the lock an already open epoch
 * simply continues; otherwise a new one starts, taking
 * info->mti_replayepoch when it is set (raising mdt->mdt_ioepoch to it if
 * larger) and mdt->mdt_ioepoch otherwise. Afterwards, when "cancel" is set
 * and rr_fid1 is present, an LCK_EX MDS_INODELOCK_UPDATE lock is taken on
 * @o and dropped again.
 * NOTE(review): no writecount check is visible in this listing, and the
 * counter updates and return value are not visible either. */
137 int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o)
139 struct mdt_device *mdt = info->mti_mdt;
144 if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
145 !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
148 spin_lock(&mdt->mdt_ioepoch_lock);
149 if (mdt_epoch_opened(o)) {
150 /* Epoch continues even if there are no writers yet. */
151 CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID"\n",
152 o->mot_ioepoch, PFID(mdt_object_fid(o)));
154 if (info->mti_replayepoch > mdt->mdt_ioepoch)
155 mdt->mdt_ioepoch = info->mti_replayepoch;
158 o->mot_ioepoch = info->mti_replayepoch ?
159 info->mti_replayepoch : mdt->mdt_ioepoch;
160 CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID"\n",
161 mdt->mdt_ioepoch, PFID(mdt_object_fid(o)));
165 spin_unlock(&mdt->mdt_ioepoch_lock);
167 /* Cancel Size-on-MDS attributes on clients if not truncate.
168 * In the latter case, mdt_reint_setattr will do it. */
169 if (cancel && (info->mti_rr.rr_fid1 != NULL)) {
170 struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_CHILD];
171 mdt_lock_reg_init(lh, LCK_EX);
172 rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE,
175 mdt_object_unlock(info, o, lh, 1);
180 /* Update the on-disk attributes if needed and re-enable Size-on-MDS caching.
 * When the request attributes carry LA_SIZE they are written through
 * mdt_attr_set() with MDS_PERM_BYPASS set and the valid mask limited to
 * size, blocks and the three timestamps; that result is returned.
 * Otherwise mdt_sizeonmds_enable() is called directly. */
181 static int mdt_sizeonmds_update(struct mdt_thread_info *info,
182 struct mdt_object *o)
186 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
187 o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_epochcount);
189 if (info->mti_attr.ma_attr.la_valid & LA_SIZE) {
190 /* Do Size-on-MDS attribute update.
191 * Size-on-MDS is re-enabled inside. */
192 /* XXX: since we have opened the file, it is unnecessary
193 * to check permission when closing it. Between the "open"
194 * and "close", someone may have changed the file mode
195 * or flags, or the file's creation mode may not permit write,
196 * and so on. Just set MDS_PERM_BYPASS for all the cases. */
197 info->mti_attr.ma_attr_flags |= MDS_PERM_BYPASS;
198 info->mti_attr.ma_attr.la_valid &= LA_SIZE | LA_BLOCKS |
199 LA_ATIME | LA_MTIME | LA_CTIME;
200 RETURN(mdt_attr_set(info, o, 0));
202 mdt_sizeonmds_enable(info, o);
/* Close the I/O epoch on @o on behalf of one holder, at file close or on
 * eviction. A missing request (mdt_info_req() returns NULL) is treated
 * as eviction. All epoch state is examined under mdt_ioepoch_lock. When
 * the epoch has ended, rc is 0 and this is not an eviction, the
 * Size-on-MDS attributes are stored via mdt_sizeonmds_update() and
 * info->mti_attr.ma_valid is cleared. The possible return values are
 * listed in the lines that follow.
 * NOTE(review): the counter decrement and the assignments to rc inside
 * the branches are not visible in this listing. */
207 * Returns 1 if epoch does not close.
208 * Returns 0 if epoch closes.
209 * Returns -EAGAIN if epoch closes but an Size-on-MDS Update is still needed
210 * from the client. */
211 static int mdt_epoch_close(struct mdt_thread_info *info, struct mdt_object *o)
213 int eviction = (mdt_info_req(info) == NULL ? 1 : 0);
214 struct lu_attr *la = &info->mti_attr.ma_attr;
220 if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
221 !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
224 spin_lock(&info->mti_mdt->mdt_ioepoch_lock);
226 /* The epoch closes only if the client says so or an eviction occurs. */
227 if (eviction || (info->mti_epoch->flags & MF_EPOCH_CLOSE)) {
228 LASSERT(o->mot_epochcount);
231 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
232 o->mot_ioepoch, PFID(mdt_object_fid(o)),
236 achange = (info->mti_epoch->flags & MF_SOM_CHANGE);
239 if (!eviction && !mdt_epoch_opened(o)) {
240 /* Epoch ends. Is a Size-on-MDS update needed? */
241 if (o->mot_flags & MF_SOM_CHANGE) {
242 /* Some previous writer changed the attribute.
243 * Do not trust the current Size-on-MDS
244 * update, re-ask the client. */
246 } else if (!(la->la_valid & LA_SIZE) && achange) {
247 /* Attributes were changed by the last writer
248 * only, but no Size-on-MDS update was received. */
253 if (achange || eviction)
254 o->mot_flags |= MF_SOM_CHANGE;
257 opened = mdt_epoch_opened(o);
258 spin_unlock(&info->mti_mdt->mdt_ioepoch_lock);
260 /* If eviction occurred, do nothing. */
261 if ((rc == 0) && !opened && !eviction) {
262 /* Epoch ends and the wanted Size-on-MDS update is obtained. */
263 rc = mdt_sizeonmds_update(info, o);
264 /* Avoid the following setattrs of these attributes, e.g.
265 * for atime update. */
266 info->mti_attr.ma_valid = 0;
/* Sample the object's write count (mot_writecount) under
 * mdt_ioepoch_lock.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably the sampled value is returned. */
271 int mdt_write_read(struct mdt_device *mdt, struct mdt_object *o)
275 spin_lock(&mdt->mdt_ioepoch_lock);
276 rc = o->mot_writecount;
277 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Called by mdt_mfd_open() for FMODE_WRITE opens. Under mdt_ioepoch_lock
 * it tests whether mot_writecount is negative.
 * NOTE(review): both branch bodies and the return are not visible in
 * this listing -- presumably the call fails while the count is negative
 * and otherwise increments it; confirm. */
281 int mdt_write_get(struct mdt_device *mdt, struct mdt_object *o)
285 spin_lock(&mdt->mdt_ioepoch_lock);
286 if (o->mot_writecount < 0)
290 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Called by mdt_mfd_close() for handles opened with FMODE_WRITE or
 * FMODE_EPOCHLCK.
 * NOTE(review): the statement executed under mdt_ioepoch_lock is not
 * visible in this listing -- presumably it drops the write count taken
 * by mdt_write_get(); confirm. */
294 static void mdt_write_put(struct mdt_device *mdt, struct mdt_object *o)
297 spin_lock(&mdt->mdt_ioepoch_lock);
299 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Called by mdt_mfd_open() for MDS_FMODE_EXEC opens. Under
 * mdt_ioepoch_lock it tests whether mot_writecount is positive, i.e.
 * whether writers are present.
 * NOTE(review): both branch bodies and the return are not visible in
 * this listing -- presumably the call fails while writers exist and
 * otherwise drives the count negative; confirm. */
303 static int mdt_write_deny(struct mdt_device *mdt, struct mdt_object *o)
307 spin_lock(&mdt->mdt_ioepoch_lock);
308 if (o->mot_writecount > 0)
312 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Called by mdt_mfd_close() for handles opened with MDS_FMODE_EXEC.
 * NOTE(review): the statement executed under mdt_ioepoch_lock is not
 * visible in this listing -- presumably it undoes mdt_write_deny();
 * confirm. */
316 static void mdt_write_allow(struct mdt_device *mdt, struct mdt_object *o)
319 spin_lock(&mdt->mdt_ioepoch_lock);
321 spin_unlock(&mdt->mdt_ioepoch_lock);
325 /* There may be no real transaction, so prepare a fake one.
 * The reply's transno is checked first (the body of that test is not
 * visible in this listing; presumably an early return). Otherwise, under
 * mdt_transno_lock, a request without a transno gets ++mdt_last_transno,
 * while one that already has a transno may advance mdt_last_transno.
 * The result is stored in req->rq_transno and in the reply, together
 * with the request xid as last_xid. */
326 static void mdt_empty_transno(struct mdt_thread_info* info)
328 struct mdt_device *mdt = info->mti_mdt;
329 struct ptlrpc_request *req = mdt_info_req(info);
332 /* A transaction has already occurred. */
333 if (lustre_msg_get_transno(req->rq_repmsg) != 0) {
338 spin_lock(&mdt->mdt_transno_lock);
339 if (info->mti_transno == 0) {
340 info->mti_transno = ++ mdt->mdt_last_transno;
342 /* This should be a replay. */
343 if (info->mti_transno > mdt->mdt_last_transno)
344 mdt->mdt_last_transno = info->mti_transno;
346 spin_unlock(&mdt->mdt_transno_lock);
348 CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
350 req->rq_export->exp_obd->obd_last_committed);
352 req->rq_transno = info->mti_transno;
353 lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
354 lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
/* Store @mode in mfd->mfd_mode, logging the old and new values at D_HA
 * first. @mfd must not be NULL (asserted). */
358 void mdt_mfd_set_mode(struct mdt_file_data *mfd, int mode)
360 LASSERT(mfd != NULL);
362 CDEBUG(D_HA, "Change mfd %p mode 0x%x->0x%x\n",
363 mfd, (unsigned int)mfd->mfd_mode, (unsigned int)mode);
365 mfd->mfd_mode = mode;
/* Perform the open of @o for the current request and hand a new open
 * handle back in the reply body.
 * Visible steps, in order: create the data for a regular file that has
 * no LOV EA yet (mdt_create_data); report the EA size in the reply; for
 * FMODE_WRITE take a write count and open the epoch, for MDS_FMODE_EXEC
 * deny writers; call mo_open(); take an object reference that
 * mdt_mfd_close() releases; record the mode and request xid in the mfd;
 * on replay, remove an orphan mfd found under the old cookie and store
 * that cookie in the new mfd; link the mfd on the export's open list;
 * put the handle cookie in the reply and call mdt_empty_transno().
 * @p is the parent and is NULL in the replay case. @created makes
 * MDS_OPEN_CREATED be added to the flags given to mo_open().
 * NOTE(review): the mfd allocation and all error handling are not
 * visible in this listing. */
368 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
369 struct mdt_object *o, int flags, int created)
371 struct ptlrpc_request *req = mdt_info_req(info);
372 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
373 struct mdt_file_data *mfd;
374 struct md_attr *ma = &info->mti_attr;
375 struct lu_attr *la = &ma->ma_attr;
376 struct mdt_body *repbody;
377 int rc = 0, isdir, isreg;
380 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
382 isreg = S_ISREG(la->la_mode);
383 isdir = S_ISDIR(la->la_mode);
384 if ((isreg && !(ma->ma_valid & MA_LOV))) {
386 * No EA, check whether it is will set regEA and dirEA since in
387 * above attr get, these size might be zero, so reset it, to
388 * retrieve the MD after create obj.
390 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
393 /* In the replay case, p == NULL. */
394 rc = mdt_create_data(info, p, o);
399 CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
400 ma->ma_valid, ma->ma_lmm_size);
402 if (ma->ma_valid & MA_LOV) {
403 LASSERT(ma->ma_lmm_size != 0);
404 repbody->eadatasize = ma->ma_lmm_size;
406 repbody->valid |= OBD_MD_FLDIREA;
408 repbody->valid |= OBD_MD_FLEASIZE;
411 if (flags & FMODE_WRITE) {
412 rc = mdt_write_get(info->mti_mdt, o);
414 mdt_epoch_open(info, o);
415 repbody->ioepoch = o->mot_ioepoch;
417 } else if (flags & MDS_FMODE_EXEC) {
418 rc = mdt_write_deny(info->mti_mdt, o);
423 rc = mo_open(info->mti_env, mdt_object_child(o),
424 created ? flags | MDS_OPEN_CREATED : flags);
431 * Keep a reference on this object for this open, and is
432 * released by mdt_mfd_close().
434 mdt_object_get(info->mti_env, o);
437 * @flags is always not zero. At least it should be FMODE_READ,
438 * FMODE_WRITE or FMODE_EXEC.
443 mdt_mfd_set_mode(mfd, flags);
446 mfd->mfd_xid = req->rq_xid;
449 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
450 struct mdt_file_data *old_mfd;
451 /* Check whether the old cookie already exists in
452 * the list, because during recovery the client
453 * might be disconnected from the server and
454 * restart replay, so there may be some orphan
455 * mfd here; we should remove them. */
456 LASSERT(info->mti_rr.rr_handle != NULL);
457 old_mfd = mdt_handle2mfd(info, info->mti_rr.rr_handle);
/* NOTE(review): the message below prints the new mfd pointer although
 * the entry being removed is old_mfd -- probably a slip; confirm. */
459 CDEBUG(D_HA, "del orph mfd %p cookie" LPX64"\n",
460 mfd, info->mti_rr.rr_handle->cookie);
461 spin_lock(&med->med_open_lock);
462 class_handle_unhash(&old_mfd->mfd_handle);
463 list_del_init(&old_mfd->mfd_list);
464 spin_unlock(&med->med_open_lock);
465 mdt_mfd_free(old_mfd);
467 CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
468 info->mti_rr.rr_handle->cookie);
469 mfd->mfd_old_handle.cookie =
470 info->mti_rr.rr_handle->cookie;
472 spin_lock(&med->med_open_lock);
473 list_add(&mfd->mfd_list, &med->med_open_head);
474 spin_unlock(&med->med_open_lock);
476 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
477 mdt_empty_transno(info);
/* Fill in the open reply for object @o and, when an open handle is due,
 * open it through mdt_mfd_open().
 * Visible steps: pack the attributes into the reply body; pack remote
 * permissions for remote clients, or the access ACL when the export has
 * OBD_CONNECT_ACL (only with CONFIG_FS_POSIX_ACL); fetch MDS and OSS
 * capabilities when enabled; stop without opening for symlinks, and for
 * special nodes when the export has OBD_CONNECT_NODEVOH; set
 * DISP_OPEN_OPEN; apply the O_EXCL and directory checks; for a resent
 * request look for the mfd recorded under the same xid and reuse its
 * handle; otherwise call mdt_mfd_open().
 * Requires MA_INODE to be valid in info->mti_attr (asserted).
 * @p is the parent (may be NULL), @created tells whether this request
 * created the object, @rep receives the disposition bits.
 * NOTE(review): the error returns chosen in the individual checks are
 * not visible in this listing. */
485 static int mdt_finish_open(struct mdt_thread_info *info,
486 struct mdt_object *p, struct mdt_object *o,
487 int flags, int created, struct ldlm_reply *rep)
489 struct ptlrpc_request *req = mdt_info_req(info);
490 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
491 struct mdt_device *mdt = info->mti_mdt;
492 struct md_attr *ma = &info->mti_attr;
493 struct lu_attr *la = &ma->ma_attr;
494 struct mdt_file_data *mfd;
495 struct mdt_body *repbody;
497 int isreg, isdir, islnk;
501 LASSERT(ma->ma_valid & MA_INODE);
503 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
505 isreg = S_ISREG(la->la_mode);
506 isdir = S_ISDIR(la->la_mode);
507 islnk = S_ISLNK(la->la_mode);
508 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
510 if (med->med_rmtclient) {
511 void *buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
513 rc = mdt_pack_remote_perm(info, o, buf);
515 repbody->valid &= ~OBD_MD_FLRMTPERM;
516 repbody->aclsize = 0;
518 repbody->valid |= OBD_MD_FLRMTPERM;
519 repbody->aclsize = sizeof(struct mdt_remote_perm);
522 #ifdef CONFIG_FS_POSIX_ACL
523 else if (req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) {
524 const struct lu_env *env = info->mti_env;
525 struct md_object *next = mdt_object_child(o);
526 struct lu_buf *buf = &info->mti_buf;
528 buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
529 buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL,
531 if (buf->lb_len > 0) {
532 rc = mo_xattr_get(env, next, buf,
533 XATTR_NAME_ACL_ACCESS);
535 if (rc == -ENODATA) {
536 repbody->aclsize = 0;
537 repbody->valid |= OBD_MD_FLACL;
539 } else if (rc == -EOPNOTSUPP) {
542 CERROR("got acl size: %d\n", rc);
545 repbody->aclsize = rc;
546 repbody->valid |= OBD_MD_FLACL;
553 if (mdt->mdt_opts.mo_mds_capa) {
554 struct lustre_capa *capa;
556 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
558 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
560 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
563 repbody->valid |= OBD_MD_FLMDSCAPA;
565 if (mdt->mdt_opts.mo_oss_capa &&
566 S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) {
567 struct lustre_capa *capa;
569 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
571 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | capa_open_opc(flags);
573 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
576 repbody->valid |= OBD_MD_FLOSSCAPA;
580 * If we are following a symlink, don't open; and do not return open
581 * handle for special nodes as client required.
583 if (islnk || (!isreg && !isdir &&
584 (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
585 lustre_msg_set_transno(req->rq_repmsg, 0);
589 mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
592 * We need to return the existing object's fid back, so it is done here,
593 * after preparing the reply.
595 if (!created && (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT))
598 /* This can't be done earlier; we need to return the reply body. */
600 if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
601 /* We are trying to create or write an existing dir. */
604 } else if (flags & MDS_OPEN_DIRECTORY)
607 if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_OPEN_CREATE,
608 OBD_FAIL_LDLM_REPLY | OBD_FAIL_ONCE)) {
613 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
614 spin_lock(&med->med_open_lock);
615 list_for_each(t, &med->med_open_head) {
616 mfd = list_entry(t, struct mdt_file_data, mfd_list);
617 if (mfd->mfd_xid == req->rq_xid) {
622 spin_unlock(&med->med_open_lock);
625 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
626 /* Set repbody->eadatasize for the resent case. */
627 if (ma->ma_valid & MA_LOV) {
628 LASSERT(ma->ma_lmm_size != 0);
629 repbody->eadatasize = ma->ma_lmm_size;
631 repbody->valid |= OBD_MD_FLDIREA;
633 repbody->valid |= OBD_MD_FLEASIZE;
639 rc = mdt_mfd_open(info, p, o, flags, created);
/* Defined outside this file; called by mdt_reconstruct_open() with the
 * export's med_lcd.
 * NOTE(review): a prototype like this would normally live in a shared
 * header rather than in a .c file. */
643 extern void mdt_req_from_lcd(struct ptlrpc_request *req,
644 struct lsd_client_data *lcd);
/* Rebuild the reply of an intent open from the per-client last-request
 * data (med_lcd): mdt_req_from_lcd() fills the request from it and the
 * saved disposition bits (lcd_last_data) are set in the DLM reply.
 * If the saved disposition shows DISP_OPEN_CREATE and the further
 * condition on the following (not visible) line holds, the saved status
 * is returned as is. Otherwise, when a create was attempted, parent and
 * child are looked up again: an existing child gets its attributes read
 * and is finished with mdt_finish_open(); a child that does not exist
 * falls back to a regular open. A lookup failure evicts the client.
 * Without DISP_OPEN_CREATE the request is a pure open and goes to
 * mdt_reint_open().
 * @lhc is passed through to mdt_reint_open().
 * NOTE(review): several conditions and labels are not visible in this
 * listing. */
646 void mdt_reconstruct_open(struct mdt_thread_info *info,
647 struct mdt_lock_handle *lhc)
649 const struct lu_env *env = info->mti_env;
650 struct mdt_device *mdt = info->mti_mdt;
651 struct req_capsule *pill = info->mti_pill;
652 struct ptlrpc_request *req = mdt_info_req(info);
653 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
654 struct lsd_client_data *lcd = med->med_lcd;
655 struct md_attr *ma = &info->mti_attr;
656 struct mdt_reint_record *rr = &info->mti_rr;
657 __u32 flags = info->mti_spec.sp_cr_flags;
658 struct ldlm_reply *ldlm_rep;
659 struct mdt_object *parent;
660 struct mdt_object *child;
661 struct mdt_body *repbody;
665 LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
666 ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
667 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
669 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
670 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
672 ma->ma_need = MA_INODE | MA_LOV;
675 mdt_req_from_lcd(req, med->med_lcd);
676 mdt_set_disposition(info, ldlm_rep, lcd->lcd_last_data);
678 CERROR("This is reconstruct open: disp="LPX64", result=%d\n",
679 ldlm_rep->lock_policy_res1, req->rq_status);
681 if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) &&
683 /* We did not create successfully, return error to client. */
684 GOTO(out, rc = req->rq_status);
686 if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE)) {
687 struct obd_export *exp = req->rq_export;
689 * We failed after creation, but we do not know in which step
690 * we failed. So try to check the child object.
692 parent = mdt_object_find(env, mdt, rr->rr_fid1);
693 if (IS_ERR(parent)) {
694 rc = PTR_ERR(parent);
/* NOTE(review): parent holds an error pointer in this branch, yet it is
 * handed to mdt_object_fid() for the message below -- looks unsafe;
 * rr->rr_fid1 is probably what should be printed. Confirm. */
695 LCONSOLE_WARN("Parent "DFID" lookup error %d."
696 " Evicting client %s with export %s.\n",
697 PFID(mdt_object_fid(parent)), rc,
698 obd_uuid2str(&exp->exp_client_uuid),
699 obd_export_nid2str(exp));
700 mdt_export_evict(exp);
704 child = mdt_object_find(env, mdt, rr->rr_fid2);
/* NOTE(review): this is the child lookup failure path (see the "Child"
 * warning below), but rc is taken from PTR_ERR(parent) -- looks like a
 * copy/paste slip for PTR_ERR(child). The warning also passes child to
 * mdt_object_fid(), with the same concern as for parent above. Confirm. */
706 rc = PTR_ERR(parent);
707 LCONSOLE_WARN("Child "DFID" lookup error %d."
708 " Evicting client %s with export %s.\n",
709 PFID(mdt_object_fid(child)), rc,
710 obd_uuid2str(&exp->exp_client_uuid),
711 obd_export_nid2str(exp));
712 mdt_export_evict(exp);
716 rc = mdt_object_exists(child);
718 struct md_object *next;
720 mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
721 next = mdt_object_child(child);
722 rc = mo_attr_get(env, next, ma);
724 rc = mdt_finish_open(info, parent, child,
727 /* The child object was created on a remote server. */
728 repbody->fid1 = *rr->rr_fid2;
729 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
731 } else if (rc == 0) {
732 /* The child does not exist, so we should do a regular open. */
733 mdt_object_put(env, parent);
734 mdt_object_put(env, child);
735 GOTO(regular_open, 0);
737 mdt_object_put(env, parent);
738 mdt_object_put(env, child);
742 /* We did not try to create, so we are a pure open. */
743 rc = mdt_reint_open(info, lhc);
749 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
750 LASSERT(ergo(rc < 0, lustre_msg_get_transno(req->rq_repmsg) == 0));
/* Open the object named by rr->rr_fid2 directly, without a name lookup.
 * mdt_reint_open() uses this for replayed requests and for liblustre
 * opens that carry an EA.
 * The object is found by FID and mdt_object_exists() decides the path:
 * one result leads to setting disposition bits, reading the attributes
 * and calling mdt_finish_open() with no parent and created == 0; a
 * result of 0 takes a separate branch; the remaining case returns the
 * FID with OBD_MD_FLID | OBD_MD_MDS because the object was created on a
 * remote server. The object reference is dropped before returning.
 * NOTE(review): the exact branch conditions and return values are not
 * visible in this listing. */
753 static int mdt_open_by_fid(struct mdt_thread_info* info,
754 struct ldlm_reply *rep)
756 const struct lu_env *env = info->mti_env;
757 __u32 flags = info->mti_spec.sp_cr_flags;
758 struct mdt_reint_record *rr = &info->mti_rr;
759 struct md_attr *ma = &info->mti_attr;
760 struct mdt_object *o;
764 o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
766 RETURN(rc = PTR_ERR(o));
768 rc = mdt_object_exists(o);
770 mdt_set_disposition(info, rep, (DISP_IT_EXECD |
774 rc = mo_attr_get(env, mdt_object_child(o), ma);
776 rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
777 } else if (rc == 0) {
780 /* The child object was created on a remote server. */
781 struct mdt_body *repbody;
782 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
783 repbody->fid1 = *rr->rr_fid2;
784 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
788 mdt_object_put(info->mti_env, o);
/* Pin request handler: not supported; the visible code returns
 * err_serious(-EOPNOTSUPP). */
792 int mdt_pin(struct mdt_thread_info* info)
795 RETURN(err_serious(-EOPNOTSUPP));
798 /* Cross-ref request. Currently it can only be a pure open (w/o create).
 * The object is found by @fid. When it exists here, permission is
 * checked with mo_permission() using flags | MDS_OPEN_CROSS, the
 * attributes are read and the open is completed by mdt_finish_open()
 * with no parent and created == 0. The two other outcomes of
 * mdt_object_exists() are logged as errors: the object does not exist,
 * or it is not on this server. The object reference is dropped before
 * returning.
 * NOTE(review): the error codes returned in those cases are not visible
 * in this listing. */
799 static int mdt_cross_open(struct mdt_thread_info* info,
800 const struct lu_fid *fid,
801 struct ldlm_reply *rep, __u32 flags)
803 struct md_attr *ma = &info->mti_attr;
804 struct mdt_object *o;
808 o = mdt_object_find(info->mti_env, info->mti_mdt, fid);
810 RETURN(rc = PTR_ERR(o));
812 rc = mdt_object_exists(o);
814 /* Do permission check for cross-open. */
815 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o),
816 NULL, flags | MDS_OPEN_CROSS);
820 mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
821 rc = mo_attr_get(info->mti_env, mdt_object_child(o), ma);
823 rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
824 } else if (rc == 0) {
826 * Something is wrong here. lookup was positive but there is
829 CERROR("Cross-ref object doesn't exist!\n");
832 /* Something is wrong here: the object is on another MDS! */
833 CERROR("The object isn't on this server! FLD error?\n");
834 LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
836 "Object isn't on this server! FLD error?\n");
842 mdt_object_put(info->mti_env, o);
/* Handle an intent open request (the capsule format must be
 * RQF_LDLM_INTENT_OPEN, asserted).
 * Visible flow: MDS_OPEN_JOIN_FILE is rejected with -EOPNOTSUPP; replay
 * requests, and liblustre opens carrying an EA, first try
 * mdt_open_by_fid(); cross-ref requests are passed to mdt_cross_open().
 * Otherwise the parent is found and locked (LCK_PW when MDS_OPEN_CREAT
 * is set, LCK_PR if not) and the name is looked up. A negative lookup
 * with MDS_OPEN_CREAT leads to mdo_create() using rr_fid2 as the child
 * FID; -ESTALE from the lookup is reported as -ENOENT. The child's
 * attributes are then read; -EREMOTE means the object is on a remote
 * node, in which case a LOOKUP lock is taken (or the one already held
 * for a resent request is reused) and the child FID is returned for a
 * remote open. In the local case mdt_finish_open() completes the open,
 * and if that fails after a create the new entry is unlinked again.
 * @lhc is the child lock handle; it must not be NULL on the remote path
 * (asserted).
 * NOTE(review): many short lines (labels, returns, some conditions) are
 * not visible in this listing. */
846 int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
848 struct mdt_device *mdt = info->mti_mdt;
849 struct ptlrpc_request *req = mdt_info_req(info);
850 struct mdt_object *parent;
851 struct mdt_object *child;
852 struct mdt_lock_handle *lh;
853 struct ldlm_reply *ldlm_rep;
854 struct mdt_body *repbody;
855 struct lu_fid *child_fid = &info->mti_tmp_fid1;
856 struct md_attr *ma = &info->mti_attr;
857 __u32 create_flags = info->mti_spec.sp_cr_flags;
858 struct mdt_reint_record *rr = &info->mti_rr;
859 struct lu_name *lname;
864 OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE,
865 (obd_timeout + 1) / 4);
867 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
869 ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
870 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill, &RMF_MDT_MD,
872 ma->ma_need = MA_INODE | MA_LOV;
875 LASSERT(info->mti_pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
876 ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
878 /* TODO: JOIN file */
879 if (create_flags & MDS_OPEN_JOIN_FILE) {
880 CERROR("JOIN file will be supported soon\n");
881 GOTO(out, result = err_serious(-EOPNOTSUPP));
884 CDEBUG(D_INODE, "I am going to open "DFID"/(%s->"DFID") "
885 "cr_flag=0%o mode=0%06o msg_flag=0x%x\n",
886 PFID(rr->rr_fid1), rr->rr_name,
887 PFID(rr->rr_fid2), create_flags,
888 ma->ma_attr.la_mode, lustre_msg_get_flags(req->rq_reqmsg));
890 if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) ||
891 (req->rq_export->exp_libclient && create_flags&MDS_OPEN_HAS_EA)) {
892 /* This is a replay request or from liblustre with ea. */
893 result = mdt_open_by_fid(info, ldlm_rep);
895 if (result != -ENOENT) {
896 if (req->rq_export->exp_libclient &&
897 create_flags&MDS_OPEN_HAS_EA)
898 GOTO(out, result = 0);
902 * We didn't find the correct object, so we need to re-create it
903 * via a regular replay.
905 if (!(create_flags & MDS_OPEN_CREAT)) {
906 DEBUG_REQ(D_ERROR, req,"OPEN & CREAT not in open replay.");
907 GOTO(out, result = -EFAULT);
909 CDEBUG(D_INFO, "Open replay did find object, continue as "
913 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
914 GOTO(out, result = err_serious(-ENOMEM));
916 mdt_set_disposition(info, ldlm_rep,
917 (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
919 if (info->mti_cross_ref) {
920 /* This is a cross-ref open. */
921 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
922 result = mdt_cross_open(info, rr->rr_fid1, ldlm_rep,
927 lh = &info->mti_lh[MDT_LH_PARENT];
928 mdt_lock_pdo_init(lh, (create_flags & MDS_OPEN_CREAT) ?
929 LCK_PW : LCK_PR, rr->rr_name, rr->rr_namelen);
931 parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
932 MDS_INODELOCK_UPDATE);
934 GOTO(out, result = PTR_ERR(parent));
938 lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
940 result = mdo_lookup(info->mti_env, mdt_object_child(parent),
941 lname, child_fid, &info->mti_spec);
942 LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
943 "looking for "DFID"/%s, result fid="DFID"\n",
944 PFID(mdt_object_fid(parent)), rr->rr_name, PFID(child_fid));
946 if (result != 0 && result != -ENOENT && result != -ESTALE)
947 GOTO(out_parent, result);
949 if (result == -ENOENT || result == -ESTALE) {
950 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
951 if (result == -ESTALE) {
953 * -ESTALE means the parent is a dead(unlinked) dir, so
954 * it should return -ENOENT to in accordance with the
955 * original mds implementaion.
957 GOTO(out_parent, result = -ENOENT);
959 if (!(create_flags & MDS_OPEN_CREAT))
960 GOTO(out_parent, result);
961 *child_fid = *info->mti_rr.rr_fid2;
962 LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
966 * Check for O_EXCL is moved to the mdt_finish_open(), we need to
967 * return FID back in that case.
969 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
972 child = mdt_object_find(info->mti_env, mdt, child_fid);
974 GOTO(out_parent, result = PTR_ERR(child));
976 mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
977 if (result == -ENOENT) {
978 /* Not found and with MDS_OPEN_CREAT: let's create it. */
979 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
981 /* Let lower layers know what the lock mode on the directory is. */
982 info->mti_spec.sp_cr_mode =
983 mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
986 * Do not perform lookup sanity check. We know that name does
989 info->mti_spec.sp_cr_lookup = 0;
991 result = mdo_create(info->mti_env,
992 mdt_object_child(parent),
994 mdt_object_child(child),
997 if (result == -ERESTART) {
998 mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
999 GOTO(out_child, result);
1002 GOTO(out_child, result);
1006 /* We have to get the attributes and LOV EA for this object. */
1007 result = mo_attr_get(info->mti_env, mdt_object_child(child),
1010 * The object is on remote node, return its FID for remote open.
1012 if (result == -EREMOTE) {
1016 * Check if this lock already was sent to client and
1017 * this is resent case. For resent case do not take lock
1018 * again, use what is already granted.
1020 LASSERT(lhc != NULL);
1022 if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
1023 struct ldlm_lock *lock;
1025 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1028 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
1030 CERROR("Invalid lock handle "LPX64"\n",
1031 lhc->mlh_reg_lh.cookie);
1034 LASSERT(fid_res_name_eq(mdt_object_fid(child),
1035 &lock->l_resource->lr_name));
1036 LDLM_LOCK_PUT(lock);
1039 mdt_lock_handle_init(lhc);
1040 mdt_lock_reg_init(lhc, LCK_PR);
1042 rc = mdt_object_lock(info, child, lhc,
1043 MDS_INODELOCK_LOOKUP,
1046 repbody->fid1 = *mdt_object_fid(child);
1047 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
1050 GOTO(out_child, result);
1054 /* Try to open it now. */
1055 result = mdt_finish_open(info, parent, child, create_flags,
1058 if (result != 0 && created) {
1062 ma->ma_cookie_size = 0;
1063 info->mti_no_need_trans = 1;
1064 rc2 = mdo_unlink(info->mti_env,
1065 mdt_object_child(parent),
1066 mdt_object_child(child),
1070 CERROR("Error in cleanup of open\n");
1074 mdt_object_put(info->mti_env, child);
1076 mdt_object_unlock_put(info, parent, lh, result);
1079 lustre_msg_set_transno(req->rq_repmsg, 0);
/* True when @mode, with the FMODE_EPOCH, FMODE_SOM and FMODE_EPOCHLCK
 * bits masked out, equals FMODE_CLOSED. */
1083 #define MFD_CLOSED(mode) (((mode) & ~(FMODE_EPOCH | FMODE_SOM | \
1084 FMODE_EPOCHLCK)) == FMODE_CLOSED)
/* Return non-zero when @mfd is NULL or its mode satisfies MFD_CLOSED().
 * mdt_close() uses this to reject a handle that is missing or already
 * closed. */
1086 static int mdt_mfd_closed(struct mdt_file_data *mfd)
1088 return ((mfd == NULL) || MFD_CLOSED(mfd->mfd_mode));
/* Close the open represented by @mfd on its object.
 * Depending on the mfd mode: write/epoch-lock handles drop their write
 * count and close the epoch, exec handles re-allow writers, and
 * epoch-only handles just close the epoch. If the request attributes
 * carry a valid atime, only the atime is set via mo_attr_set(). A handle
 * that is not already in the closed state is closed with mo_close(); if
 * it is already closed and the epoch close returned -EAGAIN, the
 * attributes are re-read instead.
 * When the epoch close returned 1 or -EAGAIN the mfd is kept: its mode
 * becomes FMODE_EPOCH or FMODE_SOM respectively, and it is put back on
 * the export's open list and re-hashed. The reference taken on the
 * object at open time is dropped with mdt_object_put().
 * Returns rc if non-zero, otherwise the epoch-close result.
 * NOTE(review): the branch taken for unlinked objects and the code that
 * frees the mfd are not visible in this listing. */
1091 int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
1093 struct mdt_object *o = mfd->mfd_object;
1094 struct md_object *next = mdt_object_child(o);
1095 struct md_attr *ma = &info->mti_attr;
1096 int rc = 0, ret = 0;
1100 mode = mfd->mfd_mode;
1102 if ((mode & FMODE_WRITE) || (mode & FMODE_EPOCHLCK)) {
1103 mdt_write_put(info->mti_mdt, o);
1104 ret = mdt_epoch_close(info, o);
1105 } else if (mode & MDS_FMODE_EXEC) {
1106 mdt_write_allow(info->mti_mdt, o);
1107 } else if (mode & FMODE_EPOCH) {
1108 ret = mdt_epoch_close(info, o);
1111 /* Update atime on close only. */
1112 if ((mode & MDS_FMODE_EXEC || mode & FMODE_READ || mode & FMODE_WRITE)
1113 && (ma->ma_valid & MA_INODE) && (ma->ma_attr.la_valid & LA_ATIME)) {
1114 /* Set the atime only. */
1115 ma->ma_attr.la_valid = LA_ATIME;
1116 rc = mo_attr_set(info->mti_env, next, ma);
1119 ma->ma_need |= MA_INODE;
1122 if (!MFD_CLOSED(mode))
1123 rc = mo_close(info->mti_env, next, ma);
1124 else if (ret == -EAGAIN)
1125 rc = mo_attr_get(info->mti_env, next, ma);
1127 /* If the object is unlinked, do not try to re-enable SIZEONMDS. */
1128 if ((ret == -EAGAIN) && (ma->ma_valid & MA_INODE) &&
1129 (ma->ma_attr.la_nlink == 0)) {
1133 if ((ret == -EAGAIN) || (ret == 1)) {
1134 struct mdt_export_data *med;
1136 /* The epoch has not closed or a Size-on-MDS update is needed.
1137 * Put the mfd back into the list. */
1138 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
1139 mdt_mfd_set_mode(mfd, (ret == 1 ? FMODE_EPOCH : FMODE_SOM));
1141 LASSERT(mdt_info_req(info));
1142 med = &mdt_info_req(info)->rq_export->exp_mdt_data;
1143 spin_lock(&med->med_open_lock);
1144 list_add(&mfd->mfd_list, &med->med_open_head);
1145 class_handle_hash_back(&mfd->mfd_handle);
1146 spin_unlock(&med->med_open_lock);
1151 CDEBUG(D_INODE, "Size-on-MDS attribute update is "
1152 "needed on "DFID"\n", PFID(mdt_object_fid(o)));
1156 mdt_object_put(info->mti_env, o);
1159 RETURN(rc ? rc : ret);
/* Handler for the close request.
 * Unpacks the close body (which may carry a Size-on-MDS update), sizes
 * and packs the reply, and answers resent requests through
 * mdt_check_resent(). The handle is closed even if the reply could not
 * be packed. The mfd is looked up under med_open_lock; a missing or
 * already closed handle yields -ESTALE. Otherwise the mfd is unhashed
 * and unlinked, an extra object reference is held across
 * mdt_mfd_close() so that mdt_handle_last_unlink() can still use the
 * object, and a transno is set up with mdt_empty_transno().
 * Returns rc if non-zero, otherwise the mdt_mfd_close() result.
 * NOTE(review): some conditions around reply packing are not visible in
 * this listing. */
1162 int mdt_close(struct mdt_thread_info *info)
1164 struct mdt_export_data *med;
1165 struct mdt_file_data *mfd;
1166 struct mdt_object *o;
1167 struct md_attr *ma = &info->mti_attr;
1168 struct mdt_body *repbody = NULL;
1169 struct ptlrpc_request *req = mdt_info_req(info);
1173 /* Close may come with the Size-on-MDS update. Unpack it. */
1174 rc = mdt_close_unpack(info);
1176 RETURN(err_serious(rc));
1178 LASSERT(info->mti_epoch);
1180 req_capsule_set_size(info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
1181 info->mti_mdt->mdt_max_mdsize);
1182 req_capsule_set_size(info->mti_pill, &RMF_LOGCOOKIES, RCL_SERVER,
1183 info->mti_mdt->mdt_max_cookiesize);
1184 rc = req_capsule_server_pack(info->mti_pill);
1185 if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
1186 RETURN(lustre_msg_get_status(req->rq_repmsg));
1188 /* Continue to close the handle even if we cannot pack the reply. */
1190 repbody = req_capsule_server_get(info->mti_pill,
1192 ma->ma_lmm = req_capsule_server_get(info->mti_pill,
1194 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
1197 ma->ma_cookie = req_capsule_server_get(info->mti_pill,
1199 ma->ma_cookie_size = req_capsule_get_size(info->mti_pill,
1202 ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
1203 repbody->eadatasize = 0;
1204 repbody->aclsize = 0;
1206 rc = err_serious(rc);
1208 med = &req->rq_export->exp_mdt_data;
1209 spin_lock(&med->med_open_lock);
1210 mfd = mdt_handle2mfd(info, &info->mti_epoch->handle);
1211 if (mdt_mfd_closed(mfd)) {
1212 spin_unlock(&med->med_open_lock);
1213 CDEBUG(D_INODE, "no handle for file close: fid = "DFID
1214 ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
1215 info->mti_epoch->handle.cookie);
1216 rc = err_serious(-ESTALE);
1218 class_handle_unhash(&mfd->mfd_handle);
1219 list_del_init(&mfd->mfd_list);
1220 spin_unlock(&med->med_open_lock);
1222 /* Do not lose the object before the last unlink. */
1223 o = mfd->mfd_object;
1224 mdt_object_get(info->mti_env, o);
1225 ret = mdt_mfd_close(info, mfd);
1226 if (repbody != NULL)
1227 rc = mdt_handle_last_unlink(info, o, ma);
1228 mdt_empty_transno(info);
1229 mdt_object_put(info->mti_env, o);
1231 if (repbody != NULL)
1232 mdt_shrink_reply(info);
1234 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
1235 RETURN(err_serious(-ENOMEM));
1237 if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
1238 OBD_FAIL_MDS_CLOSE_NET_REP))
1239 info->mti_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
1240 RETURN(rc ? rc : ret);
/* Handler for the done-writing request.
 * Packs the reply, unpacks the close body (which may carry a
 * Size-on-MDS update) and answers resent requests through
 * mdt_check_resent(). The mfd is looked up under med_open_lock; the
 * failure branch logs "no handle for done write". A found mfd must be
 * in FMODE_EPOCH or FMODE_EPOCHLCK mode (asserted); it is unhashed and
 * unlinked, MF_EPOCH_CLOSE is forced in the request's epoch flags,
 * ma_valid is cleared, and the mfd is closed with mdt_mfd_close()
 * followed by mdt_empty_transno().
 * NOTE(review): the end of this function, including its return value
 * and the error returned for a missing handle, is not visible in this
 * listing. */
1243 int mdt_done_writing(struct mdt_thread_info *info)
1245 struct mdt_body *repbody = NULL;
1246 struct mdt_export_data *med;
1247 struct mdt_file_data *mfd;
1251 rc = req_capsule_server_pack(info->mti_pill);
1253 RETURN(err_serious(rc));
1255 repbody = req_capsule_server_get(info->mti_pill,
1257 repbody->eadatasize = 0;
1258 repbody->aclsize = 0;
1260 /* Done Writing may come with the Size-on-MDS update. Unpack it. */
1261 rc = mdt_close_unpack(info);
1263 RETURN(err_serious(rc));
1265 if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
1266 RETURN(lustre_msg_get_status(mdt_info_req(info)->rq_repmsg));
1268 med = &info->mti_exp->exp_mdt_data;
1269 spin_lock(&med->med_open_lock);
1270 mfd = mdt_handle2mfd(info, &info->mti_epoch->handle);
1272 spin_unlock(&med->med_open_lock);
1273 CDEBUG(D_INODE, "no handle for done write: fid = "DFID
1274 ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
1275 info->mti_epoch->handle.cookie);
1279 LASSERT(mfd->mfd_mode == FMODE_EPOCH ||
1280 mfd->mfd_mode == FMODE_EPOCHLCK);
1281 class_handle_unhash(&mfd->mfd_handle);
1282 list_del_init(&mfd->mfd_list);
1283 spin_unlock(&med->med_open_lock);
1285 /* Set EPOCH CLOSE flag if not set by client. */
1286 info->mti_epoch->flags |= MF_EPOCH_CLOSE;
1287 info->mti_attr.ma_valid = 0;
1288 rc = mdt_mfd_close(info, mfd);
1289 mdt_empty_transno(info);