Whamcloud - gitweb
2f05e257bc943b26234741a53541c5c18f169a02
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_open.c
5  *  Lustre Metadata Target (mdt) open/close file handling
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/lustre_acl.h>
35 #include <lustre_mds.h>
36 #include "mdt_internal.h"
37
38 /* we do nothing because we do not have refcount now */
39 static void mdt_mfd_get(void *mfdp)
40 {
41 }
42
43 /* Create a new mdt_file_data struct, initialize it,
44  * and insert it to global hash table */
45 static struct mdt_file_data *mdt_mfd_new(void)
46 {
47         struct mdt_file_data *mfd;
48         ENTRY;
49
50         OBD_ALLOC_PTR(mfd);
51         if (mfd != NULL) {
52                 INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
53                 INIT_LIST_HEAD(&mfd->mfd_list);
54                 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
55         }
56         RETURN(mfd);
57 }
58
59 /* Find the mfd pointed to by handle in global hash table. */
60 static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle)
61 {
62         ENTRY;
63         LASSERT(handle != NULL);
64         RETURN(class_handle2object(handle->cookie));
65 }
66
67 /* free mfd */
68 static void mdt_mfd_free(struct mdt_file_data *mfd)
69 {
70         LASSERT(list_empty(&mfd->mfd_handle.h_link));
71         LASSERT(list_empty(&mfd->mfd_list));
72         OBD_FREE_PTR(mfd);
73 }
74
75 static int mdt_create_data_obj(struct mdt_thread_info *info,
76                               struct mdt_object *p, struct mdt_object *o)
77 {
78         struct md_attr   *ma = &info->mti_attr;
79         struct mdt_reint_record *mrr = &info->mti_rr;
80
81         return mdo_create_data(info->mti_ctxt, mdt_object_child(p),
82                                mdt_object_child(o), mrr->rr_eadata,
83                                mrr->rr_eadatalen, ma);
84 }
85
86
87 /*The following four functions are copied from MDS */
88
89 /* Write access to a file: executors cause a negative count,
90  * writers a positive count.  The semaphore is needed to perform
91  * a check for the sign and then increment or decrement atomically.
92  *
93  * This code is closely tied to the allocation of the d_fsdata and the
94  * MDS epoch, so we use the same semaphore for the whole lot.
95  *
96  * FIXME and TODO : handle the epoch!
97  * epoch argument is nonzero during recovery */
98 static int mdt_get_write_access(struct mdt_device *mdt, struct mdt_object *o,
99                                 __u64 epoch)
100 {
101         int rc = 0;
102         ENTRY;
103
104         spin_lock(&mdt->mdt_epoch_lock);
105
106         if (atomic_read(&o->mot_writecount) < 0) {
107                 rc = -ETXTBSY;
108         } else {
109                 if (o->mot_io_epoch != 0) {
110                         CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID3"\n",
111                                o->mot_io_epoch, PFID3(mdt_object_fid(o)));
112                 } else {
113                         if (epoch > mdt->mdt_io_epoch)
114                                 mdt->mdt_io_epoch = epoch;
115                         else
116                                 mdt->mdt_io_epoch++;
117                         o->mot_io_epoch = mdt->mdt_io_epoch;
118                         CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID3"\n",
119                                mdt->mdt_io_epoch, PFID3(mdt_object_fid(o)));
120                 }
121                 atomic_inc(&o->mot_writecount);
122         }
123         spin_unlock(&mdt->mdt_epoch_lock);
124         RETURN(rc);
125 }
126
127 static int mdt_put_write_access(struct mdt_device *mdt, struct mdt_object *o)
128 {
129         int rc;
130         ENTRY;
131
132         spin_lock(&mdt->mdt_epoch_lock);
133         atomic_dec(&o->mot_writecount);
134         rc = atomic_read(&o->mot_writecount);
135         if (rc == 0)
136                 o->mot_io_epoch = 0;
137         spin_unlock(&mdt->mdt_epoch_lock);
138         RETURN(rc);
139 }
140
141 static int mdt_deny_write_access(struct mdt_device *mdt, struct mdt_object *o)
142 {
143         int rc = 0;
144         ENTRY;
145         spin_lock(&mdt->mdt_epoch_lock);
146         if (atomic_read(&o->mot_writecount) > 0) {
147                 rc = -ETXTBSY;
148         } else
149                 atomic_dec(&o->mot_writecount);
150         spin_unlock(&mdt->mdt_epoch_lock);
151         RETURN(rc);
152 }
153
154 static void mdt_allow_write_access(struct mdt_object *o)
155 {
156         ENTRY;
157         atomic_inc(&o->mot_writecount);
158         EXIT;
159 }
160
161 int mdt_query_write_access(struct mdt_object *o)
162 {
163         ENTRY;
164         RETURN(atomic_read(&o->mot_writecount));
165 }
166
167 static int mdt_mfd_open(struct mdt_thread_info *info,
168                         struct mdt_object *p,
169                         struct mdt_object *o,
170                         int flags, int created)
171 {
172         struct mdt_export_data *med;
173         struct mdt_file_data   *mfd;
174         struct mdt_device      *mdt = info->mti_mdt;
175         struct mdt_body        *repbody;
176         struct md_attr         *ma = &info->mti_attr;
177         struct lu_attr         *la = &ma->ma_attr;
178         struct ptlrpc_request  *req = mdt_info_req(info);
179         struct ldlm_reply      *ldlm_rep;
180         int                     rc = 0;
181         int                     isreg, isdir, islnk;
182         ENTRY;
183
184         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
185
186         if (!created) {
187                 /* we have to get attr & lov ea for this object*/
188                 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
189                 if (rc)
190                         RETURN(rc);
191         }
192         isreg = S_ISREG(la->la_mode);
193         isdir = S_ISDIR(la->la_mode);
194         islnk = S_ISLNK(la->la_mode);
195         if (ma->ma_valid & MA_INODE)
196                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
197
198         /* if we are following a symlink, don't open
199          * do not return open handle for special nodes as client required
200          */
201         if (islnk || (!isreg && !isdir &&
202             (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
203                 info->mti_trans_flags |= MDT_NONEED_TANSNO; 
204                 RETURN(0);
205         }
206         /* FIXME:maybe this can be done earlier? */
207         if (isdir) {
208                 if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
209                         /* we are trying to create or
210                          * write an existing dir. */
211                         RETURN(-EISDIR);
212                 }
213         } else if (flags & MDS_OPEN_DIRECTORY)
214                 RETURN(-ENOTDIR);
215
216         if ((isreg) && !(ma->ma_valid & MA_LOV)) {
217                 /*No EA, check whether it is will set regEA and dirEA
218                  *since in above attr get, these size might be zero,
219                  *so reset it, to retrieve the MD after create obj*/
220                 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
221                                                        &RMF_MDT_MD,
222                                                        RCL_SERVER);
223                 LASSERT(p != NULL);
224                 /*XXX: Tom, do we need this?
225                 rc = mdt_create_data_obj(info, p, o);
226                 if (rc)
227                         RETURN(rc);
228                 */
229         }
230
231         CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n", 
232                         ma->ma_valid, ma->ma_lmm_size);
233         repbody->eadatasize = 0;
234         repbody->aclsize = 0;
235
236         if (ma->ma_lmm_size && ma->ma_valid & MA_LOV) {
237                 repbody->eadatasize = ma->ma_lmm_size;
238                 if (isdir)
239                         repbody->valid |= OBD_MD_FLDIREA;
240                 else
241                         repbody->valid |= OBD_MD_FLEASIZE;
242         }
243         /*FIXME: should determine the offset dynamicly, 
244          *did not get ACL before shrink*/
245         lustre_shrink_reply(req, 2, repbody->eadatasize, 1);
246         lustre_shrink_reply(req, repbody->eadatasize ? 3 : 2, repbody->aclsize,
247                             0);
248
249         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
250         intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
251
252         if (flags & FMODE_WRITE) {
253                 /* FIXME: in recovery, need to pass old epoch here */
254                 rc = mdt_get_write_access(mdt, o, 0);
255                 if (rc == 0)
256                         repbody->io_epoch = o->mot_io_epoch;
257         } else if (flags & MDS_FMODE_EXEC) {
258                 rc = mdt_deny_write_access(mdt, o);
259         }
260         if (rc)
261                 RETURN(rc);
262
263         /* (1) client wants transno when open to keep a ref count for replay;
264          *     see after_reply() and mdc_close_commit();
265          * (2) we need to record the transaction related stuff onto disk;
266          * But, question is: when do a rean only open, do we still need transno?
267          */
268         if (!created) {
269                 struct txn_param txn;
270                 struct thandle *th;
271                 struct dt_device *dt = info->mti_mdt->mdt_bottom;
272                 txn.tp_credits = 1;
273
274                 LASSERT(dt);
275                 th = dt->dd_ops->dt_trans_start(info->mti_ctxt, dt, &txn);
276                 if (!IS_ERR(th))
277                         dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
278                 else
279                         RETURN(PTR_ERR(th));
280         }
281
282         mfd = mdt_mfd_new();
283         if (mfd != NULL) {
284                 /* keep a reference on this object for this open,
285                 * and is released by mdt_mfd_close() */
286                 mdt_object_get(info->mti_ctxt, o);
287
288                 mfd->mfd_mode = flags;
289                 mfd->mfd_object = o;
290                 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
291
292                 med = &req->rq_export->exp_mdt_data;
293                 spin_lock(&med->med_open_lock);
294                 list_add(&mfd->mfd_list, &med->med_open_head);
295                 spin_unlock(&med->med_open_lock);
296
297                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
298         } else
299                 rc = -ENOMEM;
300
301         RETURN(rc);
302 }
303
304 int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
305                     __u32 flags)
306 {
307         struct mdt_object *o;
308         struct lu_attr    *la = &info->mti_attr.ma_attr;
309         int                rc;
310         ENTRY;
311
312         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
313         if (!IS_ERR(o)) {
314                 if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu) > 0) {
315                         if (la->la_flags & MDS_OPEN_EXCL &&
316                             la->la_flags & MDS_OPEN_CREAT)
317                                 rc = -EEXIST;
318                         else
319                                 rc = mdt_mfd_open(info, NULL, o, flags, 0);
320                 } else {
321                         rc = -ENOENT;
322                         if (la->la_flags & MDS_OPEN_CREAT) {
323                                 rc = mo_object_create(info->mti_ctxt,
324                                                       mdt_object_child(o),
325                                                       &info->mti_spec,
326                                                       &info->mti_attr);
327                                 if (rc == 0)
328                                         rc = mdt_mfd_open(info, NULL, o, flags, 1);
329                         }
330                 }
331                 mdt_object_put(info->mti_ctxt, o);
332         } else
333                 rc = PTR_ERR(o);
334
335         RETURN(rc);
336 }
337
338 int mdt_pin(struct mdt_thread_info* info)
339 {
340         struct mdt_body *body;
341         int rc;
342         ENTRY;
343
344         rc = req_capsule_pack(&info->mti_pill);
345         if (rc == 0) {
346                 body = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
347                 rc = mdt_open_by_fid(info, &body->fid1, body->flags);
348         }
349         RETURN(rc);
350 }
351
352 int mdt_reint_open(struct mdt_thread_info *info)
353 {
354         struct mdt_device      *mdt = info->mti_mdt;
355         struct mdt_object      *parent;
356         struct mdt_object      *child;
357         struct mdt_lock_handle *lh;
358         struct ldlm_reply      *ldlm_rep;
359         struct lu_fid          *child_fid = &info->mti_tmp_fid1;
360         struct md_attr         *ma = &info->mti_attr;
361         struct lu_attr         *la = &ma->ma_attr;
362         int                     result;
363         int                     created = 0;
364         struct mdt_reint_record *rr = &info->mti_rr;
365         ENTRY;
366
367         req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
368                              mdt->mdt_max_mdsize);
369
370         result = req_capsule_pack(&info->mti_pill);
371         if (result)
372                 RETURN(result);
373
374         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
375         ma->ma_lmm_size = mdt->mdt_max_mdsize;
376
377         if (rr->rr_name[0] == 0) {
378                 /* reint partial remote open */
379                 RETURN(mdt_open_by_fid(info, rr->rr_fid1, la->la_flags));
380         }
381
382         /* we now have no resent message, so it must be an intent */
383         /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
384         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
385
386         CDEBUG(D_INODE, "I am going to create "DFID3"/("DFID3":%s) flag=%x\n",
387                         PFID3(rr->rr_fid1), PFID3(rr->rr_fid2), 
388                         rr->rr_name, la->la_flags);
389
390         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
391         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
392
393         lh = &info->mti_lh[MDT_LH_PARENT];
394         if (!(la->la_flags & MDS_OPEN_CREAT))
395                 lh->mlh_mode = LCK_CR;
396         else
397                 lh->mlh_mode = LCK_EX;
398         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
399                                       MDS_INODELOCK_UPDATE);
400         if (IS_ERR(parent))
401                 GOTO(out, result = PTR_ERR(parent));
402
403         result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
404                             rr->rr_name, child_fid);
405         if (result != 0 && result != -ENOENT)
406                 GOTO(out_parent, result);
407
408         if (result == -ENOENT) {
409                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
410                 if (!(la->la_flags & MDS_OPEN_CREAT))
411                         GOTO(out_parent, result);
412                 *child_fid = *info->mti_rr.rr_fid2;
413                 /* new object will be created. see the following */
414         } else {
415                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
416                 if ((la->la_flags & MDS_OPEN_EXCL &&
417                          la->la_flags & MDS_OPEN_CREAT))
418                         GOTO(out_parent, result = -EEXIST);
419         }
420
421         child = mdt_object_find(info->mti_ctxt, mdt, child_fid);
422         if (IS_ERR(child))
423                 GOTO(out_parent, result = PTR_ERR(child));
424
425         if (result == -ENOENT) {
426                 /* not found and with MDS_OPEN_CREAT: let's create it */
427                 result = mdo_create(info->mti_ctxt,
428                                     mdt_object_child(parent),
429                                     rr->rr_name,
430                                     mdt_object_child(child),
431                                     &info->mti_spec,
432                                     /* rr->rr_tgt, rr->rr_eadata, rr->rr_eadatalen,*/
433                                     &info->mti_attr);
434                 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
435                 if (result != 0)
436                         GOTO(out_child, result);
437                 created = 1;
438         }
439
440         /* Open it now. */
441         result = mdt_mfd_open(info, parent, child, la->la_flags, created);
442         GOTO(finish_open, result);
443
444 finish_open:
445         if (result != 0 && created) {
446                 int rc2 = mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
447                                      mdt_object_child(child), rr->rr_name,
448                                      &info->mti_attr);
449                 if (rc2 != 0)
450                         CERROR("error in cleanup of open");
451         }
452 out_child:
453         mdt_object_put(info->mti_ctxt, child);
454 out_parent:
455         mdt_object_unlock_put(info, parent, lh, result);
456 out:
457         return result;
458 }
459
460 void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
461                    struct mdt_file_data *mfd)
462 {
463         struct mdt_object *o = mfd->mfd_object;
464         ENTRY;
465
466         if (mfd->mfd_mode & FMODE_WRITE) {
467                 mdt_put_write_access(mdt, o);
468         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
469                 mdt_allow_write_access(o);
470         }
471
472         /* release reference on this object.
473          * it will be destroyed by lower layer if necessary.
474          */
475         mdt_object_put(ctxt, mfd->mfd_object);
476
477         mdt_mfd_free(mfd);
478         EXIT;
479 }
480
481 int mdt_close(struct mdt_thread_info *info)
482 {
483         struct md_attr         *ma = &info->mti_attr;
484         struct mdt_export_data *med;
485         struct mdt_file_data   *mfd;
486         struct mdt_object      *o;
487         int rc;
488         ENTRY;
489
490         med = &mdt_info_req(info)->rq_export->exp_mdt_data;
491
492         spin_lock(&med->med_open_lock);
493         mfd = mdt_handle2mfd(&(info->mti_body->handle));
494         if (mfd == NULL) {
495                 spin_unlock(&med->med_open_lock);
496                 CDEBUG(D_INODE, "no handle for file close: fid = "DFID3
497                        ": cookie = "LPX64"\n", PFID3(&info->mti_body->fid1),
498                        info->mti_body->handle.cookie);
499                 rc = -ESTALE;
500         } else {
501                 class_handle_unhash(&mfd->mfd_handle);
502                 list_del_init(&mfd->mfd_list);
503                 spin_unlock(&med->med_open_lock);
504
505                 o = mfd->mfd_object;
506                 ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
507                                                     &RMF_MDT_MD);
508                 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
509                                                        &RMF_MDT_MD, RCL_SERVER);
510                 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
511                 if (rc == 0)
512                         rc = mdt_handle_last_unlink(info, o, ma);
513
514                 mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd);
515         }
516         mdt_shrink_reply(info);
517         RETURN(rc);
518 }
519
520 int mdt_done_writing(struct mdt_thread_info *info)
521 {
522         int rc;
523         ENTRY;
524
525         req_capsule_set(&info->mti_pill, &RQF_MDS_DONE_WRITING);
526         rc = req_capsule_pack(&info->mti_pill);
527
528         RETURN(0);
529 }