Whamcloud - gitweb
Fix minor error in error message (duplicate 0x prefix).
[fs/lustre-release.git] / lustre / llite / namei.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  *  derived in small part from linux/fs/ext2/namei.c
22  *
23  *  Copyright (C) 1991, 1992  Linus Torvalds
24  *
25  *  Big-endian to little-endian byte-swapping/bitmaps by
26  *        David S. Miller (davem@caip.rutgers.edu), 1995
27  *  Directory entry file type support and forward compatibility hooks
28  *      for B-tree directories by Theodore Ts'o (tytso@mit.edu), 1998
29  */
30
31 #include <linux/fs.h>
32 #include <linux/sched.h>
33 #include <linux/mm.h>
34 #include <linux/smp_lock.h>
35 #include <linux/quotaops.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38
39 #define DEBUG_SUBSYSTEM S_LLITE
40
41 #include <linux/obd_support.h>
42 #include <linux/lustre_lite.h>
43 #include <linux/lustre_dlm.h>
44 #include "llite_internal.h"
45
46 /* methods */
47
48 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
49 static int ll_test_inode(struct inode *inode, unsigned long ino, void *opaque)
50 #else
51 static int ll_test_inode(struct inode *inode, void *opaque)
52 #endif
53 {
54         struct lustre_md *md = opaque;
55
56         if (!(md->body->valid & (OBD_MD_FLGENER | OBD_MD_FLID)))
57                 CERROR("invalid generation\n");
58         CDEBUG(D_VFSTRACE, "comparing inode %p ino %lu/%u to body %u/%u\n",
59                inode, inode->i_ino, inode->i_generation, 
60                md->body->ino, md->body->generation);
61
62         if (inode->i_generation != md->body->generation)
63                 return 0;
64
65         /* Apply the attributes in 'opaque' to this inode */
66         ll_update_inode(inode, md->body, md->lsm);
67         return 1;
68 }
69
70 extern struct dentry_operations ll_d_ops;
71
72 int ll_unlock(__u32 mode, struct lustre_handle *lockh)
73 {
74         ENTRY;
75
76         ldlm_lock_decref(lockh, mode);
77
78         RETURN(0);
79 }
80
81 /* Get an inode by inode number (already instantiated by the intent lookup).
82  * Returns inode or NULL
83  */
84 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
85 int ll_set_inode(struct inode *inode, void *opaque)
86 {
87         ll_read_inode2(inode, opaque);
88         return 0;
89 }
90 struct inode *ll_iget(struct super_block *sb, ino_t hash,
91                       struct lustre_md *md)
92 {
93         struct inode *inode;
94
95         LASSERT(hash != 0);
96         inode = iget5_locked(sb, hash, ll_test_inode, ll_set_inode, md);
97
98         if (!inode)
99                 return (NULL);              /* removed ERR_PTR(-ENOMEM) -eeb */
100
101         if (inode->i_state & I_NEW)
102                 unlock_new_inode(inode);
103
104         // XXX Coda always fills inodes, should Lustre?
105         return inode;
106 }
107 #else
108 struct inode *ll_iget(struct super_block *sb, ino_t hash,
109                       struct lustre_md *md)
110 {
111         struct inode *inode;
112         LASSERT(hash != 0);
113         inode = iget4(sb, hash, ll_test_inode, md);
114         CDEBUG(D_VFSTRACE, "inode: %lu/%u(%p)\n", inode->i_ino,
115                inode->i_generation, inode);
116         return inode;
117 }
118 #endif
119
120 static int ll_intent_to_lock_mode(struct lookup_intent *it)
121 {
122         /* CREAT needs to be tested before open (both could be set) */
123         if (it->it_op & IT_CREAT)
124                 return LCK_PW;
125         else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
126                 return LCK_PR;
127
128         LBUG();
129         RETURN(-EINVAL);
130 }
131
132 int ll_it_open_error(int phase, struct lookup_intent *it)
133 {
134         if (it_disposition(it, DISP_OPEN_OPEN)) {
135                 if (phase == DISP_OPEN_OPEN)
136                         return it->it_status;
137                 else
138                         return 0;
139         }
140
141         if (it_disposition(it, DISP_OPEN_CREATE)) {
142                 if (phase == DISP_OPEN_CREATE)
143                         return it->it_status;
144                 else
145                         return 0;
146         }
147
148         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
149                 if (phase == DISP_LOOKUP_EXECD)
150                         return it->it_status;
151                 else
152                         return 0;
153         }
154
155         if (it_disposition(it, DISP_IT_EXECD)) {
156                 if (phase == DISP_IT_EXECD)
157                         return it->it_status;
158                 else
159                         return 0;
160         }
161         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
162         LBUG();
163         return 0;
164 }
165
166 int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
167                         void *data, int flag)
168 {
169         int rc;
170         struct lustre_handle lockh;
171         struct inode *inode = lock->l_data;
172         ENTRY;
173
174         switch (flag) {
175         case LDLM_CB_BLOCKING:
176                 ldlm_lock2handle(lock, &lockh);
177                 rc = ldlm_cli_cancel(&lockh);
178                 if (rc < 0) {
179                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
180                         RETURN(rc);
181                 }
182                 break;
183         case LDLM_CB_CANCELING: {
184                 /* Invalidate all dentries associated with this inode */
185                 if (inode == NULL)
186                         break;
187                 if (lock->l_resource->lr_name.name[0] != inode->i_ino ||
188                     lock->l_resource->lr_name.name[1] != inode->i_generation) {
189                         LDLM_ERROR(lock, "data mismatch with ino %lu/%u",
190                                    inode->i_ino, inode->i_generation);
191                 }
192                 if (S_ISDIR(inode->i_mode)) {
193                         CDEBUG(D_INODE, "invalidating inode %lu\n",
194                                inode->i_ino);
195
196                         ll_invalidate_inode_pages(inode);
197                 }
198
199 #warning FIXME: we should probably free this inode if there are no aliases
200                 if (inode->i_sb->s_root &&
201                     inode != inode->i_sb->s_root->d_inode)
202                         ll_unhash_aliases(inode);
203                 break;
204         }
205         default:
206                 LBUG();
207         }
208
209         RETURN(0);
210 }
211
212 int ll_mdc_cancel_unused(struct lustre_handle *conn, struct inode *inode,
213                          int flags, void *opaque)
214 {
215         struct ldlm_res_id res_id =
216                 { .name = {inode->i_ino, inode->i_generation} };
217         struct obd_device *obddev = class_conn2obd(conn);
218         ENTRY;
219         RETURN(ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags,
220                                       opaque));
221 }
222
223 void ll_prepare_mdc_op_data(struct mdc_op_data *data,
224                             struct inode *i1,
225                             struct inode *i2,
226                             const char *name,
227                             int namelen,
228                             int mode)
229 {
230         LASSERT(i1);
231
232         data->ino1 = i1->i_ino;
233         data->gen1 = i1->i_generation;
234         data->typ1 = i1->i_mode & S_IFMT;
235         data->gid1 = i1->i_gid;
236
237         if (i2) {
238                 data->ino2 = i2->i_ino;
239                 data->gen2 = i2->i_generation;
240                 data->typ2 = i2->i_mode & S_IFMT;
241                 data->gid2 = i2->i_gid;
242         } else {
243                 data->ino2 = 0;
244         }
245
246         data->name = name;
247         data->namelen = namelen;
248         data->mode = mode;
249 }
250
251 /* 
252  *This long block is all about fixing up the local state so that it is
253  *correct as of the moment _before_ the operation was applied; that
254  *way, the VFS will think that everything is normal and call Lustre's
255  *regular VFS methods.
256  *
257  * If we're performing a creation, that means that unless the creation
258  * failed with EEXIST, we should fake up a negative dentry.
259  *
260  * For everything else, we want to lookup to succeed.
261  *
262  * One additional note: if CREATE or OPEN succeeded, we add an extra
263  * reference to the request because we need to keep it around until
264  * ll_create/ll_open gets called.
265  *
266  * The server will return to us, in it_disposition, an indication of
267  * exactly what it_status refers to.
268  *
269  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
270  * otherwise if DISP_OPEN_CREATE is set, then it status is the
271  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
272  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
273  * was successful.
274  *
275  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the child
276  * lookup.
277  */
278 int ll_intent_lock(struct inode *parent, struct dentry **de,
279                    struct lookup_intent *it, int flags, intent_finish_cb intent_finish)
280 {
281         struct dentry *dentry = *de;
282         struct inode *inode = dentry->d_inode;
283         struct ll_sb_info *sbi = ll_i2sbi(parent);
284         struct lustre_handle lockh;
285         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
286         struct ptlrpc_request *request;
287         int rc = 0;
288         struct mds_body *mds_body;
289         int mode;
290         obd_id ino = 0;
291         ENTRY;
292
293 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
294         if (it && it->it_magic != INTENT_MAGIC) { 
295                 CERROR("WARNING: uninitialized intent\n");
296                 LBUG();
297                 intent_init(it, IT_LOOKUP, 0);
298         }
299         if (it->it_op == IT_GETATTR || 
300             it->it_op == 0)
301                 it->it_op = IT_LOOKUP;
302         
303 #endif
304         if (!it ||it->it_op == IT_GETXATTR)
305                 it = &lookup_it;
306
307         it->it_op_release = ll_intent_release;
308
309         CDEBUG(D_DLMTRACE, "name: %*s, intent: %s\n", dentry->d_name.len,
310                dentry->d_name.name, ldlm_it2str(it->it_op));
311         
312         if (dentry->d_name.len > EXT2_NAME_LEN)
313                 RETURN(-ENAMETOOLONG);
314
315         /* This function may be called twice, we only once want to
316            execute the request associated with the intent. If it was
317            done already, we skip past this and use the results. */ 
318         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
319                 struct mdc_op_data op_data;
320
321                 ll_prepare_mdc_op_data(&op_data, parent, dentry->d_inode,
322                                        dentry->d_name.name, dentry->d_name.len,
323                                        0);
324
325                 rc = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_PLAIN, it,
326                                  ll_intent_to_lock_mode(it), &op_data,
327                                  &lockh, NULL, 0, ldlm_completion_ast,
328                                  ll_mdc_blocking_ast, NULL);
329                 if (rc < 0)
330                         RETURN(rc);
331                 memcpy(it->it_lock_handle, &lockh, sizeof(lockh));
332         }
333         request = it->it_data;
334         LASSERT(request != NULL);
335
336         if (!it_disposition(it, DISP_IT_EXECD)) {
337                 /* The server failed before it even started executing the
338                  * intent, i.e. because it couldn't unpack the request. */
339                 LASSERT(it->it_status != 0);
340                 GOTO(drop_req, rc = it->it_status);
341         }
342         rc = ll_it_open_error(DISP_IT_EXECD, it);
343         if (rc)
344                 GOTO(drop_req, rc);
345
346         mds_body = lustre_msg_buf(request->rq_repmsg, 1, sizeof(*mds_body));
347         LASSERT(mds_body != NULL);           /* mdc_enqueue checked */
348         LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
349
350         /* XXX everything with fids please, no ino's inode's etc */
351         ino = mds_body->fid1.id;
352         mode = mds_body->mode;
353
354         /*We were called from revalidate2: did we find the same inode?*/
355         if (inode && 
356             (ino != inode->i_ino ||
357              mds_body->fid1.generation != inode->i_generation)) {
358                 it_set_disposition(it, DISP_ENQ_COMPLETE);
359                 RETURN(-ESTALE);
360         }
361
362         /* If we're doing an IT_OPEN which did not result in an actual
363          * successful open, then we need to remove the bit which saves
364          * this request for unconditional replay. */
365         if (it->it_op & IT_OPEN) {
366                 if (!it_disposition(it, DISP_OPEN_OPEN) ||
367                     it->it_status != 0) {
368                         unsigned long flags;
369                 
370                         spin_lock_irqsave (&request->rq_lock, flags);
371                         request->rq_replay = 0;
372                         spin_unlock_irqrestore (&request->rq_lock, flags);
373                 }
374         }
375
376         rc = ll_it_open_error(DISP_LOOKUP_EXECD, it);
377         if (rc)
378                 GOTO(drop_req, rc);
379         
380         /* keep requests around for the multiple phases of the call
381          * this shows the DISP_XX must guarantee we make it into the call 
382          */ 
383         if (it_disposition(it, DISP_OPEN_CREATE))
384                 ptlrpc_request_addref(request);
385         if (it_disposition(it, DISP_OPEN_OPEN))
386                 ptlrpc_request_addref(request);
387         
388         if (it->it_op & IT_CREAT) {
389                 /* XXX this belongs in ll_create_iit */
390         } else if (it->it_op == IT_OPEN) {
391                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
392         } else 
393                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
394
395         if (intent_finish != NULL) {
396                 struct lustre_handle old_lock;
397                 struct ldlm_lock *lock;
398
399                 rc = intent_finish(request, parent, de, it, 1, ino);
400                 dentry = *de; /* intent_finish may change *de */
401                 inode = dentry->d_inode;
402                 if (rc != 0)
403                         GOTO(drop_lock, rc);
404
405                 /* The intent processing may well have given us a lock different
406                  * from the one we requested.  If we already have a matching
407                  * lock, then cancel the new one.  (We have to do this here,
408                  * instead of in mdc_enqueue, because we need to use the child's
409                  * inode as the l_data to match, and that's not available until
410                  * intent_finish has performed the iget().) */
411                 lock = ldlm_handle2lock(&lockh);
412                 if (lock) {
413                         LDLM_DEBUG(lock, "matching against this");
414                         LDLM_LOCK_PUT(lock);
415                         memcpy(&old_lock, &lockh, sizeof(lockh));
416                         if (ldlm_lock_match(NULL,
417                                             LDLM_FL_BLOCK_GRANTED |
418                                             LDLM_FL_MATCH_DATA,
419                                             NULL, LDLM_PLAIN, NULL, 0, LCK_NL,
420                                             inode, &old_lock)) {
421                                 ldlm_lock_decref_and_cancel(&lockh,
422                                                             it->it_lock_mode);
423                                 memcpy(&lockh, &old_lock, sizeof(old_lock));
424                                 memcpy(it->it_lock_handle, &lockh,
425                                        sizeof(lockh));
426                         }
427                 }
428
429         }
430         ptlrpc_req_finished(request);
431
432         CDEBUG(D_DENTRY, "D_IT dentry %p intent: %s status %d disp %x\n",
433                dentry, ldlm_it2str(it->it_op), it->it_status, it->it_disposition);
434         
435         /* drop IT_LOOKUP locks */
436         if (it->it_op == IT_LOOKUP)
437                 ll_intent_release(it);
438         RETURN(rc);
439
440  drop_lock:
441         ll_intent_release(it);
442  drop_req:
443         ptlrpc_req_finished(request);
444         RETURN(rc);
445 }
446
447 /* Search "inode"'s alias list for a dentry that has the same name and parent as
448  * de.  If found, return it.  If not found, return de. */
449 struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
450 {
451         struct list_head *tmp;
452
453         spin_lock(&dcache_lock);
454         list_for_each(tmp, &inode->i_dentry) {
455                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
456
457                 /* We are called here with 'de' already on the aliases list. */
458                 if (dentry == de) {
459                         CERROR("whoops\n");
460                         continue;
461                 }
462
463                 if (dentry->d_parent != de->d_parent)
464                         continue;
465
466                 if (dentry->d_name.len != de->d_name.len)
467                         continue;
468
469                 if (memcmp(dentry->d_name.name, de->d_name.name,
470                            de->d_name.len) != 0)
471                         continue;
472
473                 if (!list_empty(&dentry->d_lru))
474                         list_del_init(&dentry->d_lru);
475
476                 hlist_del_init(&dentry->d_hash);
477                 __d_rehash(dentry, 0); /* avoid taking dcache_lock inside */
478                 spin_unlock(&dcache_lock);
479                 atomic_inc(&dentry->d_count);
480                 iput(inode);
481                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
482                 return dentry;
483         }
484
485         spin_unlock(&dcache_lock);
486
487         return de;
488 }
489
490 static int
491 lookup2_finish(struct ptlrpc_request *request,
492                struct inode *parent, struct dentry **de,
493                struct lookup_intent *it, int offset, obd_id ino)
494 {
495         struct ll_sb_info *sbi = ll_i2sbi(parent);
496         struct dentry *dentry = *de, *saved = *de;
497         struct inode *inode = NULL;
498         int rc;
499
500         /* NB 1 request reference will be taken away by ll_intent_lock()
501          * when I return */
502         if (!it_disposition(it, DISP_LOOKUP_NEG)) {
503                 struct lustre_md md;
504                 ENTRY;
505
506                 rc =mdc_req2lustre_md(request, offset, &sbi->ll_osc_conn, &md);
507                 if (rc) 
508                         RETURN(rc);
509
510                 inode = ll_iget(dentry->d_sb, ino, &md);
511                 if (!inode) {
512                         /* free the lsm if we allocated one above */
513                         if (md.lsm != NULL)
514                                 obd_free_memmd(&sbi->ll_osc_conn, &md.lsm);
515                         RETURN(-ENOMEM);
516                 } else if (md.lsm != NULL &&
517                            ll_i2info(inode)->lli_smd != md.lsm) {
518                         obd_free_memmd(&sbi->ll_osc_conn, &md.lsm);
519                 }
520
521                 /* If this is a stat, get the authoritative file size */
522                 if (it->it_op == IT_GETATTR && S_ISREG(inode->i_mode) &&
523                     ll_i2info(inode)->lli_smd != NULL) {
524                         struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
525                         struct lustre_handle lockh = {0};
526                         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
527                         ldlm_error_t rc;
528
529                         LASSERT(lsm->lsm_object_id != 0);
530
531                         rc = ll_extent_lock(NULL, inode, lsm, LCK_PR, &extent,
532                                             &lockh);
533                         if (rc != ELDLM_OK) {
534                                 iput(inode);
535                                 RETURN(-EIO);
536                         }
537                         ll_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
538                 }
539
540                 dentry = *de = ll_find_alias(inode, dentry);
541
542                 /* We asked for a lock on the directory, and may have been
543                  * granted a lock on the inode.  Just in case, fixup the data
544                  * pointer. */
545                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
546                        inode, inode->i_ino, inode->i_generation);
547                 ldlm_lock_set_data((struct lustre_handle*)it->it_lock_handle,
548                                    inode);
549         } else {
550                 ENTRY;
551         }
552
553         dentry->d_op = &ll_d_ops;
554         ll_set_dd(dentry);
555
556         if (dentry == saved)
557                 d_add(dentry, inode);
558
559         RETURN(0);
560 }
561
562 static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
563                                    struct lookup_intent *it, int flags)
564 {
565         struct dentry *save = dentry, *retval;
566         int rc;
567         ENTRY;
568
569         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
570                dentry->d_name.name, parent->i_ino, parent->i_generation,
571                parent, LL_IT2STR(it));
572
573         if (d_mountpoint(dentry)) { 
574                 CERROR("Tell Peter, lookup on mtpt, it %s\n", LL_IT2STR(it));
575         }
576
577         rc = ll_intent_lock(parent, &dentry, it, flags, lookup2_finish);
578         if (rc < 0) {
579                 CDEBUG(D_INFO, "ll_intent_lock: %d\n", rc);
580                 GOTO(out, retval = ERR_PTR(rc));
581         }
582
583         if (dentry == save)
584                 GOTO(out, retval = NULL);
585         else
586                 GOTO(out, retval = dentry);
587  out:
588         return retval;
589 }
590
591 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
592 static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry, 
593                                    struct nameidata *nd)
594 {
595         struct dentry *de;
596         ENTRY;
597
598         if (nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
599                 de = ll_lookup_it(parent, dentry, &nd->it, nd->flags);
600         else 
601                 de = ll_lookup_it(parent, dentry, NULL, 0);
602
603         RETURN(de);
604 }
605 #endif
606
607 static int ll_mdc_unlink(struct inode *dir, struct inode *child, __u32 mode,
608                          const char *name, int len)
609 {
610         struct ptlrpc_request *request = NULL;
611         struct mds_body *body;
612         struct lov_mds_md *eadata;
613         struct lov_stripe_md *lsm = NULL;
614         struct obd_trans_info oti = { 0 };
615         struct mdc_op_data op_data;
616         struct obdo *oa;
617         int rc;
618         ENTRY;
619
620         ll_prepare_mdc_op_data(&op_data, dir, child, name, len, mode);
621         rc = mdc_unlink(&ll_i2sbi(dir)->ll_mdc_conn, &op_data, &request);
622         if (rc)
623                 GOTO(out, rc);
624         /* req is swabbed so this is safe */
625         body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
626
627         if (!(body->valid & OBD_MD_FLEASIZE))
628                 GOTO(out, rc = 0);
629
630         if (body->eadatasize == 0) {
631                 CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n");
632                 GOTO(out, rc = -EPROTO);
633         }
634
635         /* The MDS sent back the EA because we unlinked the last reference
636          * to this file. Use this EA to unlink the objects on the OST.
637          * It's opaque so we don't swab here; we leave it to obd_unpackmd() to
638          * check it is complete and sensible. */
639         eadata = lustre_swab_repbuf(request, 1, body->eadatasize, NULL);
640         LASSERT(eadata != NULL);
641         if (eadata == NULL) {
642                 CERROR("Can't unpack MDS EA data\n");
643                 GOTO(out, rc = -EPROTO);
644         }
645
646         rc = obd_unpackmd(ll_i2obdconn(dir), &lsm, eadata, body->eadatasize);
647         if (rc < 0) {
648                 CERROR("obd_unpackmd: %d\n", rc);
649                 GOTO(out, rc);
650         }
651         LASSERT(rc >= sizeof(*lsm));
652
653         oa = obdo_alloc();
654         if (oa == NULL)
655                 GOTO(out_free_memmd, rc = -ENOMEM);
656
657         oa->o_id = lsm->lsm_object_id;
658         oa->o_mode = body->mode & S_IFMT;
659         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
660
661         if (body->valid & OBD_MD_FLCOOKIE) {
662                 oa->o_valid |= OBD_MD_FLCOOKIE;
663                 oti.oti_logcookies = lustre_msg_buf(request->rq_repmsg, 3,
664                                                     body->eadatasize);
665         }
666
667         rc = obd_destroy(ll_i2obdconn(dir), oa, lsm, &oti);
668         obdo_free(oa);
669         if (rc)
670                 CERROR("obd destroy objid "LPX64": rc %d\n",
671                        lsm->lsm_object_id, rc);
672  out_free_memmd:
673         obd_free_memmd(ll_i2obdconn(dir), &lsm);
674  out:
675         ptlrpc_req_finished(request);
676         return rc;
677 }
678
679 /* We depend on "mode" being set with the proper file type/umask by now */
680 static struct inode *ll_create_node(struct inode *dir, const char *name,
681                                     int namelen, const void *data, int datalen,
682                                     int mode, __u64 extra,
683                                     struct lookup_intent *it)
684 {
685         struct inode *inode;
686         struct ptlrpc_request *request = NULL;
687         struct ll_sb_info *sbi = ll_i2sbi(dir);
688         struct lustre_md md;
689         int rc;
690         ENTRY;
691
692         LASSERT(it && it->it_disposition);
693
694         ll_invalidate_inode_pages(dir);
695
696         request = it->it_data;
697         rc = mdc_req2lustre_md(request, 1, &sbi->ll_osc_conn, &md);
698         if (rc) { 
699                 GOTO(out, inode = ERR_PTR(rc));
700         }
701
702         inode = ll_iget(dir->i_sb, md.body->ino, &md);
703         if (!inode || is_bad_inode(inode)) {
704                 /* XXX might need iput() for bad inode */
705                 int rc = -EIO;
706                 CERROR("new_inode -fatal: rc %d\n", rc);
707                 LBUG();
708                 GOTO(out, rc);
709         }
710         LASSERT(list_empty(&inode->i_dentry));
711
712         CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
713                inode, inode->i_ino, inode->i_generation);
714         ldlm_lock_set_data((struct lustre_handle*)it->it_lock_handle,
715                            inode);
716
717         EXIT;
718  out:
719         ptlrpc_req_finished(request);
720         return inode;
721 }
722
723 /*
724  * By the time this is called, we already have created the directory cache
725  * entry for the new file, but it is so far negative - it has no inode.
726  *
727  * We defer creating the OBD object(s) until open, to keep the intent and
728  * non-intent code paths similar, and also because we do not have the MDS
729  * inode number before calling ll_create_node() (which is needed for LOV),
730  * so we would need to do yet another RPC to the MDS to store the LOV EA
731  * data on the MDS.  If needed, we would pass the PACKED lmm as data and
732  * lmm_size in datalen (the MDS still has code which will handle that).
733  *
734  * If the create succeeds, we fill in the inode information
735  * with d_instantiate().
736  */
737 static int ll_create_it(struct inode *dir, struct dentry *dentry, int mode, struct lookup_intent *it)
738 {
739         struct inode *inode;
740         struct ptlrpc_request *request = it->it_data;
741         int rc = 0;
742         ENTRY;
743
744         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
745                dentry->d_name.name, dir->i_ino, dir->i_generation, dir,
746                LL_IT2STR(it));
747
748         rc = ll_it_open_error(DISP_OPEN_CREATE, it);
749         if (rc) {
750                 ptlrpc_req_finished(request);
751                 RETURN(rc);
752         }
753
754         mdc_store_inode_generation(request, 2, 1);
755         inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
756                                NULL, 0, mode, 0, it);
757         if (IS_ERR(inode)) {
758                 RETURN(PTR_ERR(inode));
759         }
760
761         d_instantiate(dentry, inode);
762         RETURN(0);
763 }
764
765 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
766 static int ll_create_nd(struct inode *dir, struct dentry *dentry, int mode, struct nameidata *nd)
767 {
768         return ll_create_it(dir, dentry, mode, &nd->it);
769 }
770 #endif
771
772 static int ll_mknod_raw(struct nameidata *nd, int mode, dev_t rdev)
773 {
774         struct inode *dir = nd->dentry->d_inode;
775         const char *name = nd->last.name;
776         int len = nd->last.len;
777         struct ptlrpc_request *request = NULL;
778         time_t time = LTIME_S(CURRENT_TIME);
779         struct ll_sb_info *sbi = ll_i2sbi(dir);
780         struct mdc_op_data op_data;
781         int err = -EMLINK;
782         ENTRY;
783
784         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
785                name, dir->i_ino, dir->i_generation, dir);
786
787         if (dir->i_nlink >= EXT2_LINK_MAX)
788                 RETURN(err);
789
790         mode &= ~current->fs->umask;
791
792         switch (mode & S_IFMT) {
793         case 0: 
794         case S_IFREG:
795                 mode |= S_IFREG; /* for mode = 0 case, fallthrough */
796         case S_IFCHR: 
797         case S_IFBLK:
798         case S_IFIFO: 
799         case S_IFSOCK:
800                 ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
801                 err = mdc_create(&sbi->ll_mdc_conn, &op_data, NULL, 0, mode,
802                                  current->fsuid, current->fsgid, time,
803                                  rdev, &request);
804                 ptlrpc_req_finished(request);
805                 break;
806         case S_IFDIR:
807                 err = -EPERM;
808                 break;
809         default:
810                 err = -EINVAL;
811         }
812         RETURN(err);
813 }
814
815 static int ll_symlink_raw(struct nameidata *nd, const char *tgt)
816 {
817         struct inode *dir = nd->dentry->d_inode;
818         const char *name = nd->last.name;
819         int len = nd->last.len;
820         struct ptlrpc_request *request = NULL;
821         time_t time = LTIME_S(CURRENT_TIME);
822         struct ll_sb_info *sbi = ll_i2sbi(dir);
823         struct mdc_op_data op_data;
824         int err = -EMLINK;
825         ENTRY;
826
827         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),target=%s\n",
828                name, dir->i_ino, dir->i_generation, dir, tgt);
829
830         if (dir->i_nlink >= EXT2_LINK_MAX)
831                 RETURN(err);
832
833         ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
834         err = mdc_create(&sbi->ll_mdc_conn, &op_data,
835                          tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO,
836                          current->fsuid, current->fsgid, time, 0, &request);
837         ptlrpc_req_finished(request);
838         RETURN(err);
839 }
840
841 static int ll_link_raw(struct nameidata *srcnd, struct nameidata *tgtnd)
842 {
843         struct inode *src = srcnd->dentry->d_inode;
844         struct inode *dir = tgtnd->dentry->d_inode;
845         const char *name = tgtnd->last.name;
846         int len = tgtnd->last.len;
847         struct ptlrpc_request *request = NULL;
848         struct mdc_op_data op_data;
849         int err;
850         struct ll_sb_info *sbi = ll_i2sbi(dir);
851
852         ENTRY;
853         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),dir=%lu/%u(%p),target=%s\n",
854                src->i_ino, src->i_generation, src,
855                dir->i_ino, dir->i_generation, dir, name);
856
857         ll_prepare_mdc_op_data(&op_data, src, dir, name, len, 0);
858         err = mdc_link(&sbi->ll_mdc_conn, &op_data, &request);
859         ptlrpc_req_finished(request);
860
861         RETURN(err);
862 }
863
864
865 static int ll_mkdir_raw(struct nameidata *nd, int mode)
866 {
867         struct inode *dir = nd->dentry->d_inode;
868         const char *name = nd->last.name;
869         int len = nd->last.len;
870         struct ptlrpc_request *request = NULL;
871         time_t time = LTIME_S(CURRENT_TIME);
872         struct ll_sb_info *sbi = ll_i2sbi(dir);
873         struct mdc_op_data op_data;
874         int err = -EMLINK;
875         ENTRY;
876         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
877                name, dir->i_ino, dir->i_generation, dir);
878
879         if (dir->i_nlink >= EXT2_LINK_MAX)
880                 RETURN(err);
881
882         mode = (mode & (S_IRWXUGO|S_ISVTX) & ~current->fs->umask) | S_IFDIR;
883         ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
884         err = mdc_create(&sbi->ll_mdc_conn, &op_data, NULL, 0, mode,
885                          current->fsuid, current->fsgid, time, 0, &request);
886         ptlrpc_req_finished(request);
887         RETURN(err);
888 }
889
890 static int ll_rmdir_raw(struct nameidata *nd)
891 {
892         struct inode *dir = nd->dentry->d_inode;
893         const char *name = nd->last.name;
894         int len = nd->last.len;
895         int rc;
896         ENTRY;
897         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
898                name, dir->i_ino, dir->i_generation, dir);
899
900         rc = ll_mdc_unlink(dir, NULL, S_IFDIR, name, len);
901         RETURN(rc);
902 }
903
904 static int ll_unlink_raw(struct nameidata *nd)
905 {
906         struct inode *dir = nd->dentry->d_inode;
907         const char *name = nd->last.name;
908         int len = nd->last.len;
909         int rc;
910         ENTRY;
911         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
912                name, dir->i_ino, dir->i_generation, dir);
913
914         rc = ll_mdc_unlink(dir, NULL, S_IFREG, name, len);
915         RETURN(rc);
916 }
917
918 static int ll_rename_raw(struct nameidata *oldnd, struct nameidata *newnd)
919 {
920         struct inode *src = oldnd->dentry->d_inode;
921         struct inode *tgt = newnd->dentry->d_inode;
922         const char *oldname = oldnd->last.name;
923         int oldlen  = oldnd->last.len;
924         const char *newname = newnd->last.name;
925         int newlen  = newnd->last.len;
926         struct ptlrpc_request *request = NULL;
927         struct ll_sb_info *sbi = ll_i2sbi(src);
928         struct mdc_op_data op_data;
929         int err;
930         ENTRY;
931         CDEBUG(D_VFSTRACE, "VFS Op:oldname=%s,src_dir=%lu/%u(%p),newname=%s,"
932                "tgt_dir=%lu/%u(%p)\n", oldname, src->i_ino, src->i_generation,
933                src, newname, tgt->i_ino, tgt->i_generation, tgt);
934
935         ll_prepare_mdc_op_data(&op_data, src, tgt, NULL, 0, 0);
936         err = mdc_rename(&sbi->ll_mdc_conn, &op_data,
937                          oldname, oldlen, newname, newlen, &request);
938         ptlrpc_req_finished(request);
939
940         RETURN(err);
941 }
942
943 struct inode_operations ll_dir_inode_operations = {
944         link_raw:           ll_link_raw,
945         unlink_raw:         ll_unlink_raw,
946         symlink_raw:        ll_symlink_raw,
947         mkdir_raw:          ll_mkdir_raw,
948         rmdir_raw:          ll_rmdir_raw,
949         mknod_raw:          ll_mknod_raw,
950         rename_raw:         ll_rename_raw,
951         setattr:         ll_setattr,
952         setattr_raw:     ll_setattr_raw,
953 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
954         create_it:          ll_create_it,
955         lookup_it:            ll_lookup_it,
956         revalidate_it:      ll_inode_revalidate_it,
957 #else
958         lookup_it:          ll_lookup_nd,
959         create_nd:          ll_create_nd,
960         getattr_it:         ll_getattr,
961 #endif
962 };