1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Target (mdt) open/close file handling
7 * Copyright (C) 2002-2006 Cluster File Systems, Inc.
8 * Author: Huang Hua <huanghua@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include "mdt_internal.h"
37 * MDS file data handling: file data holds a handle for a file opened
41 static void mdt_mfd_get(void *mfdp)
43 struct mdt_file_data *mfd = mfdp;
45 atomic_inc(&mfd->mfd_refcount);
46 CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", mfd,
47 atomic_read(&mfd->mfd_refcount));
50 /* Create a new mdt_file_data struct.
51 * reference is set to 1 */
52 static struct mdt_file_data *mdt_mfd_new(void)
54 struct mdt_file_data *mfd;
58 CERROR("mds: out of memory\n");
62 atomic_set(&mfd->mfd_refcount, 1);
64 INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
65 INIT_LIST_HEAD(&mfd->mfd_list);
66 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
71 /* Get a new reference on the mfd pointed to by handle, if handle is still
72 * valid. Caller must drop reference with mdt_mfd_put(). */
73 static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle)
76 LASSERT(handle != NULL);
77 RETURN(class_handle2object(handle->cookie));
80 /* Drop mfd reference, freeing struct if this is the last one. */
81 static void mdt_mfd_put(struct mdt_file_data *mfd)
83 CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
84 atomic_read(&mfd->mfd_refcount) - 1);
85 LASSERT(atomic_read(&mfd->mfd_refcount) > 0 &&
86 atomic_read(&mfd->mfd_refcount) < 0x5a5a);
87 if (atomic_dec_and_test(&mfd->mfd_refcount)) {
88 LASSERT(list_empty(&mfd->mfd_handle.h_link));
93 static int mdt_object_open(struct mdt_thread_info *info,
97 struct mdt_export_data *med;
98 struct mdt_file_data *mfd;
99 struct mdt_body *repbody;
100 struct lov_mds_md *lmm;
104 med = &mdt_info_req(info)->rq_export->exp_mdt_data;
105 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
106 lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
108 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o),
113 mdt_pack_attr2body(repbody, &info->mti_attr);
114 repbody->fid1 = *mdt_object_fid(o);
115 repbody->valid |= OBD_MD_FLID;
118 rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
119 lmm, info->mti_mdt->mdt_max_mdsize, "lov");
121 GOTO(out, rc = -EINVAL);
123 if (S_ISDIR(info->mti_attr.la_mode))
124 repbody->valid |= OBD_MD_FLDIREA;
126 repbody->valid |= OBD_MD_FLEASIZE;
127 repbody->eadatasize = rc;
132 CERROR("mds: out of memory\n");
133 GOTO(out, rc = -ENOMEM);
136 if (flags & FMODE_WRITE) {
137 /*mds_get_write_access*/
138 } else if (flags & MDS_FMODE_EXEC) {
139 /*mds_deny_write_access*/
142 /* keep a reference on this object for this open,
143 * and is released by mdt_mfd_close() */
144 mdt_object_get(info->mti_ctxt, o);
146 mfd->mfd_mode = flags;
148 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
150 spin_lock(&med->med_open_lock);
151 list_add(&mfd->mfd_list, &med->med_open_head);
152 spin_unlock(&med->med_open_lock);
154 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
161 int mdt_pin(struct mdt_thread_info* info)
163 struct mdt_object *o;
167 o = mdt_object_find(info->mti_ctxt, info->mti_mdt, &info->mti_body->fid1);
169 if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu)) {
170 rc = mdt_object_open(info, o, info->mti_body->flags);
171 mdt_object_put(info->mti_ctxt, o);
180 /* Get an internal lock on the inode number (but not generation) to sync
181 * new inode creation with inode unlink (bug 2029). If child_lockh is NULL
182 * we just get the lock as a barrier to wait for other holders of this lock,
183 * and drop it right away again. */
184 int mdt_lock_new_child(struct mdt_thread_info *info,
185 struct mdt_object *o,
186 struct mdt_lock_handle *child_lockh)
188 struct mdt_lock_handle lockh;
191 if (child_lockh == NULL)
192 child_lockh = &lockh;
194 mdt_lock_handle_init(&lockh);
195 lockh.mlh_mode = LCK_EX;
196 rc = mdt_object_lock(info->mti_mdt->mdt_namespace,
197 o, &lockh, MDS_INODELOCK_UPDATE);
200 CERROR("can not mdt_object_lock: %d\n", rc);
201 else if (child_lockh == &lockh)
202 mdt_object_unlock(info->mti_mdt->mdt_namespace,
208 int mdt_reint_open(struct mdt_thread_info *info)
210 struct mdt_device *mdt = info->mti_mdt;
211 struct mdt_object *parent;
212 struct mdt_object *child;
213 struct mdt_lock_handle *lh;
214 struct ldlm_reply *ldlm_rep;
215 struct ptlrpc_request *req = mdt_info_req(info);
216 struct mdt_body *body;
217 struct lu_fid child_fid;
220 struct mdt_reint_record *rr = &info->mti_rr;
223 /* we now have no resent message, so it must be an intent */
224 LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
226 /*TODO: MDS_CHECK_RESENT */;
228 ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
229 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
231 lh = &info->mti_lh[MDT_LH_PARENT];
232 lh->mlh_mode = LCK_PW;
233 parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1,
234 lh, MDS_INODELOCK_UPDATE);
236 GOTO(out, result = PTR_ERR(parent));
238 result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
239 rr->rr_name, &child_fid);
240 if (result && result != -ENOENT) {
241 GOTO(out_parent, result);
244 intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
246 if (result == -ENOENT) {
247 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
248 if (!(info->mti_attr.la_flags & MDS_OPEN_CREAT))
249 GOTO(out_parent, result);
250 if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
251 GOTO(out_parent, result = -EROFS);
252 child_fid = *info->mti_rr.rr_fid2;
254 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
255 if (info->mti_attr.la_flags & MDS_OPEN_EXCL &&
256 info->mti_attr.la_flags & MDS_OPEN_CREAT)
257 GOTO(out_parent, result = -EEXIST);
261 child = mdt_object_find(info->mti_ctxt, mdt, &child_fid);
263 GOTO(out_parent, result = PTR_ERR(child));
265 if (result == -ENOENT) {
266 /* not found and with MDS_OPEN_CREAT: let's create something */
267 result = mdo_create(info->mti_ctxt,
268 mdt_object_child(parent),
270 mdt_object_child(child),
272 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
274 GOTO(out_child, result);
277 intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
280 result = mdt_object_open(info, child, info->mti_attr.la_flags);
281 GOTO(destroy_child, result);
284 if (result != 0 && created) {
285 mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
286 mdt_object_child(child), rr->rr_name);
287 } else if (created) {
288 mdt_lock_new_child(info, child, NULL);
291 mdt_object_put(info->mti_ctxt, child);
293 mdt_object_unlock(mdt->mdt_namespace, parent, lh);
294 mdt_object_put(info->mti_ctxt, parent);
299 int mdt_mfd_close(const struct lu_context *ctxt,
300 struct mdt_file_data *mfd,
305 if (mfd->mfd_mode & FMODE_WRITE) {
306 /*mdt_put_write_access*/
307 } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
308 /*mdt_allow_write_access*/
311 /* release reference on this object.
312 * it will be destroyed by lower layer if necessary.
314 mdt_object_put(ctxt, mfd->mfd_object);
320 int mdt_close(struct mdt_thread_info *info)
322 struct mdt_export_data *med;
323 struct mdt_body *repbody;
324 struct mdt_file_data *mfd;
325 struct mdt_object *o;
326 struct lov_mds_md *lmm;
330 med = &mdt_info_req(info)->rq_export->exp_mdt_data;
333 spin_lock(&med->med_open_lock);
334 mfd = mdt_handle2mfd(&(info->mti_body->handle));
336 spin_unlock(&med->med_open_lock);
337 CDEBUG(D_INODE, "no handle for file close ino "DFID3
338 ": cookie "LPX64, PFID3(&info->mti_body->fid1),
339 info->mti_body->handle.cookie);
342 class_handle_unhash(&mfd->mfd_handle);
343 list_del_init(&mfd->mfd_list);
344 spin_unlock(&med->med_open_lock);
347 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
348 lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
350 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o),
353 mdt_pack_attr2body(repbody, &info->mti_attr);
354 repbody->fid1 = *mdt_object_fid(o);
355 repbody->valid |= OBD_MD_FLID;
357 rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
358 lmm, info->mti_mdt->mdt_max_mdsize, "lov");
360 if (S_ISDIR(info->mti_attr.la_mode))
361 repbody->valid |= OBD_MD_FLDIREA;
363 repbody->valid |= OBD_MD_FLEASIZE;
364 repbody->eadatasize = rc;
370 rc = mdt_mfd_close(info->mti_ctxt, mfd, 1);
375 int mdt_done_writing(struct mdt_thread_info *info)