Whamcloud - gitweb
modified open handling, return lov ea to client.
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_open.c
5  *  Lustre Metadata Target (mdt) open/close file handling
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "mdt_internal.h"
35
36 /* we do nothing because we do not have refcount now */
37 static void mdt_mfd_get(void *mfdp)
38 {
39 }
40
41 /* Create a new mdt_file_data struct, initialize it, 
42  * and insert it to global hash table */ 
43 static struct mdt_file_data *mdt_mfd_new(void)
44 {
45         struct mdt_file_data *mfd;
46         ENTRY;
47
48         OBD_ALLOC_PTR(mfd);
49         if (mfd != NULL) {
50                 INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
51                 INIT_LIST_HEAD(&mfd->mfd_list);
52                 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
53         } else
54                 CERROR("mdt: out of memory\n");
55
56         RETURN(mfd);
57 }
58
59 /* Find the mfd pointed to by handle in global hash table. */
60 static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle)
61 {
62         ENTRY;
63         LASSERT(handle != NULL);
64         RETURN(class_handle2object(handle->cookie));
65 }
66
67 /* free mfd */
68 static void mdt_mfd_free(struct mdt_file_data *mfd)
69 {
70         LASSERT(list_empty(&mfd->mfd_handle.h_link));
71         OBD_FREE_PTR(mfd);
72 }
73
74 static int mdt_mfd_open(struct mdt_thread_info *info,
75                         struct mdt_object *o, 
76                         int flags, int created)
77 {
78         struct mdt_export_data *med;
79         struct mdt_file_data   *mfd;
80         struct mdt_body        *repbody;
81         struct md_attr         *ma = &info->mti_attr;
82         struct lu_attr         *la = &ma->ma_attr;
83         struct ptlrpc_request  *req = mdt_info_req(info);
84         int                     rc = 0;
85         ENTRY;
86
87         med = &req->rq_export->exp_mdt_data;
88         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
89
90         if (!created) {
91                 /* we have to get attr & lov ea for this object*/
92                 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), la);
93                 if (rc == 0 && S_ISREG(la->la_mode)) {
94                         ma->ma_valid |= MA_INODE;
95                         rc = mo_xattr_get(info->mti_ctxt, 
96                                           mdt_object_child(o),
97                                           ma->ma_lmm, 
98                                           ma->ma_lmm_size,
99                                           XATTR_NAME_LOV);
100                         if (rc >= 0) {
101                                 ma->ma_lmm_size = rc;
102                                 rc = 0;
103                                 ma->ma_valid |= MA_LOV;
104                         }
105                 }
106         }
107         if (rc == 0){
108                 if (!S_ISREG(la->la_mode) &&
109                     !S_ISDIR(la->la_mode) &&
110                      (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))
111                         /* If client supports this, do not return open handle
112                         *  for special device nodes */
113                         RETURN(0);
114
115                 /* FIXME:maybe this can be done earlier? */
116                 if (S_ISDIR(la->la_mode)) {
117                         if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
118                                 /* we are trying to create or 
119                                  * write an existing dir. */
120                                 rc = -EISDIR;
121                         }
122                 } else if (flags & MDS_OPEN_DIRECTORY) 
123                         rc = -ENOTDIR;
124         }
125         if (rc != 0)
126                 RETURN(rc);
127         if (ma->ma_valid & MA_INODE)
128                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
129         if (ma->ma_lmm_size && ma->ma_valid & MA_LOV) {
130                 CERROR("LOVLOV size = %d\n", ma->ma_lmm_size);
131                 repbody->eadatasize = ma->ma_lmm_size;
132                 if (S_ISDIR(la->la_mode))
133                         repbody->valid |= OBD_MD_FLDIREA;
134                 else
135                         repbody->valid |= OBD_MD_FLEASIZE;
136         }
137
138         if (flags & FMODE_WRITE) {
139                 /*mds_get_write_access*/
140         } else if (flags & MDS_FMODE_EXEC) {
141                 /*mds_deny_write_access*/
142         }
143
144         mfd = mdt_mfd_new();
145         if (mfd != NULL) {
146                 /* keep a reference on this object for this open,
147                 * and is released by mdt_mfd_close() */
148                 mdt_object_get(info->mti_ctxt, o);
149
150                 mfd->mfd_mode = flags;
151                 mfd->mfd_object = o;
152                 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
153
154                 spin_lock(&med->med_open_lock);
155                 list_add(&mfd->mfd_list, &med->med_open_head);
156                 spin_unlock(&med->med_open_lock);
157
158                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
159         } else 
160                 rc = -ENOMEM;
161
162         RETURN(rc);
163 }
164
165 int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
166                     __u32 flags)
167 {
168         struct mdt_object *o;
169         struct lu_attr    *la = &info->mti_attr.ma_attr;
170         int                rc;
171         ENTRY;
172
173         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
174         if (!IS_ERR(o)) {
175                 if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu)) {
176                         if (la->la_flags & MDS_OPEN_EXCL &&
177                             la->la_flags & MDS_OPEN_CREAT)
178                                 rc = -EEXIST;
179                         else 
180                                 rc = mdt_mfd_open(info, o, flags, 0);
181                 } else {
182                         rc = -ENOENT;
183                         if (la->la_flags & MDS_OPEN_CREAT) {
184                                 rc = mo_object_create(info->mti_ctxt, 
185                                                       mdt_object_child(o),
186                                                       &info->mti_attr);
187                                 if (rc == 0)
188                                         rc = mdt_mfd_open(info, o, flags, 1);
189                         }
190                 }
191                 mdt_object_put(info->mti_ctxt, o);
192         } else
193                 rc = PTR_ERR(o);
194
195         RETURN(rc);
196 }
197
198 int mdt_pin(struct mdt_thread_info* info)
199 {
200         struct mdt_body *body;
201         int rc;
202         ENTRY;
203         
204         rc = req_capsule_pack(&info->mti_pill);
205         if (rc == 0) {
206                 body = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
207                 rc = mdt_open_by_fid(info, &body->fid1, body->flags);
208         }
209         RETURN(rc);
210 }
211
212 /*  Get an internal lock on the inode number (but not generation) to sync
213  *  new inode creation with inode unlink (bug 2029).  If child_lockh is NULL
214  *  we just get the lock as a barrier to wait for other holders of this lock,
215  *  and drop it right away again. */
216 int mdt_lock_new_child(struct mdt_thread_info *info, 
217                        struct mdt_object *o,
218                        struct mdt_lock_handle *child_lockh)
219 {
220         struct mdt_lock_handle lockh;
221         int rc;
222         ENTRY;
223         
224         if (child_lockh == NULL)
225                 child_lockh = &lockh;
226
227         mdt_lock_handle_init(&lockh);
228         lockh.mlh_mode = LCK_EX;
229         rc = mdt_object_lock(info, o, &lockh, MDS_INODELOCK_UPDATE);
230
231         if (rc != ELDLM_OK)
232                 CERROR("can not mdt_object_lock: %d\n", rc);
233         else if (child_lockh == &lockh)
234                 mdt_object_unlock(info, o, &lockh);
235
236         RETURN(rc);
237 }
238
239 int mdt_reint_open(struct mdt_thread_info *info)
240 {
241         struct mdt_device      *mdt = info->mti_mdt;
242         struct mdt_object      *parent;
243         struct mdt_object      *child;
244         struct mdt_lock_handle *lh;
245         struct ldlm_reply      *ldlm_rep;
246         struct lu_fid          *child_fid = &info->mti_tmp_fid1;
247         struct md_attr         *ma = &info->mti_attr;
248         struct lu_attr         *la = &ma->ma_attr;
249         int                     result;
250         int                     created = 0;
251         struct mdt_reint_record *rr = &info->mti_rr;
252         ENTRY;
253         
254         ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
255                                             &RMF_MDT_MD);
256         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
257                                                &RMF_MDT_MD,
258                                                RCL_SERVER);
259
260         if (strlen(rr->rr_name) == 0) {
261                 /* reint partial remote open */
262                 RETURN(mdt_open_by_fid(info, rr->rr_fid1, la->la_flags));
263         }
264
265         /* we now have no resent message, so it must be an intent */
266         /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
267         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
268
269         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
270
271         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
272         lh = &info->mti_lh[MDT_LH_PARENT];
273         lh->mlh_mode = LCK_PW;
274         parent = mdt_object_find_lock(info, rr->rr_fid1, lh, 
275                                       MDS_INODELOCK_UPDATE);
276         if (IS_ERR(parent)) {
277                 /* just simulate child not existing */
278                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
279                 GOTO(out, result = PTR_ERR(parent));
280         }
281
282         result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
283                             rr->rr_name, child_fid);
284         if (result != 0 && result != -ENOENT) {
285                 GOTO(out_parent, result);
286         }
287
288         if (result == -ENOENT) {
289                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
290                 if (!(la->la_flags & MDS_OPEN_CREAT))
291                         GOTO(out_parent, result);
292                 *child_fid = *info->mti_rr.rr_fid2;
293         } else {
294                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
295                 if (la->la_flags & MDS_OPEN_EXCL &&
296                     la->la_flags & MDS_OPEN_CREAT)
297                         GOTO(out_parent, result = -EEXIST);
298         }
299
300         child = mdt_object_find(info->mti_ctxt, mdt, child_fid);
301         if (IS_ERR(child))
302                 GOTO(out_parent, result = PTR_ERR(child));
303
304         if (result == -ENOENT) {
305                 /* not found and with MDS_OPEN_CREAT: let's create it */
306                 result = mdo_create(info->mti_ctxt,
307                                     mdt_object_child(parent),
308                                     rr->rr_name,
309                                     mdt_object_child(child),
310                                     rr->rr_tgt,
311                                     &info->mti_attr);
312                 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
313                 if (result != 0)
314                         GOTO(out_child, result);
315                 created = 1;
316         }
317
318         /* Open it now. */
319         result = mdt_mfd_open(info, child, la->la_flags, created);
320         intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
321         GOTO(finish_open, result);
322
323 finish_open:
324         if (result != 0 && created) {
325                 mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
326                            mdt_object_child(child), rr->rr_name,
327                            &info->mti_attr);
328         } 
329 out_child:
330         mdt_object_put(info->mti_ctxt, child);
331 out_parent:
332         mdt_object_unlock_put(info, parent, lh);
333 out:
334         return result;
335 }
336
337 int mdt_mfd_close(const struct lu_context *ctxt,
338                   struct mdt_file_data *mfd)
339 {
340         ENTRY;
341
342         if (mfd->mfd_mode & FMODE_WRITE) {
343                 /*mdt_put_write_access*/
344         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
345                 /*mdt_allow_write_access*/
346         }
347
348         /* release reference on this object.
349          * it will be destroyed by lower layer if necessary.
350          */
351         mdt_object_put(ctxt, mfd->mfd_object);
352
353         mdt_mfd_free(mfd);
354         RETURN(0);
355 }
356
357 int mdt_close(struct mdt_thread_info *info)
358 {
359         struct mdt_export_data *med;
360         struct mdt_file_data   *mfd;
361         int rc;
362         ENTRY;
363
364         med = &mdt_info_req(info)->rq_export->exp_mdt_data;
365
366         spin_lock(&med->med_open_lock);
367         mfd = mdt_handle2mfd(&(info->mti_body->handle));
368         if (mfd == NULL) {
369                 spin_unlock(&med->med_open_lock);
370                 CDEBUG(D_INODE, "no handle for file close: fid = "DFID3
371                        ": cookie = "LPX64, PFID3(&info->mti_body->fid1),
372                        info->mti_body->handle.cookie);
373                 rc = -ESTALE;
374         } else {
375                 class_handle_unhash(&mfd->mfd_handle);
376                 list_del_init(&mfd->mfd_list);
377                 spin_unlock(&med->med_open_lock);
378         
379                 rc = mdt_handle_last_unlink(info, mfd->mfd_object,
380                                             &RQF_MDS_CLOSE_LAST);
381
382                 rc = mdt_mfd_close(info->mti_ctxt, mfd);
383         }
384         RETURN(rc);
385 }
386
387 int mdt_done_writing(struct mdt_thread_info *info)
388 {
389         ENTRY;
390
391         RETURN(0);
392 }