Whamcloud - gitweb
Branch: b_new_cmd
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_open.c
5  *  Lustre Metadata Target (mdt) open/close file handling
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <lustre_mds.h>
35 #include "mdt_internal.h"
36
37 /* we do nothing because we do not have refcount now */
38 static void mdt_mfd_get(void *mfdp)
39 {
40 }
41
42 /* Create a new mdt_file_data struct, initialize it,
43  * and insert it to global hash table */
44 static struct mdt_file_data *mdt_mfd_new(void)
45 {
46         struct mdt_file_data *mfd;
47         ENTRY;
48
49         OBD_ALLOC_PTR(mfd);
50         if (mfd != NULL) {
51                 INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
52                 INIT_LIST_HEAD(&mfd->mfd_list);
53                 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
54         }
55         RETURN(mfd);
56 }
57
58 /* Find the mfd pointed to by handle in global hash table. */
59 static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle)
60 {
61         ENTRY;
62         LASSERT(handle != NULL);
63         RETURN(class_handle2object(handle->cookie));
64 }
65
66 /* free mfd */
67 static void mdt_mfd_free(struct mdt_file_data *mfd)
68 {
69         LASSERT(list_empty(&mfd->mfd_handle.h_link));
70         LASSERT(list_empty(&mfd->mfd_list));
71         OBD_FREE_PTR(mfd);
72 }
73
74 static int mdt_create_data_obj(struct mdt_thread_info *info, 
75                               struct mdt_object *p, struct mdt_object *o)
76 {
77         struct md_attr   *ma = &info->mti_attr;
78         struct mdt_reint_record *mrr = &info->mti_rr;
79
80         return mdo_create_data_object(info->mti_ctxt, mdt_object_child(p),
81                                  mdt_object_child(o), mrr->rr_eadata, 
82                                  mrr->rr_eadatalen, ma);
83 }
84
85 static int mdt_mfd_open(struct mdt_thread_info *info,
86                         struct mdt_object *p,
87                         struct mdt_object *o,
88                         int flags, int created)
89 {
90         struct mdt_export_data *med;
91         struct mdt_file_data   *mfd;
92         struct mdt_body        *repbody;
93         struct md_attr         *ma = &info->mti_attr;
94         struct lu_attr         *la = &ma->ma_attr;
95         struct ptlrpc_request  *req = mdt_info_req(info);
96         int                     rc = 0;
97         ENTRY;
98
99         med = &req->rq_export->exp_mdt_data;
100         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
101
102         if (!created) {
103                 /* we have to get attr & lov ea for this object*/
104                 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
105         }
106         if (rc == 0){
107                 if (ma->ma_valid & MA_INODE)
108                         mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
109
110                 if (!S_ISREG(la->la_mode) &&
111                     !S_ISDIR(la->la_mode) &&
112                     (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))
113                         /* If client supports this, do not return open handle
114                         *  for special nodes */
115                         RETURN(0);
116
117                 if ((S_ISREG(la->la_mode) || S_ISDIR(la->la_mode))
118                      && !created && !(ma->ma_valid & MA_LOV)) {
119                         /*No EA, check whether it is will set regEA and dirEA
120                          *since in above attr get, these size might be zero,
121                          *so reset it, to retrieve the MD after create obj*/
122
123                         LASSERT(p != NULL);
124                         rc = mdt_create_data_obj(info, p, o);
125                         if (rc)
126                                 RETURN(rc);
127                 }
128                 /* FIXME:maybe this can be done earlier? */
129                 if (S_ISDIR(la->la_mode)) {
130                         if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
131                                 /* we are trying to create or
132                                  * write an existing dir. */
133                                 rc = -EISDIR;
134                         }
135                 } else if (flags & MDS_OPEN_DIRECTORY)
136                         rc = -ENOTDIR;
137         }
138         if (rc != 0)
139                 RETURN(rc);
140
141         CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64"\n", ma->ma_valid);
142         CDEBUG(D_INODE, "after open, lmm_size = %d\n", ma->ma_lmm_size);
143         repbody->eadatasize = 0;
144         repbody->aclsize = 0;
145
146         if (ma->ma_lmm_size && ma->ma_valid & MA_LOV) {
147                 repbody->eadatasize = ma->ma_lmm_size;
148                 if (S_ISDIR(la->la_mode))
149                         repbody->valid |= OBD_MD_FLDIREA;
150                 else
151                         repbody->valid |= OBD_MD_FLEASIZE;
152         }
153
154         /*FIXME: should determine the offset dynamicly */
155         lustre_shrink_reply(req, 2, repbody->eadatasize, 1);
156         lustre_shrink_reply(req, repbody->eadatasize ? 3 : 2, repbody->aclsize, 0);
157
158         if (flags & FMODE_WRITE) {
159                 /*mds_get_write_access*/
160         } else if (flags & MDS_FMODE_EXEC) {
161                 /*mds_deny_write_access*/
162         }
163
164         /* (1) client wants transno when open to keep a ref count for replay;
165          *     see after_reply() and mdc_close_commit();
166          * (2) we need to record the transaction related stuff onto disk;
167          * But, question is: when do a rean only open, do we still need transno?
168          */
169         if (!created) {
170                 struct txn_param txn;
171                 struct thandle *th;
172                 struct dt_device *dt = info->mti_mdt->mdt_bottom;
173                 txn.tp_credits = 1;
174
175                 LASSERT(dt);
176                 th = dt->dd_ops->dt_trans_start(info->mti_ctxt, dt, &txn);
177                 if (!IS_ERR(th)) 
178                         dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
179                 else 
180                         RETURN(PTR_ERR(th));
181         }
182
183         mfd = mdt_mfd_new();
184         if (mfd != NULL) {
185                 /* keep a reference on this object for this open,
186                 * and is released by mdt_mfd_close() */
187                 mdt_object_get(info->mti_ctxt, o);
188
189                 mfd->mfd_mode = flags;
190                 mfd->mfd_object = o;
191                 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
192
193                 spin_lock(&med->med_open_lock);
194                 list_add(&mfd->mfd_list, &med->med_open_head);
195                 spin_unlock(&med->med_open_lock);
196
197                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
198         } else
199                 rc = -ENOMEM;
200
201         RETURN(rc);
202 }
203
204 int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
205                     __u32 flags)
206 {
207         struct mdt_object *o;
208         struct lu_attr    *la = &info->mti_attr.ma_attr;
209         int                rc;
210         ENTRY;
211
212         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
213         if (!IS_ERR(o)) {
214                 if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu) > 0) {
215                         if (la->la_flags & MDS_OPEN_EXCL &&
216                             la->la_flags & MDS_OPEN_CREAT)
217                                 rc = -EEXIST;
218                         else
219                                 rc = mdt_mfd_open(info, NULL, o, flags, 0);
220                 } else {
221                         rc = -ENOENT;
222                         if (la->la_flags & MDS_OPEN_CREAT) {
223                                 rc = mo_object_create(info->mti_ctxt,
224                                                       mdt_object_child(o),
225                                                       &info->mti_attr);
226                                 if (rc == 0)
227                                         rc = mdt_mfd_open(info, NULL, o, flags, 1);
228                         }
229                 }
230                 mdt_object_put(info->mti_ctxt, o);
231         } else
232                 rc = PTR_ERR(o);
233
234         RETURN(rc);
235 }
236
237 int mdt_pin(struct mdt_thread_info* info)
238 {
239         struct mdt_body *body;
240         int rc;
241         ENTRY;
242
243         rc = req_capsule_pack(&info->mti_pill);
244         if (rc == 0) {
245                 body = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
246                 rc = mdt_open_by_fid(info, &body->fid1, body->flags);
247         }
248         RETURN(rc);
249 }
250
251 int mdt_reint_open(struct mdt_thread_info *info)
252 {
253         struct mdt_device      *mdt = info->mti_mdt;
254         struct mdt_object      *parent;
255         struct mdt_object      *child;
256         struct mdt_lock_handle *lh;
257         struct ldlm_reply      *ldlm_rep;
258         struct lu_fid          *child_fid = &info->mti_tmp_fid1;
259         struct md_attr         *ma = &info->mti_attr;
260         struct lu_attr         *la = &ma->ma_attr;
261         int                     result;
262         int                     created = 0;
263         struct mdt_reint_record *rr = &info->mti_rr;
264         ENTRY;
265
266         req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
267                              mdt->mdt_max_mdsize);
268
269         req_capsule_set_size(&info->mti_pill, &RMF_EADATA, RCL_SERVER,
270                              LUSTRE_POSIX_ACL_MAX_SIZE);
271
272         result = req_capsule_pack(&info->mti_pill);
273         if (result)
274                 RETURN(result);
275
276         ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
277                                             &RMF_MDT_MD);
278         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
279                                                &RMF_MDT_MD,
280                                                RCL_SERVER);
281
282         if (rr->rr_name[0] == 0) {
283                 /* reint partial remote open */
284                 RETURN(mdt_open_by_fid(info, rr->rr_fid1, la->la_flags));
285         }
286
287         /* we now have no resent message, so it must be an intent */
288         /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
289         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
290
291         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
292
293         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
294         lh = &info->mti_lh[MDT_LH_PARENT];
295         if (!(la->la_flags & MDS_OPEN_CREAT))
296                 lh->mlh_mode = LCK_CR;
297         else
298                 lh->mlh_mode = LCK_EX;
299         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
300                                       MDS_INODELOCK_UPDATE);
301         if (IS_ERR(parent))
302                 GOTO(out, result = PTR_ERR(parent));
303
304         result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
305                             rr->rr_name, child_fid);
306         if (result != 0 && result != -ENOENT)
307                 GOTO(out_parent, result);
308
309         if (result == -ENOENT) {
310                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
311                 if (!(la->la_flags & MDS_OPEN_CREAT))
312                         GOTO(out_parent, result);
313                 *child_fid = *info->mti_rr.rr_fid2;
314                 /* new object will be created. see the following */
315         } else {
316                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
317                 if ((la->la_flags & MDS_OPEN_EXCL &&
318                          la->la_flags & MDS_OPEN_CREAT))
319                         GOTO(out_parent, result = -EEXIST);
320         }
321
322         child = mdt_object_find(info->mti_ctxt, mdt, child_fid);
323         if (IS_ERR(child))
324                 GOTO(out_parent, result = PTR_ERR(child));
325
326         if (result == -ENOENT) {
327                 /* not found and with MDS_OPEN_CREAT: let's create it */
328                 result = mdo_create(info->mti_ctxt,
329                                     mdt_object_child(parent),
330                                     rr->rr_name,
331                                     mdt_object_child(child),
332                                     rr->rr_tgt, rr->rr_eadata,
333                                     rr->rr_eadatalen, &info->mti_attr);
334                 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
335                 if (result != 0)
336                         GOTO(out_child, result);
337                 created = 1;
338         }
339
340         /* Open it now. */
341         result = mdt_mfd_open(info, parent, child, la->la_flags, created);
342         intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
343         GOTO(finish_open, result);
344
345 finish_open:
346         if (result != 0 && created) {
347                 int rc2 = mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
348                                      mdt_object_child(child), rr->rr_name,
349                                      &info->mti_attr);
350                 if (rc2 != 0)
351                         CERROR("error in cleanup of open");
352         }
353 out_child:
354         mdt_object_put(info->mti_ctxt, child);
355 out_parent:
356         mdt_object_unlock_put(info, parent, lh, result);
357 out:
358         return result;
359 }
360
361 void mdt_mfd_close(const struct lu_context *ctxt,
362                    struct mdt_file_data *mfd)
363 {
364         ENTRY;
365
366         if (mfd->mfd_mode & FMODE_WRITE) {
367                 /*mdt_put_write_access*/
368         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
369                 /*mdt_allow_write_access*/
370         }
371
372         /* release reference on this object.
373          * it will be destroyed by lower layer if necessary.
374          */
375         mdt_object_put(ctxt, mfd->mfd_object);
376
377         mdt_mfd_free(mfd);
378         EXIT;
379 }
380
381 int mdt_close(struct mdt_thread_info *info)
382 {
383         struct md_attr         *ma = &info->mti_attr;
384         struct mdt_export_data *med;
385         struct mdt_file_data   *mfd;
386         struct mdt_object      *o;
387         int rc;
388         ENTRY;
389
390         med = &mdt_info_req(info)->rq_export->exp_mdt_data;
391
392         spin_lock(&med->med_open_lock);
393         mfd = mdt_handle2mfd(&(info->mti_body->handle));
394         if (mfd == NULL) {
395                 spin_unlock(&med->med_open_lock);
396                 CDEBUG(D_INODE, "no handle for file close: fid = "DFID3
397                        ": cookie = "LPX64"\n", PFID3(&info->mti_body->fid1),
398                        info->mti_body->handle.cookie);
399                 rc = -ESTALE;
400         } else {
401                 class_handle_unhash(&mfd->mfd_handle);
402                 list_del_init(&mfd->mfd_list);
403                 spin_unlock(&med->med_open_lock);
404
405                 o = mfd->mfd_object;
406                 ma->ma_lmm = req_capsule_server_get(&info->mti_pill, 
407                                                     &RMF_MDT_MD);
408                 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
409                                                        &RMF_MDT_MD, RCL_SERVER);
410                 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
411                 if (rc == 0)
412                         rc = mdt_handle_last_unlink(info, o);
413
414                 mdt_mfd_close(info->mti_ctxt, mfd);
415         }
416         mdt_shrink_reply(info);
417         RETURN(rc);
418 }
419
420 int mdt_done_writing(struct mdt_thread_info *info)
421 {
422         int rc;
423         ENTRY;
424
425         req_capsule_set(&info->mti_pill, &RQF_MDS_DONE_WRITING);
426         rc = req_capsule_pack(&info->mti_pill);
427
428         RETURN(0);
429 }