From: wangdi Date: Sat, 16 Nov 2013 18:53:15 +0000 (-0800) Subject: LU-1187 mdt: add out handler for object update X-Git-Tag: 2.3.61~63 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=a7241f1692be2f725a4b2607801e6829c842891c LU-1187 mdt: add out handler for object update Add object update handler on remote MDT. Because the RPC only includes object updates, (no metadata operation), the out_handler will skip MDD layer and call osd layer directly. Signed-off-by: wang di Change-Id: I9d553145cedfffcbb26d839ed8c7d9cc0887ed86 Reviewed-on: http://review.whamcloud.com/4928 Reviewed-by: Mike Pershin Reviewed-by: Alex Zhuravlev Tested-by: Hudson Tested-by: Maloo Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index e966fce..04f5a52 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -840,6 +840,17 @@ static inline int mdo_rename_tgt(const struct lu_env *env, } } +/** + * Used in MDD/OUT layer for object lock rule + **/ +enum mdd_object_role { + MOR_SRC_PARENT, + MOR_SRC_CHILD, + MOR_TGT_PARENT, + MOR_TGT_CHILD, + MOR_TGT_ORPHAN +}; + struct dt_device; /** * Structure to hold object information. This is used to create object diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 3a91473..b9a034d 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -255,7 +255,6 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS 0x185 #define OBD_FAIL_MDS_GET_INFO_NET 0x186 #define OBD_FAIL_MDS_DQACQ_NET 0x187 -#define OBD_FAIL_MDS_OBJ_UPDATE_NET 0x188 /* OI scrub */ #define OBD_FAIL_OSD_SCRUB_DELAY 0x190 @@ -461,6 +460,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_LOV_INIT 0x1403 #define OBD_FAIL_GLIMPSE_DELAY 0x1404 +#define OBD_FAIL_UPDATE_OBJ_NET 0x1500 /* Assign references to moved code to reduce code changes */ #define OBD_FAIL_PRECHECK(id) CFS_FAIL_PRECHECK(id) #define OBD_FAIL_CHECK(id) CFS_FAIL_CHECK(id) diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index 169d05c..f2d21a4 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -142,14 +142,6 @@ enum mod_flags { ORPHAN_OBJ = 1 << 3, }; -enum mdd_object_role { - MOR_SRC_PARENT, - MOR_SRC_CHILD, - MOR_TGT_PARENT, - MOR_TGT_CHILD, - MOR_TGT_ORPHAN -}; - struct mdd_object { struct md_object mod_obj; /* open count */ diff --git a/lustre/mdt/Makefile.in b/lustre/mdt/Makefile.in index 4865a02..3867fd4 100644 --- a/lustre/mdt/Makefile.in +++ b/lustre/mdt/Makefile.in @@ -1,6 +1,6 @@ MODULES := mdt mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o mdt-objs += mdt_open.o mdt_idmap.o mdt_identity.o mdt_capa.o mdt_lproc.o mdt_fs.o -mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o +mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o out_handler.o @INCLUDE_RULES@ diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 7f10bc4..2f98d85 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -2605,8 +2605,7 @@ void mdt_object_unlock_put(struct mdt_thread_info * info, mdt_object_put(info->mti_env, o); } -static struct mdt_handler *mdt_handler_find(__u32 opc, - struct mdt_opc_slice *supported) +struct mdt_handler *mdt_handler_find(__u32 opc, struct mdt_opc_slice *supported) { struct mdt_opc_slice *s; struct mdt_handler *h; @@ -3109,6 +3108,7 @@ static int mdt_msg_check_version(struct lustre_msg *msg) case MDS_HSM_ACTION: case MDS_QUOTACHECK: case MDS_QUOTACTL: + case UPDATE_OBJ: case QUOTA_DQACQ: case QUOTA_DQREL: case SEQ_QUERY: diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 9abfbe6..37d7bc5 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -102,7 +102,7 @@ struct mdt_device { struct seq_server_site mdt_seq_site; struct ptlrpc_service *mdt_regular_service; struct ptlrpc_service *mdt_readpage_service; - struct ptlrpc_service *mdt_xmds_service; + struct ptlrpc_service *mdt_out_service; struct ptlrpc_service *mdt_setattr_service; struct ptlrpc_service *mdt_mdsc_service; struct ptlrpc_service *mdt_mdss_service; @@ -274,6 +274,57 @@ enum mdt_reint_flag { MRF_OPEN_TRUNC = 1 << 0, }; +struct mdt_thread_info; +struct tx_arg; +typedef int (*tx_exec_func_t)(struct mdt_thread_info *, struct thandle *, + struct tx_arg *); + +struct tx_arg { + tx_exec_func_t exec_fn; + tx_exec_func_t undo_fn; + struct dt_object *object; + char *file; + int line; + struct update_reply *reply; + int index; + union { + struct { + const struct dt_rec *rec; + const struct dt_key *key; + } insert; + struct { + } ref; + struct { + struct lu_attr attr; + } attr_set; + struct { + struct lu_buf buf; + const char *name; + int flags; + __u32 csum; + } xattr_set; + struct { + struct lu_attr attr; + struct dt_allocation_hint hint; + struct dt_object_format dof; + struct lu_fid fid; + } create; + struct { + struct lu_buf buf; + loff_t pos; + } write; + } u; +}; + +#define TX_MAX_OPS 10 +struct thandle_exec_args { + struct thandle *ta_handle; + struct dt_device *ta_dev; + int ta_err; + struct tx_arg ta_args[TX_MAX_OPS]; + int ta_argno; /* used args */ +}; + /* * Common data shared by mdt-level handlers. This is allocated per-thread to * reduce stack consumption. @@ -393,6 +444,14 @@ struct mdt_thread_info { struct md_attr attr; struct md_som_data data; } som; + struct { + struct dt_object_format mti_update_dof; + struct update_reply *mti_update_reply; + struct update *mti_update; + int mti_update_reply_index; + struct obdo mti_obdo; + struct dt_object *mti_dt_object; + } update; } mti_u; /* IO epoch related stuff. */ @@ -412,6 +471,8 @@ struct mdt_thread_info { int mti_big_lmm_used; /* should be enough to fit lustre_mdt_attrs */ char mti_xattr_buf[128]; + struct thandle_exec_args mti_handle; + struct ldlm_enqueue_info mti_einfo; }; /* ptlrpc request handler for MDT. All handlers are @@ -728,6 +789,8 @@ extern struct lprocfs_vars lprocfs_mds_obd_vars[]; int mdt_hsm_attr_set(struct mdt_thread_info *info, struct mdt_object *obj, struct md_hsm *mh); +struct mdt_handler *mdt_handler_find(__u32 opc, + struct mdt_opc_slice *supported); /* mdt_idmap.c */ int mdt_init_sec_level(struct mdt_thread_info *); int mdt_init_idmap(struct mdt_thread_info *); @@ -1010,5 +1073,35 @@ static inline char *mdt_obd_name(struct mdt_device *mdt) int mds_mod_init(void); void mds_mod_exit(void); +/* Update handlers */ +int out_handle(struct mdt_thread_info *info); + +#define out_tx_create(info, obj, attr, fid, dof, th, reply, idx) \ + __out_tx_create(info, obj, attr, fid, dof, th, reply, idx, \ + __FILE__, __LINE__) + +#define out_tx_attr_set(info, obj, attr, th, reply, idx) \ + __out_tx_attr_set(info, obj, attr, th, reply, idx, \ + __FILE__, __LINE__) + +#define out_tx_xattr_set(info, obj, buf, name, fl, th, reply, idx) \ + __out_tx_xattr_set(info, obj, buf, name, fl, th, reply, idx, \ + __FILE__, __LINE__) + +#define out_tx_ref_add(info, obj, th, reply, idx) \ + __out_tx_ref_add(info, obj, th, reply, idx, __FILE__, __LINE__) + +#define out_tx_ref_del(info, obj, th, reply, idx) \ + __out_tx_ref_del(info, obj, th, reply, idx, __FILE__, __LINE__) + +#define out_tx_index_insert(info, obj, th, name, fid, reply, idx) \ + __out_tx_index_insert(info, obj, th, name, fid, reply, idx, \ + __FILE__, __LINE__) + +#define out_tx_index_delete(info, obj, th, name, reply, idx) \ + __out_tx_index_delete(info, obj, th, name, reply, idx, \ + __FILE__, __LINE__) + + #endif /* __KERNEL__ */ #endif /* _MDT_H */ diff --git a/lustre/mdt/mdt_mds.c b/lustre/mdt/mdt_mds.c index 0a15bd3..d5d4dde 100644 --- a/lustre/mdt/mdt_mds.c +++ b/lustre/mdt/mdt_mds.c @@ -63,6 +63,7 @@ struct mds_device { struct md_device mds_md_dev; struct ptlrpc_service *mds_regular_service; struct ptlrpc_service *mds_readpage_service; + struct ptlrpc_service *mds_out_service; struct ptlrpc_service *mds_setattr_service; struct ptlrpc_service *mds_mdsc_service; struct ptlrpc_service *mds_mdss_service; @@ -308,6 +309,46 @@ struct mdt_opc_slice mdt_fld_handlers[] = { } }; +/* Request with a format known in advance */ +#define DEF_UPDATE_HDL(flags, name, fn) \ + DEFINE_RPC_HANDLER(UPDATE_OBJ, flags, name, fn, &RQF_ ## name) + +#define target_handler mdt_handler +static struct target_handler out_ops[] = { + DEF_UPDATE_HDL(MUTABOR, UPDATE_OBJ, out_handle), +}; + +static struct mdt_opc_slice update_handlers[] = { + { + .mos_opc_start = MDS_GETATTR, + .mos_opc_end = MDS_LAST_OPC, + .mos_hs = mdt_mds_ops + }, + { + .mos_opc_start = OBD_PING, + .mos_opc_end = OBD_LAST_OPC, + .mos_hs = mdt_obd_ops + }, + { + .mos_opc_start = LDLM_ENQUEUE, + .mos_opc_end = LDLM_LAST_OPC, + .mos_hs = mdt_dlm_ops + }, + { + .mos_opc_start = SEC_CTX_INIT, + .mos_opc_end = SEC_LAST_OPC, + .mos_hs = mdt_sec_ctx_ops + }, + { + .mos_opc_start = UPDATE_OBJ, + .mos_opc_end = UPDATE_LAST_OPC, + .mos_hs = out_ops + }, + { + .mos_hs = NULL + } +}; + static int mds_regular_handle(struct ptlrpc_request *req) { return mdt_handle_common(req, mdt_regular_handlers); @@ -323,6 +364,11 @@ static int mds_mdsc_handle(struct ptlrpc_request *req) return mdt_handle_common(req, mdt_seq_handlers); } +static int mdt_out_handle(struct ptlrpc_request *req) +{ + return mdt_handle_common(req, update_handlers); +} + static int mds_mdss_handle(struct ptlrpc_request *req) { return mdt_handle_common(req, mdt_seq_handlers); @@ -345,6 +391,10 @@ static void mds_stop_ptlrpc_service(struct mds_device *m) ptlrpc_unregister_service(m->mds_readpage_service); m->mds_readpage_service = NULL; } + if (m->mds_out_service != NULL) { + ptlrpc_unregister_service(m->mds_out_service); + m->mds_out_service = NULL; + } if (m->mds_setattr_service != NULL) { ptlrpc_unregister_service(m->mds_setattr_service); m->mds_setattr_service = NULL; @@ -508,6 +558,49 @@ static int mds_start_ptlrpc_service(struct mds_device *m) GOTO(err_mds_svc, rc); } + /* Object update service */ + conf = (typeof(conf)) { + .psc_name = LUSTRE_MDT_NAME "_out", + .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, + .psc_buf = { + .bc_nbufs = MDS_NBUFS, + .bc_buf_size = MDS_BUFSIZE, + .bc_req_max_size = MDS_MAXREQSIZE, + .bc_rep_max_size = MDS_MAXREPSIZE, + .bc_req_portal = MDS_MDS_PORTAL, + .bc_rep_portal = MDC_REPLY_PORTAL, + }, + /* + * We'd like to have a mechanism to set this on a per-device + * basis, but alas... + */ + .psc_thr = { + .tc_thr_name = LUSTRE_MDT_NAME "_out", + .tc_thr_factor = MDS_THR_FACTOR, + .tc_nthrs_init = MDS_NTHRS_INIT, + .tc_nthrs_base = MDS_NTHRS_BASE, + .tc_nthrs_max = MDS_NTHRS_MAX, + .tc_nthrs_user = mds_num_threads, + .tc_cpu_affinity = 1, + .tc_ctx_tags = LCT_MD_THREAD, + }, + .psc_cpt = { + .cc_pattern = mds_num_cpts, + }, + .psc_ops = { + .so_req_handler = mdt_out_handle, + .so_req_printer = target_print_req, + .so_hpreq_handler = NULL, + }, + }; + m->mds_out_service = ptlrpc_register_service(&conf, procfs_entry); + if (IS_ERR(m->mds_out_service)) { + rc = PTR_ERR(m->mds_out_service); + CERROR("failed to start out service: %d\n", rc); + m->mds_out_service = NULL; + GOTO(err_mds_svc, rc); + } + /* * sequence controller service configuration */ diff --git a/lustre/mdt/out_handler.c b/lustre/mdt/out_handler.c new file mode 100644 index 0000000..83671d5 --- /dev/null +++ b/lustre/mdt/out_handler.c @@ -0,0 +1,1172 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright (c) 2013, Intel Corporation. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/mdt/out_handler.c + * + * Object update handler between targets. + * + * Author: di.wang + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_MDS + +#include "mdt_internal.h" +#include + +static const char dot[] = "."; +static const char dotdot[] = ".."; + +struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func, + tx_exec_func_t undo, char *file, int line) +{ + int i; + + LASSERT(ta); + LASSERT(func); + + i = ta->ta_argno; + LASSERT(i < UPDATE_MAX_OPS); + + ta->ta_argno++; + + ta->ta_args[i].exec_fn = func; + ta->ta_args[i].undo_fn = undo; + ta->ta_args[i].file = file; + ta->ta_args[i].line = line; + + return &ta->ta_args[i]; +} + +static int out_tx_start(const struct lu_env *env, struct mdt_device *mdt, + struct thandle_exec_args *th) +{ + struct dt_device *dt = mdt->mdt_bottom; + + memset(th, 0, sizeof(*th)); + th->ta_handle = dt_trans_create(env, dt); + if (IS_ERR(th->ta_handle)) { + CERROR("%s: start handle error: rc = %ld\n", + mdt2obd_dev(mdt)->obd_name, PTR_ERR(th->ta_handle)); + return PTR_ERR(th->ta_handle); + } + th->ta_dev = dt; + /*For phase I, sync for cross-ref operation*/ + th->ta_handle->th_sync = 1; + return 0; +} + +static int out_trans_start(const struct lu_env *env, struct dt_device *dt, + struct thandle *th) +{ + /* Always do sync commit for Phase I */ + LASSERT(th->th_sync != 0); + return dt_trans_start(env, dt, th); +} + +static int out_trans_stop(const struct lu_env *env, + struct thandle_exec_args *th, int err) +{ + int i; + int rc; + + th->ta_handle->th_result = err; + LASSERT(th->ta_handle->th_sync != 0); + rc = dt_trans_stop(env, th->ta_dev, th->ta_handle); + for (i = 0; i < th->ta_argno; i++) { + if (th->ta_args[i].object != NULL) { + lu_object_put(env, &th->ta_args[i].object->do_lu); + th->ta_args[i].object = NULL; + } + } + + return rc; +} + +int out_tx_end(struct mdt_thread_info *info, struct thandle_exec_args *th) +{ + struct thandle_exec_args *_th = &info->mti_handle; + int i = 0, rc; + + LASSERT(th == _th); + LASSERT(th->ta_dev); + LASSERT(th->ta_handle); + + if (th->ta_err != 0 || th->ta_argno == 0) + GOTO(stop, rc = th->ta_err); + + rc = out_trans_start(info->mti_env, th->ta_dev, th->ta_handle); + if (unlikely(rc)) + GOTO(stop, rc); + + for (i = 0; i < th->ta_argno; i++) { + rc = th->ta_args[i].exec_fn(info, th->ta_handle, + &th->ta_args[i]); + if (unlikely(rc)) { + CDEBUG(D_INFO, "error during execution of #%u from" + " %s:%d: rc = %d\n", i, th->ta_args[i].file, + th->ta_args[i].line, rc); + while (--i >= 0) { + LASSERTF(th->ta_args[i].undo_fn != NULL, + "can't undo changes, hope for failover!\n"); + th->ta_args[i].undo_fn(info, th->ta_handle, + &th->ta_args[i]); + } + break; + } + } +stop: + CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n", + mdt_obd_name(info->mti_mdt), i, th->ta_argno, rc); + out_trans_stop(info->mti_env, th, rc); + th->ta_handle = NULL; + th->ta_argno = 0; + th->ta_err = 0; + + RETURN(rc); +} + +static struct dt_object *out_get_dt_obj(struct lu_object *obj) +{ + struct mdt_device *mdt; + struct dt_device *dt; + struct lu_object *bottom_obj; + + mdt = lu2mdt_dev(obj->lo_dev); + dt = mdt->mdt_bottom; + + bottom_obj = lu_object_locate(obj->lo_header, dt->dd_lu_dev.ld_type); + if (bottom_obj == NULL) + return ERR_PTR(-ENOENT); + + return lu2dt_obj(bottom_obj); +} + +static struct dt_object *out_object_find(struct mdt_thread_info *info, + struct lu_fid *fid) +{ + struct lu_object *obj; + struct dt_object *dt_obj; + + obj = lu_object_find(info->mti_env, + &info->mti_mdt->mdt_md_dev.md_lu_dev, + fid, NULL); + if (IS_ERR(obj)) + return (struct dt_object *)obj; + + dt_obj = out_get_dt_obj(obj); + if (IS_ERR(dt_obj)) { + lu_object_put(info->mti_env, obj); + return ERR_PTR(-ENOENT); + } + + return dt_obj; +} + +/** + * All of the xxx_undo will be used once execution failed, + * But because all of the required resource has been reserved in + * declare phase, i.e. if declare succeed, it should make sure + * the following executing phase succeed in anyway, so these undo + * should be useless for most of the time in Phase I + **/ +int out_tx_create_undo(struct mdt_thread_info *info, struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + LASSERT(dt_obj != NULL && !IS_ERR(dt_obj)); + + dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD); + rc = dt_destroy(info->mti_env, dt_obj, th); + dt_write_unlock(info->mti_env, dt_obj); + + /* we don't like double failures */ + if (rc != 0) + CERROR("%s: undo failure, we are doomed!: rc = %d\n", + mdt_obd_name(info->mti_mdt), rc); + return rc; +} + +int out_tx_create_exec(struct mdt_thread_info *info, struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + LASSERT(dt_obj != NULL && !IS_ERR(dt_obj)); + + CDEBUG(D_OTHER, "create "DFID": dof %u, mode %o\n", + PFID(lu_object_fid(&arg->object->do_lu)), + arg->u.create.dof.dof_type, + arg->u.create.attr.la_mode & S_IFMT); + + dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD); + rc = dt_create(info->mti_env, dt_obj, &arg->u.create.attr, + &arg->u.create.hint, &arg->u.create.dof, th); + + dt_write_unlock(info->mti_env, dt_obj); + CDEBUG(D_INFO, "insert create reply mode %o index %d\n", + arg->u.create.attr.la_mode, arg->index); + + update_insert_reply(arg->reply, NULL, 0, arg->index, rc); + + return rc; +} + +static int __out_tx_create(struct mdt_thread_info *info, struct dt_object *obj, + struct lu_attr *attr, struct lu_fid *parent_fid, + struct dt_object_format *dof, + struct thandle_exec_args *th, + struct update_reply *reply, + int index, char *file, int line) +{ + struct tx_arg *arg; + + LASSERT(th->ta_handle != NULL); + th->ta_err = dt_declare_create(info->mti_env, obj, attr, NULL, dof, + th->ta_handle); + if (th->ta_err != 0) + return th->ta_err; + + arg = tx_add_exec(th, out_tx_create_exec, out_tx_create_undo, file, + line); + LASSERT(arg); + + /* release the object in out_trans_stop */ + lu_object_get(&obj->do_lu); + arg->object = obj; + arg->u.create.attr = *attr; + if (parent_fid) + arg->u.create.fid = *parent_fid; + memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint)); + arg->u.create.dof = *dof; + arg->reply = reply; + arg->index = index; + + return 0; +} + +static int out_create(struct mdt_thread_info *info) +{ + struct update *update = info->mti_u.update.mti_update; + struct dt_object *obj = info->mti_u.update.mti_dt_object; + struct dt_object_format *dof = &info->mti_u.update.mti_update_dof; + struct obdo *lobdo = &info->mti_u.update.mti_obdo; + struct lu_attr *attr = &info->mti_attr.ma_attr; + struct lu_fid *fid = NULL; + struct obdo *wobdo; + int size; + int rc; + + ENTRY; + + wobdo = update_param_buf(update, 0, &size); + if (wobdo == NULL || size != sizeof(*wobdo)) { + CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n", + mdt_obd_name(info->mti_mdt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + obdo_le_to_cpu(wobdo, wobdo); + lustre_get_wire_obdo(lobdo, wobdo); + la_from_obdo(attr, lobdo, lobdo->o_valid); + + dof->dof_type = dt_mode_to_dft(attr->la_mode); + if (S_ISDIR(attr->la_mode)) { + int size; + + fid = update_param_buf(update, 1, &size); + if (fid == NULL || size != sizeof(*fid)) { + CERROR("%s: invalid fid: rc = %d\n", + mdt_obd_name(info->mti_mdt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + fid_le_to_cpu(fid, fid); + if (!fid_is_sane(fid)) { + CERROR("%s: invalid fid "DFID": rc = %d\n", + mdt_obd_name(info->mti_mdt), + PFID(fid), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + } + + rc = out_tx_create(info, obj, attr, fid, dof, &info->mti_handle, + info->mti_u.update.mti_update_reply, + info->mti_u.update.mti_update_reply_index); + + RETURN(rc); +} + +static int out_tx_attr_set_undo(struct mdt_thread_info *info, + struct thandle *th, struct tx_arg *arg) +{ + CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n", + mdt_obd_name(info->mti_mdt), + PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP); + + return -ENOTSUPP; +} + +static int out_tx_attr_set_exec(struct mdt_thread_info *info, + struct thandle *th, struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + LASSERT(dt_obj != NULL && !IS_ERR(dt_obj)); + + CDEBUG(D_OTHER, "attr set "DFID"\n", + PFID(lu_object_fid(&arg->object->do_lu))); + + dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD); + rc = dt_attr_set(info->mti_env, dt_obj, &arg->u.attr_set.attr, + th, NULL); + dt_write_unlock(info->mti_env, dt_obj); + + update_insert_reply(arg->reply, NULL, 0, arg->index, rc); + + return rc; +} + +static int __out_tx_attr_set(struct mdt_thread_info *info, + struct dt_object *dt_obj, + const struct lu_attr *attr, + struct thandle_exec_args *th, + struct update_reply *reply, int index, + char *file, int line) +{ + struct tx_arg *arg; + + LASSERT(th->ta_handle != NULL); + th->ta_err = dt_declare_attr_set(info->mti_env, dt_obj, attr, + th->ta_handle); + if (th->ta_err != 0) + return th->ta_err; + + arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo, + file, line); + LASSERT(arg); + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->u.attr_set.attr = *attr; + arg->reply = reply; + arg->index = index; + return 0; +} + +static int out_attr_set(struct mdt_thread_info *info) +{ + struct update *update = info->mti_u.update.mti_update; + struct lu_attr *attr = &info->mti_attr.ma_attr; + struct dt_object *obj = info->mti_u.update.mti_dt_object; + struct obdo *lobdo = &info->mti_u.update.mti_obdo; + struct obdo *wobdo; + int size; + int rc; + + ENTRY; + + wobdo = update_param_buf(update, 0, &size); + if (wobdo == NULL || size != sizeof(*wobdo)) { + CERROR("%s: empty obdo in the update: rc = %d\n", + mdt_obd_name(info->mti_mdt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + attr->la_valid = 0; + attr->la_valid = 0; + obdo_le_to_cpu(wobdo, wobdo); + lustre_get_wire_obdo(lobdo, wobdo); + la_from_obdo(attr, lobdo, lobdo->o_valid); + + rc = out_tx_attr_set(info, obj, attr, &info->mti_handle, + info->mti_u.update.mti_update_reply, + info->mti_u.update.mti_update_reply_index); + + RETURN(rc); +} + +static int out_attr_get(struct mdt_thread_info *info) +{ + struct obdo *obdo = &info->mti_u.update.mti_obdo; + const struct lu_env *env = info->mti_env; + struct lu_attr *la = &info->mti_attr.ma_attr; + struct dt_object *obj = info->mti_u.update.mti_dt_object; + int rc; + + ENTRY; + + if (!lu_object_exists(&obj->do_lu)) + RETURN(-ENOENT); + + dt_read_lock(env, obj, MOR_TGT_CHILD); + rc = dt_attr_get(env, obj, la, NULL); + if (rc) + GOTO(out_unlock, rc); + /* + * If it is a directory, we will also check whether the + * directory is empty. + * la_flags = 0 : Empty. + * = 1 : Not empty. + */ + la->la_flags = 0; + if (S_ISDIR(la->la_mode)) { + struct dt_it *it; + const struct dt_it_ops *iops; + + if (!dt_try_as_dir(env, obj)) + GOTO(out_unlock, rc = -ENOTDIR); + + iops = &obj->do_index_ops->dio_it; + it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA); + if (!IS_ERR(it)) { + int result; + result = iops->get(env, it, (const void *)""); + if (result > 0) { + int i; + for (result = 0, i = 0; result == 0 && i < 3; + ++i) + result = iops->next(env, it); + if (result == 0) + la->la_flags = 1; + } else if (result == 0) + /* + * Huh? Index contains no zero key? + */ + rc = -EIO; + + iops->put(env, it); + iops->fini(env, it); + } + } + + obdo->o_valid = 0; + obdo_from_la(obdo, la, la->la_valid); + obdo_cpu_to_le(obdo, obdo); + lustre_set_wire_obdo(obdo, obdo); + +out_unlock: + dt_read_unlock(env, obj); + update_insert_reply(info->mti_u.update.mti_update_reply, obdo, + sizeof(*obdo), 0, rc); + RETURN(rc); +} + +static int out_xattr_get(struct mdt_thread_info *info) +{ + struct update *update = info->mti_u.update.mti_update; + const struct lu_env *env = info->mti_env; + struct lu_buf *lbuf = &info->mti_buf; + struct update_reply *reply = info->mti_u.update.mti_update_reply; + struct dt_object *obj = info->mti_u.update.mti_dt_object; + char *name; + void *ptr; + int rc; + + ENTRY; + + name = (char *)update_param_buf(update, 0, NULL); + if (name == NULL) { + CERROR("%s: empty name for xattr get: rc = %d\n", + mdt_obd_name(info->mti_mdt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + ptr = update_get_buf_internal(reply, 0, NULL); + LASSERT(ptr != NULL); + + /* The first 4 bytes(int) are used to store the result */ + lbuf->lb_buf = (char *)ptr + sizeof(int); + lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply); + dt_read_lock(env, obj, MOR_TGT_CHILD); + rc = dt_xattr_get(env, obj, lbuf, name, NULL); + dt_read_unlock(env, obj); + if (rc < 0) { + lbuf->lb_len = 0; + GOTO(out, rc); + } + if (rc == 0) { + lbuf->lb_len = 0; + GOTO(out, rc = -ENOENT); + } + lbuf->lb_len = rc; + rc = 0; + CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n", + mdt_obd_name(info->mti_mdt), PFID(lu_object_fid(&obj->do_lu)), + name, (int)lbuf->lb_len); +out: + *(int *)ptr = rc; + reply->ur_lens[0] = lbuf->lb_len + sizeof(int); + RETURN(rc); +} + +static int out_index_lookup(struct mdt_thread_info *info) +{ + struct update *update = info->mti_u.update.mti_update; + const struct lu_env *env = info->mti_env; + struct dt_object *obj = info->mti_u.update.mti_dt_object; + char *name; + int rc; + + ENTRY; + + if (!lu_object_exists(&obj->do_lu)) + RETURN(-ENOENT); + + name = (char *)update_param_buf(update, 0, NULL); + if (name == NULL) { + CERROR("%s: empty name for lookup: rc = %d\n", + mdt_obd_name(info->mti_mdt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + dt_read_lock(env, obj, MOR_TGT_CHILD); + if (!dt_try_as_dir(env, obj)) + GOTO(out_unlock, rc = -ENOTDIR); + + rc = dt_lookup(env, obj, (struct dt_rec *)&info->mti_tmp_fid1, + (struct dt_key *)name, NULL); + + if (rc < 0) + GOTO(out_unlock, rc); + + if (rc == 0) + rc += 1; + + CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n", + PFID(lu_object_fid(&obj->do_lu)), name, + PFID(&info->mti_tmp_fid1), rc); + fid_cpu_to_le(&info->mti_tmp_fid1, &info->mti_tmp_fid1); + +out_unlock: + dt_read_unlock(env, obj); + update_insert_reply(info->mti_u.update.mti_update_reply, + &info->mti_tmp_fid1, sizeof(info->mti_tmp_fid1), + 0, rc); + RETURN(rc); +} + +static int out_tx_xattr_set_exec(struct mdt_thread_info *info, + struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + LASSERT(dt_obj != NULL && !IS_ERR(dt_obj)); + + CDEBUG(D_OTHER, "attr set "DFID"\n", + PFID(lu_object_fid(&arg->object->do_lu))); + + dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD); + rc = dt_xattr_set(info->mti_env, dt_obj, &arg->u.xattr_set.buf, + arg->u.xattr_set.name, arg->u.xattr_set.flags, + th, NULL); + dt_write_unlock(info->mti_env, dt_obj); + /** + * Ignore errors if this is LINK EA + **/ + if (unlikely(rc && !strncmp(arg->u.xattr_set.name, XATTR_NAME_LINK, + strlen(XATTR_NAME_LINK)))) + rc = 0; + + update_insert_reply(arg->reply, NULL, 0, arg->index, rc); + + CDEBUG(D_INFO, "set xattr buf %p name %s flag %d\n", + arg->u.xattr_set.buf.lb_buf, arg->u.xattr_set.name, + arg->u.xattr_set.flags); + + return rc; +} + +static int __out_tx_xattr_set(struct mdt_thread_info *info, + struct dt_object *dt_obj, + const struct lu_buf *buf, + const char *name, int flags, + struct thandle_exec_args *th, + struct update_reply *reply, int index, + char *file, int line) +{ + struct tx_arg *arg; + + LASSERT(th->ta_handle != NULL); + th->ta_err = dt_declare_xattr_set(info->mti_env, dt_obj, buf, name, + flags, th->ta_handle); + if (th->ta_err != 0) + return th->ta_err; + + arg = tx_add_exec(th, out_tx_xattr_set_exec, NULL, file, line); + LASSERT(arg); + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->u.xattr_set.name = name; + arg->u.xattr_set.flags = flags; + arg->u.xattr_set.buf = *buf; + arg->reply = reply; + arg->index = index; + arg->u.xattr_set.csum = 0; + return 0; +} + +static int out_xattr_set(struct mdt_thread_info *info) +{ + struct update *update = info->mti_u.update.mti_update; + struct dt_object *obj = info->mti_u.update.mti_dt_object; + struct lu_buf *lbuf = &info->mti_buf; + char *name; + char *buf; + char *tmp; + int buf_len = 0; + int flag; + int rc; + ENTRY; + + name = update_param_buf(update, 0, NULL); + if (name == NULL) { + CERROR("%s: empty name for xattr set: rc = %d\n", + mdt_obd_name(info->mti_mdt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + buf = (char *)update_param_buf(update, 1, &buf_len); + if (buf == NULL || buf_len == 0) { + CERROR("%s: empty buf for xattr set: rc = %d\n", + mdt_obd_name(info->mti_mdt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + lbuf->lb_buf = buf; + lbuf->lb_len = buf_len; + + tmp = (char *)update_param_buf(update, 2, NULL); + if (tmp == NULL) { + CERROR("%s: empty flag for xattr set: rc = %d\n", + mdt_obd_name(info->mti_mdt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + flag = le32_to_cpu(*(int *)tmp); + + rc = out_tx_xattr_set(info, obj, lbuf, name, flag, &info->mti_handle, + info->mti_u.update.mti_update_reply, + info->mti_u.update.mti_update_reply_index); + + RETURN(rc); +} + +static int out_tx_ref_add_exec(struct mdt_thread_info *info, struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + + LASSERT(dt_obj != NULL && !IS_ERR(dt_obj)); + + CDEBUG(D_OTHER, "ref add "DFID"\n", + PFID(lu_object_fid(&arg->object->do_lu))); + + dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD); + dt_ref_add(info->mti_env, dt_obj, th); + dt_write_unlock(info->mti_env, dt_obj); + + update_insert_reply(arg->reply, NULL, 0, arg->index, 0); + return 0; +} + +static int out_tx_ref_add_undo(struct mdt_thread_info *info, struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + + LASSERT(dt_obj != NULL && !IS_ERR(dt_obj)); + + CDEBUG(D_OTHER, "ref del "DFID"\n", + PFID(lu_object_fid(&arg->object->do_lu))); + + dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD); + dt_ref_del(info->mti_env, dt_obj, th); + dt_write_unlock(info->mti_env, dt_obj); + + return 0; +} + +static int __out_tx_ref_add(struct mdt_thread_info *info, + struct dt_object *dt_obj, + struct thandle_exec_args *th, + struct update_reply *reply, + int index, char *file, int line) +{ + struct tx_arg *arg; + + LASSERT(th->ta_handle != NULL); + th->ta_err = dt_declare_ref_add(info->mti_env, dt_obj, + th->ta_handle); + if (th->ta_err != 0) + return th->ta_err; + + arg = tx_add_exec(th, out_tx_ref_add_exec, out_tx_ref_add_undo, file, + line); + LASSERT(arg); + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->reply = reply; + arg->index = index; + return 0; +} + +/** + * increase ref of the object + **/ +static int out_ref_add(struct mdt_thread_info *info) +{ + struct dt_object *obj = info->mti_u.update.mti_dt_object; + int rc; + + ENTRY; + + rc = out_tx_ref_add(info, obj, &info->mti_handle, + info->mti_u.update.mti_update_reply, + info->mti_u.update.mti_update_reply_index); + RETURN(rc); +} + +static int out_tx_ref_del_exec(struct mdt_thread_info *info, struct thandle *th, + struct tx_arg *arg) +{ + out_tx_ref_add_undo(info, th, arg); + update_insert_reply(arg->reply, NULL, 0, arg->index, 0); + return 0; +} + +static int out_tx_ref_del_undo(struct mdt_thread_info *info, struct thandle *th, + struct tx_arg *arg) +{ + return out_tx_ref_add_exec(info, th, arg); +} + +static int __out_tx_ref_del(struct mdt_thread_info *info, + struct dt_object *dt_obj, + struct thandle_exec_args *th, + struct update_reply *reply, + int index, char *file, int line) +{ + struct tx_arg *arg; + + LASSERT(th->ta_handle != NULL); + th->ta_err = dt_declare_ref_del(info->mti_env, dt_obj, th->ta_handle); + if (th->ta_err != 0) + return th->ta_err; + + arg = tx_add_exec(th, out_tx_ref_del_exec, out_tx_ref_del_undo, file, + line); + LASSERT(arg); + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->reply = reply; + arg->index = index; + return 0; +} + +static int out_ref_del(struct mdt_thread_info *info) +{ + struct dt_object *obj = info->mti_u.update.mti_dt_object; + int rc; + + ENTRY; + + if (!lu_object_exists(&obj->do_lu)) + RETURN(-ENOENT); + + rc = out_tx_ref_del(info, obj, &info->mti_handle, + info->mti_u.update.mti_update_reply, + info->mti_u.update.mti_update_reply_index); + RETURN(rc); +} + +static int out_tx_index_insert_exec(struct mdt_thread_info *info, + struct thandle *th, struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + LASSERT(dt_obj != NULL && !IS_ERR(dt_obj)); + + CDEBUG(D_OTHER, "index insert "DFID" name: %s fid "DFID"\n", + PFID(lu_object_fid(&arg->object->do_lu)), + (char *)arg->u.insert.key, + PFID((struct lu_fid *)arg->u.insert.rec)); + + if (dt_try_as_dir(info->mti_env, dt_obj) == 0) + return -ENOTDIR; + + dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD); + rc = dt_insert(info->mti_env, dt_obj, arg->u.insert.rec, + arg->u.insert.key, th, NULL, 0); + dt_write_unlock(info->mti_env, dt_obj); + + update_insert_reply(arg->reply, NULL, 0, arg->index, rc); + + return rc; +} + +static int out_tx_index_insert_undo(struct mdt_thread_info *info, + struct thandle *th, struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + LASSERT(dt_obj != NULL && !IS_ERR(dt_obj)); + + CDEBUG(D_OTHER, "index delete "DFID" name: %s\n", + PFID(lu_object_fid(&arg->object->do_lu)), + (char *)arg->u.insert.key); + + if (dt_try_as_dir(info->mti_env, dt_obj) == 0) { + CERROR("%s: "DFID" is not directory: rc = %d\n", + mdt_obd_name(info->mti_mdt), + PFID(lu_object_fid(&dt_obj->do_lu)), -ENOTDIR); + return -ENOTDIR; + } + + dt_write_lock(info->mti_env, dt_obj, MOR_TGT_CHILD); + rc = dt_delete(info->mti_env, dt_obj, arg->u.insert.key, th, NULL); + dt_write_unlock(info->mti_env, dt_obj); + + update_insert_reply(arg->reply, NULL, 0, arg->index, rc); + + return rc; +} + +static int __out_tx_index_insert(struct mdt_thread_info *info, + struct dt_object *dt_obj, + char *name, struct lu_fid *fid, + struct thandle_exec_args *th, + struct update_reply *reply, + int index, char *file, int line) +{ + struct tx_arg *arg; + + LASSERT(th->ta_handle != NULL); + + if (lu_object_exists(&dt_obj->do_lu)) { + if (dt_try_as_dir(info->mti_env, dt_obj) == 0) { + th->ta_err = -ENOTDIR; + return th->ta_err; + } + th->ta_err = dt_declare_insert(info->mti_env, dt_obj, + (struct dt_rec *)fid, + (struct dt_key *)name, + th->ta_handle); + } + + if (th->ta_err != 0) + return th->ta_err; + + arg = tx_add_exec(th, out_tx_index_insert_exec, + out_tx_index_insert_undo, file, + line); + LASSERT(arg); + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->reply = reply; + arg->index = index; + arg->u.insert.rec = (struct dt_rec *)fid; + arg->u.insert.key = (struct dt_key *)name; + + return 0; +} + +static int out_index_insert(struct mdt_thread_info *info) +{ + struct update *update = info->mti_u.update.mti_update; + struct dt_object *obj = info->mti_u.update.mti_dt_object; + struct lu_fid *fid; + char *name; + int rc = 0; + int size; + ENTRY; + + name = (char *)update_param_buf(update, 0, NULL); + if (name == NULL) { + CERROR("%s: empty name for index insert: rc = %d\n", + mdt_obd_name(info->mti_mdt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + fid = (struct lu_fid *)update_param_buf(update, 1, &size); + if (fid == NULL || size != sizeof(*fid)) { + CERROR("%s: invalid fid: rc = %d\n", + mdt_obd_name(info->mti_mdt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + fid_le_to_cpu(fid, fid); + if (!fid_is_sane(fid)) { + CERROR("%s: invalid FID "DFID": rc = %d\n", + mdt_obd_name(info->mti_mdt), PFID(fid), + -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + rc = out_tx_index_insert(info, obj, name, fid, &info->mti_handle, + info->mti_u.update.mti_update_reply, + info->mti_u.update.mti_update_reply_index); + RETURN(rc); +} + +static int out_tx_index_delete_exec(struct mdt_thread_info *info, + struct thandle *th, + struct tx_arg *arg) +{ + return out_tx_index_insert_undo(info, th, arg); +} + +static int out_tx_index_delete_undo(struct mdt_thread_info *info, + struct thandle *th, + struct tx_arg *arg) +{ + return out_tx_index_insert_exec(info, th, arg); +} + +static int __out_tx_index_delete(struct mdt_thread_info *info, + struct dt_object *dt_obj, char *name, + struct thandle_exec_args *th, + struct update_reply *reply, + int index, char *file, int line) +{ + struct tx_arg *arg; + + if (dt_try_as_dir(info->mti_env, dt_obj) == 0) { + th->ta_err = -ENOTDIR; + return th->ta_err; + } + + LASSERT(th->ta_handle != NULL); + th->ta_err = dt_declare_delete(info->mti_env, dt_obj, + (struct dt_key *)name, + th->ta_handle); + if (th->ta_err != 0) + return th->ta_err; + + arg = tx_add_exec(th, out_tx_index_delete_exec, + out_tx_index_delete_undo, file, + line); + LASSERT(arg); + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->reply = reply; + arg->index = index; + arg->u.insert.key = (struct dt_key *)name; + return 0; +} + +static int out_index_delete(struct mdt_thread_info *info) +{ + struct update *update = info->mti_u.update.mti_update; + struct dt_object *obj = info->mti_u.update.mti_dt_object; + char *name; + int rc = 0; + + if (!lu_object_exists(&obj->do_lu)) + RETURN(-ENOENT); + name = (char *)update_param_buf(update, 0, NULL); + if (name == NULL) { + CERROR("%s: empty name for index delete: rc = %d\n", + mdt_obd_name(info->mti_mdt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + rc = out_tx_index_delete(info, obj, name, &info->mti_handle, + info->mti_u.update.mti_update_reply, + info->mti_u.update.mti_update_reply_index); + RETURN(rc); +} + +#define DEF_OUT_HNDL(opc, name, fail_id, flags, fn) \ +[opc - OBJ_CREATE] = { \ + .mh_name = name, \ + .mh_fail_id = fail_id, \ + .mh_opc = opc, \ + .mh_flags = flags, \ + .mh_act = fn, \ + .mh_fmt = NULL \ +} + +#define out_handler mdt_handler +static struct out_handler out_update_ops[] = { + DEF_OUT_HNDL(OBJ_CREATE, "obj_create", 0, MUTABOR | HABEO_REFERO, + out_create), + DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", 0, MUTABOR | HABEO_REFERO, + out_ref_add), + DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", 0, MUTABOR | HABEO_REFERO, + out_ref_del), + DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set", 0, MUTABOR | HABEO_REFERO, + out_attr_set), + DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get", 0, HABEO_REFERO, + out_attr_get), + DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", 0, MUTABOR | HABEO_REFERO, + out_xattr_set), + DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", 0, HABEO_REFERO, + out_xattr_get), + DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0, + HABEO_REFERO, out_index_lookup), + DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert", 0, + MUTABOR | HABEO_REFERO, out_index_insert), + DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete", 0, + MUTABOR | HABEO_REFERO, out_index_delete), +}; + +#define out_opc_slice mdt_opc_slice +static struct out_opc_slice out_handlers[] = { + { + .mos_opc_start = OBJ_CREATE, + .mos_opc_end = OBJ_LAST, + .mos_hs = out_update_ops + }, +}; + +/** + * Object updates between Targets. Because all the updates has been + * dis-assemblied into object updates in master MDD layer, so out + * will skip MDD layer, and call OSD API directly to execute these + * updates. + * + * In phase I, all of the updates in the request need to be executed + * in one transaction, and the transaction has to be synchronously. + * + * Please refer to lustre/include/lustre/lustre_idl.h for req/reply + * format. + */ +int out_handle(struct mdt_thread_info *info) +{ + struct req_capsule *pill = info->mti_pill; + struct update_buf *ubuf; + struct update *update; + struct thandle_exec_args *th = &info->mti_handle; + int bufsize; + int count; + unsigned off; + int i; + int rc = 0; + int rc1 = 0; + ENTRY; + + req_capsule_set(pill, &RQF_UPDATE_OBJ); + bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT); + if (bufsize != UPDATE_BUFFER_SIZE) { + CERROR("%s: invalid bufsize %d: rc = %d\n", + mdt_obd_name(info->mti_mdt), bufsize, -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + ubuf = req_capsule_client_get(pill, &RMF_UPDATE); + if (ubuf == NULL) { + CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(info->mti_mdt), + -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) { + CERROR("%s: invalid magic %x expect %x: rc = %d\n", + mdt_obd_name(info->mti_mdt), le32_to_cpu(ubuf->ub_magic), + UPDATE_BUFFER_MAGIC, -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + count = le32_to_cpu(ubuf->ub_count); + if (count <= 0) { + CERROR("%s: No update!: rc = %d\n", + mdt_obd_name(info->mti_mdt), -EPROTO); + RETURN(err_serious(-EPROTO)); + } + + req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER, + UPDATE_BUFFER_SIZE); + rc = req_capsule_server_pack(pill); + if (rc != 0) { + CERROR("%s: Can't pack response: rc = %d\n", + mdt_obd_name(info->mti_mdt), rc); + RETURN(rc); + } + + /* Prepare the update reply buffer */ + info->mti_u.update.mti_update_reply = + req_capsule_server_get(pill, &RMF_UPDATE_REPLY); + update_init_reply_buf(info->mti_u.update.mti_update_reply, count); + + rc = out_tx_start(info->mti_env, info->mti_mdt, th); + if (rc != 0) + RETURN(rc); + + /* Walk through updates in the request to execute them synchronously */ + off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0])); + for (i = 0; i < count; i++) { + struct out_handler *h; + struct dt_object *dt_obj; + + update = (struct update *)((char *)ubuf + off); + + fid_le_to_cpu(&update->u_fid, &update->u_fid); + if (!fid_is_sane(&update->u_fid)) { + CERROR("%s: invalid FID "DFID": rc = %d\n", + mdt_obd_name(info->mti_mdt), + PFID(&update->u_fid), -EPROTO); + GOTO(out, rc = err_serious(-EPROTO)); + } + dt_obj = out_object_find(info, &update->u_fid); + if (IS_ERR(dt_obj)) + GOTO(out, rc = PTR_ERR(dt_obj)); + + info->mti_u.update.mti_dt_object = dt_obj; + info->mti_u.update.mti_update = update; + info->mti_u.update.mti_update_reply_index = i; + + h = mdt_handler_find(update->u_type, out_handlers); + if (likely(h != NULL)) { + rc = h->mh_act(info); + } else { + CERROR("%s: The unsupported opc: 0x%x\n", + mdt_obd_name(info->mti_mdt), update->u_type); + lu_object_put(info->mti_env, &dt_obj->do_lu); + GOTO(out, rc = -ENOTSUPP); + } + lu_object_put(info->mti_env, &dt_obj->do_lu); + if (rc < 0) + GOTO(out, rc); + off += cfs_size_round(update_size(update)); + } + +out: + rc1 = out_tx_end(info, th); + rc = rc == 0 ? rc1 : rc; + info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET; + RETURN(rc); +}