Whamcloud - gitweb
b=20984 cleanup md_op_data and add getstripe -M
[fs/lustre-release.git] / lustre / cmm / mdc_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/mdc_object.c
37  *
38  * Lustre Cluster Metadata Manager (cmm)
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_MDS
48 #include <obd_support.h>
49 #include <lustre_lib.h>
50 #include <obd_class.h>
51 #include <lustre_mdc.h>
52 #include "cmm_internal.h"
53 #include "mdc_internal.h"
54
55 static const struct md_object_operations mdc_mo_ops;
56 static const struct md_dir_operations mdc_dir_ops;
57 static const struct lu_object_operations mdc_obj_ops;
58
59 extern struct lu_context_key mdc_thread_key;
60
61 struct lu_object *mdc_object_alloc(const struct lu_env *env,
62                                    const struct lu_object_header *hdr,
63                                    struct lu_device *ld)
64 {
65         struct mdc_object *mco;
66         ENTRY;
67
68         OBD_ALLOC_PTR(mco);
69         if (mco != NULL) {
70                 struct lu_object *lo;
71
72                 lo = &mco->mco_obj.mo_lu;
73                 lu_object_init(lo, NULL, ld);
74                 mco->mco_obj.mo_ops = &mdc_mo_ops;
75                 mco->mco_obj.mo_dir_ops = &mdc_dir_ops;
76                 lo->lo_ops = &mdc_obj_ops;
77                 RETURN(lo);
78         } else
79                 RETURN(NULL);
80 }
81
82 static void mdc_object_free(const struct lu_env *env, struct lu_object *lo)
83 {
84         struct mdc_object *mco = lu2mdc_obj(lo);
85         lu_object_fini(lo);
86         OBD_FREE_PTR(mco);
87 }
88
89 static int mdc_object_init(const struct lu_env *env, struct lu_object *lo,
90                            const struct lu_object_conf *unused)
91 {
92         ENTRY;
93         lo->lo_header->loh_attr |= LOHA_REMOTE;
94         RETURN(0);
95 }
96
97 static const struct lu_object_operations mdc_obj_ops = {
98         .loo_object_init    = mdc_object_init,
99         .loo_object_free    = mdc_object_free,
100 };
101
102 /* md_object_operations */
103 static
104 struct mdc_thread_info *mdc_info_get(const struct lu_env *env)
105 {
106         struct mdc_thread_info *mci;
107
108         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
109         LASSERT(mci);
110         return mci;
111 }
112
113 static
114 struct mdc_thread_info *mdc_info_init(const struct lu_env *env)
115 {
116         struct mdc_thread_info *mci = mdc_info_get(env);
117         memset(mci, 0, sizeof(*mci));
118         return mci;
119 }
120
121 static void mdc_body2attr(struct mdt_body *body, struct md_attr *ma)
122 {
123         struct lu_attr *la = &ma->ma_attr;
124         /* update time */
125         if (body->valid & OBD_MD_FLCTIME && body->ctime >= la->la_ctime) {
126                 la->la_ctime = body->ctime;
127                 if (body->valid & OBD_MD_FLMTIME)
128                         la->la_mtime = body->mtime;
129         }
130
131         if (body->valid & OBD_MD_FLMODE)
132                 la->la_mode = body->mode;
133         if (body->valid & OBD_MD_FLSIZE)
134                 la->la_size = body->size;
135         if (body->valid & OBD_MD_FLBLOCKS)
136                 la->la_blocks = body->blocks;
137         if (body->valid & OBD_MD_FLUID)
138                 la->la_uid = body->uid;
139         if (body->valid & OBD_MD_FLGID)
140                 la->la_gid = body->gid;
141         if (body->valid & OBD_MD_FLFLAGS)
142                 la->la_flags = body->flags;
143         if (body->valid & OBD_MD_FLNLINK)
144                 la->la_nlink = body->nlink;
145         if (body->valid & OBD_MD_FLRDEV)
146                 la->la_rdev = body->rdev;
147
148         la->la_valid = body->valid;
149         ma->ma_valid = MA_INODE;
150 }
151
152 static int mdc_req2attr_update(const struct lu_env *env,
153                                struct md_attr *ma)
154 {
155         struct mdc_thread_info *mci;
156         struct ptlrpc_request *req;
157         struct mdt_body *body;
158         struct lov_mds_md *md;
159         struct llog_cookie *cookie;
160         void *acl;
161
162         ENTRY;
163         mci = mdc_info_get(env);
164         req = mci->mci_req;
165         LASSERT(req);
166         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
167         LASSERT(body);
168         mdc_body2attr(body, ma);
169
170         if (body->valid & OBD_MD_FLMDSCAPA) {
171                 struct lustre_capa *capa;
172
173                 /* create for cross-ref will fetch mds capa from remote obj */
174                 capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA1);
175                 LASSERT(capa != NULL);
176                 LASSERT(ma->ma_capa != NULL);
177                 *ma->ma_capa = *capa;
178         }
179
180         if ((body->valid & OBD_MD_FLEASIZE) || (body->valid & OBD_MD_FLDIREA)) {
181                 if (body->eadatasize == 0) {
182                         CERROR("No size defined for easize field\n");
183                         RETURN(-EPROTO);
184                 }
185
186                 md = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
187                                                   body->eadatasize);
188                 if (md == NULL)
189                         RETURN(-EPROTO);
190
191                 LASSERT(ma->ma_lmm != NULL);
192                 LASSERT(ma->ma_lmm_size >= body->eadatasize);
193                 ma->ma_lmm_size = body->eadatasize;
194                 memcpy(ma->ma_lmm, md, ma->ma_lmm_size);
195                 ma->ma_valid |= MA_LOV;
196         }
197
198         if (body->valid & OBD_MD_FLCOOKIE) {
199                 /*
200                  * ACL and cookie share the same body->aclsize, we need
201                  * to make sure that they both never come here.
202                  */
203                 LASSERT(!(body->valid & OBD_MD_FLACL));
204
205                 if (body->aclsize == 0) {
206                         CERROR("No size defined for cookie field\n");
207                         RETURN(-EPROTO);
208                 }
209
210                 cookie = req_capsule_server_sized_get(&req->rq_pill,
211                                                       &RMF_LOGCOOKIES,
212                                                       body->aclsize);
213                 if (cookie == NULL)
214                         RETURN(-EPROTO);
215
216                 LASSERT(ma->ma_cookie != NULL);
217                 LASSERT(ma->ma_cookie_size == body->aclsize);
218                 memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size);
219                 ma->ma_valid |= MA_COOKIE;
220         }
221
222 #ifdef CONFIG_FS_POSIX_ACL
223         if (body->valid & OBD_MD_FLACL) {
224                 if (body->aclsize == 0) {
225                         CERROR("No size defined for acl field\n");
226                         RETURN(-EPROTO);
227                 }
228
229                 acl = req_capsule_server_sized_get(&req->rq_pill,
230                                                    &RMF_ACL,
231                                                    body->aclsize);
232                 if (acl == NULL)
233                         RETURN(-EPROTO);
234
235                 LASSERT(ma->ma_acl != NULL);
236                 LASSERT(ma->ma_acl_size == body->aclsize);
237                 memcpy(ma->ma_acl, acl, ma->ma_acl_size);
238                 ma->ma_valid |= MA_ACL_DEF;
239         }
240 #endif
241
242         RETURN(0);
243 }
244
245 static int mdc_attr_get(const struct lu_env *env, struct md_object *mo,
246                         struct md_attr *ma)
247 {
248         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
249         struct mdc_thread_info *mci;
250         int rc;
251         ENTRY;
252
253         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
254         LASSERT(mci);
255
256         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
257
258         memcpy(&mci->mci_opdata.op_fid1, lu_object_fid(&mo->mo_lu),
259                sizeof (struct lu_fid));
260         mci->mci_opdata.op_valid = OBD_MD_FLMODE | OBD_MD_FLUID |
261                                    OBD_MD_FLGID | OBD_MD_FLFLAGS |
262                                    OBD_MD_FLCROSSREF;
263
264         rc = md_getattr(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
265         if (rc == 0) {
266                 /* get attr from request */
267                 rc = mdc_req2attr_update(env, ma);
268         }
269
270         ptlrpc_req_finished(mci->mci_req);
271
272         RETURN(rc);
273 }
274
275 static inline struct timespec *mdc_attr_time(struct timespec *t, __u64 seconds)
276 {
277         t->tv_sec = seconds;
278         t->tv_nsec = 0;
279         return t;
280 }
281
282 /*
283  * XXX: It is only used for set ctime when rename's source on remote MDS.
284  */
285 static int mdc_attr_set(const struct lu_env *env, struct md_object *mo,
286                         const struct md_attr *ma)
287 {
288         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
289         const struct lu_attr *la = &ma->ma_attr;
290         struct mdc_thread_info *mci;
291         struct md_ucred *uc = md_ucred(env);
292         int rc;
293         ENTRY;
294
295         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
296
297         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
298         LASSERT(mci);
299
300         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
301
302         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
303         mdc_attr_time(&mci->mci_opdata.op_attr.ia_ctime, la->la_ctime);
304         mci->mci_opdata.op_attr.ia_mode = la->la_mode;
305         mci->mci_opdata.op_attr.ia_valid = ATTR_CTIME_SET;
306         if (uc &&
307             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
308                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
309                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
310                 mci->mci_opdata.op_cap = uc->mu_cap;
311                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
312                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
313                         mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
314                 } else {
315                         mci->mci_opdata.op_suppgids[0] =
316                                 mci->mci_opdata.op_suppgids[1] = -1;
317                 }
318         } else {
319                 mci->mci_opdata.op_fsuid = la->la_uid;
320                 mci->mci_opdata.op_fsgid = la->la_gid;
321                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
322                 mci->mci_opdata.op_suppgids[0] =
323                                 mci->mci_opdata.op_suppgids[1] = -1;
324         }
325
326         rc = md_setattr(mc->mc_desc.cl_exp, &mci->mci_opdata,
327                         NULL, 0, NULL, 0, &mci->mci_req, NULL);
328
329         ptlrpc_req_finished(mci->mci_req);
330
331         RETURN(rc);
332 }
333
334 static int mdc_object_create(const struct lu_env *env,
335                              struct md_object *mo,
336                              const struct md_op_spec *spec,
337                              struct md_attr *ma)
338 {
339         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
340         struct lu_attr *la = &ma->ma_attr;
341         struct mdc_thread_info *mci;
342         const void *symname;
343         struct md_ucred *uc = md_ucred(env);
344         int rc, symlen;
345         uid_t uid;
346         gid_t gid;
347         cfs_cap_t cap;
348         ENTRY;
349
350         LASSERT(S_ISDIR(la->la_mode));
351         LASSERT(spec->u.sp_pfid != NULL);
352
353         mci = mdc_info_init(env);
354         mci->mci_opdata.op_bias = MDS_CROSS_REF;
355         mci->mci_opdata.op_fid2 = *lu_object_fid(&mo->mo_lu);
356
357         /* Parent fid is needed to create dotdot on the remote node. */
358         mci->mci_opdata.op_fid1 = *(spec->u.sp_pfid);
359         mci->mci_opdata.op_mod_time = la->la_ctime;
360         if (uc &&
361             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
362                 uid = uc->mu_fsuid;
363                 if (la->la_mode & S_ISGID)
364                         gid = la->la_gid;
365                 else
366                         gid = uc->mu_fsgid;
367                 cap = uc->mu_cap;
368                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
369                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
370                 else
371                         mci->mci_opdata.op_suppgids[0] = -1;
372         } else {
373                 uid = la->la_uid;
374                 gid = la->la_gid;
375                 cap = 0;
376                 mci->mci_opdata.op_suppgids[0] = -1;
377         }
378
379         /* get data from spec */
380         if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
381                 symname = spec->u.sp_ea.eadata;
382                 symlen = spec->u.sp_ea.eadatalen;
383                 mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
384                 mci->mci_opdata.op_flags |= MDS_CREATE_SLAVE_OBJ;
385 #ifdef CONFIG_FS_POSIX_ACL
386         } else if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
387                 symname = spec->u.sp_ea.eadata;
388                 symlen = spec->u.sp_ea.eadatalen;
389                 mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
390                 mci->mci_opdata.op_flags |= MDS_CREATE_RMT_ACL;
391 #endif
392         } else {
393                 symname = spec->u.sp_symname;
394                 symlen = symname ? strlen(symname) + 1 : 0;
395         }
396
397         rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata,
398                        symname, symlen, la->la_mode, uid, gid,
399                        cap, la->la_rdev, &mci->mci_req);
400
401         if (rc == 0) {
402                 /* get attr from request */
403                 rc = mdc_req2attr_update(env, ma);
404         }
405
406         ptlrpc_req_finished(mci->mci_req);
407
408         RETURN(rc);
409 }
410
411 static int mdc_ref_add(const struct lu_env *env, struct md_object *mo,
412                        const struct md_attr *ma)
413 {
414         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
415         const struct lu_attr *la = &ma->ma_attr;
416         struct mdc_thread_info *mci;
417         struct md_ucred *uc = md_ucred(env);
418         int rc;
419         ENTRY;
420
421         mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
422         LASSERT(mci);
423
424         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
425         mci->mci_opdata.op_bias = MDS_CROSS_REF;
426         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
427         mci->mci_opdata.op_mod_time = la->la_ctime;
428         if (uc &&
429             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
430                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
431                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
432                 mci->mci_opdata.op_cap = uc->mu_cap;
433                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
434                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
435                         mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
436                 } else {
437                         mci->mci_opdata.op_suppgids[0] =
438                                 mci->mci_opdata.op_suppgids[1] = -1;
439                 }
440         } else {
441                 mci->mci_opdata.op_fsuid = la->la_uid;
442                 mci->mci_opdata.op_fsgid = la->la_gid;
443                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
444                 mci->mci_opdata.op_suppgids[0] =
445                                 mci->mci_opdata.op_suppgids[1] = -1;
446         }
447
448
449         rc = md_link(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
450
451         ptlrpc_req_finished(mci->mci_req);
452
453         RETURN(rc);
454 }
455
456 static int mdc_ref_del(const struct lu_env *env, struct md_object *mo,
457                        struct md_attr *ma)
458 {
459         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
460         struct lu_attr *la = &ma->ma_attr;
461         struct mdc_thread_info *mci;
462         struct md_ucred *uc = md_ucred(env);
463         int rc;
464         ENTRY;
465
466         mci = mdc_info_init(env);
467         mci->mci_opdata.op_bias = MDS_CROSS_REF;
468         if (ma->ma_attr_flags & MDS_VTX_BYPASS)
469                 mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
470         else
471                 mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
472         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
473         mci->mci_opdata.op_mode = la->la_mode;
474         mci->mci_opdata.op_mod_time = la->la_ctime;
475         if (uc &&
476             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
477                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
478                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
479                 mci->mci_opdata.op_cap = uc->mu_cap;
480                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
481                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
482                 else
483                         mci->mci_opdata.op_suppgids[0] = -1;
484         } else {
485                 mci->mci_opdata.op_fsuid = la->la_uid;
486                 mci->mci_opdata.op_fsgid = la->la_gid;
487                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
488                 mci->mci_opdata.op_suppgids[0] = -1;
489         }
490
491         rc = md_unlink(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
492         if (rc == 0) {
493                 /* get attr from request */
494                 rc = mdc_req2attr_update(env, ma);
495         }
496
497         ptlrpc_req_finished(mci->mci_req);
498
499         RETURN(rc);
500 }
501
502 #ifdef HAVE_SPLIT_SUPPORT
503 int mdc_send_page(struct cmm_device *cm, const struct lu_env *env,
504                   struct md_object *mo, struct page *page, __u32 offset)
505 {
506         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
507         int rc;
508         ENTRY;
509
510         rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
511                           page, offset);
512         CDEBUG(rc ? D_ERROR : D_INFO, "send page %p  offset %d fid "DFID
513                " rc %d \n", page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
514         RETURN(rc);
515 }
516 #endif
517
518 static const struct md_object_operations mdc_mo_ops = {
519         .moo_attr_get       = mdc_attr_get,
520         .moo_attr_set       = mdc_attr_set,
521         .moo_object_create  = mdc_object_create,
522         .moo_ref_add        = mdc_ref_add,
523         .moo_ref_del        = mdc_ref_del,
524 };
525
526 /* md_dir_operations */
527 static int mdc_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
528                           struct md_object *mo_t, const struct lu_fid *lf,
529                           const struct lu_name *lname, struct md_attr *ma)
530 {
531         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo_p));
532         struct lu_attr *la = &ma->ma_attr;
533         struct mdc_thread_info *mci;
534         struct md_ucred *uc = md_ucred(env);
535         int rc;
536         ENTRY;
537
538         mci = mdc_info_init(env);
539         mci->mci_opdata.op_bias = MDS_CROSS_REF;
540         if (ma->ma_attr_flags & MDS_VTX_BYPASS)
541                 mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
542         else
543                 mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
544         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo_p->mo_lu);
545         mci->mci_opdata.op_fid2 = *lf;
546         mci->mci_opdata.op_mode = la->la_mode;
547         mci->mci_opdata.op_mod_time = la->la_ctime;
548         if (uc &&
549             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
550                 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
551                 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
552                 mci->mci_opdata.op_cap = uc->mu_cap;
553                 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
554                         mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
555                         mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
556                 } else {
557                         mci->mci_opdata.op_suppgids[0] =
558                                 mci->mci_opdata.op_suppgids[1] = -1;
559                 }
560         } else {
561                 mci->mci_opdata.op_fsuid = la->la_uid;
562                 mci->mci_opdata.op_fsgid = la->la_gid;
563                 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
564                 mci->mci_opdata.op_suppgids[0] =
565                                 mci->mci_opdata.op_suppgids[1] = -1;
566         }
567
568         rc = md_rename(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0,
569                        lname->ln_name, lname->ln_namelen, &mci->mci_req);
570         if (rc == 0) {
571                 /* get attr from request */
572                 mdc_req2attr_update(env, ma);
573         }
574
575         ptlrpc_req_finished(mci->mci_req);
576
577         RETURN(rc);
578 }
579 /*
580  * Return resulting fid in sfid
581  * 0: fids are not relatives
582  * fid: fid at which search stopped
583  */
584 static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo,
585                          const struct lu_fid *fid, struct lu_fid *sfid)
586 {
587         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
588         struct mdc_thread_info *mci;
589         struct mdt_body *body;
590         int rc;
591         ENTRY;
592
593         mci = mdc_info_init(env);
594
595         rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
596                           fid, &mci->mci_req);
597         if (rc == 0 || rc == -EREMOTE) {
598                 body = req_capsule_server_get(&mci->mci_req->rq_pill,
599                                               &RMF_MDT_BODY);
600                 LASSERT(body->valid & OBD_MD_FLID);
601
602                 CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "DFID"\n",
603                        PFID(&body->fid1));
604                 *sfid = body->fid1;
605         }
606         ptlrpc_req_finished(mci->mci_req);
607         RETURN(rc);
608 }
609
610 static const struct md_dir_operations mdc_dir_ops = {
611         .mdo_is_subdir   = mdc_is_subdir,
612         .mdo_rename_tgt  = mdc_rename_tgt
613 };