Whamcloud - gitweb
- unland b_fid to HEAD
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *
12  *   This file is part of Lustre, http://www.lustre.org.
13  *
14  *   Lustre is free software; you can redistribute it and/or
15  *   modify it under the terms of version 2 of the GNU General Public
16  *   License as published by the Free Software Foundation.
17  *
18  *   Lustre is distributed in the hope that it will be useful,
19  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
20  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  *   GNU General Public License for more details.
22  *
23  *   You should have received a copy of the GNU General Public License
24  *   along with Lustre; if not, write to the Free Software
25  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26  */
27
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/fs.h>
34 #include <linux/jbd.h>
35 #include <linux/ext3_fs.h>
36 #include <linux/obd_support.h>
37 #include <linux/obd_class.h>
38 #include <linux/obd.h>
39 #include <linux/lustre_lib.h>
40 #include <linux/lustre_idl.h>
41 #include <linux/lustre_mds.h>
42 #include <linux/lustre_dlm.h>
43 #include <linux/lustre_log.h>
44 #include <linux/lustre_fsfilt.h>
45 #include "mds_internal.h"
46
47 void mds_commit_cb(struct obd_device *obd, __u64 transno, void *data,
48                    int error)
49 {
50         obd_transno_commit_cb(obd, transno, error);
51 }
52
53 struct mds_logcancel_data {
54         struct lov_mds_md      *mlcd_lmm;
55         int                     mlcd_size;
56         int                     mlcd_cookielen;
57         int                     mlcd_eadatalen;
58         struct llog_cookie      mlcd_cookies[0];
59 };
60
61
62 static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
63                                   void *cb_data, int error)
64 {
65         struct mds_logcancel_data *mlcd = cb_data;
66         struct lov_stripe_md *lsm = NULL;
67         struct llog_ctxt *ctxt;
68         int rc;
69
70         obd_transno_commit_cb(obd, transno, error);
71
72         CDEBUG(D_HA, "cancelling %d cookies\n",
73                (int)(mlcd->mlcd_cookielen / sizeof(*mlcd->mlcd_cookies)));
74
75         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, mlcd->mlcd_lmm,
76                           mlcd->mlcd_eadatalen);
77         if (rc < 0) {
78                 CERROR("bad LSM cancelling %d log cookies: rc %d\n",
79                        (int)(mlcd->mlcd_cookielen/sizeof(*mlcd->mlcd_cookies)),
80                        rc);
81         } else {
82                 ///* XXX 0 normally, SENDNOW for debug */);
83                 ctxt = llog_get_context(&obd->obd_llogs,
84                                         mlcd->mlcd_cookies[0].lgc_subsys + 1);
85                 rc = llog_cancel(ctxt, mlcd->mlcd_cookielen /
86                                  sizeof(*mlcd->mlcd_cookies),
87                                  mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW, lsm);
88                 if (rc)
89                         CERROR("error cancelling %d log cookies: rc %d\n",
90                                (int)(mlcd->mlcd_cookielen /
91                                      sizeof(*mlcd->mlcd_cookies)), rc);
92         }
93
94         OBD_FREE(mlcd, mlcd->mlcd_size);
95 }
96
97 /* Assumes caller has already pushed us into the kernel context. */
98 int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
99                        struct ptlrpc_request *req, int rc, __u32 op_data)
100 {
101         struct mds_export_data *med = &req->rq_export->exp_mds_data;
102         struct mds_client_data *mcd = med->med_mcd;
103         struct obd_device *obd = req->rq_export->exp_obd;
104         int err;
105         __u64 transno;
106         loff_t off;
107         int log_pri = D_HA;
108         ENTRY;
109
110         /* if the export has already been failed, we have no last_rcvd slot */
111         if (req->rq_export->exp_failed) {
112                 CERROR("committing transaction for disconnected client\n");
113                 if (handle)
114                         GOTO(commit, rc);
115                 RETURN(rc);
116         }
117
118         if (IS_ERR(handle))
119                 RETURN(rc);
120
121         if (handle == NULL) {
122                 /* if we're starting our own xaction, use our own inode */
123                 inode = mds->mds_rcvd_filp->f_dentry->d_inode;
124                 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
125                 if (IS_ERR(handle)) {
126                         CERROR("fsfilt_start: %ld\n", PTR_ERR(handle));
127                         RETURN(PTR_ERR(handle));
128                 }
129         }
130
131         off = med->med_off;
132
133         transno = req->rq_reqmsg->transno;
134         if (rc != 0) {
135                 LASSERTF(transno == 0, "BUG 3934, t"LPU64" rc %d\n", transno, rc);
136         } else if (transno == 0) {
137                 spin_lock(&mds->mds_transno_lock);
138                 transno = ++mds->mds_last_transno;
139                 spin_unlock(&mds->mds_transno_lock);
140         } else {
141                 spin_lock(&mds->mds_transno_lock);
142                 if (transno > mds->mds_last_transno)
143                         mds->mds_last_transno = transno;
144                 spin_unlock(&mds->mds_transno_lock);
145         }
146         req->rq_repmsg->transno = req->rq_transno = transno;
147         mcd->mcd_last_transno = cpu_to_le64(transno);
148         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
149         mcd->mcd_last_result = cpu_to_le32(rc);
150         mcd->mcd_last_data = cpu_to_le32(op_data);
151
152
153         fsfilt_add_journal_cb(req->rq_export->exp_obd, mds->mds_sb,
154                               transno, handle, mds_commit_cb, NULL);
155         
156         err = fsfilt_write_record(obd, mds->mds_rcvd_filp, mcd, sizeof(*mcd),
157                                   &off, 0);
158
159         if (err) {
160                 log_pri = D_ERROR;
161                 if (rc == 0)
162                         rc = err;
163         }
164
165         DEBUG_REQ(log_pri, req,
166                   "wrote trans #"LPU64" client %s at idx %u: err = %d",
167                   transno, mcd->mcd_uuid, med->med_idx, err);
168
169         err = mds_lov_write_objids(obd);
170         if (err) {
171                 log_pri = D_ERROR;
172                 if (rc == 0)
173                         rc = err;
174         }
175         CDEBUG(log_pri, "wrote objids: err = %d\n", err);
176
177 commit:
178         err = fsfilt_commit(obd, mds->mds_sb, inode, handle, 0);
179         if (err) {
180                 CERROR("error committing transaction: %d\n", err);
181                 if (!rc)
182                         rc = err;
183         }
184
185         RETURN(rc);
186 }
187
188 /* this gives the same functionality as the code between
189  * sys_chmod and inode_setattr
190  * chown_common and inode_setattr
191  * utimes and inode_setattr
192  */
193 int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
194 {
195         time_t now = LTIME_S(CURRENT_TIME);
196         struct iattr *attr = &rec->ur_iattr;
197         unsigned int ia_valid = attr->ia_valid;
198         int error;
199         ENTRY;
200
201         /* only fix up attrs if the client VFS didn't already */
202         if (!(ia_valid & ATTR_RAW))
203                 RETURN(0);
204
205         if (!(ia_valid & ATTR_CTIME_SET))
206                 LTIME_S(attr->ia_ctime) = now;
207         if (!(ia_valid & ATTR_ATIME_SET))
208                 LTIME_S(attr->ia_atime) = now;
209         if (!(ia_valid & ATTR_MTIME_SET))
210                 LTIME_S(attr->ia_mtime) = now;
211
212         if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
213                 RETURN(-EPERM);
214
215         /* times */
216         if ((ia_valid & (ATTR_MTIME|ATTR_ATIME)) == (ATTR_MTIME|ATTR_ATIME)) {
217                 if (rec->_ur_fsuid != inode->i_uid &&
218                     (error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
219                         RETURN(error);
220         }
221
222         if (ia_valid & ATTR_SIZE) {
223                 if ((error = ll_permission(inode, MAY_WRITE, NULL)) != 0)
224                         RETURN(error);
225         }
226
227         if (ia_valid & ATTR_UID) {
228                 /* chown */
229                 error = -EPERM;
230                 if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
231                         RETURN(-EPERM);
232                 if (attr->ia_uid == (uid_t) -1)
233                         attr->ia_uid = inode->i_uid;
234                 if (attr->ia_gid == (gid_t) -1)
235                         attr->ia_gid = inode->i_gid;
236                 attr->ia_mode = inode->i_mode;
237                 /*
238                  * If the user or group of a non-directory has been
239                  * changed by a non-root user, remove the setuid bit.
240                  * 19981026 David C Niemi <niemi@tux.org>
241                  *
242                  * Changed this to apply to all users, including root,
243                  * to avoid some races. This is the behavior we had in
244                  * 2.0. The check for non-root was definitely wrong
245                  * for 2.2 anyway, as it should have been using
246                  * CAP_FSETID rather than fsuid -- 19990830 SD.
247                  */
248                 if ((inode->i_mode & S_ISUID) == S_ISUID &&
249                     !S_ISDIR(inode->i_mode)) {
250                         attr->ia_mode &= ~S_ISUID;
251                         attr->ia_valid |= ATTR_MODE;
252                 }
253                 /*
254                  * Likewise, if the user or group of a non-directory
255                  * has been changed by a non-root user, remove the
256                  * setgid bit UNLESS there is no group execute bit
257                  * (this would be a file marked for mandatory
258                  * locking).  19981026 David C Niemi <niemi@tux.org>
259                  *
260                  * Removed the fsuid check (see the comment above) --
261                  * 19990830 SD.
262                  */
263                 if (((inode->i_mode & (S_ISGID | S_IXGRP)) ==
264                      (S_ISGID | S_IXGRP)) && !S_ISDIR(inode->i_mode)) {
265                         attr->ia_mode &= ~S_ISGID;
266                         attr->ia_valid |= ATTR_MODE;
267                 }
268         } else if (ia_valid & ATTR_MODE) {
269                 int mode = attr->ia_mode;
270                 /* chmod */
271                 if (attr->ia_mode == (mode_t) -1)
272                         attr->ia_mode = inode->i_mode;
273                 attr->ia_mode =
274                         (mode & S_IALLUGO) | (inode->i_mode & ~S_IALLUGO);
275         }
276         RETURN(0);
277 }
278
279 void mds_steal_ack_locks(struct ptlrpc_request *req)
280 {
281         struct obd_export         *exp = req->rq_export;
282         struct list_head          *tmp;
283         struct ptlrpc_reply_state *oldrep;
284         struct ptlrpc_service     *svc;
285         struct llog_create_locks  *lcl;
286         unsigned long              flags;
287         char                       str[PTL_NALFMT_SIZE];
288         int                        i;
289
290         /* CAVEAT EMPTOR: spinlock order */
291         spin_lock_irqsave (&exp->exp_lock, flags);
292         list_for_each (tmp, &exp->exp_outstanding_replies) {
293                 oldrep = list_entry(tmp, struct ptlrpc_reply_state,rs_exp_list);
294
295                 if (oldrep->rs_xid != req->rq_xid)
296                         continue;
297
298                 if (oldrep->rs_msg.opc != req->rq_reqmsg->opc)
299                         CERROR ("Resent req xid "LPX64" has mismatched opc: "
300                                 "new %d old %d\n", req->rq_xid,
301                                 req->rq_reqmsg->opc, oldrep->rs_msg.opc);
302
303                 svc = oldrep->rs_srv_ni->sni_service;
304                 spin_lock (&svc->srv_lock);
305
306                 list_del_init (&oldrep->rs_exp_list);
307
308                 CWARN("Stealing %d locks from rs %p x"LPD64".t"LPD64
309                       " o%d NID %s\n",
310                       oldrep->rs_nlocks, oldrep,
311                       oldrep->rs_xid, oldrep->rs_transno, oldrep->rs_msg.opc,
312                       ptlrpc_peernid2str(&exp->exp_connection->c_peer, str));
313
314                 for (i = 0; i < oldrep->rs_nlocks; i++)
315                         ptlrpc_save_lock(req,
316                                          &oldrep->rs_locks[i],
317                                          oldrep->rs_modes[i]);
318                 oldrep->rs_nlocks = 0;
319
320                 lcl = oldrep->rs_llog_locks;
321                 oldrep->rs_llog_locks = NULL;
322                 if (lcl != NULL)
323                         ptlrpc_save_llog_lock(req, lcl);
324
325                 DEBUG_REQ(D_HA, req, "stole locks for");
326                 ptlrpc_schedule_difficult_reply (oldrep);
327
328                 spin_unlock (&svc->srv_lock);
329                 spin_unlock_irqrestore (&exp->exp_lock, flags);
330                 return;
331         }
332         spin_unlock_irqrestore (&exp->exp_lock, flags);
333 }
334
335 void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd)
336 {
337         DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
338                   mcd->mcd_last_transno, mcd->mcd_last_result);
339         req->rq_repmsg->transno = req->rq_transno = mcd->mcd_last_transno;
340         req->rq_repmsg->status = req->rq_status = mcd->mcd_last_result;
341
342         mds_steal_ack_locks(req);
343 }
344
345 static void reconstruct_reint_setattr(struct mds_update_record *rec,
346                                       int offset, struct ptlrpc_request *req)
347 {
348         struct mds_export_data *med = &req->rq_export->exp_mds_data;
349         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
350         struct dentry *de;
351         struct mds_body *body;
352
353         mds_req_from_mcd(req, med->med_mcd);
354
355         de = mds_fid2dentry(obd, rec->ur_fid1, NULL);
356         if (IS_ERR(de)) {
357                 LASSERT(PTR_ERR(de) == req->rq_status);
358                 return;
359         }
360
361         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
362         mds_pack_inode2fid(req2obd(req), &body->fid1, de->d_inode);
363         mds_pack_inode2body(req2obd(req), body, de->d_inode);
364
365         /* Don't return OST-specific attributes if we didn't just set them */
366         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
367                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
368         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
369                 body->valid |= OBD_MD_FLMTIME;
370         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
371                 body->valid |= OBD_MD_FLATIME;
372
373         l_dput(de);
374 }
375
376 /* In the raw-setattr case, we lock the child inode.
377  * In the write-back case or if being called from open, the client holds a lock
378  * already.
379  *
380  * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
381 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
382                              struct ptlrpc_request *req,
383                              struct lustre_handle *lh)
384 {
385         struct mds_obd *mds = mds_req2mds(req);
386         struct obd_device *obd = req->rq_export->exp_obd;
387         struct mds_body *body;
388         struct dentry *de;
389         struct inode *inode = NULL;
390         struct lustre_handle lockh[2] = {{0}, {0}};
391         int parent_mode;
392         void *handle = NULL;
393         struct mds_logcancel_data *mlcd = NULL;
394         int rc = 0, cleanup_phase = 0, err, locked = 0;
395         ENTRY;
396
397         LASSERT(offset == 1);
398
399         DEBUG_REQ(D_INODE, req, "setattr "LPU64"/%u %x", rec->ur_fid1->id,
400                   rec->ur_fid1->generation, rec->ur_iattr.ia_valid);
401
402         MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
403
404         MDS_UPDATE_COUNTER(mds, MDS_SETATTR_COUNT);
405
406         if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
407                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
408                 if (IS_ERR(de))
409                         GOTO(cleanup, rc = PTR_ERR(de));
410         } else {
411                 __u64 lockpart = MDS_INODELOCK_UPDATE;
412                 if (rec->ur_iattr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID) )
413                         lockpart |= MDS_INODELOCK_LOOKUP;
414                 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
415                                            lockh, &parent_mode, NULL, 0, lockpart);
416                 if (IS_ERR(de))
417                         GOTO(cleanup, rc = PTR_ERR(de));
418                 locked = 1;
419         }
420
421         cleanup_phase = 1;
422
423         inode = de->d_inode;
424         LASSERT(inode);
425         if ((S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) &&
426             rec->ur_eadata != NULL)
427                 down(&inode->i_sem);
428
429         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, inode->i_sb);
430
431         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
432         if (IS_ERR(handle))
433                 GOTO(cleanup, rc = PTR_ERR(handle));
434
435         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
436                 CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n",
437                        LTIME_S(rec->ur_iattr.ia_mtime),
438                        LTIME_S(rec->ur_iattr.ia_ctime));
439         rc = mds_fix_attr(inode, rec);
440         if (rc)
441                 GOTO(cleanup, rc);
442
443         if (rec->ur_iattr.ia_valid & ATTR_ATTR_FLAG)    /* ioctl */
444                 rc = fsfilt_iocontrol(obd, inode, NULL, EXT3_IOC_SETFLAGS,
445                                       (long)&rec->ur_iattr.ia_attr_flags);
446         else                                            /* setattr */
447                 rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr, 0);
448
449         if (rc == 0 && (S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) &&
450             rec->ur_eadata != NULL) {
451                 struct lov_stripe_md *lsm = NULL;
452
453                 rc = ll_permission(inode, MAY_WRITE, NULL);
454                 if (rc < 0)
455                         GOTO(cleanup, rc);
456
457                 rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
458                                    mds->mds_osc_exp, 0, &lsm, rec->ur_eadata);
459                 if (rc)
460                         GOTO(cleanup, rc);
461
462                 obd_free_memmd(mds->mds_osc_exp, &lsm);
463
464                 rc = fsfilt_set_md(obd, inode, handle, rec->ur_eadata,
465                                    rec->ur_eadatalen);
466                 if (rc)
467                         GOTO(cleanup, rc);
468         }
469
470         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
471         mds_pack_inode2fid(obd, &body->fid1, inode);
472         mds_pack_inode2body(obd, body, inode);
473
474         /* Don't return OST-specific attributes if we didn't just set them */
475         if (rec->ur_iattr.ia_valid & ATTR_SIZE)
476                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
477         if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
478                 body->valid |= OBD_MD_FLMTIME;
479         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
480                 body->valid |= OBD_MD_FLATIME;
481
482         if (rc == 0 && rec->ur_cookielen && !IS_ERR(mds->mds_osc_obd)) {
483                 OBD_ALLOC(mlcd, sizeof(*mlcd) + rec->ur_cookielen +
484                           rec->ur_eadatalen);
485                 if (mlcd) {
486                         mlcd->mlcd_size = sizeof(*mlcd) + rec->ur_cookielen +
487                                 rec->ur_eadatalen;
488                         mlcd->mlcd_eadatalen = rec->ur_eadatalen;
489                         mlcd->mlcd_cookielen = rec->ur_cookielen;
490                         mlcd->mlcd_lmm = (void *)&mlcd->mlcd_cookies +
491                                 mlcd->mlcd_cookielen;
492                         memcpy(&mlcd->mlcd_cookies, rec->ur_logcookies,
493                                mlcd->mlcd_cookielen);
494                         memcpy(mlcd->mlcd_lmm, rec->ur_eadata,
495                                mlcd->mlcd_eadatalen);
496                 } else {
497                         CERROR("unable to allocate log cancel data\n");
498                 }
499         }
500         EXIT;
501  cleanup:
502         if (mlcd != NULL)
503                 fsfilt_add_journal_cb(req->rq_export->exp_obd, mds->mds_sb, 0,
504                                       handle, mds_cancel_cookies_cb, mlcd);
505         err = mds_finish_transno(mds, inode, handle, req, rc, 0);
506         switch (cleanup_phase) {
507         case 1:
508                 if ((S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) &&
509                     rec->ur_eadata != NULL)
510                         up(&inode->i_sem);
511                 l_dput(de);
512                 if (locked) {
513 #ifdef S_PDIROPS
514                         if (lockh[1].cookie != 0)
515                                 ldlm_lock_decref(lockh + 1, parent_mode);
516 #endif
517                         if (rc) {
518                                 ldlm_lock_decref(lockh, LCK_PW);
519                         } else {
520                                 ptlrpc_save_lock (req, lockh, LCK_PW);
521                         }
522                 }
523         case 0:
524                 break;
525         default:
526                 LBUG();
527         }
528         if (err && !rc)
529                 rc = err;
530
531         req->rq_status = rc;
532         return 0;
533 }
534
535 static void reconstruct_reint_create(struct mds_update_record *rec, int offset,
536                                      struct ptlrpc_request *req)
537 {
538         struct mds_export_data *med = &req->rq_export->exp_mds_data;
539         struct mds_obd *obd = &req->rq_export->exp_obd->u.mds;
540         struct dentry *parent, *child;
541         struct mds_body *body;
542         ENTRY;
543
544         mds_req_from_mcd(req, med->med_mcd);
545
546         if (req->rq_status) {
547                 EXIT;
548                 return;
549         }
550
551         parent = mds_fid2dentry(obd, rec->ur_fid1, NULL);
552         LASSERT(!IS_ERR(parent));
553         child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
554         LASSERT(!IS_ERR(child));
555         if ((child->d_flags & DCACHE_CROSS_REF)) {
556                 LASSERTF(child->d_inode == NULL, "BUG 3869\n");
557                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
558                 mds_pack_dentry2fid(&body->fid1, child);
559                 mds_pack_dentry2body(body, child);
560                 body->valid |= OBD_MD_MDS;
561         } else if (child->d_inode == NULL) {
562                 DEBUG_REQ(D_ERROR, req, "parent "LPU64"/%u name %s mode %o",
563                           rec->ur_fid1->id, rec->ur_fid1->generation,
564                           rec->ur_name, rec->ur_mode);
565                 LASSERTF(child->d_inode != NULL, "BUG 3869\n");
566         } else {
567                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
568                 mds_pack_inode2fid(req2obd(req), &body->fid1, child->d_inode);
569                 mds_pack_inode2body(req2obd(req), body, child->d_inode);
570         }
571         l_dput(parent);
572         l_dput(child);
573         EXIT;
574 }
575
576 static int mds_reint_create(struct mds_update_record *rec, int offset,
577                             struct ptlrpc_request *req,
578                             struct lustre_handle *lh)
579 {
580         struct dentry *dparent = NULL;
581         struct mds_obd *mds = mds_req2mds(req);
582         struct obd_device *obd = req->rq_export->exp_obd;
583         struct dentry *dchild = NULL;
584         struct inode *dir = NULL;
585         void *handle = NULL;
586         struct lustre_handle lockh[2] = {{0}, {0}};
587         int parent_mode;
588         int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0;
589         int created = 0;
590         struct dentry_params dp;
591         struct mea *mea = NULL;
592         int mea_size;
593         ENTRY;
594
595         LASSERT(offset == 1);
596         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, LUSTRE_MDS_NAME));
597
598         DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u name %s mode %o",
599                   rec->ur_fid1->id, rec->ur_fid1->generation,
600                   rec->ur_name, rec->ur_mode);
601
602         MDS_CHECK_RESENT(req, reconstruct_reint_create(rec, offset, req));
603
604
605         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
606                 GOTO(cleanup, rc = -ESTALE);
607
608         dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, lockh,
609                                         &parent_mode, rec->ur_name,
610                                         rec->ur_namelen - 1, MDS_INODELOCK_UPDATE);
611         if (IS_ERR(dparent)) {
612                 rc = PTR_ERR(dparent);
613                 CERROR("parent lookup error %d\n", rc);
614                 GOTO(cleanup, rc);
615         }
616         cleanup_phase = 1; /* locked parent dentry */
617         dir = dparent->d_inode;
618         LASSERT(dir);
619
620         ldlm_lock_dump_handle(D_OTHER, lockh);
621
622         /* try to retrieve MEA data for this dir */
623         rc = mds_get_lmv_attr(obd, dparent->d_inode, &mea, &mea_size);
624         if (rc)
625                 GOTO(cleanup, rc);
626
627         if (mea != NULL) {
628                 /* dir is already splitted, check is requested filename
629                  * should live at this MDS or at another one */
630                 int i;
631                 i = mea_name2idx(mea, rec->ur_name, rec->ur_namelen - 1);
632                 if (mea->mea_master != mea->mea_fids[i].mds) {
633                         CDEBUG(D_OTHER, "inapropriate MDS(%d) for %lu/%u:%s."
634                                " should be %d(%d)\n",
635                                mea->mea_master, dparent->d_inode->i_ino,
636                                dparent->d_inode->i_generation, rec->ur_name,
637                                mea->mea_fids[i].mds, i);
638                         GOTO(cleanup, rc = -ERESTART);
639                 }
640         }
641
642         dchild = ll_lookup_one_len(rec->ur_name, dparent, rec->ur_namelen - 1);
643         if (IS_ERR(dchild)) {
644                 rc = PTR_ERR(dchild);
645                 CERROR("child lookup error %d\n", rc);
646                 GOTO(cleanup, rc);
647         }
648
649         cleanup_phase = 2; /* child dentry */
650
651         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE, dir->i_sb);
652
653         if (type == S_IFREG || type == S_IFDIR) {
654                 rc = mds_try_to_split_dir(obd, dparent, &mea, 0, parent_mode);
655                 CDEBUG(D_OTHER, "%s: splitted %lu/%u - %d/%d\n",
656                        obd->obd_name, dparent->d_inode->i_ino,
657                        dparent->d_inode->i_generation, rc, parent_mode);
658                 if (rc > 0) {
659                         /* dir got splitted */
660                         GOTO(cleanup, rc = -ERESTART);
661                 } else if (rc < 0) {
662                         /* error happened during spitting. */
663                         GOTO(cleanup, rc);
664                 }
665         }
666
667         if (dir->i_mode & S_ISGID) {
668                 if (S_ISDIR(rec->ur_mode))
669                         rec->ur_mode |= S_ISGID;
670         }
671
672         dchild->d_fsdata = (void *)&dp;
673         dp.p_inum = (unsigned long)rec->ur_fid2->id;
674         dp.p_ptr = req;
675
676         switch (type) {
677         case S_IFREG:{
678                 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE, NULL);
679                 if (IS_ERR(handle))
680                         GOTO(cleanup, rc = PTR_ERR(handle));
681                 rc = ll_vfs_create(dir, dchild, rec->ur_mode, NULL);
682                 EXIT;
683                 break;
684         }
685         case S_IFDIR:{
686                 int nstripes = 0;
687                 int i;
688                 
689                 /* as Peter asked, mkdir() should distribute new directories
690                  * over the whole cluster in order to distribute namespace
691                  * processing load. first, we calculate which MDS to use to put
692                  * new directory's inode in. */
693                 i = mds_choose_mdsnum(obd, rec->ur_name, rec->ur_namelen - 1, 
694                                       rec->ur_flags);
695                 if (i == mds->mds_num) {
696                         /* inode will be created locally */
697                         handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
698                         if (IS_ERR(handle))
699                                 GOTO(cleanup, rc = PTR_ERR(handle));
700
701                         rc = vfs_mkdir(dir, dchild, rec->ur_mode);
702
703                         if (rec->ur_eadata)
704                                 nstripes = *(u16 *)rec->ur_eadata;
705
706                         if (rc == 0 && nstripes) {
707                                 /* we pass LCK_EX to split routine to
708                                  * signalthat we have exclusive access
709                                  * to the directory. simple because
710                                  * nobody knows it already exists -bzzz */
711                                 rc = mds_try_to_split_dir(obd, dchild,
712                                                           NULL, nstripes,
713                                                           LCK_EX);
714                                 if (rc > 0) {
715                                         /* dir got splitted */
716                                         rc = 0;
717                                 } else if (rc < 0) {
718                                         /* an error occured during
719                                          * splitting. */
720                                         GOTO(cleanup, rc);
721                                 }
722                         }
723                 } else if (!DENTRY_VALID(dchild)) {
724                         /* inode will be created on another MDS */
725                         struct obdo *oa = NULL;
726                         struct mds_body *body;
727                         
728                         /* first, create that inode */
729                         oa = obdo_alloc();
730                         LASSERT(oa != NULL);
731                         oa->o_mds = i;
732                         oa->o_easize = 0;
733                         if (rec->ur_eadata) {
734                                 /* user asks for creating splitted dir */
735                                 oa->o_easize = *((u16 *) rec->ur_eadata);
736                         }
737
738                         obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
739                                         OBD_MD_FLMTIME | OBD_MD_FLCTIME |
740                                         OBD_MD_FLUID | OBD_MD_FLGID);
741                         oa->o_mode = dir->i_mode;
742                         CDEBUG(D_OTHER, "%s: create dir on MDS %u\n",
743                                         obd->obd_name, i);
744                         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
745                                 CDEBUG(D_HA, "%s: replay dir creation %*s -> %u/%u\n",
746                                        obd->obd_name, rec->ur_namelen - 1,
747                                        rec->ur_name, (unsigned) rec->ur_fid2->id,
748                                        (unsigned) rec->ur_fid2->generation);
749                                 oa->o_id = rec->ur_fid2->id;
750                                 oa->o_generation = rec->ur_fid2->generation;
751                                 oa->o_flags |= OBD_FL_RECREATE_OBJS;
752                         }
753
754                         rc = obd_create(mds->mds_lmv_exp, oa, NULL, NULL);
755                         if (rc) {
756                                 CERROR("can't create remote inode: %d\n", rc);
757                                 DEBUG_REQ(D_ERROR, req, "parent "LPU64"/%u name %s mode %o",
758                                           rec->ur_fid1->id, rec->ur_fid1->generation,
759                                           rec->ur_name, rec->ur_mode);
760                                 obdo_free(oa);
761                                 GOTO(cleanup, rc);
762                         }
763                         
764                         /* now, add new dir entry for it */
765                         handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
766                         if (IS_ERR(handle)) {
767                                 obdo_free(oa);
768                                 GOTO(cleanup, rc = PTR_ERR(handle));
769                         }
770                         rc = fsfilt_add_dir_entry(obd, dparent, rec->ur_name,
771                                                   rec->ur_namelen - 1,
772                                                   oa->o_id, oa->o_generation,
773                                                   i);
774                         LASSERT(rc == 0);
775
776                         /* fill reply */
777                         body = lustre_msg_buf(req->rq_repmsg, 0,
778                                               sizeof (*body));
779                         body->valid |= OBD_MD_FLID | OBD_MD_MDS;
780                         body->fid1.id = oa->o_id;
781                         body->fid1.mds = i;
782                         body->fid1.generation = oa->o_generation;
783                         obdo_free(oa);
784                 } else {
785                         /* requested name exists in the directory */
786                         rc = -EEXIST;
787                 }
788                 EXIT;
789                 break;
790         }
791         case S_IFLNK:{
792                 handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK, NULL);
793                 if (IS_ERR(handle))
794                         GOTO(cleanup, rc = PTR_ERR(handle));
795                 if (rec->ur_tgt == NULL)        /* no target supplied */
796                         rc = -EINVAL;           /* -EPROTO? */
797                 else
798                         rc = ll_vfs_symlink(dir, dchild, rec->ur_tgt, S_IALLUGO);
799                 EXIT;
800                 break;
801         }
802         case S_IFCHR:
803         case S_IFBLK:
804         case S_IFIFO:
805         case S_IFSOCK:{
806                 int rdev = rec->ur_rdev;
807                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD, NULL);
808                 if (IS_ERR(handle))
809                         GOTO(cleanup, (handle = NULL, rc = PTR_ERR(handle)));
810                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
811                 EXIT;
812                 break;
813         }
814         default:
815                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
816                 dchild->d_fsdata = NULL;
817                 GOTO(cleanup, rc = -EINVAL);
818         }
819
820         /* In case we stored the desired inum in here, we want to clean up. */
821         if (dchild->d_fsdata == (void *)(unsigned long)rec->ur_fid2->id)
822                 dchild->d_fsdata = NULL;
823
824         if (rc) {
825                 CDEBUG(D_INODE, "error during create: %d\n", rc);
826                 GOTO(cleanup, rc);
827         } else if (dchild->d_inode) {
828                 struct iattr iattr;
829                 struct inode *inode = dchild->d_inode;
830                 struct mds_body *body;
831
832                 created = 1;
833                 LTIME_S(iattr.ia_atime) = rec->ur_time;
834                 LTIME_S(iattr.ia_ctime) = rec->ur_time;
835                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
836                 iattr.ia_uid = rec->_ur_fsuid;
837                 if (dir->i_mode & S_ISGID)
838                         iattr.ia_gid = dir->i_gid;
839                 else
840                         iattr.ia_gid = rec->_ur_fsgid;
841                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
842                         ATTR_MTIME | ATTR_CTIME;
843
844                 if (rec->ur_fid2->id) {
845                         LASSERT(rec->ur_fid2->id == inode->i_ino);
846                         inode->i_generation = rec->ur_fid2->generation;
847                         /* Dirtied and committed by the upcoming setattr. */
848                         CDEBUG(D_INODE, "recreated ino %lu with gen %u\n",
849                                inode->i_ino, inode->i_generation);
850                 } else {
851 #if 0
852                         struct lustre_handle child_ino_lockh;
853 #endif
854
855                         CDEBUG(D_INODE, "created ino %lu with gen %x\n",
856                                inode->i_ino, inode->i_generation);
857
858 #if 0
859                         /* The inode we were allocated may have just been freed
860                          * by an unlink operation.  We take this lock to
861                          * synchronize against the matching reply-ack-lock taken
862                          * in unlink, to avoid replay problems if this reply
863                          * makes it out to the client but the unlink's does not.
864                          * See bug 2029 for more detail.*/
865                         rc = mds_lock_new_child(obd, inode, &child_ino_lockh);
866                         if (rc != ELDLM_OK) {
867                                 CERROR("error locking for unlink/create sync: "
868                                        "%d\n", rc);
869                         } else {
870                                 ldlm_lock_decref(&child_ino_lockh, LCK_EX);
871                         }
872 #endif
873                 }
874
875                 rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
876                 if (rc)
877                         CERROR("error on child setattr: rc = %d\n", rc);
878
879                 iattr.ia_valid = ATTR_MTIME | ATTR_CTIME;
880                 rc = fsfilt_setattr(obd, dparent, handle, &iattr, 0);
881                 if (rc)
882                         CERROR("error on parent setattr: rc = %d\n", rc);
883                 else
884                         MDS_UPDATE_COUNTER(mds, MDS_CREATE_COUNT);
885
886                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
887                 mds_pack_inode2fid(obd, &body->fid1, inode);
888                 mds_pack_inode2body(obd, body, inode);
889         }
890         EXIT;
891
892 cleanup:
893         err = mds_finish_transno(mds, dir, handle, req, rc, 0);
894
895         if (rc && created) {
896                 /* Destroy the file we just created. This should not need extra
897                  * journal credits, as we have already modified all of the
898                  * blocks needed in order to create the file in the first
899                  * place. */
900                 switch (type) {
901                 case S_IFDIR:
902                         err = vfs_rmdir(dir, dchild);
903                         if (err)
904                                 CERROR("rmdir in error path: %d\n", err);
905                         break;
906                 default:
907                         err = vfs_unlink(dir, dchild);
908                         if (err)
909                                 CERROR("unlink in error path: %d\n", err);
910                         break;
911                 }
912         } else {
913                 rc = err;
914         }
915         switch (cleanup_phase) {
916         case 2: /* child dentry */
917                 l_dput(dchild);
918         case 1: /* locked parent dentry */
919 #ifdef S_PDIROPS
920                 if (lockh[1].cookie != 0)
921                         ldlm_lock_decref(lockh + 1, parent_mode);
922 #endif
923                 if (rc) {
924                         ldlm_lock_decref(lockh, LCK_PW);
925                 } else {
926                         ptlrpc_save_lock(req, lockh, LCK_PW);
927                 }
928                 l_dput(dparent);
929         case 0:
930                 break;
931         default:
932                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
933                 LBUG();
934         }
935         if (mea)
936                 OBD_FREE(mea, mea_size);
937         req->rq_status = rc;
938         return 0;
939 }
940
941 static int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2,
942            ldlm_policy_data_t *p1, ldlm_policy_data_t *p2)
943 {
944         int i;
945
946         for (i = 0; i < RES_NAME_SIZE; i++) {
947                 /* return 1 here, because enqueue_ordered will skip resources
948                  * of all zeroes if they're sorted to the end of the list. */
949                 if (res1->name[i] == 0 && res2->name[i] != 0)
950                         return 1;
951                 if (res2->name[i] == 0 && res1->name[i] != 0)
952                         return 0;
953
954                 if (res1->name[i] > res2->name[i])
955                         return 1;
956                 if (res1->name[i] < res2->name[i])
957                         return 0;
958         }
959
960         if (!p1 || !p2)
961                 return 0;
962
963         if (memcmp(p1, p2, sizeof(*p1)) < 0)
964                 return 1;
965
966         return 0;
967 }
968
969 /* This function doesn't use ldlm_match_or_enqueue because we're always called
970  * with EX or PW locks, and the MDS is no longer allowed to match write locks,
971  * because they take the place of local semaphores.
972  *
973  * One or two locks are taken in numerical order.  A res_id->name[0] of 0 means
974  * no lock is taken for that res_id.  Must be at least one non-zero res_id. */
975 int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
976                           struct lustre_handle *p1_lockh, int p1_lock_mode,
977                           ldlm_policy_data_t *p1_policy,
978                           struct ldlm_res_id *p2_res_id,
979                           struct lustre_handle *p2_lockh, int p2_lock_mode,
980                           ldlm_policy_data_t *p2_policy)
981 {
982         struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id };
983         struct lustre_handle *handles[2] = { p1_lockh, p2_lockh };
984         int lock_modes[2] = { p1_lock_mode, p2_lock_mode };
985         ldlm_policy_data_t *policies[2] = { p1_policy, p2_policy };
986         int rc, flags;
987         ENTRY;
988
989         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
990
991         CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n", res_id[0]->name[0],
992                res_id[1]->name[0]);
993
994         if (res_gt(p1_res_id, p2_res_id, p1_policy, p2_policy)) {
995                 handles[1] = p1_lockh;
996                 handles[0] = p2_lockh;
997                 res_id[1] = p1_res_id;
998                 res_id[0] = p2_res_id;
999                 lock_modes[1] = p1_lock_mode;
1000                 lock_modes[0] = p2_lock_mode;
1001                 policies[1] = p1_policy;
1002                 policies[0] = p2_policy;
1003         }
1004
1005         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n",
1006                res_id[0]->name[0], res_id[1]->name[0]);
1007
1008         flags = LDLM_FL_LOCAL_ONLY;
1009         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0],
1010                               LDLM_IBITS, policies[0], lock_modes[0], &flags,
1011                               mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
1012                               NULL, 0, NULL, handles[0]);
1013         if (rc != ELDLM_OK)
1014                 RETURN(-EIO);
1015         ldlm_lock_dump_handle(D_OTHER, handles[0]);
1016
1017         if (!memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) &&
1018             (policies[0]->l_inodebits.bits & policies[1]->l_inodebits.bits)) {
1019                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
1020                 ldlm_lock_addref(handles[1], lock_modes[1]);
1021         } else if (res_id[1]->name[0] != 0) {
1022                 flags = LDLM_FL_LOCAL_ONLY;
1023                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
1024                                       *res_id[1], LDLM_IBITS, policies[1],
1025                                       lock_modes[1], &flags, mds_blocking_ast,
1026                                       ldlm_completion_ast, NULL, NULL, NULL, 0,
1027                                       NULL, handles[1]);
1028                 if (rc != ELDLM_OK) {
1029                         ldlm_lock_decref(handles[0], lock_modes[0]);
1030                         RETURN(-EIO);
1031                 }
1032                 ldlm_lock_dump_handle(D_OTHER, handles[1]);
1033         }
1034
1035         RETURN(0);
1036 }
1037
1038 int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
1039                            struct lustre_handle *p1_lockh, int p1_lock_mode,
1040                            ldlm_policy_data_t *p1_policy,
1041                            struct ldlm_res_id *p2_res_id,
1042                            struct lustre_handle *p2_lockh, int p2_lock_mode,
1043                            ldlm_policy_data_t *p2_policy,
1044                            struct ldlm_res_id *c1_res_id,
1045                            struct lustre_handle *c1_lockh, int c1_lock_mode,
1046                            ldlm_policy_data_t *c1_policy,
1047                            struct ldlm_res_id *c2_res_id,
1048                            struct lustre_handle *c2_lockh, int c2_lock_mode,
1049                            ldlm_policy_data_t *c2_policy)
1050 {
1051         struct ldlm_res_id *res_id[5] = { p1_res_id, p2_res_id,
1052                                           c1_res_id, c2_res_id };
1053         struct lustre_handle *dlm_handles[5] = { p1_lockh, p2_lockh,
1054                                                  c1_lockh, c2_lockh };
1055         int lock_modes[5] = { p1_lock_mode, p2_lock_mode,
1056                               c1_lock_mode, c2_lock_mode };
1057         ldlm_policy_data_t *policies[5] = { p1_policy, p2_policy,
1058                                             c1_policy, c2_policy};
1059         int rc, i, j, sorted, flags;
1060         ENTRY;
1061
1062         CDEBUG(D_DLMTRACE, "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
1063                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
1064                res_id[3]->name[0]);
1065
1066         /* simple insertion sort - we have at most 4 elements */
1067         for (i = 1; i < 4; i++) {
1068                 j = i - 1;
1069                 dlm_handles[4] = dlm_handles[i];
1070                 res_id[4] = res_id[i];
1071                 lock_modes[4] = lock_modes[i];
1072                 policies[4] = policies[i];
1073
1074                 sorted = 0;
1075                 do {
1076                         if (res_gt(res_id[j], res_id[4], policies[j],
1077                                    policies[4])) {
1078                                 dlm_handles[j + 1] = dlm_handles[j];
1079                                 res_id[j + 1] = res_id[j];
1080                                 lock_modes[j + 1] = lock_modes[j];
1081                                 policies[j + 1] = policies[j];
1082                                 j--;
1083                         } else {
1084                                 sorted = 1;
1085                         }
1086                 } while (j >= 0 && !sorted);
1087
1088                 dlm_handles[j + 1] = dlm_handles[4];
1089                 res_id[j + 1] = res_id[4];
1090                 lock_modes[j + 1] = lock_modes[4];
1091                 policies[j + 1] = policies[4];
1092         }
1093
1094         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
1095                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
1096                res_id[3]->name[0]);
1097
1098         /* XXX we could send ASTs on all these locks first before blocking? */
1099         for (i = 0; i < 4; i++) {
1100                 flags = 0;
1101                 if (res_id[i]->name[0] == 0)
1102                         break;
1103                 if (i != 0 &&
1104                     !memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) &&
1105                     (policies[i]->l_inodebits.bits &
1106                      policies[i-1]->l_inodebits.bits) ) {
1107                         memcpy(dlm_handles[i], dlm_handles[i-1],
1108                                sizeof(*(dlm_handles[i])));
1109                         ldlm_lock_addref(dlm_handles[i], lock_modes[i]);
1110                 } else {
1111                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
1112                                               *res_id[i], LDLM_IBITS,
1113                                               policies[i],
1114                                               lock_modes[i], &flags,
1115                                               mds_blocking_ast,
1116                                               ldlm_completion_ast, NULL, NULL,
1117                                               NULL, 0, NULL, dlm_handles[i]);
1118                         if (rc != ELDLM_OK)
1119                                 GOTO(out_err, rc = -EIO);
1120                         ldlm_lock_dump_handle(D_OTHER, dlm_handles[i]);
1121                 }
1122         }
1123
1124         RETURN(0);
1125 out_err:
1126         while (i-- > 0)
1127                 ldlm_lock_decref(dlm_handles[i], lock_modes[i]);
1128
1129         return rc;
1130 }
1131
1132 /* In the unlikely case that the child changed while we were waiting
1133  * on the lock, we need to drop the lock on the old child and either:
1134  * - if the child has a lower resource name, then we have to also
1135  *   drop the parent lock and regain the locks in the right order
1136  * - in the rename case, if the child has a lower resource name than one of
1137  *   the other parent/child resources (maxres) we also need to reget the locks
1138  * - if the child has a higher resource name (this is the common case)
1139  *   we can just get the lock on the new child (still in lock order)
1140  *
1141  * Returns 0 if the child did not change or if it changed but could be locked.
1142  * Returns 1 if the child changed and we need to re-lock (no locks held).
1143  * Returns -ve error with a valid dchild (no locks held). */
1144 static int mds_verify_child(struct obd_device *obd,
1145                             struct ldlm_res_id *parent_res_id,
1146                             struct lustre_handle *parent_lockh,
1147                             struct dentry *dparent, int parent_mode,
1148                             struct ldlm_res_id *child_res_id,
1149                             struct lustre_handle *child_lockh,
1150                             struct dentry **dchildp, int child_mode,
1151                             ldlm_policy_data_t *child_policy,
1152                             const char *name, int namelen,
1153                             struct ldlm_res_id *maxres)
1154 {
1155         struct dentry *vchild, *dchild = *dchildp;
1156         int rc = 0, cleanup_phase = 2; /* parent, child locks */
1157         ENTRY;
1158
1159         vchild = ll_lookup_one_len(name, dparent, namelen - 1);
1160         if (IS_ERR(vchild))
1161                 GOTO(cleanup, rc = PTR_ERR(vchild));
1162
1163         if ((vchild->d_flags & DCACHE_CROSS_REF)) {
1164                 if  (child_res_id->name[0] == vchild->d_inum &&
1165                                 child_res_id->name[1] == vchild->d_generation) {
1166                         if (dchild != NULL)
1167                                 l_dput(dchild);
1168                         *dchildp = vchild;
1169                         RETURN(0);
1170                 }
1171                 goto changed;
1172         }
1173
1174         if (likely((vchild->d_inode == NULL && child_res_id->name[0] == 0) ||
1175                    (vchild->d_inode != NULL &&
1176                     child_res_id->name[0] == vchild->d_inode->i_ino &&
1177                     child_res_id->name[1] == vchild->d_inode->i_generation))) {
1178                 if (dchild != NULL)
1179                         l_dput(dchild);
1180                 *dchildp = vchild;
1181
1182                 RETURN(0);
1183         }
1184
1185 changed:
1186         CDEBUG(D_DLMTRACE, "child inode changed: %p != %p (%lu != "LPU64")\n",
1187                vchild->d_inode, dchild ? dchild->d_inode : 0,
1188                vchild->d_inode ? vchild->d_inode->i_ino : 0,
1189                child_res_id->name[0]);
1190         if (child_res_id->name[0] != 0)
1191                 ldlm_lock_decref(child_lockh, child_mode);
1192         if (dchild)
1193                 l_dput(dchild);
1194
1195         cleanup_phase = 1; /* parent lock only */
1196         *dchildp = dchild = vchild;
1197
1198         if (dchild->d_inode || (dchild->d_flags & DCACHE_CROSS_REF)) {
1199                 int flags = 0;
1200                 if (dchild->d_inode) {
1201                         child_res_id->name[0] = dchild->d_inode->i_ino;
1202                         child_res_id->name[1] = dchild->d_inode->i_generation;
1203                 } else {
1204                         child_res_id->name[0] = dchild->d_inum;
1205                         child_res_id->name[1] = dchild->d_generation;
1206                 }
1207
1208                 if (res_gt(parent_res_id, child_res_id, NULL, NULL) ||
1209                     res_gt(maxres, child_res_id, NULL, NULL)) {
1210                         CDEBUG(D_DLMTRACE, "relock "LPU64"<("LPU64"|"LPU64")\n",
1211                                child_res_id->name[0], parent_res_id->name[0],
1212                                maxres->name[0]);
1213                         GOTO(cleanup, rc = 1);
1214                 }
1215
1216                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
1217                                       *child_res_id, LDLM_IBITS, child_policy,
1218                                       child_mode, &flags, mds_blocking_ast,
1219                                       ldlm_completion_ast, NULL, NULL, NULL, 0,
1220                                       NULL, child_lockh);
1221                 if (rc != ELDLM_OK)
1222                         GOTO(cleanup, rc = -EIO);
1223
1224         } else {
1225                 memset(child_res_id, 0, sizeof(*child_res_id));
1226         }
1227
1228         EXIT;
1229 cleanup:
1230         if (rc) {
1231                 switch(cleanup_phase) {
1232                 case 2:
1233                         if (child_res_id->name[0] != 0)
1234                                 ldlm_lock_decref(child_lockh, child_mode);
1235                 case 1:
1236                         ldlm_lock_decref(parent_lockh, parent_mode);
1237                 }
1238         }
1239         return rc;
1240 }
1241
1242 int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
1243                                 struct ll_fid *fid,
1244                                 struct lustre_handle *parent_lockh,
1245                                 struct dentry **dparentp, int parent_mode,
1246                                 __u64 parent_lockpart, int *update_mode,
1247                                 char *name, int namelen,
1248                                 struct lustre_handle *child_lockh,
1249                                 struct dentry **dchildp, int child_mode,
1250                                 __u64 child_lockpart)
1251 {
1252         struct ldlm_res_id child_res_id = { .name = {0} };
1253         struct ldlm_res_id parent_res_id = { .name = {0} };
1254         ldlm_policy_data_t parent_policy = {.l_inodebits = { parent_lockpart }};
1255         ldlm_policy_data_t child_policy = {.l_inodebits = { child_lockpart }};
1256         struct inode *inode;
1257         int rc = 0, cleanup_phase = 0;
1258         ENTRY;
1259
1260         /* Step 1: Lookup parent */
1261         *dparentp = mds_fid2dentry(mds, fid, NULL);
1262         if (IS_ERR(*dparentp)) {
1263                 rc = PTR_ERR(*dparentp);
1264                 *dparentp = NULL;
1265                 RETURN(rc);
1266         }
1267
1268         CDEBUG(D_INODE, "parent ino %lu, name %s\n",
1269                (*dparentp)->d_inode->i_ino, name);
1270
1271         parent_res_id.name[0] = (*dparentp)->d_inode->i_ino;
1272         parent_res_id.name[1] = (*dparentp)->d_inode->i_generation;
1273 #ifdef S_PDIROPS
1274         parent_lockh[1].cookie = 0;
1275         if (name && IS_PDIROPS((*dparentp)->d_inode)) {
1276                 struct ldlm_res_id res_id = { .name = {0} };
1277                 ldlm_policy_data_t policy;
1278                 int flags = 0;
1279                 *update_mode = mds_lock_mode_for_dir(obd, *dparentp, parent_mode);
1280                 if (*update_mode) {
1281                         res_id.name[0] = (*dparentp)->d_inode->i_ino;
1282                         res_id.name[1] = (*dparentp)->d_inode->i_generation;
1283                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1284                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
1285                                               res_id, LDLM_IBITS, &policy,
1286                                               *update_mode, &flags,
1287                                               mds_blocking_ast,
1288                                               ldlm_completion_ast,
1289                                               NULL, NULL, NULL, 0, NULL,
1290                                               parent_lockh + 1);
1291                         if (rc != ELDLM_OK)
1292                                 RETURN(-ENOLCK);
1293                 }
1294
1295                 parent_res_id.name[2] = full_name_hash(name, namelen - 1);
1296                 CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
1297                        (*dparentp)->d_inode->i_ino, 
1298                        (*dparentp)->d_inode->i_generation,
1299                        parent_res_id.name[2]);
1300         }
1301 #endif
1302
1303         cleanup_phase = 1; /* parent dentry */
1304
1305         /* Step 2: Lookup child (without DLM lock, to get resource name) */
1306         *dchildp = ll_lookup_one_len(name, *dparentp, namelen - 1);
1307         if (IS_ERR(*dchildp)) {
1308                 rc = PTR_ERR(*dchildp);
1309                 CDEBUG(D_INODE, "child lookup error %d\n", rc);
1310                 GOTO(cleanup, rc);
1311         }
1312
1313         if ((*dchildp)->d_flags & DCACHE_CROSS_REF) {
1314                 /* inode lives on another MDS: return * mds/ino/gen
1315                  * and LOOKUP lock. drop possible UPDATE lock! */
1316                 child_policy.l_inodebits.bits &= ~MDS_INODELOCK_UPDATE;
1317                 child_policy.l_inodebits.bits |= MDS_INODELOCK_LOOKUP;
1318                 child_res_id.name[0] = (*dchildp)->d_inum;
1319                 child_res_id.name[1] = (*dchildp)->d_generation;
1320                 goto retry_locks;
1321         }
1322
1323         inode = (*dchildp)->d_inode;
1324         if (inode != NULL)
1325                 inode = igrab(inode);
1326         if (inode == NULL)
1327                 goto retry_locks;
1328
1329         child_res_id.name[0] = inode->i_ino;
1330         child_res_id.name[1] = inode->i_generation;
1331
1332         iput(inode);
1333
1334 retry_locks:
1335         cleanup_phase = 2; /* child dentry */
1336
1337         /* Step 3: Lock parent and child in resource order.  If child doesn't
1338          *         exist, we still have to lock the parent and re-lookup. */
1339         rc = enqueue_ordered_locks(obd,&parent_res_id,parent_lockh,parent_mode,
1340                                    &parent_policy, &child_res_id, child_lockh,
1341                                    child_mode, &child_policy);
1342         if (rc)
1343                 GOTO(cleanup, rc);
1344
1345         if ((*dchildp)->d_inode || ((*dchildp)->d_flags & DCACHE_CROSS_REF))
1346                 cleanup_phase = 4; /* child lock */
1347         else
1348                 cleanup_phase = 3; /* parent lock */
1349
1350         /* Step 4: Re-lookup child to verify it hasn't changed since locking */
1351         rc = mds_verify_child(obd, &parent_res_id, parent_lockh, *dparentp,
1352                               parent_mode, &child_res_id, child_lockh, 
1353                               dchildp, child_mode, &child_policy,
1354                               name, namelen, &parent_res_id);
1355         if (rc > 0)
1356                 goto retry_locks;
1357         if (rc < 0) {
1358                 cleanup_phase = 3;
1359                 GOTO(cleanup, rc);
1360         }
1361
1362 cleanup:
1363         if (rc) {
1364                 switch (cleanup_phase) {
1365                 case 4:
1366                         ldlm_lock_decref(child_lockh, child_mode);
1367                 case 3:
1368                         ldlm_lock_decref(parent_lockh, parent_mode);
1369                 case 2:
1370                         l_dput(*dchildp);
1371                 case 1:
1372 #ifdef S_PDIROPS
1373                         if (parent_lockh[1].cookie)
1374                                 ldlm_lock_decref(parent_lockh + 1, *update_mode);
1375 #endif
1376                         l_dput(*dparentp);
1377                 default: ;
1378                 }
1379         }
1380         return rc;
1381 }
1382
1383 void mds_reconstruct_generic(struct ptlrpc_request *req)
1384 {
1385         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1386
1387         mds_req_from_mcd(req, med->med_mcd);
1388 }
1389 /* If we are unlinking an open file/dir (i.e. creating an orphan) then
1390  * we instead link the inode into the PENDING directory until it is
1391  * finally released.  We can't simply call mds_reint_rename() or some
1392  * part thereof, because we don't have the inode to check for link
1393  * count/open status until after it is locked.
1394  *
1395  * For lock ordering, caller must get child->i_sem first, then pending->i_sem
1396  * before starting journal transaction.
1397  *
1398  * returns 1 on success
1399  * returns 0 if we lost a race and didn't make a new link
1400  * returns negative on error
1401  */
1402 static int mds_orphan_add_link(struct mds_update_record *rec,
1403                                struct obd_device *obd, struct dentry *dentry)
1404 {
1405         struct mds_obd *mds = &obd->u.mds;
1406         struct inode *pending_dir = mds->mds_pending_dir->d_inode;
1407         struct inode *inode = dentry->d_inode;
1408         struct dentry *pending_child;
1409         char fidname[LL_FID_NAMELEN];
1410         int fidlen = 0, rc, mode;
1411         ENTRY;
1412
1413         LASSERT(inode != NULL);
1414         LASSERT(!mds_inode_is_orphan(inode));
1415 #ifndef HAVE_I_ALLOC_SEM
1416         LASSERT(down_trylock(&inode->i_sem) != 0);
1417 #endif
1418         LASSERT(down_trylock(&pending_dir->i_sem) != 0);
1419
1420         fidlen = ll_fid2str(fidname, inode->i_ino, inode->i_generation);
1421
1422         CDEBUG(D_INODE, "pending destroy of %dx open %d linked %s %s = %s\n",
1423                mds_orphan_open_count(inode), inode->i_nlink,
1424                S_ISDIR(inode->i_mode) ? "dir" :
1425                 S_ISREG(inode->i_mode) ? "file" : "other",rec->ur_name,fidname);
1426
1427         if (mds_orphan_open_count(inode) == 0 || inode->i_nlink != 0)
1428                 RETURN(0);
1429
1430         pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
1431         if (IS_ERR(pending_child))
1432                 RETURN(PTR_ERR(pending_child));
1433
1434         if (pending_child->d_inode != NULL) {
1435                 CERROR("re-destroying orphan file %s?\n", rec->ur_name);
1436                 LASSERT(pending_child->d_inode == inode);
1437                 GOTO(out_dput, rc = 0);
1438         }
1439
1440         /* link() is semanticaly-wrong for S_IFDIR, so we set S_IFREG
1441          * for linking and return real mode back then -bzzz */
1442         mode = inode->i_mode;
1443         inode->i_mode = S_IFREG;
1444         rc = vfs_link(dentry, pending_dir, pending_child);
1445         if (rc)
1446                 CERROR("error linking orphan %s to PENDING: rc = %d\n",
1447                        rec->ur_name, rc);
1448         else
1449                 mds_inode_set_orphan(inode);
1450
1451         /* return mode and correct i_nlink if inode is directory */
1452         inode->i_mode = mode;
1453         LASSERTF(inode->i_nlink == 1, "%s nlink == %d\n",
1454                  S_ISDIR(mode) ? "dir" : S_ISREG(mode) ? "file" : "other",
1455                  inode->i_nlink);
1456         if (S_ISDIR(mode)) {
1457                 inode->i_nlink++;
1458                 pending_dir->i_nlink++;
1459                 mark_inode_dirty(inode);
1460                 mark_inode_dirty(pending_dir);
1461         }
1462
1463         GOTO(out_dput, rc = 1);
1464 out_dput:
1465         l_dput(pending_child);
1466         RETURN(rc);
1467 }
1468
1469 int mds_create_local_dentry(struct mds_update_record *rec,
1470                            struct obd_device *obd)
1471 {
1472         struct mds_obd *mds = &obd->u.mds;
1473         struct inode *fids_dir = mds->mds_fids_dir->d_inode;
1474         int fidlen = 0, rc, cleanup_phase = 0;
1475         struct dentry *new_child = NULL;
1476         char *fidname = rec->ur_name;
1477         struct dentry *child = NULL;
1478         struct lustre_handle lockh[2] = {{0}, {0}};
1479         void *handle;
1480         ENTRY;
1481
1482         down(&fids_dir->i_sem);
1483         fidlen = ll_fid2str(fidname, rec->ur_fid1->id, rec->ur_fid1->generation);
1484         CDEBUG(D_OTHER, "look for local dentry '%s' for %u/%u\n",
1485                         fidname, (unsigned) rec->ur_fid1->id,
1486                         (unsigned) rec->ur_fid1->generation);
1487
1488         new_child = lookup_one_len(fidname, mds->mds_fids_dir, fidlen);
1489         up(&fids_dir->i_sem);
1490         if (IS_ERR(new_child)) {
1491                 CERROR("can't lookup %s: %d\n", fidname,
1492                                 (int) PTR_ERR(new_child));
1493                 GOTO(cleanup, rc = PTR_ERR(new_child));
1494         }
1495         cleanup_phase = 1;
1496
1497         if (new_child->d_inode != NULL) {
1498                 /* nice. we've already have local dentry! */
1499                 CDEBUG(D_OTHER, "found dentry in FIDS/: %u/%u\n", 
1500                        (unsigned) new_child->d_inode->i_ino,
1501                        (unsigned) new_child->d_inode->i_generation);
1502                 rec->ur_fid1->id = fids_dir->i_ino;
1503                 rec->ur_fid1->generation = fids_dir->i_generation;
1504                 rec->ur_namelen = fidlen + 1;
1505                 GOTO(cleanup, rc = 0);
1506         }
1507
1508         /* new, local dentry will be added soon. we need no aliases here */
1509         d_drop(new_child);
1510
1511         if (rec->ur_mode & MDS_MODE_DONT_LOCK) {
1512                 child = mds_fid2dentry(mds, rec->ur_fid1, NULL);
1513         } else {
1514                 child = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL,
1515                                               LCK_EX, lockh, NULL, NULL, 0,
1516                                               MDS_INODELOCK_UPDATE);
1517         }
1518
1519         if (IS_ERR(child)) {
1520                 rc = PTR_ERR(child);
1521                 if (rc != -ENOENT || !(rec->ur_mode & MDS_MODE_REPLAY))
1522                         CERROR("can't get victim: %d\n", rc);
1523                 GOTO(cleanup, rc);
1524         }
1525         cleanup_phase = 2;
1526
1527         handle = fsfilt_start(obd, fids_dir, FSFILT_OP_LINK, NULL);
1528         if (IS_ERR(handle))
1529                 GOTO(cleanup, rc = PTR_ERR(handle));
1530
1531         rc = fsfilt_add_dir_entry(obd, mds->mds_fids_dir, fidname, fidlen,
1532                                   rec->ur_fid1->id, rec->ur_fid1->generation,
1533                                   mds->mds_num);
1534         if (rc)
1535                 CERROR("error linking orphan %lu/%lu to FIDS: rc = %d\n",
1536                        (unsigned long) child->d_inode->i_ino,
1537                        (unsigned long) child->d_inode->i_generation, rc);
1538         else {
1539                 if (S_ISDIR(child->d_inode->i_mode)) {
1540                         fids_dir->i_nlink++;
1541                         mark_inode_dirty(fids_dir);
1542                 }
1543                 mark_inode_dirty(child->d_inode);
1544         }
1545         fsfilt_commit(obd, mds->mds_sb, fids_dir, handle, 0);
1546
1547         rec->ur_fid1->id = fids_dir->i_ino;
1548         rec->ur_fid1->generation = fids_dir->i_generation;
1549         rec->ur_namelen = fidlen + 1;
1550
1551 cleanup:
1552         switch(cleanup_phase) {
1553                 case 2:
1554                         if (!(rec->ur_mode & MDS_MODE_DONT_LOCK))
1555                                 ldlm_lock_decref(lockh, LCK_EX);
1556                         dput(child);
1557                 case 1:
1558                         dput(new_child);
1559                 case 0:
1560                        break; 
1561         }
1562         RETURN(rc);
1563 }
1564
1565 static int mds_copy_unlink_reply(struct ptlrpc_request *master,
1566                                         struct ptlrpc_request *slave)
1567 {
1568         void *cookie, *cookie2;
1569         struct mds_body *body2;
1570         struct mds_body *body;
1571         void *ea, *ea2;
1572         ENTRY;
1573
1574         body = lustre_msg_buf(slave->rq_repmsg, 0, sizeof(*body));
1575         LASSERT(body != NULL);
1576
1577         body2 = lustre_msg_buf(master->rq_repmsg, 0, sizeof (*body));
1578         LASSERT(body2 != NULL);
1579
1580         if (!(body->valid & (OBD_MD_FLID | OBD_MD_FLGENER))) {
1581                 RETURN(0);
1582         }
1583
1584         memcpy(body2, body, sizeof(*body));
1585         body2->valid &= ~OBD_MD_FLCOOKIE;
1586
1587         if (!(body->valid & OBD_MD_FLEASIZE) &&
1588             !(body->valid & OBD_MD_FLDIREA))
1589                 RETURN(0);
1590
1591         if (body->eadatasize == 0) {
1592                 CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n");
1593                 RETURN(0);
1594         }
1595
1596         LASSERT(master->rq_repmsg->buflens[1] >= body->eadatasize);
1597         
1598         ea = lustre_msg_buf(slave->rq_repmsg, 1, body->eadatasize);
1599         LASSERT(ea != NULL);
1600         
1601         ea2 = lustre_msg_buf(master->rq_repmsg, 1, body->eadatasize);
1602         LASSERT(ea2 != NULL);
1603
1604         memcpy(ea2, ea, body->eadatasize);
1605
1606         if (body->valid & OBD_MD_FLCOOKIE) {
1607                 LASSERT(master->rq_repmsg->buflens[2] >=
1608                                 slave->rq_repmsg->buflens[2]);
1609                 cookie = lustre_msg_buf(slave->rq_repmsg, 2,
1610                                 slave->rq_repmsg->buflens[2]);
1611                 LASSERT(cookie != NULL);
1612
1613                 cookie2 = lustre_msg_buf(master->rq_repmsg, 2,
1614                                 master->rq_repmsg->buflens[2]);
1615                 LASSERT(cookie2 != NULL);
1616                 memcpy(cookie2, cookie, slave->rq_repmsg->buflens[2]);
1617                 body2->valid |= OBD_MD_FLCOOKIE;
1618         }
1619         RETURN(0);
1620 }
1621
1622 static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset,
1623                                    struct ptlrpc_request *req,
1624                                    struct lustre_handle *parent_lockh,
1625                                    int update_mode,
1626                                    struct dentry *dparent,
1627                                    struct lustre_handle *child_lockh,
1628                                    struct dentry *dchild)
1629 {
1630         struct obd_device *obd = req->rq_export->exp_obd;
1631         struct mds_obd *mds = mds_req2mds(req);
1632         struct mdc_op_data op_data;
1633         int rc = 0, cleanup_phase = 0;
1634         struct ptlrpc_request *request = NULL;
1635         void *handle;
1636         ENTRY;
1637
1638         LASSERT(offset == 1 || offset == 3);
1639
1640         DEBUG_REQ(D_INODE, req, "unlink %*s (remote inode %u/%u/%u)",
1641                   rec->ur_namelen - 1, rec->ur_name, (unsigned)dchild->d_mdsnum,
1642                   (unsigned) dchild->d_inum, (unsigned) dchild->d_generation);
1643         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1644                 DEBUG_REQ(D_HA, req, "unlink %*s (remote inode %u/%u/%u)",
1645                           rec->ur_namelen - 1, rec->ur_name,
1646                           (unsigned)dchild->d_mdsnum,
1647                           (unsigned) dchild->d_inum,
1648                           (unsigned) dchild->d_generation);
1649
1650         /* time to drop i_nlink on remote MDS */
1651         memset(&op_data, 0, sizeof(op_data));
1652         op_data.fid1.mds = dchild->d_mdsnum;
1653         op_data.fid1.id = dchild->d_inum;
1654         op_data.fid1.generation = dchild->d_generation;
1655         op_data.create_mode = rec->ur_mode;
1656         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1657                 op_data.create_mode |= MDS_MODE_REPLAY;
1658         rc = md_unlink(mds->mds_lmv_exp, &op_data, &request);
1659         cleanup_phase = 2;
1660         if (request) {
1661                 if (rc == 0)
1662                         mds_copy_unlink_reply(req, request);
1663                 ptlrpc_req_finished(request);
1664         }
1665         if (rc == 0) {
1666                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_RMDIR,
1667                                       NULL);
1668                 if (IS_ERR(handle))
1669                         GOTO(cleanup, rc = PTR_ERR(handle));
1670                 rc = fsfilt_del_dir_entry(req->rq_export->exp_obd, dchild);
1671                 rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
1672                                         rc, 0);
1673         }
1674 cleanup:
1675         req->rq_status = rc;
1676
1677 #ifdef S_PDIROPS
1678         if (parent_lockh[1].cookie != 0)
1679                 ldlm_lock_decref(parent_lockh + 1, update_mode);
1680 #endif
1681         ldlm_lock_decref(child_lockh, LCK_EX);
1682         if (rc)
1683                 ldlm_lock_decref(parent_lockh, LCK_PW);
1684         else
1685                 ptlrpc_save_lock(req, parent_lockh, LCK_PW);
1686         l_dput(dchild);
1687         l_dput(dparent);
1688
1689         return 0;
1690 }
1691
1692 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
1693                             struct ptlrpc_request *req,
1694                             struct lustre_handle *lh)
1695 {
1696         struct dentry *dparent = NULL, *dchild;
1697         struct mds_obd *mds = mds_req2mds(req);
1698         struct obd_device *obd = req->rq_export->exp_obd;
1699         struct mds_body *body = NULL;
1700         struct inode *child_inode = NULL;
1701         struct lustre_handle parent_lockh[2] = {{0}, {0}}; 
1702         struct lustre_handle child_lockh = {0};
1703 #if 0
1704         struct lustre_handle child_reuse_lockh = {0};
1705 #endif
1706         struct lustre_handle * slave_lockh = NULL;
1707         struct llog_create_locks *lcl = NULL;
1708         char fidname[LL_FID_NAMELEN];
1709         void *handle = NULL;
1710         int rc = 0, cleanup_phase = 0;
1711         int unlink_by_fid = 0;
1712         int update_mode;
1713         ENTRY;
1714
1715         LASSERT(offset == 1 || offset == 3);
1716
1717         DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s",
1718                   rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name);
1719
1720         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
1721
1722         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
1723                 DEBUG_REQ(D_HA, req, "unlink replay\n");
1724                 LASSERT(offset == 1); /* should not come from intent */
1725                 memcpy(lustre_msg_buf(req->rq_repmsg, 2, 0),
1726                        lustre_msg_buf(req->rq_reqmsg, offset + 2, 0),
1727                        req->rq_repmsg->buflens[2]);
1728         }
1729
1730         MDS_UPDATE_COUNTER(mds, MDS_UNLINK_COUNT);
1731
1732         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1733                 GOTO(cleanup, rc = -ENOENT);
1734
1735         if (rec->ur_namelen == 1) {
1736                 /* this is request to drop i_nlink on local inode */
1737                 unlink_by_fid = 1;
1738                 rec->ur_name = fidname;
1739                 rc = mds_create_local_dentry(rec, obd);
1740                 if (rc == -ENOENT || (rec->ur_mode & MDS_MODE_REPLAY)) {
1741                         DEBUG_REQ(D_HA, req,
1742                                   "drop nlink on inode %u/%u/%u (replay)",
1743                                   (unsigned) rec->ur_fid1->mds,
1744                                   (unsigned) rec->ur_fid1->id,
1745                                   (unsigned) rec->ur_fid1->generation);
1746                         req->rq_status = 0;
1747                         RETURN(0);
1748                 }
1749         }
1750
1751         if (rec->ur_mode & MDS_MODE_DONT_LOCK) {
1752                 /* master mds for directory asks slave removing
1753                  * inode is already locked */
1754                 dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL,
1755                                                LCK_PW, parent_lockh,
1756                                                &update_mode, rec->ur_name,
1757                                                rec->ur_namelen,
1758                                                MDS_INODELOCK_UPDATE);
1759                 if (IS_ERR(dparent))
1760                         GOTO(cleanup, rc = PTR_ERR(dparent));
1761                 dchild = ll_lookup_one_len(rec->ur_name, dparent,
1762                                            rec->ur_namelen - 1);
1763                 if (IS_ERR(dchild))
1764                         GOTO(cleanup, rc = PTR_ERR(dchild));
1765                 child_lockh.cookie = 0;
1766                 LASSERT(!(dchild->d_flags & DCACHE_CROSS_REF));
1767                 LASSERT(dchild->d_inode != NULL);
1768                 LASSERT(S_ISDIR(dchild->d_inode->i_mode));
1769         } else {
1770                 rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
1771                                                  parent_lockh, &dparent,
1772                                                  LCK_PW, MDS_INODELOCK_UPDATE,
1773                                                  &update_mode, rec->ur_name,
1774                                                  rec->ur_namelen, &child_lockh,
1775                                                  &dchild, LCK_EX,
1776                                                  MDS_INODELOCK_LOOKUP |
1777                                                  MDS_INODELOCK_UPDATE);
1778         }
1779         if (rc)
1780                 GOTO(cleanup, rc);
1781
1782         if (dchild->d_flags & DCACHE_CROSS_REF) {
1783                 /* we should have parent lock only here */
1784                 LASSERT(unlink_by_fid == 0);
1785                 LASSERT(dchild->d_mdsnum != mds->mds_num);
1786                 mds_reint_unlink_remote(rec, offset, req, parent_lockh,
1787                                         update_mode, dparent, &child_lockh, dchild);
1788                 RETURN(0);
1789         }
1790
1791         cleanup_phase = 1; /* dchild, dparent, locks */
1792
1793         dget(dchild);
1794         child_inode = dchild->d_inode;
1795         if (child_inode == NULL) {
1796                 CDEBUG(D_INODE, "child doesn't exist (dir %lu, name %s)\n",
1797                        dparent ? dparent->d_inode->i_ino : 0, rec->ur_name);
1798                 GOTO(cleanup, rc = -ENOENT);
1799         }
1800
1801         cleanup_phase = 2; /* dchild has a lock */
1802
1803         /* We have to do these checks ourselves, in case we are making an
1804          * orphan.  The client tells us whether rmdir() or unlink() was called,
1805          * so we need to return appropriate errors (bug 72).
1806          *
1807          * We don't have to check permissions, because vfs_rename (called from
1808          * mds_open_unlink_rename) also calls may_delete. */
1809         if ((rec->ur_mode & S_IFMT) == S_IFDIR) {
1810                 if (!S_ISDIR(child_inode->i_mode))
1811                         GOTO(cleanup, rc = -ENOTDIR);
1812         } else {
1813                 if (S_ISDIR(child_inode->i_mode))
1814                         GOTO(cleanup, rc = -EISDIR);
1815         }
1816
1817         /* handle splitted dir */
1818         rc = mds_lock_slave_objs(obd, dchild, &slave_lockh);
1819         if (rc)
1820                 GOTO(cleanup, rc);
1821
1822 #if 0
1823         /* Step 4: Get a lock on the ino to sync with creation WRT inode
1824          * reuse (see bug 2029). */
1825         rc = mds_lock_new_child(obd, child_inode, &child_reuse_lockh);
1826         if (rc != ELDLM_OK)
1827                 GOTO(cleanup, rc);
1828 #endif
1829         cleanup_phase = 3; /* child inum lock */
1830
1831         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dparent->d_inode->i_sb);
1832
1833         /* ldlm_reply in buf[0] if called via intent */
1834         if (offset == 3)
1835                 offset = 1;
1836         else
1837                 offset = 0;
1838
1839         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
1840         LASSERT(body != NULL);
1841
1842         /* child i_alloc_sem protects orphan_dec_test && is_orphan race */
1843         DOWN_READ_I_ALLOC_SEM(child_inode);
1844         cleanup_phase = 4; /* up(&child_inode->i_sem) when finished */
1845
1846         /* If this is potentially the last reference to this inode, get the
1847          * OBD EA data first so the client can destroy OST objects.  We
1848          * only do the object removal later if no open files/links remain. */
1849         if ((S_ISDIR(child_inode->i_mode) && child_inode->i_nlink == 2) ||
1850             child_inode->i_nlink == 1) {
1851                 if (mds_orphan_open_count(child_inode) > 0) {
1852                         /* need to lock pending_dir before transaction */
1853                         down(&mds->mds_pending_dir->d_inode->i_sem);
1854                         cleanup_phase = 5; /* up(&pending_dir->i_sem) */
1855                 } else if (S_ISREG(child_inode->i_mode)) {
1856                         mds_pack_inode2fid(obd, &body->fid1, child_inode);
1857                         mds_pack_inode2body(obd, body, child_inode);
1858                         mds_pack_md(obd, req->rq_repmsg, offset + 1, body,
1859                                     child_inode, MDS_PACK_MD_LOCK);
1860                 }
1861         }
1862
1863         /* Step 4: Do the unlink: we already verified ur_mode above (bug 72) */
1864         switch (child_inode->i_mode & S_IFMT) {
1865         case S_IFDIR:
1866                 /* Drop any lingering child directories before we start our
1867                  * transaction, to avoid doing multiple inode dirty/delete
1868                  * in our compound transaction (bug 1321). */
1869                 shrink_dcache_parent(dchild);
1870                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_RMDIR,
1871                                       NULL);
1872                 if (IS_ERR(handle))
1873                         GOTO(cleanup, rc = PTR_ERR(handle));
1874                 rc = vfs_rmdir(dparent->d_inode, dchild);
1875                 break;
1876         case S_IFREG: {
1877 #warning "optimization is possible here: we could drop nlink w/o removing local dentry in FIDS/"
1878                 struct lov_mds_md *lmm = lustre_msg_buf(req->rq_repmsg,
1879                                                         offset + 1, 0);
1880                 handle = fsfilt_start_log(obd, dparent->d_inode,
1881                                           FSFILT_OP_UNLINK, NULL,
1882                                           le32_to_cpu(lmm->lmm_stripe_count));
1883                 if (IS_ERR(handle))
1884                         GOTO(cleanup, rc = PTR_ERR(handle));
1885                 rc = vfs_unlink(dparent->d_inode, dchild);
1886                 break;
1887         }
1888         case S_IFLNK:
1889         case S_IFCHR:
1890         case S_IFBLK:
1891         case S_IFIFO:
1892         case S_IFSOCK:
1893                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK,
1894                                       NULL);
1895                 if (IS_ERR(handle))
1896                         GOTO(cleanup, rc = PTR_ERR(handle));
1897                 rc = vfs_unlink(dparent->d_inode, dchild);
1898                 break;
1899         default:
1900                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode,
1901                        rec->ur_name);
1902                 LBUG();
1903                 GOTO(cleanup, rc = -EINVAL);
1904         }
1905
1906         if (rc == 0 && child_inode->i_nlink == 0) {
1907                 if (mds_orphan_open_count(child_inode) > 0)
1908                         rc = mds_orphan_add_link(rec, obd, dchild);
1909
1910                 if (rc == 1)
1911                         GOTO(cleanup, rc = 0);
1912
1913                 if (!S_ISREG(child_inode->i_mode))
1914                         GOTO(cleanup, rc);
1915
1916                 if (!(body->valid & OBD_MD_FLEASIZE)) {
1917                         body->valid |=(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
1918                                        OBD_MD_FLATIME | OBD_MD_FLMTIME);
1919                 } else if (mds_log_op_unlink(obd, child_inode,
1920                                 lustre_msg_buf(req->rq_repmsg, offset + 1, 0),
1921                                         req->rq_repmsg->buflens[offset + 1],
1922                                 lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
1923                                         req->rq_repmsg->buflens[offset+2], 
1924                                 &lcl) > 0){
1925                         body->valid |= OBD_MD_FLCOOKIE;
1926                 }
1927         }
1928
1929         GOTO(cleanup, rc);
1930
1931 cleanup:
1932         if (rc == 0) {
1933                 struct iattr iattr;
1934                 int err;
1935
1936                 iattr.ia_valid = ATTR_MTIME | ATTR_CTIME;
1937                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
1938                 LTIME_S(iattr.ia_ctime) = rec->ur_time;
1939
1940                 err = fsfilt_setattr(obd, dparent, handle, &iattr, 0);
1941                 if (err)
1942                         CERROR("error on parent setattr: rc = %d\n", err);
1943         }
1944         rc = mds_finish_transno(mds, dparent ? dparent->d_inode : NULL,
1945                                 handle, req, rc, 0);
1946         if (!rc)
1947                 (void)obd_set_info(mds->mds_osc_exp, strlen("unlinked"),
1948                                    "unlinked", 0, NULL);
1949         switch(cleanup_phase) {
1950         case 5: /* pending_dir semaphore */
1951                 up(&mds->mds_pending_dir->d_inode->i_sem);
1952         case 4: /* child inode semaphore */
1953                 UP_READ_I_ALLOC_SEM(child_inode);
1954                  /* handle splitted dir */
1955                 if (rc == 0) {
1956                         /* master directory can be non-empty or something else ... */
1957                         mds_unlink_slave_objs(obd, dchild);
1958                 }
1959                 if (lcl != NULL)
1960                         ptlrpc_save_llog_lock(req, lcl);
1961         case 3: /* child ino-reuse lock */
1962 #if 0
1963                 if (rc && body != NULL) {
1964                         // Don't unlink the OST objects if the MDS unlink failed
1965                         body->valid = 0;
1966                 }
1967                 if (rc)
1968                         ldlm_lock_decref(&child_reuse_lockh, LCK_EX);
1969                 else
1970                         ptlrpc_save_lock(req, &child_reuse_lockh, LCK_EX);
1971 #endif
1972         case 2: /* child lock */
1973                 mds_unlock_slave_objs(obd, dchild, slave_lockh);
1974                 if (child_lockh.cookie)
1975                         ldlm_lock_decref(&child_lockh, LCK_EX);
1976         case 1: /* child and parent dentry, parent lock */
1977 #ifdef S_PDIROPS
1978                 if (parent_lockh[1].cookie != 0)
1979                         ldlm_lock_decref(parent_lockh + 1, update_mode);
1980 #endif
1981                 if (rc)
1982                         ldlm_lock_decref(parent_lockh, LCK_PW);
1983                 else
1984                         ptlrpc_save_lock(req, parent_lockh, LCK_PW);
1985                 l_dput(dchild);
1986                 l_dput(dchild);
1987                 l_dput(dparent);
1988         case 0:
1989                 break;
1990         default:
1991                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
1992                 LBUG();
1993         }
1994         req->rq_status = rc;
1995         return 0;
1996 }
1997
1998 /*
1999  * to service requests from remote MDS to increment i_nlink
2000  */
2001 static int mds_reint_link_acquire(struct mds_update_record *rec,
2002                                  int offset, struct ptlrpc_request *req,
2003                                  struct lustre_handle *lh)
2004 {
2005         struct obd_device *obd = req->rq_export->exp_obd;
2006         struct ldlm_res_id src_res_id = { .name = {0} };
2007         struct lustre_handle *handle = NULL, src_lockh = {0};
2008         struct mds_obd *mds = mds_req2mds(req);
2009         int rc = 0, cleanup_phase = 0;
2010         struct dentry *de_src = NULL;
2011         ldlm_policy_data_t policy;
2012         int flags = 0;
2013         ENTRY;
2014
2015         DEBUG_REQ(D_INODE, req, "%s: request to acquire i_nlinks %u/%u/%u\n",
2016                   obd->obd_name, (unsigned) rec->ur_fid1->mds,
2017                   (unsigned) rec->ur_fid1->id,
2018                   (unsigned) rec->ur_fid1->generation);
2019
2020         /* Step 1: Lookup the source inode and target directory by FID */
2021         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
2022         if (IS_ERR(de_src))
2023                 GOTO(cleanup, rc = PTR_ERR(de_src));
2024         cleanup_phase = 1; /* source dentry */
2025
2026         src_res_id.name[0] = de_src->d_inode->i_ino;
2027         src_res_id.name[1] = de_src->d_inode->i_generation;
2028         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
2029
2030         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
2031                         src_res_id, LDLM_IBITS, &policy,
2032                         LCK_EX, &flags, mds_blocking_ast,
2033                         ldlm_completion_ast, NULL, NULL,
2034                         NULL, 0, NULL, &src_lockh);
2035         if (rc != ELDLM_OK)
2036                 GOTO(cleanup, rc = -ENOLCK);
2037         cleanup_phase = 2; /* lock */
2038
2039         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE, de_src->d_inode->i_sb);
2040
2041         handle = fsfilt_start(obd, de_src->d_inode, FSFILT_OP_LINK, NULL);
2042         if (IS_ERR(handle)) {
2043                 rc = PTR_ERR(handle);
2044                 GOTO(cleanup, rc);
2045         }
2046         de_src->d_inode->i_nlink++;
2047         mark_inode_dirty(de_src->d_inode);
2048
2049 cleanup:
2050         rc = mds_finish_transno(mds, de_src ? de_src->d_inode : NULL,
2051                                         handle, req, rc, 0);
2052         EXIT;
2053         switch (cleanup_phase) {
2054                 case 2:
2055                         if (rc)
2056                                 ldlm_lock_decref(&src_lockh, LCK_EX);
2057                         else
2058                                 ptlrpc_save_lock(req, &src_lockh, LCK_EX);
2059                 case 1:
2060                         l_dput(de_src);
2061                 case 0:
2062                         break;
2063                 default:
2064                         CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2065                         LBUG();
2066         }
2067         req->rq_status = rc;
2068         return 0;
2069 }
2070
2071 /*
2072  * request to link to foreign inode:
2073  *  - acquire i_nlinks on this inode
2074  *  - add dentry
2075  */
2076 static int mds_reint_link_to_remote(struct mds_update_record *rec,
2077                                     int offset, struct ptlrpc_request *req,
2078                                     struct lustre_handle *lh)
2079 {
2080         struct lustre_handle *handle = NULL, tgt_dir_lockh[2] = {{0}, {0}};
2081         struct obd_device *obd = req->rq_export->exp_obd;
2082         struct dentry *de_tgt_dir = NULL;
2083         struct mds_obd *mds = mds_req2mds(req);
2084         int rc = 0, cleanup_phase = 0;
2085         struct mdc_op_data op_data;
2086         struct ptlrpc_request *request = NULL;
2087         int update_mode;
2088         ENTRY;
2089
2090 #define fmt     "%s: request to link %u/%u/%u:%*s to foreign inode %u/%u/%u\n"
2091         DEBUG_REQ(D_INODE, req, fmt, obd->obd_name,
2092                   (unsigned) rec->ur_fid2->mds,
2093                   (unsigned) rec->ur_fid2->id,
2094                   (unsigned) rec->ur_fid2->generation,
2095                   rec->ur_namelen - 1, rec->ur_name,
2096                   (unsigned) rec->ur_fid1->mds,
2097                   (unsigned) rec->ur_fid1->id,
2098                   (unsigned)rec->ur_fid1->generation);
2099
2100         de_tgt_dir = mds_fid2locked_dentry(obd, rec->ur_fid2, NULL, LCK_EX,
2101                                            tgt_dir_lockh, &update_mode,
2102                                            rec->ur_name, rec->ur_namelen - 1,
2103                                            MDS_INODELOCK_UPDATE);
2104         if (IS_ERR(de_tgt_dir))
2105                 GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
2106         cleanup_phase = 1;
2107
2108         op_data.fid1 = *(rec->ur_fid1);
2109         op_data.namelen = 0;
2110         op_data.name = NULL;
2111         rc = md_link(mds->mds_lmv_exp, &op_data, &request);
2112         if (rc)
2113                 GOTO(cleanup, rc);
2114
2115         cleanup_phase = 2;
2116         if (request)
2117                 ptlrpc_req_finished(request);
2118
2119         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE, de_tgt_dir->d_inode->i_sb);
2120
2121         handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK, NULL);
2122         if (IS_ERR(handle)) {
2123                 rc = PTR_ERR(handle);
2124                 GOTO(cleanup, rc);
2125         }
2126         
2127         rc = fsfilt_add_dir_entry(obd, de_tgt_dir, rec->ur_name,
2128                                   rec->ur_namelen - 1, rec->ur_fid1->id,
2129                                   rec->ur_fid1->generation, rec->ur_fid1->mds);
2130         cleanup_phase = 3;
2131
2132 cleanup:
2133         rc = mds_finish_transno(mds, de_tgt_dir ? de_tgt_dir->d_inode : NULL,
2134                                 handle, req, rc, 0);
2135         EXIT;
2136
2137         switch (cleanup_phase) {
2138                 case 3:
2139                         if (rc) {
2140                                 /* FIXME: drop i_nlink on remote inode here */
2141                                 CERROR("MUST drop drop i_nlink here\n");
2142                         }
2143                 case 2:
2144                 case 1:
2145                         if (rc) {
2146                                 ldlm_lock_decref(tgt_dir_lockh, LCK_EX);
2147 #ifdef S_PDIROPS
2148                                 ldlm_lock_decref(tgt_dir_lockh + 1, update_mode);
2149 #endif
2150                         } else {
2151                                 ptlrpc_save_lock(req, tgt_dir_lockh, LCK_EX);
2152 #ifdef S_PDIROPS
2153                                 ptlrpc_save_lock(req, tgt_dir_lockh+1, update_mode);
2154 #endif
2155                         }
2156                         l_dput(de_tgt_dir);
2157                         break;
2158                 default:
2159                         CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2160                         LBUG();
2161         }
2162         req->rq_status = rc;
2163         return 0;
2164 }
2165
2166 static int mds_reint_link(struct mds_update_record *rec, int offset,
2167                           struct ptlrpc_request *req,
2168                           struct lustre_handle *lh)
2169 {
2170         struct obd_device *obd = req->rq_export->exp_obd;
2171         struct dentry *de_src = NULL;
2172         struct dentry *de_tgt_dir = NULL;
2173         struct dentry *dchild = NULL;
2174         struct mds_obd *mds = mds_req2mds(req);
2175         struct lustre_handle *handle = NULL;
2176         struct lustre_handle tgt_dir_lockh[2] = {{0}, {0}}, src_lockh = {0};
2177         struct ldlm_res_id src_res_id = { .name = {0} };
2178         struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
2179         ldlm_policy_data_t src_policy ={.l_inodebits = {MDS_INODELOCK_UPDATE}};
2180         ldlm_policy_data_t tgt_dir_policy =
2181                                        {.l_inodebits = {MDS_INODELOCK_UPDATE}};
2182         int rc = 0, cleanup_phase = 0;
2183 #ifdef S_PDIROPS
2184         int update_mode = 0;
2185 #endif
2186         ENTRY;
2187
2188         LASSERT(offset == 1);
2189
2190         DEBUG_REQ(D_INODE, req, "original "LPU64"/%u to "LPU64"/%u %s",
2191                   rec->ur_fid1->id, rec->ur_fid1->generation,
2192                   rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_name);
2193
2194         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
2195         
2196         MDS_UPDATE_COUNTER(mds, MDS_LINK_COUNT);
2197         
2198 //      memset(tgt_dir_lockh, 0, 2*sizeof(tgt_dir_lockh[0]));
2199         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
2200                 GOTO(cleanup, rc = -ENOENT);
2201
2202         if (rec->ur_fid1->mds != mds->mds_num) {
2203                 rc = mds_reint_link_to_remote(rec, offset, req, lh);
2204                 RETURN(rc);
2205         }
2206         
2207         if (rec->ur_namelen == 1) {
2208                 rc = mds_reint_link_acquire(rec, offset, req, lh);
2209                 RETURN(rc);
2210         }
2211
2212         /* Step 1: Lookup the source inode and target directory by FID */
2213         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
2214         if (IS_ERR(de_src))
2215                 GOTO(cleanup, rc = PTR_ERR(de_src));
2216
2217         cleanup_phase = 1; /* source dentry */
2218
2219         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
2220         if (IS_ERR(de_tgt_dir)) {
2221                 rc = PTR_ERR(de_tgt_dir);
2222                 de_tgt_dir = NULL;
2223                 GOTO(cleanup, rc);
2224         }
2225
2226         cleanup_phase = 2; /* target directory dentry */
2227
2228         CDEBUG(D_INODE, "linking %*s/%s to inode %lu\n",
2229                de_tgt_dir->d_name.len, de_tgt_dir->d_name.name, rec->ur_name,
2230                de_src->d_inode->i_ino);
2231
2232         /* Step 2: Take the two locks */
2233         src_res_id.name[0] = de_src->d_inode->i_ino;
2234         src_res_id.name[1] = de_src->d_inode->i_generation;
2235         tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
2236         tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
2237 #ifdef S_PDIROPS
2238         if (IS_PDIROPS(de_tgt_dir->d_inode)) {
2239                 int flags = 0;
2240                 update_mode = mds_lock_mode_for_dir(obd, de_tgt_dir, LCK_EX);
2241                 if (update_mode) {
2242                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
2243                                               tgt_dir_res_id, LDLM_IBITS,
2244                                               &src_policy, update_mode, &flags,
2245                                               mds_blocking_ast,
2246                                               ldlm_completion_ast, NULL, NULL,
2247                                               NULL, 0, NULL, tgt_dir_lockh + 1);
2248                         if (rc != ELDLM_OK)
2249                                 GOTO(cleanup, rc = -ENOLCK);
2250                 }
2251
2252                 tgt_dir_res_id.name[2] = full_name_hash(rec->ur_name,
2253                                                         rec->ur_namelen - 1);
2254                 CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
2255                        de_tgt_dir->d_inode->i_ino,
2256                        de_tgt_dir->d_inode->i_generation,
2257                        tgt_dir_res_id.name[2]);
2258         }
2259 #endif
2260         rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX,
2261                                    &src_policy,
2262                                    &tgt_dir_res_id, tgt_dir_lockh, LCK_EX,
2263                                    &tgt_dir_policy);
2264         if (rc)
2265                 GOTO(cleanup, rc);
2266
2267         cleanup_phase = 3; /* locks */
2268
2269         /* Step 3: Lookup the child */
2270         dchild = ll_lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen-1);
2271         if (IS_ERR(dchild)) {
2272                 rc = PTR_ERR(dchild);
2273                 if (rc != -EPERM && rc != -EACCES)
2274                         CERROR("child lookup error %d\n", rc);
2275                 GOTO(cleanup, rc);
2276         }
2277
2278         cleanup_phase = 4; /* child dentry */
2279
2280         if (dchild->d_inode) {
2281                 CDEBUG(D_INODE, "child exists (dir %lu, name %s)\n",
2282                        de_tgt_dir->d_inode->i_ino, rec->ur_name);
2283                 rc = -EEXIST;
2284                 GOTO(cleanup, rc);
2285         }
2286
2287         /* Step 4: Do it. */
2288         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE, de_src->d_inode->i_sb);
2289
2290         handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK, NULL);
2291         if (IS_ERR(handle)) {
2292                 rc = PTR_ERR(handle);
2293                 GOTO(cleanup, rc);
2294         }
2295
2296         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
2297         if (rc && rc != -EPERM && rc != -EACCES)
2298                 CERROR("vfs_link error %d\n", rc);
2299 cleanup:
2300         rc = mds_finish_transno(mds, de_tgt_dir ? de_tgt_dir->d_inode : NULL,
2301                                 handle, req, rc, 0);
2302         EXIT;
2303
2304         switch (cleanup_phase) {
2305         case 4: /* child dentry */
2306                 l_dput(dchild);
2307         case 3: /* locks */
2308                 if (rc) {
2309                         ldlm_lock_decref(&src_lockh, LCK_EX);
2310                         ldlm_lock_decref(tgt_dir_lockh, LCK_EX);
2311                 } else {
2312                         ptlrpc_save_lock(req, &src_lockh, LCK_EX);
2313                         ptlrpc_save_lock(req, tgt_dir_lockh, LCK_EX);
2314                 }
2315         case 2: /* target dentry */
2316 #ifdef S_PDIROPS
2317                 if (tgt_dir_lockh[1].cookie && update_mode)
2318                         ldlm_lock_decref(tgt_dir_lockh + 1, update_mode);
2319 #endif
2320                 if (de_tgt_dir)
2321                         l_dput(de_tgt_dir);
2322         case 1: /* source dentry */
2323                 l_dput(de_src);
2324         case 0:
2325                 break;
2326         default:
2327                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
2328                 LBUG();
2329         }
2330         req->rq_status = rc;
2331         return 0;
2332 }
2333
2334 /* The idea here is that we need to get four locks in the end:
2335  * one on each parent directory, one on each child.  We need to take
2336  * these locks in some kind of order (to avoid deadlocks), and the order
2337  * I selected is "increasing resource number" order.  We need to look up
2338  * the children, however, before we know what the resource number(s) are.
2339  * Thus the following plan:
2340  *
2341  * 1,2. Look up the parents
2342  * 3,4. Look up the children
2343  * 5. Take locks on the parents and children, in order
2344  * 6. Verify that the children haven't changed since they were looked up
2345  *
2346  * If there was a race and the children changed since they were first looked
2347  * up, it is possible that mds_verify_child() will be able to just grab the
2348  * lock on the new child resource (if it has a higher resource than any other)
2349  * but we need to compare against not only its parent, but also against the
2350  * parent and child of the "other half" of the rename, hence maxres_{src,tgt}.
2351  *
2352  * We need the fancy igrab() on the child inodes because we aren't holding a
2353  * lock on the parent after the lookup is done, so dentry->d_inode may change
2354  * at any time, and igrab() itself doesn't like getting passed a NULL argument.
2355  */
2356 static int mds_get_parents_children_locked(struct obd_device *obd,
2357                                            struct mds_obd *mds,
2358                                            struct ll_fid *p1_fid,
2359                                            struct dentry **de_srcdirp,
2360                                            struct ll_fid *p2_fid,
2361                                            struct dentry **de_tgtdirp,
2362                                            int parent_mode,
2363                                            const char *old_name, int old_len,
2364                                            struct dentry **de_oldp,
2365                                            const char *new_name, int new_len,
2366                                            struct dentry **de_newp,
2367                                            struct lustre_handle *dlm_handles,
2368                                            int child_mode)
2369 {
2370         struct ldlm_res_id p1_res_id = { .name = {0} };
2371         struct ldlm_res_id p2_res_id = { .name = {0} };
2372         struct ldlm_res_id c1_res_id = { .name = {0} };
2373         struct ldlm_res_id c2_res_id = { .name = {0} };
2374         ldlm_policy_data_t p_policy = {.l_inodebits = {MDS_INODELOCK_UPDATE}};
2375         /* Only dentry should change, but the inode itself would be
2376            intact otherwise */
2377         ldlm_policy_data_t c1_policy = {.l_inodebits = {MDS_INODELOCK_LOOKUP}};
2378         /* If something is going to be replaced, both dentry and inode locks are
2379            needed */
2380         ldlm_policy_data_t c2_policy = {.l_inodebits = {MDS_INODELOCK_LOOKUP|
2381                                                         MDS_INODELOCK_UPDATE}};
2382         struct ldlm_res_id *maxres_src, *maxres_tgt;
2383         struct inode *inode;
2384         int rc = 0, cleanup_phase = 0;
2385         ENTRY;
2386
2387         /* Step 1: Lookup the source directory */
2388         *de_srcdirp = mds_fid2dentry(mds, p1_fid, NULL);
2389         if (IS_ERR(*de_srcdirp))
2390                 GOTO(cleanup, rc = PTR_ERR(*de_srcdirp));
2391
2392         cleanup_phase = 1; /* source directory dentry */
2393
2394         p1_res_id.name[0] = (*de_srcdirp)->d_inode->i_ino;
2395         p1_res_id.name[1] = (*de_srcdirp)->d_inode->i_generation;
2396
2397         /* Step 2: Lookup the target directory */
2398         if (memcmp(p1_fid, p2_fid, sizeof(*p1_fid)) == 0) {
2399                 *de_tgtdirp = dget(*de_srcdirp);
2400         } else {
2401                 *de_tgtdirp = mds_fid2dentry(mds, p2_fid, NULL);
2402                 if (IS_ERR(*de_tgtdirp)) {
2403                         rc = PTR_ERR(*de_tgtdirp);
2404                         *de_tgtdirp = NULL;
2405                         GOTO(cleanup, rc);
2406                 }
2407         }
2408
2409         cleanup_phase = 2; /* target directory dentry */
2410
2411         p2_res_id.name[0] = (*de_tgtdirp)->d_inode->i_ino;
2412         p2_res_id.name[1] = (*de_tgtdirp)->d_inode->i_generation;
2413
2414 #ifdef S_PDIROPS
2415         dlm_handles[5].cookie = 0;
2416         dlm_handles[6].cookie = 0;
2417         if (IS_PDIROPS((*de_srcdirp)->d_inode)) {
2418                 /* Get a temp lock on just ino, gen to flush client cache and
2419                  * to protect dirs from concurrent splitting */
2420                 rc = enqueue_ordered_locks(obd, &p1_res_id, &(dlm_handles[5]),
2421                                            LCK_PW, &p_policy, &p2_res_id,
2422                                            &(dlm_handles[6]),LCK_PW,&p_policy);
2423                 if (rc != ELDLM_OK)
2424                         GOTO(cleanup, rc);
2425                 p1_res_id.name[2] = full_name_hash(old_name, old_len - 1);
2426                 p2_res_id.name[2] = full_name_hash(new_name, new_len - 1);
2427                 CDEBUG(D_INFO, "take locks on %lu:%u:"LPX64", %lu:%u:"LPX64"\n",
2428                        (*de_srcdirp)->d_inode->i_ino,
2429                        (*de_srcdirp)->d_inode->i_generation, p1_res_id.name[2],
2430                        (*de_tgtdirp)->d_inode->i_ino,
2431                        (*de_tgtdirp)->d_inode->i_generation, p2_res_id.name[2]);
2432         }
2433         cleanup_phase = 3;
2434 #endif
2435
2436         /* Step 3: Lookup the source child entry */
2437         *de_oldp = ll_lookup_one_len(old_name, *de_srcdirp, old_len - 1);
2438         if (IS_ERR(*de_oldp)) {
2439                 rc = PTR_ERR(*de_oldp);
2440                 CERROR("old child lookup error (%*s): %d\n",
2441                        old_len - 1, old_name, rc);
2442                 GOTO(cleanup, rc);
2443         }
2444
2445         cleanup_phase = 4; /* original name dentry */
2446
2447         inode = (*de_oldp)->d_inode;
2448         
2449         if (inode != NULL) {
2450                 inode = igrab(inode);
2451                 if (inode == NULL)
2452                         GOTO(cleanup, rc = -ENOENT);
2453
2454                 c1_res_id.name[0] = inode->i_ino;
2455                 c1_res_id.name[1] = inode->i_generation;
2456                 iput(inode);
2457         } else if ((*de_oldp)->d_flags & DCACHE_CROSS_REF) {
2458                 c1_res_id.name[0] = (*de_oldp)->d_inum;
2459                 c1_res_id.name[1] = (*de_oldp)->d_generation;
2460         }
2461
2462         /* Step 4: Lookup the target child entry */
2463         *de_newp = ll_lookup_one_len(new_name, *de_tgtdirp, new_len - 1);
2464         if (IS_ERR(*de_newp)) {
2465                 rc = PTR_ERR(*de_newp);
2466                 CERROR("new child lookup error (%*s): %d\n",
2467                        old_len - 1, old_name, rc);
2468                 GOTO(cleanup, rc);
2469         }
2470
2471         cleanup_phase = 5; /* target dentry */
2472
2473         inode = (*de_newp)->d_inode;
2474         
2475         if (inode == NULL)
2476                 goto retry_locks;
2477         
2478         if (inode != NULL) {
2479                 inode = igrab(inode);
2480                 if (inode == NULL)
2481                         goto retry_locks;
2482
2483                 c2_res_id.name[0] = inode->i_ino;
2484                 c2_res_id.name[1] = inode->i_generation;
2485                 iput(inode);
2486         } else if ((*de_newp)->d_flags & DCACHE_CROSS_REF) {
2487                 c2_res_id.name[0] = (*de_newp)->d_inum;
2488                 c2_res_id.name[1] = (*de_newp)->d_generation;
2489         }
2490
2491 retry_locks:
2492         /* Step 5: Take locks on the parents and child(ren) */
2493         maxres_src = &p1_res_id;
2494         maxres_tgt = &p2_res_id;
2495         cleanup_phase = 5; /* target dentry */
2496
2497         if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id, NULL,NULL))
2498                 maxres_src = &c1_res_id;
2499         if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id, NULL,NULL))
2500                 maxres_tgt = &c2_res_id;
2501
2502         rc = enqueue_4ordered_locks(obd, &p1_res_id,&dlm_handles[0],parent_mode,
2503                                     &p_policy,
2504                                     &p2_res_id, &dlm_handles[1], parent_mode,
2505                                     &p_policy,
2506                                     &c1_res_id, &dlm_handles[2], child_mode,
2507                                     &c1_policy,
2508                                     &c2_res_id, &dlm_handles[3], child_mode,
2509                                     &c2_policy);
2510         if (rc)
2511                 GOTO(cleanup, rc);
2512
2513         cleanup_phase = 6; /* parent and child(ren) locks */
2514
2515         /* Step 6a: Re-lookup source child to verify it hasn't changed */
2516         rc = mds_verify_child(obd, &p1_res_id, &dlm_handles[0], *de_srcdirp,
2517                               parent_mode, &c1_res_id, &dlm_handles[2],
2518                               de_oldp, child_mode, &c1_policy, old_name,old_len,
2519                               maxres_tgt);
2520         if (rc) {
2521                 if (c2_res_id.name[0] != 0)
2522                         ldlm_lock_decref(&dlm_handles[3], child_mode);
2523                 ldlm_lock_decref(&dlm_handles[1], parent_mode);
2524                 cleanup_phase = 5;
2525                 if (rc > 0)
2526                         goto retry_locks;
2527                 GOTO(cleanup, rc);
2528         }
2529
2530         if (!DENTRY_VALID(*de_oldp))
2531                 GOTO(cleanup, rc = -ENOENT);
2532
2533         /* Step 6b: Re-lookup target child to verify it hasn't changed */
2534         rc = mds_verify_child(obd, &p2_res_id, &dlm_handles[1], *de_tgtdirp,
2535                               parent_mode, &c2_res_id, &dlm_handles[3],
2536                               de_newp, child_mode, &c2_policy, new_name,
2537                               new_len, maxres_src);
2538         if (rc) {
2539                 ldlm_lock_decref(&dlm_handles[2], child_mode);
2540                 ldlm_lock_decref(&dlm_handles[0], parent_mode);
2541                 cleanup_phase = 5;
2542                 if (rc > 0)
2543                         goto retry_locks;
2544                 GOTO(cleanup, rc);
2545         }
2546
2547         EXIT;
2548 cleanup:
2549         if (rc) {
2550                 switch (cleanup_phase) {
2551                 case 6: /* child lock(s) */
2552                         if (c2_res_id.name[0] != 0)
2553                                 ldlm_lock_decref(&dlm_handles[3], child_mode);
2554                         if (c1_res_id.name[0] != 0)
2555                                 ldlm_lock_decref(&dlm_handles[2], child_mode);
2556                         ldlm_lock_decref(&dlm_handles[1], parent_mode);
2557                         ldlm_lock_decref(&dlm_handles[0], parent_mode);
2558                 case 5: /* target dentry */
2559                         l_dput(*de_newp);
2560                 case 4: /* source dentry */
2561                         l_dput(*de_oldp);
2562                 case 3:
2563 #ifdef S_PDIROPS
2564                         if (dlm_handles[5].cookie != 0)
2565                                 ldlm_lock_decref(&(dlm_handles[5]), LCK_PW);
2566                         if (dlm_handles[6].cookie != 0)
2567                                 ldlm_lock_decref(&(dlm_handles[6]), LCK_PW);
2568 #endif
2569                 case 2: /* target directory dentry */
2570                         l_dput(*de_tgtdirp);
2571                 case 1: /* source directry dentry */
2572                         l_dput(*de_srcdirp);
2573                 }
2574         }
2575
2576         return rc;
2577 }
2578                                                                                                                                                                                                      
2579 static int mds_add_local_dentry(struct mds_update_record *rec, int offset,
2580                                 struct ptlrpc_request *req, struct dentry *dentry,
2581                                 struct dentry *de_dir, 
2582                                 struct dentry *de)
2583 {
2584         struct obd_device *obd = req->rq_export->exp_obd;
2585         struct mds_obd *mds = mds_req2mds(req);
2586         void *handle = NULL;
2587         int rc = 0;
2588         ENTRY;
2589                                                                                                                                                                                                      
2590         if (de->d_inode) {
2591                 /*
2592                  * name exists and points to local inode try to unlink this name
2593                  * and create new one.
2594                  */
2595                 CDEBUG(D_OTHER, "%s: %s points to local inode %lu/%lu\n",
2596                        obd->obd_name, rec->ur_tgt,
2597                        (unsigned long)de->d_inode->i_ino,
2598                        (unsigned long)de->d_inode->i_generation);
2599                 handle = fsfilt_start(obd, de_dir->d_inode,
2600                                       FSFILT_OP_RENAME, NULL);
2601                 if (IS_ERR(handle))
2602                         GOTO(cleanup, rc = PTR_ERR(handle));
2603                 rc = fsfilt_del_dir_entry(req->rq_export->exp_obd, de);
2604                 if (rc)
2605                         GOTO(cleanup, rc);
2606         } else if (de->d_flags & DCACHE_CROSS_REF) {
2607                 /* name exists and points to remove inode */
2608                 CDEBUG(D_OTHER, "%s: %s points to remote inode %lu/%lu/%lu\n",
2609                        obd->obd_name, rec->ur_tgt, (unsigned long)de->d_mdsnum,
2610                        (unsigned long)de->d_inum, 
2611                        (unsigned long)de->d_generation);
2612                 handle = fsfilt_start(obd, de_dir->d_inode,
2613                                       FSFILT_OP_RENAME, NULL);
2614                 if (IS_ERR(handle))
2615                         GOTO(cleanup, rc = PTR_ERR(handle));
2616                 rc = fsfilt_del_dir_entry(req->rq_export->exp_obd, de);
2617                 if (rc)
2618                         GOTO(cleanup, rc);
2619         } else {
2620                 /* name doesn't exist. the simplest case. */
2621                 handle = fsfilt_start(obd, de_dir->d_inode,
2622                                       FSFILT_OP_LINK, NULL);
2623                 if (IS_ERR(handle))
2624                         GOTO(cleanup, rc = PTR_ERR(handle));
2625         }
2626                                                                                                                                                                                                      
2627         rc = fsfilt_add_dir_entry(obd, de_dir, rec->ur_tgt,
2628                                   rec->ur_tgtlen - 1, dentry->d_inum,
2629                                   dentry->d_generation, dentry->d_mdsnum);
2630         if (rc) {
2631                 CERROR("add_dir_entry() returned error %d\n", rc);
2632                 GOTO(cleanup, rc);
2633         }
2634                                                                                                                                                                                                      
2635 cleanup:
2636         EXIT;
2637         rc = mds_finish_transno(mds, de_dir ? de_dir->d_inode : NULL,
2638                                 handle, req, rc, 0);
2639                                                                                                                                                                                                      
2640         RETURN(rc);
2641 }
2642
2643 static int mds_del_local_dentry(struct mds_update_record *rec, int offset,
2644                                 struct ptlrpc_request *req, 
2645                                 struct dentry *dentry, struct dentry *de_dir, 
2646                                 struct dentry *de)
2647 {
2648         struct obd_device *obd = req->rq_export->exp_obd;
2649         struct mds_obd *mds = mds_req2mds(req);
2650         void *handle = NULL;
2651         int rc = 0;
2652         ENTRY;
2653                                                                                                                                                                                                      
2654         handle = fsfilt_start(obd, de_dir->d_inode, FSFILT_OP_UNLINK, NULL);
2655         if (IS_ERR(handle))
2656                 GOTO(cleanup, rc = PTR_ERR(handle));
2657         rc = fsfilt_del_dir_entry(obd, de);
2658         d_drop(de);
2659                                                                                                                                                                                                      
2660 cleanup:
2661         EXIT;
2662         rc = mds_finish_transno(mds, de_dir ? de_dir->d_inode : NULL,
2663                                 handle, req, rc, 0);
2664         RETURN(0);
2665 }
2666
2667
2668 static int mds_reint_rename_create_name(struct mds_update_record *rec,
2669                                         int offset, struct ptlrpc_request *req)
2670 {
2671         struct obd_device *obd = req->rq_export->exp_obd;
2672         struct dentry *de_srcdir = NULL;
2673         struct dentry *de_new = NULL;
2674         struct mds_obd *mds = mds_req2mds(req);
2675         struct lustre_handle parent_lockh[2] = {{0}, {0}};
2676         struct lustre_handle child_lockh = {0};
2677         int cleanup_phase = 0;
2678         void *handle = NULL;
2679         int update_mode, rc = 0;
2680         ENTRY;
2681
2682         /* another MDS executing rename operation has asked us
2683          * to create target name. such a creation should destroy
2684          * existing target name */
2685
2686         CDEBUG(D_OTHER, "%s: request to create name %s for %lu/%lu/%lu\n",
2687                         obd->obd_name, rec->ur_tgt,
2688                         (unsigned long) rec->ur_fid1->mds,
2689                         (unsigned long) rec->ur_fid1->id,
2690                         (unsigned long) rec->ur_fid1->generation);
2691
2692         /* first, lookup the target */
2693         child_lockh.cookie = 0;
2694         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid2, parent_lockh,
2695                                          &de_srcdir,LCK_PW,MDS_INODELOCK_UPDATE,
2696                                          &update_mode, rec->ur_tgt, rec->ur_tgtlen,
2697                                          &child_lockh, &de_new, LCK_EX,
2698                                          MDS_INODELOCK_LOOKUP);
2699         if (rc)
2700                 GOTO(cleanup, rc);
2701
2702         cleanup_phase = 1;
2703
2704         LASSERT(de_srcdir);
2705         LASSERT(de_srcdir->d_inode);
2706         LASSERT(de_new);
2707
2708         if (de_new->d_inode) {
2709                 /* name exists and points to local inode
2710                  * try to unlink this name and create new one */
2711                 CERROR("%s: %s points to local inode %lu/%lu\n",
2712                        obd->obd_name, rec->ur_tgt,
2713                        (unsigned long) de_new->d_inode->i_ino,
2714                        (unsigned long) de_new->d_inode->i_generation);
2715                 handle = fsfilt_start(obd, de_srcdir->d_inode,
2716                                       FSFILT_OP_RENAME, NULL);
2717                 if (IS_ERR(handle))
2718                         GOTO(cleanup, rc = PTR_ERR(handle));
2719                 rc = fsfilt_del_dir_entry(req->rq_export->exp_obd, de_new);
2720                 if (rc)
2721                         GOTO(cleanup, rc);
2722         } else if (de_new->d_flags & DCACHE_CROSS_REF) {
2723                 /* name exists adn points to remove inode */
2724                 CERROR("%s: %s points to remote inode %lu/%lu/%lu\n",
2725                        obd->obd_name, rec->ur_tgt,
2726                        (unsigned long) de_new->d_mdsnum,
2727                        (unsigned long) de_new->d_inum,
2728                        (unsigned long) de_new->d_generation);
2729         } else {
2730                 /* name doesn't exist. the simplest case */
2731                 handle = fsfilt_start(obd, de_srcdir->d_inode,
2732                                       FSFILT_OP_LINK, NULL);
2733                 if (IS_ERR(handle))
2734                         GOTO(cleanup, rc = PTR_ERR(handle));
2735         }
2736        
2737         cleanup_phase = 2;
2738         rc = fsfilt_add_dir_entry(obd, de_srcdir, rec->ur_tgt,
2739                         rec->ur_tgtlen - 1, rec->ur_fid1->id,
2740                         rec->ur_fid1->generation, rec->ur_fid1->mds);
2741         if (rc)
2742                 CERROR("add_dir_entry() returned error %d\n", rc);
2743 cleanup:
2744         EXIT;
2745         rc = mds_finish_transno(mds, de_srcdir ? de_srcdir->d_inode : NULL,
2746                                 handle, req, rc, 0);
2747         switch(cleanup_phase) {
2748                 case 2:
2749                 case 1:
2750 #ifdef S_PDIROPS
2751                         if (parent_lockh[1].cookie != 0)
2752                                 ldlm_lock_decref(&parent_lockh[1], update_mode);
2753 #endif
2754                         ldlm_lock_decref(&parent_lockh[0], LCK_PW);
2755                         if (child_lockh.cookie != 0)
2756                                 ldlm_lock_decref(&child_lockh, LCK_EX);
2757                         l_dput(de_new);
2758                         l_dput(de_srcdir);
2759                         break;
2760                 default:
2761                         LBUG();
2762         }
2763
2764         req->rq_status = rc;
2765
2766         RETURN(0);
2767 }
2768
/*
 * Handle a rename whose target parent (rec->ur_fid2) is served by another
 * MDS (the caller dispatches here when rec->ur_fid2->mds != mds->mds_num).
 *
 * The source parent and the source name are looked up and locked, opdata.fid1
 * is filled in for the object being moved (from the cross-ref dentry fields,
 * or from the local inode), and the rename is forwarded with md_rename() on
 * mds->mds_lmv_exp.  If that succeeds, the old name is removed locally via
 * mds_del_local_dentry().
 *
 * The outcome is stored in req->rq_status; the function itself always
 * returns 0.
 */
2769 static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset,
2770                                       struct ptlrpc_request *req)
2771 {
2772         struct obd_device *obd = req->rq_export->exp_obd;
2773         struct ptlrpc_request *req2 = NULL;
2774         struct dentry *de_srcdir = NULL;
2775         struct dentry *de_old = NULL;
2776         struct mds_obd *mds = mds_req2mds(req);
2777         struct lustre_handle parent_lockh[2] = {{0}, {0}};
2778         struct lustre_handle child_lockh = {0};
2779         struct mdc_op_data opdata;
             /* update_mode is not initialized here; it is handed to
              * mds_get_parent_child_locked() by address and only read back
              * under S_PDIROPS in the cleanup path. */
2780         int update_mode, rc = 0;
2781         ENTRY;
2782
2783         CDEBUG(D_OTHER, "%s: move name %s onto another mds%u\n",
2784                obd->obd_name, rec->ur_name, rec->ur_fid2->mds + 1);
2785         memset(&opdata, 0, sizeof(opdata));
2786
             /* NOTE(review): redundant with the "= {0}" initializer above. */
2787         child_lockh.cookie = 0;
             /* parent is requested in LCK_PW, child in LCK_EX; the cleanup
              * path below releases them in those same modes. */
2788         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, parent_lockh,
2789                                          &de_srcdir,LCK_PW,MDS_INODELOCK_UPDATE,
2790                                          &update_mode, rec->ur_name, 
2791                                          rec->ur_namelen, &child_lockh, &de_old,
2792                                          LCK_EX, MDS_INODELOCK_LOOKUP);
             /* NOTE(review): any failure from mds_get_parent_child_locked()
              * trips this assertion instead of being reported through
              * req->rq_status -- confirm that is intended.  The cleanup path
              * below also assumes parent_lockh[0] is always held. */
2793         LASSERT(rc == 0);
2794         LASSERT(de_srcdir);
2795         LASSERT(de_srcdir->d_inode);
2796         LASSERT(de_old);
2797        
2798         /* we already know the target should be created on another MDS
2799          * so, we have to request that MDS to do it */
2800
2801         /* prepare source fid */
2802         if (de_old->d_flags & DCACHE_CROSS_REF) {
                     /* cross-ref dentry: no local inode, the fid is carried
                      * in the dentry itself */
2803                 LASSERT(de_old->d_inode == NULL);
2804                 CDEBUG(D_OTHER, "request to move remote name\n");
2805                 opdata.fid1.mds = de_old->d_mdsnum;
2806                 opdata.fid1.id = de_old->d_inum;
2807                 opdata.fid1.generation = de_old->d_generation;
2808         } else if (de_old->d_inode == NULL) {
2809                 /* oh, source doesn't exist */
2810                 GOTO(cleanup, rc = -ENOENT);
2811         } else {
                     /* local inode: build the fid from this MDS number and
                      * the inode's number/generation */
2812                 LASSERT(de_old->d_inode != NULL);
2813                 CDEBUG(D_OTHER, "request to move local name\n");
2814                 opdata.fid1.mds = mds->mds_num;
2815                 opdata.fid1.id = de_old->d_inode->i_ino;
2816                 opdata.fid1.generation = de_old->d_inode->i_generation;
2817         }
2818
             /* fid2 is the target parent as supplied by the client; the new
              * name is passed without its trailing NUL (ur_tgtlen - 1) */
2819         opdata.fid2 = *(rec->ur_fid2);
2820         rc = md_rename(mds->mds_lmv_exp, &opdata, NULL, 0, rec->ur_tgt,
2821                        rec->ur_tgtlen - 1, &req2);
2822        
2823         if (rc)
2824                 GOTO(cleanup, rc);
2825         
             /* remote side succeeded: drop the old name on this MDS */
2826         rc = mds_del_local_dentry(rec, offset, req, NULL, de_srcdir, de_old);
2827 cleanup:
             /* Reached on both success and failure: finish the forwarded
              * request (if any), release every lock and dentry reference
              * taken above, and publish rc through rq_status. */
2828         EXIT;
2829
2830         if (req2)
2831                 ptlrpc_req_finished(req2);
2832
2833 #ifdef S_PDIROPS
2834         if (parent_lockh[1].cookie != 0)
2835                 ldlm_lock_decref(&parent_lockh[1], update_mode);
2836 #endif
2837         ldlm_lock_decref(&parent_lockh[0], LCK_PW);
2838         if (child_lockh.cookie != 0)
2839                 ldlm_lock_decref(&child_lockh, LCK_EX);
2840
2841         l_dput(de_old);
2842         l_dput(de_srcdir);
2843
2844         req->rq_status = rc;
2845         RETURN(0);
2846
2847 }
2848
/*
 * Reintegrate a rename request.
 *
 * Two cases are handed off early: rec->ur_namelen == 1 goes to
 * mds_reint_rename_create_name(), and a target parent on another MDS
 * (rec->ur_fid2->mds != mds->mds_num) goes to mds_reint_rename_to_remote().
 * Otherwise the rename is performed here: both parents and both children are
 * looked up and locked, source and target are checked against the two parent
 * directories, and the move is done either with the cross-ref helpers
 * (mds_add_local_dentry() followed by mds_del_local_dentry()) or with
 * vfs_rename() inside an fsfilt transaction.
 *
 * cleanup_phase records how much must be undone at the "cleanup" label:
 *   0 - nothing acquired
 *   1 - parents/children looked up and DLM locks held
 *   2 - additionally, i_alloc_sem of new_inode taken for read (if new_inode)
 *   3 - additionally, i_sem of mds->mds_pending_dir's inode held
 *
 * @lockh is not referenced directly in this function.
 * On the local path the result is stored in req->rq_status and 0 is
 * returned; on the two hand-off paths the helper's return value is returned.
 */
2849 static int mds_reint_rename(struct mds_update_record *rec, int offset,
2850                             struct ptlrpc_request *req, struct lustre_handle *lockh)
2851 {
2852         struct obd_device *obd = req->rq_export->exp_obd;
2853         struct dentry *de_srcdir = NULL;
2854         struct dentry *de_tgtdir = NULL;
2855         struct dentry *de_old = NULL;
2856         struct dentry *de_new = NULL;
2857         struct inode *old_inode = NULL, *new_inode = NULL;
2858         struct mds_obd *mds = mds_req2mds(req);
             /* Slots as used by the cleanup path below: [0] and [1] are
              * released in LCK_PW, [2] and [3] in LCK_EX, and [5]/[6] in
              * LCK_PW under S_PDIROPS only.  Slot [4] is not touched in this
              * function.  Presumably [0]/[1] are the parents and [2]/[3] the
              * children -- confirm against mds_get_parents_children_locked(). */
2859         struct lustre_handle dlm_handles[7] = {{0},{0},{0},{0},{0},{0},{0}};
2860         struct mds_body *body = NULL;
2861         struct llog_create_locks *lcl = NULL;
2862         struct lov_mds_md *lmm = NULL;
2863         int rc = 0, cleanup_phase = 0;
2864
2865         void *handle = NULL;
2866         ENTRY;
2867
2868         LASSERT(offset == 1);
2869
2870         DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u %s to "LPU64"/%u %s",
2871                   rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name,
2872                   rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_tgt);
2873
             /* NOTE(review): MDS_CHECK_RESENT is defined elsewhere;
              * presumably it short-circuits resent requests by running the
              * reconstruct expression -- confirm. */
2874         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
2875
             /* On replay, reply buffer 2 is filled from request buffer
              * offset + 3; the copy length is the size of the reply buffer.
              * NOTE(review): neither lustre_msg_buf() result is checked. */
2876         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
2877                 DEBUG_REQ(D_HA, req, "rename replay\n");
2878                 memcpy(lustre_msg_buf(req->rq_repmsg, 2, 0),
2879                        lustre_msg_buf(req->rq_reqmsg, offset + 3, 0),
2880                        req->rq_repmsg->buflens[2]);
2881         }
2882
2883         MDS_UPDATE_COUNTER(mds, MDS_RENAME_COUNT);
2884
             /* hand-off: ur_namelen == 1 is handled by
              * mds_reint_rename_create_name() */
2885         if (rec->ur_namelen == 1) {
2886                 rc = mds_reint_rename_create_name(rec, offset, req);
2887                 RETURN(rc);
2888         }
2889
             /* hand-off: the target parent lives on another MDS */
2890         if (rec->ur_fid2->mds != mds->mds_num) {
2891                 rc = mds_reint_rename_to_remote(rec, offset, req);
2892                 RETURN(rc);
2893         }
2894         
             /* parents are requested in LCK_PW, children in LCK_EX */
2895         rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir,
2896                                              rec->ur_fid2, &de_tgtdir, LCK_PW,
2897                                              rec->ur_name, rec->ur_namelen,
2898                                              &de_old, rec->ur_tgt,
2899                                              rec->ur_tgtlen, &de_new,
2900                                              dlm_handles, LCK_EX);
2901         if (rc)
2902                 GOTO(cleanup, rc);
2903
2904         cleanup_phase = 1; /* parent(s), children, locks */
2905
2906         old_inode = de_old->d_inode;
2907         new_inode = de_new->d_inode;
2908
2909         /* sanity check for src inode */
2910         if (de_old->d_flags & DCACHE_CROSS_REF) {
2911                 LASSERT(de_old->d_inode == NULL);
2912                                                                                                                                                                                                      
2913                 /*
2914                  * in the case of cross-ref dir, we can perform this check only
2915                  * if child and parent lie on the same mds. This is because
2916                  * otherwise they can have the same inode numbers.
2917                  */
2918                 if (de_old->d_mdsnum == mds->mds_num) {
2919                         if (de_old->d_inum == de_srcdir->d_inode->i_ino ||
2920                             de_old->d_inum == de_tgtdir->d_inode->i_ino)
2921                                 GOTO(cleanup, rc = -EINVAL);
2922                 }
2923         } else {
                     /* NOTE(review): a missing source (negative dentry) trips
                      * this assertion rather than returning an error --
                      * presumably the lookup helper rejects that case first;
                      * confirm. */
2924                 LASSERT(de_old->d_inode != NULL);
2925                 if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
2926                     de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
2927                         GOTO(cleanup, rc = -EINVAL);
2928         }
2929                                                                                                                                                                                                      
2930         /* sanity check for dest inode */
2931         if (de_new->d_flags & DCACHE_CROSS_REF) {
2932                 LASSERT(new_inode == NULL);
2933                                                                                                                                                                                                      
2934                 /* the same check about target dentry. */
2935                 if (de_new->d_mdsnum == mds->mds_num) {
2936                         if (de_new->d_inum == de_srcdir->d_inode->i_ino ||
2937                             de_new->d_inum == de_tgtdir->d_inode->i_ino)
2938                                 GOTO(cleanup, rc = -EINVAL);
2939                 }
2940                                                                                                                                                                                                      
2941                 /*
2942                  * regular files usually do not have ->rename() implemented.
2943                  * But we handle only this case when @de_new is a cross-ref
2944                  * entry, because in other cases it will be handled by
2945                  * vfs_rename().
2946                  */
2946                 if (de_old->d_inode && (!de_old->d_inode->i_op || 
2947                     !de_old->d_inode->i_op->rename))
2948                         GOTO(cleanup, rc = -EPERM);
2949         } else {
                     /* a local target may legitimately not exist yet */
2950                 if (new_inode &&
2951                     (new_inode->i_ino == de_srcdir->d_inode->i_ino ||
2952                      new_inode->i_ino == de_tgtdir->d_inode->i_ino))
2953                         GOTO(cleanup, rc = -EINVAL);
2954         }
2955         
2956         /* check if inodes point to each other: when both names are local
              * and resolve to the same inode there is nothing to do, and the
              * request completes with rc = 0. */
2957         if (!(de_old->d_flags & DCACHE_CROSS_REF) &&
2958             !(de_new->d_flags & DCACHE_CROSS_REF) &&
2959             old_inode == new_inode)
2960                 GOTO(cleanup, rc = 0);
2961
2962         /* if we are about to remove the target at first, pass the EA of
2963          * that inode to the client so it can perform cleanup on the OST */
2964         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
2965         LASSERT(body != NULL);
2966
2967         /* child i_alloc_sem protects orphan_dec_test && is_orphan race */
2968         if (new_inode) 
2969                 DOWN_READ_I_ALLOC_SEM(new_inode);
             /* phase 2 is entered even when new_inode is NULL; the cleanup
              * path re-checks new_inode before releasing the semaphore */
2970         cleanup_phase = 2; /* UP_READ_I_ALLOC_SEM(new_inode) when finished */
2971
             /* target holds its last link: nlink == 2 for a directory, or
              * nlink == 1 */
2972         if (new_inode && ((S_ISDIR(new_inode->i_mode) && 
2973             new_inode->i_nlink == 2) ||
2974             new_inode->i_nlink == 1)) {
2975                 if (mds_orphan_open_count(new_inode) > 0) {
2976                         /* need to lock pending_dir before transaction */
2977                         down(&mds->mds_pending_dir->d_inode->i_sem);
2978                         cleanup_phase = 3; /* up(&pending_dir->i_sem) */
2979                 } else if (S_ISREG(new_inode->i_mode)) {
                             /* not open anywhere: pack fid, attributes and
                              * the MD of the doomed file into the reply */
2980                         mds_pack_inode2fid(obd, &body->fid1, new_inode);
2981                         mds_pack_inode2body(obd, body, new_inode);
2982                         mds_pack_md(obd, req->rq_repmsg, 1, body, new_inode, 
2983                                     MDS_PACK_MD_LOCK);
2984                  }
2985         }
2986
2987         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
2988                        de_srcdir->d_inode->i_sb);
2989
2990 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)
2991         /* Check if we are moving old entry into its child. 2.6 does not
2992            check for this in vfs_rename() anymore */
2993         if (is_subdir(de_new, de_old))
2994                 GOTO(cleanup, rc = -EINVAL);
2995 #endif
             /* cross-ref source: no local inode to hand to vfs_rename(), so
              * the new name is added and the old one deleted through the
              * local-dentry helpers; no transaction handle is started here */
2996         if (de_old->d_flags & DCACHE_CROSS_REF) {
2997                 rc = mds_add_local_dentry(rec, offset, req, de_old, de_tgtdir, 
2998                                           de_new);
2999                 if (rc)
3000                         GOTO(cleanup, rc);
3001                                                                                                                                                                                                      
3002                 rc = mds_del_local_dentry(rec, offset, req, de_old, de_srcdir, 
3003                                           de_old);
3004                 GOTO(cleanup, rc);
3005         }
3006
             /* NOTE(review): lmm is dereferenced without a NULL check, and
              * reply buffer 1 is only filled by mds_pack_md() in the
              * regular-file branch above; in every other case the stripe
              * count read here is whatever the buffer already holds --
              * confirm the buffer is always present and pre-zeroed. */
3007         lmm = lustre_msg_buf(req->rq_repmsg, 1, 0);
3008         handle = fsfilt_start_log(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME,
3009                                   NULL, le32_to_cpu(lmm->lmm_stripe_count));
3010
3011         if (IS_ERR(handle))
3012                 GOTO(cleanup, rc = PTR_ERR(handle));
3013
             /* NOTE(review): d_fsdata is pointed at the request for the
              * duration of vfs_rename() and never reset here; presumably a
              * filesystem-level hook consumes it -- confirm. */
3014         lock_kernel();
3015         de_old->d_fsdata = req;
3016         de_new->d_fsdata = req;
3017         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
3018         unlock_kernel();
3019
             /* the rename removed the last link of an existing target */
3020         if (rc == 0 && new_inode != NULL && new_inode->i_nlink == 0) {
3021                 if (mds_orphan_open_count(new_inode) > 0)
3022                         rc = mds_orphan_add_link(rec, obd, de_new);
3023
                     /* a return of 1 from mds_orphan_add_link() is mapped to
                      * success and skips the unlink-log step below */
3024                 if (rc == 1)
3025                         GOTO(cleanup, rc = 0);
3026
3027                 if (!S_ISREG(new_inode->i_mode))
3028                         GOTO(cleanup, rc);
3029
                     /* no EA was packed: only advertise the size/time
                      * attributes; otherwise log the unlink and, if cookies
                      * were produced, flag them in the reply */
3030                 if (!(body->valid & OBD_MD_FLEASIZE)) {
3031                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
3032                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
3033                 } else if (mds_log_op_unlink(obd, new_inode,
3034                                              lustre_msg_buf(req->rq_repmsg,1,0),
3035                                              req->rq_repmsg->buflens[1],
3036                                              lustre_msg_buf(req->rq_repmsg,2,0),
3037                                              req->rq_repmsg->buflens[2], 
3038                                              &lcl) > 0) {
3039                         body->valid |= OBD_MD_FLCOOKIE;
3040                 }
3041         }
3042
3043         GOTO(cleanup, rc);
3044 cleanup:
             /* mds_finish_transno() runs on every path, including those
              * where no handle was started (handle == NULL) and where
              * de_tgtdir was never looked up */
3045         rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
3046                                 handle, req, rc, 0);
             /* the cases below intentionally fall through so that a higher
              * phase undoes everything the lower phases acquired */
3047         switch (cleanup_phase) {
3048         case 3:
3049                 up(&mds->mds_pending_dir->d_inode->i_sem);
                     /* fallthrough */
3050         case 2:
3051                 if (new_inode)
3052                         UP_READ_I_ALLOC_SEM(new_inode);
                     /* fallthrough */
3053         case 1:
3054 #ifdef S_PDIROPS
3055                 if (dlm_handles[5].cookie != 0)
3056                         ldlm_lock_decref(&(dlm_handles[5]), LCK_PW);
3057                 if (dlm_handles[6].cookie != 0)
3058                         ldlm_lock_decref(&(dlm_handles[6]), LCK_PW);
3059 #endif
3060                 if (lcl != NULL)
3061                         ptlrpc_save_llog_lock(req, lcl);
3062
                     /* on failure the locks are dropped right away; on
                      * success they are attached to the request with
                      * ptlrpc_save_lock() instead */
3063                 if (rc) {
3064                         if (dlm_handles[3].cookie != 0)
3065                                 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
3066                         ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
3067                         ldlm_lock_decref(&(dlm_handles[1]), LCK_PW);
3068                         ldlm_lock_decref(&(dlm_handles[0]), LCK_PW);
3069                 } else {
3070                         if (dlm_handles[3].cookie != 0)
3071                                 ptlrpc_save_lock(req,&(dlm_handles[3]), LCK_EX);
3072                         ptlrpc_save_lock(req, &(dlm_handles[2]), LCK_EX);
3073                         ptlrpc_save_lock(req, &(dlm_handles[1]), LCK_PW);
3074                         ptlrpc_save_lock(req, &(dlm_handles[0]), LCK_PW);
3075                 }
3076                 l_dput(de_new);
3077                 l_dput(de_old);
3078                 l_dput(de_tgtdir);
3079                 l_dput(de_srcdir);
                     /* fallthrough */
3080         case 0:
3081                 break;
3082         default:
3083                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
3084                 LBUG();
3085         }
3086         req->rq_status = rc;
             /* NOTE(review): plain return here, whereas the function opened
              * with ENTRY and the hand-off paths use RETURN(). */
3087         return 0;
3088 }
3089
/*
 * Signature shared by all reintegration handlers stored in reinters[]:
 * (update record, message buffer offset, request, lock handle) -> int.
 * mds_reint_rec() calls a handler as (rec, offset, req, lockh) and returns
 * its result unchanged.
 */
3090 typedef int (*mds_reinter)(struct mds_update_record *, int offset,
3091                            struct ptlrpc_request *, struct lustre_handle *);
3092
/*
 * Dispatch table mapping each REINT_* opcode to its handler; it is indexed
 * by rec->ur_opcode in mds_reint_rec().  Slots without an explicit entry are
 * zero-initialized (NULL).  Entries use the standard "[index] = value"
 * designator form instead of the obsolete GNU "[index] value" spelling.
 */
3093 static mds_reinter reinters[REINT_MAX + 1] = {
3094         [REINT_SETATTR] = mds_reint_setattr,
3095         [REINT_CREATE] = mds_reint_create,
3096         [REINT_LINK] = mds_reint_link,
3097         [REINT_UNLINK] = mds_reint_unlink,
3098         [REINT_RENAME] = mds_reint_rename,
3099         [REINT_OPEN] = mds_open
3100 };
3101
/*
 * Dispatch an unpacked reintegration record to the handler registered for
 * rec->ur_opcode in reinters[].
 *
 * The handler runs between push_ctxt() and pop_ctxt() on the device's
 * obd_lvfs_ctxt, with rec->ur_uc passed to both calls.  The handler's return
 * value is passed back to the caller unchanged.
 */
3102 int mds_reint_rec(struct mds_update_record *rec, int offset,
3103                   struct ptlrpc_request *req, struct lustre_handle *lockh)
3104 {
3105         struct obd_device *obd = req->rq_export->exp_obd;
3106         struct lvfs_run_ctxt saved;
3107         int rc;
3108
3109         /* checked by unpacker */
             /* an opcode beyond REINT_MAX, or one with no table entry, trips
              * the assertion rather than producing an error return */
3110         LASSERT(rec->ur_opcode <= REINT_MAX &&
3111                 reinters[rec->ur_opcode] != NULL);
3112
             /* the push/pop pair must bracket the handler call */
3113         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &rec->ur_uc);
3114         rc = reinters[rec->ur_opcode] (rec, offset, req, lockh);
3115         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &rec->ur_uc);
3116
3117         return rc;
3118 }