Whamcloud - gitweb
fixed some bug during testing:
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_reint.c
5  *  Lustre Metadata Target (mdt) reintegration routines
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Huang Hua <huanghua@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include "mdt_internal.h"
38
39
40 static int mdt_md_create(struct mdt_thread_info *info)
41 {
42         struct mdt_device      *mdt = info->mti_mdt;
43         struct mdt_object      *parent;
44         struct mdt_object      *child;
45         struct mdt_lock_handle *lh;
46         struct mdt_body        *repbody;
47         struct md_attr         *ma = &info->mti_attr;
48         struct mdt_reint_record *rr = &info->mti_rr;
49         int rc;
50         ENTRY;
51
52         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
53
54         lh = &info->mti_lh[MDT_LH_PARENT];
55         lh->mlh_mode = LCK_PW;
56
57         parent = mdt_object_find_lock(info, rr->rr_fid1,
58                                       lh, MDS_INODELOCK_UPDATE);
59         if (IS_ERR(parent))
60                 RETURN(PTR_ERR(parent));
61
62         child = mdt_object_find(info->mti_ctxt, mdt, rr->rr_fid2);
63         if (!IS_ERR(child)) {
64                 struct md_object *next = mdt_object_child(parent);
65
66                 rc = mdo_create(info->mti_ctxt, next, rr->rr_name,
67                                 mdt_object_child(child), rr->rr_tgt, ma);
68                 if (rc == 0) {
69                         /* return fid & attr to client. */
70                         if (ma->ma_valid & MA_INODE)
71                                 mdt_pack_attr2body(repbody, &ma->ma_attr,
72                                                    mdt_object_fid(child));
73                 }
74                 mdt_object_put(info->mti_ctxt, child);
75         } else
76                 rc = PTR_ERR(child);
77         mdt_object_unlock_put(info, parent, lh, rc);
78         RETURN(rc);
79 }
80
81 /* partial request to create object only */
82 static int mdt_md_mkobj(struct mdt_thread_info *info)
83 {
84         struct mdt_device      *mdt = info->mti_mdt;
85         struct mdt_object      *o;
86         struct mdt_body        *repbody;
87         struct md_attr         *ma = &info->mti_attr;
88         int rc;
89         ENTRY;
90
91         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
92
93         o = mdt_object_find(info->mti_ctxt, mdt, info->mti_rr.rr_fid1);
94         if (!IS_ERR(o)) {
95                 struct md_object *next = mdt_object_child(o);
96
97                 rc = mo_object_create(info->mti_ctxt, next, ma);
98                 if (rc == 0) {
99                         /* return fid & attr to client. */
100                         if (ma->ma_valid & MA_INODE)
101                                 mdt_pack_attr2body(repbody, &ma->ma_attr,
102                                                    mdt_object_fid(o));
103                 }
104                 mdt_object_put(info->mti_ctxt, o);
105         } else
106                 rc = PTR_ERR(o);
107
108         RETURN(rc);
109 }
110
111
112 /* In the raw-setattr case, we lock the child inode.
113  * In the write-back case or if being called from open,
114  *               the client holds a lock already.
115  * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
116 static int mdt_reint_setattr(struct mdt_thread_info *info)
117 {
118         struct lu_attr          *attr = &info->mti_attr.ma_attr;
119         struct mdt_reint_record *rr = &info->mti_rr;
120         struct ptlrpc_request   *req = mdt_info_req(info);
121         struct mdt_object       *mo;
122         struct md_object        *next;
123         struct mdt_lock_handle  *lh;
124         struct mdt_body         *repbody;
125         /*__u64                   valid = attr->la_valid;*/
126         int                      rc;
127
128         ENTRY;
129
130         DEBUG_REQ(D_INODE, req, "setattr "DFID3" %x", PFID3(rr->rr_fid1),
131                   (unsigned int)attr->la_valid);
132
133         /* MDS_CHECK_RESENT */
134         lh = &info->mti_lh[MDT_LH_PARENT];
135         lh->mlh_mode = LCK_EX;
136
137         if (attr->la_valid & ATTR_FROM_OPEN) {
138                 mo = mdt_object_find(info->mti_ctxt, info->mti_mdt,
139                                      rr->rr_fid1);
140         } else {
141                 __u64 lockpart = MDS_INODELOCK_UPDATE;
142                 if (attr->la_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
143                         lockpart |= MDS_INODELOCK_LOOKUP;
144
145                 mo = mdt_object_find_lock(info, rr->rr_fid1, lh, lockpart);
146         }
147         if (IS_ERR(mo))
148                 RETURN(rc = PTR_ERR(mo));
149
150         next = mdt_object_child(mo);
151         if (lu_object_exists(info->mti_ctxt, &mo->mot_obj.mo_lu) <= 0)
152                 GOTO(out_unlock, rc = -ENOENT);
153
154         rc = mo_attr_set(info->mti_ctxt, next, attr);
155         if (rc != 0)
156                 GOTO(out_unlock, rc);
157
158         rc = mo_attr_get(info->mti_ctxt, next, attr);
159         if (rc != 0)
160                 GOTO(out_unlock, rc);
161
162         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
163         mdt_pack_attr2body(repbody, attr, mdt_object_fid(mo));
164
165         /* don't return OST-specific attributes if we didn't just set them.
166         if (valid & ATTR_SIZE)
167                 repbody->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
168         if (valid & (ATTR_MTIME | ATTR_MTIME_SET))
169                 repbody->valid |= OBD_MD_FLMTIME;
170         if (valid & (ATTR_ATIME | ATTR_ATIME_SET))
171                 repbody->valid |= OBD_MD_FLATIME;
172         */
173         /* FIXME: I have to combine the attr_set & xattr_set into one single
174                   transaction. How can I?
175          */
176
177         if (rr->rr_eadatalen > 0)
178                 rc = mo_xattr_set(info->mti_ctxt, next,
179                                   rr->rr_eadata, rr->rr_eadatalen,
180                                   XATTR_NAME_LOV);
181
182         /* FIXME & TODO Please deal with logcookies here*/
183         GOTO(out_unlock, rc);
184 out_unlock:
185         mdt_object_unlock_put(info, mo, lh, rc);
186         return rc;
187 }
188
189
190 static int mdt_reint_create(struct mdt_thread_info *info)
191 {
192         int rc;
193         ENTRY;
194
195         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
196         case S_IFREG:
197         case S_IFDIR:{
198                 if (strlen(info->mti_rr.rr_name) > 0)
199                         rc = mdt_md_create(info);
200                 else
201                         rc = mdt_md_mkobj(info);
202                 break;
203         }
204         case S_IFLNK:
205         case S_IFCHR:
206         case S_IFBLK:
207         case S_IFIFO:
208         case S_IFSOCK:{
209                 /* special file should stay on the same node as parent */
210                 LASSERT(strlen(info->mti_rr.rr_name) > 0);
211
212                 rc = mdt_md_create(info);
213                 break;
214         }
215         default:
216                 rc = -EOPNOTSUPP;
217         }
218         RETURN(rc);
219 }
220
221
222 static int mdt_reint_unlink(struct mdt_thread_info *info)
223 {
224         struct mdt_reint_record *rr = &info->mti_rr;
225         struct ptlrpc_request   *req = mdt_info_req(info);
226         struct mdt_object       *mp;
227         struct mdt_object       *mc;
228         struct mdt_lock_handle  *lhp;
229         struct mdt_lock_handle  *lhc;
230         struct mdt_body         *repbody;
231         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
232         int                      rc;
233
234         ENTRY;
235
236         DEBUG_REQ(D_INODE, req, "unlink "DFID3"/%s\n", PFID3(rr->rr_fid1),
237                   rr->rr_name);
238
239         /* MDS_CHECK_RESENT here */
240
241         /* step 1: lock the parent */
242         lhp = &info->mti_lh[MDT_LH_PARENT];
243         lhp->mlh_mode = LCK_EX;
244         mp = mdt_object_find_lock(info, rr->rr_fid1, lhp, MDS_INODELOCK_UPDATE);
245         if (IS_ERR(mp))
246                 RETURN(PTR_ERR(mp));
247
248         if (strlen(rr->rr_name) == 0) {
249                 /* remote partial operation */
250                 rc = mo_ref_del(info->mti_ctxt, mdt_object_child(mp), 
251                                 &info->mti_attr);
252                 GOTO(out_unlock_parent, rc);
253         }
254
255         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
256
257         /*step 2: find & lock the child */
258         lhc = &info->mti_lh[MDT_LH_CHILD];
259         lhc->mlh_mode = LCK_EX;
260         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mp),
261                         rr->rr_name, child_fid);
262         if (rc != 0)
263                  GOTO(out_unlock_parent, rc);
264
265         /* we will lock the child regardless it is local or remote. No harm. */
266         mc = mdt_object_find_lock(info, child_fid, lhc, MDS_INODELOCK_FULL);
267         if (IS_ERR(mc))
268                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
269
270         /*step 3:  do some checking ...*/
271
272         /* step 4: delete it */
273         /* cmm will take care if child is local or remote */
274         rc = mdo_unlink(info->mti_ctxt, mdt_object_child(mp),
275                         mdt_object_child(mc), rr->rr_name, &info->mti_attr);
276         
277         if (rc)
278                 GOTO(out_unlock_child, rc);
279         
280         rc = mdt_handle_last_unlink(info, mc, &RQF_MDS_REINT_UNLINK_LAST);
281         GOTO(out_unlock_child, rc);
282
283 out_unlock_child:
284         mdt_object_unlock_put(info, mc, lhc, rc);
285 out_unlock_parent:
286         mdt_object_unlock_put(info, mp, lhp, rc);
287         return rc;
288 }
289
290 static int mdt_reint_link(struct mdt_thread_info *info)
291 {
292         struct mdt_reint_record *rr = &info->mti_rr;
293         struct ptlrpc_request   *req = mdt_info_req(info);
294         struct mdt_object       *ms;
295         struct mdt_object       *mp;
296         struct mdt_lock_handle  *lhs;
297         struct mdt_lock_handle  *lhp;
298         int rc;
299
300         ENTRY;
301
302         DEBUG_REQ(D_INODE, req, "link original "DFID3" to "DFID3" %s",
303                   PFID3(rr->rr_fid1), PFID3(rr->rr_fid2), rr->rr_name);
304
305         /* MDS_CHECK_RESENT here */
306
307         /* step 1: lock the source */
308         lhs = &info->mti_lh[MDT_LH_PARENT];
309         lhs->mlh_mode = LCK_EX;
310         ms = mdt_object_find_lock(info, rr->rr_fid1, lhs, MDS_INODELOCK_UPDATE);
311         if (IS_ERR(ms))
312                 RETURN(PTR_ERR(ms));
313
314         if (strlen(rr->rr_name) == 0) {
315                 /* remote partial operation */
316                 rc = mo_ref_add(info->mti_ctxt, mdt_object_child(ms));
317                 GOTO(out_unlock_source, rc);
318         }
319         /*step 2: find & lock the target parent dir*/
320         lhp = &info->mti_lh[MDT_LH_CHILD];
321         lhp->mlh_mode = LCK_EX;
322         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp, MDS_INODELOCK_UPDATE);
323         if (IS_ERR(mp))
324                 GOTO(out_unlock_source, rc = PTR_ERR(mp));
325
326         /* step 4: link it */
327         rc = mdo_link(info->mti_ctxt, mdt_object_child(mp),
328                       mdt_object_child(ms), rr->rr_name);
329         GOTO(out_unlock_target, rc);
330
331 out_unlock_target:
332         mdt_object_unlock_put(info, mp, lhp, rc);
333 out_unlock_source:
334         mdt_object_unlock_put(info, ms, lhs, rc);
335         return rc;
336 }
337
338 /* partial operation for rename */
339 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
340 {
341         struct mdt_reint_record *rr = &info->mti_rr;
342         struct ptlrpc_request   *req = mdt_info_req(info);
343         struct mdt_object       *mtgtdir;
344         struct mdt_object       *mtgt = NULL;
345         struct mdt_lock_handle  *lh_tgtdir;
346         struct mdt_lock_handle  *lh_tgt;
347         struct lu_fid           *tgt_fid = &info->mti_tmp_fid1;
348         int                      rc;
349
350         ENTRY;
351
352         DEBUG_REQ(D_INODE, req, "rename_tgt "DFID3" to "DFID3" %s",
353                   PFID3(rr->rr_fid2),
354                   PFID3(rr->rr_fid1), rr->rr_tgt);
355
356         /* step 1: lookup & lock the tgt dir */
357         lh_tgt = &info->mti_lh[MDT_LH_CHILD];
358         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
359         lh_tgtdir->mlh_mode = LCK_PW;
360         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
361                                        MDS_INODELOCK_UPDATE);
362         if (IS_ERR(mtgtdir))
363                 GOTO(out, rc = PTR_ERR(mtgtdir));
364
365         /*step 2: find & lock the target object if exists*/
366         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
367                         rr->rr_tgt, tgt_fid);
368         if (rc != 0 && rc != -ENOENT)
369                 GOTO(out_unlock_tgtdir, rc);
370
371         if (rc == 0) {
372                 lh_tgt->mlh_mode = LCK_EX;
373
374                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
375                                             MDS_INODELOCK_LOOKUP);
376                 if (IS_ERR(mtgt))
377                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
378         }
379
380         /* step 3: rename_tgt or name_insert */
381         if (mtgt)
382                 rc = mdo_rename_tgt(info->mti_ctxt, mdt_object_child(mtgtdir),
383                                     mdt_object_child(mtgt),
384                                     rr->rr_fid2, rr->rr_tgt);
385         else
386                 rc = mdo_name_insert(info->mti_ctxt, mdt_object_child(mtgtdir),
387                                      rr->rr_tgt, rr->rr_fid2);
388         GOTO(out_unlock_tgt, rc);
389
390 out_unlock_tgt:
391         if (mtgt) {
392                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
393         }
394 out_unlock_tgtdir:
395         mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
396 out:
397         return rc;
398 }
399
400
401 static int mdt_reint_rename(struct mdt_thread_info *info)
402 {
403         struct mdt_reint_record *rr = &info->mti_rr;
404         struct req_capsule      *pill = &info->mti_pill;
405         struct ptlrpc_request   *req = mdt_info_req(info);
406         struct mdt_object       *msrcdir;
407         struct mdt_object       *mtgtdir;
408         struct mdt_object       *mold;
409         struct mdt_object       *mnew = NULL;
410         struct mdt_lock_handle  *lh_srcdirp;
411         struct mdt_lock_handle  *lh_tgtdirp;
412         struct mdt_lock_handle  *lh_oldp;
413         struct mdt_lock_handle  *lh_newp;
414         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
415         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
416         int                      rc;
417
418         ENTRY;
419
420         DEBUG_REQ(D_INODE, req, "rename "DFID3"/%s to "DFID3"/%s",
421                   PFID3(rr->rr_fid1), rr->rr_name,
422                   PFID3(rr->rr_fid2), rr->rr_tgt);
423
424         /* MDS_CHECK_RESENT here */
425
426         rc = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
427         if (rc == 1) {
428         /* if (rr->rr_name[0] == 0) {*/
429                 RETURN(mdt_reint_rename_tgt(info));
430         }
431
432         lh_newp = &info->mti_lh[MDT_LH_NEW];
433
434         /* step 1: lock the source dir */
435         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
436         lh_srcdirp->mlh_mode = LCK_EX;
437         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
438                                        MDS_INODELOCK_UPDATE);
439         if (IS_ERR(msrcdir))
440                 GOTO(out, rc = PTR_ERR(msrcdir));
441
442         /*step 2: find & lock the target dir*/
443         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
444         lh_tgtdirp->mlh_mode = LCK_EX;
445         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
446                 mdt_object_get(info->mti_ctxt, msrcdir);
447                 mtgtdir = msrcdir;
448         } else {
449                 mtgtdir = mdt_object_find_lock(info, rr->rr_fid2, lh_tgtdirp,
450                                                MDS_INODELOCK_UPDATE);
451                 if (IS_ERR(mtgtdir))
452                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
453         }
454
455         /*step 3: find & lock the old object*/
456         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(msrcdir),
457                         rr->rr_name, old_fid);
458         if (rc != 0)
459                 GOTO(out_unlock_target, rc);
460
461         lh_oldp = &info->mti_lh[MDT_LH_OLD];
462         lh_oldp->mlh_mode = LCK_EX;
463         mold = mdt_object_find_lock(info, old_fid, lh_oldp,
464                                     MDS_INODELOCK_LOOKUP);
465         if (IS_ERR(mold))
466                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
467
468         /*step 4: find & lock the new object*/
469         /* new target object may not exist now */
470         rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
471                         rr->rr_tgt, new_fid);
472         if (rc != 0 && rc != -ENOENT)
473                 GOTO(out_unlock_old, rc);
474
475         if (rc == 0) {
476                 /* the new_fid should have been filled at this moment*/
477                 lh_newp->mlh_mode = LCK_EX;
478                 mnew = mdt_object_find_lock(info, new_fid, lh_newp,
479                                             MDS_INODELOCK_FULL);
480                 if (IS_ERR(mnew))
481                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
482         }
483
484         /* step 5: dome some checking ...*/
485
486         /* step 6: rename it */
487         rc = mdo_rename(info->mti_ctxt, mdt_object_child(msrcdir),
488                         mdt_object_child(mtgtdir), old_fid,
489                         rr->rr_name, mnew ? mdt_object_child(mnew): NULL,
490                         rr->rr_tgt);
491         GOTO(out_unlock_new, rc);
492
493 out_unlock_new:
494         if (mnew) {
495                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
496         }
497 out_unlock_old:
498         mdt_object_unlock_put(info, mold, lh_oldp, rc);
499 out_unlock_target:
500         mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
501 out_unlock_source:
502         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
503 out:
504         return rc;
505 }
506
507
508 typedef int (*mdt_reinter)(struct mdt_thread_info *info);
509
510 static mdt_reinter reinters[REINT_MAX] = {
511         [REINT_SETATTR]  = mdt_reint_setattr,
512         [REINT_CREATE] = mdt_reint_create,
513         [REINT_LINK] = mdt_reint_link,
514         [REINT_UNLINK] = mdt_reint_unlink,
515         [REINT_RENAME] = mdt_reint_rename,
516         [REINT_OPEN] = mdt_reint_open
517 };
518
519 int mdt_reint_rec(struct mdt_thread_info *info)
520 {
521         int rc;
522         ENTRY;
523
524         rc = reinters[info->mti_rr.rr_opcode](info);
525
526         RETURN(rc);
527 }