4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2013, Intel Corporation.
25 * lustre/mdt/out_handler.c
27 * Object update handler between targets.
29 * Author: di.wang <di.wang@intel.com>
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_MDS
37 #include "mdt_internal.h"
38 #include <lustre_update.h>
40 static const char dot[] = ".";
41 static const char dotdot[] = "..";
43 /* Current out and mdt shared the same thread info, but in the future,
44 * this should be decoupled with MDT XXX*/
45 #define out_thread_info mdt_thread_info
46 #define out_thread_key mdt_thread_key
48 struct out_thread_info *out_env_info(const struct lu_env *env)
50 struct out_thread_info *info;
52 info = lu_context_key_get(&env->le_ctx, &out_thread_key);
53 LASSERT(info != NULL);
57 static inline char *dt_obd_name(struct dt_device *dt)
59 return dt->dd_lu_dev.ld_obd->obd_name;
62 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
63 tx_exec_func_t undo, char *file, int line)
71 LASSERT(i < UPDATE_MAX_OPS);
75 ta->ta_args[i].exec_fn = func;
76 ta->ta_args[i].undo_fn = undo;
77 ta->ta_args[i].file = file;
78 ta->ta_args[i].line = line;
80 return &ta->ta_args[i];
83 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
84 struct thandle_exec_args *th)
86 memset(th, 0, sizeof(*th));
87 th->ta_handle = dt_trans_create(env, dt);
88 if (IS_ERR(th->ta_handle)) {
89 CERROR("%s: start handle error: rc = %ld\n",
90 dt_obd_name(dt), PTR_ERR(th->ta_handle));
91 return PTR_ERR(th->ta_handle);
94 /*For phase I, sync for cross-ref operation*/
95 th->ta_handle->th_sync = 1;
99 static int out_trans_start(const struct lu_env *env,
100 struct thandle_exec_args *th)
102 /* Always do sync commit for Phase I */
103 LASSERT(th->ta_handle->th_sync != 0);
104 return dt_trans_start(env, th->ta_dev, th->ta_handle);
107 static int out_trans_stop(const struct lu_env *env,
108 struct thandle_exec_args *th, int err)
113 th->ta_handle->th_result = err;
114 LASSERT(th->ta_handle->th_sync != 0);
115 rc = dt_trans_stop(env, th->ta_dev, th->ta_handle);
116 for (i = 0; i < th->ta_argno; i++) {
117 if (th->ta_args[i].object != NULL) {
118 lu_object_put(env, &th->ta_args[i].object->do_lu);
119 th->ta_args[i].object = NULL;
126 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *th)
128 struct out_thread_info *info = out_env_info(env);
132 LASSERT(th->ta_handle);
134 if (th->ta_err != 0 || th->ta_argno == 0)
135 GOTO(stop, rc = th->ta_err);
137 rc = out_trans_start(env, th);
141 for (i = 0; i < th->ta_argno; i++) {
142 rc = th->ta_args[i].exec_fn(env, th->ta_handle,
145 CDEBUG(D_INFO, "error during execution of #%u from"
146 " %s:%d: rc = %d\n", i, th->ta_args[i].file,
147 th->ta_args[i].line, rc);
149 LASSERTF(th->ta_args[i].undo_fn != NULL,
150 "can't undo changes, hope for failover!\n");
151 th->ta_args[i].undo_fn(env, th->ta_handle,
158 /* Only fail for real update */
159 info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
161 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
162 dt_obd_name(th->ta_dev), i, th->ta_argno, rc);
163 out_trans_stop(env, th, rc);
164 th->ta_handle = NULL;
171 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
172 struct dt_object *obj, struct update_reply *reply,
175 CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
176 dt_obd_name(dt), reply, index, 0);
178 update_insert_reply(reply, NULL, 0, index, 0);
182 typedef void (*out_reconstruct_t)(const struct lu_env *env,
183 struct dt_device *dt,
184 struct dt_object *obj,
185 struct update_reply *reply,
188 static inline int out_check_resent(const struct lu_env *env,
189 struct dt_device *dt,
190 struct dt_object *obj,
191 struct ptlrpc_request *req,
192 out_reconstruct_t reconstruct,
193 struct update_reply *reply,
196 if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
199 if (req_xid_is_last(req)) {
200 reconstruct(env, dt, obj, reply, index);
203 DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
204 req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
208 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
213 CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
214 PFID(lu_object_fid(&dt_obj->do_lu)));
216 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
217 rc = dt_destroy(env, dt_obj, th);
218 dt_write_unlock(env, dt_obj);
224 * All of the xxx_undo will be used once execution failed,
225 * But because all of the required resource has been reserved in
226 * declare phase, i.e. if declare succeed, it should make sure
227 * the following executing phase succeed in anyway, so these undo
228 * should be useless for most of the time in Phase I
230 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
235 rc = out_obj_destroy(env, arg->object, th);
237 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
238 dt_obd_name(th->th_dev), rc);
242 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
245 struct dt_object *dt_obj = arg->object;
248 CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
249 dt_obd_name(th->th_dev),
250 PFID(lu_object_fid(&arg->object->do_lu)),
251 arg->u.create.dof.dof_type,
252 arg->u.create.attr.la_mode & S_IFMT);
254 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
255 rc = dt_create(env, dt_obj, &arg->u.create.attr,
256 &arg->u.create.hint, &arg->u.create.dof, th);
258 dt_write_unlock(env, dt_obj);
260 CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
261 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
263 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
268 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
269 struct lu_attr *attr, struct lu_fid *parent_fid,
270 struct dt_object_format *dof,
271 struct thandle_exec_args *th,
272 struct update_reply *reply,
273 int index, char *file, int line)
277 LASSERT(th->ta_handle != NULL);
278 th->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
283 arg = tx_add_exec(th, out_tx_create_exec, out_tx_create_undo, file,
287 /* release the object in out_trans_stop */
288 lu_object_get(&obj->do_lu);
290 arg->u.create.attr = *attr;
292 arg->u.create.fid = *parent_fid;
293 memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
294 arg->u.create.dof = *dof;
301 static int out_create(struct out_thread_info *info)
303 struct update *update = info->mti_u.update.mti_update;
304 struct dt_object *obj = info->mti_u.update.mti_dt_object;
305 struct dt_object_format *dof = &info->mti_u.update.mti_update_dof;
306 struct obdo *lobdo = &info->mti_u.update.mti_obdo;
307 struct lu_attr *attr = &info->mti_attr.ma_attr;
308 struct lu_fid *fid = NULL;
315 wobdo = update_param_buf(update, 0, &size);
316 if (wobdo == NULL || size != sizeof(*wobdo)) {
317 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
318 mdt_obd_name(info->mti_mdt), -EPROTO);
319 RETURN(err_serious(-EPROTO));
322 obdo_le_to_cpu(wobdo, wobdo);
323 lustre_get_wire_obdo(NULL, lobdo, wobdo);
324 la_from_obdo(attr, lobdo, lobdo->o_valid);
326 dof->dof_type = dt_mode_to_dft(attr->la_mode);
327 if (S_ISDIR(attr->la_mode)) {
330 fid = update_param_buf(update, 1, &size);
331 if (fid == NULL || size != sizeof(*fid)) {
332 CERROR("%s: invalid fid: rc = %d\n",
333 mdt_obd_name(info->mti_mdt), -EPROTO);
334 RETURN(err_serious(-EPROTO));
336 fid_le_to_cpu(fid, fid);
337 if (!fid_is_sane(fid)) {
338 CERROR("%s: invalid fid "DFID": rc = %d\n",
339 mdt_obd_name(info->mti_mdt),
341 RETURN(err_serious(-EPROTO));
345 if (lu_object_exists(&obj->do_lu))
348 rc = out_tx_create(info->mti_env, obj, attr, fid, dof,
350 info->mti_u.update.mti_update_reply,
351 info->mti_u.update.mti_update_reply_index);
356 static int out_tx_attr_set_undo(const struct lu_env *env,
357 struct thandle *th, struct tx_arg *arg)
359 CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
360 dt_obd_name(th->th_dev),
361 PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
366 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
369 struct dt_object *dt_obj = arg->object;
372 CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
373 PFID(lu_object_fid(&dt_obj->do_lu)));
375 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
376 rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr,
378 dt_write_unlock(env, dt_obj);
380 CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
381 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
383 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
388 static int __out_tx_attr_set(const struct lu_env *env,
389 struct dt_object *dt_obj,
390 const struct lu_attr *attr,
391 struct thandle_exec_args *th,
392 struct update_reply *reply, int index,
393 char *file, int line)
397 LASSERT(th->ta_handle != NULL);
398 th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
402 arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
405 lu_object_get(&dt_obj->do_lu);
406 arg->object = dt_obj;
407 arg->u.attr_set.attr = *attr;
413 static int out_attr_set(struct out_thread_info *info)
415 struct update *update = info->mti_u.update.mti_update;
416 struct lu_attr *attr = &info->mti_attr.ma_attr;
417 struct dt_object *obj = info->mti_u.update.mti_dt_object;
418 struct obdo *lobdo = &info->mti_u.update.mti_obdo;
425 wobdo = update_param_buf(update, 0, &size);
426 if (wobdo == NULL || size != sizeof(*wobdo)) {
427 CERROR("%s: empty obdo in the update: rc = %d\n",
428 mdt_obd_name(info->mti_mdt), -EPROTO);
429 RETURN(err_serious(-EPROTO));
434 obdo_le_to_cpu(wobdo, wobdo);
435 lustre_get_wire_obdo(NULL, lobdo, wobdo);
436 la_from_obdo(attr, lobdo, lobdo->o_valid);
438 rc = out_tx_attr_set(info->mti_env, obj, attr, &info->mti_handle,
439 info->mti_u.update.mti_update_reply,
440 info->mti_u.update.mti_update_reply_index);
445 static int out_attr_get(struct out_thread_info *info)
447 struct obdo *obdo = &info->mti_u.update.mti_obdo;
448 const struct lu_env *env = info->mti_env;
449 struct lu_attr *la = &info->mti_attr.ma_attr;
450 struct dt_object *obj = info->mti_u.update.mti_dt_object;
455 if (!lu_object_exists(&obj->do_lu))
458 dt_read_lock(env, obj, MOR_TGT_CHILD);
459 rc = dt_attr_get(env, obj, la, NULL);
461 GOTO(out_unlock, rc);
463 * If it is a directory, we will also check whether the
464 * directory is empty.
465 * la_flags = 0 : Empty.
469 if (S_ISDIR(la->la_mode)) {
471 const struct dt_it_ops *iops;
473 if (!dt_try_as_dir(env, obj))
474 GOTO(out_unlock, rc = -ENOTDIR);
476 iops = &obj->do_index_ops->dio_it;
477 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
480 result = iops->get(env, it, (const void *)"");
483 for (result = 0, i = 0; result == 0 && i < 3;
485 result = iops->next(env, it);
488 } else if (result == 0)
490 * Huh? Index contains no zero key?
500 obdo_from_la(obdo, la, la->la_valid);
501 obdo_cpu_to_le(obdo, obdo);
502 lustre_set_wire_obdo(NULL, obdo, obdo);
505 dt_read_unlock(env, obj);
507 CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
508 mdt_obd_name(info->mti_mdt),
509 info->mti_u.update.mti_update_reply, 0, rc);
511 update_insert_reply(info->mti_u.update.mti_update_reply, obdo,
512 sizeof(*obdo), 0, rc);
516 static int out_xattr_get(struct out_thread_info *info)
518 struct update *update = info->mti_u.update.mti_update;
519 const struct lu_env *env = info->mti_env;
520 struct lu_buf *lbuf = &info->mti_buf;
521 struct update_reply *reply = info->mti_u.update.mti_update_reply;
522 struct dt_object *obj = info->mti_u.update.mti_dt_object;
529 name = (char *)update_param_buf(update, 0, NULL);
531 CERROR("%s: empty name for xattr get: rc = %d\n",
532 mdt_obd_name(info->mti_mdt), -EPROTO);
533 RETURN(err_serious(-EPROTO));
536 ptr = update_get_buf_internal(reply, 0, NULL);
537 LASSERT(ptr != NULL);
539 /* The first 4 bytes(int) are used to store the result */
540 lbuf->lb_buf = (char *)ptr + sizeof(int);
541 lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
542 dt_read_lock(env, obj, MOR_TGT_CHILD);
543 rc = dt_xattr_get(env, obj, lbuf, name, NULL);
544 dt_read_unlock(env, obj);
551 GOTO(out, rc = -ENOENT);
555 CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
556 mdt_obd_name(info->mti_mdt), PFID(lu_object_fid(&obj->do_lu)),
557 name, (int)lbuf->lb_len);
560 reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
564 static int out_index_lookup(struct out_thread_info *info)
566 struct update *update = info->mti_u.update.mti_update;
567 const struct lu_env *env = info->mti_env;
568 struct dt_object *obj = info->mti_u.update.mti_dt_object;
574 if (!lu_object_exists(&obj->do_lu))
577 name = (char *)update_param_buf(update, 0, NULL);
579 CERROR("%s: empty name for lookup: rc = %d\n",
580 mdt_obd_name(info->mti_mdt), -EPROTO);
581 RETURN(err_serious(-EPROTO));
584 dt_read_lock(env, obj, MOR_TGT_CHILD);
585 if (!dt_try_as_dir(env, obj))
586 GOTO(out_unlock, rc = -ENOTDIR);
588 rc = dt_lookup(env, obj, (struct dt_rec *)&info->mti_tmp_fid1,
589 (struct dt_key *)name, NULL);
592 GOTO(out_unlock, rc);
597 CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
598 PFID(lu_object_fid(&obj->do_lu)), name,
599 PFID(&info->mti_tmp_fid1), rc);
600 fid_cpu_to_le(&info->mti_tmp_fid1, &info->mti_tmp_fid1);
603 dt_read_unlock(env, obj);
605 CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
606 mdt_obd_name(info->mti_mdt),
607 info->mti_u.update.mti_update_reply, 0, rc);
609 update_insert_reply(info->mti_u.update.mti_update_reply,
610 &info->mti_tmp_fid1, sizeof(info->mti_tmp_fid1),
615 static int out_tx_xattr_set_exec(const struct lu_env *env,
619 struct dt_object *dt_obj = arg->object;
622 CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
623 dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
624 arg->u.xattr_set.name, arg->u.xattr_set.flags);
626 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
627 rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
628 arg->u.xattr_set.name, arg->u.xattr_set.flags,
630 dt_write_unlock(env, dt_obj);
632 * Ignore errors if this is LINK EA
634 if (unlikely(rc && !strncmp(arg->u.xattr_set.name, XATTR_NAME_LINK,
635 strlen(XATTR_NAME_LINK))))
638 CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
639 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
641 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
646 static int __out_tx_xattr_set(const struct lu_env *env,
647 struct dt_object *dt_obj,
648 const struct lu_buf *buf,
649 const char *name, int flags,
650 struct thandle_exec_args *th,
651 struct update_reply *reply, int index,
652 char *file, int line)
656 LASSERT(th->ta_handle != NULL);
657 th->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
658 flags, th->ta_handle);
662 arg = tx_add_exec(th, out_tx_xattr_set_exec, NULL, file, line);
664 lu_object_get(&dt_obj->do_lu);
665 arg->object = dt_obj;
666 arg->u.xattr_set.name = name;
667 arg->u.xattr_set.flags = flags;
668 arg->u.xattr_set.buf = *buf;
671 arg->u.xattr_set.csum = 0;
675 static int out_xattr_set(struct out_thread_info *info)
677 struct update *update = info->mti_u.update.mti_update;
678 struct dt_object *obj = info->mti_u.update.mti_dt_object;
679 struct lu_buf *lbuf = &info->mti_buf;
688 name = update_param_buf(update, 0, NULL);
690 CERROR("%s: empty name for xattr set: rc = %d\n",
691 mdt_obd_name(info->mti_mdt), -EPROTO);
692 RETURN(err_serious(-EPROTO));
695 buf = (char *)update_param_buf(update, 1, &buf_len);
696 if (buf == NULL || buf_len == 0) {
697 CERROR("%s: empty buf for xattr set: rc = %d\n",
698 mdt_obd_name(info->mti_mdt), -EPROTO);
699 RETURN(err_serious(-EPROTO));
703 lbuf->lb_len = buf_len;
705 tmp = (char *)update_param_buf(update, 2, NULL);
707 CERROR("%s: empty flag for xattr set: rc = %d\n",
708 mdt_obd_name(info->mti_mdt), -EPROTO);
709 RETURN(err_serious(-EPROTO));
712 flag = le32_to_cpu(*(int *)tmp);
714 rc = out_tx_xattr_set(info->mti_env, obj, lbuf, name, flag,
716 info->mti_u.update.mti_update_reply,
717 info->mti_u.update.mti_update_reply_index);
721 static int out_obj_ref_add(const struct lu_env *env,
722 struct dt_object *dt_obj,
727 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
728 rc = dt_ref_add(env, dt_obj, th);
729 dt_write_unlock(env, dt_obj);
734 static int out_obj_ref_del(const struct lu_env *env,
735 struct dt_object *dt_obj,
740 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
741 rc = dt_ref_del(env, dt_obj, th);
742 dt_write_unlock(env, dt_obj);
747 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
750 struct dt_object *dt_obj = arg->object;
753 rc = out_obj_ref_add(env, dt_obj, th);
755 CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
756 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
758 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
762 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
765 return out_obj_ref_del(env, arg->object, th);
768 static int __out_tx_ref_add(const struct lu_env *env,
769 struct dt_object *dt_obj,
770 struct thandle_exec_args *th,
771 struct update_reply *reply,
772 int index, char *file, int line)
776 LASSERT(th->ta_handle != NULL);
777 th->ta_err = dt_declare_ref_add(env, dt_obj, th->ta_handle);
781 arg = tx_add_exec(th, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
784 lu_object_get(&dt_obj->do_lu);
785 arg->object = dt_obj;
792 * increase ref of the object
794 static int out_ref_add(struct out_thread_info *info)
796 struct dt_object *obj = info->mti_u.update.mti_dt_object;
801 rc = out_tx_ref_add(info->mti_env, obj, &info->mti_handle,
802 info->mti_u.update.mti_update_reply,
803 info->mti_u.update.mti_update_reply_index);
807 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
810 struct dt_object *dt_obj = arg->object;
813 rc = out_obj_ref_del(env, dt_obj, th);
815 CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
816 dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
818 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
823 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
826 return out_obj_ref_add(env, arg->object, th);
829 static int __out_tx_ref_del(const struct lu_env *env,
830 struct dt_object *dt_obj,
831 struct thandle_exec_args *th,
832 struct update_reply *reply,
833 int index, char *file, int line)
837 LASSERT(th->ta_handle != NULL);
838 th->ta_err = dt_declare_ref_del(env, dt_obj, th->ta_handle);
842 arg = tx_add_exec(th, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
845 lu_object_get(&dt_obj->do_lu);
846 arg->object = dt_obj;
852 static int out_ref_del(struct out_thread_info *info)
854 struct dt_object *obj = info->mti_u.update.mti_dt_object;
859 if (!lu_object_exists(&obj->do_lu))
862 rc = out_tx_ref_del(info->mti_env, obj, &info->mti_handle,
863 info->mti_u.update.mti_update_reply,
864 info->mti_u.update.mti_update_reply_index);
868 static int out_obj_index_insert(const struct lu_env *env,
869 struct dt_object *dt_obj,
870 const struct dt_rec *rec,
871 const struct dt_key *key,
876 CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
877 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
878 (char *)key, PFID((struct lu_fid *)rec));
880 if (dt_try_as_dir(env, dt_obj) == 0)
883 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
884 rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
885 dt_write_unlock(env, dt_obj);
890 static int out_obj_index_delete(const struct lu_env *env,
891 struct dt_object *dt_obj,
892 const struct dt_key *key,
897 CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
898 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
901 if (dt_try_as_dir(env, dt_obj) == 0)
904 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
905 rc = dt_delete(env, dt_obj, key, th, NULL);
906 dt_write_unlock(env, dt_obj);
911 static int out_tx_index_insert_exec(const struct lu_env *env,
912 struct thandle *th, struct tx_arg *arg)
914 struct dt_object *dt_obj = arg->object;
917 rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
918 arg->u.insert.key, th);
920 CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
921 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
923 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
928 static int out_tx_index_insert_undo(const struct lu_env *env,
929 struct thandle *th, struct tx_arg *arg)
931 return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
934 static int __out_tx_index_insert(const struct lu_env *env,
935 struct dt_object *dt_obj,
936 char *name, struct lu_fid *fid,
937 struct thandle_exec_args *th,
938 struct update_reply *reply,
939 int index, char *file, int line)
943 LASSERT(th->ta_handle != NULL);
945 if (lu_object_exists(&dt_obj->do_lu)) {
946 if (dt_try_as_dir(env, dt_obj) == 0) {
947 th->ta_err = -ENOTDIR;
950 th->ta_err = dt_declare_insert(env, dt_obj,
951 (struct dt_rec *)fid,
952 (struct dt_key *)name,
959 arg = tx_add_exec(th, out_tx_index_insert_exec,
960 out_tx_index_insert_undo, file,
963 lu_object_get(&dt_obj->do_lu);
964 arg->object = dt_obj;
967 arg->u.insert.rec = (struct dt_rec *)fid;
968 arg->u.insert.key = (struct dt_key *)name;
973 static int out_index_insert(struct out_thread_info *info)
975 struct update *update = info->mti_u.update.mti_update;
976 struct dt_object *obj = info->mti_u.update.mti_dt_object;
983 name = (char *)update_param_buf(update, 0, NULL);
985 CERROR("%s: empty name for index insert: rc = %d\n",
986 mdt_obd_name(info->mti_mdt), -EPROTO);
987 RETURN(err_serious(-EPROTO));
990 fid = (struct lu_fid *)update_param_buf(update, 1, &size);
991 if (fid == NULL || size != sizeof(*fid)) {
992 CERROR("%s: invalid fid: rc = %d\n",
993 mdt_obd_name(info->mti_mdt), -EPROTO);
994 RETURN(err_serious(-EPROTO));
997 fid_le_to_cpu(fid, fid);
998 if (!fid_is_sane(fid)) {
999 CERROR("%s: invalid FID "DFID": rc = %d\n",
1000 mdt_obd_name(info->mti_mdt), PFID(fid),
1002 RETURN(err_serious(-EPROTO));
1005 rc = out_tx_index_insert(info->mti_env, obj, name, fid,
1007 info->mti_u.update.mti_update_reply,
1008 info->mti_u.update.mti_update_reply_index);
1012 static int out_tx_index_delete_exec(const struct lu_env *env,
1018 rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1020 CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1021 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1023 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1028 static int out_tx_index_delete_undo(const struct lu_env *env,
1032 CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1033 dt_obd_name(th->th_dev), -ENOTSUPP);
1037 static int __out_tx_index_delete(const struct lu_env *env,
1038 struct dt_object *dt_obj, char *name,
1039 struct thandle_exec_args *th,
1040 struct update_reply *reply,
1041 int index, char *file, int line)
1045 if (dt_try_as_dir(env, dt_obj) == 0) {
1046 th->ta_err = -ENOTDIR;
1050 LASSERT(th->ta_handle != NULL);
1051 th->ta_err = dt_declare_delete(env, dt_obj,
1052 (struct dt_key *)name,
1054 if (th->ta_err != 0)
1057 arg = tx_add_exec(th, out_tx_index_delete_exec,
1058 out_tx_index_delete_undo, file,
1061 lu_object_get(&dt_obj->do_lu);
1062 arg->object = dt_obj;
1065 arg->u.insert.key = (struct dt_key *)name;
1069 static int out_index_delete(struct out_thread_info *info)
1071 struct update *update = info->mti_u.update.mti_update;
1072 struct dt_object *obj = info->mti_u.update.mti_dt_object;
1076 if (!lu_object_exists(&obj->do_lu))
1078 name = (char *)update_param_buf(update, 0, NULL);
1080 CERROR("%s: empty name for index delete: rc = %d\n",
1081 mdt_obd_name(info->mti_mdt), -EPROTO);
1082 RETURN(err_serious(-EPROTO));
1085 rc = out_tx_index_delete(info->mti_env, obj, name, &info->mti_handle,
1086 info->mti_u.update.mti_update_reply,
1087 info->mti_u.update.mti_update_reply_index);
1091 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1094 struct dt_object *dt_obj = arg->object;
1097 rc = out_obj_destroy(env, dt_obj, th);
1099 CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1100 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1102 update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1107 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1110 CERROR("%s: not support destroy undo yet!: rc = %d\n",
1111 dt_obd_name(th->th_dev), -ENOTSUPP);
1115 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1116 struct thandle_exec_args *th,
1117 struct update_reply *reply,
1118 int index, char *file, int line)
1122 LASSERT(th->ta_handle != NULL);
1123 th->ta_err = dt_declare_destroy(env, dt_obj, th->ta_handle);
1127 arg = tx_add_exec(th, out_tx_destroy_exec, out_tx_destroy_undo,
1130 lu_object_get(&dt_obj->do_lu);
1131 arg->object = dt_obj;
1137 static int out_destroy(struct out_thread_info *info)
1139 struct update *update = info->mti_u.update.mti_update;
1140 struct dt_object *obj = info->mti_u.update.mti_dt_object;
1145 fid = &update->u_fid;
1146 fid_le_to_cpu(fid, fid);
1147 if (!fid_is_sane(fid)) {
1148 CERROR("%s: invalid FID "DFID": rc = %d\n",
1149 mdt_obd_name(info->mti_mdt), PFID(fid), -EPROTO);
1150 RETURN(err_serious(-EPROTO));
1153 if (!lu_object_exists(&obj->do_lu))
1156 rc = out_tx_destroy(info->mti_env, obj, &info->mti_handle,
1157 info->mti_u.update.mti_update_reply,
1158 info->mti_u.update.mti_update_reply_index);
1163 #define DEF_OUT_HNDL(opc, name, fail_id, flags, fn) \
1164 [opc - OBJ_CREATE] = { \
1166 .mh_fail_id = fail_id, \
1168 .mh_flags = flags, \
1173 #define out_handler mdt_handler
1174 static struct out_handler out_update_ops[] = {
1175 DEF_OUT_HNDL(OBJ_CREATE, "obj_create", 0, MUTABOR | HABEO_REFERO,
1177 DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", 0, MUTABOR | HABEO_REFERO,
1179 DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", 0, MUTABOR | HABEO_REFERO,
1181 DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", 0, MUTABOR | HABEO_REFERO,
1183 DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set", 0, MUTABOR | HABEO_REFERO,
1185 DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get", 0, HABEO_REFERO,
1187 DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", 0, MUTABOR | HABEO_REFERO,
1189 DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", 0, HABEO_REFERO,
1191 DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0, HABEO_REFERO,
1193 DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert", 0,
1194 MUTABOR | HABEO_REFERO, out_index_insert),
1195 DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete", 0,
1196 MUTABOR | HABEO_REFERO, out_index_delete),
1199 #define out_opc_slice mdt_opc_slice
1200 static struct out_opc_slice out_handlers[] = {
1202 .mos_opc_start = OBJ_CREATE,
1203 .mos_opc_end = OBJ_LAST,
1204 .mos_hs = out_update_ops
1209 * Object updates between Targets. Because all the updates has been
1210 * dis-assemblied into object updates in master MDD layer, so out
1211 * will skip MDD layer, and call OSD API directly to execute these
1214 * In phase I, all of the updates in the request need to be executed
1215 * in one transaction, and the transaction has to be synchronously.
1217 * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1220 int out_handle(struct out_thread_info *info)
1222 struct thandle_exec_args *th = &info->mti_handle;
1223 struct req_capsule *pill = info->mti_pill;
1224 struct mdt_device *mdt = info->mti_mdt;
1225 struct dt_device *dt = mdt->mdt_bottom;
1226 const struct lu_env *env = info->mti_env;
1227 struct update_buf *ubuf;
1228 struct update *update;
1229 struct update_reply *update_reply;
1232 int old_batchid = -1;
1239 req_capsule_set(pill, &RQF_UPDATE_OBJ);
1240 bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
1241 if (bufsize != UPDATE_BUFFER_SIZE) {
1242 CERROR("%s: invalid bufsize %d: rc = %d\n",
1243 mdt_obd_name(mdt), bufsize, -EPROTO);
1244 RETURN(err_serious(-EPROTO));
1247 ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
1249 CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(mdt),
1251 RETURN(err_serious(-EPROTO));
1254 if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
1255 CERROR("%s: invalid magic %x expect %x: rc = %d\n",
1256 mdt_obd_name(mdt), le32_to_cpu(ubuf->ub_magic),
1257 UPDATE_BUFFER_MAGIC, -EPROTO);
1258 RETURN(err_serious(-EPROTO));
1261 count = le32_to_cpu(ubuf->ub_count);
1263 CERROR("%s: No update!: rc = %d\n",
1264 mdt_obd_name(mdt), -EPROTO);
1265 RETURN(err_serious(-EPROTO));
1268 req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
1269 UPDATE_BUFFER_SIZE);
1270 rc = req_capsule_server_pack(pill);
1272 CERROR("%s: Can't pack response: rc = %d\n",
1273 mdt_obd_name(mdt), rc);
1277 /* Prepare the update reply buffer */
1278 update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
1279 update_init_reply_buf(update_reply, count);
1280 info->mti_u.update.mti_update_reply = update_reply;
1282 rc = out_tx_start(env, dt, th);
1286 /* Walk through updates in the request to execute them synchronously */
1287 off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
1288 for (i = 0; i < count; i++) {
1289 struct out_handler *h;
1290 struct dt_object *dt_obj;
1292 update = (struct update *)((char *)ubuf + off);
1293 if (old_batchid == -1) {
1294 old_batchid = update->u_batchid;
1295 } else if (old_batchid != update->u_batchid) {
1296 /* Stop the current update transaction,
1297 * create a new one */
1298 rc = out_tx_end(env, th);
1302 rc = out_tx_start(env, dt, th);
1305 old_batchid = update->u_batchid;
1308 fid_le_to_cpu(&update->u_fid, &update->u_fid);
1309 if (!fid_is_sane(&update->u_fid)) {
1310 CERROR("%s: invalid FID "DFID": rc = %d\n",
1311 mdt_obd_name(mdt), PFID(&update->u_fid),
1313 GOTO(out, rc = err_serious(-EPROTO));
1316 dt_obj = dt_locate(env, dt, &update->u_fid);
1318 GOTO(out, rc = PTR_ERR(dt_obj));
1320 info->mti_u.update.mti_dt_object = dt_obj;
1321 info->mti_u.update.mti_update = update;
1322 info->mti_u.update.mti_update_reply_index = i;
1324 h = mdt_handler_find(update->u_type, out_handlers);
1325 if (likely(h != NULL)) {
1326 /* For real modification RPC, check if the update
1327 * has been executed */
1328 if (h->mh_flags & MUTABOR) {
1329 struct ptlrpc_request *req = mdt_info_req(info);
1331 if (out_check_resent(env, dt, dt_obj, req,
1337 rc = h->mh_act(info);
1339 CERROR("%s: The unsupported opc: 0x%x\n",
1340 mdt_obd_name(mdt), update->u_type);
1341 lu_object_put(env, &dt_obj->do_lu);
1342 GOTO(out, rc = -ENOTSUPP);
1345 lu_object_put(env, &dt_obj->do_lu);
1348 off += cfs_size_round(update_size(update));
1351 rc1 = out_tx_end(env, th);
1352 rc = rc == 0 ? rc1 : rc;