1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Target (mdt) open/close file handling
7 * Copyright (C) 2002-2006 Cluster File Systems, Inc.
8 * Author: Huang Hua <huanghua@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/lustre_acl.h>
35 #include <lustre_mds.h>
36 #include "mdt_internal.h"
/* mdt_mfd_get: the callback that mdt_mfd_new() passes to class_handle_hash().
 * NOTE(review): only the signature is visible in this listing; going by the
 * comment below the body is expected to be empty -- confirm. */
38 /* we do nothing because we do not have refcount now */
39 static void mdt_mfd_get(void *mfdp)
43 /* Create a new mdt_file_data struct, initialize it,
44 * and insert it to global hash table */
/* Declared to return a struct mdt_file_data pointer.
 * NOTE(review): the allocation, its NULL check and the return statement are
 * not visible in this listing; presumably the setup below runs only when the
 * allocation succeeded -- confirm against the full file. */
45 struct mdt_file_data *mdt_mfd_new(void)
47 struct mdt_file_data *mfd;
/* Start with both links empty: the handle link and the mfd_list link that
 * mdt_mfd_open() later adds to the export's med_open_head. */
52 CFS_INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
53 CFS_INIT_LIST_HEAD(&mfd->mfd_list);
/* Hash the handle, registering mdt_mfd_get as its callback. */
54 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
60 * Find the mfd pointed to by handle in global hash table.
61 * In case of replay the handle is obsoleted
62 * but mfd can be found in mfd list by that handle
/* Look up the mfd for @handle (must not be NULL): first by cookie through
 * class_handle2object(); for MSG_REPLAY requests the export's open list is
 * also walked for an mfd whose mfd_old_handle cookie equals handle->cookie.
 * NOTE(review): the first half of the replay condition, the action taken on
 * a match and the return statements are not visible in this listing. */
64 struct mdt_file_data *mdt_handle2mfd(struct mdt_thread_info *info,
65 const struct lustre_handle *handle)
67 struct ptlrpc_request *req = mdt_info_req(info);
68 struct mdt_file_data *mfd;
71 LASSERT(handle != NULL);
72 mfd = class_handle2object(handle->cookie);
73 /* during dw/setattr replay the mfd can be found by old handle */
75 lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
76 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
/* NOTE(review): no locking is visible here; mdt_close() and
 * mdt_done_writing() call this with med_open_lock held, mdt_mfd_open()
 * calls it without -- confirm this is intended. */
77 list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
78 if (mfd->mfd_old_handle.cookie == handle->cookie)
/* Release an mfd. Asserts that the mfd is no longer linked on an open list
 * (mfd_list is empty) and then hands it to OBD_FREE_RCU together with its
 * handle. Callers in this file unlink the mfd before calling this. */
87 void mdt_mfd_free(struct mdt_file_data *mfd)
89 LASSERT(list_empty(&mfd->mfd_list));
90 OBD_FREE_RCU(mfd, sizeof *mfd, &mfd->mfd_handle);
/* Call mdo_create_data() for object @o, optionally under parent @p (@p may
 * be NULL, in which case NULL is passed down), using the create spec in
 * info->mti_spec and returning attributes through info->mti_attr.
 * NOTE(review): the value returned when md_should_create() is false and the
 * final return statement are not visible in this listing. */
93 static int mdt_create_data(struct mdt_thread_info *info,
94 struct mdt_object *p, struct mdt_object *o)
96 struct md_op_spec *spec = &info->mti_spec;
97 struct md_attr *ma = &info->mti_attr;
/* Nothing to create unless the open flags ask for it. */
101 if (!md_should_create(spec->sp_cr_flags))
/* Ask for both the inode attributes and the LOV EA back. */
104 ma->ma_need = MA_INODE | MA_LOV;
106 rc = mdo_create_data(info->mti_env,
107 p ? mdt_object_child(p) : NULL,
108 mdt_object_child(o), spec, ma);
/* Return the object's mot_epochcount; callers in this file treat a non-zero
 * value as "an epoch is currently open on this object". */
112 static int mdt_epoch_opened(struct mdt_object *mo)
114 return mo->mot_epochcount;
/* Return non-zero when the object has no I/O epoch recorded
 * (mot_ioepoch == 0), zero otherwise. */
117 int mdt_sizeonmds_enabled(struct mdt_object *mo)
119 return !mo->mot_ioepoch;
122 /* Re-enable Size-on-MDS. */
/* Runs under mdt_ioepoch_lock. Acts only when the epoch carried by the
 * request (info->mti_epoch->ioepoch) equals the object's current
 * mot_ioepoch, and asserts that no epoch is open on the object then.
 * NOTE(review): the statements that reset the object's epoch state are not
 * visible in this listing. */
123 void mdt_sizeonmds_enable(struct mdt_thread_info *info,
124 struct mdt_object *mo)
126 spin_lock(&info->mti_mdt->mdt_ioepoch_lock);
127 if (info->mti_epoch->ioepoch == mo->mot_ioepoch) {
128 LASSERT(!mdt_epoch_opened(mo));
132 spin_unlock(&info->mti_mdt->mdt_ioepoch_lock);
135 /* Open the epoch. Epoch open is allowed if @writecount is not negative.
136 * The epoch and writecount handling is performed under the mdt_ioepoch_lock. */
/* NOTE(review): several lines of this function (declarations, early return,
 * the else branch, counter updates and the final return) are not visible in
 * this listing; the comments below cover only what is shown. */
137 int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o)
139 struct mdt_device *mdt = info->mti_mdt;
/* Epochs apply only to regular files on connections that negotiated
 * OBD_CONNECT_SOM. */
144 if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
145 !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
148 spin_lock(&mdt->mdt_ioepoch_lock);
149 if (mdt_epoch_opened(o)) {
150 /* Epoch continues even if there are no writers yet. */
151 CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID"\n",
152 o->mot_ioepoch, PFID(mdt_object_fid(o)));
/* A replayed epoch newer than the device-wide one moves the device
 * epoch forward. */
154 if (info->mti_replayepoch > mdt->mdt_ioepoch)
155 mdt->mdt_ioepoch = info->mti_replayepoch;
/* The object takes the replayed epoch when one is given, otherwise the
 * device-wide epoch. */
158 o->mot_ioepoch = info->mti_replayepoch ?
159 info->mti_replayepoch : mdt->mdt_ioepoch;
/* NOTE(review): this message prints mdt->mdt_ioepoch, which can differ
 * from the o->mot_ioepoch just assigned when the replayed epoch is older. */
160 CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID"\n",
161 mdt->mdt_ioepoch, PFID(mdt_object_fid(o)));
165 spin_unlock(&mdt->mdt_ioepoch_lock);
167 /* Cancel Size-on-MDS attributes on clients if not truncate.
168 * In the latter case, mdt_reint_setattr will do it. */
169 if (cancel && (info->mti_rr.rr_fid1 != NULL)) {
/* Take an EX lock on the object and drop it again straight away. */
170 struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_CHILD];
171 mdt_lock_reg_init(lh, LCK_EX);
172 rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE,
175 mdt_object_unlock(info, o, lh, 1);
180 /* Update the on-disk attributes if needed and re-enable Size-on-MDS caching. */
/* When the request carries a size (LA_SIZE set in la_valid) the attributes
 * are written through mdt_attr_set() and its result is returned; otherwise
 * Size-on-MDS is re-enabled directly via mdt_sizeonmds_enable().
 * NOTE(review): the return on the second path is not visible here. */
181 static int mdt_sizeonmds_update(struct mdt_thread_info *info,
182 struct mdt_object *o)
186 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
187 o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_epochcount);
189 if (info->mti_attr.ma_attr.la_valid & LA_SIZE) {
190 /* Do Size-on-MDS attribute update.
191 * Size-on-MDS is re-enabled inside. */
192 /* XXX: since we have opened the file, it is unnecessary
193 * to check permission when close it. Between the "open"
194 * and "close", maybe someone has changed the file mode
195 * or flags, or the file created mode do not permit write,
196 * and so on. Just set MDS_PERM_BYPASS for all the cases. */
197 info->mti_attr.ma_attr_flags |= MDS_PERM_BYPASS;
/* Restrict the update to size, blocks and the three timestamps. */
198 info->mti_attr.ma_attr.la_valid &= LA_SIZE | LA_BLOCKS |
199 LA_ATIME | LA_MTIME | LA_CTIME;
200 RETURN(mdt_attr_set(info, o, 0));
202 mdt_sizeonmds_enable(info, o);
207 * Returns 1 if epoch does not close.
208 * Returns 0 if epoch closes.
209 * Returns -EAGAIN if epoch closes but an Size-on-MDS Update is still needed
210 * from the client. */
/* Close one reference on the object's epoch.
 * NOTE(review): declarations, the counter update, the assignments of the
 * return code inside the branches and the final return are not visible in
 * this listing; the comments below cover only what is shown. */
211 static int mdt_epoch_close(struct mdt_thread_info *info, struct mdt_object *o)
/* No request attached to the thread info is treated as an eviction. */
213 int eviction = (mdt_info_req(info) == NULL ? 1 : 0);
214 struct lu_attr *la = &info->mti_attr.ma_attr;
/* Epochs apply only to regular files on connections that negotiated
 * OBD_CONNECT_SOM. */
220 if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
221 !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
224 spin_lock(&info->mti_mdt->mdt_ioepoch_lock);
226 /* Epoch closes only if client tells about it or eviction occurs. */
227 if (eviction || (info->mti_epoch->flags & MF_EPOCH_CLOSE)) {
228 LASSERT(o->mot_epochcount);
231 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
232 o->mot_ioepoch, PFID(mdt_object_fid(o)),
/* Did this client report that it changed the attributes? */
236 achange = (info->mti_epoch->flags & MF_SOM_CHANGE);
239 if (!eviction && !mdt_epoch_opened(o)) {
240 /* Epoch ends. Is a Size-on-MDS update needed? */
241 if (o->mot_flags & MF_SOM_CHANGE) {
242 /* Some previous writer changed the attribute.
243 * Do not believe to the current Size-on-MDS
244 * update, re-ask client. */
246 } else if (!(la->la_valid & LA_SIZE) && achange) {
247 /* Attributes were changed by the last writer
248 * only but no Size-on-MDS update is received.*/
/* Remember on the object that its attributes changed (or that the
 * closer was evicted). */
253 if (achange || eviction)
254 o->mot_flags |= MF_SOM_CHANGE;
/* Sample the open state while still holding the lock. */
257 opened = mdt_epoch_opened(o);
258 spin_unlock(&info->mti_mdt->mdt_ioepoch_lock);
260 /* If eviction occurred, do nothing. */
261 if ((rc == 0) && !opened && !eviction) {
262 /* Epoch ends and wanted Size-on-MDS update is obtained. */
263 rc = mdt_sizeonmds_update(info, o);
264 /* Avoid the following setattrs of these attributes, e.g.
265 * for atime update. */
266 info->mti_attr.ma_valid = 0;
/* Read the object's mot_writecount under mdt_ioepoch_lock into rc.
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned -- confirm. */
271 int mdt_write_read(struct mdt_device *mdt, struct mdt_object *o)
275 spin_lock(&mdt->mdt_ioepoch_lock);
276 rc = o->mot_writecount;
277 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Under mdt_ioepoch_lock, test whether the object's mot_writecount is
 * negative. mdt_mfd_open() calls this for FMODE_WRITE opens and uses the
 * result as an error code.
 * NOTE(review): both branch bodies and the return are not visible here;
 * presumably a negative count is refused and otherwise the count is
 * incremented -- confirm against the full file. */
281 int mdt_write_get(struct mdt_device *mdt, struct mdt_object *o)
285 spin_lock(&mdt->mdt_ioepoch_lock);
286 if (o->mot_writecount < 0)
290 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Counterpart of mdt_write_get(); mdt_mfd_close() calls it when closing a
 * handle whose mode has FMODE_WRITE or FMODE_EPOCHLCK set.
 * NOTE(review): the statement executed under the lock is not visible in this
 * listing; presumably it adjusts mot_writecount -- confirm. */
294 static void mdt_write_put(struct mdt_device *mdt, struct mdt_object *o)
297 spin_lock(&mdt->mdt_ioepoch_lock);
299 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Under mdt_ioepoch_lock, test whether the object's mot_writecount is
 * positive. mdt_mfd_open() calls this for MDS_FMODE_EXEC opens.
 * NOTE(review): both branch bodies and the return are not visible here;
 * presumably a positive count is refused -- confirm against the full file. */
303 static int mdt_write_deny(struct mdt_device *mdt, struct mdt_object *o)
307 spin_lock(&mdt->mdt_ioepoch_lock);
308 if (o->mot_writecount > 0)
312 spin_unlock(&mdt->mdt_ioepoch_lock);
/* Counterpart of mdt_write_deny(); mdt_mfd_close() calls it when closing a
 * handle opened with MDS_FMODE_EXEC.
 * NOTE(review): the statement executed under the lock is not visible in this
 * listing; presumably it adjusts mot_writecount -- confirm. */
316 static void mdt_write_allow(struct mdt_device *mdt, struct mdt_object *o)
319 spin_lock(&mdt->mdt_ioepoch_lock);
321 spin_unlock(&mdt->mdt_ioepoch_lock);
325 /* there can be no real transaction so prepare the fake one */
/* Gives the reply a transaction number when none has been set yet, so that
 * opens and closes which changed nothing on disk still carry one.
 * NOTE(review): the early-return body, the else keyword and part of the
 * CDEBUG argument list are not visible in this listing. */
326 static void mdt_empty_transno(struct mdt_thread_info* info)
328 struct mdt_device *mdt = info->mti_mdt;
329 struct ptlrpc_request *req = mdt_info_req(info);
332 /* transaction has occurred already */
333 if (lustre_msg_get_transno(req->rq_repmsg) != 0) {
338 spin_lock(&mdt->mdt_transno_lock);
339 if (info->mti_transno == 0) {
/* Fresh request: take the next device transaction number. */
340 info->mti_transno = ++ mdt->mdt_last_transno;
342 /* should be replay */
343 if (info->mti_transno > mdt->mdt_last_transno)
344 mdt->mdt_last_transno = info->mti_transno;
346 spin_unlock(&mdt->mdt_transno_lock);
348 CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
350 req->rq_export->exp_obd->obd_last_committed);
/* Publish the number on the request and in the reply message. */
352 req->rq_transno = info->mti_transno;
353 lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
354 lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
/* Set mfd->mfd_mode to @mode, logging the old and new values at D_HA.
 * @mfd must not be NULL (asserted). */
358 void mdt_mfd_set_mode(struct mdt_file_data *mfd, int mode)
360 LASSERT(mfd != NULL);
362 CDEBUG(D_HA, "Change mfd %p mode 0x%x->0x%x\n",
363 mfd, (unsigned int)mfd->mfd_mode, (unsigned int)mode);
365 mfd->mfd_mode = mode;
/* Perform the open of object @o for the current request and record it in a
 * new mfd on the export's open list.
 *
 * Visible steps: create the object's data/LOV EA for a regular file that has
 * none; report the EA size in the reply body; take the write count and open
 * the epoch for FMODE_WRITE, or deny writers for MDS_FMODE_EXEC; call
 * mo_open() on the lower layer; take an object reference for the open; set
 * the mfd mode and xid; on replay drop an orphan mfd left under the old
 * handle and remember the old cookie; link the mfd into med_open_head, return
 * its cookie in the reply and assign a transaction number.
 *
 * @p is the parent and is NULL on replay; @created selects
 * MDS_OPEN_CREATED for mo_open().
 *
 * NOTE(review): this listing is missing lines (braces, error checks, the mfd
 * allocation, the return); those are left exactly as they are in the file.
 * The one change from the previous version is that the "del orph mfd" debug
 * message now prints old_mfd, the mfd actually being deleted, not the new one. */
368 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
369 struct mdt_object *o, int flags, int created)
371 struct ptlrpc_request *req = mdt_info_req(info);
372 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
373 struct mdt_file_data *mfd;
374 struct md_attr *ma = &info->mti_attr;
375 struct lu_attr *la = &ma->ma_attr;
376 struct mdt_body *repbody;
377 int rc = 0, isdir, isreg;
380 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
382 isreg = S_ISREG(la->la_mode);
383 isdir = S_ISDIR(la->la_mode);
384 if ((isreg && !(ma->ma_valid & MA_LOV))) {
386 * No EA, check whether it is will set regEA and dirEA since in
387 * above attr get, these size might be zero, so reset it, to
388 * retrieve the MD after create obj.
390 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
393 /* in replay case, p == NULL */
394 rc = mdt_create_data(info, p, o);
399 CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
400 ma->ma_valid, ma->ma_lmm_size);
/* Tell the client how large the EA in the reply is. */
402 if (ma->ma_valid & MA_LOV) {
403 LASSERT(ma->ma_lmm_size != 0);
404 repbody->eadatasize = ma->ma_lmm_size;
406 repbody->valid |= OBD_MD_FLDIREA;
408 repbody->valid |= OBD_MD_FLEASIZE;
/* Writers take the write count and open an epoch; executors block
 * writers instead. */
411 if (flags & FMODE_WRITE) {
412 rc = mdt_write_get(info->mti_mdt, o);
414 mdt_epoch_open(info, o);
415 repbody->ioepoch = o->mot_ioepoch;
417 } else if (flags & MDS_FMODE_EXEC) {
418 rc = mdt_write_deny(info->mti_mdt, o);
423 rc = mo_open(info->mti_env, mdt_object_child(o),
424 created ? flags | MDS_OPEN_CREATED : flags);
431 * Keep a reference on this object for this open, and is
432 * released by mdt_mfd_close().
434 mdt_object_get(info->mti_env, o);
437 * @flags is always not zero. At least it should be FMODE_READ,
438 * FMODE_WRITE or FMODE_EXEC.
443 mdt_mfd_set_mode(mfd, flags);
/* The xid lets a resent open find this mfd again (see mdt_finish_open). */
446 mfd->mfd_xid = req->rq_xid;
449 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
450 struct mdt_file_data *old_mfd;
451 /* Check whether old cookie already exist in
452 * the list, because when do recovery, client
453 * might be disconnected from server, and
454 * restart replay, so there maybe some orphan
455 * mfd here, we should remove them */
456 LASSERT(info->mti_rr.rr_handle != NULL);
457 old_mfd = mdt_handle2mfd(info, info->mti_rr.rr_handle);
459 CDEBUG(D_HA, "del orph mfd %p cookie" LPX64"\n",
460 old_mfd, info->mti_rr.rr_handle->cookie);
461 spin_lock(&med->med_open_lock);
462 class_handle_unhash(&old_mfd->mfd_handle);
463 list_del_init(&old_mfd->mfd_list);
464 spin_unlock(&med->med_open_lock);
465 mdt_mfd_free(old_mfd);
467 CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
468 info->mti_rr.rr_handle->cookie);
469 mfd->mfd_old_handle.cookie =
470 info->mti_rr.rr_handle->cookie;
/* Publish the new mfd on the export's open list. */
472 spin_lock(&med->med_open_lock);
473 list_add(&mfd->mfd_list, &med->med_open_head);
474 spin_unlock(&med->med_open_lock);
476 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
477 mdt_empty_transno(info);
/* Fill in the open reply for object @o and, unless the open must stop early,
 * hand over to mdt_mfd_open(). Visible steps: pack attributes into the reply
 * body; pack remote permissions or the POSIX access ACL; fetch MDS and OSS
 * capabilities when enabled; stop for symlinks and (with
 * OBD_CONNECT_NODEVOH) special nodes; handle O_EXCL and directory flag
 * checks; for MSG_RESENT requests look for the mfd of the same xid on the
 * export's open list and reuse its cookie.
 * NOTE(review): many lines (error checks, returns, else branches and some
 * declarations) are not visible in this listing; the comments below cover
 * only what is shown. */
485 static int mdt_finish_open(struct mdt_thread_info *info,
486 struct mdt_object *p, struct mdt_object *o,
487 int flags, int created, struct ldlm_reply *rep)
489 struct ptlrpc_request *req = mdt_info_req(info);
490 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
491 struct mdt_device *mdt = info->mti_mdt;
492 struct md_attr *ma = &info->mti_attr;
493 struct lu_attr *la = &ma->ma_attr;
494 struct mdt_file_data *mfd;
495 struct mdt_body *repbody;
497 int isreg, isdir, islnk;
/* The caller must already have fetched the inode attributes. */
501 LASSERT(ma->ma_valid & MA_INODE);
503 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
505 isreg = S_ISREG(la->la_mode);
506 isdir = S_ISDIR(la->la_mode);
507 islnk = S_ISLNK(la->la_mode);
508 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
/* Remote clients get a remote-permission record in the ACL buffer. */
510 if (med->med_rmtclient) {
511 void *buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
513 rc = mdt_pack_remote_perm(info, o, buf);
515 repbody->valid &= ~OBD_MD_FLRMTPERM;
516 repbody->aclsize = 0;
518 repbody->valid |= OBD_MD_FLRMTPERM;
519 repbody->aclsize = sizeof(struct mdt_remote_perm);
522 #ifdef CONFIG_FS_POSIX_ACL
/* Otherwise, for ACL-capable connections, read the access ACL xattr
 * into the reply buffer. */
523 else if (req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) {
524 const struct lu_env *env = info->mti_env;
525 struct md_object *next = mdt_object_child(o);
526 struct lu_buf *buf = &info->mti_buf;
528 buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
529 buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL,
531 if (buf->lb_len > 0) {
532 rc = mo_xattr_get(env, next, buf,
533 XATTR_NAME_ACL_ACCESS);
/* -ENODATA is reported to the client as an empty ACL. */
535 if (rc == -ENODATA) {
536 repbody->aclsize = 0;
537 repbody->valid |= OBD_MD_FLACL;
539 } else if (rc == -EOPNOTSUPP) {
542 CERROR("got acl size: %d\n", rc);
545 repbody->aclsize = rc;
546 repbody->valid |= OBD_MD_FLACL;
/* MDS capability, when enabled on this device. */
553 if (mdt->mdt_opts.mo_mds_capa) {
554 struct lustre_capa *capa;
556 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
558 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
560 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
563 repbody->valid |= OBD_MD_FLMDSCAPA;
/* OSS capability, for regular files only. */
565 if (mdt->mdt_opts.mo_oss_capa &&
566 S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) {
567 struct lustre_capa *capa;
569 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
571 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | capa_open_opc(flags);
573 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
576 repbody->valid |= OBD_MD_FLOSSCAPA;
580 * If we are following a symlink, don't open; and do not return open
581 * handle for special nodes as client required.
583 if (islnk || (!isreg && !isdir &&
584 (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
585 lustre_msg_set_transno(req->rq_repmsg, 0);
589 mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
592 * We need to return the existing object's fid back, so it is done here,
593 * after preparing the reply.
595 if (!created && (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT))
598 /* This can't be done earlier, we need to return reply body */
600 if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
601 /* We are trying to create or write an existing dir. */
604 } else if (flags & MDS_OPEN_DIRECTORY)
607 if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_OPEN_CREATE,
608 OBD_FAIL_LDLM_REPLY | OBD_FAIL_ONCE)) {
/* A resent open may already have an mfd: search the export's open
 * list for one recorded with this request's xid. */
613 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
614 spin_lock(&med->med_open_lock);
615 list_for_each(t, &med->med_open_head) {
616 mfd = list_entry(t, struct mdt_file_data, mfd_list);
617 if (mfd->mfd_xid == req->rq_xid) {
622 spin_unlock(&med->med_open_lock);
625 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
626 /*set repbody->ea_size for resent case*/
627 if (ma->ma_valid & MA_LOV) {
628 LASSERT(ma->ma_lmm_size != 0);
629 repbody->eadatasize = ma->ma_lmm_size;
631 repbody->valid |= OBD_MD_FLDIREA;
633 repbody->valid |= OBD_MD_FLEASIZE;
/* Normal case: create the mfd and complete the open. */
639 rc = mdt_mfd_open(info, p, o, flags, created);
/* Local prototype for mdt_req_from_lcd(), whose definition is not in this
 * listing; mdt_reconstruct_open() calls it with the request and the export's
 * lsd_client_data. */
643 extern void mdt_req_from_lcd(struct ptlrpc_request *req,
644 struct lsd_client_data *lcd);
/* Rebuild the reply for an open request that is being reconstructed from the
 * client's saved last-request data (med_lcd).
 *
 * Visible steps: restore request state and the disposition bits from the lcd;
 * if a create was attempted and failed, return that status; if a create was
 * attempted, look up parent and child again and, when the child exists
 * locally, re-fetch its attributes and redo mdt_finish_open(); when it does
 * not exist fall back to a regular open; a pure open is simply re-run through
 * mdt_reint_open().
 *
 * NOTE(review): this listing is missing lines (braces, several conditions,
 * labels, declarations); those are left exactly as they are in the file.
 * Changes from the previous version, all in the lookup failure paths:
 *  - the child failure path took its error code from PTR_ERR(parent); it now
 *    uses PTR_ERR(child);
 *  - both warnings printed the FID via mdt_object_fid() on a pointer that
 *    may be an ERR_PTR at that point; they now print the FID that was
 *    looked up (rr->rr_fid1 / rr->rr_fid2). */
646 void mdt_reconstruct_open(struct mdt_thread_info *info,
647 struct mdt_lock_handle *lhc)
649 const struct lu_env *env = info->mti_env;
650 struct mdt_device *mdt = info->mti_mdt;
651 struct req_capsule *pill = info->mti_pill;
652 struct ptlrpc_request *req = mdt_info_req(info);
653 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
654 struct lsd_client_data *lcd = med->med_lcd;
655 struct md_attr *ma = &info->mti_attr;
656 struct mdt_reint_record *rr = &info->mti_rr;
657 __u32 flags = info->mti_spec.sp_cr_flags;
658 struct ldlm_reply *ldlm_rep;
659 struct mdt_object *parent;
660 struct mdt_object *child;
661 struct mdt_body *repbody;
665 LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
666 ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
667 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
669 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
670 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
672 ma->ma_need = MA_INODE | MA_LOV;
/* Restore what the original execution recorded for this client. */
675 mdt_req_from_lcd(req, med->med_lcd);
676 mdt_set_disposition(info, ldlm_rep, lcd->lcd_last_data);
678 CERROR("This is reconstruct open: disp="LPX64", result=%d\n",
679 ldlm_rep->lock_policy_res1, req->rq_status);
681 if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) &&
683 /* We did not create successfully, return error to client. */
684 GOTO(out, rc = req->rq_status);
686 if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE)) {
687 struct obd_export *exp = req->rq_export;
689 * We failed after creation, but we do not know in which step
690 * we failed. So try to check the child object.
692 parent = mdt_object_find(env, mdt, rr->rr_fid1);
693 if (IS_ERR(parent)) {
694 rc = PTR_ERR(parent);
/* parent is an error pointer here: print the FID we looked up
 * rather than dereferencing it. */
695 LCONSOLE_WARN("Parent "DFID" lookup error %d."
696 " Evicting client %s with export %s.\n",
697 PFID(rr->rr_fid1), rc,
698 obd_uuid2str(&exp->exp_client_uuid),
699 obd_export_nid2str(exp));
700 mdt_export_evict(exp);
704 child = mdt_object_find(env, mdt, rr->rr_fid2);
/* The failure being reported below is the child's, so take the
 * error code and the FID from the child lookup. */
706 rc = PTR_ERR(child);
707 LCONSOLE_WARN("Child "DFID" lookup error %d."
708 " Evicting client %s with export %s.\n",
709 PFID(rr->rr_fid2), rc,
710 obd_uuid2str(&exp->exp_client_uuid),
711 obd_export_nid2str(exp));
712 mdt_export_evict(exp);
716 rc = mdt_object_exists(child);
718 struct md_object *next;
720 mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
721 next = mdt_object_child(child);
722 rc = mo_attr_get(env, next, ma);
724 rc = mdt_finish_open(info, parent, child,
727 /* the child object was created on remote server */
728 repbody->fid1 = *rr->rr_fid2;
729 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
731 } else if (rc == 0) {
732 /* the child does not exist, we should do regular open */
733 mdt_object_put(env, parent);
734 mdt_object_put(env, child);
735 GOTO(regular_open, 0);
737 mdt_object_put(env, parent);
738 mdt_object_put(env, child);
742 /* We did not try to create, so we are a pure open */
743 rc = mdt_reint_open(info, lhc);
749 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
/* A failed reconstruction must not carry a transaction number. */
750 LASSERT(ergo(rc < 0, lustre_msg_get_transno(req->rq_repmsg) == 0));
/* Open the object named directly by rr->rr_fid2, without a name lookup;
 * mdt_reint_open() uses this for replayed requests and liblustre opens with
 * EA. When the object exists locally its attributes are fetched and
 * mdt_finish_open() is called with no parent; the reply body is filled with
 * the FID and OBD_MD_MDS for an object created on a remote server.
 * NOTE(review): the branch conditions on the mdt_object_exists() result, the
 * error checks and the return are only partly visible in this listing. */
753 static int mdt_open_by_fid(struct mdt_thread_info* info,
754 struct ldlm_reply *rep)
756 const struct lu_env *env = info->mti_env;
757 __u32 flags = info->mti_spec.sp_cr_flags;
758 struct mdt_reint_record *rr = &info->mti_rr;
759 struct md_attr *ma = &info->mti_attr;
760 struct mdt_object *o;
764 o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
766 RETURN(rc = PTR_ERR(o));
768 rc = mdt_object_exists(o);
770 mdt_set_disposition(info, rep, (DISP_IT_EXECD |
774 rc = mo_attr_get(env, mdt_object_child(o), ma);
776 rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
777 } else if (rc == 0) {
780 /* the child object was created on remote server */
781 struct mdt_body *repbody;
782 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
783 repbody->fid1 = *rr->rr_fid2;
784 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
/* Drop the reference taken by mdt_object_find(). */
788 mdt_object_put(info->mti_env, o);
/* Pin requests are not supported: always returns -EOPNOTSUPP wrapped by
 * err_serious(). */
792 int mdt_pin(struct mdt_thread_info* info)
795 RETURN(err_serious(-EOPNOTSUPP));
798 /* Cross-ref request. Currently it can only be a pure open (w/o create) */
/* Finds the object for @fid; when it exists locally, checks permission with
 * MDS_OPEN_CROSS added to @flags, fetches its attributes and calls
 * mdt_finish_open() with no parent. The other two outcomes of
 * mdt_object_exists() are logged as errors.
 * NOTE(review): the branch conditions, the error codes assigned in the error
 * branches and the return are only partly visible in this listing. */
799 static int mdt_cross_open(struct mdt_thread_info* info,
800 const struct lu_fid *fid,
801 struct ldlm_reply *rep, __u32 flags)
803 struct md_attr *ma = &info->mti_attr;
804 struct mdt_object *o;
808 o = mdt_object_find(info->mti_env, info->mti_mdt, fid);
810 RETURN(rc = PTR_ERR(o));
812 rc = mdt_object_exists(o);
814 /* Do permission check for cross-open. */
815 rc = mo_permission(info->mti_env, NULL, mdt_object_child(o),
816 NULL, flags | MDS_OPEN_CROSS);
820 mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
821 rc = mo_attr_get(info->mti_env, mdt_object_child(o), ma);
823 rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
824 } else if (rc == 0) {
826 * Something is wrong here. lookup was positive but there is
829 CERROR("Cross-ref object doesn't exist!\n");
832 /* Something is wrong here, the object is on another MDS! */
833 CERROR("The object isn't on this server! FLD error?\n");
834 LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
836 "Object isn't on this server! FLD error?\n");
/* Drop the reference taken by mdt_object_find(). */
842 mdt_object_put(info->mti_env, o);
/* Handle an intent-open request (RQF_LDLM_INTENT_OPEN).
 *
 * Visible flow: reject JOIN-file opens; for replay (or liblustre opens with
 * EA) try to open directly by FID and fall through to a regular open only
 * when that did not find the object and MDS_OPEN_CREAT is set; dispatch
 * cross-ref opens to mdt_cross_open(); otherwise lock the parent (PW when
 * creating, PR when not), look the name up, create the child when the lookup
 * was negative and MDS_OPEN_CREAT is set, fetch its attributes, handle a
 * remote child by returning its FID under a LOOKUP lock, optionally take the
 * open lock, and finish through mdt_finish_open(). If the final open step
 * fails, an acquired open lock is dropped and an unlink of the child is
 * issued as cleanup.
 *
 * @lhc is the lock handle returned to the client; it must not be NULL on the
 * remote-child path (asserted).
 *
 * NOTE(review): this listing is missing lines (braces, several conditions,
 * labels, declarations, returns); those are left exactly as they are in the
 * file. The one change from the previous version is the D_INFO message on the
 * replay fall-through path, which said the object was found although that
 * path is reached only when it was not. */
846 int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
848 struct mdt_device *mdt = info->mti_mdt;
849 struct ptlrpc_request *req = mdt_info_req(info);
850 struct mdt_object *parent;
851 struct mdt_object *child;
852 struct mdt_lock_handle *lh;
853 struct ldlm_reply *ldlm_rep;
854 struct mdt_body *repbody;
855 struct lu_fid *child_fid = &info->mti_tmp_fid1;
856 struct md_attr *ma = &info->mti_attr;
857 __u32 create_flags = info->mti_spec.sp_cr_flags;
858 struct mdt_reint_record *rr = &info->mti_rr;
859 struct lu_name *lname;
865 OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE,
866 (obd_timeout + 1) / 4);
868 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
870 ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
871 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill, &RMF_MDT_MD,
873 ma->ma_need = MA_INODE | MA_LOV;
876 LASSERT(info->mti_pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
877 ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
879 /* TODO: JOIN file */
880 if (create_flags & MDS_OPEN_JOIN_FILE) {
881 CERROR("JOIN file will be supported soon\n");
882 GOTO(out, result = err_serious(-EOPNOTSUPP));
884 msg_flags = lustre_msg_get_flags(req->rq_reqmsg);
886 CDEBUG(D_INODE, "I am going to open "DFID"/(%s->"DFID") "
887 "cr_flag=0%o mode=0%06o msg_flag=0x%x\n",
888 PFID(rr->rr_fid1), rr->rr_name,
889 PFID(rr->rr_fid2), create_flags,
890 ma->ma_attr.la_mode, msg_flags);
892 if (msg_flags & MSG_REPLAY ||
893 (req->rq_export->exp_libclient && create_flags&MDS_OPEN_HAS_EA)) {
894 /* This is a replay request or from liblustre with ea. */
895 result = mdt_open_by_fid(info, ldlm_rep);
897 if (result != -ENOENT) {
898 if (req->rq_export->exp_libclient &&
899 create_flags&MDS_OPEN_HAS_EA)
900 GOTO(out, result = 0);
904 * We didn't find the correct object, so we need to re-create it
905 * via a regular replay.
907 if (!(create_flags & MDS_OPEN_CREAT)) {
908 DEBUG_REQ(D_ERROR, req,"OPEN & CREAT not in open replay.");
909 GOTO(out, result = -EFAULT);
911 CDEBUG(D_INFO, "Open replay did not find object, continue as "
915 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
916 GOTO(out, result = err_serious(-ENOMEM));
918 mdt_set_disposition(info, ldlm_rep,
919 (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
921 if (info->mti_cross_ref) {
922 /* This is cross-ref open */
923 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
924 result = mdt_cross_open(info, rr->rr_fid1, ldlm_rep,
/* Lock the parent: PW when a create may follow, PR for a plain lookup. */
929 lh = &info->mti_lh[MDT_LH_PARENT];
930 mdt_lock_pdo_init(lh, (create_flags & MDS_OPEN_CREAT) ?
931 LCK_PW : LCK_PR, rr->rr_name, rr->rr_namelen);
933 parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
934 MDS_INODELOCK_UPDATE);
936 GOTO(out, result = PTR_ERR(parent));
940 lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
942 result = mdo_lookup(info->mti_env, mdt_object_child(parent),
943 lname, child_fid, &info->mti_spec);
944 LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
945 "looking for "DFID"/%s, result fid="DFID"\n",
946 PFID(mdt_object_fid(parent)), rr->rr_name, PFID(child_fid));
948 if (result != 0 && result != -ENOENT && result != -ESTALE)
949 GOTO(out_parent, result);
951 if (result == -ENOENT || result == -ESTALE) {
952 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
953 if (result == -ESTALE) {
955 * -ESTALE means the parent is a dead(unlinked) dir, so
956 * it should return -ENOENT to in accordance with the
957 * original mds implementaion.
959 GOTO(out_parent, result = -ENOENT);
961 if (!(create_flags & MDS_OPEN_CREAT))
962 GOTO(out_parent, result);
/* Negative lookup with CREAT: the child gets the FID the client chose. */
963 *child_fid = *info->mti_rr.rr_fid2;
964 LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
968 * Check for O_EXCL is moved to the mdt_finish_open(), we need to
969 * return FID back in that case.
971 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
974 child = mdt_object_find(info->mti_env, mdt, child_fid);
976 GOTO(out_parent, result = PTR_ERR(child));
978 mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
979 if (result == -ENOENT) {
980 /* Not found and with MDS_OPEN_CREAT: let's create it. */
981 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
983 /* Let lower layers know what is lock mode on directory. */
984 info->mti_spec.sp_cr_mode =
985 mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
988 * Do not perform lookup sanity check. We know that name does
991 info->mti_spec.sp_cr_lookup = 0;
993 result = mdo_create(info->mti_env,
994 mdt_object_child(parent),
996 mdt_object_child(child),
999 if (result == -ERESTART) {
1000 mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
1001 GOTO(out_child, result);
1004 GOTO(out_child, result);
1008 /* We have to get attr & lov ea for this object */
1009 result = mo_attr_get(info->mti_env, mdt_object_child(child),
1012 * The object is on remote node, return its FID for remote open.
1014 if (result == -EREMOTE) {
1016 * Check if this lock already was sent to client and
1017 * this is resent case. For resent case do not take lock
1018 * again, use what is already granted.
1020 LASSERT(lhc != NULL);
1022 if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
1023 struct ldlm_lock *lock;
1025 LASSERT(msg_flags & MSG_RESENT);
1027 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
1029 CERROR("Invalid lock handle "LPX64"\n",
1030 lhc->mlh_reg_lh.cookie);
1033 LASSERT(fid_res_name_eq(mdt_object_fid(child),
1034 &lock->l_resource->lr_name));
1035 LDLM_LOCK_PUT(lock);
1038 mdt_lock_handle_init(lhc);
1039 mdt_lock_reg_init(lhc, LCK_PR);
1041 rc = mdt_object_lock(info, child, lhc,
1042 MDS_INODELOCK_LOOKUP,
1045 repbody->fid1 = *mdt_object_fid(child);
1046 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
1049 GOTO(out_child, result);
1053 LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
1055 /* get openlock if this is not replay and if a client requested it */
1056 if (!(msg_flags & MSG_REPLAY) && create_flags & MDS_OPEN_LOCK) {
1060 if (create_flags & FMODE_WRITE)
1062 else if (create_flags & MDS_FMODE_EXEC)
1066 mdt_lock_handle_init(lhc);
1067 mdt_lock_reg_init(lhc, lm);
1068 rc = mdt_object_lock(info, child, lhc,
1069 MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
1073 GOTO(out_child, result);
1076 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
1080 /* Try to open it now. */
1081 rc = mdt_finish_open(info, parent, child, create_flags,
1085 if (lustre_handle_is_used(&lhc->mlh_reg_lh))
1086 /* openlock was acquired and mdt_finish_open failed -
1087 drop the openlock */
1088 mdt_object_unlock(info, child, lhc, 1);
1092 ma->ma_cookie_size = 0;
1093 info->mti_no_need_trans = 1;
1094 rc = mdo_unlink(info->mti_env,
1095 mdt_object_child(parent),
1096 mdt_object_child(child),
1100 CERROR("Error in cleanup of open\n");
1105 mdt_object_put(info->mti_env, child);
1107 mdt_object_unlock_put(info, parent, lh, result);
/* A failed open must not carry a transaction number; -EREMOTE is not
 * treated as a failure here. */
1109 if (result && result != -EREMOTE)
1110 lustre_msg_set_transno(req->rq_repmsg, 0);
/* True when @mode, with the FMODE_EPOCH, FMODE_SOM and FMODE_EPOCHLCK bits
 * masked off, equals FMODE_CLOSED. */
1114 #define MFD_CLOSED(mode) (((mode) & ~(FMODE_EPOCH | FMODE_SOM | \
1115 FMODE_EPOCHLCK)) == FMODE_CLOSED)
/* Non-zero when there is nothing to close: @mfd is NULL, or its mode is
 * closed according to MFD_CLOSED(). */
1117 static int mdt_mfd_closed(struct mdt_file_data *mfd)
1119 return ((mfd == NULL) || MFD_CLOSED(mfd->mfd_mode));
/* Close one open handle (@mfd), which the caller has already removed from
 * the handle hash and the export's open list. Returns rc when it is non-zero,
 * otherwise ret (the epoch-close result).
 * Visible steps: undo the write/exec accounting and close the epoch according
 * to the mfd mode; update atime when the request carries it; call mo_close()
 * for a handle that was not already closed, or only re-read the attributes
 * when a Size-on-MDS update is still pending; when the epoch stays open
 * (ret == 1) or a Size-on-MDS update is needed (ret == -EAGAIN) put the mfd
 * back on the open list in FMODE_EPOCH / FMODE_SOM mode; drop the object
 * reference held for the open.
 * NOTE(review): several lines (declarations, branch bodies, else branches
 * and the mfd free) are not visible in this listing. */
1122 int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
1124 struct mdt_object *o = mfd->mfd_object;
1125 struct md_object *next = mdt_object_child(o);
1126 struct md_attr *ma = &info->mti_attr;
1127 int rc = 0, ret = 0;
1131 mode = mfd->mfd_mode;
1133 if ((mode & FMODE_WRITE) || (mode & FMODE_EPOCHLCK)) {
1134 mdt_write_put(info->mti_mdt, o);
1135 ret = mdt_epoch_close(info, o);
1136 } else if (mode & MDS_FMODE_EXEC) {
1137 mdt_write_allow(info->mti_mdt, o);
1138 } else if (mode & FMODE_EPOCH) {
1139 ret = mdt_epoch_close(info, o);
1142 /* Update atime on close only. */
1143 if ((mode & MDS_FMODE_EXEC || mode & FMODE_READ || mode & FMODE_WRITE)
1144 && (ma->ma_valid & MA_INODE) && (ma->ma_attr.la_valid & LA_ATIME)) {
1145 /* Set the atime only. */
1146 ma->ma_attr.la_valid = LA_ATIME;
1147 rc = mo_attr_set(info->mti_env, next, ma);
1150 ma->ma_need |= MA_INODE;
1153 if (!MFD_CLOSED(mode))
1154 rc = mo_close(info->mti_env, next, ma);
1155 else if (ret == -EAGAIN)
1156 rc = mo_attr_get(info->mti_env, next, ma);
1158 /* If the object is unlinked, do not try to re-enable SIZEONMDS */
1159 if ((ret == -EAGAIN) && (ma->ma_valid & MA_INODE) &&
1160 (ma->ma_attr.la_nlink == 0)) {
1164 if ((ret == -EAGAIN) || (ret == 1)) {
1165 struct mdt_export_data *med;
1167 /* The epoch has not closed or Size-on-MDS update is needed.
1168 * Put mfd back into the list. */
1169 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
1170 mdt_mfd_set_mode(mfd, (ret == 1 ? FMODE_EPOCH : FMODE_SOM));
/* This path needs a request; it is asserted rather than handled. */
1172 LASSERT(mdt_info_req(info));
1173 med = &mdt_info_req(info)->rq_export->exp_mdt_data;
1174 spin_lock(&med->med_open_lock);
1175 list_add(&mfd->mfd_list, &med->med_open_head);
1176 class_handle_hash_back(&mfd->mfd_handle);
1177 spin_unlock(&med->med_open_lock);
1182 CDEBUG(D_INODE, "Size-on-MDS attribute update is "
1183 "needed on "DFID"\n", PFID(mdt_object_fid(o)));
/* Release the reference taken in mdt_mfd_open(). */
1187 mdt_object_put(info->mti_env, o);
1190 RETURN(rc ? rc : ret);
/* Handle a close request. Returns rc when it is non-zero, otherwise ret (the
 * mdt_mfd_close() result).
 * Visible steps: unpack the close (it may carry a Size-on-MDS update); size
 * and pack the reply; answer resent requests from the reconstructed status;
 * set up the reply buffers; find the mfd by the handle under med_open_lock,
 * returning -ESTALE when there is none or it is already closed; otherwise
 * unhash and unlink it, close it while holding an extra object reference,
 * process a possible last unlink, assign a transaction number and shrink the
 * reply.
 * NOTE(review): several lines (declarations, conditions on rc, else
 * keywords, closing parts of multi-line calls) are not visible in this
 * listing. */
1193 int mdt_close(struct mdt_thread_info *info)
1195 struct mdt_export_data *med;
1196 struct mdt_file_data *mfd;
1197 struct mdt_object *o;
1198 struct md_attr *ma = &info->mti_attr;
1199 struct mdt_body *repbody = NULL;
1200 struct ptlrpc_request *req = mdt_info_req(info);
1204 /* Close may come with the Size-on-MDS update. Unpack it. */
1205 rc = mdt_close_unpack(info);
1207 RETURN(err_serious(rc));
1209 LASSERT(info->mti_epoch);
/* Reserve the maximum MD and cookie sizes in the reply. */
1211 req_capsule_set_size(info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
1212 info->mti_mdt->mdt_max_mdsize);
1213 req_capsule_set_size(info->mti_pill, &RMF_LOGCOOKIES, RCL_SERVER,
1214 info->mti_mdt->mdt_max_cookiesize);
1215 rc = req_capsule_server_pack(info->mti_pill);
1216 if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
1217 RETURN(lustre_msg_get_status(req->rq_repmsg));
1219 /* Continue to close handle even if we can not pack reply */
1221 repbody = req_capsule_server_get(info->mti_pill,
1223 ma->ma_lmm = req_capsule_server_get(info->mti_pill,
1225 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
1228 ma->ma_cookie = req_capsule_server_get(info->mti_pill,
1230 ma->ma_cookie_size = req_capsule_get_size(info->mti_pill,
1233 ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
1234 repbody->eadatasize = 0;
1235 repbody->aclsize = 0;
1237 rc = err_serious(rc);
1239 med = &req->rq_export->exp_mdt_data;
1240 spin_lock(&med->med_open_lock);
1241 mfd = mdt_handle2mfd(info, &info->mti_epoch->handle);
1242 if (mdt_mfd_closed(mfd)) {
1243 spin_unlock(&med->med_open_lock);
1244 CDEBUG(D_INODE, "no handle for file close: fid = "DFID
1245 ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
1246 info->mti_epoch->handle.cookie);
1247 rc = err_serious(-ESTALE);
/* Take the mfd out of the hash and the open list before closing it. */
1249 class_handle_unhash(&mfd->mfd_handle);
1250 list_del_init(&mfd->mfd_list);
1251 spin_unlock(&med->med_open_lock);
1253 /* Do not lose object before last unlink. */
1254 o = mfd->mfd_object;
1255 mdt_object_get(info->mti_env, o);
1256 ret = mdt_mfd_close(info, mfd);
/* repbody stays NULL when it was never fetched above. */
1257 if (repbody != NULL)
1258 rc = mdt_handle_last_unlink(info, o, ma);
1259 mdt_empty_transno(info);
1260 mdt_object_put(info->mti_env, o);
1262 if (repbody != NULL)
1263 mdt_shrink_reply(info);
1265 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
1266 RETURN(err_serious(-ENOMEM));
1268 if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
1269 OBD_FAIL_MDS_CLOSE_NET_REP))
1270 info->mti_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
1271 RETURN(rc ? rc : ret);
/* Handle a done-writing request: the client reports that writing under a
 * handle left in FMODE_EPOCH / FMODE_EPOCHLCK mode has finished.
 * Visible steps: pack the reply and clear its EA/ACL sizes; unpack the
 * request (it may carry a Size-on-MDS update); answer resent requests from
 * the reconstructed status; find the mfd by handle under med_open_lock;
 * unhash and unlink it; force MF_EPOCH_CLOSE and close it through
 * mdt_mfd_close(); assign a transaction number.
 * NOTE(review): several lines (declarations, conditions, the not-found
 * return and the end of the function) are not visible in this listing. */
1274 int mdt_done_writing(struct mdt_thread_info *info)
1276 struct mdt_body *repbody = NULL;
1277 struct mdt_export_data *med;
1278 struct mdt_file_data *mfd;
1282 rc = req_capsule_server_pack(info->mti_pill);
1284 RETURN(err_serious(rc));
1286 repbody = req_capsule_server_get(info->mti_pill,
1288 repbody->eadatasize = 0;
1289 repbody->aclsize = 0;
1291 /* Done Writing may come with the Size-on-MDS update. Unpack it. */
1292 rc = mdt_close_unpack(info);
1294 RETURN(err_serious(rc));
1296 if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
1297 RETURN(lustre_msg_get_status(mdt_info_req(info)->rq_repmsg));
1299 med = &info->mti_exp->exp_mdt_data;
1300 spin_lock(&med->med_open_lock);
1301 mfd = mdt_handle2mfd(info, &info->mti_epoch->handle);
1303 spin_unlock(&med->med_open_lock);
1304 CDEBUG(D_INODE, "no handle for done write: fid = "DFID
1305 ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
1306 info->mti_epoch->handle.cookie);
/* NOTE(review): the handle comes from the client, yet an unexpected mfd
 * mode is asserted on rather than reported as an error -- confirm this
 * is acceptable. */
1310 LASSERT(mfd->mfd_mode == FMODE_EPOCH ||
1311 mfd->mfd_mode == FMODE_EPOCHLCK);
1312 class_handle_unhash(&mfd->mfd_handle);
1313 list_del_init(&mfd->mfd_list);
1314 spin_unlock(&med->med_open_lock);
1316 /* Set EPOCH CLOSE flag if not set by client. */
1317 info->mti_epoch->flags |= MF_EPOCH_CLOSE;
1318 info->mti_attr.ma_valid = 0;
1319 rc = mdt_mfd_close(info, mfd);
1320 mdt_empty_transno(info);