Whamcloud - gitweb
- changes with names on exports and another fields in llite and mds. lov_exp is
[fs/lustre-release.git] / lustre / llite / namei.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  *  derived in small part from linux/fs/ext2/namei.c
22  *
23  *  Copyright (C) 1991, 1992  Linus Torvalds
24  *
25  *  Big-endian to little-endian byte-swapping/bitmaps by
26  *        David S. Miller (davem@caip.rutgers.edu), 1995
27  *  Directory entry file type support and forward compatibility hooks
28  *      for B-tree directories by Theodore Ts'o (tytso@mit.edu), 1998
29  */
30
31 #include <linux/fs.h>
32 #include <linux/sched.h>
33 #include <linux/mm.h>
34 #include <linux/smp_lock.h>
35 #include <linux/quotaops.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38
39 #define DEBUG_SUBSYSTEM S_LLITE
40
41 #include <linux/obd_support.h>
42 #include <linux/lustre_lite.h>
43 #include <linux/lustre_dlm.h>
44 #include <linux/lustre_version.h>
45 #include "llite_internal.h"
46
47 /* methods */
48
49 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
50 static int ll_test_inode(struct inode *inode, unsigned long ino, void *opaque)
51 #else
52 static int ll_test_inode(struct inode *inode, void *opaque)
53 #endif
54 {
55         static int last_ino, last_gen, last_count;
56         struct lustre_md *md = opaque;
57
58         if (!(md->body->valid & (OBD_MD_FLGENER | OBD_MD_FLID))) {
59                 CERROR("MDS body missing inum or generation\n");
60                 return 0;
61         }
62
63         if (last_ino == id_ino(&md->body->id1) &&
64             last_gen == id_gen(&md->body->id1) &&
65             last_count < 500) {
66                 last_count++;
67         } else {
68                 if (last_count > 1)
69                         CDEBUG(D_VFSTRACE, "compared %u/%u %u times\n",
70                                last_ino, last_gen, last_count);
71                 last_count = 0;
72                 last_ino = id_ino(&md->body->id1);
73                 last_gen = id_gen(&md->body->id1);
74                 CDEBUG(D_VFSTRACE,
75                        "comparing inode %p ino "DLID4" to body "DLID4"\n",
76                        inode, OLID4(&ll_i2info(inode)->lli_id),
77                        OLID4(&md->body->id1));
78         }
79
80 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
81         if (inode->i_ino != id_ino(&md->body->id1))
82                 return 0;
83 #endif
84         if (inode->i_generation != id_gen(&md->body->id1))
85                 return 0;
86
87         if (id_group(&ll_i2info(inode)->lli_id) != id_group(&md->body->id1))
88                 return 0;
89         
90         /* apply the attributes in 'opaque' to this inode. */
91         ll_update_inode(inode, md);
92         return 1;
93 }
94
95 extern struct dentry_operations ll_d_ops;
96
97 int ll_unlock(__u32 mode, struct lustre_handle *lockh)
98 {
99         ENTRY;
100
101         ldlm_lock_decref(lockh, mode);
102
103         RETURN(0);
104 }
105
106 /*
107  * get an inode by inode number (already instantiated by the intent lookup).
108  * Returns inode or NULL.
109  */
110 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
111 int ll_set_inode(struct inode *inode, void *opaque)
112 {
113         ll_read_inode2(inode, opaque);
114         return 0;
115 }
116
117 struct inode *ll_iget(struct super_block *sb, ino_t hash,
118                       struct lustre_md *md)
119 {
120         struct inode *inode;
121
122         LASSERT(hash != 0);
123         inode = iget5_locked(sb, hash, ll_test_inode, ll_set_inode, md);
124
125         if (inode) {
126                 if (inode->i_state & I_NEW)
127                         unlock_new_inode(inode);
128                 CDEBUG(D_VFSTRACE, "inode: %lu/%u(%p)\n", inode->i_ino,
129                        inode->i_generation, inode);
130         }
131
132         return inode;
133 }
134 #else
135 struct inode *ll_iget(struct super_block *sb, ino_t hash,
136                       struct lustre_md *md)
137 {
138         struct inode *inode;
139         LASSERT(hash != 0);
140         inode = iget4(sb, hash, ll_test_inode, md);
141         if (inode)
142                 CDEBUG(D_VFSTRACE, "inode: %lu/%u(%p)\n", inode->i_ino,
143                        inode->i_generation, inode);
144         return inode;
145 }
146 #endif
147
148 int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
149                         void *data, int flag)
150 {
151         int rc;
152         struct lustre_handle lockh;
153         ENTRY;
154
155         switch (flag) {
156         case LDLM_CB_BLOCKING:
157                 ldlm_lock2handle(lock, &lockh);
158                 rc = ldlm_cli_cancel(&lockh);
159                 if (rc < 0) {
160                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
161                         RETURN(rc);
162                 }
163                 break;
164         case LDLM_CB_CANCELING: {
165                 struct inode *inode = ll_inode_from_lock(lock);
166                 struct ll_inode_info *li = ll_i2info(inode);
167                 __u64 bits = lock->l_policy_data.l_inodebits.bits;
168
169                 /* For lookup locks: Invalidate all dentries associated with
170                    this inode, for UPDATE locks - invalidate directory pages */
171                 if (inode == NULL)
172                         break;
173
174                 if (bits & MDS_INODELOCK_UPDATE)
175                         clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK,
176                                   &(ll_i2info(inode)->lli_flags));
177
178
179                 if (lock->l_resource->lr_name.name[0] != id_fid(&li->lli_id) ||
180                     lock->l_resource->lr_name.name[1] != id_group(&li->lli_id)) {
181                         LDLM_ERROR(lock, "data mismatch with object %lu/%lu",
182                                    (unsigned long)id_fid(&li->lli_id),
183                                    (unsigned long)id_group(&li->lli_id));
184                 }
185
186                 /* If lookup lock is cancelled, we just drop the dentry and
187                    this will cause us to reget data from MDS when we'd want to
188                    access this dentry/inode again. If this is lock on
189                    other parts of inode that is cancelled, we do not need to do
190                    much (but need to discard data from readdir, if any), since
191                    abscence of lock will cause ll_revalidate_it (called from
192                    stat() and similar functions) to renew the data anyway */
193                 if (S_ISDIR(inode->i_mode) &&
194                     (bits & MDS_INODELOCK_UPDATE)) {
195                         CDEBUG(D_INODE, "invalidating inode %lu/%u(%p)\n",
196                                inode->i_ino, inode->i_generation, inode);
197                         truncate_inode_pages(inode->i_mapping, 0);
198                 }
199
200                 if (inode->i_sb->s_root &&
201                     inode != inode->i_sb->s_root->d_inode &&
202                     (bits & MDS_INODELOCK_LOOKUP))
203                         ll_unhash_aliases(inode);
204                 iput(inode);
205                 break;
206         }
207         default:
208                 LBUG();
209         }
210
211         RETURN(0);
212 }
213
214 int ll_mdc_cancel_unused(struct lustre_handle *conn, struct inode *inode,
215                          int flags, void *opaque)
216 {
217         struct ll_inode_info *li = ll_i2info(inode);
218         struct ldlm_res_id res_id =
219                 { .name = {id_fid(&li->lli_id), id_group(&li->lli_id)} };
220         struct obd_device *obddev = class_conn2obd(conn);
221         ENTRY;
222         
223         RETURN(ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags,
224                                       opaque));
225 }
226
227 /* Search "inode"'s alias list for a dentry that has the same name and parent as
228  * de.  If found, return it.  If not found, return de. */
229 struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
230 {
231         struct list_head *tmp;
232
233         spin_lock(&dcache_lock);
234         list_for_each(tmp, &inode->i_dentry) {
235                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
236
237                 /* We are called here with 'de' already on the aliases list. */
238                 if (dentry == de) {
239                         CERROR("whoops\n");
240                         continue;
241                 }
242
243                 if (dentry->d_parent != de->d_parent)
244                         continue;
245
246                 if (dentry->d_name.len != de->d_name.len)
247                         continue;
248
249                 if (memcmp(dentry->d_name.name, de->d_name.name,
250                            de->d_name.len) != 0)
251                         continue;
252
253                 if (!list_empty(&dentry->d_lru))
254                         list_del_init(&dentry->d_lru);
255
256                 hlist_del_init(&dentry->d_hash);
257                 __d_rehash(dentry, 0); /* avoid taking dcache_lock inside */
258                 spin_unlock(&dcache_lock);
259                 atomic_inc(&dentry->d_count);
260                 iput(inode);
261                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
262                 CDEBUG(D_DENTRY, "alias dentry %*s (%p) parent %p inode %p "
263                        "refc %d\n", de->d_name.len, de->d_name.name, de,
264                        de->d_parent, de->d_inode, atomic_read(&de->d_count));
265                 return dentry;
266         }
267
268         spin_unlock(&dcache_lock);
269
270         return de;
271 }
272
273 static int lookup_it_finish(struct ptlrpc_request *request, int offset,
274                             struct lookup_intent *it, void *data)
275 {
276         struct it_cb_data *icbd = data;
277         struct dentry **de = icbd->icbd_childp;
278         struct inode *parent = icbd->icbd_parent;
279         struct ll_sb_info *sbi = ll_i2sbi(parent);
280         struct dentry *dentry = *de, *saved = *de;
281         struct inode *inode = NULL;
282         int rc;
283
284         /* NB 1 request reference will be taken away by ll_intent_lock()
285          * when I return */
286         if (!it_disposition(it, DISP_LOOKUP_NEG)) {
287                 ENTRY;
288
289                 rc = ll_prep_inode(sbi->ll_dt_exp, sbi->ll_md_exp,
290                                    &inode, request, offset, dentry->d_sb);
291                 if (rc)
292                         RETURN(rc);
293
294                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
295                        inode, inode->i_ino, inode->i_generation);
296                 
297                 mdc_set_lock_data(NULL, &it->d.lustre.it_lock_handle, inode);
298                 
299                 /* If this is a stat, get the authoritative file size */
300                 if (it->it_op == IT_GETATTR && S_ISREG(inode->i_mode) &&
301                     ll_i2info(inode)->lli_smd != NULL) {
302                         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
303                         ldlm_error_t rc;
304
305                         LASSERT(lsm->lsm_object_id != 0);
306
307                         /* bug 2334: drop MDS lock before acquiring OST lock */
308                         ll_intent_drop_lock(it);
309
310                         rc = ll_glimpse_size(inode);
311                         if (rc) {
312                                 iput(inode);
313                                 RETURN(rc);
314                         }
315                 }
316
317                 dentry = *de = ll_find_alias(inode, dentry);
318         } else {
319                 ENTRY;
320         }
321
322         dentry->d_op = &ll_d_ops;
323         ll_set_dd(dentry);
324
325         if (dentry == saved)
326                 d_add(dentry, inode);
327
328         RETURN(0);
329 }
330
331 static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
332                                    struct nameidata *nd, struct lookup_intent *it,
333                                    int flags)
334 {
335         struct dentry *save = dentry, *retval;
336         struct lustre_id pid;
337         struct it_cb_data icbd;
338         struct ptlrpc_request *req = NULL;
339         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
340         int rc;
341         ENTRY;
342
343         if (dentry->d_name.len > EXT3_NAME_LEN)
344                 RETURN(ERR_PTR(-ENAMETOOLONG));
345
346         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
347                dentry->d_name.name, parent->i_ino, parent->i_generation,
348                parent, LL_IT2STR(it));
349
350         if (d_mountpoint(dentry))
351                 CERROR("Tell Peter, lookup on mtpt, it %s\n", LL_IT2STR(it));
352
353         if (nd != NULL)
354                 nd->mnt->mnt_last_used = jiffies;
355
356         ll_frob_intent(&it, &lookup_it);
357
358         icbd.icbd_childp = &dentry;
359         icbd.icbd_parent = parent;
360         ll_inode2id(&pid, parent);
361
362         rc = md_intent_lock(ll_i2mdexp(parent), &pid,
363                             dentry->d_name.name, dentry->d_name.len, NULL, 0,
364                             NULL, it, flags, &req, ll_mdc_blocking_ast);
365         if (rc < 0)
366                 GOTO(out, retval = ERR_PTR(rc));
367
368         rc = lookup_it_finish(req, 1, it, &icbd);
369         if (rc != 0) {
370                 ll_intent_release(it);
371                 GOTO(out, retval = ERR_PTR(rc));
372         }
373
374         ll_lookup_finish_locks(it, dentry);
375
376         if (nd &&
377             dentry->d_inode != NULL && dentry->d_inode->i_mode & S_ISUID &&
378             S_ISDIR(dentry->d_inode->i_mode) &&
379             (flags & LOOKUP_CONTINUE || (it->it_op & (IT_CHDIR | IT_OPEN))))
380                 ll_dir_process_mount_object(dentry, nd->mnt);
381
382         if (dentry == save)
383                 GOTO(out, retval = NULL);
384         else
385                 GOTO(out, retval = dentry);
386  out:
387         if (req)
388                 ptlrpc_req_finished(req);
389         if (dentry->d_inode)
390                 CDEBUG(D_INODE, "lookup 0x%p in %lu/%lu: %*s -> %lu/%lu\n",
391                        dentry,
392                        (unsigned long) parent->i_ino,
393                        (unsigned long) parent->i_generation,
394                        dentry->d_name.len, dentry->d_name.name,
395                        (unsigned long) dentry->d_inode->i_ino,
396                        (unsigned long) dentry->d_inode->i_generation);
397         else
398                 CDEBUG(D_INODE, "lookup 0x%p in %lu/%lu: %*s -> ??\n",
399                        dentry,
400                        (unsigned long) parent->i_ino,
401                        (unsigned long) parent->i_generation,
402                        dentry->d_name.len, dentry->d_name.name);
403         return retval;
404 }
405
406 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
407 static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry,
408                                    struct nameidata *nd)
409 {
410         struct dentry *de;
411         ENTRY;
412
413         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
414                 de = ll_lookup_it(parent, dentry, nd, &nd->intent, nd->flags);
415         else
416                 de = ll_lookup_it(parent, dentry, nd, NULL, 0);
417
418         RETURN(de);
419 }
420 #endif
421
422 /* We depend on "mode" being set with the proper file type/umask by now */
423 static struct inode *ll_create_node(struct inode *dir, const char *name,
424                                     int namelen, const void *data, int datalen,
425                                     int mode, __u64 extra,
426                                     struct lookup_intent *it)
427 {
428         struct inode *inode = NULL;
429         struct ptlrpc_request *request = NULL;
430         struct ll_sb_info *sbi = ll_i2sbi(dir);
431         int rc;
432         ENTRY;
433
434         LASSERT(it && it->d.lustre.it_disposition);
435
436         request = it->d.lustre.it_data;
437         rc = ll_prep_inode(sbi->ll_dt_exp, sbi->ll_md_exp,
438                            &inode, request, 1, dir->i_sb);
439         if (rc)
440                 GOTO(out, inode = ERR_PTR(rc));
441
442         LASSERT(list_empty(&inode->i_dentry));
443
444         /* We asked for a lock on the directory, but were granted a
445          * lock on the inode.  Since we finally have an inode pointer,
446          * stuff it in the lock. */
447         CDEBUG(D_DLMTRACE, "setting l_ast_data to inode %p (%lu/%u)\n",
448                inode, inode->i_ino, inode->i_generation);
449         mdc_set_lock_data(NULL, &it->d.lustre.it_lock_handle, inode);
450         EXIT;
451  out:
452         ptlrpc_req_finished(request);
453         return inode;
454 }
455
456 /*
457  * By the time this is called, we already have created the directory cache
458  * entry for the new file, but it is so far negative - it has no inode.
459  *
460  * We defer creating the OBD object(s) until open, to keep the intent and
461  * non-intent code paths similar, and also because we do not have the MDS
462  * inode number before calling ll_create_node() (which is needed for LOV),
463  * so we would need to do yet another RPC to the MDS to store the LOV EA
464  * data on the MDS.  If needed, we would pass the PACKED lmm as data and
465  * lmm_size in datalen (the MDS still has code which will handle that).
466  *
467  * If the create succeeds, we fill in the inode information
468  * with d_instantiate().
469  */
470 static int ll_create_it(struct inode *dir, struct dentry *dentry, int mode,
471                         struct lookup_intent *it)
472 {
473         struct inode *inode;
474         struct ptlrpc_request *request = it->d.lustre.it_data;
475         struct obd_export *md_exp = ll_i2mdexp(dir); 
476         int rc = 0;
477         ENTRY;
478
479         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
480                dentry->d_name.name, dir->i_ino, dir->i_generation, dir,
481                LL_IT2STR(it));
482
483         rc = it_open_error(DISP_OPEN_CREATE, it);
484         if (rc)
485                 RETURN(rc);
486
487         mdc_store_inode_generation(md_exp, request, MDS_REQ_INTENT_REC_OFF, 1);
488         inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
489                                NULL, 0, mode, 0, it);
490         if (IS_ERR(inode))
491                 RETURN(PTR_ERR(inode));
492
493         d_instantiate(dentry, inode);
494         RETURN(0);
495 }
496
497 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
498 static int ll_create_nd(struct inode *dir, struct dentry *dentry, int mode, struct nameidata *nd)
499 {
500         return ll_create_it(dir, dentry, mode, &nd->intent);
501 }
502 #endif
503
504 static void ll_update_times(struct ptlrpc_request *request, int offset,
505                             struct inode *inode)
506 {
507         struct mds_body *body = lustre_msg_buf(request->rq_repmsg, offset,
508                                                sizeof(*body));
509         LASSERT(body);
510
511         if (body->valid & OBD_MD_FLMTIME &&
512             body->mtime > LTIME_S(inode->i_mtime)) {
513                 CDEBUG(D_INODE, "setting ino %lu mtime from %lu to %u\n",
514                        inode->i_ino, LTIME_S(inode->i_mtime), body->mtime);
515                 LTIME_S(inode->i_mtime) = body->mtime;
516         }
517         if (body->valid & OBD_MD_FLCTIME &&
518             body->ctime > LTIME_S(inode->i_ctime))
519                 LTIME_S(inode->i_ctime) = body->ctime;
520 }
521
522 static int ll_mknod_raw(struct nameidata *nd, int mode, dev_t rdev)
523 {
524         struct ptlrpc_request *request = NULL;
525         struct inode *dir = nd->dentry->d_inode;
526         const char *name = nd->last.name;
527         int len = nd->last.len;
528         struct ll_sb_info *sbi = ll_i2sbi(dir);
529         struct mdc_op_data *op_data;
530         int err = -EMLINK;
531         ENTRY;
532
533         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
534                name, dir->i_ino, dir->i_generation, dir);
535
536         if (dir->i_nlink >= EXT3_LINK_MAX)
537                 RETURN(err);
538
539         mode &= ~current->fs->umask;
540
541         switch (mode & S_IFMT) {
542         case 0:
543         case S_IFREG:
544                 mode |= S_IFREG; /* for mode = 0 case, fallthrough */
545         case S_IFCHR:
546         case S_IFBLK:
547         case S_IFIFO:
548         case S_IFSOCK:
549                 OBD_ALLOC(op_data, sizeof(*op_data));
550                 if (op_data == NULL)
551                         RETURN(-ENOMEM);
552                 ll_prepare_mdc_data(op_data, dir, NULL, name, len, 0);
553                 err = md_create(sbi->ll_md_exp, op_data, NULL, 0, mode,
554                                 current->fsuid, current->fsgid, rdev,
555                                 &request);
556                 OBD_FREE(op_data, sizeof(*op_data));
557                 if (err == 0)
558                         ll_update_times(request, 0, dir);
559                 ptlrpc_req_finished(request);
560                 break;
561         case S_IFDIR:
562                 err = -EPERM;
563                 break;
564         default:
565                 err = -EINVAL;
566         }
567         RETURN(err);
568 }
569
570 static int ll_mknod(struct inode *dir, struct dentry *child,
571                     int mode, ll_dev_t rdev)
572 {
573         struct ptlrpc_request *request = NULL;
574         struct inode *inode = NULL;
575         const char *name = child->d_name.name;
576         int len = child->d_name.len;
577         struct ll_sb_info *sbi = ll_i2sbi(dir);
578         struct mdc_op_data *op_data;
579         int err = -EMLINK;
580         ENTRY;
581
582         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
583                name, dir->i_ino, dir->i_generation, dir);
584
585         if (dir->i_nlink >= EXT3_LINK_MAX)
586                 RETURN(err);
587
588         mode &= ~current->fs->umask;
589
590         switch (mode & S_IFMT) {
591         case 0:
592         case S_IFREG:
593                 mode |= S_IFREG; /* for mode = 0 case, fallthrough */
594         case S_IFCHR:
595         case S_IFBLK:
596         case S_IFIFO:
597         case S_IFSOCK:
598                 OBD_ALLOC(op_data, sizeof(*op_data));
599                 if (op_data == NULL)
600                         RETURN(-ENOMEM);
601                 ll_prepare_mdc_data(op_data, dir, NULL, name, len, 0);
602                 err = md_create(sbi->ll_md_exp, op_data, NULL, 0, mode,
603                                 current->fsuid, current->fsgid, rdev,
604                                 &request);
605                 OBD_FREE(op_data, sizeof(*op_data));
606                 if (err)
607                         GOTO(out_err, err);
608
609                 ll_update_times(request, 0, dir);
610                 
611                 err = ll_prep_inode(sbi->ll_dt_exp, sbi->ll_md_exp,
612                                     &inode, request, 0, child->d_sb);
613                 if (err)
614                         GOTO(out_err, err);
615                 break;
616         case S_IFDIR:
617                 RETURN(-EPERM);
618                 break;
619         default:
620                 RETURN(-EINVAL);
621         }
622
623         d_instantiate(child, inode);
624         EXIT;
625  out_err:
626         ptlrpc_req_finished(request);
627         return err;
628 }
629
630 static int ll_symlink_raw(struct nameidata *nd, const char *tgt)
631 {
632         struct inode *dir = nd->dentry->d_inode;
633         const char *name = nd->last.name;
634         int len = nd->last.len;
635         struct ptlrpc_request *request = NULL;
636         struct ll_sb_info *sbi = ll_i2sbi(dir);
637         struct mdc_op_data *op_data;
638         int err = -EMLINK;
639         ENTRY;
640
641         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),target=%s\n",
642                name, dir->i_ino, dir->i_generation, dir, tgt);
643
644         if (dir->i_nlink >= EXT3_LINK_MAX)
645                 RETURN(err);
646
647         OBD_ALLOC(op_data, sizeof(*op_data));
648         if (op_data == NULL)
649                 RETURN(-ENOMEM);
650         ll_prepare_mdc_data(op_data, dir, NULL, name, len, 0);
651         err = md_create(sbi->ll_md_exp, op_data,
652                         tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO,
653                         current->fsuid, current->fsgid, 0, &request);
654         OBD_FREE(op_data, sizeof(*op_data));
655         if (err == 0)
656                 ll_update_times(request, 0, dir);
657         
658         ptlrpc_req_finished(request);
659         RETURN(err);
660 }
661
662 static int ll_link_raw(struct nameidata *srcnd, struct nameidata *tgtnd)
663 {
664         struct inode *src = srcnd->dentry->d_inode;
665         struct inode *dir = tgtnd->dentry->d_inode;
666         const char *name = tgtnd->last.name;
667         int len = tgtnd->last.len;
668         struct ptlrpc_request *request = NULL;
669         struct mdc_op_data *op_data;
670         int err;
671         struct ll_sb_info *sbi = ll_i2sbi(dir);
672         ENTRY;
673
674         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),dir=%lu/%u(%p),target=%s\n",
675                src->i_ino, src->i_generation, src, dir->i_ino, dir->i_generation,
676                dir, name);
677
678         OBD_ALLOC(op_data, sizeof(*op_data));
679         if (op_data == NULL)
680                 RETURN(-ENOMEM);
681         ll_prepare_mdc_data(op_data, src, dir, name, len, 0);
682         err = md_link(sbi->ll_md_exp, op_data, &request);
683         OBD_FREE(op_data, sizeof(*op_data));
684         if (err == 0)
685                 ll_update_times(request, 0, dir);
686         ptlrpc_req_finished(request);
687         RETURN(err);
688 }
689
690
691 static int ll_mkdir_raw(struct nameidata *nd, int mode)
692 {
693         struct inode *dir = nd->dentry->d_inode;
694         const char *name = nd->last.name;
695         int len = nd->last.len;
696         struct ptlrpc_request *request = NULL;
697         struct ll_sb_info *sbi = ll_i2sbi(dir);
698         struct mdc_op_data *op_data;
699         int err = -EMLINK;
700         ENTRY;
701         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
702                name, dir->i_ino, dir->i_generation, dir);
703
704         mode = (mode & (S_IRWXUGO|S_ISVTX) & ~current->fs->umask) | S_IFDIR;
705         OBD_ALLOC(op_data, sizeof(*op_data));
706         if (op_data == NULL)
707                 RETURN(-ENOMEM);
708         ll_prepare_mdc_data(op_data, dir, NULL, name, len, 0);
709         err = md_create(sbi->ll_md_exp, op_data, NULL, 0, mode,
710                         current->fsuid, current->fsgid, 0, &request);
711         OBD_FREE(op_data, sizeof(*op_data));
712         if (err == 0)
713                 ll_update_times(request, 0, dir);
714         ptlrpc_req_finished(request);
715         RETURN(err);
716 }
717
718 static int ll_rmdir_raw(struct nameidata *nd)
719 {
720         struct inode *dir = nd->dentry->d_inode;
721         const char *name = nd->last.name;
722         int len = nd->last.len;
723         struct ptlrpc_request *request = NULL;
724         struct mdc_op_data *op_data;
725         int rc;
726         ENTRY;
727         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
728                name, dir->i_ino, dir->i_generation, dir);
729
730         OBD_ALLOC(op_data, sizeof(*op_data));
731         if (op_data == NULL)
732                 RETURN(-ENOMEM);
733         ll_prepare_mdc_data(op_data, dir, NULL, name, len, S_IFDIR);
734         rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request);
735         OBD_FREE(op_data, sizeof(*op_data));
736         if (rc == 0)
737                 ll_update_times(request, 0, dir);
738         ptlrpc_req_finished(request);
739         RETURN(rc);
740 }
741
742 int ll_objects_destroy(struct ptlrpc_request *request,
743                        struct inode *dir, int offset)
744 {
745         struct mds_body *body;
746         struct lov_mds_md *eadata;
747         struct lov_stripe_md *lsm = NULL;
748         struct obd_trans_info oti = { 0 };
749         struct obdo *oa;
750         int rc;
751         ENTRY;
752
753         /* req is swabbed so this is safe */
754         body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
755
756         if (!(body->valid & OBD_MD_FLEASIZE))
757                 RETURN(0);
758
759         if (body->eadatasize == 0) {
760                 CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n");
761                 GOTO(out, rc = -EPROTO);
762         }
763
764         /* The MDS sent back the EA because we unlinked the last reference
765          * to this file. Use this EA to unlink the objects on the OST.
766          * It's opaque so we don't swab here; we leave it to obd_unpackmd() to
767          * check it is complete and sensible. */
768         eadata = lustre_swab_repbuf(request, 1, body->eadatasize, NULL);
769         LASSERT(eadata != NULL);
770         if (eadata == NULL) {
771                 CERROR("Can't unpack MDS EA data\n");
772                 GOTO(out, rc = -EPROTO);
773         }
774
775         rc = obd_unpackmd(ll_i2dtexp(dir), &lsm, eadata, body->eadatasize);
776         if (rc < 0) {
777                 CERROR("obd_unpackmd: %d\n", rc);
778                 GOTO(out, rc);
779         }
780         LASSERT(rc >= sizeof(*lsm));
781
782         oa = obdo_alloc();
783         if (oa == NULL)
784                 GOTO(out_free_memmd, rc = -ENOMEM);
785
786         oa->o_id = lsm->lsm_object_id;
787         oa->o_gr = lsm->lsm_object_gr;
788         oa->o_mode = body->mode & S_IFMT;
789         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
790
791         if (body->valid & OBD_MD_FLCOOKIE) {
792                 int length = sizeof(struct llog_cookie) *
793                                 lsm->lsm_stripe_count;
794                 oa->o_valid |= OBD_MD_FLCOOKIE;
795                 oti.oti_logcookies =
796                         lustre_msg_buf(request->rq_repmsg, 2, length);
797                 if (oti.oti_logcookies == NULL) {
798                         oa->o_valid &= ~OBD_MD_FLCOOKIE;
799                         body->valid &= ~OBD_MD_FLCOOKIE;
800                 } else {
801                         /* copy llog cookies to request to replay unlink
802                          * so that the same llog file and records as those created
803                          * during fail can be re-created while doing replay 
804                          */
805                         if (offset >= 0)
806                                 memcpy(lustre_msg_buf(request->rq_reqmsg, offset, 0),
807                                        oti.oti_logcookies, length);
808                 }
809         }
810
811         rc = obd_destroy(ll_i2dtexp(dir), oa, lsm, &oti);
812         obdo_free(oa);
813         if (rc)
814                 CERROR("obd destroy objid "LPX64" error %d\n",
815                        lsm->lsm_object_id, rc);
816         EXIT;
817  out_free_memmd:
818         obd_free_memmd(ll_i2dtexp(dir), &lsm);
819  out:
820         return rc;
821 }
822
823 static int ll_unlink_raw(struct nameidata *nd)
824 {
825         struct inode *dir = nd->dentry->d_inode;
826         const char *name = nd->last.name;
827         int len = nd->last.len;
828         struct ptlrpc_request *request = NULL;
829         struct mdc_op_data *op_data;
830         int rc;
831         ENTRY;
832         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
833                name, dir->i_ino, dir->i_generation, dir);
834
835         OBD_ALLOC(op_data, sizeof(*op_data));
836         if (op_data == NULL)
837                 RETURN(-ENOMEM);
838         ll_prepare_mdc_data(op_data, dir, NULL, name, len, 0);
839         rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request);
840         OBD_FREE(op_data, sizeof(*op_data));
841         if (rc)
842                 GOTO(out, rc);
843         ll_update_times(request, 0, dir);
844         
845         rc = ll_objects_destroy(request, dir, 2);
846         EXIT;
847 out:
848         ptlrpc_req_finished(request);
849         return rc;
850 }
851
852 static int ll_rename_raw(struct nameidata *oldnd, struct nameidata *newnd)
853 {
854         struct inode *src = oldnd->dentry->d_inode;
855         struct inode *tgt = newnd->dentry->d_inode;
856         const char *oldname = oldnd->last.name;
857         int oldlen  = oldnd->last.len;
858         const char *newname = newnd->last.name;
859         int newlen  = newnd->last.len;
860         struct ptlrpc_request *request = NULL;
861         struct ll_sb_info *sbi = ll_i2sbi(src);
862         struct mdc_op_data *op_data;
863         int err;
864         ENTRY;
865         CDEBUG(D_VFSTRACE, "VFS Op:oldname=%s, src_dir=%lu/%u(%p), newname=%s, "
866                "tgt_dir=%lu/%u(%p)\n", oldname, src->i_ino, src->i_generation,
867                src, newname, tgt->i_ino, tgt->i_generation, tgt);
868
869         OBD_ALLOC(op_data, sizeof(*op_data));
870         if (op_data == NULL)
871                 RETURN(-ENOMEM);
872         ll_prepare_mdc_data(op_data, src, tgt, NULL, 0, 0);
873         err = md_rename(sbi->ll_md_exp, op_data, oldname, oldlen,
874                         newname, newlen, &request);
875         OBD_FREE(op_data, sizeof(*op_data));
876         if (!err) {
877                 ll_update_times(request, 0, src);
878                 ll_update_times(request, 0, tgt);
879                 err = ll_objects_destroy(request, src, 3);
880         }
881
882         ptlrpc_req_finished(request);
883         RETURN(err);
884 }
885
886 struct inode_operations ll_dir_inode_operations = {
887         .link_raw           = ll_link_raw,
888         .unlink_raw         = ll_unlink_raw,
889         .symlink_raw        = ll_symlink_raw,
890         .mkdir_raw          = ll_mkdir_raw,
891         .rmdir_raw          = ll_rmdir_raw,
892         .mknod_raw          = ll_mknod_raw,
893         .mknod              = ll_mknod,
894         .rename_raw         = ll_rename_raw,
895         .setattr            = ll_setattr,
896         .setattr_raw        = ll_setattr_raw,
897 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
898         .create_it          = ll_create_it,
899         .lookup_it          = ll_lookup_it,
900         .revalidate_it      = ll_inode_revalidate_it,
901 #else
902         .lookup             = ll_lookup_nd,
903         .create             = ll_create_nd,
904         .getattr_it         = ll_getattr,
905 #endif
906 };