Whamcloud - gitweb
Branch: b_new_cmd
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_open.c
5  *  Lustre Metadata Target (mdt) open/close file handling
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/lustre_acl.h>
35 #include <lustre_mds.h>
36 #include "mdt_internal.h"
37
38 /* we do nothing because we do not have refcount now */
39 static void mdt_mfd_get(void *mfdp)
40 {
41 }
42
43 /* Create a new mdt_file_data struct, initialize it,
44  * and insert it to global hash table */
45 static struct mdt_file_data *mdt_mfd_new(void)
46 {
47         struct mdt_file_data *mfd;
48         ENTRY;
49
50         OBD_ALLOC_PTR(mfd);
51         if (mfd != NULL) {
52                 INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
53                 INIT_LIST_HEAD(&mfd->mfd_list);
54                 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
55         }
56         RETURN(mfd);
57 }
58
59 /* Find the mfd pointed to by handle in global hash table. */
60 static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle)
61 {
62         ENTRY;
63         LASSERT(handle != NULL);
64         RETURN(class_handle2object(handle->cookie));
65 }
66
67 /* free mfd */
68 static void mdt_mfd_free(struct mdt_file_data *mfd)
69 {
70         LASSERT(list_empty(&mfd->mfd_handle.h_link));
71         LASSERT(list_empty(&mfd->mfd_list));
72         OBD_FREE_PTR(mfd);
73 }
74
75 static int mdt_create_data_obj(struct mdt_thread_info *info,
76                               struct mdt_object *p, struct mdt_object *o)
77 {
78         struct md_attr   *ma = &info->mti_attr;
79         struct mdt_reint_record *mrr = &info->mti_rr;
80
81         return mdo_create_data_object(info->mti_ctxt, mdt_object_child(p),
82                                  mdt_object_child(o), mrr->rr_eadata,
83                                  mrr->rr_eadatalen, ma);
84 }
85
86 static int mdt_mfd_open(struct mdt_thread_info *info,
87                         struct mdt_object *p,
88                         struct mdt_object *o,
89                         int flags, int created)
90 {
91         struct mdt_export_data *med;
92         struct mdt_file_data   *mfd;
93         struct mdt_body        *repbody;
94         struct md_attr         *ma = &info->mti_attr;
95         struct lu_attr         *la = &ma->ma_attr;
96         struct ptlrpc_request  *req = mdt_info_req(info);
97         struct ldlm_reply      *ldlm_rep;
98         int                     rc = 0;
99         int                     isreg, isdir, islnk;
100         ENTRY;
101
102         med = &req->rq_export->exp_mdt_data;
103         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
104
105         if (!created) {
106                 /* we have to get attr & lov ea for this object*/
107                 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
108                 if (rc)
109                         RETURN(rc);
110         }
111         isreg = S_ISREG(la->la_mode);
112         isdir = S_ISDIR(la->la_mode);
113         islnk = S_ISLNK(la->la_mode);
114         if (ma->ma_valid & MA_INODE)
115                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
116
117         /* if we are following a symlink, don't open */
118         if (islnk || (!isreg && !isdir &&
119             (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
120                 info->mti_trans_flags |= MDT_NONEED_TANSNO; 
121                 RETURN(0);
122         }
123         /* FIXME:maybe this can be done earlier? */
124         if (isdir) {
125                 if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
126                         /* we are trying to create or
127                          * write an existing dir. */
128                         RETURN(-EISDIR);
129                 }
130         } else if (flags & MDS_OPEN_DIRECTORY)
131                 RETURN(-ENOTDIR);
132
133
134         if (!isreg && !isdir &&
135             (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))
136                 /* If client supports this, do not return open handle
137                 *  for special nodes */
138                 RETURN(0);
139
140         if ((isreg) && !(ma->ma_valid & MA_LOV)) {
141                 /*No EA, check whether it is will set regEA and dirEA
142                  *since in above attr get, these size might be zero,
143                  *so reset it, to retrieve the MD after create obj*/
144                 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
145                                                        &RMF_MDT_MD,
146                                                        RCL_SERVER);
147                 LASSERT(p != NULL);
148                 rc = mdt_create_data_obj(info, p, o);
149                 if (rc)
150                         RETURN(rc);
151         }
152
153         CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64"\n", ma->ma_valid);
154         CDEBUG(D_INODE, "after open, lmm_size = %d\n", ma->ma_lmm_size);
155         repbody->eadatasize = 0;
156         repbody->aclsize = 0;
157
158         if (ma->ma_lmm_size && ma->ma_valid & MA_LOV) {
159                 repbody->eadatasize = ma->ma_lmm_size;
160                 if (isdir)
161                         repbody->valid |= OBD_MD_FLDIREA;
162                 else
163                         repbody->valid |= OBD_MD_FLEASIZE;
164         }
165
166         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
167         
168         intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
169         /*FIXME: should determine the offset dynamicly, 
170          *did not get ACL before shrink*/
171         lustre_shrink_reply(req, 2, repbody->eadatasize, 1);
172         lustre_shrink_reply(req, repbody->eadatasize ? 3 : 2, repbody->aclsize,
173                             0);
174
175         if (flags & FMODE_WRITE) {
176                 /*mds_get_write_access*/
177         } else if (flags & MDS_FMODE_EXEC) {
178                 /*mds_deny_write_access*/
179         }
180
181         /* (1) client wants transno when open to keep a ref count for replay;
182          *     see after_reply() and mdc_close_commit();
183          * (2) we need to record the transaction related stuff onto disk;
184          * But, question is: when do a rean only open, do we still need transno?
185          */
186         if (!created) {
187                 struct txn_param txn;
188                 struct thandle *th;
189                 struct dt_device *dt = info->mti_mdt->mdt_bottom;
190                 txn.tp_credits = 1;
191
192                 LASSERT(dt);
193                 th = dt->dd_ops->dt_trans_start(info->mti_ctxt, dt, &txn);
194                 if (!IS_ERR(th))
195                         dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
196                 else
197                         RETURN(PTR_ERR(th));
198         }
199
200         mfd = mdt_mfd_new();
201         if (mfd != NULL) {
202                 /* keep a reference on this object for this open,
203                 * and is released by mdt_mfd_close() */
204                 mdt_object_get(info->mti_ctxt, o);
205
206                 mfd->mfd_mode = flags;
207                 mfd->mfd_object = o;
208                 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
209
210                 spin_lock(&med->med_open_lock);
211                 list_add(&mfd->mfd_list, &med->med_open_head);
212                 spin_unlock(&med->med_open_lock);
213
214                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
215         } else
216                 rc = -ENOMEM;
217
218         RETURN(rc);
219 }
220
221 int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
222                     __u32 flags)
223 {
224         struct mdt_object *o;
225         struct lu_attr    *la = &info->mti_attr.ma_attr;
226         int                rc;
227         ENTRY;
228
229         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
230         if (!IS_ERR(o)) {
231                 if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu) > 0) {
232                         if (la->la_flags & MDS_OPEN_EXCL &&
233                             la->la_flags & MDS_OPEN_CREAT)
234                                 rc = -EEXIST;
235                         else
236                                 rc = mdt_mfd_open(info, NULL, o, flags, 0);
237                 } else {
238                         rc = -ENOENT;
239                         if (la->la_flags & MDS_OPEN_CREAT) {
240                                 rc = mo_object_create(info->mti_ctxt,
241                                                       mdt_object_child(o),
242                                                       &info->mti_attr);
243                                 if (rc == 0)
244                                         rc = mdt_mfd_open(info, NULL, o, flags, 1);
245                         }
246                 }
247                 mdt_object_put(info->mti_ctxt, o);
248         } else
249                 rc = PTR_ERR(o);
250
251         RETURN(rc);
252 }
253
254 int mdt_pin(struct mdt_thread_info* info)
255 {
256         struct mdt_body *body;
257         int rc;
258         ENTRY;
259
260         rc = req_capsule_pack(&info->mti_pill);
261         if (rc == 0) {
262                 body = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
263                 rc = mdt_open_by_fid(info, &body->fid1, body->flags);
264         }
265         RETURN(rc);
266 }
267
268 int mdt_reint_open(struct mdt_thread_info *info)
269 {
270         struct mdt_device      *mdt = info->mti_mdt;
271         struct mdt_object      *parent;
272         struct mdt_object      *child;
273         struct mdt_lock_handle *lh;
274         struct ldlm_reply      *ldlm_rep;
275         struct lu_fid          *child_fid = &info->mti_tmp_fid1;
276         struct md_attr         *ma = &info->mti_attr;
277         struct lu_attr         *la = &ma->ma_attr;
278         int                     result;
279         int                     created = 0;
280         struct mdt_reint_record *rr = &info->mti_rr;
281         ENTRY;
282
283         req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
284                              mdt->mdt_max_mdsize);
285
286         result = req_capsule_pack(&info->mti_pill);
287         if (result)
288                 RETURN(result);
289
290         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
291         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
292                                                &RMF_MDT_MD, RCL_SERVER);
293
294         if (rr->rr_name[0] == 0) {
295                 /* reint partial remote open */
296                 RETURN(mdt_open_by_fid(info, rr->rr_fid1, la->la_flags));
297         }
298
299         /* we now have no resent message, so it must be an intent */
300         /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
301         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
302
303         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
304
305         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
306         lh = &info->mti_lh[MDT_LH_PARENT];
307         if (!(la->la_flags & MDS_OPEN_CREAT))
308                 lh->mlh_mode = LCK_CR;
309         else
310                 lh->mlh_mode = LCK_EX;
311         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
312                                       MDS_INODELOCK_UPDATE);
313         if (IS_ERR(parent))
314                 GOTO(out, result = PTR_ERR(parent));
315
316         result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
317                             rr->rr_name, child_fid);
318         if (result != 0 && result != -ENOENT)
319                 GOTO(out_parent, result);
320
321         if (result == -ENOENT) {
322                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
323                 if (!(la->la_flags & MDS_OPEN_CREAT))
324                         GOTO(out_parent, result);
325                 *child_fid = *info->mti_rr.rr_fid2;
326                 /* new object will be created. see the following */
327         } else {
328                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
329                 if ((la->la_flags & MDS_OPEN_EXCL &&
330                          la->la_flags & MDS_OPEN_CREAT))
331                         GOTO(out_parent, result = -EEXIST);
332         }
333
334         child = mdt_object_find(info->mti_ctxt, mdt, child_fid);
335         if (IS_ERR(child))
336                 GOTO(out_parent, result = PTR_ERR(child));
337
338         if (result == -ENOENT) {
339                 /* not found and with MDS_OPEN_CREAT: let's create it */
340                 result = mdo_create(info->mti_ctxt,
341                                     mdt_object_child(parent),
342                                     rr->rr_name,
343                                     mdt_object_child(child),
344                                     rr->rr_tgt, rr->rr_eadata,
345                                     rr->rr_eadatalen, &info->mti_attr);
346                 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
347                 if (result != 0)
348                         GOTO(out_child, result);
349                 created = 1;
350         }
351
352         /* Open it now. */
353         result = mdt_mfd_open(info, parent, child, la->la_flags, created);
354         GOTO(finish_open, result);
355
356 finish_open:
357         if (result != 0 && created) {
358                 int rc2 = mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
359                                      mdt_object_child(child), rr->rr_name,
360                                      &info->mti_attr);
361                 if (rc2 != 0)
362                         CERROR("error in cleanup of open");
363         }
364 out_child:
365         mdt_object_put(info->mti_ctxt, child);
366 out_parent:
367         mdt_object_unlock_put(info, parent, lh, result);
368 out:
369         return result;
370 }
371
372 void mdt_mfd_close(const struct lu_context *ctxt,
373                    struct mdt_file_data *mfd)
374 {
375         ENTRY;
376
377         if (mfd->mfd_mode & FMODE_WRITE) {
378                 /*mdt_put_write_access*/
379         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
380                 /*mdt_allow_write_access*/
381         }
382
383         /* release reference on this object.
384          * it will be destroyed by lower layer if necessary.
385          */
386         mdt_object_put(ctxt, mfd->mfd_object);
387
388         mdt_mfd_free(mfd);
389         EXIT;
390 }
391
392 int mdt_close(struct mdt_thread_info *info)
393 {
394         struct md_attr         *ma = &info->mti_attr;
395         struct mdt_export_data *med;
396         struct mdt_file_data   *mfd;
397         struct mdt_object      *o;
398         int rc;
399         ENTRY;
400
401         med = &mdt_info_req(info)->rq_export->exp_mdt_data;
402
403         spin_lock(&med->med_open_lock);
404         mfd = mdt_handle2mfd(&(info->mti_body->handle));
405         if (mfd == NULL) {
406                 spin_unlock(&med->med_open_lock);
407                 CDEBUG(D_INODE, "no handle for file close: fid = "DFID3
408                        ": cookie = "LPX64"\n", PFID3(&info->mti_body->fid1),
409                        info->mti_body->handle.cookie);
410                 rc = -ESTALE;
411         } else {
412                 class_handle_unhash(&mfd->mfd_handle);
413                 list_del_init(&mfd->mfd_list);
414                 spin_unlock(&med->med_open_lock);
415
416                 o = mfd->mfd_object;
417                 ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
418                                                     &RMF_MDT_MD);
419                 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
420                                                        &RMF_MDT_MD, RCL_SERVER);
421                 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
422                 if (rc == 0)
423                         rc = mdt_handle_last_unlink(info, o, ma);
424
425                 mdt_mfd_close(info->mti_ctxt, mfd);
426         }
427         mdt_shrink_reply(info);
428         RETURN(rc);
429 }
430
431 int mdt_done_writing(struct mdt_thread_info *info)
432 {
433         int rc;
434         ENTRY;
435
436         req_capsule_set(&info->mti_pill, &RQF_MDS_DONE_WRITING);
437         rc = req_capsule_pack(&info->mti_pill);
438
439         RETURN(0);
440 }