4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2013, Intel Corporation.
25 * lustre/mdt/out_handler.c
27 * Object update handler between targets.
29 * Author: di.wang <di.wang@intel.com>
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include "mdt_internal.h"
35 #include <lustre_update.h>
37 static const char dot[] = ".";
38 static const char dotdot[] = "..";
40 /* Currently OUT and MDT share the same thread info; in the future
41 * this should be decoupled from MDT. XXX */
42 #define out_thread_info mdt_thread_info
43 #define out_thread_key mdt_thread_key
/*
 * Look up the per-thread OUT info stored in the environment's context under
 * out_thread_key (an alias for mdt_thread_key, see the #define above) and
 * assert that it is non-NULL.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably the looked-up pointer is returned.
 */
45 struct out_thread_info *out_env_info(const struct lu_env *env)
47 struct out_thread_info *info;
49 info = lu_context_key_get(&env->le_ctx, &out_thread_key);
50 LASSERT(info != NULL);
/* Return the obd_name of the OBD device behind the given dt_device. */
54 static inline char *dt_obd_name(struct dt_device *dt)
56 return dt->dd_lu_dev.ld_obd->obd_name;
/*
 * Fill one slot of ta->ta_args with the exec and undo callbacks plus the
 * caller's file/line, and return a pointer to that slot. The slot index is
 * asserted to be below UPDATE_MAX_OPS.
 * NOTE(review): how the index i is chosen and advanced is not visible in
 * this listing; presumably it is the next free slot tracked by ta_argno.
 */
59 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
60 tx_exec_func_t undo, char *file, int line)
68 LASSERT(i < UPDATE_MAX_OPS);
72 ta->ta_args[i].exec_fn = func;
73 ta->ta_args[i].undo_fn = undo;
74 ta->ta_args[i].file = file;
75 ta->ta_args[i].line = line;
77 return &ta->ta_args[i];
/*
 * Reset the exec-args structure and create a new transaction handle on dt.
 * If dt_trans_create() returns an error pointer, the error is logged and
 * its PTR_ERR value is returned. Otherwise the handle is flagged th_sync.
 * NOTE(review): the success return value and any assignment of th->ta_dev
 * are not visible in this listing.
 */
80 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
81 struct thandle_exec_args *th)
83 memset(th, 0, sizeof(*th));
84 th->ta_handle = dt_trans_create(env, dt);
85 if (IS_ERR(th->ta_handle)) {
86 CERROR("%s: start handle error: rc = %ld\n",
87 dt_obd_name(dt), PTR_ERR(th->ta_handle));
88 return PTR_ERR(th->ta_handle);
91 /*For phase I, sync for cross-ref operation*/
92 th->ta_handle->th_sync = 1;
/*
 * Start the transaction held in th on th->ta_dev. The handle must already
 * be flagged th_sync (asserted). Returns the dt_trans_start() result.
 */
96 static int out_trans_start(const struct lu_env *env,
97 struct thandle_exec_args *th)
99 /* Always do sync commit for Phase I */
100 LASSERT(th->ta_handle->th_sync != 0);
101 return dt_trans_start(env, th->ta_dev, th->ta_handle);
/*
 * Record err as the transaction result and stop the transaction, then drop
 * the object reference held by every queued argument and clear its object
 * pointer.
 * NOTE(review): the return statement is not visible; presumably the
 * dt_trans_stop() result stored in rc is returned.
 */
104 static int out_trans_stop(const struct lu_env *env,
105 struct thandle_exec_args *th, int err)
110 th->ta_handle->th_result = err;
111 LASSERT(th->ta_handle->th_sync != 0);
112 rc = dt_trans_stop(env, th->ta_dev, th->ta_handle);
113 for (i = 0; i < th->ta_argno; i++) {
114 if (th->ta_args[i].object != NULL) {
115 lu_object_put(env, &th->ta_args[i].object->do_lu);
116 th->ta_args[i].object = NULL;
/*
 * Finish the update transaction described by th.
 *
 * If a declare step already failed (ta_err != 0) or nothing was queued
 * (ta_argno == 0), control jumps to the stop label with rc = ta_err.
 * Otherwise the transaction is started and each queued exec_fn is run in
 * order. The visible failure path logs the failing step and calls undo_fn,
 * which is asserted to be non-NULL.
 * At the end the transaction is stopped with rc and ta_handle is cleared.
 *
 * NOTE(review): the conditions around the failure path, the range of steps
 * that are undone, and the return statement are on lines not visible here.
 */
123 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *th)
125 struct out_thread_info *info = out_env_info(env);
129 LASSERT(th->ta_handle);
131 if (th->ta_err != 0 || th->ta_argno == 0)
132 GOTO(stop, rc = th->ta_err);
134 rc = out_trans_start(env, th);
138 for (i = 0; i < th->ta_argno; i++) {
139 rc = th->ta_args[i].exec_fn(env, th->ta_handle,
142 CDEBUG(D_INFO, "error during execution of #%u from"
143 " %s:%d: rc = %d\n", i, th->ta_args[i].file,
144 th->ta_args[i].line, rc);
146 LASSERTF(th->ta_args[i].undo_fn != NULL,
147 "can't undo changes, hope for failover!\n");
148 th->ta_args[i].undo_fn(env, th->ta_handle,
155 /* Only fail for real update */
156 info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
158 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
159 dt_obd_name(th->ta_dev), i, th->ta_argno, rc);
160 out_trans_stop(env, th, rc);
161 th->ta_handle = NULL;
/*
 * Reconstruct callback passed to out_check_resent(): logs a debug message
 * and inserts an empty reply with result 0 at the given index.
 * NOTE(review): the final parameter (presumably `int index`) is on a line
 * not visible in this listing.
 */
168 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
169 struct dt_object *obj, struct update_reply *reply,
172 CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
173 dt_obd_name(dt), reply, index, 0);
175 update_insert_reply(reply, NULL, 0, index, 0);
/*
 * Callback type used by out_check_resent() to rebuild the reply of a resent
 * request. NOTE(review): the last parameter is on a line not visible here.
 */
179 typedef void (*out_reconstruct_t)(const struct lu_env *env,
180 struct dt_device *dt,
181 struct dt_object *obj,
182 struct update_reply *reply,
/*
 * Decide how to treat a possibly resent request.
 *
 * Requests without MSG_RESENT in their message flags take the likely()
 * fast path. For a resend whose xid is the last one handled
 * (req_xid_is_last()), the reconstruct callback is invoked to rebuild the
 * reply. Otherwise a D_HA message is logged with the last recorded xid.
 *
 * NOTE(review): the return value of each path is not visible in this
 * listing; presumably non-zero tells the caller to skip execution.
 */
185 static inline int out_check_resent(const struct lu_env *env,
186 struct dt_device *dt,
187 struct dt_object *obj,
188 struct ptlrpc_request *req,
189 out_reconstruct_t reconstruct,
190 struct update_reply *reply,
193 if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
196 if (req_xid_is_last(req)) {
197 reconstruct(env, dt, obj, reply, index);
200 DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
201 req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
/*
 * Destroy dt_obj within transaction th while holding the object's write
 * lock. NOTE(review): the return statement is not visible; presumably the
 * dt_destroy() result is returned.
 */
205 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
210 CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
211 PFID(lu_object_fid(&dt_obj->do_lu)));
213 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
214 rc = dt_destroy(env, dt_obj, th);
215 dt_write_unlock(env, dt_obj);
221 * All of the xxx_undo functions are used only when execution fails.
222 * Because all of the required resources are reserved in the
223 * declare phase, i.e. once declare succeeds the following execution
224 * phase is expected to succeed as well, these undo functions
225 * should be unused most of the time in Phase I.
/*
 * Undo step for a create: destroys arg->object within th via
 * out_obj_destroy(). A CERROR reports an undo failure.
 * NOTE(review): the condition guarding the CERROR and the return statement
 * are not visible in this listing.
 */
227 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
232 rc = out_obj_destroy(env, arg->object, th);
234 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
235 dt_obd_name(th->th_dev), rc);
/*
 * Exec step for a create: creates arg->object under its write lock using
 * the attr, hint and dof saved in arg->u.create, then records rc in the
 * update reply at arg->index.
 * NOTE(review): the return statement is not visible in this listing.
 */
239 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
242 struct dt_object *dt_obj = arg->object;
245 CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
246 dt_obd_name(th->th_dev),
247 PFID(lu_object_fid(&arg->object->do_lu)),
248 arg->u.create.dof.dof_type,
249 arg->u.create.attr.la_mode & S_IFMT);
251 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
252 rc = dt_create(env, dt_obj, &arg->u.create.attr,
253 &arg->u.create.hint, &arg->u.create.dof, th);
255 dt_write_unlock(env, dt_obj);
257 CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
258 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
260 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Declare an object create in th->ta_handle (result stored in th->ta_err)
 * and queue the create exec/undo pair. An extra reference is taken on obj;
 * the existing comment says it is released in out_trans_stop(). *attr,
 * *parent_fid and *dof are copied into the queued argument and the
 * allocation hint is zeroed.
 *
 * NOTE(review): in the visible lines *parent_fid is dereferenced
 * unconditionally, while out_create() passes fid == NULL for
 * non-directories. A NULL guard may sit on a line not shown here; confirm.
 */
265 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
266 struct lu_attr *attr, struct lu_fid *parent_fid,
267 struct dt_object_format *dof,
268 struct thandle_exec_args *th,
269 struct update_reply *reply,
270 int index, char *file, int line)
274 LASSERT(th->ta_handle != NULL);
275 th->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
280 arg = tx_add_exec(th, out_tx_create_exec, out_tx_create_undo, file,
284 /* release the object in out_trans_stop */
285 lu_object_get(&obj->do_lu);
287 arg->u.create.attr = *attr;
289 arg->u.create.fid = *parent_fid;
290 memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
291 arg->u.create.dof = *dof;
/*
 * Handler for an object-create update.
 *
 * Parameter 0 of the update must be a wire obdo of exactly sizeof(*wobdo).
 * It is byte-swapped in place to host order and converted to a lu_attr.
 * The object format type is derived from the mode. For directories,
 * parameter 1 must be a struct lu_fid of exact size that passes
 * fid_is_sane() after conversion to host order. Any malformed parameter
 * yields err_serious(-EPROTO). The create is then queued through
 * out_tx_create().
 *
 * NOTE(review): what happens when the object already exists (the check at
 * original line 342) is not visible in this listing.
 */
298 static int out_create(struct out_thread_info *info)
300 struct update *update = info->mti_u.update.mti_update;
301 struct dt_object *obj = info->mti_u.update.mti_dt_object;
302 struct dt_object_format *dof = &info->mti_u.update.mti_update_dof;
303 struct obdo *lobdo = &info->mti_u.update.mti_obdo;
304 struct lu_attr *attr = &info->mti_attr.ma_attr;
305 struct lu_fid *fid = NULL;
312 wobdo = update_param_buf(update, 0, &size);
313 if (wobdo == NULL || size != sizeof(*wobdo)) {
314 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
315 mdt_obd_name(info->mti_mdt), -EPROTO);
316 RETURN(err_serious(-EPROTO));
319 obdo_le_to_cpu(wobdo, wobdo);
320 lustre_get_wire_obdo(NULL, lobdo, wobdo);
321 la_from_obdo(attr, lobdo, lobdo->o_valid);
323 dof->dof_type = dt_mode_to_dft(attr->la_mode);
324 if (S_ISDIR(attr->la_mode)) {
327 fid = update_param_buf(update, 1, &size);
328 if (fid == NULL || size != sizeof(*fid)) {
329 CERROR("%s: invalid fid: rc = %d\n",
330 mdt_obd_name(info->mti_mdt), -EPROTO);
331 RETURN(err_serious(-EPROTO));
333 fid_le_to_cpu(fid, fid);
334 if (!fid_is_sane(fid)) {
335 CERROR("%s: invalid fid "DFID": rc = %d\n",
336 mdt_obd_name(info->mti_mdt),
338 RETURN(err_serious(-EPROTO));
342 if (lu_object_exists(&obj->do_lu))
345 rc = out_tx_create(info->mti_env, obj, attr, fid, dof,
347 info->mti_u.update.mti_update_reply,
348 info->mti_u.update.mti_update_reply_index);
/*
 * Undo step for attr_set. Rollback is not implemented: the visible code
 * only logs an error naming the object's FID and -ENOTSUPP.
 * NOTE(review): the return statement is not visible in this listing.
 */
353 static int out_tx_attr_set_undo(const struct lu_env *env,
354 struct thandle *th, struct tx_arg *arg)
356 CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
357 dt_obd_name(th->th_dev),
358 PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
/*
 * Exec step for attr_set: applies arg->u.attr_set.attr to arg->object under
 * its write lock, then records rc in the update reply at arg->index.
 * NOTE(review): the trailing dt_attr_set() arguments and the return
 * statement are not visible in this listing.
 */
363 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
366 struct dt_object *dt_obj = arg->object;
369 CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
370 PFID(lu_object_fid(&dt_obj->do_lu)));
372 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
373 rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr,
375 dt_write_unlock(env, dt_obj);
377 CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
378 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
380 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Declare an attr_set on dt_obj in th->ta_handle (result stored in
 * th->ta_err) and queue the attr_set exec/undo pair. An extra reference is
 * taken on dt_obj and *attr is copied into the queued argument.
 * NOTE(review): the handling of a failed declare and the return statement
 * are not visible in this listing.
 */
385 static int __out_tx_attr_set(const struct lu_env *env,
386 struct dt_object *dt_obj,
387 const struct lu_attr *attr,
388 struct thandle_exec_args *th,
389 struct update_reply *reply, int index,
390 char *file, int line)
394 LASSERT(th->ta_handle != NULL);
395 th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
399 arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
402 lu_object_get(&dt_obj->do_lu);
403 arg->object = dt_obj;
404 arg->u.attr_set.attr = *attr;
/*
 * Handler for an attr_set update. Parameter 0 must be a wire obdo of
 * exactly sizeof(*wobdo), otherwise err_serious(-EPROTO) is returned. The
 * obdo is byte-swapped in place to host order, converted to a lu_attr, and
 * the change is queued through out_tx_attr_set() on info->mti_handle.
 */
410 static int out_attr_set(struct out_thread_info *info)
412 struct update *update = info->mti_u.update.mti_update;
413 struct lu_attr *attr = &info->mti_attr.ma_attr;
414 struct dt_object *obj = info->mti_u.update.mti_dt_object;
415 struct obdo *lobdo = &info->mti_u.update.mti_obdo;
422 wobdo = update_param_buf(update, 0, &size);
423 if (wobdo == NULL || size != sizeof(*wobdo)) {
424 CERROR("%s: empty obdo in the update: rc = %d\n",
425 mdt_obd_name(info->mti_mdt), -EPROTO);
426 RETURN(err_serious(-EPROTO));
431 obdo_le_to_cpu(wobdo, wobdo);
432 lustre_get_wire_obdo(NULL, lobdo, wobdo);
433 la_from_obdo(attr, lobdo, lobdo->o_valid);
435 rc = out_tx_attr_set(info->mti_env, obj, attr, &info->mti_handle,
436 info->mti_u.update.mti_update_reply,
437 info->mti_u.update.mti_update_reply_index);
/*
 * Handler for an attr_get update.
 *
 * Reads the object's attributes under its read lock. For a directory it
 * also opens an index iterator and steps through at most three entries;
 * per the existing comment this is used to report in la_flags whether the
 * directory is empty (la_flags = 0 means empty). The attributes are
 * converted to a wire obdo and inserted into the update reply together
 * with rc.
 *
 * NOTE(review): the reply is inserted at a hard-coded index 0 rather than
 * mti_update_reply_index; confirm attr_get is always the only update in a
 * request.
 * NOTE(review): the handling of a non-existent object, of iterator
 * failures, and the la_flags assignment are on lines not visible here.
 */
442 static int out_attr_get(struct out_thread_info *info)
444 struct obdo *obdo = &info->mti_u.update.mti_obdo;
445 const struct lu_env *env = info->mti_env;
446 struct lu_attr *la = &info->mti_attr.ma_attr;
447 struct dt_object *obj = info->mti_u.update.mti_dt_object;
452 if (!lu_object_exists(&obj->do_lu))
455 dt_read_lock(env, obj, MOR_TGT_CHILD);
456 rc = dt_attr_get(env, obj, la, NULL);
458 GOTO(out_unlock, rc);
460 * If it is a directory, we will also check whether the
461 * directory is empty.
462 * la_flags = 0 : Empty.
466 if (S_ISDIR(la->la_mode)) {
468 const struct dt_it_ops *iops;
470 if (!dt_try_as_dir(env, obj))
471 GOTO(out_unlock, rc = -ENOTDIR);
473 iops = &obj->do_index_ops->dio_it;
474 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
477 result = iops->get(env, it, (const void *)"");
480 for (result = 0, i = 0; result == 0 && i < 3;
482 result = iops->next(env, it);
485 } else if (result == 0)
487 * Huh? Index contains no zero key?
497 obdo_from_la(obdo, la, la->la_valid);
498 obdo_cpu_to_le(obdo, obdo);
499 lustre_set_wire_obdo(NULL, obdo, obdo);
502 dt_read_unlock(env, obj);
504 CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
505 mdt_obd_name(info->mti_mdt),
506 info->mti_u.update.mti_update_reply, 0, rc);
508 update_insert_reply(info->mti_u.update.mti_update_reply, obdo,
509 sizeof(*obdo), 0, rc);
/*
 * Handler for an xattr_get update.
 *
 * Parameter 0 is the xattr name; the visible error path returns
 * err_serious(-EPROTO) for an empty name. The value is read under the
 * object's read lock directly into reply buffer 0, after the leading int
 * that the existing comment reserves for the result. The available length
 * is set to UPDATE_BUFFER_SIZE - sizeof(struct update_reply). At the end,
 * ur_lens[0] is set to the value length plus sizeof(int).
 *
 * NOTE(review): the name check, the handling of the dt_xattr_get() result
 * (including the -ENOENT path), and where rc is stored are on lines not
 * visible in this listing.
 */
513 static int out_xattr_get(struct out_thread_info *info)
515 struct update *update = info->mti_u.update.mti_update;
516 const struct lu_env *env = info->mti_env;
517 struct lu_buf *lbuf = &info->mti_buf;
518 struct update_reply *reply = info->mti_u.update.mti_update_reply;
519 struct dt_object *obj = info->mti_u.update.mti_dt_object;
526 name = (char *)update_param_buf(update, 0, NULL);
528 CERROR("%s: empty name for xattr get: rc = %d\n",
529 mdt_obd_name(info->mti_mdt), -EPROTO);
530 RETURN(err_serious(-EPROTO));
533 ptr = update_get_buf_internal(reply, 0, NULL);
534 LASSERT(ptr != NULL);
536 /* The first 4 bytes(int) are used to store the result */
537 lbuf->lb_buf = (char *)ptr + sizeof(int);
538 lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
539 dt_read_lock(env, obj, MOR_TGT_CHILD);
540 rc = dt_xattr_get(env, obj, lbuf, name, NULL);
541 dt_read_unlock(env, obj);
548 GOTO(out, rc = -ENOENT);
552 CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
553 mdt_obd_name(info->mti_mdt), PFID(lu_object_fid(&obj->do_lu)),
554 name, (int)lbuf->lb_len);
557 reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
/*
 * Handler for an index-lookup update.
 *
 * Parameter 0 is the entry name; the visible error path returns
 * err_serious(-EPROTO) for an empty name. Under the object's read lock the
 * object is checked to be usable as a directory (-ENOTDIR otherwise) and
 * the name is looked up into info->mti_tmp_fid1. The FID is converted to
 * little-endian in place and inserted into the update reply.
 *
 * NOTE(review): the handling of a non-existent object, the interpretation
 * of the dt_lookup() result, and the trailing update_insert_reply()
 * arguments are on lines not visible in this listing.
 */
561 static int out_index_lookup(struct out_thread_info *info)
563 struct update *update = info->mti_u.update.mti_update;
564 const struct lu_env *env = info->mti_env;
565 struct dt_object *obj = info->mti_u.update.mti_dt_object;
571 if (!lu_object_exists(&obj->do_lu))
574 name = (char *)update_param_buf(update, 0, NULL);
576 CERROR("%s: empty name for lookup: rc = %d\n",
577 mdt_obd_name(info->mti_mdt), -EPROTO);
578 RETURN(err_serious(-EPROTO));
581 dt_read_lock(env, obj, MOR_TGT_CHILD);
582 if (!dt_try_as_dir(env, obj))
583 GOTO(out_unlock, rc = -ENOTDIR);
585 rc = dt_lookup(env, obj, (struct dt_rec *)&info->mti_tmp_fid1,
586 (struct dt_key *)name, NULL);
589 GOTO(out_unlock, rc);
594 CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
595 PFID(lu_object_fid(&obj->do_lu)), name,
596 PFID(&info->mti_tmp_fid1), rc);
597 fid_cpu_to_le(&info->mti_tmp_fid1, &info->mti_tmp_fid1);
600 dt_read_unlock(env, obj);
602 CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
603 mdt_obd_name(info->mti_mdt),
604 info->mti_u.update.mti_update_reply, 0, rc);
606 update_insert_reply(info->mti_u.update.mti_update_reply,
607 &info->mti_tmp_fid1, sizeof(info->mti_tmp_fid1),
/*
 * Exec step for xattr_set: writes arg->u.xattr_set.buf under the given
 * name and flags to arg->object while holding its write lock, then records
 * rc in the update reply at arg->index.
 * Per the existing comment, a failure is ignored when the xattr name starts
 * with XATTR_NAME_LINK. NOTE(review): the statement that clears rc in that
 * case and the return statement are not visible in this listing.
 */
612 static int out_tx_xattr_set_exec(const struct lu_env *env,
616 struct dt_object *dt_obj = arg->object;
619 CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
620 dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
621 arg->u.xattr_set.name, arg->u.xattr_set.flags);
623 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
624 rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
625 arg->u.xattr_set.name, arg->u.xattr_set.flags,
627 dt_write_unlock(env, dt_obj);
629 * Ignore errors if this is LINK EA
631 if (unlikely(rc && !strncmp(arg->u.xattr_set.name, XATTR_NAME_LINK,
632 strlen(XATTR_NAME_LINK))))
635 CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
636 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
638 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Declare an xattr_set on dt_obj in th->ta_handle (result stored in
 * th->ta_err) and queue the xattr_set exec step with no undo callback
 * (NULL). An extra reference is taken on dt_obj. The name pointer is
 * stored as-is and the lu_buf descriptor is copied by value, so the
 * underlying name and value memory are not duplicated here.
 * NOTE(review): the handling of a failed declare and the return statement
 * are not visible in this listing.
 */
643 static int __out_tx_xattr_set(const struct lu_env *env,
644 struct dt_object *dt_obj,
645 const struct lu_buf *buf,
646 const char *name, int flags,
647 struct thandle_exec_args *th,
648 struct update_reply *reply, int index,
649 char *file, int line)
653 LASSERT(th->ta_handle != NULL);
654 th->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
655 flags, th->ta_handle);
659 arg = tx_add_exec(th, out_tx_xattr_set_exec, NULL, file, line);
661 lu_object_get(&dt_obj->do_lu);
662 arg->object = dt_obj;
663 arg->u.xattr_set.name = name;
664 arg->u.xattr_set.flags = flags;
665 arg->u.xattr_set.buf = *buf;
668 arg->u.xattr_set.csum = 0;
/*
 * Handler for an xattr_set update.
 *
 * Parameter 0 is the xattr name, parameter 1 the value (must be non-NULL
 * with non-zero length), and parameter 2 the flags, read as a
 * little-endian 32-bit integer. A missing parameter yields
 * err_serious(-EPROTO). The set is queued through out_tx_xattr_set().
 *
 * NOTE(review): the checks on name and tmp, and the assignment of
 * lbuf->lb_buf, are on lines not visible in this listing.
 */
672 static int out_xattr_set(struct out_thread_info *info)
674 struct update *update = info->mti_u.update.mti_update;
675 struct dt_object *obj = info->mti_u.update.mti_dt_object;
676 struct lu_buf *lbuf = &info->mti_buf;
685 name = update_param_buf(update, 0, NULL);
687 CERROR("%s: empty name for xattr set: rc = %d\n",
688 mdt_obd_name(info->mti_mdt), -EPROTO);
689 RETURN(err_serious(-EPROTO));
692 buf = (char *)update_param_buf(update, 1, &buf_len);
693 if (buf == NULL || buf_len == 0) {
694 CERROR("%s: empty buf for xattr set: rc = %d\n",
695 mdt_obd_name(info->mti_mdt), -EPROTO);
696 RETURN(err_serious(-EPROTO));
700 lbuf->lb_len = buf_len;
702 tmp = (char *)update_param_buf(update, 2, NULL);
704 CERROR("%s: empty flag for xattr set: rc = %d\n",
705 mdt_obd_name(info->mti_mdt), -EPROTO);
706 RETURN(err_serious(-EPROTO));
709 flag = le32_to_cpu(*(int *)tmp);
711 rc = out_tx_xattr_set(info->mti_env, obj, lbuf, name, flag,
713 info->mti_u.update.mti_update_reply,
714 info->mti_u.update.mti_update_reply_index);
/*
 * Take one reference on dt_obj via dt_ref_add() within th, holding the
 * object's write lock. NOTE(review): the return statement is not visible;
 * presumably the dt_ref_add() result is returned.
 */
718 static int out_obj_ref_add(const struct lu_env *env,
719 struct dt_object *dt_obj,
724 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
725 rc = dt_ref_add(env, dt_obj, th);
726 dt_write_unlock(env, dt_obj);
/*
 * Drop one reference on dt_obj via dt_ref_del() within th, holding the
 * object's write lock. NOTE(review): the return statement is not visible;
 * presumably the dt_ref_del() result is returned.
 */
731 static int out_obj_ref_del(const struct lu_env *env,
732 struct dt_object *dt_obj,
737 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
738 rc = dt_ref_del(env, dt_obj, th);
739 dt_write_unlock(env, dt_obj);
/*
 * Exec step for ref_add: adds a reference to arg->object and records rc in
 * the update reply at arg->index.
 * NOTE(review): the return statement is not visible in this listing.
 */
744 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
747 struct dt_object *dt_obj = arg->object;
750 rc = out_obj_ref_add(env, dt_obj, th);
752 CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
753 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
755 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/* Undo step for ref_add: drops the reference again via out_obj_ref_del(). */
759 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
762 return out_obj_ref_del(env, arg->object, th);
/*
 * Declare a ref_add on dt_obj in th->ta_handle (result stored in
 * th->ta_err) and queue the ref_add exec/undo pair, taking an extra
 * reference on dt_obj for the queued argument.
 * NOTE(review): the handling of a failed declare and the return statement
 * are not visible in this listing.
 */
765 static int __out_tx_ref_add(const struct lu_env *env,
766 struct dt_object *dt_obj,
767 struct thandle_exec_args *th,
768 struct update_reply *reply,
769 int index, char *file, int line)
773 LASSERT(th->ta_handle != NULL);
774 th->ta_err = dt_declare_ref_add(env, dt_obj, th->ta_handle);
778 arg = tx_add_exec(th, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
781 lu_object_get(&dt_obj->do_lu);
782 arg->object = dt_obj;
789 * increase ref of the object
/*
 * Handler for a ref_add update: queues a reference increment on the
 * update's target object through out_tx_ref_add() on info->mti_handle.
 */
791 static int out_ref_add(struct out_thread_info *info)
793 struct dt_object *obj = info->mti_u.update.mti_dt_object;
798 rc = out_tx_ref_add(info->mti_env, obj, &info->mti_handle,
799 info->mti_u.update.mti_update_reply,
800 info->mti_u.update.mti_update_reply_index);
/*
 * Exec step for ref_del: drops a reference on arg->object and records rc in
 * the update reply at arg->index.
 * The debug message reports the real rc, matching the other exec steps and
 * the value that is actually inserted into the reply.
 * NOTE(review): the return statement is not visible in this listing.
 */
804 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
807 struct dt_object *dt_obj = arg->object;
810 rc = out_obj_ref_del(env, dt_obj, th);
812 CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
813 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
815 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/* Undo step for ref_del: re-adds the reference via out_obj_ref_add(). */
820 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
823 return out_obj_ref_add(env, arg->object, th);
/*
 * Declare a ref_del on dt_obj in th->ta_handle (result stored in
 * th->ta_err) and queue the ref_del exec/undo pair, taking an extra
 * reference on dt_obj for the queued argument.
 * NOTE(review): the handling of a failed declare and the return statement
 * are not visible in this listing.
 */
826 static int __out_tx_ref_del(const struct lu_env *env,
827 struct dt_object *dt_obj,
828 struct thandle_exec_args *th,
829 struct update_reply *reply,
830 int index, char *file, int line)
834 LASSERT(th->ta_handle != NULL);
835 th->ta_err = dt_declare_ref_del(env, dt_obj, th->ta_handle);
839 arg = tx_add_exec(th, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
842 lu_object_get(&dt_obj->do_lu);
843 arg->object = dt_obj;
/*
 * Handler for a ref_del update: queues a reference decrement on the
 * update's target object through out_tx_ref_del() on info->mti_handle.
 * NOTE(review): the action taken when the object does not exist is on a
 * line not visible in this listing.
 */
849 static int out_ref_del(struct out_thread_info *info)
851 struct dt_object *obj = info->mti_u.update.mti_dt_object;
856 if (!lu_object_exists(&obj->do_lu))
859 rc = out_tx_ref_del(info->mti_env, obj, &info->mti_handle,
860 info->mti_u.update.mti_update_reply,
861 info->mti_u.update.mti_update_reply_index);
/*
 * Insert the (key, rec) pair into the index of dt_obj within th, holding
 * the object's write lock. The key is logged as a string and the record as
 * a FID. The object is first checked with dt_try_as_dir().
 * NOTE(review): the error returned when dt_try_as_dir() fails and the
 * final return statement are not visible in this listing.
 */
865 static int out_obj_index_insert(const struct lu_env *env,
866 struct dt_object *dt_obj,
867 const struct dt_rec *rec,
868 const struct dt_key *key,
873 CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
874 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
875 (char *)key, PFID((struct lu_fid *)rec));
877 if (dt_try_as_dir(env, dt_obj) == 0)
880 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
881 rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
882 dt_write_unlock(env, dt_obj);
/*
 * Delete the entry for key from the index of dt_obj within th, holding the
 * object's write lock. The object is first checked with dt_try_as_dir().
 * NOTE(review): the error returned when dt_try_as_dir() fails and the
 * final return statement are not visible in this listing.
 */
887 static int out_obj_index_delete(const struct lu_env *env,
888 struct dt_object *dt_obj,
889 const struct dt_key *key,
894 CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
895 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
898 if (dt_try_as_dir(env, dt_obj) == 0)
901 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
902 rc = dt_delete(env, dt_obj, key, th, NULL);
903 dt_write_unlock(env, dt_obj);
/*
 * Exec step for index insert: inserts arg->u.insert.rec under
 * arg->u.insert.key into arg->object and records rc in the update reply at
 * arg->index.
 * NOTE(review): the return statement is not visible in this listing.
 */
908 static int out_tx_index_insert_exec(const struct lu_env *env,
909 struct thandle *th, struct tx_arg *arg)
911 struct dt_object *dt_obj = arg->object;
914 rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
915 arg->u.insert.key, th);
917 CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
918 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
920 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/* Undo step for index insert: deletes the inserted key from arg->object. */
925 static int out_tx_index_insert_undo(const struct lu_env *env,
926 struct thandle *th, struct tx_arg *arg)
928 return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
/*
 * Declare an index insert of (name, fid) into dt_obj and queue the insert
 * exec/undo pair.
 *
 * When the object already exists it is checked with dt_try_as_dir(); a
 * failed check sets th->ta_err to -ENOTDIR. Otherwise the insert is
 * declared on th->ta_handle. An extra reference is taken on dt_obj. The
 * fid and name pointers are stored in the queued argument without copying,
 * so they are presumably expected to stay valid until the exec step runs.
 *
 * NOTE(review): the early-return paths and the final return statement are
 * not visible in this listing.
 */
931 static int __out_tx_index_insert(const struct lu_env *env,
932 struct dt_object *dt_obj,
933 char *name, struct lu_fid *fid,
934 struct thandle_exec_args *th,
935 struct update_reply *reply,
936 int index, char *file, int line)
940 LASSERT(th->ta_handle != NULL);
942 if (lu_object_exists(&dt_obj->do_lu)) {
943 if (dt_try_as_dir(env, dt_obj) == 0) {
944 th->ta_err = -ENOTDIR;
947 th->ta_err = dt_declare_insert(env, dt_obj,
948 (struct dt_rec *)fid,
949 (struct dt_key *)name,
956 arg = tx_add_exec(th, out_tx_index_insert_exec,
957 out_tx_index_insert_undo, file,
960 lu_object_get(&dt_obj->do_lu);
961 arg->object = dt_obj;
964 arg->u.insert.rec = (struct dt_rec *)fid;
965 arg->u.insert.key = (struct dt_key *)name;
/*
 * Handler for an index-insert update.
 *
 * Parameter 0 is the entry name and parameter 1 must be a struct lu_fid of
 * exact size. The FID is converted to host order in place and must pass
 * fid_is_sane(). Any malformed parameter yields err_serious(-EPROTO). The
 * insert is then queued through out_tx_index_insert().
 *
 * NOTE(review): the check on name is on a line not visible in this listing.
 */
970 static int out_index_insert(struct out_thread_info *info)
972 struct update *update = info->mti_u.update.mti_update;
973 struct dt_object *obj = info->mti_u.update.mti_dt_object;
980 name = (char *)update_param_buf(update, 0, NULL);
982 CERROR("%s: empty name for index insert: rc = %d\n",
983 mdt_obd_name(info->mti_mdt), -EPROTO);
984 RETURN(err_serious(-EPROTO));
987 fid = (struct lu_fid *)update_param_buf(update, 1, &size);
988 if (fid == NULL || size != sizeof(*fid)) {
989 CERROR("%s: invalid fid: rc = %d\n",
990 mdt_obd_name(info->mti_mdt), -EPROTO);
991 RETURN(err_serious(-EPROTO));
994 fid_le_to_cpu(fid, fid);
995 if (!fid_is_sane(fid)) {
996 CERROR("%s: invalid FID "DFID": rc = %d\n",
997 mdt_obd_name(info->mti_mdt), PFID(fid),
999 RETURN(err_serious(-EPROTO));
1002 rc = out_tx_index_insert(info->mti_env, obj, name, fid,
1004 info->mti_u.update.mti_update_reply,
1005 info->mti_u.update.mti_update_reply_index);
/*
 * Exec step for index delete: removes arg->u.insert.key (the key is kept in
 * the insert member of the argument union) from arg->object and records rc
 * in the update reply at arg->index.
 * The debug message names the delete operation so it can be told apart
 * from the index-insert exec step in logs.
 * NOTE(review): the return statement is not visible in this listing.
 */
1009 static int out_tx_index_delete_exec(const struct lu_env *env,
1015 rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1017 CDEBUG(D_INFO, "%s: insert idx delete reply %p index %d: rc = %d\n",
1018 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1020 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Undo step for index delete. Rollback is not implemented: the visible
 * code only logs an error mentioning -ENOTSUPP.
 * NOTE(review): the return statement is not visible in this listing.
 */
1025 static int out_tx_index_delete_undo(const struct lu_env *env,
1029 CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1030 dt_obd_name(th->th_dev), -ENOTSUPP);
/*
 * Declare an index delete of name from dt_obj and queue the delete
 * exec/undo pair.
 *
 * If dt_try_as_dir() fails, th->ta_err is set to -ENOTDIR. Otherwise the
 * delete is declared on th->ta_handle and a non-zero ta_err is checked. An
 * extra reference is taken on dt_obj, and the name pointer is stored
 * without copying in the insert member of the argument union.
 *
 * NOTE(review): the early-return paths and the final return statement are
 * not visible in this listing.
 */
1034 static int __out_tx_index_delete(const struct lu_env *env,
1035 struct dt_object *dt_obj, char *name,
1036 struct thandle_exec_args *th,
1037 struct update_reply *reply,
1038 int index, char *file, int line)
1042 if (dt_try_as_dir(env, dt_obj) == 0) {
1043 th->ta_err = -ENOTDIR;
1047 LASSERT(th->ta_handle != NULL);
1048 th->ta_err = dt_declare_delete(env, dt_obj,
1049 (struct dt_key *)name,
1051 if (th->ta_err != 0)
1054 arg = tx_add_exec(th, out_tx_index_delete_exec,
1055 out_tx_index_delete_undo, file,
1058 lu_object_get(&dt_obj->do_lu);
1059 arg->object = dt_obj;
1062 arg->u.insert.key = (struct dt_key *)name;
/*
 * Handler for an index-delete update. Parameter 0 is the entry name; the
 * visible error path returns err_serious(-EPROTO) for an empty name. The
 * delete is queued through out_tx_index_delete() on info->mti_handle.
 * NOTE(review): the action taken when the object does not exist and the
 * check on name are on lines not visible in this listing.
 */
1066 static int out_index_delete(struct out_thread_info *info)
1068 struct update *update = info->mti_u.update.mti_update;
1069 struct dt_object *obj = info->mti_u.update.mti_dt_object;
1073 if (!lu_object_exists(&obj->do_lu))
1075 name = (char *)update_param_buf(update, 0, NULL);
1077 CERROR("%s: empty name for index delete: rc = %d\n",
1078 mdt_obd_name(info->mti_mdt), -EPROTO);
1079 RETURN(err_serious(-EPROTO));
1082 rc = out_tx_index_delete(info->mti_env, obj, name, &info->mti_handle,
1083 info->mti_u.update.mti_update_reply,
1084 info->mti_u.update.mti_update_reply_index);
/*
 * Exec step for destroy: destroys arg->object via out_obj_destroy() and
 * records rc in the update reply at arg->index.
 * NOTE(review): the return statement is not visible in this listing.
 */
1088 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1091 struct dt_object *dt_obj = arg->object;
1094 rc = out_obj_destroy(env, dt_obj, th);
1096 CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1097 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1099 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
/*
 * Undo step for destroy. Rollback is not implemented: the visible code
 * only logs an error mentioning -ENOTSUPP.
 * NOTE(review): the return statement is not visible in this listing.
 */
1104 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1107 CERROR("%s: not support destroy undo yet!: rc = %d\n",
1108 dt_obd_name(th->th_dev), -ENOTSUPP);
/*
 * Declare a destroy of dt_obj in th->ta_handle (result stored in
 * th->ta_err) and queue the destroy exec/undo pair, taking an extra
 * reference on dt_obj for the queued argument.
 * NOTE(review): the handling of a failed declare and the return statement
 * are not visible in this listing.
 */
1112 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1113 struct thandle_exec_args *th,
1114 struct update_reply *reply,
1115 int index, char *file, int line)
1119 LASSERT(th->ta_handle != NULL);
1120 th->ta_err = dt_declare_destroy(env, dt_obj, th->ta_handle);
1124 arg = tx_add_exec(th, out_tx_destroy_exec, out_tx_destroy_undo,
1127 lu_object_get(&dt_obj->do_lu);
1128 arg->object = dt_obj;
/*
 * Handler for a destroy update: checks that the update's target FID is
 * sane (err_serious(-EPROTO) otherwise) and queues the destroy through
 * out_tx_destroy() on info->mti_handle.
 * NOTE(review): the action taken when the object does not exist is on a
 * line not visible in this listing.
 */
1134 static int out_destroy(struct out_thread_info *info)
1136 struct update *update = info->mti_u.update.mti_update;
1137 struct dt_object *obj = info->mti_u.update.mti_dt_object;
1142 fid = &update->u_fid;
/*
 * u_fid is already in host byte order here: out_handle() converts it in
 * place with fid_le_to_cpu() before dispatching to this handler. Converting
 * it a second time would swap it back on big-endian hosts and make the
 * sanity check below reject valid FIDs, so no conversion is done here.
 */
1144 if (!fid_is_sane(fid)) {
1145 CERROR("%s: invalid FID "DFID": rc = %d\n",
1146 mdt_obd_name(info->mti_mdt), PFID(fid), -EPROTO);
1147 RETURN(err_serious(-EPROTO));
1150 if (!lu_object_exists(&obj->do_lu))
1153 rc = out_tx_destroy(info->mti_env, obj, &info->mti_handle,
1154 info->mti_u.update.mti_update_reply,
1155 info->mti_u.update.mti_update_reply_index);
/*
 * Designated-initializer helper for one out_update_ops entry. The entry is
 * placed at array index (opc - OBJ_CREATE) and its mh_fail_id and mh_flags
 * fields are set from the arguments.
 * NOTE(review): the remaining field initializers (presumably for name, opc
 * and fn) are on lines not visible in this listing.
 */
1160 #define DEF_OUT_HNDL(opc, name, fail_id, flags, fn) \
1161 [opc - OBJ_CREATE] = { \
1163 .mh_fail_id = fail_id, \
1165 .mh_flags = flags, \
/*
 * Dispatch table for object-update opcodes, indexed by (opc - OBJ_CREATE)
 * through DEF_OUT_HNDL. Modifying operations carry MUTABOR; every entry
 * carries HABEO_REFERO. The OBJ_DESTROY entry is named "obj_destroy" so
 * that its name matches its opcode instead of duplicating "obj_create".
 * NOTE(review): the handler-function argument of several entries is on a
 * line not visible in this listing.
 */
1170 #define out_handler mdt_handler
1171 static struct out_handler out_update_ops[] = {
1172 DEF_OUT_HNDL(OBJ_CREATE, "obj_create", 0, MUTABOR | HABEO_REFERO,
1174 DEF_OUT_HNDL(OBJ_DESTROY, "obj_destroy", 0, MUTABOR | HABEO_REFERO,
1176 DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", 0, MUTABOR | HABEO_REFERO,
1178 DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", 0, MUTABOR | HABEO_REFERO,
1180 DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set", 0, MUTABOR | HABEO_REFERO,
1182 DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get", 0, HABEO_REFERO,
1184 DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", 0, MUTABOR | HABEO_REFERO,
1186 DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", 0, HABEO_REFERO,
1188 DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0, HABEO_REFERO,
1190 DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert", 0,
1191 MUTABOR | HABEO_REFERO, out_index_insert),
1192 DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete", 0,
1193 MUTABOR | HABEO_REFERO, out_index_delete),
/*
 * Opcode slice table handed to mdt_handler_find() by out_handle(): maps the
 * opcode range starting at OBJ_CREATE and ending at OBJ_LAST to
 * out_update_ops.
 * NOTE(review): any terminating entry is on lines not visible here.
 */
1196 #define out_opc_slice mdt_opc_slice
1197 static struct out_opc_slice out_handlers[] = {
1199 .mos_opc_start = OBJ_CREATE,
1200 .mos_opc_end = OBJ_LAST,
1201 .mos_hs = out_update_ops
1206 * Object updates between Targets. Because all the updates have been
1207 * disassembled into object updates in the master MDD layer, OUT
1208 * will skip the MDD layer and call the OSD API directly to execute these
1211 * In phase I, all of the updates in the request need to be executed
1212 * in one transaction, and the transaction has to be synchronous.
1214 * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
/*
 * Entry point for an object-update request.
 *
 * Validation: the RMF_UPDATE buffer must be exactly UPDATE_BUFFER_SIZE and
 * carry UPDATE_BUFFER_MAGIC; a missing buffer or an empty update list is
 * also rejected. These failures return err_serious(-EPROTO).
 *
 * Reply: an UPDATE_BUFFER_SIZE reply buffer is packed and initialised for
 * `count` updates, and stored in info for the handlers.
 *
 * Execution: a transaction is started, then the updates are walked in
 * order. When u_batchid changes, the current transaction is ended and a
 * new one is started. For each update the FID is converted to host order
 * in place and checked, the object is located, and the handler is found
 * via mdt_handler_find(). Handlers flagged MUTABOR are first checked for a
 * resend with out_check_resent(). An unknown opcode yields -ENOTSUPP. The
 * object reference is dropped after each update. The last transaction is
 * ended with out_tx_end(), and its result is used only if rc is still 0.
 *
 * NOTE(review): the error checks after several calls, the `out` label and
 * the end of the function are on lines not visible in this listing.
 */
1217 int out_handle(struct out_thread_info *info)
1219 struct thandle_exec_args *th = &info->mti_handle;
1220 struct req_capsule *pill = info->mti_pill;
1221 struct mdt_device *mdt = info->mti_mdt;
1222 struct dt_device *dt = mdt->mdt_bottom;
1223 const struct lu_env *env = info->mti_env;
1224 struct update_buf *ubuf;
1225 struct update *update;
1226 struct update_reply *update_reply;
1229 int old_batchid = -1;
1236 req_capsule_set(pill, &RQF_UPDATE_OBJ);
1237 bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
1238 if (bufsize != UPDATE_BUFFER_SIZE) {
1239 CERROR("%s: invalid bufsize %d: rc = %d\n",
1240 mdt_obd_name(mdt), bufsize, -EPROTO);
1241 RETURN(err_serious(-EPROTO));
1244 ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
1246 CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(mdt),
1248 RETURN(err_serious(-EPROTO));
1251 if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
1252 CERROR("%s: invalid magic %x expect %x: rc = %d\n",
1253 mdt_obd_name(mdt), le32_to_cpu(ubuf->ub_magic),
1254 UPDATE_BUFFER_MAGIC, -EPROTO);
1255 RETURN(err_serious(-EPROTO));
1258 count = le32_to_cpu(ubuf->ub_count);
1260 CERROR("%s: No update!: rc = %d\n",
1261 mdt_obd_name(mdt), -EPROTO);
1262 RETURN(err_serious(-EPROTO));
1265 req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
1266 UPDATE_BUFFER_SIZE);
1267 rc = req_capsule_server_pack(pill);
1269 CERROR("%s: Can't pack response: rc = %d\n",
1270 mdt_obd_name(mdt), rc);
1274 /* Prepare the update reply buffer */
1275 update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
1276 update_init_reply_buf(update_reply, count);
1277 info->mti_u.update.mti_update_reply = update_reply;
1279 rc = out_tx_start(env, dt, th);
1283 /* Walk through updates in the request to execute them synchronously */
1284 off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
1285 for (i = 0; i < count; i++) {
1286 struct out_handler *h;
1287 struct dt_object *dt_obj;
1289 update = (struct update *)((char *)ubuf + off);
1290 if (old_batchid == -1) {
1291 old_batchid = update->u_batchid;
1292 } else if (old_batchid != update->u_batchid) {
1293 /* Stop the current update transaction,
1294 * create a new one */
1295 rc = out_tx_end(env, th);
1299 rc = out_tx_start(env, dt, th);
1302 old_batchid = update->u_batchid;
1305 fid_le_to_cpu(&update->u_fid, &update->u_fid);
1306 if (!fid_is_sane(&update->u_fid)) {
1307 CERROR("%s: invalid FID "DFID": rc = %d\n",
1308 mdt_obd_name(mdt), PFID(&update->u_fid),
1310 GOTO(out, rc = err_serious(-EPROTO));
1313 dt_obj = dt_locate(env, dt, &update->u_fid);
1315 GOTO(out, rc = PTR_ERR(dt_obj));
1317 info->mti_u.update.mti_dt_object = dt_obj;
1318 info->mti_u.update.mti_update = update;
1319 info->mti_u.update.mti_update_reply_index = i;
1321 h = mdt_handler_find(update->u_type, out_handlers);
1322 if (likely(h != NULL)) {
1323 /* For real modification RPC, check if the update
1324 * has been executed */
1325 if (h->mh_flags & MUTABOR) {
1326 struct ptlrpc_request *req = mdt_info_req(info);
1328 if (out_check_resent(env, dt, dt_obj, req,
1334 rc = h->mh_act(info);
1336 CERROR("%s: The unsupported opc: 0x%x\n",
1337 mdt_obd_name(mdt), update->u_type);
1338 lu_object_put(env, &dt_obj->do_lu);
1339 GOTO(out, rc = -ENOTSUPP);
1342 lu_object_put(env, &dt_obj->do_lu);
1345 off += cfs_size_round(update_size(update));
1348 rc1 = out_tx_end(env, th);
1349 rc = rc == 0 ? rc1 : rc;