Whamcloud - gitweb
b=16727
[fs/lustre-release.git] / lustre / cmm / mdc_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/mdc_object.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_MDS
48 #include <obd_support.h>
49 #include <lustre_lib.h>
50 #include <obd_class.h>
51 #include <lustre_mdc.h>
52 #include "cmm_internal.h"
53 #include "mdc_internal.h"
54
55 static struct md_object_operations mdc_mo_ops;
56 static struct md_dir_operations mdc_dir_ops;
57 static struct lu_object_operations mdc_obj_ops;
58
59 extern struct lu_context_key mdc_thread_key;
60
61 struct lu_object *mdc_object_alloc(const struct lu_env *env,
62                                    const struct lu_object_header *hdr,
63                                    struct lu_device *ld)
64 {
65         struct mdc_object *mco;
66         ENTRY;
67
68         OBD_ALLOC_PTR(mco);
69         if (mco != NULL) {
70                 struct lu_object *lo;
71
72                 lo = &mco->mco_obj.mo_lu;
73                 lu_object_init(lo, NULL, ld);
74                 mco->mco_obj.mo_ops = &mdc_mo_ops;
75                 mco->mco_obj.mo_dir_ops = &mdc_dir_ops;
76                 lo->lo_ops = &mdc_obj_ops;
77                 RETURN(lo);
78         } else
79                 RETURN(NULL);
80 }
81
82 static void mdc_object_free(const struct lu_env *env, struct lu_object *lo)
83 {
84         struct mdc_object *mco = lu2mdc_obj(lo);
85         lu_object_fini(lo);
86         OBD_FREE_PTR(mco);
87 }
88
89 static int mdc_object_init(const struct lu_env *env, struct lu_object *lo)
90 {
91         ENTRY;
92         lo->lo_header->loh_attr |= LOHA_REMOTE;
93         RETURN(0);
94 }
95
96 static int mdc_object_print(const struct lu_env *env, void *cookie,
97                             lu_printer_t p, const struct lu_object *lo)
98 {
99         return (*p)(env, cookie, LUSTRE_CMM_MDC_NAME"-object@%p", lo);
100 }
101
102 static struct lu_object_operations mdc_obj_ops = {
103         .loo_object_init    = mdc_object_init,
104         .loo_object_free    = mdc_object_free,
105         .loo_object_print   = mdc_object_print,
106 };
107
108 /* md_object_operations */
109 static
110 struct mdc_thread_info *mdc_info_get(const struct lu_env *env)
111 {
112         struct mdc_thread_info *mci;
113
114         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
115         LASSERT(mci);
116         return mci;
117 }
118
119 static
120 struct mdc_thread_info *mdc_info_init(const struct lu_env *env)
121 {
122         struct mdc_thread_info *mci = mdc_info_get(env);
123         memset(mci, 0, sizeof(*mci));
124         return mci;
125 }
126
127 static void mdc_body2attr(struct mdt_body *body, struct md_attr *ma)
128 {
129         struct lu_attr *la = &ma->ma_attr;
130         /* update time */
131         if (body->valid & OBD_MD_FLCTIME && body->ctime >= la->la_ctime) {
132                 la->la_ctime = body->ctime;
133                 if (body->valid & OBD_MD_FLMTIME)
134                         la->la_mtime = body->mtime;
135         }
136
137         if (body->valid & OBD_MD_FLMODE)
138                 la->la_mode = body->mode;
139         if (body->valid & OBD_MD_FLSIZE)
140                 la->la_size = body->size;
141         if (body->valid & OBD_MD_FLBLOCKS)
142                 la->la_blocks = body->blocks;
143         if (body->valid & OBD_MD_FLUID)
144                 la->la_uid = body->uid;
145         if (body->valid & OBD_MD_FLGID)
146                 la->la_gid = body->gid;
147         if (body->valid & OBD_MD_FLFLAGS)
148                 la->la_flags = body->flags;
149         if (body->valid & OBD_MD_FLNLINK)
150                 la->la_nlink = body->nlink;
151         if (body->valid & OBD_MD_FLRDEV)
152                 la->la_rdev = body->rdev;
153
154         la->la_valid = body->valid;
155         ma->ma_valid = MA_INODE;
156 }
157
158 static int mdc_req2attr_update(const struct lu_env *env,
159                                struct md_attr *ma)
160 {
161         struct mdc_thread_info *mci;
162         struct ptlrpc_request *req;
163         struct mdt_body *body;
164         struct lov_mds_md *md;
165         struct llog_cookie *cookie;
166         void *acl;
167
168         ENTRY;
169         mci = mdc_info_get(env);
170         req = mci->mci_req;
171         LASSERT(req);
172         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
173         LASSERT(body);
174         mdc_body2attr(body, ma);
175
176         if (body->valid & OBD_MD_FLMDSCAPA) {
177                 struct lustre_capa *capa;
178
179                 /* create for cross-ref will fetch mds capa from remote obj */
180                 capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA1);
181                 LASSERT(capa != NULL);
182                 LASSERT(ma->ma_capa != NULL);
183                 *ma->ma_capa = *capa;
184         }
185                 
186         if ((body->valid & OBD_MD_FLEASIZE) || (body->valid & OBD_MD_FLDIREA)) {
187                 if (body->eadatasize == 0) {
188                         CERROR("No size defined for easize field\n");
189                         RETURN(-EPROTO);
190                 }
191
192                 md = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
193                                                   body->eadatasize);
194                 if (md == NULL)
195                         RETURN(-EPROTO);
196
197                 LASSERT(ma->ma_lmm != NULL);
198                 LASSERT(ma->ma_lmm_size >= body->eadatasize); 
199                 ma->ma_lmm_size = body->eadatasize;
200                 memcpy(ma->ma_lmm, md, ma->ma_lmm_size);
201                 ma->ma_valid |= MA_LOV;
202         }
203
204         if (body->valid & OBD_MD_FLCOOKIE) {
205                 /*
206                  * ACL and cookie share the same body->aclsize, we need
207                  * to make sure that they both never come here.
208                  */
209                 LASSERT(!(body->valid & OBD_MD_FLACL));
210
211                 if (body->aclsize == 0) {
212                         CERROR("No size defined for cookie field\n");
213                         RETURN(-EPROTO);
214                 }
215
216                 cookie = req_capsule_server_sized_get(&req->rq_pill, 
217                                                       &RMF_LOGCOOKIES,
218                                                       body->aclsize);
219                 if (cookie == NULL)
220                         RETURN(-EPROTO);
221
222                 LASSERT(ma->ma_cookie != NULL);
223                 LASSERT(ma->ma_cookie_size == body->aclsize);
224                 memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size);
225                 ma->ma_valid |= MA_COOKIE;
226         }
227
228 #ifdef CONFIG_FS_POSIX_ACL
229         if (body->valid & OBD_MD_FLACL) {
230                 if (body->aclsize == 0) {
231                         CERROR("No size defined for acl field\n");
232                         RETURN(-EPROTO);
233                 }
234
235                 acl = req_capsule_server_sized_get(&req->rq_pill, 
236                                                    &RMF_ACL,
237                                                    body->aclsize);
238                 if (acl == NULL)
239                         RETURN(-EPROTO);
240
241                 LASSERT(ma->ma_acl != NULL);
242                 LASSERT(ma->ma_acl_size == body->aclsize);
243                 memcpy(ma->ma_acl, acl, ma->ma_acl_size);
244                 ma->ma_valid |= MA_ACL_DEF;
245         }
246 #endif
247
248         RETURN(0);
249 }
250
251 static int mdc_attr_get(const struct lu_env *env, struct md_object *mo,
252                         struct md_attr *ma)
253 {
254         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
255         struct mdc_thread_info *mci;
256         int rc;
257         ENTRY;
258
259         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
260         LASSERT(mci);
261
262         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
263
264         rc = md_getattr(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
265                         NULL, OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID |
266                         OBD_MD_FLFLAGS | OBD_MD_FLCROSSREF, 0, &mci->mci_req);
267         if (rc == 0) {
268                 /* get attr from request */
269                 rc = mdc_req2attr_update(env, ma);
270         }
271
272         ptlrpc_req_finished(mci->mci_req);
273
274         RETURN(rc);
275 }
276
277 static inline struct timespec *mdc_attr_time(struct timespec *t, __u64 seconds)
278 {
279         t->tv_sec = seconds;
280         t->tv_nsec = 0;
281         return t;
282 }
283
284 /*
285  * XXX: It is only used for set ctime when rename's source on remote MDS.
286  */
287 static int mdc_attr_set(const struct lu_env *env, struct md_object *mo,
288                         const struct md_attr *ma)
289 {
290         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
291         const struct lu_attr *la = &ma->ma_attr;
292         struct mdc_thread_info *mci;
293         struct md_ucred *uc = md_ucred(env);
294         int rc;
295         ENTRY;
296
297         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
298
299         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
300         LASSERT(mci);
301
302         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
303
304         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
305         mdc_attr_time(&mci->mci_opdata.op_attr.ia_ctime, la->la_ctime);
306         mci->mci_opdata.op_attr.ia_mode = la->la_mode;
307         mci->mci_opdata.op_attr.ia_valid = ATTR_CTIME_SET;
308         if (uc &&
309             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
310                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
311                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
312                 mci->mci_opdata.op_cap = uc->mu_cap;
313                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
314                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
315                         mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
316                 } else {
317                         mci->mci_opdata.op_suppgids[0] =
318                                 mci->mci_opdata.op_suppgids[1] = -1;
319                 }
320         } else {
321                 mci->mci_opdata.op_fsuid = la->la_uid;
322                 mci->mci_opdata.op_fsgid = la->la_gid;
323                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
324                 mci->mci_opdata.op_suppgids[0] =
325                                 mci->mci_opdata.op_suppgids[1] = -1;
326         }
327
328         rc = md_setattr(mc->mc_desc.cl_exp, &mci->mci_opdata,
329                         NULL, 0, NULL, 0, &mci->mci_req, NULL);
330
331         ptlrpc_req_finished(mci->mci_req);
332
333         RETURN(rc);
334 }
335
336 static int mdc_object_create(const struct lu_env *env,
337                              struct md_object *mo,
338                              const struct md_op_spec *spec,
339                              struct md_attr *ma)
340 {
341         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
342         struct lu_attr *la = &ma->ma_attr;
343         struct mdc_thread_info *mci;
344         const void *symname;
345         struct md_ucred *uc = md_ucred(env);
346         int rc, symlen;
347         uid_t uid;
348         gid_t gid;
349         cfs_cap_t cap;
350         ENTRY;
351
352         LASSERT(S_ISDIR(la->la_mode));
353         LASSERT(spec->u.sp_pfid != NULL);
354
355         mci = mdc_info_init(env);
356         mci->mci_opdata.op_bias = MDS_CROSS_REF;
357         mci->mci_opdata.op_fid2 = *lu_object_fid(&mo->mo_lu);
358         
359         /* Parent fid is needed to create dotdot on the remote node. */
360         mci->mci_opdata.op_fid1 = *(spec->u.sp_pfid);
361         mci->mci_opdata.op_mod_time = la->la_ctime;
362         if (uc &&
363             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
364                 uid = uc->mu_fsuid;
365                 if (la->la_mode & S_ISGID)
366                         gid = la->la_gid;
367                 else
368                         gid = uc->mu_fsgid;
369                 cap = uc->mu_cap;
370                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
371                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
372                 else
373                         mci->mci_opdata.op_suppgids[0] = -1;
374         } else {
375                 uid = la->la_uid;
376                 gid = la->la_gid;
377                 cap = 0;
378                 mci->mci_opdata.op_suppgids[0] = -1;
379         }
380
381         /* get data from spec */
382         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
383                 symname = spec->u.sp_ea.eadata;
384                 symlen = spec->u.sp_ea.eadatalen;
385                 mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
386                 mci->mci_opdata.op_flags |= MDS_CREATE_SLAVE_OBJ;
387 #ifdef CONFIG_FS_POSIX_ACL
388         } else if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
389                 symname = spec->u.sp_ea.eadata;
390                 symlen = spec->u.sp_ea.eadatalen;
391                 mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
392                 mci->mci_opdata.op_flags |= MDS_CREATE_RMT_ACL;
393 #endif
394         } else {
395                 symname = spec->u.sp_symname;
396                 symlen = symname ? strlen(symname) + 1 : 0;
397         }
398
399         rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata,
400                        symname, symlen, la->la_mode, uid, gid,
401                        cap, la->la_rdev, &mci->mci_req);
402
403         if (rc == 0) {
404                 /* get attr from request */
405                 rc = mdc_req2attr_update(env, ma);
406         }
407
408         ptlrpc_req_finished(mci->mci_req);
409
410         RETURN(rc);
411 }
412
413 static int mdc_ref_add(const struct lu_env *env, struct md_object *mo,
414                        const struct md_attr *ma)
415 {
416         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
417         const struct lu_attr *la = &ma->ma_attr;
418         struct mdc_thread_info *mci;
419         struct md_ucred *uc = md_ucred(env);
420         int rc;
421         ENTRY;
422
423         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
424         LASSERT(mci);
425
426         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
427         mci->mci_opdata.op_bias = MDS_CROSS_REF;
428         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
429         mci->mci_opdata.op_mod_time = la->la_ctime;
430         if (uc &&
431             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
432                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
433                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
434                 mci->mci_opdata.op_cap = uc->mu_cap;
435                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
436                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
437                         mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
438                 } else {
439                         mci->mci_opdata.op_suppgids[0] =
440                                 mci->mci_opdata.op_suppgids[1] = -1;
441                 }
442         } else {
443                 mci->mci_opdata.op_fsuid = la->la_uid;
444                 mci->mci_opdata.op_fsgid = la->la_gid;
445                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
446                 mci->mci_opdata.op_suppgids[0] =
447                                 mci->mci_opdata.op_suppgids[1] = -1;
448         }
449
450
451         rc = md_link(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
452
453         ptlrpc_req_finished(mci->mci_req);
454
455         RETURN(rc);
456 }
457
458 static int mdc_ref_del(const struct lu_env *env, struct md_object *mo,
459                        struct md_attr *ma)
460 {
461         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
462         struct lu_attr *la = &ma->ma_attr;
463         struct mdc_thread_info *mci;
464         struct md_ucred *uc = md_ucred(env);
465         int rc;
466         ENTRY;
467
468         mci = mdc_info_init(env);
469         mci->mci_opdata.op_bias = MDS_CROSS_REF;
470         if (ma->ma_attr_flags & MDS_VTX_BYPASS)
471                 mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
472         else
473                 mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
474         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
475         mci->mci_opdata.op_mode = la->la_mode;
476         mci->mci_opdata.op_mod_time = la->la_ctime;
477         if (uc &&
478             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
479                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
480                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
481                 mci->mci_opdata.op_cap = uc->mu_cap;
482                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
483                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
484                 else
485                         mci->mci_opdata.op_suppgids[0] = -1;
486         } else {
487                 mci->mci_opdata.op_fsuid = la->la_uid;
488                 mci->mci_opdata.op_fsgid = la->la_gid;
489                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
490                 mci->mci_opdata.op_suppgids[0] = -1;
491         }
492
493         rc = md_unlink(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
494         if (rc == 0) {
495                 /* get attr from request */
496                 rc = mdc_req2attr_update(env, ma);
497         }
498
499         ptlrpc_req_finished(mci->mci_req);
500
501         RETURN(rc);
502 }
503
504 #ifdef HAVE_SPLIT_SUPPORT
505 int mdc_send_page(struct cmm_device *cm, const struct lu_env *env,
506                   struct md_object *mo, struct page *page, __u32 offset)
507 {
508         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
509         int rc;
510         ENTRY;
511
512         rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
513                           page, offset);
514         CDEBUG(rc ? D_ERROR : D_INFO, "send page %p  offset %d fid "DFID
515                " rc %d \n", page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
516         RETURN(rc);
517 }
518 #endif
519
520 static struct md_object_operations mdc_mo_ops = {
521         .moo_attr_get       = mdc_attr_get,
522         .moo_attr_set       = mdc_attr_set,
523         .moo_object_create  = mdc_object_create,
524         .moo_ref_add        = mdc_ref_add,
525         .moo_ref_del        = mdc_ref_del,
526 };
527
528 /* md_dir_operations */
529 static int mdc_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
530                           struct md_object *mo_t, const struct lu_fid *lf,
531                           const struct lu_name *lname, struct md_attr *ma)
532 {
533         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo_p));
534         struct lu_attr *la = &ma->ma_attr;
535         struct mdc_thread_info *mci;
536         struct md_ucred *uc = md_ucred(env);
537         int rc;
538         ENTRY;
539
540         mci = mdc_info_init(env);
541         mci->mci_opdata.op_bias = MDS_CROSS_REF;
542         if (ma->ma_attr_flags & MDS_VTX_BYPASS)
543                 mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
544         else
545                 mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
546         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo_p->mo_lu);
547         mci->mci_opdata.op_fid2 = *lf;
548         mci->mci_opdata.op_mode = la->la_mode;
549         mci->mci_opdata.op_mod_time = la->la_ctime;
550         if (uc &&
551             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
552                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
553                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
554                 mci->mci_opdata.op_cap = uc->mu_cap;
555                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
556                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
557                         mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
558                 } else {
559                         mci->mci_opdata.op_suppgids[0] =
560                                 mci->mci_opdata.op_suppgids[1] = -1;
561                 }
562         } else {
563                 mci->mci_opdata.op_fsuid = la->la_uid;
564                 mci->mci_opdata.op_fsgid = la->la_gid;
565                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
566                 mci->mci_opdata.op_suppgids[0] =
567                                 mci->mci_opdata.op_suppgids[1] = -1;
568         }
569
570         rc = md_rename(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0,
571                        lname->ln_name, lname->ln_namelen, &mci->mci_req);
572         if (rc == 0) {
573                 /* get attr from request */
574                 mdc_req2attr_update(env, ma);
575         }
576
577         ptlrpc_req_finished(mci->mci_req);
578
579         RETURN(rc);
580 }
581 /* 
582  * Return resulting fid in sfid
583  * 0: fids are not relatives
584  * fid: fid at which search stopped
585  */
586 static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo,
587                          const struct lu_fid *fid, struct lu_fid *sfid)
588 {
589         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
590         struct mdc_thread_info *mci;
591         struct mdt_body *body;
592         int rc;
593         ENTRY;
594
595         mci = mdc_info_init(env);
596
597         rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
598                           fid, &mci->mci_req);
599         if (rc == 0 || rc == -EREMOTE) {
600                 body = req_capsule_server_get(&mci->mci_req->rq_pill,
601                                               &RMF_MDT_BODY);
602                 LASSERT(body->valid & OBD_MD_FLID);
603         
604                 CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "DFID"\n",
605                        PFID(&body->fid1));
606                 *sfid = body->fid1;
607         }
608         ptlrpc_req_finished(mci->mci_req);
609         RETURN(rc);
610 }
611
612 static struct md_dir_operations mdc_dir_ops = {
613         .mdo_is_subdir   = mdc_is_subdir,
614         .mdo_rename_tgt  = mdc_rename_tgt
615 };