4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
34 * lustre/cmm/mdc_object.c
36 * Lustre Cluster Metadata Manager (cmm)
38 * Author: Mike Pershin <tappro@clusterfs.com>
41 #define DEBUG_SUBSYSTEM S_MDS
42 #include <obd_support.h>
43 #include <lustre_lib.h>
44 #include <obd_class.h>
45 #include <lustre_mdc.h>
46 #include "cmm_internal.h"
47 #include "mdc_internal.h"
49 static const struct md_object_operations mdc_mo_ops;
50 static const struct md_dir_operations mdc_dir_ops;
51 static const struct lu_object_operations mdc_obj_ops;
53 extern struct lu_context_key mdc_thread_key;
59 * Allocate new mdc object.
61 struct lu_object *mdc_object_alloc(const struct lu_env *env,
62 const struct lu_object_header *hdr,
65 struct mdc_object *mco;
72 lo = &mco->mco_obj.mo_lu;
73 lu_object_init(lo, NULL, ld);
74 mco->mco_obj.mo_ops = &mdc_mo_ops;
75 mco->mco_obj.mo_dir_ops = &mdc_dir_ops;
76 lo->lo_ops = &mdc_obj_ops;
82 /** Free current mdc object */
83 static void mdc_object_free(const struct lu_env *env, struct lu_object *lo)
85 struct mdc_object *mco = lu2mdc_obj(lo);
91 * Initialize mdc object. All of them have loh_attr::LOHA_REMOTE set.
93 static int mdc_object_init(const struct lu_env *env, struct lu_object *lo,
94 const struct lu_object_conf *unused)
97 lo->lo_header->loh_attr |= LOHA_REMOTE;
102 * Instance of lu_object_operations for mdc.
104 static const struct lu_object_operations mdc_obj_ops = {
105 .loo_object_init = mdc_object_init,
106 .loo_object_free = mdc_object_free,
110 * \name The set of md_object_operations.
114 * Get mdc_thread_info from lu_context
117 struct mdc_thread_info *mdc_info_get(const struct lu_env *env)
119 struct mdc_thread_info *mci;
121 mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
127 * Initialize mdc_thread_info.
130 struct mdc_thread_info *mdc_info_init(const struct lu_env *env)
132 struct mdc_thread_info *mci = mdc_info_get(env);
133 memset(mci, 0, sizeof(*mci));
138 * Convert attributes from mdt_body to the md_attr.
140 static void mdc_body2attr(struct mdt_body *body, struct md_attr *ma)
142 struct lu_attr *la = &ma->ma_attr;
144 if (body->valid & OBD_MD_FLCTIME && body->ctime >= la->la_ctime) {
145 la->la_ctime = body->ctime;
146 if (body->valid & OBD_MD_FLMTIME)
147 la->la_mtime = body->mtime;
150 if (body->valid & OBD_MD_FLMODE)
151 la->la_mode = body->mode;
152 if (body->valid & OBD_MD_FLSIZE)
153 la->la_size = body->size;
154 if (body->valid & OBD_MD_FLBLOCKS)
155 la->la_blocks = body->blocks;
156 if (body->valid & OBD_MD_FLUID)
157 la->la_uid = body->uid;
158 if (body->valid & OBD_MD_FLGID)
159 la->la_gid = body->gid;
160 if (body->valid & OBD_MD_FLFLAGS)
161 la->la_flags = body->flags;
162 if (body->valid & OBD_MD_FLNLINK)
163 la->la_nlink = body->nlink;
164 if (body->valid & OBD_MD_FLRDEV)
165 la->la_rdev = body->rdev;
167 la->la_valid = body->valid;
168 ma->ma_valid = MA_INODE;
172 * Fill the md_attr \a ma with attributes from request.
174 static int mdc_req2attr_update(const struct lu_env *env,
177 struct mdc_thread_info *mci;
178 struct ptlrpc_request *req;
179 struct mdt_body *body;
180 struct lov_mds_md *md;
181 struct llog_cookie *cookie;
185 mci = mdc_info_get(env);
188 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
190 mdc_body2attr(body, ma);
192 if (body->valid & OBD_MD_FLMDSCAPA) {
193 struct lustre_capa *capa;
195 /* create for cross-ref will fetch mds capa from remote obj */
196 capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA1);
197 LASSERT(capa != NULL);
198 LASSERT(ma->ma_capa != NULL);
199 *ma->ma_capa = *capa;
202 if ((body->valid & OBD_MD_FLEASIZE) || (body->valid & OBD_MD_FLDIREA)) {
203 if (body->eadatasize == 0) {
204 CERROR("No size defined for easize field\n");
208 md = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
213 LASSERT(ma->ma_lmm != NULL);
214 LASSERT(ma->ma_lmm_size >= body->eadatasize);
215 ma->ma_lmm_size = body->eadatasize;
216 memcpy(ma->ma_lmm, md, ma->ma_lmm_size);
217 ma->ma_valid |= MA_LOV;
220 if (body->valid & OBD_MD_FLCOOKIE) {
222 * ACL and cookie share the same body->aclsize, we need
223 * to make sure that they both never come here.
225 LASSERT(!(body->valid & OBD_MD_FLACL));
227 if (body->aclsize == 0) {
228 CERROR("No size defined for cookie field\n");
232 cookie = req_capsule_server_sized_get(&req->rq_pill,
238 LASSERT(ma->ma_cookie != NULL);
239 LASSERT(ma->ma_cookie_size == body->aclsize);
240 memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size);
241 ma->ma_valid |= MA_COOKIE;
244 #ifdef CONFIG_FS_POSIX_ACL
245 if (body->valid & OBD_MD_FLACL) {
246 if (body->aclsize == 0) {
247 CERROR("No size defined for acl field\n");
251 acl = req_capsule_server_sized_get(&req->rq_pill,
257 LASSERT(ma->ma_acl != NULL);
258 LASSERT(ma->ma_acl_size == body->aclsize);
259 memcpy(ma->ma_acl, acl, ma->ma_acl_size);
260 ma->ma_valid |= MA_ACL_DEF;
268 * The md_object_operations::moo_attr_get() in mdc.
270 static int mdc_attr_get(const struct lu_env *env, struct md_object *mo,
273 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
274 struct mdc_thread_info *mci;
278 mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
281 memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
283 memcpy(&mci->mci_opdata.op_fid1, lu_object_fid(&mo->mo_lu),
284 sizeof (struct lu_fid));
285 mci->mci_opdata.op_valid = OBD_MD_FLMODE | OBD_MD_FLUID |
286 OBD_MD_FLGID | OBD_MD_FLFLAGS |
289 rc = md_getattr(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
291 /* get attr from request */
292 rc = mdc_req2attr_update(env, ma);
295 ptlrpc_req_finished(mci->mci_req);
301 * Helper to init timspec \a t.
303 static inline struct timespec *mdc_attr_time(struct timespec *t, obd_time seconds)
311 * The md_object_operations::moo_attr_set() in mdc.
313 * \note It is only used for set ctime when rename's source on remote MDS.
315 static int mdc_attr_set(const struct lu_env *env, struct md_object *mo,
316 const struct md_attr *ma)
318 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
319 const struct lu_attr *la = &ma->ma_attr;
320 struct mdc_thread_info *mci;
321 struct md_ucred *uc = md_ucred(env);
325 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
327 mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
330 memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
332 mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
333 mdc_attr_time(&mci->mci_opdata.op_attr.ia_ctime, la->la_ctime);
334 mci->mci_opdata.op_attr.ia_mode = la->la_mode;
335 mci->mci_opdata.op_attr.ia_valid = ATTR_CTIME_SET;
337 ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
338 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
339 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
340 mci->mci_opdata.op_cap = uc->mu_cap;
341 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
342 mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
343 mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
345 mci->mci_opdata.op_suppgids[0] =
346 mci->mci_opdata.op_suppgids[1] = -1;
349 mci->mci_opdata.op_fsuid = la->la_uid;
350 mci->mci_opdata.op_fsgid = la->la_gid;
351 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
352 mci->mci_opdata.op_suppgids[0] =
353 mci->mci_opdata.op_suppgids[1] = -1;
356 rc = md_setattr(mc->mc_desc.cl_exp, &mci->mci_opdata,
357 NULL, 0, NULL, 0, &mci->mci_req, NULL);
359 ptlrpc_req_finished(mci->mci_req);
365 * The md_object_operations::moo_object_create() in mdc.
367 static int mdc_object_create(const struct lu_env *env,
368 struct md_object *mo,
369 const struct md_op_spec *spec,
372 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
373 struct lu_attr *la = &ma->ma_attr;
374 struct mdc_thread_info *mci;
376 struct md_ucred *uc = md_ucred(env);
383 LASSERT(S_ISDIR(la->la_mode));
384 LASSERT(spec->u.sp_pfid != NULL);
386 mci = mdc_info_init(env);
387 mci->mci_opdata.op_bias = MDS_CROSS_REF;
388 mci->mci_opdata.op_fid2 = *lu_object_fid(&mo->mo_lu);
390 /* Parent fid is needed to create dotdot on the remote node. */
391 mci->mci_opdata.op_fid1 = *(spec->u.sp_pfid);
392 mci->mci_opdata.op_mod_time = la->la_ctime;
394 ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
396 if (la->la_mode & S_ISGID)
401 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
402 mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
404 mci->mci_opdata.op_suppgids[0] = -1;
409 mci->mci_opdata.op_suppgids[0] = -1;
412 /* get data from spec */
413 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
414 symname = spec->u.sp_ea.eadata;
415 symlen = spec->u.sp_ea.eadatalen;
416 mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
417 mci->mci_opdata.op_flags |= MDS_CREATE_SLAVE_OBJ;
418 #ifdef CONFIG_FS_POSIX_ACL
419 } else if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
420 symname = spec->u.sp_ea.eadata;
421 symlen = spec->u.sp_ea.eadatalen;
422 mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
423 mci->mci_opdata.op_flags |= MDS_CREATE_RMT_ACL;
426 symname = spec->u.sp_symname;
427 symlen = symname ? strlen(symname) + 1 : 0;
430 rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata,
431 symname, symlen, la->la_mode, uid, gid,
432 cap, la->la_rdev, &mci->mci_req);
435 /* get attr from request */
436 rc = mdc_req2attr_update(env, ma);
439 ptlrpc_req_finished(mci->mci_req);
445 * The md_object_operations::moo_ref_add() in mdc.
447 static int mdc_ref_add(const struct lu_env *env, struct md_object *mo,
448 const struct md_attr *ma)
450 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
451 const struct lu_attr *la = &ma->ma_attr;
452 struct mdc_thread_info *mci;
453 struct md_ucred *uc = md_ucred(env);
457 mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
460 memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
461 mci->mci_opdata.op_bias = MDS_CROSS_REF;
462 mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
463 mci->mci_opdata.op_mod_time = la->la_ctime;
465 ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
466 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
467 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
468 mci->mci_opdata.op_cap = uc->mu_cap;
469 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
470 mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
471 mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
473 mci->mci_opdata.op_suppgids[0] =
474 mci->mci_opdata.op_suppgids[1] = -1;
477 mci->mci_opdata.op_fsuid = la->la_uid;
478 mci->mci_opdata.op_fsgid = la->la_gid;
479 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
480 mci->mci_opdata.op_suppgids[0] =
481 mci->mci_opdata.op_suppgids[1] = -1;
485 rc = md_link(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
487 ptlrpc_req_finished(mci->mci_req);
493 * The md_object_operations::moo_ref_del() in mdc.
495 static int mdc_ref_del(const struct lu_env *env, struct md_object *mo,
498 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
499 struct lu_attr *la = &ma->ma_attr;
500 struct mdc_thread_info *mci;
501 struct md_ucred *uc = md_ucred(env);
505 mci = mdc_info_init(env);
506 mci->mci_opdata.op_bias = MDS_CROSS_REF;
507 if (ma->ma_attr_flags & MDS_VTX_BYPASS)
508 mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
510 mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
511 mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
512 mci->mci_opdata.op_mode = la->la_mode;
513 mci->mci_opdata.op_mod_time = la->la_ctime;
515 ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
516 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
517 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
518 mci->mci_opdata.op_cap = uc->mu_cap;
519 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
520 mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
522 mci->mci_opdata.op_suppgids[0] = -1;
524 mci->mci_opdata.op_fsuid = la->la_uid;
525 mci->mci_opdata.op_fsgid = la->la_gid;
526 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
527 mci->mci_opdata.op_suppgids[0] = -1;
530 rc = md_unlink(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
532 /* get attr from request */
533 rc = mdc_req2attr_update(env, ma);
536 ptlrpc_req_finished(mci->mci_req);
541 #ifdef HAVE_SPLIT_SUPPORT
542 /** Send page with directory entries to another MDS. */
543 int mdc_send_page(struct cmm_device *cm, const struct lu_env *env,
544 struct md_object *mo, struct page *page, __u32 offset)
546 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
550 rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
552 CDEBUG(rc ? D_ERROR : D_INFO, "send page %p offset %d fid "DFID
553 " rc %d \n", page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
559 * Instance of md_object_operations for mdc.
561 static const struct md_object_operations mdc_mo_ops = {
562 .moo_attr_get = mdc_attr_get,
563 .moo_attr_set = mdc_attr_set,
564 .moo_object_create = mdc_object_create,
565 .moo_ref_add = mdc_ref_add,
566 .moo_ref_del = mdc_ref_del,
571 * \name The set of md_dir_operations.
575 * The md_dir_operations::mdo_rename_tgt in mdc.
577 static int mdc_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
578 struct md_object *mo_t, const struct lu_fid *lf,
579 const struct lu_name *lname, struct md_attr *ma)
581 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo_p));
582 struct lu_attr *la = &ma->ma_attr;
583 struct mdc_thread_info *mci;
584 struct md_ucred *uc = md_ucred(env);
588 mci = mdc_info_init(env);
589 mci->mci_opdata.op_bias = MDS_CROSS_REF;
590 if (ma->ma_attr_flags & MDS_VTX_BYPASS)
591 mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
593 mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
594 mci->mci_opdata.op_fid1 = *lu_object_fid(&mo_p->mo_lu);
595 mci->mci_opdata.op_fid2 = *lf;
596 mci->mci_opdata.op_mode = la->la_mode;
597 mci->mci_opdata.op_mod_time = la->la_ctime;
599 ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
600 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
601 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
602 mci->mci_opdata.op_cap = uc->mu_cap;
603 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
604 mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
605 mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
607 mci->mci_opdata.op_suppgids[0] =
608 mci->mci_opdata.op_suppgids[1] = -1;
611 mci->mci_opdata.op_fsuid = la->la_uid;
612 mci->mci_opdata.op_fsgid = la->la_gid;
613 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
614 mci->mci_opdata.op_suppgids[0] =
615 mci->mci_opdata.op_suppgids[1] = -1;
618 rc = md_rename(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0,
619 lname->ln_name, lname->ln_namelen, &mci->mci_req);
621 /* get attr from request */
622 mdc_req2attr_update(env, ma);
625 ptlrpc_req_finished(mci->mci_req);
630 * Check the fids are not relatives.
631 * The md_dir_operations::mdo_is_subdir() in mdc.
633 * Return resulting fid in sfid.
634 * \retval \a sfid = 0 fids are not relatives
635 * \retval \a sfid = FID at which search stopped
637 static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo,
638 const struct lu_fid *fid, struct lu_fid *sfid)
640 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
641 struct mdc_thread_info *mci;
642 struct mdt_body *body;
646 mci = mdc_info_init(env);
648 rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
650 if (rc == 0 || rc == -EREMOTE) {
651 body = req_capsule_server_get(&mci->mci_req->rq_pill,
653 LASSERT(body->valid & OBD_MD_FLID);
655 CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "DFID"\n",
659 ptlrpc_req_finished(mci->mci_req);
663 /** Instance of md_dir_operations for mdc. */
664 static const struct md_dir_operations mdc_dir_ops = {
665 .mdo_is_subdir = mdc_is_subdir,
666 .mdo_rename_tgt = mdc_rename_tgt