1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Target (mdt) open/close file handling
7 * Copyright (C) 2002-2006 Cluster File Systems, Inc.
8 * Author: Huang Hua <huanghua@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/lustre_acl.h>
35 #include <lustre_mds.h>
36 #include "mdt_internal.h"
38 /* Callback handed to class_handle_hash() by mdt_mfd_new().
 * We do nothing because we do not have a refcount on the mfd for now.
 * NOTE(review): the function body is not visible in this listing. */
39 static void mdt_mfd_get(void *mfdp)
43 /* Create a new mdt_file_data struct, initialize it,
44 * and insert it into the global handle hash table.
 *
 * Both list heads (the handle link and mfd_list) start out empty and the
 * handle is hashed with mdt_mfd_get as its callback.
 *
 * NOTE(review): the allocation and the return statement are not visible in
 * this listing; presumably the new mfd (or NULL on allocation failure) is
 * returned -- confirm against the full file. */
45 static struct mdt_file_data *mdt_mfd_new(void)
47 struct mdt_file_data *mfd;
52 INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
53 INIT_LIST_HEAD(&mfd->mfd_list);
/* register the handle; mdt_handle2mfd() later looks it up by cookie */
54 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
59 /* Find the mfd pointed to by handle in the global hash table.
 *
 * handle must not be NULL (asserted). The lookup key is handle->cookie and
 * the result of class_handle2object() is returned as is; presumably that is
 * NULL when nothing is hashed under the cookie -- confirm against
 * class_handle2object(). */
60 static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle)
63 LASSERT(handle != NULL);
64 RETURN(class_handle2object(handle->cookie));
/* Dispose of an mfd.
 *
 * Asserts that both list heads are empty, i.e. the mfd is expected to be
 * linked neither through its handle nor through mfd_list by the time it
 * gets here (mdt_close() calls class_handle_unhash() and list_del_init()
 * on it first).
 * NOTE(review): the deallocation itself is not visible in this listing. */
68 static void mdt_mfd_free(struct mdt_file_data *mfd)
70 LASSERT(list_empty(&mfd->mfd_handle.h_link));
71 LASSERT(list_empty(&mfd->mfd_list));
/* Thin wrapper around mdo_create_data().
 *
 * Passes the lower-layer objects of p and o (via mdt_object_child()),
 * the EA data and its length from the reint record (rr_eadata,
 * rr_eadatalen) and the per-thread md_attr. The result of
 * mdo_create_data() is returned unchanged.
 *
 * NOTE(review): the only call visible in this listing sits under the
 * XXX comment in mdt_mfd_open(); confirm whether this helper is still
 * in use. */
75 static int mdt_create_data_obj(struct mdt_thread_info *info,
76 struct mdt_object *p, struct mdt_object *o)
78 struct md_attr *ma = &info->mti_attr;
79 struct mdt_reint_record *mrr = &info->mti_rr;
81 return mdo_create_data(info->mti_ctxt, mdt_object_child(p),
82 mdt_object_child(o), mrr->rr_eadata,
83 mrr->rr_eadatalen, ma);
87 /*The following four functions are copied from MDS */
89 /* Write access to a file: executors cause a negative count,
90 * writers a positive count. The lock is needed to perform
91 * a check for the sign and then increment or decrement atomically.
 * (In this file the lock is the spinlock mdt->mdt_epoch_lock.)
93 * This code is closely tied to the allocation of the d_fsdata and the
94 * MDS epoch, so we use the same lock for the whole lot.
96 * FIXME and TODO : handle the epoch!
97 * epoch argument is nonzero during recovery
 *
 * NOTE(review): this listing omits several lines of the function (the
 * rest of the parameter list, the body of the negative-count branch,
 * the else branches and the return); the inline notes below describe
 * only what is visible. */
98 static int mdt_get_write_access(struct mdt_device *mdt, struct mdt_object *o,
104 spin_lock(&mdt->mdt_epoch_lock);
/* a negative count means executors hold the object; the handling of
 * this case is not visible here (presumably the request is refused) */
106 if (atomic_read(&o->mot_writecount) < 0) {
/* the object already carries an io epoch: keep using it */
109 if (o->mot_io_epoch != 0) {
110 CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID3"\n",
111 o->mot_io_epoch, PFID3(mdt_object_fid(o)));
/* adopt the epoch supplied by the caller if it is ahead of the
 * device-wide one */
113 if (epoch > mdt->mdt_io_epoch)
114 mdt->mdt_io_epoch = epoch;
/* start an epoch on this object from the device-wide value */
117 o->mot_io_epoch = mdt->mdt_io_epoch;
118 CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID3"\n",
119 mdt->mdt_io_epoch, PFID3(mdt_object_fid(o)));
/* account for the new writer */
121 atomic_inc(&o->mot_writecount);
123 spin_unlock(&mdt->mdt_epoch_lock);
/* Drop one writer from object o.
 *
 * Under mdt_epoch_lock the write count is decremented and its new value
 * is sampled into rc.
 * NOTE(review): the lines between the sample and the unlock, and the
 * return statement, are not visible in this listing; presumably rc (the
 * remaining count) is returned -- confirm. The only caller visible here,
 * mdt_mfd_close(), ignores the result. */
127 static int mdt_put_write_access(struct mdt_device *mdt, struct mdt_object *o)
132 spin_lock(&mdt->mdt_epoch_lock);
133 atomic_dec(&o->mot_writecount);
134 rc = atomic_read(&o->mot_writecount);
137 spin_unlock(&mdt->mdt_epoch_lock);
/* Register an executor on object o, which keeps writers away.
 *
 * Under mdt_epoch_lock: a positive write count means there are writers
 * (the body of that branch is not visible here; presumably the request
 * is refused). Otherwise the count is decremented, i.e. pushed towards
 * the negative range that mdt_get_write_access() tests for.
 * NOTE(review): the return statement is not visible in this listing. */
141 static int mdt_deny_write_access(struct mdt_device *mdt, struct mdt_object *o)
145 spin_lock(&mdt->mdt_epoch_lock);
146 if (atomic_read(&o->mot_writecount) > 0) {
149 atomic_dec(&o->mot_writecount);
150 spin_unlock(&mdt->mdt_epoch_lock);
/* Counterpart of mdt_deny_write_access(): increment the write count of o
 * again. In the visible lines this is a bare atomic increment; unlike the
 * other write-access helpers no mdt_epoch_lock is taken here. */
154 static void mdt_allow_write_access(struct mdt_object *o)
157 atomic_inc(&o->mot_writecount);
/* Return the current write count of o as read by atomic_read().
 * Following the convention described above mdt_get_write_access(),
 * a positive value stands for writers and a negative one for executors.
 * The value is read without taking mdt_epoch_lock. */
161 int mdt_query_write_access(struct mdt_object *o)
164 RETURN(atomic_read(&o->mot_writecount));
/* Open object o on behalf of the current request and fill in the reply.
 *
 * Visible steps, in order:
 *  - fetch the attributes (and LOV EA) of o with mo_attr_get() and, when
 *    MA_INODE is valid, pack them into the reply mdt_body;
 *  - special handling for symlinks, for special nodes of clients that
 *    connected with OBD_CONNECT_NODEVOH, and for the directory related
 *    open flags (most of these branches are not visible in this listing);
 *  - report the EA size in the reply and shrink the reply buffers to fit;
 *  - set DISP_OPEN_OPEN in the intent reply;
 *  - for FMODE_WRITE take write access and return the io epoch of o, for
 *    MDS_FMODE_EXEC deny write access;
 *  - take a reference on o, record the open mode and request xid in an
 *    mdt_file_data, link it onto the open list of the export and return
 *    its handle cookie in the reply.
 *
 * info    - per-thread request state
 * p       - parent object; in the visible lines it is only mentioned by
 *           the mdt_create_data_obj() call under the XXX comment
 * o       - object being opened
 * flags   - open flags (MDS_OPEN_*, FMODE_WRITE, MDS_FMODE_EXEC)
 * created - callers pass 1 when the object has just been created and 0
 *           otherwise; its use is not visible in this listing
 *
 * NOTE(review): this listing omits many lines (braces, error checks, the
 * assignment of mfd, the final RETURN). Presumably mfd comes from
 * mdt_mfd_new() -- confirm against the full file. */
167 static int mdt_mfd_open(struct mdt_thread_info *info,
168 struct mdt_object *p,
169 struct mdt_object *o,
170 int flags, int created)
172 struct mdt_export_data *med;
173 struct mdt_file_data *mfd;
174 struct mdt_device *mdt = info->mti_mdt;
175 struct mdt_body *repbody;
176 struct md_attr *ma = &info->mti_attr;
177 struct lu_attr *la = &ma->ma_attr;
178 struct ptlrpc_request *req = mdt_info_req(info);
179 struct ldlm_reply *ldlm_rep;
181 int isreg, isdir, islnk;
184 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
187 /* we have to get attr & lov ea for this object */
188 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
/* classify the object by file type */
192 isreg = S_ISREG(la->la_mode);
193 isdir = S_ISDIR(la->la_mode);
194 islnk = S_ISLNK(la->la_mode);
195 if (ma->ma_valid & MA_INODE)
196 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
198 /* if we are following a symlink, do not open it;
199 * also do not return an open handle for special nodes if the client asked for that
201 if (islnk || (!isreg && !isdir &&
202 (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
203 info->mti_trans_flags |= MDT_NONEED_TANSNO;
206 /* FIXME:maybe this can be done earlier? */
208 if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
209 /* we are trying to create or
210 * write an existing dir. */
213 } else if (flags & MDS_OPEN_DIRECTORY)
216 if ((isreg) && !(ma->ma_valid & MA_LOV)) {
217 /* No EA: check whether regEA and dirEA are going to be set.
218 * The attr get above may have left these sizes at zero,
219 * so reset the size here in order to retrieve the MD after the object is created. */
220 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
224 /*XXX: Tom, do we need this?
225 rc = mdt_create_data_obj(info, p, o);
231 CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
232 ma->ma_valid, ma->ma_lmm_size);
233 repbody->eadatasize = 0;
234 repbody->aclsize = 0;
236 if (ma->ma_lmm_size && ma->ma_valid & MA_LOV) {
237 repbody->eadatasize = ma->ma_lmm_size;
239 repbody->valid |= OBD_MD_FLDIREA;
241 repbody->valid |= OBD_MD_FLEASIZE;
243 /*FIXME: should determine the offset dynamically,
244 *did not get ACL before shrink*/
245 lustre_shrink_reply(req, 2, repbody->eadatasize, 1);
246 lustre_shrink_reply(req, repbody->eadatasize ? 3 : 2, repbody->aclsize,
249 ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
250 intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
/* writers take write access and report the io epoch of the object;
 * executors deny write access instead */
252 if (flags & FMODE_WRITE) {
253 /* FIXME: in recovery, need to pass old epoch here */
254 rc = mdt_get_write_access(mdt, o, 0);
256 repbody->io_epoch = o->mot_io_epoch;
257 } else if (flags & MDS_FMODE_EXEC) {
258 rc = mdt_deny_write_access(mdt, o);
263 /* (1) client wants transno when open to keep a ref count for replay;
264 * see after_reply() and mdc_close_commit();
265 * (2) we need to record the transaction related stuff onto disk;
266 * But, question is: when doing a read-only open, do we still need transno?
269 struct txn_param txn;
271 struct dt_device *dt = info->mti_mdt->mdt_bottom;
275 th = dt->dd_ops->dt_trans_start(info->mti_ctxt, dt, &txn);
277 dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
284 /* keep a reference on this object for this open,
285 * it is released by mdt_mfd_close() */
286 mdt_object_get(info->mti_ctxt, o);
288 mfd->mfd_mode = flags;
290 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
/* link this open onto the open list of the export, under its lock */
292 med = &req->rq_export->exp_mdt_data;
293 spin_lock(&med->med_open_lock);
294 list_add(&mfd->mfd_list, &med->med_open_head);
295 spin_unlock(&med->med_open_lock);
/* the handle cookie is what mdt_close() later uses to find the mfd */
297 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
/* Open an object that is identified by fid only (no name lookup).
 *
 * The object is found with mdt_object_find(). If it exists, the
 * combination MDS_OPEN_EXCL plus MDS_OPEN_CREAT is handled specially
 * (that line is not visible here; presumably an error such as -EEXIST),
 * otherwise it is opened with created == 0. If it does not exist and
 * MDS_OPEN_CREAT is set, it is created with mo_object_create() and then
 * opened with created == 1. The reference from mdt_object_find() is
 * dropped before returning. No parent object is passed to mdt_mfd_open().
 *
 * Note that the EXCL/CREAT tests read la->la_flags from the per-thread
 * attributes, while the flags argument is what is handed to
 * mdt_mfd_open().
 *
 * NOTE(review): the remainder of the parameter list, the check of the
 * mdt_object_find() result and the return statement are not visible in
 * this listing. */
304 int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
307 struct mdt_object *o;
308 struct lu_attr *la = &info->mti_attr.ma_attr;
312 o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
/* the object already exists */
314 if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu) > 0) {
315 if (la->la_flags & MDS_OPEN_EXCL &&
316 la->la_flags & MDS_OPEN_CREAT)
319 rc = mdt_mfd_open(info, NULL, o, flags, 0);
/* the object does not exist yet: create it first if asked to */
322 if (la->la_flags & MDS_OPEN_CREAT) {
323 rc = mo_object_create(info->mti_ctxt,
328 rc = mdt_mfd_open(info, NULL, o, flags, 1);
/* drop the reference taken by mdt_object_find() */
331 mdt_object_put(info->mti_ctxt, o);
/* Handle a pin request: pack the reply, then open the object named by
 * fid1 of the request body with the flags carried in that body, using
 * mdt_open_by_fid().
 * NOTE(review): the check of the req_capsule_pack() result and the
 * return statement are not visible in this listing. */
338 int mdt_pin(struct mdt_thread_info* info)
340 struct mdt_body *body;
344 rc = req_capsule_pack(&info->mti_pill);
346 body = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
347 rc = mdt_open_by_fid(info, &body->fid1, body->flags);
/* Handle an open (optionally with create) reint request.
 *
 * Visible flow:
 *  - size the MDT_MD reply buffer to mdt_max_mdsize, pack the reply and
 *    point ma_lmm at that buffer;
 *  - an empty name means open by fid: delegate to mdt_open_by_fid();
 *  - otherwise lock the parent (LCK_CR, or LCK_EX when MDS_OPEN_CREAT is
 *    set; the else line itself is not visible) and look the name up;
 *  - on -ENOENT with MDS_OPEN_CREAT take rr_fid2 as the fid of the new
 *    child and create it with mdo_create(); on a positive lookup fail
 *    with -EEXIST when both MDS_OPEN_EXCL and MDS_OPEN_CREAT are set;
 *  - open the child with mdt_mfd_open(); if that fails for an object
 *    created by this request, try to unlink it again;
 *  - drop the child reference, then unlock and put the parent.
 *
 * The lookup, create and open dispositions are recorded in the intent
 * reply along the way.
 *
 * NOTE(review): braces, several error checks, the declarations of result
 * and created, the goto labels and the final RETURN are not visible in
 * this listing; the inline notes describe only what is visible. */
352 int mdt_reint_open(struct mdt_thread_info *info)
354 struct mdt_device *mdt = info->mti_mdt;
355 struct mdt_object *parent;
356 struct mdt_object *child;
357 struct mdt_lock_handle *lh;
358 struct ldlm_reply *ldlm_rep;
359 struct lu_fid *child_fid = &info->mti_tmp_fid1;
360 struct md_attr *ma = &info->mti_attr;
361 struct lu_attr *la = &ma->ma_attr;
364 struct mdt_reint_record *rr = &info->mti_rr;
/* reserve mdt_max_mdsize bytes for the MD in the reply before packing */
367 req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
368 mdt->mdt_max_mdsize);
370 result = req_capsule_pack(&info->mti_pill);
374 ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
375 ma->ma_lmm_size = mdt->mdt_max_mdsize;
/* no name given: the object is identified by fid1 alone */
377 if (rr->rr_name[0] == 0) {
378 /* reint partial remote open */
379 RETURN(mdt_open_by_fid(info, rr->rr_fid1, la->la_flags));
382 /* we now have no resent message, so it must be an intent */
383 /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
384 LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
386 CDEBUG(D_INODE, "I am going to create "DFID3"/("DFID3":%s) flag=%x\n",
387 PFID3(rr->rr_fid1), PFID3(rr->rr_fid2),
388 rr->rr_name, la->la_flags);
390 ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
391 intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
/* choose the parent lock mode: CR for a plain open, EX when the open
 * may create (the else between the two assignments is not visible) */
393 lh = &info->mti_lh[MDT_LH_PARENT];
394 if (!(la->la_flags & MDS_OPEN_CREAT))
395 lh->mlh_mode = LCK_CR;
397 lh->mlh_mode = LCK_EX;
398 parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
399 MDS_INODELOCK_UPDATE);
401 GOTO(out, result = PTR_ERR(parent));
/* look the name up in the parent; -ENOENT is not fatal at this point */
403 result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
404 rr->rr_name, child_fid);
405 if (result != 0 && result != -ENOENT)
406 GOTO(out_parent, result);
408 if (result == -ENOENT) {
409 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
410 if (!(la->la_flags & MDS_OPEN_CREAT))
411 GOTO(out_parent, result);
412 *child_fid = *info->mti_rr.rr_fid2;
413 /* new object will be created. see the following */
415 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
416 if ((la->la_flags & MDS_OPEN_EXCL &&
417 la->la_flags & MDS_OPEN_CREAT))
418 GOTO(out_parent, result = -EEXIST);
421 child = mdt_object_find(info->mti_ctxt, mdt, child_fid);
423 GOTO(out_parent, result = PTR_ERR(child));
425 if (result == -ENOENT) {
426 /* not found and with MDS_OPEN_CREAT: let's create it */
427 result = mdo_create(info->mti_ctxt,
428 mdt_object_child(parent),
430 mdt_object_child(child),
432 /* rr->rr_tgt, rr->rr_eadata, rr->rr_eadatalen,*/
434 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
436 GOTO(out_child, result);
441 result = mdt_mfd_open(info, parent, child, la->la_flags, created);
442 GOTO(finish_open, result);
/* the open failed for an object created by this request: undo the
 * create by unlinking the name again */
445 if (result != 0 && created) {
446 int rc2 = mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
447 mdt_object_child(child), rr->rr_name,
450 CERROR("error in cleanup of open");
453 mdt_object_put(info->mti_ctxt, child);
455 mdt_object_unlock_put(info, parent, lh, result);
/* Close one open described by mfd.
 *
 * Gives back the write-access state taken at open time, based on the
 * mode stored in the mfd: FMODE_WRITE opens release their writer with
 * mdt_put_write_access(), MDS_FMODE_EXEC opens re-allow writers with
 * mdt_allow_write_access(). Afterwards the object reference that
 * mdt_mfd_open() took for this open is dropped.
 *
 * NOTE(review): the lines after the final mdt_object_put() are not
 * visible in this listing; presumably the mfd is freed there (see
 * mdt_mfd_free()) -- confirm against the full file. */
460 void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
461 struct mdt_file_data *mfd)
463 struct mdt_object *o = mfd->mfd_object;
/* undo what mdt_mfd_open() did for this open mode */
466 if (mfd->mfd_mode & FMODE_WRITE) {
467 mdt_put_write_access(mdt, o);
468 } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
469 mdt_allow_write_access(o);
472 /* release reference on this object.
473 * it will be destroyed by lower layer if necessary.
475 mdt_object_put(ctxt, mfd->mfd_object);
/* Handle a close request.
 *
 * The mfd is looked up from the handle in the request body while the
 * open lock of the export (med_open_lock) is held. On the failure path
 * the lock is released and a debug message with the fid and cookie is
 * logged (the rest of that path is not visible here). Otherwise the mfd
 * is unhashed and taken off the open list of the export under the lock;
 * then the attributes and the MD are fetched into the reply buffer,
 * mdt_handle_last_unlink() is called, the open is closed with
 * mdt_mfd_close() and the reply is shrunk.
 *
 * NOTE(review): o is used below but its assignment is not visible in
 * this listing -- presumably o = mfd->mfd_object. The conditions around
 * mo_attr_get() and mdt_handle_last_unlink() and the return statement
 * are not visible either; confirm against the full file. */
481 int mdt_close(struct mdt_thread_info *info)
483 struct md_attr *ma = &info->mti_attr;
484 struct mdt_export_data *med;
485 struct mdt_file_data *mfd;
486 struct mdt_object *o;
490 med = &mdt_info_req(info)->rq_export->exp_mdt_data;
/* look up and unlink the mfd under the same lock that protects the
 * open list of the export */
492 spin_lock(&med->med_open_lock);
493 mfd = mdt_handle2mfd(&(info->mti_body->handle));
495 spin_unlock(&med->med_open_lock);
496 CDEBUG(D_INODE, "no handle for file close: fid = "DFID3
497 ": cookie = "LPX64"\n", PFID3(&info->mti_body->fid1),
498 info->mti_body->handle.cookie);
/* from here on the handle can no longer be looked up by cookie */
501 class_handle_unhash(&mfd->mfd_handle);
502 list_del_init(&mfd->mfd_list);
503 spin_unlock(&med->med_open_lock);
/* let mo_attr_get() return the MD in the server reply buffer */
506 ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
508 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
509 &RMF_MDT_MD, RCL_SERVER);
510 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
512 rc = mdt_handle_last_unlink(info, o, ma);
514 mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd);
516 mdt_shrink_reply(info);
520 int mdt_done_writing(struct mdt_thread_info *info)
525 req_capsule_set(&info->mti_pill, &RQF_MDS_DONE_WRITING);
526 rc = req_capsule_pack(&info->mti_pill);