Whamcloud - gitweb
b=1090
[fs/lustre-release.git] / lustre / llite / namei.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  *  derived in small part from linux/fs/ext2/namei.c
22  *
23  *  Copyright (C) 1991, 1992  Linus Torvalds
24  *
25  *  Big-endian to little-endian byte-swapping/bitmaps by
26  *        David S. Miller (davem@caip.rutgers.edu), 1995
27  *  Directory entry file type support and forward compatibility hooks
28  *      for B-tree directories by Theodore Ts'o (tytso@mit.edu), 1998
29  */
30
31 #include <linux/fs.h>
32 #include <linux/sched.h>
33 #include <linux/mm.h>
34 #include <linux/smp_lock.h>
35 #include <linux/quotaops.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38
39 #define DEBUG_SUBSYSTEM S_LLITE
40
41 #include <linux/obd_support.h>
42 #include <linux/lustre_lite.h>
43 #include <linux/lustre_dlm.h>
44 #include "llite_internal.h"
45
46 /* methods */
47
48 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
49 static int ll_test_inode(struct inode *inode, unsigned long ino, void *opaque)
50 #else
51 static int ll_test_inode(struct inode *inode, void *opaque)
52 #endif
53 {
54         struct lustre_md *md = opaque;
55
56         if (!(md->body->valid & (OBD_MD_FLGENER | OBD_MD_FLID)))
57                 CERROR("invalid generation\n");
58         CDEBUG(D_VFSTRACE, "comparing inode %p ino %lu/%u to body %u/%u\n",
59                inode, inode->i_ino, inode->i_generation, 
60                md->body->ino, md->body->generation);
61
62         if (inode->i_generation != md->body->generation)
63                 return 0;
64
65         /* Apply the attributes in 'opaque' to this inode */
66         ll_update_inode(inode, md->body, md->lsm);
67         return 1;
68 }
69
70 extern struct dentry_operations ll_d_ops;
71
72 int ll_unlock(__u32 mode, struct lustre_handle *lockh)
73 {
74         ENTRY;
75
76         ldlm_lock_decref(lockh, mode);
77
78         RETURN(0);
79 }
80
81 /* Get an inode by inode number (already instantiated by the intent lookup).
82  * Returns inode or NULL
83  */
84 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
85 int ll_set_inode(struct inode *inode, void *opaque)
86 {
87         ll_read_inode2(inode, opaque);
88         return 0;
89 }
90 struct inode *ll_iget(struct super_block *sb, ino_t hash,
91                       struct lustre_md *md)
92 {
93         struct inode *inode;
94
95         LASSERT(hash != 0);
96         inode = iget5_locked(sb, hash, ll_test_inode, ll_set_inode, md);
97
98         if (!inode)
99                 return (NULL);              /* removed ERR_PTR(-ENOMEM) -eeb */
100
101         if (inode->i_state & I_NEW)
102                 unlock_new_inode(inode);
103
104         // XXX Coda always fills inodes, should Lustre?
105         return inode;
106 }
107 #else
108 struct inode *ll_iget(struct super_block *sb, ino_t hash,
109                       struct lustre_md *md)
110 {
111         struct inode *inode;
112         LASSERT(hash != 0);
113         inode = iget4(sb, hash, ll_test_inode, md);
114         CDEBUG(D_VFSTRACE, "inode: %lu/%u(%p)\n", inode->i_ino,
115                inode->i_generation, inode);
116         return inode;
117 }
118 #endif
119
120 static int ll_intent_to_lock_mode(struct lookup_intent *it)
121 {
122         /* CREAT needs to be tested before open (both could be set) */
123         if (it->it_op & IT_CREAT)
124                 return LCK_PW;
125         else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
126                 return LCK_PR;
127
128         LBUG();
129         RETURN(-EINVAL);
130 }
131
132 int ll_it_open_error(int phase, struct lookup_intent *it)
133 {
134         if (it_disposition(it, DISP_OPEN_OPEN)) {
135                 if (phase == DISP_OPEN_OPEN)
136                         return it->it_status;
137                 else
138                         return 0;
139         }
140
141         if (it_disposition(it, DISP_OPEN_CREATE)) {
142                 if (phase == DISP_OPEN_CREATE)
143                         return it->it_status;
144                 else
145                         return 0;
146         }
147
148         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
149                 if (phase == DISP_LOOKUP_EXECD)
150                         return it->it_status;
151                 else
152                         return 0;
153         }
154         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
155         LBUG();
156         return 0;
157 }
158
159 int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
160                         void *data, int flag)
161 {
162         int rc;
163         struct lustre_handle lockh;
164         struct inode *inode = lock->l_data;
165         ENTRY;
166
167         switch (flag) {
168         case LDLM_CB_BLOCKING:
169                 ldlm_lock2handle(lock, &lockh);
170                 rc = ldlm_cli_cancel(&lockh);
171                 if (rc < 0) {
172                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
173                         RETURN(rc);
174                 }
175                 break;
176         case LDLM_CB_CANCELING: {
177                 /* Invalidate all dentries associated with this inode */
178                 if (inode == NULL)
179                         break;
180                 if (lock->l_resource->lr_name.name[0] != inode->i_ino ||
181                     lock->l_resource->lr_name.name[1] != inode->i_generation) {
182                         LDLM_ERROR(lock, "data mismatch with ino %lu/%u",
183                                    inode->i_ino, inode->i_generation);
184                 }
185                 if (S_ISDIR(inode->i_mode)) {
186                         CDEBUG(D_INODE, "invalidating inode %lu\n",
187                                inode->i_ino);
188
189                         ll_invalidate_inode_pages(inode);
190                 }
191
192 #warning FIXME: we should probably free this inode if there are no aliases
193                 if (inode->i_sb->s_root &&
194                     inode != inode->i_sb->s_root->d_inode)
195                         ll_unhash_aliases(inode);
196                 break;
197         }
198         default:
199                 LBUG();
200         }
201
202         RETURN(0);
203 }
204
205 int ll_mdc_cancel_unused(struct lustre_handle *conn, struct inode *inode,
206                          int flags, void *opaque)
207 {
208         struct ldlm_res_id res_id =
209                 { .name = {inode->i_ino, inode->i_generation} };
210         struct obd_device *obddev = class_conn2obd(conn);
211         ENTRY;
212         RETURN(ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags,
213                                       opaque));
214 }
215
216 void ll_prepare_mdc_op_data(struct mdc_op_data *data,
217                             struct inode *i1,
218                             struct inode *i2,
219                             const char *name,
220                             int namelen,
221                             int mode)
222 {
223         LASSERT(i1);
224
225         data->ino1 = i1->i_ino;
226         data->gen1 = i1->i_generation;
227         data->typ1 = i1->i_mode & S_IFMT;
228         data->gid1 = i1->i_gid;
229
230         if (i2) {
231                 data->ino2 = i2->i_ino;
232                 data->gen2 = i2->i_generation;
233                 data->typ2 = i2->i_mode & S_IFMT;
234                 data->gid2 = i2->i_gid;
235         } else {
236                 data->ino2 = 0;
237         }
238
239         data->name = name;
240         data->namelen = namelen;
241         data->mode = mode;
242 }
243
244 /* 
245  *This long block is all about fixing up the local state so that it is
246  *correct as of the moment _before_ the operation was applied; that
247  *way, the VFS will think that everything is normal and call Lustre's
248  *regular VFS methods.
249  *
250  * If we're performing a creation, that means that unless the creation
251  * failed with EEXIST, we should fake up a negative dentry.
252  *
253  * For everything else, we want to lookup to succeed.
254  *
255  * One additional note: if CREATE or OPEN succeeded, we add an extra
256  * reference to the request because we need to keep it around until
257  * ll_create/ll_open gets called.
258  *
259  * The server will return to us, in it_disposition, an indication of
260  * exactly what it_status refers to.
261  *
262  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
263  * otherwise if DISP_OPEN_CREATE is set, then it status is the
264  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
265  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
266  * was successful.
267  *
268  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the child
269  * lookup.
270  */
271 int ll_intent_lock(struct inode *parent, struct dentry **de,
272                    struct lookup_intent *it, int flags, intent_finish_cb intent_finish)
273 {
274         struct dentry *dentry = *de;
275         struct inode *inode = dentry->d_inode;
276         struct ll_sb_info *sbi = ll_i2sbi(parent);
277         struct lustre_handle lockh;
278         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
279         struct ptlrpc_request *request;
280         int rc = 0;
281         struct mds_body *mds_body;
282         int mode;
283         obd_id ino = 0;
284         ENTRY;
285
286 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
287         if (it && it->it_magic != INTENT_MAGIC) { 
288                 CERROR("WARNING: uninitialized intent\n");
289                 LBUG();
290                 intent_init(it, IT_LOOKUP, 0);
291         }
292         if (it->it_op == IT_GETATTR || 
293             it->it_op == 0)
294                 it->it_op = IT_LOOKUP;
295         
296 #endif
297         if (!it ||it->it_op == IT_GETXATTR)
298                 it = &lookup_it;
299
300         it->it_op_release = ll_intent_release;
301
302         CDEBUG(D_DLMTRACE, "name: %*s, intent: %s\n", dentry->d_name.len,
303                dentry->d_name.name, ldlm_it2str(it->it_op));
304         
305         if (dentry->d_name.len > EXT2_NAME_LEN)
306                 RETURN(-ENAMETOOLONG);
307
308         /* This function may be called twice, we only once want to
309            execute the request associated with the intent. If it was
310            done already, we skip past this and use the results. */ 
311         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
312                 struct mdc_op_data op_data;
313
314                 ll_prepare_mdc_op_data(&op_data, parent, dentry->d_inode,
315                                        dentry->d_name.name, dentry->d_name.len,
316                                        0);
317
318                 rc = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_PLAIN, it,
319                                  ll_intent_to_lock_mode(it), &op_data,
320                                  &lockh, NULL, 0, ldlm_completion_ast,
321                                  ll_mdc_blocking_ast, NULL);
322                 if (rc < 0)
323                         RETURN(rc);
324                 memcpy(it->it_lock_handle, &lockh, sizeof(lockh));
325         }
326         request = it->it_data;
327         LASSERT(request != NULL);
328
329         if (!it_disposition(it, DISP_IT_EXECD)) {
330                 /* The server failed before it even started executing the
331                  * intent, i.e. because it couldn't unpack the request. */
332                 LASSERT(it->it_status != 0);
333                 GOTO(drop_req, rc = it->it_status);
334         }
335
336         mds_body = lustre_msg_buf(request->rq_repmsg, 1, sizeof(*mds_body));
337         LASSERT(mds_body != NULL);           /* mdc_enqueue checked */
338         LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
339
340         /* XXX everything with fids please, no ino's inode's etc */
341         ino = mds_body->fid1.id;
342         mode = mds_body->mode;
343
344         /*We were called from revalidate2: did we find the same inode?*/
345         if (inode && 
346             (ino != inode->i_ino ||
347              mds_body->fid1.generation != inode->i_generation)) {
348                 it_set_disposition(it, DISP_ENQ_COMPLETE);
349                 RETURN(-ESTALE);
350         }
351
352         /* If we're doing an IT_OPEN which did not result in an actual
353          * successful open, then we need to remove the bit which saves
354          * this request for unconditional replay. */
355         if (it->it_op & IT_OPEN) {
356                 if (!it_disposition(it, DISP_OPEN_OPEN) ||
357                     it->it_status != 0) {
358                         unsigned long flags;
359                 
360                         spin_lock_irqsave (&request->rq_lock, flags);
361                         request->rq_replay = 0;
362                         spin_unlock_irqrestore (&request->rq_lock, flags);
363                 }
364         }
365
366         rc = ll_it_open_error(DISP_LOOKUP_EXECD, it);
367         if (rc)
368                 GOTO(drop_req, rc);
369         
370         /* keep requests around for the multiple phases of the call
371          * this shows the DISP_XX must guarantee we make it into the call 
372          */ 
373         if (it_disposition(it, DISP_OPEN_CREATE))
374                 ptlrpc_request_addref(request);
375         if (it_disposition(it, DISP_OPEN_OPEN))
376                 ptlrpc_request_addref(request);
377         
378         if (it->it_op & IT_CREAT) {
379                 /* XXX this belongs in ll_create_iit */
380         } else if (it->it_op == IT_OPEN) {
381                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
382         } else 
383                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
384
385         if (intent_finish != NULL) {
386                 struct lustre_handle old_lock;
387                 struct ldlm_lock *lock;
388
389                 rc = intent_finish(request, parent, de, it, 1, ino);
390                 dentry = *de; /* intent_finish may change *de */
391                 inode = dentry->d_inode;
392                 if (rc != 0)
393                         GOTO(drop_lock, rc);
394
395                 /* The intent processing may well have given us a lock different
396                  * from the one we requested.  If we already have a matching
397                  * lock, then cancel the new one.  (We have to do this here,
398                  * instead of in mdc_enqueue, because we need to use the child's
399                  * inode as the l_data to match, and that's not available until
400                  * intent_finish has performed the iget().) */
401                 lock = ldlm_handle2lock(&lockh);
402                 if (lock) {
403                         LDLM_DEBUG(lock, "matching against this");
404                         LDLM_LOCK_PUT(lock);
405                         memcpy(&old_lock, &lockh, sizeof(lockh));
406                         if (ldlm_lock_match(NULL,
407                                             LDLM_FL_BLOCK_GRANTED |
408                                             LDLM_FL_MATCH_DATA,
409                                             NULL, LDLM_PLAIN, NULL, 0, LCK_NL,
410                                             inode, &old_lock)) {
411                                 ldlm_lock_decref_and_cancel(&lockh,
412                                                             it->it_lock_mode);
413                                 memcpy(&lockh, &old_lock, sizeof(old_lock));
414                                 memcpy(it->it_lock_handle, &lockh,
415                                        sizeof(lockh));
416                         }
417                 }
418
419         }
420         ptlrpc_req_finished(request);
421
422         CDEBUG(D_DENTRY, "D_IT dentry %p intent: %s status %d disp %x\n",
423                dentry, ldlm_it2str(it->it_op), it->it_status, it->it_disposition);
424         
425         /* drop IT_LOOKUP locks */
426         if (it->it_op == IT_LOOKUP)
427                 ll_intent_release(it);
428         RETURN(rc);
429
430  drop_lock:
431         ll_intent_release(it);
432  drop_req:
433         ptlrpc_req_finished(request);
434         RETURN(rc);
435 }
436
437 /* Search "inode"'s alias list for a dentry that has the same name and parent as
438  * de.  If found, return it.  If not found, return de. */
439 struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
440 {
441         struct list_head *tmp;
442
443         spin_lock(&dcache_lock);
444         list_for_each(tmp, &inode->i_dentry) {
445                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
446
447                 /* We are called here with 'de' already on the aliases list. */
448                 if (dentry == de) {
449                         CERROR("whoops\n");
450                         continue;
451                 }
452
453                 if (dentry->d_parent != de->d_parent)
454                         continue;
455
456                 if (dentry->d_name.len != de->d_name.len)
457                         continue;
458
459                 if (memcmp(dentry->d_name.name, de->d_name.name,
460                            de->d_name.len) != 0)
461                         continue;
462
463                 if (!list_empty(&dentry->d_lru))
464                         list_del_init(&dentry->d_lru);
465
466                 hlist_del_init(&dentry->d_hash);
467                 __d_rehash(dentry, 0); /* avoid taking dcache_lock inside */
468                 spin_unlock(&dcache_lock);
469                 atomic_inc(&dentry->d_count);
470                 iput(inode);
471                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
472                 return dentry;
473         }
474
475         spin_unlock(&dcache_lock);
476
477         return de;
478 }
479
480 static int
481 lookup2_finish(struct ptlrpc_request *request,
482                struct inode *parent, struct dentry **de,
483                struct lookup_intent *it, int offset, obd_id ino)
484 {
485         struct ll_sb_info *sbi = ll_i2sbi(parent);
486         struct dentry *dentry = *de, *saved = *de;
487         struct inode *inode = NULL;
488         int rc;
489
490         /* NB 1 request reference will be taken away by ll_intent_lock()
491          * when I return */
492         if (!it_disposition(it, DISP_LOOKUP_NEG)) {
493                 struct lustre_md md;
494                 ENTRY;
495
496                 rc =mdc_req2lustre_md(request, offset, &sbi->ll_osc_conn, &md);
497                 if (rc) 
498                         RETURN(rc);
499
500                 inode = ll_iget(dentry->d_sb, ino, &md);
501                 if (!inode) {
502                         /* free the lsm if we allocated one above */
503                         if (md.lsm != NULL)
504                                 obd_free_memmd(&sbi->ll_osc_conn, &md.lsm);
505                         RETURN(-ENOMEM);
506                 } else if (md.lsm != NULL &&
507                            ll_i2info(inode)->lli_smd != md.lsm) {
508                         obd_free_memmd(&sbi->ll_osc_conn, &md.lsm);
509                 }
510
511                 /* If this is a stat, get the authoritative file size */
512                 if (it->it_op == IT_GETATTR && S_ISREG(inode->i_mode) &&
513                     ll_i2info(inode)->lli_smd != NULL) {
514                         struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
515                         struct lustre_handle lockh = {0};
516                         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
517                         ldlm_error_t rc;
518
519                         LASSERT(lsm->lsm_object_id != 0);
520
521                         rc = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent,
522                                             &lockh);
523                         if (rc != ELDLM_OK) {
524                                 iput(inode);
525                                 RETURN(-EIO);
526                         }
527                         ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
528                 }
529
530                 dentry = *de = ll_find_alias(inode, dentry);
531
532                 /* We asked for a lock on the directory, and may have been
533                  * granted a lock on the inode.  Just in case, fixup the data
534                  * pointer. */
535                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
536                        inode, inode->i_ino, inode->i_generation);
537                 ldlm_lock_set_data((struct lustre_handle*)it->it_lock_handle,
538                                    inode);
539         } else {
540                 ENTRY;
541         }
542
543         dentry->d_op = &ll_d_ops;
544         ll_set_dd(dentry);
545
546         if (dentry == saved)
547                 d_add(dentry, inode);
548
549         RETURN(0);
550 }
551
552 static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
553                                    struct lookup_intent *it, int flags)
554 {
555         struct dentry *save = dentry, *retval;
556         int rc;
557         ENTRY;
558
559         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
560                dentry->d_name.name, parent->i_ino, parent->i_generation,
561                parent, LL_IT2STR(it));
562
563         if (d_mountpoint(dentry)) { 
564                 CERROR("Tell Peter, lookup on mtpt, it %s\n", LL_IT2STR(it));
565         }
566
567         rc = ll_intent_lock(parent, &dentry, it, flags, lookup2_finish);
568         if (rc < 0) {
569                 CDEBUG(D_INFO, "ll_intent_lock: %d\n", rc);
570                 GOTO(out, retval = ERR_PTR(rc));
571         }
572
573         if (dentry == save)
574                 GOTO(out, retval = NULL);
575         else
576                 GOTO(out, retval = dentry);
577  out:
578         return retval;
579 }
580
581 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
582 static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry, 
583                                    struct nameidata *nd)
584 {
585         struct dentry *de;
586         ENTRY;
587
588         if (nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
589                 de = ll_lookup_it(parent, dentry, &nd->it, nd->flags);
590         else 
591                 de = ll_lookup_it(parent, dentry, NULL, 0);
592
593         RETURN(de);
594 }
595 #endif
596
597 static int ll_mdc_unlink(struct inode *dir, struct inode *child, __u32 mode,
598                          const char *name, int len)
599 {
600         struct ptlrpc_request *request = NULL;
601         struct mds_body *body;
602         struct lov_mds_md *eadata;
603         struct lov_stripe_md *lsm = NULL;
604         struct obd_trans_info oti = { 0 };
605         struct mdc_op_data op_data;
606         struct obdo *oa;
607         int rc;
608         ENTRY;
609
610         ll_prepare_mdc_op_data(&op_data, dir, child, name, len, mode);
611         rc = mdc_unlink(&ll_i2sbi(dir)->ll_mdc_conn, &op_data, &request);
612         if (rc)
613                 GOTO(out, rc);
614         /* req is swabbed so this is safe */
615         body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
616
617         if (!(body->valid & OBD_MD_FLEASIZE))
618                 GOTO(out, rc = 0);
619
620         if (body->eadatasize == 0) {
621                 CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n");
622                 GOTO(out, rc = -EPROTO);
623         }
624
625         /* The MDS sent back the EA because we unlinked the last reference
626          * to this file. Use this EA to unlink the objects on the OST.
627          * It's opaque so we don't swab here; we leave it to obd_unpackmd() to
628          * check it is complete and sensible. */
629         eadata = lustre_swab_repbuf(request, 1, body->eadatasize, NULL);
630         LASSERT(eadata != NULL);
631         if (eadata == NULL) {
632                 CERROR("Can't unpack MDS EA data\n");
633                 GOTO(out, rc = -EPROTO);
634         }
635
636         rc = obd_unpackmd(ll_i2obdconn(dir), &lsm, eadata, body->eadatasize);
637         if (rc < 0) {
638                 CERROR("obd_unpackmd: %d\n", rc);
639                 GOTO(out, rc);
640         }
641         LASSERT(rc >= sizeof(*lsm));
642
643         oa = obdo_alloc();
644         if (oa == NULL)
645                 GOTO(out_free_memmd, rc = -ENOMEM);
646
647         oa->o_id = lsm->lsm_object_id;
648         oa->o_mode = body->mode & S_IFMT;
649         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
650
651         if (body->valid & OBD_MD_FLCOOKIE) {
652                 oa->o_valid |= OBD_MD_FLCOOKIE;
653                 oti.oti_logcookies = lustre_msg_buf(request->rq_repmsg, 3,
654                                                     body->eadatasize);
655         }
656
657         rc = obd_destroy(ll_i2obdconn(dir), oa, lsm, &oti);
658         obdo_free(oa);
659         if (rc)
660                 CERROR("obd destroy objid 0x"LPX64" error %d\n",
661                        lsm->lsm_object_id, rc);
662  out_free_memmd:
663         obd_free_memmd(ll_i2obdconn(dir), &lsm);
664  out:
665         ptlrpc_req_finished(request);
666         return rc;
667 }
668
669 /* We depend on "mode" being set with the proper file type/umask by now */
670 static struct inode *ll_create_node(struct inode *dir, const char *name,
671                                     int namelen, const void *data, int datalen,
672                                     int mode, __u64 extra,
673                                     struct lookup_intent *it)
674 {
675         struct inode *inode;
676         struct ptlrpc_request *request = NULL;
677         struct ll_sb_info *sbi = ll_i2sbi(dir);
678         struct lustre_md md;
679         int rc;
680         ENTRY;
681
682         LASSERT(it && it->it_disposition);
683
684         ll_invalidate_inode_pages(dir);
685
686         request = it->it_data;
687         rc = mdc_req2lustre_md(request, 1, &sbi->ll_osc_conn, &md);
688         if (rc) { 
689                 GOTO(out, inode = ERR_PTR(rc));
690         }
691
692         inode = ll_iget(dir->i_sb, md.body->ino, &md);
693         if (!inode || is_bad_inode(inode)) {
694                 /* XXX might need iput() for bad inode */
695                 int rc = -EIO;
696                 CERROR("new_inode -fatal: rc %d\n", rc);
697                 LBUG();
698                 GOTO(out, rc);
699         }
700         LASSERT(list_empty(&inode->i_dentry));
701
702         CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
703                inode, inode->i_ino, inode->i_generation);
704         ldlm_lock_set_data((struct lustre_handle*)it->it_lock_handle,
705                            inode);
706
707         EXIT;
708  out:
709         ptlrpc_req_finished(request);
710         return inode;
711 }
712
713 /*
714  * By the time this is called, we already have created the directory cache
715  * entry for the new file, but it is so far negative - it has no inode.
716  *
717  * We defer creating the OBD object(s) until open, to keep the intent and
718  * non-intent code paths similar, and also because we do not have the MDS
719  * inode number before calling ll_create_node() (which is needed for LOV),
720  * so we would need to do yet another RPC to the MDS to store the LOV EA
721  * data on the MDS.  If needed, we would pass the PACKED lmm as data and
722  * lmm_size in datalen (the MDS still has code which will handle that).
723  *
724  * If the create succeeds, we fill in the inode information
725  * with d_instantiate().
726  */
727 static int ll_create_it(struct inode *dir, struct dentry *dentry, int mode, struct lookup_intent *it)
728 {
729         struct inode *inode;
730         struct ptlrpc_request *request = it->it_data;
731         int rc = 0;
732         ENTRY;
733
734         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
735                dentry->d_name.name, dir->i_ino, dir->i_generation, dir,
736                LL_IT2STR(it));
737
738         rc = ll_it_open_error(DISP_OPEN_CREATE, it);
739         if (rc) {
740                 ptlrpc_req_finished(request);
741                 RETURN(rc);
742         }
743
744         mdc_store_inode_generation(request, 2, 1);
745         inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
746                                NULL, 0, mode, 0, it);
747         if (IS_ERR(inode)) {
748                 RETURN(PTR_ERR(inode));
749         }
750
751         d_instantiate(dentry, inode);
752         RETURN(0);
753 }
754
755 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
756 static int ll_create_nd(struct inode *dir, struct dentry *dentry, int mode, struct nameidata *nd)
757 {
758         return ll_create_it(dir, dentry, mode, &nd->it);
759 }
760 #endif
761
762 static int ll_mknod_raw(struct nameidata *nd, int mode, dev_t rdev)
763 {
764         struct inode *dir = nd->dentry->d_inode;
765         const char *name = nd->last.name;
766         int len = nd->last.len;
767         struct ptlrpc_request *request = NULL;
768         time_t time = LTIME_S(CURRENT_TIME);
769         struct ll_sb_info *sbi = ll_i2sbi(dir);
770         struct mdc_op_data op_data;
771         int err = -EMLINK;
772         ENTRY;
773
774         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
775                name, dir->i_ino, dir->i_generation, dir);
776
777         if (dir->i_nlink >= EXT2_LINK_MAX)
778                 RETURN(err);
779
780         mode &= ~current->fs->umask;
781
782         switch (mode & S_IFMT) {
783         case 0: 
784         case S_IFREG:
785                 mode |= S_IFREG; /* for mode = 0 case, fallthrough */
786         case S_IFCHR: 
787         case S_IFBLK:
788         case S_IFIFO: 
789         case S_IFSOCK:
790                 ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
791                 err = mdc_create(&sbi->ll_mdc_conn, &op_data, NULL, 0, mode,
792                                  current->fsuid, current->fsgid, time,
793                                  rdev, &request);
794                 ptlrpc_req_finished(request);
795                 break;
796         case S_IFDIR:
797                 err = -EPERM;
798                 break;
799         default:
800                 err = -EINVAL;
801         }
802         RETURN(err);
803 }
804
805 static int ll_symlink_raw(struct nameidata *nd, const char *tgt)
806 {
807         struct inode *dir = nd->dentry->d_inode;
808         const char *name = nd->last.name;
809         int len = nd->last.len;
810         struct ptlrpc_request *request = NULL;
811         time_t time = LTIME_S(CURRENT_TIME);
812         struct ll_sb_info *sbi = ll_i2sbi(dir);
813         struct mdc_op_data op_data;
814         int err = -EMLINK;
815         ENTRY;
816
817         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),target=%s\n",
818                name, dir->i_ino, dir->i_generation, dir, tgt);
819
820         if (dir->i_nlink >= EXT2_LINK_MAX)
821                 RETURN(err);
822
823         ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
824         err = mdc_create(&sbi->ll_mdc_conn, &op_data,
825                          tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO,
826                          current->fsuid, current->fsgid, time, 0, &request);
827         ptlrpc_req_finished(request);
828         RETURN(err);
829 }
830
831 static int ll_link_raw(struct nameidata *srcnd, struct nameidata *tgtnd)
832 {
833         struct inode *src = srcnd->dentry->d_inode;
834         struct inode *dir = tgtnd->dentry->d_inode;
835         const char *name = tgtnd->last.name;
836         int len = tgtnd->last.len;
837         struct ptlrpc_request *request = NULL;
838         struct mdc_op_data op_data;
839         int err;
840         struct ll_sb_info *sbi = ll_i2sbi(dir);
841
842         ENTRY;
843         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),dir=%lu/%u(%p),target=%s\n",
844                src->i_ino, src->i_generation, src,
845                dir->i_ino, dir->i_generation, dir, name);
846
847         ll_prepare_mdc_op_data(&op_data, src, dir, name, len, 0);
848         err = mdc_link(&sbi->ll_mdc_conn, &op_data, &request);
849         ptlrpc_req_finished(request);
850
851         RETURN(err);
852 }
853
854
855 static int ll_mkdir_raw(struct nameidata *nd, int mode)
856 {
857         struct inode *dir = nd->dentry->d_inode;
858         const char *name = nd->last.name;
859         int len = nd->last.len;
860         struct ptlrpc_request *request = NULL;
861         time_t time = LTIME_S(CURRENT_TIME);
862         struct ll_sb_info *sbi = ll_i2sbi(dir);
863         struct mdc_op_data op_data;
864         int err = -EMLINK;
865         ENTRY;
866         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
867                name, dir->i_ino, dir->i_generation, dir);
868
869         if (dir->i_nlink >= EXT2_LINK_MAX)
870                 RETURN(err);
871
872         mode = (mode & (S_IRWXUGO|S_ISVTX) & ~current->fs->umask) | S_IFDIR;
873         ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
874         err = mdc_create(&sbi->ll_mdc_conn, &op_data, NULL, 0, mode,
875                          current->fsuid, current->fsgid, time, 0, &request);
876         ptlrpc_req_finished(request);
877         RETURN(err);
878 }
879
880 static int ll_rmdir_raw(struct nameidata *nd)
881 {
882         struct inode *dir = nd->dentry->d_inode;
883         const char *name = nd->last.name;
884         int len = nd->last.len;
885         int rc;
886         ENTRY;
887         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
888                name, dir->i_ino, dir->i_generation, dir);
889
890         rc = ll_mdc_unlink(dir, NULL, S_IFDIR, name, len);
891         RETURN(rc);
892 }
893
894 static int ll_unlink_raw(struct nameidata *nd)
895 {
896         struct inode *dir = nd->dentry->d_inode;
897         const char *name = nd->last.name;
898         int len = nd->last.len;
899         int rc;
900         ENTRY;
901         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
902                name, dir->i_ino, dir->i_generation, dir);
903
904         rc = ll_mdc_unlink(dir, NULL, S_IFREG, name, len);
905         RETURN(rc);
906 }
907
908 static int ll_rename_raw(struct nameidata *oldnd, struct nameidata *newnd)
909 {
910         struct inode *src = oldnd->dentry->d_inode;
911         struct inode *tgt = newnd->dentry->d_inode;
912         const char *oldname = oldnd->last.name;
913         int oldlen  = oldnd->last.len;
914         const char *newname = newnd->last.name;
915         int newlen  = newnd->last.len;
916         struct ptlrpc_request *request = NULL;
917         struct ll_sb_info *sbi = ll_i2sbi(src);
918         struct mdc_op_data op_data;
919         int err;
920         ENTRY;
921         CDEBUG(D_VFSTRACE, "VFS Op:oldname=%s,src_dir=%lu/%u(%p),newname=%s,"
922                "tgt_dir=%lu/%u(%p)\n", oldname, src->i_ino, src->i_generation,
923                src, newname, tgt->i_ino, tgt->i_generation, tgt);
924
925         ll_prepare_mdc_op_data(&op_data, src, tgt, NULL, 0, 0);
926         err = mdc_rename(&sbi->ll_mdc_conn, &op_data,
927                          oldname, oldlen, newname, newlen, &request);
928         ptlrpc_req_finished(request);
929
930         RETURN(err);
931 }
932
933 struct inode_operations ll_dir_inode_operations = {
934         link_raw:           ll_link_raw,
935         unlink_raw:         ll_unlink_raw,
936         symlink_raw:        ll_symlink_raw,
937         mkdir_raw:          ll_mkdir_raw,
938         rmdir_raw:          ll_rmdir_raw,
939         mknod_raw:          ll_mknod_raw,
940         rename_raw:         ll_rename_raw,
941         setattr:         ll_setattr,
942         setattr_raw:     ll_setattr_raw,
943 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
944         create_it:          ll_create_it,
945         lookup_it:            ll_lookup_it,
946         revalidate_it:      ll_inode_revalidate_it,
947 #else
948         lookup_it:          ll_lookup_nd,
949         create_nd:          ll_create_nd,
950         getattr_it:         ll_getattr,
951 #endif
952 };