4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2013, Intel Corporation.
25 * lustre/mdt/out_handler.c
27 * Object update handler between targets.
29 * Author: di.wang <di.wang@intel.com>
32 #define DEBUG_SUBSYSTEM S_CLASS
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40 tx_exec_func_t undo, char *file, int line)
47 LASSERTF(ta->ta_argno + 1 <= TX_MAX_OPS,
48 "Too many updates(%d) in one trans\n", ta->ta_argno);
53 ta->ta_args[i].exec_fn = func;
54 ta->ta_args[i].undo_fn = undo;
55 ta->ta_args[i].file = file;
56 ta->ta_args[i].line = line;
58 return &ta->ta_args[i];
61 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
62 struct thandle_exec_args *ta, struct obd_export *exp)
64 memset(ta, 0, sizeof(*ta));
65 ta->ta_handle = dt_trans_create(env, dt);
66 if (IS_ERR(ta->ta_handle)) {
69 CERROR("%s: start handle error: rc = %ld\n",
70 dt_obd_name(dt), PTR_ERR(ta->ta_handle));
71 rc = PTR_ERR(ta->ta_handle);
76 if (exp->exp_need_sync)
77 ta->ta_handle->th_sync = 1;
82 static int out_trans_start(const struct lu_env *env,
83 struct thandle_exec_args *ta)
85 return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
88 static int out_trans_stop(const struct lu_env *env,
89 struct thandle_exec_args *ta, int err)
94 ta->ta_handle->th_result = err;
95 rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
96 for (i = 0; i < ta->ta_argno; i++) {
97 if (ta->ta_args[i].object != NULL) {
98 lu_object_put(env, &ta->ta_args[i].object->do_lu);
99 ta->ta_args[i].object = NULL;
106 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
108 struct tgt_session_info *tsi = tgt_ses_info(env);
112 LASSERT(ta->ta_handle);
114 if (ta->ta_err != 0 || ta->ta_argno == 0)
115 GOTO(stop, rc = ta->ta_err);
117 rc = out_trans_start(env, ta);
121 for (i = 0; i < ta->ta_argno; i++) {
122 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
125 CDEBUG(D_INFO, "error during execution of #%u from"
126 " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
127 ta->ta_args[i].line, rc);
129 LASSERTF(ta->ta_args[i].undo_fn != NULL,
130 "can't undo changes, hope for failover!\n");
131 ta->ta_args[i].undo_fn(env, ta->ta_handle,
138 /* Only fail for real update */
139 tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
141 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
142 dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
143 out_trans_stop(env, ta, rc);
144 ta->ta_handle = NULL;
151 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
152 struct dt_object *obj,
153 struct object_update_reply *reply,
156 CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
157 dt_obd_name(dt), reply, index, 0);
159 object_update_result_insert(reply, NULL, 0, index, 0);
163 typedef void (*out_reconstruct_t)(const struct lu_env *env,
164 struct dt_device *dt,
165 struct dt_object *obj,
166 struct object_update_reply *reply,
169 static inline int out_check_resent(const struct lu_env *env,
170 struct dt_device *dt,
171 struct dt_object *obj,
172 struct ptlrpc_request *req,
173 out_reconstruct_t reconstruct,
174 struct object_update_reply *reply,
177 if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
180 if (req_xid_is_last(req)) {
181 reconstruct(env, dt, obj, reply, index);
184 DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
185 req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
189 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
194 CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
195 PFID(lu_object_fid(&dt_obj->do_lu)));
197 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
198 rc = dt_destroy(env, dt_obj, th);
199 dt_write_unlock(env, dt_obj);
205 * All of the xxx_undo will be used once execution failed,
206 * But because all of the required resource has been reserved in
207 * declare phase, i.e. if declare succeed, it should make sure
208 * the following executing phase succeed in anyway, so these undo
209 * should be useless for most of the time in Phase I
211 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
216 rc = out_obj_destroy(env, arg->object, th);
218 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
219 dt_obd_name(th->th_dev), rc);
223 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
226 struct dt_object *dt_obj = arg->object;
229 CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
230 dt_obd_name(th->th_dev),
231 PFID(lu_object_fid(&arg->object->do_lu)),
232 arg->u.create.dof.dof_type,
233 arg->u.create.attr.la_mode & S_IFMT);
235 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
236 rc = dt_create(env, dt_obj, &arg->u.create.attr,
237 &arg->u.create.hint, &arg->u.create.dof, th);
239 dt_write_unlock(env, dt_obj);
241 CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
242 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
244 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
249 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
250 struct lu_attr *attr, struct lu_fid *parent_fid,
251 struct dt_object_format *dof,
252 struct thandle_exec_args *ta,
253 struct object_update_reply *reply,
254 int index, char *file, int line)
258 LASSERT(ta->ta_handle != NULL);
259 ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
264 arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
268 /* release the object in out_trans_stop */
269 lu_object_get(&obj->do_lu);
271 arg->u.create.attr = *attr;
272 if (parent_fid != NULL)
273 arg->u.create.fid = *parent_fid;
274 memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
275 arg->u.create.dof = *dof;
282 static int out_create(struct tgt_session_info *tsi)
284 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
285 struct object_update *update = tti->tti_u.update.tti_update;
286 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
287 struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
288 struct obdo *lobdo = &tti->tti_u.update.tti_obdo;
289 struct lu_attr *attr = &tti->tti_attr;
290 struct lu_fid *fid = NULL;
297 wobdo = object_update_param_get(update, 0, &size);
298 if (wobdo == NULL || size != sizeof(*wobdo)) {
299 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
300 tgt_name(tsi->tsi_tgt), -EPROTO);
301 RETURN(err_serious(-EPROTO));
304 obdo_le_to_cpu(wobdo, wobdo);
305 lustre_get_wire_obdo(NULL, lobdo, wobdo);
306 la_from_obdo(attr, lobdo, lobdo->o_valid);
308 dof->dof_type = dt_mode_to_dft(attr->la_mode);
309 if (update->ou_params_count > 1) {
312 fid = object_update_param_get(update, 1, &size);
313 if (fid == NULL || size != sizeof(*fid)) {
314 CERROR("%s: invalid fid: rc = %d\n",
315 tgt_name(tsi->tsi_tgt), -EPROTO);
316 RETURN(err_serious(-EPROTO));
318 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
319 lustre_swab_lu_fid(fid);
320 if (!fid_is_sane(fid)) {
321 CERROR("%s: invalid fid "DFID": rc = %d\n",
322 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
323 RETURN(err_serious(-EPROTO));
327 if (lu_object_exists(&obj->do_lu))
330 rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
332 tti->tti_u.update.tti_update_reply,
333 tti->tti_u.update.tti_update_reply_index);
338 static int out_tx_attr_set_undo(const struct lu_env *env,
339 struct thandle *th, struct tx_arg *arg)
341 CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
342 dt_obd_name(th->th_dev),
343 PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
348 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
351 struct dt_object *dt_obj = arg->object;
354 CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
355 PFID(lu_object_fid(&dt_obj->do_lu)));
357 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
358 rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
359 dt_write_unlock(env, dt_obj);
361 CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
362 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
364 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
369 static int __out_tx_attr_set(const struct lu_env *env,
370 struct dt_object *dt_obj,
371 const struct lu_attr *attr,
372 struct thandle_exec_args *th,
373 struct object_update_reply *reply,
374 int index, char *file, int line)
378 LASSERT(th->ta_handle != NULL);
379 th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
383 arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
386 lu_object_get(&dt_obj->do_lu);
387 arg->object = dt_obj;
388 arg->u.attr_set.attr = *attr;
394 static int out_attr_set(struct tgt_session_info *tsi)
396 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
397 struct object_update *update = tti->tti_u.update.tti_update;
398 struct lu_attr *attr = &tti->tti_attr;
399 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
400 struct obdo *lobdo = &tti->tti_u.update.tti_obdo;
407 wobdo = object_update_param_get(update, 0, &size);
408 if (wobdo == NULL || size != sizeof(*wobdo)) {
409 CERROR("%s: empty obdo in the update: rc = %d\n",
410 tgt_name(tsi->tsi_tgt), -EPROTO);
411 RETURN(err_serious(-EPROTO));
416 obdo_le_to_cpu(wobdo, wobdo);
417 lustre_get_wire_obdo(NULL, lobdo, wobdo);
418 la_from_obdo(attr, lobdo, lobdo->o_valid);
420 rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
421 tti->tti_u.update.tti_update_reply,
422 tti->tti_u.update.tti_update_reply_index);
427 static int out_attr_get(struct tgt_session_info *tsi)
429 const struct lu_env *env = tsi->tsi_env;
430 struct tgt_thread_info *tti = tgt_th_info(env);
431 struct obdo *obdo = &tti->tti_u.update.tti_obdo;
432 struct lu_attr *la = &tti->tti_attr;
433 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
434 int idx = tti->tti_u.update.tti_update_reply_index;
439 if (!lu_object_exists(&obj->do_lu))
442 dt_read_lock(env, obj, MOR_TGT_CHILD);
443 rc = dt_attr_get(env, obj, la, NULL);
445 GOTO(out_unlock, rc);
447 * If it is a directory, we will also check whether the
448 * directory is empty.
449 * la_flags = 0 : Empty.
453 if (S_ISDIR(la->la_mode)) {
455 const struct dt_it_ops *iops;
457 if (!dt_try_as_dir(env, obj))
458 GOTO(out_unlock, rc = -ENOTDIR);
460 iops = &obj->do_index_ops->dio_it;
461 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
464 result = iops->get(env, it, (const void *)"");
467 for (result = 0, i = 0; result == 0 && i < 3;
469 result = iops->next(env, it);
472 } else if (result == 0)
474 * Huh? Index contains no zero key?
484 obdo_from_la(obdo, la, la->la_valid);
485 obdo_cpu_to_le(obdo, obdo);
486 lustre_set_wire_obdo(NULL, obdo, obdo);
489 dt_read_unlock(env, obj);
491 CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
492 tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
495 object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
496 sizeof(*obdo), idx, rc);
501 static int out_xattr_get(struct tgt_session_info *tsi)
503 const struct lu_env *env = tsi->tsi_env;
504 struct tgt_thread_info *tti = tgt_th_info(env);
505 struct object_update *update = tti->tti_u.update.tti_update;
506 struct lu_buf *lbuf = &tti->tti_buf;
507 struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
508 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
510 struct object_update_result *update_result;
511 int idx = tti->tti_u.update.tti_update_reply_index;
516 name = object_update_param_get(update, 0, NULL);
518 CERROR("%s: empty name for xattr get: rc = %d\n",
519 tgt_name(tsi->tsi_tgt), -EPROTO);
520 RETURN(err_serious(-EPROTO));
523 update_result = object_update_result_get(reply, 0, NULL);
524 if (update_result == NULL) {
525 CERROR("%s: empty name for xattr get: rc = %d\n",
526 tgt_name(tsi->tsi_tgt), -EPROTO);
527 RETURN(err_serious(-EPROTO));
530 lbuf->lb_buf = update_result->our_data;
531 lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
532 cfs_size_round((unsigned long)update_result->our_data -
533 (unsigned long)update_result);
534 dt_read_lock(env, obj, MOR_TGT_CHILD);
535 rc = dt_xattr_get(env, obj, lbuf, name, NULL);
536 dt_read_unlock(env, obj);
543 GOTO(out, rc = -ENOENT);
547 CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
548 tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
549 name, (int)lbuf->lb_len);
554 object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
558 static int out_index_lookup(struct tgt_session_info *tsi)
560 const struct lu_env *env = tsi->tsi_env;
561 struct tgt_thread_info *tti = tgt_th_info(env);
562 struct object_update *update = tti->tti_u.update.tti_update;
563 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
569 if (!lu_object_exists(&obj->do_lu))
572 name = object_update_param_get(update, 0, NULL);
574 CERROR("%s: empty name for lookup: rc = %d\n",
575 tgt_name(tsi->tsi_tgt), -EPROTO);
576 RETURN(err_serious(-EPROTO));
579 dt_read_lock(env, obj, MOR_TGT_CHILD);
580 if (!dt_try_as_dir(env, obj))
581 GOTO(out_unlock, rc = -ENOTDIR);
583 rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
584 (struct dt_key *)name, NULL);
587 GOTO(out_unlock, rc);
592 CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
593 PFID(lu_object_fid(&obj->do_lu)), name,
594 PFID(&tti->tti_fid1), rc);
597 dt_read_unlock(env, obj);
599 CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
600 tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
603 object_update_result_insert(tti->tti_u.update.tti_update_reply,
604 &tti->tti_fid1, sizeof(tti->tti_fid1),
605 tti->tti_u.update.tti_update_reply_index, rc);
609 static int out_tx_xattr_set_exec(const struct lu_env *env,
613 struct dt_object *dt_obj = arg->object;
616 CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
617 dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
618 arg->u.xattr_set.name, arg->u.xattr_set.flags);
620 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
621 rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
622 arg->u.xattr_set.name, arg->u.xattr_set.flags,
624 dt_write_unlock(env, dt_obj);
626 * Ignore errors if this is LINK EA
628 if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
631 CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
632 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
634 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
639 static int __out_tx_xattr_set(const struct lu_env *env,
640 struct dt_object *dt_obj,
641 const struct lu_buf *buf,
642 const char *name, int flags,
643 struct thandle_exec_args *ta,
644 struct object_update_reply *reply,
645 int index, char *file, int line)
649 LASSERT(ta->ta_handle != NULL);
650 ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
651 flags, ta->ta_handle);
655 arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
657 lu_object_get(&dt_obj->do_lu);
658 arg->object = dt_obj;
659 arg->u.xattr_set.name = name;
660 arg->u.xattr_set.flags = flags;
661 arg->u.xattr_set.buf = *buf;
664 arg->u.xattr_set.csum = 0;
668 static int out_xattr_set(struct tgt_session_info *tsi)
670 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
671 struct object_update *update = tti->tti_u.update.tti_update;
672 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
673 struct lu_buf *lbuf = &tti->tti_buf;
682 name = object_update_param_get(update, 0, NULL);
684 CERROR("%s: empty name for xattr set: rc = %d\n",
685 tgt_name(tsi->tsi_tgt), -EPROTO);
686 RETURN(err_serious(-EPROTO));
689 buf = object_update_param_get(update, 1, &buf_len);
690 if (buf == NULL || buf_len == 0) {
691 CERROR("%s: empty buf for xattr set: rc = %d\n",
692 tgt_name(tsi->tsi_tgt), -EPROTO);
693 RETURN(err_serious(-EPROTO));
697 lbuf->lb_len = buf_len;
699 tmp = (char *)object_update_param_get(update, 2, NULL);
701 CERROR("%s: empty flag for xattr set: rc = %d\n",
702 tgt_name(tsi->tsi_tgt), -EPROTO);
703 RETURN(err_serious(-EPROTO));
706 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
707 __swab32s((__u32 *)tmp);
710 rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
712 tti->tti_u.update.tti_update_reply,
713 tti->tti_u.update.tti_update_reply_index);
717 static int out_obj_ref_add(const struct lu_env *env,
718 struct dt_object *dt_obj,
723 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
724 rc = dt_ref_add(env, dt_obj, th);
725 dt_write_unlock(env, dt_obj);
730 static int out_obj_ref_del(const struct lu_env *env,
731 struct dt_object *dt_obj,
736 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
737 rc = dt_ref_del(env, dt_obj, th);
738 dt_write_unlock(env, dt_obj);
743 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
746 struct dt_object *dt_obj = arg->object;
749 rc = out_obj_ref_add(env, dt_obj, th);
751 CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
752 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
754 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
758 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
761 return out_obj_ref_del(env, arg->object, th);
764 static int __out_tx_ref_add(const struct lu_env *env,
765 struct dt_object *dt_obj,
766 struct thandle_exec_args *ta,
767 struct object_update_reply *reply,
768 int index, char *file, int line)
772 LASSERT(ta->ta_handle != NULL);
773 ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
777 arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
780 lu_object_get(&dt_obj->do_lu);
781 arg->object = dt_obj;
788 * increase ref of the object
790 static int out_ref_add(struct tgt_session_info *tsi)
792 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
793 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
798 rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
799 tti->tti_u.update.tti_update_reply,
800 tti->tti_u.update.tti_update_reply_index);
804 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
807 struct dt_object *dt_obj = arg->object;
810 rc = out_obj_ref_del(env, dt_obj, th);
812 CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
813 dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
815 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
820 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
823 return out_obj_ref_add(env, arg->object, th);
826 static int __out_tx_ref_del(const struct lu_env *env,
827 struct dt_object *dt_obj,
828 struct thandle_exec_args *ta,
829 struct object_update_reply *reply,
830 int index, char *file, int line)
834 LASSERT(ta->ta_handle != NULL);
835 ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
839 arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
842 lu_object_get(&dt_obj->do_lu);
843 arg->object = dt_obj;
849 static int out_ref_del(struct tgt_session_info *tsi)
851 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
852 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
857 if (!lu_object_exists(&obj->do_lu))
860 rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
861 tti->tti_u.update.tti_update_reply,
862 tti->tti_u.update.tti_update_reply_index);
866 static int out_obj_index_insert(const struct lu_env *env,
867 struct dt_object *dt_obj,
868 const struct dt_rec *rec,
869 const struct dt_key *key,
874 CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
875 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
876 (char *)key, PFID((struct lu_fid *)rec));
878 if (dt_try_as_dir(env, dt_obj) == 0)
881 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
882 rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
883 dt_write_unlock(env, dt_obj);
888 static int out_obj_index_delete(const struct lu_env *env,
889 struct dt_object *dt_obj,
890 const struct dt_key *key,
895 CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
896 dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
899 if (dt_try_as_dir(env, dt_obj) == 0)
902 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
903 rc = dt_delete(env, dt_obj, key, th, NULL);
904 dt_write_unlock(env, dt_obj);
909 static int out_tx_index_insert_exec(const struct lu_env *env,
910 struct thandle *th, struct tx_arg *arg)
912 struct dt_object *dt_obj = arg->object;
915 rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
916 arg->u.insert.key, th);
918 CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
919 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
921 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
926 static int out_tx_index_insert_undo(const struct lu_env *env,
927 struct thandle *th, struct tx_arg *arg)
929 return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
932 static int __out_tx_index_insert(const struct lu_env *env,
933 struct dt_object *dt_obj,
934 char *name, struct lu_fid *fid,
935 struct thandle_exec_args *ta,
936 struct object_update_reply *reply,
937 int index, char *file, int line)
941 LASSERT(ta->ta_handle != NULL);
943 if (lu_object_exists(&dt_obj->do_lu)) {
944 if (dt_try_as_dir(env, dt_obj) == 0) {
945 ta->ta_err = -ENOTDIR;
948 ta->ta_err = dt_declare_insert(env, dt_obj,
949 (struct dt_rec *)fid,
950 (struct dt_key *)name,
957 arg = tx_add_exec(ta, out_tx_index_insert_exec,
958 out_tx_index_insert_undo, file,
961 lu_object_get(&dt_obj->do_lu);
962 arg->object = dt_obj;
965 arg->u.insert.rec = (struct dt_rec *)fid;
966 arg->u.insert.key = (struct dt_key *)name;
971 static int out_index_insert(struct tgt_session_info *tsi)
973 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
974 struct object_update *update = tti->tti_u.update.tti_update;
975 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
983 name = object_update_param_get(update, 0, NULL);
985 CERROR("%s: empty name for index insert: rc = %d\n",
986 tgt_name(tsi->tsi_tgt), -EPROTO);
987 RETURN(err_serious(-EPROTO));
990 fid = object_update_param_get(update, 1, &size);
991 if (fid == NULL || size != sizeof(*fid)) {
992 CERROR("%s: invalid fid: rc = %d\n",
993 tgt_name(tsi->tsi_tgt), -EPROTO);
994 RETURN(err_serious(-EPROTO));
997 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
998 lustre_swab_lu_fid(fid);
1000 if (!fid_is_sane(fid)) {
1001 CERROR("%s: invalid FID "DFID": rc = %d\n",
1002 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1003 RETURN(err_serious(-EPROTO));
1006 rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
1008 tti->tti_u.update.tti_update_reply,
1009 tti->tti_u.update.tti_update_reply_index);
1013 static int out_tx_index_delete_exec(const struct lu_env *env,
1019 rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1021 CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1022 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1024 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1029 static int out_tx_index_delete_undo(const struct lu_env *env,
1033 CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1034 dt_obd_name(th->th_dev), -ENOTSUPP);
1038 static int __out_tx_index_delete(const struct lu_env *env,
1039 struct dt_object *dt_obj, char *name,
1040 struct thandle_exec_args *ta,
1041 struct object_update_reply *reply,
1042 int index, char *file, int line)
1046 if (dt_try_as_dir(env, dt_obj) == 0) {
1047 ta->ta_err = -ENOTDIR;
1051 LASSERT(ta->ta_handle != NULL);
1052 ta->ta_err = dt_declare_delete(env, dt_obj,
1053 (struct dt_key *)name,
1055 if (ta->ta_err != 0)
1058 arg = tx_add_exec(ta, out_tx_index_delete_exec,
1059 out_tx_index_delete_undo, file,
1062 lu_object_get(&dt_obj->do_lu);
1063 arg->object = dt_obj;
1066 arg->u.insert.key = (struct dt_key *)name;
1070 static int out_index_delete(struct tgt_session_info *tsi)
1072 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
1073 struct object_update *update = tti->tti_u.update.tti_update;
1074 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
1078 if (!lu_object_exists(&obj->do_lu))
1081 name = object_update_param_get(update, 0, NULL);
1083 CERROR("%s: empty name for index delete: rc = %d\n",
1084 tgt_name(tsi->tsi_tgt), -EPROTO);
1085 RETURN(err_serious(-EPROTO));
1088 rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1089 tti->tti_u.update.tti_update_reply,
1090 tti->tti_u.update.tti_update_reply_index);
1094 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1097 struct dt_object *dt_obj = arg->object;
1100 rc = out_obj_destroy(env, dt_obj, th);
1102 CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1103 dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1105 object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1110 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1113 CERROR("%s: not support destroy undo yet!: rc = %d\n",
1114 dt_obd_name(th->th_dev), -ENOTSUPP);
1118 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1119 struct thandle_exec_args *ta,
1120 struct object_update_reply *reply,
1121 int index, char *file, int line)
1125 LASSERT(ta->ta_handle != NULL);
1126 ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1130 arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1133 lu_object_get(&dt_obj->do_lu);
1134 arg->object = dt_obj;
1140 static int out_destroy(struct tgt_session_info *tsi)
1142 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
1143 struct object_update *update = tti->tti_u.update.tti_update;
1144 struct dt_object *obj = tti->tti_u.update.tti_dt_object;
1149 fid = &update->ou_fid;
1150 if (!fid_is_sane(fid)) {
1151 CERROR("%s: invalid FID "DFID": rc = %d\n",
1152 tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1153 RETURN(err_serious(-EPROTO));
1156 if (!lu_object_exists(&obj->do_lu))
1159 rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1160 tti->tti_u.update.tti_update_reply,
1161 tti->tti_u.update.tti_update_reply_index);
1166 #define DEF_OUT_HNDL(opc, name, flags, fn) \
1167 [opc - OUT_CREATE] = { \
1171 .th_flags = flags, \
1177 #define out_handler mdt_handler
1178 static struct tgt_handler out_update_ops[] = {
1179 DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
1181 DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
1183 DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
1185 DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
1187 DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set", MUTABOR | HABEO_REFERO,
1189 DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get", HABEO_REFERO,
1191 DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
1193 DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
1195 DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
1197 DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
1198 MUTABOR | HABEO_REFERO, out_index_insert),
1199 DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
1200 MUTABOR | HABEO_REFERO, out_index_delete),
1203 struct tgt_handler *out_handler_find(__u32 opc)
1205 struct tgt_handler *h;
1208 if (OUT_CREATE <= opc && opc < OUT_LAST) {
1209 h = &out_update_ops[opc - OUT_CREATE];
1210 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1213 h = NULL; /* unsupported opc */
1219 * Object updates between Targets. Because all the updates has been
1220 * dis-assemblied into object updates at sender side, so OUT will
1221 * call OSD API directly to execute these updates.
1223 * In DNE phase I all of the updates in the request need to be executed
1224 * in one transaction, and the transaction has to be synchronously.
1226 * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1229 int out_handle(struct tgt_session_info *tsi)
1231 const struct lu_env *env = tsi->tsi_env;
1232 struct tgt_thread_info *tti = tgt_th_info(env);
1233 struct thandle_exec_args *ta = &tti->tti_tea;
1234 struct req_capsule *pill = tsi->tsi_pill;
1235 struct dt_device *dt = tsi->tsi_tgt->lut_bottom;
1236 struct object_update_request *ureq;
1237 struct object_update *update;
1238 struct object_update_reply *reply;
1241 int old_batchid = -1;
1248 req_capsule_set(pill, &RQF_OUT_UPDATE);
1249 ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
1251 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1253 RETURN(err_serious(-EPROTO));
1256 bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
1257 if (bufsize != object_update_request_size(ureq)) {
1258 CERROR("%s: invalid bufsize %d: rc = %d\n",
1259 tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1260 RETURN(err_serious(-EPROTO));
1263 if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
1264 CERROR("%s: invalid update buffer magic %x expect %x: "
1265 "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
1266 UPDATE_REQUEST_MAGIC, -EPROTO);
1267 RETURN(err_serious(-EPROTO));
1270 count = ureq->ourq_count;
1272 CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
1274 RETURN(err_serious(-EPROTO));
1277 req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1278 OUT_UPDATE_REPLY_SIZE);
1279 rc = req_capsule_server_pack(pill);
1281 CERROR("%s: Can't pack response: rc = %d\n",
1282 tgt_name(tsi->tsi_tgt), rc);
1286 /* Prepare the update reply buffer */
1287 reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1289 RETURN(err_serious(-EPROTO));
1290 object_update_reply_init(reply, count);
1291 tti->tti_u.update.tti_update_reply = reply;
1293 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1297 tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1299 /* Walk through updates in the request to execute them synchronously */
1300 for (i = 0; i < count; i++) {
1301 struct tgt_handler *h;
1302 struct dt_object *dt_obj;
1304 update = object_update_request_get(ureq, i, NULL);
1306 GOTO(out, rc = -EPROTO);
1308 if (ptlrpc_req_need_swab(pill->rc_req))
1309 lustre_swab_object_update(update);
1311 if (old_batchid == -1) {
1312 old_batchid = update->ou_batchid;
1313 } else if (old_batchid != update->ou_batchid) {
1314 /* Stop the current update transaction,
1315 * create a new one */
1316 rc = out_tx_end(env, ta);
1320 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1323 old_batchid = update->ou_batchid;
1326 if (!fid_is_sane(&update->ou_fid)) {
1327 CERROR("%s: invalid FID "DFID": rc = %d\n",
1328 tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
1330 GOTO(out, rc = err_serious(-EPROTO));
1333 dt_obj = dt_locate(env, dt, &update->ou_fid);
1335 GOTO(out, rc = PTR_ERR(dt_obj));
1337 if (dt->dd_record_fid_accessed) {
1338 lfsck_pack_rfa(&tti->tti_lr,
1339 lu_object_fid(&dt_obj->do_lu));
1340 tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
1343 tti->tti_u.update.tti_dt_object = dt_obj;
1344 tti->tti_u.update.tti_update = update;
1345 tti->tti_u.update.tti_update_reply_index = i;
1347 h = out_handler_find(update->ou_type);
1348 if (likely(h != NULL)) {
1349 /* For real modification RPC, check if the update
1350 * has been executed */
1351 if (h->th_flags & MUTABOR) {
1352 struct ptlrpc_request *req = tgt_ses_req(tsi);
1354 if (out_check_resent(env, dt, dt_obj, req,
1355 out_reconstruct, reply, i))
1359 rc = h->th_act(tsi);
1361 CERROR("%s: The unsupported opc: 0x%x\n",
1362 tgt_name(tsi->tsi_tgt), update->ou_type);
1363 lu_object_put(env, &dt_obj->do_lu);
1364 GOTO(out, rc = -ENOTSUPP);
1367 lu_object_put(env, &dt_obj->do_lu);
1372 rc1 = out_tx_end(env, ta);
1378 struct tgt_handler tgt_out_handlers[] = {
1379 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE, out_handle),
1381 EXPORT_SYMBOL(tgt_out_handlers);