Whamcloud - gitweb
merge b_devel into HEAD, which will become 0.7.3
[fs/lustre-release.git] / lustre / llite / namei.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  *  derived in small part from linux/fs/ext2/namei.c
22  *
23  *  Copyright (C) 1991, 1992  Linus Torvalds
24  *
25  *  Big-endian to little-endian byte-swapping/bitmaps by
26  *        David S. Miller (davem@caip.rutgers.edu), 1995
27  *  Directory entry file type support and forward compatibility hooks
28  *      for B-tree directories by Theodore Ts'o (tytso@mit.edu), 1998
29  */
30
31 #include <linux/fs.h>
32 #include <linux/sched.h>
33 #include <linux/mm.h>
34 #include <linux/smp_lock.h>
35 #include <linux/quotaops.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38
39 #define DEBUG_SUBSYSTEM S_LLITE
40
41 #include <linux/obd_support.h>
42 #include <linux/lustre_lite.h>
43 #include <linux/lustre_dlm.h>
44 #include "llite_internal.h"
45
46 /* methods */
47
48 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
49 static int ll_test_inode(struct inode *inode, unsigned long ino, void *opaque)
50 #else
51 static int ll_test_inode(struct inode *inode, void *opaque)
52 #endif
53 {
54         struct lustre_md *md = opaque;
55
56         if (!(md->body->valid & (OBD_MD_FLGENER | OBD_MD_FLID)))
57                 CERROR("invalid generation\n");
58         CDEBUG(D_VFSTRACE, "comparing inode %p ino %lu/%u to body %u/%u\n",
59                inode, inode->i_ino, inode->i_generation, 
60                md->body->ino, md->body->generation);
61
62         if (inode->i_generation != md->body->generation)
63                 return 0;
64
65         /* Apply the attributes in 'opaque' to this inode */
66         ll_update_inode(inode, md->body, md->lsm);
67         return 1;
68 }
69
70 extern struct dentry_operations ll_d_ops;
71
72 int ll_unlock(__u32 mode, struct lustre_handle *lockh)
73 {
74         ENTRY;
75
76         ldlm_lock_decref(lockh, mode);
77
78         RETURN(0);
79 }
80
81 /* Get an inode by inode number (already instantiated by the intent lookup).
82  * Returns inode or NULL
83  */
84 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
85 int ll_set_inode(struct inode *inode, void *opaque)
86 {
87         ll_read_inode2(inode, opaque);
88         return 0;
89 }
90 struct inode *ll_iget(struct super_block *sb, ino_t hash,
91                       struct lustre_md *md)
92 {
93         struct inode *inode;
94
95         LASSERT(hash != 0);
96         inode = iget5_locked(sb, hash, ll_test_inode, ll_set_inode, md);
97
98         if (!inode)
99                 return (NULL);              /* removed ERR_PTR(-ENOMEM) -eeb */
100
101         if (inode->i_state & I_NEW)
102                 unlock_new_inode(inode);
103
104         // XXX Coda always fills inodes, should Lustre?
105         return inode;
106 }
107 #else
108 struct inode *ll_iget(struct super_block *sb, ino_t hash,
109                       struct lustre_md *md)
110 {
111         struct inode *inode;
112         LASSERT(hash != 0);
113         inode = iget4(sb, hash, ll_test_inode, md);
114         CDEBUG(D_VFSTRACE, "inode: %lu/%u(%p)\n", inode->i_ino,
115                inode->i_generation, inode);
116         return inode;
117 }
118 #endif
119
120 static int ll_intent_to_lock_mode(struct lookup_intent *it)
121 {
122         /* CREAT needs to be tested before open (both could be set) */
123         if (it->it_op & IT_CREAT)
124                 return LCK_PW;
125         else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
126                 return LCK_PR;
127
128         LBUG();
129         RETURN(-EINVAL);
130 }
131
132 int ll_it_open_error(int phase, struct lookup_intent *it)
133 {
134         if (it_disposition(it, DISP_OPEN_OPEN)) {
135                 if (phase == DISP_OPEN_OPEN)
136                         return it->it_status;
137                 else
138                         return 0;
139         }
140
141         if (it_disposition(it, DISP_OPEN_CREATE)) {
142                 if (phase == DISP_OPEN_CREATE)
143                         return it->it_status;
144                 else
145                         return 0;
146         }
147
148         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
149                 if (phase == DISP_LOOKUP_EXECD)
150                         return it->it_status;
151                 else
152                         return 0;
153         }
154         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
155         LBUG();
156         return 0;
157 }
158
159 int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
160                         void *data, int flag)
161 {
162         int rc;
163         struct lustre_handle lockh;
164         struct inode *inode = lock->l_data;
165         ENTRY;
166
167         switch (flag) {
168         case LDLM_CB_BLOCKING:
169                 ldlm_lock2handle(lock, &lockh);
170                 rc = ldlm_cli_cancel(&lockh);
171                 if (rc < 0) {
172                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
173                         RETURN(rc);
174                 }
175                 break;
176         case LDLM_CB_CANCELING: {
177                 /* Invalidate all dentries associated with this inode */
178                 if (inode == NULL)
179                         break;
180                 if (lock->l_resource->lr_name.name[0] != inode->i_ino ||
181                     lock->l_resource->lr_name.name[1] != inode->i_generation) {
182                         LDLM_ERROR(lock, "data mismatch with ino %lu/%u",
183                                    inode->i_ino, inode->i_generation);
184                 }
185                 if (S_ISDIR(inode->i_mode)) {
186                         CDEBUG(D_INODE, "invalidating inode %lu\n",
187                                inode->i_ino);
188
189                         ll_invalidate_inode_pages(inode);
190                 }
191
192 #warning FIXME: we should probably free this inode if there are no aliases
193                 if (inode->i_sb->s_root &&
194                     inode != inode->i_sb->s_root->d_inode)
195                         ll_unhash_aliases(inode);
196                 break;
197         }
198         default:
199                 LBUG();
200         }
201
202         RETURN(0);
203 }
204
205 int ll_mdc_cancel_unused(struct lustre_handle *conn, struct inode *inode,
206                          int flags, void *opaque)
207 {
208         struct ldlm_res_id res_id =
209                 { .name = {inode->i_ino, inode->i_generation} };
210         struct obd_device *obddev = class_conn2obd(conn);
211         ENTRY;
212         RETURN(ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags,
213                                       opaque));
214 }
215
216 void ll_prepare_mdc_op_data(struct mdc_op_data *data,
217                             struct inode *i1,
218                             struct inode *i2,
219                             const char *name,
220                             int namelen,
221                             int mode)
222 {
223         LASSERT(i1);
224
225         data->ino1 = i1->i_ino;
226         data->gen1 = i1->i_generation;
227         data->typ1 = i1->i_mode & S_IFMT;
228         data->gid1 = i1->i_gid;
229
230         if (i2) {
231                 data->ino2 = i2->i_ino;
232                 data->gen2 = i2->i_generation;
233                 data->typ2 = i2->i_mode & S_IFMT;
234                 data->gid2 = i2->i_gid;
235         } else {
236                 data->ino2 = 0;
237         }
238
239         data->name = name;
240         data->namelen = namelen;
241         data->mode = mode;
242 }
243
244 /* 
245  *This long block is all about fixing up the local state so that it is
246  *correct as of the moment _before_ the operation was applied; that
247  *way, the VFS will think that everything is normal and call Lustre's
248  *regular VFS methods.
249  *
250  * If we're performing a creation, that means that unless the creation
251  * failed with EEXIST, we should fake up a negative dentry.
252  *
253  * For everything else, we want to lookup to succeed.
254  *
255  * One additional note: if CREATE or OPEN succeeded, we add an extra
256  * reference to the request because we need to keep it around until
257  * ll_create/ll_open gets called.
258  *
259  * The server will return to us, in it_disposition, an indication of
260  * exactly what it_status refers to.
261  *
262  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
263  * otherwise if DISP_OPEN_CREATE is set, then it status is the
264  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
265  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
266  * was successful.
267  *
268  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the child
269  * lookup.
270  */
271 int ll_intent_lock(struct inode *parent, struct dentry **de,
272                    struct lookup_intent *it, int flags, intent_finish_cb intent_finish)
273 {
274         struct dentry *dentry = *de;
275         struct inode *inode = dentry->d_inode;
276         struct ll_sb_info *sbi = ll_i2sbi(parent);
277         struct lustre_handle lockh;
278         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
279         struct ptlrpc_request *request;
280         int rc = 0;
281         struct mds_body *mds_body;
282         int mode;
283         obd_id ino = 0;
284         ENTRY;
285
286 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
287         if (it && it->it_magic != INTENT_MAGIC) { 
288                 CERROR("WARNING: uninitialized intent\n");
289                 LBUG();
290                 intent_init(it, IT_LOOKUP, 0);
291         }
292         if (it->it_op == IT_GETATTR || 
293             it->it_op == 0)
294                 it->it_op = IT_LOOKUP;
295         
296 #endif
297         if (!it ||it->it_op == IT_GETXATTR)
298                 it = &lookup_it;
299
300         it->it_op_release = ll_intent_release;
301
302         CDEBUG(D_DLMTRACE, "name: %*s, intent: %s\n", dentry->d_name.len,
303                dentry->d_name.name, ldlm_it2str(it->it_op));
304         
305         if (dentry->d_name.len > EXT2_NAME_LEN)
306                 RETURN(-ENAMETOOLONG);
307
308         /* This function may be called twice, we only once want to
309            execute the request associated with the intent. If it was
310            done already, we skip past this and use the results. */ 
311         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
312                 struct mdc_op_data op_data;
313
314                 ll_prepare_mdc_op_data(&op_data, parent, dentry->d_inode,
315                                        dentry->d_name.name, dentry->d_name.len,
316                                        0);
317
318                 rc = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_PLAIN, it,
319                                  ll_intent_to_lock_mode(it), &op_data,
320                                  &lockh, NULL, 0, ldlm_completion_ast,
321                                  ll_mdc_blocking_ast, NULL);
322                 if (rc < 0)
323                         RETURN(rc);
324                 memcpy(it->it_lock_handle, &lockh, sizeof(lockh));
325         }
326         request = it->it_data;
327         LASSERT(request != NULL);
328
329         /* non-zero it_disposition indicates that the server performed the
330          * intent on our behalf. */
331         LASSERT(it_disposition(it, DISP_IT_EXECD));
332
333                 
334         mds_body = lustre_msg_buf(request->rq_repmsg, 1, sizeof(*mds_body));
335         LASSERT(mds_body != NULL);           /* mdc_enqueue checked */
336         LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
337
338         /* XXX everything with fids please, no ino's inode's etc */
339         ino = mds_body->fid1.id;
340         mode = mds_body->mode;
341
342         /*We were called from revalidate2: did we find the same inode?*/
343         if (inode && 
344             (ino != inode->i_ino ||
345              mds_body->fid1.generation != inode->i_generation)) {
346                 it_set_disposition(it, DISP_ENQ_COMPLETE);
347                 RETURN(-ESTALE);
348         }
349
350         /* If we're doing an IT_OPEN which did not result in an actual
351          * successful open, then we need to remove the bit which saves
352          * this request for unconditional replay. */
353         if (it->it_op & IT_OPEN) {
354                 if (!it_disposition(it, DISP_OPEN_OPEN) ||
355                     it->it_status != 0) {
356                         unsigned long flags;
357                 
358                         spin_lock_irqsave (&request->rq_lock, flags);
359                         request->rq_replay = 0;
360                         spin_unlock_irqrestore (&request->rq_lock, flags);
361                 }
362         }
363
364         rc = ll_it_open_error(DISP_LOOKUP_EXECD, it);
365         if (rc)
366                 GOTO(drop_req, rc);
367         
368         /* keep requests around for the multiple phases of the call
369          * this shows the DISP_XX must guarantee we make it into the call 
370          */ 
371         if (it_disposition(it, DISP_OPEN_CREATE))
372                 ptlrpc_request_addref(request);
373         if (it_disposition(it, DISP_OPEN_OPEN))
374                 ptlrpc_request_addref(request);
375         
376         if (it->it_op & IT_CREAT) {
377                 /* XXX this belongs in ll_create_iit */
378         } else if (it->it_op == IT_OPEN) {
379                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
380         } else 
381                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
382
383         if (intent_finish != NULL) {
384                 struct lustre_handle old_lock;
385                 struct ldlm_lock *lock;
386
387                 rc = intent_finish(request, parent, de, it, 1, ino);
388                 dentry = *de; /* intent_finish may change *de */
389                 inode = dentry->d_inode;
390                 if (rc != 0)
391                         GOTO(drop_lock, rc);
392
393                 /* The intent processing may well have given us a lock different
394                  * from the one we requested.  If we already have a matching
395                  * lock, then cancel the new one.  (We have to do this here,
396                  * instead of in mdc_enqueue, because we need to use the child's
397                  * inode as the l_data to match, and that's not available until
398                  * intent_finish has performed the iget().) */
399                 lock = ldlm_handle2lock(&lockh);
400                 if (lock) {
401                         LDLM_DEBUG(lock, "matching against this");
402                         LDLM_LOCK_PUT(lock);
403                         memcpy(&old_lock, &lockh, sizeof(lockh));
404                         if (ldlm_lock_match(NULL,
405                                             LDLM_FL_BLOCK_GRANTED |
406                                             LDLM_FL_MATCH_DATA,
407                                             NULL, LDLM_PLAIN, NULL, 0, LCK_NL,
408                                             inode, &old_lock)) {
409                                 ldlm_lock_decref_and_cancel(&lockh,
410                                                             it->it_lock_mode);
411                                 memcpy(&lockh, &old_lock, sizeof(old_lock));
412                                 memcpy(it->it_lock_handle, &lockh,
413                                        sizeof(lockh));
414                         }
415                 }
416
417         }
418         ptlrpc_req_finished(request);
419
420         CDEBUG(D_DENTRY, "D_IT dentry %p intent: %s status %d disp %x\n",
421                dentry, ldlm_it2str(it->it_op), it->it_status, it->it_disposition);
422         
423         /* drop IT_LOOKUP locks */
424         if (it->it_op == IT_LOOKUP)
425                 ll_intent_release(it);
426         RETURN(rc);
427
428  drop_lock:
429         ll_intent_release(it);
430  drop_req:
431         ptlrpc_req_finished(request);
432         RETURN(rc);
433 }
434
435 /* Search "inode"'s alias list for a dentry that has the same name and parent as
436  * de.  If found, return it.  If not found, return de. */
437 struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
438 {
439         struct list_head *tmp;
440
441         spin_lock(&dcache_lock);
442         list_for_each(tmp, &inode->i_dentry) {
443                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
444
445                 /* We are called here with 'de' already on the aliases list. */
446                 if (dentry == de) {
447                         CERROR("whoops\n");
448                         continue;
449                 }
450
451                 if (dentry->d_parent != de->d_parent)
452                         continue;
453
454                 if (dentry->d_name.len != de->d_name.len)
455                         continue;
456
457                 if (memcmp(dentry->d_name.name, de->d_name.name,
458                            de->d_name.len) != 0)
459                         continue;
460
461                 if (!list_empty(&dentry->d_lru))
462                         list_del_init(&dentry->d_lru);
463
464                 hlist_del_init(&dentry->d_hash);
465                 __d_rehash(dentry, 0); /* avoid taking dcache_lock inside */
466                 spin_unlock(&dcache_lock);
467                 atomic_inc(&dentry->d_count);
468                 iput(inode);
469                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
470                 return dentry;
471         }
472
473         spin_unlock(&dcache_lock);
474
475         return de;
476 }
477
478 static int
479 lookup2_finish(struct ptlrpc_request *request,
480                struct inode *parent, struct dentry **de,
481                struct lookup_intent *it, int offset, obd_id ino)
482 {
483         struct ll_sb_info *sbi = ll_i2sbi(parent);
484         struct dentry *dentry = *de, *saved = *de;
485         struct inode *inode = NULL;
486         int rc;
487
488         /* NB 1 request reference will be taken away by ll_intent_lock()
489          * when I return */
490         if (!it_disposition(it, DISP_LOOKUP_NEG)) {
491                 struct lustre_md md;
492                 ENTRY;
493
494                 rc =mdc_req2lustre_md(request, offset, &sbi->ll_osc_conn, &md);
495                 if (rc) 
496                         RETURN(rc);
497
498                 inode = ll_iget(dentry->d_sb, ino, &md);
499                 if (!inode) {
500                         /* free the lsm if we allocated one above */
501                         if (md.lsm != NULL)
502                                 obd_free_memmd(&sbi->ll_osc_conn, &md.lsm);
503                         RETURN(-ENOMEM);
504                 } else if (md.lsm != NULL &&
505                            ll_i2info(inode)->lli_smd != md.lsm) {
506                         obd_free_memmd(&sbi->ll_osc_conn, &md.lsm);
507                 }
508
509                 /* If this is a stat, get the authoritative file size */
510                 if (it->it_op == IT_GETATTR && S_ISREG(inode->i_mode) &&
511                     ll_i2info(inode)->lli_smd != NULL) {
512                         struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
513                         struct lustre_handle lockh = {0};
514                         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
515                         ldlm_error_t rc;
516
517                         LASSERT(lsm->lsm_object_id != 0);
518
519                         rc = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent,
520                                             &lockh);
521                         if (rc != ELDLM_OK) {
522                                 iput(inode);
523                                 RETURN(-EIO);
524                         }
525                         ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
526                 }
527
528                 dentry = *de = ll_find_alias(inode, dentry);
529
530                 /* We asked for a lock on the directory, and may have been
531                  * granted a lock on the inode.  Just in case, fixup the data
532                  * pointer. */
533                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
534                        inode, inode->i_ino, inode->i_generation);
535                 ldlm_lock_set_data((struct lustre_handle*)it->it_lock_handle,
536                                    inode);
537         } else {
538                 ENTRY;
539         }
540
541         dentry->d_op = &ll_d_ops;
542         ll_set_dd(dentry);
543
544         if (dentry == saved)
545                 d_add(dentry, inode);
546
547         RETURN(0);
548 }
549
550 static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
551                                    struct lookup_intent *it, int flags)
552 {
553         struct dentry *save = dentry, *retval;
554         int rc;
555         ENTRY;
556
557         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
558                dentry->d_name.name, parent->i_ino, parent->i_generation,
559                parent, LL_IT2STR(it));
560
561         if (d_mountpoint(dentry)) { 
562                 CERROR("Tell Peter, lookup on mtpt, it %s\n", LL_IT2STR(it));
563         }
564
565         rc = ll_intent_lock(parent, &dentry, it, flags, lookup2_finish);
566         if (rc < 0) {
567                 CDEBUG(D_INFO, "ll_intent_lock: %d\n", rc);
568                 GOTO(out, retval = ERR_PTR(rc));
569         }
570
571         if (dentry == save)
572                 GOTO(out, retval = NULL);
573         else
574                 GOTO(out, retval = dentry);
575  out:
576         return retval;
577 }
578
579 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
580 static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry, 
581                                    struct nameidata *nd)
582 {
583         struct dentry *de;
584         ENTRY;
585
586         if (nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
587                 de = ll_lookup_it(parent, dentry, &nd->it, nd->flags);
588         else 
589                 de = ll_lookup_it(parent, dentry, NULL, 0);
590
591         RETURN(de);
592 }
593 #endif
594
595 static int ll_mdc_unlink(struct inode *dir, struct inode *child, __u32 mode,
596                          const char *name, int len)
597 {
598         struct ptlrpc_request *request = NULL;
599         struct mds_body *body;
600         struct lov_mds_md *eadata;
601         struct lov_stripe_md *lsm = NULL;
602         struct obd_trans_info oti = { 0 };
603         struct mdc_op_data op_data;
604         struct obdo *oa;
605         int rc;
606         ENTRY;
607
608         ll_prepare_mdc_op_data(&op_data, dir, child, name, len, mode);
609         rc = mdc_unlink(&ll_i2sbi(dir)->ll_mdc_conn, &op_data, &request);
610         if (rc)
611                 GOTO(out, rc);
612         /* req is swabbed so this is safe */
613         body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
614
615         if (!(body->valid & OBD_MD_FLEASIZE))
616                 GOTO(out, rc = 0);
617
618         if (body->eadatasize == 0) {
619                 CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n");
620                 GOTO(out, rc = -EPROTO);
621         }
622
623         /* The MDS sent back the EA because we unlinked the last reference
624          * to this file. Use this EA to unlink the objects on the OST.
625          * It's opaque so we don't swab here; we leave it to obd_unpackmd() to
626          * check it is complete and sensible. */
627         eadata = lustre_swab_repbuf(request, 1, body->eadatasize, NULL);
628         LASSERT(eadata != NULL);
629         if (eadata == NULL) {
630                 CERROR("Can't unpack MDS EA data\n");
631                 GOTO(out, rc = -EPROTO);
632         }
633
634         rc = obd_unpackmd(ll_i2obdconn(dir), &lsm, eadata, body->eadatasize);
635         if (rc < 0) {
636                 CERROR("obd_unpackmd: %d\n", rc);
637                 GOTO(out, rc);
638         }
639         LASSERT(rc >= sizeof(*lsm));
640
641         oa = obdo_alloc();
642         if (oa == NULL)
643                 GOTO(out_free_memmd, rc = -ENOMEM);
644
645         oa->o_id = lsm->lsm_object_id;
646         oa->o_mode = body->mode & S_IFMT;
647         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
648
649         if (body->valid & OBD_MD_FLCOOKIE) {
650                 oa->o_valid |= OBD_MD_FLCOOKIE;
651                 oti.oti_logcookies = lustre_msg_buf(request->rq_repmsg, 3,
652                                                     body->eadatasize);
653         }
654
655         rc = obd_destroy(ll_i2obdconn(dir), oa, lsm, &oti);
656         obdo_free(oa);
657         if (rc)
658                 CERROR("obd destroy objid 0x"LPX64" error %d\n",
659                        lsm->lsm_object_id, rc);
660  out_free_memmd:
661         obd_free_memmd(ll_i2obdconn(dir), &lsm);
662  out:
663         ptlrpc_req_finished(request);
664         return rc;
665 }
666
667 /* We depend on "mode" being set with the proper file type/umask by now */
668 static struct inode *ll_create_node(struct inode *dir, const char *name,
669                                     int namelen, const void *data, int datalen,
670                                     int mode, __u64 extra,
671                                     struct lookup_intent *it)
672 {
673         struct inode *inode;
674         struct ptlrpc_request *request = NULL;
675         struct ll_sb_info *sbi = ll_i2sbi(dir);
676         struct lustre_md md;
677         int rc;
678         ENTRY;
679
680         LASSERT(it && it->it_disposition);
681
682         ll_invalidate_inode_pages(dir);
683
684         request = it->it_data;
685         rc = mdc_req2lustre_md(request, 1, &sbi->ll_osc_conn, &md);
686         if (rc) { 
687                 GOTO(out, inode = ERR_PTR(rc));
688         }
689
690         inode = ll_iget(dir->i_sb, md.body->ino, &md);
691         if (!inode || is_bad_inode(inode)) {
692                 /* XXX might need iput() for bad inode */
693                 int rc = -EIO;
694                 CERROR("new_inode -fatal: rc %d\n", rc);
695                 LBUG();
696                 GOTO(out, rc);
697         }
698         LASSERT(list_empty(&inode->i_dentry));
699
700         CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
701                inode, inode->i_ino, inode->i_generation);
702         ldlm_lock_set_data((struct lustre_handle*)it->it_lock_handle,
703                            inode);
704
705         EXIT;
706  out:
707         ptlrpc_req_finished(request);
708         return inode;
709 }
710
711 /*
712  * By the time this is called, we already have created the directory cache
713  * entry for the new file, but it is so far negative - it has no inode.
714  *
715  * We defer creating the OBD object(s) until open, to keep the intent and
716  * non-intent code paths similar, and also because we do not have the MDS
717  * inode number before calling ll_create_node() (which is needed for LOV),
718  * so we would need to do yet another RPC to the MDS to store the LOV EA
719  * data on the MDS.  If needed, we would pass the PACKED lmm as data and
720  * lmm_size in datalen (the MDS still has code which will handle that).
721  *
722  * If the create succeeds, we fill in the inode information
723  * with d_instantiate().
724  */
725 static int ll_create_it(struct inode *dir, struct dentry *dentry, int mode, struct lookup_intent *it)
726 {
727         struct inode *inode;
728         struct ptlrpc_request *request = it->it_data;
729         int rc = 0;
730         ENTRY;
731
732         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
733                dentry->d_name.name, dir->i_ino, dir->i_generation, dir,
734                LL_IT2STR(it));
735
736         rc = ll_it_open_error(DISP_OPEN_CREATE, it);
737         if (rc) {
738                 ptlrpc_req_finished(request);
739                 RETURN(rc);
740         }
741
742         mdc_store_inode_generation(request, 2, 1);
743         inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
744                                NULL, 0, mode, 0, it);
745         if (IS_ERR(inode)) {
746                 RETURN(PTR_ERR(inode));
747         }
748
749         d_instantiate(dentry, inode);
750         RETURN(0);
751 }
752
753 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
754 static int ll_create_nd(struct inode *dir, struct dentry *dentry, int mode, struct nameidata *nd)
755 {
756         return ll_create_it(dir, dentry, mode, &nd->it);
757 }
758 #endif
759
760 static int ll_mknod_raw(struct nameidata *nd, int mode, dev_t rdev)
761 {
762         struct inode *dir = nd->dentry->d_inode;
763         const char *name = nd->last.name;
764         int len = nd->last.len;
765         struct ptlrpc_request *request = NULL;
766         time_t time = LTIME_S(CURRENT_TIME);
767         struct ll_sb_info *sbi = ll_i2sbi(dir);
768         struct mdc_op_data op_data;
769         int err = -EMLINK;
770         ENTRY;
771
772         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
773                name, dir->i_ino, dir->i_generation, dir);
774
775         if (dir->i_nlink >= EXT2_LINK_MAX)
776                 RETURN(err);
777
778         mode &= ~current->fs->umask;
779
780         switch (mode & S_IFMT) {
781         case 0: 
782         case S_IFREG:
783                 mode |= S_IFREG; /* for mode = 0 case, fallthrough */
784         case S_IFCHR: 
785         case S_IFBLK:
786         case S_IFIFO: 
787         case S_IFSOCK:
788                 ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
789                 err = mdc_create(&sbi->ll_mdc_conn, &op_data, NULL, 0, mode,
790                                  current->fsuid, current->fsgid, time,
791                                  rdev, &request);
792                 ptlrpc_req_finished(request);
793                 break;
794         case S_IFDIR:
795                 err = -EPERM;
796                 break;
797         default:
798                 err = -EINVAL;
799         }
800         RETURN(err);
801 }
802
803 static int ll_symlink_raw(struct nameidata *nd, const char *tgt)
804 {
805         struct inode *dir = nd->dentry->d_inode;
806         const char *name = nd->last.name;
807         int len = nd->last.len;
808         struct ptlrpc_request *request = NULL;
809         time_t time = LTIME_S(CURRENT_TIME);
810         struct ll_sb_info *sbi = ll_i2sbi(dir);
811         struct mdc_op_data op_data;
812         int err = -EMLINK;
813         ENTRY;
814
815         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),target=%s\n",
816                name, dir->i_ino, dir->i_generation, dir, tgt);
817
818         if (dir->i_nlink >= EXT2_LINK_MAX)
819                 RETURN(err);
820
821         ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
822         err = mdc_create(&sbi->ll_mdc_conn, &op_data,
823                          tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO,
824                          current->fsuid, current->fsgid, time, 0, &request);
825         ptlrpc_req_finished(request);
826         RETURN(err);
827 }
828
829 static int ll_link_raw(struct nameidata *srcnd, struct nameidata *tgtnd)
830 {
831         struct inode *src = srcnd->dentry->d_inode;
832         struct inode *dir = tgtnd->dentry->d_inode;
833         const char *name = tgtnd->last.name;
834         int len = tgtnd->last.len;
835         struct ptlrpc_request *request = NULL;
836         struct mdc_op_data op_data;
837         int err;
838         struct ll_sb_info *sbi = ll_i2sbi(dir);
839
840         ENTRY;
841         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),dir=%lu/%u(%p),target=%s\n",
842                src->i_ino, src->i_generation, src,
843                dir->i_ino, dir->i_generation, dir, name);
844
845         ll_prepare_mdc_op_data(&op_data, src, dir, name, len, 0);
846         err = mdc_link(&sbi->ll_mdc_conn, &op_data, &request);
847         ptlrpc_req_finished(request);
848
849         RETURN(err);
850 }
851
852
853 static int ll_mkdir_raw(struct nameidata *nd, int mode)
854 {
855         struct inode *dir = nd->dentry->d_inode;
856         const char *name = nd->last.name;
857         int len = nd->last.len;
858         struct ptlrpc_request *request = NULL;
859         time_t time = LTIME_S(CURRENT_TIME);
860         struct ll_sb_info *sbi = ll_i2sbi(dir);
861         struct mdc_op_data op_data;
862         int err = -EMLINK;
863         ENTRY;
864         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
865                name, dir->i_ino, dir->i_generation, dir);
866
867         if (dir->i_nlink >= EXT2_LINK_MAX)
868                 RETURN(err);
869
870         mode = (mode & (S_IRWXUGO|S_ISVTX) & ~current->fs->umask) | S_IFDIR;
871         ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
872         err = mdc_create(&sbi->ll_mdc_conn, &op_data, NULL, 0, mode,
873                          current->fsuid, current->fsgid, time, 0, &request);
874         ptlrpc_req_finished(request);
875         RETURN(err);
876 }
877
878 static int ll_rmdir_raw(struct nameidata *nd)
879 {
880         struct inode *dir = nd->dentry->d_inode;
881         const char *name = nd->last.name;
882         int len = nd->last.len;
883         int rc;
884         ENTRY;
885         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
886                name, dir->i_ino, dir->i_generation, dir);
887
888         rc = ll_mdc_unlink(dir, NULL, S_IFDIR, name, len);
889         RETURN(rc);
890 }
891
892 static int ll_unlink_raw(struct nameidata *nd)
893 {
894         struct inode *dir = nd->dentry->d_inode;
895         const char *name = nd->last.name;
896         int len = nd->last.len;
897         int rc;
898         ENTRY;
899         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
900                name, dir->i_ino, dir->i_generation, dir);
901
902         rc = ll_mdc_unlink(dir, NULL, S_IFREG, name, len);
903         RETURN(rc);
904 }
905
906 static int ll_rename_raw(struct nameidata *oldnd, struct nameidata *newnd)
907 {
908         struct inode *src = oldnd->dentry->d_inode;
909         struct inode *tgt = newnd->dentry->d_inode;
910         const char *oldname = oldnd->last.name;
911         int oldlen  = oldnd->last.len;
912         const char *newname = newnd->last.name;
913         int newlen  = newnd->last.len;
914         struct ptlrpc_request *request = NULL;
915         struct ll_sb_info *sbi = ll_i2sbi(src);
916         struct mdc_op_data op_data;
917         int err;
918         ENTRY;
919         CDEBUG(D_VFSTRACE, "VFS Op:oldname=%s,src_dir=%lu/%u(%p),newname=%s,"
920                "tgt_dir=%lu/%u(%p)\n", oldname, src->i_ino, src->i_generation,
921                src, newname, tgt->i_ino, tgt->i_generation, tgt);
922
923         ll_prepare_mdc_op_data(&op_data, src, tgt, NULL, 0, 0);
924         err = mdc_rename(&sbi->ll_mdc_conn, &op_data,
925                          oldname, oldlen, newname, newlen, &request);
926         ptlrpc_req_finished(request);
927
928         RETURN(err);
929 }
930
931 struct inode_operations ll_dir_inode_operations = {
932         link_raw:           ll_link_raw,
933         unlink_raw:         ll_unlink_raw,
934         symlink_raw:        ll_symlink_raw,
935         mkdir_raw:          ll_mkdir_raw,
936         rmdir_raw:          ll_rmdir_raw,
937         mknod_raw:          ll_mknod_raw,
938         rename_raw:         ll_rename_raw,
939         setattr:         ll_setattr,
940         setattr_raw:     ll_setattr_raw,
941 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
942         create_it:          ll_create_it,
943         lookup_it:            ll_lookup_it,
944         revalidate_it:      ll_inode_revalidate_it,
945 #else
946         lookup_it:          ll_lookup_nd,
947         create_nd:          ll_create_nd,
948         getattr_it:         ll_getattr,
949 #endif
950 };