Whamcloud - gitweb
a few fixes after testing & inspection:
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_open.c
5  *  Lustre Metadata Target (mdt) open/close file handling
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "mdt_internal.h"
35
36 /*
37  * MDS file data handling: file data holds a handle for a file opened
38  * by a client.
39  */
40
41 static void mdt_mfd_get(void *mfdp)
42 {
43         struct mdt_file_data *mfd = mfdp;
44
45         atomic_inc(&mfd->mfd_refcount);
46         CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", mfd,
47                atomic_read(&mfd->mfd_refcount));
48 }
49
50 /* Create a new mdt_file_data struct. 
51  * reference is set to 1 */
52 static struct mdt_file_data *mdt_mfd_new(void)
53 {
54         struct mdt_file_data *mfd;
55         ENTRY;
56
57         OBD_ALLOC_PTR(mfd);
58         if (mfd != NULL) {
59                 atomic_set(&mfd->mfd_refcount, 1);
60
61                 INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
62                 INIT_LIST_HEAD(&mfd->mfd_list);
63                 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
64         } else
65                 CERROR("mdt: out of memory\n");
66
67         RETURN(mfd);
68 }
69
70 /* Get a new reference on the mfd pointed to by handle, if handle is still
71  * valid.  Caller must drop reference with mdt_mfd_put(). */
72 static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle)
73 {
74         ENTRY;
75         LASSERT(handle != NULL);
76         RETURN(class_handle2object(handle->cookie));
77 }
78
79 /* Drop mfd reference, freeing struct if this is the last one. */
80 static void mdt_mfd_put(struct mdt_file_data *mfd)
81 {
82         CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
83                atomic_read(&mfd->mfd_refcount) - 1);
84         LASSERT(atomic_read(&mfd->mfd_refcount) > 0 &&
85                 atomic_read(&mfd->mfd_refcount) < 0x5a5a);
86         if (atomic_dec_and_test(&mfd->mfd_refcount)) {
87                 LASSERT(list_empty(&mfd->mfd_handle.h_link));
88                 OBD_FREE_PTR(mfd);
89         }
90 }
91
92 static int mdt_object_open(struct mdt_thread_info *info,
93                            struct mdt_object *o, 
94                            int flags)
95 {
96         struct mdt_export_data *med;
97         struct mdt_file_data   *mfd;
98         struct mdt_body        *repbody;
99         struct lov_mds_md      *lmm;
100         int                     rc = 0;
101         ENTRY;
102
103         med = &mdt_info_req(info)->rq_export->exp_mdt_data;
104         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
105         lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
106
107         rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o),
108                          &info->mti_attr);
109         if (rc == -EREMOTE) {
110                 repbody->fid1 = *mdt_object_fid(o);
111                 repbody->valid |= OBD_MD_FLID;
112                 /*FIXME: should be return 0 or -EREMOTE? */
113                 /* also in mdt_reint:mdt_md_create() */
114         }
115         if (rc != 0)
116                 RETURN(rc);
117
118         mdt_pack_attr2body(repbody, &info->mti_attr, mdt_object_fid(o));
119
120 /*
121         rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
122                           lmm, info->mti_mdt->mdt_max_mdsize, "lov");
123         if (rc < 0)
124                 RETURN(-EINVAL);
125         if (S_ISDIR(info->mti_attr.la_mode))
126                 repbody->valid |= OBD_MD_FLDIREA;
127         else
128                 repbody->valid |= OBD_MD_FLEASIZE;
129         repbody->eadatasize = rc;
130         rc = 0;
131 */
132         if (flags & FMODE_WRITE) {
133                 /*mds_get_write_access*/
134         } else if (flags & MDS_FMODE_EXEC) {
135                 /*mds_deny_write_access*/
136         }
137
138         mfd = mdt_mfd_new();
139         if (mfd != NULL) {
140                 /* keep a reference on this object for this open,
141                 * and is released by mdt_mfd_close() */
142                 mdt_object_get(info->mti_ctxt, o);
143
144                 mfd->mfd_mode = flags;
145                 mfd->mfd_object = o;
146                 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
147
148                 spin_lock(&med->med_open_lock);
149                 list_add(&mfd->mfd_list, &med->med_open_head);
150                 spin_unlock(&med->med_open_lock);
151
152                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
153         } else {
154                 CERROR("mdt: out of memory\n");
155                 rc = -ENOMEM;
156         }
157
158         RETURN(rc);
159 }
160
161 int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
162                     __u32 flags, struct ldlm_reply *rep)
163 {
164         struct mdt_object *o;
165         int rc;
166         ENTRY;
167
168         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
169         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
170         if (!IS_ERR(o)) {
171                 if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu)) {
172                         intent_set_disposition(rep, DISP_LOOKUP_POS);
173                         rc = mdt_object_open(info, o, flags);
174                         intent_set_disposition(rep, DISP_OPEN_OPEN);
175                 } else {
176                         intent_set_disposition(rep, DISP_LOOKUP_NEG);
177                         rc = -ENOENT;
178                 }
179                 mdt_object_put(info->mti_ctxt, o);
180         } else
181                 rc = PTR_ERR(o);
182
183         RETURN(rc);
184 }
185
186 int mdt_pin(struct mdt_thread_info* info)
187 {
188         int rc;
189         ENTRY;
190         rc = mdt_open_by_fid(info, &info->mti_body->fid1, 
191                              info->mti_body->flags, NULL);
192         RETURN(rc);
193 }
194
195 /*  Get an internal lock on the inode number (but not generation) to sync
196  *  new inode creation with inode unlink (bug 2029).  If child_lockh is NULL
197  *  we just get the lock as a barrier to wait for other holders of this lock,
198  *  and drop it right away again. */
199 int mdt_lock_new_child(struct mdt_thread_info *info, 
200                        struct mdt_object *o,
201                        struct mdt_lock_handle *child_lockh)
202 {
203         struct mdt_lock_handle lockh;
204         int rc;
205         ENTRY;
206         
207         if (child_lockh == NULL)
208                 child_lockh = &lockh;
209
210         mdt_lock_handle_init(&lockh);
211         lockh.mlh_mode = LCK_EX;
212         rc = mdt_object_lock(info, o, &lockh, MDS_INODELOCK_UPDATE);
213
214         if (rc != ELDLM_OK)
215                 CERROR("can not mdt_object_lock: %d\n", rc);
216         else if (child_lockh == &lockh)
217                 mdt_object_unlock(info, o, &lockh);
218
219         RETURN(rc);
220 }
221
222 int mdt_reint_open(struct mdt_thread_info *info)
223 {
224         struct mdt_device      *mdt = info->mti_mdt;
225         struct mdt_object      *parent;
226         struct mdt_object      *child;
227         struct mdt_lock_handle *lh;
228         struct ldlm_reply      *ldlm_rep;
229         struct ptlrpc_request  *req = mdt_info_req(info);
230         struct mdt_body        *body;
231         struct lu_fid          *child_fid = &info->mti_tmp_fid1;
232         int                     result;
233         int                     created = 0;
234         struct mdt_reint_record *rr = &info->mti_rr;
235         ENTRY;
236
237         /* we now have no resent message, so it must be an intent */
238         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
239
240         /*TODO: MDS_CHECK_RESENT */;
241         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
242         body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
243
244         if (strlen(rr->rr_name) == 0) {
245                 /* partial remote open */
246                 RETURN(mdt_open_by_fid(info, rr->rr_fid1, 
247                                        info->mti_attr.la_flags, ldlm_rep));
248         }
249
250         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
251         lh = &info->mti_lh[MDT_LH_PARENT];
252         lh->mlh_mode = LCK_PW;
253         parent = mdt_object_find_lock(info, rr->rr_fid1, lh, 
254                                       MDS_INODELOCK_UPDATE);
255         if (IS_ERR(parent)) {
256                 /* FIXME: just simulate child not exist */
257                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
258                 GOTO(out, result = PTR_ERR(parent));
259         }
260
261         result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
262                             rr->rr_name, child_fid);
263         if (result != 0 && result != -ENOENT) {
264                 GOTO(out_parent, result);
265         }
266
267         if (result == -ENOENT) {
268                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
269                 if (!(info->mti_attr.la_flags & MDS_OPEN_CREAT))
270                         GOTO(out_parent, result);
271                 if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
272                         GOTO(out_parent, result = -EROFS);
273                 *child_fid = *info->mti_rr.rr_fid2;
274         } else {
275                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
276                 if (info->mti_attr.la_flags & MDS_OPEN_EXCL &&
277                     info->mti_attr.la_flags & MDS_OPEN_CREAT)
278                         GOTO(out_parent, result = -EEXIST);
279                 /* child_fid is filled by mdo_lookup(). */
280                 LASSERT(lu_fid_eq(child_fid, info->mti_rr.rr_fid2));
281         }
282
283         child = mdt_object_find(info->mti_ctxt, mdt, child_fid);
284         if (IS_ERR(child))
285                 GOTO(out_parent, result = PTR_ERR(child));
286
287         if (result == -ENOENT) {
288                 /* not found and with MDS_OPEN_CREAT: let's create something */
289                 result = mdo_create(info->mti_ctxt,
290                                     mdt_object_child(parent),
291                                     rr->rr_name,
292                                     mdt_object_child(child),
293                                     rr->rr_tgt,
294                                     &info->mti_attr);
295                 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
296                 if (result != 0)
297                         GOTO(out_child, result);
298                 created = 1;
299         }
300
301         /* Open it now. */
302         result = mdt_object_open(info, child, info->mti_attr.la_flags);
303         intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
304         GOTO(destroy_child, result);
305
306 destroy_child:
307         if (created) {
308                 if (result != 0 && result != -EREMOTE) {
309                         mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
310                                    mdt_object_child(child), rr->rr_name);
311                 } else {
312                         /* barrier with other thread */
313                         mdt_lock_new_child(info, child, NULL);
314                 }
315         }
316 out_child:
317         mdt_object_put(info->mti_ctxt, child);
318 out_parent:
319         mdt_object_unlock_put(info, parent, lh);
320 out:
321         return result;
322 }
323
324 int mdt_mfd_close(const struct lu_context *ctxt,
325                   struct mdt_file_data *mfd, 
326                   int unlink_orphan)
327 {
328         ENTRY;
329
330         if (mfd->mfd_mode & FMODE_WRITE) {
331                 /*mdt_put_write_access*/
332         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
333                 /*mdt_allow_write_access*/
334         }
335
336         /* release reference on this object.
337          * it will be destroyed by lower layer if necessary.
338          */
339         mdt_object_put(ctxt, mfd->mfd_object);
340
341         mdt_mfd_put(mfd);
342         RETURN(0);
343 }
344
345 int mdt_close(struct mdt_thread_info *info)
346 {
347         struct mdt_export_data *med;
348         struct mdt_file_data   *mfd;
349         int rc;
350         ENTRY;
351
352         med = &mdt_info_req(info)->rq_export->exp_mdt_data;
353
354         spin_lock(&med->med_open_lock);
355         mfd = mdt_handle2mfd(&(info->mti_body->handle));
356         if (mfd == NULL) {
357                 spin_unlock(&med->med_open_lock);
358                 CDEBUG(D_INODE, "no handle for file close: fid = "DFID3
359                        ": cookie = "LPX64, PFID3(&info->mti_body->fid1),
360                        info->mti_body->handle.cookie);
361                 rc = -ESTALE;
362         } else {
363                 class_handle_unhash(&mfd->mfd_handle);
364                 list_del_init(&mfd->mfd_list);
365                 spin_unlock(&med->med_open_lock);
366         
367                 /* mdt_handle2mfd increases reference count on mfd,
368                  * we must drop it here. */
369                 mdt_mfd_put(mfd);
370
371                 rc = mdt_handle_last_unlink(info, mfd->mfd_object,
372                                             &RQF_MDS_CLOSE_LAST);
373
374                 rc = mdt_mfd_close(info->mti_ctxt, mfd, 1);
375         }
376         RETURN(rc);
377 }
378
379 int mdt_done_writing(struct mdt_thread_info *info)
380 {
381         ENTRY;
382
383         RETURN(0);
384 }