Whamcloud - gitweb
ed4f35e257aee247fb20aa39ef4220314793debc
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_open.c
5  *  Lustre Metadata Target (mdt) open/close file handling
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "mdt_internal.h"
35
36 /* we do nothing because we do not have refcount now */
37 static void mdt_mfd_get(void *mfdp)
38 {
39 }
40
41 /* Create a new mdt_file_data struct, initialize it, 
42  * and insert it to global hash table */ 
43 static struct mdt_file_data *mdt_mfd_new(void)
44 {
45         struct mdt_file_data *mfd;
46         ENTRY;
47
48         OBD_ALLOC_PTR(mfd);
49         if (mfd != NULL) {
50                 INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
51                 INIT_LIST_HEAD(&mfd->mfd_list);
52                 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
53         } else
54                 CERROR("mdt: out of memory\n");
55
56         RETURN(mfd);
57 }
58
59 /* Find the mfd pointed to by handle in global hash table. */
60 static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle)
61 {
62         ENTRY;
63         LASSERT(handle != NULL);
64         RETURN(class_handle2object(handle->cookie));
65 }
66
67 /* free mfd */
68 static void mdt_mfd_free(struct mdt_file_data *mfd)
69 {
70         LASSERT(list_empty(&mfd->mfd_handle.h_link));
71         OBD_FREE_PTR(mfd);
72 }
73
74 static int mdt_mfd_open(struct mdt_thread_info *info,
75                         struct mdt_object *o, 
76                         int flags)
77 {
78         struct mdt_export_data *med;
79         struct mdt_file_data   *mfd;
80         struct mdt_body        *repbody;
81         struct lov_mds_md      *lmm = NULL;
82         struct lu_attr         *attr = &info->mti_attr;
83         struct ptlrpc_request  *req = mdt_info_req(info);
84         int                     rc = 0;
85         ENTRY;
86
87         med = &req->rq_export->exp_mdt_data;
88         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
89         if (req_capsule_has_field(&info->mti_pill, &RMF_MDT_MD))
90                 lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
91
92         rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), attr);
93         if (rc == 0) {
94                 if (!S_ISREG(attr->la_mode) &&
95                     !S_ISDIR(attr->la_mode) &&
96                      (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))
97                         /* If client supports this, do not return open handle
98                         *  for special device nodes */
99                         RETURN(0);
100
101                 /* FIXME:maybe this can be done earlier? */
102                 if (S_ISDIR(attr->la_mode)) {
103                         if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
104                                 /* we are trying to create or 
105                                  * write an existing dir. */
106                                 rc = -EISDIR;
107                         }
108                 } else if (flags & MDS_OPEN_DIRECTORY) 
109                         rc = -ENOTDIR;
110         }
111         if (rc != 0) {
112                 if (rc == -EREMOTE) {
113                         repbody->fid1 = *mdt_object_fid(o);
114                         repbody->valid |= OBD_MD_FLID;
115                 }
116                 RETURN(rc);
117         }
118
119         mdt_pack_attr2body(repbody, attr, mdt_object_fid(o));
120
121 /*
122         if (lmm) {
123                 rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
124                                   lmm, info->mti_mdt->mdt_max_mdsize, 
125                                   XATTR_NAME_LOV);
126                 if (rc < 0)
127                         RETURN(-EINVAL);
128                 if (S_ISDIR(attr->la_mode))
129                         repbody->valid |= OBD_MD_FLDIREA;
130                 else
131                         repbody->valid |= OBD_MD_FLEASIZE;
132                 repbody->eadatasize = rc;
133                 rc = 0;
134         }
135 */
136         if (flags & FMODE_WRITE) {
137                 /*mds_get_write_access*/
138         } else if (flags & MDS_FMODE_EXEC) {
139                 /*mds_deny_write_access*/
140         }
141
142         mfd = mdt_mfd_new();
143         if (mfd != NULL) {
144                 /* keep a reference on this object for this open,
145                 * and is released by mdt_mfd_close() */
146                 mdt_object_get(info->mti_ctxt, o);
147
148                 mfd->mfd_mode = flags;
149                 mfd->mfd_object = o;
150                 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
151
152                 spin_lock(&med->med_open_lock);
153                 list_add(&mfd->mfd_list, &med->med_open_head);
154                 spin_unlock(&med->med_open_lock);
155
156                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
157         } else 
158                 rc = -ENOMEM;
159
160         RETURN(rc);
161 }
162
163 int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
164                     __u32 flags)
165 {
166         struct mdt_object *o;
167         int rc;
168         ENTRY;
169
170         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
171         if (!IS_ERR(o)) {
172                 if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu)) {
173                         rc = mdt_mfd_open(info, o, flags);
174                 } else {
175                         rc = -ENOENT;
176                 }
177                 mdt_object_put(info->mti_ctxt, o);
178         } else
179                 rc = PTR_ERR(o);
180
181         RETURN(rc);
182 }
183
184 int mdt_pin(struct mdt_thread_info* info)
185 {
186         struct mdt_body *body;
187         int rc;
188         ENTRY;
189         
190         rc = req_capsule_pack(&info->mti_pill);
191         if (rc == 0) {
192                 body = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
193                 rc = mdt_open_by_fid(info, &body->fid1, body->flags);
194         }
195         RETURN(rc);
196 }
197
198 /*  Get an internal lock on the inode number (but not generation) to sync
199  *  new inode creation with inode unlink (bug 2029).  If child_lockh is NULL
200  *  we just get the lock as a barrier to wait for other holders of this lock,
201  *  and drop it right away again. */
202 int mdt_lock_new_child(struct mdt_thread_info *info, 
203                        struct mdt_object *o,
204                        struct mdt_lock_handle *child_lockh)
205 {
206         struct mdt_lock_handle lockh;
207         int rc;
208         ENTRY;
209         
210         if (child_lockh == NULL)
211                 child_lockh = &lockh;
212
213         mdt_lock_handle_init(&lockh);
214         lockh.mlh_mode = LCK_EX;
215         rc = mdt_object_lock(info, o, &lockh, MDS_INODELOCK_UPDATE);
216
217         if (rc != ELDLM_OK)
218                 CERROR("can not mdt_object_lock: %d\n", rc);
219         else if (child_lockh == &lockh)
220                 mdt_object_unlock(info, o, &lockh);
221
222         RETURN(rc);
223 }
224
225 int mdt_reint_open(struct mdt_thread_info *info)
226 {
227         struct mdt_device      *mdt = info->mti_mdt;
228         struct mdt_object      *parent;
229         struct mdt_object      *child;
230         struct mdt_lock_handle *lh;
231         struct ldlm_reply      *ldlm_rep;
232         struct lu_fid          *child_fid = &info->mti_tmp_fid1;
233         int                     result;
234         int                     created = 0;
235         struct mdt_reint_record *rr = &info->mti_rr;
236         ENTRY;
237
238         if (strlen(rr->rr_name) == 0) {
239                 /* reint partial remote open */
240                 RETURN(mdt_open_by_fid(info, rr->rr_fid1, 
241                                        info->mti_attr.la_flags));
242         }
243
244         /* we now have no resent message, so it must be an intent */
245         /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
246         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
247
248         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
249
250         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
251         lh = &info->mti_lh[MDT_LH_PARENT];
252         lh->mlh_mode = LCK_PW;
253         parent = mdt_object_find_lock(info, rr->rr_fid1, lh, 
254                                       MDS_INODELOCK_UPDATE);
255         if (IS_ERR(parent)) {
256                 /* just simulate child not existing */
257                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
258                 GOTO(out, result = PTR_ERR(parent));
259         }
260
261         result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
262                             rr->rr_name, child_fid);
263         if (result != 0 && result != -ENOENT) {
264                 GOTO(out_parent, result);
265         }
266
267         if (result == -ENOENT) {
268                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
269                 if (!(info->mti_attr.la_flags & MDS_OPEN_CREAT))
270                         GOTO(out_parent, result);
271                 *child_fid = *info->mti_rr.rr_fid2;
272         } else {
273                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
274                 if (info->mti_attr.la_flags & MDS_OPEN_EXCL &&
275                     info->mti_attr.la_flags & MDS_OPEN_CREAT)
276                         GOTO(out_parent, result = -EEXIST);
277                 /* child_fid is filled by mdo_lookup(). */
278                 LASSERT(lu_fid_eq(child_fid, info->mti_rr.rr_fid2));
279         }
280
281         child = mdt_object_find(info->mti_ctxt, mdt, child_fid);
282         if (IS_ERR(child))
283                 GOTO(out_parent, result = PTR_ERR(child));
284
285         if (result == -ENOENT) {
286                 /* not found and with MDS_OPEN_CREAT: let's create it */
287                 result = mdo_create(info->mti_ctxt,
288                                     mdt_object_child(parent),
289                                     rr->rr_name,
290                                     mdt_object_child(child),
291                                     rr->rr_tgt,
292                                     &info->mti_attr);
293                 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
294                 if (result != 0)
295                         GOTO(out_child, result);
296                 created = 1;
297         }
298
299         /* Open it now. */
300         result = mdt_mfd_open(info, child, info->mti_attr.la_flags);
301         intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
302         GOTO(finish_open, result);
303
304 finish_open:
305         if (result != 0 && result != -EREMOTE && created) {
306                 mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
307                            mdt_object_child(child), rr->rr_name);
308         } 
309 out_child:
310         mdt_object_put(info->mti_ctxt, child);
311 out_parent:
312         mdt_object_unlock_put(info, parent, lh);
313 out:
314         return result;
315 }
316
317 int mdt_mfd_close(const struct lu_context *ctxt,
318                   struct mdt_file_data *mfd)
319 {
320         ENTRY;
321
322         if (mfd->mfd_mode & FMODE_WRITE) {
323                 /*mdt_put_write_access*/
324         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
325                 /*mdt_allow_write_access*/
326         }
327
328         /* release reference on this object.
329          * it will be destroyed by lower layer if necessary.
330          */
331         mdt_object_put(ctxt, mfd->mfd_object);
332
333         mdt_mfd_free(mfd);
334         RETURN(0);
335 }
336
337 int mdt_close(struct mdt_thread_info *info)
338 {
339         struct mdt_export_data *med;
340         struct mdt_file_data   *mfd;
341         int rc;
342         ENTRY;
343
344         med = &mdt_info_req(info)->rq_export->exp_mdt_data;
345
346         spin_lock(&med->med_open_lock);
347         mfd = mdt_handle2mfd(&(info->mti_body->handle));
348         if (mfd == NULL) {
349                 spin_unlock(&med->med_open_lock);
350                 CDEBUG(D_INODE, "no handle for file close: fid = "DFID3
351                        ": cookie = "LPX64, PFID3(&info->mti_body->fid1),
352                        info->mti_body->handle.cookie);
353                 rc = -ESTALE;
354         } else {
355                 class_handle_unhash(&mfd->mfd_handle);
356                 list_del_init(&mfd->mfd_list);
357                 spin_unlock(&med->med_open_lock);
358         
359                 rc = mdt_handle_last_unlink(info, mfd->mfd_object,
360                                             &RQF_MDS_CLOSE_LAST);
361
362                 rc = mdt_mfd_close(info->mti_ctxt, mfd);
363         }
364         RETURN(rc);
365 }
366
367 int mdt_done_writing(struct mdt_thread_info *info)
368 {
369         ENTRY;
370
371         RETURN(0);
372 }