Whamcloud - gitweb
attr_get is changed. It takes now md_attr() and does 'lov' getting inside
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_open.c
5  *  Lustre Metadata Target (mdt) open/close file handling
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "mdt_internal.h"
35
36 /* we do nothing because we do not have refcount now */
37 static void mdt_mfd_get(void *mfdp)
38 {
39 }
40
41 /* Create a new mdt_file_data struct, initialize it,
42  * and insert it to global hash table */
43 static struct mdt_file_data *mdt_mfd_new(void)
44 {
45         struct mdt_file_data *mfd;
46         ENTRY;
47
48         OBD_ALLOC_PTR(mfd);
49         if (mfd != NULL) {
50                 INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
51                 INIT_LIST_HEAD(&mfd->mfd_list);
52                 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
53         }
54         RETURN(mfd);
55 }
56
57 /* Find the mfd pointed to by handle in global hash table. */
58 static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle)
59 {
60         ENTRY;
61         LASSERT(handle != NULL);
62         RETURN(class_handle2object(handle->cookie));
63 }
64
65 /* free mfd */
66 static void mdt_mfd_free(struct mdt_file_data *mfd)
67 {
68         LASSERT(list_empty(&mfd->mfd_handle.h_link));
69         LASSERT(list_empty(&mfd->mfd_list));
70         OBD_FREE_PTR(mfd);
71 }
72
73 static int mdt_mfd_open(struct mdt_thread_info *info,
74                         struct mdt_object *o,
75                         int flags, int created)
76 {
77         struct mdt_export_data *med;
78         struct mdt_file_data   *mfd;
79         struct mdt_body        *repbody;
80         struct md_attr         *ma = &info->mti_attr;
81         struct lu_attr         *la = &ma->ma_attr;
82         struct ptlrpc_request  *req = mdt_info_req(info);
83         int                     rc = 0;
84         ENTRY;
85
86         med = &req->rq_export->exp_mdt_data;
87         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
88
89         if (!created) {
90                 /* we have to get attr & lov ea for this object*/
91                 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
92         }
93         if (rc == 0){
94                 if (!S_ISREG(la->la_mode) &&
95                     !S_ISDIR(la->la_mode) &&
96                     (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH ||
97                      S_ISLNK(la->la_mode)))
98                         /* If client supports this, do not return open handle
99                         *  for special nodes */
100                         RETURN(0);
101
102                 /* FIXME:maybe this can be done earlier? */
103                 if (S_ISDIR(la->la_mode)) {
104                         if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
105                                 /* we are trying to create or
106                                  * write an existing dir. */
107                                 rc = -EISDIR;
108                         }
109                 } else if (flags & MDS_OPEN_DIRECTORY)
110                         rc = -ENOTDIR;
111         }
112         if (rc != 0)
113                 RETURN(rc);
114         if (ma->ma_valid & MA_INODE)
115                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
116         if (ma->ma_lmm_size && ma->ma_valid & MA_LOV) {
117                 repbody->eadatasize = ma->ma_lmm_size;
118                 if (S_ISDIR(la->la_mode))
119                         repbody->valid |= OBD_MD_FLDIREA;
120                 else
121                         repbody->valid |= OBD_MD_FLEASIZE;
122         }
123
124         if (flags & FMODE_WRITE) {
125                 /*mds_get_write_access*/
126         } else if (flags & MDS_FMODE_EXEC) {
127                 /*mds_deny_write_access*/
128         }
129
130         /* (1) client wants transno when open to keep a ref count for replay;
131          *     see after_reply() and mdc_close_commit();
132          * (2) we need to record the transaction related stuff onto disk;
133          * But, question is: when do a rean only open, do we still need transno?
134          */
135         if (1) {
136                 struct txn_param txn;
137                 struct thandle *th;
138                 struct dt_device *dt = info->mti_mdt->mdt_bottom;
139                 txn.tp_credits = 1;
140
141                 LASSERT(dt);
142                 th = dt->dd_ops->dt_trans_start(info->mti_ctxt, dt, &txn);
143                 if (!IS_ERR(th)) 
144                         dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
145                 else 
146                         RETURN(PTR_ERR(th));
147         }
148
149         mfd = mdt_mfd_new();
150         if (mfd != NULL) {
151                 /* keep a reference on this object for this open,
152                 * and is released by mdt_mfd_close() */
153                 mdt_object_get(info->mti_ctxt, o);
154
155                 mfd->mfd_mode = flags;
156                 mfd->mfd_object = o;
157                 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
158
159                 spin_lock(&med->med_open_lock);
160                 list_add(&mfd->mfd_list, &med->med_open_head);
161                 spin_unlock(&med->med_open_lock);
162
163                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
164         } else
165                 rc = -ENOMEM;
166
167         RETURN(rc);
168 }
169
170 int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
171                     __u32 flags)
172 {
173         struct mdt_object *o;
174         struct lu_attr    *la = &info->mti_attr.ma_attr;
175         int                rc;
176         ENTRY;
177
178         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
179         if (!IS_ERR(o)) {
180                 if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu) > 0) {
181                         if (la->la_flags & MDS_OPEN_EXCL &&
182                             la->la_flags & MDS_OPEN_CREAT)
183                                 rc = -EEXIST;
184                         else
185                                 rc = mdt_mfd_open(info, o, flags, 0);
186                 } else {
187                         rc = -ENOENT;
188                         if (la->la_flags & MDS_OPEN_CREAT) {
189                                 rc = mo_object_create(info->mti_ctxt,
190                                                       mdt_object_child(o),
191                                                       &info->mti_attr);
192                                 if (rc == 0)
193                                         rc = mdt_mfd_open(info, o, flags, 1);
194                         }
195                 }
196                 mdt_object_put(info->mti_ctxt, o);
197         } else
198                 rc = PTR_ERR(o);
199
200         RETURN(rc);
201 }
202
203 int mdt_pin(struct mdt_thread_info* info)
204 {
205         struct mdt_body *body;
206         int rc;
207         ENTRY;
208
209         rc = req_capsule_pack(&info->mti_pill);
210         if (rc == 0) {
211                 body = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
212                 rc = mdt_open_by_fid(info, &body->fid1, body->flags);
213         }
214         RETURN(rc);
215 }
216
217 int mdt_reint_open(struct mdt_thread_info *info)
218 {
219         struct mdt_device      *mdt = info->mti_mdt;
220         struct mdt_object      *parent;
221         struct mdt_object      *child;
222         struct mdt_lock_handle *lh;
223         struct ldlm_reply      *ldlm_rep;
224         struct lu_fid          *child_fid = &info->mti_tmp_fid1;
225         struct md_attr         *ma = &info->mti_attr;
226         struct lu_attr         *la = &ma->ma_attr;
227         int                     result;
228         int                     created = 0;
229         struct mdt_reint_record *rr = &info->mti_rr;
230         ENTRY;
231
232         ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
233                                             &RMF_MDT_MD);
234         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
235                                                &RMF_MDT_MD,
236                                                RCL_SERVER);
237
238         if (rr->rr_name[0] == 0) {
239                 /* reint partial remote open */
240                 RETURN(mdt_open_by_fid(info, rr->rr_fid1, la->la_flags));
241         }
242
243         /* we now have no resent message, so it must be an intent */
244         /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
245         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
246
247         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
248
249         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
250         lh = &info->mti_lh[MDT_LH_PARENT];
251         if (!(la->la_flags & MDS_OPEN_CREAT))
252                 lh->mlh_mode = LCK_CR;
253         else
254                 lh->mlh_mode = LCK_EX;
255         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
256                                       MDS_INODELOCK_UPDATE);
257         if (IS_ERR(parent))
258                 GOTO(out, result = PTR_ERR(parent));
259
260         result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
261                             rr->rr_name, child_fid);
262         if (result != 0 && result != -ENOENT)
263                 GOTO(out_parent, result);
264
265         if (result == -ENOENT) {
266                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
267                 if (!(la->la_flags & MDS_OPEN_CREAT))
268                         GOTO(out_parent, result);
269                 *child_fid = *info->mti_rr.rr_fid2;
270                 /* new object will be created. see the following */
271         } else {
272                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
273                 if (la->la_flags & MDS_OPEN_EXCL &&
274                     la->la_flags & MDS_OPEN_CREAT)
275                         GOTO(out_parent, result = -EEXIST);
276         }
277
278         child = mdt_object_find(info->mti_ctxt, mdt, child_fid);
279         if (IS_ERR(child))
280                 GOTO(out_parent, result = PTR_ERR(child));
281
282         if (result == -ENOENT) {
283                 /* not found and with MDS_OPEN_CREAT: let's create it */
284                 result = mdo_create(info->mti_ctxt,
285                                     mdt_object_child(parent),
286                                     rr->rr_name,
287                                     mdt_object_child(child),
288                                     rr->rr_tgt,
289                                     &info->mti_attr);
290                 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
291                 if (result != 0)
292                         GOTO(out_child, result);
293                 created = 1;
294         }
295
296         /* Open it now. */
297         result = mdt_mfd_open(info, child, la->la_flags, created);
298         intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
299         GOTO(finish_open, result);
300
301 finish_open:
302         if (result != 0 && created) {
303                 int rc2 = mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
304                                      mdt_object_child(child), rr->rr_name,
305                                      &info->mti_attr);
306                 if (rc2 != 0)
307                         CERROR("error in cleanup of open");
308         }
309 out_child:
310         mdt_object_put(info->mti_ctxt, child);
311 out_parent:
312         mdt_object_unlock_put(info, parent, lh, result);
313 out:
314         return result;
315 }
316
317 void mdt_mfd_close(const struct lu_context *ctxt,
318                    struct mdt_file_data *mfd)
319 {
320         ENTRY;
321
322         if (mfd->mfd_mode & FMODE_WRITE) {
323                 /*mdt_put_write_access*/
324         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
325                 /*mdt_allow_write_access*/
326         }
327
328         /* release reference on this object.
329          * it will be destroyed by lower layer if necessary.
330          */
331         mdt_object_put(ctxt, mfd->mfd_object);
332
333         mdt_mfd_free(mfd);
334         EXIT;
335 }
336
337 int mdt_close(struct mdt_thread_info *info)
338 {
339         struct md_attr         *ma = &info->mti_attr;
340         struct mdt_export_data *med;
341         struct mdt_file_data   *mfd;
342         struct mdt_object      *o;
343         int rc;
344         ENTRY;
345
346         med = &mdt_info_req(info)->rq_export->exp_mdt_data;
347
348         spin_lock(&med->med_open_lock);
349         mfd = mdt_handle2mfd(&(info->mti_body->handle));
350         if (mfd == NULL) {
351                 spin_unlock(&med->med_open_lock);
352                 CDEBUG(D_INODE, "no handle for file close: fid = "DFID3
353                        ": cookie = "LPX64"\n", PFID3(&info->mti_body->fid1),
354                        info->mti_body->handle.cookie);
355                 rc = -ESTALE;
356         } else {
357                 class_handle_unhash(&mfd->mfd_handle);
358                 list_del_init(&mfd->mfd_list);
359                 spin_unlock(&med->med_open_lock);
360
361                 o = mfd->mfd_object;
362                 ma->ma_lmm = req_capsule_server_get(&info->mti_pill, 
363                                                     &RMF_MDT_MD);
364                 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
365                                                        &RMF_MDT_MD, RCL_SERVER);
366                 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
367                 if (rc == 0)
368                         rc = mdt_handle_last_unlink(info, o);
369
370                 mdt_mfd_close(info->mti_ctxt, mfd);
371         }
372         mdt_shrink_reply(info);
373         RETURN(rc);
374 }
375
376 int mdt_done_writing(struct mdt_thread_info *info)
377 {
378         int rc;
379         ENTRY;
380
381         req_capsule_set(&info->mti_pill, &RQF_MDS_DONE_WRITING);
382         rc = req_capsule_pack(&info->mti_pill);
383
384         RETURN(0);
385 }