Whamcloud - gitweb
added open support: the simplest case, no CMD, no xattr.
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_open.c
5  *  Lustre Metadata Target (mdt) open/close file handling
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "mdt_internal.h"
35
36 /*
37  * MDS file data handling: file data holds a handle for a file opened
38  * by a client.
39  */
40
41 static void mdt_mfd_get(void *mfdp)
42 {
43         struct mdt_file_data *mfd = mfdp;
44
45         atomic_inc(&mfd->mfd_refcount);
46         CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", mfd,
47                atomic_read(&mfd->mfd_refcount));
48 }
49
50 /* Create a new mdt_file_data struct. 
51  * reference is set to 1 */
52 static struct mdt_file_data *mdt_mfd_new(void)
53 {
54         struct mdt_file_data *mfd;
55
56         OBD_ALLOC_PTR(mfd);
57         if (mfd == NULL) {
58                 CERROR("mds: out of memory\n");
59                 return NULL;
60         }
61
62         atomic_set(&mfd->mfd_refcount, 1);
63
64         INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
65         INIT_LIST_HEAD(&mfd->mfd_list);
66         class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
67
68         return mfd;
69 }
70
71 /* Get a new reference on the mfd pointed to by handle, if handle is still
72  * valid.  Caller must drop reference with mdt_mfd_put(). */
73 static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle)
74 {
75         ENTRY;
76         LASSERT(handle != NULL);
77         RETURN(class_handle2object(handle->cookie));
78 }
79
80 /* Drop mfd reference, freeing struct if this is the last one. */
81 static void mdt_mfd_put(struct mdt_file_data *mfd)
82 {
83         CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
84                atomic_read(&mfd->mfd_refcount) - 1);
85         LASSERT(atomic_read(&mfd->mfd_refcount) > 0 &&
86                 atomic_read(&mfd->mfd_refcount) < 0x5a5a);
87         if (atomic_dec_and_test(&mfd->mfd_refcount)) {
88                 LASSERT(list_empty(&mfd->mfd_handle.h_link));
89                 OBD_FREE_PTR(mfd);
90         }
91 }
92
93 static int mdt_object_open(struct mdt_thread_info *info,
94                            struct mdt_object *o, 
95                            int flags)
96 {
97         struct mdt_export_data *med;
98         struct mdt_file_data   *mfd;
99         struct mdt_body        *repbody;
100         struct lov_mds_md      *lmm;
101         int                     rc = 0;
102         ENTRY;
103
104         med = &mdt_info_req(info)->rq_export->exp_mdt_data;
105         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
106         lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
107
108         rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o),
109                          &info->mti_attr);
110         if (rc != 0)
111                 GOTO(out, rc);
112
113         mdt_pack_attr2body(repbody, &info->mti_attr);
114         repbody->fid1 = *mdt_object_fid(o);
115         repbody->valid |= OBD_MD_FLID;
116
117 /*
118         rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
119                           lmm, info->mti_mdt->mdt_max_mdsize, "lov");
120         if (rc < 0)
121                 GOTO(out, rc = -EINVAL);
122
123         if (S_ISDIR(info->mti_attr.la_mode))
124                 repbody->valid |= OBD_MD_FLDIREA;
125         else
126                 repbody->valid |= OBD_MD_FLEASIZE;
127         repbody->eadatasize = rc;
128         rc = 0;
129 */
130         mfd = mdt_mfd_new();
131         if (mfd == NULL) {
132                 CERROR("mds: out of memory\n");
133                 GOTO(out, rc = -ENOMEM);
134         }
135
136         if (flags & FMODE_WRITE) {
137                 /*mds_get_write_access*/
138         } else if (flags & MDS_FMODE_EXEC) {
139                 /*mds_deny_write_access*/
140         }
141
142         /* keep a reference on this object for this open,
143          * and is released by mdt_mfd_close() */
144         mdt_object_get(info->mti_ctxt, o);
145
146         mfd->mfd_mode = flags;
147         mfd->mfd_object = o;
148         mfd->mfd_xid = mdt_info_req(info)->rq_xid;
149
150         spin_lock(&med->med_open_lock);
151         list_add(&mfd->mfd_list, &med->med_open_head);
152         spin_unlock(&med->med_open_lock);
153
154         repbody->handle.cookie = mfd->mfd_handle.h_cookie;
155
156         RETURN(rc);
157 out:
158         return rc;
159 }
160
161 int mdt_pin(struct mdt_thread_info* info)
162 {
163         struct mdt_object *o;
164         int rc;
165         ENTRY;
166
167         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, &info->mti_body->fid1);
168         if (!IS_ERR(o)) {
169                 if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu)) {
170                         rc = mdt_object_open(info, o, info->mti_body->flags);
171                         mdt_object_put(info->mti_ctxt, o);
172                 } else
173                         rc = -ENOENT;
174         } else
175                 rc = PTR_ERR(o);
176
177         RETURN(rc);
178 }
179
180 /*  Get an internal lock on the inode number (but not generation) to sync
181  *  new inode creation with inode unlink (bug 2029).  If child_lockh is NULL
182  *  we just get the lock as a barrier to wait for other holders of this lock,
183  *  and drop it right away again. */
184 int mdt_lock_new_child(struct mdt_thread_info *info, 
185                        struct mdt_object *o,
186                        struct mdt_lock_handle *child_lockh)
187 {
188         struct mdt_lock_handle lockh;
189         int rc;
190         
191         if (child_lockh == NULL)
192                 child_lockh = &lockh;
193
194         mdt_lock_handle_init(&lockh);
195         lockh.mlh_mode = LCK_EX;
196         rc = mdt_object_lock(info->mti_mdt->mdt_namespace, 
197                              o, &lockh, MDS_INODELOCK_UPDATE);
198
199         if (rc != ELDLM_OK)
200                 CERROR("can not mdt_object_lock: %d\n", rc);
201         else if (child_lockh == &lockh)
202                 mdt_object_unlock(info->mti_mdt->mdt_namespace, 
203                                   o, &lockh);
204
205         RETURN(rc);
206 }
207
208 int mdt_reint_open(struct mdt_thread_info *info)
209 {
210         struct mdt_device      *mdt = info->mti_mdt;
211         struct mdt_object      *parent;
212         struct mdt_object      *child;
213         struct mdt_lock_handle *lh;
214         struct ldlm_reply      *ldlm_rep;
215         struct ptlrpc_request  *req = mdt_info_req(info);
216         struct mdt_body        *body;
217         struct lu_fid           child_fid;
218         int                     result;
219         int                     created = 0;
220         struct mdt_reint_record *rr = &info->mti_rr;
221         ENTRY;
222
223         /* we now have no resent message, so it must be an intent */
224         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
225
226         /*TODO: MDS_CHECK_RESENT */;
227
228         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
229         body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
230
231         lh = &info->mti_lh[MDT_LH_PARENT];
232         lh->mlh_mode = LCK_PW;
233         parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1,
234                                       lh, MDS_INODELOCK_UPDATE);
235         if (IS_ERR(parent))
236                 GOTO(out, result = PTR_ERR(parent));
237
238         result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
239                             rr->rr_name, &child_fid);
240         if (result && result != -ENOENT) {
241                 GOTO(out_parent, result);
242         }
243
244         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
245
246         if (result == -ENOENT) {
247                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
248                 if (!(info->mti_attr.la_flags & MDS_OPEN_CREAT))
249                         GOTO(out_parent, result);
250                 if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
251                         GOTO(out_parent, result = -EROFS);
252                 child_fid = *info->mti_rr.rr_fid2;
253         } else {
254                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
255                 if (info->mti_attr.la_flags & MDS_OPEN_EXCL &&
256                     info->mti_attr.la_flags & MDS_OPEN_CREAT)
257                         GOTO(out_parent, result = -EEXIST);
258
259         }
260
261         child = mdt_object_find(info->mti_ctxt, mdt, &child_fid);
262         if (IS_ERR(child))
263                 GOTO(out_parent, result = PTR_ERR(child));
264
265         if (result == -ENOENT) {
266                 /* not found and with MDS_OPEN_CREAT: let's create something */
267                 result = mdo_create(info->mti_ctxt,
268                                     mdt_object_child(parent),
269                                     rr->rr_name,
270                                     mdt_object_child(child),
271                                     &info->mti_attr);
272                 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
273                 if (result != 0)
274                         GOTO(out_child, result);
275                 created = 1;
276         } else
277                 intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
278
279         /* Open it now. */
280         result = mdt_object_open(info, child, info->mti_attr.la_flags);
281         GOTO(destroy_child, result);
282
283 destroy_child:
284         if (result != 0 && created) {
285                 mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
286                            mdt_object_child(child), rr->rr_name);
287         } else if (created) {
288                 mdt_lock_new_child(info, child, NULL);
289         }
290 out_child:
291         mdt_object_put(info->mti_ctxt, child);
292 out_parent:
293         mdt_object_unlock(mdt->mdt_namespace, parent, lh);
294         mdt_object_put(info->mti_ctxt, parent);
295 out:
296         return result;
297 }
298
299 int mdt_mfd_close(const struct lu_context *ctxt,
300                   struct mdt_file_data *mfd, 
301                   int unlink_orphan)
302 {
303         ENTRY;
304
305         if (mfd->mfd_mode & FMODE_WRITE) {
306                 /*mdt_put_write_access*/
307         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
308                 /*mdt_allow_write_access*/
309         }
310
311         /* release reference on this object.
312          * it will be destroyed by lower layer if necessary.
313          */
314         mdt_object_put(ctxt, mfd->mfd_object);
315
316         mdt_mfd_put(mfd);
317         RETURN(0);
318 }
319
320 int mdt_close(struct mdt_thread_info *info)
321 {
322         struct mdt_export_data *med;
323         struct mdt_body        *repbody;
324         struct mdt_file_data   *mfd;
325         struct mdt_object      *o;
326         struct lov_mds_md      *lmm;
327         int rc;
328         ENTRY;
329
330         med = &mdt_info_req(info)->rq_export->exp_mdt_data;
331         LASSERT(med);        
332
333         spin_lock(&med->med_open_lock);
334         mfd = mdt_handle2mfd(&(info->mti_body->handle));
335         if (mfd == NULL) {
336                 spin_unlock(&med->med_open_lock);
337                 CDEBUG(D_INODE, "no handle for file close ino "DFID3
338                        ": cookie "LPX64, PFID3(&info->mti_body->fid1), 
339                        info->mti_body->handle.cookie);
340                 RETURN(-ESTALE);
341         }
342         class_handle_unhash(&mfd->mfd_handle);
343         list_del_init(&mfd->mfd_list);
344         spin_unlock(&med->med_open_lock);
345
346         o = mfd->mfd_object;
347         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
348         lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
349
350         rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o),
351                          &info->mti_attr);
352         if (rc == 0) {
353                 mdt_pack_attr2body(repbody, &info->mti_attr);
354                 repbody->fid1 = *mdt_object_fid(o);
355                 repbody->valid |= OBD_MD_FLID;
356 /*
357                 rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
358                                   lmm, info->mti_mdt->mdt_max_mdsize, "lov");
359                 if (rc >= 0) {
360                         if (S_ISDIR(info->mti_attr.la_mode))
361                                 repbody->valid |= OBD_MD_FLDIREA;
362                         else
363                                 repbody->valid |= OBD_MD_FLEASIZE;
364                         repbody->eadatasize = rc;
365                         rc = 0;
366                 }
367 */
368         }
369
370         rc = mdt_mfd_close(info->mti_ctxt, mfd, 1);
371
372         RETURN(rc);
373 }
374
375 int mdt_done_writing(struct mdt_thread_info *info)
376 {
377         ENTRY;
378
379         RETURN(0);
380 }