Whamcloud - gitweb
edc2fb6215ca6421b52fbd063591eef779c91fe1
[fs/lustre-release.git] / lustre / cmm / mdc_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/mdc_object.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_MDS
48 #include <obd_support.h>
49 #include <lustre_lib.h>
50 #include <obd_class.h>
51 #include <lustre_mdc.h>
52 #include "cmm_internal.h"
53 #include "mdc_internal.h"
54
55 static struct md_object_operations mdc_mo_ops;
56 static struct md_dir_operations mdc_dir_ops;
57 static struct lu_object_operations mdc_obj_ops;
58
59 extern struct lu_context_key mdc_thread_key;
60
61 struct lu_object *mdc_object_alloc(const struct lu_env *env,
62                                    const struct lu_object_header *hdr,
63                                    struct lu_device *ld)
64 {
65         struct mdc_object *mco;
66         ENTRY;
67
68         OBD_ALLOC_PTR(mco);
69         if (mco != NULL) {
70                 struct lu_object *lo;
71
72                 lo = &mco->mco_obj.mo_lu;
73                 lu_object_init(lo, NULL, ld);
74                 mco->mco_obj.mo_ops = &mdc_mo_ops;
75                 mco->mco_obj.mo_dir_ops = &mdc_dir_ops;
76                 lo->lo_ops = &mdc_obj_ops;
77                 RETURN(lo);
78         } else
79                 RETURN(NULL);
80 }
81
82 static void mdc_object_free(const struct lu_env *env, struct lu_object *lo)
83 {
84         struct mdc_object *mco = lu2mdc_obj(lo);
85         lu_object_fini(lo);
86         OBD_FREE_PTR(mco);
87 }
88
89 static int mdc_object_init(const struct lu_env *env, struct lu_object *lo)
90 {
91         ENTRY;
92         lo->lo_header->loh_attr |= LOHA_REMOTE;
93         RETURN(0);
94 }
95
96 static int mdc_object_print(const struct lu_env *env, void *cookie,
97                             lu_printer_t p, const struct lu_object *lo)
98 {
99         return (*p)(env, cookie, LUSTRE_CMM_MDC_NAME"-object@%p", lo);
100 }
101
102 static struct lu_object_operations mdc_obj_ops = {
103         .loo_object_init    = mdc_object_init,
104         .loo_object_free    = mdc_object_free,
105         .loo_object_print   = mdc_object_print,
106 };
107
108 /* md_object_operations */
109 static
110 struct mdc_thread_info *mdc_info_get(const struct lu_env *env)
111 {
112         struct mdc_thread_info *mci;
113
114         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
115         LASSERT(mci);
116         return mci;
117 }
118
119 static
120 struct mdc_thread_info *mdc_info_init(const struct lu_env *env)
121 {
122         struct mdc_thread_info *mci = mdc_info_get(env);
123         memset(mci, 0, sizeof(*mci));
124         return mci;
125 }
126
127 static void mdc_body2attr(struct mdt_body *body, struct md_attr *ma)
128 {
129         struct lu_attr *la = &ma->ma_attr;
130         /* update time */
131         if (body->valid & OBD_MD_FLCTIME && body->ctime >= la->la_ctime) {
132                 la->la_ctime = body->ctime;
133                 if (body->valid & OBD_MD_FLMTIME)
134                         la->la_mtime = body->mtime;
135         }
136
137         if (body->valid & OBD_MD_FLMODE)
138                 la->la_mode = body->mode;
139         if (body->valid & OBD_MD_FLSIZE)
140                 la->la_size = body->size;
141         if (body->valid & OBD_MD_FLBLOCKS)
142                 la->la_blocks = body->blocks;
143         if (body->valid & OBD_MD_FLUID)
144                 la->la_uid = body->uid;
145         if (body->valid & OBD_MD_FLGID)
146                 la->la_gid = body->gid;
147         if (body->valid & OBD_MD_FLFLAGS)
148                 la->la_flags = body->flags;
149         if (body->valid & OBD_MD_FLNLINK)
150                 la->la_nlink = body->nlink;
151         if (body->valid & OBD_MD_FLRDEV)
152                 la->la_rdev = body->rdev;
153
154         la->la_valid = body->valid;
155         ma->ma_valid = MA_INODE;
156 }
157
158 static int mdc_req2attr_update(const struct lu_env *env,
159                                struct md_attr *ma)
160 {
161         struct mdc_thread_info *mci;
162         struct ptlrpc_request *req;
163         struct mdt_body *body;
164         struct lov_mds_md *lov;
165         struct llog_cookie *cookie;
166
167         ENTRY;
168         mci = mdc_info_get(env);
169         req = mci->mci_req;
170         LASSERT(req);
171         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
172         LASSERT(body);
173         mdc_body2attr(body, ma);
174
175         if (body->valid & OBD_MD_FLMDSCAPA) {
176                 struct lustre_capa *capa;
177
178                 /* create for cross-ref will fetch mds capa from remote obj */
179                 capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA1);
180                 LASSERT(capa != NULL);
181                 LASSERT(ma->ma_capa != NULL);
182                 *ma->ma_capa = *capa;
183         }
184                 
185         if (!(body->valid & OBD_MD_FLEASIZE))
186                 RETURN(0);
187
188         if (body->eadatasize == 0) {
189                 CERROR("OBD_MD_FLEASIZE is set but eadatasize is zero\n");
190                 RETURN(-EPROTO);
191         }
192
193         lov = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
194                                            body->eadatasize);
195         if (lov == NULL)
196                 RETURN(-EPROTO);
197
198         LASSERT(ma->ma_lmm != NULL);
199         LASSERT(ma->ma_lmm_size >= body->eadatasize); 
200         ma->ma_lmm_size = body->eadatasize;
201         memcpy(ma->ma_lmm, lov, ma->ma_lmm_size);
202         ma->ma_valid |= MA_LOV;
203
204         if (!(body->valid & OBD_MD_FLCOOKIE))
205                 RETURN(0);
206
207         if (body->aclsize == 0) {
208                 CERROR("OBD_MD_FLCOOKIE is set but cookie size is zero\n");
209                 RETURN(-EPROTO);
210         }
211
212         cookie = req_capsule_server_sized_get(&req->rq_pill, &RMF_ACL,
213                                               body->aclsize);
214         if (cookie == NULL)
215                 RETURN(-EPROTO);
216
217         LASSERT(ma->ma_cookie != NULL);
218         LASSERT(ma->ma_cookie_size == body->aclsize);
219         memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size);
220         ma->ma_valid |= MA_COOKIE;
221         RETURN(0);
222 }
223
224 static int mdc_attr_get(const struct lu_env *env, struct md_object *mo,
225                         struct md_attr *ma)
226 {
227         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
228         struct mdc_thread_info *mci;
229         int rc;
230         ENTRY;
231
232         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
233         LASSERT(mci);
234
235         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
236
237         rc = md_getattr(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
238                         NULL, OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID |
239                         OBD_MD_FLFLAGS | OBD_MD_FLCROSSREF, 0, &mci->mci_req);
240         if (rc == 0) {
241                 /* get attr from request */
242                 rc = mdc_req2attr_update(env, ma);
243         }
244
245         ptlrpc_req_finished(mci->mci_req);
246
247         RETURN(rc);
248 }
249
250 static inline struct timespec *mdc_attr_time(struct timespec *t, __u64 seconds)
251 {
252         t->tv_sec = seconds;
253         t->tv_nsec = 0;
254         return t;
255 }
256
257 /*
258  * XXX: It is only used for set ctime when rename's source on remote MDS.
259  */
260 static int mdc_attr_set(const struct lu_env *env, struct md_object *mo,
261                         const struct md_attr *ma)
262 {
263         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
264         const struct lu_attr *la = &ma->ma_attr;
265         struct mdc_thread_info *mci;
266         struct md_ucred *uc = md_ucred(env);
267         int rc;
268         ENTRY;
269
270         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
271
272         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
273         LASSERT(mci);
274
275         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
276
277         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
278         mdc_attr_time(&mci->mci_opdata.op_attr.ia_ctime, la->la_ctime);
279         mci->mci_opdata.op_attr.ia_mode = la->la_mode;
280         mci->mci_opdata.op_attr.ia_valid = ATTR_CTIME_SET;
281         if (uc &&
282             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
283                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
284                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
285                 mci->mci_opdata.op_cap = uc->mu_cap;
286                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
287                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
288                         mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
289                 } else {
290                         mci->mci_opdata.op_suppgids[0] =
291                                 mci->mci_opdata.op_suppgids[1] = -1;
292                 }
293         } else {
294                 mci->mci_opdata.op_fsuid = la->la_uid;
295                 mci->mci_opdata.op_fsgid = la->la_gid;
296                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
297                 mci->mci_opdata.op_suppgids[0] =
298                                 mci->mci_opdata.op_suppgids[1] = -1;
299         }
300
301         rc = md_setattr(mc->mc_desc.cl_exp, &mci->mci_opdata,
302                         NULL, 0, NULL, 0, &mci->mci_req, NULL);
303
304         ptlrpc_req_finished(mci->mci_req);
305
306         RETURN(rc);
307 }
308
309 static int mdc_object_create(const struct lu_env *env,
310                              struct md_object *mo,
311                              const struct md_op_spec *spec,
312                              struct md_attr *ma)
313 {
314         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
315         struct lu_attr *la = &ma->ma_attr;
316         struct mdc_thread_info *mci;
317         const void *symname;
318         struct md_ucred *uc = md_ucred(env);
319         int rc, symlen;
320         uid_t uid;
321         gid_t gid;
322         cfs_cap_t cap;
323         ENTRY;
324
325         LASSERT(S_ISDIR(la->la_mode));
326         LASSERT(spec->u.sp_pfid != NULL);
327
328         mci = mdc_info_init(env);
329         mci->mci_opdata.op_bias = MDS_CROSS_REF;
330         mci->mci_opdata.op_fid2 = *lu_object_fid(&mo->mo_lu);
331         
332         /* Parent fid is needed to create dotdot on the remote node. */
333         mci->mci_opdata.op_fid1 = *(spec->u.sp_pfid);
334         mci->mci_opdata.op_mod_time = la->la_ctime;
335         if (uc &&
336             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
337                 uid = uc->mu_fsuid;
338                 if (la->la_mode & S_ISGID)
339                         gid = la->la_gid;
340                 else
341                         gid = uc->mu_fsgid;
342                 cap = uc->mu_cap;
343                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
344                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
345                 else
346                         mci->mci_opdata.op_suppgids[0] = -1;
347         } else {
348                 uid = la->la_uid;
349                 gid = la->la_gid;
350                 cap = 0;
351                 mci->mci_opdata.op_suppgids[0] = -1;
352         }
353
354         /* get data from spec */
355         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
356                 symname = spec->u.sp_ea.eadata;
357                 symlen = spec->u.sp_ea.eadatalen;
358                 mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
359                 mci->mci_opdata.op_flags |= MDS_CREATE_SLAVE_OBJ;
360 #ifdef CONFIG_FS_POSIX_ACL
361         } else if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
362                 symname = spec->u.sp_ea.eadata;
363                 symlen = spec->u.sp_ea.eadatalen;
364                 mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
365                 mci->mci_opdata.op_flags |= MDS_CREATE_RMT_ACL;
366 #endif
367         } else {
368                 symname = spec->u.sp_symname;
369                 symlen = symname ? strlen(symname) + 1 : 0;
370         }
371
372         rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata,
373                        symname, symlen, la->la_mode, uid, gid,
374                        cap, la->la_rdev, &mci->mci_req);
375
376         if (rc == 0) {
377                 /* get attr from request */
378                 rc = mdc_req2attr_update(env, ma);
379         }
380
381         ptlrpc_req_finished(mci->mci_req);
382
383         RETURN(rc);
384 }
385
386 static int mdc_ref_add(const struct lu_env *env, struct md_object *mo,
387                        const struct md_attr *ma)
388 {
389         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
390         const struct lu_attr *la = &ma->ma_attr;
391         struct mdc_thread_info *mci;
392         struct md_ucred *uc = md_ucred(env);
393         int rc;
394         ENTRY;
395
396         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
397         LASSERT(mci);
398
399         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
400         mci->mci_opdata.op_bias = MDS_CROSS_REF;
401         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
402         mci->mci_opdata.op_mod_time = la->la_ctime;
403         if (uc &&
404             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
405                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
406                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
407                 mci->mci_opdata.op_cap = uc->mu_cap;
408                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
409                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
410                         mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
411                 } else {
412                         mci->mci_opdata.op_suppgids[0] =
413                                 mci->mci_opdata.op_suppgids[1] = -1;
414                 }
415         } else {
416                 mci->mci_opdata.op_fsuid = la->la_uid;
417                 mci->mci_opdata.op_fsgid = la->la_gid;
418                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
419                 mci->mci_opdata.op_suppgids[0] =
420                                 mci->mci_opdata.op_suppgids[1] = -1;
421         }
422
423
424         rc = md_link(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
425
426         ptlrpc_req_finished(mci->mci_req);
427
428         RETURN(rc);
429 }
430
431 static int mdc_ref_del(const struct lu_env *env, struct md_object *mo,
432                        struct md_attr *ma)
433 {
434         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
435         struct lu_attr *la = &ma->ma_attr;
436         struct mdc_thread_info *mci;
437         struct md_ucred *uc = md_ucred(env);
438         int rc;
439         ENTRY;
440
441         mci = mdc_info_init(env);
442         mci->mci_opdata.op_bias = MDS_CROSS_REF;
443         if (ma->ma_attr_flags & MDS_VTX_BYPASS)
444                 mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
445         else
446                 mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
447         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
448         mci->mci_opdata.op_mode = la->la_mode;
449         mci->mci_opdata.op_mod_time = la->la_ctime;
450         if (uc &&
451             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
452                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
453                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
454                 mci->mci_opdata.op_cap = uc->mu_cap;
455                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
456                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
457                 else
458                         mci->mci_opdata.op_suppgids[0] = -1;
459         } else {
460                 mci->mci_opdata.op_fsuid = la->la_uid;
461                 mci->mci_opdata.op_fsgid = la->la_gid;
462                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
463                 mci->mci_opdata.op_suppgids[0] = -1;
464         }
465
466         rc = md_unlink(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
467         if (rc == 0) {
468                 /* get attr from request */
469                 rc = mdc_req2attr_update(env, ma);
470         }
471
472         ptlrpc_req_finished(mci->mci_req);
473
474         RETURN(rc);
475 }
476
477 #ifdef HAVE_SPLIT_SUPPORT
478 int mdc_send_page(struct cmm_device *cm, const struct lu_env *env,
479                   struct md_object *mo, struct page *page, __u32 offset)
480 {
481         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
482         int rc;
483         ENTRY;
484
485         rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
486                           page, offset);
487         CDEBUG(rc ? D_ERROR : D_INFO, "send page %p  offset %d fid "DFID
488                " rc %d \n", page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
489         RETURN(rc);
490 }
491 #endif
492
493 static struct md_object_operations mdc_mo_ops = {
494         .moo_attr_get       = mdc_attr_get,
495         .moo_attr_set       = mdc_attr_set,
496         .moo_object_create  = mdc_object_create,
497         .moo_ref_add        = mdc_ref_add,
498         .moo_ref_del        = mdc_ref_del,
499 };
500
501 /* md_dir_operations */
502 static int mdc_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
503                           struct md_object *mo_t, const struct lu_fid *lf,
504                           const struct lu_name *lname, struct md_attr *ma)
505 {
506         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo_p));
507         struct lu_attr *la = &ma->ma_attr;
508         struct mdc_thread_info *mci;
509         struct md_ucred *uc = md_ucred(env);
510         int rc;
511         ENTRY;
512
513         mci = mdc_info_init(env);
514         mci->mci_opdata.op_bias = MDS_CROSS_REF;
515         if (ma->ma_attr_flags & MDS_VTX_BYPASS)
516                 mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
517         else
518                 mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
519         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo_p->mo_lu);
520         mci->mci_opdata.op_fid2 = *lf;
521         mci->mci_opdata.op_mode = la->la_mode;
522         mci->mci_opdata.op_mod_time = la->la_ctime;
523         if (uc &&
524             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
525                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
526                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
527                 mci->mci_opdata.op_cap = uc->mu_cap;
528                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
529                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
530                         mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
531                 } else {
532                         mci->mci_opdata.op_suppgids[0] =
533                                 mci->mci_opdata.op_suppgids[1] = -1;
534                 }
535         } else {
536                 mci->mci_opdata.op_fsuid = la->la_uid;
537                 mci->mci_opdata.op_fsgid = la->la_gid;
538                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
539                 mci->mci_opdata.op_suppgids[0] =
540                                 mci->mci_opdata.op_suppgids[1] = -1;
541         }
542
543         rc = md_rename(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0,
544                        lname->ln_name, lname->ln_namelen, &mci->mci_req);
545         if (rc == 0) {
546                 /* get attr from request */
547                 mdc_req2attr_update(env, ma);
548         }
549
550         ptlrpc_req_finished(mci->mci_req);
551
552         RETURN(rc);
553 }
554 /* 
555  * Return resulting fid in sfid
556  * 0: fids are not relatives
557  * fid: fid at which search stopped
558  */
559 static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo,
560                          const struct lu_fid *fid, struct lu_fid *sfid)
561 {
562         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
563         struct mdc_thread_info *mci;
564         struct mdt_body *body;
565         int rc;
566         ENTRY;
567
568         mci = mdc_info_init(env);
569
570         rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
571                           fid, &mci->mci_req);
572         if (rc == 0 || rc == -EREMOTE) {
573                 body = req_capsule_server_get(&mci->mci_req->rq_pill,
574                                               &RMF_MDT_BODY);
575                 LASSERT(body->valid & OBD_MD_FLID);
576         
577                 CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "DFID"\n",
578                        PFID(&body->fid1));
579                 *sfid = body->fid1;
580         }
581         ptlrpc_req_finished(mci->mci_req);
582         RETURN(rc);
583 }
584
585 static struct md_dir_operations mdc_dir_ops = {
586         .mdo_is_subdir   = mdc_is_subdir,
587         .mdo_rename_tgt  = mdc_rename_tgt
588 };