Whamcloud - gitweb
land clio.
[fs/lustre-release.git] / lustre / cmm / mdc_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/mdc_object.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_MDS
48 #include <obd_support.h>
49 #include <lustre_lib.h>
50 #include <obd_class.h>
51 #include <lustre_mdc.h>
52 #include "cmm_internal.h"
53 #include "mdc_internal.h"
54
55 static const struct md_object_operations mdc_mo_ops;
56 static const struct md_dir_operations mdc_dir_ops;
57 static const struct lu_object_operations mdc_obj_ops;
58
59 extern struct lu_context_key mdc_thread_key;
60
61 struct lu_object *mdc_object_alloc(const struct lu_env *env,
62                                    const struct lu_object_header *hdr,
63                                    struct lu_device *ld)
64 {
65         struct mdc_object *mco;
66         ENTRY;
67
68         OBD_ALLOC_PTR(mco);
69         if (mco != NULL) {
70                 struct lu_object *lo;
71
72                 lo = &mco->mco_obj.mo_lu;
73                 lu_object_init(lo, NULL, ld);
74                 mco->mco_obj.mo_ops = &mdc_mo_ops;
75                 mco->mco_obj.mo_dir_ops = &mdc_dir_ops;
76                 lo->lo_ops = &mdc_obj_ops;
77                 RETURN(lo);
78         } else
79                 RETURN(NULL);
80 }
81
82 static void mdc_object_free(const struct lu_env *env, struct lu_object *lo)
83 {
84         struct mdc_object *mco = lu2mdc_obj(lo);
85         lu_object_fini(lo);
86         OBD_FREE_PTR(mco);
87 }
88
89 static int mdc_object_init(const struct lu_env *env, struct lu_object *lo,
90                            const struct lu_object_conf *_)
91 {
92         ENTRY;
93         lo->lo_header->loh_attr |= LOHA_REMOTE;
94         RETURN(0);
95 }
96
97 static const struct lu_object_operations mdc_obj_ops = {
98         .loo_object_init    = mdc_object_init,
99         .loo_object_free    = mdc_object_free,
100 };
101
102 /* md_object_operations */
103 static
104 struct mdc_thread_info *mdc_info_get(const struct lu_env *env)
105 {
106         struct mdc_thread_info *mci;
107
108         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
109         LASSERT(mci);
110         return mci;
111 }
112
113 static
114 struct mdc_thread_info *mdc_info_init(const struct lu_env *env)
115 {
116         struct mdc_thread_info *mci = mdc_info_get(env);
117         memset(mci, 0, sizeof(*mci));
118         return mci;
119 }
120
121 static void mdc_body2attr(struct mdt_body *body, struct md_attr *ma)
122 {
123         struct lu_attr *la = &ma->ma_attr;
124         /* update time */
125         if (body->valid & OBD_MD_FLCTIME && body->ctime >= la->la_ctime) {
126                 la->la_ctime = body->ctime;
127                 if (body->valid & OBD_MD_FLMTIME)
128                         la->la_mtime = body->mtime;
129         }
130
131         if (body->valid & OBD_MD_FLMODE)
132                 la->la_mode = body->mode;
133         if (body->valid & OBD_MD_FLSIZE)
134                 la->la_size = body->size;
135         if (body->valid & OBD_MD_FLBLOCKS)
136                 la->la_blocks = body->blocks;
137         if (body->valid & OBD_MD_FLUID)
138                 la->la_uid = body->uid;
139         if (body->valid & OBD_MD_FLGID)
140                 la->la_gid = body->gid;
141         if (body->valid & OBD_MD_FLFLAGS)
142                 la->la_flags = body->flags;
143         if (body->valid & OBD_MD_FLNLINK)
144                 la->la_nlink = body->nlink;
145         if (body->valid & OBD_MD_FLRDEV)
146                 la->la_rdev = body->rdev;
147
148         la->la_valid = body->valid;
149         ma->ma_valid = MA_INODE;
150 }
151
152 static int mdc_req2attr_update(const struct lu_env *env,
153                                struct md_attr *ma)
154 {
155         struct mdc_thread_info *mci;
156         struct ptlrpc_request *req;
157         struct mdt_body *body;
158         struct lov_mds_md *md;
159         struct llog_cookie *cookie;
160         void *acl;
161
162         ENTRY;
163         mci = mdc_info_get(env);
164         req = mci->mci_req;
165         LASSERT(req);
166         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
167         LASSERT(body);
168         mdc_body2attr(body, ma);
169
170         if (body->valid & OBD_MD_FLMDSCAPA) {
171                 struct lustre_capa *capa;
172
173                 /* create for cross-ref will fetch mds capa from remote obj */
174                 capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA1);
175                 LASSERT(capa != NULL);
176                 LASSERT(ma->ma_capa != NULL);
177                 *ma->ma_capa = *capa;
178         }
179
180         if ((body->valid & OBD_MD_FLEASIZE) || (body->valid & OBD_MD_FLDIREA)) {
181                 if (body->eadatasize == 0) {
182                         CERROR("No size defined for easize field\n");
183                         RETURN(-EPROTO);
184                 }
185
186                 md = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
187                                                   body->eadatasize);
188                 if (md == NULL)
189                         RETURN(-EPROTO);
190
191                 LASSERT(ma->ma_lmm != NULL);
192                 LASSERT(ma->ma_lmm_size >= body->eadatasize);
193                 ma->ma_lmm_size = body->eadatasize;
194                 memcpy(ma->ma_lmm, md, ma->ma_lmm_size);
195                 ma->ma_valid |= MA_LOV;
196         }
197
198         if (body->valid & OBD_MD_FLCOOKIE) {
199                 /*
200                  * ACL and cookie share the same body->aclsize, we need
201                  * to make sure that they both never come here.
202                  */
203                 LASSERT(!(body->valid & OBD_MD_FLACL));
204
205                 if (body->aclsize == 0) {
206                         CERROR("No size defined for cookie field\n");
207                         RETURN(-EPROTO);
208                 }
209
210                 cookie = req_capsule_server_sized_get(&req->rq_pill,
211                                                       &RMF_LOGCOOKIES,
212                                                       body->aclsize);
213                 if (cookie == NULL)
214                         RETURN(-EPROTO);
215
216                 LASSERT(ma->ma_cookie != NULL);
217                 LASSERT(ma->ma_cookie_size == body->aclsize);
218                 memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size);
219                 ma->ma_valid |= MA_COOKIE;
220         }
221
222 #ifdef CONFIG_FS_POSIX_ACL
223         if (body->valid & OBD_MD_FLACL) {
224                 if (body->aclsize == 0) {
225                         CERROR("No size defined for acl field\n");
226                         RETURN(-EPROTO);
227                 }
228
229                 acl = req_capsule_server_sized_get(&req->rq_pill,
230                                                    &RMF_ACL,
231                                                    body->aclsize);
232                 if (acl == NULL)
233                         RETURN(-EPROTO);
234
235                 LASSERT(ma->ma_acl != NULL);
236                 LASSERT(ma->ma_acl_size == body->aclsize);
237                 memcpy(ma->ma_acl, acl, ma->ma_acl_size);
238                 ma->ma_valid |= MA_ACL_DEF;
239         }
240 #endif
241
242         RETURN(0);
243 }
244
245 static int mdc_attr_get(const struct lu_env *env, struct md_object *mo,
246                         struct md_attr *ma)
247 {
248         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
249         struct mdc_thread_info *mci;
250         int rc;
251         ENTRY;
252
253         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
254         LASSERT(mci);
255
256         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
257
258         rc = md_getattr(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
259                         NULL, OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID |
260                         OBD_MD_FLFLAGS | OBD_MD_FLCROSSREF, 0, &mci->mci_req);
261         if (rc == 0) {
262                 /* get attr from request */
263                 rc = mdc_req2attr_update(env, ma);
264         }
265
266         ptlrpc_req_finished(mci->mci_req);
267
268         RETURN(rc);
269 }
270
271 static inline struct timespec *mdc_attr_time(struct timespec *t, __u64 seconds)
272 {
273         t->tv_sec = seconds;
274         t->tv_nsec = 0;
275         return t;
276 }
277
278 /*
279  * XXX: It is only used for set ctime when rename's source on remote MDS.
280  */
281 static int mdc_attr_set(const struct lu_env *env, struct md_object *mo,
282                         const struct md_attr *ma)
283 {
284         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
285         const struct lu_attr *la = &ma->ma_attr;
286         struct mdc_thread_info *mci;
287         struct md_ucred *uc = md_ucred(env);
288         int rc;
289         ENTRY;
290
291         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
292
293         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
294         LASSERT(mci);
295
296         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
297
298         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
299         mdc_attr_time(&mci->mci_opdata.op_attr.ia_ctime, la->la_ctime);
300         mci->mci_opdata.op_attr.ia_mode = la->la_mode;
301         mci->mci_opdata.op_attr.ia_valid = ATTR_CTIME_SET;
302         if (uc &&
303             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
304                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
305                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
306                 mci->mci_opdata.op_cap = uc->mu_cap;
307                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
308                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
309                         mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
310                 } else {
311                         mci->mci_opdata.op_suppgids[0] =
312                                 mci->mci_opdata.op_suppgids[1] = -1;
313                 }
314         } else {
315                 mci->mci_opdata.op_fsuid = la->la_uid;
316                 mci->mci_opdata.op_fsgid = la->la_gid;
317                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
318                 mci->mci_opdata.op_suppgids[0] =
319                                 mci->mci_opdata.op_suppgids[1] = -1;
320         }
321
322         rc = md_setattr(mc->mc_desc.cl_exp, &mci->mci_opdata,
323                         NULL, 0, NULL, 0, &mci->mci_req, NULL);
324
325         ptlrpc_req_finished(mci->mci_req);
326
327         RETURN(rc);
328 }
329
330 static int mdc_object_create(const struct lu_env *env,
331                              struct md_object *mo,
332                              const struct md_op_spec *spec,
333                              struct md_attr *ma)
334 {
335         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
336         struct lu_attr *la = &ma->ma_attr;
337         struct mdc_thread_info *mci;
338         const void *symname;
339         struct md_ucred *uc = md_ucred(env);
340         int rc, symlen;
341         uid_t uid;
342         gid_t gid;
343         cfs_cap_t cap;
344         ENTRY;
345
346         LASSERT(S_ISDIR(la->la_mode));
347         LASSERT(spec->u.sp_pfid != NULL);
348
349         mci = mdc_info_init(env);
350         mci->mci_opdata.op_bias = MDS_CROSS_REF;
351         mci->mci_opdata.op_fid2 = *lu_object_fid(&mo->mo_lu);
352
353         /* Parent fid is needed to create dotdot on the remote node. */
354         mci->mci_opdata.op_fid1 = *(spec->u.sp_pfid);
355         mci->mci_opdata.op_mod_time = la->la_ctime;
356         if (uc &&
357             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
358                 uid = uc->mu_fsuid;
359                 if (la->la_mode & S_ISGID)
360                         gid = la->la_gid;
361                 else
362                         gid = uc->mu_fsgid;
363                 cap = uc->mu_cap;
364                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
365                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
366                 else
367                         mci->mci_opdata.op_suppgids[0] = -1;
368         } else {
369                 uid = la->la_uid;
370                 gid = la->la_gid;
371                 cap = 0;
372                 mci->mci_opdata.op_suppgids[0] = -1;
373         }
374
375         /* get data from spec */
376         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
377                 symname = spec->u.sp_ea.eadata;
378                 symlen = spec->u.sp_ea.eadatalen;
379                 mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
380                 mci->mci_opdata.op_flags |= MDS_CREATE_SLAVE_OBJ;
381 #ifdef CONFIG_FS_POSIX_ACL
382         } else if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
383                 symname = spec->u.sp_ea.eadata;
384                 symlen = spec->u.sp_ea.eadatalen;
385                 mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
386                 mci->mci_opdata.op_flags |= MDS_CREATE_RMT_ACL;
387 #endif
388         } else {
389                 symname = spec->u.sp_symname;
390                 symlen = symname ? strlen(symname) + 1 : 0;
391         }
392
393         rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata,
394                        symname, symlen, la->la_mode, uid, gid,
395                        cap, la->la_rdev, &mci->mci_req);
396
397         if (rc == 0) {
398                 /* get attr from request */
399                 rc = mdc_req2attr_update(env, ma);
400         }
401
402         ptlrpc_req_finished(mci->mci_req);
403
404         RETURN(rc);
405 }
406
407 static int mdc_ref_add(const struct lu_env *env, struct md_object *mo,
408                        const struct md_attr *ma)
409 {
410         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
411         const struct lu_attr *la = &ma->ma_attr;
412         struct mdc_thread_info *mci;
413         struct md_ucred *uc = md_ucred(env);
414         int rc;
415         ENTRY;
416
417         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
418         LASSERT(mci);
419
420         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
421         mci->mci_opdata.op_bias = MDS_CROSS_REF;
422         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
423         mci->mci_opdata.op_mod_time = la->la_ctime;
424         if (uc &&
425             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
426                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
427                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
428                 mci->mci_opdata.op_cap = uc->mu_cap;
429                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
430                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
431                         mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
432                 } else {
433                         mci->mci_opdata.op_suppgids[0] =
434                                 mci->mci_opdata.op_suppgids[1] = -1;
435                 }
436         } else {
437                 mci->mci_opdata.op_fsuid = la->la_uid;
438                 mci->mci_opdata.op_fsgid = la->la_gid;
439                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
440                 mci->mci_opdata.op_suppgids[0] =
441                                 mci->mci_opdata.op_suppgids[1] = -1;
442         }
443
444
445         rc = md_link(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
446
447         ptlrpc_req_finished(mci->mci_req);
448
449         RETURN(rc);
450 }
451
452 static int mdc_ref_del(const struct lu_env *env, struct md_object *mo,
453                        struct md_attr *ma)
454 {
455         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
456         struct lu_attr *la = &ma->ma_attr;
457         struct mdc_thread_info *mci;
458         struct md_ucred *uc = md_ucred(env);
459         int rc;
460         ENTRY;
461
462         mci = mdc_info_init(env);
463         mci->mci_opdata.op_bias = MDS_CROSS_REF;
464         if (ma->ma_attr_flags & MDS_VTX_BYPASS)
465                 mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
466         else
467                 mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
468         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
469         mci->mci_opdata.op_mode = la->la_mode;
470         mci->mci_opdata.op_mod_time = la->la_ctime;
471         if (uc &&
472             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
473                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
474                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
475                 mci->mci_opdata.op_cap = uc->mu_cap;
476                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
477                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
478                 else
479                         mci->mci_opdata.op_suppgids[0] = -1;
480         } else {
481                 mci->mci_opdata.op_fsuid = la->la_uid;
482                 mci->mci_opdata.op_fsgid = la->la_gid;
483                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
484                 mci->mci_opdata.op_suppgids[0] = -1;
485         }
486
487         rc = md_unlink(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
488         if (rc == 0) {
489                 /* get attr from request */
490                 rc = mdc_req2attr_update(env, ma);
491         }
492
493         ptlrpc_req_finished(mci->mci_req);
494
495         RETURN(rc);
496 }
497
498 #ifdef HAVE_SPLIT_SUPPORT
499 int mdc_send_page(struct cmm_device *cm, const struct lu_env *env,
500                   struct md_object *mo, struct page *page, __u32 offset)
501 {
502         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
503         int rc;
504         ENTRY;
505
506         rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
507                           page, offset);
508         CDEBUG(rc ? D_ERROR : D_INFO, "send page %p  offset %d fid "DFID
509                " rc %d \n", page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
510         RETURN(rc);
511 }
512 #endif
513
514 static const struct md_object_operations mdc_mo_ops = {
515         .moo_attr_get       = mdc_attr_get,
516         .moo_attr_set       = mdc_attr_set,
517         .moo_object_create  = mdc_object_create,
518         .moo_ref_add        = mdc_ref_add,
519         .moo_ref_del        = mdc_ref_del,
520 };
521
522 /* md_dir_operations */
523 static int mdc_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
524                           struct md_object *mo_t, const struct lu_fid *lf,
525                           const struct lu_name *lname, struct md_attr *ma)
526 {
527         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo_p));
528         struct lu_attr *la = &ma->ma_attr;
529         struct mdc_thread_info *mci;
530         struct md_ucred *uc = md_ucred(env);
531         int rc;
532         ENTRY;
533
534         mci = mdc_info_init(env);
535         mci->mci_opdata.op_bias = MDS_CROSS_REF;
536         if (ma->ma_attr_flags & MDS_VTX_BYPASS)
537                 mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
538         else
539                 mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
540         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo_p->mo_lu);
541         mci->mci_opdata.op_fid2 = *lf;
542         mci->mci_opdata.op_mode = la->la_mode;
543         mci->mci_opdata.op_mod_time = la->la_ctime;
544         if (uc &&
545             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
546                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
547                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
548                 mci->mci_opdata.op_cap = uc->mu_cap;
549                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
550                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
551                         mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
552                 } else {
553                         mci->mci_opdata.op_suppgids[0] =
554                                 mci->mci_opdata.op_suppgids[1] = -1;
555                 }
556         } else {
557                 mci->mci_opdata.op_fsuid = la->la_uid;
558                 mci->mci_opdata.op_fsgid = la->la_gid;
559                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
560                 mci->mci_opdata.op_suppgids[0] =
561                                 mci->mci_opdata.op_suppgids[1] = -1;
562         }
563
564         rc = md_rename(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0,
565                        lname->ln_name, lname->ln_namelen, &mci->mci_req);
566         if (rc == 0) {
567                 /* get attr from request */
568                 mdc_req2attr_update(env, ma);
569         }
570
571         ptlrpc_req_finished(mci->mci_req);
572
573         RETURN(rc);
574 }
575 /*
576  * Return resulting fid in sfid
577  * 0: fids are not relatives
578  * fid: fid at which search stopped
579  */
580 static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo,
581                          const struct lu_fid *fid, struct lu_fid *sfid)
582 {
583         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
584         struct mdc_thread_info *mci;
585         struct mdt_body *body;
586         int rc;
587         ENTRY;
588
589         mci = mdc_info_init(env);
590
591         rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
592                           fid, &mci->mci_req);
593         if (rc == 0 || rc == -EREMOTE) {
594                 body = req_capsule_server_get(&mci->mci_req->rq_pill,
595                                               &RMF_MDT_BODY);
596                 LASSERT(body->valid & OBD_MD_FLID);
597
598                 CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "DFID"\n",
599                        PFID(&body->fid1));
600                 *sfid = body->fid1;
601         }
602         ptlrpc_req_finished(mci->mci_req);
603         RETURN(rc);
604 }
605
606 static const struct md_dir_operations mdc_dir_ops = {
607         .mdo_is_subdir   = mdc_is_subdir,
608         .mdo_rename_tgt  = mdc_rename_tgt
609 };